About the message queue issue after Spring-cloud upgrade (RabbitMq)
About the message queue issue after Spring-cloud upgrade (RabbitMq)
Versions below 4.0
1. Post a message
1.1 Send messages regularly
1.1.1 Import dependencies
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
1.1.2 Configuration classes and configuration files
Starting from 3.1, it is recommended to use functional programming mode to define message producers and consumers. Beans that implement Supplier in the Spring context will automatically document message producers:
@Configuration
public class MessageSender {
@Bean
public Supplier<Sample> sample1() {
return () -> new Sample(1, "Supplier");
}
}
Create a binding named sample-out-0, which is mapped to a specific message middleware target through a specific binder implementation (such as RabbitMQ Binder). In the implementation of RabbitMQ Binder, the default mapping is a Topic type exchanger named sample-out-0. The target name corresponding to the binding can be configured in the configuration file.
#spring.cloud.stream.bindings.bindingname.destination=destinationname
spring.cloud.stream.bindings.sample1-out-0.destination=sample1
Create a message producer, call the get method of the returned Supplier object once per second by default, and then send the data returned by get to the target corresponding to the binder of sample-out-0. Polling related properties can be configured through properties starting with spring.integration.poller.
Note: The binder implements the isolation between Spring Cloud Stream and message middleware, so that the message middleware used can be switched at will. Binding realizes the isolation of our code from the specific goals of the message middleware, which means that what we operate in the code is binding. What target the binding corresponds to in the message middleware is determined by the specific binder.
Configuration file
spring:
[rabbitmq](/search?q=rabbitmq): ##配置rabbitmq的信息
host: 192.168.5.104
port: 5672
username: rabbitmq
password: rabbitmq
cloud:
stream:
bindings: ## 配置绑定的目标
sample1-out-0:
destination: sample1
function: ## 配置哪些实现了函数式接口的bean要做为消息生成者,默认所有实现了Supplier接口的Beandefinition: sample1
1.2 Take the initiative to send messages
1.1 Import dependencies
Dependencies for sending messages at the same time
1.2 Configuration classes and configuration files
import org.springframework.cloud.stream.function.StreamBridge;
@Configuration
public class MessageSender {
@Autowired
private StreamBridge streamBridge;
/**
* 发送任意数据,如果绑定不存在则创建
*/
public void sample2(Sample sample) {
streamBridge.send("sample2-out-0", sample);
}
}
Configuration file
spring:
rabbitmq: ##配置rabbitmq的信息
...
cloud:
stream:
bindings: ## 配置绑定相关的名称
...
sample2-out-0:
destination: sample2
2. Consume news
2.1 Import dependencies
Dependencies on publishing messages
2.2 Configuration classes and configuration files
@Component
/*@EnableBinding可以加在任一个配置类中*/
@EnableBinding(SampleMessage.class)
public class StreamConfigurer {
}
/*注意:在配置类中通过@EnableBinding(SampleMessage.class)声明绑定*/
public interface SampleMessage {
String SAMPLE_1 = "sample1";
String SAMPLE_2 = "sample2";
/*可以通过spring.cloud.stream.bindings.绑定名称.destination=目标名称配置绑定对应的
目标名称*/
@Input(SAMPLE_1)
SubscribableChannel receiveMessage1();
@Input(SAMPLE_2)
SubscribableChannel receiveMessage2();
}
@Input(SAMPLE_1)
- Declare a subscription message binding. When implemented based on RabbitMQ, a queue and an exchanger bound to the queue will be created in RabbitMQ. If the exchanger has been created, it will be bound directly.
- The default queue name is: binding target name. Consumer group name. The default binding target name is the binding name (such as sample1); the default consumer group name is anonymous. Unique serial code. The bound target name and consumer group name are configurable
- The default exchange name is: binding target name. Modify the binding target name spring.cloud.stream.bindings.binding
name.destination=destination name
Note: When SubscribableChannel is declared, a message listener must be generated through @StreamListener, otherwise the program will not know how to process the message after it is obtained.
Configuration file
##Configure rabbitmq information
spring.rabbitmq.host=192.170.2.177
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbitmq
spring.rabbitmq.password=rabbitmq
2.3 Consuming news
@Component
public class SampleListener {
private static final Logger logger = LogManager.getLogger();
/*@StreamListener要有对应的@Input*/
@StreamListener(SampleMessage.SAMPLE_1)
public void listenerMessage1(Sample sample) {
logger.info("收到消息1:{},绑定的名称:{}", sample, SampleMessage.SAMPLE_1);
}
/*注意:如果订阅的是同一个目标,则返回值要一样*/
@StreamListener(SampleMessage.SAMPLE_2)
@SendTo({SampleMessage.SAMPLE_1})
public Sample listenerMessage2(Sample sample) {
logger.info("收到消息2:{},绑定的名称:{},回复消息给:{}", sample,
SampleMessage.SAMPLE_2,
SampleMessage.SAMPLE_1);
sample.setMessage("消息已经消费");
return sample;
}
}

Version 4.0 or above
Versions 4.0 and above have deleted the annotations for consuming messages, such as @EnableBinding, @Input, etc.
Consume news
1. Import dependencies
Same as above (just different versions)
2. Configuration classes and configuration files
In the configuration class, @Bean can instantiate an object that implements the Consumer <sample>
interface and override the only abstract method of the functional interface.
But it is not very suitable to consume messages in the configuration class. So you can instantiate a class that implements Consumer <sample>
and override the accept method as a method to consume messages. And scan the class with configuration class.
@Component
public class SampleConsumer implements Consumer<Sample> {
private static final Logger logger = LoggerFactory.getLogger(SampleConsumer .class);
@Autowired
private ProductsService productsService;
@Override
public void accept(Sample sample) {
logger.warn("sample:{}",sample);
}
}
Note :
The class that implements the Consumer interface will create an exchanger and corresponding binding queue with the same name in RabbitMQ. It actually has nothing to do with the message producer, so the name of the message converter needs to be the same as the message producer.
After 4.0, the configuration of changing the switch name is also different.
Configuration file
spring.cloud.stream.bindings.sampleConsumer-in-0.destination=sample1#Modify the converter name to be consistent with the message producer