“This is the 8th day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021”

Introduction to the

Streaming media nowadays has become an important technology of industrial, such as: live sites, video monitoring live transmission, APP, how to achieve a high concurrency video website, that has implications for language technology selection and the use of streaming media technology, this section mainly introduces how to use Golang to implement a streaming video site.

The outline

  • background
  • Why do you choose Go and some of its advantages
  • Introduction to GoLang and implementing a WebServer toolchain
  • Golang’s Channel concurrency mode
  • Complete a streaming website with Golang
  • Site deployment

background

Why do you choose Go and some of its advantages

Why do you choose Go to develop video websites? This is mainly reflected in the advantages of Go. So what are the advantages of Go?

  • Development efficiency is high, no matter in other languages, need many other configurations or plug-ins, even the whole family bucket of complete Java language will need a Servlet engine, such as: Tomcat, Jetty, etc. But Go offers unique capabilities in this regard. Most of the functionality and content is already integrated into the PKG. Includes the development of a complete development tool chain (tools, test, benchmark, builtin.etc), including Go command (Go test, Go install, Go build). These are all complete and can be used by downloading Go directly.
  • On the other hand, it is easy to deploy, and GO is a compiled language that can compile multiple platform executables. Compile once, run everywhere, generate binary files after direct compilation, run directly.
  • Good native HTTP library, integrated template engine, no need to add third party framework.

Introduction to GoLang and implementing a WebServer toolchain

Go is a compiled language, and it aims to combine the speed of a dynamic language such as Python with the performance and security of a compiled language such as C/C++.

There are some common toolchains in Go, such as:

  • Env GOOS= Linux GOARCH=amd64 go build, CI/CD, this is a very useful command.
  • Go install, which is also compiled, but differs from build in that the output file is packaged into a library under the PKG after compilation.
  • Go get is used to obtain the go third-party package. The common one is: Go get -u Git address, which indicates that you have obtained a certain resource from Git and installed it locally.
  • Go FMT, unified code style, typesetting.
  • Run tests in the current directory. “Go test -v” prints all the information.
  • The go test file is usually named xxx_test. go

Key points:

  • Using TestMain as the initialization test and using Run() to call other tests can do some testing that needs to be initialized, such as database, file loading, etc.
func TestMain(m *testing.M) {
    fmt.Println("Test begin")
    m.Run()
}
Copy the code
  • If Run() is not added, tests other than TestMain will not be performed.
func TestPrint(t *testing.T) {
    fmt.Println("Test print")
}

func TestMain(m *testing.M) {
    fmt.Println("Test begin")
    //m.Run()
}
Copy the code

As stated above, the TestPrint function will not be executed if the Run() method is not executed.

Golang’s Channel concurrency mode

In Go, now that you have coroutines, how do these coroutines communicate with each other? Go provides a channel to solve this problem.

Declare a channel

In Go, declaring a channel is as simple as using the built-in make function:

ch:=make(chan string)
Copy the code

Where chan is a keyword that indicates the channel type. The following string indicates that the data in a channel is of type string. As you can also see from the channel declaration, chan is a collection type.

Once chan is defined, it can be used. There are only two operations for a chan: send and receive:

  • Send: Sends a value to chan and places the value in chan with the operator chan <-

  • Receive: Gets the value in chan with the operator < -chan

Example:

