RabbitMQ’s Confirm mechanism
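1. Message reliability
RabbitMQ provides a publisher confirm mechanism (Confirm) so that a producer can check whether a message it published was accepted.
With Confirm enabled on a channel, the broker acknowledges each published message, which tells the producer whether the message reached the exchange.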
2. Java implementation
1. Import dependencies
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
2. Producer using the Confirm mechanism
package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    // Name of the queue
    public static final String QUEUE_NAME = "queueA";

    public static void main(String[] args) throws Exception {
        // 1. Get a connection object
        Connection conn = MQUtils.getConnection();
        // 2. Create a channel object; most MQ operations are defined on the channel
        Channel channel = conn.createChannel();
        // 3. Enable publisher confirms on this channel
        channel.confirmSelect();
        // 4. Declare a queue (left commented out here; the queue is assumed to exist already)
        /**
         * queue - the name of the queue
         * durable - true means the queue is persistent (it still exists after the broker restarts)
         * exclusive - whether the queue is exclusive (usable only by the connection that declared it)
         * autoDelete - whether the server may delete the queue automatically when it is no longer in use
         * arguments - other arguments of the queue, can be null
         */
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello doubleasdasda!";
        // 5. Publish the message
        /**
         * exchange - the name of the exchange; an empty string means the default exchange
         * routingKey - the routing key; with the default exchange, the routing key is the name of the queue
         * props - other properties of the message, can be null
         * body - the content of the message, as a byte array
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        // 6. Check whether the message was sent successfully
        try {
            /**
             * waitForConfirms() blocks until the broker has confirmed the messages published so far.
             * It returns true if they were all acknowledged and false if any was rejected (nack).
             * If the message was published to an exchange name that does not exist,
             * the broker closes the channel and an exception is thrown.
             */
            if (channel.waitForConfirms()) {
                // true: the exchange has received the message
                System.out.println("The message has been successfully sent to the exchange");
            } else {
                System.out.println("Sending the message to the exchange failed");
            }
            // Close the channel
            channel.close();
        } catch (InterruptedException e) {
            System.out.println("Sending the message to the exchange failed");
            System.out.println("Failed message is: " + message);
        }
        conn.close();
    }
}
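The producer above confirms one message at a time by blocking in waitForConfirms(). When confirms are handled asynchronously instead, the producer has to remember which messages are still unconfirmed. The following is a minimal sketch of that bookkeeping in plain Java, without a broker. The class name OutstandingConfirms and its methods are hypothetical and chosen for illustration only; the sequence number is assumed to come from channel.getNextPublishSeqNo(), and the (seqNo, multiple) pair mirrors the arguments the client passes to a confirm callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until the broker confirms them. */
final class OutstandingConfirms {
    // publish sequence number -> message body, kept sorted by sequence number
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Call right before publishing, with the sequence number of the next publish. */
    void register(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Ack: forget this message, or every message up to and including seqNo when multiple is true. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /** Nack: remove the affected messages and return them so the caller can log or resend them. */
    List<String> handleNack(long seqNo, boolean multiple) {
        List<String> failed = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = pending.headMap(seqNo, true);
            failed.addAll(head.values());
            head.clear();
        } else {
            String body = pending.remove(seqNo);
            if (body != null) {
                failed.add(body);
            }
        }
        return failed;
    }

    /** Number of messages still waiting for a confirm. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        confirms.register(1, "a");
        confirms.register(2, "b");
        confirms.register(3, "c");
        confirms.handleAck(2, true); // one ack covering messages 1 and 2
        System.out.println("nacked: " + confirms.handleNack(3, false));
        System.out.println("still pending: " + confirms.size());
    }
}
```

In a real producer, handleAck and handleNack would be called from the two callbacks passed to channel.addConfirmListener(ackCallback, nackCallback), and register would be called before each basicPublish.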
3. Consumer for the Confirm mechanism
package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;

public class Recv {
    // Must be the queue the producer publishes to
    private final static String QUEUE_NAME = "queueA";

    public static void main(String[] args) throws Exception {
        // 1. Get a connection object
        Connection conn = MQUtils.getConnection();
        // 2. Create a channel object; most MQ operations are defined on the channel
        Channel channel = conn.createChannel();
        /**
         * queue - the name of the queue
         * durable - whether the queue survives a broker restart
         * exclusive - whether the queue can only be used by the connection that declared it
         * autoDelete - whether the server deletes the queue automatically when it is no longer in use
         * arguments - other arguments of the queue (for example, the queue type), can be null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 3. The processing logic that runs when this consumer receives a message goes in the DeliverCallback
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println(consumerTag);
                // The Delivery object holds the byte array of the message sent by the producer
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");
                // Write the consumer's business logic here, e.g., send an e-mail
                System.out.println(msg);
            }
        };
        // 4. Start consuming messages from the QUEUE_NAME queue
        /**
         * queue - the name of the queue
         * autoAck - true means messages are acknowledged automatically as soon as they are delivered
         * deliverCallback - how the consumer handles a message delivered to it
         * cancelCallback - code to run when the consumer is cancelled
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}
3. Spring Boot integration
1. Import dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. yml configuration file
spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    publisher-confirm-type: correlated # Enables the producer's confirm mechanism in a Spring Boot project
3. RabbitMQ configuration class
package com.qf.bootmq2302.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        // Set the connection factory
        rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // correlationData is null when the message was sent without one
                if (correlationData != null) {
                    System.out.println("correlationData:" + correlationData.getId());
                    Message returned = correlationData.getReturnedMessage();
                    if (returned != null) {
                        System.out.println("correlationData:" + new String(returned.getBody()));
                    }
                    // With this id, you can look up the message content in redis.
                }
                // Whether the message reached the exchange: true on success, false on failure
                System.out.println("ack:" + ack);
                // The cause of the failure (null on success)
                System.out.println("cause:" + cause);
            }
        });
        return rabbitTemplate;
    }
}
4. The producer writes a Controller
@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/test1")
public String test1(String msg, String routkey) {
    System.out.println(msg);
    String exchangeName = ""; // Default exchange
    String routingkey = routkey; // Queue name
    // Create a CorrelationData object that identifies this message in the confirm callback
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId("001");
    // Pass empty properties rather than null
    Message message = new Message(msg.getBytes(), new MessageProperties());
    correlationData.setReturnedMessage(message);
    // The message number and content can be stored in redis here: key = message number, value = message content
    // e.g. key = bootmq:failmessage:001
    // The producer sends the message
    // The fourth parameter carries the custom correlationData
    rabbitTemplate.convertAndSend(exchangeName, routingkey, msg, correlationData);
    return "ok";
}
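The comments in the controller suggest storing each message in redis under its message number, so that a failed confirm can be traced back to the message content. The sketch below shows that idea with an in-memory map standing in for redis. The class PendingMessageStore and its methods are hypothetical names used only for illustration; the key prefix bootmq:failmessage: is taken from the comment above, and onConfirm assumes it is given the id from CorrelationData and the ack flag from the confirm callback.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical store: keeps message bodies by id until the broker confirms them. */
final class PendingMessageStore {
    static final String KEY_PREFIX = "bootmq:failmessage:";

    // Stand-in for redis: key = prefix + message id, value = message content
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Save the message before sending it. */
    void save(String id, String body) {
        store.put(KEY_PREFIX + id, body);
    }

    /**
     * Handle a confirm for the given id.
     * On ack the entry is deleted; on nack the stored body is returned so it can be resent.
     */
    Optional<String> onConfirm(String id, boolean ack) {
        if (id == null) {
            return Optional.empty(); // message was sent without correlation data
        }
        String key = KEY_PREFIX + id;
        if (ack) {
            store.remove(key);
            return Optional.empty();
        }
        return Optional.ofNullable(store.get(key));
    }

    /** Number of messages still waiting for a successful confirm. */
    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        PendingMessageStore pending = new PendingMessageStore();
        pending.save("001", "hello");
        System.out.println("to resend after nack: " + pending.onConfirm("001", false));
        pending.onConfirm("001", true);
        System.out.println("pending after ack: " + pending.size());
    }
}
```

Keeping the entry on a nack is a deliberate choice in this sketch: the message stays available for retries until a later confirm succeeds.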
5. The consumer writes a listener for the queue
@RabbitListener(queues = "queueA")
public void getMsg1(Map<String,Object> data, Channel channel, Message message) throws IOException {
    System.out.println(data);
    // Manual ack. With manual ack enabled and prefetch: 1 (equivalent to basicQos(1)),
    // the broker does not deliver another message until this one is acknowledged:
    // confirm one, receive one.
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
6. Consumer configuration file
spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    # Manual ACK
    listener:
      simple:
        acknowledge-mode: manual # Manual ack
        prefetch: 1 # Equivalent to basicQos(1)
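The effect of combining manual ack with prefetch can be pictured with a toy model. This is not RabbitMQ code and makes no calls to the client library; the class PrefetchWindow is a hypothetical illustration of the rule that the broker keeps at most prefetch unacknowledged messages in flight per consumer and sends the next one only after an ack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a queue that respects a prefetch limit for a single consumer. */
final class PrefetchWindow {
    private final int prefetch;
    private final Deque<String> queue = new ArrayDeque<>();
    private int unacked = 0;

    PrefetchWindow(int prefetch, List<String> messages) {
        this.prefetch = prefetch;
        this.queue.addAll(messages);
    }

    /** Deliver as many messages as the prefetch limit currently allows. */
    List<String> deliver() {
        List<String> delivered = new ArrayList<>();
        while (unacked < prefetch && !queue.isEmpty()) {
            delivered.add(queue.poll());
            unacked++;
        }
        return delivered;
    }

    /** The consumer acknowledges one message, which frees one slot. */
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(1, List.of("m1", "m2", "m3"));
        System.out.println(window.deliver()); // first message is delivered
        System.out.println(window.deliver()); // nothing more until the first is acked
        window.ack();
        System.out.println(window.deliver()); // the ack frees a slot for the next message
    }
}
```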