This is a very common interview question. I have been asked several times in the interview before, and I have also been asked the following question:

How have you handled the sharp increase in your business?

How to handle a 10-fold or 100-fold increase in business volume?

How does your system support high concurrency?

How to design a high concurrency system?

What are the characteristics of high concurrency systems?

. .

And so on, there are many ways to ask.

I don’t think most people know how to answer this kind of question, but we can have a general idea to answer, is how to design the system around supporting high concurrency business scenarios? If you can think of this, then we can talk about how high concurrency can be supported at the hardware and software levels. In essence, this question is a comprehensive test of whether you know how to handle the details, whether you have the experience to handle it.

Faced with ultra high concurrency, the first machine to carry on the hardware level, split second micro service architecture design, code level, cache, peak clipping, decoupling problems to deal with, separation, depots table database level well, speaking, reading and writing, and stability to ensure that monitoring, fuse current-limiting downgrade it must want to have, a problem which handle in time. So from the whole system design aspect will have a preliminary concept.

In the early days of the Internet, a single architecture was sufficient to support daily business requirements, and all business services were deployed on a single physical machine in a single project. All the business including your trading system, member information, inventory, goods and so on are mixed together, when the flow once up, the problem of the single architecture is exposed, the machine is down all the business can not be used.Therefore, the cluster architecture began to emerge, the single machine cannot resist the pressure, the simplest way is horizontal expansion and horizontal expansion, in this way, through load balancing to distribute the pressure flow to different machines, is temporarily to solve the single point of service unavailability problem.

However, as the business evolves, maintaining all business scenarios in one project makes development and code maintenance more and more difficult. A simple requirement change requires the release of the entire service, code merge conflicts become more and more frequent, and online failures become more and more likely. The architectural pattern of microservices was born.

The cost of development and maintenance is reduced and the pressure on the cluster is increased by separating each individual business and deploying it independently, so that there is no longer a single change that needs to be made.

From the perspective of high concurrency, it seems that the above points can be classified as improving the overall system’s stress resistance through service splitting and the expansion of cluster physical machines. Then, the problems caused by splitting are also the problems that need to be solved by high concurrency systems.

The benefits and convenience of decoupling RPC microservitization are obvious, but at the same time communication between individual microservices needs to be considered. The traditional HTTP communication method is a great waste of performance, so it is necessary to introduce RPC framework such as Dubbo to improve the efficiency of the whole cluster communication based on TCP long connection.We assume that the ORIGINAL QPS from the client is 9000, then it is 3000 dispersed to each machine through the load balancing strategy. After the HTTP is changed to RPC, the interface time is shortened, and the QPS of the single machine and the whole machine is improved. RPC framework itself generally has load balancing and circuit breaker degradation mechanisms, which can better maintain the high availability of the whole system.

So with RPC out of the way, some of the fundamentals of Dubbo as basically the popular choice in the country are the next questions.

When the service is started, the provider and consumer connect to the registry according to the configuration information, register and subscribe to the registry respectively, and return the provider information to the consumer according to the service subscription relationship. The Consumer also caches the provider information locally. If the information is changed, the consumer will receive the push from the Register and generate the proxy object. At the same time, according to the load balancing policy, the consumer chooses a provider and records The Times and time of interface invocation to the Monitor regularly. The consumer invokes the interface through the proxy object. The provider deserializes the data after receiving the request, and then invokes the specific interface implementation through the proxyDubbo load balancing strategy Weighted random: Suppose we have A set of Servers = [A, B, C] whose corresponding weights are weights = [5, 3, 2] and the total weight is 10. These weight values are now tiled over one-dimensional coordinates so that the [0, 5) interval belongs to server A, the [5, 8) interval belongs to server B, and the [8, 10) interval belongs to server C. Then generate a random number in the range of [0, 10) through the random number generator, and calculate which range this random number will fall into. Each service provider has an active number, initially 0 for all service providers, plus 1 for each request received, and minus 1 for each request completed. After the service has been running for a while, the better performing service provider processes the requests more quickly, so the active number drops more quickly, and this service provider gets priority for new service requests. The provider’s invoke and random nodes generate a hash through the hash algorithm, and project the hash onto the ring of [0, 2^ 32-1]. Md5 is performed according to the key and then hash is performed during query. Get the invoker of the first node whose value is greater than or equal to the current hash.

