Skip to main content

RabbitMQ message queue

RabbitMQ message queue

Table of contents

URL:

1. Project preparation

1. Import dependencies

2. Extract tool classes

Where are the configuration properties?

2. Code writing

1. Simple mode

producer

consumer

2.Work queues work queue mode

producer

Consumer 1

Consumer 2

3.Publish/Subscribe publishing and subscription model

producer

Consumer 1

Consumer 2

4.Routing routing mode

producer

Consumer 1

Consumer 2

5.Topics wildcard mode

producer

Consumer 1

Consumer 2

3. Summary


URL:

RabbitMQ official address:[RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQRabbitMQ message queuehttp://www.rabbitmq.com/](http://www.rabbitmq.com/ "RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ")

Official website corresponding mode introduction:RabbitMQ Tutorials — RabbitMQRabbitMQ message queuehttps://www.rabbitmq.com/getstarted.html

1. Project preparation

1. Import dependencies

Add dependencies in the pom file of the maven project.

    
2. <groupId>com.rabbitmq</groupId>

3. <artifactId>amqp-client</artifactId>

4. <version>5.6.0</version>

5. </dependency>



2. Extract tool classes

Both producers and consumers have to create a connection factory and create a connection.

    
2. ConnectionFactory connectionFactory = new ConnectionFactory();

3. //主机;默认localhost

4. connectionFactory.setHost("localhost");

5. //连接端口;默认5672

6. connectionFactory.setPort(5672);

7. //虚拟主机;默认/

8. connectionFactory.setVirtualHost("/yh");

9. //用户名;默认guest

10. connectionFactory.setUsername("yh");

11. //密码;默认guest

12. connectionFactory.setPassword("yh");

13.

14. //2. 创建连接;

15. Connection connection = connectionFactory.newConnection();



To save code, a tool class RabbitGetConnection.java can be extracted.

    
2. import com.rabbitmq.client.ConnectionFactory;

3.

4. import java.io.IOException;

5. import java.util.concurrent.TimeoutException;

6.

7. public class RabbitGetConnection {

8.

9. public static Connection getConnection(){

10. //1. 创建连接工厂(设置RabbitMQ的连接参数);

11. ConnectionFactory connectionFactory = new ConnectionFactory();

12. //主机;默认localhost

13. connectionFactory.setHost("localhost");

14. //连接端口;默认5672

15. connectionFactory.setPort(5672);

16. //虚拟主机;默认/

17. connectionFactory.setVirtualHost("/yh");

18. //用户名;默认guest

19. connectionFactory.setUsername("admin");

20. //密码;默认guest

21. connectionFactory.setPassword("admin");

22. //2. 创建连接;

23. Connection connection = null;

24. try {

25. connection = connectionFactory.newConnection();

26. } catch (IOException e) {

27. e.printStackTrace();

28. } catch (TimeoutException e) {

29. throw new RuntimeException(e);

30. }

31.

32. return connection;

33. }

34. }



Where are the configuration properties?

Hold down the Ctrl key and click the newConnection() method to view the source code

RabbitMQ message queue RabbitMQ message queue

2. Code writing

1. Simple mode

In order to be more intuitive, the extracted tool classes are not used previously.

producer

    
2.

3. import com.rabbitmq.client.Channel;

4. import com.rabbitmq.client.Connection;

5. import com.rabbitmq.client.ConnectionFactory;

6.

7. /**

8. * 简单模式:发送消息

9. */

10. public class Producer {

11. static final String QUEUE_NAME = "simple_queue";

12.

13. public static void main(String[] args) throws Exception {

14. //1. 创建连接工厂(设置RabbitMQ的连接参数);

15. ConnectionFactory connectionFactory = new ConnectionFactory();

16. //主机;默认localhost

17. connectionFactory.setHost("localhost");

18. //连接端口;默认5672

19. connectionFactory.setPort(5672);

20. //虚拟主机;默认/

21. connectionFactory.setVirtualHost("/yh");

22. //用户名;默认guest

23. connectionFactory.setUsername("yh");

24. //密码;默认guest

25. connectionFactory.setPassword("yh");

26.

27. //2. 创建连接;

28. Connection connection = connectionFactory.newConnection();

29. //3. 创建频道;

30. Channel channel = connection.createChannel();

31. //4. 声明队列;

32. /**

33. * 参数1:队列名称

34. * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)

35. * 参数3:是否独占本连接

36. * 参数4:是否在不使用的时候队列自动删除

37. * 参数5:其它参数

38. */

39. channel.queueDeclare(QUEUE_NAME, true, false, false, null);

40. //5. 发送消息;

41. String message = "你好!小兔纸。";

42.

43. /**

44. * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)

45. * 参数2:路由key,简单模式中可以使用队列名称

46. * 参数3:消息其它属性

47. * 参数4:消息内容

48. */

49. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

50. System.out.println("已发送消息:" + message);

51. //6. 关闭资源

52. channel.close();

53. connection.close();

54. }

55. }



consumer

    
2.

3. import cn.yh.rabbitmq.util.ConnectionUtil;

4. import com.rabbitmq.client.*;

5.

6. import java.io.IOException;

7.

8. /**

9. * 简单模式;消费者接收消息

10. */

11. public class Consumer {

12. public static void main(String[] args) throws Exception {

13. //1. 创建连接工厂;

14. //2. 创建连接;(抽取一个获取连接的工具类)

15. Connection connection = ConnectionUtil.getConnection();

16. //3. 创建频道;

17. Channel channel = connection.createChannel();

18. //4. 声明队列;

19. /**

20. * 参数1:队列名称

21. * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)

22. * 参数3:是否独占本连接

23. * 参数4:是否在不使用的时候队列自动删除

24. * 参数5:其它参数

25. */

26. channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

27. //5. 创建消费者(接收消息并处理消息);

28. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

29. @Override

30. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

31. //路由key

32. System.out.println("路由key为:" + envelope.getRoutingKey());

33. //交换机

34. System.out.println("交换机为:" + envelope.getExchange());

35. //消息id

36. System.out.println("消息id为:" + envelope.getDeliveryTag());

37. //接收到的消息

38. System.out.println("接收到的消息为:" + new String(body, "utf-8"));

39. }

40. };

41. //6. 监听队列

42. /**

43. * 参数1:队列名

44. * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;

45. * 如果设置为false则需要手动确认

46. * 参数3:消费者

47. */

48. channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);

