Preface

This article examines Flink's SpoutWrapper.

SpoutWrapper

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java

/**
 * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
 * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
 * {@link SpoutCollector} for supported types).
 *
 * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in
 * an infinite loop.<br>
 * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
 * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
 * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
 * terminates if no record was emitted to the output collector for the first time during a call to
 * {@link IRichSpout#nextTuple() nextTuple()}.<br>
 * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
 * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
 * {@link FiniteSpout#reachedEnd()} returns true.
 */
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {

	//......

	/** The number of {@link IRichSpout#nextTuple()} calls. */
	private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop

	/**
	 * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
	 * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
	 * {@link Tuple25} depending on the spout's declared number of attributes.
	 *
	 * @param spout
	 *            The {@link IRichSpout spout} to be used.
	 * @param numberOfInvocations
	 *            The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
	 *            terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
	 *            disabled.
	 * @throws IllegalArgumentException
	 *             If the number of declared output attributes is not with range [0; 25].
	 */
	public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
			throws IllegalArgumentException {
		this(spout, (Collection<String>) null, numberOfInvocations);
	}

	/**
	 * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
	 * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
	 * {@link Tuple25} depending on the spout's declared number of attributes.
	 *
	 * @param spout
	 *            The {@link IRichSpout spout} to be used.
	 * @throws IllegalArgumentException
	 *             If the number of declared output attributes is not with range [0; 25].
	 */
	public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
		this(spout, (Collection<String>) null, null);
	}

	@Override
	public final void run(final SourceContext<OUT> ctx) throws Exception {
		final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
				.getGlobalJobParameters();
		StormConfig stormConfig = new StormConfig();

		if (config != null) {
			if (config instanceof StormConfig) {
				stormConfig = (StormConfig) config;
			} else {
				stormConfig.putAll(config.toMap());
			}
		}

		final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
				(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
				this.stormTopology, stormConfig);

		SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
				stormTopologyContext.getThisTaskId(), ctx);

		this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
		this.spout.activate();

		if (numberOfInvocations == null) {
			if (this.spout instanceof FiniteSpout) {
				final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

				while (this.isRunning && !finiteSpout.reachedEnd()) {
					finiteSpout.nextTuple();
				}
			} else {
				while (this.isRunning) {
					this.spout.nextTuple();
				}
			}
		} else {
			int counter = this.numberOfInvocations;
			if (counter >= 0) {
				while ((--counter >= 0) && this.isRunning) {
					this.spout.nextTuple();
				}
			} else {
				do {
					collector.tupleEmitted = false;
					this.spout.nextTuple();
				} while (collector.tupleEmitted && this.isRunning);
			}
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>Sets the {@link #isRunning} flag to {@code false}.
	 */
	@Override
	public void cancel() {
		this.isRunning = false;
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>Sets the {@link #isRunning} flag to {@code false}.
	 */
	@Override
	public void stop() {
		this.isRunning = false;
	}

	@Override
	public void close() throws Exception {
		this.spout.close();
	}
}
  • SpoutWrapper extends RichParallelSourceFunction and implements the stop method of the StoppableFunction interface
  • SpoutWrapper’s run method creates Flink’s SpoutCollector, passes it as the constructor argument of Storm’s SpoutOutputCollector, and hands that wrapper to the spout’s open method, so that the data the spout emits is collected by Flink’s SpoutCollector
  • It then calls spout.nextTuple() according to numberOfInvocations to emit data. numberOfInvocations controls how many times the spout’s nextTuple is called and can be set through the constructor when the SpoutWrapper is created. With a constructor that has no numberOfInvocations parameter, the value is null, which means an infinite loop
  • Flink’s Storm wrapper also provides a FiniteSpout interface, whose reachedEnd method reports whether all data has been emitted; this turns a Storm spout into a finite source. With a plain Storm spout and numberOfInvocations set to null, nextTuple is called in a loop until isRunning becomes false
  • If numberOfInvocations is set and greater than or equal to 0, nextTuple is called that many times; if it is less than 0, the loop ends after the first nextTuple call that emits nothing, which is tracked through collector.tupleEmitted
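The termination rules above can be modeled without any Flink or Storm dependency. The following is only a minimal sketch under stated assumptions, not the library code: `MiniSpout`, `MiniFiniteSpout`, `spoutOf` and `drive` are hypothetical stand-ins for `IRichSpout`, `FiniteSpout` and the loop in `SpoutWrapper.run`. The `isRunning` flag is left out, and the truly infinite case (a plain spout with `null`) is rejected instead of looped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, dependency-free model of the loop in SpoutWrapper.run.
class InvocationModes {

    /** Stand-in for IRichSpout: each call may emit zero or one record. */
    interface MiniSpout {
        void nextTuple(List<String> out);
    }

    /** Stand-in for FiniteSpout. */
    interface MiniFiniteSpout extends MiniSpout {
        boolean reachedEnd();
    }

    /** Emits the given records one per call, then emits nothing. */
    static MiniFiniteSpout spoutOf(String... records) {
        final Iterator<String> it = Arrays.asList(records).iterator();
        return new MiniFiniteSpout() {
            @Override
            public void nextTuple(List<String> out) {
                if (it.hasNext()) {
                    out.add(it.next());
                }
            }

            @Override
            public boolean reachedEnd() {
                return !it.hasNext();
            }
        };
    }

    /** Drives the spout and returns everything it emitted. */
    static List<String> drive(MiniSpout spout, Integer numberOfInvocations) {
        List<String> out = new ArrayList<>();
        if (numberOfInvocations == null) {
            if (!(spout instanceof MiniFiniteSpout)) {
                // The real wrapper would loop until cancelled; the sketch refuses instead.
                throw new IllegalArgumentException("plain spout with null would never terminate");
            }
            MiniFiniteSpout finite = (MiniFiniteSpout) spout;
            while (!finite.reachedEnd()) {
                finite.nextTuple(out);
            }
        } else if (numberOfInvocations >= 0) {
            int counter = numberOfInvocations;
            while (--counter >= 0) {
                spout.nextTuple(out);
            }
        } else {
            int before;
            do {
                before = out.size();
                spout.nextTuple(out);
            } while (out.size() > before); // stop after the first call that emits nothing
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drive(spoutOf("a", "b", "c"), null)); // [a, b, c]
        System.out.println(drive(spoutOf("a", "b", "c"), 2));    // [a, b]
        System.out.println(drive(spoutOf("a", "b", "c"), -1));   // [a, b, c]
    }
}
```

In the negative case the spout is called one extra time: the loop only learns that the input is exhausted from a call that emits nothing.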

SpoutCollector

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java

/**
 * A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
 * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
 * Flink tuples and emits them via the provide {@link SourceContext} object.
 */
class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {

	/** The Flink source context object. */
	private final SourceContext<OUT> flinkContext;

	/**
	 * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
	 * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
	 * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
	 *
	 * @param numberOfAttributes
	 *            The number of attributes of the emitted tuples.
	 * @param taskId
	 *            The ID of the producer task (negative value for unknown).
	 * @param flinkContext
	 *            The Flink source context to be used.
	 * @throws UnsupportedOperationException
	 *             if the specified number of attributes is greater than 25
	 */
	SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
			final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
		super(numberOfAttributes, taskId);
		assert (flinkContext != null);
		this.flinkContext = flinkContext;
	}

	@Override
	protected List<Integer> doEmit(final OUT flinkTuple) {
		this.flinkContext.collect(flinkTuple);
		// TODO
		return null;
	}

	@Override
	public void reportError(final Throwable error) {
		// not sure, if Flink can support this
	}

	@Override
	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
		return this.tansformAndEmit(streamId, tuple);
	}

	@Override
	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
	}

	public long getPendingCount() {
		return 0;
	}

}
  • SpoutCollector implements Storm’s ISpoutOutputCollector interface, providing the emit, emitDirect, getPendingCount and reportError methods. emitDirect is not supported and throws UnsupportedOperationException, getPendingCount always returns 0, and reportError is a no-op
  • doEmit calls flinkContext.collect(flinkTuple) to emit the data. The method is protected and is called by tansformAndEmit
  • The tansformAndEmit method is provided by the parent class AbstractStormCollector
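The adapter role described above can be illustrated with a small, dependency-free sketch. It is not the Flink class: `MiniSpoutOutputCollector` and `MiniSpoutCollector` are hypothetical stand-ins for `ISpoutOutputCollector` and `SpoutCollector`, and a `Consumer` plays the role of `SourceContext.collect`. The tuple transformation step is skipped here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a Storm-style collector that forwards to a Flink-style sink.
class CollectorAdapterSketch {

    /** Stand-in for the part of ISpoutOutputCollector discussed above. */
    interface MiniSpoutOutputCollector {
        void emit(String streamId, List<Object> tuple);

        void emitDirect(int taskId, String streamId, List<Object> tuple);

        long getPendingCount();
    }

    /** Stand-in for SpoutCollector. */
    static final class MiniSpoutCollector implements MiniSpoutOutputCollector {
        private final Consumer<List<Object>> sink; // plays the role of SourceContext.collect
        boolean tupleEmitted;

        MiniSpoutCollector(Consumer<List<Object>> sink) {
            this.sink = sink;
        }

        @Override
        public void emit(String streamId, List<Object> tuple) {
            sink.accept(tuple);
            tupleEmitted = true; // lets the driving loop detect an "empty" nextTuple call
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            throw new UnsupportedOperationException("Direct emit is not supported");
        }

        @Override
        public long getPendingCount() {
            return 0;
        }
    }

    public static void main(String[] args) {
        List<List<Object>> received = new ArrayList<>();
        MiniSpoutCollector collector = new MiniSpoutCollector(received::add);
        collector.emit("default", Arrays.<Object>asList("hello", 1));
        System.out.println(received + " tupleEmitted=" + collector.tupleEmitted);
    }
}
```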

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

	/**
	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
	 * to the specified output stream.
	 *
	 * @param streamId
	 *            The output stream id.
	 * @param tuple
	 *            The Storm tuple to be emitted.
	 * @return the return value of {@link #doEmit(Object)}
	 */
	@SuppressWarnings("unchecked")
	protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
		List<Integer> taskIds;

		int numAtt = this.numberOfAttributes.get(streamId);
		int taskIdIdx = numAtt;
		if (this.taskId >= 0 && numAtt < 0) {
			numAtt = 1;
			taskIdIdx = 0;
		}
		if (numAtt >= 0) {
			assert (tuple.size() == numAtt);
			Tuple out = this.outputTuple.get(streamId);
			for (int i = 0; i < numAtt; ++i) {
				out.setField(tuple.get(i), i);
			}
			if (this.taskId >= 0) {
				out.setField(this.taskId, taskIdIdx);
			}
			if (this.split) {
				this.splitTuple.streamId = streamId;
				this.splitTuple.value = out;

				taskIds = doEmit((OUT) this.splitTuple);
			} else {
				taskIds = doEmit((OUT) out);
			}
		} else {
			assert (tuple.size() == 1);
			if (this.split) {
				this.splitTuple.streamId = streamId;
				this.splitTuple.value = tuple.get(0);

				taskIds = doEmit((OUT) this.splitTuple);
			} else {
				taskIds = doEmit((OUT) tuple.get(0));
			}
		}
		this.tupleEmitted = true;

		return taskIds;
	}
  • AbstractStormCollector.tansformAndEmit mainly handles the split scenario, in which a spout declares more than one stream; the data is ultimately emitted through the subclass method SpoutCollector.doEmit
  • If split is true, doEmit receives splitTuple, a SplitStreamType that records both the streamId and the value
  • If split is false, doEmit receives the Tuple itself (or, when the stream’s attribute count is negative, the single field tuple.get(0)); this corresponds to the value field of SplitStreamType, without the streamId information
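The split versus non-split branch can be sketched in isolation. This is an illustrative model only: `Tagged` is a hypothetical stand-in for `SplitStreamType`, a plain `List` stands in for the Flink `Tuple`, and the attribute-count and task-id handling of the real method are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the split / non-split branch; not Flink code.
class SplitEmitSketch {

    /** Stand-in for SplitStreamType: a value tagged with its stream id. */
    static final class Tagged {
        final String streamId;
        final Object value;

        Tagged(String streamId, Object value) {
            this.streamId = streamId;
            this.value = value;
        }

        @Override
        public String toString() {
            return streamId + " -> " + value;
        }
    }

    private final boolean split;

    /** Everything that would have been handed to doEmit. */
    final List<Object> emitted = new ArrayList<>();

    SplitEmitSketch(boolean split) {
        this.split = split;
    }

    void transformAndEmit(String streamId, List<Object> tuple) {
        List<Object> out = new ArrayList<>(tuple); // stands in for the Flink Tuple
        emitted.add(split ? new Tagged(streamId, out) : out);
    }

    public static void main(String[] args) {
        SplitEmitSketch single = new SplitEmitSketch(false);
        single.transformAndEmit("default", Arrays.<Object>asList("a", 1));
        System.out.println(single.emitted); // [[a, 1]]

        SplitEmitSketch multi = new SplitEmitSketch(true);
        multi.transformAndEmit("odd", Arrays.<Object>asList(1));
        multi.transformAndEmit("even", Arrays.<Object>asList(2));
        System.out.println(multi.emitted); // [odd -> [1], even -> [2]]
    }
}
```

Tagging each record with its stream id is what lets a downstream consumer separate the streams again when one source produces several of them.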

Task.run

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

	//......

	/**
	 * The core work method that bootstraps the task and executes its code.
	 */
	@Override
	public void run() {

		//......

		// now load and instantiate the task's invokable code
		invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

		// ----------------------------------------------------------------
		//  actual task core work
		// ----------------------------------------------------------------

		// we must make strictly sure that the invokable is accessible to the cancel() call
		// by the time we switched to running.
		this.invokable = invokable;

		// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
		if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
			throw new CancelTaskException();
		}

		// notify everyone that we switched to running
		notifyObservers(ExecutionState.RUNNING, null);
		taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

		// make sure the user code classloader is accessible thread-locally
		executingThread.setContextClassLoader(userCodeClassLoader);

		// run the invokable
		invokable.invoke();

		//......
	}
}
  • Task’s run method calls invokable.invoke(); here the invokable is a StreamTask

StreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
		extends AbstractInvokable
		implements AsyncExceptionHandler {

	//......

	@Override
	public final void invoke() throws Exception {

		boolean disposed = false;
		try {
			//......

			// let the task do its work
			isRunning = true;
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			LOG.debug("Finished task {}", getName());

			//......
		} finally {
			// clean up everything we initialized
			isRunning = false;

			//......
		}
	}
}

  • StreamTask’s invoke method calls the run method implemented by its subclass; here the subclass is StoppableSourceStreamTask

StoppableSourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java

/**
 * Stoppable task for executing stoppable streaming sources.
 *
 * @param <OUT> Type of the produced elements
 * @param <SRC> Stoppable source function
 */
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
	extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {

	private volatile boolean stopped;

	public StoppableSourceStreamTask(Environment environment) {
		super(environment);
	}

	@Override
	protected void run() throws Exception {
		if (!stopped) {
			super.run();
		}
	}

	@Override
	public void stop() {
		stopped = true;
		if (this.headOperator != null) {
			this.headOperator.stop();
		}
	}
}
  • StoppableSourceStreamTask extends SourceStreamTask and mainly implements StoppableTask’s stop method; unless it has already been stopped, its run method simply delegates to the parent class SourceStreamTask

SourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

/**
 * {@link StreamTask} for executing a {@link StreamSource}.
 *
 * <p>One important aspect of this is that the checkpointing and the emission of elements must never
 * occur at the same time. The execution must be serial. This is achieved by having the contract
 * with the StreamFunction that it must only modify its state or emit elements in
 * a synchronized block that locks on the lock Object. Also, the modification of the state
 * and the emission of elements must happen in the same block of code that is protected by the
 * synchronized block.
 *
 * @param <OUT> Type of the output elements of this source.
 * @param <SRC> Type of the source function for the stream source operator
 * @param <OP> Type of the stream source operator
 */
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
	extends StreamTask<OUT, OP> {

	//......

	@Override
	protected void run() throws Exception {
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}
}
  • SourceStreamTask’s run method mainly calls StreamSource’s run method

StreamSource

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

/**
 * {@link StreamOperator} for streaming sources.
 *
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
		extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

	//......	

	public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
		run(lockingObject, streamStatusMaintainer, output);
	}

	public void run(final Object lockingObject,
			final StreamStatusMaintainer streamStatusMaintainer,
			final Output<StreamRecord<OUT>> collector) throws Exception {

		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
			? getExecutionConfig().getLatencyTrackingInterval()
			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

		LatencyMarksEmitter<OUT> latencyEmitter = null;
		if (latencyTrackingInterval > 0) {
			latencyEmitter = new LatencyMarksEmitter<>(
				getProcessingTimeService(),
				collector,
				latencyTrackingInterval,
				this.getOperatorID(),
				getRuntimeContext().getIndexOfThisSubtask());
		}

		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

		this.ctx = StreamSourceContexts.getSourceContext(
			timeCharacteristic,
			getProcessingTimeService(),
			lockingObject,
			streamStatusMaintainer,
			collector,
			watermarkInterval,
			-1);

		try {
			userFunction.run(ctx);

			// if we get here, then the user function either exited after being done (finite source)
			// or the function was canceled or stopped. For the finite source case, we should emit
			// a final watermark that indicates that we reached the end of event-time
			if (!isCanceledOrStopped()) {
				ctx.emitWatermark(Watermark.MAX_WATERMARK);
			}
		} finally {
			// make sure that the context is closed in any case
			ctx.close();
			if (latencyEmitter != null) {
				latencyEmitter.close();
			}
		}
	}

	//......
}
  • StreamSource’s run method calls userFunction.run(ctx); here userFunction is the SpoutWrapper, which in turn drives the spout’s nextTuple

Summary

  • Flink wraps a Storm spout in SpoutWrapper. In its run method, SpoutWrapper creates Flink’s SpoutCollector, passes it as the constructor argument of Storm’s SpoutOutputCollector, and hands that wrapper to the spout’s open method, so that the data emitted by the spout is collected by Flink’s SpoutCollector. It then calls spout.nextTuple() according to numberOfInvocations to emit data. numberOfInvocations controls how many times the spout’s nextTuple is called and can be set through the constructor when the SpoutWrapper is created. With a constructor that has no numberOfInvocations parameter, the value is null, which means an infinite loop
  • SpoutCollector’s emit method calls AbstractStormCollector.tansformAndEmit, which in turn calls SpoutCollector.doEmit to emit the data. In the multi-stream scenario it wraps the tuple in a SplitStreamType before passing it to doEmit; if there is only one stream, it passes the plain tuple to doEmit
  • Task’s run method calls StreamTask’s invoke method, which calls the run method of its subclass (StoppableSourceStreamTask here). StoppableSourceStreamTask’s run method delegates to its parent class SourceStreamTask, which mainly calls StreamSource’s run method. StreamSource’s run method calls userFunction.run(ctx), where userFunction is the SpoutWrapper; this executes the spout’s nextTuple logic, and the data is emitted through Flink’s SpoutCollector
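The call chain summarized above can be traced with a small model. None of the types below are Flink classes: the `Mini*` names are hypothetical stand-ins for `StreamTask`, `SourceStreamTask`, `StoppableSourceStreamTask`, `StreamSource` and the user function, and each step only appends its name to a trace. The sketch also assumes `stop()` merely sets a flag, whereas the real method additionally stops the head operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the Task -> StreamTask -> StreamSource -> SpoutWrapper chain.
class CallChainSketch {

    /** Stand-in for SourceFunction, the role SpoutWrapper plays. */
    interface MiniSourceFunction {
        void run(List<String> trace);
    }

    /** Stand-in for StreamSource: the head operator that owns the user function. */
    static final class MiniStreamSource {
        private final MiniSourceFunction userFunction;

        MiniStreamSource(MiniSourceFunction userFunction) {
            this.userFunction = userFunction;
        }

        void run(List<String> trace) {
            trace.add("StreamSource.run");
            userFunction.run(trace);
        }
    }

    /** Stand-in for StreamTask: invoke() is a template method around run(). */
    abstract static class MiniStreamTask {
        final void invoke(List<String> trace) {
            trace.add("StreamTask.invoke");
            run(trace);
        }

        protected abstract void run(List<String> trace);
    }

    static class MiniSourceStreamTask extends MiniStreamTask {
        private final MiniStreamSource headOperator;

        MiniSourceStreamTask(MiniStreamSource headOperator) {
            this.headOperator = headOperator;
        }

        @Override
        protected void run(List<String> trace) {
            trace.add("SourceStreamTask.run");
            headOperator.run(trace);
        }
    }

    static final class MiniStoppableTask extends MiniSourceStreamTask {
        private volatile boolean stopped;

        MiniStoppableTask(MiniStreamSource headOperator) {
            super(headOperator);
        }

        @Override
        protected void run(List<String> trace) {
            if (!stopped) {
                super.run(trace); // delegate to the parent unless already stopped
            }
        }

        void stop() {
            stopped = true;
        }
    }

    /** Plays the role of Task.run and returns the order of calls. */
    static List<String> execute(boolean stopBeforeRun) {
        List<String> trace = new ArrayList<>();
        MiniStoppableTask task = new MiniStoppableTask(
                new MiniStreamSource(t -> t.add("SpoutWrapper.run")));
        if (stopBeforeRun) {
            task.stop();
        }
        trace.add("Task.run");
        task.invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(execute(false));
        // [Task.run, StreamTask.invoke, SourceStreamTask.run, StreamSource.run, SpoutWrapper.run]
        System.out.println(execute(true));
        // [Task.run, StreamTask.invoke]
    }
}
```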

Docs

  • Storm Compatibility Beta