Image from Dubbo official

Weighted polling: For example, if the weight ratio of servers A, B, and C is 5:2:1, server A receives five requests, server B receives two, and server C receives one of the eight requests. Cluster fault tolerance 1, Failover Cluster failure automatic switchover: The default dubbo fault tolerance scheme, when the call fails to automatically switch to another available node, the specific number of retries and interval can be set when the reference service, the default number of retries is 1, that is, the call is invoked only once. 2. Failback Cluster Fast Failback: After an invocation fails, logs and call information are generated, and null results are returned to consumer. The failed invocation is retried every 5 seconds through a scheduled task. The Forking Cluster invokes multiple service providers in parallel. Failsafe Cluster Failsafe Cluster invokes multiple service providers in parallel. Multiple threads are created through the thread pool, and multiple providers are invoked concurrently. The result is stored in the blocking queue. As long as one provider returns a result successfully, the result will be returned immediately. Call each provider one by one, and if one of them fails, throw an exception at the end of the loop. The role of message queues for MQ is well known: peak clipping, valley filling, decoupling. Relying on message queues, synchronous to asynchronous, reduces the coupling between microservices.

For some interfaces that do not need synchronous execution, asynchronous execution can be implemented by introducing message queues to improve the interface response time. After the transaction is completed, it is necessary to deduct the inventory, and then it may be necessary to issue points to members. In essence, the action of issuing points should belong to the performance service, and the requirements for real-time performance are not high. We only need to ensure the final consistency, that is, the successful performance of the contract. For requests of this nature, MQ can be used asynchronously, which increases the system’s ability to withstand stress.For message queues, how to ensure the reliability of messages and not lose them?

Message reliability Message loss can occur in three ways: the producer sends messages, MQ itself loses messages, and consumers lose messages.

Producer loss

The producer may lose the message if the program fails to send the message, an exception is thrown, and the message is not processed again. Or the sending process succeeds, but the network is intermittently disconnected during the sending process. MQ does not receive the message, and the message is lost.

Since synchronous sending generally does not occur in this way, we will not consider synchronous sending, and we will base our case on asynchronous sending.

Asynchronous sending can be divided into two ways: asynchronous callback and asynchronous no callback, no callback, after the producer sends no matter the result may cause message loss, and through the form of asynchronous sending + callback notification + local message table we can make a solution. The following is an example of a single scenario.

1. After placing an order, save the local data and MQ message table first. At this time, the state of the message is sending. 3. Sending result of MQ callback notification message. Update the database MQ sending status. 4. If the JOB polling fails to send messages after a certain period of time (based on the service configuration), retry. 5.In general, asynchronous callbacks are fine for most scenarios, but only for those scenarios where there is a complete guarantee that messages cannot be lost.

MQ lost

If a producer guarantees that a message will be sent to MQ, and MQ receives the message while it is still in memory, the message may be lost if it goes down without being synchronized to the slave node.

Such as RocketMQ:

RocketMQ can be flushed synchronously or asynchronously. The default is asynchronous flushing, which may cause messages to be lost before they reach the hard disk. You can set it to synchronous flushing to ensure message reliability, so that messages can be recovered from disk even if MQ fails.

Kafka, for example, can also be configured to:

Acks =all The producer is returned successfully only after all the nodes that participate in the replication receive the message. In this case, messages will not be lost until all nodes fail. Replicas =N; replicas=N; replicas=N; replicas=N; replicas=N; replicas=N This requires the leader to perceive that at least one follower is still connected and to set a value that is too large to allow the producer to retry after sending a failed message. Configuration can be used to achieve high availability of MQ, but it will cause performance loss.

Consumer loss

The consumer loses the message: The server is down when the consumer receives the message. MQ assumes that the consumer has consumed the message and does not send the message again. The message is lost.

