Thank you for reading the 13th article of “Meitu Data Technology team”, and pay attention to our continuous access to the latest data technology trends of Meitu.

Logs collected by Meitu need to be cleaned and structured by the ETL program, and stored in HDFS/Hive persistently for unified subsequent analysis and processing.

Figure 1

/ What is ETL? /

ETL is Extract- transform-load, which describes the process of extracting, transforming, and loading data from the source to the destination. The term ETL is more commonly used in data warehouses, but its object is not limited to data warehouses.

In the unique business environment of Meitu, ETL needs to meet the following requirements:

1. Large amount of data, efficient cleaning landing. Meitu has a large number of businesses, a large user base and a large amount of data. In addition, the business side hopes that the data can be queried quickly after data collection.

2. Flexible configuration to meet multiple data formats. As new services are constantly connected, when new service data is connected, it must be flexible and universal. By adding a configuration information, the new service data can be cleaned and implemented. At the same time, the data formats of each business party are diverse. ETL needs to be compatible with a variety of common data formats to meet the requirements of different services (such as JSON, AVro, and DelimiterText).

3. Constraints and norms. To meet the database warehouse specifications, data is landed on different layers (STG layer, ODS layer, etc.), different libraries (default.db, meipai.db, etc.) and different partitions (time partition must be specified).

4. Fault tolerance. Dirty data may exist in service log collection, and alarms must be generated when the service log collection reaches a specific threshold. In addition, Hadoop cluster failures and Kafka failures may occur, so data rerun recovery is required.

ETL comes in two forms: real-time stream ETL and offline ETL.

As shown in Figure 2, real-time stream ETL usually has two forms: One is to collect server logs through Flume, and then directly land through HDFS. The other is to collect data into Kafka and then stream it to HDFS via Storm or Spark streaming. In the event of a failure, real-time ETL is difficult to play back and recover. Meitu currently only uses real-time streaming ETL for data injection and cleaning.

Figure 2

According to the Lambda structure, if the real-time stream ETL fails, it needs to be repaired by the offline ETL. The offline ETL pulls messages from Kafka, passes through the ETL, and drops them from HDFS. In order to improve real-time performance and reduce data pressure, off-line ETL is scheduled at 05 minutes per hour to clean the data of the last hour. To reduce HDFS NameNode pressure and reduce small files, data in the same topic&partition under the date partition is appended to the same log file by append.

/ Architecture design and implementation principle of offline ETL /

Offline ETL uses MapReduce framework to process and clean data of different services, mainly adopting the idea of divide and conquer, which can expand data cleaning capability horizontally.

Figure 3: Offline ETL architecture

As shown in Figure 3, offline ETL is divided into three modules:

  • Input (InputFormat) : parses data sources (Kafka data) and distributes them to different Map processes according to certain policies. Create RecordReader to read and parse fragment data and generate key-value for downstream processing.

  • Map (Mapper) : processes key-value data.

  • Output (OutputFormat) : When a RecordWriter is created, key-value data processed by the database, table, and partition will be landed. Finally, the integrity of message processing is checked during the COMMIT phase.

Offline ETL workflow

Figure 4.

Figure 4 shows the basic workflow of offline ETL:

1. Kafka-etl abstracts the common configuration information in the process of service data cleaning into an ETL Schema, representing the different data of each service;

2. When Kafka-ETL starts, topic&Schema information is pulled from ZooKeeper.

3. Kafka-etl obtains the offset data (beginOffset, endOffset) to be consumed by each service data according to topic and partition and persists mysql;

4. Kafka-etl abstracts the offset information of topic&partition to kafkaEvents, and then fragments these KafkaEvents according to a certain strategy, that is, each mapper processes part of kafkaEvents.

5.RecordReader will consume the offset information, decode it into key-value data one by one, and pass it to the downstream cleaning process;

6. The cleaned key-value is logged to HDFS through RecordWriter data.

Offline ETL module implementation

Split data

KafkaEvent [beginOffset, endOffset]kafkaEvent [beginOffset, endOffset]kafkaEvent Kafkaevents are broken up into mappers for processing, and the offset information is persisted in the mysql table.

Figure 5

So how do you make sure the data doesn’t skew? First through the configuration of custom mapper number, and create the corresponding number of ETLSplit. Since a kafkaEvent contains the Offset previously consumed by a single Topic&partition and the maximum Offset to be consumed, you get the total number of messages consumed by each kafkaEvent. Finally, all kafkaeEvents are iterated, and the current kafkaEevent is added to the current minimum ETLSplit (by comparing the total amount of data to be consumed, it can be obtained). The generated ETLSplit can ensure data balance as much as possible.

Data parsing cleaning (Read)

Figure 6.

As shown in Figure 6, first of all, each fragment will have a corresponding RecordReader to parse. The RecordReade contains multiple KafkaconSumerReaders, which are consumed for each KafkaEevent. Each KafkaEevent corresponds to a KafkaConsumer, which needs to be decode deserialized after the byte data message is pulled. At this time, the structure of MessageDecoder is involved. MessageDecoder currently supports three formats:

