Preface

In this article, we study Flink's RichParallelSourceFunction.

RichParallelSourceFunction
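
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java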

/**
 * Base class for implementing a parallel data source. Upon execution, the runtime will
 * execute as many parallel instances of this function function as configured parallelism
 * of the source.
 *
 * <p>The data source has access to context information (such as the number of parallel
 * instances of the source, and which parallel instance the current instance is)
 * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
 * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
		implements ParallelSourceFunction<OUT> {

	private static final long serialVersionUID = 1L;
}
  • RichParallelSourceFunction implements the ParallelSourceFunction interface and at the same time extends AbstractRichFunction

ParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java

/**
 * A stream data source that is executed in parallel. Upon execution, the runtime will
 * execute as many parallel instances of this function function as configured parallelism
 * of the source.
 *
 * <p>This interface acts only as a marker to tell the system that this source may
 * be executed in parallel. When different parallel instances are required to perform
 * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
 * context, which reveals information like the number of parallel tasks, and which parallel
 * task the current instance is.
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}
  • ParallelSourceFunction extends the SourceFunction interface. It does not define any additional methods; it simply uses the interface name to express the intent that this is a stream data source which can be executed in parallel (a sketch of implementing it directly follows below)
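
As a hedged illustration (not part of the Flink sources quoted here; the class name is made up), the sketch below implements the marker interface directly. The runtime may create multiple instances of it, but without RichFunction there is no runtime context, so every instance emits the same records:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class ConstantSource implements ParallelSourceFunction<String> {

	private static final long serialVersionUID = 1L;

	private volatile boolean running = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (running) {
			synchronized (ctx.getCheckpointLock()) {
				// every parallel instance emits the same record, so the data is duplicated
				ctx.collect("tick");
			}
			Thread.sleep(1000);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}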

AbstractRichFunction

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/AbstractRichFunction.java

/**
 * An abstract stub implementation for rich user-defined functions.
 * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
 * teardown ({@link #close()}), as well as access to their runtime execution context via
 * {@link #getRuntimeContext()}.
 */
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

	private static final long serialVersionUID = 1L;

	// --------------------------------------------------------------------------------------------
	//  Runtime context access
	// --------------------------------------------------------------------------------------------

	private transient RuntimeContext runtimeContext;

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		this.runtimeContext = t;
	}

	@Override
	public RuntimeContext getRuntimeContext() {
		if (this.runtimeContext != null) {
			return this.runtimeContext;
		} else {
			throw new IllegalStateException("The runtime context has not been initialized.");
		}
	}

	@Override
	public IterationRuntimeContext getIterationRuntimeContext() {
		if (this.runtimeContext == null) {
			throw new IllegalStateException("The runtime context has not been initialized.");
		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
			return (IterationRuntimeContext) this.runtimeContext;
		} else {
			throw new IllegalStateException("This stub is not part of an iteration step function.");
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Default life cycle methods
	// --------------------------------------------------------------------------------------------

	@Override
	public void open(Configuration parameters) throws Exception {}

	@Override
	public void close() throws Exception {}
}
  • AbstractRichFunction mainly implements the RichFunction interface's setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods; the open and close life-cycle methods are empty defaults (a sketch of these hooks follows below)
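
A minimal sketch (the class name and logic are made up for illustration) of the life cycle these methods enable: open() runs once per parallel instance after setRuntimeContext has been called and before any record is processed, and close() runs on teardown:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TaggingMapper extends RichMapFunction<String, String> {

	private static final long serialVersionUID = 1L;

	private transient String subtaskTag;

	@Override
	public void open(Configuration parameters) throws Exception {
		// the runtime context has already been injected via setRuntimeContext() at this point
		subtaskTag = "subtask-" + getRuntimeContext().getIndexOfThisSubtask();
	}

	@Override
	public String map(String value) {
		return subtaskTag + ": " + value;
	}

	@Override
	public void close() throws Exception {
		// release any resources acquired in open(); nothing to do in this sketch
	}
}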

RuntimeContext

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java

/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {

	/**
	 * Returns the name of the task in which the UDF runs, as assigned during plan construction.
	 *
	 * @return The name of the task in which the UDF runs.
	 */
	String getTaskName();

	/**
	 * Returns the metric group for this parallel subtask.
	 *
	 * @return The metric group for this parallel subtask.
	 */
	@PublicEvolving
	MetricGroup getMetricGroup();

	/**
	 * Gets the parallelism with which the parallel task runs.
	 *
	 * @return The parallelism with which the parallel task runs.
	 */
	int getNumberOfParallelSubtasks();

	/**
	 * Gets the number of max-parallelism with which the parallel task runs.
	 *
	 * @return The max-parallelism with which the parallel task runs.
	 */
	@PublicEvolving
	int getMaxNumberOfParallelSubtasks();

	/**
	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
	 *
	 * @return The index of the parallel subtask.
	 */
	int getIndexOfThisSubtask();

	/**
	 * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
	 *
	 * @return Attempt number of the subtask.
	 */
	int getAttemptNumber();

	/**
	 * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
	 * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
	 * {@link #getNumberOfParallelSubtasks()}.
	 *
	 * @return The name of the task, with subtask indicator.
	 */
	String getTaskNameWithSubtasks();

	/**
	 * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
	 * job.
	 */
	ExecutionConfig getExecutionConfig();

	// ......
}
  • RuntimeContext defines many methods; here we focus on the getNumberOfParallelSubtasks method, which returns the parallelism of the current task, and the getIndexOfThisSubtask method, which returns the index of the current parallel subtask. With this information, you can develop a ParallelSourceFunction that executes in parallel yet partitions the data so that no records are emitted twice (see the sketch below)
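
Below is a minimal sketch of such a source, assuming a hypothetical class RangeSource: each parallel instance uses the subtask index and the parallelism to emit a disjoint slice of the range, so the union over all subtasks is 0..count-1 without duplicates:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RangeSource extends RichParallelSourceFunction<Long> {

	private static final long serialVersionUID = 1L;

	private final long count;
	private volatile boolean running = true;

	public RangeSource(long count) {
		this.count = count;
	}

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		// each subtask only emits the values congruent to its own index modulo the parallelism
		final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
		final int index = getRuntimeContext().getIndexOfThisSubtask();

		for (long value = index; value < count && running; value += parallelism) {
			synchronized (ctx.getCheckpointLock()) {
				ctx.collect(value);
			}
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}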

JobMaster.startJobExecution

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java

	private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
		validateRunsInMainThread();

		checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

		if (Objects.equals(getFencingToken(), newJobMasterId)) {
			log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

			return Acknowledge.get();
		}

		setNewFencingToken(newJobMasterId);

		startJobMasterServices();

		log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

		resetAndScheduleExecutionGraph();

		return Acknowledge.get();
	}

	private void resetAndScheduleExecutionGraph() throws Exception {
		validateRunsInMainThread();

		final CompletableFuture<Void> executionGraphAssignedFuture;

		if (executionGraph.getState() == JobStatus.CREATED) {
			executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
		} else {
			suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
			final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
			final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

			executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
				(JobStatus ignored, Throwable throwable) -> {
					assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
					return null;
				},
				getMainThreadExecutor());
		}

		executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
	}

	private void scheduleExecutionGraph() {
		checkState(jobStatusListener == null);
		// register self as job status change listener
		jobStatusListener = new JobManagerJobStatusListener();
		executionGraph.registerJobStatusListener(jobStatusListener);

		try {
			executionGraph.scheduleForExecution();
		} catch (Throwable t) {
			executionGraph.failGlobal(t);
		}
	}
  • startJobExecution calls the resetAndScheduleExecutionGraph method, which chains into the scheduleExecutionGraph method; scheduleExecutionGraph then calls executionGraph.scheduleForExecution() to schedule the job

ExecutionGraph.scheduleForExecution

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

	public void scheduleForExecution() throws JobException {

		final long currentGlobalModVersion = globalModVersion;

		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

			final CompletableFuture<Void> newSchedulingFuture;

			switch (scheduleMode) {

				case LAZY_FROM_SOURCES:
					newSchedulingFuture = scheduleLazy(slotProvider);
					break;

				case EAGER:
					newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
					break;

				default:
					throw new JobException("Schedule mode is invalid.");
			}

			if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
				schedulingFuture = newSchedulingFuture;

				newSchedulingFuture.whenCompleteAsync(
					(Void ignored, Throwable throwable) -> {
						if (throwable != null && !(throwable instanceof CancellationException)) {
							// only fail if the scheduling future was not canceled
							failGlobal(ExceptionUtils.stripCompletionException(throwable));
						}
					},
					futureExecutor);
			} else {
				newSchedulingFuture.cancel(false);
			}
		}
		else {
			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
		}
	}
  • The schedule mode for streaming jobs is EAGER, so the scheduleEager method is called here

ExecutionGraph.scheduleEager

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

	/**
	 *
	 *
	 * @param slotProvider  The resource provider from which the slots are allocated
	 * @param timeout       The maximum time that the deployment may take, before a
	 *                      TimeoutException is thrown.
	 * @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
	 * The future can also be completed exceptionally if an error happened.
	 */
	private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
		checkState(state == JobStatus.RUNNING, "job is not running currently");

		// Important: reserve all the space we need up front.
		// that way we do not have any operation that can fail between allocating the slots
		// and adding them to the list. If we had a failure in between there, that would
		// cause the slots to get lost
		final boolean queued = allowQueuedScheduling;

		// collecting all the slots may resize and fail in that operation without slots getting lost
		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

		// allocate the slots (obtain all their futures
		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
			// these calls are not blocking, they only return futures
			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
				slotProvider,
				queued,
				LocationPreferenceConstraint.ALL,
				allocationTimeout);

			allAllocationFutures.addAll(allocationFutures);
		}

		// this future is complete once all slot futures are complete.
		// the future fails once one slot future fails.
		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

		final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
			.thenAccept(
				(Collection<Execution> executionsToDeploy) -> {
					for (Execution execution : executionsToDeploy) {
						try {
							execution.deploy();
						} catch (Throwable t) {
							throw new CompletionException(
								new FlinkException(
									String.format("Could not deploy execution %s.", execution),
									t));
						}
					}
				})
			// Generate a more specific failure message for the eager scheduling
			.exceptionally(
				(Throwable throwable) -> {
					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
					final Throwable resultThrowable;

					if (strippedThrowable instanceof TimeoutException) {
						int numTotal = allAllocationsFuture.getNumFuturesTotal();
						int numComplete = allAllocationsFuture.getNumFuturesCompleted();
						String message = "Could not allocate all requires slots within timeout of " +
							timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

						resultThrowable = new NoResourceAvailableException(message);
					} else {
						resultThrowable = strippedThrowable;
					}

					throw new CompletionException(resultThrowable);
				});

		return currentSchedulingFuture;
	}
  • scheduleEager calls getVerticesTopologically to obtain the ExecutionJobVertex instances
  • It then calls ExecutionJobVertex.allocateResourcesForAll on each vertex to allocate resources, collecting a Collection<CompletableFuture<Execution>>
  • Finally, it waits for all these futures via FutureUtils.combineAll(allAllocationFutures) and then calls execution.deploy() one by one (the pattern is illustrated below)
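
The following self-contained sketch uses plain java.util.concurrent rather than Flink's FutureUtils, but illustrates the same pattern: collect one future per allocation, combine them, and run the deploy step only after every allocation has completed:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CombineAllPattern {

	public static void main(String[] args) {
		List<CompletableFuture<String>> allocations = new ArrayList<>();
		for (int i = 0; i < 3; i++) {
			final int slot = i;
			allocations.add(CompletableFuture.supplyAsync(() -> "execution-" + slot));
		}

		// rough equivalent of FutureUtils.combineAll: completes once all allocations complete,
		// and fails if any allocation fails
		CompletableFuture<Void> all =
			CompletableFuture.allOf(allocations.toArray(new CompletableFuture[0]));

		// rough equivalent of the deploy loop: runs only after every allocation completed
		all.thenRun(() ->
			allocations.forEach(future -> System.out.println("deploying " + future.join()))
		).join();
	}
}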

ExecutionJobVertex.allocateResourcesForAll

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

	/**
	 * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
	 * pairs of the slots and execution attempts, to ease correlation between vertices and execution
	 * attempts.
	 *
	 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
	 *
	 * @param resourceProvider The resource provider from whom the slots are requested.
	 * @param queued if the allocation can be queued
	 * @param locationPreferenceConstraint constraint for the location preferences
	 * @param allocationTimeout timeout for allocating the individual slots
	 */
	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
			SlotProvider resourceProvider,
			boolean queued,
			LocationPreferenceConstraint locationPreferenceConstraint,
			Time allocationTimeout) {

		final ExecutionVertex[] vertices = this.taskVertices;
		final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

		// try to acquire a slot future for each execution.
		// we store the execution with the future just to be on the safe side
		for (int i = 0; i < vertices.length; i++) {
			// allocate the next slot (future)
			final Execution exec = vertices[i].getCurrentExecutionAttempt();
			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
				resourceProvider,
				queued,
				locationPreferenceConstraint,
				allocationTimeout);
			slots[i] = allocationFuture;
		}

		// all good, we acquired all slots
		return Arrays.asList(slots);
	}
  • Here a slot is allocated for each element of ExecutionJobVertex's taskVertices by calling exec.allocateAndAssignSlotForExecution; you can see that the total parallelism is determined by taskVertices

Execution.deploy

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

	/**
	 * Deploys the execution to the previously assigned resource.
	 *
	 * @throws JobException if the execution cannot be deployed to the assigned resource
	 */
	public void deploy() throws JobException {
		final LogicalSlot slot  = assignedResource;

		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

		// ......

		try {
			// ......

			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
				attemptId,
				slot,
				taskRestore,
				attemptNumber);

			// null taskRestore to let it be GC'ed
			taskRestore = null;

			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

			final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

			submitResultFuture.whenCompleteAsync(
				(ack, failure) -> {
					// only respond to the failure case
					if (failure != null) {
						if (failure instanceof TimeoutException) {
							String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

							markFailed(new Exception(
								"Cannot deploy task " + taskname + " - TaskManager (" +
									getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
						} else {
							markFailed(failure);
						}
					}
				},
				executor);
		}
		catch (Throwable t) {
			markFailed(t);
			ExceptionUtils.rethrow(t);
		}
	}
  • After Execution.deploy creates the TaskDeploymentDescriptor, it submits the deployment via taskManagerGateway.submitTask; this in turn triggers the TaskExecutor to run the Task's run method

ExecutionJobVertex

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

	private final ExecutionVertex[] taskVertices;

	public ExecutionJobVertex(
			ExecutionGraph graph,
			JobVertex jobVertex,
			int defaultParallelism,
			Time timeout,
			long initialGlobalModVersion,
			long createTimestamp) throws JobException {

		if (graph == null || jobVertex == null) {
			throw new NullPointerException();
		}

		this.graph = graph;
		this.jobVertex = jobVertex;

		int vertexParallelism = jobVertex.getParallelism();
		int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

		final int configuredMaxParallelism = jobVertex.getMaxParallelism();

		this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

		// if no max parallelism was configured by the user, we calculate and set a default
		setMaxParallelismInternal(maxParallelismConfigured ?
				configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

		// verify that our parallelism is not higher than the maximum parallelism
		if (numTaskVertices > maxParallelism) {
			throw new JobException(
				String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.", jobVertex.getName(), numTaskVertices, maxParallelism)); } this.parallelism = numTaskVertices; this.serializedTaskInformation = null; this.taskVertices = new ExecutionVertex[numTaskVertices]; / /... // create all task verticesfor(int i = 0; i < numTaskVertices; i++) { ExecutionVertex vertex = new ExecutionVertex( this, i, producedDataSets, timeout, initialGlobalModVersion, createTimestamp, maxPriorAttemptsHistoryLength); this.taskVertices[i] = vertex; } / /... }Copy the code
  • taskVertices is an ExecutionVertex[] whose size is numTaskVertices
  • numTaskVertices is determined in the ExecutionJobVertex constructor: if jobVertex.getParallelism() is greater than 0 (which it usually is), that value is used; otherwise defaultParallelism is used (when ExecutionGraph's attachJobGraph method creates the ExecutionJobVertex, it passes a defaultParallelism of 1)
  • The constructor then creates one ExecutionVertex per index from 0 to numTaskVertices-1 and places them into the taskVertices array
  • The jobVertex parallelism is set in StreamingJobGraphGenerator's createJobVertex method based on streamNode.getParallelism() (if streamNode.getParallelism() is greater than 0)
  • If the StreamNode's parallelism is not set by the user, it defaults to the StreamExecutionEnvironment's parallelism (see the DataStreamSource constructor, the DataStream.transform method, and the DataStreamSink constructor for details; DataStreamSource resets the parallelism of non-parallel sources to 1); for LocalEnvironment, this defaults to Runtime.getRuntime().availableProcessors() (a sketch of how these settings interact follows after this list)
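
A small sketch (reusing the hypothetical RangeSource from the RuntimeContext section) of how these settings interact at the API level: an explicit setParallelism() on an operator wins, otherwise the environment's parallelism applies:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4); // default for all operators that do not set their own

		env.addSource(new RangeSource(100))
			.setParallelism(2) // overrides the environment default for this source
			.print();          // the sink inherits the environment parallelism of 4

		env.execute("parallelism demo");
	}
}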

summary

  • RichParallelSourceFunction implements the ParallelSourceFunction interface and at the same time extends AbstractRichFunction; AbstractRichFunction mainly implements the RichFunction interface's setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods; RuntimeContext defines the getNumberOfParallelSubtasks method (which returns the parallelism of the current task) and the getIndexOfThisSubtask method (which returns the index of the current parallel subtask), which makes it easy to develop a ParallelSourceFunction that executes in parallel but partitions the data so that no records are emitted twice
  • When JobMaster runs startJobExecution, it calls executionGraph.scheduleForExecution() to schedule the job; during scheduling, ExecutionJobVertex.allocateResourcesForAll allocates resources as a Collection<CompletableFuture<Execution>>, after which execution.deploy() is executed one by one; after Execution.deploy creates the TaskDeploymentDescriptor, it submits the deployment via taskManagerGateway.submitTask, which in turn triggers the TaskExecutor to run the Task's run method
  • ExecutionJobVertex.allocateResourcesForAll allocates a slot for each element of ExecutionJobVertex's taskVertices by calling exec.allocateAndAssignSlotForExecution, so the total parallelism is determined by taskVertices; taskVertices is initialized in the ExecutionJobVertex constructor: if jobVertex.getParallelism() is greater than 0, that value is used, otherwise the defaultParallelism of 1 is used; the jobVertex parallelism is set in StreamingJobGraphGenerator's createJobVertex method based on streamNode.getParallelism() (if streamNode.getParallelism() is greater than 0); if the user does not set it, the default is the StreamExecutionEnvironment's parallelism; for LocalEnvironment, that defaults to Runtime.getRuntime().availableProcessors()

doc

  • RichParallelSourceFunction