rabbitmq dead letter queue
rabbitmq dead letter queue
Table of contents
Conditions for becoming a dead letter
Message TTL expired
Queue reaches maximum length
Message rejected
delay queue
Delay queue usage scenarios
Message settings TTL
Queue setting TTL
The difference between the two
The producer delivers the message to the broker or directly to the queue , and the consumer retrieves the message from the queue.
Consumption, but sometimes due to specific reasons , some messages in****the queue cannot be consumed . If such messages are not processed later, they become dead letters. If there are dead letters, there will naturally be a dead letter queue.****
Conditions for becoming a dead letter
-
Exceeding the message survival time (TTL): You can set a survival time for the message. After this time period, if the message has not been consumed or is re-delivered to another queue, the message will become a dead letter.
-
Message Reject: When a message is rejected by the consumer, you can choose to re-deliver the message to another queue or mark it as a dead letter.
-
The message reaches the maximum number of retries: You can set a limit on the number of retries on the consumer side. When a message reaches a certain number of retries and still cannot be consumed, the message will become a dead letter.
-
Queue Overflow : When the number of messages in a queue exceeds the maximum capacity limit of the queue, new messages cannot enter the queue and are treated as dead letters.
Message TTL expired
producer
2. import com.rabbitmq.client.BuiltinExchangeType;
3. import com.rabbitmq.client.Channel;
4.
5. public class Producer {
6. private static final String NORMAL_EXCHANGE = "normal_exchange";
7.
8. public static void main(String[] argv) throws Exception {
9. try (Channel channel = RabbitMqUtils.getChannel()) {
10. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
11. //设置消息的 TTL 时间
12. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
13. //该信息是用作演示队列个数限制
14. for (int i = 1; i <11 ; i++) {
15. String message="info"+i;
16. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
17. message.getBytes());
18. System.out.println("生产者发送消息:"+message);
19. }
20. }
21. }
22. }

Consumer 1
2. import com.rabbitmq.client.Channel;
3. import com.rabbitmq.client.DeliverCallback;
4.
5. import [java](/search?q=java).util.HashMap;
6. import java.util.Map;
7.
8. public class Consumer01 {
9. //普通交换机名称
10. private static final String NORMAL_EXCHANGE = "normal_exchange";
11. //死信交换机名称
12. private static final String DEAD_EXCHANGE = "dead_exchange";
13. public static void main(String[] argv) throws Exception {
14. Channel channel = RabbitMqUtils.getChannel();
15. //声明死信和普通交换机 类型为 direct
16. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
17. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
18. //声明死信队列
19. String deadQueue = "dead-queue";
20. channel.queueDeclare(deadQueue, false, false, false, null);
21. //死信队列绑定死信交换机与 routingkey
22. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
23. //正常队列绑定死信队列信息
24. Map<String, Object> params = new HashMap<>();
25. //正常队列设置死信交换机 参数 key 是固定值
26. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
27. //正常队列设置死信 routing-key 参数 key 是固定值
28. params.put("x-dead-letter-routing-key", "lisi");
29.
30. String normalQueue = "normal-queue";
31. channel.queueDeclare(normalQueue, false, false, false, params);
32. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
33. System.out.println("等待接收消息.....");
34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
35. String message = new String(delivery.getBody(), "UTF-8");
36. System.out.println("Consumer01 接收到消息"+message);
37. };
38. channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
39. });
40. }
41. }

Consumer 2
2. import com.rabbitmq.client.Channel;
3. import com.rabbitmq.client.DeliverCallback;
4.
5. public class Consumer02 {
6. private static final String DEAD_EXCHANGE = "dead_exchange";
7. public static void main(String[] argv) throws Exception {
8. Channel channel = RabbitMqUtils.getChannel();
9. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
10. String deadQueue = "dead-queue";
11. channel.queueDeclare(deadQueue, false, false, false, null);
12. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
13. System.out.println("等待接收死信队列消息.....");
14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
15. String message = new String(delivery.getBody(), "UTF-8");
16. System.out.println("Consumer02 接收死信队列的消息" + message);
17. };
18. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
19. });
20. }
21. }

Shut down consumer 1 and simulate a failure
normal queue
The message times out and enters the dead letter queue
Dead letter queue receiving messages
Queue reaches maximum length
producer
2. import com.rabbitmq.client.Channel;
3.
4. public class Producer {
5. private static final String NORMAL_EXCHANGE = "normal_exchange";
6. public static void main(String[] argv) throws Exception {
7. try (Channel channel = RabbitMqUtils.getChannel()) {
8. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
9. //该信息是用作演示队列个数限制
10. for (int i = 1; i <11 ; i++) {
11. String message="info"+i;
12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
13. System.out.println("生产者发送消息:"+message);
14. }
15. }
16. }
17. }

