Introduction:

The rapid development of the Internet has brought us into the era of big data. Big data has become an indispensable force to support development. The challenges and opportunities of big data coexist. Sun Jianbo, r & D engineer of seven Cows Big data team, brought you the technology sharing of Go’s actual combat experience in big data development. The following is a summary of the speech.

About the author:

Jianbo Sun, InfoQ columnist, graduated from The School of Computer Science, Zhejiang University with a master’s degree. He joined Qiniu at the end of 2015 and participated in and was responsible for the architecture design and development of the whole link and multi-module of Qiniu big data platform, including big data computing engine, timing database service and log retrieval service. Prior to this, he mainly studied and participated in Cloudfoundry, Kubernetes, Docker and other container and container cloud related work. He is one of the main authors of the book Docker Container and Container Cloud.

Big data

Figure 1

As can be seen in Figure 1, the ecology of big data is relatively mature now, with many, many components related to big data. For example, Kafka and HDFS can be stored, Mesos can be used for cluster scheduling, Spark can be used for retrieval, etc. Data visualization tools like Zeppelin, monitoring tools like Grafan. But with so many complex components put together, it can be fun to play with technology, there are so many components to play with, everything we can put together, we can do our own thing. But for a person who just wants to do the business, who just cares about the business, who needs to discover their value, it’s actually very painful, because they need to know how to put the components together, to fill in the holes of the components.

Pandora big data platform

Figure 2

Seven Cow big data platform is a platform that makes it easier for people to mine data value and use big data services.

So here’s what our Pandora Seven Cows Big Data team did:

  • Integrate various big data tools

  • Simplify complex big data management

  • Build a complete closed-loop of big data life cycle (collection – processing – analysis – management – consumption – freezing)

To put it simply (see Figure 2), it is to combine various components according to your needs, as shown in figure 2 for an interface diagram, each component is a small modular unit. You can do calculations on your own, in real time and offline. Then save to the appropriate location, if you need to store, we have seven niuyun storage, cheap, stable, reliable service. If you need to retrieve your logs and do search engine related things, you can do log retrieval service. If you need to do very efficient queries related to monitoring and so on in real time, you can export them to our cloud database service. There are also open source tools that make it easy to use the big data platform without barriers.

So what is the concept of Pandora’s big data platform? Is to integrate a variety of big data tools, so that we can simplify the operation, you can pay attention to the value of the data itself, complete the data information mining.

Pandora architecture

Figure 3

Figure 3. Product architecture of Pandora. The architecture of Pandora is very simple. With this in mind, we created a tool called LogKit that can derive data directly from the data source. If you are our seven cattle cloud storage users, do nothing to use our system. Then according to your different needs, such as some customers, they are going to the crawler, want to catch a lot of data, may be they will wash their web page, we also have their export crawler or export data, such as their original data is very, very much, thousands of hundreds of millions of data per second, so that customers don’t care about these primitive data directly, They do some filtering, they do some aggregation. Then, as needed, import it into Pandora and save it to the log retrieval service, or query it for quick retrieval.

Not only do we have open apis that users can call directly, on our platform, to give your users the value of your own data, but we also have open source tools to view your data. Of course, if you want to do offline analysis for these old data, then your cloud storage data, or log retrieval service data, can be analyzed through our Xspark tool, displayed through the display tool, and can be saved to your other services.

Figure 4.

Figure 4 is Pandora’s system architecture. The outermost layer is the data source. We take the data, enter the message queue, and then transform it for custom calculation. Then we have an export service that can export to a live database as well as to a log retrieval service. Then an API or open source implementation data visualization tool can be implemented, and finally Xspark can be used for data analysis and calculation if you want to do offline calculation.

Figure 5

What components does Pandora use? The top layer is a proxy that we provide, so that you can export your data from the data source, then into the service that receives the data, then into the message queue, this is a customized Kafka that we have, then into the data transformation (filtering, cleaning, computation, etc.) and then through the customized Spark Streaming. Finally, an export service is provided. The export service can be exported to other services you want, such as timing database. This is our self-developed distributed timing database for real-time data monitoring, aggregation and other needs.

