Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
iakovlev.org

Глава 6 : Очереди сообщений System V

Исходники для этой страницы лежат тут

Очередь сообщений можно рассматривать как связный список сообщений . Сообщения помещаются в очередь и извлекаются из нее. Каждое сообщение есть запись , и каждому сообщению присваивается приоритет. Отличие очереди от фифо в том , что для фифо нельзя произвести запись в канал до тех пор , пока не появится считывающий данные процесс. Для добавления в очередь нет необходимости в ожидающем процессе . Отличие очереди от каналов также в том , что данные в очереди сохраняются и после завершения работы процесса , который создал эту очередь , в то время как в каналах это невозможно .

Ядро хранит очереди в обьектах типа msgid_ds :
Пример очереди из 3-х сообщений :
Создать новую очередь или получить доступ к уже существующей можно с помощью

       
 	int msgget(key_t key , int flag)
 
Функция возвращает положительный идентификатор в случае успеха. Ключ нужно получить с помощью ftok. Флаг представляет из себя комбинацию флагов на чтение-запись.

При создании новой очереди инициализируются поля :

       
 	msg_perm.uid  - пользователь
 	msg_perm.cuid - группа
 	msg_perm.mode - флаги
 	msg_qnum = msg_lspid = msg_lrpid = msg_time = 0 
 	msg_ctime - текущее время
 	msg_qbytes - системное ограничение на размер очереди
 
Помещать сообщения в очередь можно с помощью
       
 	int msgsnd(int msgid , const void *ptr , size_t len , int flag)
 	
 	Возвращает 0 в случае успеха
 
Указатель ptr указывает на тип сообщения :
       
 	struct msgbuf
 	{
 		long mtype;
 		char mtext[]
 	}
 
Можно указывать свои собственные произвольные типы сообщений - например такое , которое будет состоять из 2-х полей - числа и текста :
       
 	struct my_msgbuf
 	{
 		long mtype;
 		int m1;
 		char mtext[1024];
 	}
 
Аргумент flag в функции msgsnd может быть 0 либо IPC_NOWAIT. Последний означает , что если в очереди нет свободного места , ядро не будет ждать , когда оно появится , и блокировки не будет . В этом случае msgsnd вернет ошибку .

Сообщение может быть считано с помощью функции

       
 	ssize_t msgrcv(int msgid , const void *ptr , size_t len , long type , int flag)
 
ptr указывает на приемник данных.

Управлять очередями сообщений можно с помощью

       
 	int msgctl(int msgid , int cmd , struct msgid_ds *buf)
 
 	Возвращает 0 в случае успеха
 
Аргумент cmd может принимать 3 значения :
       
 	IPC_RMID - удаление очереди с идентификатором msgid
 	IPC_SET  - модификация пермишинов очереди
 	IPC_STAT - возвращает структуру очереди
 
Напишем программу , которая создаст очередь , поместит в нее одно-байтовое сообщение , вызовет функцию msgstat , выполнит команду ipcs , а затем удалит очередь .

       
 //svmsg/ctl.c
 
 int main(int argc, char **argv)
 {
 	int				msqid;
 	struct msqid_ds	info;
 	struct msgbuf buf ;
 
 
 	msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
 
 	buf.mtype = 1;
 	buf.mtext[0] = 1;
 	Msgsnd(msqid, &buf, 1, 0);
 
 	Msgctl(msqid, IPC_STAT, &info);
 	printf("read-write: %03o, cbytes = %lu, qnum = %lu, qbytes = %lu\n",
 		   info.msg_perm.mode & 0777, (long) info.msg_cbytes,
 		   (long) info.msg_qnum, (long) info.msg_qbytes);
 
 	system("ipcs -q");
 
 	Msgctl(msqid, IPC_RMID, NULL);
 	exit(0);
 }
 
У меня вывод был такой
       
 read-write: 000, cbytes = 0, qnum = 0, qbytes = 0
 
 ------ Message Queues --------
 key        msqid      owner      perms      used-bytes   messages
 0x00000000 425984     root       311        1            1
 