Package main import "FMT" func main() {ch := make(chan string) go func() {FMT.Println(" FMT ") ch <-" Println("I am main goroutine") v := < -ch FMT.Println(" received chan: ",v)}Copy the code

Let’s first execute and see the print result:

The value of chan received is: Sender: Main goroutineCopy the code

It can be seen from the running result that the effect of using time.Sleep function is achieved.

It should be clear why the program doesn’t exit before the new Goroutine completes. There is no value in the chan created by make, and the main Goroutine wants to get a value from the chan. Wait until another Goroutine sends a value to chan.

There is no buffer channel

In the example above, the chan created with make is an unbuffered channel with a capacity of 0 and no data to store. Therefore, an unbuffered channel only serves to transfer data, and the data does not stay in the channel. This also means that the send and receive operations of an unbuffered channel take place simultaneously, which is also called a synchronous channel.

Have a buffer channel

A buffered channel is like a blocking queue, with its internal elements first in, first out. The second argument to the make function specifies the size of a channel and creates a buffered channel, as in:

cacheCh := make(chan int,5)
Copy the code

Chan defines an element of size 5 as chan of type int.

A buffered channel has the following characteristics:

  • A buffered channel has a buffered queue inside it

  • The send operation inserts an element to the end of the queue, and if the queue is full, blocks and waits until another Goroutine executes, and the receive operation frees up the queue

  • The receive operation takes the element from the head of the queue and removes it from the queue. If the queue is empty, it blocks and waits until another Goroutine executes, and the send operation inserts the new element

Cache := make(chan int,5) cache < -2 cache < -2 FMT.Println(" size :",cap(cache),", len(cache))Copy the code

An unbuffered channel is a channel with zero capacity. Such as make (chan int, 0)

Shut down the channel

A channel is closed by the built-in function close. If a channel is closed, no data can be sent to it, and if it is, a Painc exception will be raised. However, it is possible to receive data from a channel, if there is no data in the channel, with zero values of the element type.

A one-way channel

The so-called one-way, can not send, or can only receive. Declare a one-way channel with the <- operator as follows:

send := make(chan <- int)
receive := make(<- chan int)
Copy the code

Complete a streaming website with Golang

Business module

API Interface design

  • layered
  • Restful Design
  • CRUD distinguishes resource operations
  • Return code specification

First, we write a startup class:

package main 

import (
	"net/http"
	"github.com/julienschmidt/httprouter"
)

type middleWareHandler struct {
	r *httprouter.Router
}

func NewMiddleWareHandler(r *httprouter.Router) http.Handler {
	m := middleWareHandler{}
	m.r = r
	return m
}

func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	//check session
	validateUserSession(r)

	m.r.ServeHTTP(w, r)
}

func RegisterHandlers() *httprouter.Router {
	router := httprouter.New()

	router.POST("/user", CreateUser)

	router.POST("/user/:user_name", Login)

	return router
}

func main() {
	r := RegisterHandlers()
	mh := NewMiddleWareHandler(r)
	http.ListenAndServe(":1000", mh)
}
Copy the code

Here we implement registration, login, and some initial listening ports. Next, we need to look at the main concern for back-end video processing is session:

package session import ( "time" "sync" "github.com/avenssi/video_server/api/defs" "github.com/avenssi/video_server/api/dbops" "github.com/avenssi/video_server/api/utils" ) var sessionMap *sync.Map func init() { sessionMap = &sync.Map{} } func nowInMilli() int64{ return time.Now().UnixNano()/1000000 } func deleteExpiredSession(sid string) { sessionMap.Delete(sid) dbops.DeleteSession(sid) } func LoadSessionsFromDB() { r, err := dbops.RetrieveAllSessions() if err ! = nil { return } r.Range(func(k, v interface{}) bool{ ss := v.(*defs.SimpleSession) sessionMap.Store(k, ss) return true }) } func GenerateNewSessionId(un string) string { id, _ := utils.NewUUID() ct := nowInMilli() ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min ss := &defs.SimpleSession{Username: un, TTL: ttl} sessionMap.Store(id, ss) dbops.InsertSession(id, ttl, un) return id } func IsSessionExpired(sid string) (string, bool) { ss, ok := sessionMap.Load(sid) if ok { ct := nowInMilli() if ss.(*defs.SimpleSession).TTL < ct { deleteExpiredSession(sid) return "", true } return ss.(*defs.SimpleSession).Username, false } return "", true }Copy the code

As you can see from the above code, Go mainly refers to the related video plugin libraries avenssi/ Video_server etc to handle caching sessions. This is one reason why go was chosen to develop the back end.

We also define an error code message:

package defs

type Err struct {
	Error string `json:"error"`
	ErrorCode string `json:"error_code"`  
}

type ErrResponse struct {
	HttpSC int
	Error Err
}

var (
	ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}}
	ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}}
	ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}}
	ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}}
)
Copy the code

This is the main logic for the business layer. Scheduler and Stream Server are discussed below.

scheduler

Scheduler is mainly used to schedule tasks. Which tasks are the main ones? These are mainly tasks for which the normal API cannot give immediate results. For example, our video website needs some requirements for video review and data recovery. At this time, we need to do some short delay, so that users can’t see it, but the background still exists. This requires scheduler to process it asynchronously. There are also periodic tasks.

