Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
 Books
  Краткое описание
 Linux
 W. R. Стивенс TCP 
 W. R. Стивенс IPC 
 A.Rubini-J.Corbet 
 K. Bauer 
 Gary V. Vaughan 
 Д Вилер 
 В. Сталлинг 
 Pramode C.E. 
 Steve Pate 
 William Gropp 
 K.A.Robbins 
 С Бекман 
 Р Стивенс 
 Ethereal 
 Cluster 
 Languages
 C
 Perl
 M.Pilgrim 
 А.Фролов 
 Mendel Cooper 
 М Перри 
 Kernel
 C.S. Rodriguez 
 Robert Love 
 Daniel Bovet 
 Д Джеф 
 Максвелл 
 G. Kroah-Hartman 
 B. Hansen 
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Linux Kernel 2.6...5164 
 Trees...935 
 Максвелл 3...861 
 Go Web ...814 
 William Gropp...796 
 Ethreal 3...779 
 Ethreal 4...766 
 Gary V.Vaughan-> Libtool...765 
 Rodriguez 6...756 
 Steve Pate 1...749 
 Ext4 FS...748 
 Clickhouse...748 
 Ethreal 1...736 
 Secure Programming for Li...721 
 C++ Patterns 3...711 
 Ulrich Drepper...693 
 Assembler...687 
 DevFS...655 
 Стивенс 9...644 
 MySQL & PosgreSQL...622 
 
  01.01.2024 : 3621733 посещений 

iakovlev.org

Глава 27 : Клиент-сервер

Код данной главы лежит тут

Серверную архитектуру можно разбить на следующие направления :

  
 	1 Последовательные серверы - в текущий момент может обрабатывать одного клиента ,
 		остальные стоят в очереди
 	2 Параллельные серверы на основе fork - традиционное большинство юниксовых серверов
 	3 Параллельные серверы на основе select 
 	4 Параллельные серверы на основе потоков 
 
В этой главе рассматриваются еще 2 разновидности параллельных серверов :
  
 	1 Предварительное создание дочерних процессов - preforking . При запуске сервера создается
 		пул дочерних процессов  
 	2 Предварительное создание дочерних потоков - prethreading . При запуске сервера создается
 		пул дочерних процессов
 
Сравнительные характеристики различных серверных архитектур можно свести в таблице :
Для всех серверов будет использоваться одна версия клиента -
  
 //server/client.c
 
 #define	MAXN	16384		/* max #bytes to request from server */
 
 int
 main(int argc, char **argv)
 {
 	int		i, j, fd, nchildren, nloops, nbytes;
 	pid_t	pid;
 	ssize_t	n;
 	char	request[MAXLINE], reply[MAXN];
 
 	if (argc != 6)
 		err_quit("usage: client < hostname or IPaddr>  <#children> "
 				 "<#loops/child> <#bytes/request>");
 
 	nchildren = atoi(argv[3]);
 	nloops = atoi(argv[4]);
 	nbytes = atoi(argv[5]);
 	snprintf(request, sizeof(request), "%d\n", nbytes); /* newline at end */
 
 	for (i = 0; i < nchildren; i++) {
 		if ( (pid = Fork()) == 0) {		/* child */
 			for (j = 0; j < nloops; j++) {
 				fd = Tcp_connect(argv[1], argv[2]);
 
 				Write(fd, request, strlen(request));
 
 				if ( (n = Readn(fd, reply, nbytes)) != nbytes)
 					err_quit("server returned %d bytes", n);
 
 				Close(fd);		/* TIME_WAIT on client, not server */
 			}
 			printf("child %d done\n", i);
 			exit(0);
 		}
 		/* parent loops around to fork() again */
 	}
 
 	while (wait(NULL) > 0)	/* now parent waits for all children */
 		;
 	if (errno != ECHILD)
 		err_sys("wait error");
 
 	exit(0);
 }
  	
 
Каждый раз при запуске клиента мы задаем ip , порт сервера , число дочерних процессов , порождаемых с помощью fork , число запросов , которое каждый дочерний процесс будет посылать серверу , и количество байт , отсылаемых сервером. Родительский процесс клиента форкает дочерние процессы , и каждый из них порождает коннект с сервером . Потом клиент закрывает каждое соединение , при этом TIME_WAIT имеет место на его стороне. Формат запуска клиента из командной строки :
  
 # client < hostname or IPaddr> < port> < #children> < #loops/child> < #bytes/request>
 