Напишем клиент-серверную программу с использованием 2-х очередей сообщений , одна из них будет передавать сообщения от клиента серверу , вторая наоборот

Сценарий работы следующий : запускаем сначала сервер , потом в другом окне клиента. В клиенте набираем путь к файлу , содержание которого хотим получить . Сервер создает обе очереди , если они уже существуют , это не страшно , ибо мы не указываем флаг IPC_EXCL . В функции server() будут вызваны функции mesg_send и mesg_recv :

       
 //svmsgcliserv/server_main.c
 
 int main(int argc, char **argv)
 {
 	int		readid, writeid;
 
 	readid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);
 	writeid = Msgget(MQ_KEY2, SVMSG_MODE | IPC_CREAT);
 
 	server(readid, writeid);
 
 	exit(0);
 }
 
Клиент открывает созданные сервером очереди :
       
 //svmsgcliserv/client_main.c
 
 int main(int argc, char **argv)
 {
 	int		readid, writeid;
 
 		/* 4assumes server has created the queues */
 	writeid = Msgget(MQ_KEY1, 0);
 	readid = Msgget(MQ_KEY2, 0);
 
 	client(readid, writeid);
 
 		/* 4now we can delete the queues */
 	Msgctl(readid, IPC_RMID, NULL);
 	Msgctl(writeid, IPC_RMID, NULL);
 
 	exit(0);
 }
 
 
Функции mesg_send и mesg_recv :
       
  ssize_t mesg_send(int id, struct mymesg *mptr)
 {
 	return(msgsnd(id, &(mptr->mesg_type), mptr->mesg_len, 0));
 }
 
 
 ssize_t mesg_recv(int id, struct mymesg *mptr)
 {
 	ssize_t	n;
 
 	n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);
 	mptr->mesg_len = n;		/* return #bytes of data */
 
 	return(n);				/* -1 on error, 0 at EOF, else >0 */
 }
 
Наличие поля type в каждом сообщении дает возможность установить произвольный доступ к сообщениям. В pipe и fifo данные считываются именно в том порядке , в котором они поступили. В очередях можно установить произвольный доступ .

В следующем примере будет один сервер обслуживать одновременно несколько клиентов , причем мы создадим всего одну очередь , которая будет обслуживать всех. Клиент в качестве своего идентификатора будет отсылать серверу pid процесса . Сообщение , посылаемое серверу , будет иметь тип 1 , сообщения , посылаемые сервером , будут иметь pid текущего клиента . Создается единственная очередь , идентификатор которой будет использоваться в качестве аргумента.

