Websocket simple concept understanding

Websocket can be understood: Web + Socket = websocket, which is based on socket and works in the application layer of a single TCP connection for full-duplex communication protocol.

The biggest feature of Websocket is that the server can take the initiative to push information to the client, and the client can also take the initiative to send information to the server. It can also be understood that Websocket is a kind of server push technology.

Of course it has other features and so on:

  • Clients can communicate with any server regardless of the same origin policy
  • An inner message can send text or binary data
  • It has good compatibility with HTTP protocol

Just because my background, need to carry out some messages notification push to the front end, all need to carry out the relevant Websocket communication. As for some specific websocket knowledge points, I will not expand too much here, here is mainly practice records. The webSocket requirement for Flask was implemented based on gEvent – webSocket. Fastapi also comes with webSocket support, which is good for my simple needs.

1 Examples on the official website

1.1 Official website Example 1: WebSocket Pit Filling

First of all, if you copy and paste the example from the official website to run, you will find that you will run when trying to connect to your websocket, it will never connect!

The simplest example code from the official website is:

from fastapi import FastAPI, WebSocket from fastapi.responses import HTMLResponse app = FastAPI() html = """ <! DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = new WebSocket("ws://localhost:8000/ws"); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() while True: data = await websocket.receive_text() await websocket.send_text(f"Message text was: {data}") if __name__ == '__main__': Import uvicorn uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True) uvicorn.run('masin:app', port=8011, )Copy the code

But when you’re running! The browser will keep sending out error messages, initially thinking that there is a problem with the browser and a different port number, etc., but after each attempt, the problem is still the same.

failed: Error during WebSocket handshake: xxxxxx: 400
Copy the code

Scratching my head! So I decided to re-install our Uvicorn:

pip install uvicorn[standard]
Copy the code

You must use the above method, even if you have standard installed separately! Or maybe mine didn’t reboot!

Or re-install webSockets:

pip install websockets
Copy the code

The corresponding versions are as follows:

Later we can happily connect to our WebSocket!

When our service is running, the front end can communicate directly with our @app.webSocket (“/ws”). 1: Open the browser and check our link status

2: random input, you can see our own input information again browser!

However, the above example is flawed in that there is no exit exception catching! When our browser refreshed, because the link was not released and closed and raised an exception!

1.2 Official website Example 2- Added dependency injection

Add token dependency verification! The complete code for the sample is:

from typing import Optional from fastapi import Cookie, Depends, FastAPI, Query, WebSocket, status from fastapi.responses import HTMLResponse app = FastAPI() html = """ <! DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <form action="" onsubmit="sendMessage(event)"> <label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label> <label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label> <button onclick="connect(event)">Connect</button> <hr> <label>Message: <input type="text" id="messageText" autocomplete="off"/></label> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = null; function connect(event) { var itemId = document.getElementById("itemId") var token = document.getElementById("token") ws  = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws? token=" + token.value); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; event.preventDefault() } function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ @app.get("/") async def get(): return HTMLResponse(html) async def get_cookie_or_token( websocket: WebSocket, session: Optional[str] = Cookie(None), token: Optional[str] = Query(None), ): if session is None and token is None: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return session or token @app.websocket("/items/{item_id}/ws") async def websocket_endpoint( websocket: WebSocket, item_id: str, q: Optional[int] = None, cookie_or_token: STR = Depends(get_cookie_or_token), # # wait for connection to await websocket.accept() # process link while True: Receive_text () # send the received data back to await websocket.send_text(f"Session cookie or Query token value is: {cookie_or_token}") # if q is not None: Send_text (f"Query parameter q is: {q}") # await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}") if __name__ == '__main__': Import uvicorn uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True) uvicorn.run('masin:app', port=8011, )Copy the code

Description of example steps:

The first is the browser opens the page, will be websocket connection:

1: The first time we click “send”, it will return directly because it depends on our get_cookie_or_token. Query tokens are handled by dependencies.

2: The second time, the official entry into our inside to send and receive data.

1.3 Official website example 3- Multi-person text chat

The previous two examples have not dealt with the abnormal problem of WebSocketDisconnect on the server caused by our client disconnection, mainly because we need to timely capture and close the exception!

A complete example is provided on the official website as follows:

#! The/usr/bin/evn python # - * - coding: utf-8 - * - "" "-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the file name: masin file function description: Function Description Creator: Typing Import List from fastAPI import fastAPI, WebSocket WebSocketDisconnect from fastapi.responses import HTMLResponse app = FastAPI() html = """ <! DOCTYPE HTML > < HTML > <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <h2> <span id="ws-id"></span></h2> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var client_id = Date.now() document.querySelector("#ws-id").textContent = client_id; var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ class ConnectionManager: def __init__(self): Active_connections: List[websocket] = [] async def connect(self, websocket: websocket): Active_connections. append(websocket) def disconnect(self, websocket: WebSocket): Remove (websocket) async def send_personal_message(self, message: Send_text (message) async def broadcast(self, message: STR): # loop variable sends a message to all online active links - global broadcast for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): # await manager.connect(WebSocket) try: while True: Data = await websocket.receive_text() await manager.send_personal_message(f" You typed: : {data}", websocket) await manager.broadcast(f"Client #{client_id} says: {data}") except WebSocketDisconnect: Manager.disconnect (websocket) # Broadcast all user messages await manager.broadcast(f"Client #{client_id} left the chat room ") if __name__  == '__main__': Import uvicorn uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True) uvicorn.run('masin:app', port=8011, )Copy the code

