RabbitMQ从入门到精通---小试牛刀

前面说了基本概念,也尝试动手安装了RabbitMQ Server,接下来我觉得改动动手Coding了,毕竟老话说得好,“光说不练假把式”,不能仅停留在理论上,嗯,要动手!!!

Get Started

新建maven项目,巴拉巴拉…(一杯咖啡的功夫)maven,项目已经建好了,引入rabbit操作的jar包。

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.0</version>
</dependency>

Get Started

不多说,直接上代码!

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final String QUEUE_NAME = "myQueue";
final String EXCHANGE_NAME = "myExchange";
final String ROUTING_KEY = "myRoutingKey";
ConnectionFactory factory = new ConnectionFactory();
// 默认localhost
factory.setHost("localhost");
// 默认5672
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("huweitech");
// 创建连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列和Exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "hello world";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));
// 关闭channel和连接
channel.close();
connection.close();
```
执行代码后在RabbitMQ的web控制台上可以看到
![](https://static.dev-heaven.com/images/Control%20Panel.png)
每次执行会发出一条消息,然后close掉连接。
#### 消费者

final String QUEUE_NAME = “myQueue”;

ConnectionFactory factory = new ConnectionFactory();
// 默认localhost
factory.setHost(“localhost”);
// 默认5672
factory.setPort(5672);
factory.setUsername(“admin”);
factory.setPassword(“huweitech”);
// 创建连接
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, “UTF-8”);
System.out.println(message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);

1
2
3
4
5
6
7
8
9
执行代码后可以在RabbitMQ的web控制台上可以看到
![](https://static.dev-heaven.com/images/Consumer.png)
消息被消费,然后给RabbitMQ Server发送ack(`channel.basicAck(envelope.getDeliveryTag(), false);`),确认消费。然后程序会挂起,继续等待接收消息。
*PS:关于ACK,在下篇博客中说明。*
以上就是简单的生产者消费者,其实可以更简单,在生产者中,其实可以不声明Exchange,改为如下:

final String QUEUE_NAME = “myQueue”;

ConnectionFactory factory = new ConnectionFactory();
// 默认localhost
factory.setHost(“localhost”);
// 默认5672
factory.setPort(5672);
factory.setUsername(“admin”);
factory.setPassword(“huweitech”);
// 创建连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = “hello world2211112222”;
channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes(“UTF-8”));

channel.close();
connection.close();

1
2
3
4
5
6
7
8
9
10
11
12
13
???可以不直接声明Exchange?你是不是骗我啊,明明说好的生产者发送到Exchange的啊!
> RabbitMQ默认有一个Exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,RabbitMQ管理控制台对这个default exchange做了描述:The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
### Channel
> 这里想说的是Channel类中的方法以及用法
#### basicQos
> prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

1
2
3
4
5
6
7
8
9
#### basicPublish
>
* routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
* mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
* immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
* BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable=true)可以实现,即使服务器宕机,消息仍然保留
简单来说:mandatory标志告诉服务器至少将该消息路由到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

1
2
3
4
5
6
#### queueDeclare
> 声明队列
> * durable: 是否持久化,服务器宕机后能保存下来。
> * exclusive:是否为当前连接的专用队列,在连接断开后,会自动删除该队列。
> * autoDelete:消费者断开后,会自动删除该队列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Maparguments) throws IOException;

1
2
3
4
5
#### exchangeDeclare
> 声明Exchange,四种类型:BuiltinExchangeType { DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")}
> * durable: 服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。消息publish的时候需要使用deliveryMode=2
> * autoDelete: 当已经没有消费者时,服务器是否可以删除该Exchange。

Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Maparguments) throws IOException;

1
2
3
4
5
#### basicAck
> 当消息正常被消费后,发送ack标识通知Server。
> * deliveryTag: 可以理解为消息在server中的唯一索引(或者叫id)
> * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

void basicAck(long deliveryTag, boolean multiple) throws IOException;

1
2
3
4
5
#### basicNack
> 拒收Message
> requeue: 是否重新入队

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;

1
2
3
4
#### basicReject
> 拒收Message,和basicNack不同的是,每次只能拒收一条消息。

void basicReject(long deliveryTag, boolean requeue) throws IOException;

1
2
3
4
5
6
7
#### basicConsume
> * autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
> * consumerTag:Server为消费者生成的一个标识
> * noLocal:如果只为true,则服务器不能将此channel上publish的Message发给该消费者。
> * exclusive:排他,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Maparguments, Consumer callback) throws IOException;

1
2
3
4
#### queueBind
> 将消费者和队列绑定

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Maparguments) throws IOException;
```

参考

我知道是不会有人点的,但万一有人想不开呢!