Skip to main content

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Summary

This article mainly explains the process of mall integrating RabbitMQ to implement delayed messages, taking sending delayed messages to cancel timeout orders as an example. RabbitMQ is a widely used open source message queue . It is lightweight and easy to deploy, and it supports a variety of messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Introduction to project usage framework

RabbitMQ

RabbitMQ is a widely used open source message queue. It is lightweight and easy to deploy, and it supports a variety of messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Installation and use of RabbitMQ
  1. Install Erlang, download address: erlang.org/download/ot…

Mall integrates RabbitMQ to implement delayed messages

  1. Install RabbitMQ, download address: [dl.bintray.com/rabbitmq/al…](https://link.juejin.cn?target=https%3A%2F%2Fdl.bintray.com%2Frabbitmq%2Fall%2Frabbitmq-server%2F3.7.14%2Frabbitmq-server-3.7.14.exe)

Mall integrates RabbitMQ to implement delayed messages

  1. After the installation is complete, enter the sbin directory under the RabbitMQ installation directory

Mall integrates RabbitMQ to implement delayed messages

  1. Enter cmd in the address bar and press Enter to start the command line, then enter the following command to start the management function:
    复制代码rabbitmq-plugins enable rabbitmq_management

Mall integrates RabbitMQ to implement delayed messages

  1. Visit the address to check whether the installation is successful: http://localhost:15672/

Mall integrates RabbitMQ to implement delayed messages

  1. Enter your account password and log in: guest guest
  2. Create an account and set its role to administrator: mall mall

Mall integrates RabbitMQ to implement delayed messages

  1. Create a new virtual host as:/mall

Mall integrates RabbitMQ to implement delayed messages

  1. Click on the mall user to enter the user configuration page

Mall integrates RabbitMQ to implement delayed messages

  1. Configure the permissions of the virtual host for the mall user

Mall integrates RabbitMQ to implement delayed messages

  1. At this point, the installation and configuration of RabbitMQ is complete.
RabbitMQ message model

Mall integrates RabbitMQ to implement delayed messages

logoChinese nameEnglish namedescribe
PproducerProducerThe sender of the message, which can send the message to the switch
CconsumerConsumerThe receiver of the message gets the message from the queue for consumption
XswitchExchangeReceive messages sent by producers and send them to the specified queue according to the routing key
QqueueQueueStore messages sent from the switch
typeSwitch typetypeDirect means sending messages directly according to the routing key (orange/black)

Lombok

Lombok adds very interesting additional functions to the Java language. You no longer need to hand-write getters, setters and other methods for entity classes, you can have them through an annotation.

Note: You need to install idea's Lombok plug-in and add dependencies in the pom file in the project.

Mall integrates RabbitMQ to implement delayed messages

Business scenario description

It is used to solve the problem of how to cancel the order when the order times out after the user places the order.

  • The user places an order (there will be a series of operations such as locking product inventory, using coupons, and earning points);
  • Generate an order and get the order id;
  • Get the set order timeout (assuming it is set to 60 minutes to cancel the order without payment);
  • Send a delayed message to RabbitMQ according to the order timeout time, so that it can trigger the order cancellation operation after the order times out;
  • If the user does not pay, cancel the order (a series of operations such as releasing locked product inventory, returning coupons, and returning points).

Integrate RabbitMQ to implement delayed messages

Add relevant dependencies in pom.xml

    复制代码<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

Modify SpringBoot configuration file

Modify the application.yml file and add Mongodb related configuration under the spring node.

    复制代码  rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: mall # rabbitmq的用户名
password: mall # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true

Add the enumeration configuration class QueueEnum of the message queue

Constant definitions used to delay message queues and process cancellation order message queues, including switch name, queue name, and routing key name.

    复制代码package com.macro.mall.tiny.dto;

import lombok.Getter;

/**
* 消息队列枚举配置
* Created by macro on 2018/9/14.
*/
@Getter
public enum QueueEnum {
/**
* 消息通知队列
*/
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
/**
* 消息通知ttl队列
*/
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;

QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)

Add RabbitMQ configuration

Used to configure switches, queues, and binding relationships between queues and switches.

    复制代码package com.macro.mall.tiny.config;

import com.macro.mall.tiny.dto.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 消息队列配置
* Created by macro on 2018/9/14.
*/
@Configuration
public class RabbitMqConfig {

/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}

/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}

