Basic API Concepts

Official Document – Basic API concepts

A Flink program is a regular program that implements transformations on distributed collections (e.g., filtering, mapping, joining, grouping, aggregating, defining windows, updating state). Collections are created from sources (e.g., by reading files, subscribing to Kafka topics, or reading collections from local memory); results are returned via sinks. Flink programs can run standalone or embedded in other applications, and execution can happen in a local JVM or on a cluster of many machines.

Depending on whether the data source is bounded or unbounded, you can write a batch program with the DataSet API or a streaming program with the DataStream API to process it.

DataSet & DataStream

Flink uses the DataSet and DataStream classes to represent the data in a program. You can think of them as immutable collections of data that may contain duplicates. For a DataSet the data is finite; for a DataStream the data can be unbounded.

They resemble regular Java collections but differ in some key ways. First, they are immutable: once created, you can neither add nor remove elements, nor simply inspect the elements inside them.

In a Flink program, an initial collection is created by adding a data source, and new collections are derived from it by applying API methods such as map(...) and filter(...).

Anatomy of a Flink program

Each Flink program consists of the same basic parts:

  1. Obtain an execution environment.
  2. Load or create the initial data.
  3. Specify transformations on this data.
  4. Specify where to put the results of the computations.
  5. Trigger the program execution.
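A minimal sketch of such a program, assuming a socket text source on localhost:9999 and an illustrative Tokenizer helper (neither is prescribed by the text above):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Obtain an execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Load/create the initial data (a socket text stream is one possible source).
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // 3. Specify transformations on this data.
        DataStream<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);

        // 4. Specify where to put the results.
        counts.print();

        // 5. Trigger the program execution.
        env.execute("Socket Word Count");
    }

    // Splits each line into (word, 1) pairs.
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}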

Lazy evaluation

Whether executed locally or in a cluster, all Flink programs are executed lazily: when the program’s main method runs, the data loading and transformations do not happen immediately. Instead, each operation is created and added to the program’s execution plan. The operations are only actually executed when the execution environment’s execute() method is called to explicitly trigger execution.

Specify the key

Some transformations (join, coGroup, keyBy, groupBy) require a key to be defined on a collection; other transformations (reduce, groupReduce, aggregate, windows) allow the data to be grouped on a key before they are applied.

A DataSet is grouped as follows:

DataSet<...> input = ...; // create a DataSet

DataSet<...> reduced = input
    .groupBy(/* define key here */)
    .reduceGroup(/* do some processing */);

A DataStream is keyed as follows:

DataStream<...> input = ...; // create a DataStream

DataStream<...> windowed = input
    .keyBy(/* define key here */)
    .window(/* create window */);

Flink’s data model is not based on key-value pairs, so there is no need to physically pack data types into keys and values. Keys are “virtual”: they are defined as functions over the actual data to tell the grouping operator which data to group together.
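As an illustration, a key can be derived from the data with a KeySelector; a small sketch, assuming a wordCounts stream of Tuple2<String, Integer> like the one used later in this article:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// The key (the word) is computed from each element; it is not stored as a separate field.
KeyedStream<Tuple2<String, Integer>, String> keyed = wordCounts
    .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    });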

Supported data types

Flink places some restrictions on the elements that can be contained in the DataSet and DataStream. This is done so that the system can analyze the type to determine an effective execution strategy.

Seven different data types are supported:

  1. Java Tuple and Scala Case Class.
  2. Java POJO.
  3. Basic data types.
  4. Regular classes.
  5. Value.
  6. Hadoop Writable.
  7. Special type.

Tuples and Case Classes

A Tuple is a composite data type containing a fixed number of fields of various types. The Java API provides classes from Tuple1 up to Tuple25. Each field of a Tuple can be any data type supported by Flink, including other tuples (nested tuples). Tuple fields can be accessed directly by field name, e.g. tuple.f0 accesses the first field (indices start at 0), or via the generic getter method tuple.getField(int position).

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("Hello", 1),
    new Tuple2<String, Integer>("World", 2)
);

wordCounts.map(tuple -> tuple.f1); // access a field

wordCounts.keyBy(0); // key by the first field; keyBy("f0") also works

POJO

Flink treats Java classes that satisfy the following criteria as special POJO data types:

  • Classes must be public.
  • A class must have a public, no-parameter constructor.
  • All fields are either public or accessible through getter and setter methods.
  • The type of the field must be supported by a registered serializer.

POJOs are generally represented with PojoTypeInfo and serialized with the PojoSerializer (with Kryo as a configurable fallback serializer). The exception is when the POJOs are in fact Avro types (Avro specific records) or produced as “Avro reflect types”; in that case the POJO is represented by AvroTypeInfo and serialized with the AvroSerializer. You can also register your own custom serializer if required.

Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result, POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
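For example, a class like the following (a hypothetical WordWithCount type, not taken from the text above) satisfies these rules and is treated as a POJO:

// A valid POJO: public class, public no-argument constructor,
// and every field either public or reachable via getter/setter.
public class WordWithCount {

    public String word;   // public field
    private int count;    // private field exposed through getter/setter

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public int getCount() { return count; }

    public void setCount(int count) { this.count = count; }
}

Such a type can then be keyed by a field expression, e.g. stream.keyBy("word").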

Basic data types

Flink supports all of Java’s basic data types, such as Integer, String, Double, and so on.

Regular classes

Flink supports most Java classes (from libraries and custom code), except classes containing fields that cannot be serialized, such as file pointers, I/O streams, or other native resources. Classes that follow the Java Beans conventions usually work fine.

Flink treats every class that it does not identify as a POJO type (see the POJO requirements above) as a regular class. Flink handles these data types as black boxes and cannot access their contents (e.g., for efficient sorting). Regular classes are serialized and deserialized using the Kryo serialization framework.

Values

Value types describe their serialization and deserialization manually. Instead of going through a general-purpose serialization framework, they provide custom code for these operations by implementing the read and write methods of the org.apache.flink.types.Value interface. Using a Value type makes sense when general-purpose serialization would be highly inefficient. An example is a data type that implements a sparse vector as an array: knowing that most elements of the array are zero, one can use a special encoding for the non-zero elements, while a general-purpose serialization framework would simply write out all array elements.

The org.apache.flink.types.CopyableValue interface supports manual internal cloning logic in a similar way.

/**
 * Interface to be implemented by basic types that support to be copied efficiently.
 */
@Public
public interface CopyableValue<T> extends Value {
    ...
}

Flink comes with predefined Value types that correspond to the basic data types: ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue. These Value types act as mutable variants of the basic data types: their value can be changed, allowing programmers to reuse objects and relieve pressure on the garbage collector.
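A minimal sketch of a custom Value type following the sparse-vector idea above (the SparseVector name and its field layout are illustrative assumptions):

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

// Stores only the non-zero entries of a vector, as described above.
public class SparseVector implements Value {

    private int size;        // logical length of the vector
    private int[] indices;   // positions of non-zero entries
    private double[] values; // the non-zero entries themselves

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeInt(size);
        out.writeInt(indices.length);
        for (int i = 0; i < indices.length; i++) {
            out.writeInt(indices[i]);
            out.writeDouble(values[i]);
        }
    }

    @Override
    public void read(DataInputView in) throws IOException {
        size = in.readInt();
        int nonZeros = in.readInt();
        indices = new int[nonZeros];
        values = new double[nonZeros];
        for (int i = 0; i < nonZeros; i++) {
            indices[i] = in.readInt();
            values[i] = in.readDouble();
        }
    }
}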

Hadoop Writable

Types that implement the org.apache.hadoop.Writable interface can be used. The serialization logic defined in their write() and readFields() methods is used.

Special types

Special types can be used, including Scala’s Either, Option, and Try. The Java API has a custom implementation of Either. Similar to Scala’s Either, it represents a value with two possible types, Left and Right. Either can be used for error handling or for operators that need to output records of two different types.
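A small sketch of how the Java API’s Either might be used to carry either a parsed value or the offending input (the parsing logic here is purely illustrative):

import org.apache.flink.types.Either;

// Return either a parsed integer (Right) or the unparseable line itself (Left),
// instead of throwing an exception inside an operator.
public static Either<String, Integer> parseLine(String line) {
    try {
        return Either.Right(Integer.parseInt(line.trim()));
    } catch (NumberFormatException e) {
        return Either.Left(line);
    }
}

// Downstream, the two cases can be separated again with isLeft()/isRight(), left()/right().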

Type inference and type erasure

Type erasure means that the Java compiler throws away much of the generic type information after compilation; as a consequence, an instance of an object no longer knows its concrete generic type at runtime. For example, instances of DataStream<String> and DataStream<Long> look the same to the JVM at runtime.

Flink requires type information at the time it prepares the program for execution (when the program’s main method is called). The Flink Java API tries to reconstruct the type information that was discarded in various ways and to store it explicitly in the data sets and operators. You can retrieve a stream’s data type via DataStream.getType(). The method returns an instance of TypeInformation, which is Flink’s internal way of representing types.

Type inference has its limits and in some cases requires the “cooperation” of the programmer. Examples are methods that create data sets from collections, such as ExecutionEnvironment.fromCollection(), which can be passed a parameter describing the type. Generic functions such as MapFunction<I, O> may also require extra type information.

The ResultTypeQueryable interface can be implemented by input formats and functions to tell the API explicitly about their return type. The input types that functions are invoked with can usually be inferred from the result types of the preceding operations.
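Where erasure hides the result type, e.g. with a lambda-based map, a type hint can also be supplied explicitly; a minimal sketch, assuming lines is a DataStream<String> created elsewhere:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// The lambda's Tuple2<String, Integer> result type is erased at compile time,
// so it is declared explicitly with a returns(...) hint.
DataStream<Tuple2<String, Integer>> pairs = lines
    .map(line -> new Tuple2<>(line, line.length()))
    .returns(Types.TUPLE(Types.STRING, Types.INT));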

Accumulators and counters

An accumulator is a simple construct consisting of an add operation and a final accumulated result, which becomes available after the job has finished.

The simplest accumulator is a counter: it can be incremented using the Accumulator.add(V value) method. At the end of the job, Flink sums up (merges) all partial results and sends the total to the client. Accumulators are useful for debugging or for quickly getting a better understanding of your data.

Flink currently has the following built-in accumulators. Each of them implements the Accumulator interface.

  • IntCounter, LongCounter and DoubleCounter.
  • Histogram: a histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. It can be used to compute distributions of values, e.g. the distribution of words per line in a word-count program.

How do I use an accumulator?

The accumulator object must first be created in the user-defined transformation function that uses it.

private IntCounter numLines = new IntCounter();

Second, the accumulator object must be registered, typically in the open() method of a rich function; this is also where you define its name.

getRuntimeContext().addAccumulator("num-lines", this.numLines);

You can now use the accumulator anywhere in the operator function, including the open() and close() methods.

this.numLines.add(1);

The overall result is stored in the JobExecutionResult object, which is returned from the execute() method of the execution environment (currently this only works if execution waits for the job to complete).

myJobExecutionResult.getAccumulatorResult("num-lines");

All accumulators for each job share a namespace so that the same accumulator can be used in different operator functions of the job. Flink internally merges all accumulators of the same name.

Note: Currently, accumulator results are only available after the entire job has completed. Flink also plans to make the result of the previous iteration available in the next iteration. Aggregators can be used to compute per-iteration statistics and to base the termination of iterations on such statistics.

Custom accumulator

To implement your own accumulator, simply write an implementation of the Accumulator interface. You can implement either Accumulator or SimpleAccumulator; a small sketch follows the list below.

  • Accumulator<V, R> is the most flexible: it defines a type V for the values to add and a result type R for the final result. For example, for the histogram, V is a number and R is a histogram.
  • SimpleAccumulator is applicable when both types are the same, e.g. for counters.
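A minimal sketch of a custom SimpleAccumulator that tracks the maximum value seen (the MaxCounter name is an illustrative assumption):

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;

// Tracks the largest long value added on any parallel instance.
public class MaxCounter implements SimpleAccumulator<Long> {

    private long max = Long.MIN_VALUE;

    @Override
    public void add(Long value) {
        max = Math.max(max, value);
    }

    @Override
    public Long getLocalValue() {
        return max;
    }

    @Override
    public void resetLocal() {
        max = Long.MIN_VALUE;
    }

    @Override
    public void merge(Accumulator<Long, Long> other) {
        max = Math.max(max, other.getLocalValue());
    }

    @Override
    public MaxCounter clone() {
        MaxCounter copy = new MaxCounter();
        copy.max = this.max;
        return copy;
    }
}

It would be registered and used in the same way as the built-in IntCounter shown above.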