您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
针对高并发,可扩展的互联网架构,搭建消息队列
 
   次浏览      
 2018-6-21
 
编辑推荐:

本文来自于网络,文章详细介绍了目前主流的消息队列ZeroMQ及环境搭建的详细步骤,希望文章的介绍可以让大家有个新的认识。

想开发高并发可扩展的互联网架构,消息队列是不可缺少的,目前主流的消息队列,有windows自带的MSMQ,还有跨平台的强大的ZeroMQ,这里我们就选用ZeroMQ.

  ZeroMQ介绍:(也拼写作 ?MQ、 0MQ 或 ZMQ) 是个非常轻量级的开源消息队列软件。它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序。ZeroMQ的学习和应用也非常简单,它只有一个 C++ 编写成的单个库文件libzmq.dll, 可以链接到任何应用程序中。如果要在.NET 环境中使用,我们需要用到一个C#编写的名为 clrzmq.dll 包装库。ZeroMQ可以在 Windows、 OS X 和 Linux 等多种操作系统上运行, C、 C++、C#、 Java、 Python 等语言都可以编写ZeroMQ 应用程序这使得不同平台上的不同应用程序之间可以相互通讯。

1、环境搭建:

  codeproject专题,下载对应的Download binaries - 377.6 KB,解压缩到你的指定路径。

  这里我们就不详细介绍,主要说一下C#封装好的版本,NetMQ,是基于ZeroMQ进行封装的。就不需要下载了,直接nuget上获取:

  PM> Install-Package NetMQ

  为什么不直接用ZeroMQ,而使用NetMQ,运行非托管代码的托管应用程序内可能会出现许多想不到的问题,像内存泄漏和奇怪的没有访问错误。而NetMQ使用原生的C#语言,它更容易调试原生C#代码,你可以下载代码,调试你的系统。你可以在github上贡献。

  待安装好后,系统会自动添加NetMQ的引用。

  可以看到,NetMQ是基于zmq进行开发的,其实就是ZeroMQ了,并且已经为我们封装了各种功能的MQ对象,比如REP/REQ ,PUB/SUB(主题式订阅),XPUB/XSUB(非主题订阅),Push/Pull,甚至还有路由模式等,从字面意义上,应该能看出个大概,后面我们一个一个进行测试使用。

  先看个简单的demo,初步了解一下:

class Program
{
static void Main(string[] args)
{
using (NetMQContext context = NetMQContext.Create())
{
Task serverTask = Task.Factory.StartNew(() =>Server(context));
Task clientTask = Task.Factory.StartNew(() => Client(context));
Task.WaitAll(serverTask, clientTask);
}
}

static void Server(NetMQContext context)
{
using (NetMQSocket serverSocket = context.CreateResponseSocket())
{
serverSocket.Bind("tcp://*:5555");

while (true)
{
string message = serverSocket.ReceiveString();

Console.WriteLine("Receive message {0}", message);

serverSocket.Send("World");

if (message == "exit")
{
break;
}
}
}
}

static void Client(NetMQContext context)
{
using (NetMQSocket clientSocket = context.CreateRequestSocket())
{
clientSocket.Connect("tcp://127.0.0.1:5555");

while (true)
{
Console.WriteLine("Please enter your message:");
string message = Console.ReadLine();
clientSocket.Send(message);

string answer = clientSocket.ReceiveString();

Console.WriteLine("Answer from server: {0}", answer);

if (message == "exit")
{
break;
}
}
}
}
}

  代码比较简洁的介绍了REP/REQ模式下NetMQ的使用,而且我们可以看到,这个Mq对象是可以在不同的线程间切换使用的,也许你会测试中文,那就先序列化再反序列化吧,因为可能会出现乱码哟。

  这里,我先简单根据NetMQ,封装一个Server端和一个Client端,方便后面使用,当然也可以不封装,直接使用:

  Server:

/// <summary>
/// Mq服务端
/// </summary>
public class OctMQServer : IDisposable
{
public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
{
EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
if (handler != null) handler(this, e);
}

private int _port;
private NetMQSocket _serverSocket;
private ServerType _type;
private NetMQContext _context;

public void Init(int port, ServerType type)
{
_type = type;
_port = port;
_context = NetMQContext.Create();
CreateServer();
}

void CreateServer()
{
switch (_type)
{
case ServerType.Response:
_serverSocket = _context.CreateResponseSocket(); break;
case ServerType.Pub:
_serverSocket = _context.CreatePushSocket(); break;
case ServerType.Router:
_serverSocket = _context.CreateRouterSocket(); break;
case ServerType.Stream:
_serverSocket = _context.CreateStreamSocket(); break;
case ServerType.Push:
_serverSocket = _context.CreatePushSocket(); break;
case ServerType.XPub:
_serverSocket = _context.CreateXPublisherSocket(); break;
default:
_serverSocket = _context.CreateResponseSocket(); break;
}
_serverSocket.Bind("tcp://*:" + _port);
Task.Factory.StartNew(() =>
AsyncRead(_serverSocket), TaskCreationOptions.LongRunning);
}

private void AsyncRead(NetMQSocket serverSocket)
{
while (true)
{
var msg = serverSocket.ReceiveMessage();
OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(serverSocket, msg));
}
}


