Skip to main content

RabbitMQ

RabbitMQ

Why use MQ
1. Traffic peak reduction
2. Application decoupling
3. Asynchronous processing , such as the asynchronous release confirmation mentioned later

Four core concepts:
producer, switch , queue, consumer

working principle
RabbitMQ

Broker : An application that receives and distributes messages. RabbitMQ Server is Message Broker.
Virtual host : Designed for multi-tenancy and security reasons, the basic components of AMQP are divided into a virtual group, similar to the namespace concept in the network. When multiple different users use the services provided by the same RabbitMQ server, multiple vhosts can be divided, and each user creates exchange/queue and other
Connections in their own vhost: TCP connection
Channel between publisher/consumer and broker : If A Connection is established every time RabbitMQ is accessed. When the message volume is large, the overhead of establishing a TCP Connection will be huge and the efficiency will be low. Channel is a logical connection established inside the connection. If the application supports multi-threading, each thread usually creates a separate channel for communication. The AMQP method includes the channel id to help the client and message broker identify the channel, so the channels are completely isolated. of. Channel, as a lightweight Connection, greatly reduces the operating system's cost of establishing a TCP connection.
Exchange : the message reaches the first stop of the broker. According to the distribution rules, it matches the routing key in the query table and distributes the message to the queue. Commonly used types are: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue : the message is finally sent here to wait for the consumer to take it away
Binding : the virtual connection between exchange and queue, in binding Can contain routing key, Binding information is saved to the query table in exchange, used for message distribution basis

The code is roughly as follows:
configuration class: define the switch and queue, and then bind the two.
Producer: encapsulate the class and send it to the switch. The switch sends the message to the specified queue according
to the routing key. Consumer: processes the message.

Six modes.
RabbitMQ
Code for the first mode:

    public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// Create a connection factory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin"); factory.setPassword("admin"); factory.setPassword("Admin")
