Recently, I am working on the r&d version of enterprise wechat/Flying book docking, one of which is: news push. In order to enable users to work together better, we need to push reminders or messages to other collaborators through enterprise wechat/flybook and other platforms. Redis pub/ Sub mode is adopted in the project (it is very simple to use, compared with MQ I used before), so I have learned this part again. Ok, no more talk, this article will briefly introduce the publish/subscribe model, and from the use of Redis pub/sub, their own nodeJS to achieve a similar function, the process is mainly used in nodeJS native module NET.

introduce

Publish/subscribe, a common pattern for processing messages, is posted here, but well described:

As can be seen from the figure, subscribers subscribe through the scheduling center, publishers publish messages to the scheduling center, and the scheduling center distributes messages to each subscriber. Both subscribers and publishers operate through a scheduling center, which in this case is a publish/subscribe model. So why have this pattern? Let’s talk about what it does and what good it does!

benefit

  • Decoupling: It abstracts the subscribers and publishers, who are unaware of each other’s existence, even though they are on different platforms and in different languages, and they just have to do their own thing, which of course conforms to the principle of singleness in programming.
  • Scalability: A publisher publishes a message, and after sending a message, he can send other messages without waiting for a response from a subsequent subsystem.
  • Reliability: mainly adopt asynchronous way to pass, the benefit of asynchronous present everyone is to write JS, mainly reflect in the load aspect.

Ok, introduced its benefits, the following according to the Redis pub/sub mode, with nodeJS to achieve a similar function, before the implementation of the Redis pub/sub mode

Redis pub/sub mode

There are many different publish/subscribe models in Redis, but this is the simplest scenario that can be mapped out by the business.

implementation

Ioredis is one of the common drivers for NodeJS and encapsulates the common API for Redis. The following uses ioredis to show the above pattern:

  • Subscribe to the sub. Js
import Redis from "ioredis";
const client = new Redis({
  host: "127.0.0.1".port: 6379.password: "123456".name: "myRedis"});// Subscribe to myChannel
client.subscribe("mychannel", (e) => {
  console.log("subscribe channel: mychannel");
});
// Listen for incoming messages
client.on("message", (channel, message) => {
  console.log(`channel: ${channel},message: ${message}`);
});
// Listening error
client.on("error", (err) => {
  console.log("response err:" + err);
});
Copy the code
  • Release the pub. Js
import Redis from "ioredis";
const client = new Redis({
  host: "127.0.0.1".port: 6379.password: "123456".name: "myserver-3y"});const msg = { id: 1.name: "ipenman".content: "No work tomorrow." };
client.publish("mychannel".JSON.stringify(msg));
Copy the code

Sub.js, pub.js

So in this example, you can see that you can configure a connection, and then subscribe or publish, and there’s also a listening function, and you can see in the renderings that a subscription is not broken once it’s started, so it’s a long connection, so we’ll start with the client.

Create client.ts

import { connect, Socket, SocketConnectOpts } from "net";
class Client {
  private connection: Socket;
  private config: SocketConnectOpts;

  constructor(config: SocketConnectOpts) {
    this.config = config;
    // Create a client connection
    this.connection = connect(config);
  }

  subscribe(channelName: string, handle: (err: Error) |Function= >void) {
    this.connection.write(JSON.stringify({ type: "subscribe", name: channelName }), handle);
    return this; // This is to be able to chain calls like redis
  }
  publish(channelName: string, message: string) {
    this.connection.write(JSON.stringify({ type: "publish", name: channelName, message: message }));
    return this;
  }

  on(eventName: string, handle: (. args) = > void | Function) {
        // This is just an example, so only one message is written. More complex, switch or policy mode can be used
		if (eventName === 'message') {
			this.connection.on('data'.(data) = > {
				const sData = data.toString()
				const { name, message } = JSON.parse(sData)
				handle(name, message)
			})
		}
		return this}}Copy the code

The client simply sends a request to the server.

Ii. Server server.ts

import { createServer, AddressInfo, Socket } from "net";
const pubsub = new PubSub(); // PubSub: subscribe to publish mode
const server = createServer(); // Create a service
server
  .on("connection".(socket) = > {
    const id = new Date().getTime(); // Generate the unique id for the connection
    socket
      // The client request is received
      .on("data".(data) = > {
        const sData = data.toString();
        const { type, name, message }: Data = JSON.parse(sData);
        if (type= = ="subscribe") {
          pubsub.subscriber(name, { id, socket });
        } else if (type= = ="publish") { pubsub.publish(name, message); }}); }) .on("error".(e) = > {
    console.error(e);
  })
  .listen(3300);
Copy the code

Above the first implementation of a simple server received client instructions, call pubsub corresponding method, the focus of the mode is the above pubsub, below see the specific implementation:

class PubSub {
  private channels: Map<string, Channel>;
  constructor() {
    this.channels = new Map();
  }

  subscriber(channelName: string, subscriber: Subscriber) {
    const channel = this.channels.get(channelName);
    if(! channel) {const channel = new Channel(channelName, subscriber);
      this.channels.set(channelName, channel);
    } else {
      channel.subscribe(subscriber);
    }
  }
  publish(channelName: string, message: string) {
    // Find the corresponding channel and push the message
    const channel = this.channels.get(channelName); channel && channel.publish(message); }}interface Subscriber {
  id: number;
  socket: Socket;
}
export interface Data {
  type? :string; name? :string; message? :string;
}

// Channel
class Channel {
  private _name: string;
  private _subscribers: Subscriber[] = [];