public NetMQSocket Server
{
get { return _serverSocket; }
}

public void Dispose()
{
_serverSocket.Dispose();
_context.Dispose();
}

public void Send(NetMQMessage msg)
{
_serverSocket.SendMessage(msg);
}

public NetMQMessage CreateMessage()
{
return new NetMQMessage();
}
}

  这样,使用者就可以根据枚举进行服务端的创建, 不用纠结到底用哪一种服务端,并且封装了一些消息的异步事件,方便在开发中使用,可以使用多播委托,针对不同的消息进行不同的处理,我这里使用的while循环,当然,在netmq内部提供了循环器和心跳等,都可以在实际的开发中进行扩展和使用:Poller和NetMQTimer。

  Client:

/// <summary>
/// MQ客户端
/// </summary>
public class OctMQClient:IDisposable
{
public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
{
EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
if (handler != null) handler(this, e);
}

private int _port;
private NetMQSocket _clientSocket;
private ClientType _type;
private NetMQContext _context;
private string _ip;
private Task task;
public void Init(string ip, int port, ClientType type)
{
_type = type;
_ip = ip;
_port = port;
_context = NetMQContext.Create();
CreateClient();
}

void CreateClient()
{
switch (_type)
{
case ClientType.Request:
_clientSocket = _context.CreateRequestSocket(); break;
case ClientType.Sub:
_clientSocket = _context.CreateSubscriberSocket(); break;
case ClientType.Dealer:
_clientSocket = _context.CreateDealerSocket(); break;
case ClientType.Stream:
_clientSocket = _context.CreateStreamSocket(); break;
case ClientType.Pull:
_clientSocket = _context.CreatePullSocket(); break;
case ClientType.XSub:
_clientSocket = _context.CreateXSubscriberSocket(); break;
default:
_clientSocket = _context.CreateRequestSocket(); break;
}
_clientSocket.Connect("tcp://" + _ip + ":" + _port);
}

public void StartAsyncReceive()
{
task = Task.Factory.StartNew(() =>
AsyncRead(_clientSocket), TaskCreationOptions.LongRunning);

}

private void AsyncRead(NetMQSocket cSocket)
{
while (true)
{
var msg = cSocket.ReceiveMessage();
OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(cSocket, msg));
}
}

public NetMQSocket Client
{
get { return _clientSocket; }
}

public T GetClient<T>() where T : NetMQSocket
{
return (T)_clientSocket;
}

public void Send(NetMQMessage msg)
{
_clientSocket.SendMessage(msg);
}

public NetMQMessage CreateMessage()
{
return new NetMQMessage();
}

public NetMQMessage ReceiveMessage()
{
return _clientSocket.ReceiveMessage();
}

public void Dispose()
{
_clientSocket.Dispose();
_context.Dispose();
if (task != null)
{
task.Dispose();
}
}
}

  客户端提供了,同步接受消息和异步接收消息两种方式,当启动异步时,就开始循环的读取消息了,当读到消息时抛出事件,并且针对任务等做了资源的释放。并提供创建消息和返回MQ对象等公共方法,可以在开发过程中快速的入手和使用。

  先简单说一下response和request模式,就是响应模式,当mq客户端向mq的服务端发送消息时,需要得到及时的响应,并返回给使用者或者是用户,这就需要及时响应的服务端程序,一般的MQ都会有这种功能,也是使用最广泛的,我们就先写一个这种类型的demo,基于我们前面提供的客户端和服务端。

  Server Console

  这里我提供了2种也是最常用的2种服务端方式,并且提供了不同的处理方式。

