sequence

This paper mainly studies Flink’s ScheduledExecutor

Executor

java.base/java/util/concurrent/Executor.java

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
Copy the code
  • The Executor interface of the JDK defines the execute method, which receives arguments of type Runnable

ScheduledExecutor

Flink – release – 1.7.2 / flink – the runtime/SRC/main/Java/org/apache/flink/runtime/concurrent/ScheduledExecutor. Java

public interface ScheduledExecutor extends Executor {

	/**
	 * Executes the given command after the given delay.
	 *
	 * @param command the task to execute in the future
	 * @param delay the time from now to delay the execution
	 * @param unit the time unit of the delay parameter
	 * @returna ScheduledFuture representing the completion of the scheduled task */ ScheduledFuture<? > schedule(Runnablecommand, long delay, TimeUnit unit);

	/**
	 * Executes the given callable after the given delay. The result of the callable is returned
	 * as a {@link ScheduledFuture}.
	 *
	 * @param callable the callable to execute
	 * @param delay the time from now to delay the execution
	 * @param unit the time unit of the delay parameter
	 * @param <V> result type of the callable
	 * @return a ScheduledFuture which holds the future value of the given callable
	 */
	<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

	/**
	 * Executes the given command periodically. The first execution is started after the
	 * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
	 * the third after {@code initialDelay + 2*period} and so on.
	 * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
	 * is cancelled.
	 *
	 * @param command the task to be executed periodically
	 * @param initialDelay the time from now until the first execution is triggered
	 * @param period the time after which the next execution is triggered
	 * @param unit the time unit of the delay and period parameter
	 * @return a ScheduledFuture representing the periodic task. This future never completes
	 * unless an execution of the given task fails or ifthe future is cancelled */ ScheduledFuture<? > scheduleAtFixedRate( Runnablecommand,
		long initialDelay,
		long period,
		TimeUnit unit);

	/**
	 * Executed the given command repeatedly with the given delay between the end of an execution
	 * and the start of the next execution.
	 * The task is executed repeatedly until either an exception occurs or if the returned
	 * {@link ScheduledFuture} is cancelled.
	 *
	 * @param command the task to execute repeatedly
	 * @param initialDelay the time from now until the first execution is triggered
	 * @param delay the time between the end of the current and the start of the next execution
	 * @param unit the time unit of the initial delay and the delay parameter
	 * @return a ScheduledFuture representing the repeatedly executed task. This future never
	 * completes unless the execution of the given task fails or ifthe future is cancelled */ ScheduledFuture<? > scheduleWithFixedDelay( Runnablecommand,
		long initialDelay,
		long delay,
		TimeUnit unit);
}
Copy the code
  • The ScheduledExecutor interface inherits Executor and defines the Schedule, scheduleAtFixedRate, scheduleWithFixedDelay methods, The Schedule method can receive either a Runnable or a Callable, which return a ScheduledFuture. The interface has two implementation class, is ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter respectively

ScheduledExecutorServiceAdapter

Flink – release – 1.7.2 / flink – the runtime/SRC/main/Java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter. Java

public class ScheduledExecutorServiceAdapter implements ScheduledExecutor { private final ScheduledExecutorService scheduledExecutorService; public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) { this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); } @Override public ScheduledFuture<? > schedule(Runnablecommand, long delay, TimeUnit unit) {
		return scheduledExecutorService.schedule(command, delay, unit);
	}

	@Override
	public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
		returnscheduledExecutorService.schedule(callable, delay, unit); } @Override public ScheduledFuture<? > scheduleAtFixedRate(Runnablecommand, long initialDelay, long period, TimeUnit unit) {
		return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit); } @Override public ScheduledFuture<? > scheduleWithFixedDelay(Runnablecommand, long initialDelay, long delay, TimeUnit unit) {
		return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
	}

	@Override
	public void execute(Runnable command) {
		scheduledExecutorService.execute(command); }}Copy the code
  • ScheduledExecutorServiceAdapter implements ScheduledExecutor interface, it USES the JDK ScheduledExecutorService, The schedule, scheduleAtFixedRate, scheduleWithFixedDelay, and execute methods of scheduledExecutorService are used

ActorSystemScheduledExecutorAdapter

Flink – release – 1.7.2 / flink – the runtime/SRC/main/Java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAda pter.java

