1. The background

Massive data has commercial value in itself, and with professional processing, masses of low-density data can be distilled into multi-dimensional views. Organizations can then easily turn data into insights, quickly make data-driven decisions, and add value to their products or services. What follows is a 0-to-1 practice of building a multi-tenant data visualization platform for the news media domain: real-time/offline statistics of mobile APP user behavior (PV/UV across all dimensions, user retention analysis, path analysis, in-depth news-browsing analysis, carrier analysis, news reach and influence analysis, APP crash analysis, etc.) plus CMS user metrics.

2. Implementation principle

Data display is handled by the WEB service and data processing by the data service; the two share data-storage middleware to exchange data.

2.1 web service

Web services are standard fare, so they won't be covered in detail here.

2.2 Data Service

The data service covers data collection, data storage, and data processing. The specific flow is shown in the figure below:

Traditional collection, storage, and processing stacks (such as MySQL, Oracle, and PostgreSQL) hit bottlenecks with massive data:

  • Collection: with large data flows and high concurrency, business service resources must be isolated and a dedicated collection cluster built, which is troublesome and costly to maintain;
  • Storage: single-table capacity is limited, and horizontal scalability is mediocre at best;
  • Computing: single-instance services (SQL in the database plus in-memory application code) have limited computing power.

The bottleneck of the traditional backend stack is single-node scalability: if a node is hard or impossible to scale out, its capability is always bounded. Hadoop's three core components, MR, HDFS, and Yarn, solve these problems. The divide-and-conquer model of MR, the NameNode in HDFS, and the ResourceManager role in Yarn all provide distributed coordination, i.e. scalability: when a node's computing power is insufficient, a new node is added to extend compute; when a node's storage is insufficient, a new node is added to extend storage. Node extension is transparent to developers and simple to operate. Spark, a distributed computing framework, is the benchmark successor to MR in Hadoop. Spark is faster than MR for two main reasons: Spark processes data in memory, whereas MR iterates on disk with intermediate results written to files (which reduces memory pressure but sacrifices performance); and Spark introduces a DAG to optimize the computing plan and reduce shuffles.

Spark is an efficient, fast computing engine born of MR's inefficiency in Hadoop: batch processing is about 10x faster than MR, and in-memory data analysis up to 100x faster than Hadoop (per the official website).
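To make the divide-and-conquer model concrete, here is a toy word count in plain Python that mimics the map, shuffle, and reduce stages both MR and Spark implement; the real engines run these stages across nodes, on disk (MR) or in memory (Spark). The function name and data are purely illustrative.

```python
from collections import defaultdict

def word_count(lines):
    """Toy map/shuffle/reduce word count, mimicking the stages
    that both MapReduce and Spark distribute across nodes."""
    # Map: emit a (word, 1) pair per word
    mapped = [(w, 1) for line in lines for w in line.split()]
    # Shuffle: group values by key (the network-heavy stage in a cluster)
    grouped = defaultdict(list)
    for key, value in mapped:
        grouped[key].append(value)
    # Reduce: aggregate each key's values
    return {key: sum(values) for key, values in grouped.items()}

print(word_count(["spark is fast", "mr is slow", "spark wins"]))
```

Spark's DAG optimizer can chain several map-like steps before a single shuffle, which is exactly how it cuts the disk iteration cost described above.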

2.2.1 Stream/Batch Architecture Mode

The business scenario requires both offline and real-time statistics. Offline computing periodically produces T+1 reports in batch; real-time computing is what its name implies. The computing business logic is generally the same, with the offline task re-computing a full day of data after the real-time job has already processed it. Depending on whether stream and batch are unified, the architecture mode divides into Lambda and Kappa. Lambda was ultimately chosen for its stronger data reliability and easier data traceability.
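The Lambda serving idea can be sketched in a few lines: query results merge the T+1 batch view with the speed layer's real-time increments. This is a minimal illustration; the view names and shapes are assumptions, not the platform's actual code.

```python
def serve_query(batch_view, speed_view):
    """Lambda-architecture serving sketch: merge the T+1 batch view
    (complete up to yesterday) with today's speed-layer increments."""
    merged = dict(batch_view)
    for key, increment in speed_view.items():
        merged[key] = merged.get(key, 0) + increment
    return merged

batch_view = {"pv": 120_000, "uv": 8_000}   # produced by the offline Spark job
speed_view = {"pv": 3_500, "uv": 240}       # produced by Spark Streaming
print(serve_query(batch_view, speed_view))  # {'pv': 123500, 'uv': 8240}
```

The batch layer later recomputes the full day and overwrites the merged numbers, which is why Lambda gives stronger reliability and traceability than Kappa at the cost of maintaining two code paths.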

3 Implementation Scheme

3.1 Platform selection — CDH or Ali Cloud service

The project's technical solution involves many big-data cluster components, and maintaining them by hand without a platform is completely unrealistic. The two mainstream options are CDH and Ali Cloud services, chiefly DataWorks (including MaxCompute) and Kafka. Below is the comparison between the CDH and Ali Cloud schemes; for cost, the comparison assumes ECS (8C/32G/3T) x 4. Considering business scenario and volume, economic cost, learning cost, operation and maintenance complexity, market mainstream, and other factors, CDH was finally selected.

  • Private cloud deployment: CDH supports it; Ali Cloud does not
  • Installation: CDH is complex; Ali Cloud is simple
  • Reliability: both are high
  • Scalability: CDH is relatively low; Ali Cloud is high
  • Cost: CDH is stage-constant (i.e. the ECS cost); Ali Cloud grows linearly with data volume, estimated to be slightly lower than CDH over 1-2 years
  • Component completeness: CDH is complete; for Ali Cloud, no cloud service was found to fill the Flume role
  • Component freedom: CDH is high; Ali Cloud is low
  • Getting started: CDH is more complex, as the CDH environment must be maintained; Ali Cloud is simple, supporting console SQL and providing APIs and an SDK
  • Maintenance: CDH requires it; Ali Cloud needs little

Ali Cloud service fees: help.aliyun.com/document_de…

3.2 Component Selection

3.2.1 Data Burying Point — Flume

Flume is a highly available, highly reliable, distributed system for massive log collection, aggregation, and transport, originally provided by Cloudera and now a top-level Apache project.

Flume runs one or more agents, and each agent is composed of Source, Channel, and Sink components. Flume's advantages:

  • Very simple to use and configure: adding one line of source configuration opens another listening port, and an upstream router (such as Nginx) can load-balance across ports
  • Supports diverse ingestion methods: Kafka, Thrift, HTTP, Avro, Exec, Syslog, etc.; see flume.apache.org/releases/co…
  • Supports custom sinks and sources, convenient for data cleaning, transformation, and loading; see flume.apache.org/releases/co…
  • Channels support memory and file, selectable per service requirements. For data reliability, Flume, as the upstream end of the data link, persists data, greatly reducing the chance of loss along the link
  • CDH ships with Flume, which keeps service operation and maintenance simple
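As a concrete illustration of "one line of source configuration opens another port", here is a hypothetical Flume agent wiring an HTTP source through a file channel into a Kafka sink; the agent name, ports, hostnames, and topic are assumptions, not this platform's actual configuration.

```properties
# Hypothetical agent "a1": HTTP source -> file channel -> Kafka sink
a1.sources = http-src
a1.channels = file-ch
a1.sinks = kafka-sink

# HTTP source: clients POST tracking events to this port
a1.sources.http-src.type = http
a1.sources.http-src.bind = 0.0.0.0
a1.sources.http-src.port = 8080
a1.sources.http-src.channels = file-ch

# File channel persists events to disk, trading speed for reliability
a1.channels.file-ch.type = file

# Kafka sink: forward events to the downstream topic
a1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafka-sink.kafka.bootstrap.servers = kafka1:9092
a1.sinks.kafka-sink.kafka.topic = app-events
a1.sinks.kafka-sink.channel = file-ch
```

Adding a second source block with a different port is all it takes to open another listener, and Nginx can round-robin across the ports.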

3.2.2 Message Queue — Kafka

Kafka's advantages need little introduction: throughput is its biggest edge over other MQs, and it is one of the most commonly used components in big data. Kafka is mainly used here for:

  • Peak clipping: client tracking traffic is heavy and highly concurrent, and Kafka buffers it to keep downstream services from going down or even avalanching
  • Decoupling: Flume and Spark Streaming are both just consumers
  • Data reliability: data is persisted temporarily, preventing loss when a downstream HBase RegionServer goes down
  • Natural component support: this architecture's downstream components all accept Kafka as a data source, reducing the complexity of inter-component integration
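The peak-clipping role can be illustrated with a toy simulation: a burst of client events lands in a bounded buffer while the downstream consumer drains at a fixed rate. All numbers and names here are made up for illustration; real Kafka spills to disk-backed retention rather than dropping when the buffer is full.

```python
from collections import deque

def simulate_peak_clipping(burst_sizes, drain_rate, capacity):
    """Toy model of a buffer between bursty producers and a
    fixed-rate consumer. One burst = one time step."""
    buffer = deque()
    dropped = processed = 0
    for burst in burst_sizes:
        for _ in range(burst):                 # producers write the burst
            if len(buffer) < capacity:
                buffer.append(1)
            else:
                dropped += 1                   # Kafka would retain on disk instead
        for _ in range(min(drain_rate, len(buffer))):
            buffer.popleft()                   # consumer drains steadily
            processed += 1
    return processed, len(buffer), dropped

# A spike of 50 events then quiet; the consumer handles 10 per step
print(simulate_peak_clipping([50, 0, 0, 0, 0], drain_rate=10, capacity=100))
# (50, 0, 0): all events processed, none dropped, downstream never overloaded
```

Without the buffer, the downstream service would have to absorb the full spike of 50 in one step, which is exactly the downtime/avalanche risk Kafka removes.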

3.2.3 Data Store — HBase

HBase is a distributed, scalable NoSQL database built on HDFS. HBase was chosen instead of Hive. HBase's advantages:

  • Unstructured storage lets heterogeneous tracking-data templates live in the same table. Otherwise each new tracking event would need a new table, and a page-consumption event, with its varying page types and parameters, might even need several
  • Judging by the HBase and Hive read/write paths, HBase writes fast and reads slowly (for range queries; KV lookups excluded), while Hive launches an MR job per query, which performs poorly when reading and writing small amounts of data
    • To avoid slow Hive writes, it is advisable to write via the HDFS API and attach the data to Hive periodically
  • The Spark-HBase integration is technically feasible
  • CDH ships with HBase

3.2.4 Offline Computing — Spark

The advantages and disadvantages of MR and Spark have been described in the previous section

3.2.5 Real-time Computing – Spark Streaming

Although Spark Streaming (SS) is a micro-batch engine rather than true streaming, SS was chosen over Flink. Here's why:

  • Business scenario: the latency requirement is loose (an SS delay within 0-2 s is acceptable)
  • Spark is already in the stack, and Spark Streaming is very simple to use, syntactically consistent with Spark, and cheap to develop
  • Flink is not in CDH and would require separate maintenance, which is costly

Note that Spark has stopped maintaining Spark Streaming; Structured Streaming is recommended for Spark 2.0+.

3.2.6 Task Scheduling — Azkaban

No extensive comparison was done for this component; against Oozie, Airflow, and the DolphinScheduler I had just read about, Azkaban simply felt simpler in both interface and operation, which meets the business need.

3.2.7 Data Synchronization

Sqoop was used at first (to sync tenant SQL Server into HBase), and DataX was adopted later to support multiple data sources; both were triggered periodically by the host's Linux cron. Spark offline batch tasks are scheduled periodically by Azkaban, and some of them depend on the data that cron synchronizes, so the two kinds of tasks are logically sequential but not actually dependency-linked: estimating when the sync task finishes and then guessing a start time for the Spark batch is unreliable. The eventual approach: perform data synchronization with Spark SQL and schedule the sync jobs in Azkaban, which supports job dependencies (building a DAG: data-sync job <- offline batch job).
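The dependency wiring above can be expressed in Azkaban's classic `.job` file format; the file names, commands, and paths below are hypothetical placeholders, not the platform's real jobs.

```properties
# sync_data.job -- hypothetical data-synchronization job (Spark SQL)
type=command
command=spark-sql -f /jobs/sync_tenant_data.sql

# offline_batch.job -- hypothetical offline batch job;
# "dependencies" makes Azkaban run it only after sync_data succeeds
type=command
command=spark-submit --class com.example.OfflineBatch /jobs/batch.jar
dependencies=sync_data
```

With the DAG declared this way, the batch job starts exactly when synchronization finishes, removing the guesswork of two independent cron schedules.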

3.3 Data service technical architecture design

3.3.1 Overall design

Below is a layered diagram of the infrastructure. Hive appears in the diagram because the Impala component is used for ad-hoc queries and Impala depends on Hive, but Impala is not discussed in this article. Kylin, and even Kudu as a replacement for the whole stack, were tried during technical research; interested readers can study them on their own.

3.3.2 Data flow

Data flow splits into the mobile terminal and the CMS terminal, and the mobile flow further splits into real-time and offline:

  • Mobile real-time computing data flow:
    1. The mobile terminal sends data to Flume over HTTP
    2. Flume posts the data to Kafka
    3. SS consumes Kafka and writes incrementally computed results into MySQL
  • Mobile offline computing data flow:
    1. The mobile terminal sends data to Flume over HTTP
    2. Flume posts the data to Kafka
    3. A second Flume service consumes Kafka, processes the data (desensitization, RowKey settings, etc.), and writes it to HBase
    4. The Spark offline task reads data from HBase and writes the statistics to MySQL
  • CMS data flow:
    1. A data synchronization tool (Sqoop/DataX) or script (Spark SQL) imports data into HBase
    2. The Spark offline task reads data from HBase and writes the statistics to MySQL

3.3.3 Design details

The role of each service was covered in the component selection sections and is not repeated here. Lower-layer services, such as Yarn's ResourceManager and NodeManager (used by Spark on Yarn) and HDFS's NameNode and DataNode (used by HBase), are not shown in this figure; you can look up their specific functions yourself.

3.4 Service architecture design

The focus of the business architecture is reducing the onboarding cost for tenants joining the platform, and its scope covers the WEB and data services. The WEB service could be evolved toward centralization, but the focus this time is the data service: the WEB side only implements data-structure encapsulation utilities to unify similar chart data. Tenant-specific statistical dimensions and metrics sink into the data service, and the earlier the data service is unified, the less personalized development remains:

  • On each tenant's APP side: a unified user-behavior tracking specification and a general-purpose SDK covering most tracking needs, reducing repeated coding
  • Data synchronization is developed separately for each tenant's CMS service library, unifying heterogeneous source data
  • Services support multi-tenancy and isolate data by tenant ID

3.5 Tracking (Burying Point) Schema Design

3.5.1 Common Event Parameters

Outer fields:

  • event: event name, one of AppStart, AppEnd, AppFirstOpen, AppCrashed
  • time: timestamp at which the event occurred (e.g. 1531900256000)
  • distinct_id: unique identifier of the client device
  • app_id: the tenant id
  • user_id: the user id
  • properties: nested object holding the fields below

Fields inside properties:

  • $lib: SDK type, Android or iOS
  • $lib_version: tracking SDK version (e.g. 3.3.3)
  • $event_duration: duration of the event, such as page dwell time or app startup time
  • $is_first_day: whether this is the first day after the app was installed
  • $os: operating system (e.g. Android, iOS, H5)
  • $os_version: system version number (e.g. 8.0.0)
  • $manufacturer: manufacturer (e.g. HUAWEI)
  • $brand: brand / trademark (e.g. HONOR)
  • $model: device model (e.g. PRA-AL00)
  • $screen_name: name of the page displayed on screen (English)
  • $screen_dpi: screen density in pixels per inch (e.g. 480)
  • $screen_height: screen resolution height (e.g. 1794)
  • $screen_width: screen resolution width (e.g. 1080)
  • $app_version: app version number (e.g. 1.0.0)
  • $app_version_code: app version code (e.g. 1)
  • $has_nfc: whether NFC is available (e.g. false)
  • $has_telephone: whether the device can make phone calls (e.g. true)
  • $carrier: network carrier name (e.g. CDMA)
  • $network_type: network type, for example 4G
  • $wifi: whether wifi is connected (e.g. true)
  • $app_crashed_reason: exception details
  • $title: Chinese display name of the page (unused for now)
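A sample payload following this common-parameter schema might look as follows; all field values here are invented for illustration, and only a subset of the properties is shown.

```python
import json

# Hypothetical AppStart event following the common-parameter schema
event = {
    "event": "AppStart",
    "time": 1531900256000,        # ms timestamp of the event
    "distinct_id": "d3f1c2ab",    # unique device id
    "app_id": "tenant_001",       # tenant id
    "user_id": "u_42",
    "properties": {
        "$lib": "Android",
        "$lib_version": "3.3.3",
        "$os": "Android",
        "$os_version": "8.0.0",
        "$manufacturer": "HUAWEI",
        "$brand": "HONOR",
        "$model": "PRA-AL00",
        "$app_version": "1.0.0",
        "$network_type": "4G",
        "$wifi": False,
        "$is_first_day": True,
    },
}

payload = json.dumps(event)       # what the SDK would upload (compressed in practice)
print(json.loads(payload)["event"])  # AppStart
```

Keeping the tenant id at the outer level is what lets every downstream component isolate data by tenant without parsing the event-specific properties.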

3.5.2 Tracking Events (Partial List)

Each entry lists the event name, its unique identifier (event), and its parameters with type and meaning:

  • Search click (click_search): content (String) - the search text
  • Ad exposure (show_advertisement): ad_ids (String) - ad ids; ad_position (String) - ad position, see "Ad Position enumeration"
  • Ad click (click_advertisement): ad_id (String) - ad id; ad_position (String) - ad position, see "Ad Position enumeration"
  • Topic exposure (show_theme_list): theme_ids (String) - topic ids
  • News-flash share (share_news_flash): to (String) - share channel
  • Content share (share_news): news_type (String) - manuscript type, see "News Type enumeration"; related_id (String) - related id; to (String) - share channel
  • Topic share (share_theme): related_id (String); to (String)
  • Topic collect (collect_theme): related_id (String)
  • Live share (share_live): related_id (String); to (String)
  • Subscription share (share_subscribe): related_id (String); to (String)
  • Video share (share_video): related_id (String); to (String)
  • News comment (comment_news): content (String) - comment text; related_id (String); news_type (String)
  • Live comment (comment_live): content (String); related_id (String)
  • Video comment (comment_video): content (String); related_id (String)
  • News like (like_news): news_type (String), see "News Type enumeration"; related_id (String)
  • News-comment like (like_news_comment): news_id (String); comment_id (String)
  • Live like (like_live): related_id (String)
  • News collect (collect_news): news_type (String), see "News Type enumeration"; related_id (String)
  • Live collect (collect_live): related_id (String)
  • Video collect (collect_video): related_id (String)
  • Special-line comment (comment_special_line): related_id (String); content (String)
  • External news comment (external_comment_news): related_id (String); content (String); news_type (String), see "News Type enumeration"
  • Follow (focus): type (String) - follow target type, e.g. Subscribe; related_id (String)

4. Problems encountered

4.1 HBase Reads Slowly and the ZK Connection Times Out

The root cause was that HBase data was read by full table scan (later by prefix match on the RowKey head), which is inefficient; combined with data skew, the slow execution caused the ZooKeeper connection to time out. Solution: region pre-splitting plus RowKey design. The RowKey partitioning policy: the first two characters of the RowKey are a random salt in 00-19 (SALT). Batch processing runs by date, so the client event timestamp is embedded in the RowKey for fetching data from HBase.
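A minimal sketch of such a salted RowKey follows. The article describes a random 00-19 salt; this version derives the salt deterministically from the device id instead (an assumption, so the same device always lands in the same bucket), and the exact key layout is illustrative, not the platform's real one.

```python
import hashlib

SALT_BUCKETS = 20  # matches the 00-19 salt range described above

def make_rowkey(distinct_id, event_ts_ms):
    """Salted RowKey sketch: a 2-digit salt prefix spreads writes
    across pre-split regions, and the embedded event timestamp
    lets date-based batch jobs range-scan instead of full-scanning."""
    digest = hashlib.md5(distinct_id.encode()).hexdigest()
    salt = int(digest, 16) % SALT_BUCKETS
    return f"{salt:02d}_{event_ts_ms}_{distinct_id}"

key = make_rowkey("d3f1c2ab", 1531900256000)
print(key)  # e.g. "07_1531900256000_d3f1c2ab"
```

A batch job for one day then issues 20 scans (one per salt prefix) bounded by the day's timestamp range, instead of one full table scan over skewed regions.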

4.2 Tracking Data from Previous Days Was Collected

The difficulty of this problem lies in reproducing the bug; the solution is very simple: filter the data out in the first Flume layer (archived data should not be changed, so late data is discarded outright rather than modifying history and re-running increments). The cause: to reduce bandwidth usage and traffic, the mobile client uploads compressed data to the server every 30 seconds. If the app runs for less than 30 seconds, the user kills the app's background process, or the network drops, the data is stored locally on the device and sent again once the app is reopened or connectivity is restored.
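The filtering rule itself is a one-liner; here is a sketch in Python with an assumed one-day lag threshold. In the real pipeline this logic would sit in the first Flume layer (e.g. a custom Java interceptor), and the threshold is an illustrative assumption, not the article's stated value.

```python
import time

MAX_LAG_MS = 24 * 60 * 60 * 1000   # accept events at most 1 day old (illustrative)

def should_keep(event_time_ms, now_ms=None):
    """Drop events whose client timestamp is older than the allowed
    lag: archived partitions are immutable, so late data is discarded
    rather than merged back into historical results."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - event_time_ms <= MAX_LAG_MS

now = 1_700_000_000_000
print(should_keep(now - 1000, now))                # fresh event: kept
print(should_keep(now - 3 * MAX_LAG_MS, now))      # 3-day-old event: dropped
```

Filtering at the very first hop keeps the stale events out of Kafka, HBase, and every downstream computation at once.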

4.3 Spark runs slowly or memory overflows

  • Cache RDDs
  • Operator optimization, such as avoiding aggregation on the driver side, reducing network transfer, and reducing single-node memory usage
  • Use broadcast variables
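The broadcast idea in the last bullet can be illustrated with a plain-Python map-side join: ship the small dimension table to every task and join locally instead of shuffling both sides. In real Spark this is `sc.broadcast(...)` (or a broadcast-join hint); the data and field names below are made up.

```python
def broadcast_join(events, dim_table):
    """Map-side join sketch: each task holds a full copy of the small
    dimension table (the 'broadcast'), so the big side never shuffles."""
    bc = dict(dim_table)  # the broadcast copy every task would receive
    return [(e["news_id"], bc.get(e["news_id"], "unknown")) for e in events]

events = [{"news_id": 1}, {"news_id": 2}, {"news_id": 9}]
news_types = {1: "picture", 2: "video"}
print(broadcast_join(events, news_types))
# [(1, 'picture'), (2, 'video'), (9, 'unknown')]
```

This removes the shuffle of the large event RDD, which is usually the dominant cost (and a common memory-overflow trigger) in small-dimension joins.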

4.4 How to Ensure Reliability Priorities among Tenants (Not Implemented)

How can priority be guaranteed for key customers' applications when resources are scarce or abnormal? Looking at the resources tenants share:

  • Software resources: gateway, Redis, MySQL, MQ, third-party services (such as Ali Cloud services)
  • Hardware resources: disk, CPU, memory, and I/O

In terms of data service, we can only think of:

  • Priority queue of Yarn
  • Kafka assigns different instances or message topics by tenant, with different configurations for different topics

On the WEB side (partially unimplemented):

  • With limited resources, the gateway (or the SLB and Nginx in front of it) rate-limits by tenant to keep small tenants from hogging concurrent connection resources
  • Redis assigns different instances or databases by tenant
  • MySQL assigns different instances by tenant
  • Adjust the number of cluster instances per tenant and provide remote disaster recovery

4.5 Warehouse design

In particular, in the ADS-layer tables, different tenants define the same dimension differently. For example, for news-read counts, tenant A includes picture and video manuscripts, while tenant B counts a different scope.

In addition, tenants' internal business designs also differ: tenant A uses a three-level channel-column-manuscript design, while tenant B uses a two-level channel-manuscript design. There are also services such as subscription accounts that relate directly to articles, so it is hard to fit a unified data model even after data synchronization and transformation.

One more scenario: advertising statistics. Some ads are embedded in the news list, and to simplify list paging an "empty" record is inserted directly into the news table, with the real record kept in an associated advertising table. When the mobile terminal reports ad-related tracking events, it therefore carries the ID of the news-table record even though the event attributes belong to advertising, so the data service must screen and process this "mismatched" data to keep statistics accurate. These problems test not only the business skills of data development, business architecture, and modeling, but also require the product managers, data product managers, data developers, application backend developers, and tracking developers of each application to coordinate, communicate repeatedly, and reach agreement.

4.6 Other

The solutions are not detailed here; the business-facing problems have been solved, but parts of the technical solution remain unsatisfactory. For example, ad-hoc queries involving user portraits remain unsolved, and real-time joins between large and small tables perform poorly. During development the Spark code was layered and split: one tracking event is written into multiple tables according to its calculation dimensions, and the Spark code was split vertically by tracking event. Splitting by table dimension instead would scatter a lot of repetitive fetch logic and near-identical operations on the same RDD across files, making code management chaotic.

Also, because CDH component IPs and ports were exposed to the public internet for convenient operation and management, the cluster was hit by a trojan and used as a mining machine.
