Preface

This article takes a look at Storm’s WindowedBoltExecutor.

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

/**
 * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
 */
public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
    private final IWindowedBolt bolt;
    // package level for unit tests
    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    private TimestampExtractor timestampExtractor;
    private transient String lateTupleStream;
    private transient TriggerPolicy<Tuple, ?> triggerPolicy;
    private transient EvictionPolicy<Tuple, ?> evictionPolicy;
    private transient Duration windowLengthDuration;

    public WindowedBoltExecutor(IWindowedBolt bolt) {
        this.bolt = bolt;
        timestampExtractor = bolt.getTimestampExtractor();
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<>(), false);
    }

    // NOTE: the queue has to be thread safe.
    protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
                             Collection<Event<Tuple>> queue, boolean stateful) {
        Objects.requireNonNull(topoConf);
        Objects.requireNonNull(context);
        Objects.requireNonNull(collector);
        Objects.requireNonNull(queue);
        this.windowedOutputCollector = new WindowedOutputCollector(collector);
        bolt.prepare(topoConf, context, windowedOutputCollector);
        this.listener = newWindowLifecycleListener();
        this.windowManager = initWindowManager(listener, topoConf, context, queue, stateful);
        start();
        LOG.info("Initialized window manager {} ", windowManager);
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }

    @Override
    public void cleanup() {
        if (waterMarkEventGenerator != null) {
            waterMarkEventGenerator.shutdown();
        }
        windowManager.shutdown();
        bolt.cleanup();
    }

    // for unit tests
    WindowManager<Tuple> getWindowManager() {
        return windowManager;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
        if (lateTupleStream != null) {
            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
        }
        bolt.declareOutputFields(declarer);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return bolt.getComponentConfiguration();
    }

    //...
}
  • WindowedBoltExecutor implements the IRichBolt interface. In prepare it initializes windowedOutputCollector, listener and windowManager; in cleanup it shuts down waterMarkEventGenerator and windowManager and calls bolt.cleanup(). When setBolt is called with an IWindowedBolt, TopologyBuilder wraps the original implementation in a WindowedBoltExecutor, which is what actually runs as the bolt
  • declareOutputFields declares the late tuple stream (if one is configured) and then delegates to bolt.declareOutputFields(declarer); getComponentConfiguration likewise returns bolt.getComponentConfiguration()
  • The execute method adds tuples to windowManager. When tuple timestamps are in use, a tuple that arrives too late for any window is emitted to the late tuple stream if one is configured (otherwise it is only logged), and in either case it is acked immediately
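The late-tuple branch of execute can be modelled without Storm. The sketch below is a simplified illustration, not Storm code: LateTupleRouter and its fields are hypothetical, strings stand in for Tuple objects, and plain lists stand in for windowManager, the late tuple stream and the acks. It assumes, as the WaterMarkEventGenerator javadoc describes, that a tuple whose timestamp is earlier than the last watermark counts as late.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Storm-free model of the routing in WindowedBoltExecutor.execute.
class LateTupleRouter {
    final List<String> window = new ArrayList<>();     // stands in for windowManager
    final List<String> lateStream = new ArrayList<>(); // stands in for the late tuple stream
    final List<String> acked = new ArrayList<>();      // tuples acked immediately
    final String lateTupleStream;                       // null when no late stream is configured
    long lastWaterMarkTs;                               // last watermark that was emitted

    LateTupleRouter(String lateTupleStream, long lastWaterMarkTs) {
        this.lateTupleStream = lateTupleStream;
        this.lastWaterMarkTs = lastWaterMarkTs;
    }

    void execute(String tuple, long ts) {
        if (ts >= lastWaterMarkTs) {
            // on time: the tuple joins the window and is acked later, when it expires
            window.add(tuple);
        } else {
            // late: re-emit it on the late stream if configured, then ack right away
            if (lateTupleStream != null) {
                lateStream.add(tuple);
            }
            acked.add(tuple);
        }
    }
}
```

Note that a late tuple is acked in both branches; only the optional re-emission depends on the late stream being configured.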

WindowedOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    /**
     * Creates an {@link OutputCollector} wrapper that automatically anchors the tuples to inputTuples while emitting.
     */
    private static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return emit(streamId, inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            emitDirect(taskId, streamId, inputTuples, tuple);
        }
    }
  • WindowedOutputCollector extends OutputCollector and overrides emit and emitDirect so that, by default, every emitted tuple is anchored to inputTuples, the tuples of the current window
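The default-anchoring idea is a simple decorator. Below is a minimal sketch, assuming a hypothetical AnchoringCollector that records each emit as a string instead of sending it through a real IOutputCollector; it only illustrates how the unanchored-looking overload forwards to the anchored one using the context set before the window is handed to the bolt.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Storm-free model of an output collector that anchors by default.
class AnchoringCollector {
    // Each record is "streamId:anchors->values", kept as a string for easy inspection.
    final List<String> emitted = new ArrayList<>();
    private List<String> inputTuples = Collections.emptyList();

    // Mirrors WindowedOutputCollector.setContext: remember the current window's tuples.
    void setContext(List<String> inputTuples) {
        this.inputTuples = inputTuples;
    }

    // Explicitly anchored emit (stands in for OutputCollector.emit(streamId, anchors, tuple)).
    void emit(String streamId, List<String> anchors, List<Object> tuple) {
        emitted.add(streamId + ":" + anchors + "->" + tuple);
    }

    // The overload the bolt calls; it silently anchors to the current window's tuples.
    void emit(String streamId, List<Object> tuple) {
        emit(streamId, inputTuples, tuple);
    }
}
```

Because the anchors are the whole window, a downstream failure replays the window's input tuples rather than nothing.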

WindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java

/**
 * A callback for expiry, activation of events tracked by the {@link WindowManager}
 *
 * @param <T> The type of Event in the window (e.g. Tuple).
 */
public interface WindowLifecycleListener<T> {
    /**
     * Called on expiry of events from the window due to {@link EvictionPolicy}
     *
     * @param events the expired events
     */
    void onExpiry(List<T> events);

    /**
     * Called on activation of the window due to the {@link TriggerPolicy}
     *
     * @param events        the list of current events in the window.
     * @param newEvents     the newly added events since last activation.
     * @param expired       the expired events since last activation.
     * @param referenceTime the reference (event or processing) time that resulted in activation
     */
    default void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Called on activation of the window due to the {@link TriggerPolicy}. This is typically invoked when the windows are persisted in
     * state and is huge to be loaded entirely in memory.
     *
     * @param eventsIt      a supplier of iterator over the list of current events in the window
     * @param newEventsIt   a supplier of iterator over the newly added events since the last activation
     * @param expiredIt     a supplier of iterator over the expired events since the last activation
     * @param referenceTime the reference (event or processing) time that resulted in activation
     */
    default void onActivation(Supplier<Iterator<T>> eventsIt, Supplier<Iterator<T>> newEventsIt, Supplier<Iterator<T>> expiredIt,
                              Long referenceTime) {
        throw new UnsupportedOperationException("Not implemented");
    }
}
  • WindowLifecycleListener defines two kinds of callback: onExpiry and onActivation (the latter with a list-based and an iterator-based overload)
  • They are driven by EvictionPolicy and TriggerPolicy respectively

EvictionPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java

/**
 * Eviction policy tracks events and decides whether an event should be evicted from the window or not.
 *
 * @param <T> the type of event that is tracked.
 */
public interface EvictionPolicy<T, S> {
    /**
     * Decides if an event should be expired from the window, processed in the current window or kept for later processing.
     *
     * @param event the input event
     * @return the {@link org.apache.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
     */
    Action evict(Event<T> event);

    /**
     * Tracks the event to later decide whether {@link EvictionPolicy#evict(Event)} should evict it or not.
     *
     * @param event the input event to be tracked
     */
    void track(Event<T> event);

    /**
     * Returns the current context that is part of this eviction policy.
     *
     * @return the eviction context
     */
    EvictionContext getContext();

    /**
     * Sets a context in the eviction policy that can be used while evicting the events. E.g. For TimeEvictionPolicy, this could be used to
     * set the reference timestamp.
     *
     * @param context the eviction context
     */
    void setContext(EvictionContext context);

    /**
     * Resets the eviction policy.
     */
    void reset();

    /**
     * Return runtime state to be checkpointed by the framework for restoring the eviction policy in case of failures.
     *
     * @return the state
     */
    S getState();

    /**
     * Restore the eviction policy from the state that was earlier checkpointed by the framework.
     *
     * @param state the state
     */
    void restoreState(S state);

    /**
     * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
     */
    public enum Action {
        /**
         * expire the event and remove it from the queue.
         */
        EXPIRE,
        /**
         * process the event in the current window of events.
         */
        PROCESS,
        /**
         * don't include in the current window but keep the event in the queue for evaluating as a part of future windows.
         */
        KEEP,
        /**
         * stop processing the queue, there cannot be anymore events satisfying the eviction policy.
         */
        STOP
    }
}
  • EvictionPolicy tracks events and decides whether each one should be evicted from the window
  • EvictionPolicy has four implementation classes: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy and WatermarkTimeEvictionPolicy
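To make the EXPIRE/PROCESS decision concrete, here is a simplified count-based eviction sketch in the spirit of CountEvictionPolicy. It is not Storm's implementation: CountEvictionSketch, its nested Action enum and the expireFrom helper are hypothetical, and the counter is a plain long rather than anything thread-safe. It assumes events are evaluated oldest first, as scanEvents does when iterating the queue.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical count-based eviction, modelled on the idea behind CountEvictionPolicy.
class CountEvictionSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    private final int windowLength; // keep at most this many events in the window
    private long currentCount;      // events tracked and not yet expired

    CountEvictionSketch(int windowLength) {
        this.windowLength = windowLength;
    }

    // Called for every new (non-watermark) event.
    void track() {
        currentCount++;
    }

    // Called for events oldest first: expire while the window holds too many.
    Action evict() {
        if (currentCount > windowLength) {
            currentCount--;
            return Action.EXPIRE;
        }
        return Action.PROCESS;
    }

    // Helper: remove expired events from the head of the queue and return them.
    static List<Integer> expireFrom(Deque<Integer> queue, CountEvictionSketch policy) {
        List<Integer> expired = new ArrayList<>();
        while (!queue.isEmpty() && policy.evict() == Action.EXPIRE) {
            expired.add(queue.pollFirst());
        }
        return expired;
    }
}
```

With a window length of 3 and five tracked events, the two oldest events expire and the last three remain to be processed.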

TriggerPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/TriggerPolicy.java

/**
 * Triggers the window calculations based on the policy.
 *
 * @param <T> the type of the event that is tracked
 */
public interface TriggerPolicy<T, S> {
    /**
     * Tracks the event and could use this to invoke the trigger.
     *
     * @param event the input event
     */
    void track(Event<T> event);

    /**
     * resets the trigger policy.
     */
    void reset();

    /**
     * Starts the trigger policy. This can be used during recovery to start the triggers after recovery is complete.
     */
    void start();

    /**
     * Any clean up could be handled here.
     */
    void shutdown();

    /**
     * Return runtime state to be checkpointed by the framework for restoring the trigger policy in case of failures.
     *
     * @return the state
     */
    S getState();

    /**
     * Restore the trigger policy from the state that was earlier checkpointed by the framework.
     *
     * @param state the state
     */
    void restoreState(S state);
}
  • TriggerPolicy decides when a window computation is triggered
  • TriggerPolicy has four implementation classes: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy and WatermarkTimeTriggerPolicy
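As a counterpart to eviction, a count-based trigger just fires the handler every N events. The sketch below is hypothetical (CountTriggerSketch is not Storm's CountTriggerPolicy source), with a Runnable standing in for TriggerHandler.onTrigger(); the reset after each firing reflects the triggerPolicy.reset() call at the end of WindowManager.onTrigger shown later in this article.

```java
// Hypothetical count-based trigger in the spirit of CountTriggerPolicy.
class CountTriggerSketch {
    private final int slidingIntervalCount; // fire after this many new events
    private final Runnable onTrigger;        // stands in for TriggerHandler.onTrigger()
    private int currentCount;

    CountTriggerSketch(int slidingIntervalCount, Runnable onTrigger) {
        this.slidingIntervalCount = slidingIntervalCount;
        this.onTrigger = onTrigger;
    }

    // Called for every new (non-watermark) event.
    void track() {
        currentCount++;
        if (currentCount >= slidingIntervalCount) {
            onTrigger.run();
            // WindowManager.onTrigger ends with triggerPolicy.reset(); done inline here.
            reset();
        }
    }

    void reset() {
        currentCount = 0;
    }
}
```

With a sliding interval of 2, five events fire the window twice, and the fifth event waits for the next firing.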

WindowedBoltExecutor.newWindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {
            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple tuple : tuples) {
                    windowedOutputCollector.ack(tuple);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                windowedOutputCollector.setContext(tuples);
                boltExecute(tuples, newTuples, expiredTuples, timestamp);
            }
        };
    }

    protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
        bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
    }
  • newWindowLifecycleListener returns an anonymous WindowLifecycleListener implementation
  • onExpiry acks the expired tuples one by one; onActivation sets the collector's anchoring context to the window's tuples and calls boltExecute, which builds a TupleWindowImpl and passes it to the wrapped bolt's execute method
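The listener's two responsibilities (ack on expiry, set the anchoring context and call the bolt on activation) can be sketched without Storm. ListenerSketch and its nested Window class are hypothetical: Window is a minimal stand-in for TupleWindowImpl that omits the start and end timestamps, and a Consumer stands in for IWindowedBolt.execute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Storm-free model of the listener built by newWindowLifecycleListener.
class ListenerSketch {
    // Minimal stand-in for TupleWindowImpl: only the three lists the bolt sees.
    static class Window {
        final List<String> tuples, newTuples, expired;

        Window(List<String> tuples, List<String> newTuples, List<String> expired) {
            this.tuples = tuples;
            this.newTuples = newTuples;
            this.expired = expired;
        }
    }

    final List<String> acked = new ArrayList<>();
    List<String> anchorContext = new ArrayList<>();
    private final Consumer<Window> bolt; // stands in for IWindowedBolt.execute

    ListenerSketch(Consumer<Window> bolt) {
        this.bolt = bolt;
    }

    void onExpiry(List<String> tuples) {
        acked.addAll(tuples); // every expired tuple is acked
    }

    void onActivation(List<String> tuples, List<String> newTuples, List<String> expired) {
        anchorContext = tuples; // emits from the bolt will be anchored to this window
        bolt.accept(new Window(tuples, newTuples, expired));
    }
}
```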

WindowedBoltExecutor.initWindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration = new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(),
                                                   TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    private EvictionPolicy<Tuple, ?> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
        if (windowLengthCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
            } else {
                return new CountEvictionPolicy<>(windowLengthCount.value);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
            } else {
                return new TimeEvictionPolicy<>(windowLengthDuration.value);
            }
        }
    }

    private TriggerPolicy<Tuple, ?> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
                                                    WindowManager<Tuple> manager, EvictionPolicy<Tuple, ?> evictionPolicy) {
        if (slidingIntervalCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
            } else {
                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
            } else {
                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
            }
        }
    }
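  • For WindowedBoltExecutor, stateful is false, so a plain WindowManager is created (StatefulWindowManager is used only when stateful is true)
  • DEFAULT_MAX_LAG_MS is 0, meaning no lag, and DEFAULT_WATERMARK_EVENT_INTERVAL_MS is 1000, meaning 1 second
  • The policies depend on how the window is configured. The eviction policy follows the window length (Count or Duration) and the trigger policy follows the sliding interval (Count or Duration, defaulting to a count of 1). When a timestamp field is set (isTupleTs()), the watermark variants are used, so Duration settings yield WatermarkTimeEvictionPolicy and WatermarkTimeTriggerPolicy; without one, TimeEvictionPolicy and TimeTriggerPolicy are created instead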
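The branching in getEvictionPolicy and getTriggerPolicy amounts to a two-by-two table. The hypothetical helper below (PolicySelection is not part of Storm) returns the name of the class each branch would instantiate, which makes the mapping easy to check; countBased means the window length or sliding interval was given as a Count, and tupleTs corresponds to isTupleTs().

```java
// Hypothetical helper mirroring the branches of getEvictionPolicy / getTriggerPolicy.
class PolicySelection {
    static String evictionPolicy(boolean countBasedLength, boolean tupleTs) {
        if (countBasedLength) {
            return tupleTs ? "WatermarkCountEvictionPolicy" : "CountEvictionPolicy";
        }
        return tupleTs ? "WatermarkTimeEvictionPolicy" : "TimeEvictionPolicy";
    }

    // A missing sliding interval defaults to Count(1), i.e. the count-based branch.
    static String triggerPolicy(boolean countBasedInterval, boolean tupleTs) {
        if (countBasedInterval) {
            return tupleTs ? "WatermarkCountTriggerPolicy" : "CountTriggerPolicy";
        }
        return tupleTs ? "WatermarkTimeTriggerPolicy" : "TimeTriggerPolicy";
    }
}
```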

WindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

/**
 * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks on expiry of events or activation of the window due to
 * {@link TriggerPolicy}.
 *
 * @param <T> the type of event in the window.
 */
public class WindowManager<T> implements TriggerHandler {
    protected final Collection<Event<T>> queue;
    private final AtomicInteger eventsSinceLastExpiry;

    //...

    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }

    /**
     * feed the event to the eviction and trigger policies for bookkeeping and optionally firing the trigger.
     */
    private void track(Event<T> windowEvent) {
        evictionPolicy.track(windowEvent);
        triggerPolicy.track(windowEvent);
    }

    /**
     * expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
     */
    protected void compactWindow() {
        if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
            scanEvents(false);
        }
    }

    /**
     * Scan events in the queue, using the expiration policy to check if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop as soon as an event not satisfying the expiration policy is
     *                 found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }

    //...
}
  • The main thing WindowedBoltExecutor does with each tuple is add it to windowManager
  • For an EventImpl, isWatermark returns false, so the event is queued; then track and compactWindow are performed
  • track delegates to evictionPolicy.track and triggerPolicy.track. compactWindow calls scanEvents once the number of events since the last expiry reaches EXPIRE_EVENTS_THRESHOLD. Because this is not a full scan, scanEvents stops at the first event that is not expired; afterwards, if eventsToExpire is non-empty, it invokes windowLifecycleListener.onExpiry(eventsToExpire)
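The difference between a partial scan (from compactWindow) and a full scan (from onTrigger) is easiest to see with an out-of-order queue. The sketch below keeps the control flow of scanEvents but is otherwise a simplified, hypothetical model: ScanSketch is not Storm code, and the eviction decision is a pure function of the event, whereas a real EvictionPolicy may be stateful.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of the loop in WindowManager.scanEvents.
class ScanSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    static List<Integer> scan(List<Integer> queue, Function<Integer, Action> evict,
                              boolean fullScan, List<Integer> expiredOut) {
        List<Integer> toProcess = new ArrayList<>();
        Iterator<Integer> it = queue.iterator();
        while (it.hasNext()) {
            Integer event = it.next();
            Action action = evict.apply(event);
            if (action == Action.EXPIRE) {
                expiredOut.add(event);
                it.remove();             // expired events leave the queue
            } else if (!fullScan || action == Action.STOP) {
                break;                   // a partial scan stops at the first non-expired event
            } else if (action == Action.PROCESS) {
                toProcess.add(event);    // KEEP events stay queued but are not processed
            }
        }
        return toProcess;
    }
}
```

On the queue [1, 2, 5, 1, 20], with events below 3 expiring, the partial scan expires only the leading 1 and 2, while the full scan also catches the out-of-order 1 and returns 5 for processing; this is why onTrigger scans the entire window.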

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

/**
 * Tracks tuples across input streams and periodically emits watermark events. Watermark event timestamp is the minimum of the latest tuple
 * timestamps across all the input streams (minus the lag). Once a watermark event is emitted any tuple coming with an earlier timestamp can
 * be considered as late events.
 */
public class WaterMarkEventGenerator<T> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
    private final WindowManager<T> windowManager;
    private final int eventTsLag;
    private final Set<GlobalStreamId> inputStreams;
    private final Map<GlobalStreamId, Long> streamToTs;
    private final ScheduledExecutorService executorService;
    private final int interval;
    private ScheduledFuture<?> executorFuture;
    private volatile long lastWaterMarkTs;

    //......

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }
}
  • When WindowedBoltExecutor starts (the start() call in doPrepare), it calls WaterMarkEventGenerator's start method
  • start schedules the generator to run at a fixed rate, once every watermarkInterval milliseconds
  • run computes the watermark (the minimum of the latest tuple timestamps across the input streams, minus the lag). If it is greater than lastWaterMarkTs, run adds a WaterMarkEvent (whose isWatermark returns true) to windowManager and updates lastWaterMarkTs
  • windowManager.add(new WaterMarkEvent<>(waterMarkTs)) does not queue the event, but it still calls track (and therefore triggerPolicy.track(windowEvent)) and compactWindow
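The watermark bookkeeping can be modelled in a single thread. WatermarkSketch below is hypothetical: tick() plays the role of one scheduled run(), and track() returns whether a tuple is on time. Two details are assumptions not shown in the listing above: no watermark is computed until every input stream has reported a timestamp, and a tuple counts as late when its timestamp is below the last watermark (as the class javadoc describes).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical, single-threaded model of WaterMarkEventGenerator's bookkeeping.
class WatermarkSketch {
    private final Set<String> inputStreams;                      // all streams feeding the bolt
    private final Map<String, Long> streamToTs = new HashMap<>(); // latest ts seen per stream
    private final long lagMs;
    private long lastWaterMarkTs;

    WatermarkSketch(Set<String> inputStreams, long lagMs) {
        this.inputStreams = inputStreams;
        this.lagMs = lagMs;
    }

    // Record the latest timestamp for a stream; returns false for a late tuple.
    boolean track(String stream, long ts) {
        streamToTs.merge(stream, ts, Long::max);
        return ts >= lastWaterMarkTs;
    }

    // One scheduled tick: returns a watermark only if it advances.
    OptionalLong tick() {
        if (streamToTs.size() < inputStreams.size()) {
            return OptionalLong.empty(); // assumption: wait until every stream has data
        }
        long minTs = Long.MAX_VALUE;
        for (long ts : streamToTs.values()) {
            minTs = Math.min(minTs, ts);
        }
        long waterMarkTs = minTs - lagMs;
        if (waterMarkTs > lastWaterMarkTs) {
            lastWaterMarkTs = waterMarkTs;
            return OptionalLong.of(waterMarkTs);
        }
        return OptionalLong.empty();
    }
}
```

With streams s1 at 100 and s2 at 80 and a 5 ms lag, the watermark is 75; a later tuple on s1 with timestamp 70 is then reported as late.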

WatermarkTimeTriggerPolicy.track

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java

    @Override
    public void track(Event<T> event) {
        if (started && event.isWatermark()) {
            handleWaterMarkEvent(event);
        }
    }

    /**
     * Invokes the trigger all pending windows up to the watermark timestamp. The end ts of the window is set in the eviction policy context
     * so that the events falling within that window can be processed.
     */
    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        long windowEndTs = nextWindowEndTs;
        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
        while (windowEndTs <= watermarkTs) {
            long currentCount = windowManager.getEventCount(windowEndTs);
            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (handler.onTrigger()) {
                windowEndTs += slidingIntervalMs;
            } else {
                /*
                 * No events were found in the previous window interval.
                 * Scan through the events in the queue to find the next
                 * window intervals based on event ts.
                 */
                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
                LOG.debug("Next aligned window end ts {}", ts);
                if (ts == Long.MAX_VALUE) {
                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                    break;
                }
                windowEndTs = ts;
            }
        }
        nextWindowEndTs = windowEndTs;
    }

    /**
     * Computes the next window by scanning the events in the window and finds the next aligned window between the startTs and endTs. Return
     * the end ts of the next aligned window, i.e. the ts when the window should fire.
     *
     * @param startTs the start timestamp (excluding)
     * @param endTs   the end timestamp (including)
     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there are no more events to be processed.
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }
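  • handleWaterMarkEvent calls handler.onTrigger() for every pending window whose end ts is at or before the watermark, advancing the window end by slidingIntervalMs each time; when a window turns out to be empty, it jumps to the next aligned window end that contains events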
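The window-end arithmetic is worth checking by hand. The first method below reproduces the rounding in getNextAlignedWindowTs, except that the earliest event timestamp is passed in directly instead of being looked up with windowManager.getEarliestEventTs(startTs, endTs). The second, firedWindowEnds, is a hypothetical helper that lists the window ends a single watermark would fire, assuming every window contains events (handler.onTrigger() always returns true).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the window-end arithmetic used by WatermarkTimeTriggerPolicy.
class AlignedWindowSketch {
    // Round the earliest event ts up to the next multiple of the sliding interval.
    static long nextAlignedWindowTs(long earliestEventTs, long slidingIntervalMs) {
        if (earliestEventTs == Long.MAX_VALUE || earliestEventTs % slidingIntervalMs == 0) {
            return earliestEventTs;
        }
        return earliestEventTs + (slidingIntervalMs - (earliestEventTs % slidingIntervalMs));
    }

    // Hypothetical: window ends fired by one watermark when no window is empty.
    static List<Long> firedWindowEnds(long nextWindowEndTs, long watermarkTs, long slidingIntervalMs) {
        List<Long> ends = new ArrayList<>();
        for (long end = nextWindowEndTs; end <= watermarkTs; end += slidingIntervalMs) {
            ends.add(end);
        }
        return ends;
    }
}
```

For example, with a 500 ms sliding interval an earliest event at 1234 aligns to a window ending at 1500, and a watermark of 2600 fires the windows ending at 1000, 1500, 2000 and 2500 if the next pending end is 1000.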

WindowManager.onTrigger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime());
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
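  • onTrigger computes three lists: events (the current window), expired (the events expired since the last activation) and newEvents (window events that were not in the previous window)
  • When events is non-empty, it invokes windowLifecycleListener.onActivation, which ultimately calls the wrapped bolt's execute method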
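The newEvents computation is a set difference against the previous activation. NewEventsSketch below is a hypothetical model using strings compared with equals, whereas WindowManager compares Event objects held in prevWindowEvents; it shows how a sliding window reports only the freshly arrived event on the second activation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of how onTrigger derives newEvents from the previous window.
class NewEventsSketch {
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the window events that were not part of the previous activation.
    List<String> activate(List<String> windowEvents) {
        List<String> newEvents = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        // Remember this window for the next activation.
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return newEvents;
    }
}
```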

Summary

  • For windowed processing, users implement IWindowedBolt, and TopologyBuilder.setBolt wraps it in a WindowedBoltExecutor. WindowedBoltExecutor's execute method adds tuples to the window via windowManager.add and acks the tuples it drops, while the user bolt's execute is called only when a window fires
  • WindowLifecycleListener has two callbacks: onExpiry, driven by EvictionPolicy, and onActivation, driven by TriggerPolicy
  • Because windowLength and slidingInterval can each be expressed as a Duration or a Count, EvictionPolicy and TriggerPolicy come in both flavors, and each also has a watermark variant. Each policy therefore has four implementation classes: for EvictionPolicy, CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy and WatermarkTimeEvictionPolicy; for TriggerPolicy, CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy and WatermarkTimeTriggerPolicy
  • Besides storing the tuple, windowManager.add calls track and compactWindow. WatermarkTimeEvictionPolicy's track currently does nothing, whereas WatermarkTimeTriggerPolicy's track fires window processing when the event is a WaterMarkEvent: it calls WindowManager.onTrigger, which selects the window's data and invokes windowLifecycleListener.onActivation, which in turn calls the windowed bolt's execute method
  • Both onTrigger and add (via compactWindow) call scanEvents; the former does a full scan and the latter does not. scanEvents calls evictionPolicy.evict to decide whether each tuple should be evicted and then invokes windowLifecycleListener.onExpiry, which acks the expired tuples. In other words, tuples are acked automatically when they expire (so Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS must be larger than windowLength + slidingInterval, otherwise tuples can time out before they are acked)
  • When WindowedBoltExecutor starts, it starts the WaterMarkEventGenerator, which registers a scheduled task that computes the watermark (the minimum of the latest tuple timestamps across input streams, minus the lag). If the watermark is greater than lastWaterMarkTs, the task updates lastWaterMarkTs and adds a WaterMarkEvent (whose isWatermark is true) to windowManager. For watermark-based windows, this is what drives WindowManager.onTrigger and hence windowLifecycleListener.onActivation
  • Regarding acks: in WindowedBoltExecutor.execute, a tuple that cannot enter any window is acked immediately, after first being emitted to the late tuple stream if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured. Tuples that do enter a window are acked when they expire. WindowedBoltExecutor uses WindowedOutputCollector, which extends OutputCollector and automatically anchors emitted tuples to the window's input tuples
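The timeout rule from the summary is plain arithmetic: a tuple may stay in the window for up to the window length plus one sliding interval before it expires and is acked. The hypothetical check below (TimeoutCheck is not a Storm API) encodes exactly the rule stated above, converting the timeout from seconds to milliseconds.

```java
// Hypothetical check of the rule: message timeout > windowLength + slidingInterval.
class TimeoutCheck {
    static boolean timeoutIsSafe(long windowLengthMs, long slidingIntervalMs, int messageTimeoutSecs) {
        // Compare in milliseconds; 1000L keeps the multiplication in long arithmetic.
        return messageTimeoutSecs * 1000L > windowLengthMs + slidingIntervalMs;
    }
}
```

With a 30 second timeout, a 10 s window sliding every 5 s (15 s total) is fine, while a 20 s window sliding every 15 s (35 s total) would risk replays.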
