Background

Message queues are common middleware in business systems. Besides the core flow of sending and receiving messages, being able to track and query historical messages is also very important: without historical message queries, it is hard to locate problems once they occur.

The message queue middleware used by the company is QMQ. Like other message queue middleware, QMQ's main components include the Meta Server (cluster management and cluster discovery), the Server (real-time message service), producers (message producers), and consumers (message consumers). During sending and receiving, messages are stored on the Server. To provide high-throughput messaging, the Server stores messages as sequential logs, which is not well suited to message retrieval. To support querying historical messages, QMQ therefore has a dedicated Backup module for backing up and querying them. To keep the backup data small, Backup only synchronizes message indexes from the Server's slave rather than the complete message content, and saves the indexes to HBase. Backup can then query the corresponding message index from HBase and use it to read the specific message content. The details are as follows:

Current status

Backup currently uses the PUT API of the HBase client to write messages in batches (1,000 message indexes per batch by default). However, this method is inefficient: when a large amount of message index data needs to be written to HBase, writing one batch of message indexes takes two to three seconds (the HBase servers have low-end configurations). As a result, message backup may lag by several hours or even days, historical messages cannot be queried from HBase in a timely manner, and existing business requirements cannot be met. We therefore needed to improve the speed at which Backup writes message indexes to HBase.

Finding the cause of low write efficiency in HBase

To understand how data is written to HBase, let’s take a look at the HBase architecture:

In an HBase cluster, Region Servers are responsible for reading and writing data, and each Region Server manages multiple regions.

A region stores a portion of the data of one HBase table. Each region contains one or more Stores, and each Store corresponds to a Column Family of the table.

A Store consists of a MemStore and StoreFiles. The MemStore is a write cache that holds data in memory that has not yet been persisted to disk; when it is full, the data is flushed to a StoreFile. A StoreFile corresponds to an actual file in HFile format.

The Write Ahead Log (WAL) records the data operations of each Region Server and is used for fault recovery.

Now that we know the basic HBase architecture, let's move on to the most common way to write data to HBase, which is what we currently use. The most direct way is to invoke the HBase API and insert data with the PUT method. The process is as follows:

Data written to HBase by the client through the API is sent as RPC requests. The client locates the Region Server that hosts the target region (through the hbase:meta table) and sends the write request to that Region Server.

After the data reaches the Region Server, it is first written into the HLog (Write Ahead Log) and then into the MemStore of the target region. When the MemStore is full, the data is flushed into a StoreFile. Flushing consumes considerable I/O resources.

HBase may also trigger split and compaction operations. When many small HFiles exist, a compaction is triggered to merge several small files into a larger one, reducing the number of HFiles. When a region grows too large, a split is triggered to divide it into two sub-regions.

When a large amount of data is written this way, writing is inefficient because WAL and flush operations are performed frequently and consume a lot of disk I/O.

How to improve HBase write efficiency

Writing data through the PUT API of the HBase client is inefficient, so can we find a more efficient way? As mentioned above, HBase's underlying storage uses the HFile file format. When a large amount of data needs to be written, if we could write the data into HFile files in batches and import those files directly into HBase, could the write speed be improved? After some research, we found that HBase provides exactly such an API: Bulk Load.

Bulk Load writes data directly in HBase's table storage format, that is, HFile files, and then loads the generated HFiles onto the corresponding nodes in the cluster. This way, no WAL or flush is needed and no large amount of write I/O is generated, so less CPU and network bandwidth are consumed. Loading data in batches with Bulk Load greatly improves write efficiency and reduces write pressure on the Region Server nodes.

Implementing batch data import with Bulk Load

The following describes how to import data to HBase using Bulk Load.

Preparation

Deploy the HBase and HDFS services in advance, and create the target table in HBase. For example:

create 'bltable','cf'

Implementation

A Bulk Load in HBase consists of three steps: export the data from another data source (such as a plain text file or another database), convert the data into HFiles, and import the generated HFiles into HBase regions. Bulk Load can be performed with the ImportTsv and CompleteBulkLoad tools or programmatically. Here we focus on the programmatic approach, which can import data into HBase either in MapReduce mode or in non-MapReduce mode.

Implementing Bulk Load with MapReduce

This method specifically includes three steps:

  1. Prepare the data source files (such as text files) and upload them to HDFS. You can use Hadoop’s fs command to upload the files from the local file system to HDFS, or use other tools.
  2. Convert the HDFS data into HFiles with a MapReduce job. In most cases only the Map function needs to be written by ourselves; the Reduce function is provided by HBase and does not need to be considered. The job must output the rowkey (as ImmutableBytesWritable) as the Key and a KeyValue, Put, or Delete as the Value, and it uses HFileOutputFormat2 to generate the HBase data files, that is, HFiles.
  3. Import the generated HFiles into HBase by calling doBulkLoad() on the LoadIncrementalHFiles class (in the hbase-server dependency of 1.x.y versions) or on BulkLoadHFilesTool (in the hbase-server dependency of 2.x.y versions).

The following is a simple example. The data source file is a TXT file with contents similar to the following. Each line consists of a MessageID and Offset:

messageid00001498 1498
messageid00001499 1499
messageid00001500 1500
messageid00001501 1501
messageid00001502 1502

First, you can use the following command or other tools to upload the data source file to HDFS:

hadoop fs -put /path/on/localdisk /path/on/hdfs
# For example:
hadoop fs -put /Users/bianlifeng/Documents/bulkloadtest/sourcedata.txt hdfs://10.1.24.53:9000/tmp/sourcedata.txt

Then write a program to convert the data source file uploaded to HDFS into HFiles and import them into HBase.

As mentioned above, we need to define our own Mapper class and override the map() method as follows:

static class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] datas = line.split(" ");  // datas form: [messageID, offset]
        // datas[0], the messageID, is the rowkey of each record
        ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(datas[0]));
        // Build the KeyValue for this record; "cf" is the column family name, "offset" is the column name
        KeyValue kv = new KeyValue(Bytes.toBytes(datas[0]), "cf".getBytes(), "offset".getBytes(), datas[1].getBytes());
        context.write(rowKey, kv);
    }
}

Next, we define and run a MapReduce job, and finally call doBulkLoad() of the LoadIncrementalHFiles class to import the generated HFiles into the specified HBase table, as follows:

final String INPUT_PATH = "hdfs://10.1.24.53:9000/tmp/sourcedata.txt";
final String OUTPUT_PATH = "hdfs://10.1.24.53:9000/tmp/outputhfile";
final String TABLE_NAME = "bltable";

Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Table htable = conn.getTable(TableName.valueOf(TABLE_NAME));
Admin admin = conn.getAdmin();

Job job = Job.getInstance(conf, "BulkLoad");
job.setJarByClass(HFileGenerator.class);                 // the class that runs the job
job.setMapperClass(HFileGenerator.HFileMapper.class);    // the Mapper class, i.e. the HFileMapper above
job.setMapOutputKeyClass(ImmutableBytesWritable.class);  // map output key type
job.setMapOutputValueClass(KeyValue.class);              // map output value type
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
HFileOutputFormat2.configureIncrementalLoad(job, htable, conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));

if (job.waitForCompletion(true)) {
    // Load the generated HFiles into the HBase table with Bulk Load
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);   // HBase 2.x.y
    loader.doBulkLoad(new Path(OUTPUT_PATH), admin, htable, conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));
}

Implementing Bulk Load without MapReduce

MapReduce brings more dependencies, is slightly more complex to implement, and takes longer to execute, so is there another way? In fact, we adopted the non-MapReduce approach described below.

This method mainly includes the following two steps:

  1. Create an HFile.Writer, configure the path where the HFile will be generated and other information, build a KeyValue for each record, and call writer.append() to write it into the HFile. Note that keys must be written in order; if the data is not already ordered, sort it first, for example with a TreeMap (see the sketch after this list).
  2. Import the generated HFiles into HBase, which only requires calling doBulkLoad() of the LoadIncrementalHFiles class.
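The sorting mentioned in step 1 is not needed in the example further below, because the sample keys are generated in ascending order. As a minimal sketch of what sorting with a TreeMap could look like, assuming unordered rows held in memory (the rows here are illustrative, and writer, FAMILY_NAME and COLUMN_NAME are the variables defined in the example that follows):

// Sketch only: buffer unordered rows in a TreeMap so they come out in ascending key order,
// which HFile.Writer requires. For raw byte[] keys, use new TreeMap<>(Bytes.BYTES_COMPARATOR).
TreeMap<String, byte[]> sortedRows = new TreeMap<>();
sortedRows.put("rowkey00000002", Bytes.toBytes("value2"));
sortedRows.put("rowkey00000000", Bytes.toBytes("value0"));
sortedRows.put("rowkey00000001", Bytes.toBytes("value1"));
for (Map.Entry<String, byte[]> entry : sortedRows.entrySet()) {
    KeyValue kv = new KeyValue(Bytes.toBytes(entry.getKey()), FAMILY_NAME, COLUMN_NAME,
            System.currentTimeMillis(), entry.getValue());
    writer.append(kv);  // keys are appended in sorted order
}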

The code to create the HFile.Writer and write data into the HFile is as follows:

Configuration conf = HBaseConfiguration.create();
String TABLE_NAME = "bltable";
byte[] FAMILY_NAME = Bytes.toBytes("cf");       // column family name
byte[] COLUMN_NAME = Bytes.toBytes("offset");   // column name
Path HFILE_PARENT_PARENT_DIR = new Path("/tmp/test");
Path HFILE_PATH = new Path("/tmp/test/" + new String(FAMILY_NAME) + "/hfile");

Configuration tempConf = new Configuration(conf);
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 1.0f);
HFileContext fileContext = new HFileContext();

HFile.Writer writer = null;
try {
    writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
            .withPath(FileSystem.get(conf), HFILE_PATH)
            .withFileContext(fileContext).create();
    // Keys must be appended in ascending order
    for (int i = 0; i < 100; i++) {
        byte[] key = Bytes.toBytes("rowkey" + String.format("%08d", i));
        byte[] value = Bytes.toBytes("value" + String.format("%08d", i));
        long currentTime = System.currentTimeMillis();
        KeyValue kv = new KeyValue(key, FAMILY_NAME, COLUMN_NAME, currentTime, value);
        writer.append(kv);
    }
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (writer != null) {
        writer.close();
    }
}

We then import the generated HFiles into the specified HBase table by calling the doBulkLoad() method of the LoadIncrementalHFiles class, just as in the MapReduce approach above.

Notes

One thing to pay attention to in this approach is the HFile path.

When writing data to an HFile with HFile.Writer, you specify the path of the HFile itself, HFILE_PATH in this example, as follows:

writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
                    .withPath(FileSystem.get(conf), HFILE_PATH)
                    .withFileContext(fileContext).create();

The other path is the one passed to doBulkLoad() of LoadIncrementalHFiles, which must point to the directory two levels above the HFile, HFILE_PARENT_PARENT_DIR in this example, as follows:

loader.doBulkLoad(HFILE_PARENT_PARENT_DIR,admin,htable,conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));

The specific values of the two paths are as follows:

Path HFILE_PARENT_PARENT_DIR=new Path("/tmp/test");
Path HFILE_PATH=new Path("/tmp/test/"+new String(FAMILY_NAME)+"/hfile");


Here FAMILY_NAME is the column family name of the HBase table. doBulkLoad() is given the directory two levels above the HFile; it then looks for a subfolder named after each column family and reads the HFiles inside that subfolder. Therefore, the immediate parent folder of each HFile must be named after the corresponding column family; otherwise doBulkLoad() will not find the files to upload.
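With the values used in this example (column family cf), the layout on HDFS therefore looks like this:

/tmp/test            <- HFILE_PARENT_PARENT_DIR, the path passed to doBulkLoad()
/tmp/test/cf         <- one subfolder per column family, named after the column family
/tmp/test/cf/hfile   <- HFILE_PATH, the file written by HFile.Writer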

Validation

After running the program, we can view the intermediate HFiles with a command like the following:

hadoop fs -ls <folder containing the generated HFiles>
# For example:
hadoop fs -ls hdfs://10.1.24.53:9000/tmp

You can also log in to HBase and check whether the data has been imported into the corresponding table.
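As a minimal sketch (assuming the bltable table and the cf:offset column from the examples above), a quick programmatic check could look like this:

Configuration conf = HBaseConfiguration.create();
try (Connection conn = ConnectionFactory.createConnection(conf);
     Table table = conn.getTable(TableName.valueOf("bltable"));
     ResultScanner scanner = table.getScanner(new Scan())) {
    int printed = 0;
    for (Result r : scanner) {
        // Print the rowkey and the cf:offset value of the first few rows
        System.out.println(Bytes.toString(r.getRow()) + " -> "
                + Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("offset"))));
        if (++printed >= 5) break;
    }
}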

Bulk Load source code analysis

In the preceding sections, data was written quickly to HBase with Bulk Load, and the HFiles generated along the way were stored temporarily in HDFS. Some readers may ask: why not save the intermediate HFiles locally instead? In fact, we originally planned to store them on the local disk of the Backup server, that is, to write the generated HFiles to a local path on the Backup server. Unfortunately, the full Bulk Load process cannot be completed with local paths. After the HFiles are generated, they are imported into HBase through RPC, and the corresponding Region Server performs the import. If an HFile is generated under a local path and stored only on the Backup server, the Region Server cannot read it, so the subsequent import cannot proceed. To understand this better, and to take a closer look at the mechanics of Bulk Load, let’s look at the Bulk Load source code.

To import HFiles into HBase with Bulk Load, doBulkLoad() is invoked. With some non-essential code removed, its core source code is as follows:

public void doBulkLoad(Path hfofDir, final Admin admin, Table table, RegionLocator regionLocator)
        throws TableNotFoundException, IOException {
    // Create a thread pool for the Bulk Load
    ExecutorService pool = createExecutorService();
    // A LoadQueueItem will later be generated for each HFile and added to this queue,
    // usually called the LQI queue
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    // Core method 1: build the LQI queue from the HFiles under hfofDir
    prepareHFileQueue(hfofDir, table, queue, validateHFile);
    int count = 0;
    if (queue.isEmpty()) return;
    while (!queue.isEmpty()) {
        // Get the startKey and endKey of every region
        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
        // Maximum number of loop iterations, 10 by default,
        // configurable via hbase.bulkload.retries.number
        int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
        maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
        if (maxRetries != 0 && count >= maxRetries) {
            throw new IOException("Retry attempted " + count + " times without completing, bailing out");
        }
        count++;
        // Core method 2: group (or split) each HFile in the LQI queue
        // according to the region metadata of the HBase table
        Multimap<ByteBuffer, LoadQueueItem> regionGroups =
                groupOrSplitPhase(table, pool, queue, startEndKeys);
        // Core method 3: load the grouped LQIs into the corresponding regions
        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
    }
}

Core method 1: prepareHFileQueue

public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
    boolean validateHFile) throws IOException {
  discoverLoadQueue(queue, hfilesDir, validateHFile);
  validateFamiliesInHFiles(table, queue);
}

discoverLoadQueue() calls the visitBulkHFiles() method to check whether all files under the specified path hfilesDir conform to the HFile format, creates a LoadQueueItem object for each valid HFile, and adds it to the LQI queue.

Since the HFiles to be loaded are placed in different HDFS subdirectories according to their column family, the validateFamiliesInHFiles() method checks whether the column family of each LQI belongs to the corresponding HBase table.

The while loop: repeat until the LQI queue is empty

In doBulkLoad(), the while loop terminates when either: 1. the LQI queue is empty, that is, all HFiles have been uploaded; or 2. the number of loop iterations exceeds maxRetries, in which case an IOException is thrown.

maxRetries defaults to 10 and can be set via hbase.bulkload.retries.number in the configuration file.
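The same setting can also be applied programmatically on the Configuration passed to the loader; the following is only a sketch, and the value 20 is an illustration:

Configuration conf = HBaseConfiguration.create();
// "hbase.bulkload.retries.number" corresponds to HConstants.BULKLOAD_MAX_RETRIES_NUMBER
conf.setInt("hbase.bulkload.retries.number", 20);  // allow more retries than the default 10
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
// loader.doBulkLoad(...) as in the examples above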

There are two important methods in this while loop, as follows:

Core method 2: groupOrSplitPhase

This method traverses the LQI queue described above. For each LQI, the corresponding HFile is obtained, and whether the HFile needs to be split is decided by comparing the HFile's [firstkey, lastkey] range with the [startkey, endkey] range of each region obtained earlier. If the HFile's [firstkey, lastkey] range does not fall entirely within the [startkey, endkey] range of any single region, the HFile must be split; the two resulting files use the suffixes .top and .bottom, and two new LQIs are created for them and added back to the LQI queue. The groupOrSplitPhase() method returns Multimap<ByteBuffer, LoadQueueItem> regionGroups, which represents the LQIs that are ready to be loaded. It is a Multimap whose key is the startkey of a region and whose value is the corresponding LQI; one region can map to multiple LQIs.

Core method 3: bulkLoadPhase

This method loads the LQIs grouped in the previous step into their corresponding regions. After groupOrSplitPhase() produces regionGroups, bulkLoadPhase() calls tryAtomicRegionLoad() to load the LQIs of each region into the target region. If loading fails, the failed LQIs are re-added to the LQI queue and are grouped and loaded again in the next iteration of the while loop. Inside tryAtomicRegionLoad(), a RegionServerCallable is created to load the LQIs into the target region.

The comment on RegionServerCallable reads as follows:

Implementations call a RegionServer and implement call(int). Passed to a RpcRetryingCaller so we retry on fail.

So the RegionServerCallable is passed to an RpcRetryingCaller, and the RpcRetryingCaller makes a remote call to the Region Server, which performs the actual import of the HFiles into the region.

Possible problems and solutions

When we tried Bulk Load, we encountered some problems, which are briefly described below.

ERROR wtracer[] [index] o.a.h.h.m.LoadIncrementalHFiles:? - Unexpected execution exception during splitting
java.util.concurrent.ExecutionException: java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplitPhase(LoadIncrementalHFiles.java:649)
	......

The first problem is doBulkLoad() failing because the Snappy native library is missing. At first this seemed strange: we did not specify any compression when generating the HFiles, so why would the Snappy native library be needed? It turns out that when an HFile needs to be split, copyHFileHalf() splits the original HFile into two sub-files, and in this method the compression algorithm configured for the HBase table is looked up and used for the newly generated HFiles. Checking our HBase table confirmed that it does use Snappy compression. Therefore, you can either install the Snappy native library or change the table's compression algorithm to one that does not require native library support.
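If you choose the second option, the column family compression can be changed through the Admin API; the following is only a sketch assuming the HBase 2.x API and the bltable/cf table used earlier (the same change can also be made from the HBase shell):

Admin admin = conn.getAdmin();
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("cf"))
        .setCompressionType(Compression.Algorithm.NONE)  // no native library required
        .build();
admin.modifyColumnFamily(TableName.valueOf("bltable"), cfd);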

ERROR wtracer[] [index] o.a.h.h.m.LoadIncrementalHFiles:? - IOException during splitting
java.util.concurrent.ExecutionException: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file hdfs://hbase/tmp/trace/m/hfile
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplitPhase(LoadIncrementalHFiles.java:649)

Another possible problem is that doBulkLoad() fails to read an HFile or the HFile does not exist. This problem did not occur when we first started experimenting with Bulk Load, but only later. After some searching, we concluded that the cause is multiple threads reading and writing the same HFile at the same time. But Bulk Load in our program runs in a single thread, so what was going on? Following the idea that it might still be concurrent access, we realized that we have multiple machines performing Bulk Load, and they were all reading and writing HFiles with the same path and file name! Once the cause was found, we only needed to prevent different machines from reading and writing the same HFile. We can write the HFiles generated by different machines into different folders, for example folders named after the host name. Alternatively, we can give each generated HFile a different file name, for example derived from one of its keys. That way, if one Bulk Load upload fails, its files will not be overwritten by later ones, and doBulkLoad() can re-upload them the next time.
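As a minimal sketch of such a fix (reusing the path layout and FAMILY_NAME from the non-MapReduce example; the names are illustrative), each Backup instance could derive its own HFile path from the host name and a timestamp:

// Each machine/run writes to its own directory and file, so concurrent
// Bulk Loads never read or write the same HFile.
String hostName = InetAddress.getLocalHost().getHostName();
long ts = System.currentTimeMillis();
Path hfileParentParentDir = new Path("/tmp/test/" + hostName + "-" + ts);
Path hfilePath = new Path(hfileParentParentDir, new String(FAMILY_NAME) + "/hfile-" + ts);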

Conclusion

In this article, starting from the production requirement of efficient QMQ historical message backup and query, we discussed how to import large amounts of data into HBase efficiently. We first briefly introduced the basic HBase architecture and the usual process of writing data through the client API, analyzed the drawbacks of that method and the file format of HBase's underlying storage, and introduced Bulk Load, which can import large amounts of data into HBase efficiently. We then described how to perform a Bulk Load programmatically, with simple examples of both the MapReduce and non-MapReduce approaches. Finally, we briefly analyzed the source code of doBulkLoad(), the core method of Bulk Load, to deepen our understanding of its mechanism. In our QMQ tests, importing 10,000 message index records into HBase with Bulk Load took only a few seconds.

About the author

The author is a junior intern in the Convenience Bee (Bianlifeng) Infrastructure Group. In just a few weeks of internship, after receiving the task, the author researched and tried different approaches and finally achieved good improvements.

If you are interested in related technologies and are committed to improving R&D efficiency, you are welcome to join us.

You can submit your resume to: [email protected] Subject: Basic Components department of production and Research Platform

Recruitment website

  • bianlifeng.gllue.me/portal/home…
  • Learn more about the position