Creating a Client

Let’s look at the code to create the client:

func newClient(config *Config) *Client {
	// etcdv3 client configuration
	conf := clientv3.Config{
		Endpoints:            config.Endpoints,
		DialTimeout:          config.ConnectTimeout,
		DialKeepAliveTime:    10 * time.Second,
		DialKeepAliveTimeout: 3 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithBlock(),
			grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
			grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
		},
		AutoSyncInterval: config.AutoSyncInterval,
	}
	...

	// Call the clientv3 method to connect
	client, err := clientv3.New(conf)
	if err != nil {
		config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config))
	}
	...
}

Get and store

The KV interface of etcdv3:


type KV interface {
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
	Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
	Do(ctx context.Context, op Op) (OpResponse, error)
	Txn(ctx context.Context) Txn
}
  • The Put, Get, and Delete methods are wrappers around Do(), which implements the concrete behavior.

Get a single key and get by prefix


// GetKeyValue queries an etcd key and returns the mvccpb.KeyValue
func (client *Client) GetKeyValue(ctx context.Context, key string) (kv *mvccpb.KeyValue, err error) {
	rp, err := client.Client.Get(ctx, key)
	...
}

// GetPrefix gets all key/value pairs under a prefix
func (client *Client) GetPrefix(ctx context.Context, prefix string) (map[string]string, error) {
	var vars = make(map[string]string)
	resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return vars, err
	}
	...
}

The only difference between getting a single key/value and getting all key/values under a prefix is the OpOption WithPrefix().

Storage

Storage in Jupiter is not wrapped separately; it directly calls etcdv3's client methods.

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {

This is the Put method in the etcdv3 client library. gRPC service registration amounts to writing service information into etcd.

Delete


// DelPrefix deletes all keys under a prefix
func (client *Client) DelPrefix(ctx context.Context, prefix string) (deleted int64, err error) {
	resp, err := client.Delete(ctx, prefix, clientv3.WithPrefix())
	...
}

Delete information based on the prefix.

Get the values of multiple keys

To fetch the values of multiple keys from etcd, we can batch the queries through etcd's transaction method Txn().

// GetValues queries etcd for keys prefixed by prefix.
func (client *Client) GetValues(ctx context.Context, keys ...string) (map[string]string, error) {
	var (
		firstRevision = int64(0)
		vars          = make(map[string]string)
		maxTxnOps     = 128 // Maximum number of operations per transaction
		getOps        = make([]string, 0, maxTxnOps)
	)

	// The actual transaction query
	doTxn := func(ops []string) error {
		txnOps := make([]clientv3.Op, 0, maxTxnOps)

		// Add the query operations
		for _, k := range ops {
			txnOps = append(txnOps, clientv3.OpGet(k,
				clientv3.WithPrefix(),
				clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend),
				clientv3.WithRev(firstRevision)))
		}

		// Commit the transaction
		result, err := client.Txn(ctx).Then(txnOps...).Commit()
		if err != nil {
			return err
		}
		// Process the returned results
		for i, r := range result.Responses {
			...
		}
		// Record the revision of the first batch so later batches read the same snapshot
		if firstRevision == 0 {
			firstRevision = result.Header.GetRevision()
		}
		return nil
	}

	for _, key := range keys {
		// Collect keys to query; once the batch reaches maxTxnOps, commit it as a transaction
		getOps = append(getOps, key)
		if len(getOps) >= maxTxnOps {
			if err := doTxn(getOps); err != nil {
				return vars, err
			}
			getOps = getOps[:0]
		}
	}
	// Query any remaining uncommitted keys
	if len(getOps) > 0 {
		if err := doTxn(getOps); err != nil {
			return vars, err
		}
	}
	return vars, nil
}

The method first declares maxTxnOps, the maximum number of operations per transaction. doTxn is the actual transaction query function; the keys are split into batches of at most maxTxnOps and committed batch by batch. This ensures that a query will not become slow or time out because too many keys are requested at once.

Continuous monitoring

Create a watcher to continuously monitor key changes:

// WatchPrefix creates a continuous watch on a key prefix
func (client *Client) WatchPrefix(ctx context.Context, prefix string) (*Watch, error) {
	resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	var w = &Watch{
		revision:     resp.Header.Revision,
		eventChan:    make(chan *clientv3.Event, 100),
		incipientKVs: resp.Kvs,
	}

	xgo.Go(func() {
		ctx, cancel := context.WithCancel(context.Background())
		w.cancel = cancel
		// Request monitoring
		rch := client.Client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify(), clientv3.WithRev(w.revision))
		for {
			for n := range rch {
				...
				for _, ev := range n.Events {
					select {
					case w.eventChan <- ev:
					default:
						xlog.Error("watch etcd with prefix", xlog.Any("err", "block event chan, drop event message"))
					}
				}
			}
			ctx, cancel := context.WithCancel(context.Background())
			w.cancel = cancel
			if w.revision > 0 {
				rch = client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify(), clientv3.WithRev(w.revision))
			} else {
				rch = client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
			}
		}
	})

	return w, nil
}

The etcdv3 client's Watch() method returns a WatchChan, a channel from which we continuously receive WatchResponse values pushed by etcd. We then keep processing the Events in each response.

An Event has two types, PUT and DELETE, indicating that a key was created or updated, or that it was deleted.

Lease mechanism

Let’s start with the lease, a widely used technique in distributed systems.

The essence of the lease mechanism in etcd is to attach an expiration time to a key-value pair; the service must renew the lease at regular intervals, otherwise the key-value pair is automatically cleaned up.

Create lease code:

// Create a lease
func (reg *etcdv3Registry) getSession(k string, opts ...concurrency.SessionOption) (*concurrency.Session, error) {
	...
	sess, err := concurrency.NewSession(reg.client.Client, opts...)
	if err != nil {
		return sess, err
	}
	reg.rmu.Lock()
	reg.sessions[k] = sess
	reg.rmu.Unlock()
	return sess, nil
}

// Use the lease
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
	sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
	if err != nil {
		return err
	}
	opOptions = append(opOptions, clientv3.WithLease(sess.Lease()))
}

// Submit the information to etcd
_, err := reg.client.Put(readCtx, key, val, opOptions...)

The lease is created via the concurrency package of the etcd client library. Using it is as simple as adding WithLease() to the request options.

A distributed lock

etcd’s distributed lock is very similar to the sync.Mutex we normally use, with the same two methods, Lock() and Unlock().


// Mutex ...
type Mutex struct {
	s *concurrency.Session
	m *concurrency.Mutex
}

// NewMutex ...
func (client *Client) NewMutex(key string, opts ...concurrency.SessionOption) (mutex *Mutex, err error) {
	mutex = &Mutex{}
	// Default session TTL = 60s
	mutex.s, err = concurrency.NewSession(client.Client, opts...)
	if err != nil {
		return
	}
	mutex.m = concurrency.NewMutex(mutex.s, key)
	return
}

// Lock ...
func (mutex *Mutex) Lock(timeout time.Duration) (err error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return mutex.m.Lock(ctx)
}

// Unlock ...
func (mutex *Mutex) Unlock() (err error) {
	err = mutex.m.Unlock(context.TODO())
	if err != nil {
		return
	}
	return mutex.s.Close()
}

The difference is that a distributed lock has an expiration time (the session's TTL), so a crashed holder cannot block other processes forever.

References

  • Etcd V3 principle analysis
  • Summary of etcd usage experience

Article series

  • Jupiter framework entry introduction
  • gRPC service discovery and registration through etcd – source code analysis
  • Jupiter etcd client introduction