The original article is reprinted from liu Yue’s Technology blog v3u.cn/a_id_202

“Desire to express” is a powerful “source power” in the history of human growth. Engels pointed out directly that in the age of ignorance, namely the lower stage of human, “fruit, nuts, roots as food; The creation of a language with clear syllables was the main achievement of this period. In the Internet era, people’s desire for expression is often more easily satisfied because of the existence of chat software. Generally speaking, there are two types of chat: group chat and single chat. A group chat or group chat can be considered a chat room with a maximum number of people, while a single chat can be considered a special chat room with a maximum of two people.

To develop high-quality chat systems, developers should have a basic knowledge of how the client and server communicate. In a chat system, the client can be a mobile application (c-side) or a Web application (B-side). The clients do not communicate directly with each other. Instead, each client is connected to a chat service that supports communication between the two parties. So the service must support the most basic functions in the business:

1. Receive information from other clients in real time.

2. Pushes each message to the recipient in real time.

When a client intends to start a chat, it connects to the chat service using one or more network protocols. For chat service, the choice of network protocol is very important. Here, we choose Tornado framework built-in Websocket protocol interface, which is simple and convenient, and install Tornado6.1

Pip3 install tornado = = 6.1Copy the code

Then write a program to launch the file main.py:

import tornado.httpserver import tornado.websocket import tornado.ioloop import tornado.web import redis import Threading import asyncio # user lists the users = [] # websocket protocol class WB (tornado. Websocket. WebSocketHandler) : Def check_origin(self,origin): def open(self): Users.append (self) # def on_message(self,message): self.write_message(message['data']) # def on_close(self): Users. remove(self) # create torando instance app = Tornado.web.Application([(r'/wb/', wb)],debug=True) if __name__ == '__main__': # statement server http_server_1 = tornado. Httpserver. Httpserver (app) # listener port http_server_1. Listen open event loop (# 8000) tornado.ioloop.IOLoop.instance().start()Copy the code

In this way, a set of WebSocket protocol service is set up in a short time. Every time a client initiates a Websocket connection request, we will add it to the user list and wait for the user to push or receive information.

Now we need to connect the sender and the receiver of the message in some form to achieve the purpose of “chat”. Here we choose Redis publish and subscribe mode (PubSub), with a demo example, server.py

import redis  
  
r = redis.Redis()  
r.publish("test",'hello')
Copy the code

Then write client.py:

import redis  
r = redis.Redis()  
ps = r.pubsub()  
ps.subscribe('test')    
for item in ps.listen():   
    if item['type'] == 'message':  
        print(item['data'])
Copy the code

A subscriber subscribes to a channel. Publisher is responsible for sending binary string messages to channels, which then push them to subscribers when they receive them.

Channels can not only contact publishers and subscribers, but also use channels for “message isolation”, that is, messages from different channels are only pushed to subscribers of that channel:

Rewrite main.py according to publisher subscriber logic:

import tornado.httpserver import tornado.websocket import tornado.ioloop import tornado.web import redis import Threading import asyncio # user = [] # channelchannels = ["channel_1","channel_2"] # websocket WB (tornado. Websocket. WebSocketHandler) : # cross-domain support def check_origin (self, origin) : return True # open link def open (self) : Users.append (self) # def on_message(self,message): self.write_message(message['data']) # def on_close(self): Def redis_listener(loop): asyncio.set_event_loop(loop) async def listen(): R = redis.Redis(decode_responses=True) # declare pubSB instance ps = r.pusub () # subscribe chat room channel ps.subscribe(["channel_1","channel_2"]) # For message in ps.listen(): print(message) # for user in users: print(user) if message["type"] == "message" and message["channel"] == user.get_cookie("channel"): User.write_message (message["data"]) future = asyncio.Gather (listen()) loop.run_until_complete(future) # Msg (tornado. Web. RequestHandler) : # rewrite the superclass method def set_default_headers (self) : Self. Set_header (" access-Control-allow-origin ","*") self. Set_header (" access-Control-allow-origin ","*" Self.set_header (" access-Control-allow-headers ","x-requested-with") # requested-headers self.set_header(" access-Control-allow-headers ","x-requested-with" Self. set_header(" access-control-allow-methods ","POST,GET,PUT,DELETE") # async def POST (self): Data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # Redis.redis () r.publish(channel,data) return self. Write (" OK ") # tornado.web. (r'/send/',Msg), (r'/wb/',WB) ],debug=True ) if __name__ == '__main__': Thread(target=redis_listener,args=(loop,)).start() # declare the server Http_server_1 = tornado. Httpserver. Httpserver (app) # listener port http_server_1. Listen. (8000) # open event loop tornado.ioloop.IOLoop.instance().start()Copy the code

