Stage 6: Service Framework Basics (Chapter 2-MQ (Service Asynchronous Communication))
Stage 6: Service Framework Basics (Chapter 2-MQ (Service Asynchronous Communication))
Stage 6: [Service Framework Basics](/search?q=Service Framework Basics) (Chapter 2-MQ (Service Asynchronous Communication))
- RabbitMQ
- 1. First introduction to MQ
-
- 1.1. Synchronous and asynchronous communication
-
- 1.1.1. Synchronous communication
- 1.1.2.Asynchronous communication
- 1.2.Technical comparison:
- 2. Quick Start
-
- 2.1. Install RabbitMQ [==Important==]
-
- 2.1.1. Install
RabbitMQ
, refer to the pre-course materials:
- 2.1.1. Install
- 2.1.2. Introduction
- 2.2.==RabbitMQ message model==
- 2.3.==Import Demo project==
- 2.4. Getting Started Case [==Not important, too cumbersome==]
-
- 2.4.1.publisher implementation
- 2.4.2.consumer implementation
- 2.5. Summary
- 3. SpringAMQP [==Important==]
-
- 3.1. Basic Queue simple queue model [==Official
HelloWorld
first case==]
- 3.1. Basic Queue simple queue model [==Official
-
- 3.1.1.Message sending
- 3.1.2. Message reception
- 3.1.3.Testing
- 3.2. WorkQueue task model [==Official second case==]
-
- 3.2.1.Message sending
- 3.2.2. Message reception
- 3.2.3.Testing
- 3.2.4. Those who can do more work (improvements in the average distribution of messages)
- 3.2.5. Summary
- 3.3.Publish /Subscribe
- 3.4. Fanout (broadcast) [==Official third case== (With a switch, ==The switch sends messages to all queues==)]
-
- 3.4.1. Declare queues and switches
- 3.4.2. Message sending
- 3.4.3. Message reception
- 3.4.4. Summary
- 3.5. Direct [==Official fourth case==]
-
- 3.5.1. Declaring queues and switches based on annotations ==
- 3.5.2. Message sending
- 3.5.3.==Summary==
- 3.6. Topic (wildcard) [==Official fifth case==]
-
- 3.6.1.Description
- 3.6.2. Message sending
- 3.6.3. Message reception
- 3.6.4.==Summary==
- 3.7. Message converter【==Important==】
-
- 3.7.1. Test the default converter
- 3.7.2. Configure JSON converter
RabbitMQ
1. First introduction to MQ
1.1. Synchronous and asynchronous communication
There are two ways of communication between microservices: synchronous and asynchronous.:
- Synchronous communication: Just like making a phone call, real-time response is required.
- Asynchronous communication: Just like sending an email, you don’t need to reply immediately.
Both methods have their own pros and cons. You can get an immediate response when you call, but you can't talk to multiple people at the same time. Sending emails allows you to send and receive emails to multiple people at the same time, but there is often a delay in response.
1.1.1. Synchronous communication
The call we learned before Feign
is a synchronous method. Although the call can get results in real time, there are the following problems:
Summarize:
Advantages of synchronous calls:
- Strong timeliness, results can be obtained immediately
Problems with synchronous calls:
- High degree of coupling
- Reduced performance and throughput
- There is additional resource consumption
- There is a cascading failure problem
1.1.2.Asynchronous communication
Asynchronous calls can avoid the above problems:
Let's take the purchase of goods as an example. After paying, the user needs to call the order service to complete the order status modification, call the logistics service, allocate the corresponding inventory from the warehouse and prepare for shipment.
In event mode,The payment service is an event publisher (publisher), after the payment is completed, you only need to publish a payment success event (event), with the order id in the event.
Order service and logistics service are event subscribers (Consumer), subscribe to the event of successful payment, and complete your business after listening to the event.
In order to decouple the event publisher and subscriber, the two do not communicate directly, but there is an intermediary ( Broker
).Publisher publishes events to Broker, doesn't care who subscribes to the event.订阅者从Broker订阅事件,不关心谁发来的消息。
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控 。
好处:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是我们今天要学习的MQ技术。
1.2.技术对比:
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:
- ActiveMQ
RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
| RabbitMQ| ActiveMQ| RocketMQ| Kafka
---|---|---|---|---
公司/社区| Rabbit| Apache| 阿里| Apache
开发语言| Erlang| Java| Java| Scala&Java
协议支持| AMQP,XMPP,SMTP,STOMP| OpenWire,STOMP,REST,XMPP,AMQP| 自定义协议| 自定义协议
可用性| 高| 一般| 高| 高
单机吞吐量| 一般| 差| 高| 非常高
消息延迟| 微秒级| 毫秒级| 毫秒级| 毫秒以内
消息可靠性| 高| 一般| 高| 一般
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2.快速入门
2.1.安装RabbitMQ 【重要】
安装过程中出现了一些问题:通过参考一下资料得到了解决:
https://blog.csdn.net/lslslsls77/article/details/125611788
https://blog.csdn.net/qq_42115732/article/details/99541012
2.1.1、安装RabbitMQ
,参考课前资料:
我们在Centos7
虚拟机中使用Docker
来安装。
1、下载镜像
方式一:在线拉取
docker pull [rabbitmq](/search?q=rabbitmq):3.8-management
方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机的/tmp
目录下了,这是一个临时目录
2、上传到虚拟机中后,使用命令加载镜像
即可:
执行docker命令前需要先启动docker服务:systemctl start docker
docker load -i mq.tar
导入成功:
我们查看一下:在/tmp
下执行docker images
命令:
发现一个名为rabbitmq
,版本为3-management
的镜像已经导入成功了
3、安装MQ
执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
-e
是配置环境变 量,这里配置了两个:用户名和密码;用于访问mq
或者登陆它的管理平台;
--name mq \
是起一个名字;
--hostname mq1 \
是配置主机名,可以不配,但是做集群不是时一定要配
-p 15672:15672 \
端口映射,rabbitmq
管理平台的端口,提供一个UI界面,管理起来方便;;
-p 5672:5672 \
端口映射,将来做消息通信的接口,收发消息通过这个端口来建立连接;
-d \
是后台运行
rabbitmq:3-management
是镜像的名称;
使用上面方式运行容器出错:
docker: Error response from daemon: Conflict. The container name "/mq" is already in use by container "670cefe07ee4ffb414b9449a7eeed2280dd9d05050f85988b6093599b6f01326". You have to remove (or rename) that container to be able to reuse that name.
出现上述的错误,是因为docker容器里面已经存在,如果不需要,则删除。docker rm fb087642b497(该containerID)
如果想要再次使用,使用
docker restart 镜像名
重启该container容器。
运行MQ容器时会出现错误 :
docker:docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
解决办法:
通过docker ps
命令查看一下
发现成功的启动起来了;启动起来之后我们就可以访问了;访问15672
这个端口:http://192.168.138.100:15672/
用户名是itcast
,密码是123321
,点击Login
就可以进入用户管理平台了;
2.1.2、介绍
MQ的基本结构:
RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
2.2.RabbitMQ消息模型
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:
2.3.导入Demo工程
课前资料提供了一个Demo工程,mq-demo
:
导入后可以看到结构如下:
包括三部分:
mq-demo
:父工程,管理项目依赖publisher
:消息的发送者consumer
:消息的消费者
2.4.入门案例 【不重要,过于繁琐】
简单队列模式的模型图:
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
2.4.1.publisher实现
思路:
- 建立连接
- 创建Channel (通道)
- 声明队列
- 发送消息
- 关闭连接和channel(通道)
代码实现:
package cn.itcast.mq.helloworld;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.130.100"); //虚拟机ip地址
factory.setPort(5672); //消息通信的接口
factory.setVirtualHost("/"); //虚拟主机
factory.setUsername("itcast"); //用户名
factory.setPassword("123321"); //密码
// 1.2.建立连接
Connection connection = factory.newConnection(); //到此控制台已经建立了一个连接;
// 2.创建通道Channel
Channel channel = connection.createChannel(); //到此建立了一个通道;
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null); //到此队列被创建了
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
//发完消息,通道和链接就可以关闭了,解除了耦合;异步;
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
发完消息,通道和链接就可以关闭了,解除了耦合;异步的体现,这时消费者就可以接收消息了;
2.4.2.consumer实现
代码思路:
- 建立连接
- 创建Channel(通道)
- 声明队列
- 订阅消息
代码实现:
package cn.itcast.mq.helloworld;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.130.100");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
2.5.总结
基本消息队列
的消息发送流程:
- 建立connection(连接)
- 创建channel(通道)
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列
的消息接收流程:
- 建立connection(连接)
- 创建channel(通 道)
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
3.SpringAMQP【重要】
代替2中的入门案例
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了
RabbitTemplate
工具,用于发送消息
3.1.Basic Queue 简单队列模型【官方HelloWorld
第一个案例】
简单来说 就是让一个消费者绑定到一个队列,消费队列中的消息 。
在父工程mq-demo中引入amqp依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>