Let's reorganize the architecture of the log collection system, as shown below

The overall logic of the code to be implemented this time is:

The full code is available at: github.com/pythonsite/…

Introduction to etcd

A highly available distributed key-value store that can be used for configuration sharing and service discovery

Similar projects: Zookeeper and Consul

Development language: Go

Interface: RESTful, easy to use

Implementation: a highly consistent, highly available service storage directory built on the Raft algorithm

Application scenarios of etcd:

  • Service discovery and service registration
  • Configuration center (required by the logging collection client we implemented)
  • Distributed locks
  • Leader election

The etcd website itself offers only a very brief introduction.

Setting up etcd: download the build matching your environment from github.com/coreos/etcd… and start it up.

You can verify the installation with the following commands:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
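
Note that etcd 3.2's bundled etcdctl defaults to the v2 API (hence the set/get commands above), while the Go clientv3 code later in this article speaks the v3 API, whose keys are stored separately from v2 data. To exercise the v3 API from the command line (assuming the same etcd 3.2 binary directory):

[root@localhost etcd-v3.2.18-linux-amd64]# ETCDCTL_API=3 ./etcdctl put name zhaofan
OK
[root@localhost etcd-v3.2.18-linux-amd64]# ETCDCTL_API=3 ./etcdctl get name
name
zhaofan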

Introduction to context and its usage

Context is mainly used to:

  • Control timeouts of goroutines
  • Carry request-scoped context data

Here's a simple example:

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

type Result struct {
    r   *http.Response
    err error
}

func process() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    tr := &http.Transport{}
    client := &http.Client{Transport: tr}
    c := make(chan Result, 1)
    req, err := http.NewRequest("GET", "http://www.google.com", nil)
    if err != nil {
        fmt.Println("http request failed,err:", err)
        return
    }
    go func() {
        resp, err := client.Do(req)
        pack := Result{resp, err}
        c <- pack
    }()
    select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
    case res := <-c:
        // Check the error before touching the response body
        if res.err != nil {
            fmt.Println("request failed,err:", res.err)
            return
        }
        defer res.r.Body.Close()
        out, _ := ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s", out)
    }
    return
}

func main() {
    process()
}
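
For reference: since Go 1.7 the request can carry the context itself, which lets the transport handle cancellation without the deprecated CancelRequest. A minimal sketch of that variant (same URL as above):

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    req, err := http.NewRequest("GET", "http://www.google.com", nil)
    if err != nil {
        fmt.Println("http request failed,err:", err)
        return
    }
    // Attaching the context makes the transport abort the request
    // automatically once the 2s deadline passes.
    resp, err := http.DefaultClient.Do(req.WithContext(ctx))
    if err != nil {
        fmt.Println("request failed,err:", err)
        return
    }
    defer resp.Body.Close()
    out, _ := ioutil.ReadAll(resp.Body)
    fmt.Printf("server response:%s", out)
}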

Using context to save context data:

package main

import (
    "context"
    "fmt"
)

func add(ctx context.Context, a, b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n", traceId)
    return a + b
}

func calc(ctx context.Context, a, b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n", traceId)
    // pass ctx down to add
    return add(ctx, a, b)
}

func main() {
    ctx := context.WithValue(context.Background(), "trace_id", "123456")
    calc(ctx, 20, 30)
}
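
One caveat about the example above: plain string keys such as "trace_id" can collide with keys set by other packages. The context documentation recommends an unexported key type; a minimal sketch:

package main

import (
    "context"
    "fmt"
)

// An unexported key type keeps our context keys from colliding
// with keys defined by other packages.
type ctxKey string

const traceIdKey ctxKey = "trace_id"

func main() {
    ctx := context.WithValue(context.Background(), traceIdKey, "123456")
    if traceId, ok := ctx.Value(traceIdKey).(string); ok {
        fmt.Printf("trace_id:%v\n", traceId)
    }
}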

Using etcd and context together

One small issue to watch out for: by default etcd only listens on localhost, so remote clients (for example when etcd runs inside a virtual machine) may fail to connect. Start it listening on all interfaces instead:

./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381

package main

import (
    etcd_client "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }

    fmt.Println("connect success")
    defer cli.Close()
}

The following example connects to etcd and stores and retrieves key/value pairs:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:", err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()

    // Put a sample value under the config key, bounded by a 1s timeout
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, "logagent/conf/", "sample_value")
    cancel()
    if err != nil {
        fmt.Println("put failed,err", err)
        return
    }

    // Read it back, again bounded by a 1s timeout
    ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, "logagent/conf/")
    cancel()
    if err != nil {
        fmt.Println("get failed,err:", err)
        return
    }
    for _, ev := range resp.Kvs {
        fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    }
}
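
If several configuration keys share a common prefix, clientv3's WithPrefix option can fetch them all in one call. A sketch of just the Get, reusing the cli from the program above:

    // Sketch: fetch every key under the logagent/conf/ prefix in one request.
    // cli is the *clientv3.Client created above.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, "logagent/conf/", clientv3.WithPrefix())
    cancel()
    if err != nil {
        fmt.Println("get failed,err:", err)
        return
    }
    for _, ev := range resp.Kvs {
        fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    }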

There is also a useful example in the context documentation that shows how to control the exit of a goroutine once it has been started:

package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}

Example code for the WithDeadline demo in the official documentation:

package main


import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }

}
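
WithTimeout is simply a convenience wrapper over WithDeadline: context.WithTimeout(parent, d) behaves like context.WithDeadline(parent, time.Now().Add(d)). The same demo expressed with WithTimeout:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // Equivalent to WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond))
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err()) // prints "context deadline exceeded"
    }
}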

Building on the basic usage above: if we use etcd for configuration management and a configuration changes, we can notify the corresponding servers of the change, as in the following example:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:", err)
        return
    }
    defer cli.Close()

    // Watch blocks, delivering an event each time the key changes
    rch := cli.Watch(context.Background(), "logagent/conf/")
    for wresp := range rch {
        for _, ev := range wresp.Events {
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}
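
The watcher above blocks forever; to see it fire, update the key from a second process. A minimal sketch (assuming the same endpoint) whose Put will show up in the watcher as a PUT event:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:", err)
        return
    }
    defer cli.Close()

    // Each Put on the watched key is delivered to the watcher as a PUT event
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, "logagent/conf/", "new_value")
    cancel()
    if err != nil {
        fmt.Println("put failed,err:", err)
    }
}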

A simple example of Kafka consumer code:

package main

import (
    "fmt"
    "strings"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
    if err != nil {
        fmt.Println("failed to start consumer:", err)
        return
    }
    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:", err)
        return
    }
    fmt.Println(partitionList)

    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer) {
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()
}

But the above code is not ideal, because it relies on time.Sleep to keep waiting for the goroutines; we can replace that with sync.WaitGroup:

package main

import (
    "fmt"
    "strings"
    "sync"

    "github.com/Shopify/sarama"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
    if err != nil {
        fmt.Println("failed to start consumer:", err)
        return
    }
    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:", err)
        return
    }
    fmt.Println(partitionList)

    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // Add before starting the goroutine so wg.Wait() cannot race past it
        wg.Add(1)
        go func(partitionConsumer sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

The configuration describing which logs each client should collect is stored in etcd

The etcd handling code is as follows:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/astaxie/beego/logs"
    "github.com/coreos/etcd/clientv3"
)

var Client *clientv3.Client
var logConfChan chan string

// Initialize etcd and load the initial log collection configuration.
// ipArrays and waitGroup are defined elsewhere in the package.
func initEtcd(addr []string, keyfmt string, timeout time.Duration) (err error) {
    var keys []string
    for _, ip := range ipArrays {
        // keyfmt = /logagent/%s/log_config
        keys = append(keys, fmt.Sprintf(keyfmt, ip))
    }

    logConfChan = make(chan string, 10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client, err = clientv3.New(clientv3.Config{
        Endpoints:   addr,
        DialTimeout: timeout,
    })
    if err != nil {
        logs.Error("connect failed,err:%v", err)
        return
    }
    logs.Debug("init etcd success")

    waitGroup.Add(1)
    for _, key := range keys {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        // Fetch the log collection configuration from etcd
        resp, err := Client.Get(ctx, key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v", key, err)
            continue
        }
        for _, ev := range resp.Kvs {
            logs.Debug("%q : %q\n", ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

// Watch every configured key and push changed values into logConfChan
func WatchEtcd(keys []string) {
    var watchChans []clientv3.WatchChan
    for _, key := range keys {
        rch := Client.Watch(context.Background(), key)
        watchChans = append(watchChans, rch)
    }

    for {
        for _, watchC := range watchChans {
            select {
            case wresp := <-watchC:
                for _, ev := range wresp.Events {
                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:
            }
        }
        time.Sleep(time.Second)
    }
    waitGroup.Done()
}

func GetLogConf() chan string {
    return logConfChan
}
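
For completeness, a sketch (in the same package, so the imports above apply) of how the rest of the agent might drain these updates; applyConf is a hypothetical handler, not part of the code above:

// Sketch: consume configuration updates pushed by initEtcd/WatchEtcd.
// applyConf is a hypothetical handler used only for this sketch.
func applyConf(conf string) {
    logs.Debug("new log config received:%s", conf)
    // hypothetical: parse the JSON value and reconfigure the collectors
}

func runConfLoop() {
    for conf := range GetLogConf() {
        applyConf(conf)
    }
}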

limit.go throttles the speed of the log collector:

package main

import (
    "sync/atomic"
    "time"

    "github.com/astaxie/beego/logs"
)

type SecondLimit struct {
    unixSecond int64
    curCount   int32
    limit      int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond: time.Now().Unix(),
        curCount:   0,
        limit:      limit,
    }
    return secLimit
}

func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount, int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond, sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

func (s *SecondLimit) Wait() bool {
    for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d", s.limit, s.curCount)
            continue
        }
        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond, sec)
            atomic.StoreInt32(&s.curCount, 0)
        }
        logs.Debug("limit is exited")
        return false
    }
}
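
A usage sketch (same package): throttle a collection loop to at most 100 lines per second. collectLoop and sendToKafka are hypothetical names for illustration:

// sendToKafka is a hypothetical sender used only for this sketch.
func sendToKafka(line string) {}

func collectLoop(lines chan string) {
    secLimit := NewSecondLimit(100)
    for line := range lines {
        secLimit.Wait() // spins while this second's quota is exhausted
        sendToKafka(line)
        secLimit.Add(1) // record one consumed line
    }
}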

Summary

This installment covers the first half of the log collection pipeline; next, the logs will be pushed into Elasticsearch and finally rendered on a web page.