
Standalone usage
Dependency
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
    </dependency>
Basic usage example
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.5");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("123456");
    factory.setVirtualHost("/test");

    // try-with-resources closes the channel and the connection automatically
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        channel.queueDeclare("my_queue1", false, false, false, null);
        // bind the queue to the built-in direct exchange with routing key "queue1"
        channel.queueBind("my_queue1", "amq.direct", "queue1");
        channel.basicPublish("amq.direct", "queue1", null, "Hello World!".getBytes());
    } catch (Exception e) {
        e.printStackTrace();
    }
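Producer and consumer
Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            // RabbitMQUtils is the helper class listed further down in this article
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();

            final String QUEUE_NAME = "my_queue01";
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String message = "hello RabbitMQ!!";
            // the default exchange ("") routes by queue name
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

            System.out.println("Message sent successfully!");

            RabbitMQUtils.closeSource(channel, connection);
        }
    }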
Consumer 1: consume a message once
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();

            final String QUEUE_NAME = "my_queue01";
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
                // simulate slow processing: one second per byte of the body
                for (byte b : message.getBody()) {
                    System.out.println("sleep..." + b);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("Message consumption interrupted...");
            };

            // the second argument is autoAck
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

            // closing the channel ends the subscription; omit this call to keep consuming
            RabbitMQUtils.closeSource(channel, connection);
        }
    }
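Here autoAck is set to true, which is unsafe: the broker treats a message as acknowledged the moment it is delivered and removes it from the queue. If the consumer then fails before finishing the work for that message, the message is already gone from the queue and cannot be redelivered, so the task is lost for good.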
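To make the risk concrete, the following sketch models the two acknowledgement modes in memory. It is a toy model under stated assumptions, not RabbitMQ client code: `AckModes`, `ToyQueue`, `deliver`, `ack`, and `channelClosed` are hypothetical names, and the requeue step only mirrors how a broker returns unacknowledged messages to the queue when a consumer's channel closes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy in-memory model of acknowledgement modes; not the RabbitMQ client API. */
final class AckModes {

    /** Hypothetical stand-in for a broker queue. */
    static final class ToyQueue {
        private final Deque<String> ready = new ArrayDeque<>();
        private final Map<Long, String> unacked = new LinkedHashMap<>();
        private long nextTag = 1;

        void publish(String body) {
            ready.addLast(body);
        }

        /** Hands out the next message; with autoAck it is forgotten immediately. */
        long deliver(boolean autoAck) {
            String body = ready.removeFirst();
            long tag = nextTag++;
            if (!autoAck) {
                unacked.put(tag, body); // kept until the consumer acknowledges it
            }
            return tag;
        }

        void ack(long tag) {
            unacked.remove(tag);
        }

        /** Models the consumer's channel closing: unacked messages are requeued. */
        void channelClosed() {
            unacked.values().forEach(ready::addLast);
            unacked.clear();
        }

        int size() {
            return ready.size() + unacked.size();
        }
    }

    /** Returns how many messages are still in the queue after the consumer is gone. */
    static int messagesLeft(boolean autoAck, boolean crashBeforeDone) {
        ToyQueue queue = new ToyQueue();
        queue.publish("task-1");
        long tag = queue.deliver(autoAck);   // consumer receives the task
        if (!crashBeforeDone && !autoAck) {
            queue.ack(tag);                  // work finished, acknowledge it
        }
        queue.channelClosed();               // consumer goes away either way
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("autoAck=true,  crash:   " + messagesLeft(true, true));
        System.out.println("autoAck=false, crash:   " + messagesLeft(false, true));
        System.out.println("autoAck=false, success: " + messagesLeft(false, false));
    }
}
```

In this model the crashed consumer leaves 0 messages behind with autoAck=true (the task is lost) and 1 message with autoAck=false (the task can be redelivered). After a successful manual acknowledgement the queue is empty, as intended.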
Consumer 2
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // autoAck is false, so each message must be acknowledged explicitly
    channel.basicConsume("my_queue1", false, (consumerTag, message) -> {
        System.out.println(new String(message.getBody()));
        // basicAck(deliveryTag, multiple): acknowledge only this delivery
        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    }, consumerTag -> {
        // nothing to do when the consumer is cancelled
    });
Utility class
    public class RabbitMQUtils {

        // creates a new connection with fixed broker settings
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.50.92.70");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("admin");
            factory.setPassword("admin");

            return factory.newConnection();
        }

        // closes the channel first, then the connection
        public static void closeSource(Channel channel, Connection connection) {
            if (channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
Manual acknowledgement in the consumer
    // basicAck(deliveryTag, multiple)
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
Unfair dispatch
It has to be configured on every consumer that should use unfair dispatch.
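With the Java client, this kind of dispatch is normally enabled by calling `channel.basicQos(1)` on the consumer's channel before `basicConsume`. It only has an effect when messages are acknowledged manually. The effect itself can be illustrated with a small simulation. The sketch below is plain Java, not RabbitMQ client code; `DispatchSimulation`, the per-message costs, and the virtual clock are assumptions made purely for illustration.

```java
import java.util.Arrays;

/** Toy simulation of message dispatch; not RabbitMQ client code. */
final class DispatchSimulation {

    /** Round-robin: message i goes to consumer i % n, no matter how busy it is. */
    static int[] roundRobin(int messages, int[] costPerMessage) {
        int[] handled = new int[costPerMessage.length];
        for (int i = 0; i < messages; i++) {
            handled[i % costPerMessage.length]++;
        }
        return handled;
    }

    /**
     * Prefetch of 1: a consumer only gets a new message once it has finished
     * (acknowledged) the previous one, so each message goes to whichever
     * consumer becomes free first.
     */
    static int[] prefetchOne(int messages, int[] costPerMessage) {
        int[] handled = new int[costPerMessage.length];
        long[] freeAt = new long[costPerMessage.length]; // virtual time when idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // ties go to the lower index
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] costs = {1, 5}; // consumer 0 is fast, consumer 1 is slow
        System.out.println("round-robin: " + Arrays.toString(roundRobin(12, costs)));
        System.out.println("prefetch=1:  " + Arrays.toString(prefetchOne(12, costs)));
    }
}
```

For 12 messages, round-robin gives each consumer 6, while the prefetch-limited policy gives the fast consumer 10 and the slow one 2. That skew toward the faster consumer is what "unfair" refers to here.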
Persistence
Message persistence
Queue persistence
Publisher confirms
Integrating MQ with Spring Boot
Dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
RabbitMQ server configuration
    spring:
      rabbitmq:
        addresses: 192.168.1.5
        username: admin
        password: 123456
        virtual-host: /test
Configuring the exchange, the queue, and the binding
RabbitConfiguration
    @Configuration
    public class RabbitConfiguration {

        // refers to the built-in direct exchange
        @Bean("directExchange")
        public Exchange exchange() {
            return ExchangeBuilder
                    .directExchange("amq.direct")
                    .build();
        }

        @Bean("myQueue")
        public Queue queue() {
            return QueueBuilder
                    .nonDurable("myQueue")
                    .build();
        }

        // binds myQueue to the exchange with routing key "queue1"
        @Bean("binding")
        public Binding binding(@Qualifier("directExchange") Exchange exchange,
                               @Qualifier("myQueue") Queue queue) {
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with("queue1")
                    .noargs();
        }
    }
Producer and consumer
Using RabbitTemplate
Producer - send once
    @Resource
    RabbitTemplate rabbitTemplate;

    void publisher() {
        // convertAndSend(exchange, routingKey, payload)
        rabbitTemplate.convertAndSend("amq.direct", "queue1", "Hello spring & amqp!");
    }
Consumer - listening
    @Component
    public class TestListener {

        @RabbitListener(queues = "myQueue")
        public void test(Message message) {
            System.out.println(new String(message.getBody()));
        }
    }
The listener method can also take the payload directly as a String:
    @RabbitListener(queues = "myQueue")
    public void test(String message) {
        System.out.println(message);
    }
Receiving a reply in the producer
Producer - use convertSendAndReceive
    Object res = rabbitTemplate.convertSendAndReceive("amq.direct", "queue1", "Hello spring & amqp!");
    System.out.println("Response from consumer: " + res);
Consumer - add a return value
    @RabbitListener(queues = "myQueue")
    public String test(String message) {
        System.out.println(message);
        return message + " received, response sent successfully!";
    }
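If the consumer method returns void instead, the producer still waits for a response until the reply timeout expires and then gets null.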
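The null result follows from how request/reply works: the caller blocks on a reply queue for a limited time and gives up when nothing arrives. The sketch below imitates that behaviour with `java.util.concurrent` only. It is not Spring AMQP code; `RequestReplySketch`, `sendAndReceive`, and the timeout values are hypothetical, and a handler that returns null stands in for a listener declared as void.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Toy request/reply model using only java.util.concurrent; not Spring AMQP code. */
final class RequestReplySketch {

    /**
     * Passes the request to the handler on another thread and waits for a reply.
     * A handler that returns null puts nothing on the reply queue, so the caller
     * times out and gets null.
     */
    static String sendAndReceive(String request, Function<String, String> handler,
                                 long timeoutMillis) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            String reply = handler.apply(request);
            if (reply != null) {
                replyQueue.add(reply);
            }
        });
        consumer.start();
        try {
            // poll returns null when the timeout elapses without a reply
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        String ok = sendAndReceive("ping", msg -> msg + " received", 1000);
        String none = sendAndReceive("ping", msg -> null, 200);
        System.out.println(ok);
        System.out.println(none);
    }
}
```

The first call gets the handler's reply; the second waits out the full 200 ms and returns null. In Spring AMQP the corresponding wait is governed by the reply timeout of `RabbitTemplate`, which can be changed with `setReplyTimeout`.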
Receiving JSON messages and mapping them to an entity class
Entity class
    @Data
    public class User {
        int id;
        String name;
    }
Converter
    @Bean("jacksonConverter")
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }
Sending a message manually
    {"id":1, "name":"userName"}
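Consumer
    @RabbitListener(queues = "myQueue")
    public void test(User user) {
        System.out.println(user);
    }
Specifying the messageConverter
    @RabbitListener(queues = "myQueue", messageConverter = "jacksonConverter")
    public void test(User user) {
        System.out.println(user);
    }
In this setup the output is also correct when messageConverter is not specified, presumably because Spring Boot applies the only MessageConverter bean to its listeners automatically.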
Sending from the producer
Producer - send the entity directly
    rabbitTemplate.convertAndSend("amq.direct", "queue1", new User());
The entity that was sent is stored in the queue as JSON.
