RabbitMQ publisher confirms
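Add the relevant settings to the configuration file
# The sender is called back once the message reaches the exchange
spring.rabbitmq.publisher-confirm-type=correlated
# The sender is called back when the message cannot be routed to a queue
spring.rabbitmq.publisher-returns=true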
Publisher confirm types
public enum ConfirmType {
    /**
     * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()})
     * within scoped operations.
     */
    SIMPLE,
    /**
     * Use with {@code CorrelationData} to correlate confirmations with sent
     * messages.
     */
    CORRELATED,
    /**
     * Publisher confirms are disabled (default).
     */
    NONE
}
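To make the CORRELATED mode more concrete, here is a minimal sketch of the underlying idea in plain Java, with no Spring and no broker involved: every outgoing message is registered under a correlation id, and the confirm that arrives later is matched back to it. The names PendingConfirms, register and onConfirm are hypothetical and exist only for this illustration; in Spring AMQP that role is played by CorrelationData.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Spring AMQP: tracks messages that await a confirm.
class PendingConfirms {
    // correlation id -> future completed with true (ack) or false (nack)
    private final Map<String, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    // Call before publishing; the returned future completes when the confirm arrives.
    CompletableFuture<Boolean> register(String correlationId) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call from the confirm callback; returns false if the id is not pending.
    boolean onConfirm(String correlationId, boolean ack) {
        CompletableFuture<Boolean> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(ack);
        return true;
    }

    // Number of messages still waiting for a confirm.
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        CompletableFuture<Boolean> first = confirms.register("msg-1");
        CompletableFuture<Boolean> second = confirms.register("msg-2");

        confirms.onConfirm("msg-1", true);   // simulated ack
        confirms.onConfirm("msg-2", false);  // simulated nack

        System.out.println("msg-1 acked: " + first.join());      // msg-1 acked: true
        System.out.println("msg-2 acked: " + second.join());     // msg-2 acked: false
        System.out.println("still pending: " + confirms.size()); // still pending: 0
    }
}
```

With Spring AMQP, the equivalent would be to pass a CorrelationData carrying an id as the last argument of convertAndSend and read it back with getId() inside the ConfirmCallback. The send calls later in this post do not pass one, so the correlationData argument of the callback is expected to be null there.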

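Configure RabbitTemplate
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        /*
         * mandatory:
         * true  - if no queue can be found, the broker returns the message to the producer via basic.return
         * false - if no queue can be found, the broker drops the message
         */
        rabbitTemplate.setMandatory(true);
        // Invoked when the broker acks or nacks a published message
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message reached the exchange");
            } else {
                log.info("Message did not reach the exchange");
            }
            if (correlationData != null) {
                log.info("Correlation data: {}", correlationData);
            }
            if (cause != null) {
                log.info("Cause: {}", cause);
            }
        });
        /*
         * Open questions:
         * 1. Is this implemented with transactions, i.e. does it correspond to channel.txSelect()?
         * 2. If a message has reached the queue but is not yet persisted and RabbitMQ crashes, is the ReturnCallback invoked?
         */
        // Invoked when a mandatory message cannot be routed to any queue
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("Triggered because the message could not reach a queue");
            log.info("ReturnCallback: message: {}", message);
            log.info("ReturnCallback: reply code: {}", replyCode);
            log.info("ReturnCallback: reply text: {}", replyText);
            log.info("ReturnCallback: exchange: {}", exchange);
            log.info("ReturnCallback: routing key: {}", routingKey);
        });
        return rabbitTemplate;
    }
}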

Configure the test exchanges and queues
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class ConfirmConfig {
    @Bean
    public Queue confirmQueue() {
        return new Queue(Constant.CONFIRM_QUEUE, false);
    }
    @Bean
    DirectExchange confirmExchange() {
        DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, false, false);
        // Set the alternate exchange: messages that cannot be routed to a queue go to it instead
        directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
        return directExchange;
    }
    @Bean
    Binding bindingConfirm() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);
    }
    @Bean
    FanoutExchange backupExchange() {
        return new FanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE, false, false);
    }
    @Bean
    public Queue backupQueue() {
        return new Queue(Constant.CONFIRM_BACKUP_QUEUE, false);
    }
    @Bean
    public Queue warningQueue() {
        return new Queue(Constant.CONFIRM_WARNING_QUEUE, false);
    }
    @Bean
    Binding bindingConfirmBackup() {
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }
    @Bean
    Binding bindingConfirmWarning() {
        return BindingBuilder.bind(warningQueue()).to(backupExchange());
    }
}

Send a message
The message cannot reach the exchange
@Autowired
RabbitTemplate rabbitTemplate;
String msg = "A message for testing publisher confirms";
@GetMapping("/noExchange")
public void noExchange() {
    // The exchange does not exist
    rabbitTemplate.convertAndSend("noExchange", "noExchange", msg);
}
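Note: the ConfirmCallback is also invoked, with ack=false, when the exchange does not exist. The broker closes the channel with a 404 error and the close reason is passed in as the cause. This callback is driven by publisher confirms (publisher-confirm-type=correlated); rabbitTemplate.setMandatory(true) only governs messages that reach an exchange but cannot be routed to a queue.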
ConfirmCallback Message did not reach the exchange
ConfirmCallback Cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)
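If the application needs to react differently to different failures, the reply code and reply text can be pulled out of the cause string. The following is only a sketch: CauseParser and Reply are hypothetical names, and the regular expression assumes the exact text layout seen in the log line above, which is not a documented, stable format.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the AMQP reply code and text from a nack cause string.
class CauseParser {
    // Assumes the layout "reply-code=<digits>, reply-text=<text>, class-id=" seen in the log.
    private static final Pattern REPLY =
            Pattern.compile("reply-code=(\\d+), reply-text=(.*?), class-id=");

    // Simple value holder for the parsed result.
    static final class Reply {
        final int code;
        final String text;

        Reply(int code, String text) {
            this.code = code;
            this.text = text;
        }
    }

    // Returns an empty Optional when the cause is null or does not match the assumed layout.
    static Optional<Reply> parse(String cause) {
        if (cause == null) {
            return Optional.empty();
        }
        Matcher matcher = REPLY.matcher(cause);
        if (!matcher.find()) {
            return Optional.empty();
        }
        return Optional.of(new Reply(Integer.parseInt(matcher.group(1)), matcher.group(2)));
    }

    public static void main(String[] args) {
        String cause = "channel error; protocol method: #method<channel.close>"
                + "(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', "
                + "class-id=60, method-id=40)";
        // Prints: 404 -> NOT_FOUND - no exchange 'noExchange' in vhost '/'
        parse(cause).ifPresent(reply -> System.out.println(reply.code + " -> " + reply.text));
    }
}
```

Returning an Optional keeps the ConfirmCallback simple: a cause that does not match the assumed layout is treated as "unknown" instead of raising an exception inside the callback.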
The message reaches the exchange but cannot be routed to a queue
@GetMapping("/toExchange")
public void toExchange() {
    // Reaches the exchange, but no queue matches the routing key
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, "xxx.xxx.xxx", msg);
}
ConfirmCallback Message reached the exchange
The ReturnCallback was not invoked. Why?
Because an alternate exchange is configured: the message is forwarded to it and reaches the backup queue, so the broker treats it as routed and does not return it.
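The effect of the alternate exchange can be sketched with a toy routing model in plain Java. This is not how RabbitMQ is implemented; AlternateExchangeModel and its methods are hypothetical, and the routing key and queue names are placeholders, since the values of the Constant fields are not shown in this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange with an optional fanout alternate exchange.
class AlternateExchangeModel {
    // Direct exchange bindings: routing key -> queue names.
    private final Map<String, List<String>> directBindings = new HashMap<>();
    // Queues bound to the fanout alternate exchange; empty means none is configured.
    private final List<String> alternateQueues = new ArrayList<>();

    void bind(String routingKey, String queue) {
        directBindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    void bindAlternate(String queue) {
        alternateQueues.add(queue);
    }

    // Returns the queues that receive the message. An empty list means "unroutable",
    // the only case in which a mandatory message would be returned to the publisher.
    List<String> route(String routingKey) {
        List<String> matched = directBindings.get(routingKey);
        if (matched != null && !matched.isEmpty()) {
            return new ArrayList<>(matched);
        }
        // Fanout: every queue bound to the alternate exchange gets a copy.
        return new ArrayList<>(alternateQueues);
    }

    public static void main(String[] args) {
        AlternateExchangeModel model = new AlternateExchangeModel();
        model.bind("confirm.key", "confirmQueue"); // placeholder names
        model.bindAlternate("backupQueue");
        model.bindAlternate("warningQueue");

        System.out.println(model.route("confirm.key")); // [confirmQueue]
        System.out.println(model.route("xxx.xxx.xxx")); // [backupQueue, warningQueue]
    }
}
```

In this model the second call still ends up in two queues, which mirrors why no return is seen in the test: from the publisher's point of view the message was routed.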
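Remove the alternate exchange and try again. An existing exchange cannot be redeclared with different arguments, so delete it from the broker first.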
@Bean
DirectExchange confirmExchange() {
    DirectExchange directExchange = new DirectExchange(Constant.CONFIRM_EXCHANGE, true, false);
    // Alternate exchange disabled: unroutable messages are no longer forwarded to it
    // directExchange.addArgument("alternate-exchange", Constant.CONFIRM_BACKUP_EXCHANGE);
    return directExchange;
}
Triggered because the message could not reach a queue
ReturnCallback: message: (Body:'A message for testing publisher confirms' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: reply code: 312
ReturnCallback: reply text: NO_ROUTE
ReturnCallback: exchange: myConfirmExchange
ReturnCallback: routing key: xxx.xxx.xxx
ConfirmCallback Message reached the exchange
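Both ReturnCallback and ConfirmCallback are invoked: the broker returns the unroutable message, then confirms it, because the message did reach the exchange.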
The message reaches the queue
@GetMapping("/toQueue")
public void toQueue() {
    // Routed to the queue normally
    rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE, Constant.CONFIRM_ROUTING_KEY, msg);
}
ConfirmCallback Message reached the exchange
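The scenarios tested above can be summarised as a small decision function. This is a sketch that only encodes the behaviour observed in the logs of this post, not the full semantics of RabbitMQ or Spring AMQP; CallbackOutcome and its parameter names are hypothetical, and routedToAQueue is meant to include routing through an alternate exchange.

```java
import java.util.ArrayList;
import java.util.List;

// Toy summary of which callbacks fired in the scenarios tested in this post.
class CallbackOutcome {
    static List<String> callbacks(boolean exchangeExists, boolean routedToAQueue, boolean mandatory) {
        List<String> fired = new ArrayList<>();
        if (!exchangeExists) {
            // Channel is closed with 404; the pending confirm is reported as a nack.
            fired.add("ConfirmCallback(ack=false)");
            return fired;
        }
        if (!routedToAQueue && mandatory) {
            // The unroutable message is returned before it is confirmed.
            fired.add("ReturnCallback(312 NO_ROUTE)");
        }
        fired.add("ConfirmCallback(ack=true)");
        return fired;
    }

    public static void main(String[] args) {
        // Exchange missing: [ConfirmCallback(ack=false)]
        System.out.println(callbacks(false, false, true));
        // Exchange found, routed via the alternate exchange: [ConfirmCallback(ack=true)]
        System.out.println(callbacks(true, true, true));
        // Exchange found, no queue, no alternate exchange:
        // [ReturnCallback(312 NO_ROUTE), ConfirmCallback(ack=true)]
        System.out.println(callbacks(true, false, true));
        // Exchange found, queue found: [ConfirmCallback(ack=true)]
        System.out.println(callbacks(true, true, true));
    }
}
```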