Читать интересную книгу UNIX: взаимодействие процессов - Уильям Стивенс

Шрифт:

-
+

Интервал:

-
+

Закладка:

Сделать
1 ... 58 59 60 61 62 63 64 65 66 ... 128

//pxsem/prodcons2.c

1  #include "unpipc.h"

2  #define NBUFF 10

3  int nitems; /* только для чтения производителем и потребителем */

4  struct { /* общие данные производителя и потребителя */

5   int buff[NBUFF];

6   sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

7  } shared;

8  void *produce(void *), *consume(void *);

9  int

10 main(int argc, char **argv)

11 {

12  pthread_t tid_produce, tid_consume;

13  if (argc != 2)

14   err_quit("usage: prodcons2 <#items>");

15  nitems = atoi(argv[1]);

16  /* инициализация трех семафоров */

17  Sem_init(&shared.mutex, 0, 1);

18  Sem_init(&shared.nempty, 0, NBUFF);

19  Sem_init(&shared.nstored, 0, 0);

20  Set_concurrency(2);

21  Pthread_create(&tid_produce, NULL, produce, NULL);

22  Pthread_create(&tid_consume, NULL, consume, NULL);

23  Pthread_join(tid_produce, NULL);

24  Pthread_join(tid_consume, NULL):

25  Sem_destroy(&shared.mutex);

26  Sem_destroy(&shared.nempty):

27  Sem_destroy(&shared.nstored);

28  exit(0);

29 }

30 void *

31 produce(void *arg)

32 {

33  int i;

34  for (i = 0; i < nitems; i++) {

35   Sem_wait(&shared.nempty); /* ожидание одного свободного поля */

36   Sem_wait(&shared.mutex);

37   shared.buff[i % NBUFF] = i; /* помещение i в циклический буфер */

38   Sem_post(&shared.mutex);

39   Sem_post(&shared.nstored); /* поместили еще один элемент */

40  }

41  return(NULL);

42 }

43 void *

44 consume(void *arg)

45 {

46  int i;

47  for (i = 0; i < nitems; i++) {

48   Sem_wait(&shared.nstored); /* ожидаем появления хотя бы одного готового для обработки элемента */

49   Sem_wait(&shared.mutex);

50   if (shared.buff[i % NBUFF] != i)

51    printf("buff[*d] = *dn", i, shared.buff[i % NBUFF]);

52   Sem_post(&shared.mutex);

53   Sem_post(&shared.nempty); /* еще одно пустое поле */

54  }

55  return(NULL);

56 }

Выделение семафоров

6 Мы объявляем три семафора типа sem_t, и теперь это сами семафоры, а не указатели на них.

Вызов sem_init

16-27 Мы вызываем sem_init вместо sem_open* а затем sem_destroy вместо sem_unlink. Вызывать sem_destroy на самом деле не требуется, поскольку программа все равно завершается.

Остальные изменения обеспечивают передачу указателей на три семафора при вызовах sem_wait и sem_post.

10.9. Несколько производителей, один потребитель

Решение в разделе 10.6 относится к классической задаче с одним производителем и одним потребителем. Новая, интересная модификация программы позволит нескольким производителям работать с одним потребителем. Начнем с решения из листинга 10.11, в котором использовались размещаемые в памяти семафоры. В листинге 10.12 приведены объявления глобальных переменных и функция main.

Листинг 10.12. Функция main задачи с несколькими производителями

//pxsem/prodcons3.c

1  #include "unpipc.h"

2  #define NBUFF 10

3  #define MAXNTHREADS 100

4  int nitems, nproducers; /* только для чтения производителем и потребителем */

5  struct { /* общие данные */

6   int buff[NBUFF];

7   int nput;

8   int nputval;

9   sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

10 } shared;

11 void *produce(void *), *consume(void *);

12 int

13 main(int argc, char **argv)