It can also be exported to log retrieval service for data query and analysis, and then exported to qiniu’s cloud storage, and finally data visualization through Grafana, Kibana and other tools. This can be done offline or in real time.

Figure 6.

At present, the big data team of Qiniu has a relatively high data scale, with hundreds of TB and more than 200 billion real-time incremental data every day. We also provide abundant landing tools for the downstream. It basically meets the needs of the use of some big data we see at present.

So what are the problems with data export of this magnitude?

As you can see, we have a lot of services, such as real-time databases, log retrieval, cloud storage, and so on. We have to do some calculations on this huge amount of data and then export it. What about the efficiency of the flow of massive data that will go through several changes in our system? Will there be any problems? The biggest problem, everyone immediately thought, was delay. We call it real time, if there is a lot of delay, the user will not be able to accept, so there is no sense. So we did a lot of work on how to reduce that delay.

Figure 7.

Let’s take a look at what the data transfer model looks like before we address data latency. There are two kinds of traditional data transmission process, just like you receive the express, for example, you can let the express put there you go to get it, or you can let him directly to your hand. So there are two models, one is you go to the user and get the data, which is the pull model. Another option is to have the user type in the data directly. Obviously, as a service like this, you can only ask the user to call in and promise the user that this data will come to you and he can rest assured.

But what’s wrong with accepting data from users? So the efficiency of your data, in fact, depends on the posture of the user. If corresponding to different downstream services, users may use different posture, if writing a program, then you just connect, and then guide, you will encounter a lot of problems. So posture is very important.

So if we look at the data transfer, there are a couple of things that are generally conceived, guaranteed, common, you would think that the export of data, the flow of data is not going to change very much, for example, if a user is 10 MB/s today, is he going to be 100 MB/s tomorrow? You end up thinking 20 MB/s and so on, you don’t think 100 MB/s. Is this change certain? I don’t think so. Especially, as a PaaS manufacturer, we cannot say, we must think that the user is 10 MB/s today, and may be 15,20 MB/s tomorrow, but we should always be prepared for him to call us at 100 MB/s.

Then we might also think that the downstream services of the data are stable and reliable if we provide the equivalent of a data change, big data analysis. We provide a lot of downstream services, so many people feel that downstream is very stable. But downstream this reliability is actually less certain. As many manufacturers have been exposed before, well-known foreign manufacturers will also have this problem. So you can hardly guarantee that the downstream of the data is stable.

Common situations of data transmission:

1. The output of upstream data is stable (changes slowly)

2. The exported downstream service is always stably available (link loss is serious)

3. The export speed is only affected by one party in the upstream and downstream

  • One-way throughput = request size * number of concurrent requests

  • Overall throughput = F (pull throughput, link carrying capacity, push throughput)

  • For example, if the flow rate is 20K /s = upstream 10K /s*2+ downstream 5K /s*2? Network twitches? Slow downstream response? Card full? Memory out of limit?

And then maybe the easiest thing to think about is how fast you can export or transfer data, which is upstream, downstream, downline, and minimize. Is that really the case? In fact, it is generally thought that the number of concurrent requests multiplied by the size of each request is the actual total. So what is our overall request? It’s the amount of data you pull upstream, and then you go downstream, and then you’re missing a process, which is the capacity of this transmission link to push throughput. For example, if our traffic is 20 K/s, then our upstream requests are 10K×2, two concurrent requests, and downstream requests are 5K×4. Is that really ok? Not necessarily. Because we will encounter such problems as network instability, downstream response is slow, memory overflow and so on, so in fact this is what we must take into account.

So how to solve this problem? We can think of some common ideas.

  • Upstream and downstream decoupling: pull and push decoupling, data prefetch, queue staging, pull and send parallel

  • Task segmentation: large tasks are decomposed into small tasks, and small tasks are horizontally expanded

  • Task standardization: Each task carries a fixed amount of traffic. An increase in traffic increases the number of tasks

  • Improve resource utilization: scheduling, balancing and squeezing machine performance

  • Provide task management capability: o&M, operation, and monitoring

  • More understand the downstream