Рассмотрим параллельный сервер с одним дочерним процессом для каждого клиента .
  
 //server/serv01.c 
 
 int
 main(int argc, char **argv)
 {
 	int					listenfd, connfd;
 	pid_t				childpid;
 	void				sig_chld(int), sig_int(int), web_child(int);
 	socklen_t			clilen, addrlen;
 	struct sockaddr		*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv01 [ < host> ] < port#>");
 	cliaddr = Malloc(addrlen);
 
 	Signal(SIGCHLD, sig_chld);
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		clilen = addrlen;
 		if ( (connfd = accept(listenfd, cliaddr, &clilen)) < 0) {
 			if (errno == EINTR)
 				continue;		/* back to for() */
 			else
 				err_sys("accept error");
 		}
 
 		if ( (childpid = Fork()) == 0) {	/* child process */
 			Close(listenfd);	/* close listening socket */
 			web_child(connfd);	/* process the request */
 			exit(0);
 		}
 		Close(connfd);			/* parent closes connected socket */
 	}
 }
 /* end serv01 */
 
 /* include sigint */
 void
 sig_int(int signo)
 {
 	void	pr_cpu_time(void);
 
 	pr_cpu_time();
 	exit(0);
 }
 /* end sigint */
 
 

Рассмотрим вариант preforking-сервера , который работает для систем 4.4BSD . Сервер генерирует сразу при старте необходимое количество процессов. Если в какой-то момент их число будет исчерпано , сервер будет продолжать обслуживать клиентов , но процесс замедлится . Сервер должен постоянно проверять количество свободных дочерних процессов , и когда их число становится ниже допустимой нормы , генерировать их опять . С другой стороны , если их слишком много , сервер должен их прибивать . Рассмотрим первую версию такого сервера .

  
 //server/serv02.c
 
 static int		nchildren;
 static pid_t	*pids;
 
 int
 main(int argc, char **argv)
 {
 	int			listenfd, i;
 	socklen_t	addrlen;
 	void		sig_int(int);
 	pid_t		child_make(int, int, int);
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv02 [ < host> ] < port#> <#children>");
 	nchildren = atoi(argv[argc-1]);
 	pids = Calloc(nchildren, sizeof(pid_t));
 
 	for (i = 0; i < nchildren; i++)
 		pids[i] = child_make(i, listenfd, addrlen);	/* parent returns */
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; )
 		pause();	/* everything done by children */
 }
 /* end serv02 */
 
 /* include sigint */
 void
 sig_int(int signo)
 {
 	int		i;
 	void	pr_cpu_time(void);
 
 		/* 4terminate all children */
 	for (i = 0; i < nchildren; i++)
 		kill(pids[i], SIGTERM);
 	while (wait(NULL) > 0)		/* wait for all children */
 		;
 	if (errno != ECHILD)
 		err_sys("wait error");
 
 	pr_cpu_time();
 	exit(0);
 }
 /* end sigint */
 
 
Дополнительный аргумент в командной строке указывает , сколько нужно задать дочерних процессов . Выделяется массив , в который записываются из pid-ы. Дочерний процесс генерится с помощью child_make :
  
 //server/child02.c
 
 pid_t
 child_make(int i, int listenfd, int addrlen)
 {
 	pid_t	pid;
 	void	child_main(int, int, int);
 
 	if ( (pid = Fork()) > 0)
 		return(pid);		/* parent */
 
 	child_main(i, listenfd, addrlen);	/* never returns */
 }
 /* end child_make */
 
 /* include child_main */
 void
 child_main(int i, int listenfd, int addrlen)
 {
 	int				connfd;
 	void			web_child(int);
 	socklen_t		clilen;
 	struct sockaddr	*cliaddr;
 
 	cliaddr = Malloc(addrlen);
 
 	printf("child %ld starting\n", (long) getpid());
 	for ( ; ; ) {
 		clilen = addrlen;
 		connfd = Accept(listenfd, cliaddr, &clilen);
 
 		web_child(connfd);		/* process the request */
 		Close(connfd);
 	}
 }
 /* end child_main */
 
 
