RabbitMQ in-depth - dead letter queue
Preface
Previously, Lizhi covered the common queues, exchanges, and related concepts in RabbitMQ. This article covers another important RabbitMQ queue, the dead letter queue. The focus is on the three ways a message can end up in the dead letter queue, each with a matching demo. I hope it helps those who need it.
Article directory
Preface
dead letter queue
1 Basic concepts
2 Set the dead letter queue with expired message time TTL
3 The queue reaches the maximum length and a dead letter occurs.
4 Rejected messages lead to dead letters
Summary
dead letter queue
1 Basic concepts
Dead letters are messages that cannot be consumed. Normally, the producer publishes a message to the broker (to an exchange, or directly to a queue), and a consumer takes the message from the queue and processes it. Sometimes, for specific reasons, a message in the queue cannot be consumed. If nothing further is done with it, such a message becomes a dead letter, and the dead letter queue is where dead letters are collected.
Application scenario: to make sure the message data of an order business is not lost, RabbitMQ's dead letter queue mechanism is needed. When an exception occurs during message consumption, the message is put into the dead letter queue. Another example: a user places an order in a mall and clicks to pay; if the payment is not made within the specified time, the order expires automatically.
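Because an expiring message is dead-lettered only after its TTL has elapsed, dead letters arrive with a delay, so a dead letter queue can also be used to process delayed messages.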
Reasons for dead letters:
- Message TTL expired
- The queue reaches its maximum length (the queue is full and cannot hold any more messages)
- The message was rejected (basic.reject or basic.nack) with requeue=false
2 Set the dead letter queue with expired message time TTL
First, Consumer1 declares the normal exchange, the dead letter exchange, the normal queue, and the dead letter queue, together with the relationships between them. With the declarations in place, Consumer1 does not take messages from the normal queue, so we can observe how the messages sent by the producer flow through RabbitMQ.
Set up consumer 1 of the dead letter queue
In Consumer1 we declare the normal exchange, the dead letter exchange, the normal queue, and the dead letter queue. The link from the normal queue to the dead letter exchange is configured through the Map-typed arguments parameter of the channel's queueDeclare method: it sets the TTL, names the dead letter exchange, and sets the dead letter routing key.
package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * Dead letter queue
 * Consumer 1: declares the normal queue and the dead letter queue
 */
public class Consumer {
    // Normal exchange name
    public static final String NORMAL_EXCHANGE = "normal";
    // Dead letter exchange name
    public static final String DEAD_EXCHANGE = "dead";
    // Name of the normal queue
    public static final String NORMAL_QUEUE = "normalQueue";
    // Name of the dead letter queue
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        // Obtain the channel
        Channel channel = RabbitMqUtil.getChannel();
        // Declare the normal exchange and the dead letter exchange
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Arguments that connect the normal queue to the dead letter exchange
        Map<String, Object> arguments = new HashMap<>();
        // Set the expiration time (milliseconds)
        arguments.put("x-message-ttl", 10000);
        // Set the dead letter exchange of the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Set the dead letter routing key; it must match the binding key of the dead letter queue
        arguments.put("x-dead-letter-routing-key", "dead1");
        // Declare the normal queue
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        // Declare the dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // Bind each queue to its exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead1");

        // Callback that handles a received message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Message received by Consumer1: " + new String(message.getBody(), "UTF-8"));
            System.out.println("Received Queue: " + DEAD_QUEUE + ", Received Key: " + message.getEnvelope().getRoutingKey());
        };
        // Consumer starts consuming messages
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, (consumerTag) -> {});
    }
}
Note that setting the TTL on the normal queue is the less common choice. The TTL is usually set on the message when it is published, so the "x-message-ttl" entry in the arguments map here can be commented out.
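The note above mentions two places where a TTL can be set. RabbitMQ's documentation states that when both a per-queue TTL ("x-message-ttl") and a per-message TTL (the expiration property) are present, the lower of the two applies. Below is a minimal sketch of that rule. The class EffectiveTtlSketch and the helper effectiveTtlMillis are hypothetical names made up for illustration; they are not part of the RabbitMQ client, and the code never talks to a broker.

```java
import java.util.OptionalLong;

/** Toy helper (hypothetical, not a RabbitMQ client API): which TTL applies to a message. */
class EffectiveTtlSketch {

    /**
     * @param queueTtlMillis value of the queue's "x-message-ttl" argument, or null if unset
     * @param expiration     the message's expiration property (milliseconds as a string), or null if unset
     * @return the TTL that applies, or empty if neither is set
     */
    static OptionalLong effectiveTtlMillis(Integer queueTtlMillis, String expiration) {
        Long messageTtl = (expiration == null) ? null : Long.valueOf(expiration);
        if (queueTtlMillis == null && messageTtl == null) {
            return OptionalLong.empty(); // no TTL configured anywhere
        }
        if (queueTtlMillis == null) {
            return OptionalLong.of(messageTtl.longValue());
        }
        if (messageTtl == null) {
            return OptionalLong.of(queueTtlMillis.longValue());
        }
        // Both are set: the lower value wins
        return OptionalLong.of(Math.min(queueTtlMillis.longValue(), messageTtl.longValue()));
    }

    public static void main(String[] args) {
        // Queue TTL 10 s, message TTL 5 s
        System.out.println(effectiveTtlMillis(10000, "5000"));
        // Only the message TTL is set, as in the producer of this article
        System.out.println(effectiveTtlMillis(null, "10000"));
    }
}
```

With the queue-level entry commented out, as suggested above, only the expiration set by the producer decides when a message expires.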
Consumer 2 that actually processes the message
The consumer that handles dead letters only needs to receive messages from the dead letter queue.
package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Dead letter queue
 * Consumer 2: consumes messages from the dead letter queue
 */
public class Consumer2 {
    // Name of the dead letter queue
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        // Obtain the channel
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("Waiting to receive a message");
        // Callback that handles a received message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Message received by Consumer2: " + new String(message.getBody(), "UTF-8"));
            System.out.println("Received Queue: " + DEAD_QUEUE + ", Received Key: " + message.getEnvelope().getRoutingKey());
        };
        // Consumer starts consuming messages
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, (consumerTag) -> {});
    }
}
Producer
Here we use the builder of the AMQP.BasicProperties object to set the per-message TTL (the expiration property).
package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public class Publish {
    public static final String NORMAL_EXCHANGE = "normal";
    public static final String NORMAL_QUEUE = "normalQueue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        // Consumer has already declared the exchange, so it does not need to be declared again here
        // Set the message TTL (milliseconds, given as a string) so that the message can become a dead letter
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info " + i;
            channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes("UTF-8"));
        }
    }
}
Before starting Consumer2, we can see that the messages in the normal queue flow to the dead letter queue once the TTL we set (10 s) has elapsed.
After starting Consumer2, it does receive the messages from the dead letter queue.
3 The queue reaches the maximum length and a dead letter occurs.
In this part, comment out the message TTL set earlier in the producer, and set a maximum length on the normal queue in Consumer1. Because RabbitMQ refuses to redeclare an existing queue with different arguments, delete normalQueue before rerunning Consumer1.
arguments.put("x-max-length",6);
In this way, we can simulate the situation where dead letters are generated after the queue reaches the maximum length.
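Which messages end up as dead letters can be worked out without a broker. The sketch below is an in-memory toy model, not RabbitMQ code. It assumes the queue uses RabbitMQ's default overflow behaviour (drop-head, where the oldest message is removed when the limit is exceeded) and that nothing is consuming from the normal queue while the producer sends its 11 messages. The class MaxLengthSketch and its members are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory toy model of a length-limited queue; it does not talk to a broker. */
class MaxLengthSketch {

    final Deque<String> normalQueue = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>();
    private final int maxLength;

    MaxLengthSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Enqueue a message; while the limit is exceeded, move the oldest message to the dead letters. */
    void publish(String message) {
        normalQueue.addLast(message);
        while (normalQueue.size() > maxLength) {
            deadLetters.add(normalQueue.removeFirst());
        }
    }

    public static void main(String[] args) {
        MaxLengthSketch model = new MaxLengthSketch(6); // mirrors "x-max-length" = 6
        for (int i = 0; i < 11; i++) {                  // same loop as the producer
            model.publish("info " + i);
        }
        System.out.println("normal queue: " + model.normalQueue);
        System.out.println("dead letters: " + model.deadLetters);
    }
}
```

Under these assumptions the model leaves "info 5" through "info 10" in the normal queue and moves "info 0" through "info 4", the five oldest messages, to the dead letters.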
4 Rejected messages lead to dead letters
To let the consumer reject messages, first turn off automatic acknowledgement and switch to manual acknowledgement. In the callback that receives messages below, basicAck acknowledges a message and basicReject rejects it.
package com.crj.rabbitmq.deadQueue;

import com.crj.rabbitmq.utils.RabbitMqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * Dead letter queue
 * Consumer 1: declares the normal queue and the dead letter queue
 */
public class Consumer {
    // Normal exchange name
    public static final String NORMAL_EXCHANGE = "normal";
    // Dead letter exchange name
    public static final String DEAD_EXCHANGE = "dead";
    // Name of the normal queue
    public static final String NORMAL_QUEUE = "normalQueue";
    // Name of the dead letter queue
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        // Obtain the channel
        Channel channel = RabbitMqUtil.getChannel();
        // Declare the normal exchange and the dead letter exchange
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Arguments that connect the normal queue to the dead letter exchange
        Map<String, Object> arguments = new HashMap<>();
        // Set the dead letter exchange of the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Set the dead letter routing key
        arguments.put("x-dead-letter-routing-key", "dead1");

        // Declare the normal queue
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        // Declare the dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // Bind each queue to its exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead1");

        System.out.println("Waiting to receive message 》》》》》》》》》》》");
        // Callback that handles a received message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            // The producer sends "info " + i, so the body contains a space
            if (msg.equals("info 5")) {
                System.out.println("Consumer1 received the message: " + msg + ": this message is rejected");
                // The second parameter sets whether the rejected message is put back into the original queue
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Message received by Consumer1: " + msg);
                // Acknowledge this message only (multiple=false, no batch acknowledgement)
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        // Manual acknowledgement (autoAck=false); consume from the normal queue so that rejections can happen
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag) -> {});
    }
}
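The effect of the requeue flag in the listing above can be sketched without a broker. The following is an in-memory toy model, not RabbitMQ code: RejectSketch and all of its members are hypothetical names for illustration. It assumes one consumer reads the normal queue in order and rejects a single message body with requeue=false, as Consumer1 does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory toy model of reject handling; it does not talk to a broker. */
class RejectSketch {

    final Deque<String> normalQueue = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>();
    final List<String> acked = new ArrayList<>();

    /** Models a reject: requeue=false dead-letters the message, requeue=true puts it back in the queue. */
    void reject(String message, boolean requeue) {
        if (requeue) {
            normalQueue.addFirst(message);
        } else {
            deadLetters.add(message);
        }
    }

    /** Drains the queue, rejecting the given body without requeue and acknowledging everything else. */
    void consumeAll(String rejectedBody) {
        while (!normalQueue.isEmpty()) {
            String msg = normalQueue.removeFirst();
            if (msg.equals(rejectedBody)) {
                reject(msg, false);
            } else {
                acked.add(msg);
            }
        }
    }

    public static void main(String[] args) {
        RejectSketch model = new RejectSketch();
        for (int i = 0; i < 11; i++) {   // same messages as the producer
            model.normalQueue.addLast("info " + i);
        }
        model.consumeAll("info 5");
        System.out.println("acknowledged: " + model.acked);
        System.out.println("dead letters: " + model.deadLetters);
    }
}
```

In this model only "info 5" reaches the dead letters and the other ten messages are acknowledged. Rejecting with requeue=true would instead return the message to the normal queue, so it would never be dead-lettered.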
Summary
Three mechanisms cause messages to be forwarded to the dead letter queue: TTL expiry, message rejection, and the queue length limit. Besides holding on to messages in these three situations, what else can the dead letter queue do? In the next article, Lizhi will cover the delay queue. I believe everyone will gain something from it~
Today has become the past, but we still look forward to tomorrow! I am Lizhi, and I will accompany you on the road of technical growth~~~
If this post is helpful to you, you can give Lizhi three clicks. Your support and encouragement are Lizhi's biggest motivation!
If anything in this post is incorrect, you are welcome to point it out and correct it in the comment area below!