  constructor(name, subscriber) {
    this._name = name;
    this._subscribers.push(subscriber);
  }

  subscribe(subscriber: Subscriber) {
    this._subscribers.push(subscriber);
  }

  publish(message: string) {
    // Push the message
    this._subscribers.forEach((subscriber) = > subscriber.socket.write(JSON.stringify({ name: this._name, message: message }))); }}Copy the code

At this point, a basic publish-subscribe model emerges. For better decoupling, I later made some optimizations and added several functions, such as unsubscribe, destroy connection, and so on. Here is the full code:

server.ts

import { createServer, AddressInfo, Socket } from "net";

class PubSub {
  private channels: Map<string, Channel>;
  constructor() {
    this.channels = new Map();
  }
  getSubscribers(channelName: string) {
    const channel = this.channels.get(channelName);
    if (channel) {
      return channel.subscribers.length;
    }
  }
  subscriber(channelName: string, subscriber: Subscriber) {
    const channel = this.channels.get(channelName);
    if(! channel) {const channel = new Channel(channelName, subscriber);
      this.channels.set(channelName, channel);
    } else {
      channel.subscribe(subscriber);
    }
  }
  unsubscriber(channelName: string, subscriber: Subscriber) {
    const channel = this.channels.get(channelName);
    channel.subscribe(subscriber);
  }
  publish(channelName: string, message: string) {
    const channel = this.channels.get(channelName);
    channel && channel.publish(message);
  }
  destroy(subscriber: Subscriber) {
    for (const [channelName, channel] of this.channels) { channel.unsubscribe(subscriber); }}}interface Subscriber {
  id: number;
  socket: Socket;
}
export interface Data {
  type? :string; name? :string; message? :string;
}
class Channel {
  private _name: string;
  private _subscribers: Subscriber[] = [];

  constructor(name, subscriber) {
    this._name = name;
    this._subscribers.push(subscriber);
  }
  get name() {
    return this._name;
  }
  get subscribers() {
    return this._subscribers;
  }
  subscribe(subscriber: Subscriber) {
    this._subscribers.push(subscriber);
  }
  unsubscribe(subscriber: Subscriber) {
    const subscriberIndex = this._subscribers.findIndex((sub) = > subscriber.id === sub.id);
    if(subscriberIndex ! = =- 1) {
      this._subscribers.splice(subscriberIndex, 1);
      subscriber.socket.write(JSON.stringify({ name: this._name, message: 'Unsubscribed successfully' }));
    }
  }

  publish(message: string) {
    this._subscribers.forEach((subscriber) = > subscriber.socket.write(JSON.stringify({ name: this._name, message: message }))); }}const pubsub = new PubSub();
const server = createServer();
server
  .on("connection".(socket) = > {
    const id = new Date().getTime();
    socket
      .on("data".(data) = > {
        const sData = data.toString();
        const { type, name, message }: Data = JSON.parse(sData);
        if (type= = ="subscribe") {
          pubsub.subscriber(name, { id, socket });
          console.log(Current number of subscribers:${pubsub.getSubscribers(name)}`);
        } else if (type= = ="unsubcribe") {
          pubsub.unsubscriber(name, { id, socket });
        } else if (type= = ="publish") {
          pubsub.publish(name, message);
        }
      })
      .on("close".function (status) {
        console.log("Close connection", status);
        pubsub.destroy({ id, socket });
      })
      .on("error".(e) = > {
        socket.destroy();
      });
  })
  .on("error".(e) = > {
    console.error(e);
  })
  .listen(6379);
Copy the code

client.ts

import { connect, Socket, SocketConnectOpts } from "net";
export class Client {
  private connection: Socket;
  private config: SocketConnectOpts;
  constructor(config: SocketConnectOpts) {
    this.config = config;
    this.connection = this.createConnection();
  }
  private createConnection() {
    return connect(this.config);
  }
  on(eventName: string, handle: (. args) = > void | Function) {
    if (eventName === "message") {
      this.connection.on("data".(data) = > {
        const sData = data.toString();
        const { name, message } = JSON.parse(sData);
        handle(name, message);
      });
    }
    return this;
  }
  unsubscribe(channelName: string, handle: (err: Error) = > void) {
    this.connection.write(JSON.stringify({ type: "unsubscribe", name: channelName }), handle);
    return this;
  }
  subscribe(channelName: string, handle: (err: Error) = > void) {
    this.connection.write(JSON.stringify({ type: "subscribe", name: channelName }), handle);
    return this;
  }
  publish(channelName: string, message: string) {
    this.connection.write(JSON.stringify({ type: "publish", name: channelName, message: message }));
    return this; }}Copy the code

Seriously see here, feel also delay everyone for a long time, the next to enter the use and test link.

use

Use the same way as Redis pub and sub.

sub.ts

import { Client } from "./client";
const client = new Client({ port: 6379, host: "127.0.0.1" });
client.subscribe("mychannel".(a)= > {
  console.log("Subscribed successfully!");
});
client.on("message".(channel, data) = > {
  console.log(channel, data);
});
Copy the code

pub.ts

import { Client } from "./client";
const client = new Client({ port: 6379, host: "127.0.0.1" });
const msg = { id: 1, name: "ipenman", content: "No work tomorrow." };
client.publish("mychannel".JSON.stringify(msg));
Copy the code

Start server.ts start sub.ts subscription start pub.ts publish messages

The test results are as follows:

perfect!

The last

In the actual development, most of them directly use the mainstream ready-made library instead of building the wheel. However, building the wheel is not my original intention, but by building the way of the wheel, we can better understand his principle, how happy it is!