Writing in the front

In the process of system development, we often need to realize the requirement of message push. This is fine for single-end single-instance scenarios (there are many tutorials on the web that will not be expanded here), but what about distributed systems and multi-end scenarios that need to be pushed?

In distributed systems, the message push server is multi-instance. A service in a system generates a message, which needs to be pushed to multiple terminals in real time. How to effectively push WebSocket? Let’s start with the following scenario:

Suppose that the push message is generated by message instance 2, but the message instances that the terminal is really connected to are instance 1 and instance 3, instead of instance 2 that is connected to the production message, how does the system synchronously push the message from instance 2 to terminal 1 and terminal 2? This will be described in detail below.

The basic principle of

In order to meet the requirements, we use Redis as the collaborative middleware, which is used to store user information, generate the unique identification and POD Address of user connection. The producer instance of the message obtains the unique identification and POD address of terminal connection through subscription to Redis, and notifies the corresponding message instance. Finally, the corresponding message instance connected to the terminal pushes the message to the user terminal through WebSocket. The specific process is as follows:

To implement the concept, we construct two components: Client and ClientManager, and the implementation logic is as follows.

Server-side implementation

Client

The function of the Client component is to manage information about a user’s connection to an instance of a messaging service, as defined by a Golang structure:

	type Client struct {
			UUID   string 
			UserID string
			Socket *websocket.Conn
			Send   chan []byte
	}
Copy the code

The data types in the structure are described as follows:

  • UUID: a unique identifier for a connection, by which connection information can be found.

  • UserID: indicates the UserID.

  • Socket: indicates the connection object.

  • Send: message data channel.

We implement two methods for the Client structure: Read and Write to handle receiving and sending messages.

The Read method

The Read method is relatively simple. After receiving the request message from the terminal, the message instance responds to the received message status through the WebSocket without returning the request result. The result is returned via the Write method.

func (c *Client) Read(close, renewal chan *Client) { defer func() { close <- c }() for { _, message, err := c.Socket.ReadMessage() if err ! = nil { break } // ... // message logic } }Copy the code

The Write method

The Write method returns the request result to the terminal. A Client listens to a Send Channel. When a channel has data, it sends a message to the terminal through a socket connection.

func (c *Client) Write(close chan *Client) { for { select { case message, ok := <-c.Send: if ! ok { return } c.Socket.WriteMessage(websocket.TextMessage, message) case <-c.Ctx.Done(): return } } }Copy the code

ClientManger

The ClientManager component is a connection pool that manages all terminal connections and provides registration, deregistration, and renewal functions.

	type ClientManager struct {
			sync.RWMutex
			Clients    map[string]*Client 
			Register   chan *Client
			Unregister chan *Client
			Renewal    chan *Client
	}
Copy the code

The data types of the structure are described as follows:

  • Clients: is a collection used to store Client objects created.

  • Register: registered channel.

    • The connection is registered with Clients and added to the Client collection by key-value, which is the unique identifier of the connection and value is the connection itself.

    • Redis stores the unique identifier of the connection, the user ID, and the pod address information used to establish the connection.

  • Unregister: unregistered channel.

    • Removes the connection object from the Clients collection of the ClientManager component.

    • Delete the cache information corresponding to Redis.

  • Renewal: Renewed channel, used to renew the key of Redis.

ClientManager provides a Start method that listens for registered, unregistered, and renewed channels to manage connection objects created. When these channels have data, the corresponding operation is performed.

func (manager *ClientManager) Start(ctx context.Context) { for { select { case conn := <-manager.Register: manager.Lock() manager.Clients[conn.UUID] = conn manager.Unlock() _, err := manager.affair.Register(ctx, &RegisterReq{ UserID: conn.UserID, UUID: conn.UUID, IP: manager.IP, }) case conn := <-manager.Unregister: _, err := manager.affair.Unregister(ctx, &UnregisterReq{ UserID: conn.UserID, UUID: conn.UUID, }) conn.Socket.Close() close(conn.Send) delete(manager.Clients, conn.UUID) case conn := <-manager.Renewal: / /... // Key renewal to Redis } } }Copy the code

Being pushed

When a message service instance produces user messages and needs to push messages to terminals, the push steps are as follows:

  1. The UserID reads data from Redis to obtain the connection uniqueness identifier and pod Address, which is written to Redis when the terminal first establishes a connection with the message instance.

  2. At this point, a request is sent to the corresponding message service instance based on the POD Address.

  3. The corresponding message service instance receives the request.

The server receives the request using the following processing logic:

  1. Find the connection corresponding to the identity based on the parameter passed for the unique identity of the connection. We provide a Write method for ClientManager.

This method uses the Clients collection of the ClientManager component to find the corresponding Client based on its unique identity. Use the Client SendOut method, write data to the terminal.

func (manager *ClientManager) Write(message *Message) error { manager.RLock() client, ok := manager.Clients[message.Recipient] manager.RUnlock() if ! ok { return errors.New("client miss [" + message.Recipient + "]") } return client.SendOut(message) }Copy the code
  1. Define Client’s SendOut method.

This method converts the received message into a byte array and sends it to the Client’s Send Channel.

func (c *Client) SendOut(message *Message) error { content, err := json.Marshal(message.Content) if err ! = nil { return err } c.Send <- content return nil }Copy the code
  1. Sends data to a terminal.

As described in the Write method of the Client component, if there is data in a Send channel, it reads the data generated by the channel and returns the data to the corresponding terminal through the connection object.

conclusion

The above is the main idea of the Web Socket push message to the terminal: Through Redis, the user’s information and connection identification and POD Address are stored. When a message service instance generates a message, the information is read from Redis and the message service instance connected to the terminal is notified. Then these service instances send messages to the terminal through WebSocket objects. The all-image cloud low-code platform also integrates real-time message push, so that users can get the latest message status in time when using the platform. Next time we’ll be bringing you Knative Elastic stretching, please stay tuned.

The author

Tina Chou

This article is published by OpenWrite!