Before continuing, we’ll talk about how search is performed in a distributed environment.

It’s a little more complex than the basic create-read-update-delete (CRUD) request we talked about earlier.

A CRUD operation deals with only a single document, whose uniqueness is determined by the combination of _index, _type, and routing value (which defaults to the document's _id). This means we know exactly which shard in the cluster holds that document.
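As a reminder, Elasticsearch locates that shard with a simple formula, where routing is the routing value described above:

shard = hash(routing) % number_of_primary_shards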

Search requires a more complex model, because we don't know in advance which documents will match the query: they could be on any shard in the cluster. A search request therefore has to consult a copy of every shard in every index we are interested in, to find out whether it holds any matching documents.

However, finding all the matching documents is only half the job. Before the search API can return a page of results, the results from all the shards must be combined into a single ordered list. For this reason, search is executed in two phases, called query then fetch.

The query phase

In the query phase, the query is broadcast to a copy (primary or replica) of every shard in the index. Each shard executes the search locally and builds a priority queue of matching documents.

A priority queue is simply an ordered list holding the top-n matching documents. Its size is determined by the paging parameters from and size. For example, the following search request would require a priority queue large enough to hold 100 documents:

GET /_search
{
    "from": 90,
    "size": 10
}

This process is illustrated in Figure 1, Distributed search query phase.

Figure 1 Distributed search query phase

The query phase consists of the following three steps:

1. The client sends a search request to Node 3, which creates an empty priority queue of size from + size.

2. Node 3 forwards the search request to a primary or replica copy of every shard in the index. Each shard executes the query locally and adds its results to an ordered local priority queue of size from + size.

3. Each shard returns the document IDs and sort values of all the documents in its priority queue to the coordinating node, Node 3, which merges these values into its own priority queue to produce a globally sorted result.

When a search request is sent to a node, that node becomes the coordinating node. Its job is to broadcast the search request to all relevant shards and to consolidate their responses into a single, globally ordered result set, which it returns to the client.

The first step is to broadcast the request to a copy of every shard in the index. Just like document GET requests, a search request can be handled by the primary shard or by any of its replicas. This is how adding more replicas (combined with more hardware) improves search throughput: for subsequent requests, the coordinating node round-robins across all shard copies to spread the load.

Each shard executes the query locally and builds an ordered priority queue of size from + size, which means it has enough results on its own to satisfy the global request. It then returns a lightweight list of results to the coordinating node, containing just the document IDs and the values needed for sorting, such as _score.
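Purely as an illustration (this exchange is internal to the cluster, and its exact format is an assumption here), each entry in a shard's reply might be as small as:

{ "_id": "42", "_score": 2.3 }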

The coordinating node merges these shard-level results into its own ordered priority queue, which represents the final, globally sorted result set. At this point, the query phase ends.

The whole process resembles a merge sort: each shard sorts locally, and the coordinating node merges the sorted lists together. This approach is ideal for a distributed scenario like this one.

Note that an index can consist of one or more primary shards, so a search request against a single index already has to combine results from multiple shards. A search against multiple indices or all indices works in exactly the same way; there are simply more shards involved.

The fetch phase

The query phase identifies the documents that satisfy the search request, but we still need to retrieve the documents themselves. This is the job of the fetch phase, shown in Figure 2, Distributed search fetch phase.

Figure 2 Distributed search fetch phase

The fetch phase consists of the following steps:

1. The coordinating node identifies which documents need to be retrieved and issues GET requests to the relevant shards.

2. Each shard loads the documents, enriches them if necessary, and returns them to the coordinating node.

3. Once all the documents have been retrieved, the coordinating node returns the results to the client.

The coordinating node first decides which documents actually need to be retrieved. For example, if the query specified {"from": 90, "size": 10}, the first 90 results are discarded and only the next 10 need to be fetched. These documents may come from one, several, or all of the shards involved in the original query.

The coordinating node builds a multi-get request for each shard that holds relevant documents and sends it to the same shard copy that handled the query phase.
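Conceptually, what each shard is asked for resembles a multi-get request like the one below (the actual inter-node traffic is internal; the index, type, and IDs here are made up for illustration):

GET /_mget
{
    "docs": [
        { "_index": "my_index", "_type": "my_type", "_id": "42" },
        { "_index": "my_index", "_type": "my_type", "_id": "17" }
    ]
}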

Each shard loads the document bodies (the _source field) and, if requested, enriches the results with metadata and search-snippet highlighting. Once the coordinating node has received all the results, it assembles them into a single response and returns it to the client.

Deep paging

The query-then-fetch process supports paging with the from and size parameters, but only within limits.

Remember that each shard must build a priority queue of size from + size, all of which is passed back to the coordinating node. This means the coordinating node has to sort through number_of_shards * (from + size) documents just to find the size documents that will actually be returned.
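For example (the numbers here are hypothetical), fetching page 1,000 of ten results from an index with 5 primary shards:

GET /_search
{
    "from": 9990,
    "size": 10
}

requires each shard to build a priority queue of 10,000 entries, and the coordinating node has to sort through 5 * 10000 = 50000 entries just to return 10 of them.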

Depending on the number of documents, the number of shards, and the hardware involved, paging 10,000 to 50,000 results deep (pages 1,000 to 5,000) may be perfectly feasible. But with a large enough from value, the sorting process becomes extremely heavy, consuming vast amounts of CPU, memory, and bandwidth. For this reason, deep paging is strongly discouraged.

In practice, "deep pagers" are a tiny minority anyway. A typical user gives up after two or three pages and changes the search criteria instead. The exceptions are usually bots or web crawlers, which keep fetching page after page until the server is close to collapse.

If you really need to pull large numbers of documents from the cluster, you can do so efficiently by disabling sorting with the scan search type, described below.

Search options

A few optional query-string parameters can influence how the search is executed.

Preference

The preference parameter allows you to control which shard or node is used to process the search request.

It accepts values such as _primary, _primary_first, _local, _only_node:xyz, _prefer_node:xyz, and _shards:2,3.

However, the most useful value is usually a random string, which avoids the bouncing results problem.

To picture bouncing results, imagine sorting your results by a timestamp field while two documents share the same timestamp. Because search requests are round-robined across all valid shard copies, the two documents may appear in one order on the primary shard and in a different order on a replica. The result order then changes every time the user refreshes the page; this is known as bouncing results. The problem can be avoided by always using the same shards for the same user: set the preference parameter to an arbitrary string such as the user's session ID.
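A minimal sketch, assuming a made-up session ID of xyzabc123 and a hypothetical title field:

GET /_search?preference=xyzabc123
{
    "query": { "match": { "title": "search" }}
}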

timeout

Typically, the coordinating node waits until it has received a response from every shard. If a single node is having trouble, it slows down the whole search request.

The timeout parameter tells the coordinating node how long it may wait before giving up and returning the results it already has. Partial results are better than none.
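For example, to ask the coordinating node to return whatever it has gathered within 10 milliseconds:

GET /_search?timeout=10ms
{
    "query": { "match_all": {}}
}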

The response to the search request indicates whether the search timed out and how many shards responded successfully:

. "timed_out": true, (1) "_shards": { "total": 5, "successful": 4, "failed": 1 (2) }, ...Copy the code

(1) The search request timed out.

(2) One shard out of five did not respond within the timeout period.

If all copies of a shard fail for some other reason, perhaps because of a hardware failure, this is also reflected in the _shards section of the response.

Routing

When searching, you can specify one or more routing values to restrict the search to those shards instead of all the shards in the index:

GET /_search?routing=user_1,user2

Search_type

While query_then_fetch is the default search type, other search types can be specified for particular purposes, for example:

GET /_search?search_type=count

1. count

The count search type has only a query phase. Use it when you don't need search results and only want to know the number of documents that match the query.
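A minimal sketch (the title field is made up for illustration):

GET /_search?search_type=count
{
    "query": { "match": { "title": "elasticsearch" }}
}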

2. query_and_fetch

The query_and_fetch search type combines the query and fetch phases into a single step. It is an internal optimization used when a search request targets only a single shard, for example when a routing value has been specified. You can select this search type manually, but doing so makes little difference.

3. dfs_query_then_fetch and dfs_query_and_fetch

The dfs search types have a prequery phase that fetches term frequencies from all relevant shards in order to compute global term frequencies.
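For example, to run an ordinary search while first computing globally accurate term frequencies:

GET /_search?search_type=dfs_query_then_fetch

This improves the accuracy of relevance scores when term frequencies are unevenly distributed across shards, at the cost of an extra round trip.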

4. scan

The scan search type is used in conjunction with the Scroll API to efficiently retrieve a large number of results. It does this by disabling sorting.

Scan and scroll

The scan search type is used in combination with the scroll API to efficiently retrieve large numbers of results from Elasticsearch, without the cost of deep paging.

Scroll

A scrolled search lets us perform an initial search and then keep pulling batches of results from Elasticsearch until none are left. It is somewhat like a cursor in a traditional database.

Traditional database cursor: a cursor is a data buffer that the system creates to hold the results of an SQL statement. Each cursor has a name, and the client can use SQL statements to fetch records from it one at a time, assigning them to host variables for further processing by the host language. At its core, a cursor is a mechanism for extracting one record at a time from a result set that contains many. You can think of it as a private SQL work area: a region of memory that temporarily holds the rows affected by a statement, like a virtual table.

A scrolled search takes a snapshot in time. The snapshot does not reflect any changes made to the index after the initial search request; the old data files are kept around, so the index continues to look the way it did when the search began.

Why does Elasticsearch need scroll? When Elasticsearch answers a request, it must determine the order of the matching docs and sort the results before responding. If the requested page is shallow (say, the first page of 20 docs), this is no problem; but if the requested page is deep (say, page 20, at 20 docs per page), Elasticsearch has to collect all the docs from pages 1 through 20 and discard those on pages 1 through 19 just to return the docs on page 20. Scroll is the way around this: since Elasticsearch has to do this positioning work (determining the docs on all the preceding pages) for every request anyway, we can have it store that state for subsequent requests. The downside is that this state cannot be kept forever, because storage resources are limited, so Elasticsearch lets us specify how long it needs to be kept.

Scan

The most expensive part of deep paging is the global sorting of results, but if we disable sorting, we can get all the results back quite cheaply. To do this we use the scan search type, which tells Elasticsearch not to sort and simply to return the next batch of results from each shard for as long as there are results to return.

To use scan-and-scroll, we perform a search request with search_type set to scan and pass a scroll parameter telling Elasticsearch how long the scroll should stay open:

GET /old_index/_search?search_type=scan&scroll=1m    (1)
{
    "query": { "match_all": {}},
    "size": 1000
}

(1) Keep the scroll open for 1 minute.

The response to this request contains no hits, but it does include a base-64 encoded _scroll_id string. We can now pass the _scroll_id to the _search/scroll endpoint to fetch the first batch of results:

GET /_search/scroll?scroll=1m                                        (1)
c2Nhbjs1OzExODpRNV9aY1VyUVM4U0NMd2pjWlJ3YWlBOzExOTpRNV9aY1VyUVM4U0  (2)
NMd2pjWlJ3YWlBOzExNjpRNV9aY1VyUVM4U0NMd2pjWlJ3YWlBOzExNzpRNV9aY1Vy
UVM4U0NMd2pjWlJ3YWlBOzEyMDpRNV9aY1VyUVM4U0NMd2pjWlJ3YWlBOzE7dG90YW
xfaGl0czoxOw==

(1) Keep the scroll open for another minute.

(2) The _scroll_id can be passed in the body, in the URL, or as a query parameter.

Note that we specify ?scroll=1m again. The scroll expiry time is refreshed each time we perform a scroll request, so it only needs to be long enough to process the current batch of results, not all of the documents that match the query.

The response to this scroll request contains the first batch of results. Although we specified a size of 1000, we may get back more documents than that: when scanning, size is applied to each shard individually, so each batch can contain up to size * number_of_primary_shards documents.
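For example, with the size of 1000 specified in the request above and a hypothetical index of 5 primary shards, each batch could contain up to 1000 * 5 = 5000 documents.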

Note: every scroll request returns a new _scroll_id. Each subsequent scroll request must pass the _scroll_id returned by the previous one.

When no more hits are returned, all the matching documents have been processed.

Some official Elasticsearch clients provide scan-and-scroll helpers that offer a simple wrapper around this functionality.

Reference: Elasticsearch: The Definitive Guide