RabbitMQ message queue
RabbitMQ message queue
Table of contents
URL:
1. Project preparation
1. Import dependencies
2. Extract tool classes
Where are the configuration properties?
2. Code writing
1. Simple mode
producer
consumer
2.Work queues work queue mode
producer
Consumer 1
Consumer 2
3.Publish/Subscribe publishing and subscription model
producer
Consumer 1
Consumer 2
4.Routing routing mode
producer
Consumer 1
Consumer 2
5.Topics wildcard mode
producer
Consumer 1
Consumer 2
3. Summary
URL:
RabbitMQ official address:[RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQhttp://www.rabbitmq.com/](http://www.rabbitmq.com/ "RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ")
Official website corresponding mode introduction:RabbitMQ Tutorials — RabbitMQhttps://www.rabbitmq.com/getstarted.html
1. Project preparation
1. Import dependencies
Add dependencies in the pom file of the maven project.
2. <groupId>com.rabbitmq</groupId>
3. <artifactId>amqp-client</artifactId>
4. <version>5.6.0</version>
5. </dependency>
2. Extract tool classes
Both producers and consumers have to create a connection factory and create a connection.
2. ConnectionFactory connectionFactory = new ConnectionFactory();
3. //主机;默认localhost
4. connectionFactory.setHost("localhost");
5. //连接端口;默认5672
6. connectionFactory.setPort(5672);
7. //虚拟主机;默认/
8. connectionFactory.setVirtualHost("/yh");
9. //用户名;默认guest
10. connectionFactory.setUsername("yh");
11. //密码; 默认guest
12. connectionFactory.setPassword("yh");
13.
14. //2. 创建连接;
15. Connection connection = connectionFactory.newConnection();
To save code, a tool class RabbitGetConnection.java can be extracted.
2. import com.rabbitmq.client.ConnectionFactory;
3.
4. import java.io.IOException;
5. import java.util.concurrent.TimeoutException;
6.
7. public class RabbitGetConnection {
8.
9. public static Connection getConnection(){
10. //1. 创建连接工厂(设置RabbitMQ的连接参数);
11. ConnectionFactory connectionFactory = new ConnectionFactory();
12. //主机;默认localhost
13. connectionFactory.setHost("localhost");
14. //连接端口;默认5672
15. connectionFactory.setPort(5672);
16. //虚拟主机;默认/
17. connectionFactory.setVirtualHost("/yh");
18. //用户名;默认guest
19. connectionFactory.setUsername("admin");
20. //密码;默认guest
21. connectionFactory.setPassword("admin");
22. //2. 创建连接;
23. Connection connection = null;
24. try {
25. connection = connectionFactory.newConnection();
26. } catch (IOException e) {
27. e.printStackTrace();
28. } catch (TimeoutException e) {
29. throw new RuntimeException(e);
30. }
31.
32. return connection;
33. }
34. }
Where are the configuration properties?
Hold down the Ctrl key and click the newConnection() method to view the source code
2. Code writing
1. Simple mode
In order to be more intuitive, the extracted tool classes are not used previously.
producer
2.
3. import com.rabbitmq.client.Channel;
4. import com.rabbitmq.client.Connection;
5. import com.rabbitmq.client.ConnectionFactory;
6.
7. /**
8. * 简单模式:发送消息
9. */
10. public class Producer {
11. static final String QUEUE_NAME = "simple_queue";
12.
13. public static void main(String[] args) throws Exception {
14. //1. 创建连接工厂(设置RabbitMQ的连接参数);
15. ConnectionFactory connectionFactory = new ConnectionFactory();
16. //主机;默认localhost
17. connectionFactory.setHost("localhost");
18. //连接端口;默认5672
19. connectionFactory.setPort(5672);
20. //虚拟主机;默认/
21. connectionFactory.setVirtualHost("/yh");
22. //用户名;默认guest
23. connectionFactory.setUsername("yh");
24. //密码;默认guest
25. connectionFactory.setPassword("yh");
26.
27. //2. 创建连接;
28. Connection connection = connectionFactory.newConnection();
29. //3. 创建频道;
30. Channel channel = connection.createChannel();
31. //4. 声明队列;
32. /**
33. * 参数1:队列名称
34. * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
35. * 参数3:是否独占本连接
36. * 参数4:是否在不使用的时候队列自动删除
37. * 参数5:其它参数
38. */
39. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
40. //5. 发送消息;
41. String message = "你好!小兔纸。";
42.
43. /**
44. * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
45. * 参数2:路由key,简单模式中可以使用队列名称
46. * 参数3:消息其它属性
47. * 参数4:消息内容
48. */
49. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
50. System.out.println("已发送消息:" + message);
51. //6. 关闭资源
52. channel.close();
53. connection.close();
54. }
55. }
consumer
2.
3. import cn.yh.rabbitmq.util.ConnectionUtil;
4. import com.rabbitmq.client.*;
5.
6. import java.io.IOException;
7.
8. /**
9. * 简单模式;消费者接收消息
10. */
11. public class Consumer {
12. public static void main(String[] args) throws Exception {
13. //1. 创建连接工厂;
14. //2. 创建连接;(抽取一个获取连接的工具类)
15. Connection connection = ConnectionUtil.getConnection();
16. //3. 创建频道;
17. Channel channel = connection.createChannel();
18. //4. 声明队列;
19. /**
20. * 参数1:队列名称
21. * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
22. * 参数3:是否独占本连接
23. * 参数4:是否在不使用的时候队列自动删除
24. * 参数5:其它参数
25. */
26. channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
27. //5. 创建消费者(接收消息并处理消息);
28. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
29. @Override
30. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
31. //路由key
32. System.out.println("路由key为:" + envelope.getRoutingKey());
33. //交换机
34. System.out.println("交换机为:" + envelope.getExchange());
35. //消息id
36. System.out.println("消息id为:" + envelope.getDeliveryTag());
37. //接收到的消息
38. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
39. }
40. };
41. //6. 监听队列
42. /**
43. * 参数1:队列名
44. * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
45. * 如果设置为false则需要手动确认
46. * 参数3:消费者
47. */
48. channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
49. }
50. }
2.Work queues work queue mode
producer
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.Channel;
4. import com.rabbitmq.client.Connection;
5. import com.rabbitmq.client.ConnectionFactory;
6.
7. public class Producer {
8. static final String QUEUE_NAME = "work_queue";
9. public static void main(String[] args) throws Exception {
10. //创建连接
11. Connection connection = ConnectionUtil.getConnection();
12. // 创建频道
13. Channel channel = connection.createChannel();
14. // 声明(创建)队列
15. /**
16. * 参数1:队列名称
17. * 参数2:是否定义持久化队列
18. * 参数3:是否独占本次连接
19. * 参数4:是否在不使用的时候自动删除队列
20. * 参数5:队列其它参数
21. */
22. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
23. for (int i = 1; i <= 30; i++) {
24. // 发送信息
25. String message = "你好;小兔子!work模式--" + i;
26. /**
27. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
28. * 参数2:路由key,简单模式可以传递队列名称
29. * 参数3:消息其它属性
30. * 参数4:消息内容
31. */
32. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
33. System.out.println("已发送消息:" + message);
34. }
35. // 关闭资源
36. channel.close();
37. connection.close();
38. }
39. }
Consumer 1
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.*;
4. import java.io.IOException;
5.
6. public class Consumer1 {
7. public static void main(String[] args) throws Exception {
8. Connection connection = ConnectionUtil.getConnection();
9. // 创建频道
10. Channel channel = connection.createChannel();
11. // 声明(创建)队列
12. /**
13. * 参数1:队列名称
14. * 参数2:是否定义持久化队列
15. * 参数3:是否独占本次连接
16. * 参数4:是否在不使用的时候自动删除队列
17. * 参数5:队列其它参数
18. */
19. channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
20. //一 次只能接收并处理一个消息
21. channel.basicQos(1);
22. //创建消费者;并设置消息处理
23. DefaultConsumer consumer = new DefaultConsumer(channel){
24. @Override
25. /**
26. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
27. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
28. (收 到消息失败后是否需要重新发送)
29. * properties 属性信息
30. * body 消息
31. */
32. public void handleDelivery(String consumerTag, Envelope envelope,
33. AMQP.BasicProperties properties, byte[] body) throws IOException {
34. try {
35. //路由key
36. System.out.println("路由key为:" + envelope.getRoutingKey());
37. //交换机
38. System.out.println("交换机为:" + envelope.getExchange());
39. //消息id
40. System.out.println("消息id为:" + envelope.getDeliveryTag());
41. //收到的消息
42. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-
43. 8"));
44. Thread.sleep(1000);
45. //确认消息
46. channel.basicAck(envelope.getDeliveryTag(), false);
47. } catch (InterruptedException e) {
48. e.printStackTrace();
49. }
50. }
51. };
52. //监听消息
53. /**
54. * 参数1:队列名称
55. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
56. * 参数3:消息接收到后回调
57. */
58. channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
59. }
60. }
Consumer 2
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.*;
4. import java.io.IOException;
5. public class Consumer2 {
6. public static void main(String[] args) throws Exception {
7. Connection connection = ConnectionUtil.getConnection();
8. // 创建频道
9. Channel channel = connection.createChannel();
10. // 声明(创建)队列
11. /**
12. * 参数1:队列名称
13. * 参数2:是否定义持久化队列
14. * 参数3:是否独占本次连接
15. * 参数4:是否在不使用的时候自动删除队列
16. * 参数5:队列其它参数
17. */
18. channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
19. //一次只能接收并处理一个消息
20. channel.basicQos(1);
21. //创建消费者;并设置消息处理
22. DefaultConsumer consumer = new DefaultConsumer(channel){
23. @Override
24. /**
25. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
26. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
27. (收到消息失败后是否需要重新发送)
28. * properties 属性信息
29. * body 消息
30. */
31. public void handleDelivery(String consumerTag, Envelope envelope,
32. AMQP.BasicProperties properties, byte[] body) throws IOException {
33. try {
34. //路由key
35. System.out.println("路由key为:" + envelope.getRoutingKey());
36. //交换机
37. System.out.println("交换机为:" + envelope.getExchange());
38. //消息id
39. System.out.println("消息id为:" + envelope.getDeliveryTag());
40. //收到的消息
41. System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
42. Thread.sleep(1000);
43. //确认消息
44. channel.basicAck(envelope.getDeliveryTag(), false);
45. } catch (InterruptedException e) {
46. e.printStackTrace();
47. }
48. }
49. };
50. //监听消息
51. /**
52. * 参数1:队列名称
53. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
54. 息,设置为false则需要手动确认
55. * 参数3:消息接收到后回调
56. */
57. channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
58. }
59. }
3.Publish/Subscribe publishing and subscription model
Publish and subscribe model: A message can be received by multiple consumers ; a queue for one consumer can only be listened to by one consumer. The switch type used in subscription mode is: broadcast.
producer
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.BuiltinExchangeType;
4. import com.rabbitmq.client.Channel;
5. import com.rabbitmq.client.Connection;
6.
7. /**
8. * 发布与订阅使用的交换机类型为:fanout
9. */
10. public class Producer {
11. //交换机名称
12. static final String FANOUT_EXCHAGE = "fanout_exchange";
13. //队列名称
14. static final String FANOUT_QUEUE_1 = "fanout_queue_1";
15. //队列名称
16. static final String FANOUT_QUEUE_2 = "fanout_queue_2";
17. public static void main(String[] args) throws Exception {
18. //创建连接
19. Connection connection = ConnectionUtil.getConnection();
20. // 创建频道
21. Channel channel = connection.createChannel();
22. /**
23. * 声明交换机
24. * 参数1:交换机名称
25. * 参数2:交换机类型,fanout、topic、direct、headers
26. */
27. channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
28. // 声明(创建)队列
29. /**
30. * 参数1:队列名称
31. * 参数2:是否定义持久化队列
32. * 参数3:是否独占本次连接
33. * 参数4:是否在不使用的时候自动删除队列
34. * 参数5:队列其它参数
35. */
36. channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
37. channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
38. //队列绑定交换机
39. channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
40. channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
41. for (int i = 1; i <= 10; i++) {
42. // 发送信息
43. String message = "你好;小兔子!发布订阅模式--" + i;
44. /**
45. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
46. * 参数2:路由key,简单模式可以传递队列名称
47. * 参数3:消息其它属性
48. * 参数4:消息内容
49. */
50. channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
51. System.out.println("已发送消息:" + message);
52. }
53. // 关闭资源
54. channel.close();
55. connection.close();
56. }
57. }
Consumer 1
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.*;
4. import java.io.IOException;
5.
6. public class Consumer1 {
7. public static void main(String[] args) throws Exception {
8. Connection connection = ConnectionUtil.getConnection();
9. // 创建频道
10. Channel channel = connection.createChannel();
11. //声明交换机
12. channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
13. // 声明(创建)队列
14. /**
15. * 参数1:队列名称
16. * 参数2:是否定义持久化队列
17. * 参数3:是否独占本次连接
18. * 参数4:是否在不使用的时候自动删除队列
19. * 参数5 :队列其它参数 */
20. channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
21. //队列绑定交换机
22. channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");
23. //创建消费者;并设置消息处理
24. DefaultConsumer consumer = new DefaultConsumer(channel){
25. @Override
26. /**
27. * consumerTag 消息者标 签,在channel.basicConsume时候可以指定
28. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
29. * properties 属性信息
30. * body 消息
31. */
32. public void handleDelivery(String consumerTag, Envelope envelope,
33. AMQP.BasicProperties properties, byte[] body) throws IOException {
34. //路由key
35. System.out.println("路由key为:" + envelope.getRoutingKey());
36. //交换机
37. System.out.println("交换机为:" + envelope.getExchange());
38. //消息id
39. System.out.println("消息id为:" + envelope.getDeliveryTag());
40. //收到的消息
41. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-
42. 8"));
43. }
44. };
45. //监听消息
46. /**
47. * 参数1:队列名称
48. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
49. 息,设置为false则需要手动确认
50. * 参数3:消息接收到后回调
51. */
52. channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
53. }
54. }
Consumer 2
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.*;
4. import java.io.IOException;
5.
6. public class Consumer2 {
7. public static void main(String[] args) throws Exception {
8. Connection connection = ConnectionUtil.getConnection();
9. // 创建频道
10. Channel channel = connection.createChannel();
11. //声明交换机
12. channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
13. // 声明(创建)队列
14. /**
15. * 参数1:队列名称
16. * 参数2:是否定义持久化队列
17. * 参数3:是否独占本次连接
18. * 参数4:是否在不使用的 时候自动删除队列
19. * 参数5:队列其它参数
20. */
21. channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
22. //队列绑定交换机
23. channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");
24. //创建消费者;并设置消息处理
25. DefaultConsumer consumer = new DefaultConsumer(channel){
26. @Override
27. /**
28. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
29. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
30. (收到消息失败后是否需要重新发送)
31. * properties 属性信息
32. * body 消息
33. */
34. public void handleDelivery(String consumerTag, Envelope envelope,
35. AMQP.BasicProperties properties, byte[] body) throws IOException {
36. //路由key
37. System.out.println("路由key为:" + envelope.getRoutingKey());
38. //交换机
39. System.out.println("交换机为:" + envelope.getExchange());
40. //消息id
41. System.out.println("消息id为:" + envelope.getDeliveryTag());
42. //收到的消息
43. System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
44. }
45. };
46. //监听消息
47. /**
48. * 参数1:队列名称
49. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
50. 息,设置为false则需要手动确认
51. * 参数3:消息接收到后回调
52. */
53. channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
54. }
55. }
4.Routing routing mode
Routing routing mode requires the queue to specify the routing key when binding it to the switch; the routing key needs to be carried when consuming and sending; only when the routing key of the message is completely consistent with the queue routing key can the queue receive the message.
producer
2. import com.rabbitmq.client.BuiltinExchangeType;
3. import com.rabbitmq.client.Channel;
4. import com.rabbitmq.client.Connection;
5.
6. /**
7. * 路由模式的交换机类型为:direct
8. */
9. public class Producer {
10. //交换机名称
11. static final String DIRECT_EXCHAGE = "direct_exchange";
12. //队列名称
13. static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
14. //队列名称
15. static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
16. public static void main(String[] args) throws Exception {
17. //创建连接
18. Connection connection = ConnectionUtil.getConnection();
19. // 创建频道
20. Channel channel = connection.createChannel();
21. /**
22. * 声明交换机
23. * 参数1:交换机名称
24. * 参数2:交换机类型,fanout、topic、direct、headers
25. */
26. channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
27. // 声明(创建)队列
28. /**
29. * 参数1:队列名称
30. * 参数2:是否定义持久化队列
31. * 参数3:是否独占本次连接
32. * 参数4:是否在不使用的时候自动删除队列
33. * 参数5:队列其它参数
34. */
35. channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
36. channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
37. //队列绑定交换机
38. channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
39. channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
40. // 发送信息
41. String message = "新增了商品。路由模式;routing key 为 insert " ;
42. /**
43. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
44. * 参数2:路由key,简单模式可以传递队列名称
45. * 参数3:消息其它属性
46. * 参数4:消息内容
47. */
48. channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
49. System.out.println("已发送消息:" + message);
50. // 发送信息
51. message = "修改了商品。路由模式;routing key 为 update" ;
52. /**
53. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
54. * 参数2:路由key,简单模式可以传递队列名称
55. * 参数3:消息其它属性
56. * 参数4:消息内容
57. */
58. channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
59. System.out.println("已发送消息:" + message);
60. // 关闭资源
61. channel.close();
62. connection.close();
63. }
64. }
Consumer 1
2.
3. import cn.yh.rabbitmq.util.ConnectionUtil;
4. import com.rabbitmq.client.*;
5. import java.io.IOException;
6.
7. public class Consumer1 {
8. public static void main(String[] args) throws Exception {
9. Connection connection = ConnectionUtil.getConnection();
10. // 创建频道
11. Channel channel = connection.createChannel();
12. //声明交换机
13. channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
14. // 声明(创建)队列
15. /**
16. * 参数1:队列名称
17. * 参数2:是否定义持久化队列
18. * 参数3:是否独占本次连接
19. * 参数4:是否在不使用的时候自动删除队列
20. * 参数5:队列其它参数
21. */
22. channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
23. //队列绑定交换机
24. channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
25. //创建消费者;并设置消息处理
26. DefaultConsumer consumer = new DefaultConsumer(channel){
27. @Override
28. /**
29. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
30. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
31. (收到消息失败后是否需要重新发送)
32. * properties 属性信息
33. * body 消息
34. */
35. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
36. //路由key
37. System.out.println("路由key为:" + envelope.getRoutingKey());
38. //交换机
39. System.out.println("交换机为:" + envelope.getExchange());
40. //消息id
41. System.out.println("消息id为:" + envelope.getDeliveryTag());
42. //收到的消息
43. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
44. }
45. };
46. //监听消息
47. /**
48. * 参数1:队列名称
49. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
50. 息,设置为false则需要手动确认
51. * 参数3:消息接收到后回调
52. */
53. channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
54. }
55. }
Consumer 2
2.
3. import cn.yh.rabbitmq.util.ConnectionUtil;
4. import com.rabbitmq.client.*;
5. import java.io.IOException;
6.
7. public class Consumer2 {
8. public static void main(String[] args) throws Exception {
9. Connection connection = ConnectionUtil.getConnection();
10. // 创建频道
11. Channel channel = connection.createChannel();
12. //声明交换机
13. channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
14. // 声明(创建)队列
15. /**
16. * 参数1:队列名称
17. * 参数2:是否定义持久化队列
18. * 参数3:是否独占本次连接
19. * 参数4:是否在不使用的时候自动删除队列
20. * 参数5:队列其它参数
21. */
22. channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
23. //队列绑定交换机
24. channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");
25. //创建消费者;并设置消息处理
26. DefaultConsumer consumer = new DefaultConsumer(channel){
27. @Override
28. /**
29. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
30. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
31. (收到消息失败后是否需要重新发送)
32. * properties 属性信息
33. * body 消息
34. */
35. public void handleDelivery(String consumerTag, Envelope envelope,
36. AMQP.BasicProperties properties, byte[] body) throws IOException {
37. //路由key
38. System.out.println("路由key为:" + envelope.getRoutingKey());
39. //交换机
40. System.out.println("交换机为:" + envelope.getExchange());
41. //消息id
42. System.out.println("消息id为:" + envelope.getDeliveryTag());
43. //收到的消息
44. System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
45. }
46. };
47. //监听消息
48. /**
49. * 参数1:队列名称
50. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
51. 息,设置为false则需要手动确认
52. * 参数3:消息接收到后回调
53. */
54. channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
55. }
56. }
5.Topics wildcard mode
Topic topic mode can realize the functions of Publish/Subscribe publishing and subscription mode and Routing routing mode; only Topic can use wildcards when configuring routing key, which is more flexible.
Topics wildcard mode: messages can be delivered to the queue corresponding to the routing key according to the routing key; there can be multiple routing keys bound to the switch; the routing key in wildcard mode can use *
and #
; the configuration of the routing key after using the wildcard mode More flexible.
producer
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.BuiltinExchangeType;
4. import com.rabbitmq.client.Channel;
5. import com.rabbitmq.client.Connection;
6. /**
7. * 通配符Topic的交换机类型为:topic
8. */
9. public class Producer {
10. //交换机名称
11. static final String TOPIC_EXCHAGE = "topic_exchange";
12. //队列名称
13. static final String TOPIC_QUEUE_1 = "topic_queue_1";
14. //队列名称
15. static final String TOPIC_QUEUE_2 = "topic_queue_2";
16. public static void main(String[] args) throws Exception {
17. //创建连接
18. Connection connection = ConnectionUtil.getConnection();
19. // 创建频道
20. Channel channel = connection.createChannel();
21. /**
22. * 声明交换机
23. * 参数1:交换机名称
24. * 参数2:交换机类型,fanout、topic、topic、headers
25. */
26. channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
27. // 发送信息
28. String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
29. channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
30. System.out.println("已发送消息:" + message);
31. // 发送信息
32. message = "修改了商品。Topic模式;routing key 为 item.update" ;
33. channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
34. System.out.println("已发送消息:" + message);
35. // 发送信息
36. message = "删除了商品。Topic模式;routing key 为 item.delete" ;
37. channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
38. System.out.println("已发送消息:" + message);
39. // 关闭资源
40. channel.close();
41. connection.close();
42. }
43. }
Consumer 1
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.*;
4. import java.io.IOException;
5. public class Consumer1 {
6. public static void main(String[] args) throws Exception {
7. Connection connection = ConnectionUtil.getConnection();
8. // 创建频道
9. Channel channel = connection.createChannel();
10. //声明交换机
11. channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
12. // 声明(创建)队列
13. /**
14. * 参数1:队列名称
15. * 参数2:是否定义持久化队列
16. * 参数3:是否独占本次连接
17. * 参数4:是否在不使用的时候自动删除队列
18. * 参数5:队列其它参数
19. */
20. channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);
21. //队列绑定交换机
22. channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,
23. "item.update");
24. channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,
25. "item.delete");
26. //创建消费者;并设置消息处理
27. DefaultConsumer consumer = new DefaultConsumer(channel){
28. @Override
29. /**
30. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
31. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
32. (收到消息失败后是否需要重新发送)
33. * properties 属性信息
34. * body 消息
35. */
36. public void handleDelivery(String consumerTag, Envelope envelope,
37. AMQP.BasicProperties properties, byte[] body) throws IOException {
38. //路由key
39. System.out.println("路由key为:" + envelope.getRoutingKey());
40. //交换机
41. System.out.println("交换机为:" + envelope.getExchange());
42. //消息id
43. System.out.println("消息id为:" + envelope.getDeliveryTag());
44. //收到的消息
45. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
46. }
47. };
48. //监听消息
49. /**
50. * 参数1:队列名称
51. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
52. 息,设置为false则需要手动确认
53. * 参数3:消息接收到后回调
54. */
55. channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
56. }
57. }
Consumer 2
2. import cn.yh.rabbitmq.util.ConnectionUtil;
3. import com.rabbitmq.client.*;
4. import java.io.IOException;
5. public class Consumer2 {
6. public static void main(String[] args) throws Exception {
7. Connection connection = ConnectionUtil.getConnection();
8. // 创建频道
9. Channel channel = connection.createChannel();
10. //声明交换机
11. channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
12. // 声明(创建)队列
13. /**
14. * 参数1:队列名称
15. * 参数2:是否定义持久化队列
16. * 参数3:是否独占本次连接
17. * 参数4:是否在不使用的时候自动删除队列
18. * 参数5:队列其它参数
19. */
20. channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
21. //队列绑定交换机
22. channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
23. //创建消费者;并设置消息处理
24. DefaultConsumer consumer = new DefaultConsumer(channel){
25. @Override
26. /**
27. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
28. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
29. (收到消息失败后是否需要重新发送)
30. * properties 属性信息
31. * body 消息
32. */
33. public void handleDelivery(String consumerTag, Envelope envelope,
34. AMQP.BasicProperties properties, byte[] body) throws IOException {
35. //路由key
36. System.out.println("路由key为:" + envelope.getRoutingKey());
37. //交换机
38. System.out.println("交换机为:" + envelope.getExchange());
39. //消息id
40. System.out.println("消息id为:" + envelope.getDeliveryTag());
41. //收到的消息
42. System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
43. }
44. };
45. //监听消息
46. /**
47. * 参数1:队列名称
48. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
49. 息,设置为false则需要手动确认
50. * 参数3:消息接收到后回调
51. */
52. channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
53. }
54. }
3. Summary
Goal : Compare and summarize the characteristics of RabbitMQ’s five modes
-
Not direct Exchange switch (default switch)
-
Simple mode: one producer and one consumer. The producer produces messages to a queue and is received by a consumer.
-
work queue work queue mode: one producer, multiple consumers (competition relationship), the producer sends a message to a queue, which can be monitored by multiple consumers; a message can only be received by one consumer, and the consumer There is a competitive relationship between
-
-
Use Exchange switch; subscription mode (switch: broadcast fanout, directed, wildcard topic)
-
Publish and subscribe mode: Using a fanout broadcast type switch, a message can be sent to all queues bound to the switch
-
Routing mode: A direct-type switch is used. The message will carry the routing key. The switch compares the routing key of the message with the routing key of the queue. If they are consistent, the queue can receive the message.
-
Wildcard mode: Using a topic wildcard type switch, the message will carry the routing key. The switch compares the routing key of the message with the routing key (*, #) of the queue. If they match, the queue can receive the message.
-