1. Background

After a major version upgrade (v0.9 → v0.11), the storage structure of HugeGraph changed and the old and new storage formats are incompatible. The community provides a data export/import component, tools (working through the API), which exports data from HBase as the original records and imports them into a new graph. However, it is not suitable for services with a large data volume, mainly for the following two reasons:

  • Tools runs on a single machine, and the measured import speed cannot support our business scale (100 billion vertices). Exporting all of our business data with tools was estimated to take two weeks, with no guarantee that it would not fail along the way; on failure, the import has to start over.
  • Tools imports and exports data through the graph client. Exporting the full data set puts extra pressure on the HBase RegionServers and affects the stability of online services. In addition, the continuous writes consume a lot of system resources through operations such as flush and compaction.

2. Migration solution

HBase data is migrated to the new HBase tables with MapReduce, bypassing the API and going straight to the storage layer: read the region data directly, deserialize the old table's records back to the original graph data using the old encoding, then serialize them into the new point/edge format and write them into the new graph's tables. In short: scan the old table, generate HFiles, then bulkload them into the new table. The overall job wiring is sketched below.
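A minimal, hypothetical reconstruction of that wiring, assuming HBase 1.x / Hadoop 2.x APIs; MigrationJob, MigrationMapper (sketched later, after the precautions list), the table name and the output path are placeholders, not the original code. The stock TableInputFormat already splits by region; the author's custom split logic appears in section 3.2.1.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MigrationJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "hugegraph-hbase-migration");
        job.setJarByClass(MigrationJob.class);

        Scan scan = new Scan();
        scan.setCaching(1000);       // bigger scan batches for a full-table scan
        scan.setCacheBlocks(false);  // do not pollute the RegionServer block cache

        // One input split per region of the old table; the mapper re-encodes
        // old cells into new-format KeyValues (sections 3.3 and 3.4)
        TableMapReduceUtil.initTableMapperJob(
                "old_graph_table", scan, MigrationMapper.class,
                ImmutableBytesWritable.class, KeyValue.class, job);

        job.setNumReduceTasks(0);    // no shuffle/sort: input is already rowkey-ordered
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileOutputFormat.setOutputPath(job, new Path("/home/yarn/outputei"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}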

During the migration, the MR processing speed can be greatly increased by exploiting two characteristics of the graph-data transformation:

  • The vertex-table rowkeys do not change between the old and new versions, so the region each record belongs to is the same before and after migration. Exploiting this, we create the new table with the same number of regions and the same start keys as the old table, set the reduce number to 0 to skip the reduce-side sort in the MR job, and generate HFiles directly. Because the data stays in order, more than ten TB of data can be loaded in seconds, which greatly speeds up the migration.
  • The minor encoding change to the edge-table rowkeys does not break the overall ordering, but a rowkey that fell into one region before migration may fall into an adjacent region after it. During the load, such HFiles are split again and moved, so this process is slower than for the vertex table. (HBase sorts data in ascending order by rowkey, column and timestamp; this migration preserves that order.)

Migration preparation and precautions ⚠️ (a blood-and-tears record of pitfalls):

  1. The region information of the new table must stay the same as that of the old table, so that the HFiles generated by the maps can be loaded directly afterwards (loading really just moves the HFiles and informs the RegionServer). Here I rebuilt the new table using the region information of the old table.
  2. Input splits are made per region, so the number of maps equals the number of regions; one map task outputs the HFiles required by one region of the new table.
  3. Note that the old version's deserialization and the new version's serialization use different versions of the same package, which makes the dependencies troublesome; be very careful which one you call. I handled it by renaming (relocating) one of the packages before calling it (see the shade sketch in section 3.3.2).
  4. Because there is no reduce phase, the map task must output the key/value format required by HFile: <ImmutableBytesWritable, KeyValue> (see the mapper sketch after this list).
  5. During the data migration, the business must not import data at the same time; incremental data can be imported afterwards, because:
    • First, business data keeps growing. We have hourly and daily scheduled tasks, and the tables undergo compactions, which can cause many tasks to fail.
    • Second, some business data must not be imported twice: some data uses the SUM import policy. If I simply migrate whatever the scan saw, the business cannot know exactly which data still needs to be imported; incremental data may then be imported twice and some original values silently double. That is unacceptable, and I almost made this mistake. I asked the business team to stop all scheduled tasks in time and ran a full migration. (We run the old and new versions in parallel for a while, so daily data is imported into both, and the old graph is decommissioned once the new one is stable.)
  6. The old graph database contains inconsistent data. For example, the vertices/edges of a label were deleted, but data of that label still existed in the data table, which affects the results of Gremlin queries. During this upgrade the illegal data was removed and the full data set was cleaned.
  7. Some business data is imported with the update policy set to merge/union, and a very large value accumulates after years of consolidation; cell values in HBase may actually exceed 65535 bytes.
  8. If a heterogeneous storage policy is used, set the same policy on the new database (ONE_SSD in our case); otherwise reads hitting slow disks are a performance disaster.
  9. If a scheduled import task runs every day, confirm with the business side that the task is stopped and its processes are completely cleared. I was bitten by a case where the task was stopped but the import process kept running, leaving some data inconsistent with the original after the migration.
  10. After the migration, restore data locality for the tables (bulkloaded HFiles are typically not local to their RegionServers), otherwise performance suffers significantly.
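For point 4 above, a minimal sketch of such a mapper, assuming the HBase 1.x TableMapper API; reencode() is a hypothetical placeholder for the version-specific deserialize/serialize step described in sections 3.3 and 3.4, not a real HugeGraph API:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class MigrationMapper extends TableMapper<ImmutableBytesWritable, KeyValue> {

    @Override
    protected void map(ImmutableBytesWritable rowkey, Result result, Context context)
            throws IOException, InterruptedException {
        // Re-encode each old-format row into new-format KeyValues and emit them
        // directly; with 0 reducers they flow straight into HFileOutputFormat2,
        // so they must already be in rowkey/column order within the region
        for (KeyValue kv : reencode(result)) {
            context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
        }
    }

    // Placeholder for "deserialize with the old BytesBuffer, re-serialize with
    // the new one" (sections 3.3 and 3.4); not the author's actual code
    private List<KeyValue> reencode(Result result) {
        throw new UnsupportedOperationException("version-specific re-encoding goes here");
    }
}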

The data migration mainly involves the vertex table and the edge tables. Since we learned from the business that no label-index-related queries are used, the index tables were not migrated. In addition, the migration covers the vertexlabel/edgelabel/propertykey meta tables and the count table:

  • PropertyKey meta information (the pk table)
  • VertexLabel (vertex label) meta information (the g_v table)
  • EdgeLabel (edge label) meta information (the g_ie and g_oe tables)
  • Count, which maintains the auto-increment IDs of the meta information (the c table)

3. Detailed migration steps

3.1 Preparations: Initialize the new graph database

When HugeGraph is initialized, it creates a set of data tables to store the graph's metadata, business data and index data, but each table has only one region, which does not meet our requirements. The region split points of the old business data tables, on the other hand, are very stable. So we disable and drop the freshly created tables and recreate them with the same region count and start keys as the old versions, as sketched below.
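A minimal sketch of this rebuild using the HBase 1.x Admin API (org.apache.hadoop.hbase.client); the table names are placeholders and error handling is omitted:

// Rebuild the new table pre-split with the old table's region boundaries
void rebuildWithOldSplits(Configuration conf) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin();
         RegionLocator locator =
                 connection.getRegionLocator(TableName.valueOf("old_graph_table"))) {

        // Region start keys of the old table; the first one is the empty key,
        // which createTable() must not receive as a split point
        byte[][] startKeys = locator.getStartKeys();
        byte[][] splits = Arrays.copyOfRange(startKeys, 1, startKeys.length);

        TableName newTable = TableName.valueOf("new_graph_table");
        HTableDescriptor desc = admin.getTableDescriptor(newTable);

        // Drop the single-region table created by init and recreate it pre-split
        admin.disableTable(newTable);
        admin.deleteTable(newTable);
        admin.createTable(desc, splits);
    }
}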

3.2 Input splits and configuration details

3.2.1 InputSplit


// RegionMap structure
public RegionMap(int size) {
    this.startKeys = new byte[size][];
    this.endKeys = new byte[size][];
    this.locationMap = new TreeMap<ImmutableBytesWritable, String>();
    this.regionKeysMap = new TreeMap<ImmutableBytesWritable, ImmutableBytesWritable>();
}


// 1. Obtain region information
public RegionMap getRegionMap(final Configuration conf, final byte[] tableName) throws IOException {
        List<SplitInfo> regions = getSplitInfo(conf, tableName);
        System.out.println("got " + regions.size() + " regions");
        RegionMap map = new RegionMap(regions.size());
        for (SplitInfo region : regions) {
                map.add(region.startKey, region.endKey, region.regionServer);
        }
        return map;
}

// 2. Split by region
splits.add(new TableSplit(TableName.valueOf(tableName), startRow, stopRow,
                          regionMap.getLocation(startRow)));
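The getSplitInfo() helper is not shown above; one plausible implementation, assuming SplitInfo is a simple holder of (startKey, endKey, regionServer) and using the HBase 1.x RegionLocator API:

// A possible getSplitInfo(), not the author's original code
public List<SplitInfo> getSplitInfo(final Configuration conf, final byte[] tableName)
        throws IOException {
    List<SplitInfo> splits = new ArrayList<>();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         RegionLocator locator =
                 connection.getRegionLocator(TableName.valueOf(tableName))) {
        for (HRegionLocation location : locator.getAllRegionLocations()) {
            HRegionInfo region = location.getRegionInfo();
            splits.add(new SplitInfo(region.getStartKey(), region.getEndKey(),
                                     location.getHostname()));
        }
    }
    return splits;
}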

3.2.2 Setting the reduceTask number: a key step for import performance

The original HBase data is already in order, and our vertex-table rowkeys do not change, so there is no need to spend time in a reduce phase. We want HFiles generated directly from the map output, so we set the number of reducers to 0.

Think about it: why can the map output HFiles directly when the number of reduce tasks is set to 0?

The code in mapreduce-client-core-2.2.0 does exactly what we need when the reduce number is 0:

// If the reduce number is 0, a NewDirectOutputCollector is used and
// the map output is written to HDFS as the final result
   if (job.getNumReduceTasks() == 0) {
      output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      // Otherwise the map output goes to local disk, to be served to the reduce tasks
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

3.3 Deserialize old data

3.3.1 Vertex

The rowkey of vertex data is the same in the old and new versions, but the propertykey encoding changed a lot: as shown in the figures below, the vertex ID is unchanged, but the columns are compressed so that multiple pk columns are encoded into a single one, which greatly saves storage space, mainly in two respects:

  • It saves N copies of the rowkey. In HBase's physical layout every column stores the rowkey again, so the old Vertex structure, with one column per property, inevitably wasted a lot of space.
  • It also improves the space utilization of property values. For numeric data, the fixed space of short/int/float/double is now allocated byte by byte: odd sizes such as 1/3/5/7 bytes are chosen as needed, similar in spirit to a variable-length (varint) encoding, illustrated below. Emmm ···· you can tell how much the author cares about saving storage space ~
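A generic varint-style illustration of byte-level number encoding; this is NOT HugeGraph's actual BytesBuffer code, only a sketch of the idea that small numbers occupy only as many bytes as they need:

import java.io.ByteArrayOutputStream;

static byte[] encodeVarInt(long value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((value & ~0x7FL) != 0) {
        out.write((int) ((value & 0x7F) | 0x80)); // 7 payload bits + continuation bit
        value >>>= 7;
    }
    out.write((int) value); // final byte, continuation bit clear
    return out.toByteArray();
}

// encodeVarInt(1)   -> 1 byte instead of a fixed 4-byte int
// encodeVarInt(300) -> 2 bytes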

Old Vertex storage structure:

New Vertex storage structure:

Since the relative region position of the data does not change, we only need to deserialize the old-version data, re-serialize it, and generate HFiles directly, with no sorting.

3.3.2 Edge

Old and new Edge storage structures:

The rowkey composition does not change, and neither does the column composition (it is the same as the new Vertex column composition), but the encoding has changed.

Deserializing vertices and edges requires importing com.baidu.hugegraph.backend.serializer.BytesBuffer. Try using the HBase API to read old-version cells, convert them to the new version, import them into the new database table, and check that they read back correctly. Verify the process with as many data sources as possible and make sure this step is solid; otherwise you are in for a dirty-data disaster.
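One common way to keep two versions of com.baidu.hugegraph on the classpath at once (precaution 3) is to relocate one of them with the maven-shade-plugin. A sketch, assuming this matches the renaming approach mentioned above; the shaded package name old.com.baidu.hugegraph is made up for illustration:

<!-- Relocate the old-version classes so both serializers can coexist -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.baidu.hugegraph</pattern>
            <shadedPattern>old.com.baidu.hugegraph</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>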

3.4 Serialize to the new-version format

At this point the deserialized data has been restored to real vertex/edge data; next, the original graph data is serialized into the new graph's format. This process is described in detail in my earlier article on Bulkload practice; the principle is the same, so it is not repeated here.

3.4.1 Vertex

3.4.2 Edge

3.5 Generating HFiles and loading them

  1. This process differs from the usual Bulkload in that no sorting is required.
  • For vertex data, rowkeys are already ordered in HBase and do not change during migration. Since the region information of the old and new tables is identical, the region each record belongs to does not change either. We can therefore use the RecordWriter of the OutputFormat directly, with no sort or buffer operation, writing straight to the output file. Avoiding shuffle, spill, merge and sort saves a lot of time! ~ lucky.

    • After the load succeeded, all HFiles under the column family had been moved into HBase's storage directory, and the whole load finished in seconds. This confirmed our reasoning: skipping the sort did not make the data out of order, since out-of-order data cannot be bulkloaded into HBase at all.
  • For edge data the situation is slightly different, but the absolute ordering before and after migration is still preserved. The rowkey composition of edge data has not changed, but one byte in the label-ID encoding prefix did, and it changed to the same value for every record in both the old and new versions, so the absolute order of the data is unaffected and no sorting is needed either.

    • Note, however, that although the absolute order is preserved, the changed byte can shift where data falls relative to the region boundaries. Even though we create the new table with the same region information as the old one, there is no guarantee that the generated data distributes over the regions exactly as before, which differs from the vertex data.
    • When a generated HFile in our column family straddles a region boundary, the load process splits it into blocks that satisfy the region's startkey/endkey and moves the split blocks into HBase, leaving behind an empty .splited marker for the original file:
-rw-r--r--   3 hbase supergroup           0 2021-04-07 22:03 /home/yarn/outputei/f/ff27a8670c8e48c7ac56c299c68fc211.splited
-rw-r--r--   3 hbase supergroup 11155275472 2021-04-07 20:22 /home/yarn/outputei/f/ff66e030ecaa4723bae0fd041972ee62
-rw-r--r--   3 hbase supergroup  7496589553 2021-04-07 21:02 /home/yarn/outputei/f/ff6a0afac39949ad8558fe8eac60b640

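The final load is the standard HBase bulk load. A minimal sketch assuming the HBase 1.x LoadIncrementalHFiles API (org.apache.hadoop.hbase.mapreduce); the table name and output path are placeholders:

Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
     Admin admin = connection.getAdmin();
     Table table = connection.getTable(TableName.valueOf("new_graph_table"));
     RegionLocator locator =
             connection.getRegionLocator(TableName.valueOf("new_graph_table"))) {
    // Moves complete HFiles into place; splits edge-table HFiles that
    // straddle a region boundary, as described above
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path("/home/yarn/outputei"), admin, table, locator);
}

The same step can also be run from the shell: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /home/yarn/outputei <table>.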

4. Migration efficiency

(The more YARN resources, the faster)

  • 80 billion vertices: 90 min
  • 100+ billion edges: 100 min