Before reading this article, I want you to have a basic understanding of the Go language and some experience with coroutines. This article is designed to help you use advanced concurrency techniques. It consists of the following sections: Basic usage of Goroutine; Chan is used to realize the communication between multiple Goroutine; Use the select keyword to handle timeouts, etc.


The term parsing
goroutine Coroutines are lighter than threads
chan/channel Of pipes, usually used in multiplegoroutineCommunication between

A simple example

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

func main(a) {
	go boring("boring!")

	fmt.Println("I'm listening")
	time.Sleep(2 * time.Second)
	fmt.Println("You're boring. I'm leaving")}Copy the code
I'm listening
boring! 0
boring! 1
boring! 2
boring! 3
boring! 4
boring! 5
You're boring. I'm leaving
Copy the code

The boring method outputs the current number of loops to the console. The first behavior of the main method opens a coroutine. In other words, the main method does not wait for the Boring method to complete. After output I’m listening, the main method goes to sleep for 2 seconds, and then wakes up to finish the main function. Since the end of main brings the end of the entire program, the boring coroutine that was opened also ends. However, the above example is only a simple demonstration. In fact, communication is required between coroutines and between coroutines and the main process, which can help us accomplish more complex applications.

Go pipe usage

A simple way to use it is as follows

func boring(msg string, c chan string) {
	for i := 0; ; i++ {
		// Send the message to the channel (hannel/chan)
		// Meanwhile, it is waiting for the consumer consumption of the pipeline to be completed
		c <- fmt.Sprintf("%s %d", msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

func main(a) {
	c := make(chan string) // Initialize a pipe
	go boring("boring!", c)

	for i := 0; i < 5; i++ {
		// '<-c' waits for the 'boring' method to send it a value. If it never receives it, it will be blocked at this step
		fmt.Printf("You say: %q\n", <-c)
	}
	fmt.Println("You're boring. I'm leaving")}Copy the code
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You're boring. I'm leaving
Copy the code

Simply put, the Boring method is sending data to pipe C and waiting for the other end, the main method, to consume it. Since only one data can exist in the pipeline, the main and Boring methods run alternately to some extent. In fact, not quite. With the main method, you can accept the data from the pipe and proceed directly to the next step without waiting.

Chan’s concept

In the Go language, a channel isgoroutineWith anothergoroutineThe medium of communication, and this communication isunlocked. In other words, a channel is a allowed onegoroutineSend data to anothergoroutineThe technology. By default, channels are bidirectional, which means that a Goroutine can send or receive data over the same channel, as shown below:In the Go language, exceptchan stringSuch writing can use the read-write function in addition to the two-way pipe, can also create a one-way pipe, such as<-chan stringYou can only read data from the pipe, andchan<- stringYou can only write data to a pipe.

Two threads output data

This is done through two pipes

// 'boring' is a method that returns the pipe used to communicate with the 'boring' method
// '<-chan string' means you can only receive string data from the pipe, but not send data to the pipe
func boring(msg string) <-chan string {
	c := make(chan string)
	// Now open the coroutine in the Boring method and send data to the pipe from that coroutine
	go func(a) {
		for i := 0; i < 10; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
		close(c) // Remember to close the coroutine} ()return c
}

func main(a) {
	joe := boring("Joe")
	ahn := boring("Ahn")

	// You must output Joe and Ahn in order
	for i := 0; i < 10; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ahn)
	}

	fmt.Println("You're both boring. I'm leaving")}Copy the code

This code makes the output alternate between boring(“Joe”) and boring(“Ahn”). While it is possible to output data alternately, this is not done by thread to thread communication per se, which will be tweaked a bit.

Combined pipe

// 'boring' is a method that returns the pipe used to communicate with the 'boring' method
// '<-chan string' means you can only receive string data from the pipe, but not send data to the pipe
func boring(msg string) <-chan string {
	c := make(chan string)
	go func(a) {
		for i := 0; i < 10; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
		close(c)
	}()
	return c
}

func fanIn(cs ... <-chan string) <-chan string {
	c := make(chan string)
	for _, ci := range cs { // spawn channel based on the number of input channel
		go func(cv <-chan string) {
			for {
				c <- <-cv Connect each pipe in CS to the main pipe C
			}
		}(ci)
	}
	return c
}

func main(a) {
	c := fanIn(boring("Joe"), boring("Ahn"))

	for i := 0; i < 20; i++ {
		fmt.Println(<-c)
	}
	fmt.Println("You're both boring. I'm leaving")}Copy the code

Now that we can get data from the coroutines in the two methods, although there is no guarantee of alternating output data (in this case, random), let’s use pipes to start communication between multiple processes.

Intercoroutine communication

type Message struct {
	str  string    // The actual data to be transmitted
	wait chan bool // 
}

func fanIn(inputs ... <-chan Message) <-chan Message {
	c := make(chan Message)
	for i := range inputs {
		input := inputs[i]
		go func(a) {
			for {
				c <- <-input
			}
		}()
	}
	return c
}

// 'boring' is a method that returns the pipe used to communicate with the 'boring' method
func boring(msg string) <-chan Message {
	c := make(chan Message)
	waitForIt := make(chan bool)
	go func(a) {
		for i := 0; ; i++ {
			c <- Message{
				str:  fmt.Sprintf("%s %d", msg, i),
				wait: waitForIt, // Inject the pipe into the return value for communication between coroutines
			}
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)

			// The coroutine needs to wait until the information is received before it can continue executing its logic
			<-waitForIt
		}

	}()
	return c
}

func main(a) {
	// merge 2 channels into 1 channel
	c := fanIn(boring("Joe"), boring("Ahn"))

	for i := 0; i < 5; i++ {
		msg1 := <-c // Wait until the data is received from the pipe
		fmt.Println(msg1.str)
		msg2 := <-c
		fmt.Println(msg2.str)

		// Since the Boring coroutine needs to wait for the wait signal to continue, this step ensures that both coroutines can output data once
		msg1.wait <- true // The main coroutine allows the Boring coroutine to continue the task
		msg2.wait <- true // The main coroutine allows the Boring coroutine to continue the task
	}
	fmt.Println("You're both boring. I'm leaving")}Copy the code

Set the timeout waiting time

A simple implementation

// 'boring' is a method that returns the pipe used to communicate with the 'boring' method
func boring(msg string) <-chan string {
	c := make(chan string)
	go func(a) {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
		}

	}()
	return c
}

