About three-party system docking-------Using MQ landing production solution
About three-party system docking-------Using MQ landing production solution
This article is based on the transformation of the previous article to achieve two-way data transmission between systems .
Article link: Production environment message related solutions (consistency, sequential rows, reliability issues)_Message docking message sequence-CSDN Blog
1: Demand background
System A of company A and system B of company B (company B is Party A), referred to as system AB, transmit data through RabbitMQ . Because system B has been connected to other systems through MQ, MQ is also used for system A. Do docking, so that the amount of change for system B is basically 0
advantage
1: Application decoupling
There is no strong coupling between systems
2: Asynchronous
No real-time response is required between the two systems, improving user experience and system throughput
3: Peak clipping
Improve system stability
4: Low cost of multi-system docking code
If system A interacts with systems C and D later, system A basically does not need to be redeveloped. It only needs to copy the data and send it to the new team. The development efficiency is high.
shortcoming
1: The system complexity increases.
It needs to rely on MQ middleware and intermediate services. It has high requirements for the high availability of intermediate services and MQ.
2: High maintenance costs
Developers need to be familiar with relevant MQ knowledge and the readability of intermediate services. It is difficult to hand over after personnel changes, and the requirements for developers are high.
Applicable scene
1: If your system only needs to connect to one external system, it is not recommended to use this method. If your system connects to multiple systems or if you plan to connect to multiple systems in the future, then this method is recommended.
2: If your system needs to connect to comprehensive collaborative office software systems such as Enterprise WeChat, DingTalk, Feishu, etc., such as pushing approval messages and other messages from your system, then this method is recommended.
2: Systems involved
1: Internal system
Company A’s self-developed system and platform system (hereinafter referred to as System A)
2: Three-party system
Company B’s self-developed system (hereinafter referred to as System B) and ERP system
Three: Technology stack
1: Jdk8, Spring Boot2.7.15
2: Redis7.0.11 one master, two slaves and three sentinels mode
3: Mysql8.0 master-slave replication, read-write separation
4: Nginx1.24.0
5: RabbitMQ3.7.4 (provided by three parties, no need to build it yourself)
6: 3 Alibaba Cloud servers, 16 cores 8G
Four: Architecture design
Company A's self-developed system and platform system are interconnected internally and do not require us to deal with it (the platform system can be regarded as a docking platform, specifically used for docking with other systems. The platform system in this article is docked with the [intermediary service] we wrote ourselves) system)
Company B's self-developed system and the ERP system are internally connected and do not require our processing (Company B's self-developed system has similar functions to Company A's platform system above and is specifically used to connect with other systems)
RabbitMQ is originally owned by Company B, so we can use it directly.
What we need to do is to write an [intermediate service] to connect to company A's platform system through REST service, and connect to company B's self-developed system through MQ (for convenience, we will call them system A and system B later). The architecture diagram is as follows
Overall architecture design diagram 1
Because they have their own internal processing and docking, we do not need to deal with it. Therefore, for the convenience of understanding, it is simplified as follows.It can be understood that system A and system B transmit data through intermediate services and MQ.
Overall architecture design diagram 2
Intermediate service deployment information
Tencent Cloud deployment diagram
The outermost layer is Tencent Cloud load balancing to 3 servers. Operations and maintenance are responsible for setting up. The following is our own deployment.
Server 1: Nginx, intermediate service, Redis master, Redis sentinel 1, Mysql database
Server 2: Nginx, intermediate service, Redis slave 1, Redis sentinel 2, Mysql slave database 1
Server 3: Nginx, intermediate service, Redis slave 2, Redis sentinel 3, Mysql slave database 2
Five: Detailed architecture
1: If the problem of message loss is guaranteed (message reliability)
Create 2 new tables. The main table is used for polling of scheduled tasks, log viewing, and providing manual retry strategies. The sub-table is used to save the actual message body content.
Main table: id, mq_unique_id (message unique key), push_status (push status), retry_count (number of retries), err_msg (error message), push_to (push direction)
Subtable: id, pid (main table ID), content (message content), bill_name (document name), queue_name (message queue name)
Regardless of whether a message is received from system A or MQ, it will be dropped to the database first (to ensure reliability). After the database is successfully dropped, subsequent processing will be carried out. If it fails, a failure message will be returned to system A or MQ will be notified, and they will process it.
2: If the message repeatability is ensured
Parse the json data of both parties and process it through the unique message id. You can set a unique index. The java code captures the DuplicateKeyException exception and does not perform subsequent processing.
3: Database design
Main table: mq_master
``` 1. CREATE TABLE mq_master
(
-
id
bigint NOT NULL COMMENT 'id', -
mq_unique_id
varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '消息唯一id', -
push_status
smallint NOT NULL DEFAULT '0' COMMENT '推送状态(0未推送1推送成功2推送失败)', -
retry_count
smallint DEFAULT '0' COMMENT '重试次数', -
err_msg
varchar(2000) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '失败信息', -
push_to
varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '推送方向(0推送、1接收)', -
create_time
datetime DEFAULT NULL COMMENT '创建时间', -
update_time
datetime DEFAULT NULL COMMENT '修改时间', -
PRIMARY KEY (
id
), -
KEY
push_status
(push_status
) USING BTREE -
) ENGINE=InnoD
**Subtable: mq_slave**
``` 1. CREATE TABLE `mq_slave` (
2. `id` bigint NOT NULL,
3. `pid` bigint NOT NULL COMMENT '主表id',
4. `content` text CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '消息体',
5. `bill_name` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '单据名称',
6. `queue_name` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '队列名称',
7. `create_time` datetime DEFAULT NULL COMMENT '创建时间',
8. `update_time` datetime DEFAULT NULL COMMENT '修改时间',
9. PRIMARY KEY (`id`),
10. UNIQUE KEY `pid` (`pid`) USING BTREE
11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
12.
4: Interconnection architecture between intermediate services and system A
Add Nginx between system A and the intermediate service for load balancing, because the service deploys 3 servers
Main logic:
1: After the A system call, it is first stored in the database. After the database is successfully stored, the message is asynchronously sent to MQ. When sending the message to MQ, a unique ID is used to add a distributed lock to prevent the scheduled task from processing the message at the same time when sending.
2.
3. /**
4. * @Tutle: null.java
5. * @description: rest服务统一入口,所有的rest服务都写在此处
6. * @author: Carry
7. * @create: 2023-07-20 10:03
8. * @Version 1.0
9. **/
10. @Slf4j
11. @RestController
12. @RequestMapping("/middleware/sync")
13. public class RestRouterController {
14.
15. @Autowired
16. IMqService mqService;
17.
18. @Autowired
19. ThreadPoolTaskExecutor threadPoolTaskExecutor;
20.
21.
22. /*** @Author Carry
23. * @Description rest服务统一入口,发送消息到MQ,为了方便,调用前需要约定调用格式
24. * @Date 17:32 2023/11/9
25. * @Param [params]
26. * @return
27. **/
28. @PostMapping(value = "/start")
29. public Result start(@RequestBody CommonDto commonDto) {
30. Result result = new Result();
31. String errMsg = "";
32. result.setErrorCode(ResultEnum.SYSTEM_ERROR.getErrorCode());
33. result.setErrorMessage(errMsg);
34.
35. if (commonDto.getBillName() == null) {
36. errMsg = "单据名称必传";
37. log.error(errMsg);
38.
39. return result;
40. }
41.
42. String billName = commonDto.getBillName();
43. // 根据不同的单据,获取不同的处理实现类
44. IBusinessService businessService = BusinessBeanFactory.getBusinessBeanByBillName(billName);
45.
46. if (businessService == null) {
47. errMsg = "未定义此业务处理类";
48. log.error(errMsg);
49.
50. return result;
51. }
52.
53.
54. // 入库操作以及获取发送到MQ的转换后的数据
55. Map<String, Object> trans2MqData;
56. try {
57. trans2MqData = businessService.getDataToMq(commonDto);
58. } catch (Exception e) {
59. errMsg = "信息入库中间服务失败,失败信息:" + e.getMessage();
60. log.error(errMsg, e);
61.
62. return result;
63. }
64.
65. Boolean saveMsgFlag = (Boolean) trans2MqData.get("saveMsgFlag");
66. if (!saveMsgFlag) {
67. String saveMsgErrMsg = String.valueOf(trans2MqData.get("saveMsgErrMsg"));
68. errMsg = "信息入库中间服务失败:" + saveMsgErrMsg;
69. log.error(errMsg);
70.
71. return result;
72. }
73.
74. try {
75. String routingKey = String.valueOf(trans2MqData.get("routingKey"));
76. String msg = String.valueOf(trans2MqData.get("msg"));
77. String uniqueId = String.valueOf(trans2MqData.get("uniqueId").toString());
78.
79. threadPoolTaskExecutor.execute(() -> mqService.sendMsgToMq(routingKey, msg, uniqueId));
80. } catch (Exception e) {
81. // 捕获异常 交给定时任务执行
82. errMsg = "信息入库中间服务成功,发送至MQ失败,失败信息:" + e.getMessage();
83. log.error(errMsg, e);
84.
85. return result;
86. }
87.
88.
89. return new Result();
90. }
91.
92. }

2: Add large-granularity distributed locks to scheduled tasks to prevent multiple services from starting scheduled tasks at the same time.
2. /**
3. * @Tutle: null.java
4. * @description: 定时 任务轮询处理
5. * @author: Carry
6. * @create: 2023-08-04 08:41
7. * @Version 1.0
8. **/
9. @Component
10. @EnableScheduling
11. public class EventTask implements IBusinessService {
12.
13. private static final Logger log = LoggerFactory.getLogger(EventTask.class);
14.
15.
16. @Autowired
17. IMqMasterService mqMasterService;
18.
19. @Autowired
20. IMqSlaveService mqSlaveService;
21.
22. @Autowired
23. Sender sender;
24.
25. @Autowired
26. private RedissonClient redissonClient;
27.
28. /**
29. * @return void
30. * @Author Carry
31. * @Description 30s
32. * @Date 8:42 2023/8/4
33. * @Param []
34. **/
35. @Scheduled(cron = "0/30 * * * * ?")
36. public void toAMqTask() {
37. RLock rLock = redissonClient.getLock(Constants.LOCK_TASK_MQ);
38.
39. try {
40. boolean isLocked = rLock.tryLock(Constants.EXPIRATION_60S, TimeUnit.SECONDS);
41. if (!isLocked) {
42. log.error("toAMqTask定时任务未获取到分布式锁,放弃执行任务。。。。。。。");
43.
44. return;
45. }
46.
47. String pushTo = "0";
48. // 获取待发送到三方的数据
49. List<MqMaster> mqMasterList = this.getDataList(pushTo);
50.
51. if (CollectionUtils.isEmpty(mqMasterList)) {
52. return;
53. }
54.
55. this.doprocess(mqMasterList, pushTo);
56.
57. } catch (Exception e) {
58. log.error("toAMqTask error :{},################################", e.getMessage(), e);
59. } finally {
60. if (rLock != null && rLock.isHeldByCurrentThread()) {
61. rLock.unlock();
62. }
63. }
64.
65. }
66.
67. /**
68. * @return void
69. * @Author Carry
70. * @Description 30s
71. * @Date 8:42 2023/8/4
72. * @Param []
73. **/
74. @Scheduled(cron = "0/30 * * * * ?")
75. public void toSelfTask() {
76. RLock rLock = redissonClient.getLock(Constants.LOCK_TASK_MQ);
77.
78. try {
79. boolean isLocked = rLock.tryLock(Constants.EXPIRATION_60S, TimeUnit.SECONDS);
80. if (!isLocked) {
81. log.error("toSelfTask定时任务未获取到分布式锁,放弃执行任务。。。。。。。");
82.
83. return;
84. }
85.
86.
87. String pushTo = "1";
88.
89. List<MqMaster> mqMasterList = this.getDataList(pushTo);
90.
91. if (CollectionUtils.isEmpty(mqMasterList)) {
92. return;
93. }
94.
95. this.doprocess(mqMasterList, pushTo);
96.
97. } catch (Exception e) {
98. log.error("toSelfTask error :{},################################", e.getMessage(), e);
99. } finally {
100. if (rLock != null && rLock.isHeldByCurrentThread()) {
101. rLock.unlock();
102. }
103. }
104.
105.
106. }
107.
108.
109. public List<MqMaster> getDataList(String direction) {
110. MqMaster mqMaster = new MqMaster();
111. mqMaster.setPushTo(direction);
112. mqMaster.setRetryCount(5);
113.
114. // 获取待发送的数据
115. List<MqMaster> mqMasterList = mqMasterService.selecListToPush(mqMaster);
116.
117. return mqMasterList;
118. }
119.
120. /**
121. * @return
122. * @Author Carry
123. * @Description 统一处理
124. * @Date 12:33 2023/8/16
125. * @Param
126. **/
127. public void doprocess(List<MqMaster> mqMasterList, String pushTo) {
128. List<String> pidList = mqMasterList.stream().map(MqMaster::getId).collect(Collectors.toList());
129.
130. List<MqSlave> msgContentList = mqSlaveService.selectListByPidList(pidList);
131. if (CollectionUtils.isEmpty(msgContentList)) {
132. return;
133. }
134.
135. msgContentList.stream().forEach(x -> {
136. String uniqueId = x.getPid();
137. String msg = x.getContent();
138.
139. String routingKey;
140. String queueName = x.getQueueName();
141. String billName = x.getBillName();
142.
143.
144. // 发送到MQ
145. if ("0".equals(pushTo)) {
146. // 根据billName 获取路由key 直接发送 因为数据库已经是转换后的格式
147. routingKey = BillName2QueueConstant.billName2QueueMap.get(billName);
148.
149. sender.toMq(routingKey, msg, String.valueOf(uniqueId));
150. } else {
151. // 不同队列不同实现类处理
152. try {
153. IBusinessService businessService = BusinessBeanFactory.getBusinessBeanByQueueName(queueName);
154.
155. businessService.getDataToSelf(msg, uniqueId);
156. } catch (Exception e) {
157. log.error("定时任务发送到系统出错 error :{},################################", e.getMessage(), e);
158. }
159. }
160.
161. });
162.
163. }
164. }

3: Update the message table if the message processing is successful or failed, and process it through RabbitMQ's reliableCallback mechanism.
2. /**
3. * @Tutle: null.java
4. * @description: 生产者端可靠性配置记录
5. * @author: Carry
6. * @create: 2023-07-20 17:21
7. * @Version 1.0
8. **/
9. @Configuration
10. @Component
11. public class ReliableConfig {
12.
13. private static final Logger log = LoggerFactory.getLogger(ReliableConfig.class);
14.
15. @Autowired
16. IMqMasterService mqMasterService;
17.
18. /**
19. * @return org.springframework.amqp.rabbit.core.RabbitTemplate
20. * @Author Carry
21. * @Description 生产者消息可靠性确认
22. * @Date 9:37 2023/8/18
23. * @Param [connectionFactory]
24. **/
25. @Bean
26. public RabbitTemplate reliableCallback(ConnectionFactory connectionFactory) {
27. RabbitTemplate rabbitTemplate = new RabbitTemplate();
28. rabbitTemplate.setConnectionFactory(connectionFactory);
29. // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
30. rabbitTemplate.setMandatory(true);
31.
32. // 消息可靠性确认
33. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
34. @Override
35. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
36. String uniqueId = String.valueOf(correlationData.getId());
37.
38. if (ack) {
39. log.error("数据发送到MQ成功,消息ID============" + uniqueId);
40. mqMasterService.successToUpdate(uniqueId);
41. } else {
42. log.error("ReliableConfigConfirmCallbackFalse:原因:" + cause);
43.
44. mqMasterService.errorToUpdate(uniqueId, cause);
45. }
46.
47. }
48. });
49.
50.
51.
52. }

