Author: Xianyu (Idle Fish) Technology – Jian Xin

I. Business Background

Marketing activities are a vital part of e-commerce operations and contribute greatly to user growth and GMV. For e-commerce operators, a constant question is how to select high-quality items from sellers' enormous product pool and push them to the buyers who need them, and this process should be as fast and as close to real time as possible. Speed and real-time responsiveness improve the experience and engagement of both buyers and sellers.

II. Real-Time Selection

To solve the problem described above, Xianyu built the Mach system. Mach is a high-performance real-time product selection system: it uses rules to select high-quality items from hundreds of millions of products and deliver them to marketing scenarios. With Mach, Xianyu operations staff can create screening rules, for example: the product title contains "Peppa Pig", the category is "toys", the price does not exceed 100 yuan, and the product has not been sold. Once a rule is created, Mach performs two operations simultaneously. First, it selects the qualifying products from the stock data and tags them; second, it evaluates the rules against real-time product changes and synchronizes the rule results in real time.

Mach's defining characteristics are speed and real-time operation: tagging a rule's full hit set, even at the scale of one million (100W) products, completes within 10 minutes, and the rule match results triggered by a product change are synchronized within 1 second. Operations staff can quickly screen products and deliver them to users through Mach, and Xianyu's traffic can be directed precisely to the qualifying products, maximizing the value of that traffic.

So how does Mach solve this typical e-commerce problem? And what does it have to do with stream computing? That is explained in detail below.

III. Stream Computing

Stream computing is a continuous, low-latency, event-triggered data processing model. A stream computing model uses real-time data integration tools to push data changes into streaming data storage as they occur. Data transfer thus becomes real-time: the large volume of data that would otherwise accumulate over a long period is amortized across individual points in time and transmitted continuously in small batches. Stream computing encapsulates the processing logic as a resident computing service: once started, it waits for events to trigger it, and whenever data flows in, computation is triggered and results are produced quickly. Results can be emitted as soon as they are available, without waiting for the entire dataset to be processed.



Blink is the stream computing framework used by Xianyu's real-time selection system. Blink is an enterprise-grade stream computing framework that Alibaba customized and developed on top of Flink, the open-source stream computing framework; it can be regarded as an enhanced Flink and has itself been open-sourced. Flink is a high-throughput, low-latency computing engine with many advanced features: it provides stateful computation, supports state management and strongly consistent data semantics, and supports Event Time and Watermark handling for out-of-order messages. These features provide strong support for the ultra-low-latency requirements of Xianyu's real-time selection system.
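To make the event-triggered model concrete, here is a minimal, generic Flink DataStream job in Java (a sketch only, not part of Mach; the socket source and port are placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// A minimal resident streaming job: once started it waits for events;
// every line arriving on the socket triggers computation immediately,
// and each result is emitted at once instead of at the end of the data.
public class EventTriggeredSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)  // unbounded source
           .map(line -> "processed at " + System.currentTimeMillis() + ": " + line)
           .print();                             // continuous output

        env.execute("event-triggered sketch");   // runs until cancelled
    }
}
```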

3.1. Blink State

State refers to the intermediate computation results or metadata attributes held by compute nodes during stream processing. For example, in an aggregation the intermediate aggregate is recorded in State, and with Apache Kafka as the data source the offset of the last record read is recorded as well. State data is persisted (inserted or updated) as the computation proceeds, so State in Blink is a point-in-time snapshot of a Blink task's internal data (computed data and metadata attributes). The Mach system stores all merged product data and rule result data in State. When a product changes, Mach merges the change with the product information saved in State, runs all rules with the merged record as input, and finally diffs the rule results against those saved in State to obtain the effective results. Blink's State feature is therefore the key feature that Mach depends on.
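A minimal sketch of keyed state in the Flink DataStream API, in the spirit of the merge step described above (the `ItemEvent` type and `mergeFields` helper are illustrative placeholders, not Mach's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by item id: each key keeps its own snapshot of the last merged record.
public class MergeWithState
        extends KeyedProcessFunction<Long, MergeWithState.ItemEvent, MergeWithState.ItemEvent> {

    // Placeholder event type; the real record layout is much richer.
    public static class ItemEvent {
        public long itemId;
        public Map<String, Object> fields = new HashMap<>();
    }

    private transient ValueState<ItemEvent> lastMerged;

    @Override
    public void open(Configuration parameters) {
        lastMerged = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-merged-item", ItemEvent.class));
    }

    @Override
    public void processElement(ItemEvent change, Context ctx, Collector<ItemEvent> out)
            throws Exception {
        ItemEvent previous = lastMerged.value();  // null the first time a key is seen
        ItemEvent merged = previous == null ? change : mergeFields(previous, change);
        lastMerged.update(merged);                // persisted as Blink/Flink state
        out.collect(merged);                      // downstream: rule evaluation
    }

    private static ItemEvent mergeFields(ItemEvent oldEv, ItemEvent newEv) {
        ItemEvent merged = new ItemEvent();
        merged.itemId = newEv.itemId;
        merged.fields.putAll(oldEv.fields);
        merged.fields.putAll(newEv.fields);       // newer values win (simplified)
        return merged;
    }
}
```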

3.2. Blink Window

Blink's Window feature is the data grouping mechanism peculiar to stream computing systems. Window creation is data-driven: a window is created when the first element belonging to it arrives, and the window and its state data are deleted when the window ends. Blink provides two kinds of windows: Tumble (tumbling) windows and Hop (sliding) windows.

A tumbling window has a fixed size, and a computation is performed at the end of each window. In other words, a tumbling-window task computes once every fixed period, for example a total once per minute.



A sliding window is similar to a tumbling window in that it has a fixed size. Unlike a tumbling window, however, the frequency at which new sliding windows start is controlled through the slide parameter. When the slide value is smaller than the window size, adjacent sliding windows overlap and one piece of data can be assigned to multiple windows, as shown in the figure below:



Blink's Window feature has many applications in data computation and statistics. The Mach system mainly uses windows to measure the speed and latency of real-time data processing, for data statistics and alert monitoring.
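A minimal DataStream sketch of the two window types (a generic counting example, not Mach's monitoring job; processing-time windows are used for brevity):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowSketch {

    // counts is a stream of (key, 1) pairs, e.g. one element per processed message.

    static DataStream<Tuple2<String, Integer>> tumbling(DataStream<Tuple2<String, Integer>> counts) {
        return counts
                .keyBy(value -> value.f0)
                // Tumble: fixed, non-overlapping 1-minute windows; one total
                // per key is emitted each time a window closes.
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(1);
    }

    static DataStream<Tuple2<String, Integer>> sliding(DataStream<Tuple2<String, Integer>> counts) {
        return counts
                .keyBy(value -> value.f0)
                // Hop: a 1-minute window that starts every 10 seconds; because
                // the slide is smaller than the size, windows overlap and each
                // element is assigned to up to six windows.
                .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
                .sum(1);
    }
}
```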

3.3. Blink UDX

UDX refers to user-defined functions in Blink, which can be called within a task to implement custom logic. Blink UDX comes in three types:

  • UDF (User-Defined Scalar Function): the simplest kind of user-defined function. Its input is one or more fields of a single row and its output is a single field; it can implement operations such as data comparison and data conversion.
  • UDTF (User-Defined Table-Valued Function): a table-valued function that returns N (N >= 0) rows for each input (one or more columns). The Blink framework provides a small number of built-in UDTFs, for example the three built-ins STRING_SPLIT, JSON_TUPLE, and GENERATE_SERIES.
  • UDAF (User-Defined Aggregate Function): an aggregate function whose input is multiple rows of data and whose output is a single field. Blink's built-in UDAFs include MAX, MIN, AVG, SUM, and COUNT, which cover roughly 80% of common aggregation scenarios, but a fair share of complex business scenarios still require custom aggregate functions.

The Mach system uses UDX extensively for custom logic, including message parsing and data processing. Mach's core steps, such as merging product data, running rules, and diffing results, are all implemented as UDAFs.
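As an illustration of what a custom UDAF looks like, here is a minimal weighted-average aggregate in Flink's Table API (a generic example, not Mach's merge, rule, or diff UDAFs):

```java
import org.apache.flink.table.functions.AggregateFunction;

// A custom UDAF: many input rows fold into one accumulator, one output value.
public class WeightedAvg extends AggregateFunction<Double, WeightedAvg.Acc> {

    public static class Acc {
        public double sum = 0;
        public long weight = 0;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row by the framework.
    public void accumulate(Acc acc, Double value, Long weight) {
        if (value != null && weight != null) {
            acc.sum += value * weight;
            acc.weight += weight;
        }
    }

    @Override
    public Double getValue(Acc acc) {
        return acc.weight == 0 ? null : acc.sum / acc.weight;
    }
}
```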

IV. The Second-Level Selection Scheme

Several technical solutions were designed after the product selection project was initiated. After several rounds of discussion, we decided to prototype and verify two of them and then determine the final implementation.

The first solution was based on PostgreSQL. PostgreSQL makes it easy to define functions for data merge operations and to attach rule execution logic to triggers. The PostgreSQL-based approach was fairly complex to implement, but it could meet the functional requirements. However, performance tests showed that while PostgreSQL performs well on small datasets (millions of rows), its performance deteriorates when there are too many triggers, complex trigger logic, or hundreds of millions of rows, and it could not meet the second-level (seconds) latency requirement of the selection system. The PostgreSQL-based solution was therefore rejected (though it is still used in the Xianyu fish-pond scenario).

The second solution was based on Blink stream computing. Verification showed that Blink SQL is well suited to expressing the data processing logic and that Blink's performance is excellent. After a comprehensive comparison, the Blink stream computing solution was chosen as the technical scheme to actually implement.

To fit the stream computing scheme, the Mach system was designed and decoupled so as to connect seamlessly with the Blink computing engine. Within it, the data processing module is Mach's core functional module: it is responsible for ingesting all kinds of product-related data, validating it, merging it, executing the rules, and processing and outputting the execution results. The speed and latency of the data processing module therefore largely represent the data processing speed and latency of Mach as a whole. Next, let's look at how the data processing module integrates deeply with Blink to reduce data processing latency to seconds.



As shown in the figure above, the data processing module consists of a data access layer, a data merge layer, a rule execution layer, and a run result processing layer. Each layer is designed specifically for the stream computing processing model.

4.1. Data Access Layer



The data access layer is the front end of the data processing module and is responsible for connecting all kinds of business data from multiple channels. Its main logic is as follows:

  • Connect business data from multiple channels and of multiple types;
  • Parse the business data and perform simple validation;
  • Collect metrics on the business data of every channel, including total volume and year-on-year change;
  • Obtain field-level metadata configuration from the metadata center. The metadata center is the component that stores and manages the metadata configuration of all fields; metadata configuration refers to field-level metadata such as the field's value type, value range, and value format;
  • Perform field-level data validation according to the metadata configuration;
  • Assemble the data into the standard data format defined by Mach.

This design stems from the variety of the business data involved. Product information, for example, comes from database table records, MQ messages produced by product changes, and offline data produced by algorithms. If Blink were to connect to these business data sources directly, a separate Blink task would have to be created for each type of business data, which is too heavyweight. Moreover, the data access logic would then be tightly coupled to Blink, which is not flexible enough.

The data access layer solves these problems well. It can flexibly connect all kinds of business data, it decouples data access from Blink, and it finally sends all messages through a single Topic. As long as the Blink task listens to that Topic, it continuously receives the business data stream and triggers the data processing flow described below.
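A sketch of the funneling step, using a Kafka producer as a stand-in for the message queue actually used (the topic name, fields, and inline validation are illustrative):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AccessLayerSketch {

    private static final String UNIFIED_TOPIC = "mach-item-events"; // hypothetical name

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A raw change from any channel (DB record, change MQ, offline output).
            long itemId = 1L;
            double price = 99.5;

            // Parsing, simple validation, and field-level validation against the
            // metadata configuration would run here (omitted).

            // Assemble into Mach's standard form: {field: [timestamp, value]}.
            long ts = System.currentTimeMillis();
            String normalized = String.format("{\"price\":[%d,%s]}", ts, price);

            // Every channel funnels into the one topic the Blink job listens to.
            producer.send(new ProducerRecord<>(UNIFIED_TOPIC, String.valueOf(itemId), normalized));
        }
    }
}
```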

4.2. Data Merge Layer



Data merging is an important step in the data processing flow. Its main function is to combine the latest product information with the product information stored in memory for subsequent rule execution. The main logic of data merging is:

  • Listen to the designated message queue Topic to obtain business data messages;
  • Parse each message and reassemble the data from the message content into fields of the format {key: [timestamp, value]}, where key is the field name, value is the field value, and timestamp is the time at which that field value was produced;
  • Merge the assembled data with the historical data stored in memory field by field, according to timestamp. The merge algorithm compares the timestamps and keeps the newer field value. The specific logic is shown in the figure below.

Data merging rests on two premises:

  1. Memory can hold the data. This is a capability provided by Blink: Blink can keep the data generated during a task run in memory and retrieve it for further processing on the next run.
  2. The merged data represents the latest state of the product. This requires a clever design: a product record has many fields, and each field's value is an array that records not only the actual value but also the timestamp at which that value last changed. When product information is merged, it is merged field by field, and the merge rule is to keep the value with the larger timestamp.

For example, suppose the information kept in memory for the product with ID = 1 is {"desc": [1, "description 1"], "price": [4, 100.5]}, and the change flowing in for the same product is {"desc": [2, "description 2"], "price": [3, 99.5]}. The merged result is {"desc": [2, "description 2"], "price": [4, 100.5]}: each field holds its most recent value, so the merged record represents the latest information about the product.
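A minimal sketch of this field-level merge rule in plain Java (types simplified; Mach's real record layout is not published):

```java
import java.util.HashMap;
import java.util.Map;

public class FieldMerge {

    // A field value stamped with the time it was produced: [timestamp, value].
    static class TimedValue {
        final long ts;
        final Object value;
        TimedValue(long ts, Object value) { this.ts = ts; this.value = value; }
        @Override public String toString() { return "[" + ts + ", " + value + "]"; }
    }

    // Field-level merge: for each field, keep the value with the newer timestamp.
    static Map<String, TimedValue> merge(Map<String, TimedValue> inMemory,
                                         Map<String, TimedValue> incoming) {
        Map<String, TimedValue> merged = new HashMap<>(inMemory);
        incoming.forEach((field, candidate) -> {
            TimedValue old = merged.get(field);
            if (old == null || candidate.ts > old.ts) {
                merged.put(field, candidate);
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, TimedValue> memory = new HashMap<>();
        memory.put("desc",  new TimedValue(1, "description 1"));
        memory.put("price", new TimedValue(4, 100.5));

        Map<String, TimedValue> change = new HashMap<>();
        change.put("desc",  new TimedValue(2, "description 2"));
        change.put("price", new TimedValue(3, 99.5));

        // Prints (key order may vary): {desc=[2, description 2], price=[4, 100.5]}
        System.out.println(merge(memory, change));
    }
}
```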

Whenever a product changes, the latest data flows into the data access layer and is merged into memory through the data merge layer, so Blink memory always holds the complete, latest data for the product.

4.3. Rule Execution Layer



The rule execution layer is the core module of the data processing flow. Running the rules yields each rule's match result. The logic is as follows:

  • The rule execution layer takes the merged data as its input;
  • Obtain the field-level metadata configuration from the metadata center;
  • Parse the data according to the field metadata configuration;
  • Obtain the list of valid rules from the rule center, the component that creates rules and manages their life cycle;
  • Loop over the rule list, run each rule, and save the rule hit results in memory;
  • Record the data for which rule execution throws an exception, and raise monitoring alerts.

Rules here are the business rules created by operations staff, for example: the item's price is greater than 50 and its status is online. A rule's input is the merged product data; its output is true or false, i.e., whether the rule's conditions are hit. A rule represents a business delivery scenario. Mach's business value lies in determining, as soon as possible after a product changes, whether previously missed rules are now hit and whether previously hit rules are now missed, and reflecting those hits and misses in the delivery scenarios as quickly as possible.
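Since real-time rule execution uses the MVEL expression engine (see section 4.5.1), a single rule run looks roughly like this (a sketch; the expression and field names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.mvel2.MVEL;

public class RuleRunSketch {
    public static void main(String[] args) {
        // Merged product data, flattened to plain field values for evaluation.
        Map<String, Object> item = new HashMap<>();
        item.put("price", 99.5);
        item.put("status", "online");

        // A business rule compiled from the operator's filter conditions.
        String rule = "price > 50 && status == 'online'";

        // The output is simply hit or miss.
        Boolean hit = (Boolean) MVEL.eval(rule, item);
        System.out.println("rule hit: " + hit); // rule hit: true
    }
}
```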

Rule execution relies on Blink's powerful computing capacity to stay fast. Mach currently has nearly 300 rules, and the number is still growing rapidly, which means that hundreds of rules are run on Blink after every product change. With hundreds of millions of product changes on Xianyu every day, the amount of computation behind this is staggering.

4.4. Run Result Processing Layer

Readers may be surprised that the rule results are not simply output straight to the delivery scenarios, skipping a result processing layer. In fact, the run result processing layer is the most important part of the data processing module.

In real scenarios, a product change hits only a few rules in most cases, and the hit results rarely change. That is, most of a product's rule results are meaningless: if they were output as well, they would only inflate the downstream TPS while contributing nothing to the actual outcome. Filtering out only the effective run results is precisely the function of the run result processing layer. Its logic is as follows:

  • Obtain the rule run results for the product data;
  • Parse the run results according to whether each rule was hit;
  • Diff the run results against the historical run results in memory. The diff's function is to exclude the entries whose hit status is identical in the old and new results, as shown in the figure below.

The run result processing layer uses Blink memory to keep the rule results from the product's previous change, and compares the current results against them to compute the effective results. For example, suppose that after product A's previous change the rule hit results were {"rule1": true, "rule2": true, "rule3": false, "rule4": false}, and after the current change they are {"rule1": true, "rule2": false, "rule3": false, "rule4": true}. Since product A's results for rule1 and rule3 did not change, the effective results are {"rule2": false, "rule4": true}. The run result processing layer thus outputs the minimal set of effective results, greatly reducing the output of invalid results and improving the overall performance and efficiency of data processing.
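A minimal sketch of the diff step using the example above, with plain Java maps standing in for Blink state:

```java
import java.util.HashMap;
import java.util.Map;

public class ResultDiff {

    // Keep only the entries whose hit status changed since the previous run.
    static Map<String, Boolean> diff(Map<String, Boolean> previous,
                                     Map<String, Boolean> current) {
        Map<String, Boolean> effective = new HashMap<>();
        current.forEach((rule, hit) -> {
            if (!hit.equals(previous.get(rule))) {
                effective.put(rule, hit);
            }
        });
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Boolean> previous = Map.of(
                "rule1", true, "rule2", true, "rule3", false, "rule4", false);
        Map<String, Boolean> current = Map.of(
                "rule1", true, "rule2", false, "rule3", false, "rule4", true);

        // Prints (key order may vary): {rule2=false, rule4=true}
        System.out.println(diff(previous, current));
    }
}
```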

4.5. Analysis of Difficulties

Although the Xianyu real-time selection system was researched and validated before the project began, the many new technical frameworks and the stream computing mindset still caused difficulties during development, in both design and implementation, many of them typical of designing a stream computing system. One of them, which we would like to discuss with readers, is the conversion of rules into expressions.

4.5.1 Converting Rules into Expressions

The business scenario is as follows. After an operations student sets up the product field filters on the Mach web page, the rule is saved. The server side is an existing legacy system whose logic is to generate a piece of SQL from the rule, with WHERE conditions identical to the operator's filter conditions. The SQL serves two purposes. On one hand, as the offline rule, it is executed in the offline database to filter the offline product data that satisfies the rule. On the other hand, it is converted into the online rule, which is executed inside the Blink task against real-time product change data to judge whether the rule is hit. Since real-time rule execution uses the MVEL expression engine, and MVEL expressions use Java-like syntax, the problem becomes converting the offline rule's SQL into a logically equivalent online Java expression, with good performance and efficiency. The direction of the solution is clear: parse the SQL, convert SQL operators into Java operators, and convert SQL-specific syntax into Java syntax, for example turning A like '%test%' into A.contains('test'). The difficulty lies in how to parse the SQL and translate the parsed semantics into Java statements. After some investigation, we arrived at a simple and elegant solution. The main steps are:

  • Parse the SQL statement into a binary expression tree using the Druid framework, and extract the WHERE condition subtree separately.
  • Traverse the WHERE condition subtree with a post-order traversal algorithm, and during the traversal:

    • Replace each SQL operator with the corresponding Java operator. The following operators are currently supported: and, or, equal to, not equal to, greater than, less than, less than or equal to, like, not like, and in;
    • Convert SQL syntax constructs into Java syntax, for example rewriting the in construct as Java || syntax: A in ('hello', 'world') becomes (A == 'hello') || (A == 'world').

The actual running result is as follows:



The code logic mainly consists of the post-order traversal of the tree and the operator conversion. A minimal sketch based on Druid's SQL parser follows (illustrative class and method names, simplified relative to the production code; for example, % handling for like is minimal and not like is omitted):
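```java
import java.util.stream.Collectors;

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLInListExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.util.JdbcConstants;

public class SqlToJavaExpr {

    // Parse the SQL, take the WHERE subtree, and convert it bottom-up.
    public static String whereToJava(String sql) {
        SQLSelectStatement stmt = (SQLSelectStatement)
                SQLUtils.parseStatements(sql, JdbcConstants.MYSQL).get(0);
        SQLSelectQueryBlock query = (SQLSelectQueryBlock) stmt.getSelect().getQuery();
        return convert(query.getWhere());
    }

    // Post-order: convert both children first, then combine with the Java operator.
    private static String convert(SQLExpr expr) {
        if (expr instanceof SQLBinaryOpExpr) {
            SQLBinaryOpExpr bin = (SQLBinaryOpExpr) expr;
            String left = convert(bin.getLeft());
            String right = convert(bin.getRight());
            switch (bin.getOperator()) {
                case BooleanAnd:      return "(" + left + " && " + right + ")";
                case BooleanOr:       return "(" + left + " || " + right + ")";
                case Equality:        return "(" + left + " == " + right + ")";
                case NotEqual:        return "(" + left + " != " + right + ")";
                case GreaterThan:     return "(" + left + " > " + right + ")";
                case LessThan:        return "(" + left + " < " + right + ")";
                case LessThanOrEqual: return "(" + left + " <= " + right + ")";
                // A like '%test%'  ->  A.contains('test')  (simplified % handling)
                case Like:            return left + ".contains(" + right.replace("%", "") + ")";
                default: throw new IllegalArgumentException(
                        "unsupported operator: " + bin.getOperator());
            }
        }
        if (expr instanceof SQLInListExpr) {
            SQLInListExpr in = (SQLInListExpr) expr;
            String target = convert(in.getExpr());
            // A in ('hello', 'world')  ->  ((A == 'hello') || (A == 'world'))
            return "(" + in.getTargetList().stream()
                    .map(item -> "(" + target + " == " + convert(item) + ")")
                    .collect(Collectors.joining(" || ")) + ")";
        }
        // Leaves: identifiers, string literals, numbers, etc.
        return expr.toString();
    }

    public static void main(String[] args) {
        String sql = "select id from item where price > 50 and status in ('online', 'offline')";
        // Prints: ((price > 50) && ((status == 'online') || (status == 'offline')))
        System.out.println(whereToJava(sql));
    }
}
```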

V. Conclusion

Since its launch, the Mach system has supported nearly 400 events and delivery scenarios, processing nearly 140 million messages per day with a peak TPS of 50,000. Mach has become an important pillar of Xianyu's selection and delivery.

This article described the concrete design of data processing in the Mach system and explained the context of the overall design. Although the Xianyu real-time selection system targets product selection, the input and output of the stream computing data processing solution are MQ messages, which are not bound to any specific business. The solution is therefore suitable not only for product selection but also for other, similar real-time screening scenarios. We hope our technical solution and design ideas bring you some ideas and reflections, and we welcome discussion. Thank you.

References

  • Xianyu real-time product selection system: mp.weixin.qq.com/s/8ROsZniYD…
  • Blink: github.com/apache/flin…
  • PostgreSQL: www.postgresql.org/
  • Druid: github.com/alibaba/dru…