49. }

50. }



2.Work queues work queue mode

producer

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.Channel;

4. import com.rabbitmq.client.Connection;

5. import com.rabbitmq.client.ConnectionFactory;

6.

7. public class Producer {

8.    static final String QUEUE_NAME = "work_queue";

9.    public static void main(String[] args) throws Exception {

10.        //创建连接

11.        Connection connection = ConnectionUtil.getConnection();

12.        // 创建频道

13.        Channel channel = connection.createChannel();

14.        // 声明(创建)队列

15.        /**

16.         * 参数1:队列名称

17.         * 参数2:是否定义持久化队列

18.         * 参数3:是否独占本次连接

19.         * 参数4:是否在不使用的时候自动删除队列

20.         * 参数5:队列其它参数

21.         */

22.        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

23.        for (int i = 1; i <= 30; i++) {

24.            // 发送信息

25.            String message = "你好;小兔子!work模式--" + i;

26.            /**

27.             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage

28.             * 参数2:路由key,简单模式可以传递队列名称

29.             * 参数3:消息其它属性

30.             * 参数4:消息内容

31.             */

32.            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

33.            System.out.println("已发送消息:" + message);

34.       }

35.        // 关闭资源

36.        channel.close();

37.        connection.close();

38.   }

39. }



Consumer 1

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.*;

4. import java.io.IOException;

5.

