0x00 Summary

Alink is a new-generation machine learning algorithm platform that Alibaba built on the real-time computing engine Flink. It is the first machine learning platform in the industry to support both batch and streaming algorithms. This article and the ones that follow describe how the online learning algorithm FTRL is implemented in Alink; I hope they are helpful.

0x01 Concepts

Since Alink implements LR + FTRL, we need to start with logistic regression LR.

1.1 Logistic regression

Although logistic regression has “regression” in its name, it is actually a classification model and is most often used for binary classification. In essence, logistic regression assumes that the data follow the logistic distribution and then estimates the parameters by maximum likelihood.

The idea of logistic regression is to first fit a decision boundary (not necessarily linear; it can also be polynomial) and then relate that boundary to the class probabilities, which yields the probability of each class in the binary case.

1.1.1 Derivation process

Let’s start with linear regression. In some cases it is not feasible to fit the data with a linear function and then apply a threshold: the fitted function is too straight, so outliers have too much influence on the result. The overall idea is not wrong, though; what is wrong is using a fitting function that is too “straight”. If the fitting function were nonlinear, and not so straight, wouldn’t it work better?

So let’s do two things:

  • Find a way to keep the regression function from being heavily affected by outliers.
  • Select a threshold.

For the first task, we bend the regression function with the sigmoid function.

For binary classification, 1 represents the positive class and 0 the negative class. Logistic regression takes the real-valued output of the linear function and applies a hypothesis function h_θ(x) = g(θ, x) that maps this value into the interval (0, 1). Logistic regression uses the logistic function as this activation function; the logistic function is the best-known example of a sigmoid (S-shaped) function.

For the second task, we choose a threshold of 0.5.

That is, with a threshold of 0.5, anything below 0.5 is classified as negative, even 0.49. Is it necessarily correct to call such a sample negative? Not necessarily, because there is still a 49% chance that it is positive. Even if the probability of being positive is only 0.1, if we predict 10,000 such samples as negative, roughly 1,000 of them will turn out to be positive. No matter how we choose, there will be errors, so choosing a threshold means choosing an acceptable level of error.
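The two steps above (squash the linear output with the logistic function, then apply a threshold) can be sketched in Java as follows. This is only a minimal illustration; the class and method names are made up for this example and are not part of Alink.

```java
// Minimal sketch: logistic hypothesis h(x) = sigmoid(w . x) plus a threshold.
// All names here are illustrative assumptions, not Alink APIs.
final class LogisticSketch {

    /** Logistic (sigmoid) function: maps any real z into (0, 1). */
    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    /** Dot product w . x of two equally long vectors. */
    static double dot(double[] w, double[] x) {
        double s = 0.0;
        for (int i = 0; i < w.length; i++) {
            s += w[i] * x[i];
        }
        return s;
    }

    /** Returns 1 (positive) if the predicted probability reaches the threshold, else 0. */
    static int classify(double[] w, double[] x, double threshold) {
        return sigmoid(dot(w, x)) >= threshold ? 1 : 0;
    }

    public static void main(String[] args) {
        double[] w = {2.0, -1.0};
        double[] x = {1.0, 1.0}; // w . x = 1.0
        System.out.println(sigmoid(dot(w, x)));  // probability of the positive class
        System.out.println(classify(w, x, 0.5)); // class label under the 0.5 threshold
    }
}
```

Raising or lowering the `threshold` argument is exactly the trade-off discussed above: it changes which kind of error is tolerated, not whether errors occur.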

1.1.2 Solving

So now we have a basic idea of where logistic regression came from, and we know that the discriminant function for logistic regression is


$$h(z) = \frac{1}{1+e^{-z}}, \quad z = W^T X$$

How do we solve logistic regression? That is, how do we find the set of weights W that maximizes the probability that h(z) classifies all samples correctly?

There are many methods for solving logistic regression; here we mainly discuss gradient descent and Newton’s method. The goal of optimization is to find a direction in which moving the parameters reduces the value of the loss function. This direction is usually obtained from the first-order partial derivatives, the second-order partial derivatives, or a combination of both.

Gradient descent finds the descent direction from the first derivative of J(w) with respect to w and updates the parameters iteratively.

The basic idea of Newton’s method is to perform a second-order Taylor expansion of J(w) around the current estimate of the minimum point and use it to find the next estimate.
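For reference, the two update rules can be written out for the usual log-loss form of J(w). The symbols m (number of samples), α (learning rate), and H (Hessian) are notation assumed here; the article itself does not spell out J(w).

```latex
% Log loss over m samples (x_i, y_i), with h the logistic function
J(w) = -\frac{1}{m}\sum_{i=1}^{m}\Big[\, y_i \log h(w^T x_i) + (1-y_i)\log\big(1-h(w^T x_i)\big) \Big]

% Its gradient
\nabla J(w) = \frac{1}{m}\sum_{i=1}^{m}\big(h(w^T x_i) - y_i\big)\, x_i

% Gradient descent: first-order step with learning rate \alpha
w \leftarrow w - \alpha\, \nabla J(w)

% Newton's method: second-order step using the Hessian H
w \leftarrow w - H^{-1}\, \nabla J(w), \qquad
H = \frac{1}{m}\sum_{i=1}^{m} h(w^T x_i)\big(1-h(w^T x_i)\big)\, x_i x_i^T
```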

1.1.3 Stochastic gradient descent

When the number of samples is very large, stochastic gradient descent is usually used. The algorithm is as follows:

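while not converged:
    for i in range(0, m):              # visit the m samples one at a time
        for j in range(0, n):          # update each of the n weights
            # a: learning rate; g[i][j]: gradient of the loss on sample i w.r.t. w[j]
            w[j] = w[j] - a * g[i][j]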

One advantage of stochastic gradient descent is that it can be parallelized across machines. The computation proceeds as follows:

  1. In each iteration, randomly sample a certain proportion of the data as the computation samples for that iteration.
  2. For each of these samples, compute the gradient of each feature separately.
  3. Use an aggregation function to accumulate the gradients of all features over the samples, obtaining the cumulative gradient of each feature and the loss.
  4. Update the parameters from the latest gradient and the previous parameters.
  5. Compute the loss with the updated parameters. If the loss is within the allowed range, stop iterating; otherwise, go back to step 1.
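The five steps above can be sketched as a small, single-machine Java program. It is a hedged illustration only: the parameter names (`fraction`, `rate`, `tolerance`, `maxIter`, `seed`) are assumptions made for this example, and a real distributed version would run steps 2 and 3 on separate workers.

```java
import java.util.Random;

// Minimal sketch of mini-batch SGD for logistic regression (names are illustrative).
final class MiniBatchSgdSketch {

    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    static double dot(double[] w, double[] x) {
        double s = 0.0;
        for (int j = 0; j < w.length; j++) {
            s += w[j] * x[j];
        }
        return s;
    }

    /** Mean log loss of the weights w over all samples. */
    static double logLoss(double[] w, double[][] xs, int[] ys) {
        double loss = 0.0;
        for (int i = 0; i < xs.length; i++) {
            // Clamp the probability so that log() never sees 0.
            double p = Math.min(Math.max(sigmoid(dot(w, xs[i])), 1e-12), 1.0 - 1e-12);
            loss -= ys[i] * Math.log(p) + (1 - ys[i]) * Math.log(1.0 - p);
        }
        return loss / xs.length;
    }

    static double[] train(double[][] xs, int[] ys, double fraction, double rate,
                          double tolerance, int maxIter, long seed) {
        Random random = new Random(seed);
        int dim = xs[0].length;
        double[] w = new double[dim];
        for (int iter = 0; iter < maxIter; iter++) {
            double[] grad = new double[dim];
            int used = 0;
            for (int i = 0; i < xs.length; i++) {
                // Step 1: keep each sample with probability `fraction`.
                if (random.nextDouble() >= fraction) {
                    continue;
                }
                // Step 2: per-feature gradient of this sample.
                double err = sigmoid(dot(w, xs[i])) - ys[i];
                // Step 3: accumulate the gradient of every feature.
                for (int j = 0; j < dim; j++) {
                    grad[j] += err * xs[i][j];
                }
                used++;
            }
            if (used == 0) {
                continue; // nothing sampled in this round
            }
            // Step 4: update the parameters from the averaged gradient.
            for (int j = 0; j < dim; j++) {
                w[j] -= rate * grad[j] / used;
            }
            // Step 5: stop once the loss is within the allowed range.
            if (logLoss(w, xs, ys) < tolerance) {
                break;
            }
        }
        return w;
    }
}
```

A call such as `MiniBatchSgdSketch.train(xs, ys, 0.5, 0.5, 0.05, 500, 42L)` samples about half of the rows per iteration; setting `fraction` to 1.0 turns the loop into plain batch gradient descent.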

1.2 Parallel computation of LR

From the solution methods of logistic regression, we can find that these algorithms need to calculate gradients, so the parallelization of logistic regression is the parallelization of objective function gradient calculation.

We can see that in the gradient vector calculation of the objective function, only the dot product and addition of vectors are needed. Each iteration process can be easily divided into independent calculation steps, which are independently calculated by different nodes, and then merged to calculate the results.

Parallel LR therefore parallelizes the gradient computation that is used to find the descent direction while minimizing the loss function; the step that determines the descent direction from the gradient can be parallelized as well.

If the sample matrix is split by rows, the sample feature vectors are distributed to different computing nodes. Each node computes the dot products and sums for the samples it is responsible for, and the partial results are then merged. This gives “row-parallel LR”.
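As a rough single-process illustration of row-parallel LR, the sketch below uses a Java parallel stream in place of real computing nodes: each shard of rows produces a partial gradient, and the partial results are merged by summation. The class name and the `shards` parameter are assumptions for this example.

```java
import java.util.stream.IntStream;

// Minimal sketch of "row-parallel" gradient computation (names are illustrative).
final class RowParallelGradientSketch {

    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    /** Summed log-loss gradient over rows [from, to): the work of one "node". */
    static double[] partialGradient(double[] w, double[][] xs, int[] ys, int from, int to) {
        double[] grad = new double[w.length];
        for (int i = from; i < to; i++) {
            double wx = 0.0;
            for (int j = 0; j < w.length; j++) {
                wx += w[j] * xs[i][j];
            }
            double err = sigmoid(wx) - ys[i];
            for (int j = 0; j < w.length; j++) {
                grad[j] += err * xs[i][j];
            }
        }
        return grad;
    }

    /** Merge step: element-wise sum of two partial gradients (returns a new array). */
    static double[] add(double[] a, double[] b) {
        double[] sum = new double[a.length];
        for (int j = 0; j < a.length; j++) {
            sum[j] = a[j] + b[j];
        }
        return sum;
    }

    /** Splits the rows into `shards` contiguous blocks and merges the partial gradients. */
    static double[] gradient(double[] w, double[][] xs, int[] ys, int shards) {
        int rows = xs.length;
        return IntStream.range(0, shards)
                .parallel()
                .mapToObj(s -> partialGradient(w, xs, ys, s * rows / shards, (s + 1) * rows / shards))
                .reduce(new double[w.length], RowParallelGradientSketch::add);
    }
}
```

Because `add` returns a fresh array, the identity value passed to `reduce` is never mutated, which keeps the parallel reduction safe.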

Row-parallel LR solves the sample-size problem, but in practice logistic regression is also applied to very high-dimensional feature vectors (for example, advertising systems with hundreds of millions of feature dimensions). Row-wise parallelism alone cannot meet the needs of such scenarios, so the high-dimensional feature vector must also be split by column into several smaller vectors.
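The column split can be illustrated with the dot product w · x: each block of columns yields a partial dot product, and the sum of the partial results equals the full one. The sketch below is an assumption-laden toy (dense arrays, a parallel stream standing in for nodes, a made-up `blocks` parameter), not how any particular system lays out its data.

```java
import java.util.stream.IntStream;

// Minimal sketch of a column-split dot product (names are illustrative).
final class ColumnParallelDotSketch {

    /** Partial dot product over the columns [from, to): the work of one "node". */
    static double partialDot(double[] w, double[] x, int from, int to) {
        double s = 0.0;
        for (int j = from; j < to; j++) {
            s += w[j] * x[j];
        }
        return s;
    }

    /** Splits the columns into `blocks` contiguous ranges and sums the partial results. */
    static double dot(double[] w, double[] x, int blocks) {
        int dim = w.length;
        return IntStream.range(0, blocks)
                .parallel()
                .mapToDouble(k -> partialDot(w, x,
                        (int) ((long) k * dim / blocks),        // long math avoids int overflow
                        (int) ((long) (k + 1) * dim / blocks)))
                .sum();
    }
}
```

Only one number per block has to travel over the network for each sample, which is what makes this layout attractive when the feature dimension is huge.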

1.3 Traditional machine learning

The traditional machine learning development process is basically the following steps:

  1. Data fusion, acquisition of training and evaluation data sets.
  2. Feature engineering.
  3. Build models, such as LR, FM, etc.
  4. The optimal solution is obtained by training the model.
  5. Evaluate model effectiveness.
  6. Save the model and serve the validated model online.

There are two major bottlenecks to this approach:

  1. The model update cycle is slow and cannot reflect online changes effectively. At best the model is updated every few hours; usually it takes days or even weeks.
  2. With few parameters, the model predicts poorly; with many parameters, online prediction needs a lot of memory and QPS cannot be guaranteed.

For example, a traditional batch algorithm processes the whole training set in every iteration (for instance, to compute the global gradient). Its advantages are good accuracy and convergence; its disadvantages are that it cannot handle large data sets efficiently (the global gradient becomes too expensive to compute) and that it cannot be applied to data streams for online learning.

Generally speaking, there are two ways to solve these problems:

  • For 1, use an online learning algorithm.
  • For 2, use optimization methods that produce solutions as sparse as possible while preserving accuracy, which reduces the number of model parameters.

1.4 Online Learning

Online learning refers to a family of machine learning algorithms in which the model can be trained on each sample as it arrives. The model is adjusted in real time according to online feedback data, so that it reflects online changes promptly and improves the accuracy of online prediction.

With traditional training, the model is generally static once it goes online and does not interact with the online environment. If it makes prediction errors, they can only be corrected at the next update, which usually takes a long time.

Online learning trains differently. It adjusts the model dynamically according to the online prediction results: when the model makes a prediction error, the error is corrected promptly. Online learning can therefore respond to online changes in a more timely manner.

The optimization goal of Online Learning is to minimize the overall loss function, and it needs to quickly solve the optimal solution of the objective function.

The characteristic of online learning algorithms is that each time a training sample arrives, the loss and gradient produced by that sample are used to update the model, so the data are trained on one by one. This makes it possible to handle both very large training sets and online training. Commonly used algorithms include online gradient descent (OGD) and stochastic gradient descent (SGD). Their essence is to perform gradient descent on the loss L(w, z_i) of a single data point rather than on the loss summed over all data. Because the direction of each step is not globally optimal, the overall trajectory looks like a seemingly random descent path.

1.5 FTRL

FTL (Follow The Leader) is the predecessor of FTRL. The idea is to choose, at every step, the parameter that minimizes the sum of the loss functions of all previous samples.

FTRL, short for Follow The Regularized Leader, grew out of this earlier work; its main goal is to improve sparsity while meeting accuracy requirements. On top of FTL’s optimization objective, FTRL adds a regularization term to prevent over-fitting.
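Written as formulas, the two objectives described above look roughly as follows. Here ℓ_s is the loss of the s-th sample and R(w) is a generic regularization term; both symbols are notation assumed for this sketch.

```latex
% FTL: choose the parameter that is best for all losses seen so far
w_{t+1} = \arg\min_{w} \sum_{s=1}^{t} \ell_s(w)

% FTRL: the same objective plus a regularization term R(w)
w_{t+1} = \arg\min_{w} \left( \sum_{s=1}^{t} \ell_s(w) + R(w) \right)
```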

The loss function of FTRL is also not easy to solve. In this case, it is generally necessary to find a proxy loss function.

The proxy loss function needs to meet the following conditions:

  1. The proxy loss function is easier to solve, preferably with an analytical solution.
  2. The smaller the difference between the solution of the proxy loss function and the solution of the original function, the better.

In order to measure the difference between two solutions in condition 2, we introduce regret.

1.5.1 regret & sparsity

For online learning in general, we focus on two problems: reducing regret and improving sparsity. Regret is defined as:

$$Regret = \sum_{t=1}^{T} \ell_t(w_t) - \min_w \sum_{t=1}^{T} \ell_t(w)$$

where t is the index of the current round out of T rounds in total, ℓ_t is the loss function of round t, and w is the parameter to be learned. Regret measures the difference between the solution of the proxy function and the solution of the real loss function.

The second term is the loss of the optimal solution that could be obtained after all samples have been seen. Since online learning can only update the parameters from a few samples at a time, it is highly random, so a robust optimization method is needed.

In theory, it can be proved that if an online learning algorithm guarantees that its regret is a sublinear function of t, then

$$\lim_{t \to \infty} \frac{Regret(t)}{t} = 0$$

That is, as the number of training samples increases, the model learned online gets arbitrarily close to the optimal model: the gap between the actual loss of the parameters obtained from the proxy loss function and that of the original loss function becomes smaller and smaller. As you would expect, FTRL satisfies this property.
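To make regret concrete, here is a toy Java sketch. It assumes a one-dimensional problem with squared loss ℓ_t(w) = (w - c_t)², for which the best fixed parameter in hindsight is simply the mean of the targets c_t, and it plays Follow The Leader (the running mean of past targets). The loss, the targets, and all names are assumptions chosen for illustration only.

```java
// Toy sketch: regret of Follow The Leader on 1-D squared losses (illustrative names).
final class RegretSketch {

    /** Loss of one round: squared distance between the played point and the target. */
    static double loss(double w, double target) {
        return (w - target) * (w - target);
    }

    /** Regret of the plays w_1..w_T against the best fixed w in hindsight. */
    static double regret(double[] plays, double[] targets) {
        double incurred = 0.0;
        double mean = 0.0;
        for (int t = 0; t < targets.length; t++) {
            incurred += loss(plays[t], targets[t]);
            mean += targets[t] / targets.length;
        }
        // For squared loss, the minimizer of the summed loss is the mean of the targets.
        double best = 0.0;
        for (double c : targets) {
            best += loss(mean, c);
        }
        return incurred - best;
    }

    /** Follow The Leader: play the minimizer of all past losses; the first play is 0. */
    static double[] followTheLeader(double[] targets) {
        double[] plays = new double[targets.length];
        double sum = 0.0;
        for (int t = 0; t < targets.length; t++) {
            plays[t] = (t == 0) ? 0.0 : sum / t;
            sum += targets[t];
        }
        return plays;
    }

    public static void main(String[] args) {
        double[] targets = {1.0, 1.0, 1.0, 1.0};
        double r = regret(followTheLeader(targets), targets);
        // Only the first round is lost, so regret / T shrinks as T grows.
        System.out.println(r / targets.length);
    }
}
```

In this toy case only the very first play is wrong, so the regret stays constant while the number of rounds grows, which is the sublinear behavior the limit above expresses.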

Sparsity of the model, on the other hand, also matters a great deal in practice. Models with hundreds of millions of features are not uncommon. The more complex the model, the more storage and time it requires, whereas a sparse model greatly reduces the memory and computation needed for prediction. In addition, sparse models are comparatively easy to interpret, which is an advantage of L1 regularization.

1.5.2 Pseudo-code of FTRL

Per-coordinate means that FTRL trains and updates each dimension of w separately, and each dimension uses its own learning rate (the term in front of λ2 in the algorithm’s pseudo-code). Compared with using one learning rate for all dimensions of w, this approach accounts for the uneven distribution of training samples across features. If only a few training samples contain a given feature, every such sample is precious, so the learning rate of that dimension can stay large on its own. Each time a sample containing that feature arrives, the weight can take a big step along that sample’s gradient without being forced to keep pace with the other feature dimensions.
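As a hedged sketch of what the per-coordinate update looks like in code, the Java class below follows the commonly published FTRL-Proximal form for logistic regression, with parameters named after the `alpha`, `beta`, `l1`, and `l2` settings used in the sample code later in this article. It is a dense, single-threaded illustration written for this text; it is not Alink’s implementation, and the class and field names are assumptions.

```java
// Minimal per-coordinate FTRL-Proximal sketch for logistic regression.
// Illustrative only: dense arrays, single thread, names are assumptions.
final class FtrlProximalSketch {
    private final double alpha;
    private final double beta;
    private final double l1;
    private final double l2;
    private final double[] z; // per-coordinate accumulated (adjusted) gradients
    private final double[] n; // per-coordinate sum of squared gradients

    FtrlProximalSketch(int dim, double alpha, double beta, double l1, double l2) {
        this.alpha = alpha;
        this.beta = beta;
        this.l1 = l1;
        this.l2 = l2;
        this.z = new double[dim];
        this.n = new double[dim];
    }

    /** Closed-form weight of coordinate i; exactly 0 when |z_i| <= l1 (sparsity). */
    double weight(int i) {
        if (Math.abs(z[i]) <= l1) {
            return 0.0;
        }
        // (beta + sqrt(n_i)) / alpha is the inverse per-coordinate learning rate.
        double inverseRate = (beta + Math.sqrt(n[i])) / alpha;
        return -(z[i] - Math.signum(z[i]) * l1) / (inverseRate + l2);
    }

    /** Predicted probability of the positive class. */
    double predict(double[] x) {
        double wx = 0.0;
        for (int i = 0; i < x.length; i++) {
            if (x[i] != 0.0) {
                wx += weight(i) * x[i];
            }
        }
        return 1.0 / (1.0 + Math.exp(-wx));
    }

    /** One online update with sample x and label y (0 or 1). */
    void update(double[] x, int y) {
        double p = predict(x);
        for (int i = 0; i < x.length; i++) {
            if (x[i] == 0.0) {
                continue; // coordinates absent from the sample are left untouched
            }
            double g = (p - y) * x[i];
            double sigma = (Math.sqrt(n[i] + g * g) - Math.sqrt(n[i])) / alpha;
            z[i] += g - sigma * weight(i); // weight(i) still uses the old z_i and n_i here
            n[i] += g * g;
        }
    }
}
```

A coordinate that appears rarely keeps a small `n[i]`, so its learning rate `alpha / (beta + sqrt(n[i]))` stays large, which is the per-coordinate behavior described above. With a larger `l1`, more coordinates satisfy `|z_i| <= l1` and their weights are exactly zero.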

1.5.3 Brief Understanding

Let’s look at the update formula for the feature weights at the next step to deepen our understanding (I personally find this explanation relatively easy to follow):

In the formula, the first term estimates the contribution of the loss function, the second term keeps w (the model) from changing too much in each iteration, and the third term is the L1 regularization (used to obtain sparse solutions).
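The three terms described above match the form in which the FTRL-Proximal update is commonly written. The notation below is assumed for this sketch: g_s is the gradient of the loss of round s at w_s, and σ_s encodes the learning-rate schedule.

```latex
w_{t+1} = \arg\min_{w} \left(
      g_{1:t} \cdot w
    + \frac{1}{2} \sum_{s=1}^{t} \sigma_s \lVert w - w_s \rVert_2^2
    + \lambda_1 \lVert w \rVert_1
\right),
\qquad g_{1:t} = \sum_{s=1}^{t} g_s
```

Here the first term is a linear estimate of the accumulated loss, the second term penalizes moving far from the earlier iterates w_s, and the third term is the L1 penalty that drives weights to exactly zero.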

0x02 Sample code

We use the official Alink sample code. We can see that it is roughly divided into several parts:

  • Build the feature-processing pipeline, consisting of StandardScaler and FeatureHasher, which performs standardized scaling and feature hashing and finally produces the feature vector. For this part, see Alink Ramble (9): Feature engineering, feature hashing/standardized scaling;
  • Split the stream data source in real time to obtain the raw training data and the raw prediction data. For this part, see Alink Ramble: How to divide the training data set and the test data set;
  • Train a logistic regression model as the initial model of the FTRL algorithm, which is needed for the cold start of the system. Logistic regression is similar to linear regression, so you can refer to these two articles: Alink Ramble (10): Data preprocessing in the linear regression implementation, and Alink Ramble (11): L-BFGS optimization in the linear regression implementation;
  • Run FTRL online training on top of the initial model;
  • Connect the prediction data to the FTRL online model to make predictions;
  • Evaluate the stream of prediction results. For details, see Alink Ramble (8): How binary classification evaluation (AUC, K-S, PRC, Precision, Recall, LiftChart) is implemented;

As you can probably tell, I did a lot of groundwork before dissecting FTRL...

public class FTRLExample {

    public static void main(String[] args) throws Exception {
        ...
        // setup feature engineering pipeline
        Pipeline featurePipeline = new Pipeline()
                .add(
                        new StandardScaler() // standardized scaling
                                .setSelectedCols(numericalColNames)
                )
                .add(
                        new FeatureHasher() // Feature hash
                                .setSelectedCols(selectedColNames)
                                .setCategoricalCols(categoryColNames)
                                .setOutputCol(vecColName)
                                .setNumFeatures(numHashFeatures)
                );
        // Build the feature engineering pipeline
        // fit feature pipeline model
        PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData);
      
        // prepare stream train data
        CsvSourceStreamOp data = new CsvSourceStreamOp()
                .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv")
                .setSchemaStr(schemaStr)
                .setIgnoreFirstLine(true);
      
        // The original training data and the original prediction data are obtained by real-time segmentation of the stream data source
        // split stream to train and eval data
        SplitStreamOp splitter = new SplitStreamOp().setFraction(0.5).linkFrom(data);
      
        // Train a logistic regression model as the initial model of FTRL algorithm, which is required for the cold start of the system.
        // train initial batch model
        LogisticRegressionTrainBatchOp lr = new LogisticRegressionTrainBatchOp()
                .setVectorCol(vecColName)
                .setLabelCol(labelColName)
                .setWithIntercept(true)
                .setMaxIter(10);
        BatchOperator<?> initModel = featurePipelineModel.transform(trainBatchData).link(lr);

        // Perform FTRL online training based on the initial model
        // ftrl train
        FtrlTrainStreamOp model = new FtrlTrainStreamOp(initModel)
                .setVectorCol(vecColName)
                .setLabelCol(labelColName)
                .setWithIntercept(true)
                .setAlpha(0.1)
                .setBeta(0.1)
                .setL1(0.01)
                .setL2(0.01)
                .setTimeInterval(10)
                .setVectorSize(numHashFeatures)
                .linkFrom(featurePipelineModel.transform(splitter));
      
        // On the basis of the FTRL online model, connect the prediction data for prediction
        // ftrl predict
        FtrlPredictStreamOp predictResult = new FtrlPredictStreamOp(initModel)
                .setVectorCol(vecColName)
                .setPredictionCol("pred")
                .setReservedCols(new String[]{labelColName})
                .setPredictionDetailCol("details")
                .linkFrom(model, featurePipelineModel.transform(splitter.getSideOutput(0)));
      
        // Evaluate the flow of predicted results
        // ftrl eval
        predictResult
                .link(
                        new EvalBinaryClassStreamOp()
                                .setLabelCol(labelColName)
                                .setPredictionCol("pred")
                                .setPredictionDetailCol("details")
                                .setTimeInterval(10)
                )
                .link(
                        new JsonValueStreamOp()
                                .setSelectedCol("Data")
                                .setReservedCols(new String[]{"Statistics"})
                                .setOutputCols(new String[]{"Accuracy", "AUC", "ConfusionMatrix"})
                                .setJsonPath(new String[]{"$.Accuracy", "$.AUC", "$.ConfusionMatrix"})
                )
                .print();
        StreamOperator.execute();
    }
}

0x03 Questions

It is better to let questions guide the analysis. Here are some that come to mind.

  • Are there pre-built models in the training stage and the prediction stage for the “cold start”?
  • How are the training stage and the prediction stage connected?
  • How is the trained model passed to the prediction stage?
  • When the model is output, what happens if it is too large?
  • By what mechanism is the online-trained model updated? Is the update driven by a timer?
  • Can the prediction stage still predict while a model is being loaded? Is there a mechanism that keeps prediction available during that time?
  • Which parts of training and prediction are processed in parallel?
  • How are high-dimensional vectors handled? Are they split?

We’ll explore these questions later.

0x04 General Logic

Online training is implemented in the FtrlTrainStreamOp class, whose linkFrom function implements the basic logic.

The main logic is:

  • 1) Load the initial model into dataBridge: dataBridge = DirectReader.collect(model);
  • 2) Obtain the relevant parameters, for example vectorSize (30000 by default) and whether there is an intercept item (hasInterceptItem);
  • 3) Obtain the split information: splitInfo = getSplitInfo(featureSize, hasInterceptItem, parallelism); it is used in the next step;
  • 4) Split the high-dimensional vectors. The initial data has already gone through feature hashing, which produces high-dimensional vectors that need to be split: initData.flatMap(new SplitVector(splitInfo, hasInterceptItem, vectorSize, vectorTrainIdx, featureIdx, labelIdx));
  • 5) Build an IterativeStream.ConnectedIterativeStreams iteration, which builds (or rather connects) two data streams: the training stream and the feedback stream;
  • 6) Use iteration to build iterativeBody, which consists of two parts: CalcTask and ReduceTask;
    • CalcTask has two parts: flatMap1 performs the distributed predict computation that the FTRL iteration requires, and flatMap2 is the parameter-update part of FTRL;
    • ReduceTask has two functions: merging the results of these predict computations, and, if the conditions are met, merging the model and sending it to the downstream operator;
  • 7) result = iterativeBody.filter; the decision is essentially based on the time interval (so it can be regarded as time driven). Data whose “time has not expired & vector is meaningful” is sent back into the feedback stream, continues to iterate, returns to step 6), and enters flatMap2;
  • 8) output = iterativeBody.filter; data that meets the criterion (time expired) leaves the iteration. The algorithm then calls WriteModel to convert the LinearModelData into multiple rows and forwards them to the downstream operator (that is, the online prediction stage). In other words, the model is periodically pushed to the online prediction stage.
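To give a feel for steps 3) and 4), here is a simplified sketch of how a feature index range could be divided into one contiguous block per parallel task, and how a feature index is then mapped to its block. It is an assumption made for illustration: it ignores the intercept handling that the real getSplitInfo takes as a parameter, and the names `splitInfo` and `blockOf` are hypothetical.

```java
// Simplified sketch of block boundaries for splitting a high-dimensional vector.
// Hypothetical helper names; the intercept item is deliberately ignored here.
final class SplitSketch {

    /** Block k owns the feature indices [bounds[k], bounds[k + 1]). */
    static int[] splitInfo(int featureSize, int parallelism) {
        int[] bounds = new int[parallelism + 1];
        int base = featureSize / parallelism;
        int extra = featureSize % parallelism; // the first `extra` blocks get one more index
        for (int k = 0; k < parallelism; k++) {
            bounds[k + 1] = bounds[k] + base + (k < extra ? 1 : 0);
        }
        return bounds;
    }

    /** Returns the block (task) that owns the given feature index. */
    static int blockOf(int index, int[] bounds) {
        for (int k = 0; k + 1 < bounds.length; k++) {
            if (index < bounds[k + 1]) {
                return k;
            }
        }
        throw new IllegalArgumentException("index out of range: " + index);
    }

    public static void main(String[] args) {
        int[] bounds = splitInfo(10, 3);
        System.out.println(java.util.Arrays.toString(bounds)); // block boundaries
        System.out.println(blockOf(4, bounds));                // block owning feature 4
    }
}
```

With boundaries like these, each non-zero entry of a sparse sample can be routed to the task that holds the matching slice of the model, so no single task needs the whole weight vector.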

The code summary is:

@Override
public FtrlTrainStreamOp linkFrom(StreamOperator<?>... inputs) {
    ...
    // 3) Obtain segmentation information
    final int[] splitInfo = getSplitInfo(featureSize, hasInterceptItem, parallelism);

    DataStream<Row> initData = inputs[0].getDataStream();

    // 4) Segmentation of higher dimensional vectors.
    // Tuple5<SampleId, taskId, numSubVec, SubVec, label>
    DataStream<Tuple5<Long, Integer, Integer, Vector, Object>> input
        = initData.flatMap(new SplitVector(splitInfo, hasInterceptItem, vectorSize,
        vectorTrainIdx, featureIdx, labelIdx))
        .partitionCustom(new CustomBlockPartitioner(), 1);

    // train data format = <sampleId, subSampleTaskId, subNum, SparseVector(subSample), label>
    // feedback format = Tuple7<sampleId, subSampleTaskId, subNum, SparseVector(subSample), label, wx, timeStamps>
    // 5) Build an IterativeStream.ConnectedIterativeStreams iteration, which connects two data streams: the training stream and the feedback stream.
    IterativeStream.ConnectedIterativeStreams<Tuple5<Long, Integer, Integer, Vector, Object>,
        Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>
        iteration = input.iterate(Long.MAX_VALUE)
        .withFeedbackType(TypeInformation
            .of(new TypeHint<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() {}));

    // 6) Use Iteration to build iterativeBody, which includes two parts: CalcTask and ReduceTask;
    DataStream iterativeBody = iteration.flatMap(
        new CalcTask(dataBridge, splitInfo, getParams()))
        .keyBy(0)
        .flatMap(new ReduceTask(parallelism, splitInfo))
        .partitionCustom(new CustomBlockPartitioner(), 1);

    // 7) result = iterativeBody.filter; the decision is essentially based on the time interval (so it can be regarded as time driven). Data whose "time has not expired & vector is meaningful" is sent back into the feedback stream, continues to iterate, returns to step 6), and enters flatMap2.
    DataStream<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>
        result = iterativeBody.filter(
        new FilterFunction<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() {
            @Override
            public boolean filter(Tuple7<Long, Integer, Integer, Vector, Object, Double, Long> t3)
                throws Exception {
                // if t3.f0 > 0 && t3.f2 > 0 then feedback
                return (t3.f0 > 0 && t3.f2 > 0);
            }
        });

    // 8) output = iterativeBody.filter; data that meets the criterion (time expired) leaves the iteration. The algorithm then calls WriteModel to convert the LinearModelData into multiple rows and forwards them to the downstream operator (the online prediction stage). In other words, the model is periodically pushed to the online prediction stage.
    DataStream<Row> output = iterativeBody.filter(
        new FilterFunction<Tuple7<Long, Integer, Integer, Vector, Object, Double, Long>>() {
            @Override
            public boolean filter(Tuple7<Long, Integer, Integer, Vector, Object, Double, Long> value)
                throws Exception {
                /* if value.f0 small than 0, then output */
                return value.f0 < 0;
            }
        }).flatMap(new WriteModel(labelType, getVectorCol(), featureCols, hasInterceptItem));
   
    // Specifies that a stream will be the end of the iteration and that this stream will be fed back to the iteration as the second input
    iteration.closeWith(result);

    TableSchema schema = new LinearModelDataConverter(labelType).getModelSchema();
    ...
    this.setOutput(output, names, types);
    return this;
}

For ease of reading, the flow chart is shown below (the splitting into training and test data sets is omitted here):

Forgive me for drawing it as text: I hate finding a good article only to discover that its images are missing...

--------------------------------------------------------------------------------
 Initial model training stage
--------------------------------------------------------------------------------
 ┌────────────────┐                         ┌─────────────────┐
 │ trainBatchData │                         │ trainStreamData │
 └────────────────┘                         └─────────────────┘
          │                                          │
 ┌─────────────────┐                                 │
 │ featurePipeline │                                 │
 └─────────────────┘                                 │
          │                                          │
 ┌───────────────────────────┐                       │
 │ logistic regression model │                       │
 └───────────────────────────┘                       │
          │                                          │
--------------------------------------------------------------------------------
 Online training stage
--------------------------------------------------------------------------------
          │                                          │
 ┌────────────┐                             ┌─────────────────┐
 │ dataBridge │ load the initial model      │ featurePipeline │
 └────────────┘                             └─────────────────┘
          │                                          │
 ┌──────────────┐                       ┌───────────────────────────┐
 │ getSplitInfo │ get split info        │ inputs[0].getDataStream() │
 └──────────────┘                       └───────────────────────────┘
          │                                          │
          │ splitInfo                                │ feature vectors
          │                                          │
 ┌─────────────┐                                     │
 │ SplitVector │ <───────────────────────────────────┘
 └─────────────┘
          │
          │ parse the input: DataStream<Tuple5<SampleId, taskId, numSubVec, SubVec, label>> input
          │
 ┌────────────────────────────┐
 │ iteration <Tuple5, Tuple7> │ build the iteration with two inputs: train data Tuple5<>, feedback data Tuple7<>
 └────────────────────────────┘
          │
          │ CalcTask is logically divided into two modules: flatMap1 and flatMap2
          │
 ┌───────────────────┐               ┌───────────────────┐
 │ CalcTask.flatMap1 │               │ CalcTask.flatMap2 │ <── feedback stream, input Tuple7<>
 └───────────────────┘               └───────────────────┘
          │ input Tuple5<>;                    │ distributed processing of feedback data /
          │ distributed FTRL predict           │ parameter update
          │ <──────────────────────────────────┘
          │ both flatMap outputs go to the ReduceTask below
          │
 ┌────────────────────┐
 │ ReduceTask.flatMap │ 1. time expired & everything collected: merge and output the model (value.f0 < 0)
 └────────────────────┘ 2. not expired: merge the predict results of every CalcTask to form a label y
          │
 ┌─────────────────┐
 │ result = filter │ t3.f0 > 0 && t3.f2 > 0 ?
 └─────────────────┘
          │
          ├── yes: "time has not expired & vector is meaningful" ──> sent back as feedback (Tuple7)
          │        to CalcTask.flatMap2; the iteration continues
          │
          │ no: the data does not join the feedback stream; continue to the next filter
          │
 ┌─────────────────┐
 │ output = filter │ value.f0 < 0 ?
 └─────────────────┘
          │
          │ yes: data that meets the criterion (time expired) leaves the iteration and the model is output
          │
 ┌────────────┐
 │ WriteModel │ the filter lets data through only when the time has expired, so the model is output periodically
 └────────────┘
          │
--------------------------------------------------------------------------------
 Online prediction stage
--------------------------------------------------------------------------------
          │                                 ┌────────────────┐
          │                                 │ testStreamData │
          │                                 └────────────────┘
          │                                          │
 ┌──────────────┐                           ┌─────────────────┐
 │ FTRL Predict │ <──────────────────────── │ featurePipeline │
 └──────────────┘                           └─────────────────┘

Because the diagram above may be distorted on a phone, the following image shows the same flow for mobile viewing.