public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

	private final ActorSystem actorSystem;

	public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
		this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); } @Override @Nonnull public ScheduledFuture<? > schedule(@Nonnull Runnablecommand, long delay, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

		Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

		scheduledFutureTask.setCancellable(cancellable);

		return scheduledFutureTask;
	}

	@Override
	@Nonnull
	public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

		Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

		scheduledFutureTask.setCancellable(cancellable);

		returnscheduledFutureTask; } @Override @Nonnull public ScheduledFuture<? > scheduleAtFixedRate(@Nonnull Runnablecommand, long initialDelay, long period, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
			command,
			triggerTime(unit.toNanos(initialDelay)),
			unit.toNanos(period));

		Cancellable cancellable = actorSystem.scheduler().schedule(
			new FiniteDuration(initialDelay, unit),
			new FiniteDuration(period, unit),
			scheduledFutureTask,
			actorSystem.dispatcher());

		scheduledFutureTask.setCancellable(cancellable);

		returnscheduledFutureTask; } @Override @Nonnull public ScheduledFuture<? > scheduleWithFixedDelay(@Nonnull Runnablecommand, long initialDelay, long delay, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
			command,
			triggerTime(unit.toNanos(initialDelay)),
			unit.toNanos(-delay));

		Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

		scheduledFutureTask.setCancellable(cancellable);

		return scheduledFutureTask;
	}

	@Override
	public void execute(@Nonnull Runnable command) {
		actorSystem.dispatcher().execute(command);
	}

	private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
		return actorSystem.scheduler().scheduleOnce(
			new FiniteDuration(delay, unit),
			runnable,
			actorSystem.dispatcher());
	}

	private long now() {
		return System.nanoTime();
	}

	private long triggerTime(long delay) {
		return now() + delay;
	}

	private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

		private long time;

		private final long period;

		private volatile Cancellable cancellable;

		ScheduledFutureTask(Callable<V> callable, long time, long period) {
			super(callable);
			this.time = time;
			this.period = period;
		}

		ScheduledFutureTask(Runnable runnable, long time, long period) {
			super(runnable, null);
			this.time = time;
			this.period = period;
		}

		public void setCancellable(Cancellable newCancellable) {
			this.cancellable = newCancellable;
		}

		@Override
		public void run() {
			if(! isPeriodic()) { super.run(); }else if (runAndReset()){
				if (period > 0L) {
					time += period;
				} else {
					cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

					// check whether we have been cancelled concurrently
					if (isCancelled()) {
						cancellable.cancel();
					} else {
						time = triggerTime(-period);
					}
				}
			}
		}

		@Override
		public boolean cancel(boolean mayInterruptIfRunning) {
			boolean result = super.cancel(mayInterruptIfRunning);

			return result && cancellable.cancel();
		}

		@Override
		public long getDelay(@Nonnull  TimeUnit unit) {
			return unit.convert(time - now(), TimeUnit.NANOSECONDS);
		}

		@Override
		public int compareTo(@Nonnull Delayed o) {
			if (o == this) {
				return 0;
			}

			long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
			return(diff < 0L) ? -1 : (diff > 0L) ? 1:0; } @Override public booleanisPeriodic() {
			returnperiod ! = 0L; }}}Copy the code
  • ActorSystemScheduledExecutorAdapter implements ScheduledExecutor interface, it USES actorSystem to implement; The execute method uses actorSystem.dispatcher().execute method
  • Schedule with FixedDelay calls internalSchedule, which uses the ActorSystem.scheduler ().scheduleOnce method. The ScheduledFutureTask is different, where the ScheduledFutureTask period of the Schedule method is 0, The ScheduledFutureTask period of scheduleWithFixedDelay method is unit.tonanos (-delay); The ScheduledFutureTask’s run method determines the period. If the period is less than or equal to 0, the ScheduledFutureTask will call the internalSchedule method again to achieve the effect of scheduling with FixedDelay
  • The scheduleAtFixedRate method uses the actorSystem.scheduler().schedule method, whose ScheduledFutureTask period is the period of the method parameter. Ununit.tonanos (-delay) is not used as a period, as scheduleWithFixedDelay does

summary

  • The ScheduledExecutor interface inherits Executor and defines the Schedule, scheduleAtFixedRate, scheduleWithFixedDelay methods, The Schedule method can receive either a Runnable or a Callable, which return a ScheduledFuture. The interface has two implementation class, is ScheduledExecutorServiceAdapter and ActorSystemScheduledExecutorAdapter respectively
  • ScheduledExecutorServiceAdapter implements ScheduledExecutor interface, it USES the JDK ScheduledExecutorService, The schedule, scheduleAtFixedRate, scheduleWithFixedDelay, and execute methods of scheduledExecutorService are used
  • ActorSystemScheduledExecutorAdapter implements ScheduledExecutor interface, it USES actorSystem to implement; The execute method uses actorSystem.dispatcher().execute method. Schedule with FixedDelay calls internalSchedule, which uses the ActorSystem.scheduler ().scheduleOnce method. The ScheduledFutureTask is different, where the ScheduledFutureTask period of the Schedule method is 0, The ScheduledFutureTask period of scheduleWithFixedDelay method is unit.tonanos (-delay); The ScheduledFutureTask’s run method will determine the period. If the period is less than or equal to 0, the internalSchedule method will be called again to achieve the effect of scheduling with FixedDelay. The scheduleAtFixedRate method uses the actorSystem.scheduler().schedule method, whose ScheduledFutureTask period is the period of the method parameter. Ununit.tonanos (-delay) is not used as a period, as scheduleWithFixedDelay does

doc

  • ScheduledExecutor
  • ScheduledExecutorServiceAdapter
  • ActorSystemScheduledExecutorAdapter