What is the decoupling of upstream and downstream? That is, pull data is decoupled from push data, and a queue is provided in the middle, so that data can be temporarily stored, so that the speed of the data is considered to be relatively fast. You just have to make sure there’s no problem with the queue.

What else comes to mind? If a user is 10 MB/s today and 100 MB/s tomorrow, your old service will not be able to support it. You have to turn this 100 MB/s into ten 10MB/s, and this problem can be solved easily. Then there is the task standardization, the service mixing that we often talk about, a 5-core machine here, a 10-core machine there, actually has a huge impact on your service. If you can standardize your data, and all of our clusters are the same size machines, then you can simplify a lot of your thinking when you do this strategy, and you can make sure that your task segmentation, which is breaking up big tasks into smaller ones, is reliable. Then we thought about how to improve utilization, manage capacity scheduling, monitor operations and so on. Most importantly, we need to know more about downstream services, such as real-time database, log retrieval and so on. Our ultimate goal is to reduce this latency.

Build the Pandora acceleration system

Figure 8.

Just now we have seen the export service, the export service is our Xspark, Xspark already does many, many tasks, the most important thing is that it is light. What does it do? The data export you just saw, from Xspark, data filtering and conversion, refined scheduling, and so on. What is refinement? You’re not just thinking about CPU, you’re thinking about memory, but you’re also thinking about network cards, the size of the machine, all sorts of things. So it does a lot of things, but the main thing is this fine-grained scheduling. We then built a lightweight distributed Goroutine to do just that. It provides a very strong guarantee that our export service, if something goes wrong downstream, will not affect the other services at all. But the point of our service today is not to talk about exports, it’s to talk about how we can build a better set of accelerators to speed up exports.

– Selection of acceleration system

  • Logstash?

  • Beat?

  • The flume?

  • Since the research?

So when you’re building an acceleration system you want to think about selection. One of the first things we encountered was our log retrieval service, how to deal with one of the plugins, the usual community solutions to borrowing, like Logstash, Beat, Flume, etc. So what concepts did we investigate? That corresponds to the kind of thinking we’ve been talking about, upstream and downstream decoupling and so on. What we find with logstash is that it’s more of a client gathering thing, it’s the middle end, or the server side that touches the data and then calls the services, but it’s not very good. Beat is a tool that provides a lightweight collection system.

Figure 9.

Flume provides such a cache (Figure 9). We thought Flume was reliable, so we tried it out. The biggest problem is that it can be configured in different locations if you have so many users. But if you’re a PaaS vendor and you provide 100,000 users, do you configure 100,000 users for auto-generation? It was just so inelegant and out of our Gopher experience, so we did it ourselves.

– Choice of language

Figure 10.

I found that almost all the teachers who came to share in the conference had to answer one topic, that is, why do they choose Go? In fact, it is a natural choice to use Go to do this. Not only this template, but also many components in big data are written using Go. So let’s compare our requirements and look at the selection of language from the perspective of requirements.

So first of all, how do you decouple upstream and downstream? This is the concept of a buffer. It is possible to receive data from one end and then transfer it to the buffer channel, and then the other end takes data from the buffer channel. What about any subsequent horizontal extensions? You would think of horizontal scaling as distributed. What about distribution? Usually at the process level, what about at the coroutine level? Would coroutine level be more comfortable? Because there’s already a language to do that for you. And then you want to improve the utilization of that resource, improve the task management ability, can you focus that attention on the task resource allocation management and so on.

And what about the most important thing to know more about downstream? Because it is self-research, we can let the downstream partners write components to write corresponding services, we can write this process through the plug-in. For example, if you do log retrieval, write a transport plugin to speed up your log retrieval. Of course, there are many, many reasons, for example, go. Everyone says it’s simple, easy to learn and use. The community has been very active over the years, and its deployment iterations are much easier. Golang is known to compile as a binary package, so you can play it any way you want. And then it’s very efficient, it’s very stable, it’s very high performance, concurrent programming, and our technology stack, which is basically Golang, so we stuck with Golang.

– Core model

Figure 11.

