// Consumer 1: Consumer_PubSub1.java
package com.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @version v1.0
 * @Date: 2021/6/11 22:58
 * @author Mr.Throne
 * @description: consumer
 */

public class Consumer_PubSub1 {

public static void main(String[] args) throws IOException, TimeoutException { String queueName1 = "test_fanout_queue1"; String queueName2 = "test_fanout_queue2"; //1. Create a ConnectionFactory Factory = new ConnectionFactory(); // set the IP address factory.sethost ("121.196.161.240"); factory.setPort(5672); factory.setVirtualHost("/admin"); factory.setUsername("admin"); factory.setPassword("admin"); //3. Connection Connection Connection = Factory. NewConnection (); //4. Create Channel Channel = connection.createChannel(); // DURABLE () {// Durable () {// Durable () {// Durable () {// Durable () {// Durable () {// Durable () // Map<String autoDelete when there is no Consumer, Object> arguments information Chantile.queueClare ("work_queues",true,false,false,null); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} DefaultConsumer(Channel){// ConsumerTag message identifier // Enenvelope gets switch route // Body data @Overpass public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println(“consumerTag:”+consumerTag);

// System.out.println(“Exchange:”+envelope.getExchange());

// System.out.println(“RoutingKey:”+envelope.getRoutingKey());

// System.out.println(“properties:”+properties);

System.out.println("body:"+new String(body)); System.out.println(" Print log information to console "); }}; channel.basicConsume("test_fanout_queue1",true,consumer); }

}

// Consumer 2: Consumer_PubSub2.java
package com.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @version v1.0
 * @Date: 2021/6/11 22:58
 * @author Mr.Throne
 * @description: consumer
 */

public class Consumer_PubSub2 {

public static void main(String[] args) throws IOException, TimeoutException { String queueName1 = "test_fanout_queue1"; String queueName2 = "test_fanout_queue2"; //1. Create a ConnectionFactory Factory = new ConnectionFactory(); // set the IP address factory.sethost ("121.196.161.240"); factory.setPort(5672); factory.setVirtualHost("/admin"); factory.setUsername("admin"); factory.setPassword("admin"); //3. Connection Connection Connection = Factory. NewConnection (); //4. Create Channel Channel = connection.createChannel(); // DURABLE () {// Durable () {// Durable () {// Durable () {// Durable () {// Durable () {// Durable () // Map<String autoDelete when there is no Consumer, Object > the arguments/energy futures (https://www.gendan5.com/cf/ef.html) parameter information channel.queueDeclare("work_queues",true,false,false,null); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} DefaultConsumer(Channel){// ConsumerTag message identifier // Enenvelope gets switch route // Body data @Overpass public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println(“consumerTag:”+consumerTag);

// System.out.println(“Exchange:”+envelope.getExchange());

// System.out.println(“RoutingKey:”+envelope.getRoutingKey());

// System.out.println(“properties:”+properties);

System.out.println("body:"+new String(body)); System.out.println(" Save log information to database "); }}; channel.basicConsume("test_fanout_queue2",true,consumer); }

}