In Scheduler, there are also timers, which are mainly used to process tasks periodically.

In this section, we adopt runner’s production and consumer mode to achieve. The specific code is as follows:

package taskrunner import ( ) type Runner struct { Controller controlChan Error controlChan Data dataChan dataSize int longLived bool Dispatcher fn Executor fn } func NewRunner(size int, longlived bool, d fn, e fn) *Runner { return &Runner { Controller: make(chan string, 1), Error: make(chan string, 1), Data: make(chan interface{}, size), longLived: longlived, dataSize: size, Dispatcher: d, Executor: e, } } func (r *Runner) startDispatch() { defer func() { if ! r.longLived { close(r.Controller) close(r.Data) close(r.Error) } }() for { select { case c :=<- r.Controller: if c == READY_TO_DISPATCH { err := r.Dispatcher(r.Data) if err ! = nil { r.Error <- CLOSE } else { r.Controller <- READY_TO_EXECUTE } } if c == READY_TO_EXECUTE { err := r.Executor(r.Data) if err ! = nil { r.Error <- CLOSE } else { r.Controller <- READY_TO_DISPATCH } } case e :=<- r.Error: if e == CLOSE { return } default: } } } func (r *Runner) StartAll() { r.Controller <- READY_TO_DISPATCH r.startDispatch() }Copy the code

Runner is reusable, and the Task that follows is customized for Runner. For example: We delay deleting videos.

Let’s get the data first and see:

package dbops import ( "log" _ "github.com/go-sql-driver/mysql" ) func ReadVideoDeletionRecord(count int) ([]string, error) { stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?") var ids []string if err ! = nil { return ids, err } rows, err := stmtOut.Query(count) if err ! = nil { log.Printf("Query VideoDeletionRecord error: %v", err) return ids, err } for rows.Next() { var id string if err := rows.Scan(&id); err ! = nil { return ids, err } ids = append(ids, id) } defer stmtOut.Close() return ids, nil } func DelVideoDeletionRecord(vid string) error { stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?" ) if err ! = nil { return err } _, err = stmtDel.Exec(vid) if err ! = nil { log.Printf("Deleting VideoDeletionRecord error: %v", err) return err } defer stmtDel.Close() return nil }Copy the code

Task:

package taskrunner import ( "os" "errors" "log" "sync" "github.com/avenssi/video_server/scheduler/dbops" ) func deleteVideo(vid string) error { err := os.Remove(VIDEO_PATH + vid) if err ! = nil && ! os.IsNotExist(err) { log.Printf("Deleting video error: %v", err) return err } return nil } func VideoClearDispatcher(dc dataChan) error { res, err := dbops.ReadVideoDeletionRecord(3) if err ! = nil { log.Printf("Video clear dispatcher error: %v", err) return err } if len(res) == 0 { return errors.New("All tasks finished") } for _, id := range res { dc <- id } return nil } func VideoClearExecutor(dc dataChan) error { errMap := &sync.Map{} var err error forloop: for { select { case vid :=<- dc: go func(id interface{}) { if err := deleteVideo(id.(string)); err ! = nil { errMap.Store(id, err) return } if err := dbops.DelVideoDeletionRecord(id.(string)); err ! = nil { errMap.Store(id, err) return } }(vid) default: break forloop } } errMap.Range(func(k, v interface{}) bool { err = v.(error) if err ! = nil { return false } return true }) return err }Copy the code

This is the process of asynchronous, timed processing of video stream information.

stream server

  • Streaming

  • Upload files

Streaming is mainly different from ordinary links, it needs to keep long links, and short links are different, when sending a request to the client, it will constantly output data stream, and will be very long. Therefore, there is a problem when multiple long links are maintained at the same time. If we continue to launch links and open web pages, our service will crash eventually. Therefore, we need to carry out flow control: limit, where flow control may only be restricted during connect.

package main import ( "log" ) type ConnLimiter struct { concurrentConn int bucket chan int } func NewConnLimiter(cc int)  *ConnLimiter { return &ConnLimiter { concurrentConn: cc, bucket: make(chan int, cc), } } func (cl *ConnLimiter) GetConn() bool { if len(cl.bucket) >= cl.concurrentConn { log.Printf("Reached the rate limitation.") return false } cl.bucket <- 1 return true } func (cl *ConnLimiter) ReleaseConn() { c :=<- cl.bucket log.Printf("New connction coming: %d", c) }Copy the code

