Recently, the "Big Data and AI Technology Practice | Open Talk Hangzhou Salon" hosted by Youpaiyun was held at the Hangzhou Xixi Science and Innovation Park. The event invited core technology developers from Youzan, Getui, Fangde Intelligence, Youpaiyun, and other companies to share their big data experience in their respective fields. The following content was shared by Zhang Zhao, senior development engineer at Youpaiyun:

Zhang Zhao, senior development engineer, is currently responsible for cache refresh and prewarming, log processing, and operations platform development for Youpaiyun CDN. He is familiar with OpenResty, has rich experience in web development, and is currently focused on big data processing technologies.

Hello everyone, I am Zhang Zhao from Youpaiyun. Today I will mainly share why Youpaiyun chose Flink for multi-source log processing, as well as the problems we ran into while bringing Flink into production and how we solved them.

Why Flink for batch processing

Before choosing Flink, our log batch-processing pipeline was divided into three steps: data collection, log processing, and result storage. Our log volume is about 100 GB per hour; a single-machine service processes it slowly and is hard to scale out. Similar requirements kept being met with one-off code, and the processing flow is complex and has to be handed off among multiple services, so we urgently needed a better solution.

Early on we evaluated databases, but they could not handle repeated multi-dimensional aggregation and mining, so we abandoned the database approach and turned to Hadoop MapReduce. In actual production we often found that writing MapReduce jobs was error-prone, and some aggregation operations simply could not be expressed. We then tried Spark, and new problems emerged: incomplete REST API support for submitting tasks, and the virtual IP address shown in the web console could not be reached from inside our network.

For these reasons we needed a better solution, and after comparison we found Flink. Flink avoids all of these problems and ships with a complete REST API: besides rendering the web UI, you can submit a job directly through "Submit New Job". At the same time, while upgrading the old service we gradually came to understand the characteristics of our log data and which aspects of it we currently need to mine. After taking stock of the resources at hand, we also wanted the whole deployed system to be observable and maintainable. Based on all of the above, we finally gave up the Spark solution and chose Flink.

Flink basics

Flink component stack

As the picture below shows, Flink is a distributed system and relatively simple overall. The Flink Client on the far left handles job submission; as we will see later, it can submit tasks to the JobManager through the REST API or the command line.

The JobManager is the master node of the distributed system. After receiving a submission, it analyzes the job package and sends the relevant information to the corresponding TaskManager nodes, which execute the job once they receive it. The JobManager's main responsibilities are parsing the job graph and maintaining the whole cluster: heartbeats, resource scheduling, HA, file storage, and so on. This is what happens at runtime when a task is submitted to Flink.

Moving on to Flink's overall static design: the bottom layer is deployment, which I will cover later. The core in the middle is the Runtime, on top of which sit two different APIs. DataStream is for stream processing, the most common Flink scenario today; DataSet is the batch-processing API that we use. Although Flink claims unified stream and batch processing, the two interfaces are still separate in the version we run. Version 1.12, released in December this year, discourages the DataSet API and folds its functionality into DataStream. But since we deployed an earlier version and have not upgraded, we have not migrated these jobs to DataStream yet.
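
To make the DataSet usage concrete, below is a minimal sketch of the kind of batch job described here, assuming a simple space-separated log format and placeholder input/output paths; it is an illustration, not our production code.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class LogBatchJob {
    public static void main(String[] args) throws Exception {
        // Batch execution environment (DataSet API, pre-1.12 style)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; assume each line starts with the domain being served
        DataSet<String> lines = env.readTextFile("/data/logs/2020-12-01-00");

        DataSet<Tuple2<String, Long>> countsPerDomain = lines
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) {
                        return Tuple2.of(line.split(" ")[0], 1L);
                    }
                })
                .groupBy(0)   // group by domain
                .sum(1);      // request count per domain

        countsPerDomain.writeAsCsv("/data/results/2020-12-01-00");
        env.execute("log-batch-aggregation");
    }
}
```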

Next we explored the topmost Table/SQL layer, but it did not work out well because support was limited, both in the documentation and in the code. For example, when executing SQL and trying to write the final result to PostgreSQL, there is a bug: the connection address it assembles is wrong, with a slash missing around the host and port. The bug is trivial but had not been fixed, so I think the top layer is probably not yet well tested or stable. Therefore our final code, where the business logic is concentrated, is written against the DataSet API.

In addition, we did some small work of our own: building on our cloud storage system, we extended the relevant functionality so that Flink results can be written directly to cloud storage, which simplified the overall code.
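
That extension is internal, but its general shape is a custom output format. The sketch below shows the idea only: the actual storage SDK call is left abstract, and the buffering strategy is deliberately simplified.

```java
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;

// Sketch: write job results straight to cloud storage, one object per parallel subtask.
// The real storage SDK is not public, so the upload call is left abstract here.
public abstract class CloudStorageOutputFormat extends RichOutputFormat<String> {
    private final String pathPrefix;
    private transient StringBuilder buffer;
    private transient int taskNumber;

    protected CloudStorageOutputFormat(String pathPrefix) {
        this.pathPrefix = pathPrefix;
    }

    // Implemented against the actual cloud storage SDK in a concrete subclass.
    protected abstract void upload(String objectKey, byte[] data);

    @Override
    public void configure(Configuration parameters) { }

    @Override
    public void open(int taskNumber, int numTasks) {
        this.taskNumber = taskNumber;
        this.buffer = new StringBuilder();
    }

    @Override
    public void writeRecord(String record) {
        buffer.append(record).append('\n');   // buffer locally, upload on close
    }

    @Override
    public void close() {
        upload(pathPrefix + "/part-" + taskNumber, buffer.toString().getBytes());
    }
}
```

A concrete subclass fills in upload() with the storage SDK, and a job then ends with result.output(...) on that subclass instead of writing to an intermediate location first.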

JobManager and TaskManager

The role of the JobManager is mainly reflected in its components. The Dataflow Graph component, for example, parses the package submitted by the Flink client into an executable graph and distributes it to the TaskManager nodes below. Another component of interest is the Actor System, implemented with the Scala Akka asynchronous networking library. We saw a lot of Akka timeout issues later in the deployment, which meant the JobManager was having trouble communicating with the TaskManagers.

When the TaskManager exchanges data with external systems, it does not use the Actor model, which is geared toward asynchronous communication and emphasizes speed. Instead, the TaskManager uses Netty for more stable data transport.

Focusing on the concept of the task slot: common best practice suggests that the number of slots per TaskManager should be 1:1 with the number of CPU cores on the machine. We initially ran small jobs with this 1:1 design and it was fine, but as data volume grew we often hit timeout issues. The reason is that a CPU provided by Kubernetes is really just a time slice of a CPU, not equivalent to a physical core. When a TaskManager is given multiple CPUs, its memory is dedicated but the CPUs are shared, and in that situation the TaskManager as a whole is not particularly stable. So we ended up with something like 1:4 or 1:8. The exact ratio should be determined from the network conditions and empirical values in your own environment.

Flink deployment

When we first deployed Flink we were a bit lost, because the deployment documentation covers many modes, such as standalone, Kubernetes, YARN, and Mesos, plus others that are less practical. Although we run Kubernetes on our cloud platform, we could not use the hosted Kubernetes integration directly, so we ultimately adopted the standalone-on-Docker mode, as shown in the picture below:

  • In Standalone mode, the Master and TaskManager can run on the same machine or on different machines.

  • In the Master process, the Standalone ResourceManager manages resources. When the user submits a JobGraph to the Master through the Flink Cluster Client, the JobGraph passes through the Dispatcher first.

  • When the Dispatcher receives the request, it spawns a JobManager. The JobManager process then requests resources from the Standalone ResourceManager, and finally the TaskManagers are started.

  • After the TaskManager is started and registered, the JobManager assigns specific tasks to the TaskManager for execution.

Flink task submission

Flink provides a wealth of client-side ways to submit and interact with tasks, including the Flink command line, the Scala shell, the SQL client, the REST API, and the web UI.

The most important is the command line, followed by the SQL client for submitting SQL tasks and the Scala shell for submitting tasks through the Table API. Flink also provides a REST service that can be called over HTTP, as well as the web UI for submitting jobs. What is really useful for us is the REST API. In the current service, apart from the code that pulls the raw logs, the self-developed Go components we used to need for follow-up operations such as statistics and sorting are no longer necessary; we call the Flink interfaces directly instead.

Flink executes jobs asynchronously: after you call the interface to submit a task, the task's ID is returned to you. In later steps you can keep polling this interface to check how the task is doing and then decide what to do next. Taken together, Flink's REST API is very convenient for our heterogeneous, non-Java team.
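
As an illustration of this flow (not our production code), the sketch below submits a previously uploaded jar through POST /jars/:jarid/run and then polls GET /jobs/:jobid for the job state; the address and jar ID are placeholders, and the JSON handling is deliberately crude.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkRestSubmit {
    public static void main(String[] args) throws Exception {
        String base = "http://flink-jobmanager:8081";   // placeholder address
        String jarId = "abc123_log-batch.jar";          // placeholder id of an uploaded jar
        HttpClient http = HttpClient.newHttpClient();

        // Submit: POST /jars/:jarid/run returns {"jobid": "..."}
        HttpRequest run = HttpRequest.newBuilder(URI.create(base + "/jars/" + jarId + "/run"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        String runBody = http.send(run, HttpResponse.BodyHandlers.ofString()).body();
        String jobId = runBody.replaceAll("(?s).*\"jobid\"\\s*:\\s*\"([^\"]+)\".*", "$1");

        // Poll: GET /jobs/:jobid returns job details, including its state
        while (true) {
            HttpRequest poll = HttpRequest.newBuilder(URI.create(base + "/jobs/" + jobId))
                    .GET().build();
            String detail = http.send(poll, HttpResponse.BodyHandlers.ofString()).body();
            String state = detail.replaceAll("(?s).*\"state\"\\s*:\\s*\"([^\"]+)\".*", "$1");
            if (!"RUNNING".equals(state) && !"CREATED".equals(state)) {
                System.out.println("job " + jobId + " ended with state " + state);
                break;
            }
            Thread.sleep(10_000);   // re-check every 10 seconds
        }
    }
}
```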

Problems encountered with batch processing

Network problems

As we gradually migrated the logging service, log volume was small at first and Flink worked very well. When it could not keep up, JVM heap errors and similar problems only required adjusting the relevant parameters; after all, resources on the cloud platform are plentiful and such operations are very convenient. However, as we trusted it with more and more, and a single job carried hundreds of gigabytes of traffic, the whole task graph stretched into a long chain and network problems appeared. Before that, problems such as heartbeat timeouts or task retries did not worry us much, because Flink supports retrying after failure and we can detect failures through the REST interface and simply resubmit. But as task volume grew, the cost of each run grew with it, and the extra resubmissions made the cluster deteriorate further.

The Akka timeout problem is caused by communication between the JobManager and the TaskManager, and problems like heartbeat timeouts or connections being reset were also numerous.

Why haven't we solved this problem completely? Because we came across a write-up of Alibaba's experience running Flink on Kubernetes; take a look if you are interested.

Facing the same problems described in that article, the Alibaba team pointed out that the virtualized network layer on Kubernetes costs a certain amount of performance, and we referred to their solution. Specifically, it involves adjusting the Flink configuration plus some additional measures targeting "connection reset by peer":

Adjust Flink configuration parameters

  • Increase network fault tolerance, that is, the timeout-related configuration parameters. For example, if the heartbeat times out at 5 seconds, raise it to 20 or 30 seconds. Note that it should not be disabled entirely or set to a huge value.

  • Enable compression. If files are uploaded as plain text rather than as compressed archives, Flink can read them in parallel and speed up processing, so early on we tended to upload uncompressed text. Once network overhead became large, we chose to enable file compression, trading extra CPU pressure for as little network overhead as possible. Communication between TaskManagers, and between the JobManager and TaskManagers, can also be compressed;

  • Use caching/buffering, for example taskmanager.memory.network.fraction and similar parameters; the configuration here is quite flexible;

  • Reduce the number of task slots per TaskManager. A configuration sketch combining these adjustments follows below.
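
As a rough sketch of what those adjustments look like, here they are expressed as a programmatic Flink Configuration (in a standalone cluster the same keys normally live in flink-conf.yaml); the key names and values are indicative only and should be checked against the documentation of the Flink version actually deployed.

```java
import org.apache.flink.configuration.Configuration;

public class ClusterTuning {
    // Rough sketch of the tuning directions listed above, not a recommended recipe.
    public static Configuration tunedConfiguration() {
        Configuration conf = new Configuration();

        // Looser timeouts: raise them, but do not disable them or make them huge
        conf.setString("heartbeat.timeout", "30000");   // milliseconds
        conf.setString("akka.ask.timeout", "30 s");

        // Trade CPU for network: compress data shuffled between tasks
        conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true");

        // Give the network stack a larger share of TaskManager memory
        conf.setString("taskmanager.memory.network.fraction", "0.2");

        // Fewer slots per TaskManager than virtual "cores" (e.g. a 1:4 to 1:8 ratio)
        conf.setString("taskmanager.numberOfTaskSlots", "4");

        return conf;
    }
}
```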

Connection reset by peer

  • Avoid heterogeneous network environments (try not to access across data centers);

  • Configure multi-queue NICs on the cloud provider's machines (the instance's network interrupts are distributed across different CPUs to improve performance);

  • Use the high-performance network plug-ins provided by cloud vendors, for example Alibaba Cloud's Terway;

  • Use the host network and bypass the Kubernetes virtualized network (requires a certain amount of development work).

Since the "connection reset by peer" measures require cross-department coordination, they are hard to put in place, so our current way of mitigating network problems is to adjust the Flink configuration. This has alleviated the cluster's network problems to a large extent.

Waste of resources

In standalone mode, the total amount of resources configured for the cluster is dictated by the largest job it has to run. As shown in the figure below, the data shuffled between task stages at the bottom has reached 150 GB+. The only remedy we had for this was to keep configuring ever larger resources.

However, because Flink runs on the JVM, and the JVM often claims resources without releasing them promptly, memory stays high once a container has run a task. With the configuration constantly being raised and staying that large, while our hourly log-processing tasks actually use only a small fraction of it, the end result is not great and resources are wasted.

Jobs getting stuck

When the data volume is relatively large, we find that jobs get stuck; often a large number of jobs stall halfway through loading. As shown in the picture below (light blue means done, bright green means in progress), if we leave such a task alone it will run for three, five, or even eight hours until it finally dies from a heartbeat timeout or something similar.

We have not fully located the root cause, so the only measure we can take is to set a maximum runtime threshold when checking tasks through the REST interface. When the threshold is exceeded, the task is treated as irrecoverably stuck and cancelled through the same interface, as sketched below.
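
The check itself is simple; a minimal sketch, assuming the same placeholder address as earlier and the REST API's cancel endpoint (PATCH /jobs/:jobid?mode=cancel), with an arbitrary threshold supplied by the caller:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StuckJobWatchdog {
    // Cancels a job through the REST API once it has run longer than maxRuntimeMillis.
    // Meant to be called from the polling loop; address and threshold are placeholders.
    static void cancelIfStuck(HttpClient http, String base, String jobId,
                              long startedAtMillis, long maxRuntimeMillis) throws Exception {
        if (System.currentTimeMillis() - startedAtMillis > maxRuntimeMillis) {
            // PATCH /jobs/:jobid?mode=cancel asks the JobManager to cancel the job
            HttpRequest cancel = HttpRequest
                    .newBuilder(URI.create(base + "/jobs/" + jobId + "?mode=cancel"))
                    .method("PATCH", HttpRequest.BodyPublishers.noBody())
                    .build();
            HttpResponse<Void> resp = http.send(cancel, HttpResponse.BodyHandlers.discarding());
            System.out.println("cancelled " + jobId + " -> HTTP " + resp.statusCode());
        }
    }
}
```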

Benefits from Flink

The following figure shows one part of our log processing. Each small square represents a service, and the overall service chain is rather long. Data loaded from multiple sources is first transferred into the same cloud storage, converted by the log-merge service, and finally saved to cloud storage or Redis depending on the specific business requirements of the service.

There are two ways to connect tasks. One is a manual agreement, for example: I am your downstream component and we agree on a delay of 3 hours. The other is to use a message queue: after I finish processing I push a message, and whether you consume it, or consume it successfully, is not the upstream's concern. Although the service normally runs smoothly, once a problem occurs it is painful to trace through the whole system, chase down logs, or rerun data. This has improved greatly since we introduced Flink.

At present only the task-management part reuses the previous code, and it is essentially the collection module. The collected data is submitted directly to Flink as the current job, and after Flink processes it the result goes straight into cloud storage. Our task management has two main functions: collection, and dynamic monitoring of the current task's results. Overall, the rework formed a closed loop: whether a problem occurs in Flink processing or in storage, the task-management system reruns the job, which reduces later operations and maintenance work.

Conclusion

Deploying Flink standalone to handle batch processing, which is not its strongest suit, is quite challenging. The challenge is that this is not a typical Flink scenario: many configurations do not work out of the box, and even though batch processing is claimed to be supported, the relevant options are off by default. This requires tuning, yet most of the documentation only says that when you hit a certain class of problem you should increase a certain class of value; how much is a matter of experience.

However, we are confident in Flink's future development, because the project is now pushing hard on unifying stream and batch processing. The DataSet batch API and the DataStream API were separate in earlier versions, but in 1.12 they are starting to merge and the DataSet part is no longer recommended. We believe that by developing in this direction and keeping pace with the community, the future is promising.

Recommended reading

The evolution of Youzan's unified access layer architecture

How to implement CI/CD in microservices architecture