• Vertical table splitting

    Split a table's columns across several tables, classifying the fields by how frequently they are used.

    For example, a goods-list query displays only a subset of a product's fields, and the list is queried far more often than the detail page, so there is no need to fetch every field. We can therefore extract the list fields into a separate table.

    1. It avoids I/O contention and reduces the probability of table locking: users viewing details do not affect users browsing the product list.
    2. It makes full use of hot data: operations on the frequently used product fields are not dragged down by the large, rarely read product description.

    If a table holds very wide rows, split it by column, separating hot and cold fields into different tables. These tables can then be stored on different storage devices to avoid I/O contention. The performance gains from vertical splitting come from efficient access to hot data and reduced disk contention.

    • Why large fields hurt efficiency
      1. Larger values take longer to read from disk.
      2. Disk storage is organized in pages; a large field may span multiple pages, increasing the number of I/O operations.
      3. The database loads data into memory in row units and caches pages in a buffer, so the larger each row is, the fewer rows fit in the buffer and the lower the hit rate.
  • Vertical database splitting

    Vertical table splitting within one database only addresses the size of individual tables; it does not distribute the tables to different servers, so every table still competes for the same physical machine's CPU, memory, network I/O, and disk. Once data grows past a certain point, the tables should be distributed across databases.

    The improvements it brings are:

    1. It removes coupling at the business level and makes business boundaries clearer.
    2. Data for different services can be managed, maintained, monitored, and scaled independently.
    3. In high-concurrency scenarios, it raises the ceiling on I/O and database connections to a certain extent and reduces the hardware bottleneck of a single machine.

    By grouping tables by business and placing each group in its own database, vertical database splitting lets the databases be deployed on different servers and spreads the load across them, but it still does not solve the problem of a single table holding too much data.

  • Horizontal database splitting

    Vertical table splitting and vertical database splitting solve part of the performance problem, but as the business grows, single-table and single-database performance keeps degrading. At that point we consider putting half of the data in each of two databases. This is called horizontal database splitting because the structure of every database is identical; only the rows differ. For example, rows with an odd id go to database 1 and rows with an even id go to database 2.
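The odd/even routing rule above can be sketched in a few lines. This is a minimal illustration, not a real sharding configuration; the database names db1 and db2 are hypothetical.

```java
// Sketch of the odd/even horizontal sharding rule described above.
// Database names "db1"/"db2" are hypothetical examples, not a real config.
public class OddEvenSharding {
    static String routeDatabase(long id) {
        // Odd ids go to database 1, even ids to database 2.
        return (id % 2 == 1) ? "db1" : "db2";
    }

    public static void main(String[] args) {
        System.out.println(routeDatabase(7)); // db1
        System.out.println(routeDatabase(8)); // db2
    }
}
```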

  • Horizontal table splitting

    Horizontal database splitting solves the problem of a single table holding too much data, but too many databases also increase the operations and maintenance burden. Horizontal table splitting instead splits the rows of one table across several tables within the same database, reducing single-table volume without adding servers.

Horizontal splitting only redistributes the rows; it does not change the table structure.

At the system design stage, decide on vertical database and table splitting according to how tightly the business domains are coupled. While data volume and access pressure are moderate, first consider caching, read/write separation, and indexing. Only if the data volume is very large and keeps growing should you turn to horizontal database and table splitting.

Problems introduced by database and table splitting

  • Transaction consistency problem

    Because the data is distributed across different databases, and even different servers, distributed transaction problems are unavoidable.

  • Cross-node join queries

    Before the database was split, we could retrieve store information along with the goods using a single join:

    SELECT p.*, r.[geographical area name], s.[store name], s.[reputation]
    FROM [product information] p
    LEFT JOIN [geographical area] r ON p.[source] = r.[geographical area code]
    LEFT JOIN [store information] s ON p.id = s.[store]
    WHERE ... ORDER BY ... LIMIT ...

    However, after vertical splitting, [product information] and [store information] are no longer in the same database, or even on the same server, so they cannot be joined directly.

    Solutions:

    • Global tables: Global tables are tables that all modules in the system may depend on. It’s similar to what we understand as a data dictionary. To avoid cross-library join queries, we can keep a copy of such tables in every other database. At the same time, this kind of data usually changes very little (or hardly at all), so you don’t have to worry too much about “consistency.”

    • **Field redundancy:** This is a typical denormalized design, common in the Internet industry, usually used to avoid join queries for the sake of performance.

      Take a simple scenario in an e-commerce business:

      The "order table" stores the "seller id" and redundantly stores the seller's "name" field, so that querying order details no longer requires a lookup in the "seller user table".

      Field redundancy is convenient and is a case of trading space for time. However, its applicable scenarios are limited; it suits cases with few dependent fields. The hardest issue is data consistency, which is difficult to guarantee whether enforced by database triggers or at the business-code level. How strict the consistency requirement is also depends on the actual business: in the example above, if the seller changes their name, must the order records be updated in sync?

    • **Data assembly:** Split the operation into two queries: the first fetches the IDs of the associated data, the second queries the associated data by those IDs, and the obtained data is then assembled field by field.

    • **ER sharding:** In relational databases, if the relationship between tables can be determined, rows that are related to each other can be placed on the same shard.

  • Cross-node paging and sorting

    Problems such as LIMIT paging and ORDER BY sorting become more complicated when querying across multiple databases. The data must be sorted and returned within each shard node, and then the result sets returned by the different shards must be merged and sorted again. For example, paginating the horizontally sharded goods database in descending order of id and taking the first page:

  • Avoiding duplicate primary keys

    In a sharded environment, since one table's data is stored in several databases at once, the usual auto-increment primary key is of no use: an ID generated in one shard is not guaranteed to be globally unique. A global primary key must therefore be designed separately to avoid duplicate keys across databases.

| Scheme | Advantages | Disadvantages | Applicable scenarios |
| --- | --- | --- | --- |
| UUID algorithm | No external dependencies | IDs are long and not numeric | Generating session_id |
| Auto-increment primary key in a single database | Easy to use, monotonically increasing | Low generation throughput; strong dependence on the database; IDs are sequential | Low-concurrency services |
| Auto-increment primary keys across multiple databases | Easy to use, monotonically increasing, higher throughput than a single database | Hard to scale out; strong dependence on the database; IDs are sequential | Generating IDs for sharded schemas |
| Database segment (number-range) allocation | High throughput | Strong dependence on the database; IDs are sequential | High-concurrency services where sequential IDs do not compromise information security |
| Snowflake algorithm | High throughput; runs without depending on other components | Uneven ID distribution may cause data skew for some services | High-concurrency ID generation |
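A minimal sketch of the Snowflake scheme from the last row of the table: 41 bits of timestamp, 10 bits of worker id, 12 bits of per-millisecond sequence. The epoch constant is a hypothetical choice, and real implementations must also handle clock rollback, which this sketch omits.

```java
// Simplified Snowflake sketch: timestamp (41 bits) | worker id (10 bits) |
// sequence (12 bits). EPOCH is a hypothetical custom epoch; clock-rollback
// handling is omitted, so this is illustrative, not production-ready.
public class Snowflake {
    private static final long EPOCH = 1577836800000L; // 2020-01-01 UTC (assumed)
    private final long workerId;                      // 0..1023
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    Snowflake(long workerId) { this.workerId = workerId; }

    synchronized long nextId() {
        long ts = System.currentTimeMillis();
        if (ts == lastTimestamp) {
            sequence = (sequence + 1) & 0xFFF;  // 12-bit sequence within one ms
            if (sequence == 0) {                // exhausted: spin until next ms
                while (ts <= lastTimestamp) ts = System.currentTimeMillis();
            }
        } else {
            sequence = 0;
        }
        lastTimestamp = ts;
        return ((ts - EPOCH) << 22) | (workerId << 12) | sequence;
    }

    public static void main(String[] args) {
        Snowflake sf = new Snowflake(1);
        long a = sf.nextId(), b = sf.nextId();
        System.out.println(a < b); // ids from one instance are increasing
    }
}
```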

Introduction to Sharding-JDBC

Sharding-JDBC connects to the database directly from the client and is delivered as a JAR, requiring no extra deployment or dependencies. It can be understood as an enhanced version of the JDBC driver, fully compatible with JDBC and the common ORM frameworks. Its core functions are data sharding and read/write separation. Through Sharding-JDBC, applications transparently access, over plain JDBC, data sources that have been split by database, split by table, or separated for reads and writes, without caring how many data sources there are or how the data is distributed. It works with any Java ORM framework, such as Hibernate, MyBatis, Spring JDBC Template, or raw JDBC.

Sharding-JDBC workflow

Log analysis reveals what Sharding-JDBC does after it receives the SQL the user wants to execute:

  1. Parse the SQL to extract the shard key value, such as order_id.
  2. Apply the sharding rule t_order_$->{order_id % 2 + 1}: data with an even order_id is inserted into t_order_1, and data with an odd order_id into t_order_2.
  3. Sharding-JDBC then rewrites the SQL statement based on the value of order_id; the rewritten SQL is the statement actually executed.
  4. Execute the rewritten real SQL statements.
  5. Aggregate and merge the results of all the executed SQL, and return them.
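Steps 2 and 3 above can be sketched as follows. The string replacement here is only illustrative: Sharding-JDBC rewrites SQL via its parsed syntax tree, not plain text substitution.

```java
// Sketch of steps 2-3: derive the actual table name from the shard key value
// (rule t_order_$->{order_id % 2 + 1}) and rewrite the logical SQL.
// The plain-text replace() is illustrative only; the real engine rewrites
// via the abstract syntax tree.
public class OrderTableRewrite {
    static String actualTable(long orderId) {
        // even order_id -> t_order_1, odd order_id -> t_order_2
        return "t_order_" + (orderId % 2 + 1);
    }

    static String rewrite(String logicalSql, long orderId) {
        return logicalSql.replace("t_order", actualTable(orderId));
    }

    public static void main(String[] args) {
        System.out.println(rewrite("INSERT INTO t_order (order_id) VALUES (10)", 10));
        // INSERT INTO t_order_1 (order_id) VALUES (10)
    }
}
```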

Sharding-JDBC execution principle

To understand how Sharding-JDBC works, you need to understand the following concepts:

  • Logical table

    The umbrella name for a set of horizontally split tables. Example: an order table split into ten tables, t_order_0 through t_order_9, by the last digit of the primary key; their logical table name is t_order.

  • Actual table

    A physical table that actually exists in the sharded database: t_order_0 through t_order_9 in the previous example.

  • Data nodes

    The smallest physical unit of data sharding. It consists of a data source name and a table name, for example ds_0.t_order_0.

  • Binding tables

    A main table and child table whose sharding rules are consistent. For example, if the t_order and t_order_item tables are both sharded by order_id, the shard key values of the two tables always land together, so the tables are bound to each other. Multi-table joins between bound tables do not produce a Cartesian product, which greatly improves join efficiency. For example, given the SQL:

    SELECT i.* FROM t_order o JOIN t_order_item i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);

    Assume the shard key order_id routes the value 10 to shard 0 and the value 11 to shard 1. Without the binding-table relationship configured, the routed SQL becomes four statements, forming a Cartesian product:

    SELECT i.* FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);
    SELECT i.* FROM t_order_0 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);
    SELECT i.* FROM t_order_1 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);
    SELECT i.* FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);

    With the binding-table relationship configured, the route produces only two SQL statements:

    SELECT i.* FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);
    SELECT i.* FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE o.order_id IN (10, 11);
  • Broadcast table

    A table that exists in every shard data source, with identical structure and identical data in each database. It suits scenarios where a small table must be joined with massive sharded tables, for example dictionary tables.

  • Shard key

    The shard key is the database column used for sharding, the key field in splitting a database (or table) horizontally. For example, if the order table is sharded by taking the order primary key modulo some number, the order primary key is the shard key. If a SQL statement contains no shard key, full routing is performed and performance is poor. Besides single shard keys, Sharding-JDBC also supports sharding by multiple columns.

  • Sharding strategy

    A sharding strategy combines a shard key with a sharding algorithm; the algorithm is extracted separately because it is independent of the key. The real sharding operation is shard key + sharding algorithm. The built-in strategies roughly cover modulo on the last digits, hash, range, tag, time, and so on. Strategies configured by the user side are more flexible and are most conveniently expressed as a Groovy inline expression, for example t_user_$->{u_id % 8}, meaning the t_user table is split into 8 tables by u_id modulo 8, named t_user_0 through t_user_7.
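The inline expression t_user_$->{u_id % 8} can be evaluated by hand; a sketch of that modulo strategy, assuming non-negative user ids:

```java
// Sketch of the t_user_$->{u_id % 8} inline expression: modulo table
// sharding yielding t_user_0 .. t_user_7. Assumes non-negative u_id.
public class ModuloTableSharding {
    static String tableFor(long uId) {
        return "t_user_" + (uId % 8);
    }

    public static void main(String[] args) {
        System.out.println(tableFor(11)); // t_user_3
        System.out.println(tableFor(8));  // t_user_0
    }
}
```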

  • Distributed primary key generation strategy

    Auto-increment keys are generated on the client side and replace the database's native auto-increment primary key, ensuring that distributed primary keys do not collide.

SQL parsing

When Sharing-JDBC receives an SQL statement, it performs SQL parsing => Query optimization => SQL routing => SQL rewriting => SQL execution => Result merging, and finally returns the execution result.

SQL parsing is divided into lexical parsing and syntax parsing. The lexical parser splits the SQL into indivisible atomic symbols called tokens, and classifies them into keywords, expressions, literals, and operators according to the dictionary of the particular database dialect. The syntax parser then converts the SQL into an abstract syntax tree. For example, the following SQL:

SELECT id, name FROM t_user WHERE status = 'ACTIVE' AND age > 18

After parsing, the abstract syntax tree is shown in the following figure:

Iterate over the abstract syntax tree to extract the context needed for sharding, and mark the places where SQL rewriting may be needed (described below). The parsing context for sharding includes the select items, table, sharding conditions, auto-increment primary key, and the ORDER BY, GROUP BY, and paging information (LIMIT, ROWNUM, TOP).

SQL routing

SQL routing is the process of mapping data operations on logical tables to operations on data nodes.

  • Sharding routing
    • Direct routing
    • Standard routing
    • Cartesian routing
  • Broadcast routing
    • Full database-and-table routing
    • Full database routing
    • Full instance routing
    • Unicast routing
    • Block routing

Sharding policies for databases and tables are matched based on the parsing context, and routing paths are generated.

SQL that carries a shard key can be divided, by the shard-key operator, into single-shard routing (the operator is =), multi-shard routing (the operator is IN), and range routing (the operator is BETWEEN). SQL without a shard key uses broadcast routing.

Standard routing

Standard routing is the sharding method ShardingSphere recommends most. It applies to SQL that contains no joins, or only joins between bound tables. When the shard-key operator is =, the routing result falls into a single database (table); when the operator is BETWEEN or IN, the result does not necessarily fall into a unique database (table), so one logical SQL may ultimately be split into multiple real SQL statements for execution. For example, if the data is sharded by the parity of order_id, a single-table query such as:

SELECT * FROM t_order WHERE order_id IN (1, 2);

Then the result of routing should be:

SELECT * FROM t_order_0 WHERE order_id IN (1, 2);
SELECT * FROM t_order_1 WHERE order_id IN (1, 2);

A join between bound tables has the same complexity and performance as a single-table query. For example, for a join involving a bound table such as:

SELECT * FROM t_order o JOIN t_order_item i ON o.order_id=i.order_id WHERE order_id IN (1, 2);

Then the result of routing should be:

SELECT * FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE order_id IN (1, 2);
SELECT * FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE order_id IN (1, 2);

As you can see, the number of SQL splits is consistent with a single table.

Cartesian routing

Cartesian routing is the most complex case: without binding-table relationships it cannot narrow the sharding down by the relationship between the tables, so joins between unbound tables must be expanded and executed as Cartesian product combinations. If the SQL in the previous example had no binding-table relationship configured, the routing result would be:

SELECT * FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE order_id IN (1, 2);
SELECT * FROM t_order_0 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE order_id IN (1, 2);
SELECT * FROM t_order_1 o JOIN t_order_item_0 i ON o.order_id=i.order_id WHERE order_id IN (1, 2);
SELECT * FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id=i.order_id WHERE order_id IN (1, 2);

Cartesian routing performs poorly, so use it with caution.

Full database-and-table routing

Broadcast routing is used for SQL that carries no shard key. By SQL type it divides into five kinds: full database-and-table routing, full database routing, full instance routing, unicast routing, and block routing. Full database-and-table routing handles operations on all real tables belonging to a logical table in the database, including DQL (data query) and DML (data manipulation) without a shard key, as well as DDL (data definition). For example:

SELECT * FROM t_order WHERE good_prority IN (1, 10);

Every table in every database is traversed, matching the logical table to real table names one by one. After routing it becomes:

SELECT * FROM t_order_0 WHERE good_prority IN (1, 10);
SELECT * FROM t_order_1 WHERE good_prority IN (1, 10);
SELECT * FROM t_order_2 WHERE good_prority IN (1, 10);
SELECT * FROM t_order_3 WHERE good_prority IN (1, 10);

SQL rewriting

The SQL engineers write targets logical tables and cannot be executed directly against real databases. SQL rewriting turns logical SQL into SQL that executes correctly in the real databases. As a simple example, if the logical SQL is:

SELECT order_id FROM t_order WHERE order_id=1;

Assuming order_id is configured as the shard key and order_id=1 routes to shard table 1, the rewritten SQL becomes:

SELECT order_id FROM t_order_1 WHERE order_id=1;

Column completion

Sharding-JDBC sometimes needs data during result merging that the query's SQL does not return. This mainly concerns GROUP BY and ORDER BY: when merging, results must be grouped and sorted by the GROUP BY and ORDER BY columns, so if the original SELECT list does not include the grouping or sorting column, the SQL must be rewritten. First, a scenario where the original SQL already contains everything result merging needs:

SELECT order_id, user_id FROM t_order ORDER BY user_id;

Since user_id is used for sorting, its values are needed during result merging; the SQL above already returns user_id, so no column needs to be added. If the SELECT list lacks a column the merge requires, it must be completed, as in the following SQL:

SELECT order_id FROM t_order ORDER BY user_id;

Since the original SQL does not return the user_id needed during result merging, the SQL must be rewritten to add the column. The rewritten SQL is:

SELECT order_id, user_id AS ORDER_BY_DERIVED_0 FROM t_order ORDER BY user_id;

Another case of column completion involves the AVG aggregate function. In a distributed setting, computing the average as (avg1 + avg2 + avg3) / 3 is incorrect; it must be computed as (sum1 + sum2 + sum3) / (count1 + count2 + count3). SQL containing AVG is therefore rewritten to SUM and COUNT, and the average is recalculated during result merging. For example, the SQL:

SELECT AVG(price) FROM t_order WHERE user_id=1;

needs to be rewritten as:

SELECT COUNT(price) AS AVG_DERIVED_COUNT_0, SUM(price) AS AVG_DERIVED_SUM_0 FROM t_order WHERE user_id=1;

The correct average can then be computed during result merging.
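The merge-side arithmetic for the AVG rewrite above can be sketched in a few lines. Note how averaging the per-shard averages would give the wrong answer when the shards hold different row counts:

```java
// Sketch of the AVG merge described above: each shard returns its local
// SUM and COUNT, and the merge step computes the true global average.
public class AvgMerge {
    static double mergeAvg(long[] sums, long[] counts) {
        long sum = 0, count = 0;
        for (long s : sums) sum += s;
        for (long c : counts) count += c;
        return (double) sum / count;
    }

    public static void main(String[] args) {
        // Shard averages are 10.0 (1 row) and 30.0 (3 rows). The naive
        // (10 + 30) / 2 = 20 is wrong; the correct global average is
        // (10 + 90) / (1 + 3) = 25.
        System.out.println(mergeAvg(new long[]{10, 90}, new long[]{1, 3})); // 25.0
    }
}
```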

Paging correction

Fetching a page of data from multiple databases differs from the single-database scenario. Suppose each page holds 10 rows and we want page 2. In a sharded environment it is not correct to run LIMIT 10, 10 on each shard, merge the results, and take the first 10 rows by the sort criterion. For example, if the SQL is:

SELECT score FROM t_score ORDER BY score DESC LIMIT 1, 2;

The following figure shows the result of paging execution without rewriting SQL.

As shown in the figure, the desired result is the 2nd and 3rd highest scores across both tables, which should be 95 and 90. But the SQL as executed can only fetch the 2nd and 3rd rows from each table: 90 and 80 from t_score_0, and 85 and 75 from t_score_1. Result merging then only has 90, 80, 85, and 75 to work with, so no merging scheme can produce the correct answer.

The correct approach is to rewrite the paging condition to LIMIT 0, 3, fetch all rows of the first two pages from every shard, and then compute the correct rows using the sort condition. The following figure shows the paging result after SQL rewriting.

The farther back the requested offset, the less efficient LIMIT paging becomes. There are many ways to avoid it, such as building a secondary index mapping row numbers to row offsets, or paginating by using the last ID of the previous page as the condition of the next query.

If the SQL is written with placeholders, paging correction only needs to rewrite the parameter list, not the SQL itself.
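The corrected paging merge above can be sketched as follows: every shard runs LIMIT 0, offset+size, and the merge step re-sorts the combined rows and applies the offset once. The data matches the t_score example (scores are hypothetical integers):

```java
// Sketch of the paging correction: each shard returns its top offset+size
// rows, then the merge re-sorts everything and applies the offset once.
public class PagingMerge {
    static java.util.List<Integer> page(
            java.util.List<java.util.List<Integer>> shardRows, int offset, int size) {
        java.util.List<Integer> all = new java.util.ArrayList<>();
        for (java.util.List<Integer> rows : shardRows) all.addAll(rows);
        all.sort(java.util.Comparator.reverseOrder()); // ORDER BY score DESC
        int from = Math.min(offset, all.size());
        int to = Math.min(offset + size, all.size());
        return all.subList(from, to);
    }

    public static void main(String[] args) {
        // t_score_0 and t_score_1 each return their top 3 rows (LIMIT 0, 3);
        // the merged LIMIT 1, 2 yields the overall 2nd and 3rd scores.
        System.out.println(page(
            java.util.List.of(java.util.List.of(100, 90, 80),
                              java.util.List.of(95, 85, 75)), 1, 2)); // [95, 90]
    }
}
```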

SQL execution

  • Connection modes
    • Memory-limited mode
    • Connection-limited mode
  • Automatic execution engine
    • Preparation stage
    • Execution stage

Memory-limited mode

The premise of this mode is that ShardingSphere does not limit the number of database connections one operation consumes. If the actual SQL must operate on 200 tables in one database instance, a new connection is created for each table and the tables are processed concurrently by multiple threads, maximizing execution efficiency. When the SQL qualifies, streaming merge is preferred, preventing memory overflow and frequent garbage collection.

Connection-limited mode

The premise of this mode is that ShardingSphere strictly controls the number of database connections one operation consumes. If the actual SQL must operate on 200 tables in one database instance, only a single connection is created and the 200 tables are processed serially. If the shards of one operation are spread across different databases, multiple threads still handle the different databases, but each database still gets only one connection per operation. This prevents one request from consuming too many database connections. The mode always selects memory merge.

The memory-limited mode suits OLAP operations and can improve system throughput by relaxing the restriction on database connections. The connection-limited mode suits OLTP operations, which usually carry a shard key and route to a single shard; strictly controlling connections there ensures the online database's resources remain available to more applications, making it the wise choice.

Automatic execution engine

Leaving the choice between the two modes to static configuration at startup lacks flexibility. In practice the routing result varies with the SQL and its placeholder parameters, which means some operations may need memory merge while others are better served by streaming merge. The choice should not be fixed by the user before ShardingSphere launches; instead, the connection mode should be determined dynamically from the concrete SQL and placeholder parameters. Users need not know what the so-called memory-limited and connection-limited modes are: the execution engine automatically selects the best execution plan for the current scenario.

The automatic execution engine refines the connection-mode choice down to the granularity of each SQL operation. For every SQL request it computes and weighs the trade-off in real time against the routing result, and autonomously applies the most suitable connection mode, achieving the optimal balance of resource control and efficiency.

  • Preparation stage

    This stage is used to prepare the data for execution. It consists of two steps: grouping result sets and creating execution units.

    1. Group the SQL routing results by the name of the data source.
    2. Using the formula below, compute how many routed SQL statements each connection of each database instance must execute within the allowance of maxConnectionSizePerQuery, and from that derive the optimal connection mode for this request.

Within the allowance of maxConnectionSizePerQuery, when the number of statements a connection must execute is greater than 1, the connection cannot keep all the corresponding result sets open, so memory merge must be used; conversely, when each connection executes exactly 1 statement, the connection can hold its result set open, and streaming merge can be used.
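The decision just described can be sketched as a small function. The mode names mirror ShardingSphere's terminology (MEMORY_STRICTLY for the memory-limited mode with streaming merge, CONNECTION_STRICTLY for the connection-limited mode with memory merge); the exact formula is a sketch of the rule stated above, not the engine's source code.

```java
// Sketch of the per-data-source connection-mode decision: within
// maxConnectionSizePerQuery, if each connection must execute more than one
// routed SQL, its result sets cannot all stay open, forcing memory merge
// (connection-limited mode); otherwise streaming merge is possible
// (memory-limited mode). Illustrative, not ShardingSphere source.
public class ConnectionModeDecision {
    enum Mode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    static Mode decide(int routedSqlCount, int maxConnectionSizePerQuery) {
        // Statements each connection must execute, rounded up.
        int perConnection =
            (int) Math.ceil((double) routedSqlCount / maxConnectionSizePerQuery);
        return perConnection > 1 ? Mode.CONNECTION_STRICTLY : Mode.MEMORY_STRICTLY;
    }

    public static void main(String[] args) {
        System.out.println(decide(200, 1)); // CONNECTION_STRICTLY: memory merge
        System.out.println(decide(2, 2));   // MEMORY_STRICTLY: streaming merge
    }
}
```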

The connection-mode choice is made per physical database. If one query routes to more than one database, the connection modes of the databases may differ and may be mixed within the same query.

When data sources use techniques such as database connection pooling to control the number of connections, deadlock is possible if concurrency is mishandled while acquiring connections: multiple requests wait for each other to release connection resources, producing a starvation wait and a cross-locking deadlock.

Suppose a query needs two database connections from one data source, routed to two sub-tables of the same database. It is possible that query A has obtained one connection from this data source and is waiting for a second, while query B has also obtained one connection and is likewise waiting for another. If the pool allows a maximum of 2 connections, the two requests will wait forever. The following diagram depicts this deadlock.

ShardingSphere synchronizes the acquisition of database connections to avoid deadlock. When creating an execution unit, it atomically obtains all the connections a SQL request needs in one step, ruling out the possibility of a request acquiring only part of its resources. Because database operations are very frequent, locking on every connection acquisition would reduce ShardingSphere's concurrency, so two optimizations are made:

1. Skip locking when only one database connection is needed. If a request acquires exactly one connection at a time, two requests can never end up waiting on each other, so no lock is required. Most OLTP operations route by shard key to a single data node, which leaves the system entirely lock-free and further improves concurrency. Besides single-shard routing, read/write separation also falls into this category.

2. Lock resources only in memory-limited mode. In connection-limited mode, all result sets free their connections after being loaded into memory, so no deadlock wait can occur.
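The atomic acquisition idea can be sketched with a toy pool: all connections a request needs are taken under one lock, so two requests can never each hold half of the pool while waiting for the other half. The pool class and names here are hypothetical stand-ins, not ShardingSphere's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of deadlock avoidance by atomic acquisition: all connections a
// request needs are taken from the pool in one synchronized step. The pool
// is a toy stand-in, not ShardingSphere's implementation.
public class AtomicConnectionAcquirer {
    private final BlockingQueue<String> pool;
    private final Object lock = new Object();

    AtomicConnectionAcquirer(int size) {
        pool = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) pool.add("conn-" + i);
    }

    // Acquire all `count` connections atomically; partial acquisition is
    // impossible because the lock is held for the whole loop.
    List<String> acquire(int count) {
        synchronized (lock) { // real engine locks only when count > 1
            List<String> got = new ArrayList<>();
            try {
                for (int i = 0; i < count; i++) got.add(pool.take());
            } catch (InterruptedException e) {
                release(got); // give back what we took, stay consistent
                Thread.currentThread().interrupt();
            }
            return got;
        }
    }

    void release(List<String> conns) { pool.addAll(conns); }

    public static void main(String[] args) {
        AtomicConnectionAcquirer acquirer = new AtomicConnectionAcquirer(2);
        List<String> conns = acquirer.acquire(2); // both connections in one step
        System.out.println(conns.size());         // 2
        acquirer.release(conns);
    }
}
```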
  • Execution phase

    This stage actually executes the SQL. It has two steps: grouped execution and generation of the merge result set.

    Grouped execution delivers the execution-unit groups produced in the preparation stage to the underlying concurrent execution engine and emits events at each key step of execution: execution start, execution success, and execution failure. The execution engine only sends the events; it does not care who subscribes to them. Other ShardingSphere modules, such as distributed transactions and call-link tracing, subscribe to the events they care about and act on them.

    Based on the connection mode chosen in the preparation stage, ShardingSphere generates a memory merge result set or a streaming merge result set and hands it to the result-merging engine for further processing.

    The overall structure of the execution engine is divided as shown below.

Result merging

Correctly combining the multiple result sets obtained from the data nodes into a single result set returned to the requesting client is called result merging.

Functionally, the result merging supported by Sharding-JDBC divides into traversal, sorting, grouping, paging, and aggregation merges, which are combined rather than mutually exclusive. The following figure shows the overall structure of the merging engine.