您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
消息中间件—RocketMQ消息发送
 
   次浏览      
 2019-1-8
 
编辑推荐:

本文来自于jianshu,文章介绍了RocketMQ网络架构图以及RocketMQ发送普通消息的全流程解读等相关内容。

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢?

大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为MQ研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款分布式消息队列中发送一条普通消息的大致流程和细节。

一、RocketMQ网络架构图

RocketMQ分布式消息队列的网络部署架构图如下图所示(其中,包含了生产者Producer发送普通消息至集群的两条主线)

RocketMQ部署架构.jpg

对于上图中几个角色的说明:

(1)NameServer:RocketMQ集群的命名服务器(也可以说是注册中心),它本身是无状态的(实际情况下可能存在每个NameServer实例上的数据有短暂的不一致现象,但是通过定时更新,在大部分情况下都是一致的),用于管理集群的元数据( 例如,KV配置、Topic、Broker的注册信息)。

(2)Broker(Master):RocketMQ消息代理服务器主节点,起到串联Producer的消息发送和Consumer的消息消费,和将消息的落盘存储的作用;

(3)Broker(Slave):RocketMQ消息代理服务器备份节点,主要是通过同步/异步的方式将主节点的消息同步过来进行备份,为RocketMQ集群的高可用性提供保障;

(4)Producer(消息生产者):在这里为普通消息的生产者,主要基于RocketMQ-Client模块将消息发送至RocketMQ的主节点。

对于上面图中几条通信链路的关系:

(1)Producer与NamerServer:每一个Producer会与NameServer集群中的一个实例建立TCP连接,从这个NameServer实例上拉取Topic路由信息;

(2)Producer和Broker:Producer会和它要发送的topic相关联的Master的Broker代理服务器建立TCP连接,用于发送消息以及定时的心跳信息;

(3)Broker和NamerServer:Broker(Master or Slave)均会和每一个NameServer实例来建立TCP连接。Broker在启动的时候会注册自己配置的Topic信息到NameServer集群的每一台机器中。即每一个NameServer均有该broker的Topic路由配置信息。其中,Master与Master之间无连接,Master与Slave之间有连接;

二、客户端发送普通消息的demo方法

在RocketMQ源码工程的example包下就有最为简单的发送普通消息的样例代码(ps:对于刚刚接触RocketMQ的童鞋使用这个包下面的样例代码进行系统性的学习和调试)。

我们可以直接run下“org.apache.rocketmq.example.simple”包下Producer类的main方法即可完成一次普通消息的发送(主要代码如下,在这里需本地将NameServer和Broker实例均部署起来):

//step1.启动DefaultMQProducer,启动时的具体流程一会儿会详细说明
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
try {
{
//step2.封装将要发送消息的内容
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//step3.发送消息流程,具体流程待会儿说
SendResult sendResult = producer.send(msg);
}
} catch (Exception e) {
//Exception code
}
producer.shutdown();

三、RocketMQ发送普通消息的全流程解读

从上面一节中可以看出,消息生产者发送消息的demo代码还是较为简单的,核心就几行代码,但在深入研读RocketMQ的Client模块后,发现其发送消息的核心流程还是有一些复杂的。下面将主要从DefaultMQProducer的启动流程、send发送方法和Broker代理服务器的消息处理三方面分别进行分析和阐述。

3.1 DefaultMQProducer的启动流程

在客户端发送普通消息的demo代码部分,我们先是将DefaultMQProducer实例启动起来,里面调用了默认生成消息的实现类—DefaultMQProducerImpl的start()方法。

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}

默认生成消息的实现类—DefaultMQProducerImpl的启动主要流程如下:

(1)初始化得到MQClientInstance实例对象,并注册至本地缓存变量—producerTable中;

(2)将默认Topic(“TBW102”)保存至本地缓存变量—topicPublishInfoTable中;

(3)MQClientInstance实例对象调用自己的start()方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务(包括,更新路由/清理下线Broker/发送心跳/持久化consumerOffset/调整线程池),并重新做一次启动(这次参数为false);

(4)最后向所有的Broker代理服务器节点发送心跳包;

总结起来,DefaultMQProducer的主要启动流程如下:

DefaultMQProducer的start方法启动过程.jpg

这里有以下几点需要说明:

(1)在一个客户端中,一个producerGroup只能有一个实例;

(2)根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

(3)根据不同的producerGroup,MQClientInstance将给出不同的MQProducer和MQConsumer(保存在本地缓存变量——producerTable和consumerTable中);

3.2 send发送方法的核心流程

通过Rocketmq的客户端模块发送消息主要有以下三种方法:

(1)同步方式

(2)异步方式

