消息队列&RabbitMQ(3)

6.3k words

死信队列

付款超时,取消订单

死信

  • 消息被拒绝(basic.reject/basic.nack),并且requeue = false(不放回队列)
  • 消息TTL过期
  • 队列达到最大长度

死信处理机制

alt text

  1. 正常队列将死信发送至死信交换机
  2. 死信交换机发往死信队列
  3. 对应的消费者处理死信

示例

在配置中新增死信队列和死信交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 死信交换机
@Bean("directDlExchange")
public Exchange dlExchange(){
return ExchangeBuilder.directExchange("dlq.direct").build();
}

// 死信队列
@Bean("myDlQueue")
public Queue dlQueue(){
return QueueBuilder
.nonDurable("myDlQueue")
.build();
}

// 死信交换机和死信队列进绑定
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,
@Qualifier("myDlQueue") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dl-queue")
.noargs();
}

给正常队列添加死信机制

  • 消息被拒绝(basic.reject/basic.nack),并且requeue = false(不放回队列)

    1
    2
    3
    4
    5
    6
    7
    8
    @Bean("myQueue")
    public Queue queue() {
    return QueueBuilder
    .nonDurable("myQueue")
    .deadLetterExchange("dlq.direct") // 指定死信交换机
    .deadLetterRoutingKey("dl-queue") // 指定routingKey
    .build();
    }
  • 消息TTL过期

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Bean("myQueue")
    public Queue queue() {
    return QueueBuilder
    .nonDurable("myQueue")
    .deadLetterExchange("dlq.direct") // 指定死信交换机
    .deadLetterRoutingKey("dl-queue") // 指定routingKey
    .ttl(5000) // 存活时间5秒
    .build();
    }
  • 队列达到最大长度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Bean("myQueue")
    public Queue queue() {
    return QueueBuilder
    .nonDurable("myQueue")
    .deadLetterExchange("dlq.direct") // 指定死信交换机
    .deadLetterRoutingKey("dl-queue") // 指定routingKey
    .maxLength(3) // 队列最大长度
    .build();
    }

    超过最大长度时,会把位于队首的消息丢进死信队列

工作队列

一个队列多个消费者
alt text

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "myQueue")
public void receiver(String message) {
System.out.println("1号消费者:" + message);
}

@RabbitListener(queues = "myQueue")
public void receiver2(String message) {
System.out.println("2号消费者:" + message);
}

轮询分发

消息队列中的消息被消费者轮流消费,轮询分发
两个消费者1,2

  • 输入aa,bb,cc,dd
    1号消费aa
    2号消费bb
    1号消费cc
    2号消费dd

  • 输入aa,bb,cc,dd
    1号消费aa
    2号消费bb
    1号消费cc
    2号宕机
    1号消费dd

队列连接的消费者
alt text
Prefetch count表示该消费者每次拿取消息的数量
轮询时,假如队列中已有700条消息,则
1号250条
2号250条
1号200条

自定消费者的Prefetch count

定义RabbitListenerContainerFactory

1
2
3
4
5
6
7
8
9
10
@Resource
private CachingConnectionFactory connectionFactory;

@Bean(name = "listenerFactory")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1); // 将PrefetchCount设定为1,一次取一个
return factory;
}

指定消费者创建时使用的containerFactory

1
2
3
4
@RabbitListener(queues = "myQueue", containerFactory = "listenerFactory")
public void receiver(String message) {
System.out.println("1号消费者:" + message);
}

alt text

使用注解创建多个消费者

设置concurrency

1
2
3
4
@RabbitListener(queues = "myQueue", containerFactory = "listenerFactory", concurrency = "6")
public void receiver(String message) {
System.out.println(message);
}

alt text

此处创建了6个

发布/订阅模式

消息分发到多个消息队列
alt text

使用fanout类型的路由器
alt text

定义交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 定义fanout交换机
@Bean("fanoutExchange")
public Exchange exchange() {
return ExchangeBuilder
.fanoutExchange("amq.fanout")
.build();
}

// 定义消息队列1
@Bean("myQueue1")
public Queue queue1() {
return QueueBuilder
.nonDurable("myQueue1")
.build();
}

// 定义消息队列2
@Bean("myQueue2")
public Queue queue2() {
return QueueBuilder
.nonDurable("myQueue2")
.build();
}

// 将fanout交换机和队列1进行绑定
@Bean("binding1")
public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange,
@Qualifier("myQueue1") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.noargs();
}

// 将fanout交换机和队列2进行绑定
@Bean("binding2")
public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange,
@Qualifier("myQueue2") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.noargs();
}

alt text

定义消费者

1
2
3
4
5
6
7
8
9
10
11
// 1号消费者监听队列1
@RabbitListener(queues = "myQueue1")
public void receiver(String message) {
System.out.println("1号消费者:" + message);
}

// 2号消费者监听队列2
@RabbitListener(queues = "myQueue2")
public void receiver2(String message) {
System.out.println("2号消费者:" + message);
}

路由器发送消息时无论是否指定routing key, 都会发送到所有绑定的队列上

路由模式

使用direct类型的交换机,通过指定不同的routing key发送到不同队列
可以使用不同routing key多次绑定同一队列
alt text

主题模式

routing key以模糊匹配进行转发

使用topic类型的交换机
alt text

  • *:表示1个单词(任意多个字母)
  • #:表示0个或多个单词

其中 . 也算一个单词,例如routing key为*.test.*时, .test. 也成立
同理routing key为test.#时, test 成立

定义交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 定义topic交换机
@Bean("topicExchange")
public Exchange exchange() {
return ExchangeBuilder
.topicExchange("amq.topic")
.build();
}

@Bean("myQueue")
public Queue queue1() {
return QueueBuilder
.nonDurable("myQueue")
.build();
}

@Bean("binding")
public Binding binding(@Qualifier("topicExchange") Exchange exchange,
@Qualifier("myQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("*.test.*")
.noargs();
}

alt text

amq.rabbitmq.trace - 用于日志追踪的预设交换机

  1. 开启追踪

    1
    rabbitmqctl trace_on -p [virtual host]

    关闭trace_off

  2. amq.rabbitmq.trace配置routing key绑定到队列

  • publish.{exchangename} 表示生产者发送的消息
  • deliver.{queuename} 表示消费者接收的消息
    alt text

trace记录的消息示例 - 一次生产一次消费
alt text

header Exchange 头部信息交换机

alt text

根据头部信息Headers转发消息
alt text

定义header交换机和队列,并配置header交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 定义headers交换机
@Bean("headersExchange")
public HeadersExchange exchange() { // 此处要声明为HeadersExchange,而不是Exchange,以便后续配置
return ExchangeBuilder
.headersExchange("amq.headers")
.build();
}

@Bean("myQueue")
public Queue queue() {
return QueueBuilder
.nonDurable("myQueue")
.build();
}

@Bean("binding")
public Binding binding(@Qualifier("headersExchange") HeadersExchange exchange,
@Qualifier("myQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.where("hello").matches("world") // Headers中存在key为"hello", value为"world"才可成功发送
//.whereAll("a", "b", "c").exist() // Headers中需同时存在"a", "b", "c"的key
//.whereAny("d", "e", "f").exist() // Headers中需存在"d", "e", "f"中的任意一个key
;
}

此处有重载,可以看到不同类型的交换机作参数,返回的配置不一样
alt text

传值Map以检测key-value
alt text

绑定关系
alt text

发送消息
alt text