RocketMQ by default requires an ACK reply from the consumer, whereas Kafka requires manual configuration to turn off automatic offset.

The consumer does not return an ACK for confirmation. The retransmission mechanism varies according to different SENDING intervals and times of MQ. If the number of retries exceeds the number of retries, a dead letter queue will be entered and manual processing is required. (Kafka doesn’t have these.)

Ultimate consistency of messages The ultimate consistency of distributed transactions can be achieved by transaction messages, which are the XA-like distributed transaction capabilities of MQ.

A semi-transactional message is one in which MQ receives a message from the producer, but does not receive a second acknowledgement that it cannot deliver.

The implementation principle is as follows:

If the transaction is successful, send a COMMIT to MQ. If the transaction fails, send rollback. If MQ has not received a commit or rollback from the producer for a long time, Finally, if MQ receives a commit, it can deliver the message to the consumer, whereas if it is rollback, the message is saved and deleted after 3 days.

Database for the whole system, ultimately all the traffic query and write fall on the database, database is the core of supporting system high concurrency capacity. How to reduce the pressure of database and improve the performance of database is the cornerstone of supporting high concurrency. The main way to solve this problem is through read/write separation and separate database and table.

The flow should be in the form of a funnel for the entire system. For example, our daily active users have 200,000 DAU, and the actual number of users who come to the bill of lading page every day may be only 30,000 QPS, and the number of QPS that finally turn into successful order payment is only 10,000. So for the system read is more than write, at this time can be read and write separation way to reduce the pressure of the database.

Read/write separation is equivalent to a database cluster approach that reduces the stress of a single node. But in the face of the rapid growth of data, the original single database single table storage mode has been unable to support the development of the whole business, at this time, it is necessary to divide the database into two tables. The vertical repository itself has been done for microservices, and most of the rest is a separate table solution.

For example, our daily orders are 10 million, and most of our scenarios come from the C end. We can use user_id as sharding_key, and the data query can support orders of the last 3 months. More than 3 months of archiving, then 3 months of data is 900 million, can be divided into 1024 tables, so the data of each table is about 1 million.

For example, if the user id is 100, then we hash(100) and modulo 1024 to the corresponding table.

Since primary keys are incremented by default, there must be conflicts between primary keys in different tables. There are several ways to consider:

Set the step size, for example, 1 to 1024 tables. Let’s set the base step size from 1 to 1024 so that the primary key falls on different tables without conflict. Distributed ids, implement a set of distributed ids generated algorithm or use open source such as snow after this kind of table do not use the primary key as a query, but each form alone as the only a new field in the primary key used, such as the order table order number is the only, no matter falls where tables are based on the order number as a query, update, too. Master creates a dump thread and pushes binglog to slave. Slave starts an I/O thread to read the binlog of the master. The slave starts an SQL thread to read relay log events and perform the synchronization on the slave. The slave records its binglogSince the default replication mode of mysql is asynchronous, the master database sends logs to the slave database and does not care whether the slave database has processed the logs. This will cause a problem. If the master database has been suspended and the slave database has failed to process the logs, the logs will be lost after the slave database becomes the master database. Two concepts emerge from this.

Full synchronous replication

The master writes to the binlog and forces the log to be synchronized to the slave, which is returned to the client after all the slave libraries have finished executing, but performance is obviously affected in this way.

Semi-synchronous replication

Unlike full synchronization, the logic of semi-synchronous replication is as follows: The slave database sends an ACK to the master database after successfully writing to the log. The master database considers the write operation complete after receiving at least one acknowledgement from the slave database.

Cache As a representative of high performance, the cache may be responsible for more than 90% of hotspot traffic in some special services. For some scenarios where concurrent QPS may be hundreds of thousands of seconds, the introduction of cache preheating can significantly reduce the pressure on the database. QPS of 100,000 May fail for a stand-alone database, but it is not a problem for caches such as Redis.

