
Third-party system integration: putting an MQ-based solution into production


This article adapts the approach from the previous article to achieve two-way data transmission between systems.
Article link: Production environment message related solutions (consistency, sequential rows, reliability issues)_Message docking message sequence-CSDN Blog

One: Requirements background

System A of company A and system B of company B (company B is the client, "Party A" in contract terms), together referred to as systems A and B, exchange data through RabbitMQ. Because system B is already connected to other systems through MQ, MQ is also used to connect system A, so that system B needs essentially no changes.

Advantages

1: Application decoupling

There is no strong coupling between systems

2: Asynchronous

Neither system has to wait for a real-time response from the other, which improves user experience and system throughput

3: Peak shaving

The queue absorbs traffic bursts, which improves system stability

4: Low code cost when integrating with more systems

If system A later needs to interact with systems C and D, it basically does not need to be redeveloped. It only needs to copy the data and send it to a new queue, so development efficiency is high.

Disadvantages

1: Increased system complexity

The solution depends on MQ middleware and an intermediate service, both of which must be highly available.

2: High maintenance costs

Developers need to be familiar with MQ and able to read the intermediate service's code. Handover is difficult after personnel changes, and the bar for developers is high.

Applicable scenarios

1: If your system only needs to connect to one external system, it is not recommended to use this method. If your system connects to multiple systems or if you plan to connect to multiple systems in the future, then this method is recommended.

2: If your system needs to connect to collaborative office suites such as Enterprise WeChat, DingTalk, or Feishu, for example to push approval messages and other notifications from your system, then this method is recommended.

Two: Systems involved

1: Internal system

Company A’s self-developed system and platform system (hereinafter referred to as System A)

2: Third-party system

Company B’s self-developed system (hereinafter referred to as System B) and ERP system

Three: Technology stack

1: JDK 8, Spring Boot 2.7.15

2: Redis 7.0.11, one master, two slaves, and three sentinels

3: MySQL 8.0, master-slave replication with read-write separation

4: Nginx 1.24.0

5: RabbitMQ 3.7.4 (provided by the third party, so there is no need to set it up yourself)

6: 3 Alibaba Cloud servers, 16 cores 8G

Four: Architecture design

Company A's self-developed system and platform system are already connected internally, so we do not need to handle that. The platform system can be regarded as an integration platform dedicated to connecting with other systems; in this article it connects to the [intermediate service] we wrote ourselves.

Company B's self-developed system and the ERP system are also connected internally and need no work from us. Company B's self-developed system plays the same role as Company A's platform system above and is dedicated to connecting with other systems.

The RabbitMQ instance already belongs to Company B, so we can use it directly.

What we need to do is write an [intermediate service] that connects to company A's platform system through REST and to company B's self-developed system through MQ (for convenience, we will call them system A and system B from here on). The architecture diagram is as follows


Overall architecture design diagram 1

Because each side handles its own internal integration, we do not need to deal with it. For ease of understanding, the diagram is therefore simplified as follows: system A and system B exchange data through the intermediate service and MQ.


Overall architecture design diagram 2

Intermediate service deployment information


Tencent Cloud deployment diagram

The outermost layer is a Tencent Cloud load balancer in front of the 3 servers, set up by the operations team. Our own deployment is as follows.

Server 1: Nginx, intermediate service, Redis master, Redis sentinel 1, Mysql master database

Server 2: Nginx, intermediate service, Redis slave 1, Redis sentinel 2, Mysql slave database 1

Server 3: Nginx, intermediate service, Redis slave 2, Redis sentinel 3, Mysql slave database 2

Five: Detailed architecture

1: How to guard against message loss (message reliability)

Create 2 new tables. The main table is used for scheduled-task polling, log viewing, and manual retries. The sub-table stores the actual message body.

Main table: id, mq_unique_id (unique message key), push_status (push status), retry_count (number of retries), err_msg (error message), push_to (push direction)

Subtable: id, pid (main table ID), content (message content), bill_name (document name), queue_name (message queue name)

Whether a message arrives from system A or from MQ, it is persisted to the database first (to ensure reliability). Subsequent processing happens only after the write succeeds. If the write fails, a failure response is returned to system A, or MQ is notified of the failure, and the sender handles it.
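The persist-first rule above can be sketched roughly as follows. This is a minimal in-memory illustration, not the article's implementation: `MessageStore`, `PushStatus`, and `PersistFirstReceiver` are hypothetical names, hash maps stand in for the two MySQL tables, and the downstream send is synchronous here although the article sends asynchronously.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the mq_master / mq_slave tables.
class MessageStore {
    // Mirrors push_status: 0 = not pushed, 1 = pushed, 2 = push failed.
    enum PushStatus { NOT_PUSHED, SUCCESS, FAILED }

    private final Map<String, String> contentById = new HashMap<>();
    private final Map<String, PushStatus> statusById = new HashMap<>();

    // Persist before anything else. Returns false when the write fails
    // (simulated here by a missing ID or body).
    boolean save(String uniqueId, String content) {
        if (uniqueId == null || content == null) {
            return false;
        }
        contentById.put(uniqueId, content);
        statusById.put(uniqueId, PushStatus.NOT_PUSHED);
        return true;
    }

    void mark(String uniqueId, PushStatus status) {
        statusById.put(uniqueId, status);
    }

    PushStatus statusOf(String uniqueId) {
        return statusById.get(uniqueId);
    }
}

class PersistFirstReceiver {
    private final MessageStore store;
    private final Consumer<String> downstream; // sends to MQ or to system A

    PersistFirstReceiver(MessageStore store, Consumer<String> downstream) {
        this.store = store;
        this.downstream = downstream;
    }

    // Returns false only when persistence failed, so the sender can handle it.
    boolean receive(String uniqueId, String content) {
        if (!store.save(uniqueId, content)) {
            return false;
        }
        try {
            downstream.accept(content);
            store.mark(uniqueId, MessageStore.PushStatus.SUCCESS);
        } catch (RuntimeException e) {
            // The row is already stored, so a scheduled task can retry it later.
            store.mark(uniqueId, MessageStore.PushStatus.FAILED);
        }
        return true;
    }
}
```

The point of the ordering is that a downstream failure never loses the message: the caller still gets a success response, and the stored row carries the failed status for a later retry.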

2: How to handle duplicate messages (idempotency)

Parse the JSON data from either side and deduplicate by the unique message ID. You can put a unique index on that column; the Java code then catches DuplicateKeyException and skips subsequent processing.
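A minimal sketch of this deduplication idea, under stated assumptions: `DedupTable` and `IdempotentHandler` are hypothetical, a `ConcurrentHashMap` plays the role of the unique index, and `DuplicateKeyError` stands in for Spring's `DuplicateKeyException` so that the example needs no external libraries.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a table with a unique index on the message ID.
class DedupTable {
    // Plays the role of Spring's DuplicateKeyException in this sketch.
    static class DuplicateKeyError extends RuntimeException {
        DuplicateKeyError(String key) {
            super("duplicate key: " + key);
        }
    }

    private final ConcurrentMap<String, String> rows = new ConcurrentHashMap<>();

    // Behaves like an INSERT against a unique index: a second insert of the same key fails.
    void insert(String uniqueId, String content) {
        if (rows.putIfAbsent(uniqueId, content) != null) {
            throw new DuplicateKeyError(uniqueId);
        }
    }
}

class IdempotentHandler {
    private final DedupTable table = new DedupTable();
    private int processed = 0;

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(String uniqueId, String content) {
        try {
            table.insert(uniqueId, content);
        } catch (DedupTable.DuplicateKeyError e) {
            return false; // already seen: skip subsequent processing
        }
        processed++;
        return true;
    }

    int processedCount() {
        return processed;
    }
}
```

Letting the insert itself detect the duplicate avoids a separate "check, then insert" step, which could race when two instances receive the same message at once.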

3: Database design

**Main table: mq_master**

```sql
CREATE TABLE `mq_master` (
  `id` bigint NOT NULL COMMENT 'id',
  `mq_unique_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT 'unique message id',
  `push_status` smallint NOT NULL DEFAULT '0' COMMENT 'push status (0 = not pushed, 1 = pushed successfully, 2 = push failed)',
  `retry_count` smallint DEFAULT '0' COMMENT 'number of retries',
  `err_msg` varchar(2000) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT 'failure message',
  `push_to` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT 'push direction (0 = push, 1 = receive)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL COMMENT 'modification time',
  PRIMARY KEY (`id`),
  KEY `push_status` (`push_status`) USING BTREE
) ENGINE=InnoDB;
```

**Subtable: mq_slave**

```sql
CREATE TABLE `mq_slave` (
  `id` bigint NOT NULL,
  `pid` bigint NOT NULL COMMENT 'main table id',
  `content` text CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT 'message body',
  `bill_name` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT 'document name',
  `queue_name` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT 'queue name',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL COMMENT 'modification time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `pid` (`pid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
```



4: Integration between the intermediate service and system A

Nginx sits between system A and the intermediate service for load balancing, because the service is deployed on 3 servers

Main logic:

1: When system A calls the service, the message is first stored in the database. Once the write succeeds, the message is sent to MQ asynchronously. While sending, a distributed lock keyed by the unique ID prevents the scheduled task from processing the same message at the same time.

    
```java
// Project-specific imports (Result, ResultEnum, CommonDto, IMqService,
// IBusinessService, BusinessBeanFactory) are omitted.
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

/**
 * Unified entry point for REST services; all REST endpoints are defined here.
 *
 * @author Carry
 * @create 2023-07-20 10:03
 * @version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/middleware/sync")
public class RestRouterController {

    @Autowired
    IMqService mqService;

    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Unified REST entry point that sends messages to MQ. For convenience,
     * callers must agree on the request format in advance.
     */
    @PostMapping(value = "/start")
    public Result start(@RequestBody CommonDto commonDto) {
        String billName = commonDto.getBillName();
        if (billName == null) {
            return fail("The bill name is required", null);
        }

        // Pick the handler implementation that matches the bill type.
        IBusinessService businessService = BusinessBeanFactory.getBusinessBeanByBillName(billName);
        if (businessService == null) {
            return fail("No business handler is defined for this bill", null);
        }

        // Persist the message and get the converted payload to send to MQ.
        Map<String, Object> trans2MqData;
        try {
            trans2MqData = businessService.getDataToMq(commonDto);
        } catch (Exception e) {
            return fail("Failed to persist the message in the intermediate service: " + e.getMessage(), e);
        }

        // Boolean.TRUE.equals avoids a NullPointerException when the flag is missing.
        if (!Boolean.TRUE.equals(trans2MqData.get("saveMsgFlag"))) {
            return fail("Failed to persist the message in the intermediate service: "
                    + trans2MqData.get("saveMsgErrMsg"), null);
        }

        try {
            String routingKey = String.valueOf(trans2MqData.get("routingKey"));
            String msg = String.valueOf(trans2MqData.get("msg"));
            String uniqueId = String.valueOf(trans2MqData.get("uniqueId"));

            // Send asynchronously. Only a failure to submit the task reaches the catch block.
            threadPoolTaskExecutor.execute(() -> mqService.sendMsgToMq(routingKey, msg, uniqueId));
        } catch (Exception e) {
            // The message is already persisted, so the scheduled task will retry it.
            return fail("Message persisted, but sending to MQ failed: " + e.getMessage(), e);
        }

        return new Result();
    }

    /** Logs the error and builds the failure response, so the caller actually receives the message. */
    private Result fail(String errMsg, Exception e) {
        log.error(errMsg, e);

        Result result = new Result();
        result.setErrorCode(ResultEnum.SYSTEM_ERROR.getErrorCode());
        result.setErrorMessage(errMsg);
        return result;
    }
}
```
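The per-message lock from step 1 can be illustrated with a small stand-in. In this sketch a concurrent set inside one JVM plays the role of the distributed lock; `MessageLocks` and `GuardedSender` are hypothetical names, not classes from the article's code base.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-process stand-in for a distributed lock keyed by message ID.
class MessageLocks {
    private final Set<String> held = ConcurrentHashMap.newKeySet();

    // Succeeds only if nobody currently holds the lock for this ID.
    boolean tryLock(String uniqueId) {
        return held.add(uniqueId);
    }

    void unlock(String uniqueId) {
        held.remove(uniqueId);
    }
}

class GuardedSender {
    private final MessageLocks locks;
    final AtomicInteger sent = new AtomicInteger();

    GuardedSender(MessageLocks locks) {
        this.locks = locks;
    }

    // Sends only if no one else is currently handling this message ID.
    boolean sendOnce(String uniqueId, Runnable send) {
        if (!locks.tryLock(uniqueId)) {
            return false; // another sender (for example the scheduled task) holds the lock
        }
        try {
            send.run();
            sent.incrementAndGet();
            return true;
        } finally {
            locks.unlock(uniqueId);
        }
    }
}
```

A lock like this only covers the window in which both paths are active at once. After it is released, a retry path would still need to check the stored push status before sending again.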





2: Put a coarse-grained distributed lock around the scheduled tasks so that multiple service instances do not run them at the same time.

    
```java
// Project-specific imports (Constants, MqMaster, MqSlave, IMqMasterService, IMqSlaveService,
// Sender, IBusinessService, BusinessBeanFactory, BillName2QueueConstant) are omitted.
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Scheduled polling task.
 *
 * @author Carry
 * @create 2023-08-04 08:41
 * @version 1.0
 */
@Component
@EnableScheduling
public class EventTask implements IBusinessService {

    private static final Logger log = LoggerFactory.getLogger(EventTask.class);

    @Autowired
    IMqMasterService mqMasterService;

    @Autowired
    IMqSlaveService mqSlaveService;

    @Autowired
    Sender sender;

    @Autowired
    private RedissonClient redissonClient;

    /** Runs every 30 s: pushes pending messages to the third party via MQ (pushTo = "0"). */
    @Scheduled(cron = "0/30 * * * * ?")
    public void toAMqTask() {
        runWithLock("toAMqTask", "0");
    }

    /** Runs every 30 s: delivers received messages to our own system (pushTo = "1"). */
    @Scheduled(cron = "0/30 * * * * ?")
    public void toSelfTask() {
        runWithLock("toSelfTask", "1");
    }

    /** Shared body of both tasks: take the coarse-grained lock, then process the pending rows. */
    private void runWithLock(String taskName, String pushTo) {
        RLock rLock = redissonClient.getLock(Constants.LOCK_TASK_MQ);

        try {
            // Note: this overload takes a wait time, so the task waits up to 60 s for the lock.
            boolean isLocked = rLock.tryLock(Constants.EXPIRATION_60S, TimeUnit.SECONDS);
            if (!isLocked) {
                log.error("{}: failed to acquire the distributed lock, skipping this run", taskName);
                return;
            }

            List<MqMaster> mqMasterList = this.getDataList(pushTo);
            if (CollectionUtils.isEmpty(mqMasterList)) {
                return;
            }

            this.doprocess(mqMasterList, pushTo);
        } catch (Exception e) {
            log.error("{} error: {}", taskName, e.getMessage(), e);
        } finally {
            if (rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }
    }

    public List<MqMaster> getDataList(String direction) {
        MqMaster mqMaster = new MqMaster();
        mqMaster.setPushTo(direction);
        mqMaster.setRetryCount(5);

        // Fetch the rows that are waiting to be sent.
        return mqMasterService.selecListToPush(mqMaster);
    }

    /** Common processing for both directions. */
    public void doprocess(List<MqMaster> mqMasterList, String pushTo) {
        List<String> pidList = mqMasterList.stream().map(MqMaster::getId).collect(Collectors.toList());

        List<MqSlave> msgContentList = mqSlaveService.selectListByPidList(pidList);
        if (CollectionUtils.isEmpty(msgContentList)) {
            return;
        }

        msgContentList.forEach(x -> {
            String uniqueId = x.getPid();
            String msg = x.getContent();

            if ("0".equals(pushTo)) {
                // Send to MQ. The routing key is looked up by bill name; the stored
                // content is already in the converted format, so it is sent as-is.
                String routingKey = BillName2QueueConstant.billName2QueueMap.get(x.getBillName());
                sender.toMq(routingKey, msg, String.valueOf(uniqueId));
            } else {
                // Each queue is handled by its own implementation class.
                try {
                    IBusinessService businessService =
                            BusinessBeanFactory.getBusinessBeanByQueueName(x.getQueueName());
                    businessService.getDataToSelf(msg, uniqueId);
                } catch (Exception e) {
                    log.error("Scheduled task failed to deliver to the system: {}", e.getMessage(), e);
                }
            }
        });
    }
}
```
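What one polling round of such a retry task might do can be sketched in memory. Several things here are assumptions rather than facts from the article: that the value 5 passed to the query is a retry cap, that rows with status "not pushed" or "failed" are both selected, and that every attempt increments the retry count. `PendingRow` and `RetryPoller` are hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical in-memory row mirroring a few mq_master columns.
class PendingRow {
    final String id;
    final String pushTo;  // "0" = push to MQ, "1" = deliver to our own system
    int pushStatus = 0;   // 0 = not pushed, 1 = success, 2 = failed
    int retryCount = 0;

    PendingRow(String id, String pushTo) {
        this.id = id;
        this.pushTo = pushTo;
    }
}

class RetryPoller {
    // Assumed retry cap; the article's task passes 5 as the retry count.
    static final int MAX_RETRIES = 5;

    // Rows for one direction that are not yet successful and still under the cap.
    static List<PendingRow> selectPending(List<PendingRow> rows, String pushTo) {
        return rows.stream()
                .filter(r -> pushTo.equals(r.pushTo))
                .filter(r -> r.pushStatus != 1)
                .filter(r -> r.retryCount < MAX_RETRIES)
                .collect(Collectors.toList());
    }

    // One polling round: attempt each pending row and record the outcome.
    // Returns how many rows were attempted.
    static int pollOnce(List<PendingRow> rows, String pushTo, Predicate<PendingRow> attempt) {
        List<PendingRow> pending = selectPending(rows, pushTo);
        for (PendingRow row : pending) {
            row.retryCount++;
            row.pushStatus = attempt.test(row) ? 1 : 2;
        }
        return pending.size();
    }
}
```

With a cap in place, a message that keeps failing eventually drops out of the automatic polling and is left for the manual retry path that the main table is meant to support.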





3: Update the message table when message processing succeeds or fails. This is done through RabbitMQ's publisher confirm mechanism (the reliableCallback bean).

    
```java
// The project-specific import (IMqMasterService) is omitted.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Producer-side reliability configuration.
 *
 * @author Carry
 * @create 2023-07-20 17:21
 * @version 1.0
 */
@Configuration
public class ReliableConfig {

    private static final Logger log = LoggerFactory.getLogger(ReliableConfig.class);

    @Autowired
    IMqMasterService mqMasterService;

    /**
     * Producer-side message reliability confirmation.
     */
    @Bean
    public RabbitTemplate reliableCallback(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // Mandatory asks the broker to return unroutable messages instead of dropping them.
        // The confirm callback below also needs publisher confirms enabled on the connection
        // factory, for example spring.rabbitmq.publisher-confirm-type=correlated.
        rabbitTemplate.setMandatory(true);

        // Publisher confirm: update the message table according to the broker's answer.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData == null) {
                // The sender must pass CorrelationData carrying the unique ID.
                log.warn("Publisher confirm received without correlation data, cause: {}", cause);
                return;
            }
            String uniqueId = String.valueOf(correlationData.getId());

            if (ack) {
                log.info("Message sent to MQ successfully, message ID: {}", uniqueId);
                mqMasterService.successToUpdate(uniqueId);
            } else {
                log.error("ReliableConfigConfirmCallbackFalse, cause: {}", cause);
                mqMasterService.errorToUpdate(uniqueId, cause);
            }
        });

        return rabbitTemplate;
    }
}
```






Intermediate service detailed design diagram 1

5: Integration between the intermediate service and MQ

To be efficient and reduce network overhead, use RabbitMQ's push mode, that is, consume with Spring AMQP's SimpleMessageListenerContainer.

In this mode the consumer keeps a local buffer of prefetched messages, so a batch of unprocessed messages always sits in memory. When message bodies are large, this buffer can consume a lot of memory. To prevent this, the compromise recommended here is to set prefetch to 1

    
```java
// Assumed: this method lives in a @Configuration class and is registered as a bean.
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    // Minimum number of consumers
    container.setConcurrentConsumers(2);
    // Maximum number of consumers
    container.setMaxConcurrentConsumers(5);
    // Since spring-amqp 2.0 the default prefetch is 250, which improves throughput. With large
    // messages that are slow to process, however, many messages can pile up in memory.
    // Maximum number of messages in flight: max-concurrency * prefetch
    container.setPrefetchCount(1);
    // The container's default is AcknowledgeMode.AUTO; switch to manual acknowledgement here
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setTaskExecutor(taskExecutor());
    // Several queues can be listed; all of them must already exist
    container.setQueueNames(QueueConstant.XXXXX, QueueConstant.XXXXX);
    // Message handler class: AckRecivicerListener
    container.setMessageListener(ackRecivicerListener);

    return container;
}
```
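The effect of the prefetch setting on memory can be estimated with simple arithmetic. The helper below is a hypothetical sketch (`PrefetchEstimate` is not part of Spring AMQP), and the 1 MB average message size used in the note after it is an illustrative assumption.

```java
// Hypothetical helper for estimating how many messages a listener container can buffer.
class PrefetchEstimate {
    // Upper bound on unacknowledged messages held by one container.
    static long maxInFlight(int maxConcurrentConsumers, int prefetch) {
        return (long) maxConcurrentConsumers * prefetch;
    }

    // Rough worst-case buffer size in bytes for a given average message size.
    static long worstCaseBytes(int maxConcurrentConsumers, int prefetch, long avgMessageBytes) {
        return maxInFlight(maxConcurrentConsumers, prefetch) * avgMessageBytes;
    }
}
```

With 5 consumers and the default prefetch of 250, up to 1250 messages can be buffered, which at an assumed 1 MB per message is about 1.25 GB per service instance. With prefetch set to 1 the bound drops to 5 messages.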





The main logic is similar to the previous section:

1: After a message is received from MQ, it is first stored in the database. Once the write succeeds, the message is sent to system A asynchronously. While sending, a distributed lock keyed by the unique ID prevents the scheduled task from processing the same message at the same time.

2: Put a coarse-grained distributed lock around the scheduled tasks so that multiple service instances do not run them at the same time.

3: Update the message table when message processing succeeds or fails.
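On the consumer side, manual acknowledgement combines naturally with the persist-first rule: acknowledge only after the message is stored. The sketch below is one possible reading of that flow, not the article's listener. `Acker` is a hypothetical stand-in for the broker channel's ack and nack operations, and the choice to requeue on a failed write is an assumption.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the broker channel's ack/nack operations.
interface Acker {
    void ack(long deliveryTag);

    void nack(long deliveryTag, boolean requeue);
}

class PersistThenAckListener {
    private final Set<String> stored = new HashSet<>();
    private boolean storeAvailable = true;

    // Lets the example simulate a database outage.
    void setStoreAvailable(boolean available) {
        this.storeAvailable = available;
    }

    void onMessage(String uniqueId, String body, long deliveryTag, Acker acker) {
        if (!storeAvailable) {
            // Persistence failed: tell the broker so the message is redelivered later.
            acker.nack(deliveryTag, true);
            return;
        }
        // A duplicate (same unique ID) is acknowledged without being stored twice.
        stored.add(uniqueId);
        acker.ack(deliveryTag);
    }

    int storedCount() {
        return stored.size();
    }
}
```

Because the acknowledgement follows the write, a crash between receiving and storing leaves the message unacknowledged on the broker rather than lost.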


Intermediate service detailed design diagram 2

Six: Code structure

Module dependencies: the api module is the outermost layer, so packaging this module is enough.

```
1. api            ---- externally exposed interfaces
2. base           ---- common classes
3. business       ---- business operations
4. consumer       ---- consumer
5. producer       ---- producer
6. scheduled-task ---- scheduled tasks
```




code dependency graph

Seven: Module details

1: API

The api module is the main service. It provides the REST endpoints, authentication, and the configuration files under resources. To build the jar, simply run install on this module.

2: scheduled-task

This module is mainly used for scheduled-task polling and service retries.

3: producer

Producer service. For MQ, our intermediate service is the producer. Messages sent to MQ rely on this service.


4: consumer

Consumer service. For MQ, our intermediate service is also a consumer. Receiving MQ messages depends on this service.

5: business

The business module is the most important module of the intermediate service. Most of the code lives here, including the create, read, update, and delete operations and the conversion of business documents. The general message-related CRUD is already implemented, so you do not need to touch it. You only need to focus on your own business logic, such as converting data to the MQ message format, or receiving MQ messages and converting them into the format your system requires.

Much of the common code is already implemented, and TODO comments mark the parts you need to implement yourself, such as writing data to the database and sending to MQ. The template method pattern is used, so users only need to pay attention to the overridden parts:

    
```java
// The package declaration is omitted.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.carry.middleservice.business.service.common.impl.MsgServiceImpl;
import com.carry.middleservice.base.utils.SpringUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * Unified business handler base class for sending to MQ.
 *
 * @author Carry
 * @create 2023-07-25 09:42
 * @version 1.0
 */
public abstract class AbstractMqBusinessService {

    /**
     * Template method: assembles the message to send to MQ.
     * It is final so that subclasses cannot change the order of the steps.
     */
    public final Map<String, Object> trans2MqData(Map<String, Object> msgMap) {
        // 1. Persist the message
        Map<String, Object> result = this.self2Mysql(msgMap);

        // 2. Resolve the routing key
        String routingKey = this.getRoutingKey();
        result.put("routingKey", routingKey);

        return result;
    }

    /**
     * Returns the routing key. Each bill type implements this method itself.
     */
    public abstract String getRoutingKey();

    /**
     * Common handling of the main-table data.
     */
    public Map<String, Object> self2Mysql(Map<String, Object> selfData) {
        Map<String, Object> result = new HashMap<>();
        String uniqueId = String.valueOf(selfData.get("uniqueId"));

        MsgServiceImpl msgService = SpringUtils.getBean(MsgServiceImpl.class);

        String selfDataStr = JSON.toJSONString(selfData,
                SerializerFeature.WriteMapNullValue, SerializerFeature.QuoteFieldNames);

        // Persist the message
        boolean saveMsgFlag = msgService.saveMsg(null, selfDataStr, "0", uniqueId);

        // TODO Return values (design these according to your own system's conventions)
        result.put("saveMsgFlag", saveMsgFlag);
        result.put("uniqueId", uniqueId);
        result.put("msg", selfDataStr);

        return result;
    }
}
```
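To show what a user of this template would write, here is a simplified, self-contained sketch of the same pattern. `AbstractBillService`, `PurchaseOrderService`, and the routing key `purchase.order` are hypothetical, and the persistence step is stubbed out so the example runs without a database.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the abstract base class (hypothetical name).
abstract class AbstractBillService {
    // Template method: the order of the steps is fixed and cannot be overridden.
    final Map<String, Object> trans2MqData(Map<String, Object> msgMap) {
        Map<String, Object> result = persist(msgMap);
        result.put("routingKey", getRoutingKey());
        return result;
    }

    // Each bill type supplies its own routing key.
    abstract String getRoutingKey();

    // Common step shared by all bill types (stubbed: nothing is written to a database).
    Map<String, Object> persist(Map<String, Object> msgMap) {
        Map<String, Object> result = new HashMap<>();
        result.put("saveMsgFlag", true);
        result.put("uniqueId", String.valueOf(msgMap.get("uniqueId")));
        return result;
    }
}

// A concrete bill type only has to provide the routing key.
class PurchaseOrderService extends AbstractBillService {
    @Override
    String getRoutingKey() {
        return "purchase.order";
    }
}
```

Adding another bill type then means adding one more small subclass, while the persist-then-route sequence stays in a single place.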





6: base

Module for common utility methods.