Function declaration:

class LEVELDB_EXPORT DB { public: DB() = default; // Copy constructor DB(const DB&) = delete; DB& operator=(const DB&) = delete; // Destructor virtual ~DB(); // Write virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& Value) = 0; Virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; }Copy the code

Struct && class:

Let’s look at the write operation first. It involves a structure: WriterOptions and a class: Slice

struct LEVELDB_EXPORT WriteOptions { WriteOptions() = default; Bool sync = bool sync = bool sync = bool sync =false;
};
}
Copy the code
Class LEVELDB_EXPORT Slice {private: // const char* data_; // String length size_t size_; }Copy the code

The methods of the Slice class are simple and will not be covered. The function is to avoid the overhead of string manipulation

Upper-level method calls:

Status DBImpl::Put(const WriteOptions& O, const Slice& key, const Slice& val) {// Call DB::PutreturnDB::Put(o, key, val); } Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; batch.Put(key, value); // This is actually a batch operation that calls the Write methodreturn Write(opt, &batch);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates);
Copy the code

At this point, summarize the LevelDB Put method implementation, which will eventually call DB::Write(). So we analyze the DB::Write() method.

Write method analysis:

  1. Constructing the Writer object
Writerw (&mutex_); writerw (&mutex_); w.batch = updates; w.sync = options.sync; // marks the end of the writefalse;
Copy the code
  1. Lock mutex_. MutexLock constructor: mutex_.lock (); Destructor: mutex_.unlock (). Since it is an element in the stack, the destructor is automatically called when the stack ends, which is unlocked.
  MutexLock l(&mutex_);
Copy the code
  1. Add the object constructed in step 1 to Writers
// Writers is a two-way queue writers_.push_back(&w);Copy the code
  1. Blocking queue
// Block the wait queuewhile(! w.done && &w ! = writers_.front()) { w.cv.Wait(); } // If the writer currently passed in is woken up by another thread and has finished executing, the writer is returned directlyif (w.done) {
    return w.status;
  }
Copy the code

As you can see from steps 3 and 4, the Put operation is a single-threaded operation. Through mutex lock.

  1. Ensure that memtable memory is sufficient
Status Status = MakeRoomForWrite(updates == nullptr);Copy the code
  1. Get the version number
Uint64_t last_sequence = versions_->LastSequence();Copy the code
  1. Create a WriteBatch object
WriteBatch* write_Batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer); / / the same batch write operations, set up the same version number WriteBatchInternal: : SetSequence (write_batch, last_sequence + 1); / / the next version number = the current version number + the article written by several last_sequence + = WriteBatchInternal: : Count (write_batch);Copy the code
  1. Write it to the op log
// Since the lock is a global variable, it is necessary to Unlock mutex_.unlock (); // Write to oplogThe buffer status = log_ - > AddRecord (WriteBatchInternal: : Contents (write_batch)); Bool sync_error = indicates whether sync is incorrectfalse; // If the buffer is successfully written, and sync =true, the file is flushed to diskif(status.ok() && options.sync) {// Flush to disk, fast due to sequential write status = logfile_-> sync (); // If no write succeeds, sync_error is set totrue
      if(! status.ok()) { sync_error =true; }}if(status.ok()) {// If op is writtenlogSuccess, the data written to the memtable, behind the method to explain the status = WriteBatchInternal: : InsertInto (write_batch mem_); } // mutex_.lock ();if(sync_error) {// Record error logs RecordBackgroundError(status); }Copy the code
  1. Set the new version number for versions
// Update version versions_->SetLastSequence(last_sequence);Copy the code
  1. Process the writer that enters the queue before processing the current writer
// This handles the case where the thread is invoked by another threadwhile (trueWriter* ready = writer_.front (); // let the queue head element out of the queue writers_.pop_front();if(ready ! // Write the same result as the current writer readystatus = status; / / set todone
      ready->done = true; Ready -> CV.Signal(); } // If it is currently written, it exits the loop, and the following elements need to go through the above flowif (ready == last_writer) break;
  }
Copy the code
  1. Call up the next writer to enter the double-endian queue
  if(! Writers_.empty ()) {// Get the queue header to call writers_.front()-> cv.signal (); }Copy the code
  1. Returns the result
  return status;
Copy the code

Methods to be analyzed

  1. MakeRoomForWrite(bool force)
  2. BuildBatchGroup(Writer** writer)
  3. WriteBatchInternal::Contents(write_batch)
  4. AddRecord(Slice& slice)
  5. WriteBatchInternal::InsertInto(write_batch, mem_)
  6. RecordBackgroundError(status)