6. public class Consumer1 {

7. public static void main(String[] args) throws Exception {

8.        Connection connection = ConnectionUtil.getConnection();

9.        // 创建频道

10.        Channel channel = connection.createChannel();

11.        // 声明(创建)队列

12.        /**

13.         * 参数1:队列名称

14.         * 参数2:是否定义持久化队列

15.         * 参数3:是否独占本次连接

16.         * 参数4:是否在不使用的时候自动删除队列

17.         * 参数5:队列其它参数

18.         */

19.        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

20.        //一次只能接收并处理一个消息

21.        channel.basicQos(1);

22.        //创建消费者;并设置消息处理

23.        DefaultConsumer consumer = new DefaultConsumer(channel){

24.            @Override

25.            /**

26.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

27.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

28. (收到消息失败后是否需要重新发送)

29.             * properties 属性信息

30.             * body 消息

31.             */

32.            public void handleDelivery(String consumerTag, Envelope envelope,

33. AMQP.BasicProperties properties, byte[] body) throws IOException {

34.                try {

35.                    //路由key

36.                    System.out.println("路由key为:" + envelope.getRoutingKey());

37.                    //交换机

38.                    System.out.println("交换机为:" + envelope.getExchange());

39.                    //消息id

40.                    System.out.println("消息id为:" + envelope.getDeliveryTag());

41.                    //收到的消息

42.                    System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-

43. 8"));

44.                    Thread.sleep(1000);

45.                    //确认消息

46.                    channel.basicAck(envelope.getDeliveryTag(), false);

47.               } catch (InterruptedException e) {

48.                    e.printStackTrace();

49.               }

50.           }

51.       };

52.        //监听消息

53.        /**

54.         * 参数1:队列名称

55.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认

56.         * 参数3:消息接收到后回调

57.         */

58.        channel.basicConsume(Producer.QUEUE_NAME, false, consumer);

59.   }

60. }



Consumer 2

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.*;

4. import java.io.IOException;

5. public class Consumer2 {

6.    public static void main(String[] args) throws Exception {

7.        Connection connection = ConnectionUtil.getConnection();

8.        // 创建频道

9.        Channel channel = connection.createChannel();

10.        // 声明(创建)队列

11.        /**

12.         * 参数1:队列名称

13.         * 参数2:是否定义持久化队列

14.         * 参数3:是否独占本次连接

15.         * 参数4:是否在不使用的时候自动删除队列

16.         * 参数5:队列其它参数

17.         */

18.        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

19.        //一次只能接收并处理一个消息

20.        channel.basicQos(1);

21.        //创建消费者;并设置消息处理

22.        DefaultConsumer consumer = new DefaultConsumer(channel){

23.            @Override

24.            /**

25.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

26.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

27. (收到消息失败后是否需要重新发送)

28.             * properties 属性信息

29.             * body 消息

30.             */

31.            public void handleDelivery(String consumerTag, Envelope envelope,

32. AMQP.BasicProperties properties, byte[] body) throws IOException {

33.                try {

34.                    //路由key

35.                    System.out.println("路由key为:" + envelope.getRoutingKey());

36.  //交换机

37.                    System.out.println("交换机为:" + envelope.getExchange());

38.                    //消息id

39.                    System.out.println("消息id为:" + envelope.getDeliveryTag());

40.                    //收到的消息

41.                    System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));

42.                    Thread.sleep(1000);

43.                    //确认消息

44.                    channel.basicAck(envelope.getDeliveryTag(), false);

45.               } catch (InterruptedException e) {

46.                    e.printStackTrace();

47.               }

48.           }

49.       };

50.        //监听消息

51.        /**

52.         * 参数1:队列名称

53.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

54. 息,设置为false则需要手动确认

55.         * 参数3:消息接收到后回调

56.         */

57.        channel.basicConsume(Producer.QUEUE_NAME, false, consumer);

58.   }

59. }



3.Publish/Subscribe publishing and subscription model

Publish and subscribe model: A message can be received by multiple consumers ; a queue for one consumer can only be listened to by one consumer. The switch type used in subscription mode is: broadcast.

producer

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.BuiltinExchangeType;

4. import com.rabbitmq.client.Channel;

5. import com.rabbitmq.client.Connection;

6.

7. /**

8. * 发布与订阅使用的交换机类型为:fanout

9. */