Consumer 1
2. import com.rabbitmq.client.Channel;
3. import com.rabbitmq.client.DeliverCallback;
4.
5. import java.util.HashMap;
6. import java.util.Map;
7.
8. public class Consumer01 {
9. //普通交换机名称
10. private static final String NORMAL_EXCHANGE = "normal_exchange";
11. //死信交换机名称
12. private static final String DEAD_EXCHANGE = "dead_exchange";
13.
14. public static void main(String[] argv) throws Exception {
15. Channel channel = RabbitMqUtils.getChannel();
16. //声明死信和普通交换机 类型为 direct
17. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
18. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
19. //声明死信队列
20. String deadQueue = "dead-queue";
21. channel.queueDeclare(deadQueue, false, false, false, null);
22. //死信队列绑定死信交换机与 routingkey
23. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
24. //正常队列绑定死信队列信息
25. Map<String, Object> params = new HashMap<>();
26. //正常队列设置死信交换机 参数 key 是固定值
27. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
28. //正常队列设置死信 routing-key 参数 key 是固定值
29. params.put("x-dead-letter-routing-key", "lisi");
30. // 设置正常队列长度的限制
31. params.put("x-max-length",6);
32.
33. String normalQueue = "normal-queue";
34. channel.queueDeclare(normalQueue, false, false, false, params);
35. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
36. System.out.println("等待接收消息.....");
37. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
38. String message = new String(delivery.getBody(), "UTF-8");
39. System.out.println("Consumer01 接收到消息"+message);
40. };
41. channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
42. });
43. }
44. }

Consumer 2
2. import com.rabbitmq.client.Channel;
3. import com.rabbitmq.client.DeliverCallback;
4.
5. public class Consumer02 {
6. private static final String DEAD_EXCHANGE = "dead_exchange";
7. public static void main(String[] argv) throws Exception {
8. Channel channel = RabbitMqUtils.getChannel();
9. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
10. String deadQueue = "dead-queue";
11. channel.queueDeclare(deadQueue, false, false, false, null);
12. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
13. System.out.println("等待接收死信队列消息.....");
14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
15. String message = new String(delivery.getBody(), "UTF-8");
16. System.out.println("Consumer02 接收死信队列的消息" + message);
17. };
18. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
19. });
20. }
21. }

Close consumer 1 and simulate that no information is received
The dead letter queue consumed four messages
Analysis: The producer generates 10 messages, and the normal queue can only accept 6 messages. The extra messages are transferred to the dead letter queue.
Message rejected
producer
2. import com.rabbitmq.client.Channel;
3.
4. public class Producer {
5. private static final String NORMAL_EXCHANGE = "normal_exchange";
6. public static void main(String[] argv) throws Exception {
7. try (Channel channel = RabbitMqUtils.getChannel()) {
8. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
9. //该信息是用作演示队列个数限制
10. for (int i = 1; i <11 ; i++) {
11. String message="info"+i;
12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
13. System.out.println("生产者发送消息:"+message);
14. }
15. }
16. }
17. }

Consumer 1
2. import com.rabbitmq.client.BuiltinExchangeType;
3. import com.rabbitmq.client.Channel;
4. import com.rabbitmq.client.DeliverCallback;
5.
6. import java.util.HashMap;
7. import java.util.Map;
8.
9. public class Consumer01 {
10. //普通交换机名称
11. private static final String NORMAL_EXCHANGE = "normal_exchange";
12. //死信交换机名称
13. private static final String DEAD_EXCHANGE = "dead_exchange";
14. public static void main(String[] argv) throws Exception {
15. Channel channel = RabbitMqUtils.getChannel();
16. //声明死信和普通交换机 类型为 direct
17. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
18. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
19. //声明死信队列
20. String deadQueue = "dead-queue";
21. channel.queueDeclare(deadQueue, false, false, false, null);
22. //死信队列绑定死信交换机与 routingkey
23. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
24. //正常队列绑定死信队列信息
25. Map<String, Object> params = new HashMap<>();
26. //正常队列设置死信交换机 参数 key 是固定值
27. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
28. //正常队列设置死信 routing-key 参数 key 是固定值
29. params.put("x-dead-letter-routing-key", "lisi");
30. String normalQueue = "normal-queue";
31. channel.queueDeclare(normalQueue, false, false, false, params);
32. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
33. System.out.println("等待接收消息.....");
34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
35. String message = new String(delivery.getBody(), "UTF-8");
36. if(message.equals("info5")){
37. System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
38. //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
39. channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
40. }else {
41. System.out.println("Consumer01 接收到消息"+message);
42. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
43. }
44. };
45. boolean autoAck = false;
46. channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
47. });
48. }
49. }

Consumer 2
2. import com.rabbitmq.client.Channel;
3. import com.rabbitmq.client.DeliverCallback;
4.
5. public class Consumer02 {
6. private static final String DEAD_EXCHANGE = "dead_exchange";
7. public static void main(String[] argv) throws Exception {
8. Channel channel = RabbitMqUtils.getChannel();
9. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
10. String deadQueue = "dead-queue";
11. channel.queueDeclare(deadQueue, false, false, false, null);
12. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
13. System.out.println("等待接收死 信队列消息.....");
14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
15. String message = new String(delivery.getBody(), "UTF-8");
16. System.out.println("Consumer02 接收死信队列的消息" + message);
17. };
18. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
19. });
20. }
21. }

result
Consumer 1, refuses to accept message info5, info5 enters the dead letter queue
Consumer 2, the dead letter queue receives info5
delay queue
Delay queue , the queue is internally ordered, and its most important feature is reflected in its delay attribute. The elements in the delay queue are expected
Take out and process after or before the specified time. Simply put, the delay queue is a queue used to store elements that need to be processed at the specified time.
Delay queue usage scenarios
1. Orders that are not paid within ten minutes will be automatically canceled.
2. If a newly created store has not uploaded products within ten days, a message reminder will be automatically sent.
3. After the user successfully registers, if the user does not log in within three days, an SMS reminder will be sent.
4. The user initiates a refund. If the refund is not processed within three days, the relevant operating personnel will be notified.
5. After booking a meeting, all participants need to be notified ten minutes before the scheduled time to attend the meeting.
Message settings TTL
2. import com.rabbitmq.client.BuiltinExchangeType;
3. import com.rabbitmq.client.Channel;
4.
5. public class Producer {
6. private static final String NORMAL_EXCHANGE = "normal_exchange";
7.
8. public static void main(String[] argv) throws Exception {
9. try (Channel channel = RabbitMqUtils.getChannel()) {
10. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
11. //设置消息的 TTL 时间
12. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
13. //该信息是用作演示队列个数限制
14. for (int i = 1; i <11 ; i++) {
15. String message="info"+i;
16. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
17. message.getBytes());
18. System.out.println("生产者发送消息:"+message);
19. }
20. }
21. }
22. }

Queue setting TTL
2.
3. import com.rabbitmq.client.BuiltinExchangeType;
4. import com.rabbitmq.client.Channel;
5. import com.rabbitmq.client.DeliverCallback;
6.
7. import java.util.HashMap;
8. import java.util.Map;
9.
10. public class Consumer01 {
11. //普通交换机名称
12. private static final String NORMAL_EXCHANGE = "normal_exchange";
13. //死信交换机名称
14. private static final String DEAD_EXCHANGE = "dead_exchange";
15. public static void main(String[] argv) throws Exception {
16. Channel channel = RabbitMqUtils.getChannel();
17. //声明死信和普通交换机 类型为 direct
18. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
19. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
20. //声明死信队列
21. String deadQueue = "dead-queue";
22. channel.queueDeclare(deadQueue, false, false, false, null);
23. //死信队列绑定死信交换机与 routingkey
24. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
25. //正常队列绑定死信队列信息
26. Map<String, Object> params = new HashMap<>();
27. //正常队列设置死信交换机 参数 key 是固定值
28. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
29. //正常队列设置死信 routing-key 参数 key 是固定值
30. params.put("x-dead-letter-routing-key", "lisi");
31. // 设置 TTL 值为 5000 毫秒(5 秒)
32. params.put("x-message-ttl", 5000);
33. String normalQueue = "normal-queue";
34. channel.queueDeclare(normalQueue, false, false, false, params);
35. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
36. System.out.println("等待接收消息.....");
37. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
38. String message = new String(delivery.getBody(), "UTF-8");
39. if(message.equals("info5")){
40. System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
41. //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
42. channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
43. }else {
44. System.out.println("Consumer01 接收到消息"+message);
45. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
46. }
47. };
48. boolean autoAck = false;
49. channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
50. });
51. }
52. }

The difference between the two
If the TTL attribute of the queue is set , then once the message expires, it will be discarded by the queue ( if the dead letter queue is configured, it will be thrown into the dead letter queue). In the second method, even if the message expires, it may not be immediately discarded. Discard, because whether the message has expired is determined before****it is delivered to the consumer . If the current queue has a serious message backlog, the expired message may still survive for a long time; in addition, one thing to note is that if Not setting TTL means that the message will never expire. If TTL is set to 0 , it means that the message will be discarded unless it can be directly delivered to the consumer at this time.****