Preface

This article focuses on Storm’s WindowedBolt

Example

    @Test
    public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt()
                // windowLength, slidingInterval
                .withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
                // withTimestampField names the tuple field to use as the tuple timestamp
                .withTimestampField("timestamp")
                // watermark = (minimum of the latest tuple timestamps across the input streams) - lag.
                // withWatermarkInterval sets how often a watermark is generated (default: 1 second).
                // When a watermark fires, windows whose tuple timestamps are earlier than the watermark are evaluated.
                .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
                // withLag handles out-of-order data: a tuple whose timestamp is less than
                // lastWaterMarkTs (the largest watermark so far) is discarded
                .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
        builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum");
        SubmitHelper.submitRemote("slideWindowTopology",builder.createTopology());
    }
  • The example configures the windowed bolt mainly through withWindow, withTimestampField, withWatermarkInterval, and withLag

  • SlidingWindowSumBolt

public class SlidingWindowSumBolt extends BaseWindowedBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);

    private int sum = 0;
    private OutputCollector collector;

    @Override
    public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        /*
         * The inputWindow gives a view of
         * (a) all the events in the window
         * (b) events that expired since last activation of the window
         * (c) events that newly arrived since last activation of the window
         */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
        /*
         * Instead of iterating over all the tuples in the window to compute
         * the sum, the values for the new events are added and old events are
         * subtracted. Similar optimizations might be possible in other
         * windowing computations.
         */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}
  • Through TupleWindow, the bolt can get all tuples in the current window (get), the tuples that arrived since the last window activation (getNew), and the tuples that expired since the last activation (getExpired)
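The incremental update used by SlidingWindowSumBolt can be tried out without a Storm cluster. The following is a minimal sketch in plain Java, not Storm code: the class IncrementalWindowSum and its methods are hypothetical names, newValues stands in for getNew(), and the oldest expiredCount values stand in for getExpired().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a windowed sum; not part of Storm. */
class IncrementalWindowSum {
    private final Deque<Integer> window = new ArrayDeque<>();
    private int sum = 0;

    /**
     * Applies one window activation. newValues plays the role of getNew();
     * the oldest expiredCount values play the role of getExpired().
     */
    int activate(List<Integer> newValues, int expiredCount) {
        for (int v : newValues) {
            window.addLast(v);
            sum += v; // add only what is new
        }
        for (int i = 0; i < expiredCount && !window.isEmpty(); i++) {
            sum -= window.removeFirst(); // subtract only what expired
        }
        return sum;
    }

    /** Recomputes the sum from scratch, as iterating over get() would. */
    int fullSum() {
        int total = 0;
        for (int v : window) {
            total += v;
        }
        return total;
    }
}
```

Both methods should always agree; the incremental version simply avoids touching every tuple in the window on each activation.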

WindowedBolt

IWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

/**
 * A bolt abstraction for supporting time and count based sliding & tumbling windows.
 */
public interface IWindowedBolt extends IComponent {
    /**
     * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting,
     * the tuples are automatically anchored to the tuples in the inputWindow.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);

    /**
     * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();

    /**
     * Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing
     * time.
     *
     * @return the timestamp extractor
     */
    TimestampExtractor getTimestampExtractor();
}
  • IWindowedBolt is stateless: the window's data is kept only in memory
  • The IWindowedBolt interface has an abstract implementation class, BaseWindowedBolt, whose subclasses include BaseStatefulWindowedBolt and JoinBolt

IStatefulWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java

/**
 * A windowed bolt abstraction for supporting windowing operation with state.
 */
public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
    /**
     * If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory.
     * <p>
     * The default is to keep all the window events in memory.
     * </p>
     *
     * @return true if the windows should be persisted
     */
    default boolean isPersistent() {
        return false;
    }

    /**
     * The maximum number of window events to keep in memory.
     */
    default long maxEventsInMemory() {
        return 1_000_000L; // default
    }
}
  • In 1.2.2 IStatefulWindowedBolt defines no methods of its own, while 2.0.0 adds two default methods, isPersistent and maxEventsInMemory
  • isPersistent determines which executor is created: a PersistentWindowedBoltExecutor when it returns true, otherwise a StatefulWindowedBoltExecutor
  • maxEventsInMemory determines how many events WindowState keeps in memory; the rest are moved to a KeyValueState (HBaseKeyValueState, InMemoryKeyValueState, or RedisKeyValueState)
  • The IStatefulWindowedBolt interface has an abstract implementation class, BaseStatefulWindowedBolt

withWindow and withTumblingWindow

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java

    /**
     * Tuple count based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * A time duration based tumbling window.
     *
     * @param duration the time duration after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Duration duration) {
        return withWindowLength(duration).withSlidingInterval(duration);
    }

    /**
     * A count based tumbling window.
     *
     * @param count the number of tuples after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Count count) {
        return withWindowLength(count).withSlidingInterval(count);
    }
  • The BaseWindowedBolt abstract class defines the withWindow methods, which take two parameters, windowLength and slidingInterval; each can be expressed either as a Duration or as a Count
  • withTumblingWindow configures a tumbling window
  • As the method definitions show, withTumblingWindow sets windowLength and slidingInterval to the same value
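To make the difference between the two window types concrete, here is a small plain-Java sketch of count-based windowing. It is an illustration under simplifying assumptions, not Storm's implementation: the class CountWindows and its methods are hypothetical, and a window is assumed to fire every slidingInterval values and to contain the last windowLength values seen.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of count-based windows; not part of Storm. */
class CountWindows {
    /**
     * Emits one window every slidingInterval values; each window holds
     * at most the last windowLength values seen so far.
     */
    static List<List<Integer>> windows(List<Integer> values, int windowLength, int slidingInterval) {
        List<List<Integer>> result = new ArrayList<>();
        for (int end = slidingInterval; end <= values.size(); end += slidingInterval) {
            int start = Math.max(0, end - windowLength);
            result.add(new ArrayList<>(values.subList(start, end)));
        }
        return result;
    }

    /** A tumbling window is a sliding window whose length equals its interval. */
    static List<List<Integer>> tumbling(List<Integer> values, int count) {
        return windows(values, count, count);
    }
}
```

For the values 1 to 6, windows(values, 4, 2) yields [1, 2], [1, 2, 3, 4], [3, 4, 5, 6], where consecutive windows share elements, while tumbling(values, 3) yields [1, 2, 3], [4, 5, 6] with no overlap.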

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    //......

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }
  • initWindowManager reads the max lag from Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, which defaults to 0 (no lag), and passes it to the WaterMarkEventGenerator it creates
  • If the track method of waterMarkEventGenerator returns false and Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is not configured, execute logs the message "Received a late tuple {} with ts {}. This will not be processed." and acks the tuple
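The "read from the topology config, else fall back to a default" pattern that initWindowManager applies to the lag and the watermark interval can be sketched in isolation. This is a simplified plain-Java illustration: the class WindowConfigDefaults and the two key strings are hypothetical placeholders (the real code uses the Config constants shown above); only the two default values are taken from the excerpt.

```java
import java.util.Map;

/** Hypothetical illustration of config lookup with defaults; not part of Storm. */
class WindowConfigDefaults {
    // Placeholder key names for this sketch; the real code uses Config constants.
    static final String MAX_LAG_KEY = "sketch.max.lag.ms";
    static final String WATERMARK_INTERVAL_KEY = "sketch.watermark.interval.ms";

    // Default values as they appear in the WindowedBoltExecutor excerpt.
    static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    /** Reads a numeric setting, accepting any Number type, or returns the default. */
    static int intOrDefault(Map<String, Object> conf, String key, int defaultValue) {
        if (conf.containsKey(key)) {
            return ((Number) conf.get(key)).intValue();
        }
        return defaultValue;
    }

    static int maxLagMs(Map<String, Object> conf) {
        return intOrDefault(conf, MAX_LAG_KEY, DEFAULT_MAX_LAG_MS);
    }

    static int watermarkIntervalMs(Map<String, Object> conf) {
        return intOrDefault(conf, WATERMARK_INTERVAL_KEY, DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
    }
}
```

Casting to Number rather than Integer mirrors the excerpt and tolerates values that were deserialized as Long.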

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

public class WaterMarkEventGenerator<T> implements Runnable {

    /**
     * Creates a new WatermarkEventGenerator.
     *
     * @param windowManager The window manager this generator will submit watermark events to
     * @param intervalMs    The generator will check if it should generate a watermark event with this interval
     * @param eventTsLagMs  The max allowed lag behind the last watermark event before an event is considered late
     * @param inputStreams  The input streams this generator is expected to handle
     */
    public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
                                   int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
        this.windowManager = windowManager;
        streamToTs = new ConcurrentHashMap<>();

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("watermark-event-generator-%d")
            .setDaemon(true)
            .build();
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

        this.interval = intervalMs;
        this.eventTsLag = eventTsLagMs;
        this.inputStreams = inputStreams;
    }

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    //......

    /**
     * Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late
     * event.
     */
    public boolean track(GlobalStreamId stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
        checkFailures();
        return ts >= lastWaterMarkTs;
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }

    /**
     * Computes the min ts across all streams.
     */
    private long computeWaterMarkTs() {
        long ts = 0;
        // only if some data has arrived on each input stream
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
                ts = Math.min(ts, entry.getValue());
            }
        }
        return ts - eventTsLag;
    }
}
  • The track method decides whether a tuple should be processed by comparing its timestamp with lastWaterMarkTs
  • lastWaterMarkTs is updated in the run method of WaterMarkEventGenerator. computeWaterMarkTs takes the minimum of the per-stream latest timestamps held in streamToTs and subtracts eventTsLag; the result is waterMarkTs
  • lastWaterMarkTs is updated only when waterMarkTs is greater, so as run keeps recomputing waterMarkTs, lastWaterMarkTs always holds the largest waterMarkTs seen so far
  • The start method of WaterMarkEventGenerator schedules a fixed-rate task, so run executes once every watermarkInterval
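The interplay between track and run is easier to follow with concrete numbers. The sketch below is a single-threaded plain-Java simplification, not the Storm class: WatermarkSketch is a hypothetical name, stream ids are plain strings, and tick() stands in for one scheduled execution of run (without emitting a WaterMarkEvent).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, single-threaded simplification of the watermark logic; not part of Storm. */
class WatermarkSketch {
    private final Map<String, Long> streamToTs = new HashMap<>();
    private final int streamCount;
    private final long lagMs;
    private long lastWaterMarkTs = 0;

    WatermarkSketch(int streamCount, long lagMs) {
        this.streamCount = streamCount;
        this.lagMs = lagMs;
    }

    /** Records the latest timestamp per stream; returns false for a late event. */
    boolean track(String stream, long ts) {
        Long current = streamToTs.get(stream);
        if (current == null || ts > current) {
            streamToTs.put(stream, ts);
        }
        return ts >= lastWaterMarkTs;
    }

    /** One scheduler tick: the watermark only ever moves forward. */
    long tick() {
        long ts = 0;
        // only once some data has arrived on every input stream
        if (streamToTs.size() >= streamCount) {
            ts = Long.MAX_VALUE;
            for (long latest : streamToTs.values()) {
                ts = Math.min(ts, latest);
            }
        }
        long waterMarkTs = ts - lagMs;
        if (waterMarkTs > lastWaterMarkTs) {
            lastWaterMarkTs = waterMarkTs;
        }
        return lastWaterMarkTs;
    }
}
```

With two streams and a lag of 5, tracking timestamp 20 on stream "a" and 30 on stream "b" moves the watermark to min(20, 30) - 5 = 15 on the next tick. A tuple with timestamp 10 is then reported as late, while one with timestamp 15 is still accepted.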

Summary

  • Storm's WindowedBolt comes in two forms, IWindowedBolt and IStatefulWindowedBolt; the former is stateless and the latter is stateful
  • A window has two important parameters, windowLength and slidingInterval, each of which can be expressed as a Duration or as a Count
  • The withTumblingWindow method of BaseWindowedBolt sets windowLength and slidingInterval to the same value. A tumbling window is therefore a special sliding window whose successive windows do not overlap
  • WaterMarkEventGenerator schedules a periodic task that computes waterMarkTs (the minimum of the latest tuple timestamps across the input streams, minus the lag) and updates lastWaterMarkTs when the new value is greater
  • The WaterMarkEventGenerator.track method decides whether a tuple should be processed: it returns false when the tuple's timestamp is less than lastWaterMarkTs. Such a late tuple is emitted to the stream configured by Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM if there is one; otherwise a log message is printed

doc

  • Windowing Support in Core Storm
  • Tumbling Windows vs Sliding Windows