RabbitMQ从入门到精通---ACK机制

首先需要思考一个问题:我们为什么要MQ?

  • 通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
  • MQ提供松耦合的应用架构。

Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,这段数据就丢失了。如果我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即ack。为了保证数据能被正确处理而不仅仅是被Consumer收到,我们就不能采用no-ack或者auto-ack,我们需要手动ack(manual-ack)。在数据处理完成后手动发送ack,这个时候Server才将Message删除。

如何设置?

action in java
1
2
# Channel.java,将autoAck设置为false即可。
String basicConsume(String queue, boolean autoAck, Consumer callback);
action in spring
1
2
3
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="myConsumer" queues="myQueue"/>
</rabbit:listener-container>

忘记ack?

只能说后果很严重,因为没有ack的话,RabbitMQ Server会重新分发,并且RabbitMQ Server不会再发送数据给它(直至Server收到ack才会再次发送消息,参看测试用例),因为Server认为这个Consumer处理能力有限。这样就导致消息在RabbitMQ Server上堆积,最终造成内存泄露。

如何排查

可以使用以下命令查看没有被ACK的消息,如果有大量消息,那八九不离十就是忘记ack了。

1
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

ACK其他作用

ACK机制可以起到限流的作用,比如在消费者处理后,sleep一段时间,然后再ACK,这可以帮助消费者负载均衡。

当然,除了ACK,有两个比较重要的参数也在控制着consumer的load-balance,即prefetch和concurrency

prefetch & concurrency
  1. prefetch: 每次从一次性从broker里面取的待消费的消息的个数。prefetch可以改变RabbitMQ的循环分发机制,比如设置其值为1,当消费者在处理消息完成,还没有ACK的时候,RabbitMQ的Server不会再发消息给该消费者。
  2. concurrency: 这个表示每个listener创建多少个消费者(会创建多少个线程来消费)
  1. 首先,basic.qos是针对channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos信令。
  2. 其实basic.qos里还有另外两个参数可进行设置(global和prefetch_size),但RabbitMQ没有相应的实现。
  3. RabbitMQ如何挑选消费者?当RabbitMQ要将队列中的一条消息投递给消费者时,会遍历该队列上的消费者列表,选一个合适的消费者,然后将消息投递出去。其中挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到设置的prefetch_count个数,如果未ack的消息数达到了prefetch_count的个数,则不符合要求。

ACK测试

case1:没有ACK
1
2
3
4
5
6
7
8
9
10
11
12
Channel channel = connection.createChannel();
// 消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("1号消费者 ===> " + message);
//getChannel().basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, null, consumer);

结论:没有ACK,消费者任然会正常消费,消费者断开重连后,RabbitMQ Server会重新将消息发送给消费者,消费者重新执行任务,这可能导致#重复消费#

case2:两个消费者,一个正常ACK,一个不ACK

两个消费者请使用两个Java实例运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Channel channel = connection.createChannel();
// 消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("1号消费者 ===> " + message);
// Thread.sleep(2000); // 延时ACK
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, null, consumer);
// 消费者
DefaultConsumer anotherConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("2号消费者 ===> " + message);
//getChannel().basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, null, anotherConsumer);

结论:测试发现,两个消费者交替消费(循环分发机制)。

  1. 重启2号消费者,这个时候RabbitMQ会将其消费过的消息重新发给1号消费者消费。
  2. 即使没有ACK,RabbitMQ也不是立即不发送消息到没有ack的consumer,其实这很好理解,既然ack可以延时,那Server完全有理由相信consumer回复只是比较慢而已,它不是不回。所以继续会发第二条。
  3. 本来以为是2号消费者重启过程中,1号消费者消费过快,等2号消费者起起来时,消息已经被1号消费者消费完,其实不是,就算给1号消费者加上延迟ACK,2号消费者也不会再接收消息。应该是RabbitMQ内部的某种记忆功能,将2号消费者没有ACK的消息,直接归给1号消费者消费。
  4. 再次执行生产者,此时2号消费者任然可以重新接收消息。
  5. 设置1号消费者prefetch=1(channel.basicQos(1)),2号消费者不做任何设置,然后两个消费者都订阅同一队列,开启acknowledge机制。RabbitMQ向1号消费者投递了一条消息后,消费者未对该消息进行ack,RabbitMQ不会再向该消费者投递消息,剩下的消息均投递给了2号消费者。这和第二个试验结果不同。

补充

我们知道,有了ACK机制,当消费者挂掉后,消息可以不丢失,但是如果RabbitMQ Server挂掉了呢?这就需要持久化机制。如果没有设置相应的持久化,会在Server退出后丢掉Exchange和Queue。

队列持久化

1
2
3
4
5
/*
* @param durable持久化
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

Exchange持久化

1
2
3
4
/*
* @param durable持久化
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

Message持久化

1
2
3
BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, properties, message.getBytes("UTF-8"));

引入spring-rabbit

1
2
3
4
5
6
> <dependency>
> <groupId>org.springframework.amqp</groupId>
> <artifactId>spring-rabbit</artifactId>
> <version>1.7.3.RELEASE</version>
> </dependency>
>

其中有一个类org.springframework.amqp.core.MessageDeliveryMode表示消息发布模式,参看toInt方法,可以看出:非持久化(1)和持久化(2)。

参考

  1. https://my.oschina.net/hncscwc/blog/195560
  2. https://emacsist.github.io/2015/12/18/%E5%BD%BB%E5%BA%95%E4%BA%86%E8%A7%A3Spring-%E4%B8%AD-RabbitMQ%E9%85%8D%E7%BD%AE%E7%9A%84concurrency-%E5%92%8C-task-executor%EF%BC%88%E5%BC%82%E6%AD%A5/
  3. http://emacsist.github.io/2015/12/14/Spring-%E4%B8%8E-RabbitMQ-%E7%BB%93%E5%90%88%E9%85%8D%E7%BD%AE%E4%BB%A5%E5%8F%8A%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9/
我知道是不会有人点的,但万一有人想不开呢!