Preface

Today we introduce another component of the Go-Zero ecosystem: Go-Stash, a Go implementation of Logstash. Switching to Go-Stash saved us about 2/3 of the server resources compared with the original Logstash. Give it a try if you're using Logstash, and see how straightforward it is to build such a tool on top of Go-Zero; the authors wrote it in just two days.

The overall architecture

Starting with its configuration, let’s look at the design architecture.

Clusters:
  - Input:
      Kafka:
        # Kafka configuration --> links to go-queue
    Filters:
      # filter actions
      - Action: drop
      - Action: remove_field
      - Action: transfer
    Output:
      ElasticSearch:
        # es configuration {host, index}

Kafka is the data input, es is the data output, and filter abstracts the data processing in between.

Yes, the entire Go-Stash is what-you-see-is-what-you-get: the config describes exactly what it does.
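To make the shape concrete, here is a hypothetical cluster entry. The field names follow my reading of the go-stash README and may differ between versions, and every broker, topic, and index value is a made-up placeholder; treat the repository's sample config as authoritative.

Clusters:
  - Input:
      Kafka:
        Name: gostash
        Brokers:
          - "172.16.0.1:9092"
        Topics:
          - k8slog
        Group: pro
    Filters:
      - Action: drop
        Conditions:
          - Key: level
            Value: info
            Type: match
      - Action: remove_field
        Fields:
          - _source
    Output:
      ElasticSearch:
        Hosts:
          - "http://172.16.0.2:9200"
        Index: "go-stash-{{yyyy-MM-dd}}"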

Startup

The startup process in stash.go roughly divides into several steps. Since multiple clusters can be configured, let's analyze it from the perspective of a single cluster:

  1. Establish the connection to es [from the es configuration]
  2. Build the filter processors [es pre-processors that filter and transform the data; multiple can be configured]
  3. Complete the es index configuration, start the handle, and add the filters to the handle
  4. Connect to kafka, passing in the handle created above, which completes the data flow from kafka consumption to es writing (the main() listing in the data-flow section below shows exactly this wiring)

MessageHandler

In the architecture above, the filter in the middle is only visible in the config. In more detail, it is actually part of MessageHandler, which performs the data filtering and conversion.

The code is here: github.com/tal-tech/go…

type MessageHandler struct {
	writer  *es.Writer          // writes documents to es
	indexer *es.Index           // computes the target index for each message
	filters []filter.FilterFunc // the filter chain built from the config
}

MessageHandler is connected to the downstream es, but performs no operations on Kafka itself.

The interface design of MessageHandler implements the ConsumeHandler interface in go-queue.
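For reference, that interface in go-queue's kq package is tiny. The definition below is paraphrased from my reading of the go-queue source at the time of writing; check the repository for the current form:

// kq.ConsumeHandler: anything that can handle one kafka message.
type ConsumeHandler interface {
	Consume(key, value string) error
}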

Here, the upstream and downstream are connected:

  1. MessageHandler takes over the es operations, responsible for everything from data processing to data writing
  2. The real kafka Consume operation executes the handler during consumption, which writes to es

And indeed, Consume() does exactly that:

func (mh *MessageHandler) Consume(_, val string) error {
	var m map[string]interface{}
	// Deserialize the message from Kafka
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}
	// Get the es index from the index configuration
	index := mh.indexer.GetIndex(m)
	// Filter chain processing [map in, map out]
	for _, proc := range mh.filters {
		if m = proc(m); m == nil {
			return nil
		}
	}
	bs, err := jsoniter.Marshal(m)
	if err != nil {
		return err
	}
	// Write to es
	return mh.writer.Write(index, string(bs))
}
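The filter contract is implied by the loop above: map in, map out, and returning nil drops the message. A minimal sketch under that assumption follows; the FilterFunc type mirrors its usage in MessageHandler, and dropDebug is a hypothetical example filter, not one shipped with go-stash:

// FilterFunc: map in, map out; returning nil drops the message
// and short-circuits the rest of the chain (Consume returns early).
type FilterFunc func(map[string]interface{}) map[string]interface{}

// dropDebug is a hypothetical filter that discards debug-level events.
func dropDebug(m map[string]interface{}) map[string]interface{} {
	if level, ok := m["level"].(string); ok && level == "debug" {
		return nil
	}
	return m
}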

The data flow

So far we have the data processing and the upstream/downstream connections. But for data to flow from Kafka -> es, something still has to drive the pull from Kafka.

So how does the data flow get moving? Let's go back to the main program: github.com/tal-tech/go…

In fact, the startup of the whole process is a composition (combinator) pattern:

func main() {
	// Parse command-line flags, load the config, and set up graceful shutdown

	// Service composition pattern
	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Clusters {
		// Connect to es
		// Build the filter processors
		// Prepare the es write side {index, writer}
		handle := handler.NewHandler(writer, indexer)
		handle.AddFilters(filters...)
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
		// Start kafka as configured, passing in the consume handler, and add it to the group
		for _, k := range toKqConf(processor.Input.Kafka) {
			group.Add(kq.MustNewQueue(k, handle))
		}
	}
	// Start the group
	group.Start()
}

The whole data flow is tied to this group combinator.
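As background, go-zero's ServiceGroup accepts anything that can be started and stopped. Paraphrasing the interfaces from go-zero's service package as I recall them (names may differ slightly between versions):

type Starter interface {
	Start()
}

type Stopper interface {
	Stop()
}

// A Service is both; ServiceGroup fans Start()/Stop() out to every Service added to it.
type Service interface {
	Starter
	Stopper
}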

group.Start()
	|- group.doStart()
		|- [service.Start() for service in group.services]

All services added to a group implement Start(). Kafka's startup happens in Start():

func (q *kafkaQueue) Start() {
	q.startConsumers()
	q.startProducers()

	q.producerRoutines.Wait()
	close(q.channel)
	q.consumerRoutines.Wait()
}
  1. Start the kafka consumers [the processing side]
  2. Start the kafka producers [the name can be confusing: these goroutines actually pull messages from kafka into q.channel]
  3. When the producers finish, close the channel and wait for the consumers to drain it

A simplified, self-contained sketch of this channel-based pattern is shown below.
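This is an illustration of the pattern only, not go-queue's actual code: producers push work into a channel (in go-queue they would be pulling from kafka), consumers drain it, and shutdown is coordinated by closing the channel, mirroring the shape of kafkaQueue.Start() above.

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan string, 16)
	var producers, consumers sync.WaitGroup

	// Consumers: drain the channel until it is closed.
	for i := 0; i < 4; i++ {
		consumers.Add(1)
		go func() {
			defer consumers.Done()
			for msg := range ch {
				fmt.Println("consumed:", msg)
			}
		}()
	}

	// Producers: in go-queue these would pull messages from kafka into the channel.
	for i := 0; i < 2; i++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			for j := 0; j < 3; j++ {
				ch <- fmt.Sprintf("msg-%d-%d", id, j)
			}
		}(i)
	}

	producers.Wait() // all producers done
	close(ch)        // signal consumers there is no more work
	consumers.Wait() // wait for consumers to drain the channel
}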

The handler we pass to Kafka is Consume, and this method is executed inside q.startConsumers():

q.startConsumers()
	|- [q.consumeOne(key, value) for msg in q.channel]
		|- q.handler.Consume(key, value)

So the whole data stream is completely strung together. Recapping purely from the code shown above, a single message travels this path:
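kafka -> q.channel -> q.consumeOne(key, value)
      -> handler.Consume(key, value)
      -> indexer.GetIndex(m) -> filter chain (map in, map out)
      -> writer.Write(index, string(bs)) -> ElasticSearch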

Conclusion

As the first Go-Stash article, this has been an overview of its architecture and design. The next article will dig into performance and why we wanted to build such a component.

Github.com/tal-tech/go…

Stay tuned for more design and implementation articles on Go-Zero.

Github.com/tal-tech/go…

Welcome to use Go-Zero, and star the repo to support us!

WeChat communication group

Follow the "micro-service practice" WeChat official account and reply "group" to get the community QR code.