I. Introduction

Storm-Redis provides integration support between Storm and Redis. To use it, you only need to import the corresponding dependency:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
    <type>jar</type>
</dependency> 
Copy the code

Storm-redis uses Jedis as the Redis client and provides the following three basic Bolt implementations:

  • RedisLookupBolt: Query data from Redis;
  • RedisStoreBolt: Stores data to Redis;
  • RedisFilterBolt: Queries eligible data;

RedisLookupBolt, RedisStoreBolt, and RedisFilterBolt all inherit from the AbstractRedisBolt class. We can extend this functionality by inheriting this abstract class and implementing a custom RedisBolt.

II. Integration Example

2.1 Project Structure

Here is an integration example: take word frequency statistics and store the final results in Redis. The project structure is as follows:

Download address of the example source code: storm-redis-integration

2.2 Project Dependency

The project mainly relies on the following:

<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>
Copy the code

2.3 DataSourceSpout

/** * The source of the word frequency sample */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark"."Hadoop"."HBase"."Storm"."Flink"."Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple(a) {
        // The simulation generates data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /** ** analog data */
    private String productData(a) {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t".0, endIndex); }}Copy the code

The generated simulated data format is as follows:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm
Copy the code

2.4 SplitBolt

/** * splits each row with the specified delimiter */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word, String.valueOf(1))); }}@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"."count")); }}Copy the code

2.5 CountBolt

/** ** count word frequency */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        / / output
        collector.emit(new Values(word, String.valueOf(count)));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"."count")); }}Copy the code

2.6 WordCountStoreMapper

WordCountStoreMapper implements the RedisStoreMapper interface to define the mapping between a tuple and the data in Redis: which tuple field is the key, which field is the value, and which Redis data structure they are stored in.

/** * define the mapping between tuple and data in Redis */
public class  WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper(a) {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription(a) {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count"); }}Copy the code

2.7 WordCountToRedisApp

/** * Perform word frequency statistics and store statistics results in Redis */
public class WordCountToRedisApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String STORE_BOLT = "storeBolt";

    // In real development these parameters can be passed in externally to make the program more flexible
    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
        // save to redis
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
        builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);

        // If cluster is transmitted externally, it indicates that the system is started online; otherwise, it indicates that the system is started locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp".new Config(), builder.createTopology());
            } catch(AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); }}else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp".newConfig(), builder.createTopology()); }}}Copy the code

2.8 Starting tests

You can run it directly in local mode, or you can package it and commit it to a server cluster. The source code provided by this repository is packaged with maven-shade-plugin by default, and the package command is as follows:

# mvn clean package -D maven.test.skip=true
Copy the code

After starting, look at the data in Redis:

III. Implementation Principle of Storm-Redis

3.1 AbstractRedisBolt

RedisLookupBolt, RedisStoreBolt, and RedisFilterBolt all derive from the AbstractRedisBolt abstract class. AbstractRedisBolt is indirectly derived from BaseRichBolt.

The important method in AbstractRedisBolt is prepare. In this method, the externally supplied Jedis connection configuration (jedisPoolConfig / jedisClusterConfig) is used to create a JedisCommandsInstanceContainer, the container that manages Jedis instances.

public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
    protected OutputCollector collector;

    private transient JedisCommandsInstanceContainer container;

    private JedisPoolConfig jedisPoolConfig;
    privateJedisClusterConfig jedisClusterConfig; .@Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        // FIXME: stores map (stormConf), topologyContext and expose these to derived classes
        this.collector = collector;

        if(jedisPoolConfig ! =null) {
            this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
        } else if(jedisClusterConfig ! =null) {
            this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found"); }}... }Copy the code

The build() methods of JedisCommandsContainerBuilder are shown below; they simply create a JedisPool or a JedisCluster and wrap it in a container.

/**
 * Builds a container backed by a JedisPool that is created from
 * DEFAULT_POOL_CONFIG and the host, port, timeout, password and database
 * taken from the given pool configuration.
 */
public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
        JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
        return new JedisContainer(jedisPool);
    }

 /**
  * Builds a container backed by a JedisCluster that is created from the nodes,
  * timeout, max redirections and password of the given cluster configuration
  * plus DEFAULT_POOL_CONFIG.
  * NOTE(review): config.getTimeout() is passed twice - presumably as the
  * connection timeout and the socket timeout; confirm against the JedisCluster constructor.
  */
 public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
        JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
        return new JedisClusterContainer(jedisCluster);
    }
Copy the code

3.2 RedisStoreBolt and RedisLookupBolt

The more important method in RedisStoreBolt is the process method, which mainly obtains the value of the incoming key/value from storeMapper and stores it by calling the corresponding method of jedisCommand according to its storage type dataType.

The implementation of RedisLookupBolt is basically similar, fetching the incoming key from lookupMapper and performing query operations.

public class RedisStoreBolt extends AbstractRedisBolt {
    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

