您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
rabbitmq解决分布式事务
 
作者:沐宇熙
   次浏览      
 2020-9-7 
 
编辑推荐:
本文主要介绍了RabbitMQ消息重试机制、解决幂等性、RabbitMQ签收模式配置、RabbitMQ死信队列、RabbitMQ解决分布式事务等内容。
来自于csdn,,由火龙果软件Anna编辑、推荐。

1.RabbitMQ消息重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?

答案:使用消息重试机制。(springboot默认有消息重试机制)

1.1 如何合适选择重试机制

消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)

消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布进行解决。

1.2 如何实现重试机制

对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿

2.解决幂等性

网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,在重试过程中,可能会造成重复消费。

2.1 消费者如何保证消息幂等性,不被重复消费

使用全局MessageID判断消费方使用同一个,解决幂等性。

或者使用业务逻辑保证唯一(比如订单号码,保证一个订单号码只可能被插入一次数据库)

2.2 代码实现

2.2.1 生产者

FanoutIdemProducer

2.2.2 消费者

FanoutIdemEamilConsumer

2.2.3 配置文件

spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /springbootfanout
listener:
simple:
retry:
####开启消费者重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000

3.RabbitMQ签收模式配置

3.1 开启手动应答模式

spring:
rabbitmq:
listener:
simple:
####开启手动ack
acknowledge-mode: manual

3.2 消费者

/**
* 描述:签收模式演示
* @author: myx
* @date: 2019-05-01
* Copyright ? 2019-grape. All rights reserved.
*/
@Component
public class AckEamilConsumer {
/**
* 消费者
* @param message
*/
@RabbitListener(queues = FanoutConstrant.QUEUE_EMAIL_NAME)
public void process( Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
System.out.println ( Thread.currentThread().getName()
+ ",邮件消费者获取生产者消息msg:"
+ new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties( ).getMessageId());
// 手动ack
Long deliveryTag = (Long) headers.get( AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
}
}

4.RabbitMQ死信队列

4.1 什么是死信队列

死信队列是当消息在一个队列因为下列原因:

消息被拒绝(basic.reject或basic.nack)并且requeue=false.

消息TTL过期

队列达到最大长度(队列满了,无法再添加数据到mq中)

变成了 “死信” 后被重新投递(publish)到另一个Exchange,该Exchange 就是DLX然后该Exchange 根据绑定规则转发到对应的队列上监听该队列 就可以重新消费.说白了就是没有被消费的消息换个地方重新被消费

生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

4.2 应用场景分析

在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

channel.basicNack(message.getMessageProperties( ).getDeliveryTag( ), false, false); 丢弃消息

4.3 如何使用死信交换机

定义业务(普通)队列的时候指定参数

x-dead-letter-exchange: 用来设置死信后发送的交换机

x-dead-letter-routing-key:用来设置死信的routingKey

4.4 代码实现

4.4.1 配置

DeadFanoutConfig

4.4.2 邮件消费者

DeadEamilConsumer

4.4.3 死信消费者

DeadConsumer

5.RabbitMQ解决分布式事务

案例:

经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。

5.1 RabbitMQ解决分布式事务原理

采用最终一致性原理。

需要保证以下三要素:

1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)

2.MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)

3.如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会在重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)

5.2 模拟代码实现

5.2.1 生产者

@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;

public ResponseBase addOrderAndDispatch() {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setName("订单名称");
orderEntity.setOrderCreatetime(new Date());
// 价格是300元
orderEntity.setOrderMoney(300d);
// 状态为 未支付
orderEntity.setOrderState(0);
Long commodityId = 30l;
// 商品id
orderEntity.setCommodityId(commodityId);
String orderId = UUID.randomUUID().toString();
orderEntity.setOrderId(orderId);

// 1.先下单,创建订单 (往订单数据库中插入一条数据)
int orderResult = orderMapper.addOrder(orderEntity);
System.out.println("orderResult:" + orderResult);
if (orderResult <= 0) {
return setResultError("下单失败!");
}
// 2.使用消息中间件将参数存在派单队列中
send(orderId);
return setResultSuccess();
}
private void send(String orderId) {
JSONObject jsonObect = new JSONObject();
jsonObect.put("orderId", orderId);
String msg = jsonObect.toJSONString();
System.out.println("msg:" + msg);
// 封装消息
Message message = MessageBuilder.withBody( msg.getBytes() ).setContentType( MessageProperties.CONTENT_TYPE_JSON )
.setContentEncoding("utf-8" ).setMessageId( orderId ).build();
// 构建回调返回的数据
CorrelationData correlationData = new CorrelationData(orderId);
// 发送消息
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.convertAndSend("order_exchange_name", " orderRoutingKey", message, correlationData);
}
// 生产消息确认机制
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String orderId = correlationData.getId();
System.out.println("消息id:" + correlationData.getId());
if (ack) {
System.out.println("消息发送确认成功");
} else {
send(orderId);
System.out.println("消息发送确认失败:" + cause);
}
}
}

5.2.2 补单消费者