(3)Oneway方式

其中,使用(1)、(2)种方式来发送消息比较常见,具体使用哪一种方式需要根据业务情况来判断。本节内容将结合同步发送方式(同步发送模式下,如果有发送失败的最多会有3次重试(也可以自己设置),其他模式均1次)进行消息发送核心流程的简析。使用同步方式发送消息核心流程的入口如下:

/**
* 同步方式发送消息核心流程的入口,默认超时时间为3s
*
* @param msg 发送消息的具体Message内容
* @param timeout 其中发送消息的超时时间可以参数设置
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

3.2.1 尝试获取TopicPublishInfo的路由信息

我们一步步debug进去后会发现在sendDefaultImpl()方法中先对待发送的消息进行前置的验证。如果消息的Topic和Body均没有问题的话,那么会调用—tryToFindTopicPublishInfo()方法,根据待发送消息的中包含的Topic尝试从Client端的本地缓存变量—topicPublishInfoTable中查找,如果没有则会从NameServer上更新Topic的路由信息(其中,调用了MQClientInstance实例的updateTopicRouteInfoFromNameServer方法,最终执行的是MQClientAPIImpl实例的getTopicRouteInfoFromNameServer方法),这里分别会存在以下两种场景:

(1)生产者第一次发送消息(此时,Topic在NameServer中并不存在):因为第一次获取时候并不能从远端的NameServer上拉取下来并更新本地缓存变量—topicPublishInfoTable成功。因此,第二次需要通过默认Topic—TBW102的TopicRouteData变量来构造TopicPublishInfo对象,并更新DefaultMQProducerImpl实例的本地缓存变量——topicPublishInfoTable。

另外,在该种类型的场景下,当消息发送至Broker代理服务器时,在SendMessageProcessor业务处理器的sendBatchMessage/sendMessage方法里面的super.msgCheck(ctx, requestHeader, response)消息前置校验中,会调用TopicConfigManager的createTopicInSendMessageMethod方法,在Broker端完成新Topic的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json)。(ps:该部分内容其实属于Broker有点超本篇的范围,不过由于涉及新Topic的创建因此在略微提了下)

(2)生产者发送Topic已存在的消息:由于在NameServer中已经存在了该Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中topicPublishInfoTable,随后tryToFindTopicPublishInfo方法直接可以return。

在RocketMQ中该部分的核心方法源码如下(已经加了注释):

/**
* 根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
* 如果没有则更新路由信息,从nameserver端拉取最新路由信息
*
* topicPublishInfo
*
* @param topic
* @return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(
final String topic) {
//step1.先从本地缓存变量topicPublishInfoTable中先get一次
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.
ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//step1.2 然后从nameServer上更新topic路由信息
this.mQClientFactory.updateTopicRo
uteInfoFromNameServer(topic);
//step2 然后再从本地缓存变量topicPublishInfoTable中
再get一次
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
/**
* 第一次的时候isDefault为false,第二次的时候default为true,
即为用默认的topic的参数进行更新
*/
this.mQClientFactory.updateTopicR
outeInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

