Gaining actionable insights from continuously generated data in real time is a common requirement for many enterprises today. A widespread use case for real-time data processing is dashboards. Typical architectures that support such use cases are based on data flow processors, data storage with low latency read/write access, and visualization frameworks.

In this blog post, we demonstrate how to use Apache Flink and Elasticsearch to build real-time event processing and search for streaming data analysis. The following diagram depicts our system architecture. In a real-world deployment, Kibana can be added on top to build a complete real-time dashboard solution.

In our architecture, Apache Flink runs stream analysis jobs that ingest data streams, apply transformations to analyze, transform, and model the data in motion, and write the results to an Elasticsearch index. Kibana connects to the index and queries it for the data to visualize. All components of our architecture are open source systems under the Apache License 2.0. In today’s demo, I’ll focus on how to get data into Flink, do some processing, and finally write it into Elasticsearch.

Why use Apache Flink for stream processing?

Before diving into the details of implementing the demo application, let’s discuss some of the features that make Apache Flink an excellent stream processor. Apache Flink comes with a competitive set of stream processing capabilities, some of which are unique in the open source world. The most important ones are:

  • Support for event time and out-of-order streams: In practice, events rarely arrive in the order in which they were generated, especially when they come from distributed systems and devices. Until now, it was up to application programmers to correct for this “time drift” or simply ignore it and accept inaccurate results, because streaming systems (at least in the open source world) did not support event time (that is, processing events according to when they occurred in the real world). Flink is the first open source engine to support out-of-order streams and to consistently process events based on their timestamps.
  • Expressive and easy-to-use APIs in Scala and Java: Flink’s DataStream API ports many of the well-known operators of batch APIs, such as map, reduce, and join, to the streaming world. In addition, it provides stream-specific operations such as window, split, and connect. First-class support for user-defined functions simplifies the implementation of custom application behavior. The DataStream API is available in Scala and Java.
  • Support for sessions and unaligned windows: Most streaming systems have some concept of a window, which is a grouping of events based on some function of time. Unfortunately, in many systems these windows are hard-coded and tied to the system’s internal checkpointing mechanism. Flink was the first open source streaming engine to fully decouple windowing from fault tolerance, allowing richer forms of windows such as sessions.
  • Consistency, fault tolerance, and high availability: Flink guarantees consistent state updates in the event of a failure (often referred to as “exactly-once processing”), as well as consistent data movement between selected sources and sinks (for example, between Kafka and HDFS). Flink also supports worker and master failover, eliminating any single point of failure.
  • Low latency and high throughput: We have clocked Flink at 1.5 million events per second per core and have observed latencies in the 25 ms range for jobs that include network data shuffling. Using tuning knobs, Flink users can navigate the latency-throughput tradeoff, making the system suitable for both high-throughput data ingestion and transformation and ultra-low-latency (millisecond range) applications.
  • Connectors and integration points: Flink integrates with various open source systems for data input and output (e.g., HDFS, Kafka, Elasticsearch, HBase), for deployment (e.g., YARN), and as an execution engine for other frameworks (e.g., Cascading, Google Cloud Dataflow). The Flink project itself bundles a Hadoop MapReduce compatibility layer, a Storm compatibility layer, and libraries for machine learning and graph processing.
  • Developer productivity and ease of operation: Flink runs in a variety of environments. Local execution inside an IDE significantly simplifies the development and debugging of Flink applications. In a distributed setup, Flink scales out horizontally to large clusters. YARN mode allows users to start Flink clusters in seconds. Flink exposes job and system-wide metrics through well-defined REST interfaces. A built-in web dashboard displays these metrics and makes monitoring Flink very easy.
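
To make the event-time idea from the list above more concrete, here is a minimal plain-Java sketch (it does not use Flink, and it is not how Flink implements windows internally). It assigns events to fixed-size windows by the timestamp they carry rather than by the order in which they arrive. The class name EventTimeWindowSketch, the Event type, and the 10-second window size are assumptions made purely for illustration, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: groups events into fixed-size windows by their own timestamps.
class EventTimeWindowSketch {

    // Hypothetical event type: a payload plus the time at which it was generated.
    static final class Event {
        final String payload;
        final long timestampMillis;

        Event(String payload, long timestampMillis) {
            this.payload = payload;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the fixed-size window that contains the given (non-negative) timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Assigns each event to a window based on its timestamp, not on its arrival position.
    static Map<Long, List<String>> assign(List<Event> arrivals, long windowSizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : arrivals) {
            long start = windowStart(e.timestampMillis, windowSizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        // "b" was generated before "c" but arrives after it.
        List<Event> arrivals = Arrays.asList(
                new Event("a", 1_000L),
                new Event("c", 11_000L),
                new Event("b", 4_000L));
        System.out.println(assign(arrivals, 10_000L));
    }
}
```

Even though "b" arrives last, it lands in the same window as "a" because the grouping depends only on the event timestamps. A real stream processor additionally has to decide when a window is complete, which this sketch does not attempt.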

The combination of these features makes Apache Flink a unique choice for many stream processing applications.

Flink stream processing API

In the following steps, we will complete the processing of events in the order described above.

Installation

If you don’t already have Flink and the Elastic Stack available, you’ll need to install the following:

Elasticsearch

You can refer to my previous article “How to Install Elasticsearch on Linux, MacOS, and Windows” to install Elasticsearch on your preferred system.

Kibana

You can refer to my previous article “Kibana: How to Install Kibana in an Elastic Stack on Linux, MacOS, and Windows” to install Kibana on your preferred system.

Flink

For installation instructions, you can refer to the following links:

  • macOS: How to Install Apache Flink On Mac OS
  • Windows: How to Install Apache Flink On Local Windows
  • Ubuntu: How to Install Apache Flink On Ubuntu

Installation on these systems is very straightforward. For my installation, I chose macOS. I start Flink as follows:

$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host liuxg.
Starting taskexecutor daemon on host liuxg.

The output above shows that Flink is up and running. Once it has started, we can open http://localhost:8081 in a browser to see the Flink web dashboard. We can even submit jobs from there.

If you can see the Flink dashboard, the installation was successful.

Creating a demo example

Next, we’ll use Java to build a demonstration example that accesses Flink through its API. As shown above, we will use Flink’s Environment, Source, Transform, and Sink APIs to build our application. I have uploaded my project to GitHub for your convenience. You can download it with the following command:

git clone https://github.com/liu-xiao-guo/ElasticsearchFlink

You can use your favorite IDE to create a new project to get started.

Source

In our exercise, we will use the nc (netcat) tool to send data. You will need to install nc on your own platform. We use the following command to start nc:

nc -l 8888

As shown above, it opens port 8888 and listens (-l) for incoming connections on that port. We can run the above command in a terminal. In the following experiment, we type a string in that terminal and press Enter, which sends the line over the established connection.

ElasticsearchFlink.java

This is the most important part of the entire code. It’s actually pretty simple. Let me post the code:


import com.liuxg.User;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchFlink {
    public static void main(String[] args) {
        // Create Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define a source
        try {
            DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

            DataStream<String> filterSource = source.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String s) throws Exception {
                    return !s.contains("hello");
                }
            });

            DataStream<User> transSource = filterSource.map(value -> {
                String[] fields = value.split(",");
                return new User(fields[0], fields[1]);
            });

            // Use ESBuilder to construct an output
            List<HttpHost> hosts = new ArrayList<>();
            hosts.add(new HttpHost("localhost", 9200, "http"));
            ElasticsearchSink.Builder<User> builder = new ElasticsearchSink.Builder<User>(hosts,
                    new ElasticsearchSinkFunction<User>() {
                        @Override
                        public void process(User u, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                            Map<String, String> jsonMap = new HashMap<>();
                            jsonMap.put("id", u.id);
                            jsonMap.put("name", u.name);
                            IndexRequest indexRequest = Requests.indexRequest();
                            indexRequest.index("flink-test");
                            // indexRequest.id("1000");
                            indexRequest.source(jsonMap);
                            requestIndexer.add(indexRequest);
                        }
                    });

            // Define a sink
            builder.setBulkFlushMaxActions(1);
            transSource.addSink(builder.build());

            // Execute the transform
            env.execute("flink-es");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
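
The listing imports com.liuxg.User but does not show that class. Judging only from how it is used (a two-argument constructor and the fields id and name, both stored in a Map of strings), it might look roughly like the sketch below. This is a guess at its shape, not the class from the repository. It is wrapped in a holder class named UserSketch only so that the example compiles as a single file.

```java
// Hypothetical stand-in for com.liuxg.User, which the listing imports but does not show.
class UserSketch {

    // Flink treats a class as a POJO when it is public, has a public no-argument
    // constructor, and exposes its fields publicly or through getters and setters.
    public static class User {
        public String id;
        public String name;

        // No-argument constructor, needed for POJO handling.
        public User() {
        }

        public User(String id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{id=" + id + ", name=" + name + "}";
        }
    }

    public static void main(String[] args) {
        User u = new User("1", "liuxg");
        System.out.println(u);
    }
}
```

In a real project the class would normally live in its own file (here presumably com/liuxg/User.java) as a public top-level class.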

As shown above, we obtain the execution environment at the beginning:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

To set up a connection to localhost:8888, use the following method:

   DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

This statement only declares the source. The actual connection is opened when the job starts executing, so nc must already be listening on port 8888 at that point.
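
Conceptually, a socket text source connects to a host and port and treats every newline-terminated line as one record. The plain-Java sketch below imitates that behaviour without Flink. The class SocketLinesSketch and its methods are hypothetical names for illustration, and the in-process sender on a free port merely stands in for nc so that the example is self-contained.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: reads newline-delimited records from a TCP socket.
class SocketLinesSketch {

    // Connects to host:port and collects lines until the peer closes the connection.
    static List<String> readLines(String host, int port) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    // Stands in for nc: accepts one connection, sends the lines, then closes.
    static List<String> demo(List<String> toSend) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 picks any free port
            Thread sender = new Thread(() -> {
                try (Socket client = server.accept();
                     PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                    for (String s : toSend) {
                        out.println(s);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            sender.start();
            List<String> received = readLines("localhost", server.getLocalPort());
            sender.join();
            return received;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("socket demo failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("1,liuxg", "2,hello")));
    }
}
```

To try readLines against a real nc session instead, start nc -l 8888 first and call readLines("localhost", 8888). The call returns once nc is terminated.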

Next, we use a Flink filter function. It does a simple transform of the data. If the string contains “hello”, this data is ignored. Eventually it will not be written to Elasticsearch:

DataStream<String> filterSource = source.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String s) throws Exception {
        return !s.contains("hello");
    }
});
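
The filter rule itself can be tried out without a Flink cluster. The following plain-Java sketch applies the same predicate to a few sample lines. The class name HelloFilterSketch and the third sample line are assumptions added for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative only: the same rule as the FilterFunction, applied to an in-memory list.
class HelloFilterSketch {

    // Keep a line only if it does not contain the substring "hello".
    static final Predicate<String> KEEP = s -> !s.contains("hello");

    static List<String> keep(List<String> lines) {
        return lines.stream().filter(KEEP).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> kept = keep(Arrays.asList("1,liuxg", "2,hello", "3,Hello"));
        for (String line : kept) {
            System.out.println(line);
        }
    }
}
```

Note that String.contains is case-sensitive, so "3,Hello" passes the filter while "2,hello" is dropped. If both should be dropped, the line would have to be lower-cased before the check.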

Next, we apply a map transformation. For example, when our input is 1,liuxg, we want to turn it into a record with id: 1 and name: liuxg.

            DataStream<User> transSource = filterSource.map(value -> {
                String[] fields = value.split(",");
                return new User(fields[ 0 ], fields[ 1 ]);
            });

This transformation is also very simple.
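
One caveat: the map above assumes that every line contains a comma. A line such as liuxg splits into a single field, so fields[1] throws an ArrayIndexOutOfBoundsException. A more defensive parser could look like the sketch below. It is plain Java, and the names LineParserSketch and ParsedUser are hypothetical. Whether malformed lines should be dropped, logged, or routed elsewhere is a design decision this sketch does not settle.

```java
import java.util.Optional;

// Illustrative only: parses "id,name" lines and rejects malformed input instead of throwing.
class LineParserSketch {

    // Hypothetical result type with the same two fields as the User class in the post.
    static final class ParsedUser {
        final String id;
        final String name;

        ParsedUser(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static Optional<ParsedUser> parse(String line) {
        // A limit of -1 keeps trailing empty fields, so "1," yields two fields.
        String[] fields = line.split(",", -1);
        if (fields.length != 2) {
            return Optional.empty();
        }
        String id = fields[0].trim();
        String name = fields[1].trim();
        if (id.isEmpty() || name.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new ParsedUser(id, name));
    }

    public static void main(String[] args) {
        System.out.println(parse("1,liuxg").isPresent()); // well-formed line
        System.out.println(parse("liuxg").isPresent());   // no comma
        System.out.println(parse("1,").isPresent());      // empty name
    }
}
```

In a Flink job, a parser like this would typically be combined with a flatMap or an additional filter so that empty results never reach the sink.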

The last stage of the Flink pipeline is the sink. We write the data to Elasticsearch as follows:

// Use ESBuilder to construct an output
List<HttpHost> hosts = new ArrayList<>();
hosts.add(new HttpHost("localhost", 9200, "http"));
ElasticsearchSink.Builder<User> builder = new ElasticsearchSink.Builder<User>(hosts,
        new ElasticsearchSinkFunction<User>() {
            @Override
            public void process(User u, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                Map<String, String> jsonMap = new HashMap<>();
                jsonMap.put("id", u.id);
                jsonMap.put("name", u.name);
                IndexRequest indexRequest = Requests.indexRequest();
                indexRequest.index("flink-test");
                // indexRequest.id("1000");
                indexRequest.source(jsonMap);
                requestIndexer.add(indexRequest);
            }
        });

// Define a sink
builder.setBulkFlushMaxActions(1);
transSource.addSink(builder.build());

// Execute the transform
env.execute("flink-es");

The important thing to note here is that we are building hosts:

   hosts.add(new HttpHost("localhost", 9200, "http"));

Adjust the address and port number to match your own Elasticsearch installation. The following statement in the code above also deserves special attention:

builder.setBulkFlushMaxActions(1);

The Elasticsearch sink buffers index requests and sends them to Elasticsearch in bulk. Setting this parameter to 1 makes the sink flush after every single record, so each event is written as soon as it is received instead of waiting for a batch to fill up.
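
The effect of this setting can be pictured with a small buffer that flushes whenever it holds a configured number of pending actions. The plain-Java sketch below only models the idea. It is not the connector's implementation, and the class BulkBufferSketch and its methods are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a buffer that flushes once it holds maxActions pending items.
class BulkBufferSketch {
    private final int maxActions;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    BulkBufferSketch(int maxActions) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be at least 1");
        }
        this.maxActions = maxActions;
    }

    // Buffers one document and flushes when the threshold is reached.
    void add(String doc) {
        pending.add(doc);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    // Moves everything that is pending into one batch (one simulated bulk request).
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BulkBufferSketch perRecord = new BulkBufferSketch(1);
        BulkBufferSketch batched = new BulkBufferSketch(2);
        for (String doc : new String[] {"a", "b", "c"}) {
            perRecord.add(doc);
            batched.add(doc);
        }
        System.out.println(perRecord.flushedBatches().size() + " batches, " + perRecord.pendingCount() + " pending");
        System.out.println(batched.flushedBatches().size() + " batches, " + batched.pendingCount() + " pending");
    }
}
```

With a threshold of 1 nothing ever waits in the buffer, which suits an interactive demo. For higher throughput, a larger value is usually preferable, and the builder also offers setBulkFlushInterval and setBulkFlushMaxSizeMb to bound how long and how much data may wait.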

Now let’s run the application. Make sure nc is already running before you start it, otherwise the application will exit. Next, type the following line in the terminal where nc is running and press Enter:

1,liuxg

Let’s check it out in Kibana:

GET _cat/indices/flink-test

It will show that an index named flink-test has been successfully created:

We then use the following command to search:

GET flink-test/_search

We see that a document has been created.

Let’s type the following line:

2,hello

This input contains the string hello. By design, our filter returns false whenever hello is present, so this record is not written to Elasticsearch. Running the same search command in Kibana confirms that no new document was added.

Conclusion

In this blog post, we demonstrated how to build a real-time event processing and search application using Apache Flink and Elasticsearch. By supporting event-time processing, Apache Flink can produce meaningful and consistent results, even for historical data or in environments where events arrive out of order. The expressive DataStream API with flexible window semantics can significantly reduce custom application logic compared to other open source stream processing solutions. In this demonstration, we used only a small part of Flink’s data transformation functionality. Flink offers many more data analysis capabilities, and combining it with the Elastic Stack opens up a wide range of application scenarios.