Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
 Books
  Краткое описание
 Linux
 W. R. Стивенс TCP 
 W. R. Стивенс IPC 
 A.Rubini-J.Corbet 
 K. Bauer 
 Gary V. Vaughan 
 Д Вилер 
 В. Сталлинг 
 Pramode C.E. 
 Steve Pate 
 William Gropp 
 K.A.Robbins 
 С Бекман 
 Р Стивенс 
 Ethereal 
 Cluster 
 Languages
 C
 Perl
 M.Pilgrim 
 А.Фролов 
 Mendel Cooper 
 М Перри 
 Kernel
 C.S. Rodriguez 
 Robert Love 
 Daniel Bovet 
 Д Джеф 
 Максвелл 
 G. Kroah-Hartman 
 B. Hansen 
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Linux Kernel 2.6...5164 
 Trees...935 
 Максвелл 3...861 
 Go Web ...815 
 William Gropp...796 
 Ethreal 3...779 
 Ethreal 4...766 
 Gary V.Vaughan-> Libtool...765 
 Rodriguez 6...756 
 Steve Pate 1...749 
 Ext4 FS...748 
 Clickhouse...748 
 Ethreal 1...736 
 Secure Programming for Li...721 
 C++ Patterns 3...712 
 Ulrich Drepper...693 
 Assembler...687 
 DevFS...655 
 Стивенс 9...644 
 MySQL & PosgreSQL...622 
 
  01.01.2024 : 3621733 посещений 

iakovlev.org

Глава 23 : Многопоточность

Код данной главы лежит тут

Использование fork для клонирования процессов имеет свои недостатки : стоимость этой операции достаточно высока , и обмен данными между родителем и потомком в этом случае - нетривиальная задача , решаемая с помощью IPC.

Программные потоки - threads - иногда называются облегченными процессами - lightweight processes. Для его создания требуется на порядок меньше времени. Все потоки внутри процесса разделяют :

  
 	глобальные переменные 
 	данные
 	открытые дескрипторы файлов
 	обработчики сигналов
 	текущий рабочий каталог
 
У каждого потока имеется собственные :
  
 	идентификатор
 	стек
 	приоритет
 
Потоки создаются функцией
  
 	int pthread_create (pthread_t * t , const pthread_attr_t * attr , void *arg)
 
 	возвращает 0 в случае успеха
 
При создании потока мы должны указать , какую функцию он будет выполнять . Выполнение потока начинается с нее , а завершение - либо явно с помощью pthread_exit , либо неявно при выходе из этой функции .

Функция pthread_join выполняет аналогичную роль , что и waitpid для fork :

  
 	int pthread_join( pthread_t t , void status)
 
 	возвращает 0 в случае успеха
 
Узнать свой идентификатор потока может с помощью
  
  	pthread_t pthread_self()
 
 	это аналог getpid для процессов
 
Поток может быть либо присоединенным (joinable) , либо отсоединенным (detached). В первом случае поток после своего завершения остается в подвешенном состоянии и не освобождает своих ресурсов до тех пор , пока не будет вызвана для него pthread_join(). Во втором случае поток сам все освобождает .

Функция pthread_detach меняет состояние потока , превращая его из присоединенного в отсоединенный . Поток сам может вызвать эту функцию

  
 	pthread_detach(pthread_self())	
 
Одним из способов завершения является функция pthread_exit(void * status).

Теперь мы перепишем эхо-сервер , приведенный в главе 5 . Для каждого клиента вместо процесса будет создаваться поток .

  
 //threads/tcpserv01.c
 
 int main(int argc, char **argv)
 {
 	int				listenfd, connfd;
 	socklen_t		addrlen, len;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: tcpserv01 [  ] ");
 
 	cliaddr = Malloc(addrlen);
 
 	for ( ; ; ) {
 		len = addrlen;
 		connfd = Accept(listenfd, cliaddr, &len);
 
 		Pthread_create(NULL, NULL, &doit, (void *) connfd);
 	}
 }
 
 static void *
 doit(void *arg)
 {
 	Pthread_detach(pthread_self());
 	str_echo((int) arg);	/* same function as before */
 	Close((int) arg);		/* we are done with connected socket */
 	return(NULL);
 }
 
 
Когда accept возвращает управление , мы создаем поток и передаем в функцию потока doit дескриптор присоединенного сокета . Внутри потка мы вызываем Pthread_detach и делаем поток отсоединенным . Мы обязаны в ней закрыть сокет с помощью close() , потому что за поток этого никто не сделает .