Then we can use the online test tool to test the connection, or directly open the browser test:

Open a client using the test tool:

Request the address is: the ws: / / localhost: 8000 / ws/xxxxxxidCopy the code

Open a client using a browser and send a message:

View the results:

2 Extension – Customer service system countdown message recovery processing

If you are using websocket do a customer service system, the need to deal with the user message processing, but if a user to connect up, never send any message, nor does it close under the condition of our browser, we service side how to handle this kind of always don’t talk again of pit a user consultation?

How long has it been since a message has been received? To put it simply:

— The server actively asks whether there are any questions to consult, if there is no start countdown — and then every how long to ask, until a certain number of times, or has not received the message from the user ID, then we will take the initiative to disconnect the user link.

The timeout mechanism of coroutines can be implemented in two ways:

  • One is based on async_timeout, which is in the form of a context manager
  • One is based on asyncio’s own asyncio.wait_for

Based on example 3 provided on the official website:

2.1 Implementation of timeout using async_timeout:

#! The/usr/bin/evn python # - * - coding: utf-8 - * - "" "-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the file name: masin file function description: Function Description Creator: Typing Import List from fastAPI import fastAPI, WebSocket WebSocketDisconnect from fastapi.responses import HTMLResponse import asyncio app = FastAPI() html = """ <! DOCTYPE HTML > < HTML > <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <h2> <span id="ws-id"></span></h2> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var client_id = Date.now() document.querySelector("#ws-id").textContent = client_id; var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ class ConnectionManager: def __init__(self): # save all current links to websocket object # self.active_connections: List[WebSocket] = [] self.active_connections = [] async def connect(self, websocket: WebSocket): Active_connections. append(websocket) async def disconnect(self, websocket: WebSocket): Active_connections. remove(websocket) # await websocket.close() async def send_personal_message(self, message: str, websocket: WebSocket): Send_text (message) async def broadcast(self, message: STR): # loop variable sends a message to all online active links - global broadcast for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): # await manager.connect(WebSocket) timeout_count = 0 try: while True: try: async with async_timeout.timeout(1) as cm: Receive_text () timeout_count = 0 await manager.send_personal_message(f" You typed: : {data}", websocket) await manager.broadcast(f"Client #{client_id} says: {data}") print(cm.expired) except asyncio.TimeoutError: Timeout_count = timeout_count+1 await manager.send_personal_message(f" You have not entered a message for a long time, is there anything else you need to inquire? : ({timeout_count}) ", websocket) # places to note !!!!!!!!!!!!!!! # Places to notice !!!!!!!!!!!!!!! # Places to notice !!!!!!!!!!!!!!! # Places to notice !!!!!!!!!!!!!!! If timeout_count >5: raise WebSocketDisconnect() raise WebSocketDisconnect() raise WebSocketDisconnect() raise WebSocketDisconnect() raise WebSocketDisconnect() # await manager.disconnect(websocket) # await manager. Await Manager. disconnect(websocket) # Await Manager. broadcast(f"Client #{client_id} left chat room ") if __name__ == '__main__': Import uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True) import uvicorn # uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True)Copy the code

2.2 Asyncio.wait_for

@app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): # front default await manager.connect(websocket) timeout_count = 0 try: while True: try: If no message is received, then a timeout will be thrown. Notify front end data = await asyncio.wait_for(websocket.receive_text(), 1) await manager.send_personal_message(f" You typed: : {data}", websocket) # notify all active online linked objects of await manager.broadcast(f"Client #{client_id} says: {data}") timeout_count = 0 # data = await asyncio.wait_for(websocket.receive_text(), 1) # await manager.send_personal_message(f" You typed: {data}", websocket) # notify all active online linked objects # await manager.broadcast(f"Client #{client_id} says: {data}") except asyncio.TimeoutError: Timeout_count = timeout_count + 1 await manager.send_personal_message(f" You have not entered a message for a long time, is there anything else you need to inquire? : ({timeout_count}) ", websocket) # places to note !!!!!!!!!!!!!!! # Places to notice !!!!!!!!!!!!!!! # Places to notice !!!!!!!!!!!!!!! # Places to notice !!!!!!!!!!!!!!! If timeout_count > 5: raise WebSocketDisconnect() raise WebSocketDisconnect() raise WebSocketDisconnect() raise WebSocketDisconnect() raise WebSocketDisconnect() #await manager.disconnect(websocket) #await manager. Broadcast (f"Client #{client_id} has left the chat room ")Copy the code

