This article was compiled by community volunteer Miao Wenting from the talk "From Spark Batch to Flink Batch", presented by Zhang Chenya, senior big data development engineer at LinkedIn, at Flink Forward Asia 2020. It covers how LinkedIn moved from Spark to Flink toward unified stream and batch processing, and the practical experience gained along the way. The main contents are:

  1. Why unify stream and batch processing?
  2. Existing solutions in the industry, their current status, and their pros and cons
  3. Experience from production practice scenarios
  4. A comparison of the Shuffle Service in Spark and Flink, and the considerations behind it in the Flink community
  5. Summary

1. Why unify stream and batch processing?

What are the benefits of unifying stream and batch processing, especially in BI/AI/ETL scenarios? Overall, helping users achieve stream-batch unification brings four fairly obvious benefits:

  • Avoid code duplication and reuse core processing logic

Ideally the code logic would be exactly the same in both modes, but that is difficult to achieve. Business logic keeps getting longer, more complex, and more demanding, and if different frameworks and engines are used, users have to rewrite the logic every time, which is stressful and hard to maintain. So it is particularly important to avoid code duplication and help users reuse their code logic.

  • Stream-batch unification has two directions

The problems to be considered in these two directions are quite different. Frameworks such as Flink for streaming and Spark for batch are each mature on one side and have accumulated many users on that side. When we want to help users move in the other direction to meet new business requirements, the work usually falls into two categories: stream to batch, or batch to stream. The two production practice scenarios introduced later correspond exactly to these two directions.

  • Reduce maintenance workload

Maintaining multiple systems, each with a different framework and engine, multiplies the problems. If a company runs multiple pipelines, one real-time and one offline, data inconsistencies arise, and a lot of work then goes into data validation, accuracy checks, data storage, and so on to keep the data as consistent as possible.

  • Reduce the learning burden

There are many frameworks and engines, and business logic runs both in real time and offline, so there is a great deal to learn in order to support users; unification reduces that burden.

2. Current status of the industry

Both Flink and Spark are engines that support streaming as well as batch processing. We all agree that Flink's stream processing is better, but how good is its batch processing? Conversely, Spark's batch processing is good, but is its stream processing sufficient to meet users' existing requirements?

There are many different engine frameworks out there. Could there be a unified layer on top of them, such as federated processing, a relatively physical API like Beam, or a custom interface?

The question for Beam is how well it can be optimized for batch and stream processing. Beam is still a fairly physical API at the moment, and how it evolves remains to be seen.

LinkedIn, like other companies, also considers custom interface solutions: a common SQL or API layer with different framework engines running underneath. The thing to consider here is that frameworks like Spark and Flink are mature and already have a large user base. When we propose a new API or a new solution, how well will it be received? And how do we maintain yet another set of solutions within the company?

3. Production case scenarios

The following focuses on how Flink performs for batch processing, a simple comparison between Flink and Spark, and some of LinkedIn's internal solutions. Two production scenarios are shared: one is how to unify stream and batch in machine learning feature engineering, and the other is how to do so in a complex ETL data flow.

3.1 Case A – Machine learning feature Engineering

Case A goes in the first direction, from stream to batch.

The main logic of case A is how to go from stream to batch when generating features for machine learning. The core business logic is feature transformation; the transformation process and logic are fairly involved and are used to standardize the data.

For example, the background information members enter on their LinkedIn pages needs to be extracted and standardized in order to make recommendations and help them find jobs. When a member's profile information is updated, there is filtering and pre-processing logic: reading from Kafka, performing the feature transformation, and possibly querying some small lookup tables. The logic is very straightforward, without complex joins or other heavy data processing.
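To make this concrete, here is a minimal sketch of such a pipeline written with the Flink DataStream API in Scala. The topic name, the transformation, and the filter are illustrative assumptions, not LinkedIn's actual code:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ProfileFeaturePipeline {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "profile-features")

    // Read raw profile-update events from Kafka (topic name is an assumption).
    val updates: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String](
        "member-profile-updates", new SimpleStringSchema(), props))

    // Filter / pre-process, then apply the core feature transformation.
    val features = updates
      .filter(_.nonEmpty)                  // placeholder pre-processing
      .map(raw => transformToFeature(raw)) // placeholder feature transformation

    features.print()
    env.execute("profile-feature-generation")
  }

  // Hypothetical standalone transformation, written so it can be reused by a batch backfill job.
  def transformToFeature(raw: String): String = raw.trim.toLowerCase
}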

Previously, these pipelines were purely real-time and regularly had to read supplemental information from offline pipelines to backfill the stream. This kind of backfill puts a lot of pressure on the real-time cluster: during a backfill you have to wait for it to complete and monitor the workflow so that the real-time cluster does not go down. The users therefore asked whether they could backfill offline instead of through real-time stream processing.

Currently, these users use Beam on Samza for stream processing. They are very familiar with the Beam API and the Spark Dataset API, and they also use the Dataset API for other business processing besides backfill.

It should be emphasized that much of their Dataset API usage operates directly on objects and has strong type-safety requirements. It is impractical to ask these users to switch directly to SQL or DataFrame workflows, because their existing business logic consists of direct operations and transformations on objects.

In this case, what we can offer users is an imperative API. Let's look at what the industry offers:

The first option is the Flink DataStream API, which is being unified. We also looked at the Flink DataSet API (now deprecated) when evaluating solutions. The DataStream API can be unified across stream and batch, and its support for both is relatively complete. The disadvantage is that it is, after all, an imperative API, so there is not much optimization yet, and optimization work will have to continue. See FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), and the batch execution mode for the DataStream API.
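As a quick illustration of the unified DataStream API, the same job can be switched between streaming and batch execution with a single setting (available since Flink 1.12); this is a sketch, not LinkedIn's code:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The same DataStream program can run as a bounded batch job (e.g. an offline backfill)
// or as an unbounded streaming job, depending on the execution mode.
env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH)       // batch backfill
// env.setRuntimeExecutionMode(RuntimeExecutionMode.STREAMING) // real-time processing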

The second option is the Spark Dataset API, which is also a natural choice for these users. The Dataset API can be used for streaming as well, and unlike Flink's DataSet and DataStream APIs and other physical APIs, it sits on top of Spark's DataFrame/SQL engine with a layer of type safety, so its degree of optimization is relatively good. See Databricks: Introducing Apache Spark Datasets and the Spark Structured Streaming Programming Guide, Unsupported Operations.
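A hedged sketch of what that looks like: the same typed transformation on a Dataset can be shared by a batch job and a Structured Streaming job. The Profile/Feature case classes and the transformation are assumptions for illustration:

import org.apache.spark.sql.Dataset

// Hypothetical typed records; not LinkedIn's actual schema.
case class Profile(memberId: Long, headline: String)
case class Feature(memberId: Long, normalizedHeadline: String)

object SharedTransform {
  // Core business logic written once against typed objects,
  // reusable from both batch and Structured Streaming jobs.
  def toFeatures(profiles: Dataset[Profile]): Dataset[Feature] = {
    val spark = profiles.sparkSession
    import spark.implicits._
    profiles
      .filter(_.headline.nonEmpty)
      .map(p => Feature(p.memberId, p.headline.trim.toLowerCase))
  }
}

The same toFeatures function can then be applied to an offline read (for example spark.read.parquet(...).as[Profile] for backfill) and to a parsed Structured Streaming source for the real-time side.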

The third option is Beam on Spark, which currently mainly uses the RDD-based runner and has some difficulty supporting a runner with optimization. Some of Beam's ongoing work will be discussed in more detail in case B. See the Beam documentation, Using the Apache Spark Runner, and BEAM-8470: Create a new Spark runner based on the Spark Structured Streaming framework.

In terms of user feedback, Flink's DataStream (and DataSet) API and Spark's Dataset API are very similar from the user's point of view. For infrastructure engineers helping users solve problems, familiarity with the API is the more important factor.

Beam's API, however, is very different from Flink's and Spark's; it comes from the Google ecosystem. We had worked with these users before: their workflows run on Beam on Samza, and they write business logic using PCollections and PTransforms, whose input and output method signatures are quite different. We therefore developed some lightweight converters to help users reuse their existing business logic when rewriting jobs in Flink or Spark.
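The converters LinkedIn built are internal, so the following is only a hypothetical sketch of the idea: wrapping a Beam-style SerializableFunction so the same element-wise logic can be plugged into a Flink operator without being rewritten.

import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical lightweight adapter: reuse an element-wise transformation that was
// written for a Beam pipeline inside a Flink DataStream/DataSet job.
class BeamFnAsFlinkMap[I, O](fn: SerializableFunction[I, O]) extends MapFunction[I, O] {
  override def map(value: I): O = fn.apply(value)
}

// Usage (sketch): stream.map(new BeamFnAsFlinkMap(existingBeamFn))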

From the perspective of the DAG, case A is a very simple business process that simply and directly transforms objects. In this case, Flink and Spark are very close in performance.

Typically, we use the Flink Dashboard UI to look at exceptions, the job flow, and so on, which is a distinct advantage over Spark, where you have to dig through the driver logs to find an exception. But Flink still has several areas to improve:

  • History Server – support for richer metrics, etc.

The Spark History Server UI provides rich metrics that help users do performance analysis. Can Flink's batch processing offer Spark users the same amount of metrics information, so that development becomes easier and more efficient?

  • Better batch operation and maintenance tools

To share something LinkedIn has been doing for two or three years: LinkedIn has 200,000 jobs running on its clusters every day and needs good tools to support batch users in running their jobs. We provide Dr. Elephant and GridBench to help users debug and tune their jobs.

Dr. Elephant is open source and helps users debug their jobs, find problems, and get suggestions. In addition, before a job moves from a test cluster to a production cluster, the decision to allow it into production is based on the score in the evaluation report generated by Dr. Elephant.

GridBench does statistical analysis of job data, including CPU hot-spot analysis, to help users optimize their jobs. GridBench is also planned to be open-sourced and supports a variety of engine frameworks; Flink can be added so that Flink jobs can be evaluated better. See the GridBench talk: Project Optimum: Spark Performance at LinkedIn Scale.

Through the command line, users can view not only GridBench reports and Dr. Elephant reports, but also basic information about their jobs, such as application CPU time and resource consumption, and can compare and analyze Spark jobs against Flink jobs.

These are the two areas where Flink batch processing needs to improve.

3.2 Case B – Complex ETL data flow

Case B goes in the second direction, from batch to stream.

The core logic of this ETL data flow is relatively complex, for example session window aggregation: page views are computed every hour, the work is split across different jobs that share page keys through a metadata table in the middle, the first job processes the 00 hour and the second the 01 hour, sessionize operations are performed, and the results, split into open sessions and closed sessions, are output so that the data is processed incrementally hour by hour.

This workflow was originally a purely offline, incremental (delta) process built with Spark SQL. When the users wanted to move it online for real-time processing, they had to rebuild a real-time workflow, for example on Beam on Samza. During that process we stayed in very close contact with the users, and they ran into many problems: reusing the development logic, ensuring that the two versions of the business logic produce the same results, deciding where the data should end up, and so on. The migration took a long time, and the end result was not very good.

In addition, the users' job logic includes many large and complex UDFs written in both Hive and Spark, and migrating these is also a lot of work. The users are familiar with Spark SQL and the Spark DataFrame API.

In the figure above, the solid black line is the real-time processing path and the gray arrow is the batch processing path, which amounts to a Lambda architecture.

In case B the job contains many joins and session windows, also developed in Spark SQL. So naturally we started from the declarative APIs, of which there are currently three options:

The first option is the Flink Table API/SQL: both stream and batch processing can be done with the same SQL, functional support is very comprehensive, and both stream and batch execution are optimized. See the Alibaba Cloud blog: What's All Involved with Blink Merging with Apache Flink? and FLINK-11439: INSERT INTO flink_sql SELECT * FROM blink_sql.
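As a hedged sketch of "the same SQL in both modes" (the table name and query are illustrative assumptions, not case B's actual job):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// The same SQL text can be planned either for unbounded (streaming) or bounded (batch) input.
val streamTableEnv =
  TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())
val batchTableEnv =
  TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build())

val query = "SELECT pageKey, COUNT(*) AS pageViews FROM PageViews GROUP BY pageKey"

// After registering a `PageViews` table (e.g. a Kafka source for streaming,
// an HDFS/Hive table for batch), the same query runs in both environments:
// streamTableEnv.sqlQuery(query)
// batchTableEnv.sqlQuery(query)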

The second option is the Spark DataFrame API/SQL. The same interface can also be used for batch and stream processing, but Spark's streaming support is not strong enough. See the Databricks blogs: Deep Dive into Spark SQL's Catalyst Optimizer and Project Tungsten: Bringing Apache Spark Closer to Bare Metal.

The third option is Beam's schema-aware API/SQL. Beam is more of a physical API, and the schema-aware API/SQL work is still at an early stage, so it was not considered for the moment. The main analysis and experience therefore come from comparing the Flink Table API/SQL with the Spark DataFrame API/SQL. See the Beam design document Schema-Aware PCollections and the Beam user guide Beam SQL Overview.

From the users' perspective, the Flink Table API/SQL and the Spark DataFrame API/SQL are very similar. Minor differences, such as how keywords, rules, and joins are written, still cause some confusion: users wonder whether they are using it wrong.

Both Flink and Spark integrate well with Hive, for example reusing Hive UDFs, which cut the UDF migration pressure in case B by half.
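For illustration only (the catalog name, Hive version, and configuration path are assumptions): in Flink, existing Hive tables and functions can be made available to SQL jobs through the Hive catalog and Hive module.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.module.hive.HiveModule

val tableEnv =
  TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build())

// Register the existing Hive metastore as a catalog; UDFs registered there
// become visible to Flink SQL (names, paths, and version are assumptions).
tableEnv.registerCatalog("hive", new HiveCatalog("hive", "default", "/etc/hive/conf", "2.3.4"))
tableEnv.useCatalog("hive")

// Load Hive's built-in functions so they can also be referenced by name in Flink SQL.
tableEnv.loadModule("hive", new HiveModule("2.3.4"))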

Flink's performance in pipelined mode is clearly better than Spark's. As you would expect, whether or not data is spilled to disk has a big impact on performance: if a lot of spilling is needed, with each stage writing data to disk and reading it back, performance is bound to be worse than a pipelined mode that avoids spilling. Pipelined mode is better suited to shorter jobs, say 20 to 40 minutes, where it has a fairly large advantage; for long pipelines its fault tolerance cannot match batch (blocking) mode. Spark's batch performance is still better than Flink's, so this trade-off needs to be evaluated against your own company's workloads.
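For reference, a sketch of how this trade-off can be toggled for Flink batch jobs on the Blink planner. The option name and values follow the Flink documentation of that period; treat the exact keys and values as assumptions for your version:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tableEnv =
  TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build())

// Pipelined exchanges keep data flowing between stages without spilling to disk;
// blocking exchanges write each stage's output to disk, which is slower but more
// robust for very long-running batch jobs.
tableEnv.getConfig.getConfiguration
  .setString("table.exec.shuffle-mode", "ALL_EDGES_PIPELINED") // or "ALL_EDGES_BLOCKING"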

Flink's window support is significantly richer than other engines', for example session windows, which makes things easier for users. To implement session windows themselves, our users had written a lot of UDFs for incremental processing, building up sessions, pulling records out for processing, and so on. Now they use the session window operator directly, saving a lot of development overhead, and group aggregations and other window operations are supported as well.

Session window:

// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
    
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on $"proctime" as $"w")

Sliding window:

// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
    
// Sliding Processing-time Window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")

// Sliding Row-count Window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")

UDFs are the biggest obstacle when migrating between engine frameworks. A UDF written in Hive is easy to migrate, because both Flink's and Spark's support for Hive UDFs is very good; but a UDF written against Flink or Spark is very problematic to migrate to another engine framework, for example to Presto for near-real-time OLAP queries.

To make UDFs reusable, LinkedIn developed the Transport project internally and has open-sourced it on GitHub. See LinkedIn's blog: Transport: Towards Logical Independence Using Translatable Portable UDFs.

Transport provides a user-facing API that spans engine frameworks: a common function development interface from which engine-specific UDFs are generated automatically, for Presto, Hive, Spark, Flink, and so on.

Writing against one common UDF API that works across all engine frameworks lets users reuse their business logic. For example, a user can develop a MapFromTwoArraysFunction as follows:

// Imports follow the open-source Transport project layout; exact packages may vary by version.
import java.util.List;

import com.google.common.collect.ImmutableList;
import com.linkedin.transport.api.StdFactory;
import com.linkedin.transport.api.data.StdArray;
import com.linkedin.transport.api.data.StdMap;
import com.linkedin.transport.api.types.StdType;
import com.linkedin.transport.api.udf.StdUDF2;

public class MapFromTwoArraysFunction extends StdUDF2<StdArray, StdArray, StdMap> {

    private StdType _mapType;

    @Override
    public List<String> getInputParameterSignatures() {
        return ImmutableList.of(
            "array[K]",
            "array[V]"
        );
    }

    @Override
    public String getOutputParameterSignature() {
        return "map(K,V)";
    }

    @Override
    public void init(StdFactory stdFactory) {
        super.init(stdFactory);
        // Resolve the concrete output map type once, so it can be used in eval().
        _mapType = getStdFactory().createStdType(getOutputParameterSignature());
    }

    @Override
    public StdMap eval(StdArray a1, StdArray a2) {
        if (a1.size() != a2.size()) {
            return null;
        }
        StdMap map = getStdFactory().createMap(_mapType);
        for (int i = 0; i < a1.size(); i++) {
            map.put(a1.get(i), a2.get(i));
        }
        return map;
    }
}

There is also the users' SQL migration problem: jobs were developed in Spark SQL and then, for stream-batch unification, had to be moved to Flink SQL, and there are many frameworks out there. For this, LinkedIn developed Coral, open-sourced it on GitHub, and has given talks about it together with Facebook. Together with Transport UDFs, it provides an isolation layer that lets users migrate across engines more easily and reuse their own business logic.

A script defines the view in familiar SQL together with table properties; Coral then generates a Coral IR tree and finally translates it into a plan for each engine.

In case B, with stream and batch unified and heavy cluster traffic, users care a great deal about the performance, stability, and success rate of batch processing, and the shuffle service greatly affects batch performance.

4. Comparison between Spark and Flink for Shuffle Service

In-memory shuffle is supported by both Spark and Flink; it is fast but not scalable.

Hash-based shuffle is also supported by both Spark and Flink; compared with in-memory shuffle it is more fault tolerant, but it still does not scale well.

Sort-based shuffle scales to large shuffles: data is sorted and written so that it can be read back from disk in larger sequential chunks rather than piecemeal. FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink adds this support to Flink as well.
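As a sketch of how to turn this on in Flink once FLIP-148 is available (option keys are taken from the Flink documentation of that time; treat the exact names and values as assumptions for your version, and note they are normally set in flink-conf.yaml):

import org.apache.flink.configuration.Configuration

val conf = new Configuration()
// Use sort-based blocking shuffle for all batch exchanges, not only high-parallelism ones.
conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1)
// Buffers reserved per result partition for sorting shuffle data before it is written.
conf.setInteger("taskmanager.network.sort-shuffle.min-buffers", 64)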

External shuffle service: an external shuffle service is very important when the cluster is very busy (for example, with dynamic resource scheduling). It decouples shuffle from the executors' resources, and once decoupled, resources can be scheduled better. FLINK-11805, A Common External Shuffle Service Framework, is currently in the reopened state.

Cloud-native is the direction being advocated in big data, so the separation of compute and storage should also be considered in the design of a shuffle service. FLINK-10653, Introduce Pluggable Shuffle Service Architecture, introduces a pluggable shuffle architecture.

Spark's shuffle service has received a relatively big boost from Magnet, a project led by LinkedIn that resulted in a paper (Magnet: A Scalable and Performant Shuffle Architecture for Apache Spark) and a LinkedIn blog post in 2020. Magnet clearly improves disk read/write efficiency: it turns relatively small random reads into relatively large sequential reads, and it merges shuffle data instead of reading it randomly, avoiding many random-IO problems.

The Magnet shuffle service is meant to alleviate shuffle stability and scalability problems. Before it, we saw a lot of shuffle issues, such as a very high rate of job failures. If Flink is to be used for batch processing, more work on shuffle will be needed to support users who previously did their batch processing on Spark.

  • For shuffle availability, shuffle blocks are pushed on a best-effort basis, skipping some large blocks while still ensuring final consistency and correctness.
  • A copy of the intermediate shuffle data is kept to ensure correctness.

If the push process is particularly slow, an early-termination technique is applied.

Compared with vanilla shuffle, Magnet shuffle reduces the wait time for reading shuffle data by almost 100%, task execution time by almost 50%, and end-to-end job duration by almost 30%.

5. Summary

  • LinkedIn is very pleased to see that Flink has a clear advantage in unifying stream and batch processing; it is more unified and still improving.

  • Flink's batch processing capabilities still need improvement, for example the History Server, metrics, and debugging. Users developing jobs need solutions they can find in the community, and the whole ecosystem should be built up so that users can use it conveniently.

  • Flink needs to pay more attention to the shuffle service and to offline workflows on large clusters: ensuring the workflows' success rate, providing better user support, and monitoring cluster health as it scales up.

  • As more and more framework engines emerge, it is better to give users a more unified interface, but that is a fairly big challenge covering both development and operations. In LinkedIn's experience there are still many problems; no single solution covers all user scenarios, and even some functions or expressions are hard to cover completely, as with Coral and Transport UDFs.
