Part Five: Distributed features and the distributed search mechanism

1. The cluster's distributed model, master election, and the split-brain problem

We know that ES has a distributed architecture by nature.

1.1 Distributed Features

  • Benefits of ElasticSearch's distributed architecture
    • Horizontal scaling of storage, supporting PB-level data
    • Improved availability: some nodes can stop serving without affecting the cluster as a whole
  • ElasticSearch's distributed architecture
    • Different clusters are distinguished by name; the default name is elasticsearch
    • The name is set in the configuration file, or on the command line with -E cluster.name=geektime

1.2 Nodes of an ElasticSearch cluster

  • A node is an instance of ElasticSearch
    • It is essentially a Java process
    • A machine can run multiple ElasticSearch processes, but in production it is generally recommended to run only one ElasticSearch instance per machine
  • Each node has a name, set in the configuration file or at startup with -E node.name=geektime
  • Each node is assigned a UID after startup, stored in the data directory

1.3 Coordinating Node

  • The node that receives and handles a request is called the Coordinating Node
    • It routes each request to the correct node; for example, a create-index request must be routed to the Master node
  • By default, every node is a Coordinating Node
  • Setting all other node types to false turns a node into a Dedicated Coordinating Node

1.4 Data Node

  • A node that can hold data is called a Data Node
    • A node is a data node by default when started; this can be disabled with node.data: false
  • Responsibilities of a Data Node
    • Store shard data. Data nodes play a crucial role in scaling data (the Master Node decides how shards are distributed to data nodes)
  • Adding data nodes
    • Enables horizontal scaling of data and removes the single point of data

1.5 Master Node

  • Responsibilities of the Master Node
    • Handles index creation and deletion requests, and decides which nodes shards are allocated to
    • Maintains and updates the Cluster State
  • Master Node best practices
    • Master nodes are critical; deployments must address the single-point-of-failure problem
    • Configure multiple Master nodes for a cluster / have each such node take on only the Master role

1.6 Master Eligible Nodes & the election process

  • A cluster can have multiple Master Eligible nodes. When necessary (e.g. the Master node fails, or the network partitions), these nodes take part in the election process and may become the Master node
  • Every node starts as a Master Eligible node by default
    • This can be disabled with node.master: false
  • When the first Master Eligible node in a cluster starts, it elects itself as the Master node

1.6.1 The election process among Master Eligible nodes

  • Nodes ping each other; the node with the lowest Node Id becomes the candidate for election
  • Other nodes join the cluster without taking on the Master role. Once the elected master is found to be missing, a new Master node is elected

1.7 Cluster Information

  • The cluster state (Cluster State) maintains all the information a cluster needs
    • Information about all nodes
    • All indexes with their Mapping and Setting information
    • Shard routing information
  • The cluster state is stored on every node
  • However, only the Master node may modify the cluster state and synchronize it to the other nodes
    • If any node could modify it, the Cluster State would become inconsistent

1.8 The split-brain problem

  • Split-Brain: a network problem leaves one node unable to reach the others
    • Node2 and Node3 hold a new election and choose a Master
    • Node1 still regards itself as the Master, forms its own cluster, and keeps updating its own Cluster State
    • The cluster now has two Master nodes maintaining different Cluster States; when the network recovers, the correct state cannot be chosen

1.8.1 How to avoid the split-brain problem

  • Restrict the election condition by setting a Quorum: an election can be held only when the number of reachable Master eligible nodes is at least the Quorum
    • Quorum = (number of Master eligible nodes / 2) + 1
    • With three Master eligible nodes, setting discovery.zen.minimum_master_nodes to 2 avoids split brain
  • Starting with 7.0, this configuration is no longer required
    • The minimum_master_nodes parameter was removed; Elasticsearch itself chooses the nodes that can form a quorum
    • Typical master elections now complete in a very short time; cluster scaling is safer and easier, and there are fewer system configuration options that can cause data loss
    • Nodes log their status more clearly, which helps diagnose why they cannot join a cluster or why a master cannot be elected
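The pre-7.0 quorum arithmetic can be checked with a small helper (a hypothetical function for illustration, not part of any ES client):

```python
def minimum_master_nodes(master_eligible: int) -> int:
    """Quorum = (number of master-eligible nodes / 2) + 1, integer division."""
    return master_eligible // 2 + 1

# With 3 master-eligible nodes, discovery.zen.minimum_master_nodes should be 2,
# so a partitioned single node can never elect itself:
print(minimum_master_nodes(3))  # 2
print(minimum_master_nodes(4))  # 3
```

With a quorum of 2 out of 3, the isolated Node1 from the split-brain scenario above cannot hold an election on its own, while Node2 and Node3 together still can.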

1.9 Configuring the node type

By default, a node is a Master Eligible node, a data node, and an ingest node.

| Node type | Configuration parameter | Default value |
| --- | --- | --- |
| master eligible | node.master | true |
| data | node.data | true |
| ingest | node.ingest | true |
| coordinating only | none | set all three parameters above to false |
| machine learning | node.ml | true (requires enabling x-pack) |

2. Shards and cluster failover

2.1 Primary Shard – increases system storage capacity

  • Shards are the cornerstone of ElasticSearch's distributed storage
    • Primary shards / replica shards
  • Through primary sharding, data is distributed across all nodes
    • A Primary Shard splits the data of one index across multiple Data Nodes, achieving horizontal scaling of storage
    • The number of primary shards is specified when the index is created and cannot be changed by default; to change it, the index must be rebuilt
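The reason the primary-shard count cannot change is the routing formula, shard = hash(_routing) % number_of_primary_shards. A minimal sketch (using CRC32 as a stand-in for the murmur3 hash ES actually uses) shows that a different shard count would send the same document Id to a different shard:

```python
import zlib

def route(routing: str, number_of_primary_shards: int) -> int:
    # shard = hash(_routing) % number_of_primary_shards
    # (ES uses murmur3; CRC32 is only an illustrative stand-in here)
    return zlib.crc32(routing.encode()) % number_of_primary_shards

for doc_id in ("doc1", "doc2", "doc3"):
    print(doc_id, "with 3 shards ->", route(doc_id, 3),
          "| with 5 shards ->", route(doc_id, 5))
# If the shard count changed in place, reads for existing ids could hit the
# wrong shard -- hence the index must be rebuilt (reindexed) to change it.
```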

2.2 Replica Shard – improves data availability

  • Data availability
    • Replica shards (Replica Shard) improve data availability: if a primary shard is lost, a replica shard can be promoted to primary. The number of replica shards can be adjusted dynamically, so that complete data is still kept across the nodes. Without replica shards, data may be lost once a node's hardware fails
  • Improved read performance
    • Replica shards stay synchronized with their primary shard. Adding Replicas can therefore improve read throughput to a certain extent

2.3 Choosing the number of shards

  • Plan the number of primary and replica shards for an index carefully
    • Too few primary shards: for example, an Index created with 1 Primary Shard
      • If the index grows quickly, the cluster cannot scale it by adding nodes
    • Too many primary shards: each Shard holds little data, and too many shards on a node hurts performance
    • Too many replica shards: degrades the cluster's write performance

2.4 Cluster Node Faults

2.4.1 Single-node Cluster

  • Replica shards cannot be allocated, so the cluster status is yellow (Yellow)
  • Adding a data node allows the replica shards to be allocated to the second node

2.4.2 Adding a Data Node

  • The cluster status turns green
  • The cluster has failover capability
  • Try setting the number of Replicas to 2 or 3 and observe the cluster status

2.4.3 Adding another Data Node

  • The cluster has failover capability
  • The Master node decides which node each shard is allocated to
  • You can add nodes to improve the computing capability of the cluster

2.4.4 Failover

  • Three nodes form a cluster, containing one index with 3 Primary Shards and 1 Replica Shard
  • Node 1 is the Master node. It fails unexpectedly, and the cluster re-elects a Master node
  • R0 on Node3 is promoted to P0; the cluster turns yellow
  • After R0 and R1 are reallocated, the cluster turns green

2.5 Cluster health status

  • Green: healthy; all primary and replica shards are available
  • Yellow: sub-healthy; all primary shards are available, but some replica shards are not
  • Red: unhealthy; some primary shards are unavailable
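The three statuses follow mechanically from shard allocation; a toy function capturing the rule:

```python
def cluster_health(unassigned_primaries: int, unassigned_replicas: int) -> str:
    """Green/yellow/red rule as described above (a simplified model)."""
    if unassigned_primaries > 0:
        return "red"     # some primary shards unavailable
    if unassigned_replicas > 0:
        return "yellow"  # all primaries fine, some replicas unassigned
    return "green"       # everything allocated

# A single-node cluster with replicas configured cannot place the replicas:
print(cluster_health(0, 1))  # yellow
print(cluster_health(0, 0))  # green
print(cluster_health(1, 0))  # red
```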

3. Shards and their lifecycle

3.1 Internal workings of a shard

  • What is an ES shard?
    • It is ES's smallest unit of work / a Lucene index
  • Some questions:
    • Why is ES search near real time (a document becomes searchable about 1 second after indexing)?
    • How does ES ensure data is not lost after a power failure?
    • Why does deleting a document not immediately free up space?

3.2 Immutability of the inverted index

  • The inverted index uses an Immutable Design: once generated, it never changes
  • Immutability brings the following benefits:
    • No need to worry about concurrent writes to files, avoiding the performance cost of locking
    • Once read into the kernel's file system cache, it stays there; as long as there is enough space, most requests go straight to memory and never hit disk, which greatly improves performance
    • Caches are easy to generate and maintain / data can be compressed
  • The challenge of immutability: if a new document is to be searchable, the entire index must be rebuilt

www.zhulou.net/post/8005.h…

3.3 Lucene Index

  • In Lucene, a single inverted index file is called a Segment. A Segment is self-contained and immutable. Multiple Segments together form a Lucene Index, which corresponds to a Shard in ES
  • When a new document is written, a new Segment is generated; a query searches all Segments simultaneously and merges the results. Lucene keeps a file recording all the Segments, called the Commit Point
  • Information about deleted documents is saved in .del files

3.4 What is Refresh

When ES writes a document, it does not write it directly to a Segment; it first writes to a buffer called the Index Buffer, which is later written out as a Segment.

  • The process of writing the Index Buffer into a Segment is called Refresh. Refresh does not perform an fsync
  • Refresh frequency: once every 1s by default, configurable via index.refresh_interval. After a Refresh, the data becomes searchable; this is why ElasticSearch is called near-real-time search
  • If the system writes a lot of data, many Segments are produced
  • Refresh is also triggered when the Index Buffer fills up; the default threshold is 10% of the JVM heap

3.5 What is the Transaction Log

  • Writing a Segment to disk is time-consuming, so with the help of the file system cache, Refresh first writes the Segment into the cache so it can be queried
  • To ensure data is not lost, each Index operation also writes to a Transaction Log; in recent versions the Transaction Log is flushed to disk by default. There is one Transaction Log per shard
  • When ES performs a Refresh, the Index Buffer is cleared but the Transaction Log is not

3.6 What is Flush

  • ES Flush & Lucene Commit
    • Calls Refresh: the Index Buffer is emptied
    • Calls fsync: cached Segments are written to disk
    • Empties (deletes) the Transaction Log
    • Triggered every 30 minutes by default
    • Also triggered when the Transaction Log is full (default 512MB)

3.7 The Merge operation

With Flush (3.6 above), Segments are written to disk. As time goes by, more and more Segment files accumulate on disk, so these Segment files need periodic processing.

  • ES and Lucene perform the Merge operation automatically
    • It can also be forced: POST my_index/_forcemerge
  • When Segments are numerous, they need to be merged periodically
    • This reduces the number of Segments and physically removes deleted documents
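A merge can be pictured as concatenating several small immutable segments into one new segment while dropping documents recorded as deleted (a toy model; Lucene's real segments are on-disk index structures):

```python
def merge_segments(segments, deleted_ids):
    """Combine immutable segments into one, physically removing
    documents whose ids appear in the .del-style deleted set."""
    return [doc for segment in segments for doc in segment
            if doc["id"] not in deleted_ids]

segments = [
    [{"id": 1, "body": "a"}, {"id": 2, "body": "b"}],  # segment 1
    [{"id": 3, "body": "c"}],                          # segment 2
]
merged = merge_segments(segments, deleted_ids={2})
print([d["id"] for d in merged])  # [1, 3] -- doc 2 is finally gone
```

This is also why deleting a document does not immediately free space: the space is only reclaimed when the segment containing it is merged away.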

4. Distributed queries and relevance scoring

4.1 How distributed search works

  • An ElasticSearch search is carried out in two phases
    • Phase 1: Query
    • Phase 2: Fetch
  • Together: Query-then-Fetch

4.2 Query phase

  • The user sends a search request to an ES node. On receiving the request, the node takes on the Coordinating node role: it selects 3 shards from the index's 6 primary and replica shards and sends them the query request

(The selection is not simply random; see this post: www.elastic.co/guide/cn/el…)

  • The selected shards execute the query and sort the results. Each shard then returns its From + Size sorted document Ids and sort values to the Coordinating node

4.3 Fetch phase

  • The Coordinating Node re-sorts the sorted document Id lists obtained from each shard in the Query phase, then selects the document Ids from From to From + Size
  • It then issues a Multi get request to the corresponding shards to fetch the detailed document data
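The two phases can be modeled with plain lists: in the Query phase each shard returns its local top from + size (id, score) pairs; the coordinating node re-sorts them and, in the Fetch phase, retrieves only the final page (a simplified sketch of the mechanism, not the real protocol):

```python
def query_then_fetch(shards, frm, size):
    # Query phase: every shard returns its local top (frm + size) hits
    candidates = []
    for shard in shards:                      # shard: {doc_id: score}
        local_top = sorted(shard.items(), key=lambda kv: -kv[1])[: frm + size]
        candidates.extend(local_top)
    # Coordinating node re-sorts all candidates and keeps [frm, frm + size)
    ranked = sorted(candidates, key=lambda kv: -kv[1])[frm : frm + size]
    # Fetch phase: only these ids are fetched in full (Multi get)
    return [doc_id for doc_id, _score in ranked]

shards = [{"a": 0.9, "b": 0.5}, {"c": 0.8, "d": 0.1}, {"e": 0.7}]
print(query_then_fetch(shards, frm=0, size=3))  # ['a', 'c', 'e']
```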

4.4 Potential problems with Query Then Fetch

  • Performance
    • Number of documents each shard must examine = from + size
    • The coordinating node then has to process number_of_shards * (from + size) documents
    • Deep pagination
  • Relevance scoring
    • Each shard computes relevance based only on its own data, which can skew the scores, especially when data is scarce. Relevance scores are independent across shards; when the total number of documents is small and there is more than one primary shard, the more primary shards, the less accurate the relevance calculation
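The performance cost above is easy to quantify with the stated formula (an illustrative helper, not an ES API):

```python
def coordinating_node_docs(number_of_shards: int, frm: int, size: int) -> int:
    """Documents the coordinating node must collect and sort:
    number_of_shards * (from + size)."""
    return number_of_shards * (frm + size)

# Page 100 (documents 990-1000) on a 3-shard index:
print(coordinating_node_docs(3, frm=990, size=10))  # 3000 docs handled to return 10
```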

4.5 Ways to fix inaccurate scoring

  • If the data volume is small, set the number of primary shards to 1
    • When the data volume is large enough, results are generally unbiased as long as documents are evenly distributed across the shards
  • Use DFS Query Then Fetch
    • Specify the parameter in the search URL: _search?search_type=dfs_query_then_fetch
    • This collects the term and document frequencies from every shard and then performs a complete relevance calculation. It costs more CPU and memory and performs poorly, so it is generally not recommended

5. Sorting and Doc Values & Fielddata

By default, ElasticSearch sorts by relevance score; you can specify your own rules with the sort parameter.

5.1 Sorting

  • By default, ElasticSearch sorts results in descending order of relevance score
  • You can define your own ordering by setting the sort parameter
  • If _score is not included in the sort, the score is null

5.2 Examples

5.2.1 Single-field sort

Let’s look at an example:

POST /kibana_sample_data_ecommerce/_search
{
  "size": 5,
  "query": { "match_all": {} },
  "sort": [
    { "order_date": { "order": "desc" } }
  ]
}

This sorts orders in descending order by order date.

5.2.2 Multi-field sort

POST /kibana_sample_data_ecommerce/_search
{
  "size": 5,
  "query": { "match_all": {} },
  "sort": [
    { "order_date": { "order": "desc" } },
    { "_doc": { "order": "asc" } },
    { "_score": { "order": "desc" } }
  ]
}

5.2.3 Sorting on a Text field

Trying to sort on a text field produces an error indicating that the fielddata property must be enabled first:

# Enable fielddata on the text field
PUT kibana_sample_data_ecommerce/_mapping
{
  "properties": {
    "customer_full_name": {
      "type": "text",
      "fielddata": true,
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    }
  }
}

5.3 How sorting works

  • Sorting operates on a field's original content; the inverted index plays no role here
  • A forward index is needed: a way to quickly get a field's original content from the document Id and field name
  • ElasticSearch implements this in two ways
    • Fielddata
    • Doc Values (columnar storage; not available for Text fields, which is why sorting on a Text field requires enabling Fielddata)
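The forward-index idea can be sketched as a per-field column keyed by doc id, which is what makes sorting fast without touching the inverted index (a toy model of columnar doc values, with hypothetical field names):

```python
# Inverted index: term -> doc ids (good for search, useless for sort)
inverted = {"alice": [0, 2], "bob": [1]}

# Forward index / doc values: one column per field, indexed by doc id
doc_values = {"customer_full_name": ["alice", "bob", "alice"],
              "order_total": [30, 10, 20]}

def sort_docs(field: str, reverse: bool = False):
    """Return doc ids ordered by the field's original (columnar) values."""
    column = doc_values[field]
    return sorted(range(len(column)),
                  key=lambda doc_id: column[doc_id], reverse=reverse)

print(sort_docs("order_total"))  # [1, 2, 0]  (values 10, 20, 30)
```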

5.3.1 Doc Values vs Fielddata

|  | Doc Values | Fielddata |
| --- | --- | --- |
| When created | With the inverted index, at index time | Dynamically, at search time |
| Where created | Disk files | JVM Heap |
| Advantages | Avoids a large memory footprint | Fast indexing; no extra disk space |
| Disadvantages | Slows indexing and takes extra disk space | With many documents, dynamic creation is expensive and occupies too much JVM Heap |
| Default in | ES 2.x and later | ES 1.x and earlier |

5.3.2 Disabling Doc Values

  • Enabled by default; can be turned off in the Mapping
    • Increases indexing speed / reduces disk space
  • Re-enabling it requires rebuilding the index
  • When should it be disabled?
    • Only when you are certain the field is not needed for sorting or aggregation

6. Paging and traversal: From, Size, Search After & the Scroll API

6.1 From/Size

  • By default, queries are sorted by relevance score and the top 10 records are returned
  • An easy-to-understand paging scheme
    • From: starting position
    • Size: number of documents expected

6.2 Deep paging in a distributed system

  • ES is distributed by nature. The data behind a query lives in multiple shards on multiple machines, yet ES must still return results sorted (by relevance score)
  • For a query with From=990, Size=10:
    • Each shard first fetches its top 1000 documents; the Coordinating Node then aggregates all the results and selects the top 1000 by sorting
    • The deeper the page, the more memory is consumed. To avoid the memory overhead of deep paging, ES limits results to 10000 documents by default
      • index.max_result_window

6.3 A simple paging example

POST tmdb/_search
{
  "from": 0,
  "size": 20,
  "query": { "match_all": {} }
}

  • ES reports an error once from + size exceeds 10000

6.4 Search After avoids the deep-paging problem

We know that the deeper the page, the more memory is consumed. So how can the deep-paging problem be avoided?

  • Search After avoids the performance problem of deep paging and still fetches the next page in real time
    • It does not support jumping to a page (From)
    • You can only page forward
  • The first search must specify sort and guarantee the sort value is unique (uniqueness can be ensured by adding _id)
  • Each subsequent query then uses the sort values of the last document of the previous page

6.4.1 A Search After example

1. Data preparation
# Search After
DELETE users
POST users/_doc
{"name": "user1","age":10}
POST users/_doc
{"name": "user2","age":11}
POST users/_doc
{"name": "user3","age":12}
POST users/_doc
{"name": "user4","age":13}
2. Query with Search After (the first search must specify sort and guarantee the value is unique, e.g. by adding _id)
POST users/_search
{
  "size": 1,
  "query": {
    "match_all": {}
  },
  "sort": [
    {"age": "desc"},
    {"_id": "asc"}
  ]
}


3. Query again with Search After (using the sort values of the last document of the previous page)
POST users/_search
{
  "size": 1,
  "query": { "match_all": {} },
  "search_after": [ 13, "ka36BngBc4LlHCXeCOxJ" ],
  "sort": [
    {"age": "desc"},
    {"_id": "asc"}
  ]
}

Continue in this way, passing in the sort values from the previous search each time, until the query result is empty.

6.4.2 How Search After solves deep paging

  • Assume Size is 10
  • Querying documents 990–1000
  • By locating the page via the unique sort values, the number of documents each shard must process is limited to 10 at a time
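The mechanism can be simulated with an in-memory list: a page is defined by a (sort value, id) cursor rather than an offset, so each step only materializes size documents past the cursor (a toy model, not the ES API):

```python
def search_after_page(docs, size, after=None):
    """Return the next `size` docs sorted by (age desc, id asc), starting
    strictly after the `after` cursor -- a sketch of Search After."""
    key = lambda d: (-d["age"], d["id"])
    ordered = sorted(docs, key=key)
    if after is not None:
        after_key = (-after[0], after[1])
        ordered = [d for d in ordered if key(d) > after_key]
    return ordered[:size]

docs = [{"id": "u1", "age": 10}, {"id": "u2", "age": 11},
        {"id": "u3", "age": 12}, {"id": "u4", "age": 13}]

page1 = search_after_page(docs, size=1)
print(page1[0]["id"])                      # u4 (age 13 comes first)
cursor = (page1[0]["age"], page1[0]["id"])
page2 = search_after_page(docs, size=1, after=cursor)
print(page2[0]["id"])                      # u3
```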

6.5 Scroll API (commonly used in real scenarios)

ElasticSearch also provides the Scroll API for iterating over results. The first call specifies how long the scroll context survives; within that time we can process the query results, passing the Scroll Id from the previous query to scroll through the next batch of data.

It has limitations: the request creates a snapshot, so data written afterwards cannot be found

  • After the snapshot is created, new data is not visible
  • Each subsequent query must pass in the previous Scroll Id
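The snapshot semantics can be sketched with a tiny class: the scroll context copies the result set when it is created, so later writes never show up in the iteration (a toy model, not the real ES implementation):

```python
class Scroll:
    """Toy model of a scroll context: it snapshots the data at creation,
    so documents written afterwards are invisible to the iteration."""
    def __init__(self, index, size):
        self._snapshot = list(index)  # point-in-time copy
        self._pos = 0
        self._size = size

    def next_batch(self):
        batch = self._snapshot[self._pos : self._pos + self._size]
        self._pos += self._size
        return batch

index = ["user1", "user2", "user3"]
scroll = Scroll(index, size=1)
print(scroll.next_batch())   # ['user1']
index.append("user4")        # written after the snapshot was taken
print(scroll.next_batch())   # ['user2']
print(scroll.next_batch())   # ['user3']
print(scroll.next_batch())   # [] -- user4 is never seen
```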

6.5.1 A simple Scroll API demo

1. Insert three pieces of data
DELETE users
POST users/_doc
{"name": "user1","age":10}
POST users/_doc
{"name": "user2","age":20}
POST users/_doc
{"name": "user3","age":30}

POST users/_count

2. Create a snapshot based on the Scroll API
# Create a snapshot via the scroll API
POST /users/_search?scroll=5m
{
  "size": 1,
  "query": { "match_all": {} }
}
# The response returns a scroll_id such as
# DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAFdgWOFN4NEJyd05ScnFmVjRzM3M2dzdvUQ==

3. Try writing another document
POST users/_count

POST users/_doc
{"name": "user4","age":40}

4. Scroll through the results with the Scroll API
POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAFdgWOFN4NEJyd05ScnFmVjRzM3M2dzdvUQ=="
}


Pass the scroll_id saved in the previous step into this scroll_id field to get the next batch of data. Since our size is 1, we get only one document at a time.

When we scroll all the way to the end, we never see the document (user4) that was inserted after the snapshot was created.

6.6 Different search types and their use cases

  • Regular
    • Need real-time access to the top documents, e.g. querying the latest orders
  • Scroll
    • Need the full set of documents, e.g. exporting all data
  • Pagination
    • From and Size
    • If deep paging is required, choose Search After

7. Handling concurrent reads and writes

7.1 Why concurrency control is necessary

  • If two Web programs update the same document at the same time, the lack of effective concurrency control can cause changes to be lost
  • Pessimistic concurrency control
    • Assumes conflicting changes are likely; locks resources to prevent conflicts, e.g. database row locks
  • Optimistic concurrency control
    • Assumes conflicts will not happen and does not block the attempted operation. If the data was modified between read and write, the update fails, and the application decides how to resolve the conflict: retry the update, use the new data, or report an error to the user
    • ES uses optimistic concurrency control

7.2 Optimistic concurrency control in ES

  • Documents in ES are immutable. Updating a document marks the old document as deleted and adds a brand-new one, incrementing the document's version field by 1
  • Internal version control
    • if_seq_no + if_primary_term
  • External versions (when another database is the primary data store)
    • version + version_type=external
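The internal mechanism can be sketched as a compare-and-set on a sequence number: an update carries the seq_no it read, and fails if the document has moved on (a simplified model; real ES checks both if_seq_no and if_primary_term):

```python
class VersionConflict(Exception):
    pass

class Document:
    """Toy model of ES-style optimistic concurrency control."""
    def __init__(self, source):
        self.source = source
        self.seq_no = 0

    def update(self, new_source, if_seq_no):
        if if_seq_no != self.seq_no:
            raise VersionConflict("document changed since it was read")
        self.source = new_source   # old doc marked deleted, new doc added
        self.seq_no += 1

doc = Document({"stock": 100})
seen = doc.seq_no                            # both clients read seq_no 0
doc.update({"stock": 99}, if_seq_no=seen)    # first writer succeeds
try:
    doc.update({"stock": 98}, if_seq_no=seen)  # stale seq_no
except VersionConflict:
    print("conflict: re-read and retry")
```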