And then let’s see if we’re going to do this thing, if we’re going to develop this thing ourselves, what’s the core model of it? The first thing you might think about is that you’re dealing with a data source. And then you’re going to receive the data in transaction form, why transaction form? We’ll talk about that later. We passed a queue, and there were a lot of people talking about how to do this queue. If you really want to speed things up, there’s only one option, and that’s memory, otherwise everything else is going to be a huge performance bottleneck. Then downstream Sink can write all kinds of plug-ins by itself, and you can export to any service you want.

About sink, use plug-in form of downstream adapter form, because no one knows downstream better than downstream. As our boss Chen Chao always says, it’s best to give your girlfriend or wife a red envelope on Valentine’s Day and let her buy it herself. This is the truth, you throw a ball to him in the past, others can bear the weight, this is not easy to say, or let him come.

So, referring to what we just said, do it in transactional form. If it doesn’t work, you can slow down your export service or export it to another service. And then if you can just put it in. At the same time, transaction is also to solve the problem of distribution, we can open multi-task in the process of scheduling, so how to ensure that the data flow to only one place, in fact, is also a transaction. Transaction maybe one of the things you might be thinking about is, what if I have a lock? If the accumulated data is passed in memory, it just puts the data into this channel, and actually the data transfer is very fast, and the lock is very small. The concurrency that’s competing for the lock at the same time, if you control it well, there’s actually only a dozen concurrency, or a few dozen concurrency that’s competing for the lock, actually the lock’s performance is very low. So this transaction we practice after the use of very comfortable.

Figure 12

Another question is, what if you need to reboot or hang up? For a reboot, you need to provide a policy, how to get this memory queue into the local disk? We use a sink to put all the data in memory into a local disk queue. And then according to when you restore the data to recover, so solve the problem of data restart.

Figure 13

What if I hang up? Because we still have the upstream export service, we can think of it as doing its own thing, which can give us the ability to replay data. What shall we do? After the data comes, we only need to record the most basic metadata, such as the Offset of this set of data, which partition. If it is successfully sent to the downstream, it will be OK, and the data will pass. What if I have some data whose offset is sitting there from start to end and never exported? When it fails, we invoke the replay capability of the export service to replay the data so that the data is not lost.

Figure 14

And then we can look at the whole thing. As a whole, in is guaranteed by transaction, out is guaranteed by transaction whether it sinks out or not. Ensure that multiple coroutines are retrieved at the same time and no conflicts are implemented (ensure that only one copy of the data comes out). Shut down the disk queue, start and stream the disk’s data back to the channel. Then use a simple state machine to complete the process.

Figure 15

After many modules are combined, the whole framework is basically put together to ensure the flow of tasks. But we’re going to have to build a REST-API so that other people’s data can come in, that exported data can come in. In addition, we need to build a task-level REST-API, because we are facing millions of users on PaaS, we must make this matter a single user level, so we can use agent to schedule this matter. The concepts are encapsulated into tasks for different users and then exported to different services of sink or downstream. So such a stand-alone version, seems to be distributed has been completed, and relatively simple and clean.

– Summary of single machine model core

  • A REPO has an independent task. Tasks are not shared among different REPOS

  • A task contains one MQ, multiple sinks, and one disk queue

  • The number of sinks is determined by mongo configuration (same as capacity)MQ contains transaction pools. Each sink controls the flow of data into and out of MQ through transactions to ensure atomicity

  • The restart process is also in the form of sink, but the sending end is changed to a disk, ensuring that the memory data is not lost during the service upgrade

To sum up, the simplest way is to assign data to one task for each user. The user unit is a REPO. Tasks are not shared, and each task has exclusive resources. A task contains one MQ, multiple sinks, a disk queue to restart, and so on. You can also get the corresponding configuration from Mongo and use Mongo. Transactions to control atomicity, the restart process is also in the form of sink, through the data downstream to the disk, so the whole thing is done.

Distributed difficulties

  • Maintenance is difficult. What if I want to add or subtract a Producer node from my cluster?

  • Data dispersion, all data may pass through any producer node, corresponding to the data sent to the downstream by each producer, resulting in a small amount of data that may be gathered for each request and a small BATchsize.

  • Resource waste. Each producer has to maintain a large number of tasks, corresponding to a large number of goroutines, which wastes CPU and memory.

  • Uneven load, if pure random, or polling, once encountered machine configuration is different or mixed services, the load can not be balanced.

  • It is difficult to manage, and the corresponding Producer task needs to be started for new services and the configuration needs to be changed.

