编辑推荐: |
本文主要讲解了
两阶段事务,消息最大努力交付,可靠消息最终一致性方案以及Rabbitmq 实战。
来自于简书,,由火龙果软件Anna编辑、推荐。 |
|
本文对比 二阶段事务、最大努力交付以及消息最终一致性,并给出部分解决方案,最终一致性方案参考阿里RockMQ事务消息。
一 2阶段事务
分布式系统最终一致性有N种方案,比如2PC(2阶段事务) ,以及三段提交等等,但开销较大,实现起来复杂,比如2阶段事务为例,需要引入一个协调者(Coordinator)来统一掌控所有参与者(Participant)的操作结果
以开会为例:
甲乙丙丁四人要组织一个会议,需要确定会议时间,不妨设甲是协调者,乙丙丁是参与者。
投票阶段:
(1)甲发邮件给乙丙丁,周二十点开会是否有时间;
(2)甲回复有时间;
(3)乙回复有时间;
(4)丙迟迟不回复,此时对于这个活动,甲乙丙均处于阻塞状态,算法无法继续进行;
(5)丙回复有时间(或者没有时间);
提交阶段:
(1)协调者甲将收集到的结果反馈给乙丙丁(什么时候反馈,以及反馈结果如何,在此例中取决与丙的时间与决定);
(2)乙收到;
(3)丙收到;
(4)丁收到;
不仅要锁住参与者的所有资源,而且要锁住协调者资源,开销大。一句话总结就是:2PC效率很低,分布式事务很难做。
在对事实性要求没有那么高的情况下,可以用基于最大努力交付 && 消息队列以及消息存储来解决最终一致性。
二 消息最大努力交付
所谓最大努力交付,就是俺反正用最大努力做,能不能成功,不做完全保证
会涉及到三个模块
上游应用,发消息到 MQ 队列。
下游应用(例如短信服务、邮件服务),接受请求,并返回通知结果。
最大努力通知服务,监听消息队列,将消息存储到数据库中,并按照通知规则调用下游应用的发送通知接口。
具体流程如下
1.上游应用发送 MQ 消息到 MQ 组件内,消息内包含通知规则和通知地址
2.最大努力通知服务监听到 MQ 内的消息,解析通知规则并放入延时队列等待触发通知
3.最大努力通知服务调用下游的通知地址,如果调用成功,则该消息标记为通知成功,如果失败则在满足通知规则(例如
5 分钟发一次,共发送 10 次)的情况下重新放入延时队列等待下次触发。
最大努力通知服务表示在不影响主业务的情况下,尽可能地确保数据的一致性。它需要开发人员根据业务来指定通知规则,在满足通知规则的前提下,尽可能的确保数据的一致,以达到最大努力的目的。
实现上也比较简单,目前主流消息队列都有ack机制,当没收到ack的时候用规则做定时重发即可。
优点:实现简单
缺点:无补偿机制,不保证能够送达
实现要点: 保证消息发送失败之后能够和业务一起回滚;消息接受方保证冥等性;定时重发机制,采用一定的重发策略,例如说指数增长,据说阿里采用redis的zset来完成.
消息进到zset后,DelayQ会通过timer触发(比如秒级),fork相应的消费线程去处理zset里ExecuteTime大于当前时间的消息。DelayQ拿到一条消息后,解析其中的callbackurl,并组装参数,push业务消息给Consumer.
Consumer返回处理成功,那么zrem Codis里的消息。如果处理失败,则计算其下次尝试时间,并更新其ExecuteTime.
三 可靠消息最终一致性方案
此方案涉及 3 个模块:
上游应用,执行业务并发送 MQ 消息。
可靠消息服务和 MQ 消息组件,协调上下游消息的传递,并确保上下游数据的一致性。
下游应用,监听 MQ 的消息并执行自身业务。
第一阶段:上游应用执行业务并发送 MQ 消息
上游应用将本地业务执行和消息发送绑定在同一个本地事务中,保证要么本地操作成功并发送 MQ 消息,要么两步操作都失败并回滚。
上游应用和可靠消息之间的业务交互图如下:
上游应用发送待确认消息到可靠消息系统
可靠消息系统保存待确认消息并返回
上游应用执行本地业务
上游应用通知可靠消息系统确认业务已执行并发送消息。
可靠消息系统修改消息状态为发送状态并将消息投递到 MQ 中间件。
以上每一步都可能出现失败情况,分析一下这 5 步出现异常后上游业务和消息发送是否一致:
上游应用执行完成,下游应用尚未执行或执行失败时,此事务即处于 BASE 理论的 Soft State
状态。
第二阶段:下游应用监听 MQ 消息并执行业务
下游应用监听 MQ 消息并执行业务,并且将消息的消费结果通知可靠消息服务。
可靠消息的状态需要和下游应用的业务执行保持一致,可靠消息状态不是已完成时,确保下游应用未执行,可靠消息状态是已完成时,确保下游应用已执行。
下游应用和可靠消息服务之间的交互图如下:
1.下游应用监听 MQ 消息组件并获取消息
2.下游应用根据 MQ 消息体信息处理本地业务
3.下游应用向 MQ 组件自动发送 ACK 确认消息被消费
4.下游应用通知可靠消息系统消息被成功消费,可靠消息将该消息状态更改为已完成。
以上每一步都可能出现失败情况,分析一下这 4 步出现异常后下游业务和消息状态是否一致:
通过分析以上两个阶段可能失败的情况,为了确保上下游数据的最终一致性,在可靠消息系统中,需要开发 消息状态确认
和 消息重发 两个功能以实现 BASE 理论的 Eventually Consistent 特性。
异常处理一:消息状态确认
可靠消息服务定时监听消息的状态,如果存在状态为待确认并且超时的消息,则表示上游应用和可靠消息交互中的步骤
4 或者 5 出现异常。
可靠消息则携带消息体内的信息向上游应用发起请求查询该业务是否已执行。上游应用提供一个可查询接口供可靠消息追溯业务执行状态,如果业务执行成功则更改消息状态为已发送,否则删除此消息确保数据一致。具体流程如下:
可靠消息查询超时的待确认状态的消息
向上游应用查询业务执行的情况
业务未执行,则删除该消息,保证业务和可靠消息服务的一致性。业务已执行,则修改消息状态为已发送,并发送消息到
MQ 组件。
异常处理二:消息重发
消息已发送则表示上游应用已经执行,接下来则确保下游应用也能正常执行。
可靠消息服务发现可靠消息服务中存在消息状态为已发送并且超时的消息,则表示可靠消息服务和下游应用中存在异常的步骤,无论哪个步骤出现异常,可靠消息服务都将此消息重新投递到
MQ 组件中供下游应用监听。
下游应用监听到此消息后,在保证幂等性的情况下重新执行业务并通知可靠消息服务此消息已经成功消费,最终确保上游应用、下游应用的数据最终一致性。具体流程如下:
可靠消息服务定时查询状态为已发送并超时的消息
可靠消息将消息重新投递到 MQ 组件中
下游应用监听消息,在满足幂等性的条件下,重新执行业务。
下游应用通知可靠消息服务该消息已经成功消费。
通过消息状态确认和消息重发两个功能,可以确保上游应用、可靠消息服务和下游应用数据的最终一致性。
四 肉身实战Rabbitmq
我们在rabbitmq上肉身实战了一下可靠消息,rabbitmq的发送过程如下
发送消息到消息服务
消息队列将消息发送给监听
消息监听接受并处理消息
我们来看看可能发送异常的四种
1 直接无法到达消息服务
网络断了,抛出异常,业务直接回滚即可。如果出现connenction error,直接增加 connection数即可
connectionFactory.setChannelCacheSize(100);
2 消息已经到达服务器,但返回的时候出现异常
rabbitmq提供了确认ack机制,可以用来确认消息是否有返回。因此我们可以在发送前在db中(内存或关系型数据库)先存一下消息,如果ack异常则进行重发
/**confirmcallback用来确认消息是否有送达消息队列*/
rabbitTemplate.setConfirmCallback((correlationData,
ack, cause) -> {
if (!ack) {
//try to resend msg
} else {
//delete msg in db
}
});
/**若消息找不到对应的Exchange会先触发returncallback */
rabbitTemplate.setReturnCallback((message, replyCode,
replyText, tmpExchange, tmpRoutingKey) -> {
try {
Thread.sleep(Constants.ONE_SECOND);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("send message failed: "
+ replyCode + " " + replyText);
rabbitTemplate.send(message);
}); |
如果消息没有到exchange,则confirm回调,ack=false
如果消息到达exchange,则confirm回调,ack=true
但如果是找不到exchange,则会先触发returncallback
3 消息送达后,消息服务自己挂了
如果设置了消息持久化,那么ack= true是在消息持久化完成后,就是存到硬盘上之后再发送的,确保消息已经存在硬盘上,万一消息服务挂了,消息服务恢复是能够再重发消息
4 未送达消费者
消息服务收到消息后,消息会处于"UNACK"的状态,直到客户端确认消息
channel.basicQos(1);
// accept only one unack-ed message at a time
(see below)
final Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '"
+ message + "'");
try {
doWork(message);
} finally {
//确认收到消息
channel.basicAck(envelope.getDeliveryTag(),
false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck,
consumer); |
5 确认消息丢失
消息返回时假设确认消息丢失了,那么消息服务会重发消息。注意,如果你设置了autoAck=
false,但又没应答 channel.baskAck也没有应答 channel.baskNack,那么会导致非常严重的错误:消息队列会被堵塞住,所以,无论如何都必须应答
6 消费者业务处理异常
消息监听接受消息并处理,假设抛异常了,第一阶段事物已经完成,如果要配置回滚则过于麻烦,即使做事务补偿也可能事务补偿失效的情况,所以这里可以做一个重复执行,比如guava的retry,设置一个指数时间来循环执行,如果n次后依然失败,发邮件、短信,用人肉来兜底。
|