Preface

This article examines the remoteBrokerOffsetStore in rocketmq-client-go.

remoteBrokerOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// remoteBrokerOffsetStore keeps consume offsets in an in-memory table and
// synchronizes them with the broker: offsets are fetched from the broker on
// demand and pushed back to it when persisted.
type remoteBrokerOffsetStore struct {
	// group is the consumer group sent to the broker in offset query and
	// update requests.
	group       string
	// OffsetTable maps each message queue to the offset currently held in memory.
	OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
	// client sends the offset query/update commands to the broker.
	client      internal.RMQClient
	// namesrv resolves a broker name to its address and refreshes topic
	// route info when the address is unknown.
	namesrv     internal.Namesrvs
	// mutex is taken by the methods of this type around accesses to OffsetTable.
	mutex       sync.RWMutex
}
Copy the code
  • remoteBrokerOffsetStore defines the group, OffsetTable, client, namesrv, and mutex fields

NewRemoteOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// NewRemoteOffsetStore returns an OffsetStore backed by the broker.
//
// group is the consumer group used in offset requests, client performs the
// remote calls, and namesrv resolves broker addresses. The in-memory offset
// table starts out empty.
func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {
	store := new(remoteBrokerOffsetStore)
	store.group = group
	store.client = client
	store.namesrv = namesrv
	store.OffsetTable = make(map[primitive.MessageQueue]int64)
	return store
}
Copy the code
  • The NewRemoteOffsetStore function creates a remoteBrokerOffsetStore with an empty OffsetTable

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if len(mqs) == 0 {
		return
	}

	used := make(map[primitive.MessageQueue]struct{}, 0)
	for _, mq := range mqs {
		used[*mq] = struct{}{}
	}

	for mq, off := range r.OffsetTable {
		if_, ok := used[mq]; ! ok { delete(r.OffsetTable, mq)continue
		}
		err := r.updateConsumeOffsetToBroker(r.group, mq, off)
		iferr ! = nil { rlog.Warning("update offset to broker error", map[string]interface{}{
				rlog.LogKeyConsumerGroup: r.group,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err.Error(),
				"offset":                 off,
			})
		} else {
			rlog.Info("update offset to broker success", map[string]interface{}{
				rlog.LogKeyConsumerGroup: r.group,
				rlog.LogKeyMessageQueue:  mq.String(),
				"offset":                 off,
			})
		}
	}
}
Copy the code
  • The persist method iterates over OffsetTable, deletes the entries whose queue is not in mqs, and calls r.updateConsumeOffsetToBroker for the rest

remove

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// remove drops the in-memory offset entry for mq and logs the removal.
// The broker is not contacted.
func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	queue := *mq
	delete(r.OffsetTable, queue)

	fields := map[string]interface{}{
		rlog.LogKeyMessageQueue: mq,
	}
	rlog.Info("delete mq from offset table", fields)
}
Copy the code
  • The remove method executes delete(r.OffsetTable, *mq)

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
	r.mutex.RLock()
	switch t {
	case _ReadFromMemory, _ReadMemoryThenStore:
		off, exist := r.OffsetTable[*mq]
		if exist {
			r.mutex.RUnlock()
			return off
		}
		if t == _ReadFromMemory {
			r.mutex.RUnlock()
			return -1
		}
		fallthrough
	case _ReadFromStore:
		off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
		iferr ! = nil { rlog.Error("fecth offset of mq error", map[string]interface{}{
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err,
			})
			r.mutex.RUnlock()
			return -1
		}
		r.mutex.RUnlock()
		r.update(mq, off, true)
		return off
	default:
	}

	return1}Copy the code
  • The read method, for _ReadFromStore (or for _ReadMemoryThenStore when the offset is not in memory), calls r.fetchConsumeOffsetFromBroker and stores the result via r.update

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// update records offset for mq in the in-memory table.
//
// If mq has no entry yet, offset is stored unconditionally. Otherwise, when
// increaseOnly is true the entry is overwritten only if offset is greater
// than the stored value; when increaseOnly is false it is always overwritten.
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	localOffset, exist := r.OffsetTable[*mq]
	if !exist || !increaseOnly || localOffset < offset {
		r.OffsetTable[*mq] = offset
	}
}
Copy the code
  • The update method sets r.OffsetTable[*mq]; when increaseOnly is true, an existing entry is overwritten only by a larger offset

fetchConsumeOffsetFromBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
	broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
	if broker == "" {
		r.namesrv.UpdateTopicRouteInfo(mq.Topic)
		broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)
	}
	if broker == "" {
		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
	}
	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
		ConsumerGroup: group,
		Topic:         mq.Topic,
		QueueId:       mq.QueueId,
	}
	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
	res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
	iferr ! = nil {return -1, err
	}
	ifres.Code ! = internal.ResSuccess {return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
	}

	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)

	iferr ! = nil {return -1, err
	}

	return off, nil
}
Copy the code
  • The fetchConsumeOffsetFromBroker method builds a QueryConsumerOffsetRequestHeader request and then sends it via r.client.InvokeSync

updateConsumeOffsetToBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// updateConsumeOffsetToBroker sends the offset off of queue mq for the given
// consumer group to the broker named by mq.BrokerName.
//
// The broker address comes from namesrv; when it is unknown the topic route
// info is refreshed and the lookup is repeated once. The update is sent as a
// one-way command with a 5 second timeout, and the result of InvokeOneWay is
// returned as-is. An error is also returned when no address can be resolved.
func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {
	addr := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
	if addr == "" {
		// Unknown broker: refresh routes, then give the lookup one more try.
		r.namesrv.UpdateTopicRouteInfo(mq.Topic)
		if addr = r.namesrv.FindBrokerAddrByName(mq.BrokerName); addr == "" {
			return fmt.Errorf("broker: %s address not found", mq.BrokerName)
		}
	}

	header := &internal.UpdateConsumerOffsetRequestHeader{
		ConsumerGroup: group,
		Topic:         mq.Topic,
		QueueId:       mq.QueueId,
		CommitOffset:  off,
	}
	request := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, header, nil)

	return r.client.InvokeOneWay(context.Background(), addr, request, 5*time.Second)
}
Copy the code
  • The updateConsumeOffsetToBroker method builds an UpdateConsumerOffsetRequestHeader request and then sends it via r.client.InvokeOneWay

summary

remoteBrokerOffsetStore defines the group, OffsetTable, client, namesrv, and mutex fields; it provides the NewRemoteOffsetStore constructor and the persist, remove, read, update, fetchConsumeOffsetFromBroker, and updateConsumeOffsetToBroker methods.

doc

  • offset_store