This article focuses on practice and implementation. CAP is not covered, and ACID is left aside.

This article is written for programmers getting started with distributed systems:

  1. It covers failover and recovery of nodes in a cluster.

  2. It covers transactions and opaque transactions.

  3. It mentions Twitter and tweets, and raises a big data problem.

Because distribution is too big a topic and transactions are too big a topic, let's start small: with a single node in a cluster.

Active nodes in the cluster and synchronization

How to determine whether a node is alive in a distributed system? Kafka puts it this way:

  1. The node can keep its session with ZooKeeper alive through heartbeats.

  2. If the node is a slave, it must replicate the master's data changes as faithfully as possible. In other words, as the master writes new data, replication must keep up in a timely fashion and must not fall too far behind.

Then, a node that meets the above two conditions can be considered alive or in-sync.

With regard to point 1, we are all familiar with heartbeats, so a node that can no longer talk to ZooKeeper might be detected like this:

zookeeper-node:

var timer = new timer()
.setInterval(10sec)
.onTime(slave-nodes, function(slave-nodes){
    slave-nodes.forEach(node -> {
        boolean isAlive = node.heartbeatACK(15sec);
        if(!isAlive) {
            node.numNotAlive += 1;
            if(node.numNotAlive >= 3) {
                node.declareDeadOrFailed();
                slave-nodes.remove(node);
                // the callback can be leader-node-app.notifyNodeDeadOrFailed(node)
            }
        } else {
            node.numNotAlive = 0;
        }
    });
});
timer.run();

// In the leader-node-app you can then use a simple timer like the following:

var timer = new timer()
.setInterval(10sec)
.onTime(slave-nodes, function(slave-nodes){
    slave-nodes.forEach(node -> {
        if(node.isDeadOrFailed) {
            // the node cannot communicate with ZooKeeper
        }
    });
});
timer.run();


On the second point, it’s a little more complicated. How do you do it? Look at it this way:

  • Data: the messages themselves.

  • Op-log: the operation log.

  • Position/offset: how far replication has progressed.

// 1. Consider the messages
// 2. Consider the op-log (in the database or storage device)

var timer = new timer()
.setInterval(10sec)
.onTime(slave-nodes, function(nodes){
    var core-of-cpu = 8;
    // If this is too slow, hash the nodes into groups and go parallel!
    nodes.groupParallel(core-of-cpu)
    .forEach(node -> {
        boolean nodeSucked = false;

        if(node.ackTimeDiff > 30sec) {
            // no reply within 30 seconds: the node is stuck
            nodeSucked = true;
        }
        if(node.logOffsetDiff > 100) {
            // replication cannot keep up: the node is more than 100 records behind
            nodeSucked = true;
        }

        if(nodeSucked) {
            // the node is considered dead or failed; network errors and node failures
            // are everyday events in a distributed system
            // nodes.remove(node);
            fire-event-nodeDeadOrFailed(node);
        }
    });
});
timer.run();

The state management of the above nodes is generally done by ZooKeeper, and the leader or master nodes also maintain some state.

So the leader or master node in the application only needs to pull that state from ZooKeeper. Is the implementation above necessarily the best? No: most of these operations can be combined. But for the purpose of describing whether a node is alive, writing it this way is fine.

Nodes that die, fail, or fall out of sync

Well, we have finally arrived at failover and recovery. Failover is relatively easy: there are other slave nodes, so reads are not affected.

  1. What if multiple slave nodes fail at the same time? There is no 100% availability. Data centers and server rooms go down, network cables get cut, someone wipes your root directory; nothing is guaranteed.

  2. What if the master node fails? Use master-master, with Keepalived or LVS, or write the failover yourself. High availability (HA) architecture is another big topic, so it is not covered in this article.

Let's look at recovery instead. It is not just about a slave node replaying its log after a restart to resynchronize data; look at it from the application side too, where data requests (reads, writes, updates) can fail.

You might say: retry, replay, or simply give up. Fine, those are all strategies, but do you actually know how to implement them?

A Bigdata problem

Let’s set the context for this discussion:

The problem: a stream of messages, such as tweets on Twitter, keeps flowing into our application, and we have to process it. One requirement: reach is the number of unique people exposed to a URL on Twitter; compute the reach of a given tweet (URL) within a 3-hour window.

How do you solve it?

Pull out the people who retweeted a given tweet (URL) within the time window, pull out those people's followers, deduplicate them, and count the total: that is the required reach.

For simplicity, let’s ignore dates and see if this works:

/**
 * ---------------------------------
 * 1. The big-V users who retweeted the tweet (URL)
 * __________________________________
 */
Method: getUrlToTweetersMap(String url_id)
SQL:    /* database A */
        SELECT url_user.user_id as tweeter_id
        FROM url_user
        WHERE url_user.url_id = ${url_id}
Returns: [user_1, ..., user_m]


/**
 * ---------------------------------
 * 2. The followers of those big-V users
 * __________________________________
 */
Method: getFollowers(String tweeter_id)
SQL:    /* database B */
        SELECT users.id as user_id
        FROM users
        WHERE users.followee_id = ${tweeter_id}
Returns: the tweeter's followers


/**
 * ---------------------------------
 * 3. Compute Reach
 * __________________________________
 */
var url = queryArgs.getUrl();
var tweeters = getUrlToTweetersMap(url);
var result = new HashMap<String, Integer>();
tweeters.forEach(t -> {
    // the call below can be optimized with batched "in" queries plus concurrent reads
    var followers = getFollowers(t.tweeter_id);
    followers.forEach(f -> {
        // hash-based deduplication
        result.put(f.user_id, 1);
    });
});
// Reach
return result.size();


Yay, yay, yay! Anyway, we got Reach!

This leads to an important question that is often overlooked in all the talk about frameworks, designs, and patterns: the relationship between performance and database modeling.

  1. How much data is there? Has the reader thought about the database I/O behind this question, or is it a shock? Computing reach this way is too intensive for a single machine: it can require thousands of database calls and touch tens of millions of tuples. The database design above already avoids JOINs; to speed up fetching the big-V followers, you can batch the tweeter IDs and issue several batches as concurrent reads (see the sketch after this list). Here the retweet table and the follower table already live in separate databases. What if the data grows further? Shard the databases and tables... OK, assume you are already comfortable with sharding a traditional relational database and routing the data (aggregating on the read path, distributing on the write path), or with Sharding middleware, or that you can combine HBase's scale-out capability and consistency model while working around its lack of secondary indexes. In short, assume storage and reads are solved; what about distributed computation?

  2. In an application like Twitter, the relationships between people form a graph (a network). How do you model and store it? And not only for this question; for example:

    How well might someone’s best friend’s best friend know someone?
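To make the batch-plus-concurrency idea from point 1 concrete, here is a minimal sketch in Java. It reuses the users/followee_id schema from the query above; the class name, batch size, thread-pool size and DataSource wiring are illustrative assumptions, not part of the original design.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.sql.DataSource;

// A sketch of "batch + concurrent reads": split the tweeter IDs into chunks,
// query each chunk with a single IN (...) statement, and run the chunks on a
// thread pool. The HashSet also deduplicates followers, which is what reach needs.
public class FollowerFetcher {
    private final DataSource followerDb;                        // "database B" above
    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    public FollowerFetcher(DataSource followerDb) {
        this.followerDb = followerDb;
    }

    public Set<Long> fetchFollowers(List<Long> tweeterIds, int batchSize) throws Exception {
        List<Future<Set<Long>>> futures = new ArrayList<>();
        for (int i = 0; i < tweeterIds.size(); i += batchSize) {
            List<Long> batch = tweeterIds.subList(i, Math.min(i + batchSize, tweeterIds.size()));
            futures.add(pool.submit(() -> queryBatch(batch)));   // one IN (...) query per batch
        }
        Set<Long> followers = new HashSet<>();                   // hash-based deduplication
        for (Future<Set<Long>> f : futures) {
            followers.addAll(f.get());
        }
        return followers;
    }

    private Set<Long> queryBatch(List<Long> batch) throws SQLException {
        String placeholders = String.join(",", Collections.nCopies(batch.size(), "?"));
        String sql = "SELECT id FROM users WHERE followee_id IN (" + placeholders + ")";
        Set<Long> result = new HashSet<>();
        try (Connection c = followerDb.getConnection();
             PreparedStatement ps = c.prepareStatement(sql)) {
            for (int i = 0; i < batch.size(); i++) {
                ps.setLong(i + 1, batch.get(i));
            }
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    result.add(rs.getLong(1));
                }
            }
        }
        return result;
    }
}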

See how Storm solves distributed computing and provides streaming computing capabilities:

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
// big V to followers -> database 2
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
    .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
    .shuffle()  /* the big-V followers need distributed processing */
    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
    .parallelismHint(200)  /* query the followers with a parallelism of 200 */
    .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
    .groupBy(new Fields("follower"))
    .aggregate(new One(), new Fields("one"))
    .parallelismHint(20)
    .aggregate(new Count(), new Fields("reach"));  /* count reach */

It can be processed At most once.

To get back to the example above: the point is, first, to introduce a problem of distribution (storage plus computation), and second, to make a point: programmers should focus on design and implementation, for example the way Jay Kreps built the Kafka wheel :]

But let's stay coders and get back down to earth. We mentioned recovery earlier; how many things are there to recover?

Basic:

  • Node status

  • Node data

To simplify the problem, let's consider only the write path. If we use a write-ahead log to ensure data replication and consistency, how does the consistency play out?

  1. New data is written to the primary node.

  2. The slave node tails the log and replicates the new data. The slave node does two things: (1) it writes the data's id/offset to its log; (2) just as it is about to process the data itself, the slave node crashes.

Then, according to the liveness conditions above, the failed slave node is detected; once it recovers (manually or by itself), it has to synchronize its state and its data before rejoining the cluster and playing with its friends again. Here is the question:

If the node resynchronizes according to the offset recorded in its log, then because it wrote the offset before processing the data, the data that was in flight (the lost-datas) was never processed and never will be: synchronization simply resumes from that offset, and the lost-datas are lost. This semantics is called at-most-once processing, which means data can be lost.
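As a minimal sketch of this ordering (persist the offset first, process second), here is some illustrative Java; Message, OffsetLog and Processor are hypothetical stand-ins, not types from any real replication framework.

// At-most-once ordering on the slave node: the offset is made durable before the
// record is processed, so a crash between the two steps silently drops that record.
public class AtMostOnceReplica {
    interface OffsetLog { void append(long offset); }
    interface Processor { void process(byte[] payload); }
    record Message(long offset, byte[] payload) {}

    private final OffsetLog log;
    private final Processor processor;

    public AtMostOnceReplica(OffsetLog log, Processor processor) {
        this.log = log;
        this.processor = processor;
    }

    public void replicate(Message m) {
        log.append(m.offset());          // step 1: persist the offset first
        // <-- a crash here means the offset says "done" while the message was
        //     never processed; after recovery it is skipped, so the data is lost
        processor.process(m.payload());  // step 2: then process the data itself
    }
}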

At least once

Ok, data loss is unacceptable, so let’s deal with it another way:

  1. New data is written to the primary node.

  2. The slave node tails the log and replicates the new data. The slave node does two things: (1) it processes the data first; (2) just as it is about to write the data's id/offset to its log, the slave node crashes.

Here comes the question:

When the node recovers and resynchronizes from the offset recorded in its log, the batch that was already processed but whose offset never made it into the log gets processed again, so the data is duplicated. Semantically, this is called at-least-once processing, which means data can be processed more than once.
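This is the mirror image of the at-most-once sketch above, with the same hypothetical types and the two steps swapped.

// At-least-once ordering on the slave node: the record is processed before the
// offset is made durable, so a crash between the two steps replays that record.
public class AtLeastOnceReplica {
    interface OffsetLog { void append(long offset); }
    interface Processor { void process(byte[] payload); }
    record Message(long offset, byte[] payload) {}

    private final OffsetLog log;
    private final Processor processor;

    public AtLeastOnceReplica(OffsetLog log, Processor processor) {
        this.log = log;
        this.processor = processor;
    }

    public void replicate(Message m) {
        processor.process(m.payload());  // step 1: process the data first
        // <-- a crash here means the data was processed but the offset was never
        //     written; after recovery the same message is replayed, so the data
        //     is processed twice
        log.append(m.offset());          // step 2: then persist the offset
    }
}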

Process Exactly once

Transaction

Okay, so duplicated data is not tolerated either? That's demanding. What about the strong consistency guarantee (here, eventual consistency) that everyone is after? In other words, how do we guarantee transactional behaviour when updating data? Suppose a batch of data looks like this:

{ transactionId: 4, urlId: 99, reach: 5 }


Now we want to update this batch of data to the database or log, so the original situation is:

{ transactionId: 3, urlId: 99, reach: 3 }


If we can guarantee the following three things:

  1. Transaction ID generation is strongly ordered (isolation, serial).

  2. The same transaction ID corresponds to the same batch of data (idempotent, multiple operations on a result).

  3. A single piece of data will only appear in a batch of data (consistency, no omission, no duplication).

So, feel free to update:

{ transactionId: 4, urlId: 99, reach: 8 }   // 3 + 5 = 8



Note that the update writes the ID/offset and the data together, so what guarantees this operation? Atomicity. Your database does not provide atomicity? More on that a little later.

That is the case where the update succeeds. If the node crashes during the update, then neither the id/offset nor the data makes it into the database or log; when the node recovers, it can safely resynchronize and then rejoin the cluster.
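A minimal sketch of this style of update, assuming the three guarantees above hold; StateStore and its atomic write are hypothetical abstractions, not a specific database API.

// Exactly-once update under the three guarantees: strongly ordered transaction IDs,
// one fixed batch per ID, and no missing or duplicated records in a batch.
public class TransactionalReachState {
    public record Stored(long txid, long reach) {}

    public interface StateStore {
        Stored get(String urlId);
        void putAtomically(String urlId, Stored newValue);  // txid + value in one atomic write
    }

    private final StateStore store;

    public TransactionalReachState(StateStore store) {
        this.store = store;
    }

    public void apply(String urlId, long batchTxid, long reachDelta) {
        Stored current = store.get(urlId);              // e.g. { txid: 3, reach: 3 }
        if (current != null && batchTxid <= current.txid()) {
            return;  // this batch was already applied: skip it, do not double count
        }
        long base = (current == null) ? 0 : current.reach();
        // e.g. { txid: 4, reach: 3 + 5 = 8 }, written atomically with the new txid
        store.putAtomically(urlId, new Stored(batchTxid, base + reachDelta));
    }
}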

So it is not that hard to make sure data is processed only once, is it?

What’s wrong with the implementation of the semantics of the guarantee “processing once” above?

Performance issues.


We already use batching to cut the number of round trips to the database or disk, so where is the performance problem?

Consider using a master-master architecture to keep the master available; when one master fails, it takes time for the other to take over. Suppose a slave node is in the middle of synchronizing when, bang, the master goes down. Because the semantics are exactly-once, atomicity kicks in: the transaction fails, rolls back, and the slave then has to pull the failed batch from the master again (it cannot just patch the data locally, because the batch may have changed, or it never cached the batch at all). What happens? The old master is down, the new master has not taken over yet, and the transaction is stuck until the data source for synchronization, the master, can respond to requests again.

If you do not care about performance, you could stop here and call it done.

But you are not quite satisfied, are you? Alright, let's keep chasing the silver bullet.

Opaque-Transaction

Now, let’s pursue an effect like this:

A given piece of data may fail as part of one batch (each batch corresponds to one transaction) but succeed later as part of another batch. In other words, the data within one batch always shares the same transaction ID.

Look at the example: the old data is the same as before, but with one added field, prevReach.

// Old data
{ transactionId: 3, urlId: 99, prevReach: 2, reach: 3 }

// New data
{ transactionId: 4, urlId: 99, reach: 5 }



The transaction ID of the new data, 4, is larger than the transaction ID 3 in storage, so the update can be executed:

{ transactionId: 4, urlId: 99, prevReach: 3, reach: 8 }   // 3 + 5 = 8


Now let’s look at something else:

// Old data
{ transactionId: 3, urlId: 99, prevReach: 2, reach: 3 }

// New data
{ transactionId: 3, urlId: 99, reach: 5 }


How should this be handled? Skip it? The transaction ID of the new data equals the transaction ID in the database or log, so by the transaction rules this batch should already have been processed. Should we just skip it?

No. You cannot guess about this kind of thing. Go back over the properties we have; the key one is this:

Given a batch of data, they belong to the same transaction ID. Given a transaction ID, at any time, it is associated with the same batch of data.

So we should reason as follows: the transaction ID of the incoming data equals the transaction ID stored with the record, which means this batch may have been processed before, partially or asynchronously. But the batch that corresponds to this transaction ID is always the same batch. So even if part A of the batch was processed earlier, everything carries the same transaction ID, and the previous value recorded for part A (prevReach) is still reliable.

So, we’ll rely on prevReach instead of the Reach value to update:

{ transactionId: 3, urlId: 99, prevReach: 2, reach: 7 }   // 2 + 5 = 7


What do you notice? Different transaction IDs lead to different results:

  1. When the incoming transaction ID is 4, greater than the stored transaction ID 3, reach is updated to 3 + 5 = 8.

  2. When the incoming transaction ID is 3, equal to the stored transaction ID 3, reach is updated to 2 + 5 = 7.

This is Opaque Transaction.
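A sketch of the opaque update rule that produces the two outcomes above; as before, StateStore is a hypothetical store that can write the transaction ID, prevReach and reach in one atomic operation.

// Opaque-transaction update: the stored record keeps both reach and prevReach.
// If the incoming transaction ID is newer, roll reach into prevReach and add the
// batch; if it equals the stored ID, the stored reach cannot be trusted (it may
// reflect a partial or earlier attempt of this same transaction), so build on
// prevReach instead.
public class OpaqueReachState {
    public record Stored(long txid, long prevReach, long reach) {}

    public interface StateStore {
        Stored get(String urlId);
        void putAtomically(String urlId, Stored newValue);
    }

    private final StateStore store;

    public OpaqueReachState(StateStore store) {
        this.store = store;
    }

    public void apply(String urlId, long batchTxid, long reachDelta) {
        Stored cur = store.get(urlId);                   // e.g. { txid: 3, prevReach: 2, reach: 3 }
        if (cur == null) {
            store.putAtomically(urlId, new Stored(batchTxid, 0, reachDelta));
        } else if (batchTxid > cur.txid()) {
            // a new transaction: 3 + 5 = 8, and prevReach becomes the old reach
            store.putAtomically(urlId, new Stored(batchTxid, cur.reach(), cur.reach() + reachDelta));
        } else if (batchTxid == cur.txid()) {
            // the same transaction replayed: rely on prevReach, 2 + 5 = 7
            store.putAtomically(urlId, new Stored(batchTxid, cur.prevReach(), cur.prevReach() + reachDelta));
        }
        // batchTxid < cur.txid() should not occur with strongly ordered IDs; ignore it
    }
}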

This is the strongest kind of transactional capability: it allows transactions to be committed asynchronously, so there is no need to worry about the cluster getting stuck. To compare the two:

Transaction:

  • Data is processed in batches, and each transaction ID always corresponds to one well-determined, unchanging batch of data.

  • Ensure that the generation of transaction ids is strongly ordered.

  • Ensure that batch data is not duplicated or omitted.

  • If the transaction fails and the data source is lost, subsequent transactions are stuck until the data source is recovered.

Opaque-Transaction:

  • Data is processed in batches, and each batch has a determined and unique transaction ID.

  • Ensure that the generation of transaction ids is strongly ordered.

  • Ensure that batch data is not duplicated or omitted.

  • If a transaction fails and the data source is lost, subsequent transactions are not affected unless the data source for subsequent transactions is also lost.

The design of the global ID is an art:

  • Redundantly store the IDs of associated tables to cut down on joins, ideally getting an ID in O(1).

  • Redundantly store a date (long) field to avoid ORDER BY.

  • Redundantly store filtering fields to sidestep the embarrassment of HBase having no secondary indexes.

  • Store the mod-hash value so that, after the databases and tables are sharded, the application layer can route writes.

There is too much material here and the topic is too big to expand on in this article.

Now you can see why it matters that Twitter's Snowflake generates globally unique, ordered IDs.
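As a rough illustration (not Twitter's actual implementation), a Snowflake-style generator packs a millisecond timestamp, a worker ID and a per-millisecond sequence into one long; the epoch and bit widths below follow the commonly quoted layout and are used here as assumptions.

// Snowflake-style IDs: time-ordered and globally unique across workers, with no
// coordination per call. Layout: 41 bits of milliseconds since a custom epoch,
// 10 bits of worker ID, 12 bits of per-millisecond sequence.
public class SnowflakeIdGenerator {
    private static final long EPOCH = 1288834974657L;  // custom epoch (an assumption)
    private static final long WORKER_BITS = 10L;
    private static final long SEQUENCE_BITS = 12L;
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeIdGenerator(long workerId) {
        if (workerId < 0 || workerId >= (1L << WORKER_BITS)) {
            throw new IllegalArgumentException("workerId out of range");
        }
        this.workerId = workerId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new IllegalStateException("clock moved backwards");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {                 // sequence exhausted in this millisecond
                while (now <= lastTimestamp) {   // spin until the next millisecond
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        return ((now - EPOCH) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }
}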

Two-phase commit

Two-phase commit with ZooKeeper is now an entry-level technique, so it won’t be expanded.

If your database does not support atomic operations, consider two-phase commit.


Conclusion

To be continued.