B080-RabbitMQ
B080-RabbitMQ
Table of contents
-
-
- RabbitMQ understanding
-
-
- concept
- scenes to be used
- advantage
- AMQP protocol
- JMS
- RabbitMQ installation
-
- Install elang
- Install RabbitMQ
- Install management plug-in
- Login to RabbitMQ
- Message queue workflow
- RabbitMQ common models
-
- HelloWorld-Basic Message Model
-
- Producer sends message
-
- Guide package
- Get link tool class
- message producer
- consumer news
-
- simulated consumer
- Manually sign for messages
- Work Queues
-
- Sender
- Consume1
- Consume2
- Subscription Model-FANOUT-Broadcast
-
- Sender
- Consume1
- Consume2
- Subscription Model-Direct-Directed
-
- Sender
- Consume1
- Consume2
- Subscription Model-Topic-Wildcard
-
- Sender
- Consume1
- Consume2
- Summarize
- SpringBoot integrates RabbitMQ
-
- Guide package
- yml
- config
- producer
- consumer
RabbitMQ understanding
concept
The full name of MQ is Message Queue, which is message queue. It is also a queue and follows the FIFO principle. RabbitMQ is a message queue developed by the erlang language and implemented based on the AMQP (Advanced Message Queue Protocol) protocol. It is a communication method between applications. Message queues are widely used in distributed system development. Official address: http://www.rabbitmq.com/
scenes to be used
advantage
Task asynchronous processing:
The message queue notifies the message receiver for asynchronous processing of operations that do not require synchronous processing and take a long time. Improved application response time. (Throw it in and process it asynchronously separately by the receiver)
Eliminate peaks:
asynchronous speeding up (message sending), improving system stability (multiple system calls), service decoupling (5-10 services), sorting guarantee, eliminating peaks
(putting it in the queue does not need to be processed immediately, there is an intermediate status, after the message is distributed, it can be processed asynchronously by multiple subscribers respectively)
Service decoupling:
Application decoupling MQ is equivalent to an intermediary. The producer interacts with the consumer through MQ, which decouples the application.
(Split the single business into producers, message queues and consumers)
AMQP protocol
AMQP is a set of public message queue protocols. It was first proposed in 2003. It aims to define a standard format for message communication data from the protocol layer in order to solve the problem of inconsistent protocols in the MQ market. RabbitMQ is an MQ service developed following the AMQP standard protocol.
(Other Python, C#, and PHP can also be used)
JMS
JMS is a Java message service. It is a set of message service API standards provided by java. Its purpose is to provide a unified message communication standard for all java applications. Similar to java's jdbc, as long as it follows the jms standard, it can be used between applications. Communicate messages. What is the difference between it and AMQP? JMS is a message service standard exclusive to the Java language. It defines the standard at the API layer and can only be used for Java applications; while AMQP is a standard defined at the protocol layer and is cross-language.
(Can only be used in Java, and has basically been abandoned)
RabbitMQ installation
Install elang
otp_win64_20.2.exe
configures environment variables
Install RabbitMQ
rabbitmq-server-3.7.4.exe
can start or shut down the service through the task manager or start menu
Install management plug-in
Install the rabbitMQ management plug-in to facilitate the management of RabbitMQ on the browser side. Enter the sbin directory of RabbitMQ and use cmd to execute the command: rabbitmq-plugins.bat enable rabbitmq_management. After successful installation, restart RabbitMQ
(turn on the visual interface)
Restart MQ
Login to RabbitMQ
Enter the browser and enter: http://localhost:15672, initial account and password: guest/guest
Message queue workflow
RabbitMQ common models
HelloWorld-Basic Message Model
A producer and a consumer
Producer sends message
Guide package
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<!--和springboot2.0.5对应-->
<version>5.4.1</version>
</dependency>
</dependencies>
Get link tool class
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");//集群的时候才用这个参数
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}

message producer
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
//消息的生产者
public class Sender {
public static final String HELLO_QUEUE="hello_queue";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//3.创建队列(hello这里用默认的交换机)
/* String queue :队列的名字,可自定义,
boolean durable: 持久化,
boolean exclusive:是否独占;大家都能用,传false,
boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,
Map<String, Object> arguments:没有其他要传的属性就传false */
channel.queueDeclare(HELLO_QUEUE, true, false, false, null);
String msg="今天中午吃啥";
//4.发送消息
channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());
channel.close();
conn.close();
}
}