10. public class Producer {

11.    //交换机名称

12.    static final String FANOUT_EXCHAGE = "fanout_exchange";

13.    //队列名称

14.    static final String FANOUT_QUEUE_1 = "fanout_queue_1";

15.    //队列名称

16.    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

17.    public static void main(String[] args) throws Exception {

18.        //创建连接

19.        Connection connection = ConnectionUtil.getConnection();

20.        // 创建频道

21.        Channel channel = connection.createChannel();

22.        /**

23.         * 声明交换机

24.         * 参数1:交换机名称

25.         * 参数2:交换机类型,fanout、topic、direct、headers

26.         */

27.        channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

28.        // 声明(创建)队列

29.        /**

30.         * 参数1:队列名称

31.         * 参数2:是否定义持久化队列

32.         * 参数3:是否独占本次连接

33.         * 参数4:是否在不使用的时候自动删除队列

34.         * 参数5:队列其它参数

35.         */

36.        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);

37.        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);        

38. //队列绑定交换机

39.        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");

40.        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");

41.        for (int i = 1; i <= 10; i++) {

42.            // 发送信息

43.            String message = "你好;小兔子!发布订阅模式--" + i;

44.            /**

45.             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage

46.             * 参数2:路由key,简单模式可以传递队列名称

47.             * 参数3:消息其它属性

48.             * 参数4:消息内容

49.             */

50.            channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());

51.            System.out.println("已发送消息:" + message);

52.       }

53.        // 关闭资源

54.        channel.close();

55.        connection.close();

56.   }

57. }



Consumer 1

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.*;

4. import java.io.IOException;

5.

6. public class Consumer1 {

7.    public static void main(String[] args) throws Exception {

8.        Connection connection = ConnectionUtil.getConnection();

9.        // 创建频道

10.        Channel channel = connection.createChannel();

11.        //声明交换机

12.        channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

13.        // 声明(创建)队列

14.        /**

15.         * 参数1:队列名称

16.         * 参数2:是否定义持久化队列

17.         * 参数3:是否独占本次连接

18.         * 参数4:是否在不使用的时候自动删除队列

19.         * 参数5:队列其它参数         */

20.        channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

21.        //队列绑定交换机

22.        channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");

23.        //创建消费者;并设置消息处理

24.        DefaultConsumer consumer = new DefaultConsumer(channel){

25.            @Override

26.            /**

27.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

28.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)

29.             * properties 属性信息

30.             * body 消息

31.             */

32.            public void handleDelivery(String consumerTag, Envelope envelope,

33. AMQP.BasicProperties properties, byte[] body) throws IOException {

34.                //路由key

35.                System.out.println("路由key为:" + envelope.getRoutingKey());

36.                //交换机

37.                System.out.println("交换机为:" + envelope.getExchange());

38.                //消息id

39.                System.out.println("消息id为:" + envelope.getDeliveryTag());

40.                //收到的消息

41.                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-

42. 8"));

43.           }

44.       };

45.        //监听消息

46.        /**

47.         * 参数1:队列名称

48.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

49. 息,设置为false则需要手动确认

50.         * 参数3:消息接收到后回调

51.         */

52.        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);

53.   }

54. }



Consumer 2

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.*;

4. import java.io.IOException;

5.

6. public class Consumer2 {

7.    public static void main(String[] args) throws Exception {

8. Connection connection = ConnectionUtil.getConnection();

9.        // 创建频道

10.        Channel channel = connection.createChannel();

11.        //声明交换机

12.        channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);

13.        // 声明(创建)队列

14.        /**

15.         * 参数1:队列名称

16.         * 参数2:是否定义持久化队列

17.         * 参数3:是否独占本次连接

18.         * 参数4:是否在不使用的时候自动删除队列

19.         * 参数5:队列其它参数

20.         */

21.        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);

22.        //队列绑定交换机

23.        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");

24.        //创建消费者;并设置消息处理

25.        DefaultConsumer consumer = new DefaultConsumer(channel){

26.            @Override

27.            /**

28.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

29.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

30. (收到消息失败后是否需要重新发送)

31.             * properties 属性信息

32.             * body 消息

33.             */

34.            public void handleDelivery(String consumerTag, Envelope envelope,

35. AMQP.BasicProperties properties, byte[] body) throws IOException {

36.                //路由key

37.                System.out.println("路由key为:" + envelope.getRoutingKey());

38.                //交换机

39.                System.out.println("交换机为:" + envelope.getExchange());

40.                //消息id

41.                System.out.println("消息id为:" + envelope.getDeliveryTag());

42.                //收到的消息

43.                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));

