Preface

While learning Redis, I found that its sorted set (zset) can also be used to implement a lightweight delayed message queue. Its reliability leaves room for improvement, but it is good enough for features that do not require high data reliability. This small demo mainly uses three zset commands: ZADD, ZRANGEBYSCORE, and ZREM.
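To make the idea concrete before bringing in Redis, here is a minimal in-memory sketch of how the three commands combine into a delayed queue: the score is the due timestamp, and a consumer asks for the first member whose score is not later than "now". The `delayQueue` type and its methods are hypothetical stand-ins written for illustration, not part of Redis or redigo, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
)

// entry pairs a member with its score, like one element of a sorted set.
type entry struct {
	member string
	score  int64
}

// delayQueue is a hypothetical in-memory stand-in for a Redis sorted set.
type delayQueue struct {
	entries []entry
}

// add mimics ZADD: insert the member (or update its score if present)
// and keep entries ordered by score, then by member.
func (q *delayQueue) add(member string, score int64) {
	found := false
	for i := range q.entries {
		if q.entries[i].member == member {
			q.entries[i].score = score
			found = true
			break
		}
	}
	if !found {
		q.entries = append(q.entries, entry{member, score})
	}
	sort.Slice(q.entries, func(i, j int) bool {
		if q.entries[i].score != q.entries[j].score {
			return q.entries[i].score < q.entries[j].score
		}
		return q.entries[i].member < q.entries[j].member
	})
}

// firstDue mimics ZRANGEBYSCORE key 0 now LIMIT 0 1:
// the first member whose score lies in [0, now], if any.
func (q *delayQueue) firstDue(now int64) (string, bool) {
	for _, e := range q.entries {
		if e.score >= 0 && e.score <= now {
			return e.member, true
		}
	}
	return "", false
}

// remove mimics ZREM: 1 if the member was removed, 0 if it was absent.
func (q *delayQueue) remove(member string) int {
	for i, e := range q.entries {
		if e.member == member {
			q.entries = append(q.entries[:i], q.entries[i+1:]...)
			return 1
		}
	}
	return 0
}

func main() {
	q := &delayQueue{}
	q.add("send-email", 105)
	q.add("expire-order", 102)

	// Simulated clock: a task runs only once its due time has arrived.
	for now := int64(100); now <= 106; now++ {
		if m, ok := q.firstDue(now); ok && q.remove(m) == 1 {
			fmt.Printf("t=%d: run %s\n", now, m)
		}
	}
}
```

With the simulated clock above, `expire-order` runs at t=102 and `send-email` at t=105, even though they were added in the opposite order.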

Preparation: install Redis and the Go Redis client (redigo) in advance

I'm on macOS, so I install them directly:

$ brew install redis
$ go get github.com/garyburd/redigo/redis

To keep things simple, I use ObjectId from the bson package to generate a unique ID for each task.

$ go get gopkg.in/mgo.v2/bson

A unique ID is not strictly required, but carrying one with each task makes it easy to look the task up later.

Producer

A for loop generates 100,000 tasks, each due at a random time 0 to 4 seconds from now:

func producer() {
	count := 0
	// Generate 100000 tasks
	for count < 100000 {
		count++
		// Due time: now plus a random delay of 0-4 seconds
		dealTime := int64(rand.Intn(5)) + time.Now().Unix()
		uuid := bson.NewObjectId().Hex()
		redis.Client.AddJob(&job.JobMessage{
			Id:       uuid,
			DealTime: dealTime,
		}, dealTime)
	}
}

The AddJob function lives in a separate package. It uses the randomly generated time from the producer as the zset score, that is, the timestamp at which the task should be processed.

// Add a task
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}
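The article does not show `job.JobMessage` or the `util.JsonEncode` / `util.JsonDecode` helpers. Below is a minimal sketch of what they might look like, assuming the message carries only the `Id` and `DealTime` fields used above and that the helpers are thin wrappers around `encoding/json`. The JSON tag names and the helper signatures are guesses for illustration, not the author's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobMessage is an assumed shape for job.JobMessage, inferred from the
// Id and DealTime fields used in the article. The JSON tags are hypothetical.
type JobMessage struct {
	Id       string `json:"id"`
	DealTime int64  `json:"deal_time"`
}

// JsonEncode is a guess at util.JsonEncode: marshal a value to a JSON
// string, returning "" if marshalling fails.
func JsonEncode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	return string(b)
}

// JsonDecode is a guess at util.JsonDecode: unmarshal a JSON string
// into the value pointed to by v.
func JsonDecode(s string, v interface{}) error {
	return json.Unmarshal([]byte(s), v)
}

func main() {
	// The encoded string is what would be stored as the zset member.
	member := JsonEncode(&JobMessage{Id: "5b1f0c2e", DealTime: 1528761390})
	fmt.Println(member)

	var decoded JobMessage
	if err := JsonDecode(member, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.Id, decoded.DealTime)
}
```

One consequence of storing the encoded message as the member: ZREM matches members by exact string, so a consumer has to pass back the string it read from the set rather than a re-encoded copy.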

Consumer

The consumer works in two steps:

  1. Get a task whose timestamp is less than or equal to the current timestamp
  2. Decide which goroutine owns the task by deleting it

When fetching a task whose timestamp is less than or equal to the current time, several goroutines may read the same task at the same moment, but only one of them should process it. We therefore need a way to decide which goroutine handles the task (with a single consumer this is not an issue). Redis's delete operation provides this: ZREM returns a non-zero value only when it actually removes the specified member. So the goroutine whose delete succeeds is the one that gets the task.
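The claim-by-delete idea can be checked without a Redis server. The sketch below is a simulation under stated assumptions: `memberSet` is a hypothetical in-memory stand-in for the zset, and its mutex plays the role of Redis executing each command atomically. Several goroutines try to remove the same member, and only the one whose removal returns 1 counts as the owner.

```go
package main

import (
	"fmt"
	"sync"
)

// memberSet is a hypothetical stand-in for the zset. The mutex models
// Redis running each command atomically.
type memberSet struct {
	mu      sync.Mutex
	members map[string]bool
}

// rem mimics ZREM's return value: 1 if the member was removed,
// 0 if it was already gone.
func (s *memberSet) rem(member string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.members[member] {
		delete(s.members, member)
		return 1
	}
	return 0
}

// race lets n goroutines try to claim the same member and returns
// how many of them succeeded.
func race(n int) int {
	s := &memberSet{members: map[string]bool{"job-1": true}}

	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Only a successful removal grants ownership of the task.
			if s.rem("job-1") == 1 {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println("winners:", race(10)) // prints "winners: 1"
}
```

However the goroutines are scheduled, exactly one removal finds the member still present, so exactly one goroutine wins.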

Here's the consumer code:

// Consumer
func consumer() {
	// Start 10 goroutines
	count := 0
	for count < 10 {
		go func() {
			for {
				jobs := redis.Client.GetJob()
				if len(jobs) <= 0 {
					time.Sleep(time.Second * 1)
					continue
				}
				currentJob := jobs[0]
				// If deleting the task from the Redis queue succeeds, this goroutine owns it
				if redis.Client.DelJob(currentJob) > 0 {
					var jobMessage job.JobMessage
					util.JsonDecode(currentJob, &jobMessage) // A custom JSON parsing function
					handleMessage(&jobMessage)
				}
			}
		}()
		count++
	}
}

// Process a task
func handleMessage(msg *job.JobMessage) {
	fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
	go func() {
		countChan <- true
	}()
}

The Redis part of the code, which fetches and deletes tasks:

// Get a due task
func (client *RedisClient) GetJob() []string {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	timeNow := time.Now().Unix()
	// Fetch at most one member whose score is between 0 and the current timestamp
	ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
	if err != nil {
		panic(err)
	}
	return ret
}

// Delete the task; the return value tells whether another goroutine already took it
func (client *RedisClient) DelJob(value string) int {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	ret, err := redis.Int(conn.Do("zrem", key, value))
	if err != nil {
		panic(err)
	}
	return ret
}

That is basically all the code. In my final run, it handled about 10,000 tasks every 3-4 seconds, which is really fast…