В коде родительского процесса есть одна проблема : дескриптор сокета connfd не синхронизирован , и может возникнуть ситуацию , когда один и тот же дескриптор будет передан двум разным потокам . Для этого мы перепишем функцию main :

  
 //threads/tcpserv02.c
 
 int main(int argc, char **argv)
 {
 	int				listenfd, *iptr;
 	socklen_t		addrlen, len;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: tcpserv01 [  ] ");
 
 	cliaddr = Malloc(addrlen);
 
 	for ( ; ; ) {
 		len = addrlen;
 		iptr = Malloc(sizeof(int));
 		*iptr = Accept(listenfd, cliaddr, &len);
 
 		Pthread_create(NULL, NULL, &doit, iptr);
 	}
 }
 
 static void *
 doit(void *arg)
 {
 	int		connfd;
 
 	connfd = *((int *) arg);
 	free(arg);
 
 	Pthread_detach(pthread_self());
 	str_echo(connfd);		/* same function as before */
 	Close(connfd);			/* we are done with connected socket */
 	return(NULL);
 }
 
Каждый раз перед вызовом accept мы выделяем память для дескриптора , каждый поток получает свой дескриптор и освобождает память .

В параллельном программировании существует проблема совместного доступа разных потоков к глобальным данным , которые не синхронизированы . В следующем примере создаются 2 потока , которые по очереди делают инкремент одной глобальной переменной. Это пример того , как НЕ НАДО писать :

  
 #define	NLOOP 5000
 
 int				counter;		/* this is incremented by the threads */
 
 void	*doit(void *);
 
 int
 main(int argc, char **argv)
 {
 	pthread_t	tidA, tidB;
 
 	Pthread_create(&tidA, NULL, &doit, NULL);
 	Pthread_create(&tidB, NULL, &doit, NULL);
 
 	Pthread_join(tidA, NULL);
 	Pthread_join(tidB, NULL);
 
 	exit(0);
 }
 
 void *
 doit(void *vptr)
 {
 	int		i, val;
 
 	for (i = 0; i < NLOOP; i++) {
 		val = counter;
 		printf("%d: %d\n", pthread_self(), val + 1);
 		counter = val + 1;
 	}
 
 	return(NULL);
 }
 
 
Правильным является следующий подход , когда мы используем мьютекс .
  
 
 pthread_mutex_t	counter_mutex = PTHREAD_MUTEX_INITIALIZER;
 ...
 
 	for (i = 0; i < NLOOP; i++) {
 		Pthread_mutex_lock(&counter_mutex);
 
 		val = counter;
 		printf("%d: %d\n", pthread_self(), val + 1);
 		counter = val + 1;
 
 		Pthread_mutex_unlock(&counter_mutex);
 	}
 
Код внутри мьютекса гарантированно блокируется от доступа другим процессам .

Но это решение неоптимально , поскольку тратится дополнительное время процессора на проверку . Более правильным решением является использование условной переменной (conditional variable) в комбинации с мьютексом .

Условная переменная - это переменная типа pthread_cond_t , которая используется в 2-х функциях :

  
 	int pthread_cond_wait   ( pthread_cond_t *ptr , pthread_mutex_t * m)
 	int pthread_cond_signal ( pthread_cond_t *ptr , pthread_mutex_t * m)
 
 	обе возвращают 0 в случае успеха
 