class Program
{
private static OctMQServer _server;
static ServerType _type;
static void Main(string[] args)
{
AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException;
CreateCmd();
}

/// <summary>
/// 创建mq对象
/// </summary>
static void Create()
{
_server = new OctMQServer();
_server.OnReceive += server_OnReceive;
_server.Init(5555, _type);

}

/// <summary>
/// 选择类型
/// </summary>
private static void CreateCmd()
{
Csl.Wl(ConsoleColor.Red, "请选择您要创建的MQ服务端类型");
Csl.Wl(ConsoleColor.Yellow, "1.PUB 2.REP");
var key = System.Console.ReadLine();
switch (key)
{
case "1":
{
_type = ServerType.Pub;
Create();
Cmd();
}

break;
case "2":
_type = ServerType.Response;
Create();
Cmd();
break;
default:
{
CreateCmd();

}
break;
}
}

static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
{
Csl.WlEx((Exception)e.ExceptionObject);
}

/// <summary>
/// 接收消息
/// </summary>
private static void Cmd()
{
if (_type == ServerType.Pub)
{
Csl.Wl(ConsoleColor.Red, "请输入您要发个订阅者的信息主题与信息用空格分开");
}
else
{
Csl.Wl(ConsoleColor.Red, "等待消息");
}
var cmd = System.Console.ReadLine();

switch (cmd)
{
case "exit":
Csl.Wl("正在关闭应用程序。。。等待最后一个心跳执行完成。。。");
_server.Dispose();
break;

default:
{
var str = cmd.Split(' ');
var msg = _server.CreateMessage();
msg.Append(str[0],Encoding.UTF8);
msg.Append(str[1],Encoding.UTF8);
_server.Send(msg);
Cmd();
break;
}
return;
}
}

static void server_OnReceive(object sender, DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e)
{
var msg = e.Arg2;
var server = e.Arg1;
Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8));
server.Send("你好,您的请求已处理,并返回消息及处理结果",Encoding.UTF8);
}
}

  Client Form

  客户端,我使用winform来处理,并且配合控制台使用,这个用法有些巧妙,不会的同学可以私密我,嘿嘿,先上截图,也是可以同时处理两种方式,给个demo,方便大家在实际项目中使用:

  响应式:

  订阅者式:

  不会做gif ,我就逐步说吧,从订阅者模式中我们可以看到,我的打开顺序1-》2->3,先打开1,订阅了t的主题,发了2个消息,内容1和内容2,第一个程序均收到,这时我启动另外一个程序,同样订阅t这个主题,发现消息是通过轮询的方式分别向两个订阅者发送,这样,我们在处理一些比较耗时的业务逻辑,并且不会因为并发出现问题时,就可以使用多个订阅者,分别处理业务从而大大的提高我们的系统性能。

  然后打开第三个,订阅y这个主题,这时发送y的主题消息,前2个订阅者就无法收到了,这样我们还可以区分业务,进行多进程的处理,更高的提高可用性和可扩展性,并结合高性能的缓存解决方案处理高并发的业务逻辑。

  贴出客户端代码:

public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
Csl.Init();
}
/// <summary>
/// mq客户端
/// </summary>
private OctMQClient _client;

/// <summary>
/// 订阅者模式连接
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btnConn_Click(object sender, EventArgs e)
{
_client = new OctMQClient();
_client.OnReceive += _client_OnReceive;

_client.Init(txtip.Text,int.Parse(txtport.Text),
ClientType.Sub);
var sub = (SubscriberSocket) _client.Client;
sub.Subscribe(txtTop.Text);
_client.StartAsyncReceive();

}

/// <summary>
/// 订阅者模式受到消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void _client_OnReceive(object sender, Core.Args.DataEventArgs<NetMQ.NetMQSocket,
NetMQ.NetMQMessage> e)
{
var msg = e.Arg2;
Csl.Wl("主题:"+msg.Pop().ConvertToString
(Encoding.UTF8));
Csl.Wl("内容:" + msg.Pop().ConvertToString
(Encoding.UTF8));
}

/// <summary>
/// 发送响应消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btnSend_Click(object sender, EventArgs e)
{
using (_client = new OctMQClient())
{
_client.Init(txtip.Text, int.Parse(txtport.Text), ClientType.Request);
var content = txtContent.Text;
var msg = _client.CreateMessage();
msg.Append(content, Encoding.UTF8);
_client.Send(msg);
var rmsg = _client.ReceiveMessage();
var reqStr = rmsg.Pop().ConvertToString(Encoding.UTF8);
Csl.Wl(reqStr);
}

}

/// <summary>
/// 释放资源
/// </summary>
/// <param name="e"></param>
protected override void OnClosed(EventArgs e)
{
base.OnClosed(e);
if (_client != null)
{
_client.Dispose();
}
}
}

  好了,大家先消化一下,等系列写完了,我会提交到github上。下一期,会写一些并发情况下的应用。

   
次浏览       
相关文章

企业架构、TOGAF与ArchiMate概览
架构师之路-如何做好业务建模?
大型网站电商网站架构案例和技术架构的示例
完整的Archimate视点指南(包括示例)
相关文档

数据中台技术架构方法论与实践
适用ArchiMate、EA 和 iSpace进行企业架构建模
Zachman企业架构框架简介
企业架构让SOA落地
相关课程

云平台与微服务架构设计
中台战略、中台建设与数字商业
亿级用户高并发、高可用系统架构
高可用分布式架构设计与实践