RabbitMQ in practice

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud2020</artifactId>
            <groupId>com.atguigu.springcloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

    </project>

Broker: the application that receives and distributes messages. The RabbitMQ server is the message broker.
Virtual host: introduced for multi-tenancy and security. It groups the basic AMQP components into a virtual unit, similar to a namespace. When several users share one RabbitMQ server, each can be given its own vhost and create exchanges, queues, and so on inside it.

1 Basic usage

1.1 Producer code

    package com.atguigu.springcloud.service.impl;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class Producer {
        public static final String queue_name = "hello";

        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create the connection and obtain a channel; both are closed when the block exits
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                /*
                 * queueDeclare parameters:
                 * 1. queue name
                 * 2. durable: whether the queue survives a broker restart (by default it lives in memory only)
                 * 3. exclusive: true restricts the queue to this connection; false lets several consumers share it
                 * 4. autoDelete: true deletes the queue after its last consumer disconnects
                 * 5. other arguments (priority, TTL, dead lettering, ...)
                 */
                Map<String, Object> arguments = new HashMap<>();
                // The allowed range is 0-255; 10 is used here. Large values waste CPU and memory.
                arguments.put("x-max-priority", 10);
                channel.queueDeclare(queue_name, true, false, false, arguments);
                /*
                 * basicPublish parameters:
                 * 1. exchange to publish to ("" is the default exchange)
                 * 2. routing key; with the default exchange this is the queue name
                 * 3. message properties
                 * 4. message body
                 */
                for (int i = 1; i < 11; i++) {
                    String message = "info" + i;
                    byte[] body = message.getBytes(StandardCharsets.UTF_8);
                    if (i == 5) {
                        // Give this message priority 5
                        AMQP.BasicProperties properties =
                                new AMQP.BasicProperties.Builder().priority(5).build();
                        channel.basicPublish("", queue_name, properties, body);
                    } else {
                        channel.basicPublish("", queue_name, null, body);
                    }
                }
                System.out.println("All messages sent");
            }
        }
    }


1.2 Consumer code

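    package com.atguigu.springcloud.service.impl;

    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;

    import java.nio.charset.StandardCharsets;

    public class Consumer {

        public static final String queue_name = "hello";

        // Receive messages
        public static void main(String[] args) throws Exception {
            // Create the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create the connection and the channel (left open so the consumer keeps running)
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Called for every delivered message
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
            };

            // Called when the consumer is cancelled
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("Message consumption was interrupted");
            };

            /*
             * basicConsume parameters:
             * 1. queue to consume from
             * 2. autoAck: true acknowledges automatically on delivery
             * 3. callback invoked when a message is delivered
             * 4. callback invoked when the consumer is cancelled
             */
            channel.basicConsume(queue_name, true, deliverCallback, cancelCallback);

            /*
             * Manual acknowledgment methods (used when autoAck is false):
             * A. Channel.basicAck (positive acknowledgment):
             *    the message was processed successfully and RabbitMQ may discard it
             * B. Channel.basicNack (negative acknowledgment)
             * C. Channel.basicReject (negative acknowledgment):
             *    one parameter fewer than basicNack; rejects a single message
             *    without processing it, so it can be discarded
             */
        }
    }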


2.1 Work queues

A work queue, also called a task queue, avoids running a resource-intensive task immediately and waiting for it to finish. Instead, the task is scheduled for later: we encapsulate it as a message and send it to a queue. A worker process running in the background takes tasks off the queue and executes them. When several workers are running, they share the tasks between them.

2.2 Round-robin message dispatch

In this example we start two workers and one message-sending producer, then observe how the two workers share the messages. By default RabbitMQ hands messages to the consumers of a queue in turn.
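
The round-robin behaviour can be pictured with a small in-memory model. This is only a sketch: the class `RoundRobinDispatch` and its `dispatch` method are hypothetical names that do not exist in the RabbitMQ client, and the worker names and messages are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of round-robin dispatch; not RabbitMQ client code. */
final class RoundRobinDispatch {

    /** Assigns each message to the next worker in turn and returns worker -> messages. */
    static Map<String, List<String>> dispatch(List<String> workers, List<String> messages) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String worker : workers) {
            assignment.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to worker i mod n
            String worker = workers.get(i % workers.size());
            assignment.get(worker).add(messages.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result =
                dispatch(List.of("C1", "C2"), List.of("AA", "BB", "CC", "DD"));
        System.out.println(result); // {C1=[AA, CC], C2=[BB, DD]}
    }
}
```

With two workers, the first and third messages land on C1 and the second and fourth on C2, which is what the two `Worker01` instances below print when four messages are typed into the producer.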

2.3 Extraction tool class

    package com.atguigu.springcloud.service.impl;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class RabbitMqUtils {
        // Opens a new connection to the local broker and returns a channel on it
        public static Channel getChannel() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setUsername("guest");
            factory.setPassword("guest");
            // Create the connection
            Connection connection = factory.newConnection();
            // Obtain a channel
            return connection.createChannel();
        }
    }


2.4 Start two worker threads

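Worker01 has to run twice at the same time. In IntelliJ IDEA 2022 this requires allowing multiple instances of its run configuration (in the Run/Debug Configurations dialog, under Modify options, enable Allow multiple instances).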

2.5 Consumers

Start the producer from section 2.6 first so that the hello queue exists. If the hello queue from section 1.1 is still present, delete it beforehand: redeclaring an existing queue with different arguments fails.

    package com.atguigu.springcloud.two;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * A worker is simply a consumer.
     */
    public class Worker01 {

        public static final String queue_name = "hello";

        // Receive messages
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Received message: " + new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(consumerTag + ": consumer was cancelled");
            };
            System.out.println("C1 waiting for messages.....");
            /*
             * basicConsume parameters:
             * 1. queue to consume from
             * 2. autoAck: true acknowledges automatically on delivery
             * 3. callback invoked when a message is delivered
             * 4. callback invoked when the consumer is cancelled
             */
            channel.basicConsume(queue_name, true, deliverCallback, cancelCallback);
        }
    }


2.6 Producers

    package com.atguigu.springcloud.two;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.util.Scanner;

    /**
     * Producer that sends many messages, one per token typed on the console.
     */
    public class Task01 {
        public static final String queue_name = "hello";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            /*
             * queueDeclare parameters:
             * 1. queue name
             * 2. durable: whether the queue survives a broker restart (by default it lives in memory only)
             * 3. exclusive: true restricts the queue to this connection; false lets several consumers share it
             * 4. autoDelete: true deletes the queue after its last consumer disconnects
             * 5. other arguments (TTL, dead lettering, ...)
             */
            channel.queueDeclare(queue_name, false, false, false, null);
            // Read messages from the console
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", queue_name, null, message.getBytes());
                System.out.println("Message sent");
            }
        }
    }



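3 Message acknowledgment

A consumer may need some time to finish a task. What happens if a consumer dies after completing only part of a long task? With automatic acknowledgment, RabbitMQ marks a message for deletion as soon as it has delivered it to a consumer. If the consumer then dies, the message it was processing is lost, along with any later messages already dispatched to it, because it can no longer receive them.
To make sure messages are not lost this way, RabbitMQ provides message acknowledgments: after receiving and processing a message, the consumer tells RabbitMQ that it is done, and RabbitMQ may then delete the message.

Automatic acknowledgment
With autoAck set to true, a message counts as delivered the moment it is sent. This trades safety for throughput and is only appropriate when the consumer can keep up and occasional loss is acceptable.

Methods of message acknowledgment
Channel.basicAck: positive acknowledgment. The message was processed successfully and RabbitMQ may discard it.
Channel.basicNack: negative acknowledgment.
Channel.basicReject: negative acknowledgment with one parameter fewer than basicNack. It rejects a single message without processing it.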

3.1 Explanation of Multiple

An advantage of manual acknowledgment is that acknowledgments can be batched, which reduces network traffic:

    channel.basicAck(deliveryTag, true);

The second argument, multiple, changes what is acknowledged.
true: acknowledge every unacknowledged message on the channel up to and including deliveryTag. For example, if the messages with tags 5, 6, 7, and 8 are unacknowledged and the current tag is 8, all of 5 through 8 are acknowledged.
false: acknowledge only the message with the given tag. In the same situation only tag 8 is acknowledged; 5, 6, and 7 remain unacknowledged.
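
The difference between the two values of multiple can be sketched with a plain sorted set of delivery tags. The class `UnackedTags` below is a hypothetical model written for this article, not part of the RabbitMQ client; only the `ack(deliveryTag, multiple)` semantics are meant to mirror `basicAck`.

```java
import java.util.TreeSet;

/** Hypothetical model of a channel's unacknowledged delivery tags; not RabbitMQ client code. */
final class UnackedTags {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records that a message with this tag was delivered and is awaiting an ack. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors the semantics of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge everything up to and including deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only this one tag
            unacked.remove(deliveryTag);
        }
    }

    TreeSet<Long> remaining() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        UnackedTags batch = new UnackedTags();
        UnackedTags single = new UnackedTags();
        for (long tag = 5; tag <= 8; tag++) {
            batch.delivered(tag);
            single.delivered(tag);
        }
        batch.ack(8, true);
        single.ack(8, false);
        System.out.println(batch.remaining());  // []
        System.out.println(single.remaining()); // [5, 6, 7]
    }
}
```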

3.2 Messages are automatically re-queued

If a consumer loses its connection (its channel is closed, the connection is closed, or the TCP connection is lost) without having sent an ack, RabbitMQ concludes that the message was not fully processed and requeues it. If another consumer is available at that point, the message is quickly redelivered to it. This way no message is lost even if a consumer occasionally dies.
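
A minimal in-memory sketch of this requeue behaviour follows. Everything in it is an assumption made for illustration: `RequeueOnDisconnect` and its methods are hypothetical names, and putting the requeued messages back at the head of the queue is a simplification of what the broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of requeue-on-disconnect; not RabbitMQ client code. */
final class RequeueOnDisconnect {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<String, List<String>> unackedByConsumer = new HashMap<>();

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers the head message to the consumer and tracks it as unacknowledged. */
    String deliverTo(String consumer) {
        String message = ready.pollFirst();
        if (message != null) {
            unackedByConsumer.computeIfAbsent(consumer, c -> new ArrayList<>()).add(message);
        }
        return message;
    }

    void ack(String consumer, String message) {
        List<String> unacked = unackedByConsumer.get(consumer);
        if (unacked != null) {
            unacked.remove(message);
        }
    }

    /** The consumer's connection is lost: its unacked messages go back to the queue. */
    void disconnect(String consumer) {
        List<String> unacked = unackedByConsumer.remove(consumer);
        if (unacked == null) {
            return;
        }
        // Re-insert in reverse so the original order is preserved at the head
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
    }

    public static void main(String[] args) {
        RequeueOnDisconnect queue = new RequeueOnDisconnect();
        queue.publish("AA");
        queue.publish("BB");
        queue.ack("C1", queue.deliverTo("C1")); // C1 handles AA and acks it
        queue.deliverTo("C2");                  // C2 receives BB but never acks
        queue.disconnect("C2");                 // C2 dies, BB is requeued
        System.out.println(queue.deliverTo("C1")); // BB
    }
}
```

This is the scenario of sections 3.3 to 3.5: the slow consumer is stopped while it still holds a message, and the fast consumer ends up processing it.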

3.3 Producers

    package com.atguigu.springcloud.three;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;

    import java.util.Scanner;

    /**
     * With manual acknowledgment, unacknowledged messages are not lost:
     * they are put back in the queue and consumed again.
     */
    public class Task2 {

        public static final String task_queue_name = "ack_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            boolean durable = true; // make the queue durable
            channel.queueDeclare(task_queue_name, durable, false, false, null);
            // Read messages from the console
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                // MessageProperties.PERSISTENT_TEXT_PLAIN: persist the message to disk
                channel.basicPublish("", task_queue_name,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("Producer sent message: " + message);
            }
        }
    }


3.4 Consumer 1 Processing is fast

    package com.atguigu.springcloud.three;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.atguigu.springcloud.util.SleepUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * With manual acknowledgment, unacknowledged messages are not lost:
     * they are put back in the queue and consumed again.
     */
    public class Work03 {

        public static final String task_queue_name = "ack_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("C1 waiting for messages (fast processing)");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // SleepUtils is the author's helper (not shown); it sleeps for the given number of seconds
                SleepUtils.sleep(1);
                System.out.println("Received message: " + new String(message.getBody()));
                /*
                 * Manual acknowledgment:
                 * 1. delivery tag of the message
                 * 2. multiple: false acknowledges only this message
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            // int prefetchCount = 1; // "the capable do more" mode
            // Values greater than 1 act as a prefetch count, here in a 2:5 ratio with consumer 2
            int prefetchCount = 2;
            channel.basicQos(prefetchCount);
            // Use manual acknowledgment
            boolean autoAck = false;
            channel.basicConsume(task_queue_name, autoAck, deliverCallback, (consumerTag -> {
                System.out.println(consumerTag + ": consumer was cancelled");
            }));
        }
    }



3.5 Consumer 2 processes slowly. If it fails, Consumer 1 will consume the message.

    package com.atguigu.springcloud.three;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.atguigu.springcloud.util.SleepUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * With manual acknowledgment, unacknowledged messages are not lost:
     * they are put back in the queue and consumed again.
     */
    public class Work04 {

        public static final String task_queue_name = "ack_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("C2 waiting for messages (slow processing)");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // SleepUtils is the author's helper (not shown); it sleeps for the given number of seconds
                SleepUtils.sleep(30);
                System.out.println("Received message: " + new String(message.getBody()));
                /*
                 * Manual acknowledgment:
                 * 1. delivery tag of the message
                 * 2. multiple: false acknowledges only this message
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            // int prefetchCount = 1; // "the capable do more" mode
            // Values greater than 1 act as a prefetch count, here in a 5:2 ratio with consumer 1
            int prefetchCount = 5;
            channel.basicQos(prefetchCount);
            // Use manual acknowledgment
            boolean autoAck = false;
            channel.basicConsume(task_queue_name, autoAck, deliverCallback, (consumerTag -> {
                System.out.println(consumerTag + ": consumer was cancelled");
            }));
        }
    }



4. RabbitMQ persistence

4.1 How to achieve persistence in queues

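To make a queue durable, set the durable parameter to true when declaring it:

    boolean durable = true; // make the queue durable
    channel.queueDeclare(task_queue_name, durable, false, false, null);

A queue that already exists as non-durable cannot be redeclared as durable. Delete it first, otherwise the declaration fails.

4.2 Message persistence

To mark a message as persistent, publish it with MessageProperties.PERSISTENT_TEXT_PLAIN, as the producer in section 3.3 does. This asks RabbitMQ to write the message to disk, but it is not an absolute guarantee: there is a short window in which a message has been accepted but not yet stored.

4.3 Unfair distribution

    channel.basicQos(1); // prefetch count 1: at most one unacknowledged message per consumer

Without this setting (prefetch count 0), messages are dispatched round-robin regardless of how busy each consumer is. With basicQos(1) and manual acknowledgment, RabbitMQ does not send a consumer a new message until it has acknowledged the previous one, so a fast consumer ends up handling more messages than a slow one.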

4.4 Prefetch value (distributed proportionally)

The prefetch value caps the number of unacknowledged messages a consumer may hold. If there are 7 messages, consumer 1 first receives 2 and consumer 2 first receives 5; any remaining messages go to whichever consumer acknowledges first.

Settings in consumer 1:

    // int prefetchCount = 1; // "the capable do more" mode
    // Values greater than 1 act as a prefetch count, here in a 2:5 ratio with consumer 2
    int prefetchCount = 2;
    channel.basicQos(prefetchCount);

Settings in consumer 2:

    // int prefetchCount = 1; // "the capable do more" mode
    // Values greater than 1 act as a prefetch count, here in a 5:2 ratio with consumer 1
    int prefetchCount = 5;
    channel.basicQos(prefetchCount);
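
The 2-and-5 split can be reproduced with a small model of the initial dispatch. This is a sketch under assumptions: `PrefetchDispatch` and `initialDistribution` are hypothetical names, no message is ever acknowledged, and the broker is assumed to offer messages to consumers in turn while skipping any consumer that is already at its prefetch limit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of prefetch-limited dispatch; not RabbitMQ client code. */
final class PrefetchDispatch {

    /**
     * Hands out messageCount messages in turn, skipping any consumer that already
     * holds prefetch-many unacknowledged messages. Nothing is acked here, so the
     * result is only the initial distribution.
     */
    static Map<String, Integer> initialDistribution(Map<String, Integer> prefetchByConsumer,
                                                    int messageCount) {
        Map<String, Integer> inFlight = new LinkedHashMap<>();
        for (String consumer : prefetchByConsumer.keySet()) {
            inFlight.put(consumer, 0);
        }
        int remaining = messageCount;
        boolean progressed = true;
        while (remaining > 0 && progressed) {
            progressed = false;
            for (Map.Entry<String, Integer> entry : prefetchByConsumer.entrySet()) {
                if (remaining == 0) {
                    break;
                }
                if (inFlight.get(entry.getKey()) < entry.getValue()) {
                    inFlight.merge(entry.getKey(), 1, Integer::sum);
                    remaining--;
                    progressed = true;
                }
            }
        }
        return inFlight; // messages beyond the combined limits stay in the queue
    }

    public static void main(String[] args) {
        Map<String, Integer> prefetch = new LinkedHashMap<>();
        prefetch.put("C1", 2);
        prefetch.put("C2", 5);
        System.out.println(initialDistribution(prefetch, 7)); // {C1=2, C2=5}
    }
}
```

With more than 7 messages the extra ones would wait in the queue until one of the consumers acknowledges a message and frees a slot.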

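5 Publisher confirms

How publisher confirms work
Once a channel is in confirm mode, every message published on it is assigned a sequence number, starting at 1. After the broker has handled a message (for a persistent message on a durable queue, after writing it to disk), it sends the producer a confirmation carrying that sequence number.

5.1 Enabling publisher confirms

Publisher confirms are off by default. Call channel.confirmSelect() on the channel to enable them.

5.2 Individual, batch, and asynchronous confirms

How to handle unconfirmed messages asynchronously
Record every published message in a thread-safe, ordered map keyed by sequence number (a ConcurrentSkipListMap in the code below). The ack callback removes the confirmed entries; whatever remains in the map is unconfirmed.

Code for individual, batch, and asynchronous confirms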

    package com.atguigu.springcloud.four;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;

    import java.util.UUID;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    /**
     * Publisher confirm modes, timed to see which performs best:
     * 1. individual confirms
     * 2. batch confirms
     * 3. asynchronous confirms
     */
    public class ConfirmMessage {
        // Number of messages published per run
        public static final int message_count = 1000;

        public static void main(String[] args) throws Exception {
            // 1. Individual confirms
            ConfirmMessage.publishMessageIndividually(); // took 641 ms
            // 2. Batch confirms
            // ConfirmMessage.publishMessageBatch(); // took 141 ms
            // 3. Asynchronous confirms
            // ConfirmMessage.publishMessageAsync(); // took 62 ms
        }

        // Individual confirms
        public static void publishMessageIndividually() throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a queue with a random name
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
            // Enable publisher confirms
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < message_count; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                // Wait for the confirm of each message right after publishing it
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("Message sent: " + message);
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("Published " + message_count
                    + " individually confirmed messages in " + (end - begin) + "ms");
        }

        // Batch confirms
        public static void publishMessageBatch() throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            // Number of messages per confirm batch
            int batchSize = 100;

            for (int i = 0; i < message_count; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                // Wait for confirms once every batchSize messages
                if ((i + 1) % batchSize == 0) {
                    channel.waitForConfirms();
                }
            }
            // Confirm whatever is left of an incomplete final batch
            channel.waitForConfirms();
            long end = System.currentTimeMillis();
            System.out.println("Published " + message_count
                    + " batch-confirmed messages in " + (end - begin) + "ms");
        }

        // Asynchronous confirms
        public static void publishMessageAsync() throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
            channel.confirmSelect();
            /*
             * A thread-safe, ordered map suited to high concurrency:
             * 1. associates each sequence number with its message
             * 2. lets entries be removed in bulk, given a sequence number
             * 3. supports concurrent access
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
            long begin = System.currentTimeMillis();

            // Callback for confirmed messages
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                if (multiple) {
                    // 2. Remove everything up to and including deliveryTag; the rest is still unconfirmed
                    ConcurrentNavigableMap<Long, String> confirmed =
                            outstandingConfirms.headMap(deliveryTag, true);
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(deliveryTag);
                }
                System.out.println("Confirmed message: " + deliveryTag);
            };
            /*
             * Callback for messages the broker could not confirm:
             * 1. sequence number of the message
             * 2. whether the nack covers multiple messages
             */
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                System.out.println("Unconfirmed message: " + deliveryTag);
            };
            // Register both callbacks; they are invoked asynchronously
            channel.addConfirmListener(ackCallback, nackCallback);

            for (int i = 0; i < message_count; i++) {
                String message = "message" + i;
                // 1. Record the message under its sequence number BEFORE publishing;
                //    getNextPublishSeqNo() advances once the message is published
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", queueName, null, message.getBytes());
            }

            // Measures publishing time only; confirms may still be arriving
            long end = System.currentTimeMillis();
            System.out.println("Published " + message_count
                    + " asynchronously confirmed messages in " + (end - begin) + "ms");
        }
    }


5.3 Comparing the three confirm strategies

Individual confirms: synchronous waiting, simple, but throughput is limited.
Batch confirms: simple, with reasonable throughput, but when something goes wrong it is hard to tell which message failed.
Asynchronous confirms: best performance and resource use and good control when errors occur, but more complicated to implement.
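
The bookkeeping behind the asynchronous strategy can be tried without a broker. The sketch below uses a hypothetical class `OutstandingConfirms` (not part of the RabbitMQ client) that keeps the same `ConcurrentSkipListMap` as the code above and applies the ack rules by hand; the sequence numbers and message bodies are made up.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical broker-free model of tracking unconfirmed messages by sequence number. */
final class OutstandingConfirms {
    private final ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Call before publishing, with the sequence number the message will get. */
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Same shape as a ConfirmCallback for acks: (deliveryTag, multiple). */
    void onAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // The inclusive headMap covers deliveryTag itself as well as everything before it
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    Set<Long> unconfirmed() {
        return new TreeSet<>(outstanding.keySet());
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            confirms.published(seqNo, "message" + seqNo);
        }
        confirms.onAck(3, true);  // confirms 1, 2 and 3 at once
        confirms.onAck(5, false); // confirms only 5
        System.out.println(confirms.unconfirmed()); // [4]
    }
}
```

Using the exclusive `headMap(deliveryTag)` instead would leave the acknowledged tag itself in the map, so it would be reported as unconfirmed forever.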

6 Exchanges

6.1 Publish/subscribe model

6.1.1 Concept of exchanges

The core idea of RabbitMQ's messaging model is that a producer never sends messages directly to a queue. Often the producer does not even know which queues a message will be delivered to.
Instead, the producer can only send messages to an exchange. An exchange does one simple job: it receives messages from producers on one side and pushes them into queues on the other. The exchange must know exactly what to do with each message it receives: put it in one specific queue, put it in many queues, or discard it. That is determined by the exchange type.

6.1.2 Types of Exchanges

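Direct (routing)
Topic
Headers
Fanout (publish/subscribe)

6.1.3 The default (unnamed) exchange

The earlier examples published with an empty string as the exchange name, for example channel.basicPublish("", queue_name, null, message.getBytes()). The empty string selects the default exchange, which routes each message to the queue whose name equals the routing key.

6.1.4 Temporary queues

A temporary queue has a random, server-generated name and is deleted once the consumer disconnects from it. Calling channel.queueDeclare() without arguments creates one, and getQueue() returns its name.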

6.1.5 Bindings

A binding is the bridge between an exchange and a queue: it tells the exchange which queues it is bound to. For example, exchange X can be bound to both Q1 and Q2.
Bindings can also be added manually.
Queues bound to the same exchange are distinguished by their routing key.
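
As a rough illustration of how bindings and routing keys fit together, the sketch below models an exchange as a lookup table from binding key to queues. `DirectExchangeModel`, the queue names, and the keys `k1` and `k2` are all hypothetical and chosen only for this example; exact-match lookup corresponds to the direct exchange type described in section 6.3.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of an exchange that routes by exact binding key; not RabbitMQ client code. */
final class DirectExchangeModel {
    private final Map<String, Set<String>> queuesByBindingKey = new HashMap<>();

    /** Same argument order as queueBind, minus the exchange name. */
    void bind(String queue, String bindingKey) {
        queuesByBindingKey.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return queuesByBindingKey.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeModel x = new DirectExchangeModel();
        x.bind("Q1", "k1");
        x.bind("Q2", "k2");
        System.out.println(x.route("k1"));      // [Q1]
        System.out.println(x.route("k2"));      // [Q2]
        System.out.println(x.route("unknown")); // []
    }
}
```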

6.2 Fanout

A fanout exchange broadcasts every message it receives to all queues bound to it, regardless of routing key.
Producer -> exchange -> queues bound with the same key -> Consumer 1, Consumer 2
In this example the exchange logs is bound to two temporary queues, both with an empty routing key.

6.2.1 Code implementation

Producer

    package com.atguigu.springcloud.five;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.util.Scanner;

    /**
     * Producer: publishes console input to the fanout exchange "logs".
     */
    public class EmitLog {
        private static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws Exception {
            try (Channel channel = RabbitMqUtils.getChannel()) {
                /*
                 * Declare an exchange:
                 * 1. exchange name
                 * 2. exchange type
                 */
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                Scanner sc = new Scanner(System.in);
                System.out.println("Enter a message");
                while (sc.hasNext()) {
                    String message = sc.nextLine();
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                    System.out.println("Producer sent message: " + message);
                }
            }
        }
    }


Consumer 1

    package com.atguigu.springcloud.five;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Receives messages from the fanout exchange.
     */
    public class ReceiveLogs01 {
        public static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            /*
             * Declare a temporary queue with a random name.
             * It is deleted when the consumer disconnects from it.
             */
            String queueName = channel.queueDeclare().getQueue();
            // Bind the queue to the exchange
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("Waiting for messages; received messages are printed to the console.....");
            // Called for every delivered message
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Console received message: " + new String(message.getBody(), "UTF-8"));
            };
            // The last argument is the callback invoked when the consumer is cancelled
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }
    }


Consumer 2

    package com.atguigu.springcloud.five;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Receives messages from the fanout exchange.
     */
    public class ReceiveLogs02 {
        public static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            /*
             * Declare a temporary queue with a random name.
             * It is deleted when the consumer disconnects from it.
             */
            String queueName = channel.queueDeclare().getQueue();
            // Bind the queue to the exchange
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("Waiting for messages; received messages are printed to the console.....");
            // Called for every delivered message
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogs02 console received message: " + new String(message.getBody(), "UTF-8"));
            };
            // The last argument is the callback invoked when the consumer is cancelled
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }
    }


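6.3 Direct exchange (routing mode)

With a direct exchange, queues are bound with different routing keys, and a message is delivered only to the queues whose binding key equals the message's routing key.

Multiple bindings
If several queues are bound to a direct exchange with the same key, the exchange behaves like a fanout exchange for that key.

6.3.1 Hands-on

The exchange direct_logs is bound to two queues: console with the keys info and warning, and disk with the key error.

6.3.2 Code

Producer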

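    package com.atguigu.springcloud.six;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;

    import java.util.Scanner;

    /**
     * Producer: publishes console input to the direct exchange "direct_logs".
     */
    public class DirectLogs {
        private static final String EXCHANGE_NAME = "direct_logs";

        public static void main(String[] args) throws Exception {
            try (Channel channel = RabbitMqUtils.getChannel()) {
                // Declare the exchange so the producer also works when started before the consumers
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                Scanner sc = new Scanner(System.in);
                System.out.println("Enter a message");
                while (sc.hasNext()) {
                    String message = sc.nextLine();
                    // Switch the routing key to "info" to reach the console queue instead
                    // channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
                    channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
                    System.out.println("Producer sent message: " + message);
                }
            }
        }
    }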


Consumer 1

    package com.atguigu.springcloud.six;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Consumer for the "console" queue (routing keys info and warning).
     */
    public class ReceiveLogsDirect01 {
        public static final String EXCHANGE_NAME = "direct_logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Declare a named queue: not durable, not exclusive, not auto-deleted
            channel.queueDeclare("console", false, false, false, null);
            channel.queueBind("console", EXCHANGE_NAME, "info");
            channel.queueBind("console", EXCHANGE_NAME, "warning");
            // Called for every delivered message
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogsDirect01 console received message: " + new String(message.getBody(), "UTF-8"));
            };
            // The last argument is the callback invoked when the consumer is cancelled
            channel.basicConsume("console", true, deliverCallback, consumerTag -> {});
        }
    }


Consumer 2

    package com.atguigu.springcloud.six;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Consumer for the "disk" queue (routing key error).
     */
    public class ReceiveLogsDirect02 {
        public static final String EXCHANGE_NAME = "direct_logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Declare a named queue: not durable, not exclusive, not auto-deleted
            channel.queueDeclare("disk", false, false, false, null);
            channel.queueBind("disk", EXCHANGE_NAME, "error");
            // Called for every delivered message
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogsDirect02 console received message: " + new String(message.getBody(), "UTF-8"));
            };
            // The last argument is the callback invoked when the consumer is cancelled
            channel.basicConsume("disk", true, deliverCallback, consumerTag -> {});
        }
    }


6.4 Topics

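Problem with the previous exchange types
A direct exchange routes on one exact key, so a queue cannot select messages by several criteria at once. A topic exchange can.

Requirements for topic routing keys
A routing key sent to a topic exchange must be a list of words separated by dots, for example quick.orange.rabbit, up to 255 bytes in total. Two wildcards are available in binding keys:
* (star) substitutes exactly one word
# (hash) substitutes zero or more words

6.4.1 Topic matching case

The bindings are as follows:
Q1 is bound with *.orange.* (three words with orange in the middle)
Q2 is bound with *.*.rabbit (three words ending in rabbit)
Q2 is also bound with lazy.# (any number of words starting with lazy)

Two special cases are worth noting:
A queue bound with # receives every message, much like a fanout exchange.
A queue whose binding key contains neither # nor * behaves like a queue bound to a direct exchange.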
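
The wildcard rules can be checked with a few lines of plain Java. The matcher below is a sketch written for this article: `TopicMatcher.matches` is a hypothetical helper, not a RabbitMQ API, and it is a straightforward recursive implementation of the two rules rather than the broker's actual algorithm.

```java
/** Hypothetical matcher for topic binding keys; implements only the '*' and '#' rules. */
final class TopicMatcher {

    /** Returns true if routingKey matches the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: succeed only if every word was consumed
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        // '*' matches exactly one word; anything else must match literally
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));      // true
        System.out.println(matches("*.*.rabbit", "quick.orange.male.rabbit")); // false
        System.out.println(matches("lazy.#", "lazy.orange.male.rabbit"));      // true
    }
}
```

Running the routing keys from the producer in section 6.4.2 through this matcher reproduces the outcomes listed there: quick.brown.fox matches none of the three binding keys, while lazy.pink.rabbit matches both of Q2's.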

6.4.2 Hands-on

Producer

    package com.atguigu.springcloud.seven;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Producer
     */
    public class EmitLogTopic {
        public static final String EXCHANGE_NAME = "topic_logs";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the exchange so the producer also works when started before the consumers
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            /*
             * Bindings:
             * Q1 --> *.orange.*  (three words with orange in the middle)
             * Q2 --> *.*.rabbit  (three words ending in rabbit)
             *        lazy.#      (any number of words starting with lazy)
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit", "received by queues Q1 and Q2");
            bindingKeyMap.put("lazy.orange.elephant", "received by queues Q1 and Q2");
            bindingKeyMap.put("quick.orange.fox", "received by queue Q1");
            bindingKeyMap.put("lazy.brown.fox", "received by queue Q2");
            bindingKeyMap.put("lazy.pink.rabbit", "matches two bindings but is received by Q2 only once");
            bindingKeyMap.put("quick.brown.fox", "matches no binding, reaches no queue, and is discarded");
            bindingKeyMap.put("quick.orange.male.rabbit", "four words, matches no binding, and is discarded");
            bindingKeyMap.put("lazy.orange.male.rabbit", "four words but matches Q2");
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                String routingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("Producer sent message: " + message);
            }
        }
    }


Consumer 1

    package com.atguigu.springcloud.seven;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Declares the topic exchange and the queue Q1.
     */
    public class ReceiveLogsTopic01 {
        public static final String EXCHANGE_NAME = "topic_logs";

        // Receive messages
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Declare the queue
            String queuename = "Q1";
            channel.queueDeclare(queuename, false, false, false, null);
            channel.queueBind(queuename, EXCHANGE_NAME, "*.orange.*");
            System.out.println("Waiting for messages.....");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), "UTF-8"));
                System.out.println("Queue: " + queuename + ", routing key: " + message.getEnvelope().getRoutingKey());
            };
            // Start consuming
            channel.basicConsume(queuename, true, deliverCallback, consumerTag -> {});
        }
    }


Consumer 2

    package com.atguigu.springcloud.seven;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Declares the topic exchange and the queue Q2.
     */
    public class ReceiveLogsTopic02 {
        public static final String EXCHANGE_NAME = "topic_logs";

        // Receive messages
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Declare the queue
            String queuename = "Q2";
            channel.queueDeclare(queuename, false, false, false, null);
            channel.queueBind(queuename, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(queuename, EXCHANGE_NAME, "lazy.#");
            System.out.println("ReceiveLogsTopic02 waiting for messages.....");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), "UTF-8"));
                System.out.println("Queue: " + queuename + ", routing key: " + message.getEnvelope().getRoutingKey());
            };
            // Start consuming
            channel.basicConsume(queuename, true, deliverCallback, consumerTag -> {});
        }
    }


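7 Dead letter queue

7.1 Dead letter queue concept

Use cases: to make sure order messages are not lost, use RabbitMQ's dead letter queue mechanism, so that a message whose consumption fails is put into the dead letter queue. Another example: a user places an order in a shop and clicks through to payment; if the order is not paid within a given time, it expires automatically.

7.2 Sources of dead letters

The message TTL expires.
The queue reaches its maximum length (it is full and cannot accept more messages).
The message is rejected (basic.reject or basic.nack) with requeue=false.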
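
The three sources can be told apart in a small in-memory model. This is a sketch under assumptions: `DeadLetterModel` and its methods are hypothetical, TTL expiry is triggered by hand through `expireAll` instead of a clock, and a full queue is assumed to dead-letter its oldest message to make room for the new one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the three dead letter sources; not RabbitMQ client code. */
final class DeadLetterModel {
    private final int maxLength;
    private final Deque<String> normalQueue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Source 2: when the queue is full, the oldest message is dead-lettered to make room. */
    void publish(String message) {
        if (normalQueue.size() == maxLength) {
            deadLetters.add("maxlen:" + normalQueue.pollFirst());
        }
        normalQueue.addLast(message);
    }

    /** Takes the next message off the normal queue, or null if it is empty. */
    String deliver() {
        return normalQueue.pollFirst();
    }

    /** Source 3: a rejected message is dead-lettered only when requeue is false. */
    void reject(String message, boolean requeue) {
        if (requeue) {
            normalQueue.addFirst(message);
        } else {
            deadLetters.add("rejected:" + message);
        }
    }

    /** Source 1: stands in for the TTL elapsing for everything still queued. */
    void expireAll() {
        while (!normalQueue.isEmpty()) {
            deadLetters.add("expired:" + normalQueue.pollFirst());
        }
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    public static void main(String[] args) {
        DeadLetterModel model = new DeadLetterModel(2);
        model.publish("info1");
        model.publish("info2");
        model.publish("info3");               // queue full: info1 is dead-lettered
        model.reject(model.deliver(), false); // info2 rejected without requeue
        model.expireAll();                    // info3 expires
        System.out.println(model.deadLetters());
        // [maxlen:info1, rejected:info2, expired:info3]
    }
}
```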

7.3 Dead letter hands-on

7.4 Code

Producer

    package com.atguigu.springcloud.eight;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;

    /**
     * Dead letter queue: producer code.
     * Start Consumer01 first; it declares the exchanges and queues.
     */
    public class Producer {
        // Name of the normal exchange
        public static final String NORMAL_EXCHANGE = "normal_exchange";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // To create dead letters through expiry, set a per-message TTL in milliseconds
            //AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000").build();
            for (int i = 1; i < 11; i++) {
                String message = "info" + i;
                //channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            }
        }
    }


Consumer 1 (declares the normal queue and its link to the dead letter exchange)

    package com.atguigu.springcloud.eight;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Dead letter queue in practice: consumer 1.
     */
    public class Consumer01 {
        // Name of the normal exchange
        public static final String NORMAL_EXCHANGE = "normal_exchange";
        // Name of the dead letter exchange
        public static final String DEAD_EXCHANGE = "dead_exchange";
        // Name of the normal queue
        public static final String NORMAL_QUEUE = "normal_queue";
        // Name of the dead letter queue
        public static final String DEAD_QUEUE = "dead_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the normal and dead letter exchanges, both of type direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            // Arguments for the normal queue
            Map<String, Object> arguments = new HashMap<>();
            // Queue-level TTL in milliseconds; setting the TTL in the producer is more flexible
            arguments.put("x-message-ttl", 10000);
            // Dead letter exchange of the normal queue
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // Routing key used when a message is dead-lettered
            arguments.put("x-dead-letter-routing-key", "lisi");
            // Limit the length of the normal queue
            // arguments.put("x-max-length", 6);
            channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
            // Declare the dead letter queue
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
            // Bind the normal queue to the normal exchange
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
            // Bind the dead letter queue to the dead letter exchange
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
            System.out.println("Waiting for messages.....");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String msg = new String(message.getBody(), "UTF-8");
                if (msg.equals("info5")) {
                    System.out.println("Consumer01 received: " + msg + " (rejected by C1)");
                    // Arguments: 1) delivery tag, 2) requeue; false means the message is dead-lettered instead
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                } else {
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    System.out.println("Consumer01 received: " + msg);
                }
            };
            // Consume with manual acknowledgement
            channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
        }
    }



Consumer 2 (reads the dead letter queue)

    package com.atguigu.springcloud.eight;

    import com.atguigu.springcloud.service.impl.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Dead letter queue in practice: consumer 2.
     */
    public class Consumer02 {
        // Name of the dead letter queue
        public static final String DEAD_QUEUE = "dead_queue";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Waiting for messages.....");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Consumer02 received: " + new String(message.getBody(), "UTF-8"));
            };
            // Consume with automatic acknowledgement
            channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});
        }
    }


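7.5 Limitation on queue length

With x-max-length set to 6 on the normal queue, messages beyond the sixth become dead letters.
When testing the length limit, the producer no longer needs to set a TTL.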
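What happens at the limit of 6 can be pictured with a small plain-Java simulation. It assumes RabbitMQ's default overflow behaviour, in which the oldest messages at the head of the queue are the ones dead-lettered, and that no consumer is draining the normal queue. The class `BoundedQueueSketch` is purely illustrative and does not talk to a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BoundedQueueSketch {
    private final int maxLength;
    final Deque<String> normalQueue = new ArrayDeque<>();
    final List<String> deadLetters = new ArrayList<>();

    BoundedQueueSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Appends a message; anything pushed out at the head is dead-lettered. */
    void publish(String message) {
        normalQueue.addLast(message);
        while (normalQueue.size() > maxLength) {
            deadLetters.add(normalQueue.removeFirst());
        }
    }

    public static void main(String[] args) {
        BoundedQueueSketch sketch = new BoundedQueueSketch(6);
        for (int i = 1; i < 11; i++) {
            sketch.publish("info" + i);
        }
        System.out.println("normal queue: " + sketch.normalQueue);
        System.out.println("dead letters: " + sketch.deadLetters);
    }
}
```

With the ten messages from the producer, the sketch leaves info5 to info10 in the normal queue and moves info1 to info4 to the dead letters.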

7.6 Rejection by consumers becomes a dead letter

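The following plain-Java sketch mirrors the decision Consumer01 makes for each delivery: info5 is rejected with requeue=false and therefore ends up as a dead letter, while every other message is acknowledged. The class and its collections are stand-ins invented for illustration; a real broker performs this routing itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RejectSketch {
    final Deque<String> normalQueue = new ArrayDeque<>();
    final List<String> acked = new ArrayList<>();
    final List<String> deadLettered = new ArrayList<>();

    /** Stand-in for basicReject(deliveryTag, requeue). */
    void reject(String message, boolean requeue) {
        if (requeue) {
            normalQueue.addFirst(message);   // goes back to the queue it came from
        } else {
            deadLettered.add(message);       // handed over to the dead letter exchange
        }
    }

    /** Drains the queue the way Consumer01's callback does. */
    void consumeAll() {
        String message;
        while ((message = normalQueue.pollFirst()) != null) {
            if (message.equals("info5")) {
                reject(message, false);
            } else {
                acked.add(message);
            }
        }
    }

    public static void main(String[] args) {
        RejectSketch sketch = new RejectSketch();
        for (int i = 1; i < 11; i++) {
            sketch.normalQueue.addLast("info" + i);
        }
        sketch.consumeAll();
        System.out.println("acked=" + sketch.acked.size() + ", dead letters=" + sketch.deadLettered);
    }
}
```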

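8 Delay queue

Use cases

8.1 Code architecture diagram

Create two queues, QA and QB, with TTLs of 10 s and 40 s respectively, then create an exchange X and a dead letter exchange Y, both of type direct, and a dead letter queue QD. QA and QB are bound to X with routing keys XA and XB, both dead-letter to Y with routing key YD, and QD is bound to Y with YD.
With this approach, every new delay requirement needs a new queue.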

8.2 Code

8.2.1 TtlQueueConfig

    package com.atguigu.springcloud.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Configuration class for the TTL queues.
     */
    @Configuration
    public class TtlQueueConfig {
        // Name of the normal exchange
        public static final String X_EXCHANGE = "X";
        // Name of the dead letter exchange
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        // Names of the normal queues
        public static final String QUEUE_A = "QA";
        public static final String QUEUE_B = "QB";
        // Name of the dead letter queue
        public static final String DEAD_LETTER_QUEUE = "QD";
        // Name of the normal queue without a queue-level TTL
        public static final String QUEUE_C = "QC";

        // Declare QC
        @Bean("queueC")
        public Queue queueC() {
            Map<String, Object> arguments = new HashMap<>(3);
            // Dead letter exchange
            arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
            // Dead letter routing key
            arguments.put("x-dead-letter-routing-key", "YD");
            // No TTL here, so the queue works for any per-message delay
            return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
        }

        @Bean
        public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                      @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }

        // Declare exchange X under the bean name xExchange
        @Bean("xExchange")
        public DirectExchange xExchange() {
            return new DirectExchange(X_EXCHANGE);
        }

        // Declare exchange Y under the bean name yExchange
        @Bean("yExchange")
        public DirectExchange yExchange() {
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }

        // Declare normal queue QA, TTL 10 s
        @Bean("queueA")
        public Queue queueA() {
            Map<String, Object> arguments = new HashMap<>(3);
            // Dead letter exchange
            arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
            // Dead letter routing key
            arguments.put("x-dead-letter-routing-key", "YD");
            // TTL in milliseconds
            arguments.put("x-message-ttl", 10000);
            return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
        }

        // Declare normal queue QB, TTL 40 s
        @Bean("queueB")
        public Queue queueB() {
            Map<String, Object> arguments = new HashMap<>(3);
            // Dead letter exchange
            arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
            // Dead letter routing key
            arguments.put("x-dead-letter-routing-key", "YD");
            // TTL in milliseconds
            arguments.put("x-message-ttl", 40000);
            return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
        }

        // Dead letter queue
        @Bean("queueD")
        public Queue queueD() {
            return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        }

        // Bindings
        @Bean
        public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }

        @Bean
        public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
        }

        @Bean
        public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                      @Qualifier("yExchange") DirectExchange yExchange) {
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }


8.2.2 SendMessageController

    package com.atguigu.springcloud.controller;

    import com.atguigu.springcloud.config.DelayedQueueConfig;
    import com.atguigu.springcloud.service.IMessageProvider;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.annotation.Resource;
    import java.util.Date;

    /**
     * Sends test messages to the TTL queues and to the plugin-based delayed exchange.
     */
    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMessageController {

        @Resource
        private IMessageProvider messageProvider;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @GetMapping("/sendMessage")
        public String sendMessage() {
            return messageProvider.send();
        }

        // Send one message to each of the two TTL queues
        @GetMapping("sendMsg/{message}")
        public void sendMsg(@PathVariable String message) {
            log.info("Current time: {}, sending a message to the two TTL queues: {}", new Date().toString(), message);
            rabbitTemplate.convertAndSend("X", "XA", "Message from the queue with a 10 s TTL: " + message);
            rabbitTemplate.convertAndSend("X", "XB", "Message from the queue with a 40 s TTL: " + message);
        }

        // Send a message that carries its own TTL
        @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
        public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
            log.info("Current time: {}, sending a message with a TTL of {} ms to the TTL queue: {}", new Date().toString(), ttlTime, message);
            rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
                // Per-message TTL in milliseconds
                msg.getMessageProperties().setExpiration(ttlTime);
                return msg;
            });
        }

        // Send a message through the plugin-based delayed exchange with the given delay
        @GetMapping("sendDelayMsg/{message}/{delayTime}")
        public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
            log.info("Current time: {}, sending a message with a delay of {} ms to the delay queue delayed.queue: {}", new Date().toString(), delayTime, message);
            rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
                    DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
                        // Delay applied when the message is sent, in milliseconds
                        msg.getMessageProperties().setDelay(delayTime);
                        return msg;
                    });
        }
    }


8.2.3 DeadLetterQueueConsumer

    package com.atguigu.springcloud.consumer;

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.util.Date;

    /**
     * Consumer for the TTL queues: reads the dead letter queue QD.
     */
    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
        // Receive messages
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            log.info("Current time: {}, received a message from the dead letter queue: {}", new Date().toString(), msg);
        }
    }


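8.3 Optimization

A new queue, QC, is added here with no TTL of its own. It is bound to exchange X with routing key XC and dead-letters to Y with routing key YD; the sender sets the TTL on each message instead.

8.4 Problem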

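With per-message TTL on a single queue such as QC, RabbitMQ only expires messages once they reach the head of the queue, so a short-TTL message queued behind a long-TTL one is dead-lettered late. The plain-Java sketch below works out when each message would leave QC under that rule, assuming nothing consumes from QC directly. The class and method names are hypothetical and all times are in milliseconds.

```java
import java.util.Arrays;

final class HeadOfLineTtlSketch {

    /**
     * For messages listed in queue order, returns the time at which each one
     * is dead-lettered when expiry is only checked at the head of the queue.
     */
    static long[] deadLetterTimes(long[] publishedAt, long[] ttlMillis) {
        long[] result = new long[ttlMillis.length];
        long previousLeftAt = 0;
        for (int i = 0; i < ttlMillis.length; i++) {
            long ownExpiry = publishedAt[i] + ttlMillis[i];
            // A message cannot be expired before the one in front of it has gone.
            result[i] = Math.max(ownExpiry, previousLeftAt);
            previousLeftAt = result[i];
        }
        return result;
    }

    public static void main(String[] args) {
        // A 20 s message first, then one second later a 2 s message.
        long[] times = deadLetterTimes(new long[] {0, 1000}, new long[] {20000, 2000});
        System.out.println(Arrays.toString(times)); // prints [20000, 20000]
    }
}
```

The second message is due after 3 s but only reaches QD at 20 s, together with the first one. Sending the two messages in the opposite order would not show the problem.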

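8.5 Implementing a delay queue with the RabbitMQ plugin

Download the rabbitmq_delayed_message_exchange plugin from https://www.rabbitmq.com/community-plugins.html, unpack it, and place it in RabbitMQ's plugin directory. Go to the plugins directory under the RabbitMQ installation directory, run the command below to enable the plugin, then restart RabbitMQ.
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Once the plugin is installed, exchanges gain one more type, x-delayed-message.

8.6 Code architecture diagram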

8.6.1 DelayedQueueConfig

    package com.atguigu.springcloud.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class DelayedQueueConfig {
        // Queue
        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
        // Exchange
        public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        // Routing key
        public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

        @Bean
        public Queue delayedQueue() {
            return new Queue(DELAYED_QUEUE_NAME);
        }

        // Declare the exchange provided by the plugin
        @Bean
        public CustomExchange delayedExchange() {
            Map<String, Object> arguments = new HashMap<>();
            // Routing behaviour the delayed exchange uses once the delay has elapsed
            arguments.put("x-delayed-type", "direct");
            /*
             * 1. exchange name
             * 2. exchange type
             * 3. durable or not
             * 4. auto-delete or not
             * 5. other arguments
             */
            return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
        }

        @Bean
        public Binding delayedQueueBindingDelayedExchange(
                @Qualifier("delayedQueue") Queue delayedQueue,
                @Qualifier("delayedExchange") CustomExchange delayedExchange) {
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }


DelayQueueConsumer

    package com.atguigu.springcloud.consumer;

    import com.atguigu.springcloud.config.DelayedQueueConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.util.Date;

    /**
     * Consumer for the plugin-based delay queue.
     */
    @Slf4j
    @Component
    public class DelayQueueConsumer {
        // Listen for messages
        @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
        public void receiveDelayQueue(Message message) {
            String msg = new String(message.getBody());
            log.info("Current time: {}, received a message from the delay queue: {}", new Date().toString(), msg);
        }
    }


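For comparison, see 8.2.3.

8.7 Summary

A delay queue built this way benefits from RabbitMQ's features: reliable publishing and reliable delivery. The dead letter queue ensures that messages are consumed at least once and that messages which were not processed correctly are not discarded. In addition, RabbitMQ clustering handles single points of failure well, so the delay queue does not become unavailable and messages are not lost just because one node goes down.
Of course, there are other options for a delay queue, such as Java's DelayQueue, a Redis zset, Quartz, or Kafka's timing wheel. Each has its own characteristics; choose according to the scenario.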
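As one illustration of those alternatives, here is a minimal sketch using `java.util.concurrent.DelayQueue` from the JDK. It only holds messages in memory inside one JVM, so it has none of the persistence or clustering properties described for RabbitMQ. `DelayedMessage` and `drain` are names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayQueueSketch {

    /** A message that becomes available once its delay has elapsed. */
    static final class DelayedMessage implements Delayed {
        final String body;
        private final long readyAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Blocks until count messages have become due and returns their bodies in delivery order. */
    static List<String> drain(DelayQueue<DelayedMessage> queue, int count) {
        List<String> bodies = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                bodies.add(queue.take().body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delayed messages", e);
        }
        return bodies;
    }

    public static void main(String[] args) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 200));
        queue.put(new DelayedMessage("fast", 50));
        System.out.println(drain(queue, 2));
    }
}
```

Messages come out in order of their due time rather than the order in which they were put in.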

9 Advanced publisher confirms

9.1 Confirmation scheme

9.2 Code architecture diagram

9.3 Configuration file

    spring:
      application:
        name: cloud-stream-provider
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        publisher-confirm-type: correlated # enable the exchange confirm callback
        publisher-returns: true # return the message to the producer when the exchange cannot route it

9.4 Code

9.4.1 ConfirmConfig

    package com.atguigu.springcloud.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Configuration class for advanced publisher confirms.
     */
    @Configuration
    public class ConfirmConfig {
        // Exchange
        public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        // Queue
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        // Routing key
        public static final String CONFIRM_ROUTING_KEY = "key1";
        // Backup exchange
        public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
        // Backup queue
        public static final String BACKUP_QUEUE_NAME = "backup_queue";
        // Warning queue
        public static final String WARNING_QUEUE_NAME = "warning_queue";

        // Declare the exchange and point it at the backup exchange
        @Bean("confirmExchange")
        public DirectExchange confirmExchange() {
            return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
        }

        // Declare the queue
        @Bean("confirmQueue")
        public Queue confirmQueue() {
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }

        // Binding
        @Bean
        public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                            @Qualifier("confirmExchange") DirectExchange confirmExchange) {
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }

        // Backup exchange
        @Bean("backupExchange")
        public FanoutExchange backupExchange() {
            return new FanoutExchange(BACKUP_EXCHANGE_NAME);
        }

        // Backup queue
        @Bean("backupQueue")
        public Queue backupQueue() {
            return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
        }

        // Warning queue
        @Bean("warningQueue")
        public Queue warningQueue() {
            return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
        }

        @Bean
        public Binding backupQueueBindingbackupExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                        @Qualifier("backupExchange") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(backupQueue).to(fanoutExchange);
        }

        @Bean
        public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
                                                         @Qualifier("backupExchange") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(warningQueue).to(fanoutExchange);
        }
    }


9.4.2 SendMessageController
This is the same SendMessageController class listed in 8.2.2, unchanged.


9.4.3 ProducerController

    package com.atguigu.springcloud.controller;

    import com.atguigu.springcloud.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * Sends messages to test publisher confirms.
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ProducerController {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @GetMapping("/sendMessage/{message}")
        public void sendMessage(@PathVariable String message) {
            // Correct exchange and routing key
            CorrelationData correlationData = new CorrelationData("1");
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
            log.info("Sent message: {}", message + "key1");

            // Wrong routing key: the exchange receives the message but the queue does not
            CorrelationData correlationData2 = new CorrelationData("2");
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY + "2", message + "key12", correlationData2);
            log.info("Sent message: {}", message + "key12");

            // Wrong exchange name: the confirm callback is invoked with ack=false
            CorrelationData correlationData3 = new CorrelationData("3");
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + "123", ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData3);
            log.info("Sent message: {}", message + "key1");
        }
    }



9.4.4 Consumer

    package com.atguigu.springcloud.consumer;

    import com.atguigu.springcloud.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    /**
     * Receives the messages sent in the publisher confirm test.
     */
    @Slf4j
    @Component
    public class Consumer {
        @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
        public void receiveConfirmMessage(Message message) {
            String msg = new String(message.getBody());
            log.info("Received message from queue confirm_queue: {}", msg);
        }
    }


9.4.5 MyCallBack

    package com.atguigu.springcloud.config;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;

    /**
     * Callback implementation for publisher confirms and returned messages.
     */
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Register this object as the callbacks of the RabbitTemplate
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }

        /**
         * Exchange confirm callback.
         * 1. The message was sent and the exchange received it:
         *    1.1 correlationData holds the id and related data of the message
         *    1.2 ack = true
         *    1.3 cause is null
         * 2. The message was sent but the exchange did not receive it:
         *    2.1 correlationData holds the id and related data of the message
         *    2.2 ack = false
         *    2.3 cause is the reason for the failure
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("The exchange received the message with id: {}", id);
            } else {
                log.info("The exchange did not receive the message with id: {}, cause: {}", id, cause);
            }
        }

        // Hands a message back to the producer when it cannot reach its destination.
        // Only unroutable messages are returned.
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.error("Message: {} was returned by exchange {}, reason: {}, routing key: {}",
                    new String(message.getBody()), exchange, replyText, routingKey);
        }
    }
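The callbacks above only log the outcome. One common follow-up, sketched here in plain Java under the assumption that the producer keeps every published message until it is confirmed, is to remember messages by correlation id and forget them on ack=true. `ConfirmTrackerSketch` and its methods are hypothetical helpers, not part of Spring AMQP; in a real application `onConfirm` would be called from `confirm(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ConfirmTrackerSketch {
    // correlation id -> message body still waiting for a positive confirm
    private final Map<String, String> unconfirmed = new ConcurrentHashMap<>();

    /** Call right before publishing. */
    void onPublish(String correlationId, String body) {
        unconfirmed.put(correlationId, body);
    }

    /**
     * Call from the confirm callback. Returns the body that should be retried
     * or recorded when the broker answered with ack=false, otherwise null.
     */
    String onConfirm(String correlationId, boolean ack) {
        if (ack) {
            unconfirmed.remove(correlationId);
            return null;
        }
        return unconfirmed.get(correlationId);
    }

    int pending() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.onPublish("1", "hello");
        tracker.onPublish("3", "hello");
        tracker.onConfirm("1", true);                   // the exchange accepted message 1
        String retry = tracker.onConfirm("3", false);   // message 3 went to a wrong exchange name
        System.out.println("pending=" + tracker.pending() + ", retry=" + retry);
    }
}
```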


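9.4.6 Returned messages

The mandatory parameter: when it is set, a message that the exchange cannot route is returned to the producer instead of being dropped.

9.4.7 Backup exchange

If a message reaches the exchange but the exchange cannot route it to any queue, the message is forwarded to the backup exchange.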
BackupConsumer

    package com.atguigu.springcloud.consumer;

    import com.atguigu.springcloud.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    /**
     * Consumer for the backup queue.
     */
    @Slf4j
    @Component
    public class BackupConsumer {
        // Receive backup messages
        @RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE_NAME)
        public void receiveWarningMsg(Message message) {
            String msg = new String(message.getBody());
            log.error("Message from the backup queue: {}", msg);
        }
    }


WarningConsumer

    package com.atguigu.springcloud.consumer;

    import com.atguigu.springcloud.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    /**
     * Consumer for the warning queue.
     */
    @Slf4j
    @Component
    public class WarningConsumer {
        // Receive warning messages
        @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
        public void receiveWarningMsg(Message message) {
            String msg = new String(message.getBody());
            log.error("Warning, unroutable message found: {}", msg);
        }
    }


When both returned messages and a backup exchange are configured, the backup exchange takes precedence.
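The precedence can be written down as a small decision function. This is a sketch of the rule as described here, assuming the backup exchange itself can route the message (in ConfirmConfig it is a fanout exchange with two queues bound to it). The enum and method names are invented for illustration.

```java
final class UnroutableSketch {

    enum Outcome { ROUTED, TO_BACKUP_EXCHANGE, RETURNED_TO_PRODUCER, DROPPED }

    /**
     * @param matchedQueue      whether the routing key matched a binding on the exchange
     * @param hasBackupExchange whether the exchange declares an alternate-exchange
     * @param mandatory         whether the message was published with mandatory set
     */
    static Outcome decide(boolean matchedQueue, boolean hasBackupExchange, boolean mandatory) {
        if (matchedQueue) {
            return Outcome.ROUTED;
        }
        if (hasBackupExchange) {
            return Outcome.TO_BACKUP_EXCHANGE;      // checked before mandatory
        }
        if (mandatory) {
            return Outcome.RETURNED_TO_PRODUCER;    // the return callback fires
        }
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        // Message 2 of ProducerController: wrong routing key, backup exchange configured
        System.out.println(decide(false, true, true));
    }
}
```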

10 Other RabbitMQ topics

10.1 Idempotence

10.2 Repeated message consumption

Solution approach
Idempotence guarantees on the consumer side
Redis atomicity
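A minimal sketch of a consumer-side idempotence check, assuming every message carries a unique id. A concurrent in-memory set stands in for Redis here: its atomic `add` plays the role that an atomic set-if-absent command such as Redis `SETNX` would play across several consumer instances. All names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentConsumerSketch {
    // Ids of messages that have already been handled (stand-in for a Redis key space)
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger processed = new AtomicInteger();

    /** Returns true when the message was processed, false when it was a duplicate. */
    boolean handle(String messageId, String body) {
        // add() is atomic: only the first caller for a given id gets true
        if (!seenIds.add(messageId)) {
            return false;
        }
        processed.incrementAndGet();    // the real business logic would run here
        return true;
    }

    int processedCount() {
        return processed.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        System.out.println(consumer.handle("order-1", "pay"));   // first delivery
        System.out.println(consumer.handle("order-1", "pay"));   // redelivery of the same message
        System.out.println("processed=" + consumer.processedCount());
    }
}
```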

10.3 Priority queue

How to implement it
Add a priority to the queue
Add a priority to the message in code

Notes
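In RabbitMQ a queue becomes a priority queue when it is declared with the `x-max-priority` argument, and each message carries its own priority property. The plain-Java model below only illustrates the resulting delivery order for messages that are already waiting in the queue: higher priority first, publish order within the same priority, and priorities above the queue maximum treated as the maximum. It is a simulation with invented names, not broker code.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class PriorityQueueSketch {

    private static final class Entry {
        final String body;
        final int priority;
        final long sequence;

        Entry(String body, int priority, long sequence) {
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    private final int maxPriority;
    private long nextSequence = 0;
    // Highest priority first; equal priorities keep their publish order
    private final PriorityQueue<Entry> waiting = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> -e.priority)
                    .thenComparingLong(e -> e.sequence));

    PriorityQueueSketch(int maxPriority) {
        this.maxPriority = maxPriority;
    }

    void publish(String body, int priority) {
        int effective = Math.max(0, Math.min(priority, maxPriority));
        waiting.add(new Entry(body, effective, nextSequence++));
    }

    /** Returns the next message body, or null when nothing is waiting. */
    String deliver() {
        Entry next = waiting.poll();
        return next == null ? null : next.body;
    }

    public static void main(String[] args) {
        PriorityQueueSketch queue = new PriorityQueueSketch(10);
        queue.publish("info1", 0);
        queue.publish("info5", 5);
        queue.publish("info2", 0);
        for (String body = queue.deliver(); body != null; body = queue.deliver()) {
            System.out.println(body);
        }
    }
}
```

The ordering only has an effect when messages accumulate before a consumer takes them; a message that is consumed as soon as it arrives has nothing to be sorted against.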

10.4 Lazy queues

A normal queue keeps its messages in memory.
A lazy queue keeps its messages on disk.
Two modes

11 Cluster management

11.1 Setup steps

1. Change the hostname on each of the 3 machines
vim /etc/hostname
2. Configure the hosts file on every node so that the nodes can resolve each other
vim /etc/hosts
10...** node1
10...** node2
10...** node3
3. Make sure the cookie file on every node holds the same value
Run the remote commands on node1
4. Start the RabbitMQ service, which also starts the Erlang VM and the RabbitMQ application (run the following command on each of the three nodes)
rabbitmq-server -detached
5. Run on node2

6. Run on node3
7. Check the cluster status
rabbitmqctl cluster_status

8. Users have to be set up again
Create an account
rabbitmqctl add_user admin 123
Set the user's role
rabbitmqctl set_user_tags admin administrator
Set the user's permissions
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
9. Remove nodes from the cluster (run on node2 and node3 respectively)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2 (run on node1)

12 Mirrored queues

12.1 Setup steps

1. Start the three cluster nodes
2. Pick any node and add a policy
name: the policy name
pattern: matches queue names starting with **
ha-mode=exactly: mirror the queue on an exact number of nodes
ha-params=2: the argument for ha-mode, here two copies in total
ha-sync-mode=automatic: mirrors are synchronized automatically

3. Create a queue on node1 and send a message to it; the queue now has a mirror.

13 Load balancing with HAProxy

14 Federation Exchange

Setup steps
1. Make sure each node runs independently
2. Enable the federation plugins
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
3. Schematic (run the consumer first so that fed_exchange is created on node2)

4. Configure the upstream (node1) on the downstream (node2)
5. Add a policy

15 Federation Queue

A federated queue provides load balancing for a single queue across multiple broker nodes or clusters. It can connect to one or more upstream queues and fetch messages from them to satisfy local consumers.
Setup steps
1. Schematic diagram
2. Add the upstream (same as above)
3. Add a policy

16 Shovel

Shovel offers data forwarding similar to Federation. It reliably and continuously pulls messages from a queue on one broker (the source) and forwards them to an exchange on another broker (the destination). The source queue and the destination exchange can sit on the same broker or on different brokers. As the name suggests, Shovel works like a shovel that moves messages from one side to the other.
Setup steps
1. Enable the plugins
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management