This is the 15th day of my participation in Gwen Challenge

ETCD coding examples in Go

Let’s review what we covered last time about service registration and discovery

  • What service registration and discovery is
  • What the CAP theorem is
  • What ETCD is and how it compares to Zookeeper
  • The basic principle behind ETCD’s distributed lock implementation

If you are interested in using ETCD for service registration and discovery, please check out the earlier article on service registration and discovery with ETCD

Today we will look at how to operate ETCD, an open source, highly available distributed key-value store, from Go

If you are interested, you can check out the official documentation of the ETCD Go client

pkg.go.dev/go.etcd.io/…

Following the official documentation, we will cover these points

  • How to install ETCD
  • PUT and GET operations on keys in ETCD
  • The WATCH operation
  • Leases
  • KeepAlive
  • Implementation of ETCD distributed lock

How to install ETCD

ETCD installation and deployment

Here we do a simple standalone deployment

  • Download the latest ETCD package from GitHub: github.com/etcd-io/etc…
  • After unpacking, copy etcd and etcdctl to $GOBIN, or add their directory to the PATH environment variable (so that typing etcd runs the executable directly)
  • Run etcd --version to check the version

The etcd commands themselves are not covered here; the focus is on how to use ETCD from Go

Installing the package

This time we use the clientv3 package of ETCD. We can install it by executing the following command

 go get go.etcd.io/etcd/clientv3

The following errors may occur when you download the clientv3 package of ETCD, whether by executing the command above or by using go mod:

/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption
/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption

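The problem above is caused by a version conflict: this version of the ETCD client does not compile against the newer grpc package that gets pulled in. To fix it, we pin grpc to v1.26.0 by adding the following replace directive to our go.mod

replace google.golang.org/grpc => google.golang.org/grpc v1.26.0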

For example, my go.mod looks like this

module my_etcd

go 1.15

require (
	github.com/coreos/etcd v3.3.25+incompatible // indirect
	github.com/coreos/go-semver v0.3.0 // indirect
	github.com/coreos/go-systemd v0.0.0-2019110409366-d3cd4ed1dbcf // indirect
	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/google/uuid v1.2.0 // indirect
	go.etcd.io/etcd v3.3.25+incompatible
	go.uber.org/zap v1.17.0 // indirect
	google.golang.org/grpc v1.38.0 // indirect
)

replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

By the way, Go modules (go mod) were introduced in Go 1.11 and have been recommended for production use since Go 1.14. Package management with go mod is very convenient. Here is a quick overview of how to use it

  • To initialize a module, run the following command in the same directory as main.go
go mod init xxx
  • After writing the code in main.go, run go build in the same directory to compile the Go program
  • If the compilation hits the problem above, simply add the replace directive above to the generated go.mod file

With the package installed, we can start coding

ETCD set KEY and get KEY operations

The default ports for ETCD are:

  • Port 2379: provides the HTTP API service for clients
  • Port 2380: used for communication between peers

So let’s start by writing a demo that PUTs and GETs a key

package main

import (
   "context"
   "log"
   "time"

   "go.etcd.io/etcd/clientv3"
)

func main() {

   // Set the log flags to print the time plus the full file path and line number
   log.SetFlags(log.Ltime | log.Llongfile)

   // ETCD default client port is 2379
   // Use the clientv3 package of ETCD
   client, err := clientv3.New(clientv3.Config{
      Endpoints: []string{"127.0.0.1:2379"},
      // The dial timeout is 10 seconds
      DialTimeout: 10 * time.Second,
   })

   if err != nil {
      log.Printf("connect to etcd error : %v\n", err)
      return
   }

   log.Printf("connect to etcd successfully ...")
   // defer closes the connection when main returns
   defer client.Close()

   // PUT: the key is name and the value is xiaomotong
   ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   _, err = client.Put(ctx, "name", "xiaomotong")
   cancel()
   if err != nil {
      log.Printf("PUT key to etcd error : %v\n", err)
      return
   }

   // GET the key from ETCD
   ctx, cancel = context.WithTimeout(context.Background(), time.Second)
   resp, err := client.Get(ctx, "name")
   cancel()
   if err != nil {
      log.Printf("GET key-value from etcd error : %v\n", err)
      return
   }

   // Iterate over the returned keys and their values
   for _, ev := range resp.Kvs {
      log.Printf("%s : %s\n", ev.Key, ev.Value)
   }
}

If you are interested, copy the code above into your own environment and run it; it should log the key name together with its value xiaomotong

ETCD WATCH operation

The WATCH operation posts a sentinel that monitors changes to the value of a key, including additions, deletions and modifications

package main

import (
   "context"
   "log"
   "time"

   "go.etcd.io/etcd/clientv3"
)

func main() {

   // Set the log flags to print the time plus the full file path and line number
   log.SetFlags(log.Ltime | log.Llongfile)

   // ETCD default client port is 2379
   // Use the clientv3 package of ETCD
   client, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"127.0.0.1:2379"},
      DialTimeout: 10 * time.Second,
   })
   if err != nil {
      log.Printf("connect to etcd error : %v\n", err)
      return
   }

   log.Printf("connect to etcd successfully ...")

   defer client.Close()
   // Keep a watch on changes to the key name
   // respCh is a channel
   respCh := client.Watch(context.Background(), "name")
   // While respCh has no data, the loop blocks here
   for watchResp := range respCh {
      for _, v := range watchResp.Events {
         log.Printf("type = %s , Key = %s , Value = %s\n",
            v.Type, v.Kv.Key, v.Kv.Value)
      }
   }
}

Because respCh is a channel, the for loop blocks while there is no data in it, so we need to simulate adding, deleting, and modifying the value of name from a terminal, and our program will print the corresponding events

For example, I type etcdctl --endpoints=http://127.0.0.1:2379 put name "xiaomotong" in a terminal

The program we ran above then prints the following

./my_etcd
22:18:39 /home/xiaomotong/my_etcd/main.go:23: connect to etcd successfully ...
22:18:43 /home/xiaomotong/my_etcd/main.go:31: type = PUT , Key = name , Value = xiaomotong

ETCD LEASE operation

A lease gives a key an expiry time: when the lease expires, the keys attached to it are removed

package main

import (
   "context"
   "log"
   "time"

   "go.etcd.io/etcd/clientv3"
)

func main() {

   // Set the log flags to print the time plus the full file path and line number
   log.SetFlags(log.Ltime | log.Llongfile)
   // ETCD default client port is 2379
   // Use the clientv3 package of ETCD
   client, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"127.0.0.1:2379"},
      DialTimeout: 10 * time.Second,
   })
   if err != nil {
      log.Printf("connect to etcd error : %v\n", err)
      return
   }

   log.Printf("connect to etcd successfully ...")

   defer client.Close()

   // We create a 20-second lease
   resp, err := client.Grant(context.TODO(), 20)
   if err != nil {
      log.Printf("client.Grant error : %v\n", err)
      return
   }

   // Attach /name to the lease; after 20 seconds, the /name key is removed
   _, err = client.Put(context.TODO(), "/name", "xiaomotong", clientv3.WithLease(resp.ID))
   if err != nil {
      log.Printf("client.Put error : %v\n", err)
      return
   }
}

The /name key above expires automatically after 20 seconds

ETCD KeepAlive operation

By the way, Keepalived is also an open source component for high availability; those interested can take a closer look

Here, though, keep-alive means ETCD’s lease KeepAlive. With a small adjustment to the code above, the /name key no longer expires

package main

import (
   "context"
   "log"
   "time"

   "go.etcd.io/etcd/clientv3"
)

func main() {

   // Set the log flags to print the time plus the full file path and line number
   log.SetFlags(log.Ltime | log.Llongfile)
   // ETCD default client port is 2379
   // Use the clientv3 package of ETCD
   client, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"127.0.0.1:2379"},
      DialTimeout: 10 * time.Second,
   })
   if err != nil {
      log.Printf("connect to etcd error : %v\n", err)
      return
   }

   log.Printf("connect to etcd successfully ...")

   defer client.Close()

   // We create a 20-second lease
   resp, err := client.Grant(context.TODO(), 20)
   if err != nil {
      log.Printf("client.Grant error : %v\n", err)
      return
   }

   // Attach the /name key to the lease
   _, err = client.Put(context.TODO(), "/name", "xiaomotong", clientv3.WithLease(resp.ID))
   if err != nil {
      log.Printf("client.Put error : %v\n", err)
      return
   }

   // Keep the lease alive, so /name does not expire while this program runs
   ch, kaerr := client.KeepAlive(context.TODO(), resp.ID)
   if kaerr != nil {
      log.Printf("client.KeepAlive error : %v\n", kaerr)
      return
   }
   // Ranging over the channel stops when it is closed,
   // so a nil response is never dereferenced
   for ka := range ch {
      log.Println("ttl:", ka.TTL)
   }
}

We can read the official description of KeepAlive

KeepAlive keeps a given lease alive forever. If the keepalive response posted to the channel is not consumed immediately, the lease client will continue sending keepalive requests to the ETCD server at least every second until the latest response is consumed.

// KeepAlive keeps the given lease alive forever. If the keepalive response
// posted to the channel is not consumed immediately, the lease client will
// continue sending keep alive requests to the etcd server at least every
// second until latest response is consumed.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
// given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
// from this closed channel is nil.
//
// If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
// no leader") or canceled by the caller (e.g. context.Canceled), the error
// is returned. Otherwise, it retries.
//
// TODO(v4.0): post errors to last keep alive message before closing
// (see https://github.com/coreos/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

Implementation of ETCD distributed lock

Here we need to import a new package, "github.com/coreos/etcd/clientv3/concurrency"

After you write the code, go mod downloads and compiles the package for you when you build (from Go 1.16 on, run go mod tidy first)

This part of Go is really quite nice

package main

import (
	"context"
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
	"log"
)

func main() {

   // Set the log flags to print the time plus the full file path and line number
   log.SetFlags(log.Ltime | log.Llongfile)

   // ETCD default client port is 2379
   // Use the clientv3 package of ETCD
   // Endpoints takes the list of ETCD server addresses
   client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
   if err != nil {
      log.Printf("connect to etcd error : %v\n", err)
      return
   }
   defer client.Close()

   // Create the first session
   session1, err := concurrency.NewSession(client)
   if err != nil {
      log.Printf("concurrency.NewSession 1 error : %v\n", err)
      return
   }
   defer session1.Close()
   // Create a mutex on the key /lock for session 1
   myMu1 := concurrency.NewMutex(session1, "/lock")

   // Create a second session
   session2, err := concurrency.NewSession(client)
   if err != nil {
      log.Printf("concurrency.NewSession 2 error : %v\n", err)
      return
   }
   defer session2.Close()
   // Create a mutex on the same key /lock for session 2
   myMu2 := concurrency.NewMutex(session2, "/lock")

   // Session 1 acquires the lock
   if err := myMu1.Lock(context.TODO()); err != nil {
      log.Printf("myMu1.Lock error : %v\n", err)
      return
   }
   log.Println("Get session1 lock ")


   m2Chan := make(chan struct{})
   go func() {
      defer close(m2Chan)
      // Lock blocks until the lock can be acquired
      // A channel is used to communicate with main
      // myMu2 can only acquire the lock after myMu1 has been unlocked
      // When myMu2 has acquired the lock, this goroutine returns
      // and the deferred close closes the channel,
      // which lets the receive in main return
      if err := myMu2.Lock(context.TODO()); err != nil {
         log.Printf("myMu2.Lock error : %v\n", err)
         return
      }
   }()

   // Unlock
   if err := myMu1.Unlock(context.TODO()); err != nil {
      log.Printf("myMu1.Unlock error : %v\n", err)
      return
   }
   log.Println("Release session1 lock ")

   // Blocks until m2Chan is closed
   <-m2Chan

   log.Println("Get session2 lock")
}

In the code above, we created two sessions to simulate distributed locking

We let the first session acquire the lock, and have the second session try to acquire it as well

When the second session acquires the lock, a channel is closed to confirm that the lock was actually taken

The logic for locking in the second session is as follows:

  • If the lock cannot be acquired yet, Lock blocks until it can
  • A channel is used to communicate between the goroutine and main
  • myMu2 can only acquire the lock once myMu1 has been unlocked
  • When myMu2 acquires the lock, the goroutine returns and the m2Chan channel is closed
  • Once m2Chan is closed, the receive from m2Chan returns immediately with the zero value, which confirms (unless an error was logged) that session 2 acquired the lock

Summary

  • A simple standalone deployment of ETCD, installing the ETCD client package, and the problem encountered along the way
  • PUT and GET operations on keys in ETCD
  • Using WATCH to monitor changes to a key
  • ETCD leases and the keep-alive mechanism
  • A simple implementation of ETCD’s distributed lock

You can take the coding examples above and run them yourself to see the results. Let’s learn and improve together

For a deeper understanding, refer to the official documentation mentioned at the beginning of the article, where the examples are more detailed

The source code itself is also very thoroughly commented, so there is no need to worry about being unable to learn from it

Likes, follows and favorites are welcome

Friends, your support and encouragement are what keep me sharing and improving the quality

OK, that’s it for now. Next time, I’ll share the implementation principle of string in Go

Technology is open, and our mindset should be even more open. Embrace change, live in the sun, and strive to move forward.

I am Nezha, likes are welcome, see you next time ~