Skip to main content

Java practice: Spring Boot integrates Canal and RabbitMQ to monitor database changes in real time and process them efficiently

Java practice: Spring Boot integrates Canal and RabbitMQ to monitor database changes in real time and process them efficiently

introduction

In modern microservice architecture , data changes often need to be propagated to various related services in a timely manner in order to update status synchronously or trigger business logic. Canal, as an open source MySQL binlog subscription and consumption component, can help us capture the addition, deletion and modification operations of the database in real time. As a message middleware, RabbitMQ can achieve asynchronous decoupling and reliable message transmission. This article will introduce in detail how to integrate Canal and RabbitMQ in the Spring Boot project to build a complete database change monitoring and message publishing mechanism.

1. Canal basic knowledge and configuration

  1. Canal principles and functions

Canal subscribes to MySQL's binlog log and parses it into JSON format messages, allowing us to obtain database table structure changes and row-level data changes in real time. This feature is particularly suitable for various application scenarios such as data synchronization, auditing, and cache updates.

  1. Install and deploy Canal Server

First, we need to install and start Canal Server on the server and configure the relevant MySQL source connection information. Here are only a brief description of the steps, please refer to the official documentation for specific operations.

  1. Create a Canal instance and subscribe to MySQL data

Create a canal instance and configure the corresponding database and table subscription rules so that it can start monitoring target data changes.

2. Spring Boot integrates RabbitMQ

  1. Add dependencies

Introduce RabbitMQ-related dependencies into the Spring Boot project and configure the basic connection information of RabbitMQ.

    <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  1. Configure RabbitMQ connection factory and queue

Configure the RabbitMQ connection properties and the queue to be created in the application.yml file.

    spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
queue: db-change-queue

3. Build Canal Client and publish messages to RabbitMQ

  1. Create Canal client

Use Spring Boot to integrate the Canal client library, write the CanalConnector configuration class, and establish a connection with the Canal Server.

    @Configuration
public class CanalConfig {

@Value("${canal.server.host}")
private String canalHost;

@Value("${canal.server.port}")
private Integer canalPort;

@Value("${canal.instance.destination}")
private String destination;

@Bean
public CanalConnector canalConnector() throws CanalClientException {
CanalConnectors connectors = CanalConnectors.newClusterSingleton(canalHost, canalPort);
return connectors.connect(destination);
}
}


![Java practice: Spring Boot integrates Canal and RabbitMQ to monitor database changes in real time and process them efficiently](6b44e99974d17195ee2722671aabdc1f.png)
  1. Writing a Canal message handler

Create a class that implements the CanalMessageListener interface, processes the received binlog events, converts the change data into a suitable message body, and then publishes it to RabbitMQ.

    @Component
public class CanalMessageProcessor implements CanalMessageListener {

@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public void onMessage(Message message) {
// 解析message,获取变更数据
CanalEntry.Entry entry = ...;

if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 将变更数据转换为消息实体
MyChangeEvent event = convertToChangeEvent(entry);

// 发布消息到RabbitMQ
rabbitTemplate.convertAndSend("db-change-exchange", "db.change.routing.key", event);
}
}

// ...
}

// 消息实体MyChangeEvent类及其转换方法convertToChangeEvent省略...


![Java practice: Spring Boot integrates Canal and RabbitMQ to monitor database changes in real time and process them efficiently](6b44e99974d17195ee2722671aabdc1f.png)
  1. Spring AMQP configuration

Create switches, queues and binding relationships, and configure RabbitTemplate to send messages to the specified queue.

    @Configuration
public class RabbitConfig {

@Bean
Queue dbChangeQueue() {
return new Queue("db-change-queue", true);
}

@Bean
DirectExchange dbChangeExchange() {
return new DirectExchange("db-change-exchange");
}

@Bean
Binding bindingExchangeQueue(DirectExchange dbChangeExchange, Queue dbChangeQueue) {
return BindingBuilder.bind(dbChangeQueue).to(dbChangeExchange).with("db.change.routing.key");
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置默认交换机、路由键等
template.setExchange("db-change-exchange");
return template;
}
}


![Java practice: Spring Boot integrates Canal and RabbitMQ to monitor database changes in real time and process them efficiently](6b44e99974d17195ee2722671aabdc1f.png)

4. The receiving end processes RabbitMQ messages

  1. Create consumer

Create a RabbitMQ message consumer in the Spring Boot application, obtain messages from the "db-change-queue" queue, and execute corresponding business logic .

    @Service
@RabbitListener(queues = "db-change-queue")
public class ChangeEventListener {

@RabbitHandler
public void processDbChangeEvent(MyChangeEvent event) {
// 处理数据库变更事件,如更新缓存、触发业务流程等
// ...
}
}

5. Summary

Through the above steps, we successfully integrated Spring Boot with Canal and RabbitMQ, and built a messaging system that monitors MySQL database changes in real time and publishes change messages to RabbitMQ. However, in practical applications, attention must also be paid to issues such as exception handling, message confirmation, and idempotent design to ensure the stability and reliability of the system.
In addition, each link can be optimized according to business needs, such as using the advanced features of RabbitMQ (such as dead letter queue, delay queue, etc.) to enhance message processing capabilities, or adding more complex event filtering logic to the Canal client to meet specific monitoring needs.