Copyright Notice: This technical column series is a summary and distillation of the day-to-day work of the author (Qin Kaixin). It draws cases from real business environments to summarize and share, and offers tuning suggestions for business applications, capacity planning for cluster environments, and related content. Please keep following this blog series. Looking forward to joining the most combative team of the IoT era. QQ email address: [email protected]; feel free to get in touch for any academic exchange.

1 DataStream API

1.1 DataStream Data Sources

  • A source is where a program reads its input. You add one to your program with StreamExecutionEnvironment.addSource(sourceFunction).

  • Flink ships with many ready-made sources. You can also write a custom source without parallelism (a parallelism of 1) by implementing the SourceFunction interface.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Custom source with a parallelism of 1
    public class MyNoParalleSource implements SourceFunction<Long> {

        private long count = 1L;
        private volatile boolean isRunning = true;

        /**
         * The main method: starts the source.
         * In most cases you implement a loop in run() that keeps producing data.
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count);
                count++;
                // Emit one element per second
                Thread.sleep(1000);
            }
        }

        /** Called when the job is cancelled. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // Main class that runs the job
    public class StreamingDemoWithMyNoPralalleSource {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add the source. Note: for this source, parallelism can only be set to 1
            DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

            DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    System.out.println("Received data: " + value);
                    return value;
                }
            });

            // Sum the values in each 2-second window
            DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

            // Print the result
            sum.print().setParallelism(1);

            String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
            env.execute(jobName);
        }
    }


  • To define a custom source with parallelism, implement the ParallelSourceFunction interface or extend RichParallelSourceFunction. A source that extends RichParallelSourceFunction also runs in parallel and can additionally override open() and close() to acquire and release resources.

    // Custom source that supports parallelism
    public class MyParalleSource implements ParallelSourceFunction<Long> {

        private long count = 1L;
        private volatile boolean isRunning = true;

        /**
         * The main method: starts the source.
         * In most cases you implement a loop in run() that keeps producing data.
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count);
                count++;
                // Emit one element per second
                Thread.sleep(1000);
            }
        }

        /** Called when the job is cancelled. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public class StreamingDemoWithMyPralalleSource {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add the source with a parallelism of 2
            DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

            DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    System.out.println("Received data: " + value);
                    return value;
                }
            });

            // Sum the values in each 2-second window
            DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

            // Print the result
            sum.print().setParallelism(1);

            String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
            env.execute(jobName);
        }
    }

    // Custom parallel source with open() and close() hooks
    public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

        private long count = 1L;
        private volatile boolean isRunning = true;

        /**
         * The main method: starts the source.
         * In most cases you implement a loop in run() that keeps producing data.
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(count);
                count++;
                // Emit one element per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        /**
         * Called only once, at start-up.
         * Put the code that opens a connection here.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open.............");
            super.open(parameters);
        }

        /**
         * Put the code that closes the connection here.
         */
        @Override
        public void close() throws Exception {
            super.close();
        }
    }

    public class StreamingDemoWithMyRichPralalleSource {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add the source with a parallelism of 2
            DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);

            DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    System.out.println("Received data: " + value);
                    return value;
                }
            });

            // Sum the values in each 2-second window
            DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

            // Print the result
            sum.print().setParallelism(1);

            String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
            env.execute(jobName);
        }
    }

  • File-based: readTextFile(path) reads a text file line by line, following the TextInputFormat rules, and returns each line as a string.

  • Socket-based: socketTextStream reads data from a socket; the elements can be split by a delimiter.

    public class SocketDemoFullCount {
        public static void main(String[] args) throws Exception {
            // Obtain the required port number
            int port;
            try {
                ParameterTool parameterTool = ParameterTool.fromArgs(args);
                port = parameterTool.getInt("port");
            } catch (Exception e) {
                System.err.println("No port set. use default port 9010--java");
                port = 9010;
            }

            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            String hostname = "SparkMaster";
            String delimiter = "\n";
            // Connect to the socket to obtain the input data
            DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

            DataStream<Tuple2<Integer, Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(String value) throws Exception {
                    return new Tuple2<>(1, Integer.parseInt(value));
                }
            });

            intData.keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
                        @Override
                        public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception {
                            System.out.println("execute process");
                            // Count the elements that fell into this window
                            long count = 0;
                            for (Tuple2<Integer, Integer> element : elements) {
                                count++;
                            }
                            out.collect("window:" + context.window() + ",count:" + count);
                        }
                    }).print();

            env.execute("Socket window count");
        }
    }

  • Collection-based: fromCollection(Collection) creates a data stream from a Java collection. All elements in the collection must be of the same type.

    public class StreamingFromCollection {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            ArrayList<Integer> data = new ArrayList<>();
            data.add(10);
            data.add(15);
            data.add(20);

            // Specify the data source
            DataStreamSource<Integer> collectionData = env.fromCollection(data);

            // Process the data with map: add 1 to every element
            DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) throws Exception {
                    return value + 1;
                }
            });

            // Print directly
            num.print().setParallelism(1);

            env.execute("StreamingFromCollection");
        }
    }

  • Custom source: addSource reads data from third-party data sources, for example Kafka.

1.2 DataStream Transformations

  • Map: takes one element and returns one element; typically used for cleaning and conversion

  • FlatMap: takes one element and returns zero, one, or more elements

  • KeyBy: Groups data based on the specified key. Data with the same key goes to the same partition

    dataStream.keyBy("someKey") // Use the "someKey" field of the object as the grouping key
    dataStream.keyBy(0)         // Use the first element of the Tuple as the grouping key

    Note: the following types cannot be used as keys:
    1: an entity-class object that does not override hashCode() and relies on Object.hashCode()
    2: an array of any type
    3: primitive data types such as int and long

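The reason behind the first two restrictions can be seen in plain Java, independent of Flink. The following is only an illustrative sketch; `KeyHashSketch`, `PlainUser`, `KeyedUser`, and `sameKey` are names made up for this example. Two arrays, or two objects that rely on Object.equals() and Object.hashCode(), are never treated as the same key even when their contents are identical, so records that logically belong together would not be grouped together.

```java
import java.util.Objects;

final class KeyHashSketch {

    /** Entity class that relies on Object.equals()/hashCode(): unsuitable as a key. */
    static final class PlainUser {
        final String name;

        PlainUser(String name) {
            this.name = name;
        }
    }

    /** Entity class that overrides equals() and hashCode(): usable as a key. */
    static final class KeyedUser {
        final String name;

        KeyedUser(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof KeyedUser && Objects.equals(name, ((KeyedUser) o).name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name);
        }
    }

    /** True when two logically identical keys would be treated as one and the same key. */
    static boolean sameKey(Object a, Object b) {
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(sameKey(new PlainUser("zs"), new PlainUser("zs"))); // false
        System.out.println(sameKey(new KeyedUser("zs"), new KeyedUser("zs"))); // true
        System.out.println(sameKey(new int[] {1, 2}, new int[] {1, 2}));       // false
    }
}
```

An entity class meant to serve as a grouping key should therefore override both equals() and hashCode(), as `KeyedUser` does here.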
  • Filter: evaluates each incoming element and keeps only the elements that satisfy the condition.

    public class StreamingDemoFilter {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add the source. Note: for this source, parallelism can only be set to 1
            DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

            DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    System.out.println("Original received data: " + value);
                    return value;
                }
            });

            // Apply the filter: elements that satisfy the condition are kept
            DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
                // Filter out all odd numbers, keeping only the even ones
                @Override
                public boolean filter(Long value) throws Exception {
                    return value % 2 == 0;
                }
            });

            DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    System.out.println("Filtered data: " + value);
                    return value;
                }
            });

            // Sum the values in each 2-second window
            DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);

            // Print the result
            sum.print().setParallelism(1);

            String jobName = StreamingDemoFilter.class.getSimpleName();
            env.execute(jobName);
        }
    }


  • Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, and returns the new value

  • Aggregations: sum(), min(), max(), etc.

  • Window: covered separately later

  • Union: merges multiple streams. The new stream contains the data of all input streams, with one restriction: all merged streams must be of the same type.

    public class StreamingDemoUnion {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add two sources. Note: for this source, parallelism can only be set to 1
            DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
            DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

            // Merge text1 and text2 into a single stream
            DataStream<Long> text = text1.union(text2);

            DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    System.out.println("Original received data: " + value);
                    return value;
                }
            });

            // Sum the values in each 2-second window
            DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

            // Print the result
            sum.print().setParallelism(1);

            String jobName = StreamingDemoUnion.class.getSimpleName();
            env.execute(jobName);
        }
    }

  • Connect: Similar to union, but only two streams can be connected. The data types of the two streams can be different, and different processing methods are applied to the data in the two streams.

  • CoMap, CoFlatMap: the functions applied to ConnectedStreams, analogous to map and flatMap

    public class StreamingDemoConnect {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add two sources. Note: for this source, parallelism can only be set to 1
            DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
            DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

            // Turn the second stream into a stream of strings
            SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
                @Override
                public String map(Long value) throws Exception {
                    return "str_" + value;
                }
            });

            ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

            // map1 handles elements of the first stream, map2 those of the second
            SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
                @Override
                public Object map1(Long value) throws Exception {
                    return value;
                }

                @Override
                public Object map2(String value) throws Exception {
                    return value;
                }
            });

            // Print the result
            result.print().setParallelism(1);

            String jobName = StreamingDemoConnect.class.getSimpleName();
            env.execute(jobName);
        }
    }

  • Split: splits one data stream into multiple streams according to a rule

  • Select: used together with split to select one or more of the resulting streams

    public class StreamingDemoSplit {
        public static void main(String[] args) throws Exception {
            // Obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Add the source. Note: for this source, parallelism can only be set to 1
            DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

            // Split the stream by odd and even values
            SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
                @Override
                public Iterable<String> select(Long value) {
                    ArrayList<String> outPut = new ArrayList<>();
                    if (value % 2 == 0) {
                        outPut.add("even"); // even number
                    } else {
                        outPut.add("odd");  // odd number
                    }
                    return outPut;
                }
            });

            // Select one or more of the split streams
            DataStream<Long> evenStream = splitStream.select("even");
            DataStream<Long> oddStream = splitStream.select("odd");
            DataStream<Long> moreStream = splitStream.select("odd", "even");

            // Print the result
            moreStream.print().setParallelism(1);

            String jobName = StreamingDemoSplit.class.getSimpleName();
            env.execute(jobName);
        }
    }
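Reduce is listed in this section without an example. Its behavior can be pictured in plain Java without Flink; the sketch below is only a model, and `RollingReduceSketch` and `rollingReduce` are hypothetical names. Each incoming element is combined with the previously returned value, and every intermediate result is recorded, which is how a rolling reduce behaves for the elements of a single key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class RollingReduceSketch {

    /** Returns the value produced after each input element. */
    static <T> List<T> rollingReduce(List<T> input, BinaryOperator<T> reducer) {
        List<T> emitted = new ArrayList<>();
        T last = null;
        boolean first = true;
        for (T current : input) {
            // The first element has nothing to be combined with and is passed through
            last = first ? current : reducer.apply(last, current);
            first = false;
            emitted.add(last);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Rolling sum over 1, 2, 3, 4
        System.out.println(rollingReduce(Arrays.asList(1L, 2L, 3L, 4L), Long::sum)); // [1, 3, 6, 10]
    }
}
```

Aggregations such as sum(), min(), and max() can be thought of as the same pattern with a fixed combining function.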

1.3 DataStream Partition

  • Random Partitioning: Random partitioning

    dataStream.shuffle()

  • Rebalancing: repartitions the data stream round-robin to eliminate data skew

    dataStream.rebalance()

  • Rescaling: if the upstream operation has a parallelism of 2 and the downstream operation has a parallelism of 4, one upstream instance distributes its output to two of the downstream instances and the other upstream instance distributes its output to the other two. Conversely, if the downstream parallelism is 2 and the upstream parallelism is 4, two of the upstream instances send their output to one downstream instance and the other two send theirs to the other downstream instance.

  • Note: rebalancing performs a full repartitioning, whereas rescaling does not.

    dataStream.rescale()
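The difference between rescale() and rebalance() can be modeled in a few lines of plain Java. This is a simplified sketch and not Flink's implementation: `RescaleSketch`, `rescaleTargets`, and `rebalanceTargets` are hypothetical names, the two parallelisms are assumed to be multiples of each other, and the exact downstream indices chosen are an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class RescaleSketch {

    /** Downstream instances fed by one upstream instance under rescale(). */
    static List<Integer> rescaleTargets(int upstreamIndex, int upstream, int downstream) {
        List<Integer> targets = new ArrayList<>();
        if (downstream >= upstream) {
            // Fan out: each upstream instance owns a contiguous group of downstream instances
            int group = downstream / upstream;
            for (int i = 0; i < group; i++) {
                targets.add(upstreamIndex * group + i);
            }
        } else {
            // Fan in: several upstream instances share a single downstream instance
            int group = upstream / downstream;
            targets.add(upstreamIndex / group);
        }
        return targets;
    }

    /** Under rebalance() every upstream instance sends round-robin to all downstream instances. */
    static List<Integer> rebalanceTargets(int downstream) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < downstream; i++) {
            targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(rescaleTargets(0, 2, 4)); // [0, 1]
        System.out.println(rescaleTargets(1, 2, 4)); // [2, 3]
        System.out.println(rescaleTargets(3, 4, 2)); // [1]
        System.out.println(rebalanceTargets(4));     // [0, 1, 2, 3]
    }
}
```

Because each upstream instance only talks to its own subset of downstream instances, rescaling avoids the all-to-all connections that a full repartitioning needs.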

  • Custom Partitioning: custom partitioning requires an implementation of the Partitioner interface

    dataStream.partitionCustom(partitioner, "someKey") // for objects

    dataStream.partitionCustom(partitioner, 0)         // for Tuples

    public class MyPartition implements Partitioner<Long> {
        @Override
        public int partition(Long key, int numPartitions) {
            System.out.println("Total number of partitions: " + numPartitions);
            // Even keys go to partition 0, odd keys to partition 1
            if (key % 2 == 0) {
                return 0;
            } else {
                return 1;
            }
        }
    }

    public class SteamingDemoWithMyParitition {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());

            // Convert the data: wrap each Long in a Tuple1
            DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
                @Override
                public Tuple1<Long> map(Long value) throws Exception {
                    return new Tuple1<>(value);
                }
            });

            // The data after custom partitioning
            DataStream<Tuple1<Long>> partitionData = tupleData.partitionCustom(new MyPartition(), 0);

            DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
                @Override
                public Long map(Tuple1<Long> value) throws Exception {
                    System.out.println("Current thread id: " + Thread.currentThread().getId() + ",value: " + value);
                    return value.getField(0);
                }
            });

            result.print().setParallelism(1);

            env.execute("SteamingDemoWithMyParitition");
        }
    }

  • Broadcasting: covered later

1.4 DataStream Sink

  • writeAsText(): writes elements line by line as strings, obtained by calling each element’s toString() method

  • print() / printToErr(): prints each element’s toString() value to standard output or standard error

  • addSink: custom sinks, for example Kafka or Redis

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>

    public class StreamingDemoToRedis {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");

            // Wrap each line in a Tuple2<String, String>
            DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    return new Tuple2<>("l_words", value);
                }
            });

            // Create the Redis configuration
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("SparkMaster").setPort(6379).build();

            // Create the Redis sink
            RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

            l_wordsData.addSink(redisSink);

            env.execute("StreamingDemoToRedis");
        }

        public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
            // The Redis key taken from the received data
            @Override
            public String getKeyFromData(Tuple2<String, String> data) {
                return data.f0;
            }

            // The Redis value taken from the received data
            @Override
            public String getValueFromData(Tuple2<String, String> data) {
                return data.f1;
            }

            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.LPUSH);
            }
        }
    }

2 DataSet API

2.1 DataSet Sources

  • File-based: readTextFile(path)

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class BatchWordCountJava {
        public static void main(String[] args) throws Exception {
            String inputPath = "D:\\data\\file";
            String outPath = "D:\\data\\result";

            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read the content of the file
            DataSource<String> text = env.readTextFile(inputPath);

            // Split into words, group by word, and sum the counts
            DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
            counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);

            env.execute("batch word count");
        }

        public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    }

  • Collection-based: fromCollection(Collection)

2.2 DataSet Transformations

  • Map: takes one element and returns one element; typically used for cleaning and conversion

  • FlatMap: takes one element and returns zero, one, or more elements

  • MapPartition: similar to map, but processes the data of one partition per call. MapPartition is recommended when the processing needs a connection to a third-party resource, because the connection is then obtained once per partition instead of once per element.

    public class BatchDemoMapPartition {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ArrayList<String> data = new ArrayList<>();
            data.add("hello you");
            data.add("hello me");
            DataSource<String> text = env.fromCollection(data);

            /*text.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // Obtain a connection; note that this happens once for every element
                    // Process the data
                    // Close the connection
                    return value;
                }
            });*/

            DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
                @Override
                public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                    // Obtain a connection; here this happens only once per partition
                    // values holds the data of one partition
                    // Process the data
                    Iterator<String> it = values.iterator();
                    while (it.hasNext()) {
                        String next = it.next();
                        String[] split = next.split("\\W+");
                        for (String word : split) {
                            out.collect(word);
                        }
                    }
                    // Close the connection
                }
            });

            mapPartitionData.print();
        }
    }

  • Filter: evaluates each incoming element and keeps only the elements that satisfy the condition

  • Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, and returns the new value

  • Aggregate: sum, max, and min

  • Distinct: returns the de-duplicated elements of a dataset, e.g. data.distinct()

    public class BatchDemoDistinct {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ArrayList<String> data = new ArrayList<>();
            data.add("hello you");
            data.add("hello me");
            DataSource<String> text = env.fromCollection(data);

            FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] split = value.toLowerCase().split("\\W+");
                    for (String word : split) {
                        System.out.println("Word: " + word);
                        out.collect(word);
                    }
                }
            });

            flatMapData.distinct() // De-duplicate on the whole element
                    .print();
        }
    }

  • Join: inner join

    public class BatchDemoJoin {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Tuple2<user id, user name>
            ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
            data1.add(new Tuple2<>(1, "zs"));
            data1.add(new Tuple2<>(2, "ls"));
            data1.add(new Tuple2<>(3, "ww"));

            // Tuple2<user id, user city>
            ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
            data2.add(new Tuple2<>(1, "beijing"));
            data2.add(new Tuple2<>(2, "shanghai"));
            data2.add(new Tuple2<>(3, "guangzhou"));

            DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
            DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

            text1.join(text2)
                    .where(0)   // Field of the first dataset to compare
                    .equalTo(0) // Field of the second dataset to compare
                    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }).print();

            // Note: using map here gives the same result as using with
            /*text1.join(text2)
                    .where(0)   // Field of the first dataset to compare
                    .equalTo(0) // Field of the second dataset to compare
                    .map(new MapFunction<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>, Tuple3<Integer, String, String>>() {
                        @Override
                        public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
                            return new Tuple3<>(value.f0.f0, value.f0.f1, value.f1.f1);
                        }
                    }).print();*/
        }
    }

  • OuterJoin: outer join

    public class BatchDemoOuterJoin {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Tuple2<user id, user name>
            ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
            data1.add(new Tuple2<>(1, "zs"));
            data1.add(new Tuple2<>(2, "ls"));
            data1.add(new Tuple2<>(3, "ww"));

            // Tuple2<user id, user city>
            ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
            data2.add(new Tuple2<>(1, "beijing"));
            data2.add(new Tuple2<>(2, "shanghai"));
            data2.add(new Tuple2<>(4, "guangzhou"));

            DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
            DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

            /**
             * Left outer join.
             * Note: second may be null.
             */
            text1.leftOuterJoin(text2)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if (second == null) {
                                return new Tuple3<>(first.f0, first.f1, "null");
                            } else {
                                return new Tuple3<>(first.f0, first.f1, second.f1);
                            }
                        }
                    }).print();

            /**
             * Right outer join.
             * Note: first may be null.
             */
            text1.rightOuterJoin(text2)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if (first == null) {
                                return new Tuple3<>(second.f0, "null", second.f1);
                            }
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }).print();

            /**
             * Full outer join.
             * Note: either first or second may be null.
             */
            text1.fullOuterJoin(text2)
                    .where(0)
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if (first == null) {
                                return new Tuple3<>(second.f0, "null", second.f1);
                            } else if (second == null) {
                                return new Tuple3<>(first.f0, first.f1, "null");
                            } else {
                                return new Tuple3<>(first.f0, first.f1, second.f1);
                            }
                        }
                    }).print();
        }
    }

  • Cross: Gets the Cartesian product of two data sets

    public class BatchDemoCross {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // First dataset
            ArrayList<String> data1 = new ArrayList<>();
            data1.add("zs");
            data1.add("ww");

            // Second dataset
            ArrayList<Integer> data2 = new ArrayList<>();
            data2.add(1);
            data2.add(2);

            DataSource<String> text1 = env.fromCollection(data1);
            DataSource<Integer> text2 = env.fromCollection(data2);

            // Every element of text1 is paired with every element of text2
            CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
            cross.print();
        }
    }

  • Union: returns the union of two datasets, which must have the same data type

  • First-n: returns the first n elements of the dataset

  • Sort Partition: locally sorts all partitions of a dataset; to sort on multiple fields, chain several sortPartition() calls

    public class BatchDemoFirstN {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(2, "zs"));
            data.add(new Tuple2<>(4, "ls"));
            data.add(new Tuple2<>(3, "ww"));
            data.add(new Tuple2<>(1, "xw"));
            data.add(new Tuple2<>(1, "aw"));
            data.add(new Tuple2<>(1, "mw"));

            DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

            // Get the first 3 elements, in insertion order
            text.first(3).print();
            System.out.println("==============================");

            // Group by the first field and get the first 2 elements of each group
            text.groupBy(0).first(2).print();
            System.out.println("==============================");

            // Group by the first field, sort each group by the second field in ascending order,
            // and get the first 2 elements of each group
            text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
            System.out.println("==============================");

            // Without grouping: get the first 3 elements of the set, ascending on the first field
            // and descending on the second field
            text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
        }
    }
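What "group, sort within the group, take the first n" computes can be checked with a plain-Java analogue on the same sample data. This is only a sketch of the semantics, not of how Flink executes it; `FirstNSketch`, `tuple`, and `firstNPerGroup` are hypothetical names, and a `Map.Entry` stands in for `Tuple2`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FirstNSketch {

    /** Stand-in for a Tuple2<Integer, String>. */
    static Map.Entry<Integer, String> tuple(int key, String value) {
        return new SimpleEntry<Integer, String>(key, value);
    }

    /** Groups by key, sorts each group by value in ascending order, keeps the first n per group. */
    static List<Map.Entry<Integer, String>> firstNPerGroup(List<Map.Entry<Integer, String>> data, int n) {
        // A TreeMap is used only to make the order of the groups deterministic
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<Integer, String> e : data) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        List<Map.Entry<Integer, String>> result = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : groups.entrySet()) {
            List<String> values = group.getValue();
            values.sort(Comparator.naturalOrder());
            for (String v : values.subList(0, Math.min(n, values.size()))) {
                result.add(tuple(group.getKey(), v));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> data = Arrays.asList(
                tuple(2, "zs"), tuple(4, "ls"), tuple(3, "ww"),
                tuple(1, "xw"), tuple(1, "aw"), tuple(1, "mw"));
        System.out.println(firstNPerGroup(data, 2)); // [1=aw, 1=mw, 2=zs, 3=ww, 4=ls]
    }
}
```

Only the group with key 1 has more than two elements, so it is the only group that loses an element ("xw").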

2.3 DataSet Partition

  • Rebalance: rebalances (repartitions) the dataset to eliminate data skew

  • Hash-Partition: partitions the dataset by the hash value of the specified key

    partitionByHash()

  • Range-Partition: partitions the dataset into ranges of the specified key

    partitionByRange()

  • Custom Partitioning: custom partitioning rules

    Custom partitioning requires an implementation of the Partitioner interface

    partitionCustom(partitioner, "someKey")

    partitionCustom(partitioner, 0)

    public class BatchDemoHashRangePartition {
        public static void main(String[] args) throws Exception {
            // Obtain the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "hello1"));
            data.add(new Tuple2<>(2, "hello2"));
            data.add(new Tuple2<>(2, "hello3"));
            data.add(new Tuple2<>(3, "hello4"));
            data.add(new Tuple2<>(3, "hello5"));
            data.add(new Tuple2<>(3, "hello6"));
            data.add(new Tuple2<>(4, "hello7"));
            data.add(new Tuple2<>(4, "hello8"));
            data.add(new Tuple2<>(4, "hello9"));
            data.add(new Tuple2<>(4, "hello10"));
            data.add(new Tuple2<>(5, "hello11"));
            data.add(new Tuple2<>(5, "hello12"));
            data.add(new Tuple2<>(5, "hello13"));
            data.add(new Tuple2<>(5, "hello14"));
            data.add(new Tuple2<>(5, "hello15"));
            data.add(new Tuple2<>(6, "hello16"));
            data.add(new Tuple2<>(6, "hello17"));
            data.add(new Tuple2<>(6, "hello18"));
            data.add(new Tuple2<>(6, "hello19"));
            data.add(new Tuple2<>(6, "hello20"));
            data.add(new Tuple2<>(6, "hello21"));

            DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

            // Hash partitioning on the first field (commented out)
            /*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                @Override
                public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                    Iterator<Tuple2<Integer, String>> it = values.iterator();
                    while (it.hasNext()) {
                        Tuple2<Integer, String> next = it.next();
                        System.out.println("Current thread id: " + Thread.currentThread().getId() + "," + next);
                    }
                }
            }).print();*/

            // Range partitioning on the first field
            text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                @Override
                public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                    Iterator<Tuple2<Integer, String>> it = values.iterator();
                    while (it.hasNext()) {
                        Tuple2<Integer, String> next = it.next();
                        System.out.println("Current thread id: " + Thread.currentThread().getId() + "," + next);
                    }
                }
            }).print();
        }
    }
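The two partitioning strategies can be contrasted with a small plain-Java model. It is only a sketch under simplifying assumptions: `PartitionSketch`, `hashPartition`, and `rangePartition` are hypothetical names, the hash is the plain hashCode() modulo the partition count, and the range boundaries are fixed by hand. How Flink actually hashes keys and determines range boundaries is not shown here.

```java
final class PartitionSketch {

    /** Hash partitioning: the partition is derived from the key's hash code. */
    static int hashPartition(Object key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * Range partitioning: the key is compared against sorted upper boundaries.
     * boundaries[i] is the largest key of partition i; larger keys go to the last partition.
     */
    static int rangePartition(int key, int[] boundaries) {
        for (int i = 0; i < boundaries.length; i++) {
            if (key <= boundaries[i]) {
                return i;
            }
        }
        return boundaries.length;
    }

    public static void main(String[] args) {
        int[] boundaries = {2, 4}; // three partitions: keys <= 2, keys 3..4, keys >= 5
        for (int key = 1; key <= 6; key++) {
            System.out.println("key " + key
                    + " -> hash partition " + hashPartition(key, 3)
                    + ", range partition " + rangePartition(key, boundaries));
        }
    }
}
```

With hash partitioning, neighboring keys are scattered across partitions; with range partitioning, neighboring keys stay together, which is why the range-partitioned output of the example above shows consecutive keys on the same thread.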

2.4 DataSet Sink

  • writeAsText(): writes elements line by line as strings, obtained by calling each element’s toString() method
  • writeAsCsv(): writes tuples to comma-separated value files; the row and field delimiters are configurable. The value of each field comes from the object’s toString() method
  • print() / printToErr(): prints each element’s toString() value to standard output or standard error

3 Flink Serializers

  • Flink ships with serializers for standard types such as int, long, and String

  • Data types that Flink’s own serializers cannot handle can be handed to Avro or Kryo

  • Usage:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Serialize with Avro
    env.getConfig().enableForceAvro();

    // Serialize with Kryo
    env.getConfig().enableForceKryo();

    // Use a custom serializer
    env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/custom_serializers.html

4 Flink Data Types

  • Java Tuples and Scala Case Classes

  • Java POJOs: Java entity classes

  • Primitive Types

    Java and Scala primitive data types are supported by default

  • General Class Types

    Most Java and Scala classes are supported by default

  • Hadoop Writables

    Data types that implement Hadoop’s org.apache.hadoop.Writable interface are supported

  • Special Types

    Examples include Scala’s Either, Option, and Try

      https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/api_concepts.html#supported-data-types
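As an illustration of the "Java POJOs" entry, here is a minimal sketch of an entity class shaped the way the Flink documentation linked above describes a POJO: a public class with a public no-argument constructor whose fields are either public or reachable through a getter and a setter. `PojoSketch` and `WordCount` are illustrative names, and the class is nested only to keep the example in one file.

```java
final class PojoSketch {

    /** In a real job this would usually be a top-level public class in its own file. */
    public static class WordCount {
        // Public, non-final field: accessible directly
        public String word;
        // Private field: exposed through a getter and a setter
        private int count;

        /** Public no-argument constructor. */
        public WordCount() {
        }

        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":" + count;
        }
    }

    public static void main(String[] args) {
        WordCount wc = new WordCount();
        wc.word = "hello";
        wc.setCount(2);
        System.out.println(wc); // hello:2
    }
}
```

A class of this shape can be referenced by field name, for example in keyBy("word"), whereas a class that does not follow these rules falls under the general class types.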

5 Summary

Qin Kaixin in Shenzhen 201812022220