Spring Boot integrates RabbitMQ routing mode (Direct)
Spring Boot integrates RabbitMQ routing mode (Direct)
The routing mode (Direct mode) in RabbitMQ should be a mode that is used more in actual work. The difference between this mode and the publish and subscribe mode is that the routing mode needs to have a routingKey. In terms of configuration, the switch type needs to be injected. A switch bean object of type DirectExchange. During the binding process between the switch and the queue, the binding relationship needs to be bound to a routing key. Since the automatic confirmation mode is unlikely to be used in actual work, in the process of integrating the routing mode, we still use the double confirmation mechanism for sending messages and the manual confirmation mechanism on the consumer side to ensure the accurate delivery and message prevention of messages. lost.
1. Add configuration
In the configuration file, configure the relevant account information of rabbitmq and enable the message sending callback mechanism. The configuration file is actually the same as the publish-subscribe mode. The configuration details are as follows:
server:
port: 10001
spring:
application:
name: springboot-rabbitmq-s1
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
# 发送者开启 return 确认机制
publisher-returns: true
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated

2. Create configuration class
创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:
package com.study.rabbitmq.config;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author alen
* @DATE 2022/6/7 23:50
*/
@Slf4j
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "direct-order-exchange";
public static final String SMS_QUEUE = "sms-direct-queue";
public static final String EMAIL_QUEUE = "email-direct-queue";
public static final String WECHAT_QUEUE = "wechat-direct-queue";
/**
* 1.
* 声明交换机
* @return
*/
@Bean
public DirectExchange directExchange() {
/**
* directExchange的参数说明:
* 1. 交换机名称
* 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
* 3. 是否自动删除 false:不自动删除 true:自动删除
*/
return new DirectExchange(EXCHANGE_NAME, true, false);
}
/**
* 2.
* 声明队列
* @return
*/
@Bean
public Queue smsQueue() {
/**
* Queue构造函数参数说明
* 1. 队列名
* 2. 是否持久化 true:持久化 false:不持久化
*/
return new Queue(SMS_QUEUE, true);
}
@Bean
public Queue emailQueue() {
return new Queue(EMAIL_QUEUE, true);
}
@Bean
public Queue wechatQueue() {
return new Queue(WECHAT_QUEUE, true);
}
/**
* 3.
* 队列与交换机绑定
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding wechatBinding() {
return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
}
/**
* 将自定义的RabbitTemplate对象注入bean容器
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启消息推送结果回调
rabbitTemplate.setMandatory(true);
//设置ConfirmCallback回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("==============ConfirmCallback start ===============");
log.info("回调数据:{}", correlationData);
log.info("确认结果:{}", ack);
log.info("返回原因:{}", cause);
log.info("==============ConfirmCallback end =================");
}
});
//设置ReturnCallback回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("==============ReturnCallback start ===============");
log.info("发送消息:{}", JSONUtil.toJsonStr(message));
log.info("结果状态码:{}", replyCode);
log.info("结果状态信息:{}", replyText);
log.info("交换机:{}", exchange);
log.info("路由key:{}", routingKey);
log.info("==============ReturnCallback end =================");
}
});
return rabbitTemplate;
}
}

3. Consumer configuration
在消费者项目的配置文件中开启手动确认,配置详情如下:
server:
port: 10002
spring:
application:
name: springboot-rabbitmq-s2
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
acknowledge-mode: manual

4. Create consumers
Create three consumers respectively, DirectEmailConsumer, DirectSmsConsumer, and DirectWechatConsumer to monitor the corresponding queues and consume them after receiving messages. The three consumers are similar, as follows:
4.1 DirectEmailConsumer
package com.study.rabbitmq.service.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @Author alen
* @DATE 2022/6/10 22:54
*/
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {
//标记消费者逻辑执行方法
@RabbitHandler
public void emailMessage(String msg, Channel channel, Message message) throws IOException {
try {
log.info("Email direct --接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}

4.2 DirectSmsConsumer
package com.study.rabbitmq.service.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @Author alen
* @DATE 2022/6/10 22:55
*/
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {
@RabbitHandler
public void smsMessage(String msg, Channel channel, Message message) throws IOException {
try {
log.info("sms direct --接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}

4.3 DirectWechatConsumer
package com.study.rabbitmq.service.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @Author chaoxian.wu
* @DATE 2022/6/10 22:55
*/
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {
@RabbitHandler
public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {
try {
log.info("wechat direct --接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
//basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
// basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}

The above is all the code part. Next we enter the test to see the actual effect. First publish a message with routingKey=sms to see if only the corresponding queue receives the message. The message sending details:
package com.study.rabbitmq;
import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
for (long i = 1; i < 2; i++) {
//交换机名称
String exchangeName = "direct-order-exchange";
//路由key
String routingKey = "sms";
Order order = buildOrder(i);
orderService.createOrder(order, routingKey, exchangeName);
}
}
private Order buildOrder(long id) {
Order order = new Order();
order.setRequestId(id);
order.setUserId(id);
order.setOrderNo(UUID.randomUUID().toString());
order.setAmount(10L);
order.setGoodsNum(1);
order.setTotalAmount(10L);
return order;
}
}

We log in to the rabbitmq management background and check that only the sms-direct-queue queue has a message. The effect is as follows:
Let's start the consumer and see if only the consumer that listens to the sms-direct-queue queue has consumption logs. The effect is as follows:
Send another message with routingKey=email and consume the log. The effect is as shown below.
At this point, the routing mode of springboot integrating rabbitmq is actually over. This mode is still relatively common in work. What we demonstrate is the effect of a single point. In actual work, it is unlikely to use a single point of service deployment. Now we pay attention to it. High availability of services requires service cluster deployment, which also involves the issue of repeated consumption of messages that needs to be dealt with. Personally, I think that when encountering repeated consumption problems, the first thing I think of is distributed locks, haha~. But what to lock? It must be a unique attribute in the message. To prevent repeated consumption of messages.
In the whole process, there is actually a small problem that has not been verified, that is, the ReturnCallback callback mechanism is not triggered, because this will only be triggered when the switch fails to send the message to the queue. Then we can trigger it by sending a routingKey that does not exist. Now, we send a message with routingKey=duanxin, which will definitely not be sent successfully. Let’s check the effect through breakpoints. The effect is as follows:
Then all our common integrations are completed. Of course, the double confirmation mechanism is turned on. Although we can detect the results of message delivery, we can then provide early warning for the results of delivery failure. However, if this operation is enabled, it will inevitably affect the efficiency of message processing. Therefore, it is necessary to determine whether this confirmation mechanism needs to be used based on the actual business scenario.