Skip to main content

RabbitMQ’s Confirm mechanism

RabbitMQ’s Confirm mechanism

1. Reliability of the message

RabbitMQ provides a confirmation mechanism for Confirm.

The Confirm mechanism is used to confirm whether the message has been sent to the switch.

2.Java implementation

1. Import dependencies

          <dependency>

<groupId>com.[rabbitmq](/search?q=rabbitmq)</groupId>

<artifactId>amqp-client</artifactId>

<version>5.6.0</version>

</dependency>



2. Producer of Confirm mechanism

  package com.qf.mq2302.hello;



import com.qf.mq2302.utils.MQUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;



public class Send {

// Declare the name of the queue

public static final String QUEUE_NAME="queueA";



public static void main(String[] args) throws Exception {



//1. Get connection object

Connection conn = MQUtils.getConnection();



//Create a channel object, where most of the MQ operations are defined.

Channel channel = conn.createChannel();



//3 Enabling confirm

channel.confirmSelect();

//3. Declares a queue.

/**

* queue - the name of the queue

* durable - true means that the queue created is persistent (the queue will still exist when the mq is restarted)

* exclusive - whether the queue is exclusive or not (whether or not the pair can only be used by the current connection that created the queue)

* autoDelete - whether the queue can be automatically deleted by the mq server

* arguments - the other arguments of the queue, can be null

*/

// channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Hello doubleasdasda!";



//Producer how to send a message, just use the following method

/**

* exchange - the name of the switch, if it is an empty string, the message was sent to the default switch

* routingKey - the key of the route, when sending messages to the default switch, the routingkey represents the name of the queue.

* other properties - other properties of the message, can be null

* body - the content of the message, note that there is an array of bytes.

*/

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");



//Check if the message was sent successfully

try {

/**

* Determine if it was sent to the switch and return true if it was.

* If the channel is not sent to the wrong switch because of a wrong switch name, an exception will be thrown and the channel will be closed automatically.

*/

if (channel.waitForConfirms()) {

//If true is returned, the switch has successfully received the message.

System.out.println("The message has been successfully sent to the switch");

// Close resource

channel.close();

}else {

System.out.println("Message sent to the switch failed");

// Close resource

channel.close();

}

} catch (InterruptedException e) {

System.out.println("Message sent to the switch failed");

System.out.println("Failed message is: "+message);

}







conn.close();

}

}



3. Consumers of confirm mechanism

  package com.qf.mq2302.hello;



import com.qf.mq2302.utils.MQUtils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.client.Delivery;



import java.io.IOException;



public class Recv {

private final static String QUEUE_NAME="hello-queue";



public static void main(String[] args) throws Exception {

//1. Get connection object

Connection conn = MQUtils.getConnection();



//Create a channel object, where most of the MQ operations are defined.

Channel channel = conn.createChannel();



/**

* First parameter queue name

* The second parameter, durability

* The third parameter exclusivity

* Whether the fourth parameter is automatically deleted

* The fifth parameter, what type of queue can be defined

*/

channel.queueDeclare(QUEUE_NAME, false, false, false, null);



//3.The processing logic after this consumer receives the message is written in the DeliverCallback object

DeliverCallback deliverCallback =new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

System.out.println(consumerTag);

//From the Delivery object, you can get the producer, the byte array of the sent message

byte[] body = message.getBody();

String msg = new String(body, "utf-8");



//Write the consumer's business logic here, e.g., send an e-mail.

System.out.println(msg);



}

};



//4.Let the current consumer start consuming the messages in the (QUEUE_NAME) queue

/**

* queue – the name of the queue

* autoAck – true Indicates whether the current consumer is in autoconfirm mode. true means autoconfirm.

* deliverCallback – The logic of how a consumer handles a message when a message is sent to that consumer

* cancelCallback – When the consumer is canceled out, if you want to execute the code, write here

*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});











}











}



3. Integrate springboot implementation

1. Import dependencies

          <dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>



2.yml configuration file

  spring:

rabbitmq:

host: 8.140.244.227

port: 6786

username: test

password: test

virtual-host: /test

publisher-confirm-type: correlated #Enabling the producer's confirm mechanism in a springboot project



3.RabbitMQ configuration file

  package com.qf.bootmq2302.config;



import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;



@Configuration

public class RabbitConfig {





@Bean

public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){

RabbitTemplate rabbitTemplate = new RabbitTemplate();



//Set up the connection factory object

rabbitTemplate.setConnectionFactory(cachingConnectionFactory);





rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("correlationData:"+correlationData.getId());

System.out.println("correlationData:"+new String(correlationData.getReturnedMessage().getBody()));



//Through the id, you can go to redis and fetch the value message.





// represents whether the message is sent to the switch successfully, send failure false, send success true

System.out.println("ack:"+ack);

//Represents the cause of the error

System.out.println("cause:"+cause);

}

});



return rabbitTemplate;

}







}



4. The producer writes a Controller

     @Autowired

RabbitTemplate rabbitTemplate;

@GetMapping("/test1")

public String test1(String msg,String routkey){

System.out.println(msg);

String exchangeName = "";// Default switch

String routingkey = routkey;// Queue name



// Create a CorrelationData object.

CorrelationData correlationData = new CorrelationData();

correlationData.setId("001");



Message message = new Message(msg.getBytes(), null);

correlationData.setReturnedMessage(message);



// To store the message content and message number in redis, key=message number, value=message content.

//key = bootmq:failmessage:001



// Producers send messages

// The fourth parameter, which can carry a customized correlationData

rabbitTemplate.convertAndSend(exchangeName,routingkey,msg,correlationData);

return "ok";

}



5. The consumer writes a receive queue message

     @RabbitListener(queues = "queueA")

public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {





System.out.println(data);



//manual ack// If manual ack is turned on and no manual ack is given, follow prefetch: 1 # Equivalent to the amount of basicQos(1), that's it, won't give you more because you didn't confirm it. Confirm one, give you one.

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);



}



6. Consumer Profile

  spring:

rabbitmq:

host: 8.140.244.227

port: 6786

username: test

password: test

virtual-host: /test

#Manual ACK

listener:

simple:

acknowledge-mode: manual # Manual ack

prefetch: 1 # Equivalent to basicQos(1)