消息队列&RabbitMQ(2)

6.8k words

alt text

单独使用

依赖

1
2
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>

基本使用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ConnectionFactory factory = new ConnectionFactory();

// 设定连接信息
factory.setHost("192.168.1.5");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/test");
// 在连接下建立信道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare("my_queue1", false, false, false, null);
// 队列绑定到交换机
channel.queueBind("my_queue1", "amq.direct", "queue1");
// 发布新的消息
channel.basicPublish("amq.direct", "queue1", null, "Hello World!".getBytes());
} catch (Exception e) {
e.printStackTrace();
}

生产者和消费者

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接和信道
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

// 声明队列
final String QUEUE_NAME = "my_queue01";
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 发布消息
String message = "hello RabbitMQ!!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

System.out.println("消息发送成功!");

// 关闭连接和信道
RabbitMQUtils.closeSource(channel, connection);
}
}

消费者1 消费一次消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

final String QUEUE_NAME = "my_queue01";
// 消息被接收时执行的回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8) );
for (byte b : message.getBody()) {
System.out.println("sleep..." + b);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
// 消息被取消时执行的回调函数
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息中断...");
};

// 消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

RabbitMQUtils.closeSource(channel, connection);
}
}

这里将autoAck设置为true ,但是这种方法其实是不安全的,因为当消费者接受到队列的消息后,将会立即给队列发送应答消息,队列将删除该条消息。这就会导致一个问题:当消费者接受消息后没有完成其对应的任务,又因为队列中已经将消息删除了,消费者的任务将彻底的执行失败!

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 在连接下建立信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// basicAck确认应答,basicNack、basicReject拒绝应答
channel.basicConsume("my_queue1", false,
(consumerTag, message) -> {
System.out.println(new String(message.getBody()));
// 第一个参数为当前消息标签,参数multiple,是否批量处理队列中所有消息,为false时只处理当前消息
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
// 最后一个参数requeue,是否将当前消息放回队列,重新排队,为false时消息被丢弃
//channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true);
//channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> {

});

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RabbitMQUtils {

/**
* 获取连接
* @return 信道(Channel)
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接属性
factory.setHost("172.50.92.70");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
return factory.newConnection();
}

/**
* 关闭资源
* @param channel 信道
* @param connection 连接
*/
public static void closeSource(Channel channel, Connection connection) {
// 关闭资源
if (channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

消费者手动应答

1
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

不公平分发

需要在使用不公平分发的所有消费端设置

1
channel.basicQos(1);

持久化

消息持久化
消息队列持久化

生产者发布确认

SpringBoot整合MQ

依赖

1
2
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>

rabbitmq服务器配置

1
2
3
4
5
6
spring:
rabbitmq:
addresses: 192.168.1.5
username: admin
password: 123456
virtual-host: /test

配置交换机,消息队列以及绑定

RabbitConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class RabbitConfiguration {

// 定义交换机Bean,可以定义多个
@Bean("directExchange")
public Exchange exchange() {
return ExchangeBuilder
.directExchange("amq.direct")
.build();
}

// 定义消息队列
@Bean("myQueue")
public Queue queue() {
return QueueBuilder.
nonDurable("myQueue") // 非持久化
.build();
}

// 将上面定义的交换机和队列进行绑定
@Bean("binding")
public Binding binding(@Qualifier("directExchange") Exchange exchange,
@Qualifier("myQueue") Queue queue) {
return BindingBuilder
.bind(queue) // 绑定队列
.to(exchange) // 到交换机
.with("queue1") // 自定义routingKey
.noargs(); // 不需要额外参数
}

}

生产者和消费者

使用RabbitTemplate

生产者 - 一次发送

1
2
3
4
5
6
7
@Resource
RabbitTemplate rabbitTemplate; // 封装了大量rabbitmq操作的工具类,由Starter提供

void publisher() {
// 自动转换类型(故此处不用getBody()转byte数组)并发送
rabbitTemplate.convertAndSend("amq.direct", "queue1", "Hello spring & amqp!");
}

消费者 - 监听

1
2
3
4
5
6
7
8
9
@Component
public class TestListener {

// 定义此方法为队列myQueue(可以指定多个队列)的监听器,一旦监听到新的消息,就会接受并处理
@RabbitListener(queues = "myQueue")
public void test(Message message) {
System.out.println(new String(message.getBody()));
}
}
1
2
3
4
5
// 可以直接使用String类型接收,会自动转换
@RabbitListener(queues = "myQueue")
public void test(String message) {
System.out.println(message);
}

生产者接受反馈

生产者 - 使用convertSendAndReceive

1
2
Object res = rabbitTemplate.convertSendAndReceive("amq.direct", "queue1", "Hello spring & amqp!");
System.out.println("收到消费者响应:" + res);

消费者 - 添加返回值

1
2
3
4
5
@RabbitListener(queues = "myQueue")
public String test(String message) {
System.out.println(message);
return message + "已收到,响应成功!";
}

若此时消费者返回值为void,则生产者会等待响应,最后得到null

接收json格式的消息,并获取对应实体类

实体类

1
2
3
4
5
@Data
public class User {
int id;
String name;
}

converter

1
2
3
4
5
// 创建一个用于JSON转换的Bean
@Bean("jacksonConverter")
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}

手动发送
{“id”:1, “name”:”userName”}

消费者

1
2
3
4
@RabbitListener(queues = "myQueue")
public void test(User user) {
System.out.println(user);
}

指定messageConverter

1
2
3
4
@RabbitListener(queues = "myQueue", messageConverter = "jacksonConverter")
public void test(User user) {
System.out.println(user);
}

此时不指定也能正确输出

生产者发送
生产者 - 直接发送实体类

1
rabbitTemplate.convertAndSend("amq.direct", "queue1", new User());

发送的实体类在消息队列中以json格式保存
alt text