preface

This article takes a look at the pendingTriggers of Storm Trident's window manager.

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
  • This calls _bolt.finishBatch. _bolt has two implementations: TridentSpoutExecutor for spouts and SubtopologyBolt for regular bolts
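
The control flow of finishBatch is easier to follow in isolation, so here is a minimal Java sketch of the same pattern. It is not Storm's code. SimpleTrackedBatch, BatchFinisher and BatchFailedException are hypothetical stand-ins for TrackedBatch, the executor and FailedException, and emitting and acking are simulated by appending to lists. The sketch shows three behaviors:

- each target task is told how many tuples it was sent (0 if none);
- the delayed ack happens only on success;
- the batch is removed from the tracking map either way.

```java
import java.util.*;
import java.util.function.Consumer;

/** Hypothetical stand-in for Storm's FailedException. */
class BatchFailedException extends RuntimeException {
    BatchFailedException(String msg) { super(msg); }
}

/** Hypothetical, simplified TrackedBatch. */
class SimpleTrackedBatch {
    final long batchId;
    final List<Integer> targetTasks;
    final Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
    String delayedAck; // id of the tuple whose ack is postponed until the batch finishes

    SimpleTrackedBatch(long batchId, List<Integer> targetTasks) {
        this.batchId = batchId;
        this.targetTasks = targetTasks;
    }
}

/** Hypothetical executor: emitting and acking are simulated with lists. */
class BatchFinisher {
    final Map<Long, SimpleTrackedBatch> batches = new HashMap<>();
    final List<String> emitted = new ArrayList<>(); // "task:batchId:count" records
    final List<String> acked = new ArrayList<>();
    final List<Long> failed = new ArrayList<>();

    boolean finishBatch(SimpleTrackedBatch tracked, Consumer<Long> bolt) {
        boolean success = true;
        try {
            bolt.accept(tracked.batchId); // plays the role of _bolt.finishBatch(tracked.info)
            for (Integer task : tracked.targetTasks) {
                // tell each downstream task how many tuples it was sent (0 if none)
                int count = tracked.taskEmittedTuples.getOrDefault(task, 0);
                emitted.add(task + ":" + tracked.batchId + ":" + count);
            }
            if (tracked.delayedAck != null) {
                acked.add(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch (BatchFailedException e) {
            failed.add(tracked.batchId); // plays the role of failBatch(tracked, e)
            success = false;
        }
        // the batch is dropped from tracking whether it succeeded or failed
        batches.remove(tracked.batchId);
        return success;
    }
}
```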

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }
  • SubtopologyBolt.finishBatch calls finishBatch on each TridentProcessor of the batch group, in topological order
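
The ordering guarantee fits in a few lines of Java. SimpleProcessor and SimpleSubtopology are hypothetical. The real code calls get() on the map, whereas this sketch falls back to an empty list for an unknown batch group.

```java
import java.util.*;

/** Hypothetical, simplified TridentProcessor: only the finishBatch hook. */
interface SimpleProcessor {
    void finishBatch(Object batchState);
}

class SimpleSubtopology {
    // processors of each batch group, registered in topological order
    private final Map<String, List<SimpleProcessor>> topologicallyOrdered = new HashMap<>();

    void register(String batchGroup, SimpleProcessor p) {
        topologicallyOrdered.computeIfAbsent(batchGroup, g -> new ArrayList<>()).add(p);
    }

    void finishBatch(String batchGroup, Object batchState) {
        // upstream processors finish before downstream ones
        for (SimpleProcessor p : topologicallyOrdered.getOrDefault(batchGroup, Collections.emptyList())) {
            p.finishBatch(batchState);
        }
    }
}
```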

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if(pendingTriggerIds != null) {
                for(Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if(!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
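  • The bolt hosting WindowTridentProcessor calls its finishBatch once it has acked all tuples of a batch
  • WindowTridentProcessor.execute receives each tuple, projects it, and appends it to the list held in processorContext.state
  • finishBatch takes that batch of tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples)
  • For a retried attempt, it first looks up the trigger IDs recorded by the earlier attempt (under inprocessTriggerKey(batchTxnId)) and reloads their values from windowStore. Otherwise, or if nothing was recorded, it drains tridentWindowManager.getPendingTriggers(). It collects the trigger IDs and aggregated values as triggerValues, and stores the IDs in windowStore so that a retry can re-emit the same triggers
  • Finally, it iterates over triggerValues and emits each resultValue prefixed with a TriggerInfo built from windowTaskId and the trigger ID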
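
The subtle part of finishBatch is the retry handling: a retried batch must re-emit the triggers its first attempt emitted instead of draining new ones. The following Java sketch reproduces that logic with a HashMap standing in for the store. SimpleWindowProcessor, PendingTrigger and the key formats are hypothetical. To keep it short, it assumes each trigger yields exactly one aggregated value; in Storm a TriggerResult carries a list of rows.

```java
import java.util.*;

/** Hypothetical pending trigger: an id plus one aggregated value. */
class PendingTrigger {
    final int id;
    final Object value;
    PendingTrigger(int id, Object value) { this.id = id; this.value = value; }
}

class SimpleWindowProcessor {
    final Map<String, Object> store = new HashMap<>();      // stands in for windowStore
    final Queue<PendingTrigger> pendingTriggers = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();         // "triggerId=value" records

    // Hypothetical key formats, for illustration only.
    static String inprocessKey(long txId) { return "inprocess-" + txId; }
    static String triggerKey(int id) { return "trigger-" + id; }

    /** Like execAggregatorAndStoreResult: persist the result, then queue it. */
    void onTrigger(int id, Object value) {
        store.put(triggerKey(id), value);
        pendingTriggers.add(new PendingTrigger(id, value));
    }

    @SuppressWarnings("unchecked")
    void finishBatch(long txId, boolean retried) {
        List<Integer> ids = null;
        List<Object> values = null;
        if (retried) {
            // a retry re-emits the triggers recorded by the earlier attempt of this transaction
            ids = (List<Integer>) store.get(inprocessKey(txId));
            if (ids != null) {
                values = new ArrayList<>();
                for (Integer id : ids) values.add(store.get(triggerKey(id)));
            }
        }
        if (values == null) {
            // new batch (or nothing recorded): drain every pending trigger
            ids = new ArrayList<>();
            values = new ArrayList<>();
            for (Iterator<PendingTrigger> it = pendingTriggers.iterator(); it.hasNext(); ) {
                PendingTrigger t = it.next();
                ids.add(t.id);
                values.add(t.value);
                it.remove();
            }
            // remember what this transaction emitted, in case it is retried
            if (!ids.isEmpty()) store.put(inprocessKey(txId), ids);
        }
        for (int i = 0; i < values.size(); i++) emitted.add(ids.get(i) + "=" + values.get(i));
    }
}
```

The in-process trigger IDs are keyed by the batch's transaction ID. A retry of the same transaction therefore finds them, while a retry that finds nothing falls back to draining pendingTriggers.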

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
        List<WindowsStore.Entry> entries = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
        }

        // tuples should be available in store before they are added to window manager
        windowStore.putAll(entries);

        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            addToWindowManager(i, key, tridentTuple);
        }

    }

    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
        TridentTuple actualTuple = null;
        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
            actualTuple = tridentTuple;
        }
        currentCachedTuplesSize.incrementAndGet();
        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
    }
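  • StoreBasedTridentWindowManager.addTuplesBatch first writes the batch's tuples to windowStore, keyed by keyOf(batchId) plus the tuple's index. It then adds each tuple to windowManager through addToWindowManager. Once currentCachedTuplesSize reaches maxCachedTuplesSize, tuples are added to the window without their in-memory copy (actualTuple is null)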
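
The write-then-add order and the cache limit in addTuplesBatch can be sketched in Java as follows. SimpleStoreBasedManager, BatchTupleRef and the keyOf format are hypothetical. The sketch keeps a tuple in memory only while the cached count is below maxCachedTuplesSize and otherwise records a null tuple. That is why the store is written first, as the source comment says: an uncached tuple can only be recovered from the store by its key.

```java
import java.util.*;

/** Hypothetical, simplified TridentBatchTuple; tuple == null means "not cached in memory". */
class BatchTupleRef {
    final String effectiveBatchId;
    final int tupleIndex;
    final List<Object> tuple;
    BatchTupleRef(String effectiveBatchId, int tupleIndex, List<Object> tuple) {
        this.effectiveBatchId = effectiveBatchId;
        this.tupleIndex = tupleIndex;
        this.tuple = tuple;
    }
}

class SimpleStoreBasedManager {
    final Map<String, List<Object>> store = new HashMap<>(); // stands in for windowStore
    final List<BatchTupleRef> window = new ArrayList<>();    // stands in for windowManager
    final Integer maxCachedTuplesSize;                        // null means "cache everything"
    int currentCachedTuplesSize = 0;

    SimpleStoreBasedManager(Integer maxCachedTuplesSize) {
        this.maxCachedTuplesSize = maxCachedTuplesSize;
    }

    // Hypothetical key scheme, for illustration only.
    static String keyOf(Object batchId) { return "batch-" + batchId + "-"; }

    void addTuplesBatch(Object batchId, List<List<Object>> tuples) {
        String key = keyOf(batchId);
        // 1. persist every tuple first, under key + index
        for (int i = 0; i < tuples.size(); i++) {
            store.put(key + i, tuples.get(i));
        }
        // 2. then add each tuple to the window, keeping it in memory only below the limit
        for (int i = 0; i < tuples.size(); i++) {
            boolean cache = maxCachedTuplesSize == null || currentCachedTuplesSize < maxCachedTuplesSize;
            currentCachedTuplesSize++;
            window.add(new BatchTupleRef(key, i, cache ? tuples.get(i) : null));
        }
    }
}
```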

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    private final ConcurrentLinkedQueue<Event<T>> queue;

    /**
     * Add an event into the window, with {@link System#currentTimeMillis()} as
     * the tracking ts.
     *
     * @param event the event to add
     */
    public void add(T event) {
        add(event, System.currentTimeMillis());
    }

    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }
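  • Regular events are appended to the ConcurrentLinkedQueue, while watermark events are only logged. In both cases the event is then tracked and the window compacted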
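
A small Java sketch illustrates how add treats regular events and watermarks differently. SimpleEvent and SimpleWindowQueue are hypothetical. The tracked list stands in for track(), which in Storm hands the event to the eviction and trigger policies. compactWindow() is omitted.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, simplified window event. */
class SimpleEvent {
    final String value;
    final long ts;
    final boolean watermark;
    SimpleEvent(String value, long ts, boolean watermark) {
        this.value = value;
        this.ts = ts;
        this.watermark = watermark;
    }
}

class SimpleWindowQueue {
    final ConcurrentLinkedQueue<SimpleEvent> queue = new ConcurrentLinkedQueue<>();
    final List<SimpleEvent> tracked = new ArrayList<>(); // stands in for track()

    void add(String value) { add(value, System.currentTimeMillis()); } // default ts: now
    void add(String value, long ts) { add(new SimpleEvent(value, ts, false)); }

    void add(SimpleEvent e) {
        if (!e.watermark) {
            queue.add(e);   // only regular events become part of the window
        }
        tracked.add(e);     // every event, watermark or not, is shown to the policies
    }
}
```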

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
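  • onTrigger first calls scanEvents(true) to get windowEvents. From them it builds events (everything in the window) and newEvents (events that were not in the previous window). If the window is not empty, it calls windowLifecycleListener.onActivation(events, newEvents, expired). It then resets the trigger policy and returns whether the window had any events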
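
The Java sketch below shows how onTrigger derives newEvents from the previous window, using plain strings as events. TriggerSplitter is hypothetical, and windowEvents plays the role of the scanEvents(true) result. The activations counter stands in for the onActivation callback. Events are compared by string value here for simplicity.

```java
import java.util.*;

/** Hypothetical sketch of onTrigger's split into all events and new events. */
class TriggerSplitter {
    private final Set<String> prevWindowEvents = new HashSet<>();
    List<String> lastEvents = new ArrayList<>();
    List<String> lastNewEvents = new ArrayList<>();
    int activations = 0; // counts calls that would reach onActivation

    /** windowEvents plays the role of scanEvents(true); returns true if the window was non-empty. */
    boolean onTrigger(List<String> windowEvents) {
        List<String> events = new ArrayList<>();
        List<String> newEvents = new ArrayList<>();
        for (String e : windowEvents) {
            events.add(e);
            if (!prevWindowEvents.contains(e)) {
                newEvents.add(e); // not part of the previous window
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            activations++;        // stands in for windowLifecycleListener.onActivation(...)
            lastEvents = events;
            lastNewEvents = newEvents;
        }
        return !events.isEmpty();
    }
}
```

With a sliding window, an event can appear in two consecutive windows. It is part of events both times but part of newEvents only the first time.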

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * Scan events in the queue, using the expiration policy to check
     * if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop
     *                 as soon as an event not satisfying the expiration policy is found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }
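  • scanEvents walks the ConcurrentLinkedQueue and asks the evictionPolicy about each event. EXPIRE events are removed from the queue, added to expiredEvents and passed to onExpiry. PROCESS events are collected into eventsToProcess. The scan stops at a STOP event, or at the first non-expired event when fullScan is false. The method returns eventsToProcess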
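
The eviction loop in scanEvents can be reproduced in Java with a hypothetical time-based policy. EvictAction mirrors the EXPIRE/PROCESS/STOP actions. SimpleScanner and the policy lambda in the test are illustrations, not Storm's classes, and the onExpiry callback and counters are left out. The fullScan flag makes a difference: a partial scan strips expired events from the head of the queue only and stops at the first event that is not expired.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

/** Mirrors the three eviction decisions used by scanEvents. */
enum EvictAction { EXPIRE, PROCESS, STOP }

class SimpleScanner {
    final ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>(); // event timestamps
    final List<Long> expiredEvents = new ArrayList<>();

    List<Long> scanEvents(boolean fullScan, Function<Long, EvictAction> evictionPolicy) {
        List<Long> eventsToProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            Long ts = it.next();
            EvictAction action = evictionPolicy.apply(ts);
            if (action == EvictAction.EXPIRE) {
                expiredEvents.add(ts);
                it.remove();                 // expired events leave the queue for good
            } else if (!fullScan || action == EvictAction.STOP) {
                break;                       // a partial scan stops at the first non-expired event
            } else if (action == EvictAction.PROCESS) {
                eventsToProcess.add(ts);     // still inside the window
            }
        }
        return eventsToProcess;
    }
}
```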

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to reeive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • onActivation calls execAggregatorAndStoreResult. That method runs the window's aggregator over the window's tuples and saves the updated trigger count and the result to windowStore. It then adds resultantAggregatedValue to pendingTriggers as a TriggerResult
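
The aggregation step can be sketched in Java with a summing aggregator. SimpleAggregator is a hypothetical, simplified version of the init/aggregate/complete lifecycle: Trident's real Aggregator emits through a TridentCollector instead of returning a row. The store keys are also made up. What the sketch keeps from execAggregatorAndStoreResult is the order of operations:

1. assign a new trigger ID;
2. aggregate the window's events;
3. persist the next trigger count and the result;
4. queue the result in pendingTriggers.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified init/aggregate/complete lifecycle. */
interface SimpleAggregator<S> {
    S init(int triggerId);
    void aggregate(S state, int value);
    List<Object> complete(S state);
}

/** Sums the events of one window activation. */
class SumAggregator implements SimpleAggregator<long[]> {
    public long[] init(int triggerId) { return new long[1]; }
    public void aggregate(long[] state, int value) { state[0] += value; }
    public List<Object> complete(long[] state) {
        List<Object> row = new ArrayList<>();
        row.add(state[0]);
        return row;
    }
}

/** Hypothetical, simplified TriggerResult. */
class TriggerResultSketch {
    final int id;
    final List<Object> result;
    TriggerResultSketch(int id, List<Object> result) { this.id = id; this.result = result; }
}

class SimpleTriggerHandler<S> {
    final Map<String, Object> windowStore = new HashMap<>();
    final Queue<TriggerResultSketch> pendingTriggers = new ConcurrentLinkedQueue<>();
    private final AtomicInteger triggerId = new AtomicInteger();
    private final SimpleAggregator<S> aggregator;

    SimpleTriggerHandler(SimpleAggregator<S> aggregator) { this.aggregator = aggregator; }

    /** Like onActivation followed by execAggregatorAndStoreResult. */
    void onActivation(List<Integer> events) {
        int currentTriggerId = triggerId.incrementAndGet(); // every activation gets a new id
        S state = aggregator.init(currentTriggerId);
        for (int e : events) {
            aggregator.aggregate(state, e);
        }
        List<Object> result = aggregator.complete(state);
        // persist the next trigger count and this trigger's result (hypothetical key names)
        windowStore.put("trigger-count", currentTriggerId + 1);
        windowStore.put("trigger-" + currentTriggerId, result);
        // queue the result; it is emitted later by finishBatch
        pendingTriggers.add(new TriggerResultSketch(currentTriggerId, result));
    }
}
```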

summary

  • When the TridentBoltExecutor hosting WindowTridentProcessor receives a tuple from the spout, it runs the processors' execute methods. WindowTridentProcessor.execute caches the tuple in the ProcessorContext. Once the chain of execute calls has finished, the tuple is acked
  • Once that TridentBoltExecutor has acked all tuples of a batch, checkFinish triggers finishBatch. This calls finishBatch on a chain of TridentProcessors, for example WindowTridentProcessor -> ProjectedProcessor -> PartitionPersistProcessor -> EachProcessor -> AggregateProcessor
  • WindowTridentProcessor.finishBatch takes the batch's tuples from processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples). That call stores the tuples in windowStore and then adds them to WindowManager's ConcurrentLinkedQueue. finishBatch then calls tridentWindowManager.getPendingTriggers(), stores the pending trigger IDs in windowStore and collects the triggerValues. Finally, it emits each resultValue together with its TriggerInfo
  • WindowManager.onTrigger, invoked when the window's trigger policy fires, gets the window events from WindowManager's ConcurrentLinkedQueue and passes them to TridentWindowLifeCycleListener.onActivation
  • TridentWindowLifeCycleListener.onActivation runs the window aggregator's init, aggregate and complete methods to get the aggregated result, resultantAggregatedValue, and adds it to pendingTriggers. This connects the window trigger to WindowTridentProcessor: results produced when the window fires are emitted by the next finishBatch

doc

  • Windowing Support in Core Storm