В следующей версии сервера будет реализована защита вызова функции accept при помощи блокировки на основе функции fcntl . В главной функции main будет добавлена функция my_lock_init перед началом цикла , в котором будут создаваться дочерние процессы . В функции child_main появляется блокировка перед вызовом функции accept и снятие блокировки после ее завершения .
  
 		my_lock_wait();
 		connfd = Accept(listenfd, cliaddr, &clilen);
 		my_lock_release();
 
Функция my_lock_init :
  
 void
 my_lock_init(char *pathname)
 {
     char	lock_file[1024];
 
 		/* 4must copy caller's string, in case it's a constant */
     strncpy(lock_file, pathname, sizeof(lock_file));
     Mktemp(lock_file);
 
     lock_fd = Open(lock_file, O_CREAT | O_WRONLY, FILE_MODE);
     Unlink(lock_file);			/* but lock_fd remains open */
 
 	lock_it.l_type = F_WRLCK;
 	lock_it.l_whence = SEEK_SET;
 	lock_it.l_start = 0;
 	lock_it.l_len = 0;
 
 	unlock_it.l_type = F_UNLCK;
 	unlock_it.l_whence = SEEK_SET;
 	unlock_it.l_start = 0;
 	unlock_it.l_len = 0;
 }
 /* end my_lock_init */
 
 
Создается временный файл , который хранится до тех пор , пока есть ссылки. Инициализируются 2 структуры flock : одна для блокировки файла , другая для снятия . Функции , устанавливающие и снимающие блокировку на файл :
  
 /* include my_lock_wait */
 void
 my_lock_wait()
 {
     int		rc;
     
     while ( (rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < 0) {
 		if (errno == EINTR)
 			continue;
     	else
 			err_sys("fcntl error for my_lock_wait");
 	}
 }
 
 void
 my_lock_release()
 {
     if (fcntl(lock_fd, F_SETLKW, &unlock_it) < 0)
 		err_sys("fcntl error for my_lock_release");
 }
 /* end my_lock_wait */
 
Модернизируем последнюю версию сервера и заменим блокировку на основе файловых операций блокировкой с помощью мьютексов . Функция my_lock_init теперь будет выглядеть так :
  
 void
 my_lock_init(char *pathname)
 {
 	int		fd;
 	pthread_mutexattr_t	mattr;
 
 	fd = Open("/dev/zero", O_RDWR, 0);
 
 	mptr = Mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
 				MAP_SHARED, fd, 0);
 	Close(fd);
 
 	Pthread_mutexattr_init(&mattr);
 	Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
 	Pthread_mutex_init(mptr, &mattr);
 }
 /* end my_lock_init */
 
 /* include my_lock_wait */
 void
 my_lock_wait()
 {
 	Pthread_mutex_lock(mptr);
 }
 
 void
 my_lock_release()
 {
 	Pthread_mutex_unlock(mptr);
 }
 /* end my_lock_wait */
 	
 
Последней версией сервера с предварительным порождением процессов будет версия , в которой accept вызывается только родителем , и который передает присоединенный сокет потомку . Создадим структуру , содержащую информацию о дочернем процессе :
  
 typedef struct {
   pid_t		child_pid;		/* process ID */
   int		child_pipefd;	/* parent's stream pipe to/from child */
   int		child_status;	/* 0 = ready */
   long		child_count;	/* #connections handled */
 } Child;
 
Функция child_make :
  
 pid_t
 child_make(int i, int listenfd, int addrlen)
 {
 	int		sockfd[2];
 	pid_t	pid;
 	void	child_main(int, int, int);
 
 	Socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
 
 	if ( (pid = Fork()) > 0) {
 		Close(sockfd[1]);
 		cptr[i].child_pid = pid;
 		cptr[i].child_pipefd = sockfd[0];
 		cptr[i].child_status = 0;
 		return(pid);		/* parent */
 	}
 
 	Dup2(sockfd[1], STDERR_FILENO);		/* child's stream pipe to parent */
 	Close(sockfd[0]);
 	Close(sockfd[1]);
 	Close(listenfd);					/* child does not need this open */
 	child_main(i, listenfd, addrlen);	/* never returns */
 }
 
