Preface

This article looks at the FreshCollector used by Storm's WindowTridentProcessor.

Example

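        import org.apache.storm.trident.TridentTopology;
        import org.apache.storm.tuple.Fields;

        // spout, windowConfig, windowsStoreFactory, UserCountAggregator and PrintEachFunc are user-defined and not shown.
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .partitionBy(new Fields("user"))
                .window(windowConfig, windowsStoreFactory, new Fields("user", "score"), new UserCountAggregator(), new Fields("aggData"))
                .parallelismHint(1)
                .each(new Fields("aggData"), new PrintEachFunc(), new Fields());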
  • In this example, the window operation is followed by an each operation (PrintEachFunc).

WindowTridentProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public class WindowTridentProcessor implements TridentProcessor {

    private FreshCollector collector;

    //...

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }

}
  • WindowTridentProcessor creates the FreshCollector in prepare.
  • In finishBatch, it calls collector.emit once per aggregated result of the window to pass the results downstream.
  • The value passed to emit is a ConsList, an AbstractList implementation made of an Object first element (_first) and a List of the remaining elements (_elems); here the first element is a TriggerInfo and the rest is the aggregated result.
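To make the shape of the emitted value concrete, here is a minimal stand-in for ConsList, written only from the description above: one leading element followed by a backing list, exposed as a single read-only List. ConsListSketch, ConsListDemo and the string placeholders are hypothetical, and this is not Storm's source. Since FreshCollector's first self output field is _task_info, element 0 of such a list is the value that ends up under that field.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Storm's ConsList: a read-only list view whose
// element 0 is `first` and whose remaining elements come from `elems`.
class ConsListSketch extends AbstractList<Object> {
    private final Object first;
    private final List<Object> elems;

    ConsListSketch(Object first, List<Object> elems) {
        this.first = first;
        this.elems = elems;
    }

    @Override
    public Object get(int i) {
        // Index 0 is the prepended element; every other index is shifted by one.
        return i == 0 ? first : elems.get(i - 1);
    }

    @Override
    public int size() {
        return elems.size() + 1;
    }
}

class ConsListDemo {
    public static void main(String[] args) {
        // "trigger-info" stands in for a TriggerInfo; "agg-result" for one aggregated value.
        List<Object> emitted = new ConsListSketch("trigger-info", Arrays.<Object>asList("agg-result"));
        System.out.println(emitted); // [trigger-info, agg-result]
    }
}
```

No values are copied: the sketch only shifts indices, which is why prepending the trigger info is cheap.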

FreshCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/FreshCollector.java

public class FreshCollector implements TridentCollector {
    FreshOutputFactory _factory;
    TridentContext _triContext;
    ProcessorContext context;
    
    public FreshCollector(TridentContext context) {
        _triContext = context;
        _factory = new FreshOutputFactory(context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc) {
        this.context = pc;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create(values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }            
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 

    public Factory getOutputFactory() {
        return _factory;
    }
}
  • FreshCollector builds a FreshOutputFactory from selfOutputFields (the first field is always _task_info; the remaining fields are the functionFields the user passes to the window method).
  • The emit method first uses the FreshOutputFactory to build a TridentTupleView from the emitted values, then iterates over the TupleReceivers and calls execute on each one, passing the TridentTupleView.
  • The TupleReceivers here include ProjectedProcessor and PartitionPersistProcessor.
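Stripped of Storm types, the emit method is a build-once, fan-out loop. The sketch below assumes a simplified Receiver interface that takes a stream id and a plain List in place of TupleReceiver and TridentTupleView; Receiver, FanOutCollector, FanOutDemo and the stream id "s1" are hypothetical names, not Storm APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simplification of TupleReceiver: receives a stream id and a tuple.
interface Receiver {
    void execute(String streamId, List<Object> tuple);
}

// Mirrors the shape of FreshCollector.emit: build the outgoing tuple once,
// then pass the same instance to every downstream receiver in order.
class FanOutCollector {
    private final String outStreamId;
    private final List<Receiver> receivers;

    FanOutCollector(String outStreamId, List<Receiver> receivers) {
        this.outStreamId = outStreamId;
        this.receivers = receivers;
    }

    void emit(List<Object> values) {
        // Stand-in for _factory.create(values): wrap a copy of the values as an immutable tuple.
        List<Object> toEmit = Collections.unmodifiableList(new ArrayList<>(values));
        for (Receiver r : receivers) {
            r.execute(outStreamId, toEmit);
        }
    }
}

class FanOutDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Receiver a = (stream, t) -> log.add("A " + stream + " " + t);
        Receiver b = (stream, t) -> log.add("B " + stream + " " + t);
        new FanOutCollector("s1", Arrays.asList(a, b)).emit(Arrays.<Object>asList("info", 7));
        log.forEach(System.out::println);
        // A s1 [info, 7]
        // B s1 [info, 7]
    }
}
```

With an empty receiver list, emit builds the tuple and returns without forwarding it anywhere.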

TridentTupleView.FreshOutputFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java

    public static class FreshOutputFactory  implements Factory {
        Map<String, ValuePointer> _fieldIndex;
        ValuePointer[] _index;

        public FreshOutputFactory(Fields selfFields) {
            _fieldIndex = new HashMap<>();
            for(int i=0; i<selfFields.size(); i++) {
                String field = selfFields.get(i);
                _fieldIndex.put(field, new ValuePointer(0, i, field));
            }
            _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
        }
        
        public TridentTuple create(List<Object> selfVals) {
            return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
        }

        @Override
        public Map<String, ValuePointer> getFieldIndex() {
            return _fieldIndex;
        }

        @Override
        public int numDelegates() {
            return 1;
        }
        
        @Override
        public List<String> getOutputFields() {
            return indexToFieldsList(_index);
        }
    }
  • FreshOutputFactory is a static nested class of TridentTupleView; its constructor computes _index and _fieldIndex.
  • _fieldIndex is a map whose key is the field name and whose value is a ValuePointer holding delegateIndex (always 0 here), index and field. The first field is _task_info with index 0; the fields that follow are the functionFields the user defines in the window method.
  • The create method simply constructs a TridentTupleView. The constructor's first argument is an IPersistentVector (here a vector whose single element is selfVals), the second is _index and the third is _fieldIndex.
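The index that FreshOutputFactory builds can be modelled with plain collections. This sketch assumes a tuple view is a list of delegate lists, and that a lookup by field name resolves to delegates.get(delegateIndex).get(index), which is roughly how TridentTupleView reads values. Pointer, FreshIndexSketch and valueByField are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical analogue of ValuePointer: which delegate, which slot, which field.
final class Pointer {
    final int delegateIndex;
    final int index;
    final String field;

    Pointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }
}

final class FreshIndexSketch {
    // Like FreshOutputFactory's constructor: every field points into delegate 0,
    // at the position the field occupies in selfFields.
    static Map<String, Pointer> buildFieldIndex(List<String> selfFields) {
        Map<String, Pointer> fieldIndex = new LinkedHashMap<>();
        for (int i = 0; i < selfFields.size(); i++) {
            String field = selfFields.get(i);
            fieldIndex.put(field, new Pointer(0, i, field));
        }
        return fieldIndex;
    }

    // Resolves a field by first choosing the delegate, then the slot inside it.
    static Object valueByField(List<List<Object>> delegates, Map<String, Pointer> fieldIndex, String field) {
        Pointer p = fieldIndex.get(field);
        return delegates.get(p.delegateIndex).get(p.index);
    }

    public static void main(String[] args) {
        Map<String, Pointer> idx = buildFieldIndex(Arrays.asList("_task_info", "aggData"));
        // A fresh tuple has exactly one delegate: the emitted values themselves.
        List<List<Object>> delegates = Arrays.asList(Arrays.<Object>asList("trigger-info", 3L));
        System.out.println(valueByField(delegates, idx, "aggData")); // 3
    }
}
```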

ValuePointer

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/ValuePointer.java

public class ValuePointer {
    public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
        Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
        for(ValuePointer ptr: pointers) {
            ret.put(ptr.field, ptr);
        }
        return ret;        
    }

    public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
        if (fieldsOrder.size() != pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        ValuePointer[] ret = new ValuePointer[pointers.size()];
        for(int i=0; i<fieldsOrder.size(); i++) {
            ret[i] = pointers.get(fieldsOrder.get(i));
        }
        return ret;
    }    
    
    public int delegateIndex;
    protected int index;
    protected String field;
    
    public ValuePointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}
  • buildIndex first checks that fieldsOrder and the pointers map have the same size, then returns a ValuePointer array ordered like fieldsOrder (here, selfOutputFields).
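The contract of buildIndex, a length check followed by a result that follows the given field order, can be restated without Storm types. This is a hypothetical generic version (BuildIndexSketch) that returns a List rather than a ValuePointer array and uses Integer slot numbers as stand-in pointers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BuildIndexSketch {
    // Same contract as ValuePointer.buildIndex, but generic over the pointer type
    // and returning a List instead of an array.
    static <P> List<P> buildIndex(List<String> fieldsOrder, Map<String, P> pointers) {
        if (fieldsOrder.size() != pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        List<P> ret = new ArrayList<>(fieldsOrder.size());
        for (String field : fieldsOrder) {
            // A HashMap has no useful iteration order; the position in fieldsOrder decides each slot.
            ret.add(pointers.get(field));
        }
        return ret;
    }

    public static void main(String[] args) {
        // Values stand in for ValuePointer.index (position inside the delegate).
        Map<String, Integer> pointers = new HashMap<>();
        pointers.put("_task_info", 0);
        pointers.put("aggData", 1);
        System.out.println(buildIndex(Arrays.asList("_task_info", "aggData"), pointers)); // [0, 1]
        System.out.println(buildIndex(Arrays.asList("aggData", "_task_info"), pointers)); // [1, 0]
    }
}
```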

ProjectedProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/ProjectedProcessor.java

public class ProjectedProcessor implements TridentProcessor {
    Fields _projectFields;
    ProjectionFactory _factory;
    TridentContext _context;
    
    public ProjectedProcessor(Fields projectFields) {
        _projectFields = projectFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        if (tridentContext.getParentTupleFactories().size() != 1) {
            throw new RuntimeException("Projection processor can only have one parent");
        }
        _context = tridentContext;
        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        TridentTuple toEmit = _factory.create(tuple);
        for(TupleReceiver r: _context.getReceivers()) {
            r.execute(processorContext, _context.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • ProjectedProcessor creates a ProjectionFactory in prepare; its _projectFields are the functionFields defined in the window method. It uses tridentContext.getParentTupleFactories().get(0) to take the first (and only) parent Factory. Because the parent's tuples are emitted by FreshCollector, that factory is a TridentTupleView.FreshOutputFactory.
  • The execute method first calls ProjectionFactory.create to extract fields from the incoming TridentTupleView; toEmit is a TridentTupleView that exposes only the functionFields defined in the window method.
  • It then calls execute on each of _context.getReceivers(), passing toEmit. The receivers are the processors that follow the window, such as EachProcessor.

TridentTupleView.ProjectionFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java

public static class ProjectionFactory implements Factory {
        Map<String, ValuePointer> _fieldIndex;
        ValuePointer[] _index;
        Factory _parent;

        public ProjectionFactory(Factory parent, Fields projectFields) {
            _parent = parent;
            if(projectFields==null) projectFields = new Fields();
            Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
            _fieldIndex = new HashMap<>();
            for(String f: projectFields) {
                _fieldIndex.put(f, parentFieldIndex.get(f));
            }            
            _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
        }
        
        public TridentTuple create(TridentTuple parent) {
            if(_index.length==0) return EMPTY_TUPLE;
            else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
        }

        @Override
        public Map<String, ValuePointer> getFieldIndex() {
            return _fieldIndex;
        }

        @Override
        public int numDelegates() {
            return _parent.numDelegates();
        }

        @Override
        public List<String> getOutputFields() {
            return indexToFieldsList(_index);
        }
    }
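  • ProjectionFactory is a static nested class of TridentTupleView. Its constructor builds _index and _fieldIndex from projectFields by copying the matching ValuePointers from the parent factory, so that create can wrap the parent's _delegates in a TridentTupleView that exposes only the required fields (or return EMPTY_TUPLE when no fields are projected).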
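The key property of ProjectionFactory is that a projection copies no values: it reuses the parent's delegates and only rebuilds the field-to-pointer map. A minimal sketch, assuming a view is a list of delegate lists plus a map from field to {delegateIndex, index}; View and project are hypothetical names, and the empty-projection shortcut mirrors the EMPTY_TUPLE branch in create.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical view: shared delegate lists plus field -> {delegateIndex, index}.
final class View {
    static final View EMPTY = new View(Collections.<List<Object>>emptyList(), Collections.<String, int[]>emptyMap());

    final List<List<Object>> delegates;
    final Map<String, int[]> fieldIndex;

    View(List<List<Object>> delegates, Map<String, int[]> fieldIndex) {
        this.delegates = delegates;
        this.fieldIndex = fieldIndex;
    }

    Object get(String field) {
        int[] p = fieldIndex.get(field);
        return delegates.get(p[0]).get(p[1]);
    }

    // Like ProjectionFactory: keep the parent's pointers for the requested
    // fields only, and reuse the parent's delegates without copying values.
    static View project(View parent, List<String> projectFields) {
        if (projectFields.isEmpty()) {
            return EMPTY;
        }
        Map<String, int[]> idx = new LinkedHashMap<>();
        for (String f : projectFields) {
            idx.put(f, parent.fieldIndex.get(f));
        }
        return new View(parent.delegates, idx);
    }

    public static void main(String[] args) {
        Map<String, int[]> idx = new LinkedHashMap<>();
        idx.put("_task_info", new int[] {0, 0});
        idx.put("aggData", new int[] {0, 1});
        View fresh = new View(Arrays.asList(Arrays.<Object>asList("trigger-info", 3L)), idx);

        View projected = project(fresh, Arrays.asList("aggData"));
        System.out.println(projected.get("aggData"));               // 3
        System.out.println(projected.fieldIndex.keySet());          // [aggData]
        System.out.println(projected.delegates == fresh.delegates); // true
    }
}
```

Sharing the delegates keeps each projection step cheap: only a small map is allocated per tuple, regardless of how many values the tuple carries.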

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

public class EachProcessor implements TridentProcessor {
    Function _function;
    TridentContext _context;
    AppendCollector _collector;
    Fields _inputFields;
    ProjectionFactory _projection;
    
    public EachProcessor(Fields inputFields, Function function) {
        _function = function;
        _inputFields = inputFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup() {
        _function.cleanup();
    }    

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _collector.getOutputFactory();
    }
}
  • EachProcessor.execute first sets the _collector's context to processorContext and the incoming tuple, then calls _function.execute.
  • The tuple passed to the function is _projection.create(tuple), which extracts only the inputFields declared for _function.
  • The collector passed to _function is an AppendCollector.

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;
    
    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 
    
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • AppendCollector creates an OperationOutputFactory in its constructor. Its emit method uses that factory to build a TridentTupleView from the current input tuple plus the emitted values, then calls execute on each of _triContext.getReceivers(). If no operation follows the each, the AppendCollector's _triContext.getReceivers() is empty and the tuple goes no further.
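OperationOutputFactory itself is not quoted in this article. As we read TridentTupleView in Storm 1.2.2, it keeps the parent tuple's delegates and adds the function's emitted values as one more delegate, so downstream operations see both the input fields and the new ones. The sketch below models that assumption with plain collections; AppendSketch and append are hypothetical names, and copying into a new ArrayList stands in for Storm conjoining onto a persistent vector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an "append" output: the new tuple keeps the parent's
// delegates and adds the function's values as one extra delegate.
final class AppendSketch {
    final List<List<Object>> delegates;
    final Map<String, int[]> fieldIndex; // field -> {delegateIndex, index}

    AppendSketch(List<List<Object>> delegates, Map<String, int[]> fieldIndex) {
        this.delegates = delegates;
        this.fieldIndex = fieldIndex;
    }

    // Assumed behaviour: inherit the parent's pointers and map each new field
    // to slot i of the newly added last delegate.
    static AppendSketch append(AppendSketch parent, List<String> selfFields, List<Object> selfVals) {
        Map<String, int[]> idx = new HashMap<>(parent.fieldIndex);
        int myIndex = parent.delegates.size();
        for (int i = 0; i < selfFields.size(); i++) {
            idx.put(selfFields.get(i), new int[] {myIndex, i});
        }
        List<List<Object>> ds = new ArrayList<>(parent.delegates); // parent is left untouched
        ds.add(selfVals);
        return new AppendSketch(ds, idx);
    }

    Object get(String field) {
        int[] p = fieldIndex.get(field);
        return delegates.get(p[0]).get(p[1]);
    }

    public static void main(String[] args) {
        Map<String, int[]> idx = new HashMap<>();
        idx.put("aggData", new int[] {0, 0});
        AppendSketch input = new AppendSketch(Arrays.asList(Arrays.<Object>asList(3L)), idx);

        // A hypothetical function declaring output field "doubled" emits [6].
        AppendSketch out = append(input, Arrays.asList("doubled"), Arrays.<Object>asList(6L));
        System.out.println(out.get("aggData") + " " + out.get("doubled")); // 3 6
        System.out.println(out.delegates.size());                          // 2
    }
}
```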

Summary

  • WindowTridentProcessor uses a FreshCollector. In finishBatch it takes the pendingTriggers produced by the window from the TridentWindowManager (each trigger is removed from pendingTriggers once read), then calls FreshCollector.emit with a ConsList whose first value is a TriggerInfo and whose remaining values are the aggregated result.
  • FreshCollector.emit first uses TridentTupleView.FreshOutputFactory to build a TridentTupleView according to selfOutputFields (the first field is always _task_info; the rest are the functionFields the user defines in the window method), then calls execute on each of _triContext.getReceivers().
  • The receivers that follow include a ProjectedProcessor, which re-extracts a TridentTupleView containing only the functionFields defined in the window method. Its execute method works like FreshCollector.emit: it first extracts the required fields to build a TridentTupleView, then calls execute on each of _triContext.getReceivers() (for example EachProcessor.execute).
  • EachProcessor uses an AppendCollector. Its emit method is similar to FreshCollector.emit: it builds a TridentTupleView with its OperationOutputFactory, then calls execute on each of _triContext.getReceivers().
  • FreshCollector.emit, ProjectedProcessor.execute and AppendCollector.emit all follow the same pattern: first use a Factory to build a TridentTupleView with the required fields, then call execute on each of _triContext.getReceivers(). When a _triContext has no receivers, the tuple goes no further.
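The pattern described in the summary (derive a new tuple with a Factory, pass it to every receiver, stop when there are none) can be modelled as a small chain of nodes. This is a minimal sketch with hypothetical names (Node, trace) and LinkedHashMaps standing in for TridentTupleView; it shows how tuples propagate, not how Storm's planner wires processors together.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical processing node: transform the incoming tuple (the "Factory" step),
// then forward the result to each downstream node (the getReceivers() loop).
final class Node {
    final String name;
    final UnaryOperator<Map<String, Object>> factory;
    final List<Node> receivers = new ArrayList<>();
    final List<String> trace;

    Node(String name, UnaryOperator<Map<String, Object>> factory, List<String> trace) {
        this.name = name;
        this.factory = factory;
        this.trace = trace;
    }

    void execute(Map<String, Object> tuple) {
        Map<String, Object> toEmit = factory.apply(tuple);
        trace.add(name + " " + toEmit);
        // With no receivers the loop body never runs, so propagation stops here.
        for (Node r : receivers) {
            r.execute(toEmit);
        }
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        // Keeps only "aggData", like ProjectedProcessor after the window.
        Node projected = new Node("projected", t -> {
            Map<String, Object> m = new LinkedHashMap<>();
            m.put("aggData", t.get("aggData"));
            return m;
        }, trace);
        // Pass-through stand-in for the each step; it has no receivers, so the chain ends after it.
        Node each = new Node("each", t -> t, trace);
        projected.receivers.add(each);

        // Stands in for the tuple FreshCollector builds from the window output.
        Map<String, Object> fresh = new LinkedHashMap<>();
        fresh.put("_task_info", "trigger-info");
        fresh.put("aggData", 3L);
        projected.execute(fresh);
        trace.forEach(System.out::println);
        // projected {aggData=3}
        // each {aggData=3}
    }
}
```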
