Preface

This article takes a look at the split operation of Flink's DataStream API.

Example

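import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

// someDataStream: an existing DataStream<Integer>
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
// pick the two logical streams by outputName
DataStream<Integer> evens = split.select("even");
DataStream<Integer> odds = split.select("odd");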
  • This example splits someDataStream into two logical DataStreams: even values are tagged with the outputName "even" and odd values with the outputName "odd"
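To make the even/odd routing concrete without a Flink cluster, here is a plain-Java sketch that imitates what split followed by select does to a bounded list of elements. The `Selector` interface and the `SplitSelectDemo.select` helper are hypothetical stand-ins that only mirror the shape of `OutputSelector`; they are not Flink API, and in Flink the routing happens inside the streaming runtime.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same shape as Flink's OutputSelector (not the real interface).
interface Selector<T> extends Serializable {
    Iterable<String> select(T value);
}

class SplitSelectDemo {

    // Same even/odd logic as the anonymous OutputSelector in the example above.
    static final Selector<Integer> EVEN_ODD = value -> {
        List<String> output = new ArrayList<>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    };

    // Keeps every element for which the selector returned at least one of the requested names,
    // roughly what split(selector).select(names...) yields for a finite input.
    static <T> List<T> select(List<T> input, Selector<T> selector, String... names) {
        List<String> wanted = Arrays.asList(names);
        List<T> result = new ArrayList<>();
        for (T value : input) {
            for (String name : selector.select(value)) {
                if (wanted.contains(name)) {
                    result.add(value);
                    break; // emit each element at most once into the selected stream
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(select(data, EVEN_ODD, "even")); // [2, 4]
        System.out.println(select(data, EVEN_ODD, "odd"));  // [1, 3, 5]
    }
}
```

Selecting both names at once, as in `select(data, EVEN_ODD, "even", "odd")`, gives back every element, which corresponds to `split.select("even", "odd")`.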

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

	//......

	public SplitStream<T> split(OutputSelector<T> outputSelector) {
		return new SplitStream<>(this, clean(outputSelector));
	}

	//......
}
  • DataStream.split takes an OutputSelector, passes it through clean(...), and returns a new SplitStream built on top of the current stream
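The `clean(outputSelector)` call is there because the selector is shipped to the task managers along with the job, which is also why `OutputSelector` extends `Serializable`. In Flink 1.7, `DataStream.clean` delegates to the execution environment, which runs the closure cleaner (when enabled) and then checks that the function can be serialized. The plain-Java sketch below imitates only that final check; `ensureSerializable`, `ParitySelector` and `LeakySelector` are hypothetical names, and Flink reports the failure with its own exception type rather than the `IllegalArgumentException` used here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;

class SerializabilityCheck {

    // Hypothetical selector with the same select(...) shape as OutputSelector<Integer>.
    static class ParitySelector implements Serializable {
        Iterable<String> select(Integer value) {
            return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
        }
    }

    // Hypothetical selector that drags along a non-serializable field (think of a client or a lock).
    static class LeakySelector implements Serializable {
        private final Object notSerializable = new Object();

        Iterable<String> select(Integer value) {
            return Collections.singletonList("all");
        }
    }

    // Hypothetical stand-in for the serializability check: try Java serialization and fail fast.
    static <F> F ensureSerializable(F function) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(function);
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "Function is not serializable: " + function.getClass().getName(), e);
        }
        return function;
    }

    public static void main(String[] args) {
        ensureSerializable(new ParitySelector()); // passes
        try {
            ensureSerializable(new LeakySelector()); // NotSerializableException for java.lang.Object
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Checking at `split(...)` time means a bad selector fails while the program is being assembled, rather than later when the job is submitted.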

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

	Iterable<String> select(OUT value);

}
  • OutputSelector is a Serializable interface whose select method returns the outputNames assigned to an element
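Because `select` returns an `Iterable<String>` rather than a single `String`, one element can be tagged with several outputNames and is then expected to reach every selected stream that asks for any of them. The sketch below shows such a multi-name selector in plain Java; `NameSelector` is a hypothetical interface with the same single-method, `Serializable` shape as `OutputSelector`, which is also why a lambda can implement it.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface mirroring OutputSelector<OUT>: one abstract method, Serializable.
interface NameSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

class MultiNameSelectorDemo {

    // Tags every element with its parity and, additionally, with "small" for values below 3.
    static final NameSelector<Integer> PARITY_AND_SIZE = value -> {
        List<String> names = new ArrayList<>();
        names.add(value % 2 == 0 ? "even" : "odd");
        if (value < 3) {
            names.add("small");
        }
        return names;
    };

    public static void main(String[] args) {
        for (int i = 1; i <= 4; i++) {
            System.out.println(i + " -> " + PARITY_AND_SIZE.select(i));
        }
    }
}
```

With this selector, `split.select("small")` would contain 1 and 2, while `split.select("even")` would contain 2 and 4, so the element 2 appears in both.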

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

	protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
	}

	public DataStream<OUT> select(String... outputNames) {
		return selectOutput(outputNames);
	}

	private DataStream<OUT> selectOutput(String[] outputNames) {
		for (String outName : outputNames) {
			if (outName == null) {
				throw new RuntimeException("Selected names must not be null");
			}
		}

		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
	}
}
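  • SplitStream extends DataStream; its constructor wraps the input stream's transformation and the OutputSelector in a SplitTransformation. Its select method picks one or more of the split streams by outputName: it rejects null names and returns a new DataStream backed by a SelectTransformation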
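Split and select only build transformations; no element is routed until the job graph is generated and executed. The following plain-Java sketch mimics that wiring with hypothetical, heavily simplified classes (`Transformation`, `SplitTransformationSketch`, `SelectTransformationSketch`, `TransformationChainDemo.selectOutput`): the split transformation wraps its input together with the selector, and selecting validates the names and wraps the split transformation together with the chosen names, in the same way as `selectOutput` above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified transformation tree; Flink's real classes also carry ids, types and parallelism.
abstract class Transformation {
    abstract Transformation getInput();
}

class SourceTransformationSketch extends Transformation {
    @Override
    Transformation getInput() {
        return null; // a source has no upstream transformation
    }
}

class SplitTransformationSketch extends Transformation {
    final Transformation input;
    final Function<Integer, List<String>> outputSelector;

    SplitTransformationSketch(Transformation input, Function<Integer, List<String>> outputSelector) {
        this.input = input;
        this.outputSelector = outputSelector;
    }

    @Override
    Transformation getInput() {
        return input;
    }
}

class SelectTransformationSketch extends Transformation {
    final Transformation input;
    final List<String> selectedNames;

    SelectTransformationSketch(Transformation input, List<String> selectedNames) {
        this.input = input;
        this.selectedNames = selectedNames;
    }

    @Override
    Transformation getInput() {
        return input;
    }
}

class TransformationChainDemo {

    // Mirrors SplitStream.selectOutput: reject null names, then wrap the split transformation.
    static SelectTransformationSketch selectOutput(Transformation splitTransform, String... outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        return new SelectTransformationSketch(splitTransform, new ArrayList<>(Arrays.asList(outputNames)));
    }

    public static void main(String[] args) {
        Transformation source = new SourceTransformationSketch();
        // split(...) only records the selector next to its input; nothing is evaluated yet
        SplitTransformationSketch split = new SplitTransformationSketch(
                source, v -> Collections.singletonList(v % 2 == 0 ? "even" : "odd"));
        SelectTransformationSketch evens = selectOutput(split, "even");
        System.out.println(evens.selectedNames);        // [even]
        System.out.println(evens.getInput() == split);  // true
        System.out.println(split.getInput() == source); // true
    }
}
```

Keeping the chain as plain data is what lets StreamGraphGenerator walk it later and decide how each transformation maps onto graph nodes.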

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

@Internal
public class StreamGraphGenerator {

	//......

	private Collection<Integer> transform(StreamTransformation<?> transform) {

		if (alreadyTransformed.containsKey(transform)) {
			return alreadyTransformed.get(transform);
		}

		LOG.debug("Transforming " + transform);

		if (transform.getMaxParallelism() <= 0) {

			// if the max parallelism hasn't been set, then first use the job wide max parallelism
			// from the ExecutionConfig.
			int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
			if (globalMaxParallelismFromConfig > 0) {
				transform.setMaxParallelism(globalMaxParallelismFromConfig);
			}
		}

		// call at least once to trigger exceptions about MissingTypeInfo
		transform.getOutputType();

		Collection<Integer> transformedIds;
		if (transform instanceof OneInputTransformation<?, ?>) {
			transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
		} else if (transform instanceof SourceTransformation<?>) {
			transformedIds = transformSource((SourceTransformation<?>) transform);
		} else if (transform instanceof SinkTransformation<?>) {
			transformedIds = transformSink((SinkTransformation<?>) transform);
		} else if (transform instanceof UnionTransformation<?>) {
			transformedIds = transformUnion((UnionTransformation<?>) transform);
		} else if (transform instanceof SplitTransformation<?>) {
			transformedIds = transformSplit((SplitTransformation<?>) transform);
		} else if (transform instanceof SelectTransformation<?>) {
			transformedIds = transformSelect((SelectTransformation<?>) transform);
		} else if (transform instanceof FeedbackTransformation<?>) {
			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
		} else if (transform instanceof CoFeedbackTransformation<?>) {
			transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
		} else if (transform instanceof PartitionTransformation<?>) {
			transformedIds = transformPartition((PartitionTransformation<?>) transform);
		} else if (transform instanceof SideOutputTransformation<?>) {
			transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
		} else {
			throw new IllegalStateException("Unknown transformation: " + transform);
		}

		// need this check because the iterate transformation adds itself before
		// transforming the feedback edges
		if (!alreadyTransformed.containsKey(transform)) {
			alreadyTransformed.put(transform, transformedIds);
		}

		if (transform.getBufferTimeout() >= 0) {
			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
		}
		if (transform.getUid() != null) {
			streamGraph.setTransformationUID(transform.getId(), transform.getUid());
		}
		if (transform.getUserProvidedNodeHash() != null) {
			streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
		}

		if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
			streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
		}

		return transformedIds;
	}

	private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
		StreamTransformation<T> input = select.getInput();
		Collection<Integer> resultIds = transform(input);

		// the recursive transform might have already transformed this
		if (alreadyTransformed.containsKey(select)) {
			return alreadyTransformed.get(select);
		}

		List<Integer> virtualResultIds = new ArrayList<>();

		for (int inputId : resultIds) {
			int virtualId = StreamTransformation.getNewNodeId();
			streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
			virtualResultIds.add(virtualId);
		}
		return virtualResultIds;
	}

	private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

		StreamTransformation<T> input = split.getInput();
		Collection<Integer> resultIds = transform(input);

		// the recursive transform call might have transformed this already
		if (alreadyTransformed.containsKey(split)) {
			return alreadyTransformed.get(split);
		}

		for (int inputId : resultIds) {
			streamGraph.addOutputSelector(inputId, split.getOutputSelector());
		}

		return resultIds;
	}

	//......
}
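  • StreamGraphGenerator.transform dispatches on the transformation type: a SplitTransformation is handled by transformSplit and a SelectTransformation by transformSelect
  • transformSelect first transforms its input, then, for each resulting node id, allocates a new virtual node id and calls streamGraph.addVirtualSelectNode with select.getSelectedNames(); it returns the virtual ids, so select adds no physical operator
  • transformSplit transforms its input and registers split.getOutputSelector() on every resulting node via streamGraph.addOutputSelector; it returns the input's ids unchanged, so split adds no new node either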
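The net effect is that the OutputSelector is attached to the real upstream node, while each select becomes a virtual id that only remembers (original node id, selected names). In the Flink 1.7 sources, connecting a downstream operator to such a virtual id resolves it back to the real upstream node and stores the selected names on the edge; at runtime a `DirectedOutput` on the upstream task runs the selectors and forwards each record to the edges whose names match, plus any edge that selected nothing. The plain-Java sketch below models that bookkeeping; `MiniStreamGraph` and all of its methods are hypothetical simplifications, not Flink's `StreamGraph` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, heavily simplified model of how split/select are recorded in a stream graph.
class MiniStreamGraph {

    // An edge remembers which outputNames it subscribed to; an empty list means "everything".
    static final class Edge {
        final int source;
        final int target;
        final List<String> selectedNames;

        Edge(int source, int target, List<String> selectedNames) {
            this.source = source;
            this.target = target;
            this.selectedNames = selectedNames;
        }
    }

    final Map<Integer, List<Function<Integer, List<String>>>> outputSelectors = new HashMap<>();
    final Map<Integer, Integer> virtualSelectInput = new HashMap<>();
    final Map<Integer, List<String>> virtualSelectNames = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();
    private int nextVirtualId = 1000; // arbitrary start so toy virtual ids don't clash with real ones

    // Like transformSplit: no new node, just remember the selector on the input node.
    void addOutputSelector(int nodeId, Function<Integer, List<String>> selector) {
        outputSelectors.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(selector);
    }

    // Like transformSelect: allocate a virtual id that points back at the real node.
    int addVirtualSelectNode(int inputId, List<String> names) {
        int virtualId = nextVirtualId++;
        virtualSelectInput.put(virtualId, inputId);
        virtualSelectNames.put(virtualId, names);
        return virtualId;
    }

    // Connecting from a virtual id resolves it to the real upstream node plus its selected names.
    void addEdge(int upstreamId, int downstreamId) {
        List<String> names = Collections.emptyList();
        while (virtualSelectInput.containsKey(upstreamId)) {
            if (names.isEmpty()) {
                names = virtualSelectNames.get(upstreamId); // the outermost select wins
            }
            upstreamId = virtualSelectInput.get(upstreamId);
        }
        edges.add(new Edge(upstreamId, downstreamId, names));
    }

    // Runtime-style routing of one record: unnamed edges get everything,
    // named edges get the record if any selector returned one of their names.
    Set<Integer> route(int nodeId, int record) {
        Set<String> recordNames = new LinkedHashSet<>();
        for (Function<Integer, List<String>> selector
                : outputSelectors.getOrDefault(nodeId, Collections.emptyList())) {
            recordNames.addAll(selector.apply(record));
        }
        Set<Integer> targets = new LinkedHashSet<>();
        for (Edge edge : edges) {
            if (edge.source == nodeId
                    && (edge.selectedNames.isEmpty() || !Collections.disjoint(edge.selectedNames, recordNames))) {
                targets.add(edge.target);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        MiniStreamGraph graph = new MiniStreamGraph();
        int upstream = 1; // the real node produced by transforming someDataStream
        graph.addOutputSelector(upstream, v -> Collections.singletonList(v % 2 == 0 ? "even" : "odd"));
        int evenVirtual = graph.addVirtualSelectNode(upstream, Collections.singletonList("even"));
        int oddVirtual = graph.addVirtualSelectNode(upstream, Collections.singletonList("odd"));
        graph.addEdge(evenVirtual, 2); // downstream operator for even numbers
        graph.addEdge(oddVirtual, 3);  // downstream operator for odd numbers
        System.out.println(graph.route(upstream, 4)); // [2]
        System.out.println(graph.route(upstream, 7)); // [3]
    }
}
```

Because the virtual ids never become real nodes, the job graph contains only the upstream operator and its consumers; the split/select information survives solely as selectors on the node and names on its outgoing edges.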

Summary

  • DataStream’s split operation takes the OutputSelector parameter and creates and returns a SplitStream
  • OutputSelector defines the select method used to assign outputNames to an element
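  • SplitStream extends DataStream and defines a select method that picks the split streams matching the given outputNames
  • When the StreamGraph is built, StreamGraphGenerator registers the OutputSelector on the upstream node (transformSplit) and turns each select into a virtual select node (transformSelect)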

doc

  • DataStream Transformations