Функция main :
  
 //server/serv05.c
 
 static int		nchildren;
 
 int
 main(int argc, char **argv)
 {
 	int			listenfd, i, navail, maxfd, nsel, connfd, rc;
 	void		sig_int(int);
 	pid_t		child_make(int, int, int);
 	ssize_t		n;
 	fd_set		rset, masterset;
 	socklen_t	addrlen, clilen;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv05 [  ]  <#children>");
 
 	FD_ZERO(&masterset);
 	FD_SET(listenfd, &masterset);
 	maxfd = listenfd;
 	cliaddr = Malloc(addrlen);
 
 	nchildren = atoi(argv[argc-1]);
 	navail = nchildren;
 	cptr = Calloc(nchildren, sizeof(Child));
 
 		/* 4prefork all the children */
 	for (i = 0; i < nchildren; i++) {
 		child_make(i, listenfd, addrlen);	/* parent returns */
 		FD_SET(cptr[i].child_pipefd, &masterset);
 		maxfd = max(maxfd, cptr[i].child_pipefd);
 	}
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		rset = masterset;
 		if (navail <= 0)
 			FD_CLR(listenfd, &rset);	/* turn off if no available children */
 		nsel = Select(maxfd, &rset, NULL, NULL, NULL);
 
 			/* 4check for new connections */
 		if (FD_ISSET(listenfd, &rset)) {
 			clilen = addrlen;
 			connfd = Accept(listenfd, cliaddr, &clilen);
 
 			for (i = 0; i < nchildren; i++)
 				if (cptr[i].child_status == 0)
 					break;				/* available */
 
 			if (i == nchildren)
 				err_quit("no available children");
 			cptr[i].child_status = 1;	/* mark child as busy */
 			cptr[i].child_count++;
 			navail--;
 
 			n = Write_fd(cptr[i].child_pipefd, "", 1, connfd);
 			Close(connfd);
 			if (--nsel == 0)
 				continue;	/* all done with select() results */
 		}
 
 			/* 4find any newly-available children */
 		for (i = 0; i < nchildren; i++) {
 			if (FD_ISSET(cptr[i].child_pipefd, &rset)) {
 				if ( (n = Read(cptr[i].child_pipefd, &rc, 1)) == 0)
 					err_quit("child %d terminated unexpectedly", i);
 				cptr[i].child_status = 0;
 				navail++;
 				if (--nsel == 0)
 					break;	/* all done with select() results */
 			}
 		}
 	}
 }
 
Теперь перейдем к серверу на основе потоков : в ней содержится один поток для каждого клиента .
  
 //server/serv06.c
 
 int
 main(int argc, char **argv)
 {
 	int				listenfd, connfd;
 	void			sig_int(int);
 	void			*doit(void *);
 	pthread_t		tid;
 	socklen_t		clilen, addrlen;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 2)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 3)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv06 [ < host> ] < port#>");
 	cliaddr = Malloc(addrlen);
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		clilen = addrlen;
 		connfd = Accept(listenfd, cliaddr, &clilen);
 
 		Pthread_create(&tid, NULL, &doit, (void *) connfd);
 	}
 }
 
 
Основной поток блокируется в вызове функции accept , и каждый раз , когда приходит новое клиентское соединение , pthread_create создает новый поток с функцией doit . Потоки неприсоединенные .

В следующей версии мы напишем сервер , в котором каждый поток будет вызывать accept . Структура , содержащая информацию о потоке :

  
 //server/pthread07.h 
 
 typedef struct {
   pthread_t		thread_tid;		/* thread ID */
   long			thread_count;	/* #connections handled */
 } Thread;
 Thread	*tptr;		/* array of Thread structures; calloc'ed */
 
Функция main :
  
 //server/serv07.c
 
 pthread_mutex_t	mlock = PTHREAD_MUTEX_INITIALIZER;
 
 int
 main(int argc, char **argv)
 {
 	int		i;
 	void	sig_int(int), thread_make(int);
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv07 [ < host> ] < port#> <#threads>");
 	nthreads = atoi(argv[argc-1]);
 	tptr = Calloc(nthreads, sizeof(Thread));
 
 	for (i = 0; i < nthreads; i++)
 		thread_make(i);			/* only main thread returns */
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; )
 		pause();	/* everything done by threads */
 }
  
 
