Alink Ramblings (9): Feature hashing / standardized scaling for feature engineering

0x00 Overview

Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of the real-time computing engine Flink. It is the first machine learning platform in the industry that supports both batch and streaming algorithms. This article analyzes the code implementation behind the "feature engineering" part of Alink.

0x01 Related Concepts

1.1 Feature Engineering

Feature engineering in machine learning is the process of converting raw input data into features that better represent the underlying problem and help improve the accuracy of prediction models.

Finding the right features is a difficult and time-consuming task that requires expert knowledge; applied machine learning is often said to be largely feature engineering. Feature engineering has a great impact on how well machine learning models work, hence the saying that "data and features determine the upper limit of a machine learning model's performance".

The input features of machine learning include:

  • Numerical features: integers, floating-point numbers, etc.; they may carry an ordering or be unordered.
  • Categorical features: such as ID, gender, etc.
  • Time features: time series such as month, year, quarter, date, hour, etc.
  • Spatial features: latitude and longitude, etc., which can be converted into postcodes, cities, etc.
  • Text features: documents, natural language, statements, etc.

Common feature engineering techniques include:

  • Binning
  • One-hot Encoding
  • Hashing
  • Embedding
  • Log Transformation
  • Feature Scaling
  • Normalization
  • Feature Interaction

This article will explain the implementation of feature scaling and feature hashing.

1.2 Scaling

Feature scaling is a method used to standardize the range of independent variables or features of the data. In data processing it is also called data normalization and is usually performed during the data preprocessing step. Feature scaling limits data with a wide range to a specified range. Because the value ranges of raw data vary greatly, in some machine learning algorithms the objective function will not work properly without standardization. For example, most classifiers compute the distance between two points as a Euclidean distance; if one feature has a very wide range of values, the distance is dominated by that feature. Therefore, the ranges of all features should be normalized so that each feature contributes roughly proportionately to the final distance.

Another reason for applying feature scaling is that gradient descent converges much faster with feature scaling than without it. Feature scaling mainly includes two types (a small sketch of both follows the list below):

  • Min-max Scaling
  • Standard(Z) Scaling
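
As a minimal illustration of the two types, here is a standalone sketch applied to a plain double[] column. This is not Alink's implementation (Alink's StandardScaler is analyzed below), and it uses the population standard deviation for brevity:

import java.util.Arrays;

// Standalone sketch of the two scaling types; not Alink code.
public class ScalingSketch {

    // Min-max scaling: maps each value into [0, 1].
    public static double[] minMaxScale(double[] x) {
        double min = Arrays.stream(x).min().orElse(0);
        double max = Arrays.stream(x).max().orElse(0);
        double range = max - min;
        double[] out = new double[x.length];
        for (int i = 0; i < x.length; i++) {
            out[i] = range == 0 ? 0.0 : (x[i] - min) / range;
        }
        return out;
    }

    // Standard (Z) scaling: subtract the mean and divide by the standard deviation.
    public static double[] standardScale(double[] x) {
        double sum = 0, squareSum = 0;
        for (double v : x) {
            sum += v;
            squareSum += v * v;
        }
        double mean = sum / x.length;
        double std = Math.sqrt(Math.max(squareSum / x.length - mean * mean, 0));
        double[] out = new double[x.length];
        for (int i = 0; i < x.length; i++) {
            out[i] = std == 0 ? 0.0 : (x[i] - mean) / std;
        }
        return out;
    }
}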

1.3 Hashing

Most machine learning algorithms require a matrix of real numbers as input. Transforming the original data into such a matrix is feature engineering, and feature hashing (also known as the hashing trick) is one feature engineering technique.

The goal of feature hashing is to convert a data point into a vector, or to compress the original high-dimensional feature vector into a lower-dimensional one, without losing the expressive power of the original features.

Feature hashing uses hash functions to map the original data to hash values within a specified range. Compared with one-hot encoding, it has several advantages, such as supporting online learning and reducing dimensionality.

For example, suppose we perform feature hashing on the heroes of Liangshan (from the Water Margin), taking Guan Sheng as an example:

Name: Guan Sheng

Rank: fifth seat among the heroes

Native place: Yuncheng (Yuncheng, Shanxi Province)

Nickname: Big Broadsword

Weapon: Green Dragon Crescent Blade

Star: Astral star

Appearance: over eight feet tall, a fine three-strand beard, eyebrows reaching the temples, phoenix eyes looking skyward, a face the color of a dark jujube, lips as if painted with vermilion.

Prototype: In the early Southern Song Dynasty, Liu Yu served as the governor of Jinan. When the Jin army attacked Jinan, Liu Yu was induced by the Jin to kill Guan Sheng, the general defending the city, and surrendered to the Jin. This story was retold by Chen Chen of the Qing Dynasty in his sequel to the Water Margin, and may be the prototype of the character in the novel.

First appearance: chapter 63

Descendants: Guan Ling, who appears in the "Complete Biography of Yue Fei" as Yue Yun's sworn brother.

The above is raw input data, containing numerical features, categorical features, text features and so on. It cannot be used by a model directly and must be converted, via feature hashing, into data the computer can work with.

After conversion it looks like the following (fictitious, just for illustration ^_^):

// Suppose the result is a sparse vector of size 30000, in the format "index":"value"

"725":"0.8223484445229384"     // name
"1000":"0.8444219609970856"    // rank
"4995":"0.18307661612028242"   // native place
"8049":"0.060151616110215377"  // nickname
"8517":"0.7340742756048447"    // weapon
"26798":"-0.734299689415312"   // star
"24390":"0.545435"             // appearance
"25083":"0.4543543"            // prototype
"25435":"0.243432"             // first appearance
"25721":"0.7340742756048447"   // descendants

This turns Guan Sheng into a vector that can be processed by the program.

0x02 The data set

Our data set and sample code were both taken from FTRLExample.

Let’s start with the data set.

String schemaStr
                = "id string, click string, dt string, C1 string, banner_pos int, site_id string, site_domain string, "
                + "site_category string, app_id string, app_domain string, app_category string, device_id string, "
                + "device_ip string, device_model string, device_type string, device_conn_type string, C14 int, C15 int, "
                + "C16 int, C17 int, C18 int, C19 int, C20 int, C21 int";

// Print the first few rows
trainBatchData.firstN(5).print();

id|click|dt|C1|banner_pos|site_id|site_domain|site_category|app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|C14|C15|C16|C17|C18|C19|C20|C21
--|-----|--|--|----------|-------|-----------|-------------|------|----------|------------|---------|---------|------------|-----------|----------------|---|---|---|---|---|---|---|---
3199889859719711212|0|14102101|1005|0|1fbe01fe|f3845767|28905ebd|ecad2386|7801e8d9|07d7df22|a99f214a|cfa82746|c6263d8a|1|0|15708|320|50|1722|0|35|-1|79
3200127078337687811|0|14102101|1005|1|e5c60a05|7256c623|f028772b|ecad2386|7801e8d9|07d7df22|a99f214a|ffb0e59a|83ca6fdb|1|0|19771|320|50|2227|0|687|100075|48
3200382705425230287|1|14102101|1005|0|85f751fd|c4e18dd6|50e219e0|98fed791|d9b5648e|0f2161f8|a99f214a|f69683cc|f51246a7|1|0|20984|320|50|2371|0|551|-1|46
320073658191290816|0|14102101|1005|0|1fbe01fe|f3845767|28905ebd|ecad2386|7801e8d9|07d7df22|a99f214a|8e5b1a31|711ee120|1|0|15706|320|50|1722|0|35|100083|79
3200823995473818776|0|14102101|1005|0|f282ab5a|61eb5bc4|f028772b|ecad2386|7801e8d9|07d7df22|a99f214a|9cf693b4|8a4875bd|1|0|18993|320|50|2161|0|35|-1|157

0x03 Sample code

As you can see from the sample code, feature scaling is done first, followed by feature hashing.

String[] selectedColNames = new String[]{
                "C1", "banner_pos", "site_category", "app_domain", "app_category", "device_type", "device_conn_type",
                "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21",
                "site_id", "site_domain", "device_id", "device_model"};
String[] categoryColNames = new String[]{
                "C1", "banner_pos", "site_category", "app_domain", "app_category", "device_type", "device_conn_type",
                "site_id", "site_domain", "device_id", "device_model"};
String[] numericalColNames = new String[]{
                "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21"};

// setup feature engineering pipeline
Pipeline featurePipeline = new Pipeline()
        .add(   // Feature scaling
                new StandardScaler()
                        .setSelectedCols(numericalColNames) // Perform a transform on a column of type Double
        )
        .add(   // Feature hash
                new FeatureHasher()
                        .setSelectedCols(selectedColNames)
                        .setCategoricalCols(categoryColNames)
                        .setOutputCol(vecColName)
                        .setNumFeatures(numHashFeatures)
        );
// fit feature pipeline model
PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData);
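After fitting, the feature pipeline model can be applied to data, following the pattern in FTRLExample. A usage sketch (the transform call is assumed here from Alink's pipeline API):

// Apply the fitted feature pipeline to the training data and inspect the result.
// "featurePipelineModel" and "trainBatchData" come from the code above;
// the transformed data should contain the extra sparse-vector column "vec".
featurePipelineModel.transform(trainBatchData).firstN(5).print();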

0x04 Standardized scaling StandardScaler

StandardScaler transforms a dataset, normalizing each feature to have unit standard deviation and/or zero mean.

The online article makes a good point about the benefits of feature scaling:

When x is all positive or all negative, the gradient returned at each step can only change in one direction, so the weights zigzag toward the optimum (too far up one moment, too far down the next). The convergence of the weights is therefore very inefficient.

But when the positive and negative values of x are roughly balanced, the direction of the gradient update can be "corrected", accelerating the convergence of the weights.

Let's think about what is needed to implement standardized scaling:

  • We need to compute means, standard deviations, etc. for the columns to be processed, which requires traversing the entire table. This is the training process.
  • Then every row of the table must be traversed and each value standardized using the means and standard deviations obtained above. This is a Mapper process.

4.1 StandardScalerTrainBatchOp

StandardScalerTrainBatchOp does the training work for standardized scaling. Only numeric columns are converted here.

/* StandardScaler transforms a dataset, normalizing each feature to have unit standard deviation and/or zero mean. */
public class StandardScalerTrainBatchOp extends BatchOperator<StandardScalerTrainBatchOp>
    implements StandardTrainParams<StandardScalerTrainBatchOp> {

    @Override
    public StandardScalerTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
        BatchOperator<?> in = checkAndGetFirst(inputs);
        String[] selectedColNames = getSelectedCols();
        StandardScalerModelDataConverter converter = new StandardScalerModelDataConverter();
        converter.selectedColNames = selectedColNames;
        converter.selectedColTypes = new TypeInformation[selectedColNames.length];

        // Get the columns to be converted
        for (int i = 0; i < selectedColNames.length; i++) {
            converter.selectedColTypes[i] = Types.DOUBLE;
        }

// Get the following variables
converter = {StandardScalerModelDataConverter@9229} 
 selectedColNames = {String[8] @9228} 
  0 = "C14"
  1 = "C15"
  2 = "C16"
  3 = "C17"
  4 = "C18"
  5 = "C19"
  6 = "C20"
  7 = "C21"
 selectedColTypes = {TypeInformation[8] @9231} 
  0 = {FractionalTypeInfo@9269} "Double"
  1 = {FractionalTypeInfo@9269} "Double"
  2 = {FractionalTypeInfo@9269} "Double"
  3 = {FractionalTypeInfo@9269} "Double"
  4 = {FractionalTypeInfo@9269} "Double"
  5 = {FractionalTypeInfo@9269} "Double"
  6 = {FractionalTypeInfo@9269} "Double"
  7 = {FractionalTypeInfo@9269} "Double"      
      
        // Summarize with StatisticsHelper.summary, then operate on the result with BuildStandardScalerModel
        DataSet<Row> rows = StatisticsHelper.summary(in, selectedColNames)
            .flatMap(new BuildStandardScalerModel(converter.selectedColNames,
                converter.selectedColTypes,
                getWithMean(),
                getWithStd()));

        this.setOutput(rows, converter.getModelSchema());

        return this;
}

The call stack when the execution plan is built is printed below.

summarizer:277, StatisticsHelper (com.alibaba.alink.operator.common.statistics)
summarizer:240, StatisticsHelper (com.alibaba.alink.operator.common.statistics)
summary:71, StatisticsHelper (com.alibaba.alink.operator.common.statistics)
linkFrom:49, StandardScalerTrainBatchOp (com.alibaba.alink.operator.batch.dataproc)
train:22, StandardScaler (com.alibaba.alink.pipeline.dataproc)
fit:34, Trainer (com.alibaba.alink.pipeline)
fit:117, Pipeline (com.alibaba.alink.pipeline)
main:59, FTRLExample (com.alibaba.alink)

The logic of the execution plan constructed by StandardScalerTrainBatchOp.linkFrom is:

  • 1) Get the information of the columns to be converted
  • 2) Use the obtained column information to summarize via StatisticsHelper.summary (StatisticsHelper is a utility class for batch statistical calculation)
    • 2.1) Obtain table statistics using summarizer
      • 2.1.1) Use in = in.select(selectedColNames); to get the data of the columns that need to be adjusted from the input data
      • 2.1.2) Call the summarizer function of the same name to run statistics on in
        • 2.1.2.1) Call TableSummarizerPartition to collect statistics for each partition.
          • 2.1.2.1.1) TableSummarizer.visit processes each Row (i.e. each item of in) passed to this partition and accumulates statistics such as squareSum, min, max, normL1.
        • 2.1.2.2) Back in the summarizer function, call reduce to merge the statistics of all partitions.
    • 2.2) summarizer.toSummary() is called in a map to produce a TableSummary, which holds the basic statistics.
  • 3) Generate and store the model from the StatisticsHelper.summary result via flatMap(BuildStandardScalerModel)
    • 3.1) BuildStandardScalerModel.flatMap calls StandardScalerModelDataConverter.save
      • 3.1.1) data.add(JsonConverter.toJson(means)); stores the means
      • 3.1.2) data.add(JsonConverter.toJson(stdDevs)); stores the stdDevs

The corresponding code is as follows.

4.2 StatisticsHelper.summary

StatisticsHelper.summary first calls summarizer to summarize the original input table, corresponding to step 2) above.

/* table summary, selectedColNames must be set. */
public static DataSet<TableSummary> summary(BatchOperator in, String[] selectedColNames) {
    return summarizer(in, selectedColNames, false) // Corresponding code 2.1)
        .map(new MapFunction<TableSummarizer, TableSummary>() {
            @Override
            public TableSummary map(TableSummarizer summarizer) throws Exception {
                return summarizer.toSummary(); // The corresponding code is 2.2)
            }
        }).name("toSummary");
}

summarizer(in, selectedColNames, false) selects the chosen columns from the original input and then calls another summarizer function of the same name.

/** * table stat */
private static DataSet<TableSummarizer> summarizer(BatchOperator in, String[]  selectedColNames, boolean calculateOuterProduct) { // Corresponding code 2.1)
    in = in.select(selectedColNames); // Code 2.1.1)
    return summarizer(in.getDataSet(), calculateOuterProduct, getNumericalColIndices(in.getColTypes()), selectedColNames); // Corresponding code 2.1.2)
}

This summarizer function of the same name calls TableSummarizerPartition, which works on each partition; after each partition has been processed, the reduce function here merges the results.

/* given data, return summary. numberIndices is the indices of cols which are number type in selected cols. */
private static DataSet<TableSummarizer> summarizer(DataSet<Row> data, boolean bCov, int[] numberIndices, String[] selectedColNames) {
    return data // mapPartition corresponds to code 2.1.2.1)
        .mapPartition(new TableSummarizerPartition(bCov, numberIndices, selectedColNames))
        .reduce(new ReduceFunction<TableSummarizer>() { // reduce corresponds to code 2.1.2.2)
            @Override
            public TableSummarizer reduce(TableSummarizer left, TableSummarizer right) {
                return TableSummarizer.merge(left, right); // Merge the results of all partitions
            }
        });
}

TableSummarizerPartition lets each worker use TableSummarizer.visit to summarize its own partition of the table; the results are merged later. This corresponds to code 2.1.2.1.1).

/* It is table summary partition of one worker, will merge result later. */
public static class TableSummarizerPartition implements MapPartitionFunction<Row, TableSummarizer> {
    @Override
    public void mapPartition(Iterable<Row> iterable, Collector<TableSummarizer> collector) {
        TableSummarizer srt = new TableSummarizer(selectedColNames, numericalIndices, outerProduct);
        srt.colNames = selectedColNames;
        for (Row sv : iterable) {
            srt = (TableSummarizer) srt.visit(sv);
        }
        collector.collect(srt);
    }
}

// The variables are as follows
srt = {TableSummarizer@10742} "count: 0\n"
sv = {Row@10764} "15708,320,50,1722,0,35,-1,79"
srt.colNames = {String[8] @10733} 
 0 = "C14"
 1 = "C15"
 2 = "C16"
 3 = "C17"
 4 = "C18"
 5 = "C19"
 6 = "C20"
 7 = "C21"  

As you can see, TableSummarizer.visit is called in a loop over iterable. That is, each input item (a Row containing the columns in srt.colNames) is accumulated by visit to compute squareSum, min, max, normL1 and so on, as shown by the following variables (a standalone sketch of this accumulate-and-merge pattern follows the variable dump).

this = {TableSummarizer@10742} "Count: 1\nsum: 15708.0 320.0 50.0 1722.0 0.035.0-1.079.0 \nsquareSum: 2.46741264E8 102400.0 2500.0 2965284.0 0.0 1225.0 1.0 6241.0\nmin: 15708.0 320.0 50.0 1722.0 0.0 35.0-1.0 79.0\nmax: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
 colNames = {String[8] @10733} 
 xSum = null
 xSquareSum = null
 xyCount = null
 numericalColIndices = {int[8] @10734} 
 numMissingValue = {DenseVector@10791} "0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0"
 sum = {DenseVector@10792} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
 squareSum = {DenseVector@10793} "2.46741264E8 102400.0 2500.0 2965284.0 0.0 1225.0 1.0 6241.0"
 min = {DenseVector@10794} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
 max = {DenseVector@10795} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
 normL1 = {DenseVector@10796} "15708.0 320.0 50.0 1722.0 0.0 35.0 1.0 79.0"
 vals = {Double[8] @10797} 
 outerProduct = null
 count = 1
 calculateOuterProduct = false
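The accumulate-then-merge pattern above (visit per row inside a partition, merge across partitions, then derive mean and standard deviation from the accumulated statistics) can be illustrated with a standalone sketch. This is a simplification for one column and is not Alink's TableSummarizer:

// Standalone sketch of the visit/merge pattern for a single numeric column; not Alink code.
public class SummarizerSketch {
    long count = 0;
    double sum = 0, squareSum = 0;
    double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;

    // Corresponds to visit(): accumulate one value.
    void visit(double v) {
        count++;
        sum += v;
        squareSum += v * v;
        min = Math.min(min, v);
        max = Math.max(max, v);
    }

    // Corresponds to merge(): combine the partial results of two partitions.
    static SummarizerSketch merge(SummarizerSketch a, SummarizerSketch b) {
        SummarizerSketch r = new SummarizerSketch();
        r.count = a.count + b.count;
        r.sum = a.sum + b.sum;
        r.squareSum = a.squareSum + b.squareSum;
        r.min = Math.min(a.min, b.min);
        r.max = Math.max(a.max, b.max);
        return r;
    }

    double mean() {
        return sum / count;
    }

    // Sample standard deviation derived from count, sum and squareSum.
    double standardDeviation() {
        double variance = (squareSum - sum * sum / count) / (count - 1);
        return Math.sqrt(Math.max(variance, 0));
    }
}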

4.3 BuildStandardScalerModel

The job here is to generate and store the model.

/* table summary build model. */
public static class BuildStandardScalerModel implements FlatMapFunction<TableSummary, Row> {
    private String[] selectedColNames;
    private TypeInformation[] selectedColTypes;
    private boolean withMean;
    private boolean withStdDevs;

    @Override
    public void flatMap(TableSummary srt, Collector<Row> collector) throws Exception {
        if (null != srt) {
            StandardScalerModelDataConverter converter = new StandardScalerModelDataConverter();
            converter.selectedColNames = selectedColNames;
            converter.selectedColTypes = selectedColTypes;
            // The actual work
            converter.save(new Tuple3<>(this.withMean, this.withStdDevs, srt), collector);
        }
    }
}

The save call goes to StandardScalerModelDataConverter.save, whose logic is clear:

  1. Store the means
  2. Store the stdDevs
  3. Build the metadata Params
  4. Serialize
  5. Send the serialization result
/**
 * Serialize the model data to "Tuple3<Params, Iterable<String>, Iterable<Row>>".
 *
 * @param modelData The model data to serialize.
 * @return The serialization result.
 */
@Override
public Tuple3<Params, Iterable<String>, Iterable<Row>> serializeModel(Tuple3<Boolean, Boolean, TableSummary> modelData) {
    Boolean withMean = modelData.f0;
    Boolean withStandarDeviation = modelData.f1;
    TableSummary summary = modelData.f2;

    String[] colNames = summary.getColNames();
    double[] means = new double[colNames.length];
    double[] stdDevs = new double[colNames.length];

    for (int i = 0; i < colNames.length; i++) {
        means[i] = summary.mean(colNames[i]);                 // 1. Store mean
        stdDevs[i] = summary.standardDeviation(colNames[i]);  // 2. Store stdDevs
    }

    for (int i = 0; i < colNames.length; i++) {
        if (!withMean) {
            means[i] = 0;
        }
        if (!withStandarDeviation) {
            stdDevs[i] = 1;
        }
    }

    // 3. Build the metadata Params
    Params meta = new Params()
        .set(StandardTrainParams.WITH_MEAN, withMean)
        .set(StandardTrainParams.WITH_STD, withStandarDeviation);

    // 4. Serialize
    List<String> data = new ArrayList<>();
    data.add(JsonConverter.toJson(means));
    data.add(JsonConverter.toJson(stdDevs));

    return new Tuple3<>(meta, data, new ArrayList<>());
}

The call stack and variables are shown below, and we can see how the model is constructed.

save:68, RichModelDataConverter (com.alibaba.alink.common.model)
flatMap:84, StandardScalerTrainBatchOp$BuildStandardScalerModel (com.alibaba.alink.operator.batch.dataproc)
flatMap:63, StandardScalerTrainBatchOp$BuildStandardScalerModel (com.alibaba.alink.operator.batch.dataproc)
collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
run:152, AllReduceDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
  
// The following is the input
modelData = {Tuple3@10723} 
 f0 = {Boolean@10726} true
 f1 = {Boolean@10726} true
 f2 = {TableSummary@10707} "colName|count|numMissingValue|numValidValue|sum|mean|variance|standardDeviation|min|max|normL1|normL2\r\n-------|-----| -- -- -- -- -- -- -- -- -- -- -- -- -- -- - | -- -- -- -- -- -- -- -- -- -- -- -- - | -- - | -- - | -- -- -- -- -- -- -- - | -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - | -- - | -- - | -- - | -- -- -- -- -- - \ nC14 | 399999 | | | 399999.0000 0.0000 7 257042877.0000 | | | | | | 21705.0000 375.0000 3315.6116 10993280.1107 18142.6525 7257042877.0000 11664445.8724 \ | nC15 | 399999 | | 0.0000 3 99999.0000 127629988.0000 | | | | | | | 1024.0000 120.0000 20.2814 411.3345 319.0758 127629988.0000 202208.2328 \ | nC16 | 399999 | | 399 0.0000 999.0000 22663266.0000 | | | | | | | 768.0000 20.0000 36.3690 1322.7015 56.6583 22663266.0000 42580.9842 \ | nC17 | 399999 | | 0.0000 399999.000 0 | | | | | | 112.0000 412.5124 170166.5008 2024.8148 809923879.0000 2497.0000 1306909.3634 \ | | 809923879.0000 nC18 | 399999 | | 399999 0.0000 0000 | | | | | | 0.0000 1.2598 1.5871 1.0360 414396.0000 3.0000 1031.5736 \ | | 414396.0000 nC19 | 399999 | | | | 77641159.0000 399999.0000 0.0000 | | | | | 1835.0000 33.0000 271.6367 73786.4929 194.1034 77641159.0000 211151.2756 \ | $12-inch nC20 | 399999 | | | 399999.0000 0.0000 16665597769.0000 | | | | | 49341.5620 2434589745.2799 41664.0986 1.0000 100"
  colNames = {String[8] @10728} 
  numMissingValue = {DenseVector@10729} "0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0"
  sum = {DenseVector@10730} "7.257042877E9 1.27629988E8 2.2663266E7 8.09923879E8 414396.0 7.7641159E7 1.6665597769E10 3.0589982E7"
  squareSum = {DenseVector@10731} "1.36059297509295E14 4.0888169392E10 1.813140212E9 1.708012084269E12 1064144.0 4.4584861175E10 1.668188137320503E15 3.044124336 e9. ""
  min = {DenseVector@10732} "375.0 120.0 20.0 112.0 0.0 33.0 -1.0 13.0"
  max = {DenseVector@10733} "21705.0 1024.0 768.0 2497.0 3.0 1835.0 100248.0 195.0"
  normL1 = {DenseVector@10734} "7.257042877E9 1.27629988E8 2.2663266E7 8.09923879E8 414396.0 7.7641159E7 1.6666064771E10 3.0589982E7"
  numericalColIndices = {int[8] @10735} 
  count = 399999  
  
// This is the output
model = {Tuple3@10816} "(Params {withMean=true, WithStd = true}, [[18142.652549131373, 319.07576768941925, 56.658306645766615, 2024.814759536899, 1.035992589981475, 194.1033827 584569416, 64.098582746454, 76.47514618786548], [3315.6115741652725, 20.281383913437733, 36.36896282478844, 412.51242496870356, 1.259797591740416, 271.6366927754722, 49341.56 204742555,41.974829196745965]], [])"
 f0 = {Params@10817} "Params {withMean=true, withStd=true}"
 f1 = {ArrayList@10820}  size = 2
  0 = "[18142.652549131373, 319.07576768941925, 56.658306645766615, 2024.814759536899, 1.035992589981475, 194.1033827584569, 41664.0 98582746454,76.47514618786548]. ""
  1 = "[3315.6115741652725, 20.281383913437733, 36.36896282478844, 412.51242496870356, 1.259797591740416, 271.6366927754722, 49341.5 6204742555,41.974829196745965]. ""
 f2 = {ArrayList@10818}  size = 0  

4.4 Conversion Mapper

After training, each input Row is mapped during the transform stage, where the previously computed means/stdDevs are used to standardize it.

@Override
public Row map(Row row) throws Exception {
    Row r = new Row(this.selectedColIndices.length);
    for (int i = 0; i < this.selectedColIndices.length; i++) {
        Object obj = row.getField(this.selectedColIndices[i]);
        if (null != obj) {
            if (this.stddevs[i] > 0) {
                double d = (((Number) obj).doubleValue() - this.means[i]) / this.stddevs[i];
                r.setField(i, d);
            } else {
                r.setField(i, 0.0);
            }
        }
    }
    return this.predResultColsHelper.getResultRow(row, r);
}

// means and stddevs were computed during training for these columns; the conversion uses them.
this = {StandardScalerModelMapper@10909} 
 selectedColNames = {String[8] @10873} 
 selectedColTypes = {TypeInformation[8] @10874} 
 selectedColIndices = {int[8] @10912} 
 means = {double[8] @10913} 
  0 = 18142.652549131373 ... 7 = 76.47514618786548
 stddevs = {double[8] @10914} 
  0 = 3315.6115741652725 ... 7 = 41.974829196745965

The variables are as follows: row is the input data, and r is the result of converting the fields that need conversion.

After standardization, OutputColsHelper.getResultRow merges row and r together.

row = {Row@10865} "3200382705425230287,1,14102101,1005,0,85f751fd,c4e18dd6,50e219e0,98fed791,d9b5648e,0f2161f8,a99f214a,f69683cc,f51246a7,1,0,20984,320,50,2371,0,551,-1,46"
// Among these fields, "20984,320,50,2371,0,551,-1,46" is the data that needs to be transformed.

r = {Row@10866} "0.8569602884149525, 0.04557047559108551, 0.18307661612028242, 0.8392116685682023, 0.8223484445229384, 1.313874843618953, 0.8444219609970856, 0.7260338343491822"
// r is the standardized result of the fields above that needed to be converted.

The call stack is as follows:

getResultRow:177, OutputColsHelper (com.alibaba.alink.common.utils)
map:88, StandardScalerModelMapper (com.alibaba.alink.operator.common.dataproc)
map:43, ModelMapperAdapter (com.alibaba.alink.common.mapper)
map:18, ModelMapperAdapter (com.alibaba.alink.common.mapper)
run:103, MapDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x05 Feature hash FeatureHasher

FeatureHasher performs feature hashing; it has no training stage, only a mapper. The details are as follows (a small sketch of the scheme comes after the list):

  • Project categorical or numerical features into a feature vector of a specified dimension.
  • The MurmurHash3 algorithm is used.
  • For a categorical feature, the string "colName=value" is hashed, where colName is the feature column name and value is the feature value; the corresponding vector entry is 1.0.
  • For a numerical feature, the string "colName" is hashed; the corresponding vector entry is the feature value.
  • Categorical and numerical features are identified automatically.
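
As a small illustration of this scheme (not Alink's code, which is shown in 5.2/5.3 below), here is a sketch that calls Guava's murmur3_32 directly; the column names and values are taken from the data set above:

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.util.TreeMap;

// A minimal sketch of the hashing scheme described above; not Alink code.
public class FeatureHashSketch {
    private static final HashFunction HASH = Hashing.murmur3_32(0);

    // Map a string to a bucket index in [0, numFeatures).
    static int bucket(String s, int numFeatures) {
        return Math.floorMod(Math.abs(HASH.hashUnencodedChars(s).asInt()), numFeatures);
    }

    public static void main(String[] args) {
        int numFeatures = 30000;
        TreeMap<Integer, Double> vec = new TreeMap<>();

        // Categorical feature: hash "colName=value", the entry is 1.0.
        vec.merge(bucket("site_category=28905ebd", numFeatures), 1.0, Double::sum);

        // Numerical feature: hash "colName", the entry is the (scaled) feature value.
        vec.merge(bucket("C14", numFeatures), -0.734299689415312, Double::sum);

        System.out.println(vec); // sparse representation: index -> value
    }
}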

Look at the corresponding code.

5.1 Sparse Matrix

A feature vector of size 30,000, finally named "vec", is generated; it is stored as a sparse vector.

String vecColName = "vec";
int numHashFeatures = 30000;
// setup feature engineering pipeline
Pipeline featurePipeline = new Pipeline()
        .add(
                new StandardScaler()
                        .setSelectedCols(numericalColNames)
        )
        .add(
                new FeatureHasher()
                        .setSelectedCols(selectedColNames)
                        .setCategoricalCols(categoryColNames)
                        .setOutputCol(vecColName)
                        .setNumFeatures(numHashFeatures)
        );

5.2 FeatureHasherMapper

Each Row passed to the map function is "the original data after standardization".

The numerical feature columns are traversed and hashed first, then the categorical feature columns are traversed and hashed.

public class FeatureHasherMapper extends Mapper {
    /**
     * Projects a number of categorical or numerical features into a feature vector of a specified dimension.
     *
     * @param row the input Row type data
     * @return the output row.
     */
    @Override
    public Row map(Row row) {
        TreeMap<Integer, Double> feature = new TreeMap<>();
        // Run the hash transform over the numerical feature columns
        for (int key : numericColIndexes) {
            if (null != row.getField(key)) {
                double value = ((Number) row.getField(key)).doubleValue();
                String colName = colNames[key];
                updateMap(colName, value, feature, numFeature);
            }
        }
        // Traverse the categorical feature columns for the hash transform
        for (int key : categoricalColIndexes) {
            if (null != row.getField(key)) {
                String colName = colNames[key];
                updateMap(colName + "=" + row.getField(key).toString(), 1.0, feature, numFeature);
            }
        }
        return outputColsHelper.getResultRow(row, Row.of(new SparseVector(numFeature, feature)));
    }
}

// The following variables are printed at run time
selectedCols = {String[19] @9817} 
 0 = "C1"
 1 = "banner_pos"
 2 = "site_category"
 3 = "app_domain"
 4 = "app_category"
 5 = "device_type"
 6 = "device_conn_type"
 7 = "C14"
 8 = "C15"
 9 = "C16"
 10 = "C17"
 11 = "C18"
 12 = "C19"
 13 = "C20"
 14 = "C21"
 15 = "site_id"
 16 = "site_domain"
 17 = "device_id"
 18 = "device_model"
   
numericColIndexes = {int[8] @10789} 
 0 = 16
 1 = 17
 2 = 18
 3 = 19
 4 = 20
 5 = 21
 6 = 22
 7 = 23
   
categoricalColIndexes = {int[11] @10791} 
 0 = 3
 1 = 4
 2 = 7
 3 = 9
 4 = 10
 5 = 14
 6 = 15
 7 = 5
 8 = 6
 9 = 11
 10 = 13   

5.3 Hashing updateMap

updateMap performs the actual hashing: it uses the hash function to generate an index into the sparse vector, then accumulates the value at that index.

The hash function used comes from org.apache.flink.shaded.guava18.com.google.common.hash.

/**
 * Update the TreeMap which saves the key-value pairs of the final vector; use the hash value
 * of the string as the key and accumulate the corresponding value.
 *
 * @param s the string to hash
 * @param value the accumulated value
 */
private static void updateMap(String s, double value, TreeMap<Integer, Double> feature, int numFeature) {
    // HASH = {Murmur3_32HashFunction@10755} "Hashing.murmur3_32(0)"
    int hashValue = Math.abs(HASH.hashUnencodedChars(s).asInt());

    int index = Math.floorMod(hashValue, numFeature);
    if (feature.containsKey(index)) {
        feature.put(index, feature.get(index) + value);
    } else {
        feature.put(index, value);
    }
}

For example, if the index is 26798, the value is placed at position 26798 of vec.

s = "C14"
value = 0.33428145187593655
feature = {TreeMap@10836}  size = 1
 {Integer@10895} 26798 -> {Double@10896} 0.33428145187593655
numFeature = 30000
hashValue = 23306798
index = 26798

After feature hashing, the resulting vec is appended as the 25th field of the original Row (there were 24 fields; it is appended at the end), shown as "24 = {SparseVector@10932}" below.

row = {Row@10901} 
 fields = {Object[25] @10907} 
  0 = "3199889859719711212"
  1 = "0"
  2 = "14102101"
  3 = "1005"
  4 = {Integer@10912} 0
  5 = "1fbe01fe" // "device_type" is this value, this is the raw input, you can look back at the sample code output if you forget.
  6 = "f3845767"
  7 = "28905ebd"
  8 = "ecad2386"
  9 = "7801e8d9"
  10 = "07d7df22"
  11 = "a99f214a"
  12 = "cfa82746"
  13 = "c6263d8a"
  14 = "1"
  15 = "0"
  16 = {Double@10924} -0.734299689415312
  17 = {Double@10925} 0.04557047559108551
  18 = {Double@10926} -0.18307661612028242
  19 = {Double@10927} -0.7340742756048447
  20 = {Double@10928} -0.8223484445229384
  21 = {Double@10929} -0.5857212482334542
  22 = {Double@10930} -0.8444219609970856
  23 = {Double@10931} 0.060151616110215377
  24 = {SparseVector@10932} "$30000 $725:1000-0.8223484445229384:1.0:3044-4995:0.8444219609970856-0.18307661612028242-8049:0.060151616110215377 8517:1.0 10962, 1.0 17954, 1.0 18556, 1.0 21430, 1.0 23250, 1.0 24010:1.0 24390:25083-1.0:0.04557047559108551 25435:-0.5857212482334542 25721:-0.7340742756048447 26169:1.0 26798:-0.734299689415312 29671:1.0"
    
// 30000 is the total dimension of the sparse vector
// 725:-0.8223484445229384 means the value at index 725 is -0.8223484445229384, and so on.

0xFF References

Why Zero-mean is Used in Deep Learning Image preprocessing

Feature Hashing of Data Feature Processing

Feature engineering related technology introduction

Feature Hashing
