TiDB is an open-source distributed NewSQL database developed by PingCAP, inspired by Google's Spanner and F1 papers.

TiDB has the following NewSQL core features:

  • SQL support (TiDB is MySQL compatible)
  • Horizontal, elastic scalability
  • Distributed transactions
  • Strong data consistency across data centers
  • High availability with automatic failure recovery

TiDB is designed to cover 100% of OLTP scenarios and 80% of OLAP scenarios; more sophisticated OLAP analysis can be handled through the TiSpark project.

  • OLTP: Online transaction processing, also known as transaction-oriented processing
  • OLAP: Online analytical processing, a class of software technology that lets analysts quickly, consistently, and interactively examine information from many angles to gain a deep understanding of the data (dynamic reporting)

TiDB is non-invasive to the business layer: it can elegantly replace traditional sharding schemes such as database middleware and manual sharding across databases and tables. It also frees delivery and maintenance staff from worrying about the details of database scaling, letting them focus on business development and greatly improving development productivity.

Overall architecture of TiDB

The TiDB cluster is divided into three components:

TiDB Server

TiDB Server is responsible for receiving SQL requests, handling SQL-related logic, locating (through PD) the TiKV addresses that hold the data needed for the computation, interacting with TiKV to obtain that data, and finally returning the results. TiDB Server is stateless: it does not store data, it only performs computation. It can be scaled out without limit and is exposed through a unified access address via load-balancing components (such as LVS, HAProxy, or F5).

PD Server

Placement Driver (PD for short) is the management module of the whole cluster. Its main jobs are: first, storing the meta-information of the cluster (which TiKV node a given Key is stored on); second, scheduling and load-balancing the TiKV cluster (data migration, Raft group Leader transfer, and so on); third, allocating globally unique, monotonically increasing transaction IDs.

PD itself is a cluster and should be deployed with an odd number of nodes; at least three nodes are recommended for production.

TiKV Server

TiKV Server is responsible for storing data. Externally, TiKV is a distributed key-value storage engine that supports transactions. The basic unit of data storage is the Region: each Region stores the data of a key range (from StartKey to EndKey), and each TiKV node is responsible for multiple Regions. TiKV uses the Raft protocol for replication to ensure data consistency and disaster recovery. Replicas are managed at the Region level: the replicas of a Region on different nodes form a Raft Group and are copies of each other. Data load balancing across TiKV nodes is scheduled by PD, also at Region granularity.

About storage

TiDB aims to be a replacement for MySQL, so the most basic capability it must provide is data storage. Let's look at how TiKV, TiDB's storage layer, stores data.

Key-Value

TiKV chose a key-value model and provides ordered traversal over it. Simply put, TiKV can be viewed as a huge Map in which both keys and values are raw byte arrays, and keys are ordered by the raw binary comparison of those byte arrays. Two things to remember about TiKV here:

  • It is a huge Map that stores key-value pairs
  • The key-value pairs in this Map are ordered by the binary order of the keys; that is, we can Seek to the position of a given Key and then call Next repeatedly to obtain the key-value pairs greater than that Key in increasing order
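
To make these two points concrete, here is a toy sketch in Go (our own illustration, not TiKV's actual API) of an ordered key-value map that supports Seek followed by repeated Next-style iteration:

package main

import (
    "fmt"
    "sort"
)

// orderedKV keeps its keys sorted so we can Seek to a key and iterate onward.
type orderedKV struct {
    keys   []string // kept in raw byte-wise (binary) order
    values map[string]string
}

// Put inserts or updates a key while keeping the key slice sorted.
func (kv *orderedKV) Put(k, v string) {
    if _, ok := kv.values[k]; !ok {
        i := sort.SearchStrings(kv.keys, k)
        kv.keys = append(kv.keys, "")
        copy(kv.keys[i+1:], kv.keys[i:])
        kv.keys[i] = k
    }
    kv.values[k] = v
}

// Seek returns the index of the first key >= k; advancing the index plays the role of Next.
func (kv *orderedKV) Seek(k string) int {
    return sort.SearchStrings(kv.keys, k)
}

func main() {
    kv := &orderedKV{values: map[string]string{}}
    kv.Put("b", "2")
    kv.Put("a", "1")
    kv.Put("c", "3")
    for i := kv.Seek("b"); i < len(kv.keys); i++ { // Seek, then "Next" in increasing order
        fmt.Println(kv.keys[i], "=>", kv.values[kv.keys[i]])
    }
    // Prints: b => 2, then c => 3
}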

One might ask, how does this storage model relate to tables in SQL?

The storage model has nothing to do with SQL tables. For now, let's forget all SQL concepts and focus on how to implement a large, distributed Map like TiKV with high performance and high reliability.

RocksDB

Instead of writing data directly to disk, TiKV stores data in RocksDB, and RocksDB is responsible for persisting it. RocksDB is an excellent open-source, single-node storage engine that meets TiKV's requirements for a standalone engine and is continuously optimized by the Facebook team. For our purposes, RocksDB can simply be regarded as a single-node key-value Map.

Data replication: Raft

Raft is a consensus algorithm that is equivalent to Paxos but much easier to understand. If you are interested, read the Raft paper; here we only give a brief introduction and refer you to the paper for details. Note that the Raft paper describes only a basic solution, and a strict implementation of the paper will not perform well. TiDB has made many optimizations in its implementation of the Raft protocol; for details, see the article "TiKV Source Code Analysis series – Optimization of Raft".

Raft is a consensus protocol that provides several important features:

  1. Leader election
  2. Membership changes
  3. Log replication

TiKV uses Raft to replicate data. Every data change is recorded as a Raft log entry, and through Raft's log replication the data is safely and reliably synchronized to a majority of the nodes in the Group.

So far we know that with single-node RocksDB, TiKV can store data on disk quickly; with Raft, we can replicate the data to multiple machines in case a single machine fails. Data is written through the Raft layer's interface rather than directly to RocksDB. By implementing Raft, we have a distributed KV store and no longer have to worry about one machine crashing.
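
As a rough illustration of this write path, the following sketch (our own simplification, not TiKV's real interfaces) shows a data change recorded as a log entry and applied by every member of a three-node group; in real Raft the followers receive the entry via AppendEntries and it only counts as committed once a majority has it:

package main

import "fmt"

// LogEntry represents one data change recorded in the Raft log.
type LogEntry struct {
    Index uint64
    Key   string
    Value string
}

// node is a highly simplified Raft group member: it appends entries to its
// log and applies them to a local KV map (standing in for RocksDB).
type node struct {
    log   []LogEntry
    store map[string]string
}

func (n *node) apply(e LogEntry) {
    n.log = append(n.log, e)
    n.store[e.Key] = e.Value
}

func main() {
    // A Leader plus two Followers; the entry is committed once a majority
    // (2 of 3 here) has appended it, after which every member applies it.
    group := []*node{
        {store: map[string]string{}},
        {store: map[string]string{}},
        {store: map[string]string{}},
    }
    entry := LogEntry{Index: 1, Key: "foo", Value: "bar"}
    for _, n := range group {
        n.apply(entry) // followers would receive this via AppendEntries
    }
    fmt.Println(group[1].store["foo"]) // bar — the change is visible on every replica
}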

Region

Another very important concept is introduced here: the Region. It is the basis for understanding the mechanisms that follow, so please read this section carefully. As mentioned earlier, we view TiKV as a large, ordered KV Map, so to scale storage horizontally we need to spread the data across multiple machines. Note that spreading data across machines, as discussed here, is not the same thing as Raft replication; in this section let's forget about Raft and assume there is only one copy of all the data, which makes things easier to understand.

For a KV system, there are two typical schemes for distributing data across multiple machines: one is to hash by Key and choose the storage node according to the hash value; the other is by Range, where a contiguous range of keys is stored on one storage node. TiKV chooses the second approach. The entire key space is divided into many segments, each segment being a series of consecutive keys; we call each segment a Region, and we try to keep the data stored in each Region within a certain size (configurable, currently 96 MB by default). Each Region can be described by its StartKey and EndKey.
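
The following toy sketch (our own illustration, not TiKV's real data structures) shows Regions as key ranges and how a key can be routed to the Region that owns it, assuming the Regions are sorted by StartKey and together cover the whole key space:

package main

import (
    "fmt"
    "sort"
)

// Region covers the half-open key range [StartKey, EndKey); an empty EndKey
// on the last Region stands for "+infinity".
type Region struct {
    ID       uint64
    StartKey string
    EndKey   string
}

// locate returns the Region whose range contains key, assuming the Regions
// are sorted by StartKey and together cover the entire key space.
func locate(regions []Region, key string) Region {
    i := sort.Search(len(regions), func(i int) bool {
        return regions[i].StartKey > key
    })
    return regions[i-1] // the last Region whose StartKey <= key
}

func main() {
    regions := []Region{
        {ID: 1, StartKey: "", EndKey: "g"},
        {ID: 2, StartKey: "g", EndKey: "p"},
        {ID: 3, StartKey: "p", EndKey: ""},
    }
    fmt.Println(locate(regions, "hello").ID) // 2
    fmt.Println(locate(regions, "tidb").ID)  // 3
}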

After dividing the data into regions, TiKV will do two important things:

  • Distribute data across all nodes in the cluster in units of Regions, and try to keep the number of Regions served by each node roughly the same
  • Implement Raft replication and member management by Region

Data is divided into multiple Regions by Key, and the data of each Region is stored on only one node (ignoring replicas for now). PD Server is responsible for spreading Regions as evenly as possible across all nodes in the cluster. This achieves horizontal scaling of storage capacity (after new nodes are added, Regions on other nodes are automatically scheduled over) as well as load balancing (avoiding the situation where one node holds lots of data while others hold little). To ensure that upper-layer clients can access the data they need, PD Server also records the distribution of Regions across nodes; that is, given any Key, you can query which Region the Key belongs to and which node that Region currently lives on.

TiKV replicates data in units of Regions; that is, the data of one Region is kept in multiple copies, and we call each copy a Replica. Replicas stay consistent with each other through Raft. The replicas of a Region are stored on different nodes and form a Raft Group: one Replica acts as the Leader of the Group and the others as Followers. All reads and writes go through the Leader, which replicates them to the Followers.

MVCC

Many databases implement multi-version concurrency control (MVCC), and TiKV is no exception. TiKV implements MVCC by appending a Version to the Key. Briefly, with MVCC, TiKV's key layout looks like this:

Key1-Version3 -> Value

Key1-Version2 -> Value

Key1-Version1 -> Value
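
One way to realize such a layout, sketched below purely as an illustration (TiKV's exact encoding differs and also uses a memcomparable encoding of the key itself), is to append the bitwise-inverted, big-endian version to the key so that all versions of a key stay adjacent and the newest version sorts first:

package main

import (
    "encoding/binary"
    "fmt"
    "sort"
)

// encodeVersionedKey appends the bitwise-inverted, big-endian version, so a
// larger (newer) version produces a smaller suffix and sorts earlier.
func encodeVersionedKey(key string, version uint64) []byte {
    buf := make([]byte, 0, len(key)+8)
    buf = append(buf, key...)
    var v [8]byte
    binary.BigEndian.PutUint64(v[:], ^version)
    return append(buf, v[:]...)
}

func main() {
    keys := [][]byte{
        encodeVersionedKey("Key1", 1),
        encodeVersionedKey("Key1", 3),
        encodeVersionedKey("Key1", 2),
    }
    sort.Slice(keys, func(i, j int) bool { return string(keys[i]) < string(keys[j]) })
    for _, k := range keys {
        version := ^binary.BigEndian.Uint64(k[len(k)-8:])
        fmt.Printf("%s-Version%d -> Value\n", k[:len(k)-8], version)
    }
    // Prints Key1-Version3, Key1-Version2, Key1-Version1 — newest first.
}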

About computation

Mapping of relational model to key-value model

As mentioned above, TiDB stores its data as key-value pairs. So how is the relational table structure mapped onto this model, and how are the various query needs satisfied?

TiDB assigns a TableID to each table, an IndexID to each index, and a RowID to each row. The TableID is unique across the whole cluster, the IndexID/RowID are unique within a table, and all of these IDs are int64 values.

Each row of data is encoded as a key-value pair according to the following rules:

Key: tablePrefix{tableID}_recordPrefixSep{rowID}
Value: [col1, col2, col3, col4]

Here tablePrefix and recordPrefixSep in the Key are specific string constants used to distinguish this data from other data in the KV space.

For Index data, key-value pairs are encoded as follows:

Key: tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue
Value: rowID

Index data must also handle both unique and non-unique indexes. A unique index can follow the encoding above. For a non-unique index, however, this encoding cannot produce a unique Key, because tablePrefix{tableID}_indexPrefixSep{indexID} is the same for every entry of the same index and the indexedColumnsValue may be identical across multiple rows. So the encoding of non-unique indexes is adjusted slightly:

Key: tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue_rowID
Value: null

This creates a unique Key for each row in the index. Note that each xxPrefix in the Key of the above encoding rule is a string constant, which is used to distinguish namespaces and avoid conflicts between different types of data. The definitions are as follows:

var (
    tablePrefix     = []byte{'t'}  // prefix shared by all table data
    recordPrefixSep = []byte("_r") // marks row (record) data
    indexPrefixSep  = []byte("_i") // marks index data
)

In addition, under either the Row or the Index key encoding scheme, all rows of a Table and all entries of an Index share the same prefix, so data with the same prefix is laid out together in TiKV's key space. As long as the encoding of the suffix part preserves the comparison order (the order before encoding equals the order after encoding), Row and Index data are stored in TiKV in an ordered manner.
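
The sketch below illustrates these encodings in a simplified form. It is not TiDB's real implementation, which uses an order-preserving binary encoding for IDs and column values; here fixed-width decimal strings are used just to keep byte order consistent with numeric order and the output readable:

package main

import "fmt"

var (
    tablePrefix     = "t"
    recordPrefixSep = "_r"
    indexPrefixSep  = "_i"
)

// recordKey encodes the key of one row; fixed-width IDs keep byte order
// consistent with numeric order.
func recordKey(tableID, rowID int64) string {
    return fmt.Sprintf("%s%011d%s%011d", tablePrefix, tableID, recordPrefixSep, rowID)
}

// uniqueIndexKey encodes a unique index entry; the rowID lives in the Value.
func uniqueIndexKey(tableID, indexID int64, indexedValue string) string {
    return fmt.Sprintf("%s%011d%s%011d_%s", tablePrefix, tableID, indexPrefixSep, indexID, indexedValue)
}

// nonUniqueIndexKey appends the rowID so the Key stays unique even when
// several rows share the same indexed value; the Value is then empty.
func nonUniqueIndexKey(tableID, indexID int64, indexedValue string, rowID int64) string {
    return fmt.Sprintf("%s_%011d", uniqueIndexKey(tableID, indexID, indexedValue), rowID)
}

func main() {
    fmt.Println(recordKey(10, 1))                    // t00000000010_r00000000001
    fmt.Println(uniqueIndexKey(10, 1, "TiDB"))       // t00000000010_i00000000001_TiDB
    fmt.Println(nonUniqueIndexKey(10, 1, "TiDB", 1)) // t00000000010_i00000000001_TiDB_00000000001
}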

Meta information management

Each Database/Table is assigned a unique ID that acts as its identifier. When encoded as key-value, this ID is encoded into a Key with an m_ prefix, and the Value stores the serialized meta information.

SQL operations

Now that we understand the SQL-to-KV mapping scheme, we know how relational data is stored. Next we need to understand how this data is used to satisfy users' queries, that is, how a query statement operates on the underlying stored data. The simplest conceivable solution is to map the SQL query onto KV queries through the mapping scheme described in the previous section, fetch the corresponding data through the KV interface, and then perform the various computations. For a statement such as select count(*) from user where name = "TiDB";, we would need to read all the data in the table, check whether the name field is "TiDB", and if so count that row.

This solution will certainly work, but not very well, for obvious reasons:

  1. When scanning the data, every row has to be read from TiKV with at least one RPC; if a lot of data needs to be scanned, this overhead becomes very large
  2. Not all rows are useful; rows that do not match the condition need not be read at all
  3. The values of the matching rows are not actually needed; the query only needs to know how many matching rows there are

So how do we avoid these pitfalls? What TiDB does is, first, keep the computation as close to the storage nodes as possible to avoid a large number of RPC calls. Second, push the Filter down to the storage nodes as well, so that only valid rows are returned and meaningless network transfer is avoided. Finally, push the aggregation functions and GroupBy down to the storage nodes for pre-aggregation: each node only needs to return one Count value, and tidb-server then sums the Count values.
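
As an illustration of this pushdown idea (a conceptual sketch, not TiDB's coprocessor API), the following shows each Region evaluating the filter and the count locally, so only one partial count per node crosses the network and tidb-server merely sums them:

package main

import "fmt"

// row stands in for one decoded record of the hypothetical user table.
type row struct{ name string }

// regionCount is what each storage node would compute locally for
// select count(*) from user where name = "TiDB".
func regionCount(rows []row, name string) int64 {
    var n int64
    for _, r := range rows {
        if r.name == name { // the filter is evaluated right next to the data
            n++
        }
    }
    return n
}

func main() {
    // Pretend the table is split across three Regions on different nodes.
    regions := [][]row{
        {{"TiDB"}, {"MySQL"}},
        {{"TiDB"}, {"TiDB"}},
        {{"PostgreSQL"}},
    }
    var total int64
    for _, rows := range regions {
        total += regionCount(rows, "TiDB") // only a partial count crosses the network
    }
    fmt.Println(total) // 3 — tidb-server just sums the partial counts
}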

About scheduling

In the storage section, we learned that data is replicated and stored in Regions: each Region has multiple replicas distributed on different TiKV nodes, the Leader handles reads and writes, and the Followers synchronize the Raft logs sent by the Leader. So how do we manage these Regions and TiKV nodes to achieve the desired high availability?

Information collection

Scheduling depends on collecting information about the whole cluster; in short, we need to know the status of every TiKV node and every Region. The TiKV cluster reports two types of information to PD:

Each TiKV node regularly reports its overall status to PD

There are heartbeat packets between each TiKV node (Store) and PD. Through these heartbeats PD detects whether each Store is alive and whether new Stores have joined; the heartbeat also carries the Store's state information, including:

  • Total disk capacity
  • Available disk capacity
  • Number of regions carried
  • Data write speed
  • Number of Snapshots sent/received (Replicas may synchronize data through snapshots)
  • Whether the Store is overloaded
  • Label information (labels are a series of tags with a hierarchical relationship)

The Leader of each Raft Group reports to the PD on a regular basis

There is a heartbeat packet between the Leader of each Raft Group and PD to report the state of the Region, including the following information:

  • The location of the Leader
  • The locations of the Followers
  • Number of offline replicas
  • The speed at which data is written/read

PD continuously collects cluster information through these two types of heartbeat messages and uses it as the basis for decisions. In addition, PD can receive extra information through its management interface to make more accurate decisions. For example, when the heartbeats from a Store stop, PD cannot tell whether the node is temporarily unavailable or permanently gone; it can only wait for a period of time (30 minutes by default), and if no heartbeat arrives it considers the Store offline and decides to schedule all the Regions on that Store onto other nodes. Sometimes, however, operations staff take a machine offline on purpose; in that case they can tell PD through its management interface that the Store is unavailable, and PD can immediately decide to move all Regions off that Store.
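
For illustration only, the two kinds of heartbeat information could be pictured as the following Go structs; the field names are ours and do not correspond to PD's actual protobuf schema:

package scheduling

// StoreHeartbeat is the per-node state each TiKV Store reports to PD.
type StoreHeartbeat struct {
    StoreID        uint64
    Capacity       uint64            // total disk capacity, in bytes
    Available      uint64            // available disk capacity, in bytes
    RegionCount    int               // number of Regions hosted on this Store
    BytesWritten   uint64            // recent data write speed
    SendingSnaps   int               // snapshots currently being sent
    ReceivingSnaps int               // snapshots currently being received
    IsBusy         bool              // whether the Store is overloaded
    Labels         map[string]string // e.g. {"zone": "z1", "rack": "r1", "host": "h1"}
}

// RegionHeartbeat is the per-Region state each Raft Group Leader reports to PD.
type RegionHeartbeat struct {
    RegionID       uint64
    LeaderStore    uint64   // the Store the Leader lives on
    FollowerStores []uint64 // the Stores the Followers live on
    DownReplicas   []uint64 // replicas that appear to be offline
    BytesWritten   uint64   // data write speed for this Region
    BytesRead      uint64   // data read speed for this Region
}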

Scheduling strategy

After collecting such information, PD also needs some policies to make specific scheduling plans.

The number of replicas in a Region is correct

If PD finds, through a Region Leader's heartbeat, that the number of replicas in the Region does not meet the requirement, it adds or removes replicas to adjust the count. Possible causes include:

  • A node goes offline and all its data is lost, leaving some Regions with too few replicas
  • An offline node recovers and automatically rejoins the cluster, so Regions whose replicas were already replenished now have one replica too many, and one needs to be removed
  • The administrator adjusts the replica policy by changing the max-replicas configuration

Multiple replicas in a Raft Group are not in the same location

Under normal circumstances, PD only ensures that multiple replicas of a Region do not land on the same node, which avoids losing several replicas when a single node fails. In a real deployment, the following requirements may also arise:

  • Multiple TiKV nodes are deployed on the same physical machine
  • TiKV nodes are spread across several racks, and the system should remain available even if a whole rack loses power
  • TiKV nodes are spread across several data centers, and the system should remain available even if a whole data center goes down

In essence, these requirements all say that a group of nodes shares some location attribute and constitutes a minimum fault-tolerant unit, and we want no Region to have more than one replica inside such a unit. For this, you can configure labels on the TiKV nodes and configure location-labels on PD to specify which labels identify a location. During replica placement, PD tries to ensure that no two replicas of a Region share the same location identifier.
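
A toy sketch of this placement rule (our own simplification of PD's real, hierarchical isolation logic): given the configured location-labels, check whether placing a new replica on a candidate Store would put two replicas of the same Region into the same fault-tolerant unit:

package main

import "fmt"

// locationKey joins a Store's values for the configured location labels,
// e.g. zone/rack/host, into one location identifier.
func locationKey(storeLabels map[string]string, locationLabels []string) string {
    key := ""
    for _, l := range locationLabels {
        key += storeLabels[l] + "/"
    }
    return key
}

// violates reports whether putting a new replica on the candidate Store would
// place two replicas of the same Region into the same fault-tolerant unit.
func violates(existing []map[string]string, candidate map[string]string, locationLabels []string) bool {
    ck := locationKey(candidate, locationLabels)
    for _, labels := range existing {
        if locationKey(labels, locationLabels) == ck {
            return true
        }
    }
    return false
}

func main() {
    locationLabels := []string{"zone"} // which labels count as the location
    existing := []map[string]string{
        {"zone": "z1", "host": "h1"},
        {"zone": "z2", "host": "h2"},
    }
    fmt.Println(violates(existing, map[string]string{"zone": "z1", "host": "h3"}, locationLabels)) // true
    fmt.Println(violates(existing, map[string]string{"zone": "z3", "host": "h4"}, locationLabels)) // false
}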

Replicas are evenly distributed among Stores

As mentioned earlier, there is a fixed upper limit to the amount of data that can be stored per replica, so we maintain a balance in the number of replicas on each node to make the overall load more balanced.

Leaders are evenly distributed among Stores

The Raft protocol performs reads and writes through the Leader, so most of the computational load falls on the Leader; PD therefore tries to spread Leaders evenly across nodes.

Access hotspots are evenly distributed among Stores

Each Store and Region Leader reports information about the current access load, such as the Key read/write speed. PD detects access hotspots and spreads them out between nodes.

Each Store occupies roughly the same storage space

When each Store is started, a Capacity parameter is specified to indicate the upper limit of the storage space of the Store. PD takes the remaining storage space of nodes into account when scheduling.

Control the scheduling speed to avoid affecting online services

Scheduling operations consume CPU, memory, disk I/O, and network bandwidth, so we need to avoid disturbing online services too much. PD controls the number of operations in flight, and the default speed control is conservative. If you want to speed up scheduling (for example, the service has already been stopped for an upgrade, or a new node has just been added and you want Regions scheduled onto it as soon as possible), you can accelerate it manually through pd-ctl.

A node can be taken offline manually

After a node is manually taken offline through pd-ctl, PD schedules the data on that node away under a certain rate limit. When the scheduling is complete, the node is put into the offline state.

Implementation of scheduling

With this information in mind, let’s take a look at the entire scheduling process.

PD continuously collects heartbeat packets from Stores and Region Leaders to obtain detailed data about the whole cluster, and generates a sequence of scheduling operations based on this information and the scheduling policies. Every time it receives a heartbeat from a Region Leader, PD checks whether there are pending operations for that Region, returns them to the Region Leader in the heartbeat reply, and monitors the execution results in subsequent heartbeats. Note that these operations are only suggestions to the Region Leader; they are not guaranteed to be executed. Whether and when they are executed is decided by the Region Leader based on its own current state.

See PingCAP’s official blog for more information.