Background

Our online DevOps service connects to Kubernetes (k8s) over WebSocket so that users can view pod logs. We noticed that the service's CPU usage often rose once log viewing was enabled. After adding some logging, we found that the pod sends a WebSocket message to the DevOps service every time it produces a log line, and the DevOps service handles each incoming WebSocket message by starting a new thread. As a result, a large number of threads are created and destroyed over the course of a log-viewing session, and this constant creation and destruction is what drives the CPU usage up.

Solution

One solution is to reduce the number of messages sent: the DevOps service then receives fewer WebSocket messages and has fewer threads to create.

In this article, reducing the number of messages means merging several messages into one and sending that single message. We call this operation anti-shake (more commonly known as debouncing).
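As a small illustration of what "merging several messages into one" means, the sketch below concatenates a few log lines into a single payload. The function name `mergeMessages` and the sample log lines are hypothetical and only serve the example; the article's real code accumulates bytes incrementally instead.

```go
package main

import (
	"bytes"
	"fmt"
)

// mergeMessages concatenates several log messages into one payload, so the
// receiver handles a single WebSocket message instead of one per log line.
func mergeMessages(msgs [][]byte) []byte {
	return bytes.Join(msgs, nil)
}

func main() {
	logs := [][]byte{
		[]byte("line 1\n"),
		[]byte("line 2\n"),
		[]byte("line 3\n"),
	}
	merged := mergeMessages(logs)
	fmt.Printf("%d messages merged into 1 payload of %d bytes\n", len(logs), len(merged))
}
```

The receiver still sees every log line, but it is woken up once rather than three times.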

Implementation

Anti-shake logic

The first thing to make clear is that writing a message to the WebSocket no longer sends it right away. Instead, the message goes into a message receiving channel and is appended to a buffer variable. The buffered content is sent only when the actual send time arrives.

The anti-shake logic in this article has three elements:

  1. Timeout: the quiet period between messages that triggers a send.

Consider the n-th message and the (n+1)-th message:

If no (n+1)-th message arrives within timeout after the n-th message, the buffered content is actually sent over the WebSocket, and both timeout and interval are restarted.

If the (n+1)-th message arrives within timeout after the n-th message, nothing is sent. The message is stored and timeout is restarted, while interval keeps running (the purpose of interval is described below).

  2. Total maximum wait time (interval):

Element 1 has a problem: if every message arrives less than timeout after the previous one, the content is buffered forever and never sent.

The total maximum wait time exists to solve exactly this problem.

If messages keep arriving faster than timeout, interval eventually expires. When it does, the buffered content is sent over the WebSocket, and timeout and interval are restarted.

  3. Message receiving channel

Receives the messages, which are then stored in the buffer.

In practice, this method runs as a goroutine.

Code implementation

    import (
        "time"

        // Assumed: conn.WriteMessage and websocket.BinaryMessage match this package.
        "github.com/gorilla/websocket"
    )

    // TimeoutTime and IntervalTime are time.Duration values defined elsewhere
    // in the package.

    // stopAndDrain stops t and discards a tick that may already be pending,
    // so that a later Reset starts from a clean state.
    func stopAndDrain(t *time.Timer) {
        if !t.Stop() {
            select {
            case <-t.C:
            default:
            }
        }
    }

    func trafficAntiShake(conn *websocket.Conn, done chan bool, msgChan chan []byte, errors chan error) {
        // Maximum wait time
        interval := time.NewTimer(500 * time.Millisecond)
        // Read timeout
        timeout := time.NewTimer(TimeoutTime)

        message := make([]byte, 0)
        message = append(message, <-msgChan...)

        for {
            select {
            // If done is readable, exit the goroutine
            case <-done:
                return

            // Maximum wait time reached: send immediately and reset both timers
            case <-interval.C:
                if len(message) > 0 {
                    if err := conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
                        errors <- err
                        return
                    }
                    message = []byte{}
                }
                stopAndDrain(timeout)
                timeout.Reset(TimeoutTime)
                interval.Reset(IntervalTime)

            // Read timed out: send immediately and reset both timers
            case <-timeout.C:
                if len(message) > 0 {
                    if err := conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
                        errors <- err
                        return
                    }
                    message = []byte{}
                }
                stopAndDrain(interval)
                timeout.Reset(TimeoutTime)
                // interval was stopped above, so it must be restarted here too
                interval.Reset(IntervalTime)

            // A message arrived: append it to message and reset the timeout timer
            case msg := <-msgChan:
                message = append(message, msg...)
                stopAndDrain(timeout)
                timeout.Reset(TimeoutTime)
            }
        }
    }

The method above takes four parameters:

  • conn is the WebSocket connection object
  • done controls when the method exits
  • msgChan is used to receive messages
  • errors receives any error produced by this method

The method first declares three variables:

  • timeout: the maximum gap allowed between messages
  • interval: the total maximum wait time
  • message: stores the buffered messages

Then comes the main part, select and case:

  1. case <-done: the done channel is readable, for example because another goroutine wants the goroutine running this method to exit.
  2. case <-interval.C: this channel is readable, meaning the logic of element 2 has been triggered.

It first checks whether there is anything to send. If there is not, it stops the timeout timer, resets the timers, and continues the loop.

If there is a message, it sends the message, clears message, stops the timeout timer, resets the timers, and continues the loop.

  3. case <-timeout.C: this channel is readable, meaning the first case of element 1 has been triggered.

It checks whether there is anything to send. If there is not, it stops the interval timer, resets the timers, and continues the loop. If there is a message, it sends the message and clears message first, then stops the interval timer and resets the timers.

  4. case <-msgChan: this channel is readable, meaning the second case of element 1 has been triggered.

It reads the message from the channel, appends it to message, resets the timeout timer, and continues the loop.

Conclusion

The anti-shake logic is implemented mainly with Go's select/case, channels, and timers. Note that after a message is sent because timeout or interval expired, the timers must be reset every time. Otherwise they will not fire in later iterations of the loop.