consumer news
simulated consumer
import com.rabbitmq.client.*;
import java.io.IOException;
//模拟消费者
public class Consume {
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);// 消费者标识
System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
System.out.println("消息内容:"+new String(body));
System.out.println("消费完成----------------");
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收
Consumer callback: 回调
*/
channel.basicConsume(Sender.HELLO_QUEUE,false,callback);
}
}

As long as the consumer is not turned off and the producer sends a message once, the consumer will automatically listen and consume the message once.
Manually sign for messages
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
//模拟消费者
public class Consume {
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);// 消费者标识
System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
// System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("消费完成----------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收不等于消费成功,处理逻辑走完没有报错才算签收成功
Consumer callback: 回调
*/
channel.basicConsume(Sender.HELLO_QUEUE,false,callback);
}
}

Work Queues
One producer and multiple consumers.
The default is polling, which can also be changed to the ability to do more work.
Sender
//消息 的生产者
/*
如果有多个消费者监听同一个队列,默认轮询
*/
public class Sender {
public static final String WORK_QUEUE="work_queue";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//3.创建队列
/*
String queue :队列的名字
boolean durable: 持久化
boolean exclusive:是否独占
boolean autoDelete: 用完即删
Map<String, Object> arguments
*/
channel.queueDeclare(WORK_QUEUE, true, false, false, null);
String msg="今天中午吃啥";
//4.发送消息
channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());
channel.close();
conn.close();
}
}

Consume1
//模拟消费者
public class Consume1 {
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
// channel.basicQos(1);
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(Sender.WORK_QUEUE,false,callback);
}
}

Consume2
//模拟消费者
public class Consume2 {
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
// channel.basicQos(1);
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("消息内容:"+new String(body));
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(Sender.WORK_QUEUE,false,callback);
}
}

Subscription Model-FANOUT-Broadcast
In broadcast mode, the message sending process is as follows:
- There can be multiple consumers
- Each consumer has its own queue (queue)
- Each queue must be bound to Exchange (switch)
- Production Messages sent by the producer can only be sent to the switch. The switch decides which queue to send to, and the producer cannot decide.
- The switch sends the message to all bound queues.
- Consumers of the queue can get the message. Enable one message to be consumed by multiple consumers
Sender
//消息的生产者
/*
变化
1.不创建 队列
2.创建交换机
3.给交换机发送消息
*/
public class Sender {
public static final String FANOUT_EXCHANGE="fanout_exchange";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//3.创建交换机
/*
exchange:交换机的名字
type:交换机的类型
durable:是否持久化
*/
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
String msg="今天晚上吃啥";
//4.发送消息
channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());
channel.close();
conn.close();
}
}

Consume1
//模拟消费者
/*
1.创建队列
2.队列绑定到交换机
3.每个消费者要监听自己的队列
*/
public class Consume1 {
public static final String FANOUT_QUEUE1="fanout_queue1";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
channel.basicQos(1);
//创建队列
channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);
//绑定到交换机
channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(FANOUT_QUEUE1,false,callback);
}
}

Consume2
//模拟消费者
/*
1.创建队列
2.队列绑定到交换机
3.每个消费者要监听自己的队列
*/
public class Consume2 {
public static final String FANOUT_QUEUE2="fanout_queue2";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
channel.basicQos(1);
//创建队列
channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);
//绑定到交换机
channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(FANOUT_QUEUE2,false,callback);
}
}

Subscription Model-Direct-Directed
Deliver the message to a queue or a queue that matches the specified routing key
Sender
//消息的生产者
/*
变化
1.交换机类型
2.给交换机发送消息,指定 routing key
*/
public class Sender {
public static final String DIRECT_EXCHANGE="direct_exchange";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//3.创建交换机
/*
exchange:交换机的名字
type:交换机的类型
durable:是否持久化
*/
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
String msg="今天晚上吃啥";
//4.发送消息
channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());
channel.close();
conn.close();
}
}

Consume1
//模拟消费者
/*
1.指定routing key
*/
public class Consume1 {
public static final String DIRECT_QUEUE1="direct_queue1";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
channel.basicQos(1);
//创建队列
channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);
//绑定到交换机
channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(DIRECT_QUEUE1,false,callback);
}
}

Consume2
//模拟消费者
/*
1.指定routing key
*/
public class Consume2 {
public static final String DIRECT_QUEUE2="direct_queue2";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
channel.basicQos(1);
//创建队列
channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);
//绑定到交换机
channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(DIRECT_QUEUE2,false,callback);
}
}