format

Related to the topic

Avro

Android, ios, AD_sdk_Android…

Json

The app server – meipai, anti – spam…

DelimiterText

App server – youyan, app server – youyan – im…

MessageDecoder receives Kafka’s key and value and deserializes them, eventually generating ETLKey and ETLValue. MessageDecoder also includes the Injector, which does the following:

  • Aid injection: For the log data collected by Arachnia Agent, the KafkaKey injection log uniquely identifies Aid.

  • Inject GeoIP information: Inject geographic information (such as country_id, province_id, city_id) based on the GeoIP resolved IP information.

  • Inject SdkDeviceInfo: The real-time stream ETL itself will inject gid, is_APP_new and other information, but the offline ETL checks whether these information is complete for further assurance.

The process also involves the DebugFilter, which filters the SDK debug device logs without landing in HDFS.

Multiple file Landing (Write)

The RecordWriter of MapReduce does not support a single file to be deployed to multiple files, and the HDFS file does not support multiple process (thread) writers and Appends. KafkaKey+ business partition + time partition + Kafka partition defines a unique file. Each file will carry Kafka partition information. Also create a RecordWriter for each file.

Figure 7.

As shown in Figure 7, each RecordWriter contains multiple writers, and each Writer corresponds to a file. In this way, multi-threaded reads and writes to the same file are avoided. At present, guava cache is used to maintain the number of writers. If there are too many writers or no write access for a long time, the close action is triggered, and the next batch of Kafka messages with corresponding directories are used to create writers for append operation. This allows us to append writes to multiple files in the same map.

Checking data consumption integrity (Commit)

Figure 8.

MapReduce Counter provides us with a window to observe and collect all kinds of detailed data during the running period of MapReduce jobs. And it comes with a number of default counters to check if the data is completely consumed:

Reader_records: Number of successfully parsed messages;

Decode_records_error: number of failed parsing messages;

Writer_records: indicates the number of successfully written messages.

.

Finally, whether the number of topic offset, reader_Records and writer_records to be consumed this time is consistent can confirm whether the message consumption is complete.

* A certain percentage of dirty data is allowed. If it exceeds the limit, an SMS alarm will be generated

/ ETL system core features /

Data running and its optimization

How does ETL implement data replenishment and optimization? First, look at the scenarios that require a rerun:

* When the user calls application kill, it will go through three phases: 1) Kill SIGTERM (-15) PID; 2) Sleep for 250ms; 3) Kill SIGKILL (-9) PID.

So what are some ways to run again?

Figure 9 shows the overall process of the third rerun mode. ETL is scheduled on an hourly basis. Data is first written to the temporary directory on an hourly basis. If the landing succeeds, the target file will be merged into the warehouse directory. If the merging fails, the alarm will be notified and the manual re-run will be performed to merge the small files into the target file.

Figure 9.

The re-run analysis after optimization is shown in the following table:

Automatic horizontal expansion

Kafka-etl is now scheduled every 05:00 hours. Each ETL gets the latest offset of each topic&partition. Combine the kafkaEvent to be consumed with the offset consumed in the last hour. Because the latest offset is not controllable each time, in some cases the message offset of some Topic&partition grows very fast, and the number of Kafka topic partitions cannot be adjusted, resulting in the delay of ETL consumption processing. Affecting downstream business processes:

  • Due to capacity expansion, faults and other reasons, missed data or historical data need to be collected. In this case, the message offset of topic&& Partition grows very fast. It is not reliable to only rely on Kafka Topic Partiton for capacity expansion. After completing the collection, the added partition must be deleted.

  • The user behavior data collected will be several times or dozens times more than usual during the peak user traffic on weekends, holidays, June 18 and Double 11. However, the number of topic partitions cannot be expanded and the capacity needs to be reduced after expansion.

Whether Kafka ETL automatically scales horizontally is not strongly dependent on the number of Kafka topic partitions. If a topic kafkaEvent needs to process too much data, evaluate the maximum number of kafkaevents that a single Mapper can consume in a reasonable time range, then divide the kafkaEvent level into several sub-KafkaEvents and distribute them to each mapper for processing. This avoids the need for a single Mapper to process too many Kafkaevents at a time, resulting in delays, and improves horizontal scaling. The logic for splitting is shown in Figure 10:

Figure 10.

We will optimize automatic horizontal scaling for the following two points:

  • If the total message data processed by a single mapper is large, the number of Mapper is expanded and split is generated for load balancing.

  • The message processing speed of each format is different, and some Mapper burdens may be heavy during allocation. Therefore, a certain weight is configured for each format, and kafkaEvent is allocated according to the number of messages and weight.

Meitu is doing these things. See?

Meitu big data platform architecture practice

Practice and exploration of Meitu personalized recommendation

Meitu distributed Bitmap practice: Naix

Video clustering scheme based on user behavior

Application of depth model DNN in personalized recommendation scenario

Meitu AB Test Practice: Meepo system