Introduction

Some time ago I took part in the Elasticsearch offline platform project, whose main goal was to build a BuildService for Elasticsearch. There were two motivations. First, Bahamut's ability to define data flows lets us connect directly to users' source data, integrate full and incremental builds, and remove users' pain points in data preparation. Second, community Elasticsearch has no concept of full versus incremental builds: every document is sent through the SDK to the online ES service, which builds the index one document at a time. That approach struggles with massive data sets and inevitably affects online performance; index merges in particular can seriously hurt the stability of the online service. We therefore needed a BuildService for ES that builds indexes directly on the Blink cluster.

In this article, Zhicen introduces the overall architecture of the Elasticsearch offline platform, then explains how ElasticBuild builds indexes on HDFS and describes some related optimizations.

Loading the HDFS Index

At startup, Elasticsearch initializes an InternalEngine instance for each shard of each index. The engine's main job is to restore the Lucene IndexWriter and the ES translog. Native ES only supports loading index files from local disk, whereas the modified ElasticBuild writes indexes directly to HDFS without landing them on local disk. We therefore need a way to load indexes directly from HDFS, again without going through local disk.


The obvious solution is to pull the index down to local disk and load it there, but that is far too slow, and the instance disk on Blink may not be able to hold such a large index. Fortunately, Directory, Lucene's index read/write interface, supports a variety of extensions, and an HdfsDirectory modeled on existing open source components solves this problem.

The HdfsDirectory implementation provides the Directory itself along with the input and output classes behind Lucene's read and write interfaces.

Adding a cache layer to HdfsDirectory

Performance bottleneck

At this point I was confident that the hardest problem had been solved, but the performance testing that followed put the project back in doubt…

I launched an HdfsDirectory version of the ES service on a development machine and used esRally to build a 70 GB index. It ran well at first, but the build task never finished and the CPU appeared to be stuck; after another 4 hours of waiting it still had not completed.

Analysis showed that the index merge could not complete and that most of the time was spent reading and writing HDFS. In other words, reading and writing HDFS through a plain HdfsDirectory is not feasible.


On reflection, besides network overhead, another important reason that reading and writing indexes on HDFS is slower than on local disk is that local disk has the PageCache, while HDFS has no equivalent (HDFS itself caches physical blocks, but there is no corresponding cache on the ES machine). Adding a cache layer to HdfsDirectory can achieve a similar effect.

Investigation showed that Solr implements a BlockDirectory, which uses the open source Caffeine cache and can add caching to an ordinary Directory. We customized BlockDirectory in a few ways for the ElasticBuild scenario:

  • ElasticBuild runs on Blink, and a single Blink instance hosts multiple shards. Instead of allocating a cache per shard, we use one static, global cache, which avoids uneven cache sizing caused by hot data.
  • Only data read from HDFS is stored in the cache; data written from memory to HDFS is not placed in the cache directly. There are two reasons. First, the logs show that during merge IndexWriter repeatedly reads overlapping data blocks at high frequency, while writes show no such pattern, so more memory can be reserved for reads. Second, written files are usually large and could flush the entire cache at once, lowering the cache hit ratio.
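
The two customizations above can be sketched as follows. This is a minimal Python illustration of the idea, not Solr's BlockDirectory or the ElasticBuild code: the names `GlobalBlockCache`, `CachedFile`, `SHARED_CACHE`, and the 8 KB `BLOCK_SIZE` are all hypothetical, a plain dict stands in for HDFS, and a simple LRU replaces the Caffeine cache.

```python
from collections import OrderedDict

BLOCK_SIZE = 8192  # hypothetical block size in bytes


class GlobalBlockCache:
    """LRU cache of fixed-size blocks, shared by every shard in the process."""

    def __init__(self, max_blocks):
        self.max_blocks = max_blocks
        self.blocks = OrderedDict()  # (file name, block index) -> bytes
        self.hits = 0
        self.misses = 0

    def get(self, key, load):
        if key in self.blocks:
            self.blocks.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return self.blocks[key]
        self.misses += 1
        data = load()
        self.blocks[key] = data
        if len(self.blocks) > self.max_blocks:
            self.blocks.popitem(last=False)  # evict the least recently used block
        return data


# One cache for the whole process rather than one per shard.
SHARED_CACHE = GlobalBlockCache(max_blocks=1024)


class CachedFile:
    """Stand-in for a remote file; `store` maps file name -> bytes."""

    def __init__(self, name, store, cache=SHARED_CACHE):
        self.name, self.store, self.cache = name, store, cache

    def _load(self, idx):
        data = self.store[self.name]
        return data[idx * BLOCK_SIZE:(idx + 1) * BLOCK_SIZE]

    def read(self, offset, length):
        # Reads are served block by block through the shared cache.
        out = bytearray()
        pos, end = offset, offset + length
        while pos < end:
            idx = pos // BLOCK_SIZE
            block = self.cache.get((self.name, idx), lambda i=idx: self._load(i))
            start = pos - idx * BLOCK_SIZE
            take = min(end - pos, BLOCK_SIZE - start)
            out += block[start:start + take]
            pos += take
        return bytes(out)

    def write(self, data):
        # Writes go straight to storage and deliberately bypass the cache.
        # Index files are assumed write-once, so no invalidation is done here.
        self.store[self.name] = self.store.get(self.name, b"") + data
```

Because the cache key includes the file name, several shards can share `SHARED_CACHE` without colliding, and a shard with hotter data naturally occupies more of the cache.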

BlockDirectory for ElasticBuild

Tests show that the BlockDirectory cache hit ratio exceeds 90%, which greatly improves index read and write performance on HDFS, mainly for merge.


Looking back, BlockDirectory removes a large part of the HDFS read overhead, but writing to HDFS still carries a cost. Reading the ES code shows that when InternalEngine generates the config for IndexWriter, it specifies that the index ends up in CompoundFile format.

CompoundFile exists to reduce the number of index files and avoid excessive file handles when opening the index.

In other words, the pile of temporary files produced along the way has no effect on the final result; only the final four file formats are persisted. These temporary files can therefore be handled in memory instead of going through HDFS.

lucene-core provides NRTCachingDirectory, which wraps another directory with a RAMDirectory so that certain small files are handled directly in memory. We can adopt it and modify the logic that decides which files stay in memory and which are passed to the underlying directory, so that only the final files are written through.
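
The routing decision described above might look like the sketch below. It is written in Python for illustration only and is not the modified NRTCachingDirectory. The function name `keep_in_memory` is hypothetical, and the set of persistent files is an assumption: the text names the CFS, CFE, and SI files and the segments file, so the sketch treats those (plus Lucene's `pending_segments_N` commit file) as the files that must reach HDFS.

```python
import re

# Assumption: these are the file kinds that must be persisted to HDFS.
PERSISTENT_EXTENSIONS = {"cfs", "cfe", "si"}
# segments_N and pending_segments_N; the generation suffix is base 36.
SEGMENTS_FILE = re.compile(r"^(pending_)?segments_[0-9a-z]+$")


def keep_in_memory(file_name):
    """Return True if the file is temporary and can live only in RAM."""
    if SEGMENTS_FILE.match(file_name):
        return False
    _, dot, ext = file_name.rpartition(".")
    if dot and ext in PERSISTENT_EXTENSIONS:
        return False
    return True
```

With this rule, per-segment files such as `_0.fdt` stay in memory until they are packed into the compound file, while `_0.cfs`, `_0.cfe`, `_0.si`, and `segments_1` are written through to the underlying directory.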

In the 70 GB test, NRTCachingDirectory handled more than 30 files per shard directly in memory, with no round trip through HDFS. This helps performance because without it, every file would have to be written to HDFS, read back at flush, and then merged into the CFS, CFE, and SI files.

Adaptive memory allocation

Based on the analysis above, once caching is used to improve HDFS read and write performance, instance memory is mainly consumed by the following:

  • Lucene's IndexWriter requires a RAM buffer
  • BlockDirectory requires a BlockCache
  • The RAMDirectory inside NRTCachingDirectory requires a region of RAM

Because instance specifications on Blink are not fixed, we cannot hard-code the memory requirement of each module; adapting automatically is better and reduces operations and maintenance costs. After a period of testing, we arrived at some empirical conclusions:

  • Set ES's indices.memory.index_buffer_size to 40%, so that ES flushes when IndexWriter memory reaches 40% of the heap.
  • Set IndexWriter's indexingBufferSize to 40% of the free heap memory (freeMem), so that IndexWriter flushes when it reaches that limit.
  • With IndexWriter memory set to 40%, the flushed index files are expected to take around 20%, and these files go to RAMDirectory. Because the file sizes in RAMDirectory are estimates and are sometimes far off, RAMDirectory needs some headroom, so NRTCachingDirectory is also allocated 40% of memory, at most the same as IndexWriter.
  • The remaining 20% goes to BlockDirectory; in our experiments this allocation was enough.

This way, we do not need to configure the memory ratio of each module individually for each instance specification.
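
As a rough illustration, the 40% / 40% / 20% split can be derived from the free heap at startup. The sketch below is Python rather than the actual ElasticBuild code, and the function name `split_memory` and the dictionary keys are hypothetical.

```python
def split_memory(free_heap_bytes):
    """Split free heap bytes using the 40% / 40% / 20% ratios from the text."""
    if free_heap_bytes < 0:
        raise ValueError("free_heap_bytes must be non-negative")
    index_buffer = free_heap_bytes * 40 // 100  # IndexWriter RAM buffer
    nrt_ram = free_heap_bytes * 40 // 100       # RAMDirectory in NRTCachingDirectory
    # BlockDirectory's cache gets whatever is left, about 20%.
    block_cache = free_heap_bytes - index_buffer - nrt_ram
    return {
        "index_buffer": index_buffer,
        "nrt_ram": nrt_ram,
        "block_cache": block_cache,
    }
```

Giving the remainder to the block cache, instead of computing a separate 20%, keeps the three budgets summing exactly to the free heap regardless of rounding.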

Meta Information Synchronization

Everything above concerns synchronizing the indexes written by IndexWriter to HDFS. Some additional ES data also needs to be synchronized to HDFS.


The shard's _state file contains the primary flag, indexUUID, and other information about the shard. A copy must be synchronized to HDFS whenever ES updates the local shard's _state.


The index's _state file records key information such as the index settings and the number of shards. A copy must be synchronized to HDFS whenever ES updates the local index's _state.


The translog is a special case: it has many files and many synchronization points, so the whole directory cannot be synchronized to HDFS in one pass, and syncing it would be hard to do with acceptable performance. Because the translog serves no purpose in ElasticBuild, where recovery points are implemented through Blink checkpoints, on ElasticBuild failover we simply create an empty translog directory so that the process can restart.

Looking ahead

We planned some optimizations that were not completed in the first phase because of time constraints; they are worth investigating later.

Shard level concurrency

Many optimizations are described above, but one limitation remains: a shard can only be processed by a single sink, so shard-level concurrent builds are not possible.

To split a shard into multiple tasks, you need to solve the following two problems:

Merging segments from multiple sub-shards and implementing snapshots in the outer layer

After each sub-shard builds its index separately, the results must be merged into a single index that can be loaded. In practice this only requires changing the generation numbers of the CFE, CFS, and SI files and merging the segmentInfo entries into the segments_0 file.

The merged information includes:

  • SequenceNumbers.LOCAL_CHECKPOINT_KEY
  • SequenceNumbers.MAX_SEQ_NO

All of this information can be combined.

In addition, after merging the indexes on HDFS, we need to assemble them into a snapshot in the outer layer and upload it to OSS for the online ES to load. This is feasible; the only problem is that uploading the index from the outer layer takes a long time, so later it should be synchronized to OSS directly.

Global seqNo allocation

Elasticsearch 6.x also introduced _seqNo as a default field written to each doc. Because this field must increase strictly within a shard, _seqNo values need to be allocated across the sub-shards so that they do not collide.
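
One possible allocation scheme, shown here purely as an assumption since the text does not specify one, is to give each sub-shard a strided sequence: sub-shard i of n uses i, i + n, i + 2n, and so on. The Python sketch below illustrates the idea; the class name `StridedSeqNoAllocator` is hypothetical.

```python
class StridedSeqNoAllocator:
    """Hands out sequence numbers i, i + n, i + 2n, ... for sub-shard i of n."""

    def __init__(self, sub_shard_id, sub_shard_count):
        if sub_shard_count <= 0 or not 0 <= sub_shard_id < sub_shard_count:
            raise ValueError("sub_shard_id must be in [0, sub_shard_count)")
        self.next_seq_no = sub_shard_id
        self.stride = sub_shard_count

    def allocate(self):
        # Each call returns a value no other sub-shard can produce.
        seq_no = self.next_seq_no
        self.next_seq_no += self.stride
        return seq_no
```

Values are unique across sub-shards and strictly increasing within each one, with no coordination between tasks. The trade-off is that the merged shard can contain gaps when sub-shards index different numbers of documents, which would have to be accounted for when the checkpoint information is merged.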

Direct synchronization to OSS

The current version uses HdfsDirectory to write the index to HDFS, then generates a snapshot on HDFS and uploads it to OSS. The upload still has a cost, so it would be best to implement an OssDirectory. In theory it could replace HdfsDirectory directly, save the cost of one upload, and also allow efficient segment handling in the outer layer.

Thank you

Going from zero knowledge of Elasticsearch and Lucene to completing the project's features took a lot of help. Thanks to @Hongzhen and @Kunlun for their guidance on Lucene and Blink, and thanks to the @Wanxi team for discussing solutions and solving all kinds of problems. We will need to keep working together in the next phase.