Take the second kill system as an example, the activity preheating commodity information can be cached in advance to provide query services, the activity inventory data can be cached in advance, the ordering process can be completely removed from the cache deduction, and then asynchronously written to the database after the second kill, the database bears too little pressure. Of course, after the introduction of caching, there are issues such as cache breakdowns, avalanches, and hot spots to consider.

Hot key problem The so-called hot key problem is that hundreds of thousands of requests to access a specific key on Redis suddenly cause traffic to become too concentrated and reach the upper limit of the physical network card, causing the Redis server to break down and triggering an avalanche.Solutions for hot keys:

Ahead of the hot key break up to a different server, reduce the pressure To join the second level cache, heat load ahead of the key data into memory, if redis downtime, memory queries Breakdown breakdown is the concept of a single cache cache key high concurrent access, expires lead to direct all requests to the db, the heat and the key problem is similar, The point is that expiration causes all requests to be called to DB.

Solution:

Lock update, such as A request to query A, find that the cache does not lock the key A, at the same time to the database query data, write to the cache, and then return to the user, so that subsequent requests can get data from the cache. The expiration time is added to the value to asynchronously refresh the expiration time.Cache penetration Cache penetration is the query for data that does not exist in the cache, and each request is called to DB as if the cache did not exist.

To solve this problem, add a layer of Bloom filter. The way bloem filters work is that when you store data, it maps it to K points in an array of bits using a hash function and sets them to 1.

In this way, when the user queries A again and the value of A is 0 in the Bloom filter, it returns directly, and no breakdown request will be made to DB.

Obviously, one of the problems with using a Bloom filter is misjudgment, because it’s an array, and multiple values can fall into the same place, so theoretically, as long as our array length is long, the probability of misjudgment will be lower, and the problem will be based on the actual situation.Cache avalanche When a massive cache failure occurs at some point, such as when your cache service goes down, a large number of requests come in and hit the DB directly, which can cause the entire system to crash, called an avalanche. Avalanches are different from the breakdown and hot key issues in that they mean that large caches are out of date.

Several solutions for avalanches:

Set different expiration times for different keys to avoid traffic limiting at the same time. If Redis breaks down, traffic limiting can be implemented to avoid a large number of requests hitting DB level 2 cache at the same time. The stability offusing

For example, the abnormal situation of suspended marketing service or a large number of interface timeouts cannot affect the main link of the order, and some operations involving deduction of points can be remedied after the event.

Current limiting

If some interfaces do not perform traffic limiting, the service may be directly suspended. Therefore, it is particularly important to make appropriate traffic limiting for the evaluation of the performance of each interface.

demotion

In fact, after the fuse can be said to be a kind of downgrade, to fuse for example, the marketing interface after the fuse of the downgrade scheme is not to call the marketing service in a short period of time, until the marketing recovery to call again.

plan

Generally speaking, even if there is a unified configuration center, it is not allowed to make any changes during peak hours. However, some changes can be made in emergencies through properly configured plans.

check

In view of the distributed transaction consistency generated by various distributed systems or the data anomaly caused by attacks, it is very necessary to check the platform to do the last bottom of the data verification. For example, whether the amount of the downstream payment system and the order system is correct to check, and whether the correctness of the data dropped by the middle man attack is guaranteed.

In fact, it can be seen that how to design a high-concurrency system itself is not difficult, but based on the knowledge you know, from the physical hardware level to the software architecture, code level optimization, what middleware to use to constantly improve the system’s ability to resist pressure.

But the question itself can bring more problems and break up of the service itself has brought the problem of distributed transactions, HTTP, the use of RPC framework bring the efficiency of communication, routing, the problem of fault tolerance, the introduction of MQ brought a message loss, backlog, transaction messages, order problem, the introduction of the cache and will bring consistency, avalanche, breakdown problem, In order to solve these problems, we must constantly add various measures to prevent and trace these problems, such as fusing, limiting traffic, demoting, off-line checking and pre-plan processing.

This article combines some contents of previous articles. In fact, I wanted to write this article at the very beginning, but I found the length was too large and the content was not easy to summarize, so I split several articles and started to write. This article can be regarded as a summary and summary of the previous content, not for water.

Public id: Java story programmer