Фугкция thread_main ;
  
 //server/pthread07.c
 
 void
 thread_make(int i)
 {
 	void	*thread_main(void *);
 
 	Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
 	return;		/* main thread returns */
 }
 
 void *
 thread_main(void *arg)
 {
 	int				connfd;
 	void			web_child(int);
 	socklen_t		clilen;
 	struct sockaddr	*cliaddr;
 
 	cliaddr = Malloc(addrlen);
 
 	printf("thread %d starting\n", (int) arg);
 	for ( ; ; ) {
 		clilen = addrlen;
     	Pthread_mutex_lock(&mlock);
 		connfd = Accept(listenfd, cliaddr, &clilen);
 		Pthread_mutex_unlock(&mlock);
 		tptr[(int) arg].thread_count++;
 
 		web_child(connfd);		/* process the request */
 		Close(connfd);
 	}
 }
 
 
Ну и наконец последняя версия сервера : главный поток создает пул потоков , после чего он же вызывает accept и передает клиентское соединение одному из свободных потоков . Структура потока :
  
 //server/pthread08.h
 
 typedef struct {
   pthread_t		thread_tid;		/* thread ID */
   long			thread_count;	/* #connections handled */
 } Thread;
 Thread	*tptr;		/* array of Thread structures; calloc'ed */
 
 #define	MAXNCLI	32
 int					clifd[MAXNCLI], iget, iput;
 pthread_mutex_t		clifd_mutex;
 pthread_cond_t		clifd_cond;
 
Функция main :
  
 //server/serv08.c
 
 static int			nthreads;
 pthread_mutex_t		clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t		clifd_cond = PTHREAD_COND_INITIALIZER;
 
 int
 main(int argc, char **argv)
 {
 	int			i, listenfd, connfd;
 	void		sig_int(int), thread_make(int);
 	socklen_t	addrlen, clilen;
 	struct sockaddr	*cliaddr;
 
 	if (argc == 3)
 		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
 	else if (argc == 4)
 		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
 	else
 		err_quit("usage: serv08 [  ]  <#threads>");
 	cliaddr = Malloc(addrlen);
 
 	nthreads = atoi(argv[argc-1]);
 	tptr = Calloc(nthreads, sizeof(Thread));
 	iget = iput = 0;
 
 		/* 4create all the threads */
 	for (i = 0; i < nthreads; i++)
 		thread_make(i);		/* only main thread returns */
 
 	Signal(SIGINT, sig_int);
 
 	for ( ; ; ) {
 		clilen = addrlen;
 		connfd = Accept(listenfd, cliaddr, &clilen);
 
 		Pthread_mutex_lock(&clifd_mutex);
 		clifd[iput] = connfd;
 		if (++iput == MAXNCLI)
 			iput = 0;
 		if (iput == iget)
 			err_quit("iput = iget = %d", iput);
 		Pthread_cond_signal(&clifd_cond);
 		Pthread_mutex_unlock(&clifd_mutex);
 	}
 }
 /* end serv08 */
 
 void
 sig_int(int signo)
 {
 	int		i;
 	void	pr_cpu_time(void);
 
 	pr_cpu_time();
 
 	for (i = 0; i < nthreads; i++)
 		printf("thread %d, %ld connections\n", i, tptr[i].thread_count);
 
 	exit(0);
 }
 
Функция thread_make :
  
 void
 thread_make(int i)
 {
 	void	*thread_main(void *);
 
 	Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
 	return;		/* main thread returns */
 }
 
 void *
 thread_main(void *arg)
 {
 	int		connfd;
 	void	web_child(int);
 
 	printf("thread %d starting\n", (int) arg);
 	for ( ; ; ) {
     	Pthread_mutex_lock(&clifd_mutex);
 		while (iget == iput)
 			Pthread_cond_wait(&clifd_cond, &clifd_mutex);
 		connfd = clifd[iget];	/* connected socket to service */
 		if (++iget == MAXNCLI)
 			iget = 0;
 		Pthread_mutex_unlock(&clifd_mutex);
 		tptr[(int) arg].thread_count++;
 
 		web_child(connfd);		/* process the request */
 		Close(connfd);
 	}
 }
 

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье