顾名思义,微软消息队列(MSMQ)是一种给队列发送消息以便稍后进行处理的方法。消息由一个“Producer”(生产者)应用程序发送出去,再由一个“Consumer”(消费者)应用程序返回。
这两个应用程序可以在同一台机器上,在整个网络中,或甚至是位于并不总是连接在一起的不同机器上。MSMQ具有故障保险特性,因为如果第一次传送失败,它会重新发送消息。这样可保证你的应用程序消息到达它们的目的地。
我将应用一个叫做“TechRepublic”的队列。当你运行本文下载版本中的样本实例时,如果这个队列不存在,它会自动建立。
在前面的一篇文章中,Zach Smith说明了如何使用IPC通道在同一台机器上的两个进程间通信。他将在本文中说明如何在同一台机器或网络上的应用程序间实现进程间通信。
访问MSMQ
通过.NET访问队列由System.Messaging.MessageQueue对象完成。列表A说明了如何在一台名为“SRV-MESSAGING”的计算机上访问TechRepublic队列。
列表A
MessageQueue queue =
new MessageQueue("SRV-MESSAGINGTechRepublic"); |
注:要应用这个对象,你必须在你的项目中添加一个参考。
现在我们有了一个MessageQueue对象,这个对象为你提供与队列交互需要的所有功能。
如果队列不存在,你可以调用MessageQueue对象的静态Create方法编程建立队列。列表B中的代码说明如何检查队列是否存在,建立队列或给队列添加一个参考。
列表B
MessageQueue queue = null;
string queueName = "SRV-MESSAGINGTechRepublic";
if (MessageQueue.Exists(queueName))
queue = newMessageQueue(queueName);
else
queue = MessageQueue.Create(queueName, false); |
改写队列
改写队列时,用到MessageQueue.Send方法。列表C举例说明如何向TechRepublic队列发送一条消息。
列表C
queue.Send("My message body",
"Message Label"); |
在这个例子中,我们给TechRepublic队列发送一条正文为“My message body”的消息,并对这个消息应用了一个“Message
Label”标签。消息标签允许你不需阅读消息正文就可以分割消息。如果从计算机管理控制台中查看队列,还可在“队列消息”部分看到这些标签。
读取队列
可以使用几种方法从队列中读取消息。最常见的情况是从队列中取出所有消息,然后一次性处理它们。这时要调用MessageQueue.GetAllMessages方法。列表D举例说明如何应用这个方法。
列表D
System.Messaging.Message[] messages =
queue.GetAllMessages();
foreach (System.Messaging.Message message in messages)
{
//Do something with the message.
} |
你也可以用GetMessageEnumerator2方法代替上面的MessageQueue.GetAllMessages方法。虽然这两个方法的用法类似,但GetMessageEnumerator2只能向前(forward-only)。对于非常庞大的队列,则应用使用这个方法,而不是MessageQueue.GetAllMessages方法。
这是因为GetAllMessages方法领取所有消息,把它们保存在当地内存中;而GetMessageEnumerator2方法只领取当前消息在本地保存,在调用MoveNext时才领取下一条消息。列表E举例说明了GetMessageEnumerator2方法的用法。这段代码检查队列中的每一条消息,再删除它。
列表E
MessageEnumerator enumerator = queue.GetMessageEnumerator2();
while (enumerator.MoveNext())
enumerator.RemoveCurrent(); |
在使用GetMessageEnumerator2方法时,还要考虑另外一个问题,即你要访问队列中增加的任何新消息,即使它们是在你调用GetMessageEnumerator2后再增加的。这假定新消息被添加到队列末尾。
如果你只希望返回队列中的第一条消息,你应该使用MessageQueue.Receive方法。这个方法会领取队列中的第一条消息,在这个过程中将它从队列中删除。由于消息在读取的时候被删除,你可以确保你的进程是唯一收到消息的进程。Receive方法的应用实例如列表F所示。
列表F
System.Messaging.Message message = queue.Receive(); |
可以用Peek方法代替Receive方法。Peek方法像Receive方法一样领取队列中的第一条消息;但是,它在队列中保留消息备份。这允许你从队列中删除消息之前检查消息内容。Peek的语法与Receive类似。
列表G
System.Messaging.Message message = queue.Peek(); |
发送/接收序列化对象
虽然给队列发送文本的功能非常有用,但队列还允许你发送可序列化对象。这意味着你可以建立一个自定义的.NET类,实例化它的一个实例,将其发送给队列以便其它应用程序使用。要完成这个过程,首先得使用XML
Serializer序列化被发送的对象,然后对序列化对象放到消息的正文中。
例如,假设我们希望给TechRepublic消息队列发送以下对象(列表H):
列表H
[Serializable()]
publicclassMessageContent
{
privateDateTime _creationDate = DateTime.Now;
privatestring _messageText;
public MessageContent()
{
}
public MessageContent(string messageText)
{
_messageText = messageText;
}
publicstring MessageText
{
get { return _messageText; }
set { _messageText = value; }
}
publicDateTime CreationDate
{
get { return _creationDate; }
set { _creationDate = value; }
}
}
|
给队列发送这个对象的一个实例只需简单调用MessageQueue.Send方法,并把一个对象实例作为参数提交给这个方法。列表I说明了这种情况。
列表I
MessageContent message = newMessageContent("Hello
world!");
queue.Send(message, "Sample Message"); |
如你所见,上面的代码类似于我们前面发送正文为一个字符串的消息时使用的代码。接收一个包含序列化对象的消息更加困难一些。我们需要告诉消息它包含哪种对象。
为向消息指出它包含哪种对象,我们必须建立消息的格式化器(formatter)。给消息的Formatter属性指定一个System.Messaging.XmlMessageFormatter对象即可建立格式化器。由于我们的消息包含一个MessageContent对象,我们希望为它配置XmlMessageFormatter。
列表J
message.Formatter =
new System.Messaging.XmlMessageFormatter(
newType[1] { typeof(MessageContent) }
); |
既然我们已经给消息指定了一个格式化器,我们可以从消息中提取MessageContent对象。但在这之前,我们需要把message.Body属性的返回值分配给一个MessageContent对象。
列表K
MessageContent content = (MessageContent)message.Body; |
在这个例子中,“content”变量是我们向队列发送的原始MessageContent对象的序列化版本,我们可以访问原始对象的所有属性和值。
设定消息优先级别
在正常情况下,队列中的消息以先进先出的形式被访问。这表示如何你先发送消息A,再发送消息B,那么队列将首先返回消息A,然后才是消息B。在多数情况下,这样处理没有问题。但是,有时,由于一条消息比其它消息更加重要,你希望将它提到队列前面。要实现这种功能,你就需要设定消息优先级别。
一条消息的优先级别由它的Message.Priority属性值决定。下面是这个属性的所有有效值(全部来自MessagePriority的列举类型):
最高(Highest)
非常高(VeryHigh)
高(High)
高于正常级别(AboveNormal)
正常(Normal)
低(Low)
非常低(VeryLow)
最低(Lowest)
消息在队列中的位置由它的优先级别决定——例如,假如队列中有四条消息,两条消息的优先级别为“正常”(Normal),另两条为“高”(High)。则队列中消息排列如下:
High Priority A——这是发送给队列的第一条“高”优先级消息。
High Priority B——这是发送给队列的第二条“高”优先级消息。
Normal Priority A——这是发送队列的第一条“正常”优先级消息。
Normal Priority B——这是发送队列的第二条“正常”优先级消息。
根据这个顺序,如果我们给队列发送另一条“最高”优先级的消息,它将位于队列的顶部。
如果需要使用消息优先级功能,你必须修改发送消息的代码。因为Message对象的构造器没有指定消息优先级别的功能,你必须实例化一个Message对象,并在将它发送给队列之前给它设定相应的属性。列表L中的代码说明如何设定优先级别,并给队列发送一条“最高”优先级别的消息。
列表L
//Instantiate the queue
MessageQueue queue = newMessageQueue(queueName);
//Create a XmlSerializer for the object type we're sending.
XmlSerializer serializer = new
XmlSerializer(typeof(MessageContent));
//Instantiate a new message.
System.Messaging.Message queueMessage =
new System.Messaging.Message();
//Set the priority to Highest.
queueMessage.Priority = MessagePriority.Highest;
//Create our MessageContent object.
MessageContent messageContent =
newMessageContent("Hello world - IMPORTANT!");
//Serialize the MessageContent object into the queueMessage.
serializer.Serialize(queueMessage.BodyStream, messageContent);
//Send the message.
queue.Send(queueMessage, "HIGH PRIORITY");
|
这段代码和上面代码的最明显区别在于它使用了XmlFormatter。它实际是可选的,列表L中的代码也可用列表M中的代码代替。
列表M
//Instantiate a new message.
System.Messaging.Message queueMessage =
new System.Messaging.Message();
//Set the priority to Highest.
queueMessage.Priority = MessagePriority.Highest;
//Create our MessageContent object.
MessageContent messageContent =
newMessageContent("Hello world - IMPORTANT!");
//Set the body as the messageContent object.
queueMessage.Body = messageContent;
//Send the message.
queue.Send(queueMessage, "HIGH PRIORITY");
|
这段代码执行和列表L中的代码相同的任务,但代码更少。
应用
输入消费者请求是MSMQ功能的一个简单实例。消费者提出一个请求,由一个面向消费者的应用程序将它送交给消息队列。向队列发送请求后,它会向消费者送出一个确认(acknowledgement)。
然后,一个独立的进程从队列中提取消息,并运行任何所需的业务逻辑(business logic)。完成业务逻辑后,处理系统将向另一个队列提交一个响应。接下来,面向消费者的应用程序从队列中提取这个响应,并给消费者返回一个响应。
这种类型的配置能够加快面向消费者的应用程序的速度,使其迅速做出反应,同时在一个内部系统中完成大量处理工作。这样还可以将请求处理分散到多台内部机器上,提供可扩展性。
下载样本项目
欲获得本文显示的所有代码,请下载相关的实例项目。项目中包含你从一个队列中读或写消息所需的所有代码。
|