This is a very common interview question, but most people don’t know how to answer it. There are many ways to ask this question. You must have seen it and feel confused.

How have you handled the sharp increase in your business?

How to handle a 10-fold or 100-fold increase in business volume?

How does your system support high concurrency?

How to design a high concurrency system?

What are the characteristics of high concurrency systems?

. .

And so on, there are many ways to ask, but it is difficult to look at the interview type of questions, but we can have a general idea to answer, is around the support of high concurrency business scenarios how to design the system is reasonable? If you can think of this, then we can talk about how high concurrency can be supported at the hardware and software levels. In essence, this question is a comprehensive test of whether you know how to handle the details, whether you have the experience to handle it.

Faced with ultra high concurrency, the first machine to carry on the hardware level, split second micro service architecture design, code level, cache, peak clipping, decoupling problems to deal with, separation, depots table database level well, speaking, reading and writing, and stability to ensure that monitoring, fuse current-limiting downgrade it must want to have, a problem which handle in time. So from the whole system design aspect will have a preliminary concept.

Evolution of microservices architecture

In the early days of the Internet, a single architecture was sufficient to support daily business needs, and all business services were deployed on a single physical machine in a single project. All the business including your trading system, member information, inventory, goods and so on are mixed together, when the flow once up, the problem of the single architecture is exposed, the machine is down all the business can not be used.

Therefore, the cluster architecture began to emerge, the single machine cannot resist the pressure, the simplest way is horizontal expansion and horizontal expansion, in this way, through load balancing to distribute the pressure flow to different machines, is temporarily to solve the single point of service unavailability problem.

However, as the business evolves, maintaining all business scenarios in one project makes development and code maintenance more and more difficult. A simple requirement change requires the release of the entire service, code merge conflicts become more and more frequent, and online failures become more and more likely. The architectural pattern of microservices was born.

The cost of development and maintenance is reduced and the pressure on the cluster is increased by separating each individual business and deploying it independently, so that there is no longer a single change that needs to be made.

From the perspective of high concurrency, it seems that the above points can be classified as improving the overall system’s stress resistance through service splitting and the expansion of cluster physical machines. Then, the problems caused by splitting are also the problems that need to be solved by high concurrency systems.

RPC

The benefits and convenience of microservitization decoupling are obvious, but at the same time communication between microservices needs to be considered. The traditional HTTP communication method is a great waste of performance, so it is necessary to introduce RPC framework such as Dubbo to improve the efficiency of the whole cluster communication based on TCP long connection.

We assume that the ORIGINAL QPS from the client is 9000, then it is 3000 dispersed to each machine through the load balancing strategy. After the HTTP is changed to RPC, the interface time is shortened, and the QPS of the single machine and the whole machine is improved. RPC framework itself generally has load balancing and circuit breaker degradation mechanisms, which can better maintain the high availability of the whole system.

So with RPC out of the way, some of the fundamentals of Dubbo as basically the popular choice in the country are the next questions.

How Dubbo works

  1. When the service starts, the provider and consumer connect to the registry based on the configuration information to register and subscribe to the service, respectively
  2. Register returns the provider information to the consumer based on the service subscription, and the consumer caches the provider information locally. If the information changes, the consumer receives a push from the Register
  3. Consumer generates proxy objects, selects a provider based on the load balancing policy, and records the number and time of interface invocation to Monitor periodically
  4. After receiving the proxy object, the consumer makes an interface call through the proxy object
  5. The provider deserializes the data after receiving the request, and then invokes the concrete interface implementation through the proxy

Dubbo load balancing policy

  1. Weighted random: Suppose we have A set of servers = [A, B, C] whose corresponding weights are weights = [5, 3, 2] and the total weight is 10. Now, these weights are tiled on the one-dimensional coordinate values, [0, 5) interval belongs to server A, [5, 8) interval belongs to server B, and [8, 10) interval belongs to server C. Next, A random number between [0, 10) is generated through the random number generator. And then you calculate what interval that random number will fall into.
  2. Minimum active number: Each service provider has one active number. Initially, all service providers have 0 active number. For each request received, the active count is increased by one, and when the request is completed, the active count is decreased by one. After the service has been running for a while, the better performing service providers process the requests more quickly, so the active count drops more quickly, and they get priority for new service requests.
  3. Consistent hash: The provider’s invoke and random nodes generate a hash through the hash algorithm, and project the hash onto the ring of [0, 2^ 32-1]. Md5 is performed according to the key and then hash is performed during query. Get the invoker of the first node whose value is greater than or equal to the current hash.

Image from Dubbo official

  1. Weighted polling: For example, if the weight ratio of servers A, B, and C is 5:2:1, server A receives five requests, server B receives two, and server C receives one of the eight requests.

The cluster tolerance

  1. Failover Cluster: The default fault-tolerant solution of Dubbo. When the call fails, the call is automatically switched to another available node. The specific number of retries and interval can be set when the call is referenced.
  2. Failback Cluster Fast failure: After an invocation fails, logs and call information, returns an empty result to the consumer, and retries the failed invocation every 5 seconds through a scheduled task
  3. Failfast Cluster automatically recovers after a failure: The Failfast Cluster is invoked only once and an exception is thrown immediately after the failure
  4. Failsafe Cluster failure security: An exception occurs during invocation, but no log is generated and no result is returned
  5. The Forking Cluster invokes multiple service providers in parallel: multiple threads are created from a thread pool, multiple providers are invoked concurrently, the results are stored in a blocking queue, and the results are returned as soon as one provider successfully returns a result
  6. Broadcast Cluster: Invokes each provider one by one. If an error occurs on one provider, an exception is thrown after the invocation.

The message queue

You should be familiar with the functions of MQ, peak clipping, valley filling, decoupling. Relying on message queues, synchronous to asynchronous, reduces the coupling between microservices.

For some interfaces that do not need synchronous execution, asynchronous execution can be implemented by introducing message queues to improve the interface response time. After the transaction is completed, it is necessary to deduct the inventory, and then it may be necessary to issue points to members. In essence, the action of issuing points should belong to the performance service, and the requirements for real-time performance are not high. We only need to ensure the final consistency, that is, the successful performance of the contract. For requests of this nature, MQ can be used asynchronously, which increases the system’s ability to withstand stress.

For message queues, how to ensure the reliability of messages and not lose them?

Message reliability

Message loss can occur when a producer sends a message, MQ itself loses a message, or a consumer loses a message.

Producer loss

The producer may lose the message if the program fails to send the message, an exception is thrown, and the message is not processed again. Or the sending process succeeds, but the network is intermittently disconnected during the sending process. MQ does not receive the message, and the message is lost.

Since synchronous sending generally does not occur in this way, we will not consider synchronous sending, and we will base our case on asynchronous sending.

Asynchronous sending can be divided into two ways: asynchronous callback and asynchronous no callback, no callback, after the producer sends no matter the result may cause message loss, and through the form of asynchronous sending + callback notification + local message table we can make a solution. The following is an example of a single scenario.

  1. The local data and MQ message table are saved after the order is placed. The status of the message is in send. If the local transaction fails, the order fails and the transaction is rolled back.
  2. If the order is successfully placed, the client returns a success message and asynchronously sends an MQ message
  3. MQ callback notification message sending result, corresponding to update database MQ sending status
  4. JOB polling fails to send a message after a certain period of time (based on the service configuration)
  5. The monitoring platform configuration or JOB program processes messages, alarms, and manual intervention that fail to be sent for a certain number of times.

In general, asynchronous callbacks are fine for most scenarios, but only for those scenarios where there is a complete guarantee that messages cannot be lost.

MQ lost

If a producer guarantees that a message will be sent to MQ, and MQ receives the message while it is still in memory, the message may be lost if it goes down without being synchronized to the slave node.

Such as RocketMQ:

RocketMQ can be flushed synchronously or asynchronously. The default is asynchronous flushing, which may cause messages to be lost before they reach the hard disk. You can set it to synchronous flushing to ensure message reliability, so that messages can be recovered from disk even if MQ fails.

Kafka, for example, can also be configured to:

Acks =all The producer is returned successfully only after all the nodes that participate in the replication receive the message. In this case, messages will not be lost until all nodes fail. Replication. factor=N, the value is greater than1This requires that each partion has at least one2Min.insync.replicas =N, set to greater than1, which requires the leader to perceive that at least one follower is still connected, setting retries=N to a value that is too large to allow the producer to retry until it failsCopy the code

Although configuration can be used to achieve high availability of MQ itself, there is a performance cost, and configuration is a tradeoff based on the business.

Consumer loss

The consumer loses the message: The server is down when the consumer receives the message. MQ assumes that the consumer has consumed the message and does not send the message again. The message is lost.

RocketMQ by default requires an ACK reply from the consumer, whereas Kafka requires manual configuration to turn off automatic offset.

The consumer does not return an ACK for confirmation. The retransmission mechanism varies according to different SENDING intervals and times of MQ. If the number of retries exceeds the number of retries, a dead letter queue will be entered and manual processing is required. (Kafka doesn’t have these.)

Final consistency of messages

The ultimate consistency of distributed transactions can be achieved by transaction messages, which are the XA-like distributed transaction capabilities provided by MQ.

A semi-transactional message is one in which MQ receives a message from the producer, but does not receive a second acknowledgement that it cannot deliver.

The implementation principle is as follows:

  1. The producer first sends a semi-transactional message to MQ
  2. MQ returns an ACK acknowledgement after receiving the message
  3. The producer starts performing a local transaction
  4. If the transaction execution succeeds send commit to MQ, fail send ROLLBACK
  5. If MQ does not receive a commit or ROLLBACK from the producer for a long time, MQ checks the producer
  6. The producer queries the final state of transaction execution
  7. Submit a second acknowledgement based on the query transaction status

Finally, if MQ receives a second commit, the message can be delivered to the consumer, whereas if it is ROLLBACK, the message is saved and deleted after 3 days.

The database

For the whole system, ultimately all the flow of the query and write fall on the database, database is the core of the support system high concurrency capacity. How to reduce the pressure of database and improve the performance of database is the cornerstone of supporting high concurrency. The main way to solve this problem is through read/write separation and separate database and table.

The flow should be in the form of a funnel for the entire system. For example, our daily active users have 200,000 DAU, and the actual number of users who come to the bill of lading page every day may be only 30,000 QPS, and the number of QPS that finally turn into successful order payment is only 10,000. So for the system read is more than write, at this time can be read and write separation way to reduce the pressure of the database.

Read/write separation is equivalent to a database cluster approach that reduces the stress of a single node. But in the face of the rapid growth of data, the original single database single table storage mode has been unable to support the development of the whole business, at this time, it is necessary to divide the database into two tables. The vertical repository itself has been done for microservices, and most of the rest is a separate table solution.

The level of table

First of all, according to the business scenario, we will decide what field to use as the sharding_key field. For example, our daily orders are 10 million, and most of our scenarios come from the C terminal. We can use user_id as sharding_key, and the data query can support orders of the last 3 months. More than 3 months of archiving, then 3 months of data is 900 million, can be divided into 1024 tables, so the data of each table is about 1 million.

For example, if the user id is 100, then we hash(100) and modulo 1024 to the corresponding table.

ID uniqueness after partition table

Since primary keys are incremented by default, there will definitely be conflicts between different tables. There are several ways to consider:

  1. Set the step size, for example, 1 to 1024 tables. Let’s set the base step size from 1 to 1024 so that the primary key falls on different tables without conflict.
  2. Distributed ID, implement a set of distributed ID generation algorithm or use open source such as the Snowflake algorithm
  3. Instead of using the primary key as the query basis, each form adds a field as the unique primary key. For example, the order number of the order table is unique, and the order number will be used as the query basis regardless of which table it finally falls into, as well as the update.

Master slave synchronization principle

  1. After the master commits the transaction, it writes to the binlog
  2. Slave Connects to master to obtain the binlog
  3. Master creates a dump thread and pushes binglog to slave
  4. Slave Starts an I/O thread to read the binlog of the synchronized master and records it in the relay log
  5. Slave Starts another SQL thread to read relay log events and execute them on the slave to complete synchronization
  6. Slave Records its own binglog

Since the default replication mode of mysql is asynchronous, the master database sends logs to the slave database and does not care whether the slave database has processed the logs. This will cause a problem. If the master database has been suspended and the slave database has failed to process the logs, the logs will be lost after the slave database becomes the master database. Two concepts emerge from this.

Full synchronous replication

The master writes to the binlog and forces the log to be synchronized to the slave, which is returned to the client after all the slave libraries have finished executing, but performance is obviously affected in this way.

Semi-synchronous replication

Unlike full synchronization, the logic of semi-synchronous replication is as follows: The slave database sends an ACK to the master database after successfully writing to the log. The master database considers the write operation complete after receiving at least one acknowledgement from the slave database.

The cache

As a representative of high performance, caching may take over 90% of hotspot traffic in some special services. For some scenarios where concurrent QPS may be hundreds of thousands of seconds, the introduction of cache preheating can significantly reduce the pressure on the database. QPS of 100,000 May fail for a stand-alone database, but it is not a problem for caches such as Redis.

Take the second kill system as an example, the activity preheating commodity information can be cached in advance to provide query services, the activity inventory data can be cached in advance, the ordering process can be completely removed from the cache deduction, and then asynchronously written to the database after the second kill, the database bears too little pressure. Of course, after the introduction of caching, there are issues such as cache breakdowns, avalanches, and hot spots to consider.

Hot key problem

The so-called hot key problem is that suddenly hundreds of thousands of requests to access a particular key on Redis will cause traffic to become so concentrated that the maximum number of physical network cards will be reached, causing the Redis server to go down and trigger an avalanche.

Solutions for hot keys:

  1. Break up the hot key to different servers in advance to reduce the pressure
  2. Add level 2 cache and load hot key data into memory in advance. If Redis is down, go to memory for query

Cache breakdown

The concept of cache breakdown is that a single key has too many concurrent accesses, and when it expires, all requests are directed to DB. This is similar to the hot key problem, except that the expiration causes all requests to DB.

Solution:

  1. Lock update, such as A request to query A, find that the cache does not lock the key A, at the same time to the database query data, write to the cache, and then return to the user, so that subsequent requests can get data from the cache.
  2. The expiration time is added to the value to asynchronously refresh the expiration time.

The cache to penetrate

Cache penetration is the query for data that does not exist in the cache, and each request is called to DB as if the cache did not exist.

To solve this problem, add a layer of Bloom filter. The way bloem filters work is that when you store data, it maps it to K points in an array of bits using a hash function and sets them to 1.

In this way, when the user queries A again and the value of A is 0 in the Bloom filter, it returns directly, and no breakdown request will be made to DB.

Obviously, one of the problems with using a Bloom filter is misjudgment, because it’s an array, and multiple values can fall into the same place, so theoretically, as long as our array length is long, the probability of misjudgment will be lower, and the problem will be based on the actual situation.

Cache avalanche

When a massive cache failure occurs at some point, such as when your cache service goes down, a large number of requests come in and hit the DB directly, which can cause the entire system to crash, called an avalanche. Avalanches are different from the breakdown and hot key issues in that they mean that large caches are out of date.

Several solutions for avalanches:

  1. Set different expiration times for different keys to avoid simultaneous expiration
  2. Traffic limiting. If Redis is down, traffic limiting can be implemented to avoid DB crash caused by a large number of requests at the same time
  3. Level 2 cache, same scheme as hot key.

The stability of

fusing

For example, the abnormal situation of suspended marketing service or a large number of interface timeouts cannot affect the main link of the order, and some operations involving deduction of points can be remedied after the event.

Current limiting

If some interfaces do not perform traffic limiting, the service may be directly suspended. Therefore, it is particularly important to make appropriate traffic limiting for the evaluation of the performance of each interface.

demotion

In fact, after the fuse can be said to be a kind of downgrade, to fuse for example, the marketing interface after the fuse of the downgrade scheme is not to call the marketing service in a short period of time, until the marketing recovery to call again.

plan

Generally speaking, even if there is a unified configuration center, it is not allowed to make any changes during peak hours. However, some changes can be made in emergencies through properly configured plans.

check

In view of the distributed transaction consistency generated by various distributed systems or the data anomaly caused by attacks, it is very necessary to check the platform to do the last bottom of the data verification. For example, whether the amount of the downstream payment system and the order system is correct to check, and whether the correctness of the data dropped by the middle man attack is guaranteed.

conclusion

In fact, it can be seen that how to design a high-concurrency system itself is not difficult, but based on the knowledge you know, from the physical hardware level to the software architecture, code level optimization, what middleware to use to constantly improve the system’s ability to withstand pressure. But the question itself can bring more problems and break up of the service itself has brought the problem of distributed transactions, HTTP, the use of RPC framework bring the efficiency of communication, routing, the problem of fault tolerance, the introduction of MQ brought a message loss, backlog, transaction messages, order problem, the introduction of the cache and will bring consistency, avalanche, breakdown problem, In order to solve these problems, we must constantly add various measures to prevent and trace these problems, such as fusing, limiting traffic, demoting, off-line checking and pre-plan processing.

Python Chinese community as a decentralized global technology community, to become the world’s 200000 Python tribe as the vision, the spirit of Chinese developers currently covered each big mainstream media and collaboration platform, and ali, tencent, baidu, Microsoft, amazon and open China, CSDN industry well-known companies and established wide-ranging connection of the technical community, Have come from more than 10 countries and regions tens of thousands of registered members, members from the ministry, tsinghua university, Peking University, Beijing university of posts and telecommunications, the People’s Bank of China, the Chinese Academy of Sciences, cicc, huawei, BAT, such as Google, Microsoft, government departments, scientific research institutions, financial institutions, and well-known companies at home and abroad, nearly 200000 developers to focus on the platform.

Long press scan to add “Python Assistant”

Click here to become a community member