14 {

15  int i, count[MAXNTHREADS];

16  pthread_t tid_produce[MAXNTHREADS], tid_consume;

17  if (argc != 3)

18   err_quit("usage: prodcons3 <#items> <#producers>");

19  nitems = atoi(argv[1]);

20  nproducers = min(atoi(argv[2]), MAXNTHREADS);

21  /* инициализация трех семафоров */

22  Sem_init(&shared.mutex, 0, 1);

23  Sem_init(&shared.nempty, 0, NBUFF);

24  Sem_init(&shared.nstored, 0, 0);

25  /* создание всех производителей и одного потребителя */

26  Set_concurrency(nproducers + 1);

27  for (i = 0; i < nproducers; i++) {

28   count[i] = 0;

29   Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

30  }

31  Pthread_create(&tid_consume, NULL, consume, NULL);

32  /* ожидание завершения всех производителей и потребителя */

33  for (i = 0; i < nproducers; i++) {

34   Pthread_join(tid_produce[i], NULL);

35   printf("count[%d] = %dn", i, count[i]);

36  }

37  Pthread_join(tid_consume, NULL);

38  Sem_destroy(&shared.mutex);

39  Sem_destroy(&shared.nempty);

40  Sem_destroy(&shared.nstored);

41  exit(0);

42 }

Глобальные переменные

4 Глобальная переменная nitems хранит число элементов, которые должны быть совместно произведены. Переменная nproducers хранит число потоков-производителей. Оба эти значения устанавливаются с помощью аргументов командной строки.

Общая структура

5-10 В структуру shared добавляются два новых элемента: nput, обозначающий индекс следующего элемента, куда должен быть помещен объект (по модулю BUFF), и nputval следующее значение, которое будет помещено в буфер. Эти две переменные взяты из нашего решения в листингах 7.1 и 7.2. Они нужны для синхронизации нескольких потоков-производителей.

Новые аргументы командной строки

17-20 Два новых аргумента командной строки указывают полное количество элементов, которые должны быть помещены в буфер, и количество потоков-производителей. 

Запуск всех потоков

21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1.

В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем.

Листинг 10.13. Функция, выполняемая всеми потоками-производителями

//pxsem/prodcons3.c

43 void *

44 produce(void *arg)

45 {

46  for (;;) {

47   Sem_wait(&shared.nempty); /* ожидание освобождения поля */

48   Sem_wait(&shared.mutex);

49   if (shared.nput >= nitems) {

50    Sem_post(&shared.nempty);

51    Sem_post(&shared.mutex);

52    return(NULL); /* готово */

53   }

54   shared.buff[shared.nput % NBUFF] = shared.nputval;

55   shared.nput++;

56   shared.nputval++;

57   Sem_post(&shared.mutex);

58   Sem_post(&shared.nstored); /* еще один элемент */

59   *((int *) arg) += 1;

60  }

61 }

Взаимное исключение между потоками-производителями

49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями.

Завершение производителей

50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет

Sem_wait(&shared.nempty); /* ожидание пустого поля */

в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере, лишние потоки будут заблокированы навсегда, ожидая освобождения семафора nempty, и никогда не завершат свою работу.

Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки.

Листинг 10.14. Функция, выполняемая потоком-потребителем

//pxsem/prodcons3.с

62 void *

63 consume(void *arg)

64 {

65  int i;

66  for (i = 0; i < nitems; i++) {

67   Sem_wait(&shared.nstored); /* ожидание помещения по крайней мере одного элемента в буфер */

68   Sem_wait(&shared.mutex);

69   if (shared.buff[i % NBUFF] != i)

70    printf("error: buff[%d] = %dn", i, shared.buff[i % NBUFF]);

71   Sem_post(&shared.mutex);

72   Sem_post(&shared.nempty); /* еще одно пустое поле */

73  }

74  return(NULL);

75 }

Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.

10.10. Несколько производителей, несколько потребителей

Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.

1 ... 58 59 60 61 62 63 64 65 66 ... 128
На этом сайте Вы можете читать книги онлайн бесплатно русская версия UNIX: взаимодействие процессов - Уильям Стивенс.

Оставить комментарий