In concurrent programming you have already seen channels, the sync primitives that lock shared resources, Context for tracking goroutines and passing values, and so on. These are the basic building blocks of concurrent programming, and you should have a good grasp of them. Today we'll show how to compose these building blocks into concurrency patterns that make concurrent programs easier to write.

for select infinite loop pattern

This is a common pattern, and it was used in the examples in the previous article. It is usually combined with a channel to complete a task. The format is:

for { // An infinite for loop; a for range loop also works
	select {
	// Controlled by a channel
	case <-done:
		return
	default:
		// Perform the specific task
	}
}
  • This is the for + select multiplexing pattern. Whichever case is ready has its branch executed, and the loop keeps running until the exit condition is met.
  • As long as the exit condition is not met, the default branch keeps executing.
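
To make the template above concrete, here is a minimal runnable sketch. The helper name runUntilDone and the short durations are assumptions chosen for illustration; they are not part of the original template.

```go
package main

import (
	"fmt"
	"time"
)

// runUntilDone is a hypothetical helper: it repeats task until done is
// closed and reports how many times the task ran.
func runUntilDone(done <-chan struct{}, task func()) int {
	count := 0
	for {
		select {
		case <-done:
			// Exit condition met: leave the loop.
			return count
		default:
			// No exit signal yet: do one unit of work.
			task()
			count++
		}
	}
}

func main() {
	done := make(chan struct{})
	// Ask the loop to stop after roughly 50 milliseconds.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()

	n := runUntilDone(done, func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("task ran", n, "times before done was closed")
}
```

Closing done, rather than sending a value on it, lets any number of loops observe the same exit signal.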

for range select finite loop pattern

for _, s := range []int{} {
	select {
	case <-done:
		return
	case resultCh <- s:
	}
}
  • Typically, the values being iterated over are sent to a channel
  • The done channel is used to exit the for loop early
  • The resultCh channel carries the values from the loop, so other callers can receive them through resultCh
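
The fragment above can be wrapped in a function to make it runnable. This is only a sketch: the function name emit and the sample values are assumptions for illustration.

```go
package main

import "fmt"

// emit is a hypothetical helper: it sends each item to the returned
// channel and stops early if done is closed.
func emit(done <-chan struct{}, items []int) <-chan int {
	resultCh := make(chan int)
	go func() {
		defer close(resultCh) // Tell receivers that no more values will arrive
		for _, s := range items {
			select {
			case <-done:
				return
			case resultCh <- s:
			}
		}
	}()
	return resultCh
}

func main() {
	done := make(chan struct{})
	defer close(done)

	for v := range emit(done, []int{1, 2, 3}) {
		fmt.Println(v)
	}
}
```

Because the loop is finite, the goroutine ends on its own once every item has been sent; done only matters when the receiver wants to stop before that.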

select timeout pattern

If a request needs to access the server for data, but the response may be delayed due to network problems, you need to set a timeout:

package main

import (
	"fmt"
	"time"
)

func main() {
	result := make(chan string)
	timeout := time.After(3 * time.Second) // Create the timer once, outside the loop
	go func() {
		// Simulate network access
		time.Sleep(5 * time.Second)
		result <- "Server Result"
	}()
	for {
		select {
		case v := <-result:
			fmt.Println(v)
			return // The result has arrived, so stop waiting
		case <-timeout:
			fmt.Println("Network access has timed out.")
			return
		default:
			fmt.Println("Wait...")
			time.Sleep(1 * time.Second)
		}
	}
}

Running result:

Wait...
Wait...
Wait...
Network access has timed out.
  • The select timeout pattern uses the time.After function to set a timeout, so that an abnormal situation cannot leave the select statement waiting indefinitely

Note: Do not write like this

for {
  	select {
  	case v := <-result:
  		fmt.Println(v)
  	case <-time.After(3 * time.Second): // Do not do this: a new timer is created on every iteration
  		fmt.Println("Network access has timed out.")
  		return
  	default:
  		fmt.Println("Wait...")
  		time.Sleep(1 * time.Second)
  	}
  }

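With case <-time.After(...) written inside the loop, the select re-evaluates the case expression on every iteration and creates a new timer each time. Because the default branch runs and sleeps for 1 second, the loop starts over long before any 3-second timer can fire, so the timeout branch is never reached.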
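
One way to package the correct form is a small helper that creates the timer once, before the loop. This is a sketch under assumptions: the name pollWithTimeout, its parameters, and the use of time.NewTimer (so the timer can be stopped when the function returns) are illustrative choices, not part of the original example.

```go
package main

import (
	"fmt"
	"time"
)

// pollWithTimeout is a hypothetical helper. It polls result every interval
// until a value arrives or timeout elapses. The timer is created once,
// before the loop, so every iteration checks the same deadline.
func pollWithTimeout(result <-chan string, timeout, interval time.Duration) (string, bool) {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // Release the timer if the result arrives first
	for {
		select {
		case v := <-result:
			return v, true
		case <-timer.C:
			return "", false
		default:
			time.Sleep(interval)
		}
	}
}

func main() {
	// Buffered so the sender can finish even after the caller has given up.
	result := make(chan string, 1)
	go func() {
		time.Sleep(200 * time.Millisecond) // Simulate slow network access
		result <- "Server Result"
	}()

	if v, ok := pollWithTimeout(result, 50*time.Millisecond, 10*time.Millisecond); ok {
		fmt.Println(v)
	} else {
		fmt.Println("Network access has timed out.")
	}
}
```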

Timeout with Context's WithTimeout function

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Create a child context that times out automatically after 3 seconds
	//ctx, stop := context.WithCancel(context.Background())
	ctx, stop := context.WithTimeout(context.Background(), 3*time.Second)

	go func() {
		worker(ctx, "Worker 1")
	}()
	go func() {
		worker(ctx, "Worker 2")
	}()
	time.Sleep(5 * time.Second) // Work for 5 seconds and then rest
	stop()                      // Give the stop command after 5 seconds
	fmt.Println("?????")
}

func worker(ctx context.Context, name string) {
	for {
		select {
		case <- ctx.Done():
			fmt.Println("Time off...")
			return
		default:
			fmt.Println(name, "Carefully touch the fish, do not disturb...")
		}
		time.Sleep(1 * time.Second)
	}
}

Running result:

Worker 2 Carefully touch the fish, do not disturb...
Worker 1 Carefully touch the fish, do not disturb...
Worker 1 Carefully touch the fish, do not disturb...
Worker 2 Carefully touch the fish, do not disturb...
Worker 2 Carefully touch the fish, do not disturb...
Worker 1 Carefully touch the fish, do not disturb...
Time off...
Time off...
// two seconds later
?????
  • In the example above, the timeout is implemented with the WithTimeout function, which is the recommended approach
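
The same idea applies to a single request rather than a long-running worker. The sketch below is an assumption-laden illustration: fetch and fetchWithin are hypothetical names, and the delays are shortened so that it runs quickly. It also shows how the caller can tell a timeout apart from other errors by checking for context.DeadlineExceeded.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetch is a hypothetical stand-in for a slow call. It returns early with
// the context's error when ctx is done before the simulated delay ends.
func fetch(ctx context.Context, delay time.Duration) (string, error) {
	select {
	case <-time.After(delay): // A single select, not a loop, so time.After is fine here
		return "Server Result", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// fetchWithin runs fetch under a context that expires after timeout.
func fetchWithin(timeout, delay time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // Always release the context's resources
	return fetch(ctx, delay)
}

func main() {
	_, err := fetchWithin(50*time.Millisecond, 200*time.Millisecond)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("Network access has timed out.")
	}
}
```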

Pipeline pattern

The pipeline pattern is also called the assembly-line pattern, because it is modeled on real-world production lines. Take assembling mobile phones as an example, and assume there are only three steps: parts procurement, assembly, and packaging of the finished product:

Parts procurement (Step 1) -> Assembly (Step 2) -> Packaging (Step 3)

package main

import (
	"fmt"
)

func main() {
	coms := buy(10)    // Purchase 10 sets of parts
	phones := build(coms) // Assemble 10 mobile phones
	packs := pack(phones) // Pack them up for sale
	// Output the test to see how it works
	for p := range packs {
		fmt.Println(p)
	}
}

// Step 1: procurement
func buy(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("Parts", i)
		}
	}()
	return out
}

// Step 2: assembly
func build(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "Assembly (" + c + ")"
		}
	}()
	return out
}

// Step 3: Pack
func pack(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "Packaging (" + c + ")"
		}
	}()
	return out
}

Running result:

Packaging (Assembly (Parts1))
Packaging (Assembly (Parts2))
Packaging (Assembly (Parts3))
Packaging (Assembly (Parts4))
Packaging (Assembly (Parts5))
Packaging (Assembly (Parts6))
Packaging (Assembly (Parts7))
Packaging (Assembly (Parts8))
Packaging (Assembly (Parts9))
Packaging (Assembly (Parts10))
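
The build and pack steps have the same shape: read from an input channel, transform each value, and write it to an output channel that is closed when the input runs out. As a sketch only, that shape could be factored into one helper. The names stage and source are hypothetical and do not appear in the example above.

```go
package main

import "fmt"

// source is a hypothetical first step: it emits "Parts1" ... "PartsN"
// and then closes its channel.
func source(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("Parts", i)
		}
	}()
	return out
}

// stage is a hypothetical generic step: it wraps every value read from in
// with label and closes its output once in is exhausted.
func stage(in <-chan string, label string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- label + " (" + c + ")"
		}
	}()
	return out
}

func main() {
	// Chain the steps: procurement -> assembly -> packaging.
	packs := stage(stage(source(3), "Assembly"), "Packaging")
	for p := range packs {
		fmt.Println(p)
	}
}
```

Each step closes its own output channel, which is what lets the final for range loop in main terminate.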

Fan-in and fan-out pattern

After the phone assembly line had been running for a while, the assembly step turned out to be time-consuming, which slowed down steps 1 and 3 as well. To improve performance, two more shifts of workers were added to step 2:

  • In the diagram, the red part is the fan-out and the blue part is the fan-in. Fan-out means several build goroutines read from the same coms channel; fan-in means their output channels are merged back into one

Improved assembly line:

package main

import (
	"fmt"
	"sync"
)

func main() {
	coms := buy(10) // Purchase 10 sets of parts
	// Three shifts of workers assemble the 10 phones at the same time
	phones1 := build(coms)
	phones2 := build(coms)
	phones3 := build(coms)
	// Aggregate the three channels into one
	phones := merge(phones1, phones2, phones3)
	packs := pack(phones) // Pack them up for sale
	// Output the test to see how it works
	for p := range packs {
		fmt.Println(p)
	}
}

// Step 1: procurement
func buy(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("Parts", i)
		}
	}()
	return out
}

// Step 2: assembly
func build(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "Assembly (" + c + ")"
		}
	}()
	return out
}

// Step 3: Pack
func pack(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "Packaging (" + c + ")"
		}
	}()
	return out
}

// Fan-in function (component): sends the data from multiple channels to a single channel
func merge(ins ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)
	// Send the data from one channel to out
	p := func(in <-chan string) {
		defer wg.Done()
		for c := range in {
			out <- c
		}
	}
	wg.Add(len(ins))
	// Fan in: start one goroutine per input channel
	for _, cs := range ins {
		go p(cs)
	}
	// Wait until all input data has been processed, then close the output channel out
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

Running result (the order varies from run to run, because the three build goroutines run concurrently):

Packaging (Assembly (Parts2))
Packaging (Assembly (Parts3))
Packaging (Assembly (Parts1))
Packaging (Assembly (Parts5))
Packaging (Assembly (Parts7))
Packaging (Assembly (Parts4))
Packaging (Assembly (Parts6))
Packaging (Assembly (Parts8))
Packaging (Assembly (Parts9))
Packaging (Assembly (Parts10))
  1. merge has nothing to do with the business logic, so it should not be counted as a step. Call it a component instead
  2. Components are reusable: any situation that needs a similar fan-in can use the merge component
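
To illustrate the reusability point, merge could be written with a type parameter so that it works for channels of any element type. This is a sketch that assumes Go 1.18 or later (for generics); the numbers helper is hypothetical and exists only to feed the example.

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans in any number of channels with the same element type.
// Assumption: Go 1.18+ for type parameters.
func merge[T any](ins ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)
	wg.Add(len(ins))
	for _, in := range ins {
		// Pass in as an argument so each goroutine reads its own channel.
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close out only after every input channel has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// numbers is a hypothetical source that emits vals and then closes.
func numbers(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(numbers(1, 2), numbers(3), numbers(4, 5)) {
		sum += v
	}
	fmt.Println(sum) // Prints 15; the arrival order varies, the sum does not
}
```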

Futures pattern

The steps in the pipeline pattern depend on one another: the next step can only start once the previous step has finished. Some tasks, however, do not depend on each other, and these independent tasks can be executed concurrently to improve performance.

The Futures pattern can be understood as a "future" pattern. The main goroutine does not have to wait for a child goroutine to return its result; it can do other things first and fetch the result later, when it actually needs it. If the child goroutine has not produced the result by then, the main goroutine waits.

Take hot pot as an example: washing the vegetables and boiling the water do not depend on each other, so they can be done at the same time.

Example:

package main

import (
	"fmt"
	"time"
)

func main() {
	vegetablesCh := washVegetables() // Wash the vegetables
	waterCh := boilWater()           // Boil the water
	fmt.Println("I've arranged for the dishes to be washed and the water to boil. I'll start.")
	time.Sleep(2 * time.Second)

	fmt.Println("We're going to make hot pot. Are we ready for food and water?")
	vegetables := <-vegetablesCh
	water := <-waterCh
	fmt.Println("Ready, ready for the hot pot :",vegetables,water)

}

// Wash the vegetables
func washVegetables() <-chan string {
	vegetables := make(chan string)
	go func() {
		time.Sleep(5 * time.Second)
		vegetables <- "Washed dishes."
	}()
	return vegetables
}

// Boil the water
func boilWater() <-chan string {
	water := make(chan string)
	go func() {
		time.Sleep(5 * time.Second)
		water <- "Boiling water."
	}()
	return water
}

Running result:

I've arranged for the dishes to be washed and the water to boil. I'll start.
We're going to make hot pot. Are we ready for food and water?
Ready, ready for the hot pot : Washed dishes. Boiling water.
  1. The biggest difference between a goroutine in the Futures pattern and an ordinary goroutine is that it returns a result, which will be used at some point in the future. Any later operation that retrieves this result must therefore block until the result is available.
  2. If a big task can be broken down into small, independent tasks that run concurrently, and the results of those small tasks can be combined into the result of the big task, you can use the Futures pattern.
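
The two functions in the example share the same structure, so the pattern could be captured in one helper. The following is a sketch under assumptions: the name async is hypothetical, the delays are shortened, and the channel is given a buffer of 1 (a design choice not present in the example above) so that the goroutine can finish even if nobody ever reads the result.

```go
package main

import (
	"fmt"
	"time"
)

// async is a hypothetical helper: it runs task in its own goroutine and
// returns a channel that will carry the task's single result.
func async(task func() string) <-chan string {
	out := make(chan string, 1) // Buffer of 1: the send never blocks
	go func() {
		out <- task()
	}()
	return out
}

func main() {
	vegetablesCh := async(func() string {
		time.Sleep(50 * time.Millisecond)
		return "Washed vegetables."
	})
	waterCh := async(func() string {
		time.Sleep(50 * time.Millisecond)
		return "Boiling water."
	})

	// The main goroutine is free to do other work here.
	time.Sleep(20 * time.Millisecond)

	// Receiving blocks until each result is available.
	vegetables := <-vegetablesCh
	water := <-waterCh
	fmt.Println(vegetables, water)
}
```

Both tasks start immediately and run concurrently, so the total wait is roughly that of the slower task rather than the sum of the two.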