Content source: On August 5, 2017, Dang Hexuan, senior big data engineer at Qiniu Cloud, gave a talk titled "Go Based Big Data Platform" at the Gopher Hangzhou Meetup. As the exclusive video partner, IT Dakashuo (WeChat ID: itdakashuo) is authorized to release the video with the approval of the host and the speaker.


Word count: 2610 | Reading time: 7 minutes

Video and PPT of the talk:
suo.im/1h4oJc

Abstract

Big data is a topic we never stop talking about, no matter how big your data actually is. Dang Hexuan brings a fresh set of ideas for big data processing.

What is Pandora

Pandora is the code name for both Qiniu's big data team and its entire product line. Our goal is to provide a simple, open, and efficient one-stop big data platform.

System design analysis and architecture

The challenges of building systems

At the beginning of building this system, we first sorted out the problems we might encounter. A good system should not only solve the problems at hand, but also anticipate the challenges the business or the data scale may face in the foreseeable future.

The three points on the left are what we need to think about from the business side, and the three points on the right concern implementation and architecture.

You can see that at its core, the system needs high-throughput, low-latency capabilities.

As shown in the figure above, the heart of the system is the red and blue blocks. The blue part is mainly responsible for pulling data out of the message queue; the red master module handles task allocation and scheduling, and the pulled data is imported into the downstream business modules.

The green module on the far right is our monitoring system, which collects and monitors business metrics, link performance, and machine health from the top level. The yellow blocks are a visual presentation of the business and monitoring metrics.

At the top of the image is a visual interface where the user can create a workflow with a few mouse drag-and-drops. The export part of this workflow is embodied in the export system, which does the work of pulling, processing, and pushing data.

Multiple upstream and downstream adaptations

Business architecture

We pull data from Kafka, process it, and then push it downstream to different systems. Since these systems differ, we need to think about what they have in common and where they diverge.

The export model

There are two export models. The first is the basic general export model: after receiving a task, it pulls data from upstream, processes or filters it, and then pushes it downstream.

The other is the cloud storage export model, which adds two steps to the general model: the pulled data is first written to a local file, then compressed, and finally uploaded to cloud storage. This reduces the number of files and the cloud storage footprint, which in turn lowers users' costs.
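To make the two models concrete, here is a minimal Go sketch. The interface and function names (Exporter, stageAndCompress) are assumptions for illustration, not Pandora's real API; gzip stands in for whatever compression the real exporter uses.

```go
// Sketch of the two export models: a general pull/filter/push interface,
// plus the local staging + compression step of the cloud storage model.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"os"
)

// Exporter captures the general export model: pull a batch from upstream,
// process or filter it, then push it downstream.
type Exporter interface {
	Pull() ([][]byte, error)
	Filter(rows [][]byte) [][]byte
	Push(rows [][]byte) error
}

// stageAndCompress is the extra step of the cloud storage export model:
// write the pulled batch to a local file and compress it, so fewer, larger
// objects are uploaded and the user's storage cost drops.
func stageAndCompress(rows [][]byte, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	zw := gzip.NewWriter(f)
	if _, err := zw.Write(bytes.Join(rows, []byte("\n"))); err != nil {
		zw.Close()
		return err
	}
	return zw.Close() // flush; the file is now ready to upload
}

func main() {
	rows := [][]byte{[]byte(`{"k":"v1"}`), []byte(`{"k":"v2"}`)}
	if err := stageAndCompress(rows, "batch-000.gz"); err != nil {
		fmt.Println("stage failed:", err)
	}
}
```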

High throughput/low latency problem exploration

This problem can be difficult to solve, especially at large data volumes, and high throughput and low latency each bring their own difficulties. In our experience, the biggest throughput problems are resource utilization that is not high enough, or a mismatch between the throughput of upstream and downstream systems, where the slowest link caps the whole pipeline.

In many cases, low latency is really a requirement for service stability, so we must take care to avoid service hotspots.

Beyond that, failing to detect load changes between upstream and downstream systems can break a link. The more complex the system, the more likely problems will surface at some unexpected point.

Data prefetching

While the Export Server pushes data downstream, it pulls the next batch from upstream in advance, which maximizes network utilization, reduces waiting time, and improves export efficiency.

If no data is available during prefetch, the server sleeps for 1s before fetching again. If there is still no data, the sleep time doubles, up to a cap of 32s. As soon as data is retrieved, the sleep time resets to 1s. This mechanism effectively reduces the number of requests to the underlying storage.
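A minimal sketch of that back-off loop follows (sleep 1s, double up to 32s, reset on data). The fetch and handle callbacks are stand-ins for the real upstream pull and downstream push, which the talk does not spell out.

```go
// Exponential back-off prefetch loop: 1s -> 2s -> ... -> 32s, reset on data.
package main

import (
	"fmt"
	"time"
)

func prefetchLoop(fetch func() ([]byte, bool), handle func([]byte)) {
	const (
		baseSleep = 1 * time.Second
		maxSleep  = 32 * time.Second
	)
	sleep := baseSleep
	for {
		if data, ok := fetch(); ok {
			handle(data)      // hand the batch to the push pipeline
			sleep = baseSleep // data arrived: reset the back-off
			continue
		}
		time.Sleep(sleep) // no data: wait before hitting storage again
		if sleep < maxSleep {
			sleep *= 2 // double, capped at 32s
		}
	}
}

func main() {
	empty := func() ([]byte, bool) { return nil, false } // always-empty upstream
	go prefetchLoop(empty, func(b []byte) { fmt.Println(len(b)) })
	time.Sleep(5 * time.Second) // let the loop back off a few times
}
```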

Data push protocol optimization

We optimized the data push protocol between the Export Service and LogDB. JSON was used initially, but its serialization and deserialization performance was poor, and CPU utilization in the downstream system was very high, dragging down the overall performance of the service.

After some research we switched from JSON to Protobuf. We observed that bandwidth consumption dropped by nearly half, throughput rose, and CPU consumption fell by more than half.
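To make the change concrete, here is a hedged sketch. The Point struct and its fields are illustrative assumptions, not the real LogDB schema; the equivalent protobuf message (shown in the comment) would normally live in a .proto file compiled by protoc.

```go
// JSON payload for one data point; the protobuf equivalent replaces the
// repeated string keys with small numeric field tags, which is where most
// of the bandwidth saving comes from.
package main

import (
	"encoding/json"
	"fmt"
)

// Point is a stand-in for one exported data point (hypothetical shape).
type Point struct {
	Repo      string  `json:"repo"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
}

// The protobuf message would be declared once and compiled with protoc:
//
//   message Point {
//     string repo      = 1;
//     int64  timestamp = 2;
//     double value     = 3;
//   }

func main() {
	p := Point{Repo: "logdb", Timestamp: 1501900800, Value: 42.0}
	b, _ := json.Marshal(p)
	fmt.Printf("json payload: %d bytes: %s\n", len(b), b)
}
```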

Optimal use of resources

To save storage space in the Kodo export, the Converter step before export compresses data into Parquet, achieving a compression ratio of about 8:1. The results are good, but the drawbacks are equally obvious.

The drawback is that compression consumes a lot of CPU and affects the service, and the compression ratio is only good when the files are large. We therefore made some optimizations: controlling concurrency, optimizing memory usage during compression, and precisely controlling CPU usage.
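One common Go idiom for the "control concurrency" part is a buffered channel used as a semaphore, sketched below under that assumption; the compress function is a placeholder, not the real Converter step.

```go
// Cap concurrent compression jobs so they cannot eat the whole CPU budget.
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxWorkers = 4 // tune to the CPU budget the exporter may spend

	sem := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup

	files := []string{"part-0", "part-1", "part-2", "part-3", "part-4"}
	for _, f := range files {
		wg.Add(1)
		sem <- struct{}{} // blocks once maxWorkers jobs are in flight
		go func(name string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			compress(name)
		}(f)
	}
	wg.Wait()
}

func compress(name string) { fmt.Println("compressing", name) } // placeholder
```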

High availability and horizontal scaling

Master/server architecture

Golang's RPC is used for communication between the master and the servers. Each server reports a heartbeat to prove it is alive and reports the status of the tasks it is running. The master periodically dispatches tasks to the servers. Each server manages its tasks, deciding which to execute and which to discard.
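A minimal sketch of that heartbeat over Go's standard net/rpc follows; the service, message, and port names are assumptions, not Pandora's real protocol.

```go
// Master side: accept RPC connections and log each server's heartbeat.
package main

import (
	"log"
	"net"
	"net/rpc"
)

// Heartbeat carries a server's liveness report and its task status.
type Heartbeat struct {
	ServerID string
	Tasks    []string // IDs of the tasks this server is currently running
}

type Ack struct{ OK bool }

type Master struct{}

// Report satisfies net/rpc's method shape: (args, *reply) error.
func (m *Master) Report(hb Heartbeat, ack *Ack) error {
	log.Printf("server %s alive, running %d tasks", hb.ServerID, len(hb.Tasks))
	ack.OK = true
	return nil
}

func main() {
	rpc.Register(new(Master))
	ln, err := net.Listen("tcp", ":9090")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			continue
		}
		go rpc.ServeConn(conn)
	}
}
```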

Master high availability

The master is stateless, and its identity information is registered in ZooKeeper. Master failover performs an active/standby switchover automatically: if the active master loses the lock, it kills itself; when the standby master grabs the lock, it becomes the active master.
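A hedged sketch of that lock-based failover, using the github.com/samuel/go-zookeeper/zk client; the ZooKeeper address and lock path are illustrative, and the real implementation is not shown in the talk.

```go
// Race for a ZooKeeper lock; the winner acts as master, the loser blocks
// as hot standby. Losing the session means exiting so the standby takes over.
package main

import (
	"log"
	"os"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Both masters call Lock(); only one returns, the other waits.
	lock := zk.NewLock(conn, "/pandora/master-lock", zk.WorldACL(zk.PermAll))
	if err := lock.Lock(); err != nil {
		log.Fatal(err)
	}
	log.Println("acquired lock: acting as master")

	// If the session expires, the ephemeral lock node is gone, so this
	// process "commits suicide" and lets the standby grab the lock.
	for {
		if conn.State() != zk.StateHasSession {
			log.Println("lost ZooKeeper session, exiting")
			os.Exit(1)
		}
		time.Sleep(time.Second)
	}
}
```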

Server high availability

Each server registers itself to prevent duplicate instances running on a single machine, and registers each task to prevent a task from being executed twice. The servers are highly available: when a node fails, its tasks are rescheduled onto healthy nodes.
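One way to get that per-task registration, sketched below as an assumption: an ephemeral znode per task makes a duplicate claim fail fast, and the node vanishes with the session, so a dead server's tasks become claimable again. Paths and IDs are hypothetical, and the parent path is assumed to exist.

```go
// Claim a task by creating an ephemeral znode; zk.ErrNodeExists means
// another server already owns it.
package main

import (
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func claimTask(conn *zk.Conn, taskID, serverID string) bool {
	path := "/pandora/tasks/" + taskID
	_, err := conn.Create(path, []byte(serverID), zk.FlagEphemeral,
		zk.WorldACL(zk.PermAll))
	if err == zk.ErrNodeExists {
		return false // another server is already running this task
	}
	if err != nil {
		log.Printf("claim %s: %v", taskID, err)
		return false
	}
	return true // claimed; the znode disappears if this server dies
}

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	if claimTask(conn, "export-42", "server-1") {
		log.Println("running task export-42")
	}
}
```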

Horizontal scaling

If resources are insufficient, a new machine is added as a new server. The new server obtains the master's identity information from ZooKeeper and starts reporting heartbeats to the master, after which tasks are scheduled onto it.

Automated operation and maintenance

The system automatically detects and adjusts hotspots

On the one hand, logs are audited to forecast service trends, predicting hotspots over a coming period offline. On the other hand, the service's own state feedback is used to adjust in real time and correct the macro-level predictions.

Current state of the online system

The system processes more than 100 billion data points and 100 TB of data every day. Online export latency is under 1 minute, with little manual intervention. It offers second-level scaling, a real-time visual monitoring system, an easy-to-use alerting system, and automatically generated daily online reports.

Applications of Go

What we have done with Golang

Golang is used to develop the core code of streaming computation, offline computation, log retrieval, the time-series database, and other services.

Logkit is a simple, efficient data collection tool. Besides Pandora, it can collect from multiple databases, Kafka, machine metrics, and more, and it ships with a full set of monitoring tools.

Why Golang

Golang is easy to use and quick to pick up, reducing the mental burden on programmers and letting them focus on the business. Its simple, efficient concurrency model naturally supports writing distributed services, and there is a rich ecosystem of libraries to draw on. Qiniu was one of the first companies in China to adopt the Go language, and the company's internal Golang-based RPC and cache systems have been polished to maturity.

That’s all for today’s sharing, thank you!