Here we assume that there are two channels by default, and the logic looks like this: Controlled by the front-end websocket link the user to select the news release to the channel, at the same time, each user through front cookies set properties have channel, channel when have the attribute of the user of the channel posted a message, after all the other with the channel attribute of the user through the redis subscription after active push message has just been released, The channel push only matches the users who subscribe to the channel to achieve the purpose of message isolation.

If the loop instance is passed to the coroutine object, the loop instance will not be available in the subscription method.

IOLoop.current() doesn't work in non-main
Copy the code

This is because Tornado is based on ioloop, whereas Django or Flask in synchronous framework mode doesn’t have this problem.

Here we use the most popular VUe3.0 framework to write chat.vue:

<template> <div> <h1> Chat window </h1> <van-tabs V-model :active="active" @click="change_channel"> <van-tab title=" customer service # 1 "> <tr v-for="item,index in msglist" :key="index"> {{item}} </tr> </table> </van-tab> <van-tab title=" customer service 2 "> <table> <tr V -for="item,index in msglist" :key="index"> {{item}} </tr> </table> </van-tab> </van-tabs> <van-field label=" chat info" V-model =" MSG "/> <van-button color="gray" @click="commit"> </van-button> </div> </template> <script> export default { Data () {return {auditList :[], // msglist:[], MSG :"", websock: null, // establish a connection lockReconnect: False, // Whether to establish a connection timeout: 3 * 1000, //30 seconds one heartbeat timeoutObj: null, // Outer heartbeat countdown serverOutobj: null, // inner heartbeat detection timeoutNum: Null, // Disconnect reconnection countdown active:0, channel:"channel_1"}}, Change_channel :function(){if(this.active === 0){this.channel = "channel_1"; var name = "channel"; var value = "channel_1"; }else{ this.channel = "channel_2"; var name = "channel"; var value = "channel_2"; } this.msglist = []; var d = new Date(); d.setTime(d.getTime() + (24 * 60 * 60 * 1000)); var expires = "expires=" + d.toGMTString(); document.cookie = name + "=" + value + "; " + expires; this.reconnect(); }, initWebSocket() {// initialize weosocket const wsuri = "ws://localhost:8000/wb/"; this.websock = new WebSocket(wsuri); this.websock.onopen = this.websocketonopen; this.websock.onmessage = this.websocketonmessage; this.websock.onerror = this.websocketonerror; this.websock.onclose = this.websocketclose; }, reconnect() {var that = this; If (thate. lockReconnect) {// If (thate. lockReconnect); } that.lockReconnect = true; Timeoutnum && clearTimeout(that.timeoutnum); Timeoutnum = setTimeout(function() {// Then connect that.initwebSocket (); that.lockReconnect = false; }, 5000); }, reset() {var that = this; // Clear time (clear the internal and external heartbeat timing) clearTimeout(thate.timeoutobj); clearTimeout(that.serverTimeoutObj); // Restart the heartbeat thate.start (); }, start() {var self = this; self.timeoutObj && clearTimeout(self.timeoutObj); // If the outer heartbeat countdown exists, clear self.servertimeoutobj && clearTimeout(self.servertimeoutobj); Self.timeoutobj = setTimeout(function() {self.timeoutobj = setTimeout(function() {self.timeoutobj = setTimeout(function() { If (self.websock.readyState == 1) {// If (self.websock. send("heartCheck"); } else {// reconnect(); } self.serveroutobj = setTimeout(function() {self.websock.close(); }, self.timeout); }, self.timeout); // 3s once}, websocketonopen(e) {// After the connection is established, execute the send method to send data console.log(" success "); // this.websock.send("123"); // this.websocketsend(JSON.stringify(actions)); }, webSocketonError () {// Connection failed to establish console.log(" failed "); this.initWebSocket(); }, websocketonmessage(e) { console.log(e); Parse (e.data); // Const redata = json.parse (e.data); const redata = e.data; / / accumulate this. Msglist. Push (redata); console.log(redata); }, webSocketSend (Data) {// Data is sent this.websocket.send (Data); }, webSocketClose (e) {// Close this.reconnect() console.log(" reconnect ", e); }, Function (){// Send the request this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{ console.log(data); }); },}, mounted(){// Connect to the backend webSocket service this.initwebSocket (); var d = new Date(); d.setTime(d.getTime() + (24 * 60 * 60 * 1000)); var expires = "expires=" + d.toGMTString(); document.cookie = "channel" + "=" + "channel_1" + "; " + expires; } } </script> <style scoped> @import url(".. /assets/style.css"); .chatbox{ color:black; } .mymsg{ background-color:green; } </style>Copy the code

Here, the front-end online client periodically sends heartbeat events to the status server. If the server receives a heartbeat event from the client within a certain amount of time (for example, x seconds), the user is considered online. Otherwise, it is offline and can be reconnected within the threshold time. At the same time, the label page of vant framework can be used to synchronize channel switching, after which the channel identity is written into cookies, so that the back-end service can identify and match the push.

It works like this:

Sure, the functionality is already there, but what if we’re in a high-concurrency scenario? Imagine a channel with 100,000 people online at the same time and 100 new messages per second. Then tornado’s websocket service push frequency is 100W *10/s = 1000W /s.

Without load balancing, such a system architecture is difficult to withstand the pressure, so where is the bottleneck? Redis: aioredis: aioredis: aioredis: Aioredis

pip3 install aioredis
Copy the code

Aioredis avoids IO blocking by using coroutines to asynchronously operate redis reads and writes, making the publish and subscribe operations of messages non-blocking.

At this point, we can create a new asynchronous subscription service file main_with_aioredis.py:

import asyncio  
import aioredis  
from tornado import web, websocket  
from tornado.ioloop import IOLoop  
import tornado.httpserver  
import async_timeout
Copy the code

The main modification logic is to asynchronously establish redis links via aioredis and asynchronously subscribe to multiple channels, and then register the subscribed asynchronous task reader via the native coroutine asyncio.create_task method (asyncio.ensure_future can also be used) :

async def setup():  
    r = await aioredis.from_url("redis://localhost", decode_responses=True)  
    pubsub = r.pubsub()  
  
    print(pubsub)  
    await pubsub.subscribe("channel_1","channel_2")  
  
    #asyncio.ensure_future(reader(pubsub))  
    asyncio.create_task(reader(pubsub))
Copy the code

In the subscription consumption method, asynchronously listens to the published information in the subscribed channel, and compares the user’s channel attributes and pushes them by channel as in the previous synchronous method:

async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_cookie("channel") == message["channel"]: User.write_message (message["data"]) await asyncio.sleep(0.01) except asyncio.timeouterror: passCopy the code

Finally, the Tornado event loop IOLoop pass is used to execute the callback method and add the SETUP method to the event callback:

If __name__ == '__main__': # Listen to port application.listen(8000) loop = ioloop.current () loop.add_callback(setup) loop.start()Copy the code

Complete asynchronous publish, subscribe, push service transformation main_aioredis.py:

import asyncio import aioredis from tornado import web, Ioloop import ioloop import Tornado. Httpserver import Async_timeout users = [] # webSocket protocol Class WB (tornado. Websocket. WebSocketHandler) : # cross-domain support def check_origin (self, origin) : return True # open link def open (self) : Users.append (self) # def on_message(self,message): self.write_message(message['data']) # def on_close(self): Def set_default_headers(self): def set_headers (self): Self. Set_header (" access-Control-allow-origin ","*") self. Set_header (" access-Control-allow-origin ","*" Self.set_header (" access-Control-allow-headers ","x-requested-with") # requested-headers self.set_header(" access-Control-allow-headers ","x-requested-with" Self. set_header(" access-control-allow-methods ","POST,GET,PUT,DELETE") # async def POST (self): Data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # publish r = await aioredis.from_url("redis://localhost", decode_responses=True) await r.publish(channel,data) return self.write("ok") async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_cookie("channel") == message["channel"]: User.write_message (message["data"]) await asyncio.sleep(0.01) except asyncio.timeouterror: pass async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub)) application = web.Application([ (r'/send/',Msg), (r'/wb/', WB), ],debug=True) if __name__ == '__main__': Listen (8000) loop = IOLoop. Current () loop.add_callback() loop.start()Copy the code

From the point of view of programming, it makes full use of the idea of asynchronous execution of coroutines, making it more silky and smooth.

Conclusion: Practice, Redis publish-subscribe pattern, very fit this real-time chat (websocket) communication system, but released news if there is no corresponding channel or consumers, the message will be discarded, if we in the production environment at the time of consumption, suddenly broken network, lead to one of the subscribers to hang up for a long time, So if you want to keep the system robust, you’ll need other services to design high-availability real-time storage solutions, but that’s another story. Finally, here’s the project address, for everyone to enjoy: github.com/zcxey2911/t…

The original article is reprinted from liu Yue’s Technology blog v3u.cn/a_id_202