About the author

Zhang Yonglun, software development engineer and Apache ShardingSphere (Incubating) PPMC member. I joined the ShardingSphere project in February 2018 and went through the whole incubation process at Apache. My areas of focus are Sharding-Proxy, SQL parsing, APM, and performance testing.

The content is divided into four parts. The first part introduces Sharding-Proxy: its positioning within ShardingSphere, its architecture, and its features. The second part, the life of a SQL statement, follows a simple query through Sharding-Proxy to understand how it works internally. The third part covers several core principles that are not hard to understand but are very important to Sharding-Proxy. The final part is about performance optimization: availability and performance are critical indicators for a proxy, so it summarizes common problems we have run into and describes how performance is currently guaranteed.

I. Introduction to Sharding-Proxy

1. Apache ShardingSphere

Let's take a look at the positioning of ShardingSphere. It is an ecosystem of distributed database middleware: its features are designed as a closed loop, and it offers a variety of access methods so that users can choose the one that fits their production requirements. Looking at the overall architecture, data sharding, distributed transactions, and database governance have already been delivered to users; the governance console was recently developed by the community and donated to the Apache Foundation, and multimodal connectivity is still being planned and developed. At the bottom are the access endpoints: we provide three products, Sharding-JDBC, Sharding-Proxy, and Sharding-Sidecar. Each endpoint offers all of the features above, and the three products cover users' needs in different production scenarios.

2. Sharding-Proxy architecture

Let's get down to business. Sharding-Proxy is positioned as a transparent database proxy: it encapsulates the database binary protocol and is used to support heterogeneous languages. It is currently compatible with MySQL and PostgreSQL, so it can be accessed with any client that speaks the MySQL or PostgreSQL protocol, such as the MySQL command line, MySQL Workbench, or Navicat, which makes it more DBA-friendly.

The whole architecture can be divided into three parts: the front end, the core components, and the back end. The front end is responsible for network communication with clients and uses a client/server framework based on NIO: on Windows and macOS it uses the NIO model, and on Linux it automatically adapts to the epoll model. It encodes and decodes the MySQL protocol during communication. Once the core component receives a decoded MySQL command, it calls Sharding-Core to perform SQL parsing, routing, rewriting, result merging, and the other core functions. The back end interacts with the real databases and currently relies on the Hikari connection pool.

II. The life of a SQL statement

1. Business scenarios

As you can see, ShardingSphere has far too many features to cover in a short talk, so let's start with data sharding, a core, basic, and very important feature. You are probably already familiar with sharding: the basic idea is to split one database into multiple databases and one table into multiple tables in order to scale horizontally. In this scenario there is a database with a single table called t_order. Because of its large data volume, it has been split in the underlying storage: the sharding strategy splits it into two databases with two tables each. Sharding-Proxy hides the real databases and tables from users, who only need to write SQL against the logical database and table.

2. Decode MySQL protocol

SELECT * FROM t_order where user_id = 10 and order_id = 1;

Let's take a look at where this SQL comes from and what it really looks like on the wire. When a client connected to the Proxy executes the SQL, Wireshark can capture the packet on the network. As you can see, the MySQL protocol is carried on top of TCP; the packet travels from port 32883 to port 3307, the Proxy's default port. Like most protocols, the MySQL protocol follows the TLV (Type-Length-Value) principle:

  • TYPE: the command type, here Query

  • LENGTH: the message length, 58 in this packet

  • VALUE: the ASCII encoding of the SQL text

Once the Proxy decodes the logical SQL, it immediately sends it to the parsing module for processing.
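To make the decoding step concrete, here is a minimal, illustrative sketch (not the Proxy's actual codec) of reading a COM_QUERY packet from a Netty ByteBuf. It assumes the standard MySQL packet layout: a 3-byte little-endian payload length, a 1-byte sequence id, then a payload whose first byte is the command type.

```java
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;

// Illustrative decoder for a single MySQL COM_QUERY packet.
// Not ShardingSphere's real codec; it only shows the TLV idea from the packet capture.
public final class ComQueryDecoder {

    private static final int COM_QUERY = 0x03;

    public static String decode(final ByteBuf in) {
        int payloadLength = in.readUnsignedMediumLE(); // LENGTH: 3 bytes, little-endian
        short sequenceId = in.readUnsignedByte();      // packet sequence id
        short commandType = in.readUnsignedByte();     // TYPE: 0x03 means COM_QUERY
        if (COM_QUERY != commandType) {
            throw new UnsupportedOperationException("unsupported command: " + commandType);
        }
        // VALUE: the rest of the payload is the SQL text
        byte[] sql = new byte[payloadLength - 1];
        in.readBytes(sql);
        return new String(sql, StandardCharsets.UTF_8);
    }
}
```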

3. SQL parsing

What we see now is the abstract syntax tree generated after the SQL is parsed. This syntax tree is generated automatically by ANTLR.

The parsing process is divided into lexical parsing and syntactic parsing. The lexer breaks the SQL into indivisible atomic symbols (such as SELECT, FROM, t_order, *, =, 10), and the parser then turns them into an abstract syntax tree. Once you have this tree, walking it lets you extract the context needed for sharding and mark the locations that may need rewriting. For example, the values of user_id and order_id are extracted because they are the sharding keys that determine the routing result, and the position of the table t_order must be recorded so it can be found during rewriting.
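As a toy illustration of the lexing step only (not ShardingSphere's ANTLR-generated parser), the sketch below splits the example SQL into atomic tokens; the real parser builds the syntax tree from such tokens and records the shard values and the t_order position while walking the tree.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy lexer: splits SQL into indivisible symbols (keywords, identifiers, operators, literals).
// Purely illustrative; ShardingSphere's real lexer/parser is generated by ANTLR.
public final class ToyLexer {

    private static final Pattern TOKEN =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*|\\d+|\\*|=|,|;");

    public static List<String> tokenize(final String sql) {
        List<String> tokens = new ArrayList<>();
        Matcher matcher = TOKEN.matcher(sql);
        while (matcher.find()) {
            tokens.add(matcher.group());
        }
        return tokens;
    }

    public static void main(final String[] args) {
        // [SELECT, *, FROM, t_order, where, user_id, =, 10, and, order_id, =, 1, ;]
        System.out.println(tokenize("SELECT * FROM t_order where user_id = 10 and order_id = 1;"));
    }
}
```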

4. Routing

Under the current simple sharding rule, the real database is computed as user_id % 2, i.e. 10 % 2, which routes to the ds_0 database; similarly, the real table is computed as order_id % 2, i.e. 1 % 2, which routes to t_order_1. Of course, routing offers much more than this: the routing engine supports a variety of sharding strategies, including modulo, hash, range, tag, time, and so on, and a variety of sharding interfaces, including row expressions, built-in rules, and custom classes.
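A minimal sketch of this modulo routing decision (the class and method names are illustrative, not the routing engine's actual API):

```java
// Illustrative routing calculation for the example sharding rule:
// two databases sharded by user_id, two tables per database sharded by order_id.
public final class ModuloRouter {

    public static String route(final long userId, final long orderId) {
        String database = "ds_" + (userId % 2);      // user_id = 10 -> ds_0
        String table = "t_order_" + (orderId % 2);   // order_id = 1 -> t_order_1
        return database + "." + table;
    }

    public static void main(final String[] args) {
        System.out.println(route(10, 1)); // ds_0.t_order_1
    }
}
```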

5. Rewriting

Why rewrite? The SQL above is written against the logical database and table, so it cannot be executed directly on a real database. The purpose of SQL rewriting is to turn the logical SQL into real SQL that executes correctly on the real database. Since routing has already determined the real database and table, in this case we simply replace the logical table name with the real one. Not all rewriting is this simple: how to rewrite aggregate functions, how to rewrite SQL containing LIMIT, and when to add columns are all things the rewrite engine has to handle.
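A simplified sketch of the simple case, under these assumptions: the parser has recorded the offset of the logical table name, and the router has already chosen the real table. The names below are hypothetical; the real rewrite engine handles many more cases.

```java
// Illustrative token-based rewrite: replace the logical table name recorded during parsing
// with the real table name chosen by the router.
public final class TableTokenRewriter {

    public static String rewrite(final String logicalSql, final int tableTokenStart,
                                 final String logicalTable, final String actualTable) {
        return logicalSql.substring(0, tableTokenStart)
                + actualTable
                + logicalSql.substring(tableTokenStart + logicalTable.length());
    }

    public static void main(final String[] args) {
        String logicalSql = "SELECT * FROM t_order where user_id = 10 and order_id = 1;";
        int start = logicalSql.indexOf("t_order"); // in the real engine this offset comes from parsing
        // SELECT * FROM t_order_1 where user_id = 10 and order_id = 1;
        System.out.println(rewrite(logicalSql, start, "t_order", "t_order_1"));
    }
}
```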

6. Execution

Now that routing and rewriting are complete, a connection can be taken from the connection pool and the SQL executed. The whole execution process is handled by ShardingSphere's execution engine, which automatically decides whether to use streaming result sets based on the number of routed nodes and the maximum number of connections allowed per query for a single database. Streaming results minimize memory pressure. The real SQL is finally sent by the Proxy to the MySQL server; after caching, parsing, and optimization inside MySQL, the resulting rows are fetched from the storage engine and returned to the Proxy.
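Roughly speaking, the decision looks like the simplified sketch below. This is my own illustration with hypothetical names, not the execution engine's real API: when each routed SQL unit of a data source can hold its own connection within the per-query connection limit, streaming result sets are used; otherwise results are loaded into memory so connections can be released early.

```java
// Hypothetical illustration of the connection-mode decision described above.
public final class ConnectionModeDecider {

    enum ConnectionMode {
        MEMORY_STRICTLY,     // streaming result sets, minimal memory
        CONNECTION_STRICTLY  // in-memory result sets, minimal connections
    }

    public static ConnectionMode decide(final int sqlUnitsPerDataSource,
                                        final int maxConnectionsSizePerQuery) {
        // Enough connections for every routed SQL unit -> stream the results;
        // otherwise load results into memory and release connections early.
        return sqlUnitsPerDataSource <= maxConnectionsSizePerQuery
                ? ConnectionMode.MEMORY_STRICTLY
                : ConnectionMode.CONNECTION_STRICTLY;
    }
}
```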

7. Merging

After receiving the result data, the Proxy needs to merge it. To illustrate merging, let's switch to a different SQL. This SQL contains only one sharding key, user_id, so it is routed to the ds_0 database; since order_id is not specified, it is routed to both real tables, t_order_0 and t_order_1. Each real table returns one row that satisfies the condition, so the two rows are merged and returned to the client. The merge engine is very powerful, and our example is the simplest case, traversal merging. There are many other merging scenarios: sort merging, for example, when the SQL contains ORDER BY and the least expensive way to sort must be chosen, or group merging, when the SQL contains both ORDER BY and GROUP BY and memory usage must be optimized.
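A minimal sketch of the simplest case, traversal merging over the result sets returned by the two real tables (illustrative only, not the actual merge engine):

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

// Illustrative traversal merge: iterate the routed result sets one after another,
// exposing them to the caller as a single logical result.
public final class TraversalMergedResult {

    private final Iterator<ResultSet> resultSets;
    private ResultSet current;

    public TraversalMergedResult(final List<ResultSet> routedResults) {
        this.resultSets = routedResults.iterator();
        this.current = resultSets.hasNext() ? resultSets.next() : null;
    }

    public boolean next() throws SQLException {
        while (current != null) {
            if (current.next()) {
                return true;
            }
            current = resultSets.hasNext() ? resultSets.next() : null;
        }
        return false;
    }

    public Object getValue(final int columnIndex) throws SQLException {
        return current.getObject(columnIndex);
    }
}
```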

8. MySQL protocol encoding



After obtaining the final merge result, the Proxy encodes the result into the MySQL protocol and sends it to the client. The packet contains descriptive information for each column, such as database name, table name, character set, data type, and so on.


Then comes the result data visible to the user. After receiving the protocol packets, the MySQL command-line client prints the result on the terminal; if you are using a JDBC client, JDBC stores the results in a ResultSet. That is a brief walk-through of how Sharding-Proxy processes a SQL statement.

III. Core principles

1. IO & thread model

The core principles part starts with the IO model and thread model. It is easy to see that the top two groups belong to the front end and the bottom two to the back end. The Boss Group corresponds to the Reactor in the reactor pattern, and the Worker Group corresponds to the workers. The User Executor Group executes MySQL commands, and the Sharding Execute Engine accesses the databases concurrently. As you can probably guess from the Boss Group and Worker Group, the front end uses Netty, so it handles client requests with IO multiplexing. The back end uses a Hikari connection pool to access the databases synchronously. Things are special when XA transactions are enabled: because the resources of the Atomikos transaction manager are ThreadLocal, all SQL belonging to one transaction must be executed on the same thread, so a dedicated thread is created and cached for executing that transaction's MySQL commands.
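For the front end, this structure corresponds to a standard Netty server bootstrap, roughly like the sketch below (illustrative, not the Proxy's actual startup code; the real Proxy additionally switches to epoll transports on Linux).

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Illustrative front-end bootstrap: a boss group accepts connections (the Reactor),
// a worker group handles IO and would host the MySQL protocol codec.
public final class FrontendBootstrapSketch {

    public static void main(final String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(final SocketChannel ch) {
                            // MySQL packet codec and command handlers would be added here.
                        }
                    });
            bootstrap.bind(3307).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```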

2. Streaming merge

Here are two issues from early development that are not hard to fix but are worth exploring. This is a scenario I tested on my own machine: five JDBC clients connect to the Proxy, and each client queries 150,000 rows. As you can see, the Proxy's memory keeps growing and GC cannot reclaim it. This is because, by default, reading from a JDBC ResultSet blocks until all 150,000 rows have been received; in other words, JDBC does not let the user read any data until the complete result has arrived, which is the default way of retrieving data from a ResultSet. This forces the Proxy to cache large amounts of temporary data. So, is there a way to consume a ResultSet as soon as results start arriving?

Two solutions can be found in the Connector/J documentation:


One is a streaming ResultSet: once the Statement's fetch size is set to the special value Integer.MIN_VALUE, JDBC uses a streaming ResultSet to process the results, allowing the user to consume rows as they arrive instead of waiting for the entire result.

The other approach is a cursor-based streaming result set: set the JDBC parameter useCursorFetch=true to enable the cursor, and set the fetch size to the number of rows returned on each fetch.

This second method is less efficient and should be used with caution: packet captures show that every time the client reads data it sends a fetch request to the server, which adds a lot of network overhead.

The Proxy uses the first solution, and you can see that memory usage has returned to normal. A streaming result set is a prerequisite for streaming merge, which additionally has requirements on the SQL itself and on the number of connections; the details are complicated and will not be described here. Also note that this memory behavior only holds when the client consumes data from the Proxy at least as fast as the Proxy consumes data from MySQL.
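Concretely, with MySQL Connector/J the two options look roughly like this (a sketch; the URLs and credentials are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class StreamingResultSetExamples {

    // Option 1: streaming ResultSet -- set the fetch size to Integer.MIN_VALUE
    // so rows can be consumed as they arrive instead of being buffered completely.
    static ResultSet streamingQuery(final Connection connection, final String sql) throws Exception {
        Statement statement = connection.createStatement(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statement.setFetchSize(Integer.MIN_VALUE);
        return statement.executeQuery(sql);
    }

    // Option 2: cursor-based fetch -- enable useCursorFetch on the JDBC URL and
    // set a positive fetch size; each fetch is an extra round trip to the server.
    static ResultSet cursorQuery(final String sql) throws Exception {
        Connection connection = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/ds_0?useCursorFetch=true", "root", "");
        Statement statement = connection.createStatement();
        statement.setFetchSize(100);
        return statement.executeQuery(sql);
    }
}
```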

3. Flow control


What happens if, for some reason, the client consumes slowly or stops consuming altogether? It turns out that memory usage skyrockets, even more dramatically than in the previous graph. Let's look at why. Several buffers are involved here: SO_RCVBUF and SO_SNDBUF are the TCP receive and send buffers.

ChannelOutboundBuffer is Netty's write buffer. While the result data is being sent back, if the client blocks, its SO_RCVBUF fills up almost instantly and the TCP sliding window tells the Proxy to stop sending. Data then backs up inside the Proxy, so the Proxy's SO_SNDBUF also fills up. Once the Proxy's SO_SNDBUF is full, Netty's ChannelOutboundBuffer swallows everything coming from MySQL like a bottomless pit, because the ChannelOutboundBuffer is unbounded by default. The Proxy's SO_RCVBUF always has room because Netty keeps draining it, so MySQL keeps sending data while Netty keeps storing it in the ChannelOutboundBuffer until the Proxy runs out of memory.

Having found the root cause, what we need to do is stop the Proxy from receiving data from MySQL while the client is blocked. Netty controls the write buffer with water mark parameters: when the buffer size exceeds the high water mark, we stop writing more data into it; when it falls below the low water mark, writing is allowed again. Once the ChannelOutboundBuffer stops growing, the Proxy's SO_RCVBUF fills up as well, and the TCP sliding window tells MySQL to stop sending data. In this state the memory consumed by the Proxy is bounded by the ChannelOutboundBuffer's high water mark, which is exactly what we wanted.
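A sketch of this backpressure wiring with Netty (illustrative, names hypothetical; the real Proxy code differs): configure the water marks on the client-facing channel, and pause the thread that forwards rows from MySQL whenever the channel becomes unwritable.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;

// Illustrative backpressure: pause the thread that pulls rows from MySQL while the
// client-facing ChannelOutboundBuffer is above the high water mark.
public final class WritabilityBackpressure extends ChannelInboundHandlerAdapter {

    private final Object lock = new Object();

    public static void configure(final Channel clientChannel) {
        // Above 8 MB of pending writes the channel becomes unwritable; below 2 MB it recovers.
        clientChannel.config().setWriteBufferWaterMark(
                new WriteBufferWaterMark(2 * 1024 * 1024, 8 * 1024 * 1024));
    }

    /** Called by the result-forwarding thread before fetching the next row from MySQL. */
    public void waitUntilWritable(final Channel clientChannel) throws InterruptedException {
        synchronized (lock) {
            while (!clientChannel.isWritable() && clientChannel.isActive()) {
                lock.wait();
            }
        }
    }

    @Override
    public void channelWritabilityChanged(final ChannelHandlerContext ctx) {
        // The client drained its buffer below the low water mark: wake up the forwarding thread.
        synchronized (lock) {
            lock.notifyAll();
        }
        ctx.fireChannelWritabilityChanged();
    }
}
```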

4. Distributed tracing


This is a really nice interface from SkyWalking. It monitors Sharding-Proxy and shows the complete call chain of an executed SQL statement without any intrusion into the Proxy's code. How is that done? When it comes to non-intrusive approaches, hooks are probably the first thing that comes to mind. Indeed, SkyWalking implements automatic probes with a Java instrument agent. Dozens of probes are supported at present, covering many mainstream applications. If you deploy SkyWalking on a real business system, you will see a much richer call chain. That concludes the core principles section.

IV. Performance optimization

1. Code issues

I have summarized the common performance problems here; contributors who want to submit code can keep them in mind to avoid repeating them.

The first category is code-level problems. First example:

Look at this function: do you see any problem? What are the consequences of the LinkedList here? A LinkedList is a linked list, so get(i) has O(n) time complexity; calling it n times in a loop makes the whole traversal O(n²). The fix is simply to iterate with foreach (an Iterator) instead.

A community member benchmarked this earlier: with 50,000 elements, the execution time differs by more than a factor of 1,000.
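A sketch of the problematic pattern and the fix (the method names are hypothetical; the point is the access pattern, not a specific class in the project):

```java
import java.util.LinkedList;
import java.util.List;

public final class LinkedListTraversal {

    // Problematic: get(i) on a LinkedList walks the list from the head, so this loop is O(n^2).
    static long sumByIndex(final List<Long> values) {
        long sum = 0;
        for (int i = 0; i < values.size(); i++) {
            sum += values.get(i);
        }
        return sum;
    }

    // Fix: iterate with foreach (an Iterator), which is O(n) regardless of the list implementation.
    static long sumByIterator(final List<Long> values) {
        long sum = 0;
        for (long value : values) {
            sum += value;
        }
        return sum;
    }

    public static void main(final String[] args) {
        List<Long> values = new LinkedList<>();
        for (long i = 0; i < 50_000; i++) {
            values.add(i);
        }
        System.out.println(sumByIndex(values));    // slow: quadratic
        System.out.println(sumByIterator(values)); // fast: linear
    }
}
```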

Second example:

If a Properties object is used as a global variable for the whole system, getProperty() is called concurrently everywhere. Properties is a subclass of Hashtable, whose reads and writes are synchronized, so concurrent reads are processed serially. Use another data structure, such as ConcurrentHashMap, to hold such global state.
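A sketch of the workaround (class and method names are illustrative): cache values read from the shared Properties in a ConcurrentHashMap so hot-path reads no longer contend on Hashtable's synchronized methods.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative cache in front of a global Properties object.
// Properties extends Hashtable, whose getProperty() is synchronized, so concurrent
// hot-path reads serialize; ConcurrentHashMap reads do not.
public final class CachedProperties {

    private final Properties properties;
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    public CachedProperties(final Properties properties) {
        this.properties = properties;
    }

    public String getProperty(final String key, final String defaultValue) {
        return cache.computeIfAbsent(key, k -> properties.getProperty(k, defaultValue));
    }
}
```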

2. Unexpected extra SQL

The second category is unexpected extra SQL. First example:

Recently there was a PR that called a JDBC interface to obtain the user name every time a SQL statement was executed. Under the hood, the MySQL driver answers that call by running SELECT USER() against the database. Without realizing it, you think you executed one SQL statement when you actually executed two. The optimization is to cache the user name, which improved performance by 32%.
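A sketch of the fix (a hypothetical wrapper, and I am assuming the JDBC call in question is DatabaseMetaData.getUserName(); the actual PR touched ShardingSphere's back-end connection code): fetch the user name once and reuse it, instead of letting the driver issue SELECT USER() on every statement.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Illustrative: cache the result of DatabaseMetaData.getUserName(), which the MySQL
// driver answers by running an extra query against the server.
public final class UserNameCache {

    private volatile String userName;

    public String getUserName(final Connection connection) throws SQLException {
        String result = userName;
        if (null == result) {
            result = connection.getMetaData().getUserName();
            userName = result;
        }
        return result;
    }
}
```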

Second example:

Another unexpected SQL problem relates to the streaming ResultSet we used earlier.

This is a screenshot I took from the release notes of the MySQL JDBC driver (Connector/J) 5.1.0: the driver sets a network timeout every time a streaming ResultSet is created, so a large number of SET net_write_timeout requests show up on the network.


The solution is to set this parameter to 0 manually. Testing showed a 33% performance improvement. The takeaway is that extra, unexpected SQL statements have a significant impact on performance.
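If I read the release note correctly, the connection property involved is netTimeoutForStreamingResults; a sketch of setting it on the JDBC URL (the URL and credentials are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;

public final class StreamingTimeoutConfig {

    public static Connection connect() throws Exception {
        // netTimeoutForStreamingResults=0 stops the driver from issuing
        // "SET net_write_timeout=..." every time a streaming ResultSet is created.
        String url = "jdbc:mysql://127.0.0.1:3306/ds_0"
                + "?netTimeoutForStreamingResults=0";
        return DriverManager.getConnection(url, "root", "");
    }
}
```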

3. IO & system calls

The third category involves relatively low-level IO and system calls.

In Netty, sending data takes two steps: write() and flush(). write() puts the data into Netty's outbound buffer, and flush() actually sends it out. The previous implementation flushed to the client every time a single piece of result data arrived from MySQL, which is inefficient. Norman Maurer writes in Netty in Action that the number of flush() calls should be minimized because the underlying system call is very expensive.

In fact, it is not only the system call that is expensive; network utilization suffers too. Every TCP segment carries about 54 bytes of headers (14-byte MAC header + 20-byte IP header + 20-byte TCP header), which may well be more than the payload you actually transmit. The more frequently you flush, the less useful data is transmitted per unit of time. The optimization is to call flush() once for a batch of results, which yielded a 50% performance improvement.
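A sketch of the batching idea with Netty (illustrative; the batch size and the names are assumptions, not the Proxy's actual code): write each row packet into the outbound buffer and flush only once per batch.

```java
import io.netty.channel.ChannelHandlerContext;
import java.util.List;

// Illustrative: write() each row into Netty's outbound buffer, but flush() only once
// per batch to reduce system calls and per-packet header overhead.
public final class BatchedRowWriter {

    private static final int FLUSH_THRESHOLD = 128;

    public static void writeRows(final ChannelHandlerContext context, final List<Object> rowPackets) {
        int pending = 0;
        for (Object rowPacket : rowPackets) {
            context.write(rowPacket);
            if (++pending >= FLUSH_THRESHOLD) {
                context.flush();
                pending = 0;
            }
        }
        context.flush(); // flush the tail of the batch
    }
}
```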

4. Full routing

The last category is the full-routing problem. (This picture is from SGM, the internal tracing tool of JD.com's data department; it is a screenshot of a JD Baitiao call chain.)

SQL that was expected to be routed to a single node ends up routed to n nodes, usually because of a parsing problem, and the response time grows dramatically. You can use Java performance monitoring tools such as JMC or JProfiler to locate these problems, or APM tools such as SkyWalking or SGM: use them to trace the routing-related functions and check that they are called correctly.

5. Testing

In order to catch performance problems promptly, we run performance tests on every merged PR so that problems can be localized to the smallest possible range of changes.


In this build, for example, performance degraded significantly, and the corresponding GitHub commit can be found from the build number.

Unlike the performance tests, the stress tests focus on real business scenarios and run for many hours each day to verify the stability of that day's code.

At present the stress tests run for about 10 hours a day and can detect problems such as memory leaks. This page will be published on the official website later, so after you submit a PR you can see its effect the next day.

Sharding-Proxy has matured after more than a year of practice and is gradually being deployed to production environments. Anyone interested is welcome to try it out; questions that come up during use can be discussed in our WeChat group. If you have any ideas, opinions, or suggestions, please leave a comment and talk to us. You are also welcome to join the ShardingSphere open source project.