· Xiao Blackboard Go team

Github.com/tal-tech/go…

In the development of microservices, API gateways play the role of providing restful apis externally, and THE DATA of APIS often depend on other services, and complex apis will depend on multiple or even dozens of services. While the time of a single dependent service is generally low, the time of the entire API can increase significantly if multiple services are serially dependent.

So by what means to optimize? We first think of the way by concurrent processing, can reduce the dependence on time-consuming, so Go based library provides us with WaitGroup tools used for concurrency control, but the actual business scenarios more dependent on if there is an error we expect to return immediately rather than wait for all rely on execution of the returned results, In addition, assigning variables to WaitGroup often requires locking, and adding Add and Done to each dependent function can be error-prone for beginners

Based on the above background, the Go-Zero framework provides us with the concurrency processing tool MapReduce, which is out of the box and does not require any initialization. Let’s take a look at the time comparison between using MapReduce and not using MapReduce:

The serial processing of the same dependency takes 200ms. The maximum dependency processing time after MapReduce is 100ms. MapReduce can greatly reduce service time, and the effect becomes more obvious with the increase of dependency processing time without increasing server pressure

Concurrent processing toolMapReduce

MapReduce is a software architecture proposed by Google for parallel computation of large data sets. The MapReduce tool in Go-Zero borrows from this architecture

The MapReduce tool in the Go-Zero framework is used to process batch data concurrently to improve service performance

Let’s use a few examples to demonstrate MapReduce

MapReduce has three main parameters: the first parameter is generate for data production, the second parameter is mapper for data processing, and the third parameter is reducer for data aggregation and return after mapper. The number of concurrent processing threads can also be set through opTS

Scenario 1: The result of some functions often depends on multiple services. For example, the result of commodity details often depends on user service, inventory service, order service, etc. Generally, the dependent services are provided externally in the form of RPC. In order to reduce the dependency time, parallel processing of dependencies is often required

func productDetail(uid, pid int64) (*ProductDetail, error) {
	var pd ProductDetail
	err := mr.Finish(func(a) (err error) {
		pd.User, err = userRpc.User(uid)
		return
	}, func(a) (err error) {
		pd.Store, err = storeRpc.Store(pid)
		return
	}, func(a) (err error) {
		pd.Order, err = orderRpc.Order(pid)
		return
	})

	iferr ! =nil {
		log.Printf("product detail error: %v", err)
		return nil, err
	}

	return &pd, nil
}
Copy the code

In this example, the return of commodity details depends on multiple services to obtain data, so the concurrent dependency processing can greatly improve the performance of the interface

Scenario 2: In most cases, we need to process a batch of data, such as a batch of user ids, to verify the validity of each user. If there is an error during the verification process, the verification fails, and the returned result is a valid user ID

func checkLegal(uids []int64) ([]int64, error) {
	r, err := mr.MapReduce(func(source chan<- interface{}) {
		for _, uid := range uids {
			source <- uid
		}
	}, func(item interface{}, writer mr.Writer, cancel func(error)) {
		uid := item.(int64)
		ok, err := check(uid)
		iferr ! =nil {
			cancel(err)
		}
		if ok {
			writer.Write(uid)
		}
	}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
		var uids []int64
		for p := range pipe {
			uids = append(uids, p.(int64))
		}
		writer.Write(uids)
	})
	iferr ! =nil {
        log.Printf("check error: %v", err)
		return nil, err
	}

	return r.([]int64), nil
}

func check(uid int64) (bool, error) {
	// do something check user legal
	return true.nil
}
Copy the code

In this example, if there is an error in the check process, the verification process is terminated by the cancel method and error is returned. If the verification result of a UID is false, the final result does not return the UID

Precautions for Using MapReduce

  • Cancel can be called in mapper and Reducer, and the parameter is error. Immediately after the call, the return result is nil, error
  • If writer.Write is not called in mapper, the item will not be aggregated by reducer
  • If writer.Wirte is not called in reducer, nil is returned, ErrReduceNoOutput
  • Reducer is a single thread, and results from all mapper are serialized here

Analysis of implementation principle:

In MapReduce, the buildSource method first generates data by executing generate(with an unbuffered channel) and returns an unbuffered channel from which Mapper reads data

func buildSource(generate GenerateFunc) chan interface{} {
    source := make(chan interface{})
    go func(a) {
        defer close(source)
        generate(source)
    }()

    return source
}
Copy the code

The cancel method is defined in the MapReduceWithSource method, which can be called in both Mapper and Reducer. After the call, the main thread will return immediately after receiving the close signal

cancel := once(func(err error) {
    iferr ! =nil {
        retErr.Set(err)
    } else {
        // The default error
        retErr.Set(ErrCancelWithNil)
    }

    drain(source)
    // Call close(ouput) the main thread receives the Done signal and returns immediately
    finish()
})
Copy the code

ExecuteMappers are called in the mapperDispatcher method. ExecuteMappers consume the data generated by buildSource, and each item is processed separately with a goroutine. By default, the maximum number of concurrent requests is 16. This can be done with WithWorkers

var wg sync.WaitGroup
defer func(a) {
    wg.Wait() // Make sure all items are processed
    close(collector)
}()

pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // Write mapper data to collector
for {
    select {
    case <-done: // An immediate return is triggered when cancel is called
        return
    case pool <- lang.Placeholder: // Control the maximum number of concurrent requests
        item, ok := <-input
        if! ok { <-poolreturn
        }

        wg.Add(1)
        go func(a) {
            defer func(a) {
                wg.Done()
                <-pool
            }()

            mapper(item, writer) Write writer.Write to Write the result to the collector's channel(1)}}}Copy the code

The reducer goroutine processes the data written to the collector by mapper. If the Reducer does not manually call writer.Write, finish is executed to close the output to avoid deadlocks

go func(a) {
    defer func(a) {
        if r := recover(a); r ! =nil {
            cancel(fmt.Errorf("%v", r))
        } else {
            finish()
        }
    }()
    reducer(collector, writer, cancel)
}()
Copy the code

The toolkit also provides many methods for different business scenarios, the implementation principle is similar to MapReduce, interested students can view the source code to learn

  • The function of MapReduceVoid is similar to that of MapReduce but only error is returned
  • Finish handles a fixed number of dependencies and returns error, one of which returns immediately
  • The FinishVoid and Finish methods function similarly and return no value
  • Map only generates and mapper and returns channel
  • The MapVoid function is similar to that of Map

This paper mainly introduces the MapReduce tool in go-Zero framework, which is very practical in the actual project. Good use of tools to improve service performance and development efficiency is a great help, I hope this article can bring you some harvest.