Subscription Model-Topic-Wildcard
Compared with Direct, Topic type Exchange can route messages to different queues based on RoutingKey. It’s just that the Topic type Exchange allows the queue to use wildcards when binding the Routing key!
Routingkey generally consists of one or more words, separated by ".", for example: goods.insert
Wildcard rules:
#: Match one or more words
*: Match no more, no less, exactly 1 word
Sender
//消息的生产者
/*
变化
1.交换机类型
2.给交换机发送消息,指定 routing key
*/
public class Sender {
public static final String TOPIC_EXCHANGE="topic_exchange";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//3.创建交换机
/*
exchange:交换机的名字
type:交换机的类型
durable:是否持久化
*/
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
String msg="今天晚上吃啥";
//4.发送消息
channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());
channel.close();
conn.close();
}
}

Consume1
//模拟消费者
/*
1.指定routing key
*/
public class Consume1 {
public static final String TOPIC_QUEUE1="topic_queue1";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
channel.basicQos(1);
//创建队列
channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);
//绑定到交换机
/*
#.1到多个单词
*. 一个单词
*/
channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(TOPIC_QUEUE1,false,callback);
}
}

Consume2
//模拟消费者
/*
1.指定routing key
*/
public class Consume2 {
public static final String TOPIC_QUEUE2="topic_queue2";
public static void main(String[] args) throws Exception {
//1.获取连接
Connection conn = ConnectionUtil.getConnection();
//2.获取通道
Channel channel = conn.createChannel();
//同时处理的消息数量
channel.basicQos(1);
//创建队列
channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);
//绑定到交换机
/*
#.1到多个单词
*. 一个单词
*/
channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");
//回调
Consumer callback=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
//System.out.println(1/0);
System.out.println("消息内容:"+new String(body));
System.out.println("------------------------------------");
//所有业务逻辑结束以后 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//3.监听队列
/*
queue :队列名字
autoAck:自动签收 签收 不等于 消费成功
Consumer callback: 回调
*/
channel.basicConsume(TOPIC_QUEUE2,false,callback);
}
}

Summarize
01_hello
生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息
消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)
02_workqueue 默认轮询 可修改(能者多劳)
生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息
消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)
03_fanout 广播 将消息交给所有绑定到交换机的队列(多个消费者都能收到)
生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机
消费者 1.获取连接 2.获取通道 创 建队列 绑定到交换机 3.监听队列 (并回调)
04_direct 定向 把消息交给符合指定 routing key 的队列 一堆或一个
生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机
消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)
05_topic 通配符 把消息交给符合routing pattern (路由模式) 的队列 一堆或一个
生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机
消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)

SpringBoot integrates RabbitMQ
Guide package
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--spirngboot集成rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

yml
server:
port: 44000
spring:
application:
name: test‐rabbitmq‐producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
acknowledge-mode: manual #手动签收
prefetch: 1 #消费者的消息并发处理数量
#publisher-confirms: true #消息发送到交换机失败回调
#publisher-returns: true #消息发送到队列失败回调
template:
mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

config
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";
public static final String QUEUE1_SPRINGBOOT="queue1_springboot";
public static final String QUEUE2_SPRINGBOOT="queue2_springboot";
//创建一个交换机
@Bean
public Exchange createExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();
}
//创建两个队列
@Bean
public Queue createQueue1(){
return new Queue(QUEUE1_SPRINGBOOT,true);
}
@Bean
public Queue createQueue2(){
return new Queue(QUEUE2_SPRINGBOOT,true);
}
//把交换机和队列绑定到一起
@Bean
public Binding bind1(){
return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();
}
@Bean
public Binding bind2(){
return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();
}
//消费者 还原对象方式(从MQ里取出json转为对象)
@Bean("rabbitListenerContainerFactory")
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1);
return factory;
}
//放到消息队列里面的转换(转为json存进MQ)
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}

producer
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test(){
/*
问题:多系统之间 信息交互 传递对象
解决方案:转换为json存储
实现:
1.fastjson 对象 - josn (作业)
2.重写转换器模式
*/
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT
, "email.save", new User(1L,"文达"));
}
System.out.println("消息发送完毕");
}
}

consumer
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
//消费者
@Component
public class Consu {
@RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接
public void user(@Payload User user, Channel channel, Message message) throws IOException {
System.out.println(message);
System.out.println("user队列:"+user);
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})
public void email(@Payload User user,Channel channel,Message message ) throws IOException {
System.out.println(message);
System.out.println("email队列:"+user);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

Queue content can be passed as string, entity serialized object, json object,