factory.setUsername("admin"); factory.setPassword("123"); factory.setPassword("123"); //channel implements auto close interface.
//channel implements the autoclose interface to automatically close without displaying the close
try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
/**
* Generate a queue.
* 1. The name of the queue. The name of the queue is crucial for us to specify which queue our consumers will consume messages from.
* 2. whether the messages inside the queue are persistent or not default messages are stored in memory
* 3. whether the queue is for consumption by only one consumer whether it is shared or not true can be consumed by multiple consumers
* 4. whether the queue is automatically deleted after the last consumer opens a connection true Automatic deletion
* 5. Other parameters
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* Send a message
* 1. Send it to that switch
* 2. Which key is used for the route.
* 3. Other parameter information
* 4. The message body of the sent message
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("Message sent");;
}
}
}



    public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71"); factory.setUsername("182.92.234.71")
factory.setUsername("admin"); factory.setPassword("admin"); factory.setPassword("admin")
factory.setUsername("admin"); factory.setPassword("123"); factory.setPassword("123"); Connection connection = factory.newConnection()
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("Waiting to receive message ....") ;
// Interface callback for how the pushed message is consumed
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
// A callback interface to cancel the consumption if the queue is deleted during consumption.
CancelCallback cancelCallback=(consumerTag)->{ System.out.println("Message consumption was interrupted"); }; // A callback interface to cancel consumption, such as when the queue is deleted during consumption.
/**
* The consumer consumes the message
* 1. Consume which queue
* 2. Whether or not to auto answer after successful consumption true means auto answer false manual answer
* 3. Consumer unsuccessful consumption of the callback
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}



Producer: Get channel, declare queue (queueDeclare), send message (basicPublish)
Consumer: Get channel, receive and reject callback interface, consume message (basicConsume)

Message response
Automatic response (default)
To ensure that messages are not lost during consumption, you need to change the automatic response to manual response.

Three ways to answer manually. The advantage of manual answering is that it can respond in batches and reduce network congestion.

    Channel.basicAck(for positive acknowledgement); // RabbitMQ knows about the message and has successfully processed it, so it can be discarded now.
Channel.basicNack (for negative acknowledgement); // RabbitMQ already knows about the message and has successfully processed it.
Channel.basicReject(for negative acknowledgement); // One less parameter than Channel.basicNack, does not process the message, rejects it, and can be discarded.

How to ensure that the messages sent by the message producer are not lost when the RabbitMQ service is stopped. By default when RabbitMQ exits or crashes for some reason, it ignores queues and messages unless it is told not to do so. Two things need to be done to ensure that messages are not lost: We need to mark both the queue and the message as persistent.

将消息标记为持久化并不能完全保证不会丢失消息。如果需要
更强有力的持久化策略,参考后边课件发布确认章节。

不公平分发:

    int prefetchCount = 1; // prefetch value
channel.basicQos(prefetchCount ); // A prefetch value of 1 is the most conservative. Of course this will make the throughput very low, especially if the consumer connection latency is severe

RabbitMQ

Release Acknowledgement
Don't confuse a message answer with a release acknowledgement.
Message Answer (manual) is, your message has been consumed or rejected, a clear answer.
A release acknowledgement is that the message I sent to the queue, whether it was consumed or not is a different matter.

Once the message has been dropped to all matching queues, the broker sends an acknowledgement to the producer (containing the message's unique ID), which lets the producer know that the message has arrived correctly at the destination queue, or, if the message and the queue are persistent, the acknowledgement will be sent after the message has been written to disk.
Single release acknowledgement, batch release acknowledgement (just different logic than single release acknowledgement)

    channel.confirmSelect();

Asynchronous confirmation although the programming logic is more complex than the last two, but the most cost-effective, whether it is the reliability or efficiency of no comment, he is the use of callback functions to achieve the reliability of the message delivery, the middleware is also through the function callbacks to ensure that whether the delivery is successful.
RabbitMQ

    public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueDeclare); channel.queueDeclare(queueDeclare)
channel.queueDeclare(queueName, false, false, false, null);
//Enable release confirmation
channel.confirmSelect();
/**
* Thread-safe ordered a hash table for highly concurrent situations
* 1. Easily associate the serial number with the message
* 2. easily batch delete entries as long as given the serial number
* 3. Support concurrent access
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/**
* A callback to acknowledge receipt of a message
* 1. message sequence number
* 2. true Acknowledges messages less than or equal to the current sequence number.
* false acknowledges messages with the current sequence number
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
// Returns an unacknowledged message that is less than or equal to the current sequence number is a map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
//clear this part of the unconfirmed messages
confirmed.clear();
}else{
//clear only the messages with the current sequence number
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("Posted message "+message+"not confirmed, sequenceNumber "+sequenceNumber);;
};
/**
* Add an asynchronous acknowledgement listener
* 1. Acknowledgement callback for received messages
* 2. Callback to acknowledge receipt of the message.
*/
channel.addConfirmListener(ackCallback, null);
channel.addConfirmListener(ackCallback, null); long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "Message" + i;
/**
* channel.getNextPublishSeqNo() gets the next message's sequence number
* Make an association with the message body by the sequence number
* All unacknowledged message bodies
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("Publishing " + MESSAGE_COUNT + "asynchronous acknowledgement message, took " + (end - begin) + "ms"); }
}
}




RabbitMQ
Asynchronous: When outputting "1000 asynchronous release confirmation messages, took 62ms", the messages have been sent, but the confirmation is not completed, and the two are not delayed each other.

**How to deal with asynchronous unacknowledged messages
The best solution is to put the unconfirmed messages into a memory-based queue that can be accessed by the posting thread, e.g., use ConcurrentLinkedQueue to pass messages between the confirm callbacks (success and failure) and the posting thread. The thread that sends the message and the thread that listens. One thread listens, one thread sends and prints the result. How the two threads interact: ConcurrentLinkedQueue . The teacher here actually uses:ConcurrentSkipListMap .
Popup: If you need to [sequentially store] key-value pairs and want to be able to efficiently perform concurrent read and write operations, SkipListMap is a good choice. Only need thread-safe queues, [do not care about the order of the elements], LinkedQueue is more suitable.

1, send a message to send, put into ConcurrentSkipListMap, outstandingConfirms.put(channel.getNextPublishSeqNo (), message);
2, in the confirmation callback, delete the confirmed, the rest is unconfirmed.

ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);  

Pop-ups: _headMap method returns a view, the operation of the view will affect the original map, so empty the headMap no problem , _ (ConcurrentNavigableMap) This is a map to support high concurrency, headmap is to generate a map is not new a new map, _ headMap method, intercept the ConcurrentSkipListMap collection of elements, the sequenceNumber before the intercept into the ConcurrentNavigableMap.
headMap method, intercept ConcurrentSkipListMap collection elements, the sequenceNumber before the interception to the ConcurrentNavigableMap inside.

3, unconfirmed callback inside the processing, no confirmation of the message

Code model:
Producer: get connected, launched into the queue, but there are two asynchronous confirmation of the callback function
Consumer:

The following introduction of the switch.
The fanout code declares the switch and binds the temporary queue with a routing key in both the producer and the consumer.
The direct code declares the switch and binds the declared queue with a routing key in the consumer; in the producer, the switch is declared but not bound.
topic Code is to declare a switch and bind the declared queue with a routing key in the consumer; in the producer, the switch is declared but not bound.

Dead letter queue The code is in the consumer, dead letter queue binding dead letter switch, normal queue binding dead letter switch, channel.queueDeclare(normalQueue, false, false, false, params);; in the producer, the switch is declared but not bound.

The delay queue code is, roughly, the following.
Configuration class: defines the switch and queue, and then binds the two
Producer: encapsulated class, sent to the switch, the switch to follow the routing key to send the message to the specified queue
Consumer: processing the message

How to handle asynchronous unacknowledged messages
The best solution is to put the unconfirmed messages into a memory-based queue that can be accessed by the publishing thread, for example, using ConcurrentLinkedQueue which is a queue that passes messages between confirm callbacks and the publishing thread.

Publish/Subscribe
The core idea of the RabbitMQ message passing model is that messages produced by a producer are never sent directly to a queue. The producer can only send messages to the exchange, which has the simple job of receiving messages from the producer and pushing them into the queue.
Direct, topic, headers, fanout.
fanout
RabbitMQ

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 交换机在生产者中定义

direct

RabbitMQ
In this binding case, the producer publishes a message to the exchange, and messages with a binding key of orange are published to queue Q1. messages with binding keys of black and green are published to queue Q2, and messages with other message types are discarded.
RabbitMQ
topic
Although using a direct switch improves our system, it still has limitations - let's say we want to receive logs of types info.base and info.advantage, and a queue only wants info.base messages, then direct won't work. We can only use the topic type. The routing_key for messages sent to a topic switch cannot be written arbitrarily, but must meet certain requirements; it must be a list of words separated by dots. These words can be any words, for example: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit ", of this type.
In this list of rules, there are two of these substitutions that you should be aware of

  • (asterisk) can replace a word
    #(well sign) can replace zero or more words
    RabbitMQ

When a queue is bound to #, then the queue will receive all data, kind of like a fanout.
If there is no # or * in the queue's binding key, then the queue's binding type is direct.

Dead Message Queue
In order to ensure that the order business does not lose message data, you need to use RabbitMQ's dead letter queue mechanism, which puts messages into the dead letter queue when an exception occurs in message consumption. There are also, for example: the user in the mall order successfully and click to go to the payment in the specified time is not paid when the automatic expiration.
1, the message TTL expired
2, the queue reaches the maximum length (the queue is full, can not add data to the mq)
3、Message rejected (basic.reject or basic.nack) and requeue=false
RabbitMQ
DLX: (abbreviation for dead-letter-exchange) Dead Letter Queuing Switch
DLK: (abbreviation for dead-letter-routing-key) dead-letter-queue routingKey
TTL: (time-to-live abbreviation) time-to-live

    public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// Set the TTL time for the message
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//This information is used to demonstrate the limit of the number of queues.
for (int i = 1; i <11 ; i++) {
String message="info "+i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("Producer sends message: "+message);
}
}
}
}
    public class Consumer01 {
//normal_exchange_name
private static final String NORMAL_EXCHANGE = "normal_exchange";
//Name of the dead letter switch
private static final String DEAD_EXCHANGE = "dead_exchange";; // Dead letter switch name
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
// Declare dead and normal exchanges of type direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// Declare deadQueue
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//dead letter queue bind dead letter switch with routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// Normal queue bind deadQueue information
Map<String, Object> params = new HashMap<>(); //Normal queue set deadQueue information.
//normal queue set dead message switch parameter key is fixed value
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);; //Normal queue set dead-letter-exchange parameter key is fixed value
//normal queue set dead-letter routing-key parameter key is fixed value
params.put("x-dead-letter-routing-key", "lisi");; //Normal queue set dead-letter routing-key

String normalQueue = "normal-queue"; channel.queueDeclare; channel.
channel.queueDeclare(normalQueue, false, false, false, params); channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("Waiting to receive message .....") ;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 received message "+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
}
}



Delayed Queue
Delayed queue, the queue is internally ordered, the most important characteristic is reflected in its delayed attributes, delayed queue elements are expected to be taken out and processed after or before the arrival of the specified time.

  1. Orders not paid within ten minutes are automatically canceled
  2. A message reminder is automatically sent to a newly created store that has not uploaded a product within ten days.
  3. Users are reminded by SMS if they have not logged in within three days after successful registration.
  4. If a user initiates a refund and it is not processed within three days, the relevant operation staff will be notified.
  5. After booking a meeting, each participant needs to be notified of the meeting ten minutes before the scheduled time point.
    RabbitMQ

The message sets the TTL and the queue sets the TTL.
If a message has a TTL set or enters a queue with a TTL set, the message will become "dead" if it is not consumed within the TTL set. If both the queue's TTL and the message's TTL are configured, the smaller value will be used, and there are two ways to set the TTL.
If the queue's TTL is set, then once a message expires, it will be discarded by the queue (if a dead message queue is configured, expired messages will be discarded to the dead message queue), whereas in the second way, even if a message is expired, it may not be discarded right away because the message's expiration date is determined before it is delivered to the consumer, and if there is a serious backlog of messages in the current queue, then the expired messages If the current queue has a serious backlog of messages, the expired message may survive for a longer period of time.
Delay queue, is not how long you want to delay the message to be processed, TTL is just how long you want the message to be delayed to become a dead message, on the other hand, the message that becomes a dead message will be delivered to the dead message queue, so that the consumer only needs to consume the message in the dead message queue all the time, because all of the messages in it are the messages you want to be processed immediately.

Dead letter queue, TTL, RabbitMQ implementation of delayed queuing of the two elements.

RabbitMQ
RabbitMQ
Note the extra type here: x-delayed-message

A request with a delay of 20000 (20 seconds) is sent first, then a request with a delay of 2000 (2 seconds).
It is logical that the 2000 (2 seconds) one should be consumed first, and the 20000 (20 seconds) one later.
The first one, 58:56 to 59:16 is exactly 20 seconds, but why is the second one, 02 to 16 seconds, 12 seconds, because the delay queue is sequential. In the case of multiple requests, you can only consume the first one before the second one, no matter how many seconds you need to delay. Remember the hidden flaw of setting ttl on messages?
RabbitMQ
RabbitMQ

    @Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_EXCHANGE_NAME = "delayed.
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}

// Custom Exchange What we are defining here is a delayed exchange [CustomExchange]
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
// The type of the custom exchange
args.put("x-delayed-type", "direct");
// switch name, switch type (as it appears after installing the plugin), whether it's persistent, whether it's auto-delete, args
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExc delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}



Delayed queues are useful in scenarios where delayed processing is required. Implementing delayed queues with RabbitMQ makes good use of RabbitMQ features such as reliable message delivery, reliable message casting, and dead queues to ensure that messages are consumed at least once and that messages that are not processed correctly are not discarded. In addition, the features of a RabbitMQ cluster make it possible to solve the problem of a single point of failure, so that a single node does not hang and cause the delayed queue to become unavailable or messages to be lost.
Of course, there are many other options for delayed queues, such as utilizing Java's DelayQueue, Redis' zset, Quartz, or kafka's wheel of time, each of which has its own characteristics, depending on the scenario to which they apply.

Release Acknowledgment Advanced

Don't confuse message answering with release acknowledgment.
A message answer (manual) is an explicit response (callback) that your message has been consumed or rejected.
Post Acknowledgment (asynchronous acknowledgement) is that the message was sent to the queue, whether it was consumed or not is a different matter, using a listener.
The advanced level of posting an acknowledgement is also through callbacks.
Sending to the queue requires a switch, but what happens when the switch goes down? This is what the high level is trying to solve

When the producer knows that the message has arrived correctly at the destination queue, the broker sends an acknowledgement to the producer (containing the unique ID of the message); if the message and the queue are persistent, the acknowledgement is sent after the message is written to disk
RabbitMQ

RabbitMQ

keycode

    @Component
@Slf4j
public class MyCallBack implements RabbitTemplate.

ConfirmCallback // We are implementing ConfirmCallback, an interface inside RabbitTemplate, but our implementation class MyCallBack is not inside RabbitTemplate, which means that RabbitTemplate will not be able to call ConfirmCallback in the future, so we need to inject our MyCallBack into RabbitTemplate's ConfirmCallback. We need to inject our MyCallBack into RabbitTemplate's ConfirmCallback interface.
@Autowired
RabbitTemplate rabbitTemplate.

@PostConstruct // This annotation comes after @Component and @Autowired.
public void init(){
rabbitTemplate.setConfirmCallback(this); // this is the current class.
}
// The current class, injected into the interface of this class, in order to use this interface, this implementation class.
// // The current class is injected into the interface of this class in order to use the interface.
//**
* A callback method for the switch, regardless of whether or not a message was received.
* CorrelationData, which sends the message we provide.
* Message correlation data
* ack
* Whether the switch received the message or not.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("Switch has received a message with id :{}",id);
}else{
log.info("The switch has not received the message with id :{}, due to cause :{}",id,cause);
}
}
}



The producer, should be able to tell that something is wrong with the switch and my message, is not going out.

The following figure simulates that the switch is working fine, the switch is answering correctly, but when the queue is not working properly and the queue failure message is not getting to the producer
RabbitMQ

In the case of only the producer confirmation mechanism is turned on, the switch receives the message, it will send an acknowledgement message directly to the message producer, if it is found that the message can not be routed (queue problems), then the message will be directly discarded, at this time, the producer is not aware of the message is discarded this event.

    @Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.

// We are implementing ConfirmCallback, an interface inside RabbitTemplate, but our implementation class MyCallBack is not inside RabbitTemplate, so RabbitTemplate will not be able to call ConfirmCallback in the future. We need to inject our MyCallBack into RabbitTemplate's ConfirmCallback interface.
@Autowired
RabbitTemplate rabbitTemplate.

@PostConstruct // This annotation comes after @Component and @Autowired.
public void init(){
rabbitTemplate.setConfirmCallback(this); // this, is the current class, switch
rabbitTemplate.setReturnCallback (this); // this, is the current class, queue
}
// The current class is injected into the interface of this class in order to use the interface, the implementation class.

// This is the current class.
* A callback method for the switch, regardless of whether or not a message was received.
* CorrelationData
* Message correlation data
* ack
* Whether the switch received the message or not.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("Switch has received a message with id :{}",id);
}else{
log.info("The switch has not received the message with id :{}, due to cause :{}",id,cause);
}
}
// Callback method when the message can't be routed (that's when the switch routes to the queue)
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error(" Message {}, returned by exchange {}, reason for return :{}, routing key:{}",new String(message.getBody()),exchange,replyText,routingKey);
}
}



Advanced release acknowledgment solves the problem of message loss (switches, queues) producer-unawareness, but not well enough. We handle it better with backup switches

Backup switch
RabbitMQ

It is possible to set up a queue with a dead-letter switch to store messages that fail to be processed, but these non-routable messages have no chance of making it to the queue, so it is not possible to use a dead-letter queue to store messages.

    @Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String CONFIRM_QUEUE_NAME = "confirm.


public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.
public static final String WARNING_QUEUE_NAME = "warning.queue";
// Declare the confirmation queue
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// Declare the confirm queue binding relationship
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}

// Declare the backup Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// Declare the backup exchange that confirms the Exchange switch
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
// Set the backup exchange for this exchange
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}

// Declare the warning queue
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// Declare the warning queue bindings
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExceptions")
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}

// Declare the backup queue
@Bean("backQueue")
public Queue backQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// Declare the backup queue bindings
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(queue).to(backupExchange);
}
}



staple

    	// Declare the backup switch that confirms the Exchange switch
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
// Set the backup exchange for this exchange
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}

You need to generate an association between the confirming switch and the backup switch.

RabbitMQ
Backup switch, alarm queue for consumption (processing)

Caution:
When the mandatory parameter and the backup switch can be used together, if both are turned on at the same time, where exactly do the messages go? The answer to this question is the backup switch has a higher priority.

RabbitMQ Other Knowledge Points

Idempotency, Priority Queues, Inert Queues

Idempotence:
The result of one or more requests from the user for the same operation is the same, without the side effects of multiple hits. To give the simplest example, that is, payment, the user to buy goods after payment, payment deduction success, but the return results of the network exception, at this time, the money has been deducted, the user clicked the button again, at this time the second deduction will be carried out to return the results of the success of the user to check the balance found that more deductions, the flow of water records have also become two.

Repeated consumption of messages:
When the consumer consumes the message in MQ, MQ has sent the message to the consumer. When the consumer returns ack to MQ, the network is interrupted, so MQ does not receive the confirmation information, and the message will be resent to other parties. consumer, or send it to the consumer again after the network is reconnected, but in fact the consumer has successfully consumed the message, causing the consumer to consume duplicate messages.

The solution to the idempotence of MQ consumers generally uses a global ID or writes a unique identifier such as a timestamp or UUID or the consumer of the order consumes the messages in MQ. You can also use the ID of MQ to judge, or you can follow your own rules. Generate a globally unique id. Each time a message is consumed, use this id to first determine whether the message has been consumed.

During the peak period of business when massive orders are generated, messages may be repeated on the production side. At this time, the consumer side must implement idempotence, which means that our messages will never be consumed multiple times, even if we receive Same news. There are two mainstream idempotent operations in the industry
: a. Unique ID + fingerprint code mechanism , using the database primary key to remove duplication. The advantage is that it is simple to implement, just one splice, and then query to determine whether it is duplicated; the disadvantage is that when concurrency is high, if it is a single database, there will be a writing performance bottleneck. Of course, you can also use sub-databases and sub-tables to improve performance, but it is not our most recommended option. Way.
b. Use the atomicity of redis to achieve it. Using redis to execute the setnx command is naturally idempotent . So as to achieve non-repeated consumption. (setnx: only join when the key does not exist, otherwise do not join)

Example of priority queue.
In the past, our back-end system used redis to store scheduled polling. Everyone knows that redis can only use List to make a simple message queue, and cannot implement a priority scenario. Therefore, when the order volume is large, RabbitMQ is used for transformation and optimization. If it is found that the order is from a large customer, it will be given a relatively high priority, otherwise it will be the default priority.
RabbitMQ
Of course, when working, I use code, not pages.
The following things need to be done to enable the queue to achieve priority: the queue needs to be set as a priority queue, the message needs to set the priority of the message, and the consumer needs to wait for [the message has been sent to the queue] before consuming it. Because, in this way, there is Opportunity to sort messages.

lazy queue

RabbitMQ introduced the concept of lazy queue starting from version 3.6.0. The lazy queue will store messages on disk as much as possible, and will only be loaded into memory when the consumer consumes the corresponding message. One of its important design goals is to be able to support longer queues, that is, to support more message storage. Lazy queues are necessary when consumers are unable to consume messages for a long time due to various reasons (such as consumers going offline, downtime, or shutting down due to maintenance, etc.).
By default, when a producer sends a message to RabbitMQ, the message in the queue will be stored in memory as much as possible, so that the message can be sent to the consumer faster. Even persistent messages retain a copy in memory while being written to disk. When RabbitMQ needs to release memory, it will page the messages in the memory to the disk. This operation will take a long time and will also block the queue operation, making it impossible to receive new messages. Although the developers of RabbitMQ have been upgrading related algorithms, the results are still not ideal, especially when the message volume is particularly large.

The queue has two modes: default and lazy. The default mode is default, and no changes are required for versions prior to 3.6.0. The lazy mode is the mode of the lazy queue. It can be set in the parameters when calling the channel.queueDeclare method, or it can be set through the Policy. If a queue is set using both methods at the same time, the Policy method has higher priority.
If you want to change the mode of an existing queue through declaration, you can only delete the queue first (the same is true for switches), and then re-declare a new one. When declaring the queue, you can set the mode of the queue through the "x-queue-mode" parameter. The values ​​are "default" and "lazy". The following example demonstrates the declaration details of a lazy queue:

    Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

RabbitMQ

RabbitMQ cluster

At the beginning, we introduced how to install and run the RabbitMQ service, but these are stand-alone versions and cannot meet the requirements of current real applications. What should you do if the RabbitMQ server encounters memory corruption, machine power outage, or motherboard failure?

Mirror queue, the reason for using mirroring
If there is only one Broker node in the RabbitMQ cluster, the failure of this node will cause the overall service to be temporarily unavailable, and may also lead to the loss of messages. All messages can be set to persistence, and the durable attribute of the corresponding queue is also set to true, but this still cannot avoid problems caused by caching: because there is a gap between the time the message is sent and the time it is written to the disk and the flushing action is performed. A short but problematic window of time . The publisherconfirm mechanism ensures that the client knows which messages have been stored on disk. However, you generally do not want to encounter service unavailability due to a single point of failure.
Introducing the Mirror Queue mechanism, the queue can be mirrored to other Broker nodes in the cluster. If a node in the cluster fails, the queue can automatically switch to another node in the mirror to ensure service continuity. Availability.

Federation switch:
The mirror queue is clustered, this is between two independent servers