Preface

This article takes a look at Storm's PartialKeyGrouping.

Example

    @Test
    public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String spoutId = "wordGenerator";
        String counterId = "counter";
        String aggId = "aggregator";
        String intermediateRankerId = "intermediateRanker";
        String totalRankerId = "finalRanker";
        int TOP_N = 5;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new TestWordSpout(), 5);
        //NOTE using partialKeyGrouping instead of fieldsGrouping
        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
        builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
        submitRemote(builder);
    }
  • Note that the word-count bolt here uses partialKeyGrouping instead of fieldsGrouping, so the same word is no longer guaranteed to reach the same counter task; the partial counts are therefore re-aggregated by RollingCountAggBolt, which is wired with fieldsGrouping (a simplified merge sketch follows below).
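
Since a hot word can now be counted on two different counter tasks, each of those tasks only holds a partial count. The sketch below is a simplified stand-in for the aggregation step, not the storm-starter RollingCountAggBolt itself; it assumes the upstream counters emit "obj" and "count" fields. It keeps the latest partial count reported by each upstream task and emits the merged total.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Simplified aggregator for partial counts: because it is wired with
// fieldsGrouping on "obj", all partial counts for the same key end up
// in the same aggregator task and can be merged safely.
public class MergePartialCountsBolt extends BaseBasicBolt {
    // obj -> (source task id -> latest partial count from that task)
    private final Map<String, Map<Integer, Long>> partials = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String obj = tuple.getStringByField("obj");
        long count = tuple.getLongByField("count");
        partials.computeIfAbsent(obj, k -> new HashMap<>())
                .put(tuple.getSourceTask(), count);
        long total = partials.get(obj).values().stream().mapToLong(Long::longValue).sum();
        collector.emit(new Values(obj, total));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count"));
    }
}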

PartialKeyGrouping (version 1.2.2)

storm-core-1.2.2-sources.jar!/org/apache/storm/grouping/PartialKeyGrouping.java

public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -447379837314000353L;
    private List<Integer> targetTasks;
    private long[] targetTaskStats;
    private HashFunction h1 = Hashing.murmur3_128(13);
    private HashFunction h2 = Hashing.murmur3_128(17);
    private Fields fields = null;
    private Fields outFields = null;

    public PartialKeyGrouping() {
        //Empty
    }

    public PartialKeyGrouping(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        targetTaskStats = new long[this.targetTasks.size()];
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o : selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[])o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                      out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
            int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
            boltIds.add(targetTasks.get(selected));
            targetTaskStats[selected]++;
        }
        return boltIds;
    }
}
  • PartialKeyGrouping is a CustomStreamGrouping. In prepare it initializes long[] targetTaskStats, which tracks how many tuples have been sent to each target task.
  • If no fields are specified, PartialKeyGrouping falls back to using the first field of the tuple as the key.
  • It builds two hash functions with Guava's Hashing.murmur3_128 (seeds 13 and 17), takes the absolute value of each hash modulo targetTasks.size(), and thereby obtains two candidate task indexes.
  • It then consults targetTaskStats, picks whichever of the two candidates has received fewer tuples so far, and updates that candidate's count (see the standalone sketch after this list).
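
To make the "two choices" behavior concrete, here is a standalone sketch (not Storm code) that reuses the same Guava calls: two seeded murmur3 hashes propose two candidate tasks per word, and the less-loaded one wins, so even a very hot key is spread over at most two tasks. The word stream and task count are made up for illustration.

import java.nio.charset.StandardCharsets;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

// Standalone illustration of the 1.2.2 selection logic.
public class TwoChoicesDemo {
    private static final HashFunction H1 = Hashing.murmur3_128(13);
    private static final HashFunction H2 = Hashing.murmur3_128(17);

    public static void main(String[] args) {
        int numTasks = 4;
        long[] taskStats = new long[numTasks];
        String[] stream = {"the", "the", "the", "storm", "the", "word", "the"};

        for (String word : stream) {
            byte[] raw = word.getBytes(StandardCharsets.UTF_8);
            int first = (int) (Math.abs(H1.hashBytes(raw).asLong()) % numTasks);
            int second = (int) (Math.abs(H2.hashBytes(raw).asLong()) % numTasks);
            // pick whichever candidate is currently less loaded
            int selected = taskStats[first] > taskStats[second] ? second : first;
            taskStats[selected]++;
            System.out.println(word + " -> task " + selected);
        }
        // even the hot key "the" is only ever sent to its two candidate tasks
    }
}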

PartialKeyGrouping (version 2.0.0)

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

/**
 * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
 * Tuples from a given partition to multiple downstream tasks.
 *
 * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
 * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
 *
 * Notes: - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible - the default
 * AssignmentCreator hashes the key and produces an assignment of two tasks
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;

    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);

            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    /**
     * Extract the key from the input Tuple.
     */
    private byte[] getKeyBytes(List<Object> values) {
        byte[] raw;
        if (fields != null) {
            List<Object> selectedFields = outFields.select(fields, values);
            ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
            for (Object o : selectedFields) {
                if (o instanceof List) {
                    out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                } else if (o instanceof Object[]) {
                    out.putInt(Arrays.deepHashCode((Object[]) o));
                } else if (o instanceof byte[]) {
                    out.putInt(Arrays.hashCode((byte[]) o));
                } else if (o instanceof short[]) {
                    out.putInt(Arrays.hashCode((short[]) o));
                } else if (o instanceof int[]) {
                    out.putInt(Arrays.hashCode((int[]) o));
                } else if (o instanceof long[]) {
                    out.putInt(Arrays.hashCode((long[]) o));
                } else if (o instanceof char[]) {
                    out.putInt(Arrays.hashCode((char[]) o));
                } else if (o instanceof float[]) {
                    out.putInt(Arrays.hashCode((float[]) o));
                } else if (o instanceof double[]) {
                    out.putInt(Arrays.hashCode((double[]) o));
                } else if (o instanceof boolean[]) {
                    out.putInt(Arrays.hashCode((boolean[]) o));
                } else if (o != null) {
                    out.putInt(o.hashCode());
                } else {
                    out.putInt(0);
                }
            }
            raw = out.array();
        } else {
            raw = values.get(0).toString().getBytes(); // assume key is the first field
        }
        return raw;
    }

    //...
}
  • Version 2.0.0 extracts this logic into two pluggable strategies, RandomTwoTaskAssignmentCreator and BalancedTargetSelector (a usage sketch with a custom creator follows below).
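
Because the 2.0.0 constructor takes both strategies as parameters, they can be swapped for custom ones. Below is a hedged sketch of wiring a custom AssignmentCreator into a topology via customGrouping; the three-task creator is a hypothetical example (Storm's default assigns two tasks), and it assumes the storm-starter RollingCountBolt is on the classpath.

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.storm.grouping.PartialKeyGrouping;
import org.apache.storm.starter.bolt.RollingCountBolt;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class CustomPartialKeyGroupingExample {

    // Hypothetical creator: assigns each key to three candidate tasks instead
    // of two, trading a bit more aggregation work for finer load spreading.
    public static class RandomThreeTaskAssignmentCreator implements PartialKeyGrouping.AssignmentCreator {
        @Override
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // like the default creator, stay deterministic per key by seeding from the key
            Random random = new Random(Arrays.hashCode(key));
            int[] choices = new int[3];
            for (int i = 0; i < choices.length; i++) {
                choices[i] = tasks.get(random.nextInt(tasks.size()));
            }
            return choices;
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordGenerator", new TestWordSpout(), 5);
        // plug the custom creator and the default selector into PartialKeyGrouping
        builder.setBolt("counter", new RollingCountBolt(9, 3), 4)
               .customGrouping("wordGenerator",
                       new PartialKeyGrouping(new Fields("word"),
                               new RandomThreeTaskAssignmentCreator(),
                               new PartialKeyGrouping.BalancedTargetSelector()));
        // topology submission omitted
    }
}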

RandomTwoTaskAssignmentCreator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface is responsible for choosing a subset of the target tasks to use for a given key.
     *
     * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
     * each of them needs to come up with the same assignment for a given key.
     */
    public interface AssignmentCreator extends Serializable {
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /*========== Implementations ==========*/

    /**
     * This implementation of AssignmentCreator chooses two arbitrary tasks.
     */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        /**
         * Creates a two task assignment by selecting random tasks.
         */
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // ensure that choice1 and choice2 are not the same task
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }
  • Instead of Guava's Hashing.murmur3_128, version 2.0.0 hashes the key, uses that hash as the seed of a java.util.Random, and draws the two candidate tasks from it; the assignment still contains two tasks per key so the load can be balanced between them (see the determinism sketch below).
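
A minimal standalone sketch of why seeding the Random with the key's hash matters: every worker that computes the assignment for the same key draws the same two candidates. The task ids below are made up.

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class DeterministicAssignmentDemo {
    // same logic as RandomTwoTaskAssignmentCreator.createAssignment
    static int[] assign(List<Integer> tasks, byte[] key) {
        long seed = Arrays.hashCode(key);     // key hash as seed ...
        Random random = new Random(seed);     // ... so every worker draws the same numbers
        int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[]{tasks.get(choice1), tasks.get(choice2)};
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(4, 5, 6, 7);
        byte[] key = "the".getBytes();
        // calling twice (as two different workers would) yields the same pair
        System.out.println(Arrays.toString(assign(tasks, key)));
        System.out.println(Arrays.toString(assign(tasks, key)));
    }
}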

BalancedTargetSelector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface chooses one element from a task assignment to send a specific Tuple to.
     */
    public interface TargetSelector extends Serializable {
        Integer chooseTask(int[] assignedTasks);
    }

    /**
     * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
     * overall from this instance of the grouping.
     */
    public static class BalancedTargetSelector implements TargetSelector {
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
         */
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            Long minTaskLoad = Long.MAX_VALUE;

            for (Integer currentTaskId : assignedTasks) {
                final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }

            targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
            return taskIdWithMinLoad;
        }
    }
  • BalancedTargetSelector looks up each candidate taskId in targetTaskStats, picks the one with the lowest count so far (taskIdWithMinLoad), increments that count, and returns it (a short usage sketch follows below).
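
A short usage sketch of the selector in isolation (the candidate task ids are made up): because it always picks the candidate it has sent the fewest tuples to, it alternates between the two.

import org.apache.storm.grouping.PartialKeyGrouping;

public class BalancedSelectorDemo {
    public static void main(String[] args) {
        PartialKeyGrouping.BalancedTargetSelector selector =
                new PartialKeyGrouping.BalancedTargetSelector();

        int[] assignmentForHotKey = {4, 7}; // the two candidate tasks for some key

        for (int i = 0; i < 4; i++) {
            // prints 4, 7, 4, 7 because the less-used candidate always wins
            System.out.println(selector.chooseTask(assignmentForHotKey));
        }
    }
}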

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {

        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));

        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }
    }
  • Here we can see that FieldsGrouper's chooseTasks method uses TupleUtils.chooseTaskIndex to pick the task index.

TupleUtils.chooseTaskIndex

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        if (alist == null) {
            return 1;
        } else {
            return Arrays.deepHashCode(alist.toArray());
        }
    }
  • chooseTaskIndex applies listHashCode to the selected key fields and then takes Math.floorMod of the result with numTasks.
  • listHashCode delegates to Arrays.deepHashCode(alist.toArray()) for the hash calculation (a quick sketch follows below).
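
A quick standalone sketch of the same index computation (the field values are made up): Math.floorMod keeps the result in [0, numTasks) even when the hash is negative, and identical keys always map to the same task index.

import java.util.Arrays;
import java.util.List;

public class FieldsGroupingIndexDemo {
    // same computation as TupleUtils.chooseTaskIndex / listHashCode
    static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        int hash = (keys == null) ? 1 : Arrays.deepHashCode(keys.toArray());
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // every tuple with the same "word" value maps to the same task index
        System.out.println(chooseTaskIndex(Arrays.asList("the"), numTasks));
        System.out.println(chooseTaskIndex(Arrays.asList("the"), numTasks));
        System.out.println(chooseTaskIndex(Arrays.asList("storm"), numTasks));
    }
}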

summary

  • Storm's PartialKeyGrouping addresses the skewed bolt load that fields grouping can cause when the key distribution is uneven.
  • FieldsGrouping hashes the selected fields and takes that hash modulo the number of tasks to pick a single task index.
  • The 1.2.2 implementation of PartialKeyGrouping uses Guava's Hashing.murmur3_128 with two different seeds, takes the absolute value of each hash modulo the number of tasks, and so obtains two candidate task indexes. Version 2.0.0 instead hashes the key, uses it as the seed of a Random, and draws the two candidates from it. In both versions the grouping keeps per-task counters (targetTaskStats), sends each tuple to whichever of its two candidates has received fewer tuples so far, and updates the counter; unlike fieldsGrouping, a key can therefore land on two tasks, which is what balances the load.
  • Because the same word is no longer guaranteed to go to the same counter task, the word-count bolt's partial results are re-aggregated by RollingCountAggBolt via fieldsGrouping.

doc

  • Common Topology Patterns
  • The Power of Both Choices: Practical Load Balancing for Distributed Stream Processing Engines
  • Storm source code analysis – Stream Grouping (backtype.storm.daemon.executor)