This article is adapted from the talk “Pravega Flink Connector Past, Present and Future” given by Yumin Chou, Software Engineer at Dell Technology Group, at Flink Forward Asia 2020. It covers the following topics:

  1. Introduction to Pravega and the Pravega Connector
  2. The past of the Pravega Flink Connector
  3. A review of Flink 1.11’s advanced features
  4. Future outlook

I. Introduction to Pravega and the Pravega Connector

The name Pravega comes from a Sanskrit word meaning “good speed”. The project started in 2016, is open source on GitHub under the Apache License 2.0, and joined the CNCF in November 2020 as a Sandbox project.

Pravega is a new enterprise-grade storage system designed for large-scale streaming data, intended to make up for the shortcomings of traditional message queue storage. While preserving unbounded, high-performance reads and writes on streams, it adds enterprise-level features such as elastic scaling and tiered storage, which help reduce usage and maintenance costs for enterprise users. The team also has many years of experience in the storage field and can rely on the company’s commercial storage products to provide customers with persistent storage.

The architecture diagram above describes a typical Pravega read/write scenario. The Pravega terminology it uses is introduced below to help explain the architecture of the system.

  • In the middle is the Pravega cluster, which as a whole provides a stream abstraction. A Stream is analogous to a Kafka topic. Similarly, a Pravega Segment is analogous to a Kafka Partition as the unit of data partitioning, but it additionally supports dynamic scaling.

    A Segment stores a stream of binary data. Depending on the volume of incoming traffic, Segments are split or merged so that resources are added or released. When this happens, the old Segment is sealed so that no new data can be written to it, and newly created Segments receive the new data.

  • The left side of the diagram shows the write path, which is append-only. Users can specify a Routing Key for each event to determine which Segment the event belongs to, much like a Kafka Partitioner. Events with the same Routing Key are order-preserving: they are read in the same order in which they were written.
  • The right side of the diagram shows the read path, where multiple readers are managed by a Reader Group. The Reader Group balances load among the readers so that all Segments are evenly distributed among them. It also provides a Checkpoint mechanism that captures a consistent cut of the Stream so that reading can be recovered after a failure. Reads support both batch and streaming semantics: for streaming, tail reads are supported; for batch, the emphasis is on high concurrency to achieve high throughput.
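
The relationship between Routing Keys, Segments and sealing described above can be sketched as a small simulation. This is only an illustration under stated assumptions: the class and function names are hypothetical, SHA-256 stands in for whatever hash Pravega actually uses, and splitting a Segment exactly in half is a simplification of its scaling policy. Merging is omitted.

```python
import hashlib
from dataclasses import dataclass, field


def key_position(routing_key: str) -> float:
    """Hash a routing key to a position in the key space [0, 1).

    SHA-256 is an assumption made for this sketch only.
    """
    digest = hashlib.sha256(routing_key.encode("utf-8")).digest()
    # 48 bits fit exactly in a float, so the result is always below 1.0.
    return int.from_bytes(digest[:6], "big") / 2**48


@dataclass
class Segment:
    low: float            # inclusive lower bound of the owned key range
    high: float           # exclusive upper bound of the owned key range
    sealed: bool = False  # a sealed segment accepts no new events
    events: list = field(default_factory=list)


class Stream:
    def __init__(self):
        # A new stream starts with one segment covering the whole key space.
        self.segments = [Segment(0.0, 1.0)]

    def _active_for(self, position: float) -> Segment:
        for seg in self.segments:
            if not seg.sealed and seg.low <= position < seg.high:
                return seg
        raise RuntimeError("no active segment covers this position")

    def write(self, routing_key: str, event) -> Segment:
        """Append an event to the active segment that owns the routing key."""
        seg = self._active_for(key_position(routing_key))
        seg.events.append((routing_key, event))  # append-only
        return seg

    def split(self, seg: Segment) -> None:
        """Seal a segment and hand its key range to two successors."""
        seg.sealed = True
        mid = (seg.low + seg.high) / 2
        self.segments += [Segment(seg.low, mid), Segment(mid, seg.high)]
```

Because a given Routing Key always hashes to the same position, all of its events land in one active Segment at a time. After a split, events written earlier stay in the sealed Segment and later ones go to a successor, which is what allows per-key ordering to be preserved across scaling.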

II. The past of the Pravega Flink Connector

The Pravega Flink Connector is the first connector that Pravega supported. This is because Pravega and Flink share the same design philosophy: both are stream-based systems that unify batch and stream processing, and together they form a complete storage-plus-compute solution.

1. Development history of the Pravega Flink Connector

  • The connector has been a standalone GitHub project since 2017, when development started on Flink 1.3. At that time, Flink PMC members including Stephan Ewen joined us in building the most basic Source and Sink functions, which supported basic reads and writes. This work also included the integration with Pravega Checkpoints, which is described later.
  • The most important highlight of 2018 was support for end-to-end exactly-once semantics. There was a lot of discussion between the team and the Flink community at the time. Pravega first added a transactional write feature to its client, and the community then worked together to build a Checkpoint-based distributed transaction on top of the Sink function using two-phase commit semantics. Flink later abstracted the two-phase commit API further into what is now known as TwoPhaseCommitSinkFunction, which the Kafka connector also uses. The community has published blog posts dedicated to this interface and to end-to-end exactly-once semantics.
  • In 2019, the connector was extended to cover more APIs, including batch reads and Table API support.
  • The main focus of 2020 was integration with Flink 1.11, in particular the new features introduced by FLIP-27 and FLIP-95.
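
The Checkpoint-driven two-phase commit mentioned in the 2018 entry can be sketched roughly as follows. This is a simplified model, not Flink’s TwoPhaseCommitSinkFunction or the Pravega client: `TransactionalStore`, `TwoPhaseCommitSink` and all method names are hypothetical stand-ins chosen for the illustration.

```python
class TransactionalStore:
    """Hypothetical stand-in for an external system with transactional writes."""

    def __init__(self):
        self.committed = []   # events visible to readers
        self._open = {}       # transaction id -> buffered events
        self._next_id = 0

    def begin(self):
        self._next_id += 1
        self._open[self._next_id] = []
        return self._next_id

    def write(self, txn_id, event):
        self._open[txn_id].append(event)

    def commit(self, txn_id):
        self.committed.extend(self._open.pop(txn_id))

    def abort(self, txn_id):
        self._open.pop(txn_id, None)


class TwoPhaseCommitSink:
    def __init__(self, store):
        self.store = store
        self.current = store.begin()
        self.pending = {}     # checkpoint id -> pre-committed transaction id

    def invoke(self, event):
        # Events are only buffered in the open transaction, not yet visible.
        self.store.write(self.current, event)

    def snapshot_state(self, checkpoint_id):
        # Phase 1 (pre-commit): freeze the current transaction under this
        # checkpoint id and open a new one for events that arrive afterwards.
        self.pending[checkpoint_id] = self.current
        self.current = self.store.begin()

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2 (commit): the checkpoint has completed, so every
        # transaction pre-committed up to it can become visible.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.store.commit(self.pending.pop(cid))

    def abort_open(self):
        # On failure, events written since the last checkpoint are discarded;
        # the source is expected to replay them after recovery.
        self.store.abort(self.current)
        self.current = self.store.begin()
```

The point of the design is that data only becomes visible once the Checkpoint that covers it has completed, so a replay after failure cannot produce duplicates in the committed output.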

2. Checkpoint integration implementation

Taking Kafka as an example, let’s first take a look at how Kafka achieves Flink Checkpoint integration.

The diagram above shows a typical Kafka read architecture. Flink’s Checkpoint implementation is based on the Chandy-Lamport algorithm. When the Job Master triggers a Checkpoint, it sends an RPC request to each Task Executor. On receiving the request, the Task Executor reports the Kafka commit offsets held in its state back to the Job Master, where they are merged to form the Checkpoint metadata.
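
As a rough illustration of this offset-based approach, the sketch below has each task snapshot the offsets of the partitions it reads, and a coordinator merge them into one metadata record. The names `SourceTask` and `trigger_checkpoint` are hypothetical; this is not Flink’s or Kafka’s API, and barriers and state backends are left out.

```python
class SourceTask:
    """Hypothetical source task that tracks one offset per assigned partition."""

    def __init__(self, partitions):
        self.offsets = {p: 0 for p in partitions}

    def poll(self, partition, n=1):
        # Reading n records advances the offset of that partition.
        self.offsets[partition] += n

    def snapshot(self):
        # The task's checkpoint state is simply its current offsets.
        return dict(self.offsets)


def trigger_checkpoint(checkpoint_id, tasks):
    """Collect every task's offsets and merge them into checkpoint metadata."""
    merged = {}
    for task in tasks:
        merged.update(task.snapshot())
    return {"checkpoint_id": checkpoint_id, "offsets": merged}
```

Note that the merge assumes a fixed assignment of partitions to tasks and that the state consists of system-specific offsets.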

On closer inspection, this approach has a few small problems:

  • Support for scaling and dynamic rebalancing. It is unclear how to keep the merged offsets consistent while Partitions are being adjusted or, in Pravega’s case, while Segments are dynamically scaling up and down.
  • Each Task has to maintain offsets itself, so the whole design is coupled to Kafka’s internal offset abstraction.

Given these shortcomings, Pravega designed its own internal Checkpoint mechanism. Let’s look at how it integrates with Flink’s Checkpoint.

Consider again the case of reading a Pravega Stream. Here, instead of sending an RPC request to the Task Executor as before, the Job Master uses the ExternallyInducedSource interface to send a Checkpoint request to Pravega.

Meanwhile, Pravega internally uses its StateSynchronizer component to synchronize and coordinate all readers, and delivers a Checkpoint event to each of them. Once the Task Executors have read the Checkpoint event, Pravega marks the Checkpoint as complete, and the Pravega Checkpoint it returns is stored in the Job Master’s state. This completes the Checkpoint.

This implementation is actually cleaner for Flink, because Flink is not coupled to the implementation details of the external system: the entire Checkpoint is left to Pravega to carry out and complete.
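
The reader-side flow described above can be modelled with a small simulation. It is a sketch under assumptions: the `ReaderGroup` class here is a hypothetical in-memory model, not the Pravega client, and a single integer per reader stands in for real Segment positions.

```python
class ReaderGroup:
    """Hypothetical in-memory model of a reader group with checkpoints."""

    def __init__(self, reader_ids):
        self.positions = {r: 0 for r in reader_ids}
        self.pending = {}     # checkpoint name -> readers that have not seen it
        self.snapshots = {}   # checkpoint name -> positions recorded so far
        self.completed = {}   # checkpoint name -> final, consistent positions

    def initiate_checkpoint(self, name):
        # Requested from outside the readers, e.g. by a job coordinator.
        self.pending[name] = set(self.positions)
        self.snapshots[name] = {}

    def read_next(self, reader_id):
        # An outstanding checkpoint event is delivered before any more data.
        for name, waiting in list(self.pending.items()):
            if reader_id in waiting:
                waiting.discard(reader_id)
                self.snapshots[name][reader_id] = self.positions[reader_id]
                if not waiting:
                    # Every reader has seen the event: checkpoint is complete.
                    self.completed[name] = self.snapshots.pop(name)
                    del self.pending[name]
                return ("checkpoint", name)
        self.positions[reader_id] += 1
        return ("event", self.positions[reader_id])
```

The caller only ever holds the completed checkpoint as an opaque value. It never needs to know how positions are represented or how Segments were assigned to readers, which is the decoupling the text describes.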

III. A review of Flink 1.11’s advanced features

Flink 1.11 was a major release in 2020 and brought a number of challenges for the connector, centered on two FLIPs: FLIP-27 and FLIP-95. The team spent a lot of time integrating these two new features and ran into some problems along the way. Here we share the pitfalls we hit and how we got out of them, using FLIP-95 as the example.

1. FLIP-95 integration

FLIP-95 introduces a new set of Table API interfaces. Its motivation is similar to that of FLIP-27: to provide a unified batch and stream interface, while also supporting CDC integration better. Because the existing configuration keys were long, the accompanying FLIP-122 was proposed to simplify them.

1.1 Pravega’s old Table API

The figure above shows Pravega’s Table API before Flink 1.10, including the DDL used to create a table. It had several problems:

  • The update mode and append settings are used to distinguish batch from streaming, which is not an intuitive way to express the two kinds of data.
  • The configuration is verbose and complex. For example, the Stream to read is configured through a very long configuration key such as connector.reader.stream-info.0.
  • At the code level, there is a lot of coupling with the DataStream API, which makes it hard to maintain.

In response to these problems, we had a strong incentive to implement a new set of APIs that lets users work with the table abstraction more easily. The overall framework is shown in the figure. In the new framework, all configuration items are defined through the ConfigOption interface and managed centrally in the PravegaOptions class.
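
The idea of declaring every option once and managing them in a single class can be sketched as follows. This is a Python analogy, not Flink’s ConfigOption or the connector’s PravegaOptions: the classes below only borrow the names for readability, and every option key shown is illustrative rather than confirmed against the connector.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class ConfigOption:
    """Analogy of a typed option declaration (not Flink's class)."""
    key: str
    parse: Callable[[str], Any]   # converts the raw string value to its type
    default: Any = None
    required: bool = False


class PravegaOptions:
    # Illustrative keys only; they are assumptions made for this sketch.
    CONTROLLER_URI = ConfigOption("controller-uri", str, required=True)
    SCOPE = ConfigOption("scope", str, required=True)
    EXECUTION_TYPE = ConfigOption("scan.execution.type", str, default="streaming")
    TIMEOUT_MS = ConfigOption("example.timeout-ms", int, default=30000)
    ALL = (CONTROLLER_URI, SCOPE, EXECUTION_TYPE, TIMEOUT_MS)


def resolve(properties: Dict[str, str]) -> Dict[str, Any]:
    """Validate raw table properties against the declared options."""
    known = {opt.key for opt in PravegaOptions.ALL}
    unknown = sorted(set(properties) - known)
    if unknown:
        raise ValueError(f"unsupported options: {unknown}")
    resolved = {}
    for opt in PravegaOptions.ALL:
        if opt.key in properties:
            resolved[opt.key] = opt.parse(properties[opt.key])
        elif opt.required:
            raise ValueError(f"missing required option: {opt.key}")
        else:
            resolved[opt.key] = opt.default
    return resolved
```

With all keys, types and defaults declared in one place, unknown or missing options can be rejected up front instead of being discovered deep inside the source or sink code.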

1.2 Pravega’s new Table API

The following diagram shows the latest Table API implementation. It is significantly simpler than before and adds features such as enterprise-level security options, the ability to specify multiple streams, and the ability to specify starting StreamCuts.

2. Lessons from resolving FLINK-18641

Next, I’d like to share a small lesson from the Flink 1.11 integration by walking through how one issue was resolved. FLINK-18641 is a problem we hit when integrating with version 1.11.0: during the upgrade, a CheckpointException was reported in our unit tests. Our complete debugging process was as follows:

  • First, we stepped through the code with breakpoints. By reviewing the error log and analyzing the relevant Pravega and Flink source code, we determined that the problem was related to Flink’s CheckpointCoordinator.
  • We then looked at recent commits from the community and found that, after Flink 1.10, the CheckpointCoordinator’s threading model had moved from a lock-controlled model to a Mailbox model. Under this model, some logic that we had assumed would execute synchronously and serially was now executed in parallel, which caused the error.
  • We read the pull request for this change in detail and got in touch with some of the committers by email. Finally, we confirmed the problem on the dev mailing list and opened the JIRA ticket.
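
The kind of ordering change described in the debugging steps can be illustrated generically. The sketch below is not Flink’s CheckpointCoordinator code; it only shows how handing a step to a single-threaded “mailbox” executor lets the caller continue before that step has run. The function names are hypothetical, and the `gate` event exists purely to make the interleaving deterministic for the demonstration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_synchronously(log):
    """Old assumption: the trigger finishes before the caller continues."""
    def trigger():
        log.append("trigger")

    trigger()
    log.append("after-trigger")


def run_with_mailbox(log):
    """Mailbox style: the trigger is queued and runs on another thread."""
    mailbox = ThreadPoolExecutor(max_workers=1)
    gate = threading.Event()

    def trigger():
        # In a real system this is a race; the gate forces the unlucky
        # ordering so that the effect is reproducible here.
        gate.wait()
        log.append("trigger")

    future = mailbox.submit(trigger)
    log.append("after-trigger")   # the caller does not wait for the trigger
    gate.set()
    future.result()
    mailbox.shutdown()
```

Code that relies on “trigger” having happened before “after-trigger” works in the first function and breaks in the second, which is the general shape of bug that a change of threading model can expose.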

We have also summarized the following tips for people working in the open source community:

  • Search the mailing lists and JIRA to see whether anyone else has already asked a similar question;
  • Describe the problem completely, providing detailed version information, error logs and steps to reproduce;
  • After receiving feedback from community members, hold further meetings if needed to discuss solutions;
  • Use English when communicating outside Chinese-language channels.

As developers in China, we also have group chats and video calls through which we can reach many committers. In practice, it is mostly a matter of communication: doing open source means communicating with the community, and this helps the projects grow together.

IV. Future outlook

  • The major piece of future work is the integration of the Pravega Schema Registry. The Pravega Schema Registry manages the metadata of Pravega Streams, including the data schema and how the data is serialized and stored. The project made its first open source release alongside Pravega 0.8. We will implement a Pravega Catalog based on it in a later 0.10 release, making the Flink Table API easier to use;
  • Secondly, we will keep an eye on new developments in the Flink community and actively integrate its new versions and features. Current plans include FLIP-143 and FLIP-129;
  • The community is also completing its move to a new test framework based on Docker containers, and we are watching this work with a view to integrating it.

Finally, I hope that community members will pay more attention to the Pravega project and help drive the joint development of the Pravega Connector and Flink.