Today we start to implement the function of bullets, although we have implemented a super simple, crude Websocket protocol, but as mentioned before, that is just learning to use, in fact we use a third party package.

The function of barrage is realized

Depend on the installation

We use the Go mod to install dependencies. For package management of Go, see the official documentation.

Install the websocket implementation, we use github.com/gorilla/websocket package:

$ go get github.com/gorilla/websocket
Copy the code

We use Redis. To install the Redis package, we use github.com/go-redis/redis:

$ go get github.com/go-redis/redis/v8
Copy the code

Golang.org/x package failure

Since wall problems are known to cause installation failures, check out this article

Setting environment variables:

$ export GOPROXY=https://goproxy.io
Copy the code

In this way, the installation will be successful.

usego modFor dependency management, the actual package is not downloadedsrcIt’s in the directorygo modIt’s in the warehouse. You can get throughgo envCommand to seego modWarehouse directory. In addition, I useGolandEditor, because the dependency package is not theresrcDirectory,import“, the editor has been red, indicating that the package can not be found, can not find the package, the corresponding code prompt, this is very annoying. Fortunately, Goland supports itgo mod, we need a simple setup:

There is no longer a message indicating that the package can’t be found, and the code message is working properly.

Connect the Redis

var ctx = context.Background()

var rdb = NewRedisConn()
func NewRedisConn(a) (client *redis.Client) {
	client = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "".// no password set
		DB:       0.// use default DB
	})

	_, err := client.Ping(ctx).Result()
	iferr ! =nil {
		log.Fatal(err)
	}
	return
}
Copy the code

Just follow the official demo, there’s nothing to say.

URI defined

RFC6455 defines the routing rules for websocket. We define the URI as /chat? room=xxx

Room indicates the room number. As we all know, there must be room number in the live broadcast. In the video website, the ID of the video or the id of the current page is used as the room number.

http.HandleFunc("/chat".func(writer http.ResponseWriter, request *http.Request) {
    serveWs(writer, request)
})
err := http.ListenAndServe("localhost:9527".nil)
iferr ! =nil {
    log.Fatal(err)
}
Copy the code

Since websocket is HTTP based, we can get routing parameters as HTTP:

queries := request.URL.Query()
roomId := queries.Get("room")
Copy the code

After the parameters are obtained, some verification needs to be done:

if roomId == "" {
    writer.WriteHeader(400)
    log.Println("Room must be")
    return
}
_, err := rdb.Get(ctx, fmt.Sprintf("room:%s", roomId)).Result()
if err == redis.Nil {
    writer.WriteHeader(400)
    log.Println("The room does not exist")
    return
} else iferr ! =nil {
    writer.WriteHeader(500)
    log.Println(err)
    return
}
Copy the code

Some definitions

// Message type
const (
	danmuMsg = 1 / / barrage
	bannedMsg = 2 / / silence
	joinMsg = 3 // Enter the room
	metaMsg = 4 // Room information
)

const (
	deadline = 5 * time.Second // The timeout period
	pongWait = 60 * time.Second // Wait time for pong
	pingPeriod = (pongWait * 9) /10 / / frequency of the ping
	maxMessageSize = 512 // Maximum read data size
)

type Client struct {
	conn *websocket.Conn
	msg RsvData

	room *Room

	// The amount of data cached by the client
	bufChan chan []byte
}

// Message format
type Data struct {
	MsgType int
	Msg string
}

// Format of the end message
type RsvData struct {
	Data
	Token []byte / / the reserved
}

// Write message format
type WriteData struct {
	Data
}
Copy the code

Room Definition:

type Room struct {
	id string
	clients map[*Client]bool

	broadcastMsg chan []byte

	leaving chan *Client
	entering chan *Client
}

type Rooms struct {
	rooms map[string]*Room

	mux sync.Mutex
}
Copy the code

The room with

User enters a room, needs to obtain the room information, if the room does not exist to create:

room, ok := rooms.rooms[roomId]
if! ok { room = &Room{ id: roomId, clients:make(map[*Client]bool),
        leaving: make(chan *Client),
        broadcastMsg: make(chan []byte),
        entering: make(chan *Client),
    }
    go room.broadcast()
    rooms.mux.Lock()
    rooms.rooms[roomId] = room
    rooms.mux.Unlock()
}
Copy the code

We do it in rooms, one goroutine at a time, and a user comes in and needs to be told to add him to the room:

room.entering <-client
Copy the code

We listen for entering in the goroutine of the room:

cli := <-r.entering
r.clients[cli] = true
Copy the code

When we watch the live broadcast, we often see “Welcome XXX to the room”.

msg := WriteData{
    Data{MsgType:joinMsg, Msg: cli.conn.RemoteAddr().String()},
}
byteMsg, _ := json.Marshal(msg)
r.broadcastMsg <- byteMsg
Copy the code

We haven’t added user related functions yet, so we’ll use the user’s IP instead.

The user watches the live broadcast for a while, doesn’t want to watch it or wants to watch another live broadcast, then in the abstract, the user leaves the current room,

cli := <-r.leaving
delete(r.clients, cli)
close(cli.bufChan)
Copy the code

