At OSDI 2006, Google published Bigtable: A Distributed Storage System for Structured Data, a paper that describes the data model, interface, and implementation of Bigtable, a distributed storage system for managing structured data.

This article first gives a brief overview of the distributed storage system described in the Bigtable paper, and then analyzes LevelDB, Google's open-source key-value storage library. LevelDB can be understood as a single-node version of Bigtable: it contains none of Bigtable's logic for tablet management or distribution, but reading the LevelDB source code adds a great deal to our understanding of Bigtable.

Bigtable

Bigtable is a distributed storage system for managing structured data. It scales extremely well and can handle petabytes of data spread across thousands of machines. Many projects at Google, including the web index, use Bigtable to store huge amounts of data; Bigtable's paper claims that it achieves four goals: wide applicability, scalability, high performance, and high availability.

In my opinion, these goals look good on paper but don't tell us much by themselves; every project claims high performance, high availability, and so on. What we should focus on is how Bigtable actually achieves them.

The data model

Bigtable resembles a database in many ways, but it provides a different interface. It does not support a full relational data model; instead, it offers a simpler data model that gives clients more flexible control over how their data is laid out and managed.

In terms of implementation, Bigtable is essentially a sparse, distributed, persistent, multi-dimensional sorted map; the paper defines it as:

A Bigtable is a sparse, distributed, persistent multi-dimensional sorted map.

This definition also means that the data model is very simple and easy to implement: the row key, column key, and timestamp together form the key of this map, and the value is an uninterpreted array of bytes, which can also be thought of as a string.
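The paper itself summarizes the model as a map from a three-part key to an uninterpreted byte string:

(row:string, column:string, time:int64) -> string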

The most important part of the key is the row key, which can be up to 64KB in length; every read or write of data under a single row key is atomic. Because Bigtable keeps rows in lexicographic order by row key, it dynamically partitions the row range into contiguous ranges, each of which is assigned to a tablet.

Implementation

In this section, we'll look at the Bigtable paper's description of its own implementation, which covers several things: how tablet locations are organized, how tablets are managed, how read and write requests are handled, and how tables are compacted.

The organization of tablets

Bigtable uses a three-level structure, similar to a B+ tree, to store tablet location information. The first level is a file stored in Chubby that holds the location of the root tablet.

Chubby is a distributed lock service that we may cover in a later article.

Each METADATA tablet, including the root tablet, stores the locations of a set of tablets together with the minimum and maximum keys they contain. Each METADATA row stores approximately 1KB of data in memory, and with METADATA tablets limited to 128MB, the entire three-level structure can address 2^61 bytes of data.
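The arithmetic behind that figure, using the sizes quoted above, works out as follows:

128MB per METADATA tablet / 1KB per row = 2^17 tablet locations per METADATA tablet
2^17 METADATA tablets * 2^17 rows each  = 2^34 addressable user tablets
2^34 tablets * 128MB per tablet         = 2^34 * 2^27 bytes = 2^61 bytes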

The management of the tablet

Given the large number of tablet servers and the tablets into which the data is sharded, how does Bigtable manage so much data? Like many distributed systems, Bigtable uses a master server to assign tablets to the different tablet server nodes.

To reduce the load on the master, clients do not route their reads and writes through it; once a client knows where a tablet lives, it connects to the tablet server directly. The client also keeps its own cache of tablet locations to reduce the number and frequency of location lookups.

Processing of read and write requests

From the processing of read and write requests, we can actually see how various parts of Bigtable work together, including logs, memtables, and SSTable files.

When a client sends a write to the tablet server, the server first appends a record to its log and, once the append has succeeded, inserts the record into the memtable. This is exactly how most databases are implemented: data is written sequentially to a log first and only later written out, more randomly, to the data files. Sequential appends are much faster than random writes, and without the log, data held only in memory could be lost if the device fails.
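A minimal sketch of this "log first, memtable second" pattern is shown below; TinyTablet is a made-up class, and an std::map stands in for the memtable, so treat it as an illustration of the idea rather than Bigtable's actual code:

#include <fstream>
#include <map>
#include <string>

// Illustrative only: the write path described above, reduced to its essence.
// A real tablet server would write to GFS, batch commits, add checksums, etc.
class TinyTablet {
 public:
  explicit TinyTablet(const std::string& log_path)
      : log_(log_path, std::ios::app | std::ios::binary) {}

  bool Write(const std::string& key, const std::string& value) {
    // 1. Sequential append to the commit log; durable once flushed.
    log_ << key.size() << ':' << key << value.size() << ':' << value << '\n';
    log_.flush();
    if (!log_) return false;  // nothing was applied, so nothing is lost

    // 2. Apply to the in-memory "memtable"; if the process crashes now,
    //    the record can be rebuilt by replaying the log.
    memtable_[key] = value;
    return true;
  }

 private:
  std::ofstream log_;
  std::map<std::string, std::string> memtable_;  // sorted by key, like a memtable
};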

When the tablet server receives a read, it performs a merged lookup over the memtable and the SSTables. Since both store their key/value pairs sorted lexicographically by key, the whole read is very fast.

The compaction of tables

When the memtable exceeds a certain size threshold, the current memtable is frozen and a new memtable is created. The frozen memtable is converted into an SSTable and written out to GFS. This kind of compaction is known as a Minor Compaction.

Each Minor Compaction creates a new SSTable, which reduces memory usage and also shortens recovery time: without it, an abnormally large log would have to be replayed when a server process dies. Since a Minor Compaction compacts data from the memtable, there is naturally a corresponding Major Compaction as well.

Bigtable also periodically runs Major Compactions in the background, which merge and sort the data from the memtable and a set of SSTables into a new SSTable; the original memtable and SSTables are then removed. The newly generated SSTable contains all of the live data from its inputs, and data that has been deleted is cleared out completely.

Summary

At this point, our walkthrough of the Bigtable paper is more or less complete. This article only covers part of its content; implementation details such as the compression algorithms, caching, and commit-log handling are not discussed. Readers who want to know more are strongly encouraged to read Bigtable: A Distributed Storage System for Structured Data itself to understand its implementation.

LevelDB

LevelDB is a standalone, single-machine version of Bigtable's key-value storage system. It provides an extremely fast key-value store, and it was written by Bigtable authors Jeff Dean and Sanjay Ghemawat, so it can be regarded as a faithful implementation of the storage design described in the Bigtable paper.

Because Bigtable exists only as a paper, and its implementation depends on several of Google's non-open-source infrastructure services such as GFS and Chubby, we have no way to read its source code; LevelDB, however, gives us a good handle on many of the ideas in the paper.

Overview

LevelDB acts as a key-value "repository" and provides a very simple set of interfaces for adding, deleting, updating, and querying data:

class DB {
 public:
  virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) = 0;
  virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
  virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
  virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0;
};

The Put method will eventually call the Write method internally, but at the top level it gives the caller two different choices.
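Before digging into the internals, here is a minimal, self-contained example of using this interface; the /tmp/testdb path is arbitrary and error handling is reduced to assertions for brevity:

#include <cassert>
#include <string>

#include "leveldb/db.h"

int main() {
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;

  // Open (or create) a database in the given directory.
  leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
  assert(status.ok());

  // Put builds a WriteBatch internally and hands it to Write.
  status = db->Put(leveldb::WriteOptions(), "key", "value");
  assert(status.ok());

  std::string value;
  status = db->Get(leveldb::ReadOptions(), "key", &value);
  assert(status.ok() && value == "value");

  status = db->Delete(leveldb::WriteOptions(), "key");
  assert(status.ok());

  delete db;
  return 0;
}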

Get and Put are the interfaces that LevelDB provides for reading and writing. If we have a clear understanding of how this works, we should be able to understand LevelDB’s implementation.

In this section, we will use the read and write paths as a guide to the implementation of the whole project, explaining problems and new concepts as we run into them; along the way we will introduce the most important modules in LevelDB so that we end up with a grasp of how it works.

Start with the write operation

Let's start with the write half of the Get/Put pair, the Put method:

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
    ...
}

As mentioned above, the DB::Put method wraps the passed-in key and value into a WriteBatch and then calls DBImpl::Write to write the data to the database. DBImpl::Write itself is a fairly complex method with a lot of context-dependent state checks, so let's first look at the overall logic of a write:

LevelDB takes three steps to perform a write to a database:

  1. Call the MakeRoomForWrite method to make sure there is enough room for the upcoming write;
    • During this step, if the memtable has run out of space, the current memtable is frozen, a Minor Compaction is triggered, and a new MemTable object is created;
    • When certain conditions are met, a Major Compaction also occurs, which compacts the SSTables in the database.
  2. Call the AddRecord method to append the write operation to the log;
  3. After the record has been successfully written to the log, call InsertInto to insert the data into the memtable, completing the write.

We are not going to reproduce LevelDB's entire implementation of the Write method here; instead, here is a condensed version of the code that gives an overview of how a write works:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;

  MakeRoomForWrite(my_batch == NULL);

  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  WriteBatch* updates = BuildBatchGroup(&last_writer);
  WriteBatchInternal::SetSequence(updates, last_sequence + 1);
  last_sequence += WriteBatchInternal::Count(updates);

  log_->AddRecord(WriteBatchInternal::Contents(updates));
  WriteBatchInternal::InsertInto(updates, mem_);

  versions_->SetLastSequence(last_sequence);
  return Status::OK();
}

Immutable memtable

In DBImpl::Write, the MakeRoomForWrite call that prepares for the write deserves our attention:

Status DBImpl::MakeRoomForWrite(bool force) {
  uint64_t new_log_number = versions_->NewFileNumber();
  WritableFile* lfile = NULL;
  env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);

  delete log_;
  delete logfile_;
  logfile_ = lfile;
  logfile_number_ = new_log_number;
  log_ = new log::Writer(lfile);
  imm_ = mem_;
  has_imm_.Release_Store(imm_);
  mem_ = new MemTable(internal_comparator_);
  mem_->Ref();
  MaybeScheduleCompaction();
  return Status::OK();
}

When LevelDB's memtable grows beyond its size budget, the memtable is frozen and a new memtable object is created.

As you can see, LevelDB introduces an immutable memtable, imm_, which has exactly the same structure as the memtable except that all of the data in it is read-only.

After switching to a new memtable, MaybeScheduleCompaction may trigger a Minor Compaction that persists the data in imm_ as an SSTable in the database. The immutable memtable solves the problem of the memtable growing too large and being unable to accept writes while it is being compacted.

With imm_ in place, when the memtable holds too much data we simply assign the memtable pointer to imm_ and create a new memtable instance; the database can then keep accepting external writes without waiting for the Minor Compaction to finish.

Format of log records

LevelDB is a persistent KV database, so it must have a log module in order to recover data after a crash. We won't show the implementation of AddRecord, the method used to append to the log, because it does little more than concatenate headers and payload strings.

LevelDB stores its log in blocks of 32KB. Because a record does not always fit into the space remaining in a block, the fixed block length means a record may have to be split, and LevelDB uses a RecordType to indicate how a record (or a fragment of one) sits within a block:

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,
  kFullType = 1,
  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};

The type of the log record is stored in the record's header, which also holds a 4-byte CRC checksum of the record, the record length, and other information:
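A sketch of the record layout, roughly as documented in LevelDB's doc/log_format.md (a 7-byte header followed by the payload):

record :=
  checksum: uint32    // crc32c of type and data[]
  length:   uint16    // length of data, little-endian
  type:     uint8     // kFullType, kFirstType, kMiddleType or kLastType
  data:     uint8[length]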

Each log record, or each fragment of a record, is tagged with its RecordType, and records that span block boundaries can be reassembled from these fragments when the log needs to be replayed. To make sure appended records actually reach the disk, the writable file's Sync method flushes and syncs the buffered data:

virtual Status Sync() {
  Status s = SyncDirIfManifest();
  if (fflush_unlocked(file_) != 0 || fdatasync(fileno(file_)) != 0) {
    s = Status::IOError(filename_, strerror(errno));
  }
  return s;
}

Since new entries are appended to the log sequentially, writing them is very fast; once an entry has been assembled in memory, the buffered contents are flushed straight to disk, so that the entries can be used later for crash recovery and similar operations.

Record insertion

A record that has only been written to the log cannot yet be queried; it can be queried only after the data has been inserted into the memtable, which is what this section explains. Both data insertion and data deletion add a record to the memtable.

The difference between inserted and deleted records is that they carry different ValueType tags: inserted data is tagged kTypeValue, while deleted data is tagged kTypeDeletion. In both cases, though, a piece of data is inserted into the memtable.

virtual void Put(const Slice& key, const Slice& value) {
  mem_->Add(sequence_, kTypeValue, key, value);
  sequence_++;
}
virtual void Delete(const Slice& key) {
  mem_->Add(sequence_, kTypeDeletion, key, Slice());
  sequence_++;
}
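For reference, the entry that Add builds and stores is laid out roughly as follows; this is a sketch based on the encoding in db/memtable.cc, so treat the exact byte layout as an implementation detail:

// Rough layout of a memtable entry (sketch, not LevelDB's literal code):
//
//   varint32 : internal_key_size = user_key.size() + 8
//   bytes    : user_key
//   uint64   : tag = (sequence << 8) | type   // type is kTypeValue or kTypeDeletion
//   varint32 : value_size
//   bytes    : value
//
// The user key plus the 8-byte tag form the "internal key" that the
// InternalKeyComparator below compares.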

We can see that both of them call the memtable's Add method to insert data into its internal data structure, a skiplist. Each entry contains the record's key, sequence number, and type; these fields are concatenated and then stored in the skiplist. Since we never delete data from the memtable, how do we make sure that every read returns the latest value? First, the skiplist uses a comparator that LevelDB defines itself:

int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (r == 0) {
    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
    if (anum > bnum) {
      r = -1;
    } else if (anum < bnum) {
      r = +1;
    }
  }
  return r;
}

The two keys being compared may not contain the same pieces of information: one may carry the full key, sequence number, and value, while a key built for a Get call may contain only the key length, the key itself, and a sequence number. This does not affect the comparison, because we only extract the internal key from the front of each entry and never touch anything beyond it, whether the entry is a complete key/value pair or a bare lookup key.

This method extracts the user key and the sequence number from the two internal keys and compares them: InternalKeyComparator sorts by user_key in ascending order and by sequence_number in descending order. Since sequence numbers increase as data is inserted, the first matching entry we encounter is always the most recent write or deletion.
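As a concrete illustration (the keys and sequence numbers below are made up), the ordering produced by this comparator looks like this:

// Ordering under InternalKeyComparator: user_key ascending, sequence descending.
//
//   ("apple", seq=9)  <  ("apple", seq=7)  <  ("banana", seq=12)
//
// A lookup for "apple" therefore encounters the entry with the largest
// sequence number first, i.e. the most recent value or deletion marker.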

Relying on sequence numbers instead of deleting historical data in place keeps writes append-only, which speeds up write operations and improves LevelDB's write performance.

Data reading

Reading data from LevelDB is not complicated. The memtable and imm_ act like a two-level cache that offers faster access in memory: if the requested value can be found in either of them, it is guaranteed to be the latest data.

LevelDB always writes new key/value pairs on top of the old ones and only deletes historical data when the data is compacted.

Data is looked up in the memtable, the immutable memtable, and the SSTables at the various levels; the first two live in memory, while SSTables are persisted on disk as .ldb files. It is these leveled SSTables that give LevelDB its name.

A simplified version of the read method looks like this; the flow of the method is clear enough that, in the author's view, it needs little extra explanation:

Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) {
  LookupKey lkey(key, versions_->LastSequence());
  if (mem_->Get(lkey, value, NULL)) {
    // Done
  } else if (imm_ != NULL && imm_->Get(lkey, value, NULL)) {
    // Done
  } else {
    versions_->current()->Get(options, lkey, value, NULL);
  }
  MaybeScheduleCompaction();
  return Status::OK();
}

Note that when LevelDB queries the memtable and imm_, finding an entry does not necessarily mean the value currently exists; it still has to check the ValueType to see whether the record is a deletion marker.

Multi-level SSTables

When LevelDB does not find the corresponding data in memory, it searches the SSTables at multiple levels on disk, and the process becomes a little more complicated: LevelDB walks the levels one by one without skipping any of them. The lookup relies on a very important data structure called FileMetaData:
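The structure itself is small; it is declared in db/version_edit.h roughly as follows (comments and default values paraphrased):

struct FileMetaData {
  int refs;              // How many Versions still reference this file
  int allowed_seeks;     // Seeks allowed before a compaction is triggered
  uint64_t number;       // File number, which determines the on-disk file name
  uint64_t file_size;    // File size in bytes
  InternalKey smallest;  // Smallest internal key served by the table
  InternalKey largest;   // Largest internal key served by the table

  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
};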

FileMetaData contains the information about an entire file, including its minimum and maximum keys, the number of seeks allowed before compaction, the number of references to the file, the file size, and the file number; because all SSTables are stored in the same directory with names derived from their numbers, we can easily locate the corresponding file from the file number alone.

LevelDB first looks for the key in Level 0. Unlike the other levels, the key ranges of the SSTables in Level 0 may overlap with one another, so every Level-0 SSTable whose range covers the key has to be searched for the value.

For SSTables at higher levels, the tables within a single level have no overlapping key ranges, so we can use the known largest key of each SSTable to quickly locate the one candidate table and then check whether it contains the key. If it does not, we move on to the next level, until we reach the last level, kNumLevels (7 by default), or find the value.
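The per-level strategy can be summarized like this (a paraphrase of Version::Get, not its literal code):

// Per-level search strategy (paraphrased):
//  - Level 0: files may overlap, so collect every file whose
//    [smallest, largest] range covers the key and search them
//    from newest to oldest.
//  - Levels >= 1: files are sorted and disjoint, so binary-search
//    the file list by each file's largest key; at most one file
//    per level needs to be checked.
//  - Stop at the first level that yields a value or a deletion marker.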

SSTable “merge”

Since LevelDB's data is organized as multiple levels of SSTables, how does it merge and compact SSTables from different levels? LevelDB implements two Compaction operations that are almost identical to the two kinds of compaction described in the Bigtable paper.

MaybeScheduleCompaction may be called during either a read or a write to try to merge the SSTables in the database; when the conditions for a merge are met, BackgroundCompaction is executed in the background to do the work.

A Minor Compaction occurs when the data in memory exceeds the maximum memtable size: the memtable is frozen into imm_, and CompactMemTable() is executed to compact it.

void DBImpl::CompactMemTable() {
  VersionEdit edit;
  Version* base = versions_->current();
  WriteLevel0Table(imm_, &edit, base);
  versions_->LogAndApply(&edit, &mutex_);
  DeleteObsoleteFiles();
}

CompactMemTable calls WriteLevel0Table to convert the current imm_ into an SSTable file. This may in turn trigger a new Major Compaction, and an appropriate level needs to be selected for the new table:

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) {
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  Iterator* iter = mem->NewIterator();
  BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);

  const Slice min_user_key = meta.smallest.user_key();
  const Slice max_user_key = meta.largest.user_key();
  int level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
  edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest);
  return Status::OK();
}

All changes to the current set of SSTables are recorded and managed by a single VersionEdit object, which we will describe later. If the table is written successfully, the file's metadata is returned in a FileMetaData. Finally, the VersionSet method LogAndApply is called to record all of the file changes faithfully, and some obsolete data is cleaned up.

A Major Compaction is a little more complicated, but the logic of BackgroundCompaction, which drives the merge, is clear:

void DBImpl::BackgroundCompaction() {
  if (imm_ != NULL) {
    CompactMemTable();
    return;
  }
  Compaction* c = versions_->PickCompaction();
  CompactionState* compact = new CompactionState(c);
  DoCompactionWork(compact);
  CleanupCompaction(compact);
  DeleteObsoleteFiles();
}

PickCompaction finds the files that need to be compacted in the current VersionSet and packages them into a Compaction object. This object selects SSTables from two adjacent levels: once the lower-level SSTables have been chosen, the higher-level SSTables whose key ranges overlap them are selected as well. With the help of FileMetaData, we can quickly find all of the data that needs to be compacted.

"Too many seeks" refers to the situation where a client's Get call has to look in an SSTable at one level and then fall through to an SSTable at the next level that also covers the key; because key ranges overlap across levels, such lookups waste time. When an SSTable's metadata is created, it is given an allowed_seeks budget of at least 100; once allowed_seeks drops to zero, the file is scheduled to be merged into the next level, which reduces the number of seeks that future queries will need.
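The budget itself is set when a new file is recorded in a version; in VersionSet::Builder::Apply it is computed roughly like this (paraphrased; one seek is treated as costing about as much as compacting 16KB of data):

// Paraphrased from VersionSet::Builder::Apply: every newly added file gets
// a seek budget proportional to its size, with a floor of 100 seeks.
f->allowed_seeks = static_cast<int>(f->file_size / 16384U);
if (f->allowed_seeks < 100) f->allowed_seeks = 100;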

LevelDB's DoCompactionWork method merges all of the input SSTables with a merge sort, producing a new SSTable at the higher of the two levels.

This way, the next time a key in the compacted range is queried, fewer SSTables have to be binary-searched and fewer files read, which improves read performance.

Storing the database state: VersionSet

All of LevelDB's state is actually stored in a VersionSet structure, which contains a set of Version structures. All versions, including historical ones, are linked together in a doubly linked list, but only one of them is the current version.

Whenever the set of SSTables in LevelDB changes, a VersionEdit structure describing the change is generated, and the LogAndApply method is eventually executed:

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  Version* v = new Version(this);
  Builder builder(this, current_);
  builder.Apply(edit);
  builder.SaveTo(v);

  std::string new_manifest_file;
  new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
  env_->NewWritableFile(new_manifest_file, &descriptor_file_);

  std::string record;
  edit->EncodeTo(&record);
  descriptor_log_->AddRecord(record);
  descriptor_file_->Sync();

  SetCurrentFile(env_, dbname_, manifest_file_number_);
  AppendVersion(v);

  return Status::OK();
}

The main job of this method is to create a new Version object using the current Version and VersionEdit, append the Version changes to the MANIFEST log, and change the global current Version information in the database.

The MANIFEST file records the tables at all levels of LevelDB, the Key ranges of each SSTable, and other important metadata. It is stored as a log to which all operations to add or delete files are appended.

The format of the SSTable

Finally, let's look at how LevelDB lays an SSTable out on disk.
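Roughly, per LevelDB's doc/table_format.md, an SSTable file looks like this:

<beginning_of_file>
[data block 1]
...
[data block N]
[meta block 1]        // e.g. the filter block
...
[meta block K]
[metaindex block]     // maps meta block names to their positions
[index block]         // maps the last key of each data block to its position
[Footer]              // fixed size, always at the end of the file
<end_of_file>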

When LevelDB reads an .ldb file belonging to an SSTable, it reads the Footer at the end of the file first.

The Footer occupies 48 bytes at the end of the file; from it we can obtain the positions of the metaindex block and the index block, and then use those indexes to find the values we are looking for.
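For reference, the Footer layout documented in doc/table_format.md is roughly the following; the two block handles are varint-encoded (offset, size) pairs, padded out to a fixed 40 bytes and followed by an 8-byte magic number:

metaindex_handle: char[p];       // block handle of the metaindex block
index_handle:     char[q];       // block handle of the index block
padding:          char[40-p-q];  // zeroed bytes up to a fixed length
magic:            fixed64;       // 0xdb4775248b80fb57 (little-endian)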

The TableBuilder::Rep structure contains all the information needed to create a file, including data blocks, index blocks, and so on:

struct TableBuilder::Rep {
  WritableFile* file;
  uint64_t offset;
  BlockBuilder data_block;
  BlockBuilder index_block;
  std::string last_key;
  int64_t num_entries;
  bool closed;
  FilterBlockBuilder* filter_block;
  ...
};

At this point, we have walked through the entire read path. For reads, LevelDB can be thought of as a multi-level cache: it looks for the key level by level and returns as soon as it finds a match. The only real difference from a cache is that after a "hit" the data is not moved closer to the reader; instead, compaction moves it further away in order to reduce the cost of future accesses. That sounds strange at first, but if you think about it, that is exactly what happens.

Summary

In this article, we walked through the LevelDB source code for read and write operations, including the format in which data is stored at each level, the multi-level SSTables and how they are merged, and version management. Due to space constraints, some topics, such as error recovery and caching, were not introduced or analyzed in detail. Even so, reading the LevelDB source deepens our understanding of the distributed KV storage database described in the Bigtable paper.

LevelDB’s source code is very easy to read and an excellent resource for learning C++. If you have any questions about this article, you can leave a comment in Disqus below the blog.

Reference

  • Bigtable: A Distributed Storage System for Structured Data
  • LevelDB
  • The Chubby lock service for loosely-coupled distributed systems
  • LevelDB Implementation Notes
  • SSTable in LevelDB