Intermediate service detailed design diagram 1
5: Interconnection architecture between intermediate services and MQ
In order to achieve high efficiency and reduce network overhead, use rabbitMQ's push mode, that is, use Spring AMQP's SimpleMessageListenerContainer for processing.
The characteristic of this mode is that the consumer has a buffer to cache these messages, so there will always be a bunch of messages to be processed in the memory. When the message body is too large, it will cause buffer overflow and consume a lot of memory. In order to To prevent this problem, it is recommended to compromise and set prefetch to 1
2. public SimpleMessageListenerContainer simpleMessageListenerContainer() {
3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
4. // 最少消费者数量
5. container.setConcurrentConsumers(2);
6. // 最大消费者数量
7. container.setMaxConcurrentConsumers(5);
8. // spring-amqp 2.0版开始,默认的prefetch值是250,提高吞吐量,【但是处理速度比较慢的大消息时】,消息可能在内存中大量堆积,消耗大量内存,
9. // 最大吞吐量 max-concurrency*prefetch
10. container.setPrefetchCount(1);
11. // RabbitMQ默认是自动确认 , AcknowledgeMode.NONE,这里改为手动确认消息
12. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
13. container.setTaskExecutor(taskExecutor());
14. // 设置多个如下: 前提是队列都是必须已经创建存在的
15. container.setQueueNames(QueueConstant.XXXXX,QueueConstant.XXXXX);
16. // 消息接收处理类 AckRecivicerListener
17. container.setMessageListener(ackRecivicerListener);
18.
19. return container;
20. }

