Preface

Hello everyone, I am Asong. This time I would like to introduce machinery, an asynchronous task framework for Go. Those of you who have used Python will know Celery; machinery plays a similar role for Go. Let's take a look at machinery's basic usage.

I have translated a rough Chinese version of the machinery documentation. If you need the watermark-free version, reply "machinery" in the background of my public account to get it.

Or download from github: github.com/asong2020/G…

Background

When we use many apps, we usually receive an email or a text message shortly after logging in, telling us that our account was logged in at a certain time and place. The email or SMS arrives after the login has already completed, which is exactly the asynchronous mechanism at work here. Have you ever wondered why this is not done synchronously? Let's analyze it. Suppose we implemented it synchronously: when the user logs in, we first verify the account and password, and only after verification passes do we send the login notification. If sending the notification fails at that step, the whole login fails, which badly hurts the user experience, even though the login notification is a low-priority step. So we can implement it asynchronously instead: even if sending the notification fails, the login itself is unaffected. Having said all that, how do we implement this asynchrony? That's right, with the machinery framework. Since you may not know how to use it yet, today I'll write a small example and we'll learn it together.

Features

The above is just one simple example; task queues have a wide range of application scenarios. For example, a large computing job or a large batch of inserts can be split into tasks pushed to a queue, processed serially, or grouped and processed in parallel as chained tasks, improving the robustness and concurrency of the system. Another example is data preprocessing: periodically synchronizing data from back-end storage into a cache system, so that query requests can go straight to the cache and respond faster. There are many more scenarios that suit task queues, and I will not list them all here. Returning to the topic of this article: since we want to learn machinery, we should first understand what features it has:

  • Task retry mechanism
  • Delayed task support
  • Task callback mechanism
  • Task result recording
  • Workflow modes: Chain, Group, and Chord
  • Multiple brokers supported: Redis, AMQP, AWS SQS
  • Multiple result backends supported: Redis, Memcache, AMQP, MongoDB

Architecture

A task queue is, in short, a scaled-up producer-consumer model. User requests generate tasks, task producers keep pushing tasks into the queue, and the queue's processor acts as the consumer, continuously consuming tasks. Based on this design idea, let's look at machinery's simple structure, whose components are:

  • Sender: the business-side producer module that generates concrete tasks; it can be split according to business logic and interaction;
  • Broker: stores the serialized tasks; machinery currently supports Redis, AMQP, and SQS;
  • Worker: the worker process, responsible for consuming and executing concrete tasks;
  • Backend: the back-end storage that records the execution status and results of tasks.

Example

When learning something new, I like to write a demo first: learn to walk before you run. So let's start with a very simple example that asynchronously computes the sum of 1 to 10.

Take a look at the config file code:

broker: redis://localhost:6379
default_queue: "asong"
result_backend: redis://localhost:6379
redis:
  max_idle: 3
  max_active: 3
  max_idle_timeout: 240
  wait: true
  read_timeout: 15
  write_timeout: 15
  connect_timeout: 15
  normal_tasks_poll_period: 1000
  delayed_tasks_poll_period: 500
  delayed_tasks_key: "asong"

Here both the broker and the result_backend are backed by Redis.
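As a side note, the same settings can also be built in code instead of being loaded from YAML. This is only a minimal sketch using the top-level fields shown above; the redis sub-config is left out, so machinery's defaults would apply:

// A minimal sketch: building the config in code instead of config.yml.
// Only the top-level fields from the YAML above are set here.
cnf := &config.Config{
	Broker:        "redis://localhost:6379",
	DefaultQueue:  "asong",
	ResultBackend: "redis://localhost:6379",
}
server, err := machinery.NewServer(cnf)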

The main code (the full version is on GitHub):


package main

import (
	"log"

	"github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
	"github.com/RichardKnop/machinery/v1/tasks"
)

func main() {
	// Load broker/result backend settings from the YAML file shown above.
	cnf, err := config.NewFromYaml("./config.yml", false)
	if err != nil {
		log.Println("config failed", err)
		return
	}

	server, err := machinery.NewServer(cnf)
	if err != nil {
		log.Println("start server failed", err)
		return
	}

	// Register the task under the name "sum".
	err = server.RegisterTask("sum", Sum)
	if err != nil {
		log.Println("reg task failed", err)
		return
	}

	// Start a worker (consumer tag "asong", concurrency 1) in the background.
	worker := server.NewWorker("asong", 1)
	go func() {
		err = worker.Launch()
		if err != nil {
			log.Println("start worker error", err)
			return
		}
	}()

	// Task signature: which task to run and with which arguments.
	signature := &tasks.Signature{
		Name: "sum",
		Args: []tasks.Arg{
			{
				Type:  "[]int64",
				Value: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
			},
		},
	}

	// Push the task to the broker and wait for its result.
	asyncResult, err := server.SendTask(signature)
	if err != nil {
		log.Fatal(err)
	}
	res, err := asyncResult.Get(1) // argument is the sleep between result polls (time.Duration)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("get res is %v\n", tasks.HumanReadableResults(res))
}
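The example registers a task named sum, but its body only appears later in the retry section. For completeness, a minimal Sum matching that registration (without the retry error) would look like this:

// Sum adds up the int64 slice passed in via the task signature.
func Sum(args []int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}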

Running results:

INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55

Okay, let's walk through the code flow:

  • Read the configuration file. This step configures the broker and result_backend; here I chose Redis for both, simply because my machine already had that environment available.
  • The machinery library must be instantiated before use. This is done by creating a Server instance: Server is machinery's basic object, used for configuration and for registering tasks.
  • Before your workers can consume a task, the task has to be registered with the server. This is done by giving the task a unique name.
  • To consume tasks, you need one or more workers running. All a worker needs is a Server instance with registered tasks; each worker will only consume the registered tasks. For each task in the queue, the worker's Process() method runs in a goroutine, and the second argument of server.NewWorker limits the number of concurrently running Process() calls (per worker).
  • You invoke a task by passing a Signature instance to the Server instance.
  • Calling HumanReadableResults turns the reflected values returned by Get into a readable result; see the sketch below for reading a typed value directly.
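As a small aside (a sketch, not from the original example): asyncResult.Get returns a slice of reflect.Value, so a typed result can also be read directly instead of formatting it with HumanReadableResults, assuming the task returns a single int64:

results, err := asyncResult.Get(time.Millisecond * 5) // poll the backend every 5ms
if err != nil {
	log.Fatal(err)
}
if len(results) > 0 {
	// The type assertion assumes the task returns a single int64.
	sum := results[0].Interface().(int64)
	log.Println("sum =", sum)
}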

More features

1. Delayed tasks

The code above is just a simple machinery example. In fact, machinery also supports delayed tasks: you can delay a task by setting the ETA timestamp field on the task signature.

eta := time.Now().UTC().Add(time.Second * 20)
signature.ETA = &eta

2. Task retries

You can set a number of retry attempts before a task is declared failed. The Fibonacci sequence is used to space the retries out over time. The first approach is to set the RetryTimeout and RetryCount fields directly on the task signature; the retry interval then grows according to the Fibonacci sequence.

// task signature
signature := &tasks.Signature{
	Name: "sum",
	Args: []tasks.Arg{
		{
			Type:  "[]int64",
			Value: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
		},
	},
	RetryTimeout: 100, // how long to wait before retrying; grows per the Fibonacci sequence
	RetryCount:   3,   // retry at most 3 times before declaring failure
}

Alternatively, you can return tasks.ErrRetryTaskLater from the task itself and specify the retry delay there:

func Sum(args []int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	// Ask machinery to retry this task again in 4 seconds.
	return sum, tasks.NewErrRetryTaskLater("I said he was wrong", 4*time.Second)
}

3. Workflows

So far we have only run a single asynchronous task, but in real projects a requirement often involves multiple asynchronous tasks that need to be executed in a particular arrangement. For that we can use machinery's workflows.

3.1 Groups

A Group is a set of tasks that are executed in parallel, independently of each other. Let me draw a picture to make it a little clearer:

Let’s look at a simple example:
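The snippet below reuses three task signatures, signature1, signature2, and signature3, which are not shown in the post. As a sketch, assume they are three sum signatures like the one in the first example, each with its own arguments:

// Three independent "sum" signatures for the group (illustrative arguments).
signature1 := &tasks.Signature{
	Name: "sum",
	Args: []tasks.Arg{{Type: "[]int64", Value: []int64{1, 2, 3}}},
}
signature2 := &tasks.Signature{
	Name: "sum",
	Args: []tasks.Arg{{Type: "[]int64", Value: []int64{4, 5, 6}}},
}
signature3 := &tasks.Signature{
	Name: "sum",
	Args: []tasks.Arg{{Type: "[]int64", Value: []int64{7, 8, 9}}},
}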

// group
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
	log.Println("add group failed", err)
}

// The third argument is the send concurrency.
asyncResults, err := server.SendGroupWithContext(context.Background(), group, 10)
if err != nil {
	log.Println(err)
}
for _, asyncResult := range asyncResults {
	results, err := asyncResult.Get(1)
	if err != nil {
		log.Println(err)
		continue
	}
	log.Printf(
		"%v = %v\n",
		asyncResult.Signature.Args[0].Value,
		tasks.HumanReadableResults(results),
	)
}

Tasks in a group are executed in parallel.

3.2 Chords

We often have callback scenarios in projects, and machinery has taken this into account for us: a Chord lets you schedule a callback to be executed after all tasks in a group have finished.

Take a look at this code:

callback := &tasks.Signature{
	Name: "call",
}

group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
	log.Printf("Error creating group: %s", err.Error())
	return
}

chord, err := tasks.NewChord(group, callback)
if err != nil {
	log.Printf("Error creating chord: %s", err)
	return
}

chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
if err != nil {
	log.Printf("Could not send chord: %s", err.Error())
	return
}

results, err := chordAsyncResult.Get(time.Millisecond * 5)
if err != nil {
	log.Printf("Getting chord result failed with error: %s", err.Error())
	return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))

The above example executes task1, task2, and task3 in parallel, aggregates their results, and passes them to the callback task.
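The callback task registered under the name call is not shown in the post. As a hedged sketch, and assuming each of the three group tasks returns a single int64 (the chord appends the group results to the callback's arguments), a hypothetical Call task could look like this:

// Call is a hypothetical chord callback: it receives the results of the
// three group tasks as its arguments and combines them.
func Call(a, b, c int64) (int64, error) {
	return a + b + c, nil
}

It would be registered like any other task, for example server.RegisterTask("call", Call).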

3.3 Chains

A chain is a set of tasks that are executed one after another. Each successful task triggers the next task in the chain.

Look at this code:

// chain
chain, err := tasks.NewChain(signature1, signature2, signature3, callback)
if err != nil {
	log.Printf("Error creating chain: %s", err.Error())
	return
}
chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
if err != nil {
	log.Printf("Could not send chain: %s", err.Error())
	return
}

results, err := chainAsyncResult.Get(time.Millisecond * 5)
if err != nil {
	log.Printf("Getting chain result failed with error: %s", err.Error())
}
log.Printf("%v\n", tasks.HumanReadableResults(results))

The above example executes task1, then task2, then task3. When a task completes successfully, its result is appended to the end of the argument list of the next task in the chain, and the callback task is executed last.
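One thing to keep in mind (a hedged note with hypothetical task names, not part of the original example): because each result is appended to the next task's argument list, the tasks in a chain need signatures that can accept that extra argument.

// Hypothetical tasks that chain naturally: each returns an int64 that
// machinery appends to the next task's argument list.
func Add(a, b int64) (int64, error)      { return a + b, nil }
func Multiply(a, b int64) (int64, error) { return a * b, nil }

// If the chain is Add(1, 2) followed by a Multiply signature with the single
// argument 4, the first result (3) is appended, so the second task runs
// Multiply(4, 3) and returns 12.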

The code for this article: github.com/asong2020/G…

Conclusion

That's it for this article. Some uses of machinery, such as periodic tasks and periodic task groups, are not covered here; see the machinery documentation for how to use them. Because machinery has no Chinese documentation, I translated one myself while learning, and you are welcome to help yourselves.

How to get it: follow the public account [Golang Dreamworks] and reply "machinery" in the background to get the watermark-free version.

Well, that's the end of this article; see you next time. I hope it is useful to you. Corrections are welcome, and you can also join my Golang communication group so we can learn and exchange ideas together.

Finally, a small bonus: I have recently been reading the book [Microservice Architecture Design Patterns], which is very good, and I have also collected a PDF of it. If you need it, follow the public account [Golang Dreamworks] and reply "microservices" in the background to get it.

I have also translated a Chinese version of the Gin documentation, which I will maintain regularly. If you need it, reply [gin] in the background to download it.

I am Asong, an ordinary programmer, and I hope we can gradually grow stronger together. I have set up my own Golang communication group; if you want to join, add me on WeChat and I will pull you into the group. Thanks for following, and see you next time.

Recommended previous articles:

  • Teach my sister how to write message queues
  • Cache avalanche, cache penetration, cache breakdown
  • Context package, read this article enough!!
  • Go -ElasticSearch: How to get started
  • Interviewer: Have you used for-range in go? Can you explain the reasons for these problems
  • Learn wire dependency injection, Cron timing task is actually so easy!
  • I heard you don’t know how to JWT or swagger. – I’m skipping meals and I’m here with my practice program
  • Master these Go language features and you will improve your level by N levels (ii)
  • Go multiplayer chat room, here you can talk about anything!!
  • GRPC Practice – Learning GRPC is that simple
  • Go Standard library RPC practices
  • Asong picked up English and translated it with his heart
  • Several hot loading methods based on GIN