This article is very popular on Medium. The author analyzes a real production case and explains it well.

We often hear that Go's goroutines and channels make high concurrency easy to achieve. Does that mean you can achieve high concurrency simply by running all of your code in goroutines? Apparently not. This article will teach you, step by step, how to write a simple, highly concurrent Go program.


I’ve worked in anti-spam, anti-virus, and anti-malware for over 15 years at several different companies, and I now know that these systems inevitably grow more and more complex over time because of the massive amounts of data we process every day.

Currently, I am the CEO of Smsjunk.com and the chief architect of KnowBe4, both of which are companies in the cybersecurity industry.

Interestingly, almost all of the web backend development I’ve been involved in as a software engineer over the past 10 years has been done in Ruby on Rails. Don’t get me wrong, I love Ruby on Rails and I believe it’s a great ecosystem, but after a while you start thinking and designing systems the Ruby way, and you forget how efficient and simple your software architecture could be if you could leverage multi-threading, parallelism, fast execution, and a small memory footprint. I was a C/C++, Delphi, and C# developer for many years, and I’ve only just begun to realize how much less complex things can be with the right tool for the job.

I'm not really interested in the language and framework wars on the Internet over which language is better or which framework is faster. I've always believed that efficiency, productivity, and code maintainability depend primarily on how simply you can architect your solution.

The problem

While working on our anonymous monitoring and analytics system, our goal was to handle a large number of POST requests from millions of endpoints. The web handler receives a JSON document that may contain a collection of payloads, which need to be written to Amazon S3 so that our map-reduce system can operate on the data later.

Traditionally, we would consider creating a worker-tier architecture using a technology stack such as:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elastic Beanstalk Worker Tier
  • RabbitMQ
  • and so on…

We would then set up two different clusters, one for the web front end and one for the workers, so that we could scale the number of machines at will to handle incoming requests.

From the beginning, our team knew we could do this with Go, since we saw during the discussion phase that this could be a very high-traffic system. I’ve been using Go for almost two years, and we had developed a few systems with it, but none of them had seen traffic of this magnitude. We started by creating a few structs to define the web request payload we would receive through the POST calls, along with a method to upload it to our S3 bucket.

type PayloadCollection struct {
	WindowsVersion  string    `json:"version"`
	Token           string    `json:"token"`
	Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collisions in
	// case we get the same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder(), time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(p)
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

Naive approach – unbounded use of goroutines

Initially, we implemented a very simple and crude POST handler that ran each request directly into a new Goroutine:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3()   // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}

This actually works under moderate load, but it quickly proved not to work at high concurrency. We were expecting plenty of requests, but when we deployed the first version to production, the traffic turned out to be of an order of magnitude beyond anything we had anticipated; we had completely underestimated the concurrency.

There are several problems with this approach. There is no way to control the number of goroutines being created, and since we were receiving a million POST requests per minute, the system crashed very quickly.
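To make the missing limit concrete, here is a minimal sketch (ours, not code from the original system) of one common way to bound in-flight goroutines: a buffered channel used as a counting semaphore. The name uploadBounded, the sem variable, and the limit of 64 are illustrative assumptions.

// Sketch: bounding concurrent uploads with a channel-based semaphore.
// sem, uploadBounded, and the limit of 64 are illustrative, not from our code.
var sem = make(chan struct{}, 64)

func uploadBounded(p Payload) {
	sem <- struct{}{} // blocks once all 64 slots are taken
	go func() {
		defer func() { <-sem }() // release the slot when the upload finishes
		if err := p.UploadToS3(); err != nil {
			log.Errorf("Error uploading to S3: %s", err.Error())
		}
	}()
}

Even a crude bound like this keeps the goroutine count, and therefore memory usage, under control; the rest of the article builds a more deliberate version of the same idea.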

Trying again

We needed to find another way. From the beginning, we had discussed keeping the request handler’s lifetime as short as possible and moving the processing into the background. Of course, this is what you must do in the Ruby on Rails world, otherwise you block all of your available web workers, regardless of whether you are using Puma, Unicorn, or Passenger.

We would then have needed to reach for common solutions such as Resque, Sidekiq, SQS, and so on. The list goes on, since there are many ways to do this.

So our second iteration was to create a buffered channel where we could queue up jobs and upload them to S3. Since we could control the maximum length of the queue, and we had plenty of RAM to hold the queued jobs in memory, we thought we could simply buffer the jobs in the channel queue.

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	...
	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		Queue <- payload
	}
	...
}

Then, to pull jobs out of the buffered channel and process them, we used something like this:

func StartProcessor() {
	for {
		select {
		case job := <-Queue:
			job.UploadToS3()  // <-- STILL NOT GOOD
		}
	}
}

Honestly, I have no idea what we were thinking; it must have been a late night. This approach didn’t buy us anything: we had traded flawed concurrency for a buffered queue, which merely postponed the problem. Our synchronous processor uploaded only one payload to S3 at a time, and since the rate of incoming requests was far greater than the rate at which a single processor could upload to S3, the buffered channel quickly reached its limit and blocked the request handlers from queuing more items.

We were simply sidestepping the problem, and it eventually killed the system entirely: after we deployed this flawed version, our latency kept climbing by the minute.
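Incidentally, a bounded queue only protects the system if the handler stops blocking on it when it fills up. Here is a hedged sketch (ours, not part of the original handler) of shedding load with a non-blocking send inside payloadHandler:

// Sketch: non-blocking enqueue inside the handler. If the queue is full,
// fail fast with a 503 instead of blocking. Illustrative only.
select {
case Queue <- payload:
	// job accepted
default:
	// the queue is full; ask the client to back off rather than block
	w.WriteHeader(http.StatusServiceUnavailable)
	return
}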

Better solutions

We decided to use a common pattern with Go channels and create a two-tier channel system: one channel for queuing jobs, and one for controlling how many workers operate on the JobQueue concurrently.

The idea was to parallelize uploads to S3 at a sustainable rate, one that would neither cripple the machine nor trigger connection errors from S3.

So we chose to create a Job/Worker pattern. Those familiar with Java, C#, and the like can think of this as the Golang way of implementing a worker thread pool using channels.

var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

We modified our web request handler to create a Job struct with the payload and send it into the JobQueue channel for the workers to pick up.

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {

		// let's create a job with the payload
		work := Job{Payload: payload}

		// Push the work onto the queue.
		JobQueue <- work
	}

	w.WriteHeader(http.StatusOK)
}

During our Web server initialization, we create a Dispatcher and call Run() to create the worker pool and start listening for jobs appearing in the JobQueue.

maxWorkers, _ := strconv.Atoi(MaxWorker) // MaxWorker is read from the environment as a string
dispatcher := NewDispatcher(maxWorkers)
dispatcher.Run()
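One caveat worth flagging: JobQueue is declared above as var JobQueue chan Job but is never created with make, and a send on a nil channel blocks forever. Here is a minimal sketch of the missing initialization (the fallback default of 100 is our assumption, not a value from the article):

// Sketch: JobQueue must be created before payloadHandler sends on it.
maxQueue, err := strconv.Atoi(MaxQueue)
if err != nil || maxQueue <= 0 {
	maxQueue = 100 // assumed fallback default
}
JobQueue = make(chan Job, maxQueue)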

Here is the code for our dispatcher implementation:

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
	maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}

Note that we bound the number of workers that can be instantiated and registered into the worker pool (the WorkerPool channel above). Since we used Amazon Elastic Beanstalk with a dockerized Go environment for this project, and we always try to follow the twelve-factor methodology when configuring production systems, we read these values from environment variables. This way we can quickly adjust the number of workers and the maximum size of the job queue without redeploying the cluster.

var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)
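Since environment variables always arrive as strings, a small helper keeps this twelve-factor configuration tidy. This is a sketch of ours; envInt and the default values are assumptions, not part of the original code.

// Sketch: read an integer setting from the environment, falling back to a
// default when the variable is missing or invalid. Names/defaults are ours.
func envInt(name string, def int) int {
	if v, err := strconv.Atoi(os.Getenv(name)); err == nil && v > 0 {
		return v
	}
	return def
}

// Example usage with assumed defaults:
var (
	maxWorkers = envInt("MAX_WORKERS", 10)
	maxQueue   = envInt("MAX_QUEUE", 100)
)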

Immediately after we released this release, we saw that all of our request latency dropped to a very low number, and we became much more efficient at handling requests.

A few minutes after our elastic load balancer was fully warmed up, we could see our Elastic Beanstalk application serving close to one million requests per minute. Traffic usually peaks above a million requests per minute during the morning hours.

We deployed new code and reduced the number of servers from 100 to about 20.

With the cluster and auto-scaling settings properly configured, we were able to get the job done with only four EC2 C4 instances; the elastic auto-scaling system spawns a new instance whenever CPU stays above 90% for five consecutive minutes.

Conclusion

Simplicity has always been my winning formula. We could have designed a complex system with many queues and background daemons that would have been difficult to deploy, but instead we decided to take advantage of Elastic Beanstalk’s auto-scaling and the efficient, simple approach to concurrency that Go provides out of the box.

Experience has taught us to use the best tool for the job. Sometimes, when your Ruby on Rails system needs a very powerful web handler, consider looking outside the Ruby ecosystem for simpler yet more powerful alternatives.

Author: Marcio Castilho

Original: Medium.com/smsjunk/han…