Recently, when I looked at the company’s Redis Queue, I found that the underlying use of go-Zero queue. This article takes a look at the design of queue and hopefully the minimal design practices for MQ.

use

Based on other MQ experiences, the basic process is as follows:

  1. createproducerconsumer
  2. Start themq
  3. Production messages/consumption messages

Corresponds to the queue, which looks something like this:

Create a queue

// The producer creates the factory
producer := newMockedProducer()
// Consumers create factories
consumer := newMockedConsumer()
// Pass the producer and consumer create factory function to NewQueue()
q := queue.NewQueue(func(a) (Producer, error) {
  return producer, nil
}, func(a) (Consumer, error) {
  return consumer, nil
})
Copy the code

Let’s look at what build conditions NewQueue requires:

  1. producer constructor
  2. consumer constructor

Both factory functions are passed to the queue for execution and retry.

The purpose of these two requirements is to encapsulate both producer/consumer builds and message production/consumption in MQ and to put the whole producer/consumer logic at the developer’s disposal:

type (
	// Developers need to implement this interface
	Producer interface {
		AddListener(listener ProduceListener)
		Produce() (string.bool)}...// ProducerFactory defines a method that generates a Producer
	ProducerFactory func(a) (Producer, error)
)
Copy the code
  1. It’s basically handing over the logic of the producer to the developer,mqOnly responsible for producer/consumer messaging and scheduling between.
  2. The factory method is designed to give both the producer itself and the production messagequeueDo your own scheduling or retry.

Production of MSG

The production message of course goes back to the producer itself:

type mockedProducer struct {
	total int32
	count int32
  // Use waitGroup to simulate task completion
	wait  sync.WaitGroup
}
// Produce()
func (p *mockedProducer) Produce(a) (string.bool) {
	ifatomic.AddInt32(& p.count,1) < = p.total { p.wait.Done()return "item".true
	}
	time.Sleep(time.Second)
	return "".false
}
Copy the code

Producer writing in a queue must implement:

  • Produce(): The developer writes the logic that produces the message
  • AddListener()Producer:

Consumption of MSG

Similar to a producer:

type mockedConsumer struct {
	count  int32
}

func (c *mockedConsumer) Consume(string) error{ atomic.AddInt32(& c.count,1)
	return nil
}
Copy the code

Start the queue

Start and then verify that the data transfer between the producer and consumer we described above is successful:

func TestQueue(t *testing.T) {
	producer := newMockedProducer(rounds)
	consumer := newMockedConsumer()
	/ / create the queue
	q := NewQueue(func(a) (Producer, error) {
		return producer, nil
	}, func(a) (Consumer, error) {
		return consumer, nil
	})
	// When the producer finishes producing, execute Stop() to shut down production
	go func(a) {
		producer.wait.Wait()
    // the MQ production end stops production, not mq itself stops running
		q.Stop()
	}()
	/ / start
	q.Start()
	// Verify that the message consumption is complete
	assert.Equal(t, int32(rounds), atomic.LoadInt32(& consumer.count)) }Copy the code

That’s the easiest way to get started with queue. Developers are free to define producer/consumer production/consumption logic according to their own business realities.

The overall design

! [image-20210506224102836](/Users/dyhxl/Library/Application Support/typora-user-images/image-20210506224102836.png)

The overall process is as shown in the figure above:

  1. All communications are made up ofchannelfor
  2. By adding listenerslistener, and event triggeringeventIs equivalent to separating the trigger logic
  3. Producers haveproduceoneThis is the logic of producing messages, but theProduce()Is written by developersinterfaceIt is this function in
  4. The same goes for consumers,Consume()

The basic message flow is described in the figure above and the specific code analysis will be left to the next chapter, our 😁 analysis, especially how to control the channel is the core of the whole design.

conclusion

This article gives a brief overview of queue design in terms of usage and overall architecture analysis. In the next part, we will delve into the source code and analyze internal message flow and channel control.

Stay tuned for more design and implementation articles on Go-Zero. Welcome to pay attention to and use.

The project address

Github.com/tal-tech/go…

Welcome to Go-Zero and star support us!

Wechat communication group

Follow the “micro service practice” public account and reply to the group to obtain the community qr code.