Takeaway: DBus (Data Bus) focuses on real-time data collection and distribution. It can aggregate the data generated by IT systems in business processes and convert it into a unified JSON data format (UMS), which different data consumers can subscribe to. It serves as a data source for data warehouse platforms, big data analysis platforms, real-time reports, and real-time marketing. In the previous article on DBus, we mainly introduced how DBus handles table-structure changes and the various problems they cause. In this article, from the perspective of data sharding, we describe the sharding strategy and principles DBus uses during data collection, as well as the problems we encountered along the way and how we solved them.

1. Sharding strategy

For traditional relational databases, DBus satisfies users' data-acquisition requirements by providing full data pulls and incremental data capture. The DBus data extraction process is shown in the figure below (taking MySQL as an example):

The core of full data collection is choosing a shard column based on the primary key, unique indexes, indexes, and similar information. Shard columns are selected from primary keys, unique indexes, and indexes because the data in these columns is well indexed in the database, which improves scan efficiency.

Based on the selected shard column, the data is split into pieces, each with an upper and lower bound. The pieces are then pulled with a concurrency of about 6 to 8. (This concurrency is an empirical value derived from extensive testing: experiments show that 6 to 8 concurrent pulls do not put excessive pressure on the source database while maximizing full-pull throughput.)
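The concurrent pull step can be sketched with a fixed-size thread pool. This is an illustrative sketch, not DBus's actual code: the shard bounds are plain `{lower, upper}` pairs and the per-shard pull action (which would issue the bounded SELECT against the source database) is stubbed out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ShardPuller {
    // Pull all shards with a fixed concurrency (6-8 is the empirical sweet spot).
    // Each long[2] holds {lowerBound, upperBound}; the real task would run
    // "SELECT ... WHERE splitcol >= lo AND splitcol < hi" for its shard.
    public static List<String> pullAll(List<long[]> shards, int concurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (long[] s : shards) {
                final long lo = s[0], hi = s[1];
                // Stub: stands in for the actual bounded SELECT.
                futures.add(pool.submit(() -> "pulled [" + lo + ", " + hi + ")"));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With shards [0, 100) and [100, 200) and a concurrency of 2, `pullAll` returns one result per shard while both pulls run in parallel.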

Schematic diagram of the DBus sharding strategy:

Schematic diagram of DBus pull strategy:

So, what column types does DBus support as shard columns? And what is the sharding strategy for each type?

In its sharding strategy, DBus borrows from Sqoop's split design and supports the following column types as shard columns:

BigDecimal/numeric

Boolean

Date/time/timestamp

Float/double

Integer/smallint/long

Char/Varchar/Text/NText

The splitting principle is basically the same for all types: the upper and lower bounds of each shard are computed from the minimum and maximum values of the shard column and the configured shard size. But the implementation details vary greatly, especially for the Text/NText types, where we found problems while adapting Sqoop's approach and made some adjustments and optimizations.

This article mainly shares the pitfalls we encountered and how we solved them.

2. Sharding principles

1. Numeric shard columns

Let's start with the simplest and most straightforward case: numeric sharding.

As mentioned earlier, we choose the shard column by the priority primary key -> unique index -> index. If the table has a primary key, we shard on the primary key; if there is no primary key but there is a unique index, we shard on the unique index; and so on. If the selected key or index is composite, we take its first column as the shard column. If no suitable column is found, no sharding is performed and all data is pulled as a single piece (losing the benefit of concurrent pulls).

First, a column is selected as the shard column according to the rules above. Then the upper and lower bounds of each shard are computed from the shard column's maximum and minimum values and the configured shard size:

• 1. Get MIN() and MAX() of the shard column:

• "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") FROM (" + query + ") AS " + alias

• 2. Split differently according to the types of MIN and MAX

• Supported types include Date, Text, Float, Integer, Boolean, NText, BigDecimal, and more.

• Taking numbers as an example:

• step = (max - min) / number of mappers

• The generated intervals are:

• [min, min + step)

• [min + step, min + 2*step)

• ...

• [max - step, max]

• The generated condition for each shard looks like:

• splitcol >= min and splitcol < min + splitsize

The implementation code snippet is as follows:
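A minimal sketch of the numeric split computation, assuming the caller supplies min, max, and the shard size (class and method names are illustrative, not DBus's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class NumericSplitter {
    // Walk from min to max in steps of shardSize, producing half-open
    // [lo, hi) pieces. Each long[2] is {lowerBound, upperBound}; the last
    // pair's upper bound equals max and is inclusive when rendered as SQL
    // (splitcol >= lo AND splitcol <= max).
    public static List<long[]> split(long min, long max, long shardSize) {
        List<long[]> splits = new ArrayList<>();
        long lo = min;
        while (lo < max) {
            long hi = Math.min(lo + shardSize, max);
            splits.add(new long[]{lo, hi});
            lo = hi;
        }
        return splits;
    }
}
```

For min = 2, max = 10, shard size = 3 this yields [2, 5), [5, 8), [8, 10].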

2. String shard columns

Sharding is easy to understand when the shard column is numeric.

But what if the shard column is a string type such as char/varchar? How do we compute the upper and lower bounds of each shard?

The principle is the same: find the minimum and maximum values of the column, compute the cut points from the configured shard size, and generate the upper and lower bounds of each shard.

What differs is the technical detail: how to compute each shard's upper and lower bounds.

Shard column type: int, min: 2, max: 10, shard size: 3

Split [2, 5)

Split [5, 8)

Split [8, 10]

If the shard column type is varchar(128), min is 'ABC', and max is 'XYZ', how do we compute the split points?

Sqoop's mechanism is to map the strings to numbers, compute the shard bounds on those numbers, and then map the numeric bounds back to strings, which become the shard bounds. As follows:

  1. Map the strings to numeric values (A/65536 + B/65536^2 + C/65536^3)

  2. Compute the split points on those numeric values, generating interpolated values

  3. Map the interpolated values back to strings
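The three steps above can be sketched as follows. This is a simplified illustration using double precision, which is exact for roughly the first three 16-bit code units; Sqoop's actual implementation works with BigDecimal for more precision. Names are illustrative.

```java
public class TextSplitSketch {
    static final double ONE_PLACE = 65536.0;

    // Map the first maxChars UTF-16 code units of s into [0, 1):
    // c0/65536 + c1/65536^2 + c2/65536^3 + ...
    public static double stringToDouble(String s, int maxChars) {
        double result = 0.0;
        double divisor = ONE_PLACE;
        for (int i = 0; i < Math.min(s.length(), maxChars); i++) {
            result += s.charAt(i) / divisor;
            divisor *= ONE_PLACE;
        }
        return result;
    }

    // Map a value in [0, 1) back to a string: repeatedly scale by 65536
    // and peel off the integer part as the next code unit.
    public static String doubleToString(double d, int maxChars) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < maxChars; i++) {
            d *= ONE_PLACE;
            int c = (int) d;
            if (c == 0) {
                break;
            }
            sb.append((char) c);
            d -= c;
        }
        return sb.toString();
    }
}
```

Split points computed between `stringToDouble("ABC", 3)` and `stringToDouble("XYZ", 3)` can then be mapped back to strings and used as shard bounds.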

However, in practice this sharding mechanism ran into various problems. Below we share the problems we encountered and how we solved them.

3. Sharding experience

1. First problem: pulling data with the above sharding would sometimes get stuck.

• Phenomenon

• No error output, but the full pull emits some shards and then freezes with no further output

• Checking after about 30 seconds shows the Storm worker has been restarted for no apparent reason

• Analysis

• nimbus.task.timeout.secs defaults to 30 seconds; Nimbus restarts a worker when it finds the worker unresponsive

• Why is the worker unresponsive?

• The interpolated strings are arbitrary and may contain quote characters that break the concatenated SQL, for example:

• splitcol >= 'ABC' and splitcol < 'FXXX' xx '

• Solution

• Use bind variables instead of concatenating strings:

• SELECT * FROM T WHERE splitcol >= ? AND splitcol < ?
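A hedged sketch of this fix using JDBC bind variables. The table and column names (T, splitcol) follow the article's example; this is illustrative, not DBus's actual code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BoundShardQuery {
    // The bounds appear only as '?' placeholders, so a bound value that
    // happens to contain quote characters can no longer break the statement.
    public static final String SQL =
            "SELECT * FROM T WHERE splitcol >= ? AND splitcol < ?";

    public static PreparedStatement prepare(Connection conn, String lower, String upper)
            throws SQLException {
        PreparedStatement ps = conn.prepareStatement(SQL);
        ps.setString(1, lower); // lower bound supplied as a bind variable
        ps.setString(2, upper); // upper bound supplied as a bind variable
        return ps;
    }
}
```

Because the SQL text itself is constant, the driver handles any escaping of the bound values.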

2. Illegal mix of collations

• Phenomenon

• The pull fails with: java.sql.SQLException: Illegal mix of collations (utf8_general_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '<'

• Analysis

• What are utf8 and utf8mb4?

• In MySQL, utf8 is a character set that only supports UTF-8 characters up to three bytes long

• The three-byte encoding space covers code points 000000-00FFFF

• MySQL added the utf8mb4 character set in 5.5.3; "mb4" stands for "most bytes 4", and it is designed to be compatible with four-byte Unicode

• The additional four-byte encoding space covers code points 010000-10FFFF

• It turns out that the splitcol column and the generated interpolated string belong to different character sets and cannot be compared: splitcol belongs to utf8, while the interpolated string contains utf8mb4 characters

• Inspection found:

• character_set_server: utf8mb4

• character_set_database/table: utf8

• Connection URL character set: utf8mb4

• Unicode

• Code space: 1,114,112 code points, numbered from 0x0 to 0x10FFFF

• Code planes: Unicode is divided into 17 code planes, numbered #0 through #16, with 65,536 code points per plane

• UTF-16

• U+0000 to U+FFFF is the Basic Multilingual Plane (BMP)

• It contains the most commonly used characters

• Actual characters must exclude the surrogate area, i.e. they lie in U+0000 to U+D7FF and U+E000 to U+FFFF

• Code points U+D800 to U+DFFF form the surrogate area

• The Unicode standard assigns no characters to U+D800..U+DFFF



• Supplementary Planes: U+10000 to U+10FFFF

• Each such code point is encoded in UTF-16 as a pair of 16-bit code units (32 bits, 4 bytes) called a surrogate pair

• The top 6 bits of the first code unit are 110110 and the top 6 bits of the second are 110111. Thus:

• the first code unit ranges (in binary) from 11011000 00000000 to 11011011 11111111, i.e. 0xD800-0xDBFF

• the second code unit ranges (in binary) from 11011100 00000000 to 11011111 11111111, i.e. 0xDC00-0xDFFF

• Emoji example:

• The Unicode code point is U+1F601

• Its UTF-16 encoding is two code units, 0xD83D and 0xDE01, so the corresponding Java String has length 2
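These facts can be verified directly in Java (a standalone demo, independent of DBus): U+1F601 becomes the surrogate pair 0xD83D 0xDE01 in UTF-16, and occupies four bytes in UTF-8, which is exactly why MySQL's three-byte utf8 character set cannot store it.

```java
import java.nio.charset.StandardCharsets;

public class SurrogateDemo {
    public static void main(String[] args) {
        String emoji = new String(Character.toChars(0x1F601)); // U+1F601
        System.out.println(emoji.length());        // 2 UTF-16 code units
        System.out.println((int) emoji.charAt(0)); // 0xD83D (high surrogate)
        System.out.println((int) emoji.charAt(1)); // 0xDE01 (low surrogate)
        System.out.println(emoji.getBytes(StandardCharsets.UTF_8).length); // 4 bytes
    }
}
```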

Based on the character-set facts above, we found the crux of the problem:

• The interpolated values generated by bigDecimalToString():

• are not guaranteed to stay out of the U+D800 to U+DFFF surrogate area

• are not guaranteed to form valid surrogate pairs when two such code units are adjacent, so they may be treated as garbled text

• the surrogate area is small relative to the whole U+0000-U+FFFF range

• Solution

• Avoid generating characters in the surrogate area by replacing them with a legal BMP character:

• if (0xD800 <= codePoint && codePoint <= 0xDFFF) {

•     codePoint = 0xD3FF;

• }

• Possible drawback: the shards are slightly less uniform, but since the surrogate area is small relative to the U+0000-U+FFFF range, the impact is minor


3. Incorrect totals.

After the collation problem was solved, data could be pulled normally, but the total row count was wrong.

• Phenomenon

• No errors, but the table holds only 3 million rows while the pull produced 5 million

• Analysis

• The program is not wrong; the pulled data contains duplicates

• utf8_general_ci is case insensitive ("ci" is short for "case insensitive")

• utf8_bin compares every character of a string as binary data and is case sensitive

• Given the query:

SELECT * FROM table WHERE txt = 'a'

• utf8_bin will not match rows where txt = 'A', whereas utf8_general_ci will

• Solution

• Perform the query with a binary comparison:

SELECT * FROM tableName WHERE BINARY columnName = 'a';

With these fixes, char and varchar shard columns are now well supported.

Source: Yin Hongchun, Creditease Institute of Technology