一、消息队列(message
queue)
消息队列也是System V IPC机制之一。
消息队列与命名管道类似,
但少了打开和关闭管道方面的复杂性。
但使用消息队列并未解决我们在使用命名管道时遇到的一些问题,
如管道满时的阻塞问题。
消息队列提供了一种在两个不相关进程间传递数据的简单有效的方法。
与命名管道相比,
消息队列的优势在于,它独立于发送和接收进程而存在,
这消除了在同步命名管道的打开和关闭时可能产生的一些困难。
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。
而且,每个数据块被认为含有一个类型,
接收进程可以独立地接收含有不同类型值的数据块。
好消息是,
A. 我们可以通过发送消息来几乎完全避免命名管道的同步和阻塞问题。
而且,
B. 我们可以用一些方法来提前查看紧急消息。
坏消息是,
A. 与管道一样,每个数据块有一个最大长度的限制,
而且,
B. 系统中所有队列所包含的全部数据块的总长度也有一个上限。
Linux系统中有两个宏定义:
MSGMAX, 以字节为单位,定义了一条消息的最大长度。
MSGMNB, 以字节为单位,定义了一个队列的最大长度。
二、函数定义
消息队列的函数定义如下:
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds
*buf);
int msgget(key_t key, int msgflg);
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz,
long int msgtype, int msgflg);
int msgsnd(int msqid, const void *msg_ptr, size_t
msg_sz, int msgflg); |
1. msgget函数
我们用msgget函数来创建和访问一个消息队列:
int msgget(key_t key, int msgflg); |
参数: key : 键值, 和其他IPC机制一样, 程序必须提供一个键值来命名某个特定的消息队列。
特殊键值IPC_PRIVATE用于创建私有队列,
从理论上说,它应该只能被当前进程访问。
msgflag: 由9个权限标志组成。
由IPC_CREAT定义的一个特殊位必须和权限标志按位或才能创建一个新的消息队列。
返回值:
成功时,返回一个正整数,即队列标识;
失败时,返回-1.
2. msgsnd函数
该函数用来把消息添加到消息队列中:
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg); |
消息的结构受两方面约束:
A. 它的长度必须小于系统规定的上限;
B. 它必须以一个长整形成员变量开始,
接收函数将用这个成员变量来确定消息的类型。
当使用消息时,最好把消息结构定义如下:
struct my_message { long int message_type;
/* The data you wish to transfer */
} |
由于消息的接收要用到message_type, 所以不能忽略它, 且必须在声明自己的数据结构时包含它,
并且将它初始化为一个已知值。
参数:
msqid : 是由msgget函数返回的消息队列标识符。
msg_ptr: 一个指向准备发达消息的指针,
这个消息必须以前面说的以一个长整形成员变量开始。
msg_sz : 是msg_ptr指向的消息的长度,
这个长度不能包括长整形消息类型成员变量的长度。
msgflg : 控制在当前消息队列满或队列消息到达系统范围的限制时,
将要发生的事情。
IPC_NOWAIT, 函数将立即返回,不发送消息并且返回值为-1.
如果IPC_NOWAIT标志被消除,则发送进程将挂起,
以等待队列中腾出可用空间。
返回值:
成功时,返回0,消息数据的一份副本将被放到消息队列中;
失败时,返回-1.
3. msgrcv函数
该函数从一个消息队列中获取消息:
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg); |
参数: msqid : 由msgget函数返回的消息队列标识符; msg_ptr: 一个指向准备接收消息的指针,
和前面介绍的一样, 要以一个长整形成员变量开始。 msg_sz : 是msg_ptr指向的消息的长度,
它不包括长整形消息成员变量的长度。 msgtype: 是一个长整形, 它可以实现一个简单形式的接收优先级。
== 0, 将获取队列中的第一个可用消息; > 0, 将获取具有相同消息类型的第一个消息;
< 0, 将获取消息类型 <= msgtype的绝对值的第一个消息。
如果 只想按消息发送的顺序来接收它们,
就把msgtype设置为0.
如果 只想获取某一特定类型的消息,
就把msgtype设置为相应的类型值。
如果 只想接收类型 <= n的消息,
就把msgtype设置为-n.
msgflg: 用于控制当队列中没有相应类型的消息可以接收时将发送的事情。
如果 msgflg中的IPC_NOWAIT标志被设置,
函数立即返回-1.
如果 msgflg中的IPC_NOWAIT标志被消除,
进程将会挂起以等待一条相应类型的消息到达。
返回值:
成功时, 函数返回放到接收缓存区中的字节数,
消息被复制到由msg_ptr指向的用户分配的缓存中。
然后删除消息队列中对应的消息。
失败时, 返回-1.
4. msgctl函数
该函数用来控制消息队列,
int msgctl(int msqid, int cmd, struct msqid_ds *buf); |
msqid_ds结构至少包含以下成员:
struct msqid_ds { uid_t msg_perm.uid; uid_t msg_perm.gid mode_t msg_perm.mode; } |
参数: msqid : 由msgget返回的消息队列标识符。 command: 将要采取的动作,
它可以取3个值, IPC_STAT 把msqid_ds结构中的数据设置为消息队列的当前关联值
IPC_SET 如果进程有足够的权限, 就把消息队列的当着关联值设置为msqid_ds结构中给出的值。
IPC_RMID 删除消息队列 如果删除消息队列时, 某个进程正在msgsnd或msgrcv函数中等待,这两个函数将失败。
返回值:
成功时,返回0,
失败时,返回-1.
三、示例
msg1.c用于接收消息,
msg2.c用于发送消息。
允许两个程序都可以创建消息队列,
但只有接收者在接收完最后一个消息后可以删除它。
1. 接收者msg1.c
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h>
#include <sys/msg.h>
struct my_msg_st {
long int my_msg_type;
char some_text[BUFSIZ];
};
int main()
{
int running = 1;
int msgid;
struct my_msg_st some_data;
long int msg_to_receive = 0;
|
首先创建消息队列:
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if (msgid == -1)
{
fprintf(stderr, "msgget failed with error:
%d\n", errno);
exit(EXIT_FAILURE);
} |
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if (msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n",
errno);
exit(EXIT_FAILURE);
}
然后从队列中获取消息,直到遇见end消息为止。
最后,删除消息队列:
while(running) { if (msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0) == -1) { fprintf(stderr, "msgrcv failed with error: %d\n", errno); exit(EXIT_FAILURE); }
printf("You wrote: %s", some_data.some_text);
if (strncmp(some_data.some_text, "end",
3) == 0)
{
running = 0;
}
}
if (msgctl(msgid, IPC_RMID, 0) == -1)
{
fprintf(stderr, “msgctl(IPC_RMID) failed\n”);
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
} |
2. 发送者程序msg2.c
通过调用msgsnd来发送用户输入的文本到消息队列中。
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h>
#include <sys/msg.h>
#define MAX_TEXT 512
struct my_msg_st
{
long int my_msg_type;
char some_text[MAX_TEXT];
};
int main()
{
int running = 1;
struct my_msg_st some_data;
int msgid;
char buffer[BUFSIZ];
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if (msgid == -1)
{
fprintf(stderr, "msgget failed with error:
%d\n", errno);
exit(EXIT_FAILURE);
}
while(running)
{
printf("Enter some text: ");
fgets(buffer, BUFSIZ, stdin);
some_data.my_msg_type = 1;
strcpy(some_data.some_text, buffer);
if (msgsnd(msgid, (void *)&some_data, MAX_TEXT,
0) == -1)
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
if (strncmp(buffer, "end", 3) == 0)
{
running = 0;
}
}
exit(EXIT_SUCCESS);
} |
不同于管道,
这里不需要进程自己来提供同步方法。
这是它的优势所在。
假如消息队列中有空间,
发送者可以创建队列,放一些数据到队列中,
然后在接收者启动之前退出。
我们将先行运行发送者msg2.
3. 下面是一些样本输出:
$ ./msg2 Enter some text: hello Enter some text: How are you today? Enter some text: end $ ./msg1 You wrote: hello You wrote: How are you today? You wrote: end $ |
4. 解析: 发送者程序通过msgget来创建一个消息队列, 然后用msgsnd向队列中增加消息。
接收者程序用msgget获得消息队列标识符, 然后开始接收消息, 直到接收到特殊文件end为止。
然后它用msgctl来删除消息队列以完成清理工作。 |