This article is based on Apache Flink 1.7.

As described in the previous article, a Source is the input of a Flink program, and a Sink is where the program writes its results after processing the Source, for example to files, sockets, or external systems, or simply printing them for display. (Many big data systems have a similar notion; Flume, for instance, has its own Source/Channel/Sink.) Flink provides several ways to output data, which are introduced one by one below.

Concepts

Flink predefined Sinks

  • File-based: writeAsText(), writeAsCsv(), and writeUsingOutputFormat()/FileOutputFormat.

  • Socket-based: writeToSocket().

  • For display: print(), printToErr().

  • User-defined sinks: addSink().

The write* methods are intended mainly for testing and debugging a program: Flink does not checkpoint them, so they give no exactly-once guarantee. To get exactly-once output to a file system, use flink-connector-filesystem instead; a custom sink registered via addSink() can also be written to support it.
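For illustration, here is a minimal sketch of the predefined sinks applied to a small tuple stream (the output paths are placeholders, not from the original example):

import org.apache.flink.streaming.api.scala._

object PredefinedSinks {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[(String, Int)] = env.fromElements(("flink", 1), ("sink", 2))

    // File-based: one element per line via toString, or as CSV (tuple/case-class streams).
    stream.writeAsText("file:///tmp/sink-demo-text")
    stream.writeAsCsv("file:///tmp/sink-demo-csv")

    // Socket-based: writeToSocket additionally needs a SerializationSchema, e.g.
    // stream.writeToSocket("localhost", 9999, new SerializationSchema[(String, Int)] { ... })

    // For display: print to the TaskManager's stdout / stderr.
    stream.print()
    stream.printToErr()

    env.execute("predefined-sinks-demo")
  }
}

Keep in mind that, as noted above, none of these write* sinks are checkpointed.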

Connectors

Connectors provide interfaces for exchanging data with third-party systems. The currently supported connectors include:

  • Apache Kafka

  • Apache Cassandra

  • Elasticsearch

  • Hadoop FileSystem

  • RabbitMQ

  • Apache NiFi

In addition, Apache Bahir provides sinks for systems such as Apache ActiveMQ, Apache Flume, Redis, and Akka.

Fault tolerance

To achieve end-to-end exactly-once delivery, the sink must take part in Flink's checkpoint mechanism.
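As an illustration of what "taking part in checkpointing" means, below is a minimal sketch of a user-defined sink that implements CheckpointedFunction: it buffers elements and stores whatever has not yet been delivered to the external system in operator state, so a restart does not lose it. The class name BufferingSink and the buffering logic are illustrative only, not the implementation of any built-in connector; real end-to-end exactly-once sinks usually build on a transactional pattern such as TwoPhaseCommitSinkFunction.

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class BufferingSink extends RichSinkFunction[String] with CheckpointedFunction {

  @transient private var checkpointedState: ListState[String] = _
  private val buffer = ListBuffer.empty[String]

  override def invoke(value: String): Unit = {
    // Buffer the element; a real sink would eventually write it to the external system.
    buffer += value
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // On each checkpoint, persist everything that has not yet been delivered.
    checkpointedState.clear()
    buffer.foreach(e => checkpointedState.add(e))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[String]("buffered-elements", classOf[String])
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) {
      // After a failure, re-populate the in-memory buffer from the checkpointed state.
      checkpointedState.get().asScala.foreach(buffer += _)
    }
  }
}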

In practice

Elasticsearch Connector

The following demonstrates how to use a sink, taking the Elasticsearch connector as an example. The Elasticsearch connector provides at-least-once semantics, and this guarantee relies on Flink's checkpoint mechanism.
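Note that the at-least-once guarantee only takes effect when checkpointing is enabled on the execution environment; a minimal sketch (the 5000 ms interval is just an example value):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Enable checkpointing so the Elasticsearch sink flushes pending requests on each checkpoint.
env.enableCheckpointing(5000)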

To use the Elasticsearch connector, you need to add a dependency that matches your Elasticsearch version, as shown below.

In this case, we are using Elasticsearch version 5.6.9 and Scala version 2.11.

Add the following dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

First, look at the ElasticsearchSink source code: we need to supply an ElasticsearchSinkFunction<T> and, optionally, an ActionRequestFailureHandler, which handles failed requests.

public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {

    private static final long serialVersionUID = 1L;

    public ElasticsearchSink(Map<String, String> userConfig,
                             List<InetSocketAddress> transportAddresses,
                             ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
    }

    public ElasticsearchSink(Map<String, String> userConfig,
                             List<InetSocketAddress> transportAddresses,
                             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
                             ActionRequestFailureHandler failureHandler) {
        super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
    }
}

See the full example below:

package learn.sourcesAndsinks

import java.net.{InetAddress, InetSocketAddress}
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object BasicSinks {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Define the input stream
    val stream: DataStream[String] = env.fromCollection(List("aaa", "bbb", "ccc"))

    // Elasticsearch configuration; the cluster name here is "docker-cluster"
    val config = new util.HashMap[String, String]()
    config.put("cluster.name", "docker-cluster")
    config.put("bulk.flush.max.actions", "1")

    val transportAddress = new util.ArrayList[InetSocketAddress]()
    transportAddress.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))

    stream.addSink(new ElasticsearchSink(config, transportAddress,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(element: String): IndexRequest = {
          val json = new util.HashMap[String, String]()
          json.put("data", element)
          Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
        }

        def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          indexer.add(createIndexRequest(element))
        }
      },
      // Ignore failed requests -- not recommended for production environments
      new IgnoringFailureHandler()))

    env.execute()
  }
}

Running the program indexes the three elements into the my-index index in Elasticsearch.

The above implements a basic Elasticsearch sink. To ensure data integrity, flush and retry policies should also be configured; the relevant keys, which are specific to the Elasticsearch connector, are listed below, with a configuration sketch after the lists.

ES flush configuration

  • bulk.flush.max.actions

  • bulk.flush.max.size.mb

  • bulk.flush.interval.ms

ES error retry configuration

  • bulk.flush.backoff.enable

  • bulk.flush.backoff.type

  • bulk.flush.backoff.delay

  • bulk.flush.backoff.retries
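Below is a sketch of how these keys could be set on the configuration map that is passed to the ElasticsearchSink constructor, as in the example above. The values are placeholders to show the format, not tuning recommendations:

import java.util

val config = new util.HashMap[String, String]()
// Flush settings: flush after N buffered actions, after M megabytes, or at least every interval.
config.put("bulk.flush.max.actions", "1000")
config.put("bulk.flush.max.size.mb", "5")
config.put("bulk.flush.interval.ms", "2000")
// Backoff retry settings for temporarily failed bulk requests (e.g. a full bulk queue).
config.put("bulk.flush.backoff.enable", "true")
config.put("bulk.flush.backoff.type", "EXPONENTIAL") // or CONSTANT
config.put("bulk.flush.backoff.delay", "50")         // initial delay in milliseconds
config.put("bulk.flush.backoff.retries", "3")        // maximum number of retries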

If you need to handle Elasticsearch errors beyond this, you can implement the ActionRequestFailureHandler interface.
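For reference, here is a minimal sketch of such a handler, following the pattern shown in the Flink documentation; the class name CustomFailureHandler and the retry/drop decisions are illustrative, not a recommendation:

import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

class CustomFailureHandler extends ActionRequestFailureHandler {

  override def onFailure(action: ActionRequest,
                         failure: Throwable,
                         restStatusCode: Int,
                         indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
      // The bulk queue was full: re-queue the request so it is retried later.
      indexer.add(action)
    } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
      // Malformed document: drop it without failing the sink.
    } else {
      // Any other failure: rethrow, which fails the sink.
      throw failure
    }
  }
}

An instance of this handler is passed as the last constructor argument of ElasticsearchSink, in place of the IgnoringFailureHandler used in the example above. The connector also ships a RetryRejectedExecutionFailureHandler that only re-queues rejected bulk requests.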

Conclusion

This article used the Flink Elasticsearch connector as an example to introduce sinks in Flink. Later articles will dig into the Source and Sink source code in more detail.
