“This is the 16th day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021.”


Preface

GoReplay models data flow with two abstractions: an input represents the source of the data and an output represents its destination. Together they are called plugins. Middleware sits between the input and output modules and provides the extension mechanism.
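
As a rough illustration of the input/output idea, the sketch below wires one reader to one writer. The names Reader, Writer, sliceInput, collectOutput and pipe are hypothetical and simplified for this example; GoReplay's own plugin interfaces (PluginReader and PluginWriter) work on a Message type rather than on plain strings.

```go
package main

import (
	"fmt"
	"io"
)

// Reader is a hypothetical, simplified stand-in for an input plugin.
type Reader interface {
	Read() (string, error)
}

// Writer is a hypothetical, simplified stand-in for an output plugin.
type Writer interface {
	Write(msg string) error
}

// sliceInput serves messages from an in-memory slice.
type sliceInput struct {
	msgs []string
	pos  int
}

func (s *sliceInput) Read() (string, error) {
	if s.pos >= len(s.msgs) {
		return "", io.EOF
	}
	m := s.msgs[s.pos]
	s.pos++
	return m, nil
}

// collectOutput stores every message it receives.
type collectOutput struct {
	got []string
}

func (c *collectOutput) Write(msg string) error {
	c.got = append(c.got, msg)
	return nil
}

// pipe copies every message from in to out until in is exhausted.
func pipe(in Reader, out Writer) error {
	for {
		msg, err := in.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := out.Write(msg); err != nil {
			return err
		}
	}
}

func main() {
	in := &sliceInput{msgs: []string{"GET /a", "GET /b"}}
	out := &collectOutput{}
	if err := pipe(in, out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.got)
}
```

A middleware step would slot into pipe between the Read and the Write call, which is why the two sides only need to agree on the message type.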

input_file.go: the input plugin for reading files. It implements the io.Reader interface and is registered in the plugins.Inputs queue according to the configuration.

Main parameters

-input-file value // Read requests from file: gor --input-file ./requests.gor --output-http staging.com
-input-file-dry-run // Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.
-input-file-loop // Loop input files, useful for performance testing.
-input-file-max-wait duration // Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s
-input-file-read-depth int // GoReplay tries to read and cache multiple records in advance. In parallel it also performs sorting of requests, if they came out of order. Since it needs to hold this buffer in memory, bigger values can cause worse performance (default 100)

Variable-speed playback

Usage

The core feature that makes GoReplay suitable for stress testing is variable-speed playback: recorded production traffic can be replayed slower or faster than it was originally captured.

For example, the following command replays the traffic from a file to the server at 172.16.106.237 at twice the original speed:

[root@vm-1 ~]# ./gor --input-file "requests.gor|200%" --output-http="http://172.16.106.237:8082"
2021/08/17 15:03:58 [PPID 12356 and PID 18187] Version:1.3.0
[DEBUG][elapsed 1.361742ms]: [INPUT-FILE] FileInput: end of file 'requests.gor'

The value after the "|" sets the rate:

  • requests.gor|1: replay at no more than 1 QPS.
  • requests.gor|100%: replay at 100% of the original rate. For file input the percentage is a speed factor, so 200% doubles the speed.
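
To make the two forms concrete, here is a minimal sketch of how such an argument could be split into a path, a numeric limit and a percent flag. The limitSpec type and the parseSpec function are hypothetical helpers written for this example; GoReplay's own parsing lives in plugins.go and limiter.go and, unlike this sketch, does not report malformed numbers as errors.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// limitSpec is a hypothetical type describing a parsed "path|limit" argument.
type limitSpec struct {
	Path      string
	Limit     int
	IsPercent bool
	HasLimit  bool
}

// parseSpec splits "path|limit" and reports whether the limit is a percentage.
func parseSpec(arg string) (limitSpec, error) {
	path, raw, found := strings.Cut(arg, "|")
	spec := limitSpec{Path: path}
	if !found {
		// No "|" present: the plugin is used without any limit.
		return spec, nil
	}
	spec.HasLimit = true
	if strings.HasSuffix(raw, "%") {
		spec.IsPercent = true
		raw = strings.TrimSuffix(raw, "%")
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return spec, fmt.Errorf("invalid limit %q: %w", raw, err)
	}
	spec.Limit = n
	return spec, nil
}

func main() {
	for _, arg := range []string{"requests.gor", "requests.gor|1", "requests.gor|200%"} {
		spec, err := parseSpec(arg)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%+v\n", spec)
	}
}
```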

Source code analysis

gor.go contains the main function, which initializes all plugins and starts the emitter (the core processor) that listens for data.

Initialize the plug-in:

Start the emitter:

The original code is as follows:

func main() {
	// Only override GOMAXPROCS if the environment variable is not set
	if os.Getenv("GOMAXPROCS") == "" {
		// Use twice the number of logical CPUs
		runtime.GOMAXPROCS(runtime.NumCPU() * 2)
	}

	args := os.Args[1:]
	var plugins *InOutPlugins
	// Built-in file server mode
	if len(args) > 0 && args[0] == "file-server" {
		if len(args) != 2 {
			log.Fatal("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
		}
		dir, _ := os.Getwd()

		Debug(0, "Started example file server for current directory on address ", args[1])

		log.Fatal(http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))))
	} else {
		// Parse command line arguments
		flag.Parse()
		// Initialize the global Settings variable
		checkSettings()
		// Initialize the available plug-ins
		plugins = NewPlugins()
	}

	log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), VERSION)

	if len(plugins.Inputs) == 0 || len(plugins.Outputs) == 0 {
		log.Fatal("Required at least 1 input and 1 output")
	}

	if *memprofile != "" {
		profileMEM(*memprofile)
	}

	if *cpuprofile != "" {
		profileCPU(*cpuprofile)
	}

	if Settings.Pprof != "" {
		go func() {
			log.Println(http.ListenAndServe(Settings.Pprof, nil))
		}()
	}

	closeCh := make(chan int)
	// The emitter is the core event processor of the program
	emitter := NewEmitter()
	// Call Start to start the emitter
	go emitter.Start(plugins, Settings.Middleware)
	if Settings.ExitAfter > 0 {
		log.Printf("Running gor for a duration of %s\n", Settings.ExitAfter)

		time.AfterFunc(Settings.ExitAfter, func() {
			log.Printf("gor run timeout %s\n", Settings.ExitAfter)
			close(closeCh)
		})
	}
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	exit := 0
	select {
	case <-c:
		exit = 1
	case <-closeCh:
		exit = 0
	}
	// Close the emitter and stop all of its goroutines
	emitter.Close()
	os.Exit(exit)
}

In plugins.go, registerPlugin wraps a plugin in a Limiter whenever a limit is given. The Limiter performs the rate-limiting and variable-speed operations:

Source code is as follows:

// Automatically detects type of plugin and initialize it
//
// See this article if curious about reflect stuff below: http://blog.burntsushi.net/type-parametric-functions-golang
func (plugins *InOutPlugins) registerPlugin(constructor interface{}, options ...interface{}) {
	var path, limit string
	vc := reflect.ValueOf(constructor)

	// Pre-processing options to make it work with reflect
	vo := []reflect.Value{}
	for _, oi := range options {
		vo = append(vo, reflect.ValueOf(oi))
	}

	if len(vo) > 0 {
		// Removing limit options from path
		path, limit = extractLimitOptions(vo[0].String())

		// Writing value back without limiter "|" options
		vo[0] = reflect.ValueOf(path)
	}

	// Calling our constructor with list of given options
	plugin := vc.Call(vo)[0].Interface()

	if limit != "" {
		plugin = NewLimiter(plugin, limit)
	}

	// Some of the output can be Readers as well because return responses
	if r, ok := plugin.(PluginReader); ok {
		plugins.Inputs = append(plugins.Inputs, r)
	}

	if w, ok := plugin.(PluginWriter); ok {
		plugins.Outputs = append(plugins.Outputs, w)
	}
	plugins.All = append(plugins.All, plugin)
}

plugins.go also parses parameters such as 'request.gor|100%':

Source code is as follows:

// extractLimitOptions detects if plugin get called with limiter support
// Returns address and limit
func extractLimitOptions(options string) (string, string) {
	split := strings.Split(options, "|")

	if len(split) > 1 {
		return split[0], split[1]
	}

	return split[0], ""
}

The main limiting logic lives in limiter.go. The source code is as follows:

package main

import (
	"fmt"
	"io"
	"math/rand"
	"strconv"
	"strings"
	"time"
)

// Limiter is a wrapper for input or output plugin which adds rate limiting
type Limiter struct {
	plugin    interface{}
	limit     int
	isPercent bool

	currentRPS  int
	currentTime int64
}

func parseLimitOptions(options string) (limit int, isPercent bool) {
	if n := strings.Index(options, "%"); n > 0 {
		limit, _ = strconv.Atoi(options[:n])
		isPercent = true
	} else {
		limit, _ = strconv.Atoi(options)
		isPercent = false
	}

	return
}

// NewLimiter constructor for Limiter, accepts plugin and options
// `options` allows specifying relative or absolute limiting
func NewLimiter(plugin interface{}, options string) PluginReadWriter {
	l := new(Limiter)
	l.limit, l.isPercent = parseLimitOptions(options)
	l.plugin = plugin
	l.currentTime = time.Now().UnixNano()

	// FileInput has its own rate limiting. Unlike other inputs, it does not just drop requests; it can slow down or speed up request emission.
	if fi, ok := l.plugin.(*FileInput); ok && l.isPercent {
		fi.speedFactor = float64(l.limit) / float64(100)
	}

	return l
}

func (l *Limiter) isLimited() bool {
	// File input has its own limiting algorithm
	if _, ok := l.plugin.(*FileInput); ok && l.isPercent {
		return false
	}

	if l.isPercent {
		return l.limit <= rand.Intn(100)
	}

	if (time.Now().UnixNano() - l.currentTime) > time.Second.Nanoseconds() {
		l.currentTime = time.Now().UnixNano()
		l.currentRPS = 0
	}

	if l.currentRPS >= l.limit {
		return true
	}

	l.currentRPS++

	return false
}

// PluginWrite writes message to this plugin
func (l *Limiter) PluginWrite(msg *Message) (n int, err error) {
	if l.isLimited() {
		return 0, nil
	}
	if w, ok := l.plugin.(PluginWriter); ok {
		return w.PluginWrite(msg)
	}
	// avoid further writing
	return 0, io.ErrClosedPipe
}

// PluginRead reads message from this plugin
func (l *Limiter) PluginRead() (msg *Message, err error) {
	if r, ok := l.plugin.(PluginReader); ok {
		msg, err = r.PluginRead()
	} else {
		// avoid further reading
		return nil, io.ErrClosedPipe
	}

	if l.isLimited() {
		return nil, nil
	}

	return
}

func (l *Limiter) String() string {
	return fmt.Sprintf("Limiting %s to: %d (isPercent: %v)", l.plugin, l.limit, l.isPercent)
}

// Close closes the resources.
func (l *Limiter) Close() error {
	if fi, ok := l.plugin.(io.Closer); ok {
		fi.Close()
	}
	return nil
}


limiter.go also sets the speed factor used for file input:

input_file.go applies that speed factor when it emits records:

Looping over input files

The main parameter is --input-file-loop.

For example:

$ sudo ./gor --input-file 'request.gor|10000%' --input-file-loop --output-http 'http://10.96.136.36:8201' 

plugins.go passes the loop setting to the file input when it registers the plugin:

The source code is as follows:

	for _, options := range Settings.InputFile {
		plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
	}


input_file.go implements the file reading:

Source code is as follows:

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
	i = new(FileInput)
	i.data = make(chan []byte, 1000)
	i.exit = make(chan bool)
	i.path = path
	i.speedFactor = 1
	i.loop = loop
	i.readDepth = readDepth
	i.stats = expvar.NewMap("file-" + path)
	i.dryRun = dryRun
	i.maxWait = maxWait

	if err := i.init(); err != nil {
		return
	}

	go i.emit()

	return
}


The constructor fills in the FileInput struct from the file path and the other options.

Source code is as follows:

// FileInput can read requests generated by FileOutput
type FileInput struct {
	mu          sync.Mutex
	data        chan []byte
	exit        chan bool
	path        string
	readers     []*fileInputReader
	speedFactor float64
	loop        bool
	readDepth   int
	dryRun      bool
	maxWait     time.Duration

	stats *expvar.Map
}


When the readers are exhausted, the emit loop checks whether it should start over:

Source code is as follows:

		if reader == nil {
			if i.loop {
				i.init()
				lastTime = -1
				continue
			} else {
				break
			}
		}
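
The restart-on-end-of-input pattern above can be tried in isolation with the sketch below. The replay function and its maxPasses parameter are hypothetical: maxPasses exists only to keep the example finite, whereas with --input-file-loop the real emit loop keeps restarting until the process is stopped.

```go
package main

import "fmt"

// replay emits every record once, or up to maxPasses times when loop is set.
func replay(records []string, loop bool, maxPasses int) []string {
	var out []string
	pos, passes := 0, 0
	for {
		if pos >= len(records) { // end of input, comparable to reader == nil
			passes++
			if loop && passes < maxPasses {
				pos = 0 // re-initialize the input and start over
				continue
			}
			break
		}
		out = append(out, records[pos])
		pos++
	}
	return out
}

func main() {
	records := []string{"GET /a", "GET /b"}

	fmt.Println(len(replay(records, false, 3))) // single pass
	fmt.Println(len(replay(records, true, 3)))  // three passes
}
```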

Core code logic calls