Test results:

Note the following: Do not directly await manager.disconnect(websocket) during timeout processing, otherwise it will say that you cannot find the object of the websocket you are joining!

ValueError: list.remove(x): x not in list
Copy the code

This place is weird! I don’t know why this is happening!

3 Added classes to define WebSocket routes

The previous examples use our app.webSocket to define routes directly

@app.websocket("/ws/{client_id}")
Copy the code

But in fact we can and can inherit from WebSocketEndpoint to implement a new class to implement the route definition, and then use

@app.websocket_route
Copy the code

To decorate the websocket route implemented by the class or use the added method:

app.add_websocket_route("/ws/{user_id}",EchoSever,name="ws")
Copy the code

The specific implementation process is as follows:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect from starlette.endpoints import WebSocketEndpoint from fastapi.responses import HTMLResponse from enum import Enum from typing import Any, Dict, List, Optional import asyncio app = FastAPI() html = """ <! DOCTYPE HTML > < HTML > <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <h2> <span id="ws-id"></span></h2> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var client_id = Date.now() document.querySelector("#ws-id").textContent = client_id; var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ class ConnectionManager: def __init__(self): # save all current links to websocket object # self.active_connections: List[WebSocket] = [] self.active_connections = [] async def connect(self, websocket: WebSocket): Active_connections. append(websocket) async def close(self, websocket: WebSocket): # Active disconnection of clients, Close () self.active_connections.remove(websocket) async def disconnect(self, websocket: WebSocket): Active_connections. remove(websocket) # await websocket.close() async def send_personal_message(self, message: str, websocket: WebSocket): Send_text (message) async def broadcast(self, message: STR): # loop variable sends a message to all online active links - global broadcast for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.get("/") async def get(): Render a front-end page for testing links, you can actually use the tool to test it! return HTMLResponse(html) @app.websocket_route("/ws/{user_id}", name="ws") class EchoSever(WebSocketEndpoint): encoding: str = "text" session_name: str = "" count: int = 0 def __init__(self, *args, **kwargs): super().__init__(*args, Print (args[0]['endpoint']) print(args[0]['path_params']) self.user_id: Optional[str] = args[0]['path_params'].get('user_id') # async def daojishi(self, websocket): # # setattr(websocket,'timeout_count',0) # # try: # while True: # try: # await asyncio.wait_for(websocket.receive_text(), 1) # except asyncio.TimeoutError: # timeout_count = getattr(websocket,'timeout_count') # timeout_count = timeout_count + 1 # setattr(websocket, 'timeout_count', timeout_count) # await manager.send_personal_message(f" You haven't entered a message for a long time, is there anything else you need to ask? : ({timeout_count}) ", websocket) # if timeout_count > 5: # raise WebSocketDisconnect() # raise WebSocketDisconnect() # raise WebSocketDisconnect() # # await manager.disconnect(websocket) # # Handle the connection after closing the browser # except WebSocketDisconnect: # pass # print(' active disconnection ') # await manager.disconnect(websocket) # await Manager. broadcast(f" visitor: Async def on_connect(self, websocket): async def on_connect(self, websocket): Broadcast (f" visitor: {self.user_id} entered the room!" Daojishi (websocket) # async def on_receive(self, websocket, data): async def on_receive(self, websocket, data) # timeout_count = getattr(websocket, 'timeout_count') # setattr(websocket, 'timeout_count', 0) await manager.broadcast(f" {self.user_id} says "{data}") # async def on_disconnect(self, websocket, close_code) when client disconnects: # broadcast all user messages globally from all online links try: Await Manager.disconnect (websocket) # broadcast to all other online websocket await manager.broadcast(f" visitor: {self.user_id} left the chat room ") except ValueError: pass if __name__ == '__main__': Import uvicorn uvicorn.run(app='main_ro:app', host="127.0.0.1", port=8000, reload=True, debug=True)Copy the code

The above example can also complete a chat room conversation with multiple people.

In the example above, we added a new closing handler to ConnectionManager.

async def close(self, websocket: WebSocket): Close () self.active_connections.remove(websocket)Copy the code

It will normally notify us that the front-end connection is disconnected. If it is directly raise WebSocketDisconnect(), the front-end cannot normally get the disconnected message.

Conclusion:

The above are some simple cases provided by the official website. If it is really needed to be applied to the production environment, there are still many problems involved, such as

  • User authentication combined with user login mechanism can be achieved
  • There are also user messages to understand packages and packets
  • Heartbeat Mechanism Detection
  • Client reconnection mechanism
  • Offline detection rules

At the end

Simple notes! For reference only!

END

Jane: www.jianshu.com/u/d6960089b…

The Denver nuggets: juejin. Cn/user / 296393…

Public account: wechat search [children to a pot of wolfberry wine tea]

Let students | article | welcome learning exchange together 】 【 original 】 【 QQ: 308711822