死信队列
付款超时,取消订单
死信
- 消息被拒绝(basic.reject/basic.nack),并且requeue = false(不放回队列)
- 消息TTL过期
- 队列达到最大长度
死信处理机制

- 正常队列将死信发送至死信交换机
- 死信交换机发往死信队列
- 对应的消费者处理死信
示例
在配置中新增死信队列和死信交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Bean("directDlExchange") public Exchange dlExchange(){ return ExchangeBuilder.directExchange("dlq.direct").build(); }
@Bean("myDlQueue") public Queue dlQueue(){ return QueueBuilder .nonDurable("myDlQueue") .build(); }
@Bean("dlBinding") public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange, @Qualifier("myDlQueue") Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("dl-queue") .noargs(); }
|
给正常队列添加死信机制
工作队列
一个队列多个消费者

1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "myQueue") public void receiver(String message) { System.out.println("1号消费者:" + message); }
@RabbitListener(queues = "myQueue") public void receiver2(String message) { System.out.println("2号消费者:" + message); }
|
轮询分发
消息队列中的消息被消费者轮流消费,轮询分发
两个消费者1,2
队列连接的消费者

Prefetch count表示该消费者每次拿取消息的数量
轮询时,假如队列中已有700条消息,则
1号250条
2号250条
1号200条
自定消费者的Prefetch count
定义RabbitListenerContainerFactory
1 2 3 4 5 6 7 8 9 10
| @Resource private CachingConnectionFactory connectionFactory;
@Bean(name = "listenerFactory") public SimpleRabbitListenerContainerFactory listenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(1); return factory; }
|
指定消费者创建时使用的containerFactory
1 2 3 4
| @RabbitListener(queues = "myQueue", containerFactory = "listenerFactory") public void receiver(String message) { System.out.println("1号消费者:" + message); }
|

使用注解创建多个消费者
设置concurrency
1 2 3 4
| @RabbitListener(queues = "myQueue", containerFactory = "listenerFactory", concurrency = "6") public void receiver(String message) { System.out.println(message); }
|

此处创建了6个
发布/订阅模式
消息分发到多个消息队列

使用fanout类型的路由器

定义交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Bean("fanoutExchange") public Exchange exchange() { return ExchangeBuilder .fanoutExchange("amq.fanout") .build(); }
@Bean("myQueue1") public Queue queue1() { return QueueBuilder .nonDurable("myQueue1") .build(); }
@Bean("myQueue2") public Queue queue2() { return QueueBuilder .nonDurable("myQueue2") .build(); }
@Bean("binding1") public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange, @Qualifier("myQueue1") Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .noargs(); }
@Bean("binding2") public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange, @Qualifier("myQueue2") Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .noargs(); }
|

定义消费者
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = "myQueue1") public void receiver(String message) { System.out.println("1号消费者:" + message); }
@RabbitListener(queues = "myQueue2") public void receiver2(String message) { System.out.println("2号消费者:" + message); }
|
路由器发送消息时无论是否指定routing key, 都会发送到所有绑定的队列上
路由模式
使用direct类型的交换机,通过指定不同的routing key发送到不同队列
可以使用不同routing key多次绑定同一队列

主题模式
routing key以模糊匹配进行转发
使用topic类型的交换机

- *:表示1个单词(任意多个字母)
- #:表示0个或多个单词
其中 .
也算一个单词,例如routing key为*.test.*
时, .test. 也成立
同理routing key为test.#
时, test 成立
定义交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Bean("topicExchange") public Exchange exchange() { return ExchangeBuilder .topicExchange("amq.topic") .build(); }
@Bean("myQueue") public Queue queue1() { return QueueBuilder .nonDurable("myQueue") .build(); }
@Bean("binding") public Binding binding(@Qualifier("topicExchange") Exchange exchange, @Qualifier("myQueue") Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("*.test.*") .noargs(); }
|

amq.rabbitmq.trace - 用于日志追踪的预设交换机
开启追踪
1
| rabbitmqctl trace_on -p [virtual host]
|
关闭trace_off
amq.rabbitmq.trace配置routing key绑定到队列
publish.{exchangename}
表示生产者发送的消息deliver.{queuename}
表示消费者接收的消息

trace记录的消息示例 - 一次生产一次消费


根据头部信息Headers转发消息

定义header交换机和队列,并配置header交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Bean("headersExchange") public HeadersExchange exchange() { return ExchangeBuilder .headersExchange("amq.headers") .build(); }
@Bean("myQueue") public Queue queue() { return QueueBuilder .nonDurable("myQueue") .build(); }
@Bean("binding") public Binding binding(@Qualifier("headersExchange") HeadersExchange exchange, @Qualifier("myQueue") Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .where("hello").matches("world") ; }
|
此处有重载,可以看到不同类型的交换机作参数,返回的配置不一样

传值Map以检测key-value

绑定关系

发送消息
