Source data processing for an end-to-end industrial machine learning solution


A commonly used machine learning platform consists of two modules: offline training and online prediction. The offline part is generally responsible for log data collation, sample processing, feature processing, and model training. The online part covers online inference (also known as online model serving). The overall structure is shown in the figure below:

For a relatively mature system, user behavior is collected on the front end (for example, HTML pages or an App) through SDK event-tracking ("buried point") logs. These generally cover actions such as page requests, exposures, clicks, downloads, plays, and form fills, and the data returned by the server is also logged. For example, in an advertising system there are user request logs and ad delivery logs, where one request log may correspond to multiple delivery logs.

After being processed by big data frameworks such as Flume, Kafka, and Storm, these logs are stored in Hive tables or as HDFS text files on the big data platform. Each log entry is generally saved as one row in the log file and carries the fields that uniquely identify a user behavior, such as timestamp, androidId, IMEI, userId, requestId, page ID, and the user action. In essence, each row records who did what on which device at what time.
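To make the row layout concrete, here is a minimal sketch of how such a tab-separated log row could be modeled and parsed in Scala. The field names, order, and types are assumptions for illustration only and should be adjusted to the actual log schema:

// A hypothetical, simplified view of one log row (tab-separated fields).
case class UserEventLog(
  timestamp: Long,     // event time
  androidId: String,   // device identifier
  imei: String,        // device IMEI
  userId: String,      // user identifier
  requestId: String,   // links a request log to its delivery logs
  pageId: String,      // page where the event happened
  event: String        // action type: request / exposure / click / download / play ...
)

// Parse one tab-separated line; returns None for malformed rows.
def parseLine(line: String): Option[UserEventLog] = {
  val fields = line.split("\t", -1)
  if (fields.length < 7) None
  else scala.util.Try(
    UserEventLog(fields(0).toLong, fields(1), fields(2), fields(3),
                 fields(4), fields(5), fields(6))
  ).toOption
}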

Hive SQL, Spark, or Flink can be used to process these logs, and data in Hive tables can be read and processed in several ways. Three processing methods are introduced here:

  1. Hive SQL
  2. SparkSession SQL
  3. Spark RDD

Method 1: Log data processing with shell + Hive SQL

A shell script drives Hive to execute the SQL statements that read the source data and write the selected fields into a fixed Hive table. A code example is as follows:

@filename format_log.sh
#!/bin/bash
source ~/.bashrc
set -x

cur_day=$1
source_table_name=user_xxx_rcv_log
des_table_name=user_xxx_rcv_data
des_table_location="hdfs:/user/base_table/${des_table_name}"
# Column definition of the destination table; adjust to your own schema
column_name="timestamp STRING, imei STRING, userid STRING, event_type STRING"

# If the destination table does not exist, create it
${HIVE_HOME}/bin/hive -e "
CREATE EXTERNAL TABLE IF NOT EXISTS ${des_table_name}(${column_name}) PARTITIONED BY (day STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${des_table_location}';
ALTER TABLE ${des_table_name} SET SERDEPROPERTIES('serialization.null.format' = '');
"

# Delete the existing partition data on the destination table, if any
${HADOOP_HOME}/bin/hadoop fs -rm -r -skipTrash ${des_table_location}/day=${cur_day}

${HIVE_HOME}/bin/hive -e "
ALTER TABLE ${des_table_name} DROP IF EXISTS PARTITION(day = '${cur_day}');
ALTER TABLE ${des_table_name} ADD PARTITION(day = '${cur_day}');
"

# Run Hive SQL to write data into the destination table
${HIVE_HOME}/bin/hive -e "
set hive.exec.reducers.max = 100;
set hive.execution.engine=mr;
set mapreduce.map.java.opts=-Xmx4096M;
set mapreduce.map.memory.mb=4096;
set mapred.reduce.child.java.opts=-Xmx6g;
set mapred.child.java.opts=-Xmx4g;

insert overwrite table ${des_table_name} partition(day = '${cur_day}')
select timestamp, imei, userid, event_type from ${source_table_name} where date = '${cur_day}'
"

RES=$?
if [ $RES -eq 0 ]
then
    echo "hive job finished!"
    # Touch a _SUCCESS flag so downstream jobs know the partition is ready
    ${HADOOP_HOME}/bin/hadoop fs -touchz ${des_table_location}/day=${cur_day}/_SUCCESS
    exit 0
else
    echo "hive job Error !!!"
    exit 1
fi

The above shell script can be executed with:

nohup sh -x format_log.sh 20210701 > rcv.log 2>&1 & 

Method 2: Log data processing with SparkSession SQL

Spark SQL uses a SparkSession to execute SQL statements that process the data and save the result as text files on HDFS.

Talk is cheap, show me the code!

In this case, a shell script is used to submit the Scala Spark task. The Scala Spark code is as follows:

@filename format_log.scala
package Data

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object LogMiddleDataGenerate {
  def main(args: Array[String]): Unit = {
    val Array(event_day, all_logdata_path) = args
    val sparkConf = new SparkConf()
    val sparkSession = SparkSession.builder()
      .appName("LogMiddleDataGenerate")
      .config(sparkConf)
      .config("spark.kryoserializer.buffer.max", "1024m")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    val day = event_day
    // Source Hive table; adjust to your own table name
    val source_table_name = "user_xxx_rcv_log"
    // SQL statement
    val all_log_sql = s"select timestamp, imei, userid, event_type from ${source_table_name} where date = '${day}'"
    val all_log_rdd = sparkSession.sql(all_log_sql).distinct()
      .rdd
      .map(e => e.mkString("\t"))
      .persist(StorageLevel.MEMORY_AND_DISK)

    // Aim for roughly 400,000 rows per output file (at least one partition)
    val outputPartNum = math.max(1, math.ceil(all_log_rdd.count() / 400000.0).toInt)
    all_log_rdd.repartition(outputPartNum).saveAsTextFile(all_logdata_path)
    all_log_rdd.unpersist()
    sparkSession.stop()
  }
}

To run the Scala Spark task, package the preceding code into a JAR and submit the Spark job with the following script.

@filename spark_log_format.sh
#!/bin/sh
source ~/.bash_profile
set -x

# Build the JAR
mvn clean package -U || exit 1
echo "current working dir: $(pwd)"

day=`date -d "2 day ago" +%Y%m%d`
[ $# -ge 1 ] && day=$1
all_logdata_path=hdfs://dependy_data/all_logdata/${day}
JAR_PATH=./target/FCLPC-1.0-SNAPSHOT.jar
class=Data.LogMiddleDataGenerate

${SPARK23_HOME}/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class ${class} \
    --driver-memory 10G \
    --executor-memory 6G \
    --conf spark.driver.maxResultSize=8G \
    --conf spark.yarn.priority=VERY_HIGH \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.sql.hive.convertMetastoreOrc=false \
    --conf spark.sql.hive.metastorePartitionPruning=false \
    ${JAR_PATH} \
    ${day} \
    ${all_logdata_path}

The above shell script can be executed with:

nohup sh -x spark_log_format.sh 20210701 > spark.log 2>&1 & 

Method 3: Log data processing with Spark RDD

Using Spark RDD is not very different from using SparkSession SQL: instead of running SQL, the SparkContext reads the log files directly from the HDFS cluster with the sc.textFile() interface, and the parsing is done with ordinary RDD transformations. Since the overall flow is close to Method 2, it is not covered in detail here; a minimal sketch is shown below.
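For completeness, here is a minimal sketch of the RDD variant. The input path layout, field positions, and the number of selected columns are assumptions for illustration; the overall shape mirrors the SparkSession SQL job above:

package Data

import org.apache.spark.sql.SparkSession

object LogRddDataGenerate {
  def main(args: Array[String]): Unit = {
    // args: <input_logdata_path> <output_path>  (hypothetical layout)
    val Array(input_path, output_path) = args
    val sparkSession = SparkSession.builder()
      .appName("LogRddDataGenerate")
      .getOrCreate()
    val sc = sparkSession.sparkContext

    // Read raw tab-separated log lines from HDFS and keep a few fields
    val formatted = sc.textFile(input_path)
      .map(_.split("\t", -1))
      .filter(_.length >= 4)                                  // drop malformed rows
      .map(f => Seq(f(0), f(1), f(2), f(3)).mkString("\t"))   // e.g. timestamp, imei, userid, event_type
      .distinct()

    formatted.saveAsTextFile(output_path)
    sparkSession.stop()
  }
}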


That covers the whole raw data processing flow. You can adapt the methods above to your own business code. If you found this useful, please like and share it ~

Welcome to scan the QR code and follow the author's WeChat public account: Path of Algorithm Full Stack.