In the last article, we briefly looked at some basic ES concepts such as indexes, documents, shards, and segments, and how their numbers affect a cluster. That introduction did not go very deep, so this article takes a closer look at some of ES's distributed features and related configuration.

Link to the previous article (Basic concepts and principles of Elasticsearch): juejin.cn/post/690407…

What are the benefits of the distributed architecture of ES?

  • Horizontal storage capacity expansion to support PB level data
  • Improved system availability (some nodes fail without much impact on the whole cluster)

Distributed architecture

The default cluster name is elasticsearch. You can change it in the configuration file or on the command line with -E cluster.name=duters.
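As a small sketch (the cluster name duters and node name node-1 are just example values), the same settings can go in elasticsearch.yml or be passed on the command line:

# elasticsearch.yml (example values)
cluster.name: duters
node.name: node-1

bin/elasticsearch -E cluster.name=duters -E node.name=node-1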

In addition, each node is an instance of Elasticsearch, i.e. a Java process. In production environments it is recommended to run only one ES instance per machine. Each node is assigned a UID, which is stored in its data directory.

Coordinating Node

The node that handles a request is called the Coordinating Node.

  • When we created an index earlier, we only mentioned that the request is routed to some node; in fact that node (the Coordinating Node) forwards the index-creation request on to the Master Node.

  • All nodes are Coordinating nodes by default

  • We can make a node a dedicated Coordinating Node by setting all the other node types to false (see the sketch below)
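A minimal sketch of a dedicated coordinating node, using the pre-7.x style role settings in elasticsearch.yml (newer versions use node.roles instead):

# elasticsearch.yml -- dedicated coordinating node: all other roles disabled
node.master: false
node.data: false
node.ingest: false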

We can view the cluster status and the shard distribution of each node through Cerebro.

Data Node and Master Node

Data Nodes are the nodes that hold the shard data and play a key role in scaling the cluster: the Master Node decides how shards are distributed to Data Nodes, and adding Data Nodes both scales storage horizontally and avoids a single point of failure for the data.

  • A node is a Data Node by default. You can disable this by setting node.data: false

  • The Master Node is responsible for creating and deleting indexes and for deciding which node each shard is allocated to

  • It also maintains and updates the cluster state

  • Best practice: Master Nodes are important, and to avoid a single point of failure it is best to have several master-eligible nodes in a cluster, each dedicated to the master role only (a config sketch follows this list)

  • If the current master node fails, an election is held to choose a new one. By default every node is master-eligible (set node.master: false to disable this)

  • When the first master-eligible node in the cluster starts, it elects itself as the Master Node by default

  • Cluster state includes: node information, all indexes with their mappings and settings, and shard routing information. Every node stores a copy of the cluster state, but only the Master Node may modify it, and it is responsible for synchronizing updates to the other nodes.
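A rough sketch of a dedicated master-eligible node and a dedicated data node, again using the pre-7.x style role settings (the exact combination depends on your version and cluster layout):

# elasticsearch.yml -- dedicated master-eligible node
node.master: true
node.data: false
node.ingest: false

# elasticsearch.yml -- dedicated data node
node.master: false
node.data: true
node.ingest: false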

The election process

1. Nodes ping each other, and the node with the smallest node ID is elected as the master node. In more detail:

  • Each node looks for master-eligible nodes whose clusterStateVersion is higher than its own and sends its vote to them

  • If the clusterStateVersion is the same, it finds the master-eligible node (including itself) with the smallest node ID and sends its vote to that node

  • If a node receives enough votes (at least the minimum_master_nodes setting) and it also voted for itself, it becomes the master and begins publishing the cluster state

2. Nodes that join the cluster later do not take on the master role. If the elected master node fails, a new election is held.

How do you trigger an election?

  • The current master-eligible node is not the master
  • The current master-eligible node cannot find the master through communication with the other nodes
  • The number of master-eligible nodes in the cluster that cannot connect to the master has reached the value set by discovery.zen.minimum_master_nodes

Split-brain problem: this is a classic issue in distributed systems. When a network partition occurs, one node can no longer communicate with the other nodes.

Suppose Node1 is cut off from Node2 and Node3: Node2 and Node3 elect a new master between themselves, while Node1 keeps acting as master of its own one-node cluster and keeps updating its own cluster state. The result is two master nodes and two diverging cluster states, and once the network recovers there is no correct way to reconcile them.

How do we avoid split-brain?

Set a quorum condition on elections: an election can only be held when the number of reachable master-eligible nodes reaches the quorum.

  • Quorum = (number of master-eligible nodes / 2) + 1

  • When there are three master-eligible nodes, set discovery.zen.minimum_master_nodes to 2 to avoid split-brain (sketched below)

  • This setting is no longer required in later versions (from 7.0 on, ES handles it automatically)
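For a pre-7.0 cluster with three master-eligible nodes, the setting would look roughly like this in elasticsearch.yml (only a sketch; 7.0+ removes the setting entirely):

# elasticsearch.yml -- pre-7.0, three master-eligible nodes
# quorum = (3 / 2) + 1 = 2
discovery.zen.minimum_master_nodes: 2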

Some problems with sharding

We know from the previous article that a document is stored on a specific primary shard and its replica shards, but how is the shard that stores a given document decided?

Document-to-shard mapping algorithm

  • The goal is to distribute documents evenly across all shards, making full use of hardware resources and avoiding some machines sitting idle while others are overloaded.

  • Potential algorithms:

    • Random / round robin: when querying document 1, if the number of shards is large, you may have to query many shards before finding it.
    • Maintain a document-to-shard mapping table: when the volume of documents is large, the cost of maintaining the table is high.
    • Real-time calculation: compute a value from document 1 automatically and use it to locate the shard that holds the document.

    How does ES route documents?

    • Formula: shard = hash(_routing) % number_of_primary_shards

      1. The hash algorithm ensures documents are distributed evenly.

      2. The default _routing value is the document ID.

      3. You can also specify the _routing value explicitly (see the sketch after this list).

      4. This is also why the number of primary shards cannot be changed after the index is created.
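As a quick sketch (the index name my_index and routing value user1 are made up for illustration), a document can be written and read with an explicit _routing value, and all documents that share the same value land on the same shard:

PUT my_index/_doc/1?routing=user1
{
	"title": "DaLian University of Technology"
}

GET my_index/_doc/1?routing=user1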

Immutability of inverted indexes

In addition, inverted indexes are generated as documents are indexed, and an inverted index is immutable: once created, it cannot be changed.

The benefits of this include:

  • There is no need to worry about concurrent writes to files, avoiding the performance problems associated with locking
  • Once read into the kernel’s filesystem cache, it stays there; as long as there is enough cache memory, most requests are served straight from memory rather than hitting disk, which is a big performance boost
  • Caches are easy to generate and maintain, and data can be compressed

But this brings a challenge: for new documents to become searchable, a new index has to be built.

A Lucene index contains many segments; each segment is itself a self-contained inverted index, and together the segments make up a shard.
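You can inspect the segments that make up each shard with the segments APIs (my_index is a placeholder index name):

GET my_index/_segments
GET _cat/segments/my_index?v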

The process of writing and deleting documents

Now that we know the shard routing formula, what exactly happens when we write a document? The previous article only described the process at a high level; let’s look at it in a bit more detail:

When writing, since any node can act as a coordinating node, the request can be handled by whichever node receives it. ES takes the _routing parameter passed in (or the _routing configured in the mapping, or _id by default if neither is present), computes shard_num = hash(_routing) % num_primary_shards to determine the shard the document belongs to, locates the corresponding primary shard in the cluster metadata, and routes the request to that shard for the document write. The write is also propagated to the replica shards before the write is considered complete.
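A small sketch of checking where a given routing value would be sent (my_index and user1 are placeholders); the search-shards API shows the shard the routing formula resolves to:

GET my_index/_search_shards?routing=user1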

  • So the question is, can a document be queried immediately after it has been written?

    Recall from the last article that a segment can only be searched after it has been generated (written out), a process called refresh

    The answer is (almost) yes, and this is exactly what makes ES near-real-time. Each shard in ES is a Lucene index. ES first writes the document to the index buffer (in memory) and then periodically calls Lucene’s reopen (openIfChanged in newer versions) to turn the newly written data in memory into a new segment; at that point all of the buffered documents become searchable. The refresh interval is controlled by the refresh_interval setting, which defaults to 1s. You can also pass refresh on the write request to refresh immediately after the write, or call the refresh API explicitly (the relevant settings are sketched after this list).

  • Second question, how can data be stored reliably?

    We know that the data we just wrote is only in memory, so if the ES server crashes at this point that data would be lost. To solve this, ES introduced the translog: when we modify a document, in addition to writing it into the Lucene index buffer, ES appends the operation to the translog, which by default is flushed to disk on every request. If the reliability requirements are not very strict, you can switch to asynchronous flushing for better performance (controlled by index.translog.durability and index.translog.sync_interval). This prevents data loss when the server goes down, and because the translog is append-only, it performs better than random writes. Unlike many traditional distributed systems, ES writes to Lucene before writing to the translog, because the Lucene write may fail; writing Lucene first reduces the complexity of rolling back failed writes, since ES does not support transactions.

  • Third question: when is the data actually flushed (persisted) to disk, and how often?

    ES triggers a flush every 30 minutes, or when the translog reaches a certain size (controlled by index.translog.flush_threshold_size, 512 MB by default). During a flush, ES first performs a refresh to turn the index buffer into segments, then calls Lucene’s commit to fsync all in-memory segments to disk. At that point the data in Lucene is persisted and the translog can be emptied. (From version 6.x the translog is not deleted right away, because it is kept for sequence-ID-based recovery.)
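As a rough sketch of the knobs mentioned in the three questions above (the index name and values are illustrative, and defaults vary across versions), the refresh, translog, and flush behavior can be configured when the index is created, and refresh/flush can also be triggered explicitly:

PUT my_index
{
	"settings": {
		"index.refresh_interval": "1s",
		"index.translog.durability": "async",
		"index.translog.sync_interval": "5s",
		"index.translog.flush_threshold_size": "512mb"
	}
}

POST my_index/_refresh
POST my_index/_flush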

In addition to the flush procedure above, we mentioned in the first article that segments get merged once they exceed certain thresholds.

  1. The merge operation. The default refresh interval is 1s, so continuous writing produces a large number of small segments. For this reason, ES runs a background task that scans the segments on disk and merges those that meet the criteria, reducing the number of segments in Lucene, improving query speed, and lowering load. The merge is also the point at which old documents are physically removed after a delete or update. Users can also call the force merge API (POST my_index/_forcemerge, shown in the sketch below this list) to actively trigger a merge, reducing the number of segments in the cluster and cleaning up deleted or updated documents.
  2. In addition, ES has a multi-replica mechanism: a primary shard and its replicas are never allocated to the same node, which further improves data reliability.
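A minimal sketch of triggering a merge by hand (my_index is a placeholder; max_num_segments=1 is an aggressive example that merges each shard down to a single segment):

POST my_index/_forcemerge?max_num_segments=1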

The process of deleting a document works in much the same way.

The operation mechanism of distributed search

An ES search is split into two phases:

  • The first stage is Query

  • The second stage is Fetch

    The specific process is as follows:

    • The user sends a search request to an ES node. On receiving it, that node acts as the Coordinating Node and randomly selects 3 of the 6 primary and replica shards (one copy of each shard), then forwards the query to them.

    • Each selected shard executes the query and sorts its results, then returns the top From + Size document IDs and their sort values to the Coordinating Node.

    • In the Fetch phase, the Coordinating Node merges and re-sorts the document ID lists returned in the Query phase and picks the document IDs from From to From + Size.

    • It then issues multi_get requests to the corresponding shards to fetch the full document data.

    Problems:

    • Performance issues:

      • Each shard has to fetch from + size documents
      • The Coordinating Node then has to process number_of_shards * (from + size) results
      • Deep pagination has always been a problem for search engines (in ES you can try the search_after API)
    • Relevance scoring problem

      • Each shard calculates relevance based only on the data it holds, which skews the scores

      How do we solve the relevance scoring problem?

      • When the amount of data is small, set the number of primary shards to 1

      • When the amount of data is large enough, the scores will not differ much, as long as documents are evenly distributed across the shards

      • You can try DFS Query Then Fetch: add search_type=dfs_query_then_fetch to the search URL (_search?search_type=dfs_query_then_fetch). This collects the term and document frequencies from every shard before scoring, giving a more accurate relevance score, but it costs more CPU and memory and performs worse (see the sketch below).
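A sketch of the two options mentioned above (index name, field, and query text are made up): a normal paged query versus one that forces DFS Query Then Fetch:

GET my_index/_search
{
	"from": 0,
	"size": 10,
	"query": { "match": { "title": "university" } }
}

GET my_index/_search?search_type=dfs_query_then_fetch
{
	"query": { "match": { "title": "university" } }
}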

ES concurrency control

Look at an example:

Why is the stock still 99 after the second thread finishes its deduction?

First, database concurrency control comes in two flavors: pessimistic locking and optimistic locking.

ES adopts optimistic concurrency control: it assumes conflicts will not occur and never blocks the operation being attempted, but an update fails if the data was modified between the read and the write. ES leaves conflict resolution to the application, for example retrying on failure or reporting an error.

  • ES provides two fields, _seq_no and _primary_term, which we can use in our programs for version control

    For example, if the last query returned _seq_no = 0 and _primary_term = 1, we can write the DSL for the next update as:

PUT my_idx/_doc/1?if_seq_no=0&if_primary_term=1
{
	"title": "DaLian University of Technology",
	"department": "Control Science and engineering"
}
  • If the ES data is synchronized from an external store such as MySQL, we can also use the version number in MySQL together with ES’s external version_type for concurrency control.
PUT my_idx/_doc/1?version=100000&version_type=external
{
	"title": "DaLian University of Technology",
	"department": "Control Science and engineering"
}
