preface

This article looks at how window triggers work in Storm's Trident windowing.

WindowTridentProcessor.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }
  • This is where tridentWindowManager.prepare() is called.

AbstractTridentWindowManager.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
        this.windowTaskId = windowTaskId;
        this.windowStore = windowStore;
        this.aggregator = aggregator;
        this.delegateCollector = delegateCollector;

        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
        windowManager.setEvictionPolicy(evictionPolicy);
        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
        windowManager.setTriggerPolicy(triggerPolicy);
    }

    public void prepare() {
        preInitialize();

        initialize();

        postInitialize();
    }

    private void postInitialize() {
        // start trigger once the initialization is done.
        triggerPolicy.start();
    }
  • The AbstractTridentWindowManager constructor calls windowStrategy.getTriggerPolicy to obtain the triggerPolicy; the prepare method then calls postInitialize, which calls triggerPolicy.start().
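The ordering in prepare() can be sketched in isolation. This is a simplified illustration rather than Storm's code: PrepareOrderSketch and demo are hypothetical names, and the phase bodies are placeholders that only record their names so the call order becomes visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a three-phase prepare() in which the trigger starts last. */
abstract class PrepareOrderSketch {
    // Records the order in which the phases run.
    final List<String> calls = new ArrayList<>();

    public final void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    // Placeholder: the real preInitialize does actual work; here it only logs its name.
    private void preInitialize() {
        calls.add("preInitialize");
    }

    // Subclasses (store-based or in-memory managers in Storm) supply this phase.
    protected abstract void initialize();

    // The trigger is started only once initialization is done.
    private void postInitialize() {
        calls.add("triggerPolicy.start");
    }

    /** Runs prepare() on a trivial subclass and returns the recorded call order. */
    static List<String> demo() {
        PrepareOrderSketch manager = new PrepareOrderSketch() {
            @Override
            protected void initialize() {
                calls.add("initialize");
            }
        };
        manager.prepare();
        return manager.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Starting the trigger in the last phase means no scheduled task can fire while the manager is still initializing.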

SlidingDurationWindowStrategy.getTriggerPolicy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java

    /**
     * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
     *
     * @param triggerHandler
     * @param evictionPolicy
     * @return
     */
    @Override
    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
    }
  • Taking SlidingDurationWindowStrategy as an example, the policy created here is a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength() and whose triggerHandler is the WindowManager.

TimeTriggerPolicy.start

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java

    public void start() {
        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    }

   private Runnable newTriggerTask() {
        return new Runnable() {
            @Override
            public void run() {
                // do not process current timestamp since tuples might arrive while the trigger is executing
                long now = System.currentTimeMillis() - 1;
                try {
                    /*
                     * set the current timestamp as the reference time for the eviction policy
                     * to evict the events
                     */
                    if (evictionPolicy != null) {
                        evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                    }
                    handler.onTrigger();
                } catch (Throwable th) {
                    LOG.error("handler.onTrigger failed ", th);
                    /*
                     * propagate it so that task gets canceled and the exception
                     * can be retrieved from executorFuture.get()
                     */
                    throw th;
                }
            }
        };
    }
  • The start method registers a scheduled task that fires every duration milliseconds (windowConfig.getSlidingLength() in this case); its run method calls handler.onTrigger(), which here is WindowManager.onTrigger().
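The scheduling behaviour can be reproduced with the JDK alone. The following is a minimal sketch, not Storm's code: FixedRateTriggerSketch, countTriggers, and failureMessage are hypothetical names, and a counter stands in for handler.onTrigger(). It shows scheduleAtFixedRate firing a task every duration milliseconds, and it shows that an exception thrown from the task suppresses later runs and resurfaces from Future.get(), which is what the comment in newTriggerTask relies on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only sketch of a fixed-rate trigger. */
final class FixedRateTriggerSketch {

    /** Fires a counting task every durationMs and returns the count after `expected` runs. */
    static int countTriggers(long durationMs, int expected) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(expected);
        try {
            // Same shape as TimeTriggerPolicy.start(): initial delay == period == duration.
            executor.scheduleAtFixedRate(() -> {
                if (latch.getCount() > 0) {
                    fired.incrementAndGet(); // stand-in for handler.onTrigger()
                    latch.countDown();
                }
            }, durationMs, durationMs, TimeUnit.MILLISECONDS);
            // Generous timeout so a slow machine does not cut the wait short.
            latch.await(durationMs * expected * 20 + 5000, TimeUnit.MILLISECONDS);
            return fired.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    /** Returns the message of the exception that a failing periodic task leaves in its future. */
    static String failureMessage(long durationMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                throw new IllegalStateException("onTrigger failed");
            }, durationMs, durationMs, TimeUnit.MILLISECONDS);
            // A healthy periodic task never completes; get() returns only by throwing.
            future.get();
            return null;
        } catch (ExecutionException e) {
            // The task's own exception is the cause.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(countTriggers(50, 3));
        System.out.println(failureMessage(50));
    }
}
```

Because a failing run cancels all later runs, a trigger task that swallowed its exception would keep firing, while one that rethrows (as TimeTriggerPolicy does) stops and leaves the failure for whoever inspects the future.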

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
  • This calls windowLifecycleListener.onActivation(events, newEvents, expired); the windowLifecycleListener is AbstractTridentWindowManager's TridentWindowLifeCycleListener.
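How onTrigger separates newEvents from the full window can be shown with a small stand-alone sketch. It makes simplifying assumptions: WindowDiffSketch and activate are hypothetical names, plain strings replace Event<T>, a HashSet holds the previous window, and locking and expired events are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the prevWindowEvents bookkeeping in onTrigger. */
final class WindowDiffSketch {
    // Events that were part of the previous activation.
    private final Set<String> prevWindowEvents = new HashSet<>();

    /** Returns the events of currentWindow that were not in the previous window. */
    List<String> activate(List<String> currentWindow) {
        List<String> newEvents = new ArrayList<>();
        for (String event : currentWindow) {
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        // Remember this window so the next trigger can diff against it.
        prevWindowEvents.clear();
        prevWindowEvents.addAll(currentWindow);
        return newEvents;
    }

    public static void main(String[] args) {
        WindowDiffSketch sketch = new WindowDiffSketch();
        System.out.println(sketch.activate(Arrays.asList("a", "b")));
        System.out.println(sketch.activate(Arrays.asList("b", "c")));
    }
}
```

With a sliding window, consecutive windows overlap, so the second activation reports only the events that entered the window since the previous trigger.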

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to reeive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

   private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • TridentWindowLifeCycleListener.onActivation mainly delegates to execAggregatorAndStoreResult.
  • execAggregatorAndStoreResult in turn calls the aggregator's init, aggregate, and complete methods, then writes the next trigger count and the aggregated result to the windowStore.
  • Finally, it adds the TriggerResult to pendingTriggers.
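The init, aggregate, complete sequence can be sketched without Storm on the classpath. The names below are hypothetical: SketchAggregator is a simplified mirror of Trident's Aggregator in which a List<Object> stands in for TridentCollector and an int stands in for TridentTuple, and SumAggregator is an illustrative implementation. The windowStore write and pendingTriggers are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified mirror of Trident's Aggregator. */
interface SketchAggregator<S> {
    S init(Object batchId, List<Object> collector);

    void aggregate(S state, int tuple, List<Object> collector);

    void complete(S state, List<Object> collector);
}

/** Sums the tuples of one window and emits the total in complete(). */
final class SumAggregator implements SketchAggregator<long[]> {
    @Override
    public long[] init(Object batchId, List<Object> collector) {
        return new long[1]; // fresh state for every trigger
    }

    @Override
    public void aggregate(long[] state, int tuple, List<Object> collector) {
        state[0] += tuple;
    }

    @Override
    public void complete(long[] state, List<Object> collector) {
        collector.add(state[0]);
    }
}

final class AggregatorLifecycleSketch {
    /** Mirrors the call order in execAggregatorAndStoreResult for one trigger. */
    static <S> List<Object> runWindow(SketchAggregator<S> aggregator, int triggerId, List<Integer> window) {
        List<Object> collector = new ArrayList<>();
        S state = aggregator.init(triggerId, collector);   // once per trigger
        for (int tuple : window) {
            aggregator.aggregate(state, tuple, collector); // once per tuple in the window
        }
        aggregator.complete(state, collector);             // once per trigger
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(runWindow(new SumAggregator(), 1, Arrays.asList(1, 2, 3)));
    }
}
```

Since init runs on every trigger, the state never carries over between windows: each activation aggregates the full list of events currently in the window.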

summary

  • Storm's TimeTriggerPolicy.start registers a periodic TriggerTask; with SlidingDurationWindowStrategy, the scheduling interval is windowConfig.getSlidingLength().
  • Each time it fires, the TriggerTask calls WindowManager.onTrigger, which calls back windowLifecycleListener.onActivation.
  • AbstractTridentWindowManager supplies the TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the aggregator: it calls init first, then iterates over resultTuples and calls aggregate for each one, and finally calls complete. This makes the calling order of the Aggregator interface's methods explicit.

doc

  • Windowing Support in Core Storm