This article summarizes the talk "Practice of PB-Scale Ad Hoc Query Based on Flink", shared at Flink Forward Asia 2020 by Su Jun and Liu Jia, big data engineers from the 360 Government and Enterprise Security Group. The article covers:

  1. Architecture and Design of the Threat Hunting Platform (Su Jun)
  2. Optimization and Exploration for Reducing IO (Liu Jia)
  3. Future planning

Let me begin with a brief introduction of myself and the team. We are from the 360 Government and Enterprise Security Group, and we are currently working on the development of the "Threat Hunting" project of the 360 Security Brain. Our team started working with Flink early on and has developed a number of Flink-based products since then. We attended Flink Forward in Berlin in 2017 and 2019, where we presented our UEBA and AutoML products respectively.

This talk is mainly divided into two parts:

  • The first part, "Threat Hunting Platform Architecture and Design", will be shared by Su Jun.
  • The second part, "Optimization and Exploration for Reducing IO", will be shared by Liu Jia.

I. Architecture and Design of the Threat Hunting Platform (Su Jun)

This part is roughly divided into three sections:

  • Platform evolution
  • Architecture design
  • Explore index structures in depth

1. Platform evolution

We believe that all technology evolution and innovation needs to be driven by specific business issues. Here are a few products that our team has developed based on Flink in recent years:

  • In 2017, we developed UEBA, a user and entity behavior analytics system, based on the Flink DataStream API. It consumes various kinds of behavioral data from the enterprise IT topology, such as identity authentication data, application access data, endpoint security data, and network traffic analysis data. Taking the user/asset as the core, it detects threat behavior in real time and ultimately builds a system of user threat levels and user profiles.
  • Based on the implementation experience with UEBA, in 2018 we found that security analysts often need a way to retrieve the original logs behind a security event in order to further confirm the source of a threat and how to handle it. We therefore developed HQL, based on Spark, to solve data retrieval in offline mode. HQL can be regarded as a query language more expressive than SQL; roughly speaking, it adds algorithm-level computation on top of SQL's capabilities.
  • As customers used offline HQL during 2019, we found that it let them quickly define security rules and build threat models. If statements written in offline mode could be published directly as online tasks, the development cycle would be greatly shortened. In addition, Flink SQL's capabilities are relatively complete, so we upgraded HQL on top of Flink SQL + CEP and released a real-time version of HQL.
  • In 2020, customer data volumes kept growing, with many reaching the PB level, and the performance of the previous offline data retrieval solution fell far below expectations. Security analysts are used to fuzzy matching operations such as LIKE and full-text search, which led to very high query latency. Starting that year, we focused on optimizing HQL's offline search capability and launched a new Threat Hunting platform.

According to our research, customers with petabyte-scale data tend to have the following business needs:

  • The first is a low-cost, cloud-native architecture. As we know, most current big data architectures are based on Hadoop, whose characteristic is that data lives on the compute nodes, which saves a lot of network overhead and speeds up computation. However, to keep resources balanced the whole cluster usually needs a uniform resource configuration, and to store as much data as possible the cluster has to be large, so this kind of architecture requires a heavy up-front hardware investment.

    Storage-compute separation and elastic computing can solve this problem: because disks are much cheaper than memory and CPU, data can be stored on cheap machines with low CPU and memory, while a small number of well-equipped machines do the computation, which greatly reduces cost.

  • The second is low-latency query response. When security analysts do threat detection, they spend most of their time on ad hoc queries, that is, data retrieval and correlation through filters and joins. To get query results as quickly as possible, the corresponding techniques are columnar storage, indexing, and caching:

    • Columnar storage is, needless to say, a common storage scheme in the big data field.
    • On top of columnar storage, an efficient indexing scheme can greatly reduce IO and improve query performance.
    • The network latency introduced by storage-compute separation can be compensated for by distributed caching.
  • The third is rich query capability, including single-row Fields/Filter/UDFs, multi-row aggregation/Join, and even algorithm-level analysis, which we mainly provide through our own analysis language, HQL.

2. Architectural design

First of all, the data comes from two sources: historical data already stored in ES and real-time data in Kafka. Historical data in ES is synchronized by our own synchronization tool, while real-time data in Kafka is written as ORC files into the storage cluster through the Streaming File Sink. While the data is being synchronized, we also update the index information of each batch of data in the index database.
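As a hedged illustration of this ingestion path, the sketch below writes a stream of raw log strings as ORC files with Flink's StreamingFileSink and OrcBulkWriterFactory. The one-column schema, the output path, and the use of fromElements instead of a Kafka source are simplifications for the sketch, not the authors' actual job.

```java
// Minimal sketch: write incoming log lines as ORC files via StreamingFileSink.
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OrcIngestJob {

    /** Copies each raw log line into one row of the current ORC batch. */
    static class RawLogVectorizer extends Vectorizer<String> {
        RawLogVectorizer() {
            super("struct<raw:string>"); // single string column, for brevity
        }
        @Override
        public void vectorize(String element, VectorizedRowBatch batch) throws IOException {
            BytesColumnVector col = (BytesColumnVector) batch.cols[0];
            col.setVal(batch.size++, element.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // bulk formats roll files on checkpoint

        // In the real pipeline this stream comes from Kafka; a static element keeps the sketch runnable.
        DataStream<String> logs = env.fromElements("{\"src_address\":\"10.0.0.1\"}");

        StreamingFileSink<String> orcSink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/threat-hunting/orc"),
                               new OrcBulkWriterFactory<>(new RawLogVectorizer()))
                .build();

        logs.addSink(orcSink);
        env.execute("kafka-to-orc-sketch");
    }
}
```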

A security analyst writes HQL interactively on the front-end page; the analysis-language statement expresses a data retrieval request that is submitted to the scheduling system. Once the job starts, the statement is first parsed into a list of operators, and the operator-cache algorithm then determines whether the query can hit data that already exists in the caching system.

  • If the input of the parsed statement is an intermediate result that has already been computed and cached, the cache is read directly and the computation continues from there.
  • If there is no hit, the computation has to start again from the ORC files.

We first extract the filter conditions or JOIN conditions of the query statement to do predicate pushdown, query the index database to get the list of files that match the query, and then hand that file list to the computing engine. For the computing engine we use a dual-engine mode: complicated statements are handled by the Flink engine, while relatively simple tasks are handed to the platform's internal "Hummingbird engine". The Hummingbird engine uses Apache Arrow for vectorized execution and, together with LLVM compilation, keeps query latency very low.

Because the whole system separates storage and computation, we added Alluxio on the compute cluster nodes to provide a data caching service and speed up data reads. It caches not only data from the remote storage cluster but also part of the historical job results, which the operator-cache algorithm then uses to accelerate subsequent computing tasks.

Two more points need to be emphasized here:

  • The first point concerns the file list returned by the index database when it is very large: in the current version of Flink, collecting the FileList statistics while building the Job Graph traverses a huge number of files, so the Job Graph cannot be built for a long time. We have fixed this and will contribute the fix to the community later.
  • The second is data caching. Our HQL was previously implemented on Spark, and as Spark users know, Spark caches or persists a table with a single operator, which we relied on as well. When migrating to Flink we implemented this ourselves: when the user caches a table, we register it as a new table source, and when the table is read again only this new table source is used, which completes the whole flow (a sketch follows this list).
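A minimal sketch of the cache-as-table-source idea with the Flink Table API is shown below; the connector choices, paths, and table names are assumptions for illustration, not the platform's implementation.

```java
// Sketch: materialize an intermediate result and register a new table over it,
// so later statements read the cached copy instead of recomputing.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableCacheSketch {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical source table holding raw events (the real source is the ORC files).
        tEnv.executeSql(
            "CREATE TABLE events (src_address STRING, occur_time BIGINT) " +
            "WITH ('connector' = 'datagen', 'number-of-rows' = '100')");

        // "Cache" an intermediate result: materialize it to ORC files on shared storage.
        tEnv.executeSql(
            "CREATE TABLE cached_events (src_address STRING, occur_time BIGINT) " +
            "WITH ('connector' = 'filesystem', 'path' = 'file:///tmp/hql-cache/t1', 'format' = 'orc')");
        tEnv.executeSql("INSERT INTO cached_events SELECT * FROM events").await();

        // Later statements referencing the cached table read the new source directly.
        tEnv.executeSql("SELECT COUNT(*) FROM cached_events").print();
    }
}
```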

3. Explore index structure in depth

To speed up data retrieval, we usually create an index on the data in advance, use the index to locate the starting position of the data, and only then scan the data. Traditional databases commonly use row-level indexes: an index is built on one or several fields and stored in a tree structure. Such an index is accurate down to the row level, so its lookup efficiency is the highest.

Some big data projects also support row-level indexes, but their drawback is that the large amount of index data delays both writes and retrieval. Our platform, however, deals with machine data such as endpoint and network data, which is highly repetitive, while security analysis results are very sparse: the few threat behaviors are hidden in massive data and account for one in a thousand rows or less.

Therefore we chose the more cost-effective block index scheme, which is already sufficient for our current application scenarios. According to current customer data, the index provides a pruning rate of more than 90% for 85% of statements, which basically meets the latency requirements.

Some big data platforms store the index data as files on disk, plus some caching mechanism to speed up index access, whereas we store the index data directly in a database. There are two main reasons:

  • The first is transactions. We know that column files are usually not updatable, and we periodically merge files to optimize file distribution. To guarantee query consistency, we need the database to provide transaction capability.
  • The second is performance. Databases have strong read/write and retrieval capabilities, predicates can even be pushed down into the database, and the database's high compression ratio further saves storage.

The figure above shows the design of the block index. In the index database we divide the data into different data sources; for example, endpoint data is one data source and network data is another. The criterion for classifying data sources is whether they share a unified schema. Within a single data source the date is used as the partition, and each partition contains a large number of small ORC files. As for the index structure, we build a min/max index for every field, and a Bloom filter index for fields whose cardinality ratio is below 0.001.
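The sketch below illustrates, under stated assumptions (Guava's BloomFilter in place of the real index format, an in-memory record instead of an index-database row), how such a per-file block index entry could be built: min/max for every field, plus a Bloom filter only when the field's cardinality ratio is low enough.

```java
// Sketch: build the block-index entry for one column of one ORC file.
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class BlockIndexBuilder {

    /** One index-database row describing a single column of a single ORC file. */
    static final class ColumnIndexRow {
        final String file, column, min, max;
        final BloomFilter<CharSequence> bloom; // null when the cardinality ratio is too high
        ColumnIndexRow(String file, String column, String min, String max,
                       BloomFilter<CharSequence> bloom) {
            this.file = file; this.column = column;
            this.min = min; this.max = max; this.bloom = bloom;
        }
    }

    /** Builds the index row for one column of one file from its (already read) values. */
    static ColumnIndexRow index(String file, String column, List<String> values,
                                double maxCardinalityRatio) {
        String min = values.stream().min(String::compareTo).orElse(null);
        String max = values.stream().max(String::compareTo).orElse(null);

        long distinct = values.stream().distinct().count();
        BloomFilter<CharSequence> bloom = null;
        if ((double) distinct / values.size() <= maxCardinalityRatio) {
            bloom = BloomFilter.create(
                    Funnels.stringFunnel(StandardCharsets.UTF_8), distinct, 0.01);
            values.forEach(bloom::put);
        }
        return new ColumnIndexRow(file, column, min, max, bloom);
    }

    public static void main(String[] args) {
        ColumnIndexRow row = index("part-0001.orc", "src_address",
                List.of("10.0.0.1", "10.0.0.2", "10.0.0.1"), 0.001);
        System.out.println(row.min + " .. " + row.max + ", bloom built = " + (row.bloom != null));
    }
}
```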

As mentioned above, security analysts prefer LIKE and full-text search. For LIKE, we also made some optimizations (described later). For full-text search, we segment the data into words and build an inverted index, and we also build a bitmap index at the file-distribution level for each term produced by the segmentation.

The figure above gives a rough idea of the index size: assuming the original JSON logs are 50 PB, they translate into roughly 1 PB of ORC. Our index data is 508 GB, of which 8 GB is the min/max index and 500 GB is the Bloom index. Adding the bitmap and inverted indexes mentioned above, the proportion of index data grows further. For this reason we use a distributed index scheme.

We know that logs are constantly changing. For some data sources, fields may be added or removed, and even a field's type may change.

So we adopt a Merge Schema scheme: while files are written incrementally, that is, at the same time as the index information of each batch is updated, we perform a schema merge. As shown in the figure, file 3 is the last file written in BLOCK123, and as files are written a new merged schema forms. Fields B and C are actually historical fields, and field A_V is the historical version of field A. In this way we try to let customers see more complete data. Finally, a new table source is constructed from our own InputFormat and the merged schema, which connects the whole pipeline.
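A minimal sketch of the schema-merge step, using plain maps from field name to type (the "_V" suffix follows the example above; everything else is illustrative): new fields are added, fields missing from the newest file are kept as history, and a type change moves the old definition to a versioned field.

```java
// Sketch: merge the schema of the newest file into the accumulated merged schema.
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSchemaSketch {

    static Map<String, String> merge(Map<String, String> merged, Map<String, String> newFile) {
        Map<String, String> out = new LinkedHashMap<>(merged);
        newFile.forEach((field, type) -> {
            String oldType = out.get(field);
            if (oldType != null && !oldType.equals(type)) {
                out.put(field + "_V", oldType);   // keep the historical version of the field
            }
            out.put(field, type);                 // newest definition wins
        });
        return out;                               // fields absent from newFile stay as history
    }

    public static void main(String[] args) {
        Map<String, String> merged = new LinkedHashMap<>();
        merged.put("A", "INT"); merged.put("B", "STRING"); merged.put("C", "STRING");
        Map<String, String> file3 = new LinkedHashMap<>();
        file3.put("A", "STRING"); file3.put("D", "BIGINT");
        System.out.println(merge(merged, file3));
        // -> {A=STRING, B=STRING, C=STRING, A_V=INT, D=BIGINT}
    }
}
```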

II. Optimization and Exploration for Reducing IO (Liu Jia)

Having seen why we chose a block index, let us look at how to use it. The core of a block index comes down to one word: pruning. Pruning means filtering out irrelevant files before the query statement is actually executed, so that as little data as possible enters the computing engine, throttling the flow right at the data-source side.

This figure shows how the whole system uses the index DB for pruning:

  • The first step is to parse the query statement and extract the relevant filters. In the leftmost SQL statement you can see two filter conditions: src_address = a certain IP, and occur_time > a certain timestamp.
  • The second step is to push the query conditions into the Meta table of the corresponding data source in the index DB to filter files (see the sketch after this list). src_address is a string field, which is pruned with a combination of the min/max and Bloom indexes; occur_time is a numeric time field, for which we look at the min/max index first for file pruning. It should be emphasized that we encapsulate the filters written by the user as index DB query conditions and push them directly down to the database.
  • The third step: after obtaining the file list, these files, together with the merged schema mentioned above, are built into a TableSource and handed to Flink for the subsequent computation.
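The sketch below (referenced in step 2) mimics the pruning decision with in-memory structures: each candidate file's min/max and Bloom entries are checked against the two filters from the example. In the real system this check is pushed down to the index database as a query rather than executed in a loop; the class and field names are illustrative.

```java
// Sketch: keep only the files that may satisfy src_address = ip AND occur_time > ts.
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FilePruningSketch {

    static final class FileIndex {
        final String path;
        final long minOccurTime, maxOccurTime;
        final BloomFilter<CharSequence> srcAddressBloom;
        FileIndex(String path, long min, long max, BloomFilter<CharSequence> bloom) {
            this.path = path; this.minOccurTime = min; this.maxOccurTime = max;
            this.srcAddressBloom = bloom;
        }
    }

    static List<String> prune(List<FileIndex> files, String ip, long ts) {
        List<String> kept = new ArrayList<>();
        for (FileIndex f : files) {
            boolean timeMayMatch = f.maxOccurTime > ts;              // min/max index
            boolean ipMayMatch = f.srcAddressBloom.mightContain(ip); // Bloom index
            if (timeMayMatch && ipMayMatch) {
                kept.add(f.path);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        BloomFilter<CharSequence> bloom =
                BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1000, 0.01);
        bloom.put("10.0.0.1");
        List<FileIndex> files = List.of(
                new FileIndex("part-0001.orc", 1_600_000_000L, 1_600_003_600L, bloom));
        System.out.println(prune(files, "10.0.0.1", 1_600_000_100L)); // -> [part-0001.orc]
    }
}
```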

When building the source we also optimized some details. For example, when passing filters to the ORC reader, we remove the filters that have already been pushed down, to avoid filtering a second time on the engine side. Of course, we do not clear all filters here: the LIKE expressions are kept, and filter pushdown for LIKE is explained later.

Next, I will focus on four optimization points:

  • First, when the data is not sorted, the pruning rate has a theoretical upper limit. We improve the pruning rate by sorting the original data with a Hilbert curve at write time.
  • Second, because of the particularities of the security field, threat detection relies heavily on the LIKE syntax, so we enhanced the ORC API to support pushdown of LIKE.
  • Third, because our usage scenarios rely heavily on joins, we also optimized the join operation.
  • Fourth, our system supports several file systems underneath, so we chose Alluxio, a mature cloud-native data orchestration system, as the data cache to improve the locality of data access.

1. Theoretical upper limit of the pruning rate and the Hilbert space-filling curve

Pruning can be abstracted as the probability problem of throwing N balls into M buckets; here we go straight to the conclusion. Assuming rows are distributed randomly and evenly across blocks, the total number of rows in all blocks is fixed, and the total number of rows hit by the query condition is also fixed, then the block hit ratio is directly and positively correlated with "total hit rows / total number of blocks".

There are two conclusions:

  • The first: if the total number of hit rows equals the total number of blocks, that is, the x-axis value is 1, then about 2/3 of the blocks contain hit rows and the block pruning rate is capped at roughly 1/3 (a short derivation follows this list). 1/3 is a very low number, but this assumes the data is randomly and evenly distributed, so to make the data distribution better we need to sort the original data at write time.
  • Second, with the total number of hit rows fixed, significantly reducing the number of rows per block, and thus increasing the total number of blocks, also improves the block pruning rate. So we reduced the block size: based on test results we set the size of each file to 16 MB. Reducing the file size is simple; for the sorting we introduce the Hilbert space-filling curve.
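A short, hedged reconstruction of the bound in the first point (the notation is ours, not from the talk): if R hit rows land independently and uniformly in M equal-sized blocks, the expected fraction of blocks containing no hit row is

\[
\Pr[\text{a block contains no hit row}] \;=\; \Bigl(1 - \tfrac{1}{M}\Bigr)^{R} \;\approx\; e^{-R/M},
\]

so for R = M (x-axis value 1) only about e^{-1} ≈ 37% of the blocks can be pruned, i.e. roughly 2/3 of the blocks contain hit rows and the pruning rate is capped near 1/3. Shrinking the block size increases M for a fixed R, which raises e^{-R/M} and hence the achievable pruning rate.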

Why the Hilbert curve? Mainly for two reasons:

  • The first: along what path should a two-dimensional space be traversed so that the addresses along the path are roughly ordered in every single dimension? Why should every column, or every subset of columns, be ordered? Because the query conditions are not fixed while the system is in use: five fields may be used for sorting at write time, but a query may use only one or two of them. Hilbert sorting keeps multiple fields both globally and locally ordered.
  • In addition, there are many space-filling curves, such as the Z curve and the snake curve; see the two comparison diagrams on the right. Intuitively, the fewer long-span jumps in the curve's path and the more stable the positions of points during iteration, the better. Among the space-filling curves, the Hilbert curve performs best.

To use the Hilbert curve, we implement a UDF whose input is the column values and whose output is a coordinate value, and then sort by that coordinate value.
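A two-dimensional sketch of such a UDF is shown below, using Flink's ScalarFunction and the standard Hilbert xy-to-distance mapping. The production UDF reportedly handles more fields, and the mapping from raw column values to integer coordinates is omitted here; the class name and grid order are assumptions.

```java
// Sketch: a scalar UDF that maps two integer coordinates to their Hilbert-curve distance,
// which can then be used as the sort key when writing files.
import org.apache.flink.table.functions.ScalarFunction;

public class HilbertIndexUdf extends ScalarFunction {

    private static final long ORDER = 1L << 16;   // side length of the Hilbert grid

    /** Returns the distance along the Hilbert curve of the cell (x, y). */
    public Long eval(Long x, Long y) {
        long hx = x & (ORDER - 1), hy = y & (ORDER - 1), d = 0;
        for (long s = ORDER / 2; s > 0; s /= 2) {
            long rx = (hx & s) > 0 ? 1 : 0;
            long ry = (hy & s) > 0 ? 1 : 0;
            d += s * s * ((3 * rx) ^ ry);
            if (ry == 0) {                        // rotate/flip the quadrant
                if (rx == 1) {
                    hx = ORDER - 1 - hx;
                    hy = ORDER - 1 - hy;
                }
                long t = hx; hx = hy; hy = t;
            }
        }
        return d;
    }
}
```

Registered in the table environment, it could then be used as a sort key when writing, e.g. ORDER BY HILBERT_IDX(col_a, col_b), where the function and column names are hypothetical.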

We sampled 1,500 SQL statements used in customer environments and filtered out statements whose pruning rate was 100%, that is, invalid statements that hit no files, leaving 1,148 statements. Using these statements we compared the pruning rate before and after sorting: the 95th-percentile pruning rate rose from 68% to 87%, an improvement of 19 percentage points. 19% may not sound like much, but with a large base, say 100,000 files, it looks quite impressive.

2. Optimization of LIKE on dictionary index

As mentioned before, given the particularities of the security industry, threat detection relies heavily on LIKE queries, so we optimized them as well.

  • First, we added a LIKE conditional expression to the ORC API to ensure that the LIKE in SQL can be pushed down to the ORC Record Reader.
  • Second, we reworked the row-group filtering logic of the ORC Record Reader. When a LIKE expression is found, we first read the dict stream of the field and check whether it contains the LIKE target string. If the value does not exist in the dictionary, the row group is skipped without reading the data stream and length stream, which speeds up file loading significantly. Later on we also plan to store the dictionary index in the index database and push the dictionary check down into the database directly.

For example, as shown in the figure, the leftmost SQL contains three expressions; for the filter attachment_name LIKE '%bid%', the only row groups that need to be read are those whose dictionary contains "bid". This achieves row-group-level filtering and further reduces the amount of data that has to enter the computing engine.
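The dictionary check itself can be sketched independently of ORC's reader internals: given the decoded dictionary entries of one column in one row group, the row group is skipped when no entry contains the LIKE target. The names below are hypothetical; the real logic sits inside the modified ORC Record Reader and reads only the dict stream.

```java
// Sketch: decide whether a row group can be skipped for a LIKE '%target%' filter.
import java.util.List;

public class DictLikeFilterSketch {

    /** LIKE '%target%' can only match rows whose dictionary entry contains the target. */
    static boolean rowGroupMayMatch(List<String> dictionaryEntries, String likeTarget) {
        for (String entry : dictionaryEntries) {
            if (entry.contains(likeTarget)) {
                return true;      // some dictionary value matches: read the data stream
            }
        }
        return false;             // no dictionary value matches: skip data and length streams
    }

    public static void main(String[] args) {
        List<String> dict = List.of("invoice.docx", "bid_document.pdf", "report.xlsx");
        System.out.println(rowGroupMayMatch(dict, "bid"));     // true  -> read the row group
        System.out.println(rowGroupMayMatch(dict, "malware")); // false -> skip the row group
    }
}
```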

3. Optimization of join based on index

Join operations are widely used in threat-intelligence matching. To speed up join performance, filter pushdown of the WHERE condition alone is far from enough.

Flink already has many built-in join algorithms, such as broadcast join, hash join, and sort-merge join. Sort-merge join is very friendly to tables that are already sorted, but as mentioned above we use the Hilbert curve to sort multiple fields jointly, so sort-merge join is temporarily outside the scope of our optimization.

In addition, we know that join performance is positively correlated with the size of the left and right tables, and threat-intelligence joins are very sparse, so pruning the left and right tables in advance can greatly reduce the data entering the join stage.

As mentioned above, we have already created Bloom indexes for common fields, so it makes sense to pre-filter files with these existing Blooms and save the time of building Bloom filters on the fly.

For broadcast join, we scan the small table directly, probe each record of the small table against the Blooms of the files belonging to the large table to decide whether each data block is needed, and thereby pre-prune the table with the large data volume.
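A hedged sketch of this broadcast-join pre-pruning, with Guava's BloomFilter standing in for the per-file Bloom index: every join key of the small table is probed against each large-table file's Bloom, and files that cannot contain any key are dropped before the join runs.

```java
// Sketch: drop large-table files whose Bloom filter matches no small-table join key.
import com.google.common.hash.BloomFilter;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BroadcastJoinPruneSketch {

    static List<String> prune(Map<String, BloomFilter<CharSequence>> largeTableFileBlooms,
                              List<String> smallTableKeys) {
        return largeTableFileBlooms.entrySet().stream()
                .filter(e -> smallTableKeys.stream().anyMatch(e.getValue()::mightContain))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```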

For hash join, as we can see, we can pre-join the file Blooms on the join key. Specifically, we "AND" the Bloom of a left-table file with the Bloom of a right-table file and keep only the pairs of files whose AND result is non-zero; the remaining files of each table then enter the engine for the subsequent computation.
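The "AND" test can be sketched with a toy Bloom filter whose bit set is explicit, so the intersection is a literal bitwise AND. This illustrates the idea only and is not the platform's index format; hash functions and sizes are arbitrary.

```java
// Sketch: a left/right file pair is kept only if the AND of their join-key Blooms is non-zero.
import java.util.BitSet;

public class BloomAndPreJoinSketch {

    static final int BITS = 1 << 16;

    /** Toy Bloom filter over the join-key column of one file. */
    static BitSet bloomOf(String... joinKeys) {
        BitSet bits = new BitSet(BITS);
        for (String key : joinKeys) {
            bits.set(Math.floorMod(key.hashCode(), BITS));          // hash 1
            bits.set(Math.floorMod(key.hashCode() * 31 + 7, BITS)); // hash 2
        }
        return bits;
    }

    /** True when the two files may share at least one join key. */
    static boolean mayJoin(BitSet leftFileBloom, BitSet rightFileBloom) {
        BitSet and = (BitSet) leftFileBloom.clone();
        and.and(rightFileBloom);
        return !and.isEmpty();          // all-zero result: the file pair can be pruned
    }

    public static void main(String[] args) {
        BitSet left = bloomOf("10.0.0.1", "10.0.0.2");
        BitSet rightA = bloomOf("10.0.0.2");
        BitSet rightB = bloomOf("192.168.1.9");
        System.out.println(mayJoin(left, rightA)); // true  -> keep both files
        System.out.println(mayJoin(left, rightB)); // false (very likely) -> prune the pair
    }
}
```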

For example, the three tables in the picture are Table 1, Table 2 and Table 3. From the index DB we can get table statistics such as the number of files or the total file size; the figure lists the file counts directly: Table 1 has 1,000 files, Table 2 has 50,000 files, and Table 3 has 30,000 files.

We perform the pre-join according to the logic in the previous picture and estimate the join costs, letting the low-cost pre-joins run first, so that the intermediate results are greatly reduced and the join efficiency is improved.

4. Alluxio as a cache for object storage

Because the underlying file storage systems vary, we chose the Alluxio data orchestration system. Alluxio's advantage is that it brings the data closer to the computing framework and uses a multi-level memory/SSD cache to speed up file access. With a full cache hit, file access can reach memory-level IO speed, which reduces how often files are read directly from the underlying file system and greatly relieves its pressure.

For our system this brings higher concurrency and is friendlier to queries with a low pruning rate, that is, queries that must read a large number of files.

If these files have been loaded into the cache in the previous query, the query speed can be greatly improved.

After these optimizations, we ran a performance comparison against an ES cluster holding 249 TB of data on 20 servers, while Flink used two servers. We selected 16 test results so that the comparison is easier to see on the chart.

In the chart, red and orange are ES, blue is HQL before optimization, and green is HQL after optimization. The numbers above the bars show HQL's performance relative to ES; for example, the first label means HQL is five times as fast as ES. Statements 6 and 7 are slower than ES, mainly because HQL uses a block index while ES uses a row index held entirely in memory, which allows extremely fast retrieval. Statement 13 is slower because HQL's pruning rate is relatively poor for not-equal conditions.

Overall, the optimization was significant, with most statements equal to or slightly faster than ES, which fully meets the customer's expectations for long-cycle data storage and query.

III. Future planning

The figure above shows our future plan. Customer sites often involve many BI dashboards and long-cycle report jobs, so next we will consider BI precomputation, along with the container and JVM warm-up mentioned by Su Jun, and, drawing on ES, improving the capability for concurrent queries by multiple users.

