Flink transformations fall into four types: basic transformations on a single data stream, key-based grouping transformations, multi-stream transformations, and data redistribution transformations. Readers can use the Flink Scala Shell or IntelliJ IDEA for this exercise:

  • Flink Scala Shell tutorial

  • IntelliJ IDEA development environment setup tutorial

  • Flink basic transformations on a single data stream: map, filter, flatMap

  • Flink key-based grouping transformations: keyBy, reduce, and aggregations

  • Flink multi-stream transformations: union and connect

Parallelism

Flink uses parallelism to define how many operator subtasks an operator is split into. Most of the transformations we write form a logical view; when actually run, each operator is split into one or more operator subtasks that execute in parallel, each processing a portion of the data. As shown in the figure below, each operator executes in parallel across multiple subtasks. If an operator's parallelism is 2, it has two instances.

Parallelism can be set at the execution environment level of a Flink job, which applies to all operators of the job, or it can be set separately for an individual operator. If nothing is set, the parallelism of all operators in a job depends by default on the environment in which the job is executed. If a job runs locally, parallelism defaults to the number of local CPU cores. When we submit a job to a Flink cluster, the client that submits the job can specify a number of parameters, one of which is parallelism.

The following code shows how to get the default parallelism of the execution environment and how to change the parallelism of the execution environment.

val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// Get the default parallelism of the current execution environment
val defaultParallelism = senv.getParallelism

// Set the parallelism of all operators to 4, indicating that the number of instances of parallel execution of all operators is 4
senv.setParallelism(4)

We can also set the degree of parallelism for an operator:

dataStream.map(new MyMapper).setParallelism(defaultParallelism * 2)

Data redistribution

By default, data is automatically allocated to the multiple instances of an operator. Sometimes we need to redistribute data among instances manually: for example, when one instance holds too much data and the others too little, causing data skew. In that case we need to distribute data evenly across all instances to avoid overloading some of them. Data skew can cause the whole job to run slowly or to run out of memory.

Each data redistribution operator described below takes a DataStream as input and outputs a DataStream. keyBy also groups and redistributes data, but keyBy outputs a KeyedStream.
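For example, a minimal sketch of the difference in output types (assuming the senv and imports from the earlier snippets):

import org.apache.flink.streaming.api.scala._

// Redistribution operators return a plain DataStream;
// keyBy groups by key and returns a KeyedStream instead
val stream: DataStream[(Int, String)] = senv.fromElements((1, "a"), (2, "b"))

val rebalanced: DataStream[(Int, String)] = stream.rebalance

val keyed: KeyedStream[(Int, String), Int] = stream.keyBy(_._1)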

shuffle

Shuffle distributes data randomly to the downstream operator instances, following a uniform distribution: each element is equally likely to go to any instance.

dataStream.shuffle

Rebalance and rescale

Rebalance uses round-robin to distribute data evenly across downstream instances. Round-robin is a uniform distribution method commonly used in load balancing: upstream data is sent to all downstream instances in turn. As shown in the figure below, the upstream operator sends data in turn to every downstream operator instance.

dataStream.rebalance

Rescale is similar to rebalance in that it distributes data evenly across downstream instances, but its transfer overhead is lower, because rescale does not round-robin across every downstream instance, only across a nearby subset of downstream instances.

dataStream.rescale

As shown in the diagram above, when there are two instances upstream, the first upstream instance sends data to the first and second downstream instances, and the second upstream instance sends data to the third and fourth downstream instances. This makes rescale's transfer overhead smaller than that of rebalance, which sends data to every downstream instance. The figure below shows the opposite case: with four upstream instances, the first two upstream instances send data to the first downstream instance, and the last two send data to the second downstream instance.
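To make the two-to-four case concrete, here is a minimal runnable sketch (the pipeline and parallelism numbers are illustrative): the map operator runs with parallelism 2 and the print sink with parallelism 4, so each map instance feeds only two of the four sink instances.

import org.apache.flink.streaming.api.scala._

object RescaleExample {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    senv.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
      .map(i => i).setParallelism(2)    // upstream operator: 2 instances
      .rescale                          // each upstream instance only feeds its nearby group
      .print().setParallelism(4)        // downstream sink: 4 instances

    senv.execute("rescale example")
  }
}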

broadcast

Broadcast, as the name suggests, copies the data and broadcasts it to all downstream instances.

dataStream.broadcast

global

Global sends all data to the first instance of the downstream operator. Use this operator with care, as it can cause serious performance problems.

dataStream.global

partitionCustom

We can also customize the data redistribution logic with partitionCustom. partitionCustom takes two arguments: the first is a custom Partitioner, in which we need to override the partition method; the second specifies which field of the data stream the partition logic is applied to. The partition method returns an integer indicating the index of the downstream instance to which the element will be routed.

For example, to distribute a case class (id: Long, name: String, score: Double) evenly among downstream instances according to id, the generic T is the data type of id, namely Long. T is also the data type of the first argument of the partition(key, numPartitions) method. When we call partitionCustom(partitioner, field), the first argument is the Partitioner we overrode, and the second argument indicates that partitioning is by the id field.
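A minimal sketch of this case (Record and IdPartitioner are hypothetical names, and routing by modulo is just one possible policy):

import org.apache.flink.api.common.functions.Partitioner

// hypothetical case class from the example above
case class Record(id: Long, name: String, score: Double)

// T = Long because we redistribute by the id field
class IdPartitioner extends Partitioner[Long] {
  override def partition(key: Long, numPartitions: Int): Int =
    (key % numPartitions).toInt  // route each element to instance (id mod numPartitions)
}

// usage: route by the id field
// dataStream.partitionCustom(new IdPartitioner, r => r.id)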

The following code redistributes data according to the second field of the data stream: when the string contains a digit, the element is routed to the first half of the downstream instances, otherwise to the second half. If parallelism is set to 4, meaning every operator has 4 instances (or 4 partitions), then elements whose string contains a digit are allocated to instances 0 and 1, and the rest to instances 2 and 3.

package com.flink.tutorials.api.transformations

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object PartitionCustomExample {

  /**
    * Partitioner[T], where the generic T is the type of the specified field.
    * Overrides the partition method to redistribute all elements in the data
    * stream according to that field.
    */
  class MyPartitioner extends Partitioner[String] {

    val rand = scala.util.Random

    /**
      * key: the field by which data is redistributed, in this case the String in (Int, String)
      * numPartitions: how many parallel instances there currently are
      * The returned Int is the index of the downstream instance the element will be sent to.
      */
    override def partition(key: String, numPartitions: Int): Int = {
      val randomNum = rand.nextInt(numPartitions / 2)

      // If the string contains a digit, the element is routed to the first half,
      // otherwise to the second half.
      if (key.exists(_.isDigit)) {
        randomNum
      } else {
        randomNum + numPartitions / 2
      }
    }
  }

  def main(args: Array[String]): Unit = {

    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Get the default parallelism of the current execution environment
    val defaultParallelism = senv.getParallelism

    // Set the parallelism of all operators to 4, i.e. every operator executes with 4 parallel instances
    senv.setParallelism(4)

    val dataStream: DataStream[(Int, String)] = senv.fromElements(
      (1, "123"), (2, "abc"), (3, "256"), (4, "zyx"), (5, "bcd"), (6, "666"))

    // Apply the redistribution logic from MyPartitioner to the second field in (Int, String)
    val partitioned = dataStream.partitionCustom(new MyPartitioner, 1)

    partitioned.print()

    senv.execute("partition custom transformation")
  }
}
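When run with parallelism 4, print prefixes each element with the 1-based index of the subtask that prints it, so the tuples whose string contains a digit ((1, "123"), (3, "256"), (6, "666")) should appear with prefix 1> or 2>, and the rest with prefix 3> or 4>.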