Skip to main content

amqp protocol, rabbitmq getting started, springboot integration rabbitmq

amqp protocol, getting started with rabbitmq, springboot integration with rabbitmq

Preface

This article draws heavily on [Understanding AMQP, the protocol used by RabbitMQ](https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by- rabbitmq/)
To learn [rabbitmq](/search?q=rabbitmq), you have to learn the amqp protocol, amqp full name Advanced Message Queuing Protocol (AMQP), rabbitmq implements amqp, of course. ActiveMQ also implements amqp. amqp has a few important concepts, producer (publisher), exchange, queue, consumer
The amqp protocol. amqp protocol, getting started with rabbitmq, springboot integration rabbitmq
When talking about MQ, we will also often talk about RocketMQ, see Why choose RocketMQ

The flow of a message from send to receive

The producer sends a message to the specified exchange, which decides which queue to bind to based on the routing-key in the message header and the binding-key of the queue; how does the consumer get the message? pull, depending on the configuration.

Structure of a message

A message consists of a header, properties, and data. The header is the one specified in the amq specification, similar to the one in the http specification, such as the routing-key that we will talk about later, the properties are customized by the business, and the data is the binary data;
amqp protocol, getting started with rabbitmq, springboot integration with rabbitmq
Usually we call the process of assigning a message to a queue binding, so what does the message structure have to do with binding? exchange takes the routing-key in the message header and matches it with the binding-key in the queue (each queue has a binding-key ) to do the matching. When a consumer creates a queue, it creates an exchange at the same time, associates the queue with the exchange, and the producer sends it to the specified exchange, thus realizing the message flow;

Exchange types

The following types of exchanges are common.

  • direct
    • binding key exactly matches the routing key, *, # will be treated as normal text;
  • topic
    • binding key's *, # are treated as wildcards, * can match only one word, # matches zero or more dot-separated words;
  • fanout, n. output (number of ends); spread (dispersal); fanout; a column of accounts.
    • The routing key and binding key are ignored, and received messages are forwarded to the full binding queue, in what is commonly known as broadcast mode ;

For example, there is a message, its routing key is NYSE.TECH.MSFT , exchange is topic type, then match the queue binding key results
amqp protocol, rabbitmq primer, springboot integration rabbitmq
The above contains the vast majority of cases. In fact, a queue can have multiple consumers, an exchange can have multiple queues, and a queue can have multiple exchanges.

Application scenarios

RPC (Remote Procedure Call)

amqp protocol, getting started with rabbitmq, springboot integration with rabbitmq
The process goes like this

  1. the client sends a message to the queue, of course, the routing key of the message to match the service. 2. exchange passes the message to the queue.
  2. exchange passes the message to service, service does some operations and returns a response message to exchange, the response message specifies the routing_key to match the response queue. 3. the client receives the message from the response queue.
  3. The client gets the message from the response queue.

Whether this process is blocking or non-blocking depends on the client library, either way it can be done. [AmqpTemplate#sendAndReceive](https://docs.spring.io/spring-amqp/api/org/springframework/amqp/core/AmqpTemplate.html# sendAndReceive%28java.lang.String,java.lang.String,org.springframework.amqp.core.Message%29) corresponds to this pattern

Publish/Subscribe

amqp protocol, getting started with rabbitmq, springboot integration with rabbitmq

A message is consumed by multiple consumers, also called broadcast mode , and in activemq is called topic.

Task distribution

A message will only be consumed by one of multiple consumers, called p2p (peer-to-peer) in activemq.
amqp protocol, getting started with rabbitmq, springboot integration with rabbitmq

And finally a diagram summarizing myselfamqp protocol, getting started with rabbitmq, springboot integration with rabbitmq

jms vs amqp

Refer to [Message Queues (why message queues, comparison of common message queues, who is better to use JMS and AMQP?)] (https://zhuanlan.zhihu.com/p/341764782), simply put JMS is the specification of java, with the language is relevant, and amqp is the protocol, independent of the language, jms provides point-to-point , publish/subscribe two modes, amq also provides these two modes, and on the basis of this, the routing provides a variety of options.

springboot integration rabbitmq

springboot can easily integrate rabbitmq by introducing spring-boot-starter-amqp, and then add configuration to the configuration file

    spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"

send a message

    @Component
public class MyBean {

private final AmqpAdmin amqpAdmin;

private final AmqpTemplate amqpTemplate;

public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}

// ...

}

Send retries

The default template disables retries, you can enable them, note that this is a send (AmqpTemplate ) retry;

      rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"


Consume retries

Receiving messages

    @Component
public class MyBean {

@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}

}