When the user leaves the room, the user is deleted from the current room and no longer sends or receives messages. The corresponding channel is closed

The following is the most important, bullet screen, watching live or video, how to deal with those bullet screen? We still abstract it into a room, and send the barrage in this room to all users in this room.

msg := <-r.broadcastMsg
for cli := range r.clients {
    cli.bufChan <- msg
}
Copy the code

This is what the room needs to do, handling incoming, departing, and messages:

func (r *Room) broadcast(a) {
	for {
		select {
		case cli := <-r.entering:
			msg := WriteData{
				Data{MsgType:joinMsg, Msg: cli.conn.RemoteAddr().String()},
			}
			byteMsg, _ := json.Marshal(msg)
			r.broadcastMsg <- byteMsg
			r.clients[cli] = true
		case cli := <-r.leaving:
			delete(r.clients, cli)
			close(cli.bufChan)
		case msg := <-r.broadcastMsg:
			for cli := range r.clients {
				cli.bufChan <- msg
			}
		}
	}
}
Copy the code

To send and receive barrage

Each user has a Websocket connection. In abstract terms, only two things are done on a connection, sending and receiving messages:

go client.readMsg()
client.writeMsg()
Copy the code

In go’s HTTP service, when a connection comes in, it opens a Goroutine to handle the connection, so a Websocket and a Goroutine, we open a Goroutine to handle the read.

Write a message

We define a bufChan channel for each client connection, which may have been closed (e.g., leaving a room), so we need to consider the case where the channel is closed:

msg, ok := <-c.bufChan
C. bofchan channel is closed
if! ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{})
    return
}
Copy the code

If not closed, write messages to the client normally:

// Only text messages are supported for now
w, err := c.conn.NextWriter(websocket.TextMessage)
iferr ! =nil {
    return
}
w.Write(msg)

// Send information in the cache
n := len(c.bufChan)
for i := 0; i < n; i++ {
    w.Write(<-c.bufChan)
}

/ / github.com/gorilla/websocket package of general practice, do it
// Flush the information to the stream
// It does not close the TCP connection
// w.Flush() would be more appropriate
iferr := w.Close(); err ! =nil {
    return
}
Copy the code

The heartbeat processing

Why should add heartbeat processing? This prevents one end from accidentally disconnecting the TCP connection while the other end does not know about it and keeps the connection handle, thus running out of memory over time. TCP has a keep-alive option, which is rarely used in practice. Generally, the application performs the heartbeat processing by itself.

Ping and pong control frames are defined in the websocket protocol for heartbeat packets. For websocket control frames, see section RFC64555.5

Heartbeat packets can be sent by the client or the server. There is no way to ask the client to do this or that, so the server usually sends heartbeat packets by itself.

Creating a timer:

ticker := time.NewTicker(pingPeriod)
<-ticker.C
c.conn.SetWriteDeadline(time.Now().Add(deadline))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err ! =nil {
    return
}
Copy the code

The write message has been processed, here is the complete code:

func (c *Client) writeMsg(a) {
	ticker := time.NewTicker(pingPeriod)
	defer func(a) {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case msg, ok := <-c.bufChan:
			c.conn.SetWriteDeadline(time.Now().Add(deadline))
			C. bofchan channel is closed
			if! ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			// ToDO message type is configurable
			// Only text messages are supported for now
			w, err := c.conn.NextWriter(websocket.TextMessage)
			iferr ! =nil {
				return
			}
			w.Write(msg)

			// Send information in the cache
			n := len(c.bufChan)
			for i := 0; i < n; i++ {
				w.Write(<-c.bufChan)
			}

			// Flush the information to the stream
			// It does not close the TCP connection
			// w.Flush() would be more appropriate
			iferr := w.Close(); err ! =nil {
				return
			}
		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(deadline))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err ! =nil {
				return}}}}Copy the code

Read the information

It is easy to read the message, just read it, and send it to the client:

c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
_, msg, err:= c.conn.ReadMessage()
iferr ! =nil {
    if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
        log.Printf("error: %v", err)
    }
    break
}

// Todo json parsing other processing
writeMsg := WriteData{
    Data{MsgType:danmuMsg, Msg: string(msg)},
}
byteMsg, _ := json.Marshal(writeMsg)
c.room.broadcastMsg <- byteMsg
Copy the code

The information read here is forwarded directly to the client without doing any processing. Then we do processing to the read information, such as sensitive word filtering, suppression processing, etc.

Test it out:The message format is not processed at first, as long as the data can be sent and received normally, the message will be processed later.

Everything seemed normal, but after about a minute, the connection suddenly broke:What’s going on here?

Because we set ReadDeadline to 1 minute above. After a minute, the server itself disconnects. 1 minute is the pong wait time, so when we ping the client and the client responds, we need to reset the ReadDeadline time.

c.conn.SetPongHandler(func(string) error {
    c.conn.SetReadDeadline(time.Now().Add(pongWait))
    return nil
})
Copy the code

Reset deadline when you receive Pong, and that’s it. Now the basic function of sending bullets has been implemented.

For deadline, you can check the official documentation:

$ go doc net.Conn.SetDeadline
Copy the code

The code has been committed to the Gitee repository.