编辑推荐: |
本文来自于cnblogs,文章主要案例为主来介绍ActiveMQ消息的消费原理,从方法到过程以及方案等方面详细介绍。
|
|
消费端消费消息:
这里说了两种方法,两种方法可以接收消息,一种是使用同步阻塞的ActiveMQMessageConsumer#receive方法。另一种是使用消息监听器MessageListener。这里需要注意的是,在同一个session下,这两者不能同时工作,也就是说不能针对不同消息采用不同的接收方式。否则会抛出异常。至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控。
先通过ActiveMQMessageConsumer#receive 方法来对消息的接受一探究竟:
public Message
receive() throws JMSException {
checkClosed();
//检查receive和MessageListener是否同时配置在当前的会话中,有则抛出异常
checkMessageListener();
//如果PrefetchSizeSize为0并且unconsumerMessage为空,则发起pull命令
sendPullCommand(0);
MessageDispatch md = dequeue(-1);//出列,获取消息
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
//发送ack给到broker
afterMessageIsConsumed(md, false);
//获取消息并返回
return createActiveMQMessage(md);
} |
下面简单的说一下以上几个核心方法中做了什么不为人知的事:
sendPullCommand(0) :发送pull命令从broker上获取消息,前提是prefetchSize=0并且unconsumedMessages为空。unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值
protected void
sendPullCommand(long timeout) throws JMSException
{
clearDeliveredList();
if (info.getCurrentPrefetchSize() == 0 &&
unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
messagePull.setTimeout(timeout);
//向服务端异步发送messagePull指令
session.asyncSendPacket(messagePull);
}
} |
这里发送异步消息跟消息生产的原理是一样的。通过包装链去调用 Sokect 发送请求。
clearDeliveredList():
在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,主要用来清理已经分发的消息链表deliveredMessagesdeliveredMessages,存储分发给消费者但还为应答的消息链表
如果session是事务的,则会遍历deliveredMessage中的消息放入到previouslyDeliveredMessage中来做重发
如果session是非事务的,根据ACK的模式来选择不同的应答操作
这是个同步的过程:
private void
clearDeliveredList() {
if (clearDeliveredList) {//判断是否清楚
synchronized (deliveredMessages) {//采用双重检查锁
if (clearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {//是事务消息
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,
Boolean>(session.getTransactionContext ().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages)
{
previouslyDeliveredMessages.put (delivered.getMessage().getMessageId(),
false);
}
LOG.debug("{} tracking existing transacted
{} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId,
deliveredMessages.size());
} else {
if (session.isClientAcknowledge()) {
LOG.debug("{} rolling back delivered list
({}) on transport interrupt", getConsumerId(),
deliveredMessages.size());
// allow redelivery
if (!this.info.isBrowser()) {
for (MessageDispatch md: deliveredMessages) {
this.session.connection.rollbackDuplicate (this,
md.getMessage());
}
}
}
LOG.debug("{} clearing delivered list ({})
on transport interrupt", getConsumerId(),
deliveredMessages.size());
deliveredMessages.clear();
pendingAck = null;
}
}
clearDeliveredList = false;
}
}
}
} |
dequeue(-1) :从unconsumedMessage中取出一个消息,在创建一个消费者时,就会为这个消费者创建一个未消费的消息通道,这个通道分为两种,一种是简单优先级队列分发通道SimplePriorityMessageDispatchChannel
;另一种是先进先出的分发通道FifoMessageDispatchChannel.至于为什么要存在这样一个消息分发通道,大家可以想象一下,如果消费者每次去消费完一个消息以后再去broker拿一个消息,效率是比较低的。所以通过这样的设计可以允许session能够一次性将多条消息分发给一个消费者。默认情况下对于queue来说,prefetchSize的值是1000
private MessageDispatch
dequeue (long timeout) throws JMSException {
try {
long deadline = 0;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
while (true) {//protected final MessageDispatchChannel
unconsumedMessages;
MessageDispatch md = unconsumedMessages.dequeue(timeout);
...........
} |
beforeMessageIsConsumed(md):这里面主要是做消息消费之前的一些准备工作,如果ACK类型不是DUPS_OK_ACKNOWLEDGE或者队列模式(简单来说就是除了Topic和DupAck这两种情况),所有的消息先放到deliveredMessages链表的开头。并且如果当前是事务类型的会话,则判断transactedIndividualAck,如果为true,表示单条消息直接返回ack。
否则,调用ackLater,批量应答, client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些消息的条数达到一定阀值时,只需要通过一个ACK指令把它们全部确认;这比对每条消息都逐个确认,在性能上要提高很多。
private void
beforeMessageIsConsumed(MessageDispatch md) throws
JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
} |
afterMessageIsConsumed:这个方法的主要作用是执行应答操作,这里面做以下几个操作
1.如果消息过期,则返回消息过期的ack
2. 如果是事务类型的会话,则不做任何处理
3. 如果是AUTOACK或者(DUPS_OK_ACK且是队列),并且是优化ack操作,则走批量确认ack
4.如果是DUPS_OK_ACK,则走ackLater逻辑
5.如果是CLIENT_ACK,则执行ackLater
private void
afterMessageIsConsumed(MessageDispatch md, boolean
messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();
if (session.getTransacted()) {
// Do nothing.
} else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false,
true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
// AMQ-3956 evaluate both expired and normal msgs
as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize()
* .65) || (optimizeAcknowledgeTimeOut > 0 &&
System.currentTimeMillis() >= (optimizeAckTimestamp
+ optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg
just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter
> 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge() ||session.isIndividualAcknowledge())
{
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater (md, MessageAck.DELIVERED_ACK_TYPE);
}
}
else {
throw new IllegalStateException("Invalid
session state.");
}
}
} |
其实在以上消息的接收过程中,我们仅仅能看到这个消息从一个本地变量中出队,并没有对远程消息中心发送通讯获取,那么这个消息时什么时候过来的呢?也就是消息出队中
unconsumedMessages 这个东东时什么时候初始化的呢 ?那么接下去我们应该去通过创建连接的时候去看看了,具体连接的时候都做了什么呢:connectionFactory.createConnection()
private void
afterMessageIsConsumed (MessageDispatch md, boolean
messageExpired) throws JMSException {
protected ActiveMQConnection createActiveMQConnection(String
userName, String password) throws JMSException
{
if (brokerURL == null) {
throw new ConfigurationException("brokerURL
not set.");
}
ActiveMQConnection connection = null;
try {// 果然发现了这个东东的初始化
Transport transport = createTransport();
// 创建连接
connection = createActiveMQConnection(transport,
factoryStats);
// 设置用户密码
connection.setUserName(userName);
connection.setPassword(password);
// 对连接做包装
configureConnection(connection);
// 启动一个后台传输线程
transport.start();
// 设置客户端消费的id
if (clientID != null) {
connection.setDefaultClientID(clientID);
}
return connection;
} ......
} |
创建连接的过程就是创建除了一个带有链路包装的TcpTransport
并且创建连接,最后启动一个传输线程,而这里的 transport.start() 调用的应该是TcpTransport
里面的方法,然而这个类中并没有 start,而是在父类
ServiceSupport.start()中:
public void
start() throws Exception {
if (started.compareAndSet(false, true)) {
boolean success = false;
stopped.set(false);
try {
preStart();//一些初始化
doStart();
success = true;
} finally {
started.set(success);
}
for(ServiceListener l:this.serviceListeners) {
l.started(this);
}
}
} |
doStart 方法前做了一系列的初始化,然后调用 TcpTransport的doStart()
方法:
protected void
doStart() throws Exception {
connect();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
} |
继而构建一个连接 设置一个 CountDownLatch 门闩 ,调用父类
TransportThreadSupport 的方法,新建了一个精灵线程并且启动:
protected void
doStart() throws Exception {
runner = new Thread(null, this, "ActiveMQ
Transport: " + toString(), stackSize);
runner.setDaemon(daemon);
runner.start();
} |
调用TransportThreadSupport.doStart().
创建了一个线程,传入的是 this,调用子类的 run 方法,也就是 TcpTransport.run().
public void
run() {
LOG.trace("TCP consumer thread for "
+ this + " starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} catch (Throwable e){
stoppedLatch.get().countDown();
IOException ioe=new IOException("Unexpected
error occurred: " + e);
ioe.initCause(e);
onException(ioe);
}finally {
stoppedLatch.get().countDown();
}
} |
run 方法主要是从 socket 中读取数据包,只要 TcpTransport
没有停止,它就会不断去调用 doRun:这里面,通过 wireFormat 对数据进行格式化,可以认为这是一个反序列化过程。wireFormat
默认实现是 OpenWireFormat,activeMQ 自定义的跨语言的wire 协议
protected void
doRun() throws IOException {
try {//通过 readCommand 去读取数据
Object command = readCommand();
//消费消息
doConsume(command);
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
}
}
protected Object readCommand() throws IOException
{
return wireFormat.unmarshal(dataIn);
} |
doConsume:流程走到了消费消息:
public void
doConsume(Object command) {
if (command != null) {//表示已经拿到了消息
if (transportListener != null) {
transportListener.onCommand(command);
} else {
LOG.error("No transportListener available
to process inbound command: " + command);
}
}
} |
TransportSupport 类中唯一的成员变量是 TransportListener
transportListener;,这也意味着一个 Transport 支持类绑定一个传送监听器类,传送监听器接口
TransportListener 最重要的方法就是 void onCommand(Object command);,它用来处理命令。那么这个
transportListener 是在那里初始化的呢?可以思考一下 既然是TransportSupport
唯一的成员变量,而我们锁创建的TcpTransport 是他的子类,那么是不是在创建该transport的时候亦或是在对他进行包装处理的时候做了初始化呢?
我们会在流程中看到在新建 ActiveMQConnectionFactory 的时候有一行关键的代码:
connection =
createActiveMQConnection(transport, factoryStats); |
在这个方法六面追溯下去:会进入 ActiveMQConnection
的构造方法
protected ActiveMQConnection (final
Transport transport, IdGenerator clientIdGenerator,
IdGenerator connectionIdGenerator, JMSStatsImpl
factoryStats) throws Exception {
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;
// Configure a single threaded executor who's
core thread can timeout if
// idle
executor = new ThreadPoolExecutor (1, 1, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new
ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection
Executor: " + transport);
//Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
//thread.setDaemon(true);
return thread;
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo (new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant (transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(),
-1);
this.transport.setTransportListener(this);
this.stats = new JMSConnectionStatsImpl (sessions,
this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates (transport.isFaultTolerant());
} |
从以上代码我们发现 this.transport.setTransportListener(this);
那么这个this是什么呢 ? 正是ActiveMQConnection ,看了一眼该类,发现这个类实现了
TransportListener ,本身就是一个TransportListener。所以上面 transportListener.onCommand(command);
就是 ActiveMQConnection.onCommand(command)。除了和 Transport相互绑定,还对线程池执行器
executor 进行了初始化。这哥执行器是后来要进行消息处理的。
这里面会针对不同的消息做分发,在ActiveMQMessageConsumer#receive方法中锁dequeue所返回的对象是MessageDispatch
。假设这里传入的 command 是MessageDispatch,那么这个 command 的 visit
方法就会调用processMessageDispatch 方法。剪切出其中的代码片段:
public Response
processMessageDispatch (MessageDispatch md) throws
Exception {
// 等待 Transport 中断处理完成
waitForTransportInterruptionProcessingToComplete();
// 这里通过消费者 ID 来获取消费者对象
//(ActiveMQMessageConsumer 实现了 ActiveMQDispatcher
接口),所以
//MessageDispatch 包含了消息应该被分配到那个消费者的映射信息
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching
via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
}
// 调用会话ActiveMQSession 自己的 dispatch 方法来处理这条消息
dispatcher.dispatch(md);
} else {
LOG.debug("{} no dispatcher for {} in {}",
this, md, dispatchers);
}
return null;
} |
其中 ActiveMQDispatcher dispatcher =
dispatchers.get(md.getConsumerId());这行代码的 dispatchers
是在 通过session.createConsumer(destination); 的时候通过 ActiveMQMessageConsumer
的构造方法中有一行代码 :this.session.addConsumer(this); 将 this传入,即
ActiveMQMessageConsumer 对象。而这个 addConsumer 方法:
protected void
addConsumer (ActiveMQMessageConsumer consumer)
throws JMSException {
this.consumers.add(consumer);
if (consumer.isDurableSubscriber()) {
stats.onCreateDurableSubscriber();
}
this.connection.addDispatcher(consumer.getConsumerId(),
this);
} |
可以发现这里的初始化了:this.connection.addDispatcher(consumer.getConsumerId(),
this); 这里的this 即 ActiveMQSession。所以回到 ActiveMQConnection#onCommand方法内
processMessageDispatch 这个方法最后调用了 dispatcher.dispatch(md);
这个方法的核心功能就是处理消息的分发。:
public void
dispatch(MessageDispatch messageDispatch) {
try {
executor.execute(messageDispatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
connection.onClientInternalException(e);
}
} |
这里离我们真正要找的进行消息入队的结果很近了,进入executor.execute(messageDispatch);这个方法:
void execute(MessageDispatch
message) throws InterruptedException {
...........
//如果会话不是异步分发并且没有使用 sessionpool 分发,则调用 dispatch
发送消息
if (!session.isSessionAsyncDispatch() &&
!dispatchedBySessionPool) {
dispatch(message);
} else {//将消息直接放到队列里
messageQueue.enqueue(message);
wakeup();
}
} |
这里最后终于发现了入队,判断是否异步分发,不是的话走dispatch(message)
否则进入异步分发。默认是采用异步消息分发。所以,直接调用 messageQueue.enqueue,把消息放到队列中,并且调用
wakeup 方法:
public void
wakeup() {
if (!dispatchedBySessionPool) {//进一步验证
// //判断 session 是否为异步分发
if (session.isSessionAsyncDispatch()) {
try {
TaskRunner taskRunner = this.taskRunner;
if (taskRunner == null) {
synchronized (this) {
if (this.taskRunner == null) {
if (!isRunning()) {
// stop has been called
return;
}
//通过 TaskRunnerFactory 创建了一个任务运行类 taskRunner,这里把自己作为一个
task 传入到 createTaskRunner 中,
//说明当前的类一定是实现了 Task 接口的. 简单来说, 就是通过线程池去执行一个任务,完成异步调度
//这里由于executor != null 所以这个task的类型是PooledTaskRunner
this.taskRunner = session.connection .getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " + session.getSessionId());
}
taskRunner = this.taskRunner;
}
}
taskRunner.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {// 异步分发
while (iterate()) {
}
}
}
} |
所以,对于异步分发的方式,会调用 ActiveMQSessionExecutor 中的 iterate方法,我们来看看这个方法的代码
iterate ():这个方法里面做两个事
1.把消费者监听的所有消息转存到待消费队列中
2. 如果 messageQueue 还存在遗留消息,同样把消息分发(调度)出去
public boolean
iterate() {
// Deliver any messages queued on the consumer
to their listeners.<br> // 将消费者上排队的任何消息传递给它们的侦听器。
for (ActiveMQMessageConsumer consumer : this.session.consumers)
{
if (consumer.iterate()) {
return true;
}
}
// No messages left queued on the listeners..
so now dispatch messages
// queued on the session<br> // 侦听器上没有留下排队等待的消息。现在分派消息
MessageDispatch message = messageQueue.dequeueNoWait();
if (message == null) {
return false;
} else {// 分发(调度)消息
dispatch(message);
return !messageQueue.isEmpty();
}
} |
dispatch(message);消息确认分发。通过ActiveMQSessionExecutor的dispatch
方法,转到了 ActiveMQMessageConsumer 消费者类的 dispatch 方法:
public void
dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
clearMessagesInProgress();
clearDeliveredList();
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {// 判断消息是否为重发消息
if (this.info.isBrowser() || !session.connection.isDuplicate(this,
md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning())
{
//我这边通过consumer.receive()处理消息,所以这里listener为空,走下面
} else {
if (!unconsumedMessages.isRunning()) {
// delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage());
}
if (md.getMessage() == null) {
// End of browse or pull request timeout.
unconsumedMessages.enqueue(md);
} else {
if (!consumeExpiredMessage(md)) {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
.........
} |
最终会走入 unconsumedMessages.enqueue(md);添加消息。这里需要注意的是enqueue
方法:由于消费者可能处于阻塞状态,这里做了入队后回释放锁,也就是接触阻塞。
public void
enqueue(MessageDispatch message) {
synchronized (mutex) {
list.addLast(message);
mutex.notify();
}
} |
到这里为止,消息如何接受以及他的处理方式的流程,我们已经搞清楚了。其实在这个消息消费的流程中,已经在建立连接,创建消费者的时候就已经初始化好了消息队列了。结合上面的过程来看看整个消费流程的流程图
消费端的 PrefetchSize: 在消息发布的时候我们曾经研究过 producerWindowSize 。主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。对于客户端,也是类似存在这么一个属性来约束客户端的消息处理。activemq
的 consumer 端也有窗口机制,通过 prefetchSize 就可以设置窗口大小。不同的类型的队列,prefetchSize
的默认值也是不一样的.
1. 持久化队列和非持久化队列的默认值为 1000
2.持久化 topic 默认值为 100
3.非持久化队列的默认值为 Short.MAX_VALUE-1
测试方法是在MQ上生产1000条消息,先后启动comsumer1,comsumer2 两个消费者并且循环调用1000次消费,我妈会发现
comsumer2 拿不到消息,这个时候我们可以通过debug进入comsumer1 的ActiveMQConnect会发现里面有个属性的size=1000.其实就是这个prefetchSize,翻译过来是预取大小,消费端会根据prefetchSize
的大小批量获取数据。意思是在创建连接的时候会取获取1000条消息预加载到缓存中等待处理,这样子导致comsumer2去获取消息的时候
broker上已经空了。
prefetchSize 的设置方法:
在 createQueue 中添加 consumer.prefetchSize,就可以看到效果
Destination destination=session.createQueue ("myQueue?consumer.prefetchSize=10"); |
既然有批量加载,那么一定有批量确认,这样才算是彻底的优化,这就涉及到 optimizeAcknowledge optimizeAcknowledge
ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化ACK”,只有在为
true 的情况下,prefetchSize 以及optimizeAcknowledgeTimeout
参数才会有意义优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认
ack 了 0.65*prefetchSize 个消息才确认),这个在源码中有体现。在ActiveMQMessageConsumer#receive方法内的处理消息后的
afterMessageIsConsumed 方法内有一个判断:
if (ackCounter
+ deliveredCounter >= (info.getPrefetchSize()
* .65) || (optimizeAcknowledgeTimeOut > 0 &&
System.currentTimeMillis() >= (optimizeAckTimestamp
+ optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages (MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);//满足条件则发送批量应答ACK
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg
just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter
> 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
} |
broker 再次发送消息时又可以批量发送如果只是开启了 prefetchSize,每条消息都去确认的话,broker
在收到确认后也只是发送一条消息,并不是批量发布,当然也可以通过设置 DUPS_OK_ACK来手动延迟确认,
我们需要在 brokerUrl 指定 optimizeACK 选项
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory
("tcp://192.168.11.153:61616? jms.optimizeAcknowledge =true&jms.optimizeAcknowledgeTimeOut=10000"); |
注意,如果 optimizeAcknowledge 为 true,那么
prefetchSize 必须大于 0. 当 prefetchSize=0 的时候,表示 consumer
通过 PULL 方式从 broker 获取消息.
optimizeAcknowledge 和 prefetchSize 的作用,两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型。它比仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销
需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升
consumer 的性能。如果 consumer 的消费性能本身就比较慢,设置比较大的 prefetchSize
反而不能有效的达到提升消费性能的目的。因为过大的prefetchSize 不利于 consumer
端消息的负载均衡。因为通常情况下,我们都会部署多个 consumer 节点来提升消费端的消费性能。这个优化方案还会存在另外一个潜在风险,当消息被消费之后还没有来得及确认时,client
端发生故障,那么这些消息就有可能会被重新发送给其他consumer,那么这种风险就需要 client
端能够容忍“重复”消息。
消息的确认过程: 消息确认有四种 ACK_MODE,分别是:
1. AUTO_ACKNOWLEDGE = 1 自动确认
2.CLIENT_ACKNOWLEDGE = 2 客户端手动确认
3.DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
4.SESSION_TRANSACTED = 0 事务提交并确认
ACK_MODE 的选择影响着消息消费流程的走向。虽然 Client 端指定了 ACK 模式,但是在
Client 与 broker 在交换 ACK 指令的时候,还需要告知 ACK_TYPE,ACK_TYPE
表示此确认指令的类型,不同的ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE
对消息进行不同的操作。
ACK_TYPE应答类型: DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker
端可以删除消息了
POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者
DLQ(死信队列),在消息处理的时候,dispatch方法内会判断该消息是否为重发消息
if (this.info.isBrowser()
|| !session.connection.isDuplicate(this, md.getMessage()))
{
if (listener != null && unconsumedMessages.isRunning())
{
// 这段为非重发消息,走else
} else {
// deal with duplicate delivery
ConsumerId consumerWithPendingTransaction;
if (redeliveryExpectedInCurrentTransaction(md,
true)) {
LOG.debug("{} tracking transacted redelivery
{}", getConsumerId(), md.getMessage());
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE,
1));
}
} else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md))
!= null) {
LOG.warn("{} delivering duplicate {}, pending
transaction completion on {} will rollback",
getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
session.getConnection().rollbackDuplicate(this,
md.getMessage());
dispatch(md);
} else {// 走POSION_ACK_TYPE 添加Active_DLQ 死信队列
LOG.warn("{} suppressing duplicate delivery
on connection, poison acking: {}", getConsumerId(),
md);
posionAck(md, "Suppressing duplicate delivery
on connection, consumer " + getConsumerId());
}
} |
REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer
处理消息时抛出了异常,broker 稍后会重新发送此消息
INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何
ACK_MODE 下
UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合
Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker
上确 认了消息)。
Client 端在不同的 ACK 模式时,将意味着在不同的时机发送 ACK 指令,每个 ACK
Command 中会包含 ACK_TYPE,那么 broker 端就可以根据 ACK_TYPE 来决定此消息的后续操作。在
afterMessageIsConsumed 消息接收处理后会根据条件来设置 ACK_TYPE.
消息的重发机制原理: 在正常情况下,有几中情况会导致消息重新发送
在事务性会话中,没有调用 session.commit 确认消息宕机或者调用session.rollback
方法回滚消息
在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE
(客户端手动应答)的情况下,没有调用 session.commit或者调用了 recover 方法;
一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会给 broker
发送一个”poison ack”表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker
会把这个消息放到 DLQ(死信队列)。
死信队列: ActiveMQ 中默认的死信队列是 ActiveMQ.DLQ,如果没有特别的配置,有毒的消息都会被发送到这个队列。默认情况下,如果持久消息过期以后,也会被送到
DLQ 中。
只要在处理消息的时候抛出一个异常就可以演示,会看到控制台对于失败消息会重发6次,登陆ActiveMQ控制台会看到一个
ActiveMQ.DLQ。在创建队列的时候可以直接指定从ActiveMQ.DLQ去消费消息。
死信队列配置策略:
缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理,可以通过
individualDeadLetterStrategy 或 sharedDeadLetterStrategy
策略来进行修改。在activemq.xml上
<destinationPolicy>
<policyMap> <policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy
is used to prevent
slow topic consumers to block producers and affect
other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
--> <pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy> </policyEntry>
// “>”表示对所有队列生效,如果需要设置指定队列,则直接写队列名称 <policyEntry
queue=">"> <deadLetterStrategy>
//queuePrefix:设置死信队列前缀
//useQueueForQueueMessage 设置队列保存到死信。 <individualDeadLetterStrategy
queuePrefix ="DLQ."useQueueForQueueMessages ="true"/>
</deadLetterStrategy> </policyEntry>
</policyEntries> </policyMap>
</destinationPolicy> |
自动丢弃过期消息
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false"
/>
</deadLetterStrategy> |
ActiveMQ 静态网络配置:broker网络连接(broker的高性能方案)
修改 activeMQ 服务器的 activeMQ.xml, 增加如下配置,这个配置只能实现单向连接,实现双向连接需要各个节点都配置如下配置。
<networkConnectors>
<networkConnector uri="static:// (tcp://192.168.254.135:61616, tcp://192.168.254.136:61616)"/>
</networkConnectors> |
两个 Brokers 通过一个 static 的协议来进行网络连接。一个 Consumer 连接到BrokerB
的一个地址上,当 Producer 在 BrokerA 上以相同的地址发送消息,此时消息会被转移到
BrokerB 上,也就是说 BrokerA 会转发消息到BrokerB 上。
在activeMQ中,进行了静态网络桥接的两台节点而言,当 Producer
在 BrokerA 上以相同的地址发送10条消息。一个 Consumer 连接到BrokerB去消费消息,当消费了一半的时候出现异常了,那么剩下来未处理的消息会被存放到
BrokerB 的待处理消息队列中,此时要通过BrokerA再去消费是消费不到的,万一此刻BrokerB
挂了,那么哪些没有消费的消息将会丢失。mq给我们提供了一个有效的消息回流机制。
<policyEntry
queue=">" enableAudit="false">
<networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory
replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry> |
ActiveMQ 的优缺点 ActiveMQ 采用消息推送方式,所以最适合的场景是默认消息都可在短时间内被消费。数据量越大,查找和消费消息就越慢,消息积压程度与消息速度成反比。
缺点
1.吞吐量低。由于 ActiveMQ 需要建立索引,导致吞吐量下降。这是无法克服的缺点,只要使用完全符合
JMS 规范的消息中间件,就要接受这个级别的TPS。
2.无分片功能。这是一个功能缺失,JMS 并没有规定消息中间件的集群、分片机制。而由于 ActiveMQ
是伟企业级开发设计的消息中间件,初衷并不是为了处理海量消息和高并发请求。如果一台服务器不能承受更多消息,则需要横向拆分。ActiveMQ
官方不提供分片机制,需要自己实现。
适用场景
1. 对 TPS 要求比较低的系统,可以使用 ActiveMQ 来实现,一方面比较简单,能够快速上手开发,另一方面可控性也比较好,还有比较好的监控机制和界面
不适用的场景
1.消息量巨大的场景。ActiveMQ 不支持消息自动分片机制,如果消息量巨大,导致一台服务器不能处理全部消息,就需要自己开发消息分片功能。
|