func main(a) {
	c := boring("Joe")
	
    // The type of timeout is: '<-chan Time'
	timeout := time.After(5 * time.Second) // This method will write data to the timeout pipe after 5 seconds
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-timeout:
			fmt.Println("You talk too much.")
			return}}}Copy the code

Select ensures that case 2 is executed to end the program when the time is up. If both arrive at the same time, then a random case will be executed, where the case may be executed at most once, but not in time to print the result.

【 答 案 】 a

Select is a control structure in Go, similar to the switch statement used for communication. Each case must be a communication operation, either send or receive. Select randomly executes a runnable case. If there is no case to run, then the operation in default will be performed. If there is no default, then it will block until there is a case to run. A default clause should always run.

【 Actual 】 Simulate Google search service

Search is a very common feature in Web pages. In most cases, we use a microservice to build a search service. For example, ElasticSearch is a separate service. Here, we don’t really simulate an ES to handle it, instead, we replace it with a function of random delay. Because the search time is not guaranteed, sometimes it will be fast, but sometimes it will be slow, either because the search itself takes time or because the IO takes time. In this case, we’ll step by step show you how to better use Goroutine and Chan to deal with this problem. In addition, functional programming techniques are used, so if you’re not familiar with this, you can learn a little bit about it before you continue reading.

Google search 1.0

type Result string
type Search func(query string) Result

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video"))func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(100 * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

// It calls Web, Image, and Video in turn and appends them to the result return
func Google(query string) (results []Result) {
	results = append(results, Web(query))
	results = append(results, Image(query))
	results = append(results, Video(query))
}

func main(a) {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}
Copy the code
[web result for "golang" image result for "golang" video result for "golang"] 331.199msCopy the code

Now, we can get the results from the Google method, but this step is not enough, we want to call the search service for a time to go online, if the timeout, then the relevant results are removed, and the existing data is returned. But before that, we also found that in the Google method, the three queries are called sequentially, and only the former returns a result can execute the logic, which is why the return time is so long. Based on this, we first change the results of the search to be executed concurrently.

Concurrent search

type Result string
type Search func(query string) Result

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video"))func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(100 * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

func Google(query string) []Result {
	c := make(chan Result)

	// The results of the search will be returned to pipe C
	go func(a) {
		c <- Web(query)
	}()
	go func(a) {
		c <- Image(query)
	}()
	go func(a) {
		c <- Video(query)
	}()

	var results []Result
	for i := 0; i < 3; i++ {
		results = append(results, <-c)
	}

	return results
}

func main(a) {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}
Copy the code
[image result for "golang" web result for "golang" video result for "golang"] 109.5769msCopy the code

As you can see, the search results now take only 100+ms, indicating that the three searches were indeed performed concurrently.

Further: Add timeout

type Result string
type Search func(query string) Result

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video"))func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Change the search time to random
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

func Google(query string) []Result {
	c := make(chan Result)

	go func(a) {
		c <- Web(query)
	}()
	go func(a) {
		c <- Image(query)
	}()
	go func(a) {
		c <- Video(query)
	}()

	var results []Result

	// In this case, timeout will receive the pipe message after 50 milliseconds
	timeout := time.After(50 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case r := <-c:
			results = append(results, r)
		case <-timeout: // After timeout receives the information, it will end the search and return the result directly
			fmt.Println("timeout")
			return results
		}
	}

	return results
}

func main(a) {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

Copy the code

Result 1: Some logic times out

Timeout  61.205msCopy the code

Result two: None of the logic times out

[web result for "golang" image result for "golang" video result for "golang"] 28.1629msCopy the code

conclusion

So far, I’ve told you some advanced concurrent programming techniques, but there’s more to advanced techniques than that, and HOPEFULLY these tips have given you some help and some thought.