Marcio Castilho

August 31, 2017

I’ve been in the anti-spam, anti-virus, and anti-malware industry for more than 15 years, working for several different companies, so I know first-hand how complicated these systems can become because of the huge amounts of data we handle every day.

I am currently the CEO of Smsjunk.com and lead architect at KnowBe4, both companies active in the cybersecurity industry.

Interestingly, almost all of the web backend development I’ve been involved with over the last 10 years or so has been in Ruby on Rails. Don’t get me wrong, I love Ruby on Rails and believe it’s a great environment, but after a while you start thinking and designing systems the Ruby way, and you forget how efficient and simple your architecture could be if you took advantage of multi-threading, parallelization, fast execution, and low memory overhead. For many years I was a C/C++, Delphi, and C# developer, and I’ve come to realize that with the right tool for the job, things don’t have to be complicated.

I’m not a big fan of the language and framework wars of the internet. I believe efficiency, productivity, and code maintainability depend mostly on how simply you architect your solution.

The problem

While developing our anonymous telemetry and analytics system, our goal was to handle a large number of POST requests from millions of endpoints. The web handler would receive a JSON document that might contain a collection of many payloads, which needed to be written to Amazon S3 so that our map-reduce system could later operate on the data.

Traditionally, we would have considered creating a worker-tier architecture using something like the following:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • And so on…

Then we would set up two different clusters, one for the web front end and one for the workers, so that we could scale the background processing independently.

From the beginning, our team knew we should do this in Go, because during the discussion phase we already expected it to be a very high-traffic system. I had been using Go for about two years and had developed a few systems with it, but none of them had seen this much load.

We started by creating a few structs to define the web request payload we would receive through the POST calls, and a method to upload it to our S3 bucket.

//gistfile1.go
type PayloadCollection struct {
	WindowsVersion  string    `json:"version"`
	Token           string    `json:"token"`
	Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collisions in
	// case we get the same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(p)
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
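As a quick illustration of what the handler receives, here is a minimal standalone sketch (not from the original post; the field values are made up, and the "data" elements stay empty because the Payload fields are redacted above) of a request body decoded into PayloadCollection:

//sample_decode.go (sketch; assumes the PayloadCollection type above is in the same package)
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical request body: "version", "token", and "data" match the
	// json tags on PayloadCollection; each element of "data" is one Payload.
	body := `{"version": "10.0.14393", "token": "abc123", "data": [{}, {}]}`

	var content PayloadCollection
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&content); err != nil {
		panic(err)
	}

	// Each element of content.Payloads is what UploadToS3 gets called on.
	fmt.Printf("version=%s token=%s payloads=%d\n",
		content.WindowsVersion, content.Token, len(content.Payloads))
}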

Simply use Goroutines

We started with a very naive implementation of the POST handler, just trying to parallelize the job processing with a simple goroutine:

//gistfile1.go
func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	
	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3()   // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}

This approach works acceptably for moderate loads, but it quickly proved ineffective at larger scale. We encountered an order of magnitude more requests than we had expected when we deployed our first version to production; we had totally underestimated the traffic.

The approach above is bad in several ways. Most importantly, we have no control over how many goroutines we spawn, and since we were getting a million POST requests per minute, the code quickly crashed.
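As an aside, the most direct way to put a ceiling on the number of in-flight goroutines is a buffered channel used as a counting semaphore. This is not what we ended up building (keep reading), but a minimal sketch looks like this, where maxUploads is a hypothetical limit rather than a value from our system:

// Sketch only: cap concurrent uploads with a channel-based semaphore.
// maxUploads is a hypothetical constant, not part of the original code.
var sem = make(chan struct{}, maxUploads)

for _, payload := range content.Payloads {
	sem <- struct{}{} // blocks once maxUploads uploads are already in flight
	go func(p Payload) { // pass the payload as a parameter to avoid capturing the loop variable
		defer func() { <-sem }() // free the slot when this upload finishes
		if err := p.UploadToS3(); err != nil {
			log.Errorf("Error uploading to S3: %s", err.Error())
		}
	}(payload)
}

This caps concurrency, but the request handler still blocks on the uploads; the rest of this post moves the work out of the request path instead.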

Try it again

We needed a different solution. From the beginning we had discussed keeping the lifetime of the request handler as short as possible and doing the processing in the background. Of course, that is what you have to do in the Ruby on Rails world, otherwise you block all the available worker web processors, whether you use Puma, Unicorn, or Passenger (let’s leave JRuby aside for now). Then we would have reached for the common solutions for this, such as Resque, Sidekiq, SQS, and so on. There are many ways to achieve this.

So the second iteration was to create a buffered channel where we could queue up some jobs and upload them to S3. Since we could control the maximum length of the queue and we had plenty of RAM to queue jobs in memory, we thought we could simply buffer the jobs in the channel queue.

//second_attemp.go 
var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

Then, to dequeue jobs and process them, we used something along these lines:

//medium-secondattempt-2.go
func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

Honestly, I don’t know what we were thinking. It must have been a late night fueled by Red Bull. This approach didn’t buy us anything: we had traded flawed concurrency for a buffered queue that simply postponed the problem. Our synchronous processor uploaded only one payload at a time to S3, and since the rate of incoming requests was far greater than the rate at which a single processor could upload to S3, the buffered channel quickly reached its limit and blocked the request handlers from queuing more work.

We were simply sidestepping the problem and counting down the days until our system finally died. After we deployed this flawed version, our latency kept climbing at a constant rate within minutes.
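To make the failure mode concrete, here is a toy, standalone sketch (not from the original post; the constants are hypothetical, not our production values) showing that once the buffer fills, every send waits on the single slow consumer:

//queue_stall_demo.go (sketch)
package main

import (
	"fmt"
	"time"
)

func main() {
	const maxQueue = 100                      // hypothetical MAX_QUEUE
	const uploadTime = 50 * time.Millisecond  // hypothetical time for one "upload"

	queue := make(chan int, maxQueue)

	// Single synchronous consumer, like StartProcessor above.
	go func() {
		for job := range queue {
			time.Sleep(uploadTime) // stand-in for UploadToS3
			_ = job
		}
	}()

	start := time.Now()
	for i := 0; i < 2*maxQueue; i++ {
		queue <- i // once the buffer is full, this blocks at the consumer's pace
	}
	fmt.Printf("enqueued %d jobs in %v\n", 2*maxQueue, time.Since(start))
	// Roughly the first maxQueue sends are instant; the rest take about
	// uploadTime each, which is the kind of stall our handlers started to absorb.
}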

Better solutions

We decided to use a common pattern when working with Go channels: a two-tier channel system, one channel to queue jobs and another to control how many workers operate on the job queue concurrently.

The idea was to parallelize the S3 uploads at a sustainable rate, one that would neither cripple the machine nor start generating connection errors against S3. So we opted for a Job/Worker pattern. For those familiar with Java, C#, and the like, think of it as the Golang way of implementing a worker thread pool using channels.

//medium_better_solution.go
var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

We then modified our web request handler to create a Job struct instance with the payload and send it into the JobQueue channel for the workers to pick up.

//medium_better_solution_2.go
func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

    // Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

During web server initialization, we create a Dispatcher and call Run() to create the pool of workers and start listening for jobs arriving on the JobQueue.

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()
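One detail the snippet above glosses over: MAX_WORKERS and MAX_QUEUE come out of os.Getenv as strings, so the real start-up code has to parse them and create the JobQueue before running the dispatcher. Here is a minimal sketch of that wiring; the helper, defaults, and route are assumptions rather than our original code, and it assumes the types and handler above live in the same package:

//server_init.go (sketch)
package main

import (
	"log"
	"net/http"
	"os"
	"strconv"
)

// getenvInt is a hypothetical helper: parse an integer env var or fall back.
func getenvInt(key string, fallback int) int {
	if v, err := strconv.Atoi(os.Getenv(key)); err == nil && v > 0 {
		return v
	}
	return fallback
}

func main() {
	maxWorkers := getenvInt("MAX_WORKERS", 100)  // hypothetical default
	maxQueue := getenvInt("MAX_QUEUE", 10000)    // hypothetical default

	// The buffered channel that payloadHandler pushes Jobs onto.
	JobQueue = make(chan Job, maxQueue)

	dispatcher := NewDispatcher(maxWorkers)
	dispatcher.Run()

	http.HandleFunc("/payload", payloadHandler) // hypothetical route
	log.Fatal(http.ListenAndServe(":8080", nil))
}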

Here is the code for our Dispatcher implementation:

//medium_dispatcher.go
type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
	maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}
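One note on the design (my aside, not from the original post): the chan chan Job indirection exists so the dispatcher only hands a job to a worker that has explicitly announced it is idle. If you don’t need that per-worker bookkeeping, you can get the same bounded concurrency by simply letting every worker range over the shared JobQueue; a minimal sketch:

// Simpler variant (sketch, not the article's implementation): n workers all
// receive from the same JobQueue, which bounds concurrency to n without the
// worker-pool channel of channels.
func StartWorkers(n int) {
	for i := 0; i < n; i++ {
		go func() {
			for job := range JobQueue {
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}
			}
		}()
	}
}

Either way, the important property is the same: the number of concurrent S3 uploads stays fixed, no matter how fast requests arrive.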

Notice that we provide the maximum number of workers to be instantiated and register them into the worker pool. Since we used Amazon Elastic Beanstalk with a dockerized Go environment for this project, and we strictly follow the 12-factor methodology to configure our production systems, we read these values from environment variables. This lets us control the number of workers and the maximum size of the job queue, so we can quickly tweak these values without redeploying the cluster.

var ( 
  MaxWorker = os.Getenv("MAX_WORKERS") 
  MaxQueue  = os.Getenv("MAX_QUEUE")
)

Immediately after we deployed this, we saw our latency drop to insignificant numbers, while our capacity to handle requests increased dramatically.

A few minutes after our Elastic Load Balancers were fully warmed up, we saw the Elastic Beanstalk application serving close to 1 million requests per minute. There are usually a few hours during the morning when traffic exceeds a million requests per minute.

When we deployed the new code, we reduced the number of servers from 100 to about 20.

After properly configuring the cluster and the auto-scaling settings, we were able to run only 4 EC2 c4.large instances, with Elastic Auto-Scaling set to spawn new instances when CPU utilization stays above 90% for 5 minutes.

Conclusion

Simplicity always wins in my book. We could have designed a complex system with many queues, background workers, and a complicated deployment, but instead we decided to leverage the power of Elastic Beanstalk auto-scaling and the simple, efficient approach to concurrency that Golang provides out of the box. With only 4 machines, each of which is probably less powerful than my MacBook Pro, we are handling 1 million POST requests per minute and writing to Amazon S3.

There is always the right tool for the job. Sometimes, when your Ruby on Rails system needs a very powerful web handler, think a little outside the Ruby ecosystem for simpler yet more powerful alternative solutions.

Original link: medium.com/smsjunk/han…