Skip to main content

The explanation of RabbitMQ is so thorough, I admire it!

The explanation of RabbitMQ is so thorough, I admire it!

The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
https://www.rabbitmq.com/install-homebrew.html
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

The final output of successful execution is as follows:

The explanation of RabbitMQ is so thorough, I admire it!

Start the service:

    # Startup method 1: background startup
brew services start rabbitmq
# Startup method 2: Current window startup
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server

Enter in the browser:

    http://localhost:15672/

The RabbitMQ background management interface will appear (the username and password are both guest):

The explanation of RabbitMQ is so thorough, I admire it!

    ## Add account number
. /rabbitmqctl add_user admin admin
## Add access rights .
. /rabbitmqctl set_permissions -p "/" admin ". *" ". *" ". *"
## Set super permissions
. /rabbitmqctl set_user_tags admin administrator

The explanation of RabbitMQ is so thorough, I admire it!

    <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>


![The explanation of RabbitMQ is so thorough, I admire it!](6b44e99974d17195ee2722671aabdc1f.png)

Start writing code:

    public class RabbitMqTest {
// message queue name
private final static String QUEUE_NAME = "hello";

@Test
public void send() throws java.io.IOException, TimeoutException {
// Create the connection project
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672); factory.setUsername("127.0.0.1"); factory.setPort(5672)
factory.setUsername("admin"); factory.setPassword("admin"); factory.setPassword("admin")
factory.setUsername("admin"); factory.setPassword("admin");
// Create a connection
Connection connection = factory.newConnection(); //Create the message channel.
// Create a message channel
Channel channel = connection.createChannel(); //Generate a message queue.
// Generate a message queue
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;
//publish the message, the first parameter is the route (Exchange name), "" means use the default message route
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// Close the message channel and connection
channel.close();
connection.close();
}

@Test
public void consumer() throws java.io.IOException, TimeoutException {
// Create the connection factory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672); factory.setUsername("127.0.0.1"); factory.setPort(5672)
factory.setUsername("admin"); factory.setPassword("admin"); factory.setPassword("admin")
factory.setUsername("admin"); factory.setPassword("admin");
// Create a connection
Connection connection = factory.newConnection();
//create the message channel
final Channel channel = connection.createChannel();
// message queue
channel.queueDeclare(QUEUE_NAME, true, false, false, null);; //MessageQueue(QUEUE_NAME, true, false, null)
System.out.println("[*] Waiting for message. To exist press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}

The console output after executing send():

    [x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'

The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

For code explanations in the examples, you can directly refer to the official website : https://www.rabbitmq.com/tutorials/tutorial-one-java.html

The explanation of RabbitMQ is so thorough, I admire it!

    public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
// Create a connection project, the default case is given below
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}

Package generator:

    public class MsgProducer {
public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
// Create a connection
Connection connection = factory.newConnection(); //create the connection
// Create a message channel
Channel channel = connection.createChannel(); // Create a message channel.
// Declare the messages in the exchange to be persistent and not automatically deleted
channel.exchangeDeclare(exchange, exchangeType, true, false, null); // Publish the message.
// Publish the message
channel.basicPublish(exchange, toutingKey, null, message.getBytes()); // Publish the message; channel.exchangeDeclare(exchange, exchangeType, true, false, null); // Publish the message.
System.out.println("Sent '" + message + "'"); // Publish the message.
System.out.println("Sent '" + message + "'"); channel.close();
connection.close();
}
}


Encapsulated consumer:

    public class MsgConsumer {
public static void consumerMsg(String exchange, String queue, String routingKey)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
// Create a connection
Connection connection = factory.newConnection();
//create the message channel
final Channel channel = connection.createChannel();
// message queue
channel.queueDeclare(queue, true, false, false, null); //message queue = connection.createChannel(); //message queue = connection.createChannel()
// Bind the queue to the switch
channel.queueBind(queue, exchange, routingKey);
System.out.println("[*] Waiting for message. To exist press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// Cancel auto-ack
channel.basicConsume(queue, false, consumer);
}
}

The explanation of RabbitMQ is so thorough, I admire it!

    public class DirectProducer {
private static final String EXCHANGE_NAME = "direct.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectProducer directProducer = new DirectProducer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg(routingKey[i % 3], msg + i);
}
System.out.println("----over-------");
Thread.sleep(1000 * 60 * 100);
}
}

The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

    public class DirectConsumer {
private static final String exchangeName = "direct.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc"};

for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
}


Output after execution:

    [*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 0
[x] Done
[x] Received 'hello >>> 3
[x] Done
[x] Received 'hello >>> 6
[x] Done
[x] Received 'hello >>> 9
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 1
[x] Done
[x] Received 'hello >>> 4
[x] Done
[x] Received 'hello >>> 7
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 2
[x] Done
[x] Received 'hello >>> 5
[x] Done
[x] Received 'hello >>> 8
[x] Done


The explanation of RabbitMQ is so thorough, I admire it!

    public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 将qc修改为qc1

for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}

The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

    public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg("", msg + i);
}
}
}

consumer:

    public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], "");
}
}
}


The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

    public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
// Create a connection
Connection connection = factory.newConnection();
//create a message channel
Channel channel = connection.createChannel();

// Declare the message in exchange
channel.exchangeDeclare(exchange, exchangeType); // Publish the message.

// Publish the message
channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));; // publish the message.

System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}


Consumer packaging:

    public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.exchangeDeclare(exchange, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}

Producer:

    public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void publishMsg(String msg) {
try {
MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10000; i++) {
directProducer.publishMsg(msg + i);
}
}
}


consumer:

    public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void msgConsumer() {
try {
MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
for (int i = 0; i < 3; i++) {
consumer.msgConsumer();
}
}
}


The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

For code details, please see the official website: https://www.rabbitmq.com/tutorials/tutorial-five-java.html
For more methods, please check the official website directly: https://www.rabbitmq.com/getstarted.html
The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

    /**
* Declare an exchange.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
6.1.1 durable

The explanation of RabbitMQ is so thorough, I admire it!
The explanation of RabbitMQ is so thorough, I admire it!

    // Active ack/nak after receiving a message
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// Cancel auto-ack
channel.basicConsume(queue, false, consumer);