The main logic is similar to the previous one:
1: After listening to the MQ message, it is first stored in the database. After the database is successfully stored, the message is asynchronously sent to the A system. When sending the message, use a unique ID and a distributed lock to prevent the scheduled task from processing the message at the same time.
2: Add large-granularity distributed locks to scheduled tasks to prevent multiple services from starting scheduled tasks at the same time.
3: Update the message table if the message processing is successful or failed.
Intermediate service detailed design diagram 2
Six: Code structure
Module dependencies, the api module is the outermost layer, just package this module
2. 1. api ----对外暴露接口
3. 2. base ----公共类
4. 3. business ----业务操作
5. 4. consumer ----消费者
6. 5. producer ----生产者
7. 6. scheduled-task ----定时任务
code dependency graph
Seven: Module details
1: API
The api is the main service, providing rest service interface calling, auth authentication, and resources configuration files. You can directly enter this module after installing the jar package.
2: scheduled-task
This service is mainly used for scheduled task rotation training and service retry.
3: producer
Producer service. For MQ, our intermediate service is the producer. Messages sent to MQ rely on this service.
4: consumer
Consumer service. For MQ, our intermediate service is also a consumer. Receiving MQ messages depends on this service.
5: business
The business module is the most important module of the intermediate service. Most of the code is in this module, including functions such as addition, deletion, modification, and query, as well as the conversion of business documents. General message-related CRUD has been implemented, so you don’t need to pay attention to it. Mainly focus on your own. Business logic is sufficient, such as converting to MQ message format, receiving MQ messages and converting them into the format required by the system, etc.
Many common codes in it have been implemented, and TODO comments are added to those that need to be implemented by themselves, such as the parts of getting data into the database and sending to MQ. The template method pattern is used. Users only need to pay attention to the rewritten part:
2.
3. import com.alibaba.fastjson.JSON;
4. import com.alibaba.fastjson.serializer.SerializerFeature;
5. import com.carry.middleservice.business.service.common.impl.MsgServiceImpl;
6. import com.carry.middleservice.base.utils.SpringUtils;
7.
8. import java.util.*;
9.
10.
11. /**
12. * @Tutle: null.java
13. * @description: 统一业务处理类,发送到MQ
14. * @author: Carry
15. * @create: 2023-07-25 09:42
16. * @Version 1.0
17. **/
18. public abstract class AbstractMqBusinessService {
19.
20. /**
21. * @return java.util.Map<java.lang.String, java.lang.Object>
22. * @Author Carry
23. * @Description 组装发送到MQ的消息
24. * @Date 15:32 2023/11/10
25. * @Param [msgMap]
26. **/
27. public final Map<String, Object> trans2MqData(Map<String, Object> msgMap) {
28. Map<String, Object> result = new HashMap<>();
29.
30. // 1入库
31. result = this.self2Mysql(msgMap);
32.
33. // 2 获取路由
34. String routingKey = this.getRoutingKey();
35.
36. result.put("routingKey", routingKey);
37.
38. return result;
39. }
40.
41.
42.
43. /**
44. * @return java.lang.String
45. * @Author Carry
46. * @Description 获取路由信息,不同的单据自己实现此方法
47. * @Date 15:32 2023/11/10
48. * @Param []
49. **/
50. public abstract String getRoutingKey();
51.
52.
53. /**
54. * @return java.util.Map<java.lang.String, java.lang.Object>
55. * @Author Carry
56. * @Description 主数据 主表数据通用处理
57. * @Date 10:57 2023/8/15
58. * @Param [syncData]
59. **/
60. public Map<String, Object> self2Mysql(Map<String, Object> selfData) {
61. Map<String, Object> result = new HashMap<>();
62. String uniqueId = String.valueOf(selfData.get("uniqueId"));
63.
64. MsgServiceImpl msgService = SpringUtils.getBean(MsgServiceImpl.class);
65.
66. String selfDataStr = JSON.toJSONString(selfData, SerializerFeature.WriteMapNullValue, SerializerFeature.QuoteFieldNames);
67.
68. // 入库
69. boolean saveMsgFlag = msgService.saveMsg(null, selfDataStr, "0", uniqueId);
70.
71. // TODO 返回信息(根据自己系统的约定进行设计)
72. result.put("saveMsgFlag", saveMsgFlag);
73. result.put("uniqueId", uniqueId);
74. result.put("msg", selfDataStr);
75.
76.
77. return result;
78. }
79.
80.
81.
82. }

6: base
public method module