Preface

Today the underlying computing platform built on the Hadoop technology stack is stable and mature, and computing capacity is no longer the main bottleneck. Softer requirements, such as diversified data, complex business analysis, system stability, and data reliability, have gradually become the main problems facing log analysis systems. In 2018, the integration of online and offline retail became a trend. Suning.com proposed and practiced a model that integrates the two channels and put forward its smart retail strategy, which is data-driven in nature and aims to serve consumers better. As the first link in data analysis, the Suning log analysis system lays a solid foundation for data operations.

Data analysis process and architecture

Business background

Suning’s online and offline operations staff have increasingly demanding requirements for the diversity and timeliness of data analysis. The real-time log analysis system currently processes billions of traffic logs every day. It must guarantee low latency and no data loss while also handling complex analysis and calculation logic, which sets a high bar for system construction. The main inputs and demands are as follows:

  • Data sources: online and offline traffic data, sales data, customer service data, etc.
  • Diversified business needs: data support for marketing, procurement, finance, supply chain, merchants, and other functions

Process and Architecture

The underlying data processing of the Suning real-time log analysis system is divided into three steps: collection, cleaning, and indicator calculation, as shown in Figure 1.

  • Collection module: collects logs from each data source and sends them to Kafka in real time through Flume.
  • Cleaning module: receives log data in real time and performs data processing and conversion. The cleaning task is implemented on Storm and currently processes a billion-scale volume of traffic data every day. The structured data it produces is sent back to a Kafka queue.
  • Indicator calculation: receives structured traffic data from Kafka in real time and calculates the relevant indicators. Indicator calculation tasks fall into two types, Storm tasks and Spark Streaming tasks, each with its own application scenarios. Spark Streaming is suited to quasi-real-time scenarios; its advantages over Storm are described later.

Both Storm and Spark rely on the support of the Suning data cloud platform. The platform currently integrates big data development components such as Hive, Spark, Storm, Druid, ES, HBase, and Kafka to support the group’s big data computing and storage requirements.

The calculated data is stored in storage engines such as HBase and Druid. The service system reads the calculated data in real time and provides data analysis services for operation personnel.

Spark Streaming practice in indicator analysis

Introduction to Spark Streaming

Spark Streaming is a quasi-real-time computing framework based on the concept of batch processing. Data is processed in batches by time, and in practice the batch interval is set according to the latency requirement, as shown in the following figure. Spark Streaming supports multiple data sources such as Kafka, Flume, HDFS, and Kinesis, and it natively supports writing to storage media such as HDFS and common relational databases.

Compared with Storm, Spark Streaming's quasi-real-time architecture offers higher throughput, SQL support, better integration with HDFS, databases, and other storage media, and easier development. It also provides the Window feature, which supports complex window function calculations.

NDCG index analysis

Normalized Discounted Cumulative Gain (NDCG) is commonly used as an evaluation index for search ranking. Ideally, the higher a result is ranked, the more likely it is to be clicked, that is, the higher its score (gain). CG is the sum of the scores of the ranked results. DCG takes the ranking into account: the score of each result is multiplied by a ranking weight, weight = 1 / log(1 + ranking), so the higher the ranking, the higher the weight. First, we calculate the ideal DCG (called IDCG), and then we calculate the real DCG from the user's clicks. NDCG = DCG / IDCG; the closer the value is to 1, the better the search result. Written out from the definitions above (with the logarithm taken to base 2, which matches the example below), the DCG formula is:

DCG = sum over ranking positions i of score_i / log2(1 + i)

As an example, search for the keyword “apple” on Suning.com and take the first four results.

In this example, IDCG = 1 and DCG = 0.5, so NDCG = DCG / IDCG = 0.5. An NDCG score is calculated in this way for each search and used as an evaluation index of search result quality.
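The calculation can be sketched in a few lines of Python. This is a minimal illustration, not the production implementation: the function names are hypothetical, the score is assumed to be 1 for a clicked result and 0 otherwise, and the click pattern (only the third of four results clicked) is an assumption chosen because it reproduces the IDCG = 1 and DCG = 0.5 values quoted above.

```python
import math

def dcg(gains):
    """Discounted cumulative gain: each gain is weighted by 1 / log2(1 + rank)."""
    return sum(g / math.log2(1 + rank) for rank, g in enumerate(gains, start=1))

def ndcg(gains):
    """NDCG = DCG / IDCG, where IDCG is the DCG of the same gains sorted best-first."""
    ideal = dcg(sorted(gains, reverse=True))
    return dcg(gains) / ideal if ideal > 0 else 0.0

# Assumed click pattern: of the first four results, only the third is clicked.
clicks = [0, 0, 1, 0]
idcg_value = dcg(sorted(clicks, reverse=True))  # ideal case: the click is on rank 1
dcg_value = dcg(clicks)                         # actual case: the click is on rank 3
ndcg_value = ndcg(clicks)
print(idcg_value, dcg_value, ndcg_value)
```

A search with no clicks has IDCG = 0; the sketch returns 0.0 in that case rather than dividing by zero, which is a design choice of this example and not something the article specifies.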

NDCG calculation scheme design

Statistics on the time span of search behaviors (the interval from the start of a search to the last click on its result list) show that 86% of searches are completed within 5 minutes and 90% within 10 minutes. After analysis and comparison, the time range for real-time NDCG calculation was set to 15 minutes. This presents two difficulties:

  • Time window calculation: each calculation analyzes the previous 15 minutes of data as a whole
  • Deduplication: a search must be computed only once across time windows

In the end, Spark Streaming was selected, and its Window feature is used to implement the time window calculation. The window is 15 minutes long and slides in 5-minute steps, which means a calculation runs every 5 minutes. In each calculation, NDCG is computed only for searches initiated in the interval [15 minutes ago, 10 minutes ago], so no search is counted twice.
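The selection logic can be illustrated without Spark. The sketch below is a plain Python simulation under stated assumptions: the event tuples, the function name, and the use of half-open intervals (so that a search starting exactly on a 5-minute boundary belongs to one band only) are all hypothetical and not taken from the original system.

```python
WINDOW_MIN = 15  # window length in minutes
SLIDE_MIN = 5    # slide step in minutes

# Hypothetical events: (minute, kind, search_id, clicked_rank)
EVENTS = [
    (1, "search", "s1", None),
    (3, "click", "s1", 3),
    (7, "search", "s2", None),
    (12, "click", "s2", 1),
    (16, "click", "s2", 2),
]

def searches_to_score(events, trigger_min):
    """Return {search_id: [clicked ranks]} for the searches scored at this trigger.

    The window is [trigger - 15, trigger). Only searches that started in the
    oldest 5-minute band [trigger - 15, trigger - 10) are scored, using every
    click on them that falls inside the window.
    """
    window_start = trigger_min - WINDOW_MIN
    band_end = window_start + SLIDE_MIN
    in_window = [e for e in events if window_start <= e[0] < trigger_min]
    scored = {e[2]: [] for e in in_window if e[1] == "search" and e[0] < band_end}
    for minute, kind, search_id, rank in in_window:
        if kind == "click" and search_id in scored:
            scored[search_id].append(rank)
    return scored

for trigger in (15, 20, 25):
    print(trigger, searches_to_score(EVENTS, trigger))
```

Because the bands of consecutive triggers do not overlap, each search is scored by exactly one trigger, and it is scored only after at least 10 minutes of clicks have been observed.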

After the solution was developed, a problem was quickly found in online testing: holding 15 minutes of traffic data consumed too many resources. Analysis showed that search data accounts for only a small part of the traffic data. The cleaning task was therefore changed to write the search data to a separate Kafka topic, and the NDCG calculation subscribes only to that topic, which greatly reduced resource consumption.

Performance and data security assurance

Performance guarantee

Capacity estimation and expansion

Capacity estimation is not a static exercise, for two reasons:

  • Traffic logs keep growing, while the system’s processing capacity is limited
  • Major promotions create additional data spikes

Given this, expanding capacity ahead of business growth is the most important safeguard. Capacity expansion depends on the system's ability to scale horizontally. Data processing performance can be brought in line with service requirements by adjusting parameters such as the number of Kafka topic partitions, the number and concurrency of Storm processing nodes, and the Spark Streaming concurrency.

Multi-dimensional analysis calculation optimization

For example, the NDCG indicator can be calculated over any combination of four dimensions: region, city, channel, and search term. Four dimensions yield 2^4 - 1 = 15 non-empty combinations, so supporting all of them requires 15 calculations and 15 storage update operations in HBase, as shown in the figure below.
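The count of 15 can be checked by enumerating the combinations. The following is a small sketch; the dimension identifiers are illustrative spellings of the four dimensions named above.

```python
from itertools import combinations

DIMENSIONS = ["region", "city", "channel", "search_term"]

def dimension_combinations(dims):
    """Enumerate every non-empty combination of the given dimensions."""
    return [
        combo
        for size in range(1, len(dims) + 1)
        for combo in combinations(dims, size)
    ]

combos = dimension_combinations(DIMENSIONS)
print(len(combos))  # 2**4 - 1 = 15
for combo in combos:
    print(combo)
```

Each additional dimension doubles the number of combinations (plus one), which is why precomputing every combination in separate tasks does not scale well.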

At present, the only time granularity is the day. If hours, weeks, months, and other time dimensions were added, the number of tasks and the amount of storage would multiply. A high-performance OLAP computing engine to improve the efficiency of indicator analysis therefore became an urgent need.

In the second half of 2016, construction of an OLAP engine began on the data cloud platform, and the Druid service was officially launched in 2017. Druid supports sum, max, min, avg, count, and distinct count aggregations, and it can ingest real-time data from Kafka. Its columnar storage improves data retrieval efficiency, and its pre-aggregation improves computing efficiency.

Preliminary research and performance testing showed that Druid significantly improves the efficiency of calculating and analyzing indicators such as NDCG, which makes the indicator analysis tasks more lightweight. Multi-dimensional analysis of indicators is now handled by Druid.

Data security

Protect data against loss

Storm and Spark data tasks often need to be restarted for releases, so it is particularly important that no data is lost during that period. This breaks down into two guarantees:

  • The data source ensures that data is not lost
  • The data tasks ensure that data is processed

First, Kafka ensures that data is not lost by persisting it to disk and replicating it.

Second, Storm provides an Ack mechanism to ensure that every record is processed.

Spark Streaming provides a checkpoint mechanism with a write-ahead log (WAL). After a job fails or restarts, the checkpoint data can be used to recover, which ensures that data is processed. However, the WAL stores all received data in HDFS. Spark Streaming is optimized for Kafka and provides the Kafka Direct API, with which only the offsets of the Kafka queue need to be recorded; after a restart, the job reads the Kafka data again according to those offsets. The whole process is shown in the figure below.
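The idea of recovering from a recorded offset can be simulated in plain Python. This sketch does not use Spark or Kafka: the list stands in for one Kafka partition, `OffsetCheckpoint` is a hypothetical stand-in for durable checkpoint storage, and the crash is injected artificially.

```python
class OffsetCheckpoint:
    """Hypothetical stand-in for durable checkpoint storage; holds the next offset to read."""
    def __init__(self):
        self.next_offset = 0

def run_batches(log, checkpoint, batch_size, sink, crash_in_batch=None):
    """Process the log in batches, starting from the checkpointed offset.

    The offset is saved only after a whole batch has been processed, so a crash
    causes that batch to be replayed after restart rather than skipped.
    """
    batch_no = 0
    while checkpoint.next_offset < len(log):
        start = checkpoint.next_offset
        batch = log[start:start + batch_size]
        for i, record in enumerate(batch):
            sink.append(record)
            if batch_no == crash_in_batch and i == 0:
                raise RuntimeError("simulated crash in the middle of a batch")
        checkpoint.next_offset = start + len(batch)
        batch_no += 1

log = list(range(10))  # stand-in for the records of one Kafka partition
checkpoint = OffsetCheckpoint()
sink = []

try:
    run_batches(log, checkpoint, batch_size=4, sink=sink, crash_in_batch=1)
except RuntimeError:
    pass  # the job died; the checkpoint still points at the unfinished batch

offset_after_crash = checkpoint.next_offset
run_batches(log, checkpoint, batch_size=4, sink=sink)  # restart from the saved offset
print(offset_after_crash, sink)
```

In this simulation no record is lost, but the record that was handled just before the crash is written to the sink twice. Replay from offsets gives at-least-once processing, so duplicates still have to be handled downstream when exact results are required.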

Exactly-once semantic guarantee

For sales data, it is not enough to ensure that the data is processed; it must also be processed exactly once, because data behind sales and financial indicators must be 100% accurate.

First solution: Lambda architecture + Redis deduplication

  • Real-time deduplication: after an order is calculated, its order number is written into Redis; incoming orders are compared against the stored order numbers so that no order is processed twice.
  • Offline update: the sales indicators are recalculated every morning, and the previous day’s indicator data is updated.
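The real-time deduplication step can be sketched as follows. To keep the example self-contained, an in-memory class stands in for Redis; with a real Redis server the same check-and-mark step is commonly done atomically with `SET key value NX`. The class, function names, and order data are hypothetical, and amounts are kept in integer cents for exact arithmetic.

```python
class SeenOrders:
    """In-memory stand-in for Redis that remembers which order numbers were counted."""
    def __init__(self):
        self._keys = set()

    def add_if_absent(self, order_id):
        """Return True if the order number is new (and record it), False if already seen."""
        if order_id in self._keys:
            return False
        self._keys.add(order_id)
        return True

def aggregate_sales(orders, seen):
    """Sum order amounts, skipping any order number that was already counted."""
    total_cents = 0
    for order_id, amount_cents in orders:
        if seen.add_if_absent(order_id):
            total_cents += amount_cents
    return total_cents

seen = SeenOrders()
# Order A1 is delivered twice, for example because a batch was replayed after a restart.
first_batch = aggregate_sales([("A1", 100), ("A2", 250)], seen)
replayed_batch = aggregate_sales([("A1", 100), ("A3", 50)], seen)
print(first_batch, replayed_batch)
```

Note that this sketch marks the order before adding its amount, whereas the article describes writing the order number after the calculation. Either ordering leaves a small window for error if the job crashes between the two steps, which is one reason the offline recalculation in the second bullet is still needed.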

Second solution: MPP + primary key

  • Application scenario: suitable for cases where external systems query and analyze data directly from the MPP database
  • Technical solution: PostgreSQL with Citus (PG Citus) is used as the MPP database. Tables are created in it with unique fields such as the order number set as the primary key.
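The effect of the primary key can be demonstrated with any relational database. The sketch below uses Python's built-in `sqlite3` module purely as a stand-in for the MPP database; the table and column names are hypothetical. SQLite's `INSERT OR IGNORE` plays the role that `INSERT ... ON CONFLICT DO NOTHING` would play in PostgreSQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales (order_id TEXT PRIMARY KEY, amount_cents INTEGER NOT NULL)"
)

def write_orders(conn, orders):
    """Insert orders; rows whose order_id already exists are silently skipped."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR IGNORE INTO sales (order_id, amount_cents) VALUES (?, ?)",
            orders,
        )

write_orders(conn, [("A1", 100), ("A2", 250)])
write_orders(conn, [("A2", 250), ("A3", 50)])  # replay: A2 arrives a second time

row_count, total_cents = conn.execute(
    "SELECT COUNT(*), SUM(amount_cents) FROM sales"
).fetchone()
print(row_count, total_cents)
```

Because the database rejects the duplicate key, the write becomes idempotent and the streaming job can replay data safely without keeping its own deduplication state.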

Future architecture evolution and optimization

At present, the entire underlying processing system is built on open source frameworks from the industry, and it is far from perfect. Working on underlying data is meticulous, hard work. Data quality problems occur frequently, and because there is no monitoring system, problems are often discovered and solved passively. In addition, new business growth means that data cleaning logic changes often and code is released frequently.

At the end of 2017, work began on an optimized architecture design for the system, which mainly adds two modules:

  • Data quality monitoring: verifies real-time and offline data against configured quality monitoring rules, including sampling verification and full verification, and notifies developers of data exceptions in a timely manner.
  • Data cleaning rule configuration system: the cleaning logic is abstracted into configurable rules, so that changes to the cleaning logic are made by defining clear rule changes. The difficulty here is rule abstraction. After technical pre-research, it was tentatively decided to use Drools and Groovy to implement the configuration of cleaning rules.

Summary and Prospect

As the behind-the-scenes support for data mining, BI analysis, and other higher-level applications, the log processing and analysis system links what comes before it with what comes after. Especially in scenarios with multiple business lines and large data volumes, big data will remain an empty phrase without the support of a systematic platform. I believe that success in smart retail will be decided not only by algorithm models, but also by the quality and timeliness of the underlying data and the stability of the system.

About the author

Wang Fuping is head of Data Center technology at the Suning.com Big Data Center. Wang previously served as a senior engineer in Baidu's Big Data Department and as an architect in Yihaodian's Search and Precision Department. Wang has worked in big data research and development for many years, has a deep understanding of big data tools and machine learning, and has extensive experience in real-time computing, particularly with Storm and Spark Streaming. Wang enjoys sharing and promoting technology and currently focuses on building a data analysis platform that connects data modeling to data analysis. Based on OLAP technologies such as Druid and Kylin, the platform provides a platform-level data indicator service and a one-stop “data as a service” solution.

Thanks to Cai Fangfang for proofreading this article.