Note that consumers have the concepts of retry and requeue; by default rabbitmq.listener.simple.retry.enabled=false, retry is disabled; because rabbitmq.listener.simple.default-requeue -rejected=true, if the listener throws an exception, then it will rejoin the queue and thus will retry indefinitely; if you don't want to retry indefinitely, there are 2 ways to solve this.

  • Set defaultRequeueRejected property to false, in this case, consumption failure will not be reentered into the queue or throw an AmqpRejectAndDontRequeueException to notify the message does not need to be reentered into the queue.
  • Turn on retries and set the maximum number of retries. For example, to set the maximum number of retries
    rabbitmq.
rabbitmq.
simple.
acknowledge-mode: auto # After the retry count is exceeded, the message is automatically acknowledged and removed from the queue
rabbitmq: listener: simple: acknowledge-mode: auto
# Enable retry, set the maximum number of retries.
enabled: true
max-attempts: 3
max-interval: 6000
initial-interval: 2000
# Do not requeue if consumption fails
default-requeue-rejected: false

Send can be retried, Listener can be retried, note the distinction;

Message acknowledgment, message loss

Acknowledgement mechanism refers to the question of whether the broker should or should not delete a message if there is an error or a network error while the consumer is processing the message.The AMQP 0-9-1 specification provides mechanisms for the consumer to control.

  • Automatic acknowledgement, where the consumer receives a sent message and deletes it regardless of the success or failure of the consumption
  • Manual acknowledgement, delete the message after receiving an explicit acknowledgement back from the consumer.

consumer consumption may appear processing success, processing failure, multiple processing failure, accordingly, there are ack, reject, nack (rabbitmq extends the AMQP 0-9-1 specification, self-initiated) three commands, with the spring source code may not be the same, the spring source code.

    package org.springframework.amqp.core;
public enum AcknowledgeMode {
/**
* No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
*/
//Auto-acknowledgement, the consumer receives the message and is removed from the broker regardless of whether the consumption was successful or not
NONE,

/**
* Manual acks - user must ack/nack via a channel aware listener.
*/
//Manual confirmation, the consumer explicitly returns the confirmation before it is removed from the broker
MANUAL,

/**
* Auto - the container will issue the ack/nack based on whether
* the listener returns normally, or throws an exception.
* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is
* represented by {@link #NONE} here</em>.
*/
//Manual confirmation, java implementation of self-created, container according to the listener execution results, the success of the return ack, the failure of the return nack
AUTO;
}



amqp protocol, rabbitmq primer, springboot integration rabbitmq Refer to Message acknowledgment, [AMQP 0-9-1 Message Acknowledgements](https:// www.rabbitmq.com/tutorials/amqp-concepts.html#consumer-acknowledgements)

publisher lost message

Because of the uncertainty of the network, the message may be lost when the publisher sends it to the broker, the usual solution is to write the message to the database before the publisher sends it, and then compensate manually if the mq server really loses it; it is also possible that the broker has made an error in processing the message, which leads to the loss of the message;

mq server lost message

If the mq server is restarted, then the queue and messages are lost, you can configure the queue and messages to be persistent, so that the restart will not lose the queue and messages. But this is not absolutely will not be lost, because the mq server receives message will be temporarily put into the cache at the right time and then write to disk, if you really need to be absolutely reliable can be considered publisher confirm publisher-confirms);

consumer lost message

By default, after a broker sends a message to a consumer, it deletes the message immediately; if it takes a while to process the message, and the consumer terminates before it is processed, the message is lost, along with any other messages it has received that it hasn't had a chance to process; how do you solve this problem? Consumer acknowledgment mechanism. After processing the message, it sends an ack, the broker receives the ack and then deletes the message. If the ack is never received (the default is 30 minutes), it will be put into the messages_unacknowledged queue.

Summary

The key to realizing peer-to-peer or broadcast mode in rabbitmq is the type of exchange.

  • Peer-to-peer mode
    • exchange using direct, topic can be realized
  • broadcast mode
    • exchange is implemented using fanout