Skip to main content

RabbitMQ SpringBoot advanced usage

RabbitMQ SpringBoot advanced usage

Rabbit advanced usage

  • 1. Rabbit Springboot integration
    • 1.1 Introducing dependencies
    • 1.2 Add configuration
    • 1.3 Add Config
    • 1.4 Writing Consumer
    • 1.5 Send message
  • 2. Advanced usage of Rabbit
    • 2.1 Message sending pre-processor
    • 2.2 Message sending confirmation mechanism
    • 2.3 Message receiving post-processor
    • 2.4 Transaction messages

1. Rabbit Springboot integration

1.1 Introducing dependencies

    <!-- Spring Boot starter for AMQP messaging; the RabbitMQ integration used by the snippets below. -->
    <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 Add configuration

    # HTTP port of the demo application.
    server:
      port: 8080
    spring:
      application:
        name: rabbitmq-test
      # Broker connection settings (read by the MqProperties-backed configuration class).
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: test
        password: test
        virtual-host: /
        # Ask the broker to return unroutable messages to the publisher.
        publisher-returns: true

1.3 Add Config

    /**
     * RabbitMQ wiring for the demo application.
     *
     * <p>Declares the JSON message converter, the {@link RabbitTemplate}, the connection
     * factory built from {@link MqProperties}, and the durable queue / direct exchange /
     * binding that the sample consumer and controller use.
     */
    @Configuration
    @EnableConfigurationProperties(MqProperties.class)
    public class MqConfig {

        @Autowired
        private MqProperties mqProperties;

        /** Converter that (de)serializes message payloads as JSON via Jackson. */
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        /** Template used for publishing; shares the JSON converter declared above. */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate();
            template.setConnectionFactory(connectionFactory());
            // Use the JSON converter for outgoing payloads.
            template.setMessageConverter(messageConverter());
            return template;
        }

        /**
         * Caching connection factory configured from {@link MqProperties}, with
         * publisher returns taken from the properties and correlated publisher confirms enabled.
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory();

            // Broker address.
            factory.setHost(mqProperties.getHost());
            factory.setPort(mqProperties.getPort());
            factory.setVirtualHost(mqProperties.getVirtualHost());

            // Credentials.
            factory.setUsername(mqProperties.getUsername());
            factory.setPassword(mqProperties.getPassword());

            // Publisher feedback: returns and correlated confirms.
            factory.setPublisherReturns(mqProperties.getPublisherReturns());
            factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
            return factory;
        }

        /** Durable, non-exclusive, non-auto-delete queue named {@code test-queue}. */
        @Bean
        public Queue queue() {
            return new Queue("test-queue", true, false, false);
        }

        /** Durable, non-auto-delete direct exchange named {@code direct-exchange-test}. */
        @Bean
        public Exchange exchange() {
            return new DirectExchange("direct-exchange-test", true, false);
        }

        /** Binds the queue to the exchange with routing key {@code direct-key}. */
        @Bean
        public Binding binding() {
            Queue boundQueue = queue();
            Exchange sourceExchange = exchange();
            return BindingBuilder
                    .bind(boundQueue)
                    .to(sourceExchange)
                    .with("direct-key")
                    .noargs();
        }
    }


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

1.4 Writing Consumer

    /**
     * Consumer for {@code test-queue} that acknowledges messages manually.
     *
     * <p>A successfully converted message is logged and acked. On failure the message is
     * nacked; it is put back on the queue only if this was its first delivery, so a message
     * that can never be processed is not redelivered in an endless loop.
     */
    @Component
    public class DirectConsumer extends MessageListenerAdapter {
        private static final Logger logger = LoggerFactory.getLogger(DirectConsumer.class);

        @Autowired
        private MessageConverter messageConverter;

        /**
         * Handles one delivery from {@code test-queue}.
         *
         * @param message the raw AMQP message
         * @param channel the channel the message arrived on, used for ack/nack
         * @throws Exception if the nack issued after a failure cannot be sent
         */
        @Override
        @RabbitListener(queues = {"test-queue"}, ackMode = "MANUAL")
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                @SuppressWarnings("unchecked") // payload is published as a Map<String, String> by the sender
                Map<String, String> msg = (Map<String, String>) messageConverter.fromMessage(message);
                // Correlation id header, if the publisher attached one (may be null).
                Object correlationId = message.getMessageProperties().getHeaders()
                        .get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);
                logger.debug("directConsumer>>>>>>correlationId={}", correlationId);
                logger.info("directConsumer>>>>>>message={}", msg);
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                logger.error("directConsumer>>>>>>exception", e);
                // Requeue only on the first delivery; a message that already failed once is
                // rejected without requeue to avoid an infinite redelivery loop.
                boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
                channel.basicNack(deliveryTag, false, requeue);
            }
        }
    }


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

