Overview

This article mainly looks at the reduce operation of Flink's KeyedStream

Example

    @Test
    public void testWordCount() throws Exception {
        // Checking input parameters
//        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
//        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = env.fromElements(WORDS);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                                System.out.println("value1:"+value1.f1+"; value2:"+value2.f1);
                                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                            }
                        });

        // emit result
        System.out.println("Printing result to stdout. Use --output to specify output path.");

        counts.print();

        // execute program
        env.execute("Streaming WordCount");
    }
  • Here a reduce operation is performed on the KeyedStream with a custom ReduceFunction; the word counts are accumulated in its reduce method
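
The test references WORDS and a Tokenizer that are not shown above. A minimal sketch of both, modeled on Flink's bundled WordCount example (the exact lines in WORDS are an assumption for illustration):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public static final String[] WORDS = new String[] {
            "To be, or not to be, that is the question:",
            "Whether 'tis nobler in the mind to suffer"
    };

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line into words
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit one (word, 1) pair per token
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }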

KeyedStream.reduce

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {

	//......

	/**
	 * Applies a reduce transformation on the grouped data stream grouped on by
	 * the given key position. The {@link ReduceFunction} will receive input
	 * values based on the key value. Only input values with the same key will
	 * go to the same reducer.
	 *
	 * @param reducer
	 *            The {@link ReduceFunction} that will be called for every
	 *            element of the input values with the same key.
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
				clean(reducer), getType().createSerializer(getExecutionConfig())));
	}

	@Override
	@PublicEvolving
	public <R> SingleOutputStreamOperator<R> transform(String operatorName,
			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

		SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);

		// inject the key selector and key type
		OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
		transform.setStateKeySelector(keySelector);
		transform.setStateKeyType(keyType);

		return returnStream;
	}

	//......
}
  • KeyedStream's reduce method calls the transform method, and the OneInputStreamOperator it constructs is StreamGroupedReduce (a hand-expanded equivalent is sketched below)
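
To make the wiring concrete, here is a hand-expanded equivalent of keyed.reduce(reducer) for the word-count types. This is a sketch only, since StreamGroupedReduce is an @Internal class that user code would not normally instantiate; it assumes the text stream, env, and Tokenizer from the example above, plus imports for Tuple, SingleOutputStreamOperator, and StreamGroupedReduce:

    KeyedStream<Tuple2<String, Integer>, Tuple> keyed =
            text.flatMap(new Tokenizer()).keyBy(0);

    ReduceFunction<Tuple2<String, Integer>> reducer =
            (v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1);

    // equivalent to keyed.reduce(reducer): same operator name, output type and serializer
    SingleOutputStreamOperator<Tuple2<String, Integer>> counts =
            keyed.transform(
                    "Keyed Reduce",
                    keyed.getType(),
                    new StreamGroupedReduce<>(
                            reducer,
                            keyed.getType().createSerializer(env.getConfig())));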

ReduceFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/ReduceFunction.java

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

	/**
	 * The core method of ReduceFunction, combining two values into one value of the same type.
	 * The reduce function is consecutively applied to all values of a group until only a single value remains.
	 *
	 * @param value1 The first value to combine.
	 * @param value2 The second value to combine.
	 * @return The combined value of both input values.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	T reduce(T value1, T value2) throws Exception;
}
  • ReduceFunction defines the reduce method, which combines two values of the same type into one value of the same type. The first parameter is the result of the previous reduce and the second parameter is the current element (a lambda version is shown below)
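
Because ReduceFunction is annotated @FunctionalInterface, the anonymous class from the example can also be written as a lambda; this is a minimal sketch against the same word-count stream, and no returns() type hint is needed because reduce() preserves the input type:

    DataStream<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .keyBy(0)
            .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1));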

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

	//......

	/**
	 * The core work method that bootstraps the task and executes its code.
	 */
	@Override
	public void run() {

		// ----------------------------
		//  Initial State transition
		// ----------------------------
		//......

		// all resource acquisitions and registrations from here on
		// need to be undone in the end
		Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
		AbstractInvokable invokable = null;

		try {
			// now load and instantiate the task's invokable code
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

			// ----------------------------------------------------------------
			//  actual task core work
			// ----------------------------------------------------------------

			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;

			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}

			// notify everyone that we switched to running
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);

			// run the invokable
			invokable.invoke();

			//......
		}
		catch (Throwable t) {
			//......
		}
		finally {
			//......
		}
	}

	//......
}
  • Task's run method calls invokable.invoke(); the invokable here is OneInputStreamTask, which inherits from StreamTask, so the invoke() method actually executed is the one defined in StreamTask

StreamTask.invoke

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
		extends AbstractInvokable
		implements AsyncExceptionHandler {

	//......

	protected abstract void run() throws Exception;

	@Override
	public final void invoke() throws Exception {

		boolean disposed = false;
		try {
			// -------- Initialize ---------
			LOG.debug("Initializing {}.", getName());

			asyncOperationsThreadPool = Executors.newCachedThreadPool();

			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

			synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
				getExecutionConfig().isFailTaskOnCheckpointError(),
				getEnvironment());

			asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

			stateBackend = createStateBackend();
			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

			// if the clock is not already set, then assign a default TimeServiceProvider
			if (timerService == null) {
				ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
					"Time Trigger for " + getName(), getUserCodeClassLoader());

				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
			}

			operatorChain = new OperatorChain<>(this, streamRecordWriters);
			headOperator = operatorChain.getHeadOperator();

			// task specific initialization
			init();

			// save the work of reloading state, etc, if the task is already canceled
			if (canceled) {
				throw new CancelTaskException();
			}

			// -------- Invoke --------
			LOG.debug("Invoking {}", getName());

			// we need to make sure that any triggers scheduled in open() cannot be
			// executed before all operators are opened
			synchronized (lock) {

				// both the following operations are protected by the lock
				// so that we avoid race conditions in the case that initializeState()
				// registers a timer, that fires before the open() is called.

				initializeState();
				openAllOperators();
			}

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			LOG.debug("Finished task {}", getName()); / /... } finally { //...... }}}Copy the code
  • StreamTask's invoke method calls run(), an abstract method implemented by subclasses; here that is OneInputStreamTask's run method

OneInputStreamTask.run

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

	private StreamInputProcessor<IN> inputProcessor;

	private volatile boolean running = true;

	private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

	/**
	 * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
	 *
	 * @param env The task environment for this task.
	 */
	public OneInputStreamTask(Environment env) {
		super(env);
	}

	/**
	 * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
	 *
	 * <p>This constructor accepts a special {@link ProcessingTimeService}. By default (and if
	 * null is passed for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
	 * will be used.
	 *
	 * @param env The task environment for this task.
	 * @param timeProvider Optionally, a specific time provider to use.
	 */
	@VisibleForTesting
	public OneInputStreamTask(
			Environment env,
			@Nullable ProcessingTimeService timeProvider) {
		super(env, timeProvider);
	}

	@Override
	public void init() throws Exception {
		StreamConfig configuration = getConfiguration();

		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
		int numberOfInputs = configuration.getNumberOfInputs();

		if (numberOfInputs > 0) {
			InputGate[] inputGates = getEnvironment().getAllInputGates();

			inputProcessor = new StreamInputProcessor<>(
					inputGates,
					inSerializer,
					this,
					configuration.getCheckpointMode(),
					getCheckpointLock(),
					getEnvironment().getIOManager(),
					getEnvironment().getTaskManagerInfo().getConfiguration(),
					getStreamStatusMaintainer(),
					this.headOperator,
					getEnvironment().getMetricGroup().getIOMetricGroup(),
					inputWatermarkGauge);
		}
		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
		// wrap watermark gauge since registered metrics must be unique
		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
	}

	@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}

	@Override
	protected void cleanup() throws Exception {
		if (inputProcessor != null) {
			inputProcessor.cleanup();
		}
	}

	@Override
	protected void cancelTask() {
		running = false;
	}
}
  • OneInputStreamTask's run method calls inputProcessor.processInput() in a loop; the inputProcessor here is a StreamInputProcessor (the same loop shape is sketched below)
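
The loop pairs a volatile running flag with a processor that reports whether more input remains, a standard cooperative-cancellation shape. A self-contained sketch of the same pattern (the class and method names here are illustrative, not Flink's):

    public class PullLoop {
        // volatile so a cancel() from another thread is seen by the loop
        private volatile boolean running = true;

        public void cancel() {
            running = false;
        }

        public void run(java.util.Iterator<String> input) {
            // mirrors: while (running && inputProcessor.processInput()) {}
            while (running && input.hasNext()) {
                System.out.println("processing: " + input.next());
            }
        }
    }

    // usage:
    //   new PullLoop().run(java.util.Arrays.asList("a", "b", "c").iterator());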

StreamInputProcessor.processInput

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

@Internal
public class StreamInputProcessor<IN> {

	//......

	public boolean processInput() throws Exception {
		if (isFinished) {
			return false;
		}
		if (numRecordsIn == null) {
			try {
				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e); numRecordsIn = new SimpleCounter(); }}while (true) {
			if(currentRecordDeserializer ! = null) { DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if (result.isBufferConsumed()) {
					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					StreamElement recordOrMark = deserializationDelegate.getInstance();

					if (recordOrMark.isWatermark()) {
						// handle watermark
						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
						continue;
					} else if (recordOrMark.isStreamStatus()) {
						// handle stream status
						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
						continue;
					} else if (recordOrMark.isLatencyMarker()) {
						// handle latency marker
						synchronized (lock) {
							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
						}
						continue;
					} else {
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}
						return true;
					}
				}
			}

			//......
		}
	}

	//......
}
  • StreamInputProcessor's processInput method keeps processing the next record inside a while(true) loop, branching on the type of the StreamElement; for a normal data record it calls streamOperator.processElement(), and the streamOperator here is StreamGroupedReduce

StreamGroupedReduce.processElement

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java

/**
 * A {@link StreamOperator} for executing a {@link ReduceFunction} on a
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
 */

@Internal
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
		implements OneInputStreamOperator<IN, IN> {

	private static final long serialVersionUID = 1L;

	private static final String STATE_NAME = "_op_state";

	private transient ValueState<IN> values;

	private TypeSerializer<IN> serializer;

	public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
		super(reducer);
		this.serializer = serializer;
	}

	@Override
	public void open() throws Exception {
		super.open();
		ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
		values = getPartitionedState(stateId);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		IN value = element.getValue();
		IN currentValue = values.value();

		if (currentValue != null) {
			IN reduced = userFunction.reduce(currentValue, value);
			values.update(reduced);
			output.collect(element.replace(reduced));
		} else {
			values.update(value);
			output.collect(element.replace(value));
		}
	}
}
  • StreamGroupedReduce uses a ValueState to store the result of the reduce operation. Its processElement method calls the reduce method of userFunction, the user-defined ReduceFunction: the first argument is the current value of the ValueState, i.e. the result of the previous reduce, and the second argument is the value of the current element. After userFunction.reduce() returns, the result is written back to the ValueState (a user-level sketch of the same pattern follows)
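
The same keyed-state mechanism can be reproduced at the user level with a RichFlatMapFunction holding a ValueState. The sketch below illustrates what StreamGroupedReduce does internally; it is not the operator itself, and the class name and state name are made up:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public static class ReduceLikeFlatMap
            extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private transient ValueState<Tuple2<String, Integer>> values;

        @Override
        public void open(Configuration parameters) {
            // one state slot per key, just like StreamGroupedReduce's "_op_state"
            values = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "running-count",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})));
        }

        @Override
        public void flatMap(Tuple2<String, Integer> value,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            Tuple2<String, Integer> current = values.value();
            if (current != null) {
                // previous result first, current element second: same contract as ReduceFunction
                Tuple2<String, Integer> reduced = new Tuple2<>(current.f0, current.f1 + value.f1);
                values.update(reduced);
                out.collect(reduced);
            } else {
                // first element for this key: store and emit as-is
                values.update(value);
                out.collect(value);
            }
        }
    }

    // usage: keyed state requires a keyed stream
    //   text.flatMap(new Tokenizer()).keyBy(0).flatMap(new ReduceLikeFlatMap());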

summary

  • KeyedStream's reduce method calls the transform method, and the OneInputStreamOperator it constructs is StreamGroupedReduce; reduce receives a ReduceFunction, whose reduce method combines two values of the same type into one value of the same type
  • Task's run method calls invokable.invoke(), where the invokable is OneInputStreamTask; since OneInputStreamTask inherits from StreamTask, the invoke() actually executed is StreamTask's. StreamTask's invoke method calls run(), an abstract method implemented by subclasses, here OneInputStreamTask's run method, which in turn calls inputProcessor.processInput() in a loop, the inputProcessor being a StreamInputProcessor. StreamInputProcessor's processInput method keeps processing the next record inside a while(true) loop, branching on the type of the StreamElement; for a normal data record it calls streamOperator.processElement(), where the streamOperator is StreamGroupedReduce
  • StreamGroupedReduce's processElement method calls userFunction's reduce method: the first argument is the ValueState's value, i.e. the result of the previous reduce, and the second argument is the value of the current element; after userFunction.reduce() returns, the result is written back to the ValueState
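
As a worked example of the whole chain, consider a key that occurs three times in the input, say the word "to"; the operator then behaves as follows:

    // 1st element: state empty           -> reduce not called, state = (to,1), emit (to,1)
    // 2nd element: reduce((to,1),(to,1)) -> state = (to,2), emit (to,2)   // prints value1:1; value2:1
    // 3rd element: reduce((to,2),(to,1)) -> state = (to,3), emit (to,3)   // prints value1:2; value2:1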

doc

  • datastream-transformations