44.           }

45.       };

46.        //监听消息

47.        /**

48.         * 参数1:队列名称

49.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

50. 息,设置为false则需要手动确认

51.         * 参数3:消息接收到后回调

52.         */

53.        channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);

54.   }

55. }



4.Routing routing mode

Routing routing mode requires the queue to specify the routing key when binding it to the switch; the routing key needs to be carried when consuming and sending; only when the routing key of the message is completely consistent with the queue routing key can the queue receive the message.

producer

    
2. import com.rabbitmq.client.BuiltinExchangeType;

3. import com.rabbitmq.client.Channel;

4. import com.rabbitmq.client.Connection;

5.

6. /**

7. * 路由模式的交换机类型为:direct

8. */

9. public class Producer {

10.    //交换机名称

11.    static final String DIRECT_EXCHAGE = "direct_exchange";

12.    //队列名称

13.    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";

14.    //队列名称

15.    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";

16.    public static void main(String[] args) throws Exception {

17.        //创建连接

18.        Connection connection = ConnectionUtil.getConnection();

19.        // 创建频道

20.        Channel channel = connection.createChannel();

21.        /**

22.         * 声明交换机

23.         * 参数1:交换机名称

24.         * 参数2:交换机类型,fanout、topic、direct、headers

25.         */

26.        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

27.        // 声明(创建)队列

28.        /**

29.         * 参数1:队列名称

30.         * 参数2:是否定义持久化队列

31.         * 参数3:是否独占本次连接

32.         * 参数4:是否在不使用的时候自动删除队列

33.         * 参数5:队列其它参数

34.         */

35.        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);

36.        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);

37.        //队列绑定交换机

38.        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");

39.        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");

40.        // 发送信息

41.        String message = "新增了商品。路由模式;routing key 为 insert " ;

42.        /**

43.         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage

44.         * 参数2:路由key,简单模式可以传递队列名称

45.         * 参数3:消息其它属性

46.         * 参数4:消息内容

47.         */

48. channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());

49.        System.out.println("已发送消息:" + message);

50.        // 发送信息

51.        message = "修改了商品。路由模式;routing key 为 update" ;

52.        /**

53.         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage

54.         * 参数2:路由key,简单模式可以传递队列名称

55.         * 参数3:消息其它属性

56.         * 参数4:消息内容

57.         */

58.        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());

59.        System.out.println("已发送消息:" + message);

60.        // 关闭资源

61.        channel.close();

62.        connection.close();

63.   }

64. }



Consumer 1

    
2.

3. import cn.yh.rabbitmq.util.ConnectionUtil;

4. import com.rabbitmq.client.*;

5. import java.io.IOException;

6.

7. public class Consumer1 {

8.    public static void main(String[] args) throws Exception {

9.        Connection connection = ConnectionUtil.getConnection();

10.        // 创建频道

11.        Channel channel = connection.createChannel();

12.        //声明交换机

13.        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

14.        // 声明(创建)队列

15.        /**

16.         * 参数1:队列名称

17.         * 参数2:是否定义持久化队列

18.         * 参数3:是否独占本次连接

19.         * 参数4:是否在不使用的时候自动删除队列

20.         * 参数5:队列其它参数

21.         */

22.        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);

23.        //队列绑定交换机

24. channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");

25.        //创建消费者;并设置消息处理

26.        DefaultConsumer consumer = new DefaultConsumer(channel){

27.            @Override

28.            /**

29.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

30.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

31. (收到消息失败后是否需要重新发送)

32.             * properties 属性信息

33.             * body 消息

34.             */

35.            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

36.                //路由key

37.                System.out.println("路由key为:" + envelope.getRoutingKey());

38.                //交换机

39.                System.out.println("交换机为:" + envelope.getExchange());

40.                //消息id

41.                System.out.println("消息id为:" + envelope.getDeliveryTag());

42.                //收到的消息

43.                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));

44.           }

45.       };

46.        //监听消息

47.        /**

48.         * 参数1:队列名称

49.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

50. 息,设置为false则需要手动确认

51.         * 参数3:消息接收到后回调

52.         */

53.        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);

54.   }

55. }



