Preface

This article looks at how Flink provides compatibility with Storm's StormTopology.

Example

    @Test
    public void testStormWordCount() throws Exception {
        //NOTE 1 build Topology the Storm way
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 5)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");

        //NOTE 2 convert StormTopology to FlinkTopology
        FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

        //NOTE 3 execute program locally using FlinkLocalCluster
        Config conf = new Config();
        // only required to stabilize integration test
        conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("stormWordCount", conf, flinkTopology);
        cluster.shutdown();
    }
  • Here FlinkLocalCluster.getLocalCluster() is used to create or obtain a FlinkLocalCluster, then FlinkLocalCluster.submitTopology is called to submit the topology, and at the end FlinkLocalCluster.shutdown is called to shut the cluster down
  • The RandomWordSpout used here extends Storm's BaseRichSpout, WordCountBolt extends Storm's BaseBasicBolt, and PrintBolt extends Storm's BaseRichBolt (since Flink uses checkpoints and does not translate Storm's ack operations, there is no special requirement to use BaseBasicBolt or BaseRichBolt); a minimal sketch of such a spout follows this list
  • The topology passed to FlinkLocalCluster.submitTopology is the FlinkTopology obtained by converting the StormTopology
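
To make the example concrete, here is a minimal sketch of what a spout like RandomWordSpout could look like (hypothetical, shown only for illustration; the real class ships with the flink-storm examples). It is a plain Storm BaseRichSpout emitting random words on the "word" field that fieldsGrouping keys on:

	import java.util.Map;
	import java.util.Random;

	import org.apache.storm.spout.SpoutOutputCollector;
	import org.apache.storm.task.TopologyContext;
	import org.apache.storm.topology.OutputFieldsDeclarer;
	import org.apache.storm.topology.base.BaseRichSpout;
	import org.apache.storm.tuple.Fields;
	import org.apache.storm.tuple.Values;

	public class RandomWordSpout extends BaseRichSpout {
		private static final String[] WORDS = {"flink", "storm", "spark"};
		private final Random random = new Random();
		private SpoutOutputCollector collector;

		@Override
		public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
			this.collector = collector;
		}

		@Override
		public void nextTuple() {
			// emit one random word per call; Flink's SpoutWrapper drives this loop
			collector.emit(new Values(WORDS[random.nextInt(WORDS.length)]));
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			// must match the Fields("word") used by fieldsGrouping in the test
			declarer.declare(new Fields("word"));
		}
	}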

LocalClusterFactory

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java

	// ------------------------------------------------------------------------
	//  Access to default local cluster
	// ------------------------------------------------------------------------

	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

	/**
	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
	 *
	 * @return a {@link FlinkLocalCluster} to be used for execution
	 */
	public static FlinkLocalCluster getLocalCluster() {
		return currentFactory.createLocalCluster();
	}

	/**
	 * Sets a different factory for FlinkLocalClusters to be used for execution.
	 *
	 * @param clusterFactory
	 * 		The LocalClusterFactory to create the local clusters for execution.
	 */
	public static void initialize(LocalClusterFactory clusterFactory) {
		currentFactory = Objects.requireNonNull(clusterFactory);
	}

	// ------------------------------------------------------------------------
	//  Cluster factory
	// ------------------------------------------------------------------------

	/**
	 * A factory that creates local clusters.
	 */
	public interface LocalClusterFactory {

		/**
		 * Creates a local Flink cluster.
		 * @return A local Flink cluster.
		 */
		FlinkLocalCluster createLocalCluster();
	}

	/**
	 * A factory that instantiates a FlinkLocalCluster.
	 */
	public static class DefaultLocalClusterFactory implements LocalClusterFactory {

		@Override
		public FlinkLocalCluster createLocalCluster() {
			return new FlinkLocalCluster();
		}
	}
  • Flink provides the static method getLocalCluster in FlinkLocalCluster for obtaining a FlinkLocalCluster; it creates one through a LocalClusterFactory
  • The LocalClusterFactory used here is the DefaultLocalClusterFactory implementation, whose createLocalCluster method simply creates a new FlinkLocalCluster
  • With the current implementation, every call to FlinkLocalCluster.getLocalCluster creates a new FlinkLocalCluster, which callers need to keep in mind; a custom factory can change this, as sketched below
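
Since getLocalCluster delegates to the factory, a test that wants to reuse one cluster across calls can install its own factory through initialize before the first getLocalCluster call. A minimal sketch (the SingletonLocalClusterFactory name is hypothetical):

	import org.apache.flink.storm.api.FlinkLocalCluster;
	import org.apache.flink.storm.api.FlinkLocalCluster.LocalClusterFactory;

	// Hypothetical factory that always hands back the same FlinkLocalCluster,
	// so repeated getLocalCluster() calls share a single instance.
	public class SingletonLocalClusterFactory implements LocalClusterFactory {
		private final FlinkLocalCluster instance = new FlinkLocalCluster();

		@Override
		public FlinkLocalCluster createLocalCluster() {
			return instance;
		}
	}

	// usage: install the factory before the first getLocalCluster() call
	// FlinkLocalCluster.initialize(new SingletonLocalClusterFactory());
	// FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();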

FlinkTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

	/**
	 * Creates a Flink program that uses the specified spouts and bolts.
	 *
	 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
	 * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
	 */
	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
		return new FlinkTopology(stormBuilder);
	}

	private FlinkTopology(TopologyBuilder builder) {
		this.builder = builder;
		this.stormTopology = builder.createTopology();
		// extract the spouts and bolts
		this.spouts = getPrivateField("_spouts");
		this.bolts = getPrivateField("_bolts");

		this.env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Kick off the translation immediately
		translateTopology();
	}
  • FlinkTopology provides a static factory method createTopology for creating a FlinkTopology
  • The FlinkTopology constructor saves the TopologyBuilder, then uses the getPrivateField reflection helper (based on getDeclaredField) to read the private _spouts and _bolts fields and keeps them for the subsequent topology translation; a sketch of that helper follows this list
  • It then obtains the StreamExecutionEnvironment and finally calls translateTopology to translate the whole StormTopology
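
For reference, a sketch of the shape of such a reflective read (modeled on getPrivateField as described above, not copied verbatim from the Flink source):

	import java.lang.reflect.Field;
	import java.util.Map;

	@SuppressWarnings("unchecked")
	private <T> Map<String, T> getPrivateField(String fieldName) {
		try {
			// TopologyBuilder keeps its components in private maps (_spouts, _bolts)
			Field f = builder.getClass().getDeclaredField(fieldName);
			f.setAccessible(true);
			return (Map<String, T>) f.get(builder);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new RuntimeException("Couldn't get " + fieldName + " from TopologyBuilder", e);
		}
	}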

translateTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

	/**
	 * Creates a Flink program that uses the specified spouts and bolts.
	 */
	private void translateTopology() {

		unprocessdInputsPerBolt.clear();
		outputStreams.clear();
		declarers.clear();
		availableInputs.clear();

		// Storm defaults to parallelism 1
		env.setParallelism(1);

		/* Translation of topology */

		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
			final String spoutId = spout.getKey();
			final IRichSpout userSpout = spout.getValue();

			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
			userSpout.declareOutputFields(declarer);
			final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
			this.outputStreams.put(spoutId, sourceStreams);
			declarers.put(spoutId, declarer);

			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
			final DataStreamSource<?> source;

			if (sourceStreams.size() == 1) {
				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
				spoutWrapperSingleOutput.setStormTopology(stormTopology);

				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
						declarer.getOutputType(outputStreamId));

				outputStreams.put(outputStreamId, src);
				source = src;
			} else {
				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
						userSpout, spoutId, null, null);
				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);

				@SuppressWarnings({ "unchecked"."rawtypes" })
				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
						spoutWrapperMultipleOutputs, spoutId,
						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));

				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
						.split(new StormStreamSelector<Tuple>());
				for (String streamId : sourceStreams.keySet()) {
					SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
							.map(new SplitStreamMapper<Tuple>());
					outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
					outputStreams.put(streamId, outStream);
				}
				source = multiSource;
			}
			availableInputs.put(spoutId, outputStreams);

			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
			if (common.is_set_parallelism_hint()) {
				int dop = common.get_parallelism_hint();
				source.setParallelism(dop);
			} else {
				common.set_parallelism_hint(1);
			}
		}

		/**
		 * 1. Connect all spout streams with bolts streams
		 * 2. Then proceed with the bolts stream already connected
		 *
		 * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before
		 * its producer
		 * ->thus, we might need to repeat multiple times
		 */
		boolean makeProgress = true;
		while (bolts.size() > 0) {
			if (!makeProgress) {
				StringBuilder strBld = new StringBuilder();
				strBld.append("Unable to build Topology. Could not connect the following bolts:");
				for (String boltId : bolts.keySet()) {
					strBld.append("\n ");
					strBld.append(boltId);
					strBld.append(": missing input streams [");
					for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
							.get(boltId)) {
						strBld.append("'");
						strBld.append(streams.getKey().get_streamId());
						strBld.append("' from '");
						strBld.append(streams.getKey().get_componentId());
						strBld.append("'; ");
					}
					strBld.append("]");
				}

				throw new RuntimeException(strBld.toString());
			}
			makeProgress = false;

			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
			while (boltsIterator.hasNext()) {

				final Entry<String, IRichBolt> bolt = boltsIterator.next();
				final String boltId = bolt.getKey();
				final IRichBolt userBolt = copyObject(bolt.getValue());

				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();

				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
				if (unprocessedBoltInputs == null) {
					unprocessedBoltInputs = new HashSet<>();
					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
					unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
				}

				// check if all inputs are available
				final int numberOfInputs = unprocessedBoltInputs.size();
				int inputsAvailable = 0;
				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
					final String producerId = entry.getKey().get_componentId();
					final String streamId = entry.getKey().get_streamId();
					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
					if (streams != null && streams.get(streamId) != null) {
						inputsAvailable++;
					}
				}

				if (inputsAvailable != numberOfInputs) {
					// traverse other bolts first until inputs are available
					continue;
				} else {
					makeProgress = true;
					boltsIterator.remove();
				}

				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);

				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
					final GlobalStreamId streamId = input.getKey();
					final Grouping grouping = input.getValue();

					final String producerId = streamId.get_componentId();

					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
				}

				final SingleOutputStreamOperator<?> outputStream = createOutput(boltId, userBolt, inputStreams);

				if (common.is_set_parallelism_hint()) {
					int dop = common.get_parallelism_hint();
					outputStream.setParallelism(dop);
				} else {
					common.set_parallelism_hint(1);
				}
			}
		}
	}
  • The translation handles spouts first and then bolts, based on the spouts and bolts maps that the constructor extracted from Storm's TopologyBuilder via reflection
  • Flink uses FlinkOutputFieldsDeclarer (which implements Storm's OutputFieldsDeclarer interface) to capture the output fields that Storm's IRichSpout and IRichBolt declare in declareOutputFields; note that Flink does not support direct emit. Calling the spout's declareOutputFields method stores the spout's original declaration in the FlinkOutputFieldsDeclarer
  • Flink uses SpoutWrapper to wrap the spout and turn it into a RichParallelSourceFunction, handling the cases of one versus multiple declared output streams differently; the RichParallelSourceFunction is then passed to StreamExecutionEnvironment.addSource to create a Flink DataStreamSource, which is added to availableInputs, and finally the parallelism of the DataStreamSource is set according to the spout's parallelism hint
  • For the bolt translation, unprocessdInputsPerBolt is maintained, keyed by boltId with the GlobalStreamId/Grouping pairs the bolt consumes as value. Because a map is traversed, bolts may be visited out of order: if a bolt's input GlobalStreamId is already present in availableInputs, the bolt is translated and removed from bolts; if not, it is skipped for now and kept in bolts. Since the outer loop runs while bolts is non-empty, this retry mechanism resolves the out-of-order traversal (and a RuntimeException reports the missing input streams if no progress can be made)
  • An important method in the bolt translation is processInput, which applies the bolt's grouping to the producer's DataStream (for example, shuffleGrouping becomes DataStream.rebalance, fieldsGrouping becomes DataStream.keyBy, globalGrouping becomes global, and allGrouping becomes broadcast; see the sketch after this list). createOutput then translates the bolt into a Flink OneInputStreamOperator via BoltWrapper or MergedInputsBoltWrapper and passes it to the stream's transform operation, which returns a SingleOutputStreamOperator; that SingleOutputStreamOperator is added to availableInputs, and its parallelism is set according to the bolt's parallelism hint
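
To illustrate that grouping-to-DataStream mapping, here is a sketch using plain DataStream calls (illustrative only, not the processInput source; the example stream and field position are made up):

	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	public class GroupingSketch {
		public static void main(String[] args) {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			DataStream<Tuple2<String, Integer>> producer = env.fromElements(Tuple2.of("word", 1));

			producer.rebalance();   // shuffleGrouping -> round-robin redistribution
			producer.keyBy(0);      // fieldsGrouping  -> key by the declared field position
			producer.global();      // globalGrouping  -> route everything to one subtask
			producer.broadcast();   // allGrouping     -> replicate to every subtask
		}
	}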

FlinkLocalCluster

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/api/FlinkLocalCluster.java

/**
 * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
 */
public class FlinkLocalCluster {

	/** The log used by this mini cluster. */
	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

	/** The Flink mini cluster on which to execute the programs. */
	private FlinkMiniCluster flink;

	/** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */
	public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

	public FlinkLocalCluster() {
	}

	public FlinkLocalCluster(FlinkMiniCluster flink) {
		this.flink = Objects.requireNonNull(flink);
	}

	@SuppressWarnings("rawtypes")
	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
			throws Exception {
		this.submitTopologyWithOpts(topologyName, conf, topology, null);
	}

	@SuppressWarnings("rawtypes")
	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
		LOG.info("Running Storm topology on FlinkLocalCluster");

		boolean submitBlocking = false;
		if (conf != null) {
			Object blockingFlag = conf.get(SUBMIT_BLOCKING);
			if (blockingFlag instanceof Boolean) {
				submitBlocking = ((Boolean) blockingFlag).booleanValue();
			}
		}

		FlinkClient.addStormConfigToTopology(topology, conf);

		StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
		streamGraph.setJobName(topologyName);

		JobGraph jobGraph = streamGraph.getJobGraph();

		if (this.flink == null) {
			Configuration configuration = new Configuration();
			configuration.addAll(jobGraph.getJobConfiguration());

			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

			this.flink = new LocalFlinkMiniCluster(configuration, true);
			this.flink.start();
		}

		if (submitBlocking) {
			this.flink.submitJobAndWait(jobGraph, false);
		} else {
			this.flink.submitJobDetached(jobGraph);
		}
	}

	public void killTopology(final String topologyName) {
		this.killTopologyWithOpts(topologyName, null);
	}

	public void killTopologyWithOpts(final String name, final KillOptions options) {
	}

	public void activate(final String topologyName) {
	}

	public void deactivate(final String topologyName) {
	}

	public void rebalance(final String name, final RebalanceOptions options) {
	}

	public void shutdown() {
		if (this.flink != null) {
			this.flink.stop();
			this.flink = null;
		}
	}

	// ...
}
  • FlinkLocalCluster's submitTopology method delegates to submitTopologyWithOpts, which mainly sets parameters, calls topology.getExecutionEnvironment().getStreamGraph() to generate the StreamGraph from the transformations and derives the JobGraph from it, then creates and starts a LocalFlinkMiniCluster and submits the JobGraph via LocalFlinkMiniCluster's submitJobAndWait or submitJobDetached

Summary

  • Flink provides compatibility with Storm through FlinkTopology, which is very helpful when migrating Storm applications to Flink
  • Running a Storm topology on Flink takes a few main steps: first build a native Storm TopologyBuilder, then convert the StormTopology to a FlinkTopology via FlinkTopology.createTopology(builder), and finally submit the FlinkTopology through the submitTopology method of FlinkLocalCluster (local mode) or FlinkSubmitter (remote submission; see the sketch after this list)
  • FlinkTopology is the core of Flink's Storm compatibility: it translates the StormTopology into the corresponding Flink constructs. SpoutWrapper turns a spout into a RichParallelSourceFunction, which is added to the StreamExecutionEnvironment to create a DataStream; a bolt's grouping is translated into the corresponding operation on the producer's DataStream (for example, shuffleGrouping becomes rebalance, fieldsGrouping becomes keyBy, globalGrouping becomes global, and allGrouping becomes broadcast); then BoltWrapper or MergedInputsBoltWrapper turns the bolt into a Flink OneInputStreamOperator, which is passed to the stream's transform operation
  • Once the FlinkTopology is built, it is submitted for local execution using FlinkLocalCluster or for remote execution using FlinkSubmitter
  • FlinkLocalCluster's submitTopology method mainly obtains the StreamGraph through the FlinkTopology's StreamExecutionEnvironment, derives the JobGraph from it, then creates and starts a LocalFlinkMiniCluster and submits the JobGraph through it
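
For the remote case mentioned above, submission looks much like the local test, with FlinkSubmitter in place of FlinkLocalCluster (a sketch assuming the FlinkSubmitter API of the flink-storm module; the cluster connection comes from the Flink client configuration):

	Config conf = new Config();
	// flinkTopology built via FlinkTopology.createTopology(builder) as before
	FlinkSubmitter.submitTopology("stormWordCount", conf, flinkTopology);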

Doc

  • Storm Compatibility Beta