
This is a translation of an article originally published on medium.com on 2017/08/03.

Hello everyone! My name is Sergey Kamardin, and I'm an engineer at Mail.Ru. This article describes how we developed a high-load WebSocket server in Go. Even if you are familiar with WebSockets but know little about Go, I hope the performance-optimization ideas and techniques described here will still inspire you.

1. Introduction

As a prelude to the whole article, I want to talk about why we developed this service.

Mail.Ru runs a number of stateful systems; the user's email storage is one of them. There are many ways to track changes to that state — for example, periodic polling or system notifications — and each approach has its pros and cons. For an email product, how quickly users receive new mail is a key metric. Mail polling generates about 50,000 HTTP requests per second, 60% of which return status 304 (meaning the mailbox has not changed). So, to reduce the load on the servers and speed up mail delivery, we decided to rewrite the system as a publish-subscribe service (also called a bus, message broker, or event channel) that receives notifications about state changes on one side and handles subscriptions to those changes on the other.

Before rewriting the publisher-subscriber service:

Now:

The first picture above shows the old architecture: the browser periodically polls the API service for changes in the mail Storage service.

The second figure shows the new architecture. The browser establishes a WebSocket connection with the notification API service, which registers the relevant subscriptions with the Bus service. When a new e-mail arrives, the Storage service sends a notification to the Bus (1), which forwards it to the matching subscribers (2). The API service finds the connection for each received notification and pushes the notification to the user's browser (3).

We will discuss this API service (also called the WebSocket service) today. Before we begin, I should mention that this service handles nearly 3 million online connections.

2. The idiomatic way

First, let's look at how parts of this service could be implemented in Go without any optimizations. Before implementing specific features with net/http, let's discuss how we will send and receive data. The data is defined on top of the WebSocket protocol (for example, as JSON objects); we will refer to these units as packets.

Let's implement the Channel structure first. It contains the logic for sending and receiving packets over a WebSocket connection.

2.1. The Channel structure



// Packet represents application level data.
type Packet struct {
    // ...
}

// Channel wraps user connection.
type Channel struct {
    conn net.Conn    // WebSocket connection.
    send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}

Note the two goroutines here, one for reading and one for writing. Each goroutine needs its own stack, whose initial size depends on the operating system and Go version and is usually 2 KB to 8 KB. With the 3 million online connections mentioned above, if each goroutine stack takes 4 KB, the two goroutines per connection need 24 GB of memory — and that is before counting the memory allocated for the Channel structure itself, the ch.send queue of outgoing packets, and other internal fields.

2.2. I/O goroutines

Let's look at the implementation of the reader:



func (c *Channel) reader() {
    // We make a buffered read to reduce read syscalls.
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}

Here we use bufio.Reader: each call reads as many bytes as buf has room for, reducing the number of read() system calls. In an infinite loop we wait for new data to arrive. Remember that phrase — wait for new data to arrive. We will come back to it later.

We leave out the packet parsing and processing logic because it is irrelevant to the optimizations discussed here. buf, however, is worth keeping an eye on: its default size is 4 KB, which means another 12 GB of memory across all connections. The same is true for the writer:



func (c *Channel) writer() {
    // We make buffered writes to reduce write syscalls.
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ = writePacket(buf, pkt)
        buf.Flush()
    }
}

We iterate over the c.send channel and write each outgoing packet to the buffer. As careful readers will have noticed, this is another 4 KB buffer; 3 million connections take up a further 12 GB of memory.

2.3. HTTP

We already have a simple Channel implementation; now we need a WebSocket connection to go with it. Since we are still under the heading Idiomatic Way, let's look at how this is usually done.

Note: if you don't know how WebSockets work — the client switches to the WebSocket protocol through a special HTTP mechanism called an Upgrade request. After the upgrade request is processed successfully, the server and client exchange binary WebSocket frames over the TCP connection. Here is a description of the frame structure.



import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    // ...
})

Note that http.ResponseWriter here comes with a bufio.Reader and a bufio.Writer (each with a 4 KB buffer), used to initialize the *http.Request and write the response.

Regardless of which WebSocket library is used, after a successful response to the upgrade request the server receives the I/O buffers together with the TCP connection when responseWriter.Hijack() is called.

(In some cases the buffers can be released back to the sync.Pool inside net/http through the call net/http.putBufio{Reader,Writer}.)

Those 3 million connections require an additional 24 GB of memory.

So, we’ve used up 72 GB of memory for a program that doesn’t do anything!

3. Optimizations

Let’s review the user connection workflow described earlier. After the WebSocket is set up, the client sends a request to subscribe to related events (we’ll ignore ping/pong requests here). Then, the client may not send any other data for the lifetime of the connection.

The lifetime of the connection may last from a few seconds to a few days.

So most of the time, Channel.reader() and Channel.writer() are just waiting to receive or send data — and waiting along with them are their respective 4 KB I/O buffers.

Now, we see that there are places where we can optimize further, right?

3.1. Netpoll

Do you remember that the Channel.reader() implementation used bufio.Reader.Read()? bufio.Reader.Read() in turn calls conn.Read(), which blocks waiting for new data on the connection. When new data arrives, the Go runtime wakes up the corresponding goroutine to read the next packet; after that, the goroutine blocks again waiting for more data. Let's look at how the Go runtime knows that a goroutine needs to be woken up.

If we look at the implementation of conn.Read(), we see that it calls net.netFD.Read():



// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
    // ...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        // ...
        break
    }
    // ...
}

Go uses sockets in non-blocking mode. EAGAIN indicates that there is no data in the socket but the socket is not blocked. The OS returns control to the user process.

The read begins with a read() system call on the connection's file descriptor. If read() returns the EAGAIN error, the runtime calls pollDesc.waitRead():



// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
    return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
    res := runtime_pollWait(pd.runtimeCtx, mode)
    // ...
}

If we dig a little deeper, we see that netpoll is implemented with epoll on Linux and kqueue on BSD. Why not use the same approach for our connections, allocating the buffer and starting the reading goroutine only when there is actually readable data on the socket?

There is an open issue about exporting the netpoll functionality at github.com/golang/go.

3.2. Kill goroutines

Suppose we had a netpoll implementation for Go. We could then avoid keeping a goroutine running for Channel.reader() and instead subscribe to "new data" events on the connection:



ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
    // We spawn goroutine here to prevent poller wait loop
    // to become locked during receiving packet from ch.
    go ch.Receive()
})

// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt, _ := readPacket(buf)
    ch.handle(pkt)
}

Channel.writer() is easier: we can create the goroutine and allocate the buffer only when there is actually a packet to send.



func (ch *Channel) Send(p Packet) {
    if ch.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}

Note that here we are not dealing with the EAGAIN returned by the write() system call. We rely on the Go runtime environment to handle it. This rarely happens. We can still do it the same way we did before if we need to.

After reading the packets to be sent from ch.send, ch.writer() will complete its operation and finally release the goroutine stack and the cache used for sending.

Very good! We have saved 48 GB by avoiding the I/O cache and stack memory occupied by these two goroutines running continuously.

3.3. Control resources

A large number of connections causes more than just high memory consumption. While developing the server we repeatedly ran into race conditions and deadlocks, often followed by so-called self-DDoS — situations where the clients made things worse by relentlessly trying to reconnect to the server.

For example, if for some reason we suddenly could not handle ping/pong messages, these idle connections would keep being closed (the clients would assume the connections were broken because no data arrived). Every N seconds the clients would conclude they had lost the connection and try to re-establish it, instead of waiting for events from the server.

In this case, it is a good idea to stop the overloaded server from accepting new connections so that the load balancer (such as Nginx) can redirect requests to other servers.

Regardless of the server's load, if all the clients suddenly sent a packet at once (perhaps because of a bug), the 48 GB we saved would be consumed again, because we would be back to creating a goroutine and allocating a buffer for every connection.

Goroutine pool

You can use a Goroutine pool to limit the number of packets processed at the same time. The following code is a simple implementation:



package gopool

// Pool limits the number of concurrently running tasks.
type Pool struct {
    work chan func()
    sem  chan struct{}
}

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Schedule(task func()) {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }()
    for {
        task()
        task = <-p.work
    }
}

Our code for using Netpoll looks like this:



pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // We will block poller wait loop when
    // all pool workers are busy.
    pool.Schedule(func() {
        ch.Receive()
    })
})

Now not only do we have to wait for readable data to appear on the socket before we can read the packet, but we also have to wait for a free Goroutine to be fetched from the pool.

Similarly, let’s change the Send() code:



pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
    if ch.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}

Instead of calling go ch.writer(), we now reuse a goroutine from the pool to send data. So with a pool of N goroutines, we guarantee that at most N requests are handled simultaneously, and request N + 1 does not allocate yet another buffer. The goroutine pool also lets us limit Accept() and Upgrade() for new connections, avoiding most DDoS situations.

3.4. Zero-copy Upgrade

As mentioned earlier, the client switches to the WebSocket protocol through an HTTP Upgrade request. The upgrade request and response look like this:



GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
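The Sec-WebSocket-Accept value in the response is derived from the client's Sec-WebSocket-Key as specified by RFC 6455: the key is concatenated with a fixed GUID, hashed with SHA-1, and base64-encoded. A sketch (the function name is mine):

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// secWebSocketAccept derives the handshake response value from the
// client's Sec-WebSocket-Key, per RFC 6455 section 4.2.2.
func secWebSocketAccept(key string) string {
	const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
	h := sha1.Sum([]byte(key + magic))
	return base64.StdEncoding.EncodeToString(h[:])
}

func main() {
	// Expected to reproduce the Sec-Websocket-Accept value from the
	// handshake above.
	fmt.Println(secWebSocketAccept("A3xNe7sEB9HixkmBhVrYaA=="))
}
```

This computation is cheap; the expensive part of a standard upgrade is the header parsing and copying around it, which is what the rest of this section attacks.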

We receive the HTTP request and its headers only to switch to the WebSocket protocol, yet http.Request keeps all of that header data around. The lesson: for optimization purposes, we can abandon the standard net/http server and avoid the useless memory allocation and copying when processing HTTP requests.

For example, http.Request contains a field named Header, and the standard net/http server unconditionally copies all header data from the request into it. You can imagine how much redundant data this holds — for instance, a header with a very long Cookie.

How do we optimize?

WebSocket implementation

Unfortunately, all the libraries we could find while tuning the server only supported upgrades on top of the standard net/http server, and none of them allowed the read and write optimizations described above. For those optimizations to work, we need a fairly low-level API for operating on WebSockets. To reuse buffers, we need protocol functions that look like this:



func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

If we have a library containing such an API, we can read packets from the connection as follows:



// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf() *bufio.Reader
func putReadBuf(*bufio.Reader)

// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    // ...
}

In short, we need to write our own library.

github.com/gobwas/ws

The main design idea behind the ws library is not to expose the protocol's operational logic to the user. All read and write functions accept the standard io.Reader and io.Writer interfaces, so the library can be used with or without buffering and with other I/O libraries.

Besides upgrades through the standard net/http server, ws supports a zero-copy upgrade: it handles upgrade requests and switches to WebSocket mode without any memory allocation or copying. ws.Upgrade() accepts an io.ReadWriter (net.Conn implements this interface), which means we can use the standard net.Listen() and hand the connection received from ln.Accept() straight to ws.Upgrade(). The library also lets you copy any request data for future application needs (for example, copying a Cookie to validate a session).

Below are benchmark results for handling upgrade requests: the standard net/http server versus net.Listen() with the zero-copy upgrade:



BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP      973 ns/op       0 B/op    0 allocs/op

Switching to ws with the zero-copy upgrade saved us another 24 GB — the memory that had been allocated as I/O buffers for request processing by the net/http handler.

3.5. Review

Let’s review the optimizations mentioned earlier:

  • A reading goroutine with a buffer per connection is expensive. Solution: netpoll (epoll, kqueue); reuse the buffers.
  • A writing goroutine with a buffer per connection is expensive. Solution: start the goroutine only when needed; reuse the buffers.
  • With a storm of connections, netpoll alone cannot limit the amount of work. Solution: reuse goroutines and limit their number.
  • net/http is not the most efficient way to handle an Upgrade to WebSocket. Solution: use a zero-copy upgrade on the bare TCP connection.

Here is the rough implementation code for the server side:



import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // Try to accept incoming connection inside free pool worker.
    // If there are no free workers for 1ms, do not accept anything and try later.
    // This will help us to prevent many self-ddos or out of resource limit cases.
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn, _ := ln.Accept()
        _ = ws.Upgrade(conn)

        // Wrap WebSocket connection with our Channel struct.
        // This will help us to handle/send our app's packets.
        ch := NewChannel(conn)

        // Wait for incoming bytes from connection.
        poller.Start(conn, netpoll.EventRead, func() {
            // Do not cross the resource limits.
            pool.Schedule(func() {
                // Read and handle incoming packet(s).
                ch.Receive()
            })
        })
    })
    if err != nil {
        time.Sleep(time.Millisecond)
    }
}

4. Conclusion

"Premature optimization is the root of all evil." — Donald Knuth

The optimizations above make sense, but not in all cases. For example, optimization doesn’t make much sense if the ratio of free resources (memory, CPU) to the number of online connections is high. Of course, it’s always helpful to know where to optimize and how.

Thank you for your attention!

5. References

  • github.com/mailru/easy…
  • github.com/gobwas/ws
  • github.com/gobwas/ws-…
  • github.com/gobwas/htt…