Consumer 2

    
2.

3. import cn.yh.rabbitmq.util.ConnectionUtil;

4. import com.rabbitmq.client.*;

5. import java.io.IOException;

6.

7. public class Consumer2 {

8.    public static void main(String[] args) throws Exception {

9.        Connection connection = ConnectionUtil.getConnection();

10.        // 创建频道

11. Channel channel = connection.createChannel();

12.        //声明交换机

13.        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);

14.        // 声明(创建)队列

15.        /**

16.         * 参数1:队列名称

17.         * 参数2:是否定义持久化队列

18.         * 参数3:是否独占本次连接

19.         * 参数4:是否在不使用的时候自动删除队列

20.         * 参数5:队列其它参数

21.         */

22.        channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);

23.        //队列绑定交换机

24.        channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");

25.        //创建消费者;并设置消息处理

26.        DefaultConsumer consumer = new DefaultConsumer(channel){

27.            @Override

28.            /**

29.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

30.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

31. (收到消息失败后是否需要重新发送)

32.             * properties 属性信息

33.             * body 消息

34.             */

35.            public void handleDelivery(String consumerTag, Envelope envelope,

36. AMQP.BasicProperties properties, byte[] body) throws IOException {

37.                //路由key

38.                System.out.println("路由key为:" + envelope.getRoutingKey());

39.                //交换机

40.                System.out.println("交换机为:" + envelope.getExchange());

41.                //消息id

42.                System.out.println("消息id为:" + envelope.getDeliveryTag());

43.                //收到的消息

44.                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));

45.           }

46.       };

47.        //监听消息

48.        /**

49.         * 参数1:队列名称

50.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

51. 息,设置为false则需要手动确认

52.         * 参数3:消息接收到后回调

53.         */

54.        channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);

55.   }

56. }



5.Topics wildcard mode

Topic topic mode can realize the functions of Publish/Subscribe publishing and subscription mode and Routing routing mode; only Topic can use wildcards when configuring routing key, which is more flexible.

Topics wildcard mode: messages can be delivered to the queue corresponding to the routing key according to the routing key; there can be multiple routing keys bound to the switch; the routing key in wildcard mode can use *and #; the configuration of the routing key after using the wildcard mode More flexible.

producer

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.BuiltinExchangeType;

4. import com.rabbitmq.client.Channel;

5. import com.rabbitmq.client.Connection;

6. /**

7. * 通配符Topic的交换机类型为:topic

8. */

9. public class Producer {

10.    //交换机名称

11.    static final String TOPIC_EXCHAGE = "topic_exchange";

12.    //队列名称

13.    static final String TOPIC_QUEUE_1 = "topic_queue_1";

14.    //队列名称

15.    static final String TOPIC_QUEUE_2 = "topic_queue_2";

16.    public static void main(String[] args) throws Exception {

17.        //创建连接

18.        Connection connection = ConnectionUtil.getConnection();

19.        // 创建频道

20.        Channel channel = connection.createChannel();

21.        /**

22.         * 声明交换机

23.         * 参数1:交换机名称

24.         * 参数2:交换机类型,fanout、topic、topic、headers

25.         */

26.        channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

27.        // 发送信息

28.        String message = "新增了商品。Topic模式;routing key 为 item.insert " ;

29.        channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());

30.        System.out.println("已发送消息:" + message);

31.        // 发送信息

32.        message = "修改了商品。Topic模式;routing key 为 item.update" ;

33.        channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());

34.        System.out.println("已发送消息:" + message);

35. // 发送信息

36.        message = "删除了商品。Topic模式;routing key 为 item.delete" ;

37.        channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());

38.        System.out.println("已发送消息:" + message);

39.        // 关闭资源

40.        channel.close();

41.        connection.close();

42.   }

43. }



Consumer 1

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.*;

4. import java.io.IOException;

5. public class Consumer1 {

6.    public static void main(String[] args) throws Exception {

7.        Connection connection = ConnectionUtil.getConnection();

8.        // 创建频道

9.        Channel channel = connection.createChannel();

10.        //声明交换机

11.        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

12.        // 声明(创建)队列

13.        /**

14.         * 参数1:队列名称

15.         * 参数2:是否定义持久化队列

16.         * 参数3:是否独占本次连接

17.         * 参数4:是否在不使用的时候自动删除队列

18.         * 参数5:队列其它参数

19.         */

20.        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);

21.        //队列绑定交换机

22.        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,

23. "item.update");

