This is the 10th day of my participation in the August More text Challenge. For details, see: August More Text Challenge


Related articles

RabbitMQ series: RabbitMQ series


preface

  • In the beginning we learned about the rotation distribution used by RabbitMQ to distribute messages, but in some scenarios this strategy is not very good.

  • For example, there are two customers in processing tasks, one of the consumer A processing speed is very fast, and the other A consumer B processing speed is very slow, this time we are still using distributed training in rotation will be to the processing speed of A large part of the idle time, the consumers and the consumers have been dealing with slow in work.

  • This is not a good allocation in this case, but RabbitMQ is unaware of it and still distributes fairly.

  • At this time, we can use unfair sharing to achieve, is the model of more work! You can do it. Oh, you can do more. I’m a better eater, so I work less.

1. Unfair distribution

  • producers

    • /** * This is a test producer *@author DingYongJun
       *@date2021/8/1 * /
      public class DyProducerTest_xiaoxiyingda {
          /** * For convenience, we use the main function to test *@param args
           */
          public static void main(String[] args) throws Exception{
              // Use the utility class to create the channel
              Channel channel = RabbitMqUtils.getChannel();
      
              /** * generate a queue * 1. Queue name * 2. Whether messages in the queue are persistent * 3. Is the queue only for one consumer to consume is it shared true Multiple consumers can consume * 4. Indicates whether the queue is automatically deleted after the last consumer disconnect. True Indicates whether the queue is automatically deleted * 5. Other parameters */
              // This is the persistent parameter. False does not persist, true does
              boolean durable = true;
              channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false.false.null);
      
              /** * send a message * 1. Send to the switch * 2. What is the key of the route * 3. Other parameter information * 4. Body of the sent message */
              Scanner sc = new Scanner(System.in);
              System.out.println("Please enter information");
              while (sc.hasNext()) {
                  String message = sc.nextLine();
                  //MessageProperties.PERSISTENT_TEXT_PLAIN; This represents message persistence to disk
                  channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                  System.out.println("The producer sends the message"+ message); }}}Copy the code
  • consumers

    • /** * This is a test for consumers *@author DingYongJun
       *@date2021/8/1 * /
      public class DyConsumerTest_xiaoxiyingda01 {
      
          public static void main(String[] args) throws Exception{
              // Use the utility class to create the channel
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("This is Consumer A. I'm waiting for A message!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  // True indicates the unanswered messages on the batch reply channel. False Indicates a single reply
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("Message consumption interrupted");
              };
              // Unfair distribution
              int prefetchCount = 1;
              channel.basicQos(prefetchCount);
              /** * Consumer message * 1. Which queue to consume * 2. Whether to automatically reply after the consumption is successful true means automatic reply false manual reply * 3. A pullback where consumers failed to spend */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); }}Copy the code
    • /** * This is a test for consumers *@author DingYongJun
       *@date2021/8/1 * /
      public class DyConsumerTest_xiaoxiyingda02 {
      
          public static void main(String[] args) throws Exception{
              // Use the utility class to create the channel
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("This is Consumer B. I'm waiting for a message!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(3000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  // True indicates the unanswered messages on the batch reply channel. False Indicates a single reply
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("Message consumption interrupted");
              };
      
              // Unfair distribution
              int prefetchCount = 1;
              channel.basicQos(prefetchCount);
      
              /** * Consumer message * 1. Which queue to consume * 2. Whether to automatically reply after the consumption is successful true means automatic reply false manual reply * 3. A pullback where consumers failed to spend */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); }}Copy the code
    • The execution result

    • Certain conclusions can be drawn from the results

      • Obviously, A executes 7 and B executes 3.
      • Both consumers set up an unfair distribution pattern.
      • When consumer A is efficient (one second), consumer B is slow (three seconds).
      • MQ will automatically determine who is doing it faster and allocate more to those who are doing it faster.
      • It’s essentially no rest. After finishing, continue to get a new task!
      • The donkey of production team dare not press so! Ha ha ~
    • This means that if I haven’t finished the task or I haven’t answered you, don’t assign it to me. I can only handle one task at the moment.

    • Rabbitmq will then assign the task to the idle consumer who is less busy, if none of the consumers have completed the task at hand.

    • As new tasks are constantly added to the queue, the queue may be full. At this time, new workers can only be added or the policies of other storage tasks can be changed.

    • Setting code:

      • // Unfair distribution
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        Copy the code
      • // Source code as proof
        public void basicQos(int prefetchCount) throws IOException {
                this.basicQos(0, prefetchCount, false);
            }
        Copy the code
      • The default value of int is 0 and I don’t need to explain that. So if we don’t set it, this value is zero.

      • If the value is set to 1, the unfair distribution mode is enabled.

2. Pre-value distribution

  • The message itself is sent asynchronously, so there must be more than one message on the channel at any one time and the manual acknowledgement from the consumer is also asynchronous in nature.

  • So there is a buffer of unacknowledged messages, and it is hoped that the developer will limit the size of this buffer to avoid the problem of unacknowledged messages in the buffer.

  • This, of course, will result in very low throughput, especially if consumer connection latency is significant, especially in an environment with long consumer connection latency. For most applications, a slightly higher value will be optimal.

  • The producer should be the same as in the above example. Don’t need to change

  • consumers

    • /** * This is a test for consumers *@author DingYongJun
       *@date2021/8/1 * /
      public class DyConsumerTest_xiaoxiyingda01 {
      
          public static void main(String[] args) throws Exception{
              // Use the utility class to create the channel
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("This is Consumer A. I'm waiting for A message!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  // True indicates the unanswered messages on the batch reply channel. False Indicates a single reply
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("Message consumption interrupted");
              };
              // Unfair distribution
              int prefetchCount = 5;
              channel.basicQos(prefetchCount);
              /** * Consumer message * 1. Which queue to consume * 2. Whether to automatically reply after the consumption is successful true means automatic reply false manual reply * 3. A pullback where consumers failed to spend */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); }}Copy the code
    • Consumer B has the same code, which means the sleep time is longer, and the prefetchCount is set to 3 so you can see the difference.

  • The execution result

  • The conclusion can be drawn from the above results

    • PrefetchCount, when set to 3, means that there are at most three entries in the current channel, and any more are queued outside.
    • So when we send eight pieces of data, A has five and B has three. When one comes again, who consumes one first, the new one will be consumed by who.
    • If there’s always data coming in the queue. Then the AB consumer will be replenished all the time. The number of messages with full prefetchCount is kept in the channel.

I see no ending, but I will search high and low

If you think I blogger writes good! Writing is not easy, please like, follow, comment to encourage the blogger ~hahah