Пример использования комбинации мютекс + условная переменная :

  
 #define	Pthread_mutex_lock(mptr) \
 	{	int  n; \
 		if ( (n = pthread_mutex_lock(mptr)) != 0) \
 			{ errno = n; err_sys("pthread_mutex_lock error"); } \
 	}
 #define	Pthread_mutex_unlock(mptr) \
 	{	int  n; \
 		if ( (n = pthread_mutex_unlock(mptr)) != 0) \
 			{ errno = n; err_sys("pthread_mutex_unlock error"); } \
 	}
 #define	Pthread_cond_wait(cptr,mptr) \
 	{	int  n; \
 		if ( (n = pthread_cond_wait(cptr,mptr)) != 0) \
 			{ errno = n; err_sys("pthread_cond_wait error"); } \
 	}
 #define	Pthread_cond_signal(cptr) \
 	{	int  n; \
 		if ( (n = pthread_cond_signal(cptr)) != 0) \
 			{ errno = n; err_sys("pthread_cond_signal error"); } \
 	}
 
 #define	NLOOP	   	 50
 #define	BUFFSIZE	 10
 
 struct buf_t {
   int		b_buf[BUFFSIZE];	/* the buffer which contains integer items */
   int		b_nitems;			/* #items currently in buffer */
   int		b_nextget;
   int		b_nextput;
   pthread_mutex_t	b_mutex;
   pthread_cond_t	b_cond_consumer;	/* consumer waiting to get */
   pthread_cond_t	b_cond_producer;	/* producer waiting to put */
 } buf_t;
 
 void	*produce_loop(void *);
 void	*consume_loop(void *);
 
 int
 main(int argc, char **argv)
 {
 	int			n;
 	pthread_t	tidA, tidB;
 
 	printf("main, addr(stack) = %x, addr(global) = %x, addr(func) = %x\n",
 			&n, &buf_t, &produce_loop);
 	if ( (n = pthread_create(&tidA, NULL, &produce_loop, NULL)) != 0)
 		errno = n, err_sys("pthread_create error for A");
 	if ( (n = pthread_create(&tidB, NULL, &consume_loop, NULL)) != 0)
 		errno = n, err_sys("pthread_create error for B");
 
 		/* wait for both threads to terminate */
 	if ( (n = pthread_join(tidA, NULL)) != 0)
 		errno = n, err_sys("pthread_join error for A");
 	if ( (n = pthread_join(tidB, NULL)) != 0)
 		errno = n, err_sys("pthread_join error for B");
 
 	exit(0);
 }
 
 void
 produce(struct buf_t *bptr, int val)
 {
 	Pthread_mutex_lock(&bptr->b_mutex);
 		/* Wait if buffer is full */
 	while (bptr->b_nitems >= BUFFSIZE)
 		Pthread_cond_wait(&bptr->b_cond_producer, &bptr->b_mutex);
 
 		/* There is room, store the new value */
 	printf("produce %d\n", val);
 	bptr->b_buf[bptr->b_nextput] = val;
 	if (++bptr->b_nextput >= BUFFSIZE)
 		bptr->b_nextput = 0;
 	bptr->b_nitems++;
 
 		/* Signal consumer */
 	Pthread_cond_signal(&bptr->b_cond_consumer);
 	Pthread_mutex_unlock(&bptr->b_mutex);
 }
 
 int
 consume(struct buf_t *bptr)
 {
 	int		val;
 
 	Pthread_mutex_lock(&bptr->b_mutex);
 		/* Wait if buffer is empty */
 	while (bptr->b_nitems <= 0)
 		Pthread_cond_wait(&bptr->b_cond_consumer, &bptr->b_mutex);
 
 		/* There is data, fetch the value */
 	val = bptr->b_buf[bptr->b_nextget];
 	printf("consume %d\n", val);
 	if (++bptr->b_nextget >= BUFFSIZE)
 		bptr->b_nextget = 0;
 	bptr->b_nitems--;
 
 		/* Signal producer; it might be waiting for space to store */
 	Pthread_cond_signal(&bptr->b_cond_producer);
 	Pthread_mutex_unlock(&bptr->b_mutex);
 
 	return(val);
 }
 
 void *
 produce_loop(void *vptr)
 {
 	int		i;
 
 	printf("produce_loop thread, addr(stack) = %x\n", &i);
 	for (i = 0; i < NLOOP; i++) {
 		produce(&buf_t, i);
 	}
 
 	return(NULL);
 }
 
 void *
 consume_loop(void *vptr)
 {
 	int		i, val;
 
 	printf("consume_loop thread, addr(stack) = %x\n", &i);
 	for (i = 0; i < NLOOP; i++) {
 		val = consume(&buf_t);
 	}
 
 	return(NULL);
 }
 
 

Яковлев С: По мотивам этой главы я написал тестовое приложение , которое выполняет следующую работу в модели "producer-consumer":

 1 Собираем утилиту и запускаем ее с 2-мя параметрами ,
    первый параметр - размер линейного списка , второй параметр - число потоков
 2 Создаем 2 массива потоков , которые параллельно начинают "насиловать" эту очередь :
 		добавлять в нее элементы и забирать их из нее .  
   Запустить можно например так :
     prodcons 100000 10 
 
 Код главной функции :
 
 int main(int argc, char **argv)
 {
 	int			i, nthreads, count[MAXNTHREADS] , count2[MAXNTHREADS];
 	pthread_t	tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
 
 	List = NULL ;
 
 	if (argc != 3)
 	{
 		printf("usage: prodcons <#items> <#threads>\n");
 		exit(0);
 	}
 	nitems = atoi(argv[1]);
 	nthreads = atoi(argv[2]);
 
 	for (i = 0; i < nthreads; i++) 
 	{
 		count[i] = 0;
 		pthread_create(&tid_produce[i], NULL, produce, &count[i]);
 		count2[i] = 0;
 		pthread_create(&tid_consume[i], NULL, consume, &count2[i]);
 	}
 
 	for (i = 0; i < nthreads; i++) 
 	{
 		pthread_join(tid_produce[i], NULL);
 		pthread_join(tid_consume[i], NULL);
 		printf("count[%d] = %d  count2[%d] = %d\n", i, count[i], i, count2[i]);	
 	}
 
 	printf("producer_sum =%d  consumer_sum =%d\n" , put.producer_sum , get.consumer_sum);  // results in len == 3
 	printf("producer counter =%d  consumer counter =%d\n" , put.count , get.count);  // results in len == 3
     printf("queue length =%d\n",Length("List",List));  // results in len == 3
 
 	exit(0);
 }
 
 Исходники prodcons
 
 

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье