Elasticsearch transparently hides complex distributed mechanisms

Elasticsearch is a distributed system that can handle a large amount of data and hide the complexity of the distributed mechanism. Cluster discovery: which shard does the data go to? Cluster discovery: Which shard does the data go to? Cluster discovery: Which shard does the data go to? Shard load balancing (for example, suppose there are 3 nodes and a total of 25 shards need to be allocated to 3 nodes, ES will automatically distribute them evenly to maintain balanced read and write load requests of each node) Shard copy, request routing, cluster expansion, Shard redistribution

Elasticsearch vertical and horizontal expansion

Vertical expansion: Purchasing more powerful servers costs a lot and there will be bottlenecks. Suppose that the most powerful server in the world has a capacity of 10T, but when your total data volume reaches 5000T, how many powerful servers will you purchase for horizontal expansion: The solution often adopted by the industry is to purchase more and more ordinary servers with relatively mediocre performance, but many ordinary servers organized together can form a powerful computing and storage capacity

Cluster Health Check

The Elasticsearch cluster monitoring information contains many statistics. The most important one is cluster health, which is displayed as green, yellow, or red in the Status field

GET /_cluster/health
Copy the code

Create indexes

An index is a logical namespace that points to one or more physical shards. A shard is an underlying unit of work that holds only a fraction of the total data. A sharding is an instance of Lucene and a complete search engine in its own right

PUT /blogs
{
   "settings" : {
      "number_of_shards" : 3,
      "number_of_replicas" : 1
   }
}
Copy the code

Shard&replica mechanism sorting

  • Each shard is a minimal unit of work, carrying partial data, Lucene instances, complete indexing and request processing capabilities
  • When nodes are added or removed, the SHard automatically balances load among nodes
  • Primary shard and Replica Shard, each document must only exist in one primary shard and its corresponding replica shard, and cannot exist in multiple primary shards
  • Replica Shard is a copy of the Primary Shard. It is responsible for fault tolerance and load of read requests
  • The number of primary shards is fixed when the replica shard is created, and the number of replica shards can be changed at any time
  • The default number of primary shards is 5, and the default replica is 1. By default, there are 10 shards, 5 primary shards and 5 replica shards
  • The primary shard cannot be placed on the same node as the replica shard of one’s own replica shard (otherwise, the node breaks down and both the primary shard and the replica shard are lost, which cannot be fault-tolerant). However, the primary shard can be placed on the same node as the replica shard of another Primary shard

Illustration:

A server The cluster status value is yellow, and no copies are allocated to any node.

The cluster is up and running, but there is a risk of losing data in the event of a hardware failure, and it doesn’t make sense to keep both the original data and a copy on the same node, because once we lose that node, we lose all copies on that node as well.

Two Servers The cluster status value is green.

When the second node is added to the cluster, three replica shards will be allocated to that node. Each master shard corresponds to a replica shard. This means that when any node in the cluster fails, our data is intact. The cluster is now not only up and running, but also always available.

Three servers One shard on Node 1 and one shard on Node 2 has been migrated to the new Node 3, so there are now two shards on each Node instead of three. This means that the hardware resources (CPU, RAM, I/O) of each node will be shared by fewer shards and the performance of each shard will be improved.

Modifying the Number of copies

PUT /blogs/_settings
{
   "number_of_replicas" : 2
}
Copy the code

The number of master shards is determined when the index is created. However, read operations — searching and returning data — can be handled by both master and replica shards, so the more replica shards you have, the higher throughput you will have.

Of course, simply adding more replica shards to a cluster with the same number of nodes will not improve performance because each shard will get fewer resources from the nodes. You need to add more hardware resources to improve throughput.

With the node configuration above, we can lose 2 nodes without losing any data.

Rebalance. When the number of nodes in the cluster changes, this triggers the Rebalance of the ES cluster. The principle of Rebalance is to make the shards as evenly distributed as possible.

Fault-tolerant mechanism

Suppose we now close the first nodeThe closed node is a primary node. The cluster must have a master Node to work, so the first thing that happens is to elect a new master Node: Node 2.

When we shut down Node 1 we lost master shards 1 and 2. If we were to check the state of the cluster, we would see a red state: not all master shards are working properly.

However, full copies of these two master shards exist on other nodes, so the new master Node immediately promotes the corresponding replica shards on Node 2 and Node 3 to master, and the cluster state will be yellow. The process of promoting the master shard is instantaneous, like flipping a switch.

Why is the cluster status yellow instead of green? Although we have all three master shards, we also set up 2 replica shards for each master shard, and there is only one replica shard. Therefore, the cluster cannot be in the green state.

If we also shut down Node 2, the cluster can still run without losing any data, because Node 3 keeps a copy of each shard.

If we restart Node 1, the cluster will be able to redistribute the missing replicas and the cluster will return to green.

Master the election

Election time

  • The cluster starts initialization
  • The Master of the cluster crashes
  • An election is triggered when any node detects that the Master node in the current cluster is not recognized by the N /2 + 1 node

Cluster Node Role

  • Master node

The main responsibilities are cluster level operations, managing cluster changes, such as creating or deleting indexes, keeping track of which nodes are part of the cluster, and deciding which shards to assign to related nodes. The primary node can also serve as a data node, but a stable primary node is very important for the health of a cluster. By default, any node in a cluster can be selected as the primary node. Operations such as index data and search and query occupy a large amount of CPU, memory, and I/O resources. Separating the master node from the data node is a good choice. Make nodes eligible to be elected master by setting Node. master:true(default). The Master node is made globally unique and will be elected from nodes that qualify as Master. To prevent data loss, each primary node should know the number of eligible primary nodes. The default value is 1. To avoid multi-primary network partitions, configure discovery.zen.minimun_master_nodes. (master_eligible_nodes/2)+1

node.master:true
node.data:false
Copy the code
  • Data Node

A data node is a node that stores index data and performs data-related operations, such as CRUD, search, and aggregation. Data nodes have high REQUIREMENTS on CPU, memory, and I/O. Therefore, you need to monitor the status of data nodes during optimization. If resources are insufficient, you need to add new nodes to the cluster.

node.master:false
node.data:true
Copy the code
  • Coordinating Node

Client requests can be sent to any node in the cluster, and each node knows the location of any document, then forwards these requests, collects data and returns it to the client, and the node that handles the client requests is called the coordination node. The coordinating node forwards the request to the data node that holds the data. Each data node performs the request locally and returns the result to the coordinating node. The coordination node collects data and merges the results of each data node into a single global result. The process of collecting and sorting results can require a lot of CPU and memory resources.

node.master:false 
node.data:false
node.ingest:false
Copy the code
  • Voting Node

By setting node.voting_only=true in the configuration (only voting nodes, even if node.master =true, do not vote, but can still act as data nodes).

  • Master-eligible Nodes

When node.master is true, it indicates that the node is a master-eligible node eligible to participate in the election. ES can only have one master(that is, leader) when it runs normally. If there is more than one leader, brain split occurs.

  • Ingest node (preprocessing node)
  • Machine Learning Node

The election process

If a node finds that the majority of master-eligible nodes, including itself, believe that the cluster does not have a master, it can initiate a Master election.

7. Select the main process before X

Source process

    1. Filter the activeMasters list

The master is elected from the activeMasters list or masterCandidates list, so es needs to get both lists before voting. Ping_timeout Elasticsearch node members send Ping requests to all members of the cluster. By default, Elasticsearch waits for discovery.zen.ping_timeout. The activeMasters list is screened out, and the activeMaster list is the Master node of the current cluster that other nodes consider

List<DiscoveryNode> activeMasters = new ArrayList<>(); For (zenping. PingResponse PingResponse: pingResponses) {// Do not allow yourself to be placed in activeMasters' list if (pingresponse.master ()! = null && ! localNode.equals(pingResponse.master())) { activeMasters.add(pingResponse.master()); }}Copy the code
    1. Filter the masterCandidates list

The masterCandidates are the nodes that are eligible to be Master. If you specify the following parameters in elasticSearch. yml, this node is not eligible to be Master and will not be selected to the masterCandidates list

# did not become a master qualifications to configure a node node. The master: false List < ElectMasterService. MasterCandidate > masterCandidates = new ArrayList < > (); for (ZenPing.PingResponse pingResponse : pingResponses) { if (pingResponse.node().isMasterNode()) { masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion())); }}Copy the code
    1. Elect the Master node from the activeMasters list

If activeMasters are empty, there are no living master nodes. If the activeMasters list is not empty, ElasticSearch preferentially elects from the activeMasters list using the Bully algorithm. In the activeMasters list priority comparison, if the node is qualified as master, the priority is higher. If multiple nodes in the activeMaster list are qualified as Master, the node with the smallest ID is selected

private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) { if (o1.isMasterNode() && ! o2.isMasterNode()) { return -1; } if (! o1.isMasterNode() && o2.isMasterNode()) { return 1; } return o1.getId().compareTo(o2.getId()); }Copy the code
    1. The Master node is elected from the masterCandidates list

If the activeMaster list is empty, then the masterCandidates will be elected in the masterCandidates list. First, the masterCandidates list will determine whether the minimum number of members has reached discovery.zen.minimum_master_nodes. If the priority is achieved, the version number of the cluster state owned by the node is first compared, and then the ID is compared. The purpose of this process is to make the node with the latest cluster state become the master

public static int compare(MasterCandidate c1, MasterCandidate c2) {
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}
Copy the code
    1. The local node is master

After the election, a quasi-master node is elected. The quasi-master node waits for votes from other nodes. If discovery.zen.minimum_master_nodes-1 node votes that the current node is master, the election succeeds. Must master waits for discovery. Zen. Master_election. Wait_for_joins_timeout time, if the timeout, then failed Local node is the Master of time, The Master node will enable NodeFaultDetection mechanism. It will scan all the members of the cluster periodically, remove the deactivated members from the cluster, and publish the latest cluster status to the cluster. After receiving the latest cluster status, the cluster members will make corresponding adjustments, such as re-selecting the Master shard. Perform operations such as data replication

    1. The local node is not a master

If the current node decides that it cannot be a master node in the current state of the cluster, it first forbids other nodes to join it and then votes for a quasi-master node. At the same time, monitor the cluster status published by the master (MasterFaultDetection mechanism). If the cluster status displayed by the master node is not the same as the master node that the current node considers to be the master node, then the current node initiates the election again.

Message type

1) Election Message: Sent to announce Election. 2) Answer (Alive) Message: Responds to the Election message.) 3) Coordinator (Victory) message: Sent by winner of the election to announce victory.)

When a process P receives a primary node failure message, process P does the following:

1) If process P has the maximum process ID, it broadcasts a Coordinator (Victory) Message to other nodes. Otherwise, process P sends the Election Message to the process whose id is larger than it. 2) If process P does not receive an Election Message after sending the Election Message, it broadcasts the Coordinator (Victory) Message to other nodes and becomes the master. 3) If process P receives an Answer (Alive) Message from a process whose process ID is higher than its own, it will lose the election and wait to receive a Coordinator (Victory) Message from another node. 4) If process P receives an Election Message from a process whose number is lower than its own, it returns an Answer (Alive) message to that node, starts the Election process, and sends an Election message to the process whose number is higher than its own. 5) If a Coordinator (Victory) Message is received by process P, the node that sent the Message is considered as the master process. zhuanlan.zhihu.com/p/110015509

Bully algorithm defects

Master feign death

If the Master node is overloaded with responsibilities, it may not be able to respond immediately to the members of the group, which is suspended animation. If the master node feigns death and elects a new master, that’s a split brain problem.

Split brain problems

Split brain problem refers to the occurrence of two or more Master nodes in a cluster.

The cluster is divided into two parts due to network reasons. One part is called Partition1, which contains P3,P5 and P6 nodes, and the other part is called Partition2, which contains P2,P1 and P4 nodes. These two partitions cannot communicate with each other due to network reasons, such as the short-term failure of the router.

Discovery.zen. minimum_master_nodes = (n / 2) + 1Copy the code

The split brain problem is a problem that all the clustering algorithms have to face, and the Elasticsearch clustering mechanism adopts the scheme of the smallest participating node to solve it. Assuming that the number of instances eligible to vote in the ElasticSearch cluster is N, the node must get N /2 +1 votes (in this case, 4) to become master. P6, P4, P4, P6, P4, P4, P6, P4, P4, P4, P6, P4, P4, P4, P6, P4, P4, P4, P6, P4, P4, P4, P6, P4, P4, P6, P4, P4

Elasticsearch 7.x master election Raft algorithm

Raft is a consensus algorithm. Consensus is when multiple nodes agree on something, even when some nodes fail, the network delays, the network splits.

  • leader election

Raft protocol a node is in one of three states at any one time:

leader

follower

candidate

Normally, a cluster has only one Leader and all other nodes are followers. Followers are passive receivers of requests and never send any active requests. Candidate is an intermediate state from Follower to Leader.

  • term

As can be seen from the above, which node serves as the leader is elected by voting. Each leader works for a period of time, and then a new leader is elected to continue to take charge. Much like elections in a democratic society, each new term is called a term, and this is also true in raft protocol, with the corresponding term.The term begins with an election, followed by a long or short period of normal Operation. As you can see from the figure above, tenure increases, which acts as a logical clock. In addition, term 3 shows a situation where a leader is not elected and a new election is called

  • The election process

If the follower does not receive a heartbeat from the leader after election timeout, the follower may not have elected the leader yet and everyone is waiting. Maybe the leader dies; Perhaps there is a network failure between the leader and the follower. The steps are as follows:

— 1. Add the current term on the node and switch to the candidate state

2. Vote for yourself

— 3. Send RequestVote RPCs to other nodes in parallel

— 4. Wait for the reply from other nodes

During this process, based on messages from other nodes, three possible outcomes can occur:

— 1. After receiving a majority of votes, the leader wins the election

— 2. If you are told that someone else has been elected, switch to follower

— 3. If a majority vote has not been received, the candidate remains on the ballot

Case oneAfter winning the election, the new leader will immediately send a message to all nodes to spread the message to prevent other nodes from triggering a new election. Going back to the voter’s point of view, how does a voter decide whether or not to vote on an election request has the following constraints:

— 1. A node can only cast one vote at most during a term of office

— 2. The candidate must know as much information as he or she does (more on this later in log Replication and safety)

— First come, first served

The second caseFor example, there are three nodes A, B, and C. A B initiates the election at the same time, while A’s election message reaches C first, and C votes for A. When B’s message reaches C, it cannot satisfy the first constraint mentioned above, that is, C will not vote for B, and A and B will not vote for each other obviously. After A wins, it sends A heartbeat message to B and C. Node B finds that node A’s term is no lower than its own term and converts to follower when it knows that node A has A Leader.

The third case, no node wins a majority vote, as in the following example:

There are four nodes in total. Node C and NodeD become candidates and enter term 4 at the same time, but Node A votes for NodeD and NodeB votes for Node C, which leads to A split vote. Everyone waits and waits until the election runs out. If there was a tie, the system would be unavailable for an extended period of time (no leader could handle client write requests), so Raft introduced randomized election timeouts to avoid the tie as much as possible. Meanwhile, in leader-based consensus algorithm, the number of nodes is odd, so as to ensure the emergence of majority.

log replication: In raft, the leader encapsulates client commands into log entries and copies these log entries to all follower nodes. Then you apply the commands in the log entry in the same order, and the status is guaranteed to be the same.Raft algorithm to ensure high availability, not strong consistency, but final consistency, the leader will constantly try to send log entries to the followers until all the nodes have the same log entries.

safetySafety: Raft guarantees that logs copied to most nodes will not be rolled back. Raft ultimately makes all nodes in the same state, which is a LIVENESS attribute.

  • Split brain problems

Raft’s solution to this situation is that although the old Leader cluster has three nodes left, the Leader still processes the data according to the original five nodes, so the old Leader receives the data and copies the data to the other four nodes. Unable to obtain data responses from more than N/2 Follower nodes (because they cannot be connected to the two nodes in machine room B), the client fails to write data requests to the old Leader. Since the data sent by the client to the new Leader in machine room B is A newly established cluster, it can be successfully written into the cluster. After the network communication between the two machine rooms A and B is restored, all nodes in machine room A, including the old Leader, access the cluster as followers. In addition, data on the new Leader is synchronized to complete data consistency processing.

The demo

ElasticSearch complete directory

Elasticsearch is the basic application of Elasticsearch.Elasticsearch Mapping is the basic application of Elasticsearch.Elasticsearch is the basic application of Elasticsearch Elasticsearch tF-IDF algorithm and advanced search 8.Elasticsearch ELK