After adding flow control, we need to embed flow control in HTTP Middleware. Also, we need to register the Router and HTTP server at startup, so the code is as follows:

package main import ( "net/http" "github.com/julienschmidt/httprouter" ) type middleWareHandler struct { r *httprouter.Router l *ConnLimiter } func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler { m := middleWareHandler{} m.r = r m.l = NewConnLimiter(cc) return m } func RegisterHandlers() *httprouter.Router { router := httprouter.New() router.GET("/videos/:vid-id", streamHandler) router.POST("/upload/:vid-id", uploadHandler) router.GET("/testpage", testPageHandler) return router } func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if ! m.l.GetConn() { sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests") return } m.r.ServeHTTP(w, r) defer m.l.ReleaseConn() } func main() { r := RegisterHandlers() mh := NewMiddleWareHandler(r, 2) http.ListenAndServe(":2000", mh) }Copy the code

Finally, let’s look at how streamHandler handles it:

func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { vid := p.ByName("vid-id") vl := VIDEO_DIR + vid video, err := os.Open(vl) if err ! = nil { log.Printf("Error when try to open file: %v", err) sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") return } w.Header().Set("Content-Type", "video/mp4") http.ServeContent(w, r, "", time.Now(), video) defer video.Close() }Copy the code

We use a more general approach here: after receiving the unique information of the stream, we directly process it.

Upload files, we need to do a static check and then read data from it:

func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE) if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err ! = nil { sendErrorResponse(w, http.StatusBadRequest, "File is too big") return } file, _, err := r.FormFile("file") if err ! = nil { log.Printf("Error when try to get file: %v", err) sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") return } data, err := ioutil.ReadAll(file) if err ! = nil { log.Printf("Read file error: %v", err) sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") } fn := p.ByName("vid-id") err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666) if err ! = nil { log.Printf("Write file error: %v", err) sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") return } w.WriteHeader(http.StatusCreated) io.WriteString(w, "Uploaded successfully") }Copy the code

Site deployment

We want to compile and package the previous code:

FROM Ubuntu :16.04 as build ENV TZ=Asia/Shanghai RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime&& echo $TZ > /etc/timezone RUN apt-get update && apt-get install -y --no-install-recommends \ g++ \ ca-certificates \ wget && \ rm -rf /var/lib/apt/lists/* ENV GOLANG_VERSION 1.15.1 RUN wget -nv-o - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \ | tar - C/usr/local - xz ENV GOPROXY=https://goproxy.cn,direct ENV GO111MODULE=on ENV GOPATH /go ENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH WORKDIR  /go/src COPY . . WORKDIR /go/src/video-service RUN sed -i "/runmode/crunmode=pro" /go/src/video-service/conf/app.conf RUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \ go install -ldflags="-s -w" -v /go/ SRC /video-service FROM Ubuntu :16.04 WORKDIR /video-service RUN mkdir -p log COPY -- FROM =build /go/bin/video-service /video-service CMD ["./video-service"]Copy the code

Next, add the deployment script:

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: video-service
  name: video-service
  namespace: system-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: video-service
  template:
    metadata:
      labels:
        app: video-service
    spec:
      containers:
        - image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service
          imagePullPolicy: Always
          name: video-service
          ports:
            - containerPort: 1000
          #livenessProbe:
            #httpGet:
              #path: /api/v1/healthz
              #port: 1000
              #scheme: HTTP
            #initialDelaySeconds: 15
            #periodSeconds: 10
            #timeoutSeconds: 3
            #failureThreshold: 5
          volumeMounts:
            - name: video-service-config
              mountPath: /video-service/conf
      volumes:
        - name: video-service-config
          configMap:
            name: video-service-config
      nodeSelector:
        video-service: "true"
      restartPolicy: Always
Copy the code

Execute the compile command:

sh build/build.sh
kubectl create -f deploy.yml
Copy the code

K8s is used here to deploy to the machine. Service access address after deployment:

10.11.3.4:1000
Copy the code