@Component
public class CreateOrderConsumer {
@Autowired
private OrderMapper orderMapper;

@RabbitListener(queues = "order_create_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("补单消费者" + msg + ",消息id:" + messageId);
JSONObject jsonObject = JSONObject.parseObject(msg);
String orderId = jsonObject.getString("orderId");
// 判断订单是否存在,如果不存在 实现自动补单机制
OrderEntity orderEntityResult = orderMapper.findOrderId(orderId);
if (orderEntityResult != null) {
System.out.println("订单已经存在 无需补单 orderId:" + orderId);
return;
}
// 订单不存在 ,则需要进行补单
OrderEntity orderEntity = new OrderEntity();
orderEntity.setName("订单名称");
orderEntity.setOrderCreatetime(new Date());
// 价格是300元
orderEntity.setOrderMoney(300d);
// 状态为 未支付
orderEntity.setOrderState(0);
Long commodityId = 30l;
// 商品id
orderEntity.setCommodityId( commodityId);
orderEntity.setOrderId(orderId);

// 1.先下单,创建订单 (往订单数据库中插入一条数据)
try {
int orderResult = orderMapper.addOrder(orderEntity);
System.out.println("orderResult:" + orderResult);
if (orderResult >= 0) {
// 手动签收消息,通知mq服务器端删除该消息
channel.basicAck( message.getMessageProperties().getDeliveryTag( ), false);
}
} catch (Exception e) {
// 丢弃该消息
channel.basicNack( message.getMessageProperties( ).getDeliveryTag( ), false, false);
}
}
}

5.2.3 RabbitmqConfig

@Component
public class RabbitmqConfig {
// 下单并且派单存队列
public static final String ORDER_DIC_QUEUE = "order_dic_queue";
// 补单队列,判断订单是否已经被创建
public static final String ORDER_CREATE_QUEUE = "order_create_queue";
// 下单并且派单交换机
private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";
// 1.定义订单队列
@Bean
public Queue directOrderDicQueue() {
return new Queue(ORDER_DIC_QUEUE);
}
// 2.定义补订单队列
@Bean
public Queue directCreateOrderQueue() {
return new Queue(ORDER_CREATE_QUEUE);
}
// 2.定义交换机
@Bean
DirectExchange directOrderExchange() {
return new DirectExchange(ORDER_EXCHANGE_NAME);
}
// 3.订单队列与交换机绑定
@Bean
Binding bindingExchangeOrderDicQueue() {
return BindingBuilder.bind( directOrderDicQueue( )).to(directOrderExchange( )).with("orderRoutingKey");
}
// 3.补单队列与交换机绑定
@Bean
Binding bindingExchangeCreateOrder() {
return BindingBuilder.bind( directCreateOrderQueue( )).to(directOrderExchange( )).with("orderRoutingKey");
}
}

5.2.4 派单服务-消费者

@Component
public class DispatchConsumer {
@Autowired
private DispatchMapper dispatchMapper;
@RabbitListener(queues = "order_dic_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("派单服务平台" + msg + ",消息id:" + messageId);
JSONObject jsonObject = JSONObject.parseObject(msg);
String orderId = jsonObject.getString("orderId");
if (StringUtils.isEmpty(orderId)) {
// 日志记录
return;
}
DispatchEntity dispatchEntity = new DispatchEntity();
// 订单id
dispatchEntity.setOrderId(orderId);
// 外卖员id
dispatchEntity.setTakeoutUserId(12l);
// 外卖路线
dispatchEntity.setDispatchRoute("40,40");
try {
int insertDistribute = dispatchMapper.insertDistribute( dispatchEntity);
if (insertDistribute > 0) {
// 手动签收消息,通知mq服务器端删除该消息
channel.basicAck( message.getMessageProperties( ).getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
// // 丢弃该消息
channel.basicNack( message.getMessageProperties( ).getDeliveryTag(), false, false);
}
}
}

 

 

   
次浏览       
相关文章

企业架构、TOGAF与ArchiMate概览
架构师之路-如何做好业务建模?
大型网站电商网站架构案例和技术架构的示例
完整的Archimate视点指南(包括示例)
相关文档

数据中台技术架构方法论与实践
适用ArchiMate、EA 和 iSpace进行企业架构建模
Zachman企业架构框架简介
企业架构让SOA落地
相关课程

云平台与微服务架构设计
中台战略、中台建设与数字商业
亿级用户高并发、高可用系统架构
高可用分布式架构设计与实践
最新活动计划
LLM大模型应用与项目构建 12-26[特惠]
QT应用开发 11-21[线上]
C++高级编程 11-27[北京]
业务建模&领域驱动设计 11-15[北京]
用户研究与用户建模 11-21[北京]
SysML和EA进行系统设计建模 11-28[北京]
 
最新文章
架构设计-谈谈架构
实现SaaS(软件及服务)架构三大技术挑战
到底什么是数据中台?
响应式架构简介
业务架构、应用架构与云基础架构
最新课程
软件架构设计方法、案例与实践
从大型电商架构演进看互联网高可用架构设计
大型互联网高可用架构设计实践
企业架构师 (TOGAF官方认证)
嵌入式软件架构设计—高级实践
更多...   
成功案例
某新能源电力企业 软件架构设计方法、案例与实践
中航工业某研究所 嵌入式软件开发指南
某轨道交通行业 嵌入式软件高级设计实践
北京 航天科工某子公司 软件测试架构师
北京某领先数字地图 架构师(设计案例)
更多...