/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}

/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}

/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}

/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}

}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)
You can see the following switches and queues on the RabbitMQ management page

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Switch and queue description
  • mall.order.direct (the switch to which the order cancellation message queue is bound): The bound queue is mall.order.cancel. Once a message is sent with mall.order.cancel as the routing key, it will be sent to this queue.
  • mall.order.direct.ttl (the switch to which the order delay message queue is bound): The bound queue is mall.order.cancel.ttl. Once a message is sent with mall.order.cancel.ttl as the routing key, it will Forward to this queue and save it for a certain period of time. After timeout, the message will be automatically sent to mall.order.cancel (cancel order message consumption queue).

Add the sender of delayed messages CancelOrderSender

Used to send messages to the order delay message queue (mall.order.cancel.ttl).

    复制代码package com.macro.mall.tiny.component;

import com.macro.mall.tiny.dto.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 取消订单消息的发出者
* Created by macro on 2018/9/14.
*/
@Component
public class CancelOrderSender {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired
private AmqpTemplate amqpTemplate;

public void sendMessage(Long orderId,final long delayTimes){
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)

Add the receiver of the order cancellation message CancelOrderReceiver

Used to receive messages from the order cancellation message queue (mall.order.cancel).

    复制代码package com.macro.mall.tiny.component;

import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 取消订单消息的处理者
* Created by macro on 2018/9/14.
*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired
private OmsPortalOrderService portalOrderService;
@RabbitHandler
public void handle(Long orderId){
LOGGER.info("receive delay message orderId:{}",orderId);
portalOrderService.cancelOrder(orderId);
}
}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)

Add OmsPortalOrderService interface

    复制代码package com.macro.mall.tiny.service;

import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.dto.OrderParam;
import org.springframework.transaction.annotation.Transactional;

/**
* 前台订单管理Service
* Created by macro on 2018/8/30.
*/
public interface OmsPortalOrderService {

/**
* 根据提交信息生成订单
*/
@Transactional
CommonResult generateOrder(OrderParam orderParam);

/**
* 取消单个超时订单
*/
@Transactional
void cancelOrder(Long orderId);
}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)

Add the implementation class OmsPortalOrderServiceImpl of OmsPortalOrderService

    复制代码package com.macro.mall.tiny.service.impl;

import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.component.CancelOrderSender;
import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* 前台订单管理Service
* Created by macro on 2018/8/30.
*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
@Autowired
private CancelOrderSender cancelOrderSender;

@Override
public CommonResult generateOrder(OrderParam orderParam) {
//todo 执行一系类下单操作,具体参考mall项目
LOGGER.info("process generateOrder");
//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
sendDelayMessageCancelOrder(11L);
return CommonResult.success(null, "下单成功");
}

@Override
public void cancelOrder(Long orderId) {
//todo 执行一系类取消订单操作,具体参考mall项目
LOGGER.info("process cancelOrder orderId:{}",orderId);
}

private void sendDelayMessageCancelOrder(Long orderId) {
//获取订单超时时间,假设为60分钟
long delayTimes = 30 * 1000;
//发送延迟消息
cancelOrderSender.sendMessage(orderId, delayTimes);
}

}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)

Add OmsPortalOrderController definition interface

    复制代码package com.macro.mall.tiny.controller;

import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

/**
* 订单管理Controller
* Created by macro on 2018/8/30.
*/
@Controller
@Api(tags = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService portalOrderService;

@ApiOperation("根据购物车信息生成订单")
@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
@ResponseBody
public Object generateOrder(@RequestBody OrderParam orderParam) {
return portalOrderService.generateOrder(orderParam);
}
}


![Mall integrates RabbitMQ to implement delayed messages](6b44e99974d17195ee2722671aabdc1f.png)

Perform interface testing

Call the order interface

NOTE: The delay message time has been set to 30 seconds

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Mall integrates RabbitMQ to implement delayed messages

Project source code address

github.com/macrozheng/…