Сценарий все тот же : запускаем сервер , а затем несколько клиентов , каждый из которых запрашивает путь к файлу и получает от сервера его содержимое , и все это выполняется с помощью одной очереди :

       
 //svmsgmpxlq/server_main.c
 
 int main(int argc, char **argv)
 {
 	int		msqid;
 
 	msqid = Msgget(MQ_KEY1, SVMSG_MODE | IPC_CREAT);
 
 	server(msqid, msqid);	/* same queue for both directions */
 
 	exit(0);
 }
 
 void server(int readfd, int writefd)
 {
 	FILE	*fp;
 	char	*ptr;
 	pid_t	pid;
 	ssize_t	n;
 	struct mymesg	mesg;
 
 	for ( ; ; ) {
 			/* 4read pathname from IPC channel */
 		mesg.mesg_type = 1;
 		if ( (n = Mesg_recv(readfd, &mesg)) == 0) {
 			err_msg("pathname missing");
 			continue;
 		}
 		mesg.mesg_data[n] = '\0';	/* null terminate pathname */
 
 		if ( (ptr = strchr(mesg.mesg_data, ' ')) == NULL) {
 			err_msg("bogus request: %s", mesg.mesg_data);
 			continue;
 		}
 
 		*ptr++ = 0;			/* null terminate PID, ptr = pathname */
 		pid = atol(mesg.mesg_data);
 		mesg.mesg_type = pid;	/* for messages back to client */
 
 		if ( (fp = fopen(ptr, "r")) == NULL) {
 				/* 4error: must tell client */
 			snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
 					 ": can't open, %s\n", strerror(errno));
 			mesg.mesg_len = strlen(ptr);
 			memmove(mesg.mesg_data, ptr, mesg.mesg_len);
 			Mesg_send(writefd, &mesg);
 	
 		} else {
 				/* 4fopen succeeded: copy file to IPC channel */
 			while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
 				mesg.mesg_len = strlen(mesg.mesg_data);
 				Mesg_send(writefd, &mesg);
 			}
 			Fclose(fp);
 		}
 	
 			/* 4send a 0-length message to signify the end */
 		mesg.mesg_len = 0;
 		Mesg_send(writefd, &mesg);
 	}
 }
 
 
 
 
 
 int main(int argc, char **argv)
 {
 	int		msqid;
 
 		/* 4server must create the queue */
 	msqid = Msgget(MQ_KEY1, 0);
 
 	client(msqid, msqid);	/* same queue for both directions */
 
 	exit(0);
 }
 
 void client(int readfd, int writefd)
 {
 	size_t	len;
 	ssize_t	n;
 	char	*ptr;
 	struct mymesg	mesg;
 
 		/* 4start buffer with pid and a blank */
 	snprintf(mesg.mesg_data, MAXMESGDATA, "%ld ", (long) getpid());
 	len = strlen(mesg.mesg_data);
 	ptr = mesg.mesg_data + len;
 
 		/* 4read pathname */
 	Fgets(ptr, MAXMESGDATA - len, stdin);
 	len = strlen(mesg.mesg_data);
 	if (mesg.mesg_data[len-1] == '\n')
 		len--;				/* delete newline from fgets() */
 	mesg.mesg_len = len;
 	mesg.mesg_type = 1;
 
 		/* 4write PID and pathname to IPC channel */
 	Mesg_send(writefd, &mesg);
 
 		/* 4read from IPC, write to standard output */
 	mesg.mesg_type = getpid();
 	while ( (n = Mesg_recv(readfd, &mesg)) > 0)
 		Write(STDOUT_FILENO, mesg.mesg_data, n);
 }
 
 

Мы только что имели дело с последовательным сервером , все клиенты вставали в одну очередь на обслуживание .

Теперь мы напишем другую программу , в которой сервер станет параллельным и будет работать по такой схеме :
Каждый клиент будет создавать свою очередь с ключом IPC_PRIVATE. Здесь вместо pid процесса клиенты будут сообщать серверу id очереди , с которой сервер и будет работать .

       
 //svmsgmpxnq/client_main.c
 
 int main(int argc, char **argv)
 {
 	int		readid, writeid;
 
 		/* 4server must create its well-known queue */
 	writeid = Msgget(MQ_KEY1, 0);
 		/* 4we create our own private queue */
 	readid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
 
 	client(readid, writeid);
 
 		/* 4and delete our private queue */
 	Msgctl(readid, IPC_RMID, NULL);
 
 	exit(0);
 }
 
 void client(int readid, int writeid)
 {
 	size_t	len;
 	ssize_t	n;
 	char	*ptr;
 	struct mymesg	mesg;
 
 		/* 4start buffer with msqid and a blank */
 	snprintf(mesg.mesg_data, MAXMESGDATA, "%d ", readid);
 	len = strlen(mesg.mesg_data);
 	ptr = mesg.mesg_data + len;
 
 		/* 4read pathname */
 	Fgets(ptr, MAXMESGDATA - len, stdin);
 	len = strlen(mesg.mesg_data);
 	if (mesg.mesg_data[len-1] == '\n')
 		len--;				/* delete newline from fgets() */
 	mesg.mesg_len = len;
 	mesg.mesg_type = 1;
 
 		/* 4write msqid and pathname to server's well-known queue */
 	Mesg_send(writeid, &mesg);
 
 		/* 4read from our queue, write to standard output */
 	while ( (n = Mesg_recv(readid, &mesg)) > 0)
 		Write(STDOUT_FILENO, mesg.mesg_data, n);
 }
 
 
