Logs collected by Meitu must be cleaned and organized by an ETL program and continuously landed in HDFS or Hive so that they can be analyzed and processed in a unified way downstream.


What is ETL?

ETL (extract-transform-load) describes the process of extracting data from a source, transforming it, and loading it into a destination. The term is most common in data warehousing, but it applies beyond data warehouses.

In the unique business environment of Meitu, ETL needs to meet the following requirements:

1. Large data volume with efficient cleaning and landing. Meitu has many business lines, a large user base, and a large volume of data, and business teams expect data to be queryable soon after it is collected.

2. Flexible configuration that supports various data formats. New services are onboarded continuously, so onboarding must be flexible and generic: adding one configuration entry should be enough to clean and land a new service's data. Each business team also has its own data formats, so ETL must be compatible with common formats (such as JSON, Avro, and DelimiterText) to meet the requirements of different services.

3. Constraints and norms. Data must conform to the data warehouse specifications: it is landed in different layers (such as the STG layer and the ODS layer), different databases (such as default.db and meipai.db), and different partitions (a time partition must always be specified).

4. Fault tolerance. Service logs may contain dirty data, so an alarm must be raised when the dirty data reaches a certain threshold. Failures such as a Hadoop cluster outage or a Kafka outage can also occur, so it must be possible to rerun jobs and recover the data.


ETL comes in two forms: real-time streaming ETL and offline ETL.

As shown in the following figure, real-time streaming ETL takes two forms. In the first, Flume collects server logs and lands them directly on HDFS. In the second, data is first collected into Kafka and then landed on HDFS through Storm or Spark Streaming. Real-time streaming ETL is difficult to replay and recover after a failure. Meitu currently uses real-time streaming ETL only for data injection and cleaning.

Following the Lambda architecture, a failure in the real-time streaming ETL has to be patched by the offline ETL. The offline ETL pulls messages from Kafka, runs them through ETL, and lands them on HDFS. To improve timeliness and reduce data pressure, the offline ETL is scheduled at minute 05 of every hour and cleans the data of the previous hour. To reduce pressure on the HDFS NameNode and cut the number of small files, data from the same topic&partition under the same date partition is appended to the same log file.


Architecture design and implementation principle of offline ETL

The offline ETL uses the MapReduce framework to process and clean the data of different services. It follows a divide-and-conquer approach so that the data cleaning capacity can scale horizontally.



As shown in the figure above, offline ETL is divided into three modules:

  • Input (InputFormat): Parses the data source (Kafka data), splits it into shards, and allocates the shards to different Map processes according to a given policy. It also creates a RecordReader that reads and parses each shard and generates key-value pairs for downstream processing.

  • Map (Mapper): Processes the key-value data.

  • Output (OutputFormat): Creates a RecordWriter that lands the processed key-value data by database, table, and partition. Finally, the integrity of message processing is checked during the commit phase.


Offline ETL workflow

Above is the basic workflow of an offline ETL:

1. Kafka-ETL abstracts the configuration shared by the data cleaning process of all services into an ETL Schema, which describes the data of each service.

2. On startup, Kafka-ETL pulls the topic&Schema information of the service data to be processed from ZooKeeper.

3. For each service, Kafka-ETL obtains the offset range (beginOffset, endOffset) to consume in this run for every topic and partition, and persists it to MySQL.

4. Kafka-ETL abstracts the offset information of each topic&partition to be processed in this run into a kafkaEvent and then shards these kafkaEvents according to a given policy, so that each mapper processes a subset of the kafkaEvents.

5. The RecordReader consumes the messages in these offset ranges, parses them into key-value pairs, and sends them downstream for cleaning.

6. After cleaning, the RecordWriter lands the key-value data on HDFS.



Module implementation of offline ETL

Data Split

We take the current maximum offset of each topic&partition and the cut-off offset of the previous run to form the kafkaEvent [beginOffset, endOffset] to consume in this run. The kafkaEvents are distributed across the Mappers for processing, and the offset information is persisted to a MySQL table.

So how is data skew avoided? First, the number of mappers is configured, and the same number of ETLSplits is created. Because each kafkaEvent contains the offset consumed up to the previous run and the maximum offset to be consumed for a single topic&partition, the total number of messages in each kafkaEvent is known. Finally, all kafkaEvents are traversed, and each one is added to the ETLSplit that is currently the smallest (measured by the total number of messages it has to consume). ETLSplits built this way keep the data as balanced as possible.
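The balancing step described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the actual Kafka-ETL code: the field names, the `assign_events` function, and the use of a heap to find the smallest split are all hypothetical, and offsets are treated as a half-open range so that the message count is `end_offset - begin_offset`.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(frozen=True)
class KafkaEvent:
    topic: str
    partition: int
    begin_offset: int  # cut-off offset of the previous run
    end_offset: int    # maximum offset at scheduling time

    @property
    def message_count(self) -> int:
        return self.end_offset - self.begin_offset


@dataclass(order=True)
class ETLSplit:
    total: int = 0   # messages assigned so far (primary sort key)
    index: int = 0   # split number, used as a tie-breaker
    events: list = field(default_factory=list, compare=False)


def assign_events(events, num_mappers):
    """Traverse the events and add each one to the currently smallest split."""
    heap = [ETLSplit(index=i) for i in range(num_mappers)]
    heapq.heapify(heap)
    for event in events:
        smallest = heapq.heappop(heap)
        smallest.events.append(event)
        smallest.total += event.message_count
        heapq.heappush(heap, smallest)
    return sorted(heap, key=lambda split: split.index)


events = [
    KafkaEvent("android", 0, 0, 100),
    KafkaEvent("android", 1, 0, 50),
    KafkaEvent("ios", 0, 0, 30),
    KafkaEvent("ios", 1, 0, 20),
]
splits = assign_events(events, num_mappers=2)
print([split.total for split in splits])
```

With two mappers, the first event goes to split 0 and the remaining three go to split 1, so both splits end up with 100 messages. Events are traversed in the order given, as in the description; sorting them from largest to smallest first is a common refinement but is not something the text states.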



Data parsing and cleaning (Read)

As shown in the figure above, each shard is parsed by a RecordReader. The RecordReader contains multiple KafkaConsumerReaders, one for each kafkaEvent, and each kafkaEvent corresponds to one KafkaConsumer. After the raw message bytes are pulled, they must be decoded (deserialized), which is the job of the MessageDecoder. MessageDecoder currently supports three formats:

Format           Related topics
Avro             Android, ios, ad_sdk_android…
Json             app-server-meipai, anti-spam…
DelimiterText    app-server-youyan, app-server-youyan-im…

When the MessageDecoder receives a Kafka key and value, it deserializes them and generates an ETLKey and an ETLValue. MessageDecoder also contains Injectors, which do the following:

  • Inject Aid: For log data collected by the arachnia Agent, parse the Aid (the unique identifier) from the KafkaKey and inject it into the log data.

  • Inject GeoIP information: Resolve the IP address with GeoIP and inject the geographic information (such as country_id, province_id, and city_id).

  • Inject SdkDeviceInfo: The real-time streaming ETL already injects information such as GID and IS_APP_new, but the offline ETL checks whether this information is complete as a further safeguard.

The process also involves the DebugFilter, which filters out logs from SDK debugging devices so that they are not landed on HDFS.
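As a rough illustration of the decode, inject, and filter sequence, the Python sketch below wires a format-specific decoder to a list of injectors and a debug filter. Everything in it is an assumption made for the example: the class layout, the function names, the `aid`, `ip`, and `debug` fields, and the in-memory GeoIP table are hypothetical, and Avro is left out because it needs a third-party library.

```python
import json


def decode_json(raw: bytes) -> dict:
    record = json.loads(raw.decode("utf-8"))
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    return record


def delimited_decoder(fields, delimiter="\t"):
    """Build a decoder for delimiter-separated text with a fixed column list."""
    def decode(raw: bytes) -> dict:
        values = raw.decode("utf-8").split(delimiter)
        if len(values) != len(fields):
            raise ValueError("unexpected column count")
        return dict(zip(fields, values))
    return decode


def inject_aid(kafka_key: str, record: dict) -> None:
    # Hypothetical rule: use the Kafka key as the Aid when the log lacks one.
    record.setdefault("aid", kafka_key)


def geoip_injector(lookup: dict):
    """lookup maps an IP string to geographic fields (stand-in for GeoIP)."""
    def inject(kafka_key: str, record: dict) -> None:
        record.update(lookup.get(record.get("ip"), {}))
    return inject


class MessageDecoder:
    def __init__(self, decode, injectors=(), is_debug=lambda record: False):
        self.decode = decode
        self.injectors = list(injectors)
        self.is_debug = is_debug

    def process(self, key: bytes, value: bytes):
        """Return (etl_key, etl_value), or None if the debug filter drops it.

        Decoding errors propagate as ValueError so the caller can count
        the message as dirty data.
        """
        record = self.decode(value)
        etl_key = key.decode("utf-8")
        for inject in self.injectors:
            inject(etl_key, record)
        if self.is_debug(record):
            return None
        return etl_key, record


decoder = MessageDecoder(
    decode_json,
    injectors=[inject_aid, geoip_injector({"1.2.3.4": {"country_id": 1}})],
    is_debug=lambda record: record.get("debug") is True,
)
print(decoder.process(b"aid-1", b'{"ip": "1.2.3.4"}'))
```

Keeping the decoder, the injectors, and the filter as separate callables means a new format or a new injected field only needs a new function, which fits the requirement that new services be onboarded through configuration.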



Multiple file landing (Write)

The MapReduce RecordWriter does not support writing to multiple files, so special handling is required. In addition, an HDFS file cannot be written or appended to by multiple processes (threads) at the same time. We therefore use KafkaKey + business partition + time partition + Kafka partition to define a unique file, so every file carries the Kafka partition information, and a Writer is created for each file.

As shown in the preceding figure, each RecordWriter contains multiple Writers, and each Writer corresponds to one file, which avoids multithreaded reads and writes on the same file. A Guava cache currently maintains the set of open Writers: if there are too many Writers, or a Writer has not been written to for a long time, it is closed. When Kafka messages for the same directory arrive in a later batch, the Writer is created again and the data is appended to the file. This makes it possible to append to multiple files within the same map task.
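The writer management can be sketched as a small cache of open file handles. The Python example below uses an `OrderedDict` on the local filesystem as a stand-in for the Guava cache and HDFS; the `WriterCache` class, the `file_name` layout, and the limits are hypothetical and only show the idea of closing excess or idle writers and reopening files in append mode.

```python
import os
import tempfile
import time
from collections import OrderedDict


def file_name(kafka_key, business_partition, time_partition, kafka_partition):
    # Hypothetical naming scheme: the four parts together identify one file.
    return f"{kafka_key}_{business_partition}_{time_partition}_{kafka_partition}.log"


class WriterCache:
    def __init__(self, directory, max_writers=2, max_idle_seconds=60.0,
                 clock=time.monotonic):
        self.directory = directory
        self.max_writers = max_writers
        self.max_idle_seconds = max_idle_seconds
        self.clock = clock
        # name -> (handle, last write time), least recently used first
        self._writers = OrderedDict()

    def write(self, name, line):
        now = self.clock()
        self._close_idle(now)
        entry = self._writers.pop(name, None)
        if entry is None:
            if len(self._writers) >= self.max_writers:
                _, (oldest, _) = self._writers.popitem(last=False)
                oldest.close()
            # Append mode: a file closed earlier is extended, not overwritten.
            path = os.path.join(self.directory, name)
            handle = open(path, "a", encoding="utf-8")
        else:
            handle = entry[0]
        handle.write(line + "\n")
        self._writers[name] = (handle, now)

    def _close_idle(self, now):
        idle = [name for name, (_, last) in self._writers.items()
                if now - last > self.max_idle_seconds]
        for name in idle:
            self._writers.pop(name)[0].close()

    def close(self):
        while self._writers:
            self._writers.popitem()[1][0].close()


with tempfile.TemporaryDirectory() as directory:
    cache = WriterCache(directory, max_writers=2)
    first = file_name("aid", "meipai", "20180101", 0)
    cache.write(first, "one")
    cache.write(file_name("aid", "meipai", "20180101", 1), "two")
    cache.write(file_name("aid", "meipai", "20180102", 0), "three")  # closes `first`
    cache.write(first, "four")  # reopened and appended
    cache.close()
    with open(os.path.join(directory, first), encoding="utf-8") as handle:
        first_content = handle.read()
```

Because every file name includes the Kafka partition, and a kafkaEvent is handled by one mapper, no two writers target the same file at the same time in this sketch.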


Checking data consumption integrity (Commit)



MapReduce Counters give us a window into the details of a running MapReduce job. The following counters are used to check whether the data has been consumed completely:

reader_records: the number of messages parsed successfully.

decode_records_error: the number of messages that failed to parse.

writer_records: the number of messages written successfully.

…

Finally, message consumption is confirmed to be complete when the number of topic offsets to be consumed, reader_records, and writer_records are all consistent.

* A certain proportion of dirty data is allowed. If it exceeds the threshold, an SMS alarm is sent.
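A minimal Python sketch of such a commit-time check is shown below. The function name, the threshold value, and the exact relationship between the counters are assumptions made for illustration: the sketch assumes every message in the offset range is either parsed (reader_records) or rejected as dirty (decode_records_error), and that every parsed message is written.

```python
def check_integrity(offset_ranges, counters, dirty_threshold=0.05):
    """Compare the planned offset ranges with the job counters.

    offset_ranges is a list of (begin_offset, end_offset) pairs, treated
    as half-open ranges; counters is a dict of counter name to value.
    """
    expected = sum(end - begin for begin, end in offset_ranges)
    parsed = counters.get("reader_records", 0)
    dirty = counters.get("decode_records_error", 0)
    written = counters.get("writer_records", 0)

    complete = (parsed + dirty == expected) and (written == parsed)
    dirty_ratio = dirty / expected if expected else 0.0
    return {
        "complete": complete,
        "dirty_ratio": dirty_ratio,
        "alarm": dirty_ratio > dirty_threshold,  # e.g. trigger the SMS alarm
    }


result = check_integrity(
    offset_ranges=[(0, 100), (50, 150)],
    counters={"reader_records": 198, "decode_records_error": 2, "writer_records": 198},
)
print(result)
```

In this example 200 messages were planned, 198 were parsed and written, and 2 were dirty, so the run counts as complete and stays below the alarm threshold.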


Core features of the ETL system

Data reruns and their optimization

How does ETL implement data reruns, and how are they optimized? First, a look at the rerun scenarios:

* When the user calls Application kill, the process goes through three phases: 1) SIGTERM is sent (kill -15 PID); 2) sleep for 250 ms; 3) SIGKILL is sent (kill -9 PID).


So what are the ways to rerun?

The following figure shows the overall process of the third rerun method. ETL is scheduled hourly, and each hour's data is first written to a temporary directory. If the landing succeeds, the files are merged into the target files in the warehouse directory. If the merge fails, an alarm is sent and a manual rerun merges the small files into the target file.
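The write-then-merge pattern can be illustrated on a local filesystem with the Python sketch below. It is an assumption-laden simplification rather than the real job: the `land_hourly_batch` function, its parameters, and the `alert` callback are hypothetical, plain files stand in for HDFS, and a partially completed append is not handled.

```python
import os
import shutil
import tempfile


def land_hourly_batch(lines, temp_path, target_path, alert=print):
    """Write one hourly batch to a temporary file, then append it to the target.

    Returns True when the merge succeeded. On failure the temporary file is
    kept so that a manual rerun can merge it later.
    """
    with open(temp_path, "w", encoding="utf-8") as tmp:
        for line in lines:
            tmp.write(line + "\n")
    try:
        with open(temp_path, "r", encoding="utf-8") as src, \
                open(target_path, "a", encoding="utf-8") as dst:
            shutil.copyfileobj(src, dst)
    except OSError as exc:
        alert(f"merge failed for {temp_path}: {exc}")
        return False
    os.remove(temp_path)
    return True


with tempfile.TemporaryDirectory() as directory:
    temp_file = os.path.join(directory, "hour.tmp")
    target_file = os.path.join(directory, "target.log")
    land_hourly_batch(["a", "b"], temp_file, target_file)
    land_hourly_batch(["c"], temp_file, target_file)
    with open(target_file, encoding="utf-8") as handle:
        merged = handle.read()
```

Writing to the temporary location first means a job that is killed mid-run leaves the target file untouched, so the hour can simply be run again.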



The re-run analysis after optimization is shown in the following table:



Automatic horizontal scaling

The offline Kafka-ETL is currently scheduled at minute 05 of every hour. Each run obtains the latest (largest) offset of every topic&partition and combines it with the cut-off offset of the previous hour's run to form the kafkaEvent to consume. In some cases the offsets of a topic&partition grow so fast that the number of partitions of the Kafka topic cannot be adjusted in time, which delays ETL processing and affects downstream business processes:

  • When capacity expansion or faults cause data to be missed, the missed or historical data has to be collected again, and the message offsets of a topic&partition then grow very fast. Relying on adding Kafka topic partitions for capacity is not practical, because the added partitions must be deleted once the data has been collected.

  • During traffic peaks such as weekends, holidays, 6.18, and Double 11, the volume of collected user behavior data is several times or even dozens of times higher than usual. Here too, there may be no time to add topic partitions, and the capacity has to be scaled back after the expansion.


Automatic horizontal scaling in Kafka-ETL does not depend strongly on the number of Kafka topic partitions. If the kafkaEvent of a topic contains too much data, the maximum number of messages a single mapper can consume within a reasonable time is estimated, and the kafkaEvent is then split into multiple child kafkaEvents that are allocated to different mappers. This avoids the delay caused by a single mapper having to process one large kafkaEvent and improves horizontal scalability. The split logic is shown in the figure below:
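In code, splitting one large kafkaEvent into child events comes down to cutting its offset range into chunks of bounded size. The Python sketch below is a hypothetical illustration: the `split_event` function, the tuple layout, and the half-open offset range are assumptions, and `max_per_mapper` stands for the estimated number of messages a single mapper can handle in a reasonable time.

```python
def split_event(topic, partition, begin_offset, end_offset, max_per_mapper):
    """Cut [begin_offset, end_offset) into child ranges of bounded size."""
    if max_per_mapper <= 0:
        raise ValueError("max_per_mapper must be positive")
    children = []
    start = begin_offset
    while start < end_offset:
        stop = min(start + max_per_mapper, end_offset)
        children.append((topic, partition, start, stop))
        start = stop
    return children


children = split_event("android", 0, 0, 250, max_per_mapper=100)
print(children)
```

Here a kafkaEvent of 250 messages becomes three child events covering offsets 0 to 100, 100 to 200, and 200 to 250, each of which can go to a different mapper even though they all read the same Kafka partition.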

In the future, we will optimize automatic horizontal expansion for the following two points:

  • If the total amount of message data processed by a single mapper is large, increase the number of mappers and generate more splits for load balancing.

  • Messages in different formats are processed at different speeds, so some mappers may end up overloaded after distribution. A weight will be configured for each format, and kafkaEvents will be allocated according to both the number of messages and the weight.