1. ES write process

Any node in ES can act as a coordinating node and accept requests. After receiving a request, the coordinating node performs a series of processing steps and then locates the corresponding primary shard via the _routing field. The primary shard writes the data and forwards it to each replica; each replica writes the data and acknowledges back to the primary shard, which in turn responds to the coordinating node.
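This fan-out can be sketched as a toy model. All class and function names below are illustrative, not Elasticsearch APIs; the real implementation is asynchronous Java transport code.

```python
# Toy model of the write fan-out: the primary writes first, then forwards to
# replicas, and the client is acknowledged only after every replica answers.

class Shard:
    def __init__(self, name):
        self.name = name
        self.docs = {}

    def write(self, doc_id, doc):
        self.docs[doc_id] = doc
        return "ok"

class ShardGroup:
    """One primary shard plus its replicas."""
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas

def coordinate_write(doc_id, doc, group):
    primary_ack = group.primary.write(doc_id, doc)                 # primary first
    replica_acks = [r.write(doc_id, doc) for r in group.replicas]  # then replicas
    # Respond to the client only once the primary and all replicas have acked.
    return primary_ack == "ok" and all(a == "ok" for a in replica_acks)

group = ShardGroup(Shard("p0"), [Shard("r0"), Shard("r1")])
assert coordinate_write("1", {"user": "kimchy"}, group)
```

Note that in this model, as in ES, a write is not acknowledged until every copy has it; the sections below fill in what actually happens inside each step.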

1.1 Coordinating node

In ES, the node that receives and forwards a request is called the coordinating node, and any node can play this role. When a node receives a write or update request, it performs the following steps:

  1. Ingest pipeline. Check whether the request matches an ingest pipeline; if so, execute the pipeline's logic, which typically preprocesses the document (adjusting the format, adding fields, and so on). If the current node does not have the ingest role, forward the request to a node that does.
  2. Auto-create index. Check whether the target index exists. If it does not and auto-creation is enabled, create it automatically; otherwise report an error.
  3. Set routing. Take the routing value from the request URL or from the _routing field defined in the mapping; if neither is present, use _id. If no _id is specified, ES generates a globally unique one. The routing value determines which shard of the index the document is assigned to.
  4. Build BulkShardRequest. A bulk request contains a mix of sub-requests (Index/Update/Delete) that may target different shards, so the coordinating node splits the sub-requests by shard and aggregates those for the same shard into one BulkShardRequest.
  5. Send the request to the primary shard. Writes can only be performed on the primary, so the request is routed to the node hosting the primary shard.
  6. Wait for the primary shard to respond.
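Steps 3 and 4 can be sketched together. The sketch below uses the classic routing formula, shard = hash(_routing) % number_of_primary_shards; real ES uses a murmur3 hash (and routing partitions in newer versions), so crc32 here is just a deterministic stand-in.

```python
import zlib
from collections import defaultdict

NUM_PRIMARY_SHARDS = 3  # illustrative index setting

def shard_for(request):
    """Pick a shard from _routing, falling back to _id.
    Real ES hashes with murmur3; crc32 stands in here."""
    routing = request.get("_routing") or request["_id"]
    return zlib.crc32(routing.encode()) % NUM_PRIMARY_SHARDS

def build_bulk_shard_requests(bulk):
    """Group the sub-requests of one bulk request by target shard,
    mirroring how the coordinating node builds each BulkShardRequest."""
    by_shard = defaultdict(list)
    for request in bulk:
        by_shard[shard_for(request)].append(request)
    return dict(by_shard)

bulk = [
    {"op": "index", "_id": "1", "doc": {"v": 1}},
    {"op": "delete", "_id": "2"},
    {"op": "update", "_id": "1", "doc": {"v": 2}},  # same _id -> same shard
]
grouped = build_bulk_shard_requests(bulk)
assert shard_for(bulk[0]) == shard_for(bulk[2])
```

Because the shard is a pure function of the routing value, all operations on the same _id always land on the same primary shard, which is what makes per-document ordering possible.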
1.2 Primary shard

The entry point for requests on the primary is PrimaryOperationTransportHandler.messageReceived; let's walk through the main steps of its logic.

  1. Index, Update, or Delete
  • Loop through each single write request and, depending on its operation type (CREATE/INDEX/UPDATE/DELETE), select the corresponding processing logic.
  • Create/Index adds a doc directly, and Delete removes a doc directly by _id. Update is slightly more involved, so we will use it as the running example.
  2. Translate Update To Index or Delete
  • This step is unique to the Update operation: the Update request is transformed into an Index or Delete request. First, the existing doc with the same _id, including all of its fields and values, is read (if it exists) via GetRequest (relying on the _source field) and merged with the doc in the request. The version number read at this point is recorded; call it V1.
  3. Parse Doc
  • Each field of the doc is parsed, producing a ParsedDocument object along with the UID term. In Elasticsearch, _uid = type # _id; _id is visible to the user, while _uid is what is actually stored. The ParsedDocument also carries Elasticsearch's system fields, most of which are filled from the current content; a few that are not yet known are filled in later in the process.
  4. Update Mapping
  • Elasticsearch's automatic mapping update takes effect at this step: new fields not yet covered by the mapping are picked out first, and if automatic mapping updates are allowed, the mapping is updated accordingly.
  5. Get Sequence ID and Version
  • Since the current shard is the primary, a SequenceID and a Version are obtained from the SequenceNumber Service. The SequenceID increments by 1 at the shard level and is used to advance the LocalCheckpoint once the doc is successfully written. The Version is the current doc's maximum version plus 1.
  6. Add Doc To Lucene
  • This step first locks the specific _uid, then checks whether the doc's current version matches the version obtained in the Translate Update To Index step. If not, the doc was modified after it was read, so a VersionConflict exception is thrown; it is caught at the start of the primary-node logic and execution is retried from "Translate Update To Index or Delete".
  • If the versions match, processing continues. If a doc with the same id already exists, Lucene's UpdateDocument(UID, Doc) interface is called, which deletes the old doc by UID and then indexes the new one. If this is the first write of the doc, Lucene's AddDocument interface is called directly; AddDocument itself is implemented via UpdateDocument.
  • One question in this step: how is the atomicity of delete-then-add guaranteed, so that a refresh cannot observe the intermediate state? The answer is that a refresh lock is acquired before the delete and released only after the add, blocking refreshes in between and thus making delete-then-add atomic.
  • How Lucene writes the inverted index, stored fields, doc values, and points will be covered in a future article.
  7. Write Translog
  • After the Lucene segment is written, the translog entry is written as a key-value pair, where the key is _id and the value is the doc. For a GetDocByID query, the doc can then be read directly from the translog by _id, which satisfies the real-time requirement of NoSQL-style scenarios.
  • Note that at this point the translog has only been written to memory; whether and when to sync it to disk is decided later.
  • At the end of this step, the current SequenceID is marked successful and the shard's LocalCheckpoint is advanced.
  8. Renew Bulk Request
  • The bulk request is rebuilt here, because the UpdateRequest has been translated into an Index or Delete request. All replicas then only need to execute Index or Delete requests, with no Update logic. This keeps the replica logic simpler and faster, and it guarantees that the same request produces the same result on the primary and the replicas.
  9. Flush Translog
  • Depending on the translog's policy, it is either flushed to disk immediately or left for a later sync. The higher the flush frequency, the higher the reliability, but the greater the impact on write performance.
  10. Send Requests To Replicas
  • The newly constructed bulk request is sent to all replicas in parallel, and the primary waits for their responses. Only after every replica has responded (whether with success or failure) does the primary node respond to the user. If a replica fails, the primary sends a Remove Shard request to the master to remove that replica shard from the set of available shards.
  • The SequenceID, PrimaryTerm, and GlobalCheckpoint are passed to the replicas along with the request.
  • In the request sent to a replica, the ActionName is the original ActionName plus [R], where R stands for Replica; this [R] suffix is how the handler for replica requests is located.
  11. Receive Response From Replicas
  • After the replicas have processed the requests, the LocalCheckpoint of the primary node is updated.
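The per-document steps on the primary (version check, UpdateDocument-style write, translog append, checkpoint advance) can be condensed into a sketch. Everything below is a toy model for intuition, not Elasticsearch's actual Java implementation.

```python
class VersionConflict(Exception):
    pass

class PrimaryShard:
    """Toy model of the primary's per-doc write path."""

    def __init__(self):
        self.lucene = {}         # uid -> (doc, version)
        self.translog = []       # in-memory translog; fsync policy applies later
        self.next_seq_no = 0     # per-shard, increments by 1
        self.local_checkpoint = -1

    def write(self, uid, doc, expected_version):
        current_version = self.lucene.get(uid, (None, 0))[1]
        if current_version != expected_version:
            # Doc changed after it was read: the caller retries from
            # "Translate Update To Index or Delete".
            raise VersionConflict(uid)
        seq_no = self.next_seq_no
        self.next_seq_no += 1
        # UpdateDocument semantics: delete the old doc by uid, index the new
        # one (done under a refresh lock in real Lucene so the pair is atomic).
        self.lucene[uid] = (doc, current_version + 1)
        self.translog.append({"key": uid, "value": doc, "seq_no": seq_no})
        self.local_checkpoint = seq_no   # mark this seq_no as successful
        return seq_no

shard = PrimaryShard()
assert shard.write("tweet#1", {"v": 1}, expected_version=0) == 0
assert shard.write("tweet#1", {"v": 2}, expected_version=1) == 1
```

A third write with the stale expected_version=1 would raise VersionConflict, which is exactly the retry trigger described in step 6 above.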