What is MVCC

Before looking at MVCC itself, you need to understand optimistic and pessimistic locking. These are two general approaches to concurrency control and are not tied to any particular programming language.

Pessimistic locking

When reading or writing a critical resource, the data is locked outright so that nobody else can operate on it at the same time, and the lock is released only after the operation completes. This is how concurrency safety is achieved. Common examples include Mutex in Go and synchronized in Java.

Optimistic locking

When operating on a critical resource, the data is not locked for exclusive access. Instead, the operation checks whether the data has been modified by someone else; if it has, the modification fails. A common way to perform this check is multi-version concurrency control (MVCC).
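
The difference can be sketched in Go. The snippet below is only an illustration: pessimisticCounter and optimisticCounter are hypothetical names, the first guards its value with sync.Mutex, and the second uses an atomic compare-and-swap as a stand-in for the "check whether someone else modified it" step.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pessimisticCounter locks the data for the whole read-modify-write.
type pessimisticCounter struct {
	mu    sync.Mutex
	value int64
}

func (c *pessimisticCounter) Add(n int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value += n
}

// optimisticCounter never blocks: it reads the current value, computes the
// new one, and writes it back only if nobody changed it in the meantime.
type optimisticCounter struct {
	value int64
}

// TryAdd makes a single attempt and reports whether it succeeded.
func (c *optimisticCounter) TryAdd(n int64) bool {
	old := atomic.LoadInt64(&c.value)
	return atomic.CompareAndSwapInt64(&c.value, old, old+n)
}

// Add retries until an attempt succeeds.
func (c *optimisticCounter) Add(n int64) {
	for !c.TryAdd(n) {
	}
}

func main() {
	var p pessimisticCounter
	var o optimisticCounter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.Add(1)
			o.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(p.value, atomic.LoadInt64(&o.value)) // 100 100
}
```

With a lock, competing writers wait their turn. With the optimistic variant, a writer that loses the race simply sees its attempt fail and decides for itself whether to retry.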

Introduction to MVCC

With MVCC, modifying data does not overwrite the original. Instead, a new copy is added alongside the existing data, and the copies are distinguished by a unique version number, which usually increases monotonically. A read actually returns the snapshot that corresponds to a given version. For example, suppose a key-value pair is K->{V.0}, where the value has version 0. Operation 1 reads version 0, and after its transaction commits the data becomes K->{V.0, V.1}. Operation 2 reads version 1, and after its transaction commits successfully the data becomes K->{V.0, V.1, V.2}. Each modification only appends a version number and a value. Every transaction thus works on the data of its own version, which isolates transactions from one another. The data of any version can also be read by specifying its version number.
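
The K->{V.0, V.1, V.2} example can be sketched as a small append-only store. This is a toy under stated assumptions rather than how any real database does it: the store type, its commit and readAt methods, and the rule that a commit fails when the key has moved past the version the writer read are all made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// versionedValue keeps every version of one key; index i holds version i.
type versionedValue struct {
	versions []string
}

// store is a toy multi-version map. It is not safe for concurrent use.
type store struct {
	data map[string]*versionedValue
}

var errConflict = errors.New("value was modified by another transaction")

func newStore() *store { return &store{data: make(map[string]*versionedValue)} }

// latest returns the newest version number of key, or -1 if the key is absent.
func (s *store) latest(key string) int {
	v, ok := s.data[key]
	if !ok {
		return -1
	}
	return len(v.versions) - 1
}

// readAt returns the value of key as of the given version.
func (s *store) readAt(key string, version int) (string, bool) {
	v, ok := s.data[key]
	if !ok || version < 0 || version >= len(v.versions) {
		return "", false
	}
	return v.versions[version], true
}

// commit appends a new version, but only if the key is still at the version
// the transaction read. Existing versions are never overwritten.
func (s *store) commit(key string, readVersion int, value string) (int, error) {
	if s.latest(key) != readVersion {
		return 0, errConflict
	}
	v, ok := s.data[key]
	if !ok {
		v = &versionedValue{}
		s.data[key] = v
	}
	v.versions = append(v.versions, value)
	return len(v.versions) - 1, nil
}

func main() {
	s := newStore()
	s.commit("K", -1, "V.0") // K -> {V.0}
	s.commit("K", 0, "V.1")  // operation 1 read version 0: K -> {V.0, V.1}
	s.commit("K", 1, "V.2")  // operation 2 read version 1: K -> {V.0, V.1, V.2}

	_, err := s.commit("K", 1, "stale") // a writer that still holds version 1
	fmt.Println(err)

	old, _ := s.readAt("K", 0)
	fmt.Println(old, s.latest("K"))
}
```

The stale writer is rejected because the key has already moved on to version 2, while version 0 stays readable for anyone who asks for it explicitly.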

Implementation in etcd

etcd is a key-value database built on the MVCC mechanism. Let's demonstrate this with an example.

etcd version numbers

Let's start with a simple put and get example to look at etcd's version numbers.

[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo go
OK
# Both the key and the value are base64 encoded, and three version-related fields are returned
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json | python -m json.tool
{
    "count": 1,
    "header": {
        "cluster_id": 14841639068965178418,
        "member_id": 10276657743932975437,
        "raft_term": 2,
        "revision": 2
    },
    "kvs": [
        {
            "create_revision": 2,
            "key": "bGludWdv",
            "mod_revision": 2,
            "value": "Z28=",
            "version": 1
        }
    ]
}
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo gol
OK
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo gola
OK
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json | python -m json.tool
{
    "count": 1,
    "header": {
        "cluster_id": 14841639068965178418,
        "member_id": 10276657743932975437,
        "raft_term": 2,
        "revision": 4
    },
    "kvs": [
        {
            "create_revision": 2,    # revision at which the key was created
            "key": "bGludWdv",
            "mod_revision": 4,       # revision of the last modification
            "value": "Z29sYQ==",
            "version": 3             # number of modifications to the key
        }
    ]
}

As the output shows, the key records the revision at which it was created, and a new revision is generated after each modification, much like the appended version numbers described earlier. Next, put a key-value pair different from the one above.

[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo1 go
OK
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo1 -w=json | python -m json.tool
{
    "count": 1,
    "header": {
        "cluster_id": 14841639068965178418,
        "member_id": 10276657743932975437,
        "raft_term": 2,
        "revision": 5
    },
    "kvs": [
        {
            "create_revision": 5,
            "key": "bGludWdvMQ==",
            "mod_revision": 5,
            "value": "Z28=",
            "version": 1
        }
    ]
}

revision is a global revision number that acts as a logical clock: a modification to any key increments it. Each key records the global revision at the time it was created and at the time it was last modified. In theory, the global revision is enough to find the change history of any piece of data. If two mappings are kept, one from a key to all of its revisions and one from a revision to a value, then every revision of a key can be looked up by key, and the current value can be found through the latest revision. This is in fact what etcd does.
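
The numbers in the two outputs above can be reproduced with a few lines of Go. This is a minimal sketch, not etcd code: the kv and meta types are hypothetical, and the counter is assumed to start at 1 so that the first write lands on revision 2, as in the output.

```go
package main

import "fmt"

// meta mirrors the three per-key numbers shown in the JSON output.
type meta struct {
	createRevision int64
	modRevision    int64
	version        int64
}

// kv is a toy store with one global revision counter.
type kv struct {
	revision int64
	keys     map[string]*meta
}

// newKV starts at revision 1 (an assumption of this sketch), so the first
// write is recorded as revision 2.
func newKV() *kv { return &kv{revision: 1, keys: make(map[string]*meta)} }

// put bumps the global revision and updates the metadata of key.
func (s *kv) put(key string) meta {
	s.revision++
	m, ok := s.keys[key]
	if !ok {
		m = &meta{createRevision: s.revision}
		s.keys[key] = m
	}
	m.modRevision = s.revision
	m.version++
	return *m
}

func main() {
	s := newKV()
	s.put("linugo")
	s.put("linugo")
	fmt.Printf("%+v\n", s.put("linugo"))  // {createRevision:2 modRevision:4 version:3}
	fmt.Printf("%+v\n", s.put("linugo1")) // {createRevision:5 modRevision:5 version:1}
}
```

Note that version is counted per key, while the revision counter is shared by all keys, which is why the second key starts at revision 5 but version 1.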

MVCC overview

etcd maintains the two mappings described above. It keeps a B-tree in memory that maps each key to its revisions; this structure is called treeIndex. BoltDB provides the mapping from each revision to the corresponding value, along with persistent storage of the data.

To query a piece of data, etcd first locates the revision through treeIndex (the latest one, or the one specified by the client), and then uses that revision to locate the corresponding value. This is somewhat similar to an ordinary index query in MySQL.
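
The two-step lookup can be sketched as follows. Plain Go maps stand in for the B-tree and for BoltDB, and the type and method names (index, backend, store, get) are assumptions made for this illustration only.

```go
package main

import (
	"fmt"
	"sort"
)

// index maps a key to the ascending list of revisions at which it changed.
// etcd keeps this in an in-memory B-tree; a plain map stands in for it here.
type index map[string][]int64

// backend maps a revision to the value written at that revision.
// etcd keeps this in BoltDB; a plain map stands in for it here.
type backend map[int64]string

type store struct {
	idx index
	db  backend
	rev int64
}

func newStore() *store {
	return &store{idx: index{}, db: backend{}, rev: 1}
}

func (s *store) put(key, value string) int64 {
	s.rev++
	s.idx[key] = append(s.idx[key], s.rev)
	s.db[s.rev] = value
	return s.rev
}

// get resolves key in two steps. atRev <= 0 means "latest".
func (s *store) get(key string, atRev int64) (string, bool) {
	revs := s.idx[key]
	if atRev <= 0 {
		atRev = s.rev
	}
	// Step 1: find the newest revision of key that is <= atRev.
	n := sort.Search(len(revs), func(i int) bool { return revs[i] > atRev })
	if n == 0 {
		return "", false
	}
	// Step 2: fetch the value stored under that revision.
	v, ok := s.db[revs[n-1]]
	return v, ok
}

func main() {
	s := newStore()
	s.put("linugo", "go")   // revision 2
	s.put("linugo", "gol")  // revision 3
	s.put("linugo", "gola") // revision 4

	latest, _ := s.get("linugo", 0)
	older, _ := s.get("linugo", 3)
	fmt.Println(latest, older) // gola gol
}
```

Keeping the index small and in memory means only the second step has to touch the storage layer.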

keyIndex

In treeIndex, each key is associated with its multiple revisions through the keyIndex structure.

type keyIndex struct {
  key []byte
  modified revision // the main rev of the last modification
  generations []generation
}

  • key: the key put by the user
  • modified: the revision of the last modification
  • generations: all generations of the key, each holding the revisions that belong to it

A revision is not a plain number either; it consists of two fields.

type revision struct {
  main int64
  sub int64
}

  • main: the main revision, which corresponds to the global revision in etcd
  • sub: the sub-revision within a transaction. A transaction may contain several operations (for example, a Txn request with multiple put operations); sub distinguishes them while main stays the same.

type generation struct {
  ver int64
  created revision // when the generation is created (put in first revision).
  revs []revision
}

  • ver: the number of modifications to the key in this generation, which corresponds to version in the earlier example
  • created: the global revision at which the generation was created
  • revs: the revision of each modification to the key

In etcd, deleting a key does not physically remove its data. Instead, a new generation element is appended to the generations array, and a read at the revision of the delete can no longer retrieve the data. To get the most recent revision of a key, find the last generation in the generations array and take the last element of its revs.

Example

The following example traces how keyIndex, generation, and revision change together.

  • First, put a new key and inspect its keyIndex. The initial revision is 112, as shown on the left of the figure.
  • Put the same key two more times and inspect keyIndex again. The revision increases by 2, ver becomes 3, and two elements are added to revs.
  • Delete the key and inspect keyIndex. The revision increases again, the generations slice gains one element, and the original revs also gains one element that corresponds to the delete operation, as shown on the right of the figure.
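
The three steps can be replayed with a simplified keyIndex. The sketch below reuses the field names from the structs above, but the put and tombstone methods are reduced to the bare minimum and are assumptions about the behavior described here, not copies of the etcd implementation.

```go
package main

import "fmt"

type revision struct{ main, sub int64 }

type generation struct {
	ver     int64
	created revision
	revs    []revision
}

type keyIndex struct {
	key         string
	modified    revision
	generations []generation
}

// put records a change in the current (last) generation.
func (ki *keyIndex) put(mainRev, subRev int64) {
	rev := revision{main: mainRev, sub: subRev}
	if len(ki.generations) == 0 {
		ki.generations = append(ki.generations, generation{})
	}
	g := &ki.generations[len(ki.generations)-1]
	if len(g.revs) == 0 {
		g.created = rev // first revision of this generation
	}
	g.revs = append(g.revs, rev)
	g.ver++
	ki.modified = rev
}

// tombstone records the delete as one more revision and then opens a new,
// empty generation.
func (ki *keyIndex) tombstone(mainRev, subRev int64) {
	ki.put(mainRev, subRev)
	ki.generations = append(ki.generations, generation{})
}

func main() {
	ki := &keyIndex{key: "linugo"}
	ki.put(112, 0) // first put
	ki.put(113, 0)
	ki.put(114, 0)
	fmt.Printf("%+v\n", ki.generations)
	ki.tombstone(115, 0) // delete
	fmt.Printf("%+v\n", ki.generations)
}
```

After the delete, the first generation holds four revisions (three puts plus the tombstone) and a second, empty generation is waiting for the key to be created again.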

KeyValue

The data that boltDB stores for each revision is not just the bare value; it also contains several other fields.

type KeyValue struct {
  Key []byte // the key
  CreateRevision int64 // revision at creation
  ModRevision int64 // revision of the last modification
  Version int64 // number of modifications
  Value []byte // the value passed in
  Lease int64 // lease ID
}

Because boltDB persists data to disk, reading or writing the disk directly on every query or modification would hurt performance. etcd therefore places buffers in front of boltDB. There are two of them: a read buffer (txReadBuffer), located in baseReadTx and used by read transactions, and a write buffer (txWriteBuffer), located in batchTxBuffered and used for writes and for flushing to disk.

type txWriteBuffer struct {
  txBuffer
  bucket2seq map[BucketID]bool
}

type txReadBuffer struct {
  txBuffer
  bufVersion uint64
}

A read checks the buffer first and goes to boltDB only on a miss. A write goes straight into the write buffer. When End is called to finish the transaction, the write buffer is merged into the read buffer, and a background goroutine periodically flushes the written data to disk.
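
The interplay of the two buffers can be sketched with plain maps. Everything here is a simplified model: bufferedStore and its methods are hypothetical, the disk map stands in for committed boltDB data, and resetting the read buffer on commit is an assumption of this sketch.

```go
package main

import "fmt"

// bufferedStore is a toy model of the two buffers in front of the disk.
type bufferedStore struct {
	disk     map[string]string // stands in for committed boltDB data
	readBuf  map[string]string // serves reads
	writeBuf map[string]string // collects writes of the current transaction
	pending  map[string]string // written but not yet committed to disk
}

func newBufferedStore() *bufferedStore {
	return &bufferedStore{
		disk:     map[string]string{},
		readBuf:  map[string]string{},
		writeBuf: map[string]string{},
		pending:  map[string]string{},
	}
}

// put writes into the write buffer and queues the data for the next commit.
func (s *bufferedStore) put(k, v string) {
	s.writeBuf[k] = v
	s.pending[k] = v
}

// end finishes a write transaction: the write buffer is merged into the
// read buffer so that readers see the new data before it reaches the disk.
func (s *bufferedStore) end() {
	for k, v := range s.writeBuf {
		s.readBuf[k] = v
		delete(s.writeBuf, k)
	}
}

// commit flushes pending data to disk. The read buffer is reset because
// everything in it can now be read from disk.
func (s *bufferedStore) commit() {
	for k, v := range s.pending {
		s.disk[k] = v
		delete(s.pending, k)
	}
	s.readBuf = map[string]string{}
}

// get checks the read buffer first and falls back to disk on a miss.
func (s *bufferedStore) get(k string) (value, source string) {
	if v, ok := s.readBuf[k]; ok {
		return v, "buffer"
	}
	if v, ok := s.disk[k]; ok {
		return v, "disk"
	}
	return "", "miss"
}

func main() {
	s := newBufferedStore()
	s.put("k", "v")
	fmt.Println(s.get("k")) // not visible until the transaction ends
	s.end()
	fmt.Println(s.get("k")) // served from the read buffer
	s.commit()
	fmt.Println(s.get("k")) // served from disk
}
```

The point of the design is that readers never wait for a disk flush: once a write transaction ends, its data is visible from memory even though the commit may happen later.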

Call chain for reading data

A read request corresponds to the Range method in etcd. Before reaching it, the request goes through upper-layer wrapping, interceptor checks, Raft data synchronization, and other steps.

//etcdserver/apply.go
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
  ......
  if txn == nil {
    // Initialize a read transaction: take read locks on the shared parts and obtain the latest revision
    txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
    // End the transaction on return and release what was locked
    defer txn.End()
  }
  ......
  // Call the Range method of the read transaction to fetch the values by key (range lookups are supported)
  rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
  if err != nil {
    return nil, err
  }
  ......
  resp.Header.Revision = rr.Rev
  resp.Count = int64(rr.Count)
  resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
  for i := range rr.KVs {
    if r.KeysOnly {
      rr.KVs[i].Value = nil
    }
    resp.Kvs[i] = &rr.KVs[i]
  }
  return resp, nil
}

Next, look at the implementation of the Range method.

//mvcc/metrics_txn.go
func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) {
  tw.ranges++ // metrics counter
  return tw.TxnWrite.Range(ctx, key, end, ro)
}

//mvcc/kvstore_txn.go
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
  // Check that the requested revision is valid
  ......
  // Get the revisions of the specified key from the treeIndex
  revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
  ......
  kvs := make([]mvccpb.KeyValue, limit)
  revBytes := newRevBytes()
  for i, revpair := range revpairs[:len(kvs)] {
    // Check whether the operation has timed out
    ......
    revToBytes(revpair, revBytes)
    _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)
    if len(vs) != 1 {
      ......
    }
    if err := kvs[i].Unmarshal(vs[0]); err != nil {
      ......
    }
  }
  return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
}

Find the revision

//mvcc/index.go
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
  // Single key: look it up directly
  if end == nil {
    rev, _, _, err := ti.Get(key, atRev)
    ......
  }
  // Key range: visit every keyIndex in the range
  ti.visit(key, end, func(ki *keyIndex) bool {
    ......
  })
  return revs, total
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
  keyi := &keyIndex{key: key}
  ti.RLock()
  defer ti.RUnlock()
  if keyi = ti.keyIndex(keyi); keyi == nil {
    return revision{}, revision{}, 0, ErrRevisionNotFound
  }
  return keyi.get(ti.lg, atRev)
}

func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
  ......
  // Find the generation that atRev belongs to
  g := ki.findGeneration(atRev)
  ......
  // Walk the revisions of that generation to find the one that applies at atRev
  n := g.walk(func(rev revision) bool { return rev.main > atRev })
  if n != -1 {
    // Return the matching revision (the last element of revs when atRev is the latest revision),
    // the creation revision, and the version
    return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
  }
  return revision{}, revision{}, 0, ErrRevisionNotFound
}

Look up the value

Once the revision is known, the value is fetched by calling UnsafeRange. This method looks in the read buffer first and falls back to boltDB if the data is not found there.

//mvcc/backend/read_tx.go
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  // Validate the limit on the number of results
  ......
  // Read from the read buffer first; if it satisfies the request, return directly
  keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
  if int64(len(keys)) == limit {
    return keys, vals
  }
  ......
  // Otherwise fetch the remaining entries from boltDB
  k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  return append(k2, keys...), append(v2, vals...)
}

Call chain for writing data

Writing data corresponds to the Put method, which is reached after a similar series of preliminary checks.

//etcdserver/apply.go
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
  ......
  if txn == nil {
    ......
    txn = a.s.KV().Write(trace)
    // Commit the write transaction on return
    defer txn.End()
  }
  ......
  // Call the Put method of the write transaction
  resp.Header.Revision = txn.Put(p.Key, val, leaseID)
  return resp, trace, nil
}

//mvcc/metrics_txn.go
func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
  // Some data for metrics
  ......
  return tw.TxnWrite.Put(key, value, lease)
}

//mvcc/kvstore_txn.go
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
  // The actual put operation
  tw.put(key, value, lease)
  return tw.beginRev + 1
}

put generates the new revision and keeps the buffers in sync.

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
  rev := tw.beginRev + 1
  c := rev
  oldLease := lease.NoLease
  // If the key already exists, Get returns its revision at creation (stored as part of the value)
  _, created, ver, err := tw.s.kvindex.Get(key, rev)
  if err == nil {
    c = created.main
    ......
  }
  ibytes := newRevBytes()
  // Generate a revision
  idxRev := revision{main: rev, sub: int64(len(tw.changes))}
  revToBytes(idxRev, ibytes)
  // version + 1
  ver = ver + 1
  kv := mvccpb.KeyValue{
    Key: key,
    Value: value,
    CreateRevision: c,
    ModRevision: rev,
    Version: ver,
    Lease: int64(leaseID),
  }
  // Serialize the value
  d, err := kv.Marshal()
  ......
  // Write (revision: value) to boltDB and the buffer
  tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
  // Add (key: revision) to the treeIndex, that is, add the revision to revs
  // in the generations of the corresponding keyIndex
  tw.s.kvindex.Put(key, idxRev)
  ......
}

//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
  // Add the data to boltDB
  t.batchTx.UnsafeSeqPut(bucket, key, value)
  // Add the data to the write buffer
  t.buf.putSeq(bucket, key, value)
}

func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
  t.unsafePut(bucket, key, value, true)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
  ......
  if err := bucket.Put(key, value); err != nil {
    ......
  }
  // Increment the pending counter, used later when merging the buffers and persisting data
  t.pending++
}

Merging the buffers

At the end of the request, the End() method of txn is called to commit the transaction.

//mvcc/kvstore_txn.go
func (tw *storeTxnWrite) End() {
  ......
  // Unlock
  tw.tx.Unlock()
  ......
}

//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) Unlock() {
  // pending != 0 indicates that there were write operations
  if t.pending != 0 {
    // Lock the read buffer; read requests are blocked during this time
    t.backend.readTx.Lock()
    // Merge the two buffers
    t.buf.writeback(&t.backend.readTx.buf)
    t.backend.readTx.Unlock()
    // Commit once the number of pending writes reaches the batch limit
    if t.pending >= t.backend.batchLimit {
      t.commit(false)
    }
  }
  t.batchTx.Unlock()
}

//mvcc/backend/tx_buffer.go
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
  // Merge
  for k, wb := range txw.buckets {
    rb, ok := txr.buckets[k]
    if !ok {
      delete(txw.buckets, k)
      txr.buckets[k] = wb
      continue
    }
    ......
    rb.merge(wb)
  }
  ......
}

Data persistence

When put is called, the data is written to the buffer and to boltDB, but it is not yet persisted. Persistence is performed by a background goroutine of the backend, which starts when etcd starts and is responsible for persisting data to disk periodically and once more on exit.

//mvcc/backend/backend.go
func (b *backend) run() {
  defer close(b.donec)
  t := time.NewTimer(b.batchInterval)
  defer t.Stop()
  for {
    select {
    case <-t.C:
    case <-b.stopc:
      // On the stop signal, persist to disk and exit the goroutine
      b.batchTx.CommitAndStop()
      return
    }
    // Commit if there are pending writes
    if b.batchTx.safePending() != 0 {
      b.batchTx.Commit()
    }
    t.Reset(b.batchInterval)
  }
}

//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) Commit() {
  t.Lock()
  t.commit(false)
  t.Unlock()
}

Deleting data

Deleting data does not actually remove it. Instead, a new generation element is appended to the generations array in keyIndex. The delete operation corresponds to the DeleteRange method in etcd, which deletes all keys that match the given range.

//mvcc/kvstore_txn.go
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
  rrev := tw.beginRev
  if len(tw.changes) > 0 {
    rrev++
  }
  keys, _ := tw.s.kvindex.Range(key, end, rrev)
  if len(keys) == 0 {
    return 0
  }
  for _, key := range keys {
    tw.delete(key)
  }
  return int64(len(keys))
}

func (tw *storeTxnWrite) delete(key []byte) {
  ibytes := newRevBytes()
  // Generate a revision
  idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
  revToBytes(idxRev, ibytes)
  // Mark the revision bytes as a tombstone
  ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
  kv := mvccpb.KeyValue{Key: key}
  d, err := kv.Marshal()
  ......
  tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
  // Call Tombstone on the treeIndex
  err = tw.s.kvindex.Tombstone(key, idxRev)
  ......
}

func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
  keyi := &keyIndex{key: key}
  ti.Lock()
  defer ti.Unlock()
  // Fetch the keyIndex
  item := ti.tree.Get(keyi)
  if item == nil {
    return ErrRevisionNotFound
  }
  ki := item.(*keyIndex)
  return ki.tombstone(ti.lg, rev.main, rev.sub)
}

func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
  ......
  // Record the delete as one more revision in the current generation
  ki.put(lg, main, sub)
  // Append an empty generation
  ki.generations = append(ki.generations, generation{})
  keysGauge.Dec()
  return nil
}

Summary

This article first explained the principles behind etcd's MVCC implementation and then traced that implementation through the source code. Only the overall flow and the general idea of the source were covered; some parts (such as boltDB storage, buffer storage, and etcd transactions) were not examined in depth. Readers who want to go deeper will need to read the code in more detail. The key to understanding etcd's MVCC is understanding keyIndex, revision, and generation.

References

  • etcd v3.5.0 source: github.com/etcd-io/etc…
  • etcd Principles and Practice: How does etcd implement MVCC