As you can see from the previous article, it is important to implement your own message bus framework to decouple bounded contexts and provide the necessary support for large concurrent access.

Message bus functions:

1. Bounded context decoupling: In the first wave of DDD articles, when the order information is updated, we update the dealer information by calling the domain model and repository of the dealer bounded context, which causes coupling. Through a message bus, can in order line context of WebApi service (source micro service – producers) to update the order information, publish an event message to a queue, the message bus dealer boundaries context WebApi service (consumer) to subscribe to this event message, and then to his Handler for message processing, Update your dealer information. This decoupled the order-bound context from the reseller bound context.

2. Large concurrency support: the performance of ordering can be further improved through message bus. We can direct the operation of user order to an order command WebApi receive, order command WebApi receive the command, directly throw to a message bus queue, and then immediately return the order result to the front end. This allows users to speed up without having to wait for subsequent complex order business logic. The sequence of processing of subsequent orders is handed over to the message Handler for subsequent processing and further delivery of messages.

Key points of message bus design:

1. Define the interface for messages (events) : All messages that need to be delivered and processed are inherited from this interface because of constraints on the content that must be contained in the message, such as the ID of the message, the time when the message was generated, and so on.

public interface IEvent { Guid Id { get; set; } DateTime CreateDate { get; set; }}Copy the code

2. Define the interface of the message (event) processor: When the message is delivered to the message bus queue, there must be a consumer WebApi to receive and process the message. The specific processing logic is realized in the subscriber processor.

public interface IEventHandler
{
    Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
}
Copy the code

As can be seen from the above code, the type processed by the message (event) processor is the message class inherited from IEvent interface.

3. Define the message (event) associated with messages (events) processor interface: a type of message is delivered, you must find this message processor in the subscriber, so be sure to define the associated interface, so as to match messages with CPU, to achieve message is to subscribe to after processing.

public interface IEventHandlerExecutionContext
{
    void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
    bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
    Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
}
Copy the code

The RegisterEventHandler method is all about associating a message with a message handler. This method is actually used by the subscriber, who tells the message bus what messages should be sent to my handler for processing.

The IsRegisterEventHandler method determines whether there is already a correlation between the message and the handler.

The HandleAsync method finds the handler corresponding to the message and then calls the Handle method of the processor to process the message.

4. Define message publishing, subscription, and message bus interfaces: The message bus must support at least two functions, one is that producers can publish messages to my message bus, and the other is that subscribers need to be able to subscribe messages from my message bus.

public interface IEventPublisher
{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
}
Copy the code

As you can see from the above code, the messages published by the producer still inherit from the IEvent.

public interface IEventSubscriber
{
    void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
}
Copy the code

The above code is used by the subscriber to subscribe messages from the message bus. As you can see from the code, its final implementation is actually to establish the association between the message and the processor

public interface IEventBus:IEventPublisher,IEventSubscriber
{
}
Copy the code

The message (event) bus is inherited from both interfaces and supports both message publishing and message subscription.

5. Implement the event base class: above has subscribed to the message (event) interface, here to implement the event base class, in fact, is to implement the message ID and generated time:

public class BaseEvent : IEvent { public Guid Id { get; set; } public DateTime CreateDate { get; set; } public BaseEvent() { this.Id = Guid.NewGuid(); this.CreateDate = DateTime.Now; }}Copy the code

6. Implement message bus base classes: The underlying message bus dependencies can be message broker products such as RabbitMq, Kafaka, or those provided by third-party cloud platforms, which are often packaged. Prior to encapsulation, we need to define the top-level message bus base class implementation, mainly for future implementations that depend on it to be replaceable, and also to pass in the associated interface of the message to the message handler for easy use by subscribers.

public abstract class BaseEventBus : IEventBus { protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext =  eventHandlerExecutionContext; } public abstract void Publish<TEvent>(TEvent @event) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }Copy the code

7. Implement message and processor association: The message must be associated with the processor so that the subscriber does not know which processor to hand over to until it receives a particular type of message.

class EventHandlerExecutionContext : IEventHandlerExecutionContext { private readonly IServiceCollection registry; private readonly IServiceProvider serviceprovider; private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>(); public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null) { this.registry = registry; this.serviceprovider = this.registry.BuildServiceProvider(); } public async Task HandleAsync<TEvent>(TEvent @event) where TEvent: IEvent { var eventtype = @event.GetType(); if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0) { using(var childscope = this.serviceprovider.CreateScope()) { foreach(var handlertype in handlertypes) { var handler = Activator.CreateInstance(handlertype) as IEventHandler; await handler.HandleAsync(@event); }}}} // Check whether the message is associated with the handler public bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent: IEvent where TEventHandler : IEventHandler // Associate messages with processors, associations can be established in memory, Public void RegisterEventHandler<TEvent, TEventHandler>() where TEvent: IEvent where TEventHandler: IEventHandler { Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations); }}Copy the code

We have basically built the message bus framework and implemented the basic functionality, and we will use it to implement the RabbitMq message bus in the next chapter.

Micro service combat video, please pay attention to the wechat public number: MSSHCJ