   public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;

        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;

        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }
       
  
    @Override
    public void process(Tuple input) {
        String key = storeMapper.getKeyFromTuple(input);
        String value = storeMapper.getValueFromTuple(input);

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            switch (dataType) {
                case STRING:
                    jedisCommand.set(key, value);
                    break;

                case LIST:
                    jedisCommand.rpush(key, value);
                    break;

                case HASH:
                    jedisCommand.hset(additionalKey, key, value);
                    break;

                case SET:
                    jedisCommand.sadd(key, value);
                    break;

                case SORTED_SET:
                    jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
                    break;

                case HYPER_LOG_LOG:
                    jedisCommand.pfadd(key, value);
                    break;

                case GEO:
                    String[] array = value.split(":");
                    if(array.length ! =2) {
                        throw new IllegalArgumentException("value structure should be longitude:latitude");
                    }

                    double longitude = Double.valueOf(array[0]);
                    double latitude = Double.valueOf(array[1]);
                    jedisCommand.geoadd(additionalKey, longitude, latitude, key);
                    break;

                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally{ returnInstance(jedisCommand); }}... }Copy the code

3.3 JedisCommands

All Redis client commands are defined in the JedisCommands interface, which has the following three implementation classes: Jedis, JedisCluster and ShardedJedis. Storm mainly uses the first two. Which implementation class executes the commands depends on whether jedisPoolConfig or jedisClusterConfig is passed in.

3.4 RedisMapper and TupleMapper

RedisMapper and TupleMapper define how data in tuple and Redis is mapped and transformed.

1. TupleMapper

TupleMapper defines two main methods:

  • getKeyFromTuple(ITuple tuple): gets the field from the tuple that is used as the key;

  • getValueFromTuple(ITuple tuple): gets the field from the tuple that is used as the value;

2. RedisMapper

RedisMapper defines the getDataTypeDescription() method for obtaining the data type description. The RedisDataType enumeration in RedisDataTypeDescription defines all of the Redis data types that are available:

// Excerpt: only the nested data type enumeration is shown; the remaining members are elided.
public class RedisDataTypeDescription implements Serializable { 

    // Data types that appear as cases in RedisStoreBolt's switch on dataType.
    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }
     ......
    }
Copy the code

3. RedisStoreMapper

RedisStoreMapper inherits the TupleMapper and RedisMapper interfaces and is used for data storage without defining additional methods.

4. RedisLookupMapper

RedisLookupMapper inherits the TupleMapper and RedisMapper interfaces:

  • The declareOutputFields method is defined to declare the output fields.
  • The toTuple method is defined to assemble the query results into a collection of Storm Values for sending.

The following example shows that the word field is obtained from the input Tuple as the key, and the key and the result value are assembled into values after the RedisLookupBolt query and sent to the next processing unit.

class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper(a) {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName"."count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription(a) {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null; }}Copy the code

5. RedisFilterMapper

RedisFilterMapper extends the TupleMapper and RedisMapper interfaces and is used to query (filter) data. It defines the declareOutputFields method to declare the output fields, implemented as follows:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("wordName"."count"));
}

Copy the code

IV. Custom RedisBolt for Word Frequency Statistics

4.1 Implementation Principle

Custom RedisBolt: it mainly uses the HINCRBY key field increment command of the Redis hash structure to perform word frequency statistics. HINCRBY increments a field by the specified value; if the field does not exist yet, it is created and set to 0 before the increment is applied. The session below shows how the command behaves. Using this command, word frequency statistics can be implemented easily.

redis>  HSET myhash field 5
(integer) 1
redis>  HINCRBY myhash field 1
(integer) 6
redis>  HINCRBY myhash field -1
(integer) 5
redis>  HINCRBY myhash field -10
(integer) -5
redis> 
Copy the code

4.2 Project Structure

4.3 Custom code implementation of RedisBolt

/** * Custom RedisBolt uses the hincrby key field command in the Redis hash data structure for word frequency statistics */
public class RedisCountStoreBolt extends AbstractRedisBolt {

    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;
        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    @Override
    protected void process(Tuple tuple) {
        String key = storeMapper.getKeyFromTuple(tuple);
        String value = storeMapper.getValueFromTuple(tuple);

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();
            if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
                jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
            } else {
                throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
            }

            collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        } finally{ returnInstance(jedisCommand); }}@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}}Copy the code

4.4 CustomRedisCountApp

/** * Use custom RedisBolt to implement word frequency statistics */
public class CustomRedisCountApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String STORE_BOLT = "storeBolt";

    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // save to redis and count
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
        builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);

        // If the external parameter cluster is transmitted, it indicates that the online environment is started; otherwise, it indicates that the local environment is started
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterCustomRedisCountApp".new Config(), builder.createTopology());
            } catch(AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); }}else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalCustomRedisCountApp".newConfig(), builder.createTopology()); }}}Copy the code

References

  1. Storm Redis Integration

See the GitHub Open Source Project: Getting Started with Big Data for more articles in the big Data series