First, Introduction

The following figure shows the processing flow of Storm. When developing a Storm stream-processing program, we need to implement a Spout (data source) and Bolts (processing units), either built-in or custom, and wire them together with TopologyBuilder to form a Topology.

Second, IComponent interface

The IComponent interface defines the methods common to all components (Spout/Bolt) in a Topology. A custom Spout or Bolt must implement this interface, directly or indirectly.

public interface IComponent extends Serializable {

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer used to declare the output stream ids, the output fields,
     *                 and whether each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Declare the configuration of this component.
     */
    Map<String, Object> getComponentConfiguration();

}

Third, Spout

3.1 ISpout interface

A custom Spout needs to implement the ISpout interface, which defines all the methods available to a spout:

public interface ISpout extends Serializable {
    /**
     * Called when the component is initialized.
     *
     * @param conf      the configuration for this ISpout
     * @param context   the application context, from which you can obtain task ids,
     *                  component ids, and input/output information
     * @param collector the collector used to emit tuples from this spout; it is thread-safe
     *                  and should be saved as an instance variable of this Spout object
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when the ISpout is about to be shut down. It is not guaranteed to run,
     * e.g. it will not be executed if the process is killed with kill -9 in a cluster.
     */
    void close();

    /**
     * Called when the ISpout is activated from the deactivated state.
     */
    void activate();

    /**
     * Called when the ISpout is deactivated.
     */
    void deactivate();

    /**
     * The core method: emit tuples to the next receiver by calling the collector inside
     * this method, which must be non-blocking. nextTuple/ack/fail are all executed in the
     * same thread, so there is no need to worry about thread safety. When there are no
     * tuples to emit, let nextTuple sleep briefly to avoid wasting CPU.
     */
    void nextTuple();

    /**
     * Acknowledge the tuple identified by msgId. Acknowledged tuples will not be re-emitted.
     */
    void ack(Object msgId);

    /**
     * Mark the tuple identified by msgId as failed. Failed tuples will be re-emitted for processing.
     */
    void fail(Object msgId);
}

3.2 BaseRichSpout abstract class

Typically, a custom Spout does not implement the ISpout interface directly, but instead extends BaseRichSpout. BaseRichSpout extends BaseComponent and implements the IRichSpout interface.

The IRichSpout interface inherits from ISpout and IComponent and does not define any methods of its own:

public interface IRichSpout extends ISpout, IComponent {

}

The BaseComponent abstract class provides a default (null-returning) implementation of IComponent's getComponentConfiguration method:

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

BaseRichSpout extends the BaseComponent class, implements the IRichSpout interface, and provides empty implementations for some of its methods:

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}
}

With this design, when we implement a custom Spout by extending BaseRichSpout, there are only three methods we must implement:

  • open: from ISpout; here you can obtain the SpoutOutputCollector object used to emit tuples;
  • nextTuple: from ISpout; tuples must be emitted inside this method;
  • declareOutputFields: from IComponent; declares the field names of the emitted tuples so that the next component knows how to receive them.

Fourth, Bolt

Bolt's interface design is similar to Spout's.

4.1 IBolt interface

/**
 * An IBolt object is created on the client machine, serialized into the topology
 * (using Java serialization), and submitted to the cluster's master node (Nimbus).
 * Nimbus then launches the workers, which deserialize the object, call prepare, and
 * begin processing tuples.
 */
public interface IBolt extends Serializable {
    /**
     * Called when the component is initialized.
     *
     * @param stormConf the configuration for this Bolt as defined in the Storm topology
     * @param context   the application context, from which you can obtain task ids,
     *                  component ids, and input/output information
     * @param collector the collector used to emit tuples from this bolt; it is thread-safe
     *                  and should be saved as an instance variable of this Bolt object
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Process a single tuple of input.
     *
     * @param input the Tuple to process; it contains metadata such as the
     *              component/stream/task it came from
     */
    void execute(Tuple input);

    /**
     * Called when the IBolt is about to be shut down. It is not guaranteed to run,
     * e.g. it will not be executed if the process is killed with kill -9 in a cluster.
     */
    void cleanup();
}

4.2 BaseRichBolt abstract class

Similarly, when implementing a custom Bolt, you usually inherit from the BaseRichBolt abstract class. BaseRichBolt inherits from the BaseComponent abstract class and implements the IRichBolt interface.

The IRichBolt interface inherits from IBolt and IComponent and does not define any methods of its own:

public interface IRichBolt extends IBolt, IComponent {

}
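For reference, and in parallel with BaseRichSpout above, BaseRichBolt in Storm 1.x extends BaseComponent, implements IRichBolt, and only adds an empty cleanup() implementation (listing reproduced from the Storm source; minor differences are possible between versions):

public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
    @Override
    public void cleanup() {}
}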

With this design, when we implement a custom Bolt by extending BaseRichBolt, there are only three methods we must implement:

  • prepare: from IBolt; here you can obtain the OutputCollector object used to emit tuples;
  • execute: from IBolt; processes a tuple and emits the result after processing;
  • declareOutputFields: from IComponent; declares the field names of the emitted tuples so that the next component knows how to receive them.

Fifth, word frequency statistics case

5.1 Case Introduction

Here we use a custom DataSourceSpout to generate lines of words, and then use a custom SplitBolt and CountBolt to compute the word-frequency statistics.

storm-word-count

5.2 Code Implementation

1. Project dependencies

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>

2. DataSourceSpout

public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate simulated data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * Generate simulated data.
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}

The above class uses the productData method to generate simulated data in the following format:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm

3. SplitBolt

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

4. CountBolt

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Print the current counts
        System.out.print("Current real-time statistics: ");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

5. LocalWordCountApp

The components defined above are wired together with TopologyBuilder to form the Topology, which is then submitted to a LocalCluster to run. Typically during development, you test in local mode first, and only submit to a server cluster after the tests pass.

public class LocalWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("DataSourceSpout", new DataSourceSpout());

        // Send the DataSourceSpout data to SplitBolt for processing
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

        // Send the SplitBolt data to CountBolt for processing
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // Create a local cluster for testing. In this mode you don't need to install
        // Storm locally; just run the main method.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology());
    }
}
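If you want the local test to terminate on its own instead of running indefinitely, a common pattern (shown here as an optional sketch, not part of the example above) is to sleep for a while after submitting, then kill the topology and shut the local cluster down:

// Optional: let the topology run for a minute, then stop it and shut down the local cluster.
Utils.sleep(60 * 1000);
cluster.killTopology("LocalWordCountApp");
cluster.shutdown();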

6. Running result

Start the main method of LocalWordCountApp. Storm will automatically set up a local cluster, so startup is a little slow; after it starts successfully you can see the log output.

Sixth, submit to a server cluster for execution

6.1 Code Changes

The code submitted to the server differs only slightly from the local-mode code: StormSubmitter is used to submit the Topology to the server cluster. The main code is as follows:

For clarity, we create a new ClusterWordCountApp class to demonstrate the cluster-mode submission. In practice, you can write the code for both modes in the same class and decide which mode to start by passing in an external argument (see the sketch after the listing below).

public class ClusterWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("DataSourceSpout", new DataSourceSpout());

        // Send the DataSourceSpout data to SplitBolt for processing
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

        // Send the SplitBolt data to CountBolt for processing
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // Use StormSubmitter to submit the Topology to the server cluster
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }
}
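As a sketch of the single-class approach mentioned above (the "cluster" argument convention and topology name are only an illustration, not part of the original project), the main method can pick the mode based on a command-line argument:

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("DataSourceSpout", new DataSourceSpout());
    builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
    builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

    // Hypothetical convention: pass "cluster" as the first argument to submit to the
    // server cluster; otherwise run in local mode.
    if (args.length > 0 && "cluster".equals(args[0])) {
        StormSubmitter.submitTopology("WordCountApp", new Config(), builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("WordCountApp", new Config(), builder.createTopology());
    }
}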

6.2 Packaging and Uploading

Package the project and upload it to any location on the server. In this case, the packaged JAR is named storm-word-count-1.0.jar:

# mvn clean package -Dmaven.test.skip=true

6.3 Submitting the Topology

Submit the Topology to the cluster using the following command:

# Command format: storm jar <jar path> <full main class name> [optional arguments]
storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp

If the message “successfully” is displayed, the submission is successful:

6.4 Viewing and Stopping Topologies (CLI)

# View all topologies
storm list

# Stop a topology: storm kill topology-name [-w wait-time-secs]
storm kill ClusterWordCountApp -w 3

6.5 Viewing and Stopping Topologies (Web UI)

You can also stop a Topology from the Web UI. On the Web UI (port 8080), click the topology name in the Topology Summary section to open the topology details page.

Seventh, extended notes on project packaging

Limitations of mvn package

In the steps above, we packaged the project with a plain mvn package, without configuring any plugins in the POM. This works for projects that do not use external dependencies. However, if the project uses third-party JARs there is a problem: the JAR produced by package does not contain the dependencies, so submitting it to the server to run will fail with an exception saying the third-party classes cannot be found.

At this point you might ask: didn't we use the storm-core dependency in our project? That JAR is provided by the Storm cluster environment itself, in the lib directory of the installation directory:

To illustrate this, I introduced a third-party JAR package in Maven and modified the method of generating the data:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>

StringUtils.join() is available in both commons-lang3 and storm-core. Nothing in the original code needs to change except the import, which must explicitly reference the commons-lang3 version:

import org.apache.commons.lang3.StringUtils;

private String productData() {
    Collections.shuffle(list);
    Random random = new Random();
    int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
    return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}

If you package the project with a plain mvn clean package as before and submit it to run, the exception shown below is thrown. This direct packaging approach is therefore not suitable for real development, where third-party JARs are almost always needed.

To include dependency packages in the final JAR, Maven provides two plugins: maven-assembly-plugin and maven-shade-plugin. Since this article is already quite long and there is a lot to say about packaging Storm projects, the packaging approaches are covered separately in the next article:

Storm: Comparison and analysis of three packaging methods
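As a quick preview (a minimal sketch only; the version number is illustrative and the full configuration and trade-offs are covered in the next article), a bare-bones maven-shade-plugin setup that bundles third-party dependencies into the final JAR looks roughly like this; storm-core itself is usually declared with provided scope so that the copy shipped with the cluster is used:

<!-- Minimal sketch: bundle third-party dependencies into the final JAR. -->
<!-- storm-core should normally be declared with <scope>provided</scope>, since the -->
<!-- cluster already supplies it in its lib directory. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>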

References

  1. Running Topologies on a Production Cluster
  2. Pre-defined Descriptor Files