24.        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,

25. "item.delete");

26.        //创建消费者;并设置消息处理

27.        DefaultConsumer consumer = new DefaultConsumer(channel){

28.            @Override

29. /**

30.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

31.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

32. (收到消息失败后是否需要重新发送)

33.             * properties 属性信息

34.             * body 消息

35.             */

36.            public void handleDelivery(String consumerTag, Envelope envelope,

37. AMQP.BasicProperties properties, byte[] body) throws IOException {

38.                //路由key

39.                System.out.println("路由key为:" + envelope.getRoutingKey());

40.                //交换机

41.                System.out.println("交换机为:" + envelope.getExchange());

42.                //消息id

43.                System.out.println("消息id为:" + envelope.getDeliveryTag());

44.                //收到的消息

45.                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));

46.           }

47.       };

48.        //监听消息

49.        /**

50.         * 参数1:队列名称

51.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

52. 息,设置为false则需要手动确认

53.         * 参数3:消息接收到后回调

54.         */

55.        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);

56.   }

57. }



Consumer 2

    
2. import cn.yh.rabbitmq.util.ConnectionUtil;

3. import com.rabbitmq.client.*;

4. import java.io.IOException;

5. public class Consumer2 {

6.    public static void main(String[] args) throws Exception {

7.        Connection connection = ConnectionUtil.getConnection();

8.        // 创建频道

9.        Channel channel = connection.createChannel();

10.        //声明交换机

11.        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);

12. // 声明(创建)队列

13.        /**

14.         * 参数1:队列名称

15.         * 参数2:是否定义持久化队列

16.         * 参数3:是否独占本次连接

17.         * 参数4:是否在不使用的时候自动删除队列

18.         * 参数5:队列其它参数

19.         */

20.        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);

21.        //队列绑定交换机

22.        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");

23.        //创建消费者;并设置消息处理

24.        DefaultConsumer consumer = new DefaultConsumer(channel){

25.            @Override

26.            /**

27.             * consumerTag 消息者标签,在channel.basicConsume时候可以指定

28.             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志

29. (收到消息失败后是否需要重新发送)

30.             * properties 属性信息

31.             * body 消息

32.             */

33.            public void handleDelivery(String consumerTag, Envelope envelope,

34. AMQP.BasicProperties properties, byte[] body) throws IOException {

35.                //路由key

36.                System.out.println("路由key为:" + envelope.getRoutingKey());

37.                //交换机

38.                System.out.println("交换机为:" + envelope.getExchange());

39.                //消息id

40.                System.out.println("消息id为:" + envelope.getDeliveryTag());

41.                //收到的消息

42.                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));

43.           }

44.       };

45.        //监听消息

46.        /**

47.         * 参数1:队列名称

48.         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消

49. 息,设置为false则需要手动确认

50.         * 参数3:消息接收到后回调

51.         */

52.        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);

53.   }

54. }



3. Summary

Goal : Compare and summarize the characteristics of RabbitMQ’s five modes

  • Not direct Exchange switch (default switch)

    1. Simple mode: one producer and one consumer. The producer produces messages to a queue and is received by a consumer.

    2. work queue work queue mode: one producer, multiple consumers (competition relationship), the producer sends a message to a queue, which can be monitored by multiple consumers; a message can only be received by one consumer, and the consumer There is a competitive relationship between

  • Use Exchange switch; subscription mode (switch: broadcast fanout, directed, wildcard topic)

    1. Publish and subscribe mode: Using a fanout broadcast type switch, a message can be sent to all queues bound to the switch

    2. Routing mode: A direct-type switch is used. The message will carry the routing key. The switch compares the routing key of the message with the routing key of the queue. If they are consistent, the queue can receive the message.

    3. Wildcard mode: Using a topic wildcard type switch, the message will carry the routing key. The switch compares the routing key of the message with the routing key (*, #) of the queue. If they match, the queue can receive the message.