Поскольку для каждого клиента будет порождаться процесс , нужно позаботиться о зомби. Функция обработки SIGCHLD :
       
 void sig_chld(int signo)
 {
 	pid_t	pid;
 	int		stat;
 
 	while ( (pid = waitpid(-1, &stat, WNOHANG)) > 0)
 		;
 	return;
 }
 
В ней происходит опрос статусов всех созданных дочерних процессов. При выходе из этой функции происходит нежелательное прерывание другой серверной функции - msgrcv . Для обработки возврата из функции sig_chld мы напишем обертку Mesg_recv , которая будет проверять , что вернула функция sig_chld , и если это ошибка , мы просто еще раз вызываем mesg_recv.
       
 void server(int readid, int writeid)
 {
 	FILE	*fp;
 	char	*ptr;
 	ssize_t	n;
 	struct mymesg	mesg;
 	void	sig_chld(int);
 
 	Signal(SIGCHLD, sig_chld);
 
 	for ( ; ; ) {
 			/* 4read pathname from our well-known queue */
 		mesg.mesg_type = 1;
 		if ( (n = Mesg_recv(readid, &mesg)) == 0) {
 			err_msg("pathname missing");
 			continue;
 		}
 		mesg.mesg_data[n] = '\0';	/* null terminate pathname */
 
 		if ( (ptr = strchr(mesg.mesg_data, ' ')) == NULL) {
 			err_msg("bogus request: %s", mesg.mesg_data);
 			continue;
 		}
 		*ptr++ = 0;			/* null terminate msgid, ptr = pathname */
 		writeid = atoi(mesg.mesg_data);
 
 		if (Fork() == 0) {		/* child */
 			if ( (fp = fopen(ptr, "r")) == NULL) {
 					/* 4error: must tell client */
 				snprintf(mesg.mesg_data + n, sizeof(mesg.mesg_data) - n,
 						 ": can't open, %s\n", strerror(errno));
 				mesg.mesg_len = strlen(ptr);
 				memmove(mesg.mesg_data, ptr, mesg.mesg_len);
 				Mesg_send(writeid, &mesg);
 		
 			} else {
 					/* 4fopen succeeded: copy file to client's queue */
 				while (Fgets(mesg.mesg_data, MAXMESGDATA, fp) != NULL) {
 					mesg.mesg_len = strlen(mesg.mesg_data);
 					Mesg_send(writeid, &mesg);
 				}
 				Fclose(fp);
 			}
 		
 				/* 4send a 0-length message to signify the end */
 			mesg.mesg_len = 0;
 			Mesg_send(writeid, &mesg);
 			exit(0);		/* child terminates */
 		}
 		/* parent just loops around */
 	}
 }
 
 
 
 
 ssize_t mesg_recv(int id, struct mymesg *mptr)
 {
 	ssize_t	n;
 
 	n = msgrcv(id, &(mptr->mesg_type), MAXMESGDATA, mptr->mesg_type, 0);
 	mptr->mesg_len = n;		/* return #bytes of data */
 
 	return(n);				/* -1 on error, 0 at EOF, else >0 */
 }
 /* end mesg_recv */
 
 /* include Mesg_recv */
 ssize_t Mesg_recv(int id, struct mymesg *mptr)
 {
 	ssize_t	n;
 
 	do {
 		n = mesg_recv(id, mptr);
 	} while (n == -1 && errno == EINTR);
 
 	if (n == -1)
 		err_sys("mesg_recv error");
 
 	return(n);
 }
 

Теперь несколько слов об недостатках очередей System V : они идентифицируются не дескрипторами , а идентификаторами , поэтому с ними нельзя использовать функции select / poll . Этот недостаток проявляется тогда , когда нужно написать приложение , которое одновременно работает с сетевыми соединениями и с IPC.

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье