编辑推荐: |
本文主要介绍了介绍了消息队列的内容并详细介绍了RabbitMQ以及SpringAMQP。希望对您的学习有所帮助。
本文来自于博客园,由Linda编辑、推荐。 |
|
我们在微服务中一个命令会逐渐调用各个微服务,但如果一一调用不仅需要微服务实时同步交互还会浪费效率
所以我们通常会采用MQ,也就是消息队列Message Queue来处理这个问题
下面我们会通过几个方法介绍消息队列:
MQ
RabbitMQ
SpringAMQP
MQ
首先我们先来介绍消息队列的各个信息
同步通信
首先我们需要先去了解同步通信:
概念:当一个微服务与另一个微服务建立连接时,双方必须同时接受并且处于空闲状态便于两者交互
举例:比如我们使用手机打电话,我们这边发送打电话的请求,另一方必须也处于空闲状态并接收这个请求,两者才能打电话成功
微服务举例:Feign调用就属于同步方式,虽然调用可以实时得到结果
我们给出一个同步通信的简单例子:
我们对上图进行简单解释:
我们可以很明显的感觉到同步通信的优点:
时效性较强,可以立即得到结果
但是缺点也非常的多:
耦合度高:每次加入新的需求都需要修改原先的代码
性能和吞吐能力下降:调用者需要等待服务者全部完成服务后才会得到响应,若服务者过多导致速度过慢
有额外的资源消耗:调用链中每个服务在等待过程中无法释放自己已保留的资源,必须等当前服务结束后才可释放
有级联失败问题:当其中一个服务出现错误,整条调用链出现错误
异步通信
我们同样给出异步通信的概念:
异步通信整体分为三部分:发布者,Broker,订阅者
其中发布者就相当于我们的用户,发布者只需要发布一条信息,这条信息会携带一定的信息
其中订阅者就相当于我们的微服务,订阅者会去依次处理自己所接收到的信息,然后做出对应的操作
其中Broker就是消息队列,Broker会去接收信息,并将信息传递给订阅者,它并不会去记录信息来自哪也不去记录信息去往哪
那么异步通信的优点其实很明显:
吞吐量提升:无需等待订阅者处理完成,发送方会直接获得一个响应,吞吐量提升
故障隔离:服务没有直接调用,不存在级联失败问题,当一个微服务故障时只有该微服务失效
流量削峰:不管发布事件的流量波动多大,都由Broker接收,消费者可以按照自己的速度去处理事件
耦合度低:每个服务都单独存在,当需要使用到某个服务时,该服务只需要去订阅该Broker,不需要做额外源代码修改
无额外资源消费:由于每个微服务单独存在,所以不存在链表关系,不会去提前占用资源,只有自己使用时才会占用资源
但是缺点同样明显:
Broker核心工具:需要依赖于Broker的可靠、安全、性能
业务复杂性:业务之间没有链表连接,而是信息直接传递,没有线性关系,难以追踪判断
技术对比
我们来认识一下市面上常见的消息队列:
我们给出一些消息队列选择的建议:
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
追求可靠性:RabbitMQ、RocketMQ
追求可用性:Kafka、 RocketMQ 、RabbitMQ
RabbitMQ
我们主要去学习RabbitMQ的基本使用
基本安装
我们如果要去使用RabbitMQ,首先需要先进行插件安装:
线上拉取镜像
docker pull rabbitmq:3-management
|
在线生成容器
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
|
基本入门
首先我们需要知道最基本的消息队列模型:
最基本的消息队列模型只包括三个元素,分别是publisher(发布者),queue(消息队列),consumer(订阅者)
他们的用途分别是:
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息
其基本流程图为:
那么下面我们就来完成一个基本的RabbitMQ的小项目(只需了解):
首先我们需要一个父工程,在父工程下有两个子工程
我们首先去书写发布者的发送代码
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168. 150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "simple.queue";
channel.queueDeclare (queueName, false, false, false, null);
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println( "发送消息成功:【" + message + "】");
channel.close();
connection.close();
}
}
|
然后我们去书写订阅者接收代码
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent. TimeoutException;
public class ConsumerTest {
public static void main (String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
Connection connection = factory.newConnection();
Channel channel = connection. createChannel();
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println ("接收到消息:【" + message + "】");
}
});
System.out.println("等待 接收消息。。。。");
}
}
|
到这里我们已经基本了解了RabbitMQ的使用,让我们进入下一章节!
SpringAMQP
SpringAMQP是针对MQ的API更新,也就是使用简单的API去完成上述复杂的RabbitMQ使用过程
RabbitMQ消息模型
在正式接收SpringAMQP之前,我们需要先去了解一下RabbitMQ的五种常见消息模型:
基本消息队列
存在一条链关系,发布者发布信息交给消息队列,订阅者从消息队列订阅消息
工作消息队列
存在一个消息队列连接多个订阅者
正常情况下订阅者均等保存所获取的消息,但可以通过设置来改变订阅者当前可保存信息个数
发布订阅广播版
除消息队列外,存在一个交换器Exchange,交换器在广播状态下会将消息发送给所有相连接的消息队列
发布订阅路由版
交换器选择性地将信息交给不同的消息队列
交换器传递的信息会附带一个key值,而不同的消息队列存在一个或多个key值,如果相符合就将其信息传递给该消息队列
发布订阅主题版
一种功能类似于路由版的发布订阅方式
将传统的key值转化为多个字段的拼接值,采用"."进行拼接,其中可以采用"*"代替一个字段,采用"#"代替一个或多个字段
SpringAMQP简单介绍
首先我们需要去了解AMQP:
用于应用程序之间的传递业务信息的开放标准
该协议与平台与编程语言无关,更加符合微服务的独立性要求
那么我们再去了解SpringAMQP:
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发布消息和接收消息,利用SpringBoot对其实现了自动装配
其实简单来说SpringAMQP为我们提供了三个功能:
自动声明队列、交换机及其绑定关系
基于注解的监听器模式,异步接收消息
封装了RabbitTemplate工具,用于发送消息
SpringAQMP简单消息队列
我们利用SpringAMQP来实现简单消息队列:
在父工程中导入依赖
<dependency>
<groupId>org.springframework. boot</groupId>
<artifactId>spring-boot- starter-amqp</artifactId>
</dependency>
|
配置RabbitMQ地址
spring:
rabbitmq:
host: 192.168.150.101
port: 5672
virtual-host: /
username: itcast
password: 123321
|
编写Publisher测试类发送消息
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp. <br>rabbit.core.RabbitTemplate;
import org.springframework.beans. factory.annotation.Autowired;
import org.springframework.boot. test.context.SpringBootTest;
import org.springframework.test. context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convert AndSend(queueName, message);
}
}
|
编写Listener监听者类监听消息
package cn.itcast.mq.listener;
import org.springframework.amqp. rabbit.annotation.RabbitListener;
import org.springframework. stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage (String msg) throws InterruptedException {
System.out.println ("spring 消费者接收到消息:【" + msg + "】");
}
}
|
SpringAMQP工作消息队列
我们先来简单介绍一下工作消息队列:
工作消息队列无非就是将一个消息队列与多个订阅者签订在一起
这多个订阅者的功能大部分情况下是一样的,只是为了做一个简单的负载均衡处理
每个订阅者都会去不断获取消息队列中的消息直到订阅者自身阈值或者消息已经被获取完毕
我们来使用SpringAMQP来实现工作消息队列:
我们采用发布者发布多条消息
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convert AndSend(queueName, message + i);
Thread.sleep(20);
}
}
|
我们采用两个订阅者来订阅消息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接 收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2 (String msg) throws InterruptedException {
System.err.println("消费者2.... ....接收到消息:【" + msg + "】 " + LocalTime.now());
Thread.sleep(200);
}
|
修改最大阈值来加快效率
spring:
rabbitmq:
listener:
simple:
prefetch: 1
|
SpringAMQP发布订阅广播
我们首先来详细介绍一下发布订阅(广播)的结构:
发布订阅广播主要是在消息队列的划分上加上了一层交换机系统
在发布订阅广播中交换机会将从发布者获得信息传递给全部所有与之相连的消息队列以供处理
需要注意Exchange(交换机)只负责转发消息,不具备存储消息的能力,如果没有与之相连的消息队列就会导致信息丢失
我们同样采用SpringAQMP来实现发布订阅广播:
@Test
public void testFanoutExchange() {
String exchangeName = "qiuluo.fanout";
String message = "hello, everyone!";
rabbitTemplate.convertAndSend (exchangeName, "", message);
}
|
消费者Consumer获取信息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1 接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者 2接收到Fanout消息:【" + msg + "】");
}
|
设置交换机和消息队列并进行绑定
package cn.itcast.mq.config;
import org.springframework .amqp.core.Binding;
import org.springframework. amqp.core.BindingBuilder;
import org.springframework. amqp.core.FanoutExchange;
import org.springframework. amqp.core.Queue;
import org.springframework.
context.annotation.Bean;
import org.springframework. context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Binding bindingQueue1 (Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder. bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding bindingQueue2 (Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind (fanoutQueue2).to(fanoutExchange);
}
}
|
SpringAMQP发布订阅路由
我们同样来简单介绍一下发布订阅路由:
发布订阅系列都是需要交换机与消息队列进行绑定,由交换机决定消息应当发往哪个消息队列
在该模式下需要进行路由选择,在发送消息时会传递一个key值,这个值在publisher发送时所携带的
每一个队列也会有一个或多个对应的key值,当交换机获得信息后,会对key进行比对,若相同就传递给对应的消息队列
我们下面采用SpringAMQP的注解声明方式来实现发布订阅路由:
发布者发布消息
@Test
public void testSendDirectExchange() {
String exchangeName = "qiuluo.direct";
String message = "红色警报!";
rabbitTemplate.convertAndSend (exchangeName, "red", message);
}
|
订阅者处理消息(采用注解方式来绑定交换机和消息队列)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct. queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "qiuluo. direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到 direct.queue2的消息:【" + msg + "】");
}
|
SpringAMQP发布订阅主题
我们同样来简单介绍一下发布订阅路由:
发布订阅主题实际上和发布订阅路由一样是进行队列选择的
但是主题的key值是由多个部分组成的,其中采用"."来进行分割,例如:China.weather
我们可以采用"*"来代替一个key值,同时我们可以采用"#"来代替一个或多个key值,更具有灵活性
我们同样采用SpringAMQP来给出一个发布订阅主题的案例:
发布者发布消息
@Test
public void testSendTopicExchange() {
String exchangeName = "qiuluo.topic";
String message = "喜报!胜!";
rabbitTemplate.convertAndSend (exchangeName, "china.news", message);
}
|
订阅者获得消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收 到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到 topic.queue2的消息:【" + msg + "】");
}
|
消息转换器
最后我们介绍一个简单的知识点:
由于我们的RabbitMQ在存储信息时会进行序列化处理,而默认的Spring序列化处理是JDK序列化处理
而JDK序列化处理存在有多种缺点:数据体积大,存在安全漏洞,可读性差等
所以我们在正常使用时通常会去更换默认消息转换器,采用JSON消息转换器:
导入jackson依赖
<dependency>
<groupId>com.fasterxml. jackson.dataformat</groupId>
<artifactId>jackson- dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
|
在启动类中添加一个Bean即可
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
|
|