Basic concepts of Elasticsearch

Cluster
A cluster is composed of one or more nodes, and each cluster is identified by a cluster name (`cluster.name: [Elasticsearch cluster name]`). Elasticsearch instances on the same network segment decide which cluster to join based on this cluster name.
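A minimal sketch of the relevant settings in elasticsearch.yml (the cluster and node names here are placeholders):

# elasticsearch.yml
cluster.name: my-es-cluster   # instances with the same cluster.name join the same cluster
node.name: node-1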
Node
A node is a single Elasticsearch instance. One machine can run multiple instances, so a node is not the same thing as a machine. In most cases, however, each node runs on its own machine or virtual machine.
Index
An index is a collection of documents.
Shard
1. Shards: Elasticsearch is a distributed search engine. Each index has one or more shards, and the index's data is distributed across them, much like one bucket of water poured into N cups. 2. Shards are rebalanced across nodes: if you have 2 nodes and 4 primary shards (no replicas), each node holds 2 shards; if you later add 2 more nodes, each node holds 1 shard. This process, called relocation, is performed automatically when Elasticsearch detects the change. 3. Shards are independent, and every search request is executed on each shard. 4. Each shard is a Lucene index, so a single shard can store at most Integer.MAX_VALUE - 128 = 2,147,483,519 documents.
Replica
1. Replicas can be understood as backup shards, each corresponding to a primary shard. 2. A primary shard and its replica are never placed on the same node (to prevent a single point of failure). By default, each index is created with 5 primary shards and 1 replica per primary (5 primary + 5 replica = 10 shards). 3. If you have only one node, the 5 replica shards cannot be assigned and the cluster status becomes yellow. Replicas serve mainly for fault tolerance and for sharing the load of read requests.
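A quick way to check this is the standard cluster health API; its status field reports yellow when replica shards are unassigned:

GET _cluster/health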

Shard strategy

1.1 Transparency of the distributed architecture

Elasticsearch's distributed architecture hides its complex internal processing from the user.
Sharding mechanism
We do not need to care about how data is sharded or which shard it ends up in.
Shard replicas
The replica mechanism was introduced in Elasticsearch clusters to solve the problem of a single node being unable to handle all requests under heavy load. The replica policy creates redundant copies of every shard in an index, and these copies serve queries just like primary shards. The replica policy thereby ensures high availability and data safety and prevents data loss.
Cluster Discovery
Cluster discovery: for example, when we start one ES process and then start a second one, the second process automatically discovers the cluster and joins it as a node.

Shard load balancing: for example, if the cluster has 10 shards and 3 nodes, ES distributes the shards evenly so that the request load on every node is balanced.

Request routing: when a document is indexed, it is stored in a single primary shard. How does Elasticsearch know which shard a document belongs to? When we create a document, how does it decide whether to store it in shard 1 or shard 2? It certainly cannot be random, otherwise we would not know where to look when we need to retrieve the document later. In fact, the decision is made by the following formula: `shard = hash(routing) % number_of_primary_shards`. routing is a variable value that defaults to the document's _id and can also be set to a custom value. The routing value is passed through a hash function to produce a number, which is divided by number_of_primary_shards (the number of primary shards) to obtain a remainder. This remainder, between 0 and number_of_primary_shards - 1, identifies the shard where the document lives.
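As a hedged illustration (the index, type, and routing value below are hypothetical), a custom routing value can be supplied when indexing a document and must then be supplied again when retrieving it:

PUT /my_index/my_type/1?routing=user1
{
  "title": "routed document"
}

GET /my_index/my_type/1?routing=user1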

Expansion mechanism

Vertical expansion
Purchase more powerful servers.
Horizontal expansion
Purchase more ordinary servers. Each one has only modest performance, but many ordinary servers organized together provide powerful computing and storage capacity.

How to select an expansion mode
1. Elasticsearch is generally expanded horizontally. 2. In terms of cost, a server with small memory and modest performance and a server with large memory and good performance are not in the same order of magnitude. 3. Elasticsearch is a distributed system, and distribution exists precisely to store large amounts of data. 4. Elasticsearch expansion relies on shards and replica shards together with cluster discovery: when we start a new node, it finds the cluster and joins it automatically, and the cluster automatically rebalances data across shards. Likewise, when nodes are removed, the cluster automatically moves the data on the removed nodes to the other running nodes. This is why Elasticsearch is expanded horizontally.
Expanding beyond the expansion limit
There are currently 6 shards: 3 primary shards and 3 replica shards, stored on 6 servers. How do we expand to 9 servers?

Since the number of primary shards cannot be changed after an index is created, expanding from 6 servers to 9 can only be achieved by adding replica shards. You can modify the index settings to change the number of replicas per primary to 2; the number of replica shards then becomes 6, which together with the 3 primary shards gives 9 shards.
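A minimal sketch of that settings change using the standard update-settings API (the index name product matches the example below):

PUT /product/_settings
{
  "number_of_replicas": 2
}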
Rebalance:
Adding nodes triggers automatic rebalancing. Create an index:

PUT 60.205.176.135:9200/product
{
  "settings": {
    "index": {
      "number_of_shards": 5,
      "number_of_replicas": 1
    }
  }
}

When there is only one node:

The Master node
(1) Creating or deleting indexes. (2) Adding or removing nodes. The master node's primary responsibilities relate to cluster-level operations, such as creating or deleting indexes, keeping track of which nodes are part of the cluster, and deciding which shards are allocated to which nodes. A stable master node is very important to the health of the cluster.
Node equivalence
(1) Nodes are peers: every node can receive any request. (2) Automatic request routing. (3) Response collection: after a node receives a request, it routes the request to the other nodes that hold the relevant data; the node that received the original request is responsible for collecting the results and returning them to the client.
Shard and replica mechanisms
1. An index contains multiple shards. 2. Each shard is a minimal unit of work that carries part of the data; each shard is a Lucene instance with complete indexing and request-processing capabilities. 3. When nodes are added or removed, shards are automatically rebalanced across nodes. 4. There are primary shards and replica shards; each document exists in only one primary shard and its corresponding replica shards. 5. A replica shard is a copy of the primary shard, responsible for fault tolerance and for sharing the load of read requests. 6. The number of primary shards is fixed when the index is created; the number of replica shards can be changed at any time. 7. A primary shard and its corresponding replica shards must not be placed on the same machine, otherwise they cannot provide fault tolerance.

Summary of shard&replica mechanism

1. An index contains multiple shards. 2. Each shard is a minimal unit of work that carries part of the data; it is a Lucene instance with complete indexing and request-processing capabilities. 3. When nodes are added or removed, shards are automatically rebalanced across nodes. 4. There are primary shards and replica shards; each document exists in only one primary shard and its corresponding replica shards. 5. A replica shard is a copy of the primary shard, responsible for fault tolerance and for sharing the load of read requests. 6. The number of primary shards is fixed when the index is created; the number of replica shards can be changed at any time. 7. The default number of primary shards is 5 and the default number of replicas per primary is 1, so by default there are 10 shards: 5 primary and 5 replica. 8. A primary shard cannot be placed on the same node as its own replica shard (otherwise, if that node goes down, both the primary and the replica are lost and fault tolerance fails), but it can be placed on the same node as the replica shards of other primary shards.

How to query under multiple shards

A CRUD operation processes only a single document, whose uniqueness is determined by the combination of _index, _type, and the routing value (which by default is the document's _id). This means we know exactly which shard in the cluster holds the document. Search requires a more complex execution model, because we do not know in advance which documents the query will hit: they could be on any shard in the cluster. A search request therefore has to ask a copy of every shard in the index or indices we are interested in whether it contains any matching documents. But finding all the matching documents is only half the job: the results from multiple shards must be combined into a single sorted list before the search API can return a page of results. For this reason, search is executed as a two-phase process called query then fetch.
The query phase
The query phase consists of the following three steps:

1. The client sends a search request to Node 3, which creates an empty priority queue of size from + size. 2. Node 3 forwards the search request to a primary or replica copy of every shard in the index. Each shard executes the query locally and adds its results to its own sorted priority queue of size from + size. 3. Each shard returns the IDs and sort values of all documents in its priority queue to the coordinating node, Node 3, which merges these values into its own priority queue to produce a globally sorted list of results.

When a search request is sent to a node, that node becomes the coordinating node. Its task is to broadcast the query to all relevant shards and to merge their responses into a globally sorted result set that is returned to the client. The first step is to broadcast the request to a shard copy of every shard in the index. As with document GET requests, a query can be handled by either a primary shard or a replica shard, which is why more replicas (combined with more hardware) can increase search throughput. The coordinating node rotates through the shard copies on subsequent requests to spread the load.

Each shard executes the query locally and builds a priority queue of length from + size, that is, a result set large enough to satisfy the global search request. Each shard returns a lightweight list of results to the coordinating node containing only the document IDs and any values needed for sorting, such as _score. The coordinating node merges these per-shard results into its own sorted priority queue, which represents the globally sorted result set. This is the end of the query phase.
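A hedged illustration (my_index is a placeholder): for the request below, each shard builds a priority queue of from + size = 10 + 10 = 20 entries, and the coordinating node merges the lightweight results returned by every shard:

GET /my_index/_search
{
  "from": 10,
  "size": 10,
  "query": { "match_all": {} }
}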
The fetch phase

The fetch phase consists of the following steps: 1. The coordinating node identifies which documents need to be fetched and issues a multi-get request to the relevant shards. 2. Each shard loads and, if necessary, enriches the documents, then returns them to the coordinating node. 3. Once all documents have been fetched, the coordinating node returns the result to the client.

The coordinating node first decides which documents actually need to be fetched. For example, if the query specified {"from": 90, "size": 10}, the first 90 results are discarded and only the 10 results starting from the 91st need to be fetched. These documents may come from one, several, or all of the shards involved in the original search request. The coordinating node builds a multi-get request for every shard that holds relevant documents and sends it to the same shard copy that handled the query phase. The shard loads the document bodies (the _source field) and, if required, enriches the results with metadata and search-snippet highlighting. Once the coordinating node has received all the result documents, it assembles them and returns them to the client as a single response.

Creating an index on a single node

  1. In a single node environment, create an index. There are three primary shards and three replica shards
  2. The cluster status is yellow
  3. At this time, only 3 primary shards will be allocated to the only node, and the other 3 replica shards cannot be allocated
  4. The cluster can still work normally, but if the node goes down, all data is lost and the cluster can no longer handle any requests

PUT /test_index { "settings" : { "number_of_shards" : 3, "number_of_replicas" : 1 } }

Creating an index with 2 nodes

  1. Replica Shard Distribution: 3 primary shards, 3 replica shards, and 1 node
  2. Primary –> Replica synchronization

Replica policy

Replica principle

Elasticsearch copies shards and synchronizes data to form replicas, improving stability and disaster recovery. Adding replicas helps maintain data integrity in the cluster. A replica shard does the same work as its primary shard.

When documents are written to an index, the replica shard does the same job as the primary shard: new documents are indexed into the primary shard first and then synchronized to all its replica shards. Increasing the number of replicas does not increase index capacity, but combined with new hardware it can improve query capacity.

Role of replicas

  • Failover/cluster recovery

    If the node holding a primary shard dies, one of its replica shards is promoted to primary. When writing to the index, the replica shard does the same job as the primary shard: new documents are indexed into the primary shard first and then synchronized to all other replica shards. Increasing the number of replicas does not increase index capacity.

  • Load balancing with replicas

    Search performance depends on the response time of the slowest node, so it is a good idea to try to balance the load across all nodes. If we add only one node instead of two, we end up with two nodes each holding one shard, and one node holding two shards and doing twice as much work.

We can balance this by adjusting the number of replicas. By allocating two replicas instead of one, we end up with six shards, just enough to divide evenly among three nodes.

What makes the Master node different

There is one job in ES that is unique to the Master: maintaining the cluster state. The cluster state is maintained only by the Master node and synchronized to all nodes in the cluster; the other nodes receive the cluster state from the Master but do not maintain it themselves. The cluster state includes the following information:

  • Cluster level configuration
  • Which nodes are in the cluster
  • Index settings, mappings, analyzers, aliases, etc.
  • The node location of each shard in the index

Every node in the ES cluster stores the cluster state and knows on which node each shard of an index resides. Therefore any node in the cluster knows which node holds the shard for a given piece of data and, conversely, which shard to read it from. Elasticsearch therefore does not need to send read/write requests through the Master node; any node can serve as the entry point for reads and writes. This further reduces the network pressure on the Master node and improves the overall routing performance of the cluster.
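One way to observe this shard-to-node mapping is the standard cat shards API, which lists every shard, whether it is a primary or a replica, and the node it currently lives on:

GET _cat/shards?v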

Choosing the number of replicas

Distributed mode

  • Master-slave mode — representative components: ES/HDFS/HBase. Advantages: simplifies system design; the Master acts as the authoritative node responsible for maintaining cluster metadata. Disadvantages: the Master node is a single point of failure, and cluster size is limited by the Master's management capacity.
  • Masterless mode — representative component: Cassandra. Advantages: based on a distributed hash table (DHT), supporting thousands of nodes joining and leaving per hour; the cluster has no concept of a master, all nodes play the same role, which completely avoids instability caused by a single point of failure. Disadvantages: multiple nodes may operate on the same data, so data consistency is hard to guarantee.

Why Master

Another simplification in typical Elasticsearch scenarios is that clusters do not have that many nodes. In general, the number of nodes is far smaller than the number of connections a single node can maintain, and the network environment does not have to deal with nodes joining and leaving frequently. This is why a leader-based approach is a better fit for Elasticsearch.

Description of the process

  1. Each node computes the lowest known node ID and sends a leadership vote to that node.
  2. If a node receives enough votes and also votes for itself, it takes on the leader role and starts publishing the cluster state.
  3. All nodes participate in the election and cast votes, but only the votes of nodes that are eligible to become master are valid.

The overall process can be summarized as: elect a temporary Master; if the local node is elected, wait for the Master to be confirmed; if another node is elected, try to join that Master; then start the node failure detectors.

Master election details

  1. Triggering an election: before electing a temporary Master, the number of participating nodes must reach the quorum.
  2. Deciding the Master: after the temporary Master is elected, the number of votes must reach the quorum before the election is confirmed.
  3. Gateway metadata election: requests are sent to master-eligible nodes to obtain metadata, and the number of responses obtained must reach the quorum, that is, the number of nodes participating in the metadata election.
  4. Master publishes the cluster state: the number of nodes that successfully receive the published cluster state must reach the quorum.
  5. Whether rejoin is triggered in NodesFaultDetection: when a node cannot be reached, removeNode is executed, and the Master then checks whether the remaining nodes still reach the quorum (discovery.zen.minimum_master_nodes). If not, it gives up its Master role and performs a rejoin, to avoid split brain. Master expansion scenario: suppose there are currently 3 master-eligible nodes, so quorum can be set to 2; if master-eligible nodes are expanded to 4, quorum rises to 3. In this case, set discovery.zen.minimum_master_nodes to 3 before adding the new master-eligible node. This setting can be changed dynamically: PUT /_cluster/settings {"persistent": {"discovery.zen.minimum_master_nodes": 3}}. Master reduction scenario: reduction is exactly the opposite of expansion: remove the master-eligible node first, then lower the quorum. Be very careful when modifying Master and cluster-related configuration! An incorrect setting may lead to split brain, bad data writes, or data loss. Note: in ES 7, the minimum_master_nodes setting has been removed and Elasticsearch chooses the quorum itself.
  • minimum_master_nodes = (master_eligible_nodes / 2) + 1

Split brain is the phenomenon of one cluster splitting into two clusters, each with its own master. minimum_master_nodes = N/2 + 1 specifies how many nodes must participate (in plain terms, how many votes are needed) while electing a Master. If a candidate does not reach N/2 + 1 votes, it does not have a majority; it loses the election and a new round begins.
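A minimal sketch of this setting in elasticsearch.yml for a pre-7.x cluster, assuming 3 master-eligible nodes (the setting was removed in ES 7):

# elasticsearch.yml (ES 6.x and earlier)
node.master: true
# quorum = 3 master-eligible nodes / 2 + 1 = 2
discovery.zen.minimum_master_nodes: 2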

Node failure detection

At this point the Master node has been chosen and non-master nodes have joined the cluster. Node failure detection monitors whether nodes go offline and then handles the resulting exceptions. Failure detection is an essential step after master election; without it, split brain (two or more masters) may occur.

We need to activate two types of failure detectors:

  • On the Master node, start NodesFaultDetection (NodesFD for short), which periodically checks whether the nodes that have joined the cluster are still active.
  • On non-master nodes, start MasterFaultDetection (MasterFD for short), which periodically checks whether the Master node is still active.

Both NodesFaultDetection and MasterFaultDetection check node health by sending ping requests periodically (default interval: 1 second). When a certain number of pings fail (default: 3), or the underlying connection module reports that the node has disconnected, the node-leave event is processed.
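These intervals correspond to the Zen fault-detection settings of pre-7.x versions; a sketch of them in elasticsearch.yml with their default values:

# Zen fault detection defaults (ES 6.x and earlier)
discovery.zen.fd.ping_interval: 1s   # how often each node is pinged
discovery.zen.fd.ping_timeout: 30s   # how long to wait for a ping response
discovery.zen.fd.ping_retries: 3     # failures before the node is considered gone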

NodesFaultDetection

NodesFaultDetection checks whether the number of nodes currently in the cluster still reaches the quorum (more than half). If not, the Master gives up its Master role and rejoins the cluster. Why do this? Imagine the following scenario.

Assume a cluster of five machines suffers a network partition: two machines form one group and the other three form another. Before the partition, the Master was Node1. The group of three nodes will hold a new election and successfully elect Node3 as Master. Will there now be two Masters? NodesFaultDetection exists precisely to prevent two Masters appearing in this scenario.

If, while handling the node-leave event, the number of nodes remaining in the cluster falls below the quorum, the current Master gives up its Master role, thus avoiding two active Masters.

MasterFaultDetection

If the connection to the Master node is lost, the node rejoins the cluster, which triggers master election again.

How master election is triggered

  • Cluster startup
  • Master failure: the MasterFaultDetection running on a non-master node detects the Master failure, after which the registered listener runs handleMasterGone and performs a rejoin to elect a Master again. Note that the election process is entered as soon as a single node considers the Master to have failed.

How primary shard data is synchronized to replicas

Create, index, and delete requests are write operations. A write operation must first succeed on the primary shard before it is replicated to the corresponding replica shards.

The process of writing a single document (image from the official website) is shown below.

Here are the steps required to write a single document: (1) The client sends a write request to NODE1.

(2) NODE1 uses the document ID to determine that the document belongs to shard 0, based on the content routing table in the cluster state.

The primary shard of shard 0 is on NODE3, so the request is forwarded to NODE3. (3) The write is performed on the primary shard on NODE3. If it succeeds, NODE3 forwards the request in parallel to the replica shards on NODE1 and NODE2 and waits for their results. When all replica shards report success, NODE3 reports success to the coordinating node, which in turn reports success to the client.

When the client receives a successful response, the write operation has been performed on both the primary shard and all replica shards.

The default write-consistency policy is quorum: a majority of the shard copies (primary or replica) must be available at write time.

1. Write consistency principle

Any write operation we send (create, update, or delete), such as PUT /index/type/id, can carry a consistency parameter indicating what consistency level we require.

PUT /index/type/id?consistency=quorum

one: the write operation can be performed as long as the primary shard is active

all: requires that all shards (primary and replica) be active before the write operation can be performed

quorum: the default, requiring that a majority of all shard copies be active before the write operation can be performed

2. The quorum mechanism requires that a majority of shard copies be available before writing: quorum = int((primary + number_of_replicas) / 2) + 1. It only takes effect when number_of_replicas > 1.

For example:

With 3 primary shards and number_of_replicas = 1, there are 3 + 3*1 = 6 shards in total.

quorum = int((3 + 1) / 2) + 1 = 3, so at least 3 of the 6 shards must be active before the write operation can be performed.

3. If there are too few nodes, the quorum of active shards may not be reached, and then no write operations can be performed.

For example: with 3 primary shards and replica = 1, at least 3 shards must be active. As discussed above, a primary shard and its replica must be placed on different nodes. With only two machines, it is possible that not all of the shards can be allocated, and in that case write operations cannot be performed.

ES therefore treats this as a special case: the quorum requirement takes effect only when number_of_replicas > 1, because if you have 1 primary shard and replica = 1, you have only 2 shards.

quorum = int((1 + 1) / 2) + 1 = 2 would require 2 shards to be active, but there may be only one node, in which case only 1 shard (the primary) can be active; without this special treatment, a single-node cluster could not perform writes.

Supplementary notes:

With 1 primary shard and replica = 3, quorum = int((1 + 3) / 2) + 1 = 3. There is 1 primary shard + 3 replica shards = 4 shards, and 3 of them must be active. What happens if there are only two machines?

With shards P0, R0-0, R0-1, and R0-2 and only two nodes, the primary shard and its replicas cannot be placed on the same node, so at most two shards can be allocated across the two nodes.

The number of active shards is then less than the quorum requirement, so the condition is not met. By default ES waits for more shards to become active, until the operation times out.

4. If the quorum is not met, ES waits; the default wait is 1 minute, and the timeout can be adjusted, for example to 100 seconds or 30 seconds.

If the quorum is not met, ES waits by default, expecting the number of active shards to increase, and eventually times out.

We can also add a timeout parameter to the write operation, for example PUT /index/type/id?timeout=30. This parameter shortens or lengthens how long ES waits when the quorum is not met. The default unit of timeout is milliseconds; to use seconds, write ?timeout=30s.
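A hedged example combining both parameters on a write request, in the old index/type URL style used above and in ES versions that still support the consistency parameter (index, type, and field names are placeholders):

PUT /my_index/my_type/1?consistency=quorum&timeout=30s
{
  "title": "hello es"
}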

Index recovery

The core idea of replica shard recovery is to pull Lucene segments and the translog from the primary shard. In terms of the direction of data transfer, the node holding the primary shard is called the Source and the node holding the replica shard is called the Target.

Why does the replica need to pull the primary shard's translog? Because new writes are still allowed while the replica shard is recovering, and the Lucene segments are copied from a snapshot taken at a particular moment: data written after that moment is not contained in the copied segments but only in the primary shard's translog. The replica shard therefore needs to pull the translog from the primary shard's node and replay it to obtain the new content. This requires that the translog on the primary shard's node not be cleaned up during recovery.

Prior to version 2.0, replica shard recovery went through three phases. Phase 1: take a snapshot of the primary shard's Lucene segments and send it to the target. New data continues to be written to the primary shard's translog; index operations are not blocked.

Phase 2: take a snapshot of the primary shard's translog and send it to the target for replay; index operations are still not blocked.

Phase 3: take a write lock on the primary shard and send the remaining translog to the target. At this point the amount of data is small, so writes are blocked only briefly. In theory, as long as the process is allowed to block writes for a while, primary/replica consistency is relatively easy to achieve.

Later (starting with version 2.0), with the introduction of the translog.view concept, Phase 3 was removed. Phase 3 was the phase that replayed the remaining operations while preventing new writes to the engine. It is no longer necessary because, from the start of recovery, standard index operations are also sent to the shard being recovered; replaying all the operations from the view taken at the beginning of recovery is sufficient to ensure that no operations are lost.

With Phase 3 (which blocked writes) removed, there is no write blocking during recovery. What must be solved next is the timing and conflicts between Phase 1 index operations and Phase 2 translog replay operations. On the replica shard node, after Phase 1 completes, new index operations and translog replay operations may execute concurrently, so old and new data may be applied out of order. How, then, is primary/replica consistency achieved?

Suppose that during Phase 1 a client index operation asks to set the content of docA to 1; the primary shard performs this operation, but the replica shard does not because it is not yet ready. During Phase 2, a client index operation asks to set the content of docA to 2; the replica shard is now ready, so it first executes the new request setting docA to 2, and then the translog replayed from the primary shard's node tries to set docA back to 1.

In this way, out-of-order operations are ignored: for any particular doc, only the latest operation takes effect, ensuring that the primary and replica shards remain consistent.

Let's look at how each of the three types of write operation is handled.

  1. Index (new document): there are no conflicts to resolve.

  2. Update: check whether the version of this operation is lower than the version of the doc currently in Lucene; if it is, abort the operation.

  3. Delete: check whether the version of this operation is lower than the version of the doc currently in Lucene; if it is, abort the operation.