/**
* 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
*
* @param topic
* @param timeoutMillis
* @param allowTopicNotExist
* @return
* @throws MQClientException
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final
String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
//设置请求头中的Topic参数后,发送获取Topic路由信息的request
请求给NameServer
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode
.GET_ROUTEINTO_BY_TOPIC, requestHeader);
//这里由于是同步方式发送,所以直接return response的响应
RemotingCommand response = this.remotingClient.invokeSync(null,
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
//如果NameServer中不存在待发送消息的Topic
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll
.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer
is not exist value", topic);
}
break;
}
//如果获取Topic存在,则成功返回,利用TopicRouteData进行
解码,且直接返回TopicRouteData
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(),
response.getRemark());
}

将TopicRouteData转换至TopicPublishInfo路由信息的映射图如下:

Client中TopicRouteData到TopicPublishInfo的映射.jpg

其中,上面的TopicRouteData和TopicPublishInfo路由信息变量大致如下:

TopicRouteData变量内容.jpg

TopicPublishInfo变量内容.jpg

3.2.2 选择消息发送的队列

在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下,selectOneMessageQueuef()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义:

public class MQFaultStrategy {
//维护每个Broker发送消息的延迟
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//发送消息延迟容错开关
private boolean sendLatencyFaultEnable = false;
//延迟级别数组
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时长数组
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
......
}

这里通过一个sendLatencyFaultEnable开关来进行选择采用下面哪种方式:

(1)sendLatencyFaultEnable开关打开:在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L。

(2)sendLatencyFaultEnable开关关闭(默认关闭):采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息。

/**
* 根据sendLatencyFaultEnable开关是否打开来分两种情
况选择队列发送消息
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)
{
if (this.sendLatencyFaultEnable) {
try {
//1.在随机递增取模的基础上,再过滤掉not available
的Broker代理;对之前失败的,按一定的时间做退避
int index = tpInfo.getSendWhichQueue()
.getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().
size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().
get(pos);
if (latencyFaultTolerance.isAvailable(mq.
getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.
selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().
getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message
queue", e);
}
return tpInfo.selectOneMessageQueue();
}
//2.采用随机递增取模的方式选择一个队列
(MessageQueue)来发送消息
return tpInfo.selectOneMessageQueue
(lastBrokerName);
}

3.2.3 发送封装后的RemotingCommand数据包

在选择完发送消息的队列后,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQ的Remoting通信模块真正发送消息的核心)。在该方法内总共完成以下几个步流程:

(1)根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的findBrokerAddressInPublish()方法,得到待发送消息中存放的Broker代理服务器地址,如果没有找到则跟新路由信息;

(2)如果没有禁用,则发送消息前后会有钩子函数的执行(executeSendMessageHookBefore()/
executeSendMessageHookAfter()方法);

(3)将与该消息相关信息封装成RemotingCommand数据包,其中请求码RequestCode为以下几种之一:

a.SEND_MESSAGE(普通发送消息)

b.SEND_MESSAGE_V2(优化网络数据包发送)c.SEND_BATCH_MESSAGE(消息批量发送)

(4)根据获取到的Broke代理服务器地址,将封装好的RemotingCommand数据包发送对应的Broker上,默认发送超时间为3s;

(5)这里,真正调用RocketMQ的Remoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync()方法中,代码具体如下:

private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}

(6)processSendResponse方法对发送正常和异常情况分别进行不同的处理并返回sendResult对象;

(7)发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间;

(8)对于异常情况,且标志位—retryAnotherBrokerWhenNotStoreOK,设置为true时,在发送失败的时候,会选择换一个Broker;

在生产者发送完成消息后,客户端日志打印如下:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]

3.3 Broker代理服务器的消息处理简析

Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:

(1)消息前置校验,包括broker是否可写、校验queueId是否超过指定大小、消息中的Topic路由信息是否存在,如果不存在就新建一个。这里与上文中“尝试获取TopicPublishInfo的路由信息”一节中介绍的内容对应。如果Topic路由信息不存在,则Broker端日志输出如下:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive
SendMessage request command, RemotingCommand [code=310,
language=JAVA, version=252, opaque=6, flag(B)=0,
remark=null, extFields={a=ProducerGroupName,
b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569,
h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD
46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-06-14 17:17:24 WARN SendMessageThread_1 -
the topic TopicTest not exist, producer: /172.20.21.162:62661
2018-06-14 17:17:24 INFO SendMessageThread_1 -
Create new topic by default topic:[TBW102] config:
[TopicConfig [topicName=TopicTest, readQueueNums=4,
writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG,
topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]

Topic路由信息新建后,第二次消息发送后,Broker端日志输出如下:

2018-08-02 16:26:13 INFO SendMessageThread_1 -
receive SendMessage request command,
RemotingCommand [code=310, language=JAVA,
version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest,
c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC20
8AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-08-02 16:26:13 INFO SendMessageThread_1 -
the msgInner's content is:MessageExt [queueId=2,
storeSize=0, queueOffset=0, sysFlag=0,
bornTimestamp=1533198373524,
bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null,
commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message
[topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true,
TAGS=TagA}, body=11body's content is:Hello world]]

(2)构建MessageExtBrokerInner;

(3)调用“brokerController.getMessageStore().putMessage”
将MessageExtBrokerInner做落盘持久化处理;

(4)根据消息落盘结果(正常/异常情况),BrokerStatsManager做一些统计数据的更新,最后设置Response并返回;

四、总结

使用RocketMQ的客户端发送普通消息的流程大概到这里就分析完成。关于顺序消息、分布式事务消息等内容将在后续篇幅中陆续介绍,敬请期待。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

   
次浏览       
相关文章

企业架构、TOGAF与ArchiMate概览
架构师之路-如何做好业务建模?
大型网站电商网站架构案例和技术架构的示例
完整的Archimate视点指南(包括示例)
相关文档

数据中台技术架构方法论与实践
适用ArchiMate、EA 和 iSpace进行企业架构建模
Zachman企业架构框架简介
企业架构让SOA落地
相关课程

云平台与微服务架构设计
中台战略、中台建设与数字商业
亿级用户高并发、高可用系统架构
高可用分布式架构设计与实践