1.5 Send message

    /**
     * Demo endpoint that publishes a small JSON message to {@code direct-exchange-test}
     * with routing key {@code direct-key}.
     */
    @RestController
    @RequestMapping("/mq")
    public class MqController {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Publishes a message containing the current time and the given data.
         *
         * <p>Both {@code /mq/sendMsg} and {@code /mq/sendMsg/{data}} are mapped; without the
         * first path the optional path variable could never be absent and the timestamp
         * fallback below would be unreachable.
         *
         * @param data optional payload text; when missing or empty the current epoch
         *             milliseconds are used instead
         */
        @GetMapping({"/sendMsg", "/sendMsg/{data}"})
        public void test(@PathVariable(value = "data", required = false) String data) {
            Map<String, String> msg = new HashMap<>();
            msg.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            if (StringUtils.isEmpty(data)) {
                data = String.valueOf(System.currentTimeMillis());
            }
            msg.put("data", "this is data:" + data);
            // A fresh CorrelationData id lets the confirm callback identify this publish.
            rabbitTemplate.convertAndSend("direct-exchange-test", "direct-key", msg, new CorrelationData(UUID.randomUUID().toString()));
        }
    }


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

2. Advanced usage of Rabbit

2.1 Message sending pre-processor

RabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {....})

    /**
     * RabbitTemplate with a before-publish post-processor that logs every outgoing
     * message and passes it on unchanged.
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        // Runs on each message just before it is published.
        MessagePostProcessor logBeforePublish = outgoing -> {
            logger.info("-----------------rabbitmq before postProcess message={}", JSON.toJSONString(outgoing));
            return outgoing;
        };

        RabbitTemplate template = new RabbitTemplate();
        template.setBeforePublishPostProcessors(logBeforePublish);
        template.setConnectionFactory(connectionFactory());
        // Use the shared message converter.
        template.setMessageConverter(messageConverter());
        return template;
    }


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

Through the preprocessor, you can modify messages, save messages, set general headers, etc.

2.2 Message sending confirmation mechanism

Register a ConfirmCallback on the RabbitTemplate; it is invoked for every published message, whether the send succeeded or failed.

    /**
     * RabbitTemplate with publisher feedback enabled: a correlation-data post-processor,
     * a confirm callback, mandatory publishing and a return callback, each of which logs
     * what it receives.
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate();

        // Logs the correlation data for each publish and returns it unchanged.
        template.setCorrelationDataPostProcessor((message, correlationData) -> {
            logger.info("-----------------rabbitmq correlationData postProcess message={}, correlationData={}",
                    JSON.toJSONString(message), JSON.toJSONString(correlationData));
            return correlationData;
        });
        template.setConnectionFactory(connectionFactory());
        // Use the shared message converter.
        template.setMessageConverter(messageConverter());

        // Publisher confirms; requires spring.rabbitmq.publisher-confirm-type = correlated.
        template.setConfirmCallback((correlationData, ack, cause) ->
                logger.info("setConfirmCallback>>>>>>correlationData={} ack={}, cause={}", JSON.toJSONString(correlationData), ack, cause));

        // Mandatory mode turns on the return callback for messages that cannot be routed.
        template.setMandatory(true);
        // Return callback: logs the message that could not reach a queue.
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                logger.info("setReturnCallback>>>>>消息发送队列不可达, message:{}, exchange:{}, routingKey:{}, 原因:{}", message, exchange, routingKey, replyText));
        return template;
    }


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

Whether the message was sent successfully is determined from the ack argument of RabbitTemplate.ConfirmCallback.confirm(CorrelationData correlationData, boolean ack, String cause):

  • ack=true: sent successfully
  • ack=false: sending failed

2.3 Message receiving post-processor

Executed after a message has been received from the broker and before it is handed to the listener.
Way 1:

    /**
     * Listener container factory with an after-receive post-processor.
     *
     * <p>Select it on a listener with
     * {@code @RabbitListener(queues = {"test-queue"}, containerFactory = "containerFactory", ackMode = "MANUAL")}.
     *
     * @param messageConverter converter applied to incoming messages
     * @return the configured factory
     */
    @Bean("containerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(
            MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Connection factory the listener containers use.
        factory.setConnectionFactory(connectionFactory());
        // Initial number of concurrent consumers (framework default: 1).
        factory.setConcurrentConsumers(5);
        // Upper bound the container may scale up to on demand.
        factory.setMaxConcurrentConsumers(10);
        // Do not requeue messages rejected because the listener threw (framework default: true).
        factory.setDefaultRequeueRejected(false);
        // Start the container together with the application context (framework default: true).
        factory.setAutoStartup(true);
        // The listener acks/nacks explicitly (framework default: AUTO).
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Maximum number of unacknowledged messages outstanding per consumer.
        factory.setPrefetchCount(5);
        // Maximum time in ms a consumer waits for the next message. It must not be 0:
        // zero means "do not wait", which makes idle consumers poll in a busy loop,
        // not "no timeout". 1000 ms is the documented framework default.
        factory.setReceiveTimeout(1000L);
        // Transaction manager for the container; none is used by default.
        // factory.setTransactionManager(platformTransactionManager());
        // Converts received messages to Java objects (and replies back to messages).
        factory.setMessageConverter(messageConverter);
        // Executor that runs the consumer threads.
        factory.setTaskExecutor(new SimpleAsyncTaskExecutor());
        // Interval in ms between attempts to recover a failed consumer/connection.
        factory.setRecoveryInterval(5000L);
        // Do not treat missing queues as a fatal startup error (framework default: true).
        factory.setMissingQueuesFatal(false);
        // Post-processor: logs every received message and passes it on unchanged.
        MessagePostProcessor logAfterReceive = received -> {
            logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(received));
            return received;
        };
        factory.setAfterReceivePostProcessors(logAfterReceive);
        return factory;
    }


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

Way 2:

    /**
     * Listener container that delivers messages from {@code test-queue} to the
     * {@code directConsumer} bean.
     *
     * @param directConsumer the listener that receives the messages
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer listenerContainer(
            @Qualifier("directConsumer") DirectConsumer directConsumer) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setMessageListener(directConsumer);
        container.setQueueNames("test-queue");
        return container;
    }

/** All listener containers present in the context; may be null when none are defined. */
@Autowired(required = false)
private List<AbstractMessageListenerContainer> simpleMessageListenerContainers;

/**
 * Gives every injected listener container a logging after-receive post-processor
 * and switches it to manual acknowledgement.
 */
@PostConstruct
public void init() {
    if (!CollectionUtils.isEmpty(simpleMessageListenerContainers)) {
        for (AbstractMessageListenerContainer container : simpleMessageListenerContainers) {
            // Each container gets its own processor that logs the message and returns it unchanged.
            MessagePostProcessor logAfterReceive = received -> {
                logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(received));
                return received;
            };
            container.setAfterReceivePostProcessors(logAfterReceive);
            // Manual ACK.
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        }
    }
}


![RabbitMQ SpringBoot advanced usage](6b44e99974d17195ee2722671aabdc1f.png)

2.4 Transaction messages

Using RabbitMQ transactional messages is not recommended, because they have a significant impact on performance. Instead, implement reliable delivery through the message sending confirmation mechanism described in 2.2.