1. Foreword

ElasticSearch writes documents to Lucene. As a distributed system, ElasticSearch must take care of many important concerns when writing data, such as reliability, atomicity, consistency, real-time visibility, isolation, and so on.

So how does a write to ElasticSearch actually work? Let's start with Lucene and then move up to ElasticSearch.

2. Writing to Lucene

2.1 Add, update, and delete

ElasticSearch receives a doc and calls the Lucene API to write it.

 public long addDocument(...);
 public long updateDocuments(...);
 public long deleteDocuments(...);

As shown above, we can add, delete, and update documents through these Lucene interfaces. IndexWriter is the core class in Lucene that handles data writing and index-related work.

// 1. Create the IndexWriter
IndexWriter writer = new IndexWriter(FSDirectory.open(Paths.get("/index")), new IndexWriterConfig());
// 2. Build the document
Document doc = new Document();
doc.add(new StringField("empName", "empName", Field.Store.YES));
doc.add(new TextField("content", "operated on a menu", Field.Store.YES));
// 3. Add the document
writer.addDocument(doc);
// 4. Commit
writer.commit();

Directory: an abstraction over storage — it can be a local file system, a database, a distributed file system, and so on. ElasticSearch's default implementation is the local file system. Document: a Document is a document in ES, and FieldType defines a number of indexing options. Here are a few common ones:

  1. stored: whether to store the original content of the field.
  2. indexOptions: (NONE / DOCS / DOCS_AND_FREQS / DOCS_AND_FREQS_AND_POSITIONS / DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) controls whether term frequency, position information, etc. are indexed.
  3. docValuesType: columnar storage for a forward index, mapping doc ID to field value.
  4. Some other options.

IndexWriter: a doc becomes persistent and searchable only after IndexWriter commits. IndexWriterConfig: IndexWriterConfig carries the overall configuration and exposes parameters for customizing behaviour (a wiring sketch follows the list below):

  1. Similarity: the core of scoring; implementing this interface lets users customize scores. By default Lucene provides the TF-IDF and BM25 algorithms mentioned in the previous article.
  2. MergePolicy: the merge policy. As we know, ElasticSearch merges segments to reduce their number.
  3. IndexerThreadPool: thread pool management.
  4. FlushPolicy: the flush policy.
  5. Analyzer: a custom analyzer (tokenizer).
  6. IndexDeletionPolicy: commit management.
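
A minimal wiring sketch (assuming recent Lucene APIs; the concrete values are illustrative only) shows where these parameters are typically set when the IndexWriter is constructed:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.store.FSDirectory;
import java.io.IOException;
import java.nio.file.Paths;

// Sketch: customizing the analyzer, scoring model, merge policy and flush trigger.
static IndexWriter newWriter() throws IOException {
    IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer()) // custom analyzer
            .setSimilarity(new BM25Similarity())      // scoring; BM25 is the default in recent Lucene versions
            .setMergePolicy(new TieredMergePolicy())  // how segments get merged
            .setRAMBufferSizeMB(64);                  // flush policy: flush once the buffer reaches 64 MB
    return new IndexWriter(FSDirectory.open(Paths.get("/tmp/index")), config);
}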

To support distribution, ElasticSearch adds several system fields of its own (a hedged client-side example follows the list):

  1. _uid: the primary key. You can specify the doc ID at write time; if you do not, the system generates a unique UUID automatically.
  2. _version: the version field, used to ensure that changes to a document are applied correctly; useful when updating documents.
  3. _source: the original document body. If you will never need to reindex during future maintenance, you can disable this field to save space.
  4. _routing: the routing field.
  5. Other fields.
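
As a hedged example (assuming the pre-8.x Java high-level REST client API; the index name and field values are made up), _id and _routing can be set explicitly on a write request, while _source is simply the request body and _version is maintained by ElasticSearch itself:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Hypothetical example: set the doc id and routing explicitly; omit id() to get an auto-generated UUID.
IndexRequest request = new IndexRequest("employee")
        .id("1")                                                // becomes the document's _id
        .routing("dept-42")                                     // _routing decides the target shard
        .source("{\"empName\":\"Alice\"}", XContentType.JSON);  // stored as _source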

2.2. Concurrency model

The IndexWriter described above is shared by multiple threads writing concurrently. So how exactly does Lucene manage this?

2.2.1. Basic operations

Key points:

  • DocumentsWriter handles write requests and assigns each one a specific thread, a DocumentsWriterPerThread.
  • Each DocumentsWriterPerThread has a separate memory space in which it processes its documents; DocumentsWriter triggers the flush operations.
  • The in-memory buffers in each DocumentsWriterPerThread are flushed into separate segment files.
  • With this design, multi-threaded writes never conflict in a pure-insert scenario, which makes it very well suited to isolated data writes (see the sketch after this list).
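
A minimal sketch of what this design enables (reusing a writer built as in the earlier examples): several threads can share one IndexWriter for a pure-insert workload, and Lucene internally gives each thread its own DocumentsWriterPerThread buffer:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: concurrent inserts through a single, shared IndexWriter.
static void indexConcurrently(IndexWriter writer) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 1000; i++) {
        final int id = i;
        pool.submit(() -> {
            Document doc = new Document();
            doc.add(new StringField("id", Integer.toString(id), Field.Store.YES));
            writer.addDocument(doc);   // thread-safe; pure adds never conflict with each other
            return null;
        });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    writer.commit();                   // persist the flushed segments and make them searchable
}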

2.2.2 update

A Lucene update is not the same as a database update: a Lucene update deletes the old document and then adds the new one.

  • Allocate a thread to the operation
  • Perform the delete in that thread
  • Perform the add in that thread (in API terms this is a single call; see the sketch below)
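
A small sketch of that single call (the field names are illustrative):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

// Sketch: updateDocument deletes every doc matching the term, then adds the new doc.
static void updateEmployee(IndexWriter writer) throws Exception {
    Document doc = new Document();
    doc.add(new StringField("empId", "1", Field.Store.YES));
    doc.add(new TextField("content", "updated content", Field.Store.YES));
    writer.updateDocument(new Term("empId", "1"), doc); // delete by term + add, as described above
}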

2.2.3 Delete

Lucene maintains a global delete table, and each thread also maintains its own delete table; the two are synchronized in both directions.

  • The delete that is part of an update is recorded in the thread's own table first and then synchronized to the global table.
  • A plain delete is recorded at the global level and asynchronously synchronized to each thread.
  • Inside Lucene segments, data is not physically removed right away. Each segment maintains a record of which doc IDs have been deleted; the documents are only really dropped during a merge (see the sketch below).
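
A small sketch of that delete behaviour (field name illustrative): the delete is only logical at first, and the documents are physically dropped when segments are merged.

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

// Sketch: deletes are recorded, not applied immediately.
static void deleteEmployee(IndexWriter writer) throws Exception {
    writer.deleteDocuments(new Term("empId", "1")); // marks matching docs as deleted
    writer.commit();                                // the deletes become visible to new readers
    writer.forceMergeDeletes();                     // optional: force merges so deleted docs are physically removed
}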

2.2.4 flush and commit

Each DocumentsWriterPerThread creates a segment file according to the flush policy. The segment is not yet visible to search and only becomes searchable after IndexWriter commits. Note the mapping here: an ElasticSearch refresh corresponds to a Lucene flush, and an ElasticSearch flush corresponds to a Lucene commit; ElasticSearch uses other means (a near-real-time reader) to make segments searchable on refresh.
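
A hedged Lucene-level sketch of this mapping: reopening a near-real-time reader makes new segments searchable without any fsync (which is what an ElasticSearch refresh relies on), while commit() fsyncs the segments (which is what an ElasticSearch flush relies on).

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;

// Sketch: `current` is an NRT reader originally obtained via DirectoryReader.open(writer).
static DirectoryReader refreshAndMaybeFlush(IndexWriter writer, DirectoryReader current,
                                            boolean flush) throws Exception {
    DirectoryReader newer = DirectoryReader.openIfChanged(current, writer); // "refresh": see new segments, no fsync
    if (flush) {
        writer.commit();                                                    // "flush": segments made durable on disk
    }
    return newer != null ? newer : current;
}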

2.2.5 merge

Merge combines segment files to improve query efficiency and to physically remove documents that were marked as deleted.

2.2.6 summary

A single ElasticSearch shard corresponds to a complete Lucene index, and a Lucene index consists of multiple segments. While the same Lucene index is being built, multiple threads may write to it concurrently; each thread corresponds to one DocumentsWriterPerThread, and each DocumentsWriterPerThread has its own index buffer. After a flush, each DocumentsWriterPerThread produces one segment.

3. Writing to ElasticSearch

3.1. The ElasticSearch write request at a macro level

The overall process of writing to ElasticSearch was discussed in the previous article.

When a document is written, routing rules send it to a specific shard, where the Lucene index is built.

  • After the request executes successfully on the Primary Shard, it is forwarded to the Replica Shards.
  • After the write succeeds on the Replica Shards and they respond to the Primary Shard, the write is considered successful and the result is returned to the client.

Note that write latency = primary shard latency + max(replica write latency); in other words, as soon as there are replicas, the write latency is at least the sum of the latencies of two shards.

3.2. Detailed process

3.2.1 The internal process on the coordinating node:

  • The coordinating node validates the request first and rejects it if there is a problem: length checks, mandatory parameters, type, version, ID and so on.
  • Pipeline: users can define custom processors, for example to split a field or add new fields; some scripting languages are also supported — see the official documentation.
  • If automatic index creation is allowed (it is by default), the index is created first via a request to the master node; the coordinating node must wait for a successful response before moving on.
  • Request preprocessing, such as deciding whether the ID must be auto-generated, resolving the routing, fetching the whole cluster state, and checking cluster status; for example, if the cluster has no master, the request is rejected.
  • The requests are grouped by target shard (roughly, a requestsByShard = new HashMap<>() mapping each shard to its sub-requests); the default routing algorithm is hash(doc ID) % number of primary shards (see the sketch after this list).
  • Based on the cluster state, resolve the node that holds each target shard and send the sub-requests to them in parallel.
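
As a hypothetical, simplified sketch of the routing step (this is not ElasticSearch's actual code; the real implementation hashes the _routing value, which defaults to the doc ID, with Murmur3):

// Hypothetical sketch only: pick the target primary shard from the routing value.
static int pickShard(String routing, int numberOfPrimaryShards) {
    int hash = routing.hashCode();                   // ES really uses Murmur3, not String.hashCode()
    return Math.floorMod(hash, numberOfPrimaryShards);
}
// e.g. pickShard(docId, 5) returns a shard number in [0, 5)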

3.2.2 Primary shard node process

3.2.2.1 Write (index)

This is the core write path of ElasticSearch. As mentioned in the previous article, a request that reaches this node will eventually call Lucene methods to build the Lucene index. The key points are:

  • The ElasticSearch node receives the index request, writes it into the index buffer, synchronously appends it to the translog on disk, and then returns the indexing result.
  • Refresh periodically turns the data in the index buffer into Lucene segments in the OS cache (they have not been fsynced yet), at which point they become searchable. Refresh is also triggered when the index buffer fills up; its default size is 10% of the JVM heap.
  • Flush periodically fsyncs the segments in the cache to disk and clears the translog. Flush is also triggered when the translog is full (512 MB by default).
  • As data accumulates there will be many segments, possibly containing deleted documents, so ElasticSearch merges them periodically (a durability-ordering sketch follows this list).
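
To make the durability ordering concrete, here is a minimal, hypothetical sketch of "buffer in memory, append to a log, fsync, then acknowledge". This is only the idea behind the translog, not ElasticSearch's actual implementation:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the in-memory list stands in for the Lucene index buffer.
class TinyTranslogSketch {
    private final List<String> indexBuffer = new ArrayList<>();
    private final FileChannel translog;

    TinyTranslogSketch(String path) throws IOException {
        this.translog = FileChannel.open(Paths.get(path),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    synchronized void index(String docJson) throws IOException {
        indexBuffer.add(docJson);                                  // 1. write into the in-memory buffer
        translog.write(ByteBuffer.wrap((docJson + "\n").getBytes(StandardCharsets.UTF_8)));
        translog.force(true);                                      // 2. fsync the log before acknowledging
        // 3. only now is the write acknowledged to the caller
    }
}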

3.2.2.2 update

  1. Read the full doc with the same ID, recorded as version1 (V1).
  2. Merge the version1 doc and the doc from the update request into one full doc, and update the in-memory versionMap. With the full doc in hand, proceed to the subsequent operations.
  3. The subsequent operations are performed under a lock.
  4. Read the doc's maximum version number, version2 (V2), a second time; this is generally obtained from the versionMap.
  5. Check the versions for conflicts, i.e. whether V1 and V2 are consistent. If they conflict, go back to step 1 and redo the query-and-merge. If there is no conflict, execute the latest add-doc request.
  6. When adding the doc, first bump the version to V3 = V2 + 1, then write the doc into Lucene. Lucene first deletes any existing doc with the same ID and then adds the new one. After the Lucene write succeeds, update the versionMap with V3.
  7. Release the lock; the update process is finished (a self-contained sketch of this loop follows).
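
A self-contained, hypothetical sketch of that optimistic-concurrency loop (the names are made up; ElasticSearch's real versionMap and locking are more involved):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a version-checked update loop.
class VersionedStore {
    static final class Versioned {
        final long version; final String doc;
        Versioned(long version, String doc) { this.version = version; this.doc = doc; }
    }

    private final Map<String, Versioned> versionMap = new ConcurrentHashMap<>();

    void update(String id, UnaryOperator<String> merge) {
        while (true) {
            Versioned v1 = versionMap.get(id);                           // 1. read doc + version (version1)
            String merged = merge.apply(v1 == null ? null : v1.doc);     // 2. merge the update into a full doc
            synchronized (this) {                                        // 3. lock the final check-and-write
                Versioned v2 = versionMap.get(id);                       // 4. re-read the latest version (version2)
                long n1 = v1 == null ? 0 : v1.version;
                long n2 = v2 == null ? 0 : v2.version;
                if (n1 != n2) {
                    continue;                                            // 5. conflict: go back to step 1
                }
                versionMap.put(id, new Versioned(n2 + 1, merged));       // 6. write the doc with version + 1
                return;                                                  // 7. release the lock, done
            }
        }
    }
}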

So what is described above is essentially an optimistic locking mechanism: every update increments the version number by 1. Unlike a relational database, where someone else's concurrent update could silently overwrite yours, here you read the doc first and remember its version; when you go to update, if the latest version no longer matches the one you read, someone else updated it first, so you re-read the latest data and try again. After the write succeeds, the primary forwards the write to the replica shards, waits for their responses, and finally returns the result to the coordinating node. The specific steps:

  • Checks: verify that the target shard exists, that the index status is normal, and so on.
  • Decide whether execution needs to be delayed; if so, the request is queued to wait.
  • Check the number of active shards and reject the write if it is insufficient.
public boolean enoughShardsActive(final int activeShardCount) {
  if (this.value < 0) {
    // the configured wait_for_active_shards has not been resolved to a concrete number yet
    throw new IllegalStateException("not enough information to resolve to shard count");
  }
  if (activeShardCount < 0) {
    throw new IllegalArgumentException("activeShardCount cannot be negative");
  }
  // the write may proceed only if at least `value` shard copies are currently active
  return this.value <= activeShardCount;
}

Why check the number of active shards?

  • ElasticSearch's index layer has a wait_for_active_shards parameter that specifies how many shard copies must be active for a write to proceed; the default is 1. If an index has 3 replicas per shard, there are 4 copies in total, and at least 1 of them must be alive. This is effectively a check done in advance; if you have high data-reliability requirements you can raise this value, and writes will only proceed once it is reached.
  • Call Lucene to write the doc.
  • Write the translog.
  • Write the replica shards: loop over the replica requests and pass along the necessary information. Note that these requests are dispatched to the replica shards in parallel, and the primary then waits for all of them to respond (or time out).
  • Following on from the previous step, if a replica shard fails, the failure is reported to the master, the master updates the cluster state, and the replica shard is removed from the replication target list.

Send the request to the replica

@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
  replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}

Wait for the result

private void decPendingAndFinishIfNeeded() {
  assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
  if (pendingActions.decrementAndGet() == 0) {
    finish();
  }
}

In earlier versions the replica requests were truly asynchronous (the primary did not wait for them), but the risk of data loss was considered too high, so replication is now synchronous: the primary only responds to the client after the replicas have responded. If a replica write fails, ElasticSearch retries, but ultimately it does not require a minimum number of copies to succeed. The returned result contains the number of shard copies that were written successfully and the number that failed; any failed replica is reported to the master.

PS: ElasticSearch's data replication model is very similar to Kafka's. While indexing, the primary shard synchronizes the data to all nodes in the in-sync copies list and only then returns an ACK to the client. For ElasticSearch, membership of the in-sync copies list is dynamic; if the list shrinks to only the primary shard, there is a single point of failure (SPOF) risk.

This is where wait_for_active_shards helps: with wait_for_active_shards=3, the request is rejected unless there are three active shard copies. Indexes with high reliability requirements can increase this value.

PS: Why write to Lucene first and then to the translog? Because the Lucene write validates the data and may fail, and at that point everything is still only in memory. If we wrote to the on-disk translog first, we would then have to roll the log back, which is troublesome.

3.2.3 Replica shard node process

This process is basically the same as on the primary shard node; some checks may differ slightly, but in the end the data is written into the Lucene index.

4. Summary

As a distributed system, ElasticSearch needs many properties, and most of them can be found in its design:

  • Reliability: Lucene by itself is just a library. ElasticSearch adds its own replica design for node fault tolerance and the translog for recovery after a crash; these two mechanisms provide the reliability guarantees.
  • Consistency: ElasticSearch implements eventual consistency; a replica and the primary shard may return different data at the same moment, for example because the replica's refresh happens at a different time than the primary's.
  • High performance: ElasticSearch improves performance in several ways, including:
  1. Each Lucene writer thread maintains its own segment, so threads compete for fewer shared resources and performance is better.
  2. Operations such as update use the versionMap cache to reduce I/O.
  3. Refresh only goes as far as the operating system cache.
  • Atomicity, isolation: guaranteed by the optimistic locking mechanism based on the version field.
  • Real time: ElasticSearch is designed to be near real time. Flushing or refreshing synchronously on every write would hurt performance badly, so instead the translog, which is written in real time, is what allows the latest data to be accessed in real time.