Skip to main content

rabbitmq dead letter queue

rabbitmq dead letter queue

Table of contents

Conditions for becoming a dead letter

Message TTL expired

Queue reaches maximum length

Message rejected

delay queue

Delay queue usage scenarios

Message settings TTL

Queue setting TTL

The difference between the two


The producer delivers the message to the broker or directly to the queue , and the consumer retrieves the message from the queue.

Consumption, but sometimes due to specific reasons , some messages in****the queue cannot be consumed . If such messages are not processed later, they become dead letters. If there are dead letters, there will naturally be a dead letter queue.****

Conditions for becoming a dead letter

  1. Exceeding the message survival time (TTL): You can set a survival time for the message. After this time period, if the message has not been consumed or is re-delivered to another queue, the message will become a dead letter.

  2. Message Reject: When a message is rejected by the consumer, you can choose to re-deliver the message to another queue or mark it as a dead letter.

  3. The message reaches the maximum number of retries: You can set a limit on the number of retries on the consumer side. When a message reaches a certain number of retries and still cannot be consumed, the message will become a dead letter.

  4. Queue Overflow : When the number of messages in a queue exceeds the maximum capacity limit of the queue, new messages cannot enter the queue and are treated as dead letters.

Message TTL expired

producer

    
2. import com.rabbitmq.client.BuiltinExchangeType;

3. import com.rabbitmq.client.Channel;

4.

5. public class Producer {

6. private static final String NORMAL_EXCHANGE = "normal_exchange";

7.

8. public static void main(String[] argv) throws Exception {

9. try (Channel channel = RabbitMqUtils.getChannel()) {

10. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

11. //设置消息的 TTL 时间

12. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

13. //该信息是用作演示队列个数限制

14. for (int i = 1; i <11 ; i++) {

15. String message="info"+i;

16. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,

17. message.getBytes());

18. System.out.println("生产者发送消息:"+message);

19. }

20. }

21. }

22. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Consumer 1

    
2. import com.rabbitmq.client.Channel;

3. import com.rabbitmq.client.DeliverCallback;

4.

5. import [java](/search?q=java).util.HashMap;

6. import java.util.Map;

7.

8. public class Consumer01 {

9. //普通交换机名称

10. private static final String NORMAL_EXCHANGE = "normal_exchange";

11. //死信交换机名称

12. private static final String DEAD_EXCHANGE = "dead_exchange";

13. public static void main(String[] argv) throws Exception {

14. Channel channel = RabbitMqUtils.getChannel();

15. //声明死信和普通交换机 类型为 direct

16. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

17. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

18. //声明死信队列

19. String deadQueue = "dead-queue";

20. channel.queueDeclare(deadQueue, false, false, false, null);

21. //死信队列绑定死信交换机与 routingkey

22. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

23. //正常队列绑定死信队列信息

24. Map<String, Object> params = new HashMap<>();

25. //正常队列设置死信交换机 参数 key 是固定值

26. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

27. //正常队列设置死信 routing-key 参数 key 是固定值

28. params.put("x-dead-letter-routing-key", "lisi");

29.

30. String normalQueue = "normal-queue";

31. channel.queueDeclare(normalQueue, false, false, false, params);

32. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

33. System.out.println("等待接收消息.....");

34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

35. String message = new String(delivery.getBody(), "UTF-8");

36. System.out.println("Consumer01 接收到消息"+message);

37. };

38. channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {

39. });

40. }

41. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Consumer 2

    
2. import com.rabbitmq.client.Channel;

3. import com.rabbitmq.client.DeliverCallback;

4.

5. public class Consumer02 {

6. private static final String DEAD_EXCHANGE = "dead_exchange";

7. public static void main(String[] argv) throws Exception {

8. Channel channel = RabbitMqUtils.getChannel();

9. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

10. String deadQueue = "dead-queue";

11. channel.queueDeclare(deadQueue, false, false, false, null);

12. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

13. System.out.println("等待接收死信队列消息.....");

14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

15. String message = new String(delivery.getBody(), "UTF-8");

16. System.out.println("Consumer02 接收死信队列的消息" + message);

17. };

18. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {

19. });

20. }

21. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Shut down consumer 1 and simulate a failure

rabbitmq dead letter queue

normal queue

rabbitmq dead letter queue The message times out and enters the dead letter queue

rabbitmq dead letter queue

Dead letter queue receiving messages

rabbitmq dead letter queue

Queue reaches maximum length

producer

    
2. import com.rabbitmq.client.Channel;

3.

4. public class Producer {

5. private static final String NORMAL_EXCHANGE = "normal_exchange";

6. public static void main(String[] argv) throws Exception {

7. try (Channel channel = RabbitMqUtils.getChannel()) {

8. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

9. //该信息是用作演示队列个数限制

10. for (int i = 1; i <11 ; i++) {

11. String message="info"+i;

12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());

13. System.out.println("生产者发送消息:"+message);

14. }

15. }

16. }

17. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Consumer 1

    
2. import com.rabbitmq.client.Channel;

3. import com.rabbitmq.client.DeliverCallback;

4.

5. import java.util.HashMap;

6. import java.util.Map;

7.

8. public class Consumer01 {

9. //普通交换机名称

10. private static final String NORMAL_EXCHANGE = "normal_exchange";

11. //死信交换机名称

12. private static final String DEAD_EXCHANGE = "dead_exchange";

13.

14. public static void main(String[] argv) throws Exception {

15. Channel channel = RabbitMqUtils.getChannel();

16. //声明死信和普通交换机 类型为 direct

17. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

18. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

19. //声明死信队列

20. String deadQueue = "dead-queue";

21. channel.queueDeclare(deadQueue, false, false, false, null);

22. //死信队列绑定死信交换机与 routingkey

23. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

24. //正常队列绑定死信队列信息

25. Map<String, Object> params = new HashMap<>();

26. //正常队列设置死信交换机 参数 key 是固定值

27. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

28. //正常队列设置死信 routing-key 参数 key 是固定值

29. params.put("x-dead-letter-routing-key", "lisi");

30. // 设置正常队列长度的限制

31. params.put("x-max-length",6);

32.

33. String normalQueue = "normal-queue";

34. channel.queueDeclare(normalQueue, false, false, false, params);

35. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

36. System.out.println("等待接收消息.....");

37. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

38. String message = new String(delivery.getBody(), "UTF-8");

39. System.out.println("Consumer01 接收到消息"+message);

40. };

41. channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {

42. });

43. }

44. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Consumer 2

    
2. import com.rabbitmq.client.Channel;

3. import com.rabbitmq.client.DeliverCallback;

4.

5. public class Consumer02 {

6. private static final String DEAD_EXCHANGE = "dead_exchange";

7. public static void main(String[] argv) throws Exception {

8. Channel channel = RabbitMqUtils.getChannel();

9. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

10. String deadQueue = "dead-queue";

11. channel.queueDeclare(deadQueue, false, false, false, null);

12. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

13. System.out.println("等待接收死信队列消息.....");

14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

15. String message = new String(delivery.getBody(), "UTF-8");

16. System.out.println("Consumer02 接收死信队列的消息" + message);

17. };

18. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {

19. });

20. }

21. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Close consumer 1 and simulate that no information is received

rabbitmq dead letter queue

The dead letter queue consumed four messages

rabbitmq dead letter queue

Analysis: The producer generates 10 messages, and the normal queue can only accept 6 messages. The extra messages are transferred to the dead letter queue.

rabbitmq dead letter queue

Message rejected

producer

    
2. import com.rabbitmq.client.Channel;

3.

4. public class Producer {

5. private static final String NORMAL_EXCHANGE = "normal_exchange";

6. public static void main(String[] argv) throws Exception {

7. try (Channel channel = RabbitMqUtils.getChannel()) {

8. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

9. //该信息是用作演示队列个数限制

10. for (int i = 1; i <11 ; i++) {

11. String message="info"+i;

12. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());

13. System.out.println("生产者发送消息:"+message);

14. }

15. }

16. }

17. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Consumer 1

    
2. import com.rabbitmq.client.BuiltinExchangeType;

3. import com.rabbitmq.client.Channel;

4. import com.rabbitmq.client.DeliverCallback;

5.

6. import java.util.HashMap;

7. import java.util.Map;

8.

9. public class Consumer01 {

10. //普通交换机名称

11. private static final String NORMAL_EXCHANGE = "normal_exchange";

12. //死信交换机名称

13. private static final String DEAD_EXCHANGE = "dead_exchange";

14. public static void main(String[] argv) throws Exception {

15. Channel channel = RabbitMqUtils.getChannel();

16. //声明死信和普通交换机 类型为 direct

17. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

18. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

19. //声明死信队列

20. String deadQueue = "dead-queue";

21. channel.queueDeclare(deadQueue, false, false, false, null);

22. //死信队列绑定死信交换机与 routingkey

23. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

24. //正常队列绑定死信队列信息

25. Map<String, Object> params = new HashMap<>();

26. //正常队列设置死信交换机 参数 key 是固定值

27. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

28. //正常队列设置死信 routing-key 参数 key 是固定值

29. params.put("x-dead-letter-routing-key", "lisi");

30. String normalQueue = "normal-queue";

31. channel.queueDeclare(normalQueue, false, false, false, params);

32. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

33. System.out.println("等待接收消息.....");

34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

35. String message = new String(delivery.getBody(), "UTF-8");

36. if(message.equals("info5")){

37. System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");

38. //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中

39. channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

40. }else {

41. System.out.println("Consumer01 接收到消息"+message);

42. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

43. }

44. };

45. boolean autoAck = false;

46. channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {

47. });

48. }

49. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Consumer 2

    
2. import com.rabbitmq.client.Channel;

3. import com.rabbitmq.client.DeliverCallback;

4.

5. public class Consumer02 {

6. private static final String DEAD_EXCHANGE = "dead_exchange";

7. public static void main(String[] argv) throws Exception {

8. Channel channel = RabbitMqUtils.getChannel();

9. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

10. String deadQueue = "dead-queue";

11. channel.queueDeclare(deadQueue, false, false, false, null);

12. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

13. System.out.println("等待接收死信队列消息.....");

14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

15. String message = new String(delivery.getBody(), "UTF-8");

16. System.out.println("Consumer02 接收死信队列的消息" + message);

17. };

18. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {

19. });

20. }

21. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

result

Consumer 1, refuses to accept message info5, info5 enters the dead letter queue

rabbitmq dead letter queue

Consumer 2, the dead letter queue receives info5

rabbitmq dead letter queue

delay queue

Delay queue , the queue is internally ordered, and its most important feature is reflected in its delay attribute. The elements in the delay queue are expected

Take out and process after or before the specified time. Simply put, the delay queue is a queue used to store elements that need to be processed at the specified time.

Delay queue usage scenarios

1. Orders that are not paid within ten minutes will be automatically canceled.

2. If a newly created store has not uploaded products within ten days, a message reminder will be automatically sent.

3. After the user successfully registers, if the user does not log in within three days, an SMS reminder will be sent.

4. The user initiates a refund. If the refund is not processed within three days, the relevant operating personnel will be notified.

5. After booking a meeting, all participants need to be notified ten minutes before the scheduled time to attend the meeting.

Message settings TTL

    
2. import com.rabbitmq.client.BuiltinExchangeType;

3. import com.rabbitmq.client.Channel;

4.

5. public class Producer {

6. private static final String NORMAL_EXCHANGE = "normal_exchange";

7.

8. public static void main(String[] argv) throws Exception {

9. try (Channel channel = RabbitMqUtils.getChannel()) {

10. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

11. //设置消息的 TTL 时间

12. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

13. //该信息是用作演示队列个数限制

14. for (int i = 1; i <11 ; i++) {

15. String message="info"+i;

16. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,

17. message.getBytes());

18. System.out.println("生产者发送消息:"+message);

19. }

20. }

21. }

22. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

Queue setting TTL

    
2.

3. import com.rabbitmq.client.BuiltinExchangeType;

4. import com.rabbitmq.client.Channel;

5. import com.rabbitmq.client.DeliverCallback;

6.

7. import java.util.HashMap;

8. import java.util.Map;

9.

10. public class Consumer01 {

11. //普通交换机名称

12. private static final String NORMAL_EXCHANGE = "normal_exchange";

13. //死信交换机名称

14. private static final String DEAD_EXCHANGE = "dead_exchange";

15. public static void main(String[] argv) throws Exception {

16. Channel channel = RabbitMqUtils.getChannel();

17. //声明死信和普通交换机 类型为 direct

18. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

19. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

20. //声明死信队列

21. String deadQueue = "dead-queue";

22. channel.queueDeclare(deadQueue, false, false, false, null);

23. //死信队列绑定死信交换机与 routingkey

24. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

25. //正常队列绑定死信队列信息

26. Map<String, Object> params = new HashMap<>();

27. //正常队列设置死信交换机 参数 key 是固定值

28. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);

29. //正常队列设置死信 routing-key 参数 key 是固定值

30. params.put("x-dead-letter-routing-key", "lisi");

31. // 设置 TTL 值为 5000 毫秒(5 秒)

32. params.put("x-message-ttl", 5000);

33. String normalQueue = "normal-queue";

34. channel.queueDeclare(normalQueue, false, false, false, params);

35. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

36. System.out.println("等待接收消息.....");

37. DeliverCallback deliverCallback = (consumerTag, delivery) -> {

38. String message = new String(delivery.getBody(), "UTF-8");

39. if(message.equals("info5")){

40. System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");

41. //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中

42. channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

43. }else {

44. System.out.println("Consumer01 接收到消息"+message);

45. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

46. }

47. };

48. boolean autoAck = false;

49. channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {

50. });

51. }

52. }




![rabbitmq dead letter queue](6b44e99974d17195ee2722671aabdc1f.png)

The difference between the two

If the TTL attribute of the queue is set , then once the message expires, it will be discarded by the queue ( if the dead letter queue is configured, it will be thrown into the dead letter queue). In the second method, even if the message expires, it may not be immediately discarded. Discard, because whether the message has expired is determined before****it is delivered to the consumer . If the current queue has a serious message backlog, the expired message may still survive for a long time; in addition, one thing to note is that if Not setting TTL means that the message will never expire. If TTL is set to 0 , it means that the message will be discarded unless it can be directly delivered to the consumer at this time.****