Note: Producer is the accelerated service name

So this stand-alone version seems to have solved the problem, what is the problem? What if you simply took this component, our accelerated service export, and put it on many machines?

It’s not. What are the other questions? It’s difficult to maintain, if we have scattered data, how do we control it and integrate it on different machines, or balance it, etc., what about the waste of resources? As mentioned earlier, we have tasks. What if a user comes and creates them or tries them out and doesn’t use them anymore? How to clean it up? What about load imbalance? And how to manage it?

– Distributed consistency problem

  • zookeeper/etcd

  • Self-developed distributed algorithm

  • Final consistency => pull system + version stamp

Therefore, we are faced with a common problem in distributed services, which is how to transfer data to each node, which is the problem of distributed consistency. To put it simply, how do you let data inform each node at this time, so that each node knows what you do and what kind of problem you want to solve. In fact, with the development of the community for so many years, there are relatively mature solutions to the problem of consistency. Etcd/ZooKeeper may be chosen to solve this strong consistency problem, as well as some self-developed algorithms.

So do you want to research? We thought, if we do such an accelerated service, do we really need strong consistency? If a data comes in and we export it, and we say you export it here and it’s not balanced, the load on the machine is not good, you should use the accelerated service on the other machine on the other machine, does the synchronization of this message really need to be that real time? In fact, we do not weigh it down, as long as the message is finally sent three or five minutes later, the matter can reach a very coordinated state, then the matter is solved. As long as he let me finally perceive this data, to achieve the final consistency, this matter can be. So we’re going to pull the source data. The addition of the version stamp ensures final consistency of the data.

Figure 16

When it comes to this final consistency, Seven Cows have a good set of two level cache frameworks to ensure this consistency. So what does this look like? First of all, your source data must be stored in a database. When you want to use it, if you pull the source data directly, you will definitely puncture it. Almost all databases are overwhelmed by the pressure to access every single request. The obvious thing to think about is caching. What does caching look like? First of all, a data over, synchronized to the mongo database, and then do two layers of caching, the first layer is local, go to the local, found that there is no local. Then go to the level-two cache server. Then took the discovery also did not, at this time to take inside Mongo. Then store the data in a cache, a local copy of the data, and set expiration times as you want, so that your data is buffered well, so that your requests to the database itself are only a few hundred or a thousand times every two minutes, because most of the requests are already cached.

What else is there to gain from this? For example, if the database fails, we have a secondary cache, so if the secondary cache fails, we have a local cache, so that if the primary service fails, my other services can continue to work. I’m actually decoupled from the master thing, I’m not affected by matser’s failure, and if data changes, the other components are notified.

Figure 17

So let’s see what is the metric in the cache that we ultimately want to maintain?

1. Guarantee status. Can not say that after the start can not close, first of all to ensure that the start can stop

2, to have the ability to allocate, automatic allocation or manual allocation, to have a distribution process

3, to ensure batch delivery, have the ability to adjust the sending small

4. Concurrency, how many accelerated services do you have to open, and how many concurrent requests are sent downstream

5. How big is the queue cache capacity, and how much more it will backpressure

6. How many concurrent messages are received

7. If you want to specify the machine manually, you can specify it

Figure 18

And then we take into account that if this task is on this machine, on that machine, it’s actually a waste of links, first the network card is very wasteful, and then start and stop and so on, the scheduling process is very wasteful. So as we just said, we are based on standardization of tasks, and each task is actually a fixed size scale. So if each task is already of a fixed size, we can steadily assign it to some machine.

Based on the simplest, first how many tasks are known, sort them, and then we give it enough machines to allocate according to the number of tasks it needs. There may be other issues, such as uneven configuration of the machine. It is best to be balanced, but there are some unavoidable situations. So you can manually assign a task to a specific machine, and then roughly configure the scheduling algorithm. Very simple, stable, we solve this problem with a balanced task standardization mechanism.

– Whitelist machine binding

  • A combination of manual and automatic

  • Coping with sudden traffic

  • Prevent jitter of large tasks

  • Make up for the differentiation brought about by machine configuration

  • The admin ability

In addition, we will add the ability of manual binding to the machine. What is the concept of this binding ability? First of all, you can combine manual and automatic. When we first started writing code, we blindly believed in the automatic process, that if I write a good algorithm, everything will be automatic, as long as the algorithm is good enough, it will be ok. But with truly online services, there are always surprises, and you definitely want to add manual capabilities. So what’s the advantage of manual capability? It’s dealing with sudden traffic. What if it’s too late to expand? Can be adjusted temporarily, very flexible, to prevent the jitter of large tasks.

For example, we have many large customers with huge data volume. We can assign some clusters and machines to them and bind them to them. In this way, the data of this big customer is relatively stable, so the task will not shake, will not affect other resources, and will not erode small customers. So the experience for both small and large customers is very good. And then there’s another thing that compensates for the differentiation that comes with machine accessories, where you can have some manual mechanics.

We’re also going to provide some apis to do what? It’s to get monitoring information — the number of tasks, the rate of success and failure, lag, and so on, and to provide some management interface, to look at the history of the problems, and then we can look at the problems and solve them.

– Create on demand and recycle resources

  • Data is created as it arrives, on demand.

  • If a task in the Producer instance does not receive data for a long time, the corresponding task is destroyed to release resources.

As mentioned above, what if there is a user to create, create for a while feel not very good? Don’t need, that don’t need this resource must waste, that resource how to recycle? In fact, there are a lot of small customers who are in the nature of trial, at this time we provide some free amount, so that they will soon run out of money, so soon they do not want to pay, and they do not have real needs. Then the data resource will occupy there, and many tasks will encounter the problem of resource recycling. What about recycling? The simplest is to make a prediction based on past statistics. For example, if he has been typing data, and suddenly he hasn’t typed data for a while, maybe a day, maybe how many hours, then you define that the data resource can be released, and then quickly start and stop.

-protobuf Indicates the serialization protocol

  • Communicates with upstream through the Protobuf protocol

  • Do not parse data repeatedly, remove THE CPU consumption of JSON parsing and so on

We’re using the serialization protocol, and we’re using protobuf here, which works really well. For comparison, if you use the JSON serialization protocol, the CPU consumption is about 10 times less than with protobuf. If you can use protobuf, try to use it, the experience is very good.

– Increased failure wait time

  • If the write fails, sleep for 1s and retry. If the write fails again, the sleep duration increases for 10s

  • If the write is successful, the sleep duration of the failure is reset to 1s

  • Effectively reduce downstream pressure

Last but not least, increase the failure wait time. For example, if you have a problem this time, the next time you try to access, the downstream is still dead. Then I will give you one second, if you still hang up, then I will wait another second, because I will give you a chance to wait. Because we often find that services like data play failed, a lot of time has been playing, there will be data accumulation, piles of data play over, so that the downstream will cause a collapse of the process. So we give him the opportunity to wait, give him a break of one second, another three seconds, wait for a pre-value of ten seconds, and if it comes back, we go back to the normal process. This effectively reduces the downstream pressure, allows the downstream to recover quickly, and then we can transmit this data quickly.

The Pandora acceleration system

  • No data is repeatedly written and no data is lost

  • Write more smoothly and remove burrs

  • Higher utilization of machine resources

  • More understand the downstream

  • There is no Lag!

So what does this accelerated service build in the end? First and foremost, there is no duplication of data and no data loss. To make data write more smooth, is to remove burrs. What is the concept of this burr? Those who have played big data, or have a certain amount of data, will feel that sometimes you have different machines, or different components, different instances to play, their time is not the same. Because depending on your request, this one might come back in ten seconds, that one might come back in a second, and this is the burr, and you would think that all the instances would come back, and you would think that this request is OK. Then we do this export acceleration server, is to solve this burr, so that the whole upstream and downstream data transmission efficiency is very smooth, has been at a very high level. At the same time we improve the utilization of machine resources. It also understands the downstream, because we write the entire downstream service in plug-in mode, and the end result is no latency.