In this third article, we build on the previous one by adding a message confirmation mechanism.

The problem

At present, once a message has been sent to a consumer, Mini-NSQ takes no further responsibility for it. However, the consumer may fail to process it, so Mini-NSQ needs to be able to redeliver the message.

Code design

In NSQ’s design, a message sent to a consumer is not deleted immediately; it is moved into a special container of in-flight messages. If the consumer processes it successfully, it sends NSQ a “FIN” command containing the message ID, and NSQ looks the message up in the container and deletes it. If the consumer fails, it sends a “REQ” command, which also contains the message ID, and NSQ requeues the message for redelivery. In addition, a dedicated goroutine scans the container at regular intervals and redelivers any message that has not been acknowledged within a specified timeout.

Code change analysis

From the above analysis, we have two lookup requirements for the “container” that stores sent but unacknowledged messages: 1. Find a message by ID. 2. Find the message that will expire first. With a single data structure, one of these lookups would have to scan every element to find the message we need. To reduce the time complexity of both lookups, we maintain two data structures side by side.

For the first requirement, a map is the natural fit. For the second, if every message had the same timeout (as in our implementation), a slice would suffice: messages appended first would expire first. In real NSQ, however, each client can set its own timeout for the messages sent to it, so a message sent earlier does not necessarily expire before one sent later. To still find the earliest-expiring message in O(1) under these conditions, NSQ uses a priority queue (a min-heap). Insertion and deletion take O(log N), and reading the earliest-expiring element takes O(1). Readers unfamiliar with this data structure can look it up on their own.
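To make the point about per-client timeouts concrete, here is a minimal sketch using the standard container/heap package. The item type, the tick values, and the expiryOrder helper are made up for illustration and are not part of Mini-NSQ; the sketch only shows that a heap ordered by deadline returns messages in expiry order even when that differs from send order.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for an in-flight message.
type item struct {
	name     string
	deadline int64 // send time + that client's timeout, in arbitrary ticks
}

// deadlineHeap is a min-heap ordered by deadline.
type deadlineHeap []*item

func (h deadlineHeap) Len() int            { return len(h) }
func (h deadlineHeap) Less(i, j int) bool  { return h[i].deadline < h[j].deadline }
func (h deadlineHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *deadlineHeap) Push(x interface{}) { *h = append(*h, x.(*item)) }
func (h *deadlineHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// expiryOrder pushes items in send order and returns their names in expiry order.
func expiryOrder(items []*item) []string {
	h := &deadlineHeap{}
	for _, it := range items {
		heap.Push(h, it)
	}
	var order []string
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(*item).name)
	}
	return order
}

func main() {
	// "a" is sent first (tick 0) with a 30-tick timeout,
	// "b" is sent later (tick 5) with a 10-tick timeout,
	// "c" is sent last (tick 6) with a 20-tick timeout.
	sent := []*item{
		{name: "a", deadline: 0 + 30},
		{name: "b", deadline: 5 + 10},
		{name: "c", deadline: 6 + 20},
	}
	fmt.Println(expiryOrder(sent)) // [b c a]
}
```

With a plain slice in send order, "a" would sit at the front even though "b" expires first.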

To stay consistent with NSQ, and because Go’s standard library provides the container/heap package, which makes a priority queue easy to implement, we use a priority queue rather than a slice. Again, the priority queue is not strictly necessary here: in our current design all messages share the same timeout, so interested readers can try replacing it with a slice.
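For readers who want to try the slice variant, the following is one possible sketch, assuming a single shared timeout. The entry and fifo types are hypothetical and do not appear in the repository. Send order equals expiry order, so the oldest entry is always at the front, but removing an arbitrary entry by ID (the FIN/REQ path) costs a linear scan.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for an in-flight message.
type entry struct {
	id       string
	deadline int64
}

// fifo keeps entries in send order; with one shared timeout this is also expiry order.
type fifo struct {
	entries []*entry
}

// push appends a newly sent message.
func (f *fifo) push(e *entry) { f.entries = append(f.entries, e) }

// popExpired removes and returns the oldest entry if its deadline has passed, else nil.
func (f *fifo) popExpired(now int64) *entry {
	if len(f.entries) == 0 || f.entries[0].deadline > now {
		return nil
	}
	e := f.entries[0]
	f.entries = f.entries[1:]
	return e
}

// remove deletes an entry by ID; this linear scan is the price of using a slice.
func (f *fifo) remove(id string) bool {
	for i, e := range f.entries {
		if e.id == id {
			f.entries = append(f.entries[:i], f.entries[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	const timeout = 10
	f := &fifo{}
	f.push(&entry{id: "m1", deadline: 0 + timeout})
	f.push(&entry{id: "m2", deadline: 1 + timeout})
	f.push(&entry{id: "m3", deadline: 2 + timeout})

	f.remove("m2")                   // the consumer FINs m2
	fmt.Println(f.popExpired(5))     // <nil>: nothing has expired yet
	fmt.Println(f.popExpired(10).id) // m1
	fmt.Println(f.popExpired(12).id) // m3
}
```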

message.go

type Message struct {
	// for in-flight handling
	index int   // Position of this message in inFlightPQ
	pri   int64 // Expiration time (UnixNano timestamp)
}

We added two fields to Message: pri, which is used to order the priority queue, and index, which is used to delete a specific message from the priority queue.

pqueue.go

This file implements priority queues, so let’s look at them in detail.

type inFlightPqueue []*Message

func newInFlightPqueue(capacity int) *inFlightPqueue {
	pq:=make(inFlightPqueue, 0, capacity)
	return &pq
}

// The following five methods make up heap.Interface and must be implemented to build a priority queue
func (pq inFlightPqueue) Len() int { return len(pq) }

func (pq inFlightPqueue) Less(i, j int) bool {
	return pq[i].pri < pq[j].pri  
}

func (pq inFlightPqueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *inFlightPqueue) Push(x interface{}) {
	msg:=x.(*Message)
	n := len(*pq)
	msg.index = n
	*pq = append(*pq, msg)
}

func (pq *inFlightPqueue) Pop() interface{} {
	n := len(*pq)
	msg := (*pq)[n-1]
	*pq = (*pq)[0 : n-1]
	msg.index = -1
	return msg
}

// The following "W" methods are provided for external use
// Add a message to the queue
func (pq *inFlightPqueue) WPush(msg *Message) {
	heap.Push(pq, msg)
}

// Pop the earliest-expiring message from the queue
func (pq *inFlightPqueue) WPop() *Message {
	return heap.Pop(pq).(*Message)
}

// Remove the specified message from the queue
func (pq *inFlightPqueue) WRemove(msg *Message) {
	heap.Remove(pq,msg.index)
}

// Return the earliest-expiring message without removing it, or nil if it expires after max
func (pq *inFlightPqueue) WPeek(max int64) *Message {
	if len(*pq) == 0 {
		return nil
	}
	msg:=(*pq)[0]
	if msg.pri > max {
		return nil
	}
	return msg
}

inFlightPqueue is a slice of pointers to Message. The five methods required by heap.Interface must be implemented to turn it into a priority queue. Note that Less compares the pri field of the messages, and Swap keeps each message’s index field equal to its actual position in the queue. The methods that start with “W” are provided for external use; see their comments. Note that in WRemove the second argument to heap.Remove is the position of the message in the queue, which is exactly the value stored in the message’s index field.

pqueue_test.go

To better understand this priority queue, we wrote a test program.

func TestInFlightPqueue(t *testing.T) {
	pq := newInFlightPqueue(5)
	//var id MessageID
	msg1 := &Message{
		pri:  1,
		Body: []byte("first"),
	}
	pq.WPush(msg1)

	msg2 := &Message{
		pri:  3,
		Body: []byte("three"),
	}
	pq.WPush(msg2)

	msg3 := &Message{
		pri:  2,
		Body: []byte("two"),
	}
	pq.WPush(msg3)

	pq.WRemove(msg1)

	for len(*pq) > 0 {
		fmt.Printf("%s\n", pq.WPop().Body)
	}
}

As shown above, we first push three messages to the queue with pri values of 1, 3, and 2, then delete the first message and pop out the remaining messages in turn. The final output is as follows. Readers can adjust the test themselves.

two
three

topic.go

Until now the message ID was never used, so every message kept the default value and all IDs were identical. From this chapter on, IDs are needed to identify messages. We have seen that a client can subscribe to only one channel, and we will see below that each channel maintains a “container” for messages that have been sent but not yet acknowledged. In theory, then, IDs only need to be unique within a channel. However, a channel’s messages are handed to it by its topic, and when we create a message on receiving it from a producer, we know only which topic it belongs to. To keep the code simple, we guarantee uniqueness at the topic level; IDs within a channel are then unique as well, and it does not matter if IDs repeat across different topics.

IDs are generated by guidFactory, and each topic maintains its own instance. It is modeled on the Snowflake algorithm and similar distributed ID schemes, which factor the host name into the ID so that IDs generated for the same topic on different machines do not collide. The details are omitted here; the reader can treat it as a black box.
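For readers curious about what is inside such a black box, here is a rough Snowflake-style sketch. It is not the guidFactory from the repository: the idGen type, the bit layout, and the explicit nodeID field (standing in for a value derived from the host name) are all assumptions made for illustration. Each ID packs a millisecond timestamp, a node ID, and a per-millisecond sequence number.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Bit layout chosen for this sketch only: timestamp | 10-bit node ID | 12-bit sequence.
const (
	nodeBits     = 10
	sequenceBits = 12
	sequenceMask = 1<<sequenceBits - 1
)

// idGen is a hypothetical Snowflake-style generator.
type idGen struct {
	mu       sync.Mutex
	nodeID   int64 // assumed to be derived from the host name elsewhere
	lastMs   int64
	sequence int64
}

// next builds an ID from the given millisecond timestamp.
// Passing the time in as a parameter keeps the sketch deterministic.
func (g *idGen) next(nowMs int64) (int64, error) {
	g.mu.Lock()
	defer g.mu.Unlock()

	if nowMs < g.lastMs {
		return 0, errors.New("clock moved backwards")
	}
	if nowMs == g.lastMs {
		if g.sequence == sequenceMask {
			// Every sequence number for this millisecond is used; the caller should wait and retry.
			return 0, errors.New("sequence exhausted for this millisecond")
		}
		g.sequence++
	} else {
		g.sequence = 0
	}
	g.lastMs = nowMs
	return nowMs<<(nodeBits+sequenceBits) | g.nodeID<<sequenceBits | g.sequence, nil
}

func main() {
	g := &idGen{nodeID: 7}
	a, _ := g.next(1000)
	b, _ := g.next(1000)
	c, _ := g.next(1001)
	fmt.Printf("%016x\n%016x\n%016x\n", a, b, c)
}
```

In a generator of this kind, an error usually means "try again in a moment", which would fit a caller that sleeps briefly and retries.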

Next, let’s look at the code for Topic and ID generation:

type Topic struct {
	idFactory         *guidFactory
}

func NewTopic(topicName string) *Topic {
	t := &Topic{
		idFactory:         NewGUIDFactory(),
	}
}

func (t *Topic) GenerateID() MessageID {
	var i int64 = 0
	for {
		id, err := t.idFactory.NewGUID()
		if err == nil {
			return id.Hex()
		}
		if i%10000 == 0 {
			log.Printf("TOPIC(%s): failed to create guid - %s", t.name, err)
		}
		time.Sleep(time.Millisecond)
		i++
	}
}

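As you can see, each topic maintains a guidFactory and uses it to generate IDs. GenerateID retries until the factory returns an ID, sleeping for a millisecond between attempts and logging the first failure and then one in every 10,000; it too can be treated as a black box.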

channel.go

type Channel struct {
	inFlightMessages map[MessageID]*Message // Messages sent but not yet acknowledged, looked up by ID
	inFlightPQ       *inFlightPqueue        // The same messages, ordered by expiration time
	inFlightMutex    sync.Mutex
}

func NewChannel(topicName string, channelName string) *Channel {
	c.inFlightMutex.Lock()
	c.inFlightMessages = make(map[MessageID]*Message)
	c.inFlightPQ = newInFlightPqueue(10000)
	c.inFlightMutex.Unlock()
}

In each channel we maintain a map and a priority queue, which together form the “container” for messages that have been sent but not yet acknowledged by the client. Because they are accessed concurrently, we protect them with a mutex.

// Called every time before the message is actually sent to the client
func (c *Channel) StartInFlightTimeout(msg *Message) error  {
	now := time.Now()
	msg.pri = now.Add(10 * time.Second).UnixNano()
	return c.pushInFlightMessage(msg)
}

// if a "FIN" message is received from the client, it will be called
func (c *Channel) FinishMessage(id MessageID) error {
	_, err := c.popInFlightMessage(id)
	return err
}

// If a "REQ" message is received from the client, it will be called
func (c *Channel) RequeueMessage(id MessageID) error {
	// remove from inflight first
	msg, err := c.popInFlightMessage(id)
	if err != nil {
		return err
	}
	return c.PutMessage(msg)
}

// call it at fixed intervals
func (c *Channel) processInFlightQueue(t int64) bool {
	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg:= c.inFlightPQ.WPeek(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		log.Printf("message:%s is dirty",msg.Body)
		dirty = true

		_, err := c.popInFlightMessage(msg.ID)
		if err != nil {
			goto exit
		}
		c.PutMessage(msg)
	}

exit:
	return dirty
}

The methods above are the ones called from outside this part of the channel. StartInFlightTimeout is called just before a message is actually sent to the client; it sets the message’s timeout to 10 seconds and puts it in the “container”. FinishMessage is called when a “FIN” command arrives from the client and deletes the message from the container. RequeueMessage is called when a “REQ” command arrives; it first deletes the message from the container and then puts it back into the channel. processInFlightQueue is called at regular intervals. It checks whether the earliest-expiring message has expired, exits if it has not, and otherwise handles it the same way as a REQ, repeating until no expired message remains. Note that it returns a bool indicating whether this call found any expired message.

Let’s look at the insert and delete methods in detail:

func (c *Channel) pushInFlightMessage(msg *Message) error {
	c.inFlightMutex.Lock()
	defer c.inFlightMutex.Unlock()

	// 1. Add to the map
	_, ok := c.inFlightMessages[msg.ID]
	if ok {
		return errors.New("ID already in flight")
	}
	c.inFlightMessages[msg.ID] = msg

	// 2. Add to inFlightPQ
	c.inFlightPQ.WPush(msg)
	return nil
}

func (c *Channel) popInFlightMessage(id MessageID) (*Message, error) {
	c.inFlightMutex.Lock()
	defer c.inFlightMutex.Unlock()

	//1. Delete from the map
	msg, ok := c.inFlightMessages[id]
	if !ok {
		return nil, errors.New("ID not in flight")
	}
	delete(c.inFlightMessages, id)

	//2. Delete from inFlightPQ
	c.inFlightPQ.WRemove(msg)
	return msg, nil
}

Both methods first check the map to verify that the operation is valid, and then update the map and the priority queue in turn.

protocal.go

func (p *protocol) messagePump(client *client) {
	for {
		select {
		case subChannel = <-subEventChan: // a subscription event occurred; subChannel is the channel the consumer is actually bound to
		case msg = <-memoryMsgChan: // the channel's in-memory queue has a message
		case buf := <-backendMsgChan:
			...
		}

		subChannel.StartInFlightTimeout(msg)
		err = p.SendMessage(client, msg)
		if err != nil {
			log.Printf("PROTOCOL(V2): [%s] messagePump error - %s", client.RemoteAddr(), err)
			goto exit
		}
	}
}

Note that StartInFlightTimeout stores the message in the container before it is sent. If the send then fails, the message is not lost even if we do nothing: it stays in the container and is requeued once its timeout expires.

func (p *protocol) Exec(client *client, params [][]byte) error {
	switch {
	case bytes.Equal(params[0], []byte("FIN")): // Finish a message (indicates successful processing)
		return p.FIN(client, params)
	case bytes.Equal(params[0], []byte("REQ")): // Re-queue a message (indicates failure to process)
		return p.REQ(client, params)
	}
	return fmt.Errorf("invalid command %s", params[0])
}

func (p *protocol) FIN(client *client, params [][]byte) error {
	msgID := decodeMessageID(params[1])
	log.Printf("ready to finish message -- msgID: %v", msgID)
	err := client.Channel.FinishMessage(msgID)
	if err != nil {
		return err
	}
	return nil
}

// REQ <message_id>\n
func (p *protocol) REQ(client *client, params [][]byte) error {
	msgID := decodeMessageID(params[1])
	log.Printf("ready to requeue message -- msgID: %v", msgID)
	err := client.Channel.RequeueMessage(msgID)
	if err != nil {
		return err
	}
	return nil
}

In Exec we added support for two types of messages, “FIN” and “REQ,” which end up calling the methods we analyzed in channel above.
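To illustrate how a command line might turn into the params slice that Exec receives, here is a simplified sketch. The parseLine and dispatch functions are hypothetical, and the space-separated, newline-terminated format is an assumption based on the "REQ <message_id>\n" comment in the code. The dispatch function returns a description instead of touching a channel, so the example runs on its own.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// parseLine splits one command line such as "FIN <message_id>\n" into params.
// The framing is an assumption for this sketch.
func parseLine(line []byte) [][]byte {
	line = bytes.TrimRight(line, "\r\n")
	return bytes.Split(line, []byte(" "))
}

// dispatch is a hypothetical, simplified Exec.
func dispatch(params [][]byte) (string, error) {
	if len(params) < 2 {
		// Guard against indexing params[1] on a malformed command.
		return "", errors.New("missing message ID")
	}
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return fmt.Sprintf("finish %s", params[1]), nil
	case bytes.Equal(params[0], []byte("REQ")):
		return fmt.Sprintf("requeue %s", params[1]), nil
	}
	return "", fmt.Errorf("invalid command %s", params[0])
}

func main() {
	for _, line := range []string{"FIN 0a1b2c\n", "REQ 0a1b2c\n", "NOP 0a1b2c\n", "FIN\n"} {
		action, err := dispatch(parseLine([]byte(line)))
		fmt.Println(action, err)
	}
}
```

The length check is a design choice of this sketch: reading params[1] without it would panic if a client sent a bare "FIN".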

nsqd.go

If every channel started its own goroutine to periodically check its “container” for expired messages, many goroutines could be running at once, which is unnecessary because most messages will be acknowledged with “FIN”. For efficiency, we start a single goroutine at program startup that checks all channels.

func Start() (*NSQD, error) {
	go n.queueScanLoop()
}

func (n *NSQD) queueScanLoop() {
	workTicker := time.NewTicker(100 * time.Millisecond)
	defer workTicker.Stop()
	var channels []*Channel
	for {
		select {
		case <-workTicker.C:
			channels = n.channels()
			if len(channels) == 0 {
				continue
			}
		}

		num := 20
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		numDirty := 0
		for _, i := range UniqRands(num, len(channels)) {
			c:=channels[i]
			now := time.Now().UnixNano()
			if c.processInFlightQueue(now) {
				numDirty++
			}
		}
		// If 25% of the selected channels were dirty,
		// the loop continues without sleep.
		if float64(numDirty)/float64(num) > 0.25 {
			goto loop
		}
	}
}

The loop wakes up every 100 ms, but instead of checking every channel it checks a random sample of at most 20. If more than 25% of the sampled channels had expired messages, it skips the wait and samples again immediately. This approach is modeled on the key-expiration algorithm in Redis, with the aim of improving efficiency.
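The decision made on each pass can be isolated in a small sketch. The scanPass function is hypothetical: the dirty slice stands in for the result of calling processInFlightQueue on each channel, and rand.Perm is used as a simple substitute for UniqRands.

```go
package main

import (
	"fmt"
	"math/rand"
)

// scanPass samples up to sampleSize channels and reports whether the share of
// dirty ones exceeds threshold, i.e. whether the caller should rescan at once.
func scanPass(dirty []bool, sampleSize int, threshold float64) bool {
	if len(dirty) == 0 || sampleSize <= 0 {
		return false
	}
	if sampleSize > len(dirty) {
		sampleSize = len(dirty)
	}
	numDirty := 0
	// rand.Perm stands in for UniqRands: it yields distinct random indexes.
	for _, i := range rand.Perm(len(dirty))[:sampleSize] {
		if dirty[i] {
			numDirty++
		}
	}
	return float64(numDirty)/float64(sampleSize) > threshold
}

func main() {
	allClean := make([]bool, 50)
	allDirty := make([]bool, 50)
	for i := range allDirty {
		allDirty[i] = true
	}
	fmt.Println(scanPass(allClean, 20, 0.25)) // false: wait for the next tick
	fmt.Println(scanPass(allDirty, 20, 0.25)) // true: rescan immediately
}
```

Sampling keeps the cost of each tick bounded no matter how many channels exist, while the immediate rescan lets the loop catch up when many channels hold expired messages.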

The UniqRands function used above is shown below; it returns a slice of quantity distinct random integers in the range [0, maxval).

// example: input (3, 10), output: [0 8 7]
func UniqRands(quantity int, maxval int) []int {
	if maxval < quantity {
		quantity = maxval
	}

	intSlice := make([]int, maxval)
	for i := 0; i < maxval; i++ {
		intSlice[i] = i
	}

	for i := 0; i < quantity; i++ {
		j := rand.Int()%maxval + i
		// swap
		intSlice[i], intSlice[j] = intSlice[j], intSlice[i]
		maxval--

	}
	return intSlice[0:quantity]
}

Test

client.go

func main() {
	log.SetFlags(log.Lshortfile | log.Ltime)
	nsqdAddr := "127.0.0.1:4150"
	conn, err := net.Dial("tcp", nsqdAddr)
	if err != nil {
		log.Fatal(err)
	}
	// Start reading only after the connection is known to be valid
	go readFully(conn)

	pub(conn, "mytopic", []byte("one one "))
	pub(conn, "mytopic", []byte("two two"))
	pub(conn, "mytopic", []byte("three three"))

	cmd := Subscribe("mytopic", "mychannel")
	cmd.WriteTo(conn)

	select {} // block forever so the reader goroutine keeps running
}

func readFully(conn net.Conn) {
	sizeBuf := make([]byte, 4) // 4-byte big-endian length prefix
	retry := 0
	for {
		// io.ReadFull keeps reading until the buffer is full; a bare conn.Read may return fewer bytes
		if _, err := io.ReadFull(conn, sizeBuf); err != nil {
			fmt.Printf("error during read: %s\n", err)
			return
		}
		size := binary.BigEndian.Uint32(sizeBuf)
		data := make([]byte, size)
		n, err := io.ReadFull(conn, data)
		if err != nil {
			fmt.Printf("error during read: %s\n", err)
			return
		}
		msg, _ := nsqd.DecodeMessage(data)
		log.Printf("local:%s, receive: id:<%v> body:<%s>,size:%d\n", conn.LocalAddr(), msg.ID, msg.Body, n)
		// REQ "two two" up to three times
		if reflect.DeepEqual(msg.Body, []byte("two two")) && retry < 3 {
			retry++
			requeue(conn, msg.ID)
			log.Printf("requeue message success -- body: %s", msg.Body)
			time.Sleep(time.Second)
		}
		// FIN "three three"; "one one " is deliberately left unacknowledged
		if reflect.DeepEqual(msg.Body, []byte("three three")) {
			finish(conn, msg.ID)
			log.Printf("finish message success -- body: %s", msg.Body)
		}
	}
}

In our test program, we publish three messages and then subscribe. In the goroutine that receives messages, we FIN the message “three three”, REQ the message “two two” three times, and do nothing with the message “one one ”.

The client output shows that “two two” is received again shortly after each REQ. After that, “one one ” and “two two” are redelivered every 10 seconds, because neither has been acknowledged with FIN.

Code

git clone https://github.com/xianxueniao150/mini-nsq.git
git checkout day03