Introduction

PubSub (Publish/Subscribe) is a pattern designed to handle one-to-many dependencies: multiple consumers listen to the same topic at the same time. It decouples producers from consumers, and it also decouples consumers from each other. (Note: anti-patterns such as relying on the order in which subscribers execute, or using shared data to pass state between them, should be avoided, because they couple the consumers so that they can no longer change independently.) The key is an intermediary that maintains the subscriptions and delivers the produced messages to the subscribers.

In Golang, a channel is a natural fit for that intermediary. Let's implement EventBus, a utility type based on the PubSub pattern, step by step.
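Before getting to the real implementation, here is a minimal sketch of one-to-many delivery over channels. The names `broker`, `subscribe` and `publish` are hypothetical and are not part of EventBus; the sketch only shows an intermediary keeping subscriptions and fanning a message out to each subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical intermediary: it keeps the subscriptions
// and copies every published message to each subscriber's channel.
type broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan string)}
}

// subscribe registers a new buffered channel for the topic and returns it.
func (b *broker) subscribe(topic string) <-chan string {
	ch := make(chan string, 1)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// publish offers msg to every subscriber of the topic and reports how many
// accepted it. A subscriber whose buffer is full is skipped, not waited for.
func (b *broker) publish(topic, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()

	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	b := newBroker()
	first := b.subscribe("greeting")
	second := b.subscribe("greeting")

	n := b.publish("greeting", "hello")
	fmt.Println(n, <-first, <-second) // 2 hello hello
}
```

Neither subscriber knows about the other, and the publisher only knows the topic name, which is the decoupling described above.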

Defining the types

First, let’s define some basic types and core operations.

//EventID is the unique identifier of an Event
type EventID int64

//Event
type Event interface {
	ID() EventID
}

//EventHandler
type EventHandler interface {
	OnEvent(ctx context.Context, event Event) error
	CanAutoRetry(err error) bool
}

// JobStatus holds information related to a job status
type JobStatus struct {
	RunAt      time.Time
	FinishedAt time.Time
	Err        error
}

//EventBus ...
type EventBus struct {}

func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) {}

func (eb *EventBus) Unsubscribe(eventID EventID, handlers ...EventHandler) {}

// Placeholder body; the real implementation is shown later.
func (eb *EventBus) Publish(evt Event) <-chan JobStatus { return nil }

Breaking down the key parts

EventBus uses a map, handlers map[EventID][]EventHandler, to maintain the subscriptions. Since the map can be accessed from several goroutines at once, it also needs a lock.
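The snippets in this article use several EventBus fields without showing their declaration. The following is a sketch of what the struct might look like, with the field names taken from the snippets and the field types inferred from how they are used. The constructor `newEventBus` and its parameters are assumptions for illustration, not the channelx API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type EventID int64

type Event interface{ ID() EventID }

type EventHandler interface {
	OnEvent(ctx context.Context, event Event) error
	CanAutoRetry(err error) bool
}

type JobStatus struct {
	RunAt      time.Time
	FinishedAt time.Time
	Err        error
}

type EventJob struct {
	event      Event
	handlers   []EventHandler
	resultChan chan JobStatus
}

// EventBus fields as inferred from the article's snippets (an assumption).
type EventBus struct {
	mu       sync.RWMutex               // guards handlers
	handlers map[EventID][]EventHandler // subscriptions per event ID

	eventJobQueue chan EventJob  // buffered queue between Publish and the workers
	eventWorkers  int            // number of resident worker goroutines
	timeout       time.Duration  // upper bound for running one job's handlers
	wg            sync.WaitGroup // tracks the worker goroutines
	started       bool
}

// newEventBus is a hypothetical constructor. The map must be initialized
// here, because writing to a nil map panics in Subscribe.
func newEventBus(bufferSize, workers int, timeout time.Duration) *EventBus {
	return &EventBus{
		handlers:      make(map[EventID][]EventHandler),
		eventJobQueue: make(chan EventJob, bufferSize),
		eventWorkers:  workers,
		timeout:       timeout,
	}
}

func main() {
	eb := newEventBus(4, 2, 5*time.Second)
	fmt.Println(len(eb.handlers), cap(eb.eventJobQueue), eb.eventWorkers) // 0 4 2
}
```

A sync.RWMutex rather than a plain sync.Mutex matches the RLock call in Publish: many publishers can read the map concurrently, while Subscribe takes the exclusive lock.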

//Subscribe ...
func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	eb.handlers[eventID] = append(eb.handlers[eventID], handlers...)
}

The implementation here is deliberately simple: it does not guard against the same consumer subscribing more than once, and leaves that to the caller. (But why would the same consumer call Subscribe multiple times for the same topic? That sounds like a bug in the calling code.)
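If duplicate subscriptions do need to be guarded against, one possible approach is sketched below, together with a matching unsubscribe (which the article declares but does not show). The `bus`, `handler`, `subscribeOnce` and `unsubscribe` names are hypothetical stand-ins, and the sketch assumes handlers are registered as pointers so that comparing them with == is safe.

```go
package main

import (
	"fmt"
	"sync"
)

type EventID int64

// handler stands in for EventHandler to keep the sketch short.
type handler interface{ Name() string }

type namedHandler struct{ name string }

func (h *namedHandler) Name() string { return h.name }

type bus struct {
	mu       sync.RWMutex
	handlers map[EventID][]handler
}

// contains compares interface values with ==. This panics at run time if the
// dynamic type is not comparable, hence the assumption of pointer handlers.
func contains(hs []handler, target handler) bool {
	for _, h := range hs {
		if h == target {
			return true
		}
	}
	return false
}

// subscribeOnce skips handlers that are already registered for id.
func (b *bus) subscribeOnce(id EventID, hs ...handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, h := range hs {
		if !contains(b.handlers[id], h) {
			b.handlers[id] = append(b.handlers[id], h)
		}
	}
}

// unsubscribe rebuilds the slice for id without the given handlers.
func (b *bus) unsubscribe(id EventID, hs ...handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	kept := make([]handler, 0, len(b.handlers[id]))
	for _, h := range b.handlers[id] {
		if !contains(hs, h) {
			kept = append(kept, h)
		}
	}
	b.handlers[id] = kept
}

func main() {
	b := &bus{handlers: make(map[EventID][]handler)}
	h := &namedHandler{name: "a"}

	b.subscribeOnce(1, h, h)
	b.subscribeOnce(1, h)
	fmt.Println(len(b.handlers[1])) // 1

	b.unsubscribe(1, h)
	fmt.Println(len(b.handlers[1])) // 0
}
```

The linear scan is fine for the handful of handlers a topic usually has; a set keyed by handler would only pay off for very large subscriber lists.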

Now for the core Publish path. It needs a channel (preferably buffered) to pass the Event data and, for performance, a set of resident goroutines that listen for messages and start the relevant consumers. Here is the relevant code (the full version adds logging, error handling, and so on, which are left out here to keep the focus on the essentials).

func (eb *EventBus) Start() {
	if eb.started {
		return
	}

	for i := 0; i < eb.eventWorkers; i++ {
		eb.wg.Add(1)
		go eb.eventWorker(eb.eventJobQueue)
	}

	eb.started = true
}


func (eb *EventBus) eventWorker(jobQueue <-chan EventJob) {
	for {
		select {
		case job := <-jobQueue:
			jobStatus := JobStatus{
				RunAt: time.Now(),
			}

			// Run all handlers of this job concurrently, bounded by eb.timeout.
			ctx, cancel := context.WithTimeout(context.Background(), eb.timeout)
			g, _ := errgroup.WithContext(ctx)
			for index := range job.handlers {
				handler := job.handlers[index]
				g.Go(func() error {
					return eb.runHandler(ctx, handler, job.event)
				})
			}
			jobStatus.Err = g.Wait() // first non-nil handler error, if any

			jobStatus.FinishedAt = time.Now()

			// Non-blocking send: never stall the worker on the result channel.
			select {
			case job.resultChan <- jobStatus:
			default:
			}
			cancel()
		}
	}
}
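The worker calls eb.runHandler, which is not shown in this article. Given that EventHandler exposes CanAutoRetry, one plausible shape is a retry loop like the sketch below. This is a guess at the behavior, not the channelx implementation: `runHandler` is written here as a free function, and the `maxRetries` and `interval` parameters as well as the `flakyHandler` and `demo` helpers are assumptions for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type EventID int64

type Event interface{ ID() EventID }

type EventHandler interface {
	OnEvent(ctx context.Context, event Event) error
	CanAutoRetry(err error) bool
}

// runHandler calls the handler and retries up to maxRetries times while the
// handler reports the error as retryable. It gives up early if ctx is done.
func runHandler(ctx context.Context, h EventHandler, evt Event, maxRetries int, interval time.Duration) error {
	for attempt := 0; ; attempt++ {
		err := h.OnEvent(ctx, evt)
		if err == nil || !h.CanAutoRetry(err) || attempt >= maxRetries {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}

type testEvent struct{}

func (testEvent) ID() EventID { return 1 }

// flakyHandler fails a fixed number of times before succeeding.
type flakyHandler struct {
	failures int // remaining failures
	calls    int // total OnEvent calls
}

func (h *flakyHandler) OnEvent(ctx context.Context, evt Event) error {
	h.calls++
	if h.failures > 0 {
		h.failures--
		return errors.New("temporary failure")
	}
	return nil
}

func (h *flakyHandler) CanAutoRetry(err error) bool { return true }

// demo runs a flaky handler and returns the number of calls and the result.
func demo(failures, maxRetries int) (int, error) {
	h := &flakyHandler{failures: failures}
	err := runHandler(context.Background(), h, testEvent{}, maxRetries, time.Millisecond)
	return h.calls, err
}

func main() {
	calls, err := demo(2, 3)
	fmt.Println(calls, err) // 3 <nil>
}
```

Waiting on ctx.Done() between attempts means the retry loop stops as soon as the job's timeout fires instead of sleeping past it.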

With the workers in place, here is the actual Publish code.

// EventJob ...
type EventJob struct {
	event      Event
	handlers   []EventHandler
	resultChan chan JobStatus
}

//Publish ...
func (eb *EventBus) Publish(evt Event) <-chan JobStatus {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	if ehs, ok := eb.handlers[evt.ID()]; ok {
		handlers := make([]EventHandler, len(ehs))
		copy(handlers, ehs) // Snapshot a copy of the consumer at that time
		job := EventJob{
			event:      evt,
			handlers:   handlers,
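			resultChan: make(chan JobStatus, 1),
		}

		var jobQueue = eb.eventJobQueue
		select {
		case jobQueue <- job:
		default:
			// Queue full: the job is dropped and nothing is ever sent on
			// resultChan (error handling is omitted in this excerpt).
		}
		return job.resultChan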
	} else {
		err := fmt.Errorf("no handlers for event(%d)", evt.ID())
		resultChan := make(chan JobStatus, 1)
		resultChan <- JobStatus{
			Err: err,
		}
		return resultChan
	}
}

Note that eventWorker does not look up the consumers in handlers by event ID itself; Publish hands it a snapshot instead. This keeps eventWorker more generic, and it reduces the blocking caused by lock operations.
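The effect of the snapshot can be seen in isolation with a small sketch. Plain strings stand in for EventHandler values here, and `snapshot` is a hypothetical helper that does the same make-and-copy as Publish.

```go
package main

import "fmt"

// snapshot returns an independent copy of the slice, the same way Publish
// copies the registered handlers into the job.
func snapshot(handlers []string) []string {
	out := make([]string, len(handlers))
	copy(out, handlers)
	return out
}

func main() {
	registered := []string{"audit", "email"}
	jobHandlers := snapshot(registered)

	// Later changes to the registry do not reach the job already queued.
	registered[1] = "sms"
	registered = append(registered, "metrics")

	fmt.Println(jobHandlers) // [audit email]
	fmt.Println(registered)  // [audit sms metrics]
}
```

Because the job owns its copy, the worker can run the handlers without taking the lock at all.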

At this point we have walked through the core code piece by piece. For the complete code, see event_bus.go in the ChannelX project.

Usage example

No utility class is complete without an example, and here’s one.

First define an event. The id field is private, and the constructor forces it to the right value.

const ExampleEventID channelx.EventID = 1

type ExampleEvent struct {
	id channelx.EventID
}

func NewExampleEvent() ExampleEvent {
	return ExampleEvent{id: ExampleEventID}
}

func (evt ExampleEvent) ID() channelx.EventID {
	return evt.id
}


Next comes the event handler. As needed, OnEvent first checks that the received event is one it subscribed to and that it can be converted to the expected concrete type. After these defensive checks, it can handle the event.

type ExampleHandler struct {
	logger channelx.Logger
}

func NewExampleHandler(logger channelx.Logger) *ExampleHandler {
	return &ExampleHandler{
		logger: logger,
	}
}

func (h ExampleHandler) Logger() channelx.Logger {
	return h.logger
}

func (h ExampleHandler) CanAutoRetry(err error) bool {
	return false
}

func (h ExampleHandler) OnEvent(ctx context.Context, event channelx.Event) error {
	if event.ID() != ExampleEventID {
		return fmt.Errorf("subscribe wrong event(%d)", event.ID())
	}

	_, ok := event.(ExampleEvent)
	if !ok {
		return fmt.Errorf("failed to convert received event to ExampleEvent")
	}

	// handle the event here
	h.Logger().Infof("event handled")

	return nil
}


Finally, it’s time to start EventBus, subscribe to events, and publish.

eventBus := channelx.NewEventBus(logger, "test", 4, 4, 2, time.Second, 5*time.Second)
eventBus.Start()

handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
eventBus.Publish(NewExampleEvent())
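The example above ignores the channel returned by Publish. If the caller wants the result, it is worth waiting with a timeout, because in the Publish excerpt shown earlier a job that does not fit into the queue is dropped and its channel never receives a status. The following is a minimal sketch; `waitStatus` and `errNoStatus` are hypothetical helpers, and a locally created channel stands in for the one Publish returns.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// JobStatus mirrors the type defined earlier in the article.
type JobStatus struct {
	RunAt      time.Time
	FinishedAt time.Time
	Err        error
}

var errNoStatus = errors.New("no job status received before the timeout")

// waitStatus waits for one status on ch, but no longer than timeout.
func waitStatus(ch <-chan JobStatus, timeout time.Duration) (JobStatus, error) {
	select {
	case st := <-ch:
		return st, nil
	case <-time.After(timeout):
		return JobStatus{}, errNoStatus
	}
}

func main() {
	// Stand-in for a job that completed normally.
	done := make(chan JobStatus, 1)
	done <- JobStatus{RunAt: time.Now(), FinishedAt: time.Now()}
	st, err := waitStatus(done, time.Second)
	fmt.Println(st.Err, err) // <nil> <nil>

	// Stand-in for a dropped job: nothing is ever sent.
	never := make(chan JobStatus, 1)
	_, err = waitStatus(never, 10*time.Millisecond)
	fmt.Println(err) // no job status received before the timeout
}
```

With the real bus, the call would look like waitStatus(eventBus.Publish(NewExampleEvent()), time.Second), with the timeout chosen somewhat above the bus's own handler timeout.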

Final words

I have written about using channels before:

  • How to make Golang channels as smooth as NodeJS streams
  • How to use Golang channels for batch message processing
  • How to get an async/await feel out of Golang channels
  • Next time you write concurrent processing in Golang, use this template

These lightweight utilities are open source in channelx. Reviews are welcome, and if you find the tools useful, please give the project a like or a star 🙂