Hive transactions

In the previous section we learned how Hive supports INSERT VALUES(…), DELETE and UPDATE, as well as how Hive transactions are configured and the principles behind them. Now let's take a look at the implementation itself.

1. Installing the tools

First we install the ORC file tools. Clone the project from https://github.com/apache/orc, then, in the project's java directory, build it with mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true.

The resulting orc-tools-1.7.0-SNAPSHOT-uber.jar, found in the tools directory, is the tool we need. It is run like this:

java -jar orc-tools-x.y.z-uber.jar


We can now use it to inspect the contents of ORC files. To make it more convenient, wrap it in a script for later use:

#!/bin/sh
command=$1
args=$2
java -jar orc-tools-1.7.0-SNAPSHOT-uber.jar $command $args

Then create an alias for it; add it to your shell profile to make it permanent:

alias orc=/Users/liuwenqiang/workspace/code/script/orc_tools.sh

So let’s use it
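
For example, the tool's meta and data subcommands print an ORC file's metadata and its rows (as JSON) respectively. A minimal sketch, assuming the alias above and a made-up HDFS path:

orc meta /user/hive/warehouse/acid_demo/delta_0000001_0000001_0000/bucket_00000   # schema, stripes and file statistics
orc data /user/hive/warehouse/acid_demo/delta_0000001_0000001_0000/bucket_00000   # one JSON object per row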

2. Transaction information interpretation

Earlier we saw that every INSERT statement creates a delta directory. An UPDATE statement also creates a delta directory, but it first creates a delete directory: in other words, it deletes and then inserts. The delete directory is prefixed with delete_delta. So how does Hive return the correct data when it reads the table?

As stated earlier, every row in an ORC file is identified by a rowId. When reading from an ACID transaction table, Hive merges these files to obtain the result of the latest transaction. This process is implemented in the OrcInputFormat and OrcRawRecordMerger classes and is essentially a merge sort. In the tool's output, the first record is the row we inserted first (rowId 1), followed by the three identical rows we inserted afterwards; the second part of the output corresponds to an UPDATE of the Hive table.

You can also query this information in SQL
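
A minimal sketch, assuming a transactional table named acid_demo with columns id and name (table and columns are made up for illustration); on ACID tables Hive exposes a hidden row__id column, a struct holding the write transaction ID, bucket and row ID:

hive -e "SELECT row__id, id, name FROM acid_demo"
# each row is prefixed with a struct roughly like {"writeid":1,"bucketid":536870912,"rowid":0}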

Let's try to interpret some of the information here, so that it is easier to understand what follows.

Hive generates a globally unique ID for every transaction, covering both read and write operations. For write transactions (such as INSERT and DELETE), Hive also allocates a write transaction ID (write ID) that is unique within the scope of the table. The write transaction ID is encoded into the names of the delta and delete_delta directories. The statement ID is used when a transaction contains multiple write statements and serves as a unique identifier for each of them. The overall directory naming format is delta_minWID_maxWID_stmtID, i.e. the delta prefix, the range of write transaction IDs, and the statement ID. Directories holding deleted records are prefixed with delete_delta and contain the records to be deleted.
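
As a hypothetical illustration of this naming scheme (the table path and write IDs are made up): an INSERT under write ID 1 followed by an UPDATE under write ID 2 would leave directories like these, the UPDATE producing a delete_delta and delta pair:

hdfs dfs -ls /user/hive/warehouse/acid_demo
  delta_0000001_0000001_0000/          # INSERT, write ID 1, statement 0
  delete_delta_0000002_0000002_0000/   # UPDATE step 1: delete the old version, write ID 2
  delta_0000002_0000002_0000/          # UPDATE step 2: insert the new version, write ID 2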

The content of _orc_acid_version is 2, meaning the current ACID version is 2. The main difference from version 1 is that UPDATE statements use the split-update feature, i.e. delete first and insert later, as mentioned above. This feature allows ACID tables to support predicate (condition) push-down; see HIVE-14035 for details. The file is not an ORC file, so you can download it and view it directly.
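
For example (the path is made up), it can be read with a plain cat:

hdfs dfs -cat /user/hive/warehouse/acid_demo/delta_0000001_0000001_0000/_orc_acid_version
2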

The bucket_00000 file contains the data that was written. Since this table has neither partitions nor buckets, there is only one such file. Transactional tables must be stored in ORC format.

The data in the file is sorted by (originalTransaction, bucket, rowId), which is critical for the read path described later.

operation

0 means insert, 1 means update, and 2 means delete. Because split-update is used, an update never actually appears here, so operation is 0 in delta files and 2 in delete_delta files.

rowId

An auto-incrementing unique ID that is unique within the combination of a write transaction and a bucket.

originalTransaction

The original write transaction ID of the record. For an INSERT it is the same as currentTransaction; for a DELETE it is the write transaction ID of the transaction that first inserted the record. The same holds for an UPDATE, because UPDATE is implemented as DELETE + INSERT.

currentTransaction

ID of the current write transaction

row

The actual data: null for a DELETE, the inserted data for an INSERT, and the updated data for an UPDATE.
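
Putting these fields together, dumping a delta bucket file with the tool from section 1 prints one row event per line; a made-up example for a single inserted row might look roughly like this (the path and values are illustrative; bucket is the encoded bucket ID):

orc data /user/hive/warehouse/acid_demo/delta_0000001_0000001_0000/bucket_00000
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"a"}}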

3. Data reading process

Each row of data in the ORC file is identified and sorted by its row__id. When reading from an ACID transaction table, Hive merges these files to obtain the result of the latest transaction. This process is implemented in the OrcInputFormat and OrcRawRecordMerger classes and is essentially a merge sort.

During the merge, all data rows are sorted in ascending order of (originalTransaction, bucketId, rowId) and descending order of (currentTransaction).

Take the following files as an example; they are produced by inserting three records, running a major compaction, and then updating two of the records. Here 1-0-0-1 is shorthand for originalTransaction-bucketId-rowId-currentTransaction.

+----------+    +----------+    +----------+
| base_1   |    | delete_2 |    | delta_2  |
+----------+    +----------+    +----------+
| 1-0-0-1  |    | 1-0-1-2  |    | 2-0-0-2  |
| 1-0-1-1  |    | 1-0-2-2  |    | 2-0-1-2  |
| 1-0-2-1  |    +----------+    +----------+
+----------+

The merging process is as follows:

  • All data rows are sorted by (originalTransaction, bucketId, rowId) in ascending order and by (currentTransaction) in descending order:
    • 1-0-0-1
    • 1-0-1-2
    • 1-0-1-1
    • 2-0-1-2
  • Get the first record;
  • If the row__id of the current record is the same as the row__id of the previous record, it is skipped;
  • If the operation type of the current record is DELETE, the operation is skipped.
    • For 1-0-1-2 and 1-0-1-1, this record is skipped by the above two rules;
  • If not skipped, the record is output downstream;
  • Repeat the process.
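
Applying these rules to the example above, the merge would proceed roughly as follows; the three surviving rows are the untouched original plus the two updated versions:

1-0-0-1   output
1-0-1-2   skipped (delete event)
1-0-1-1   skipped (same row__id as the previous record)
1-0-2-2   skipped (delete event)
1-0-2-1   skipped (same row__id as the previous record)
2-0-0-2   output
2-0-1-2   output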

The merge process is streaming: Hive opens all the files, pre-reads the first record of each, and stores the row__id information in the ReaderKey type. This type implements the Comparable interface, so the ordering can be customized according to the rules above:

public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
  private long writeId;
  private int bucketId;
  private long rowId;

  protected int compareToInternal(RecordIdentifier other) {
    if (other == null) { return -1; }
    if (writeId != other.writeId) { return writeId < other.writeId ? -1 : 1; }
    if (bucketId != other.bucketId) { return bucketId < other.bucketId ? -1 : 1; }
    if (rowId != other.rowId) { return rowId < other.rowId ? -1 : 1; }
    return 0;
  }
}

public class ReaderKey extends RecordIdentifier {
  private long currentWriteId;
  private boolean isDeleteEvent = false;

  public int compareTo(RecordIdentifier other) {
    int sup = compareToInternal(other);
    if (sup == 0) {
      if (other.getClass() == ReaderKey.class) {
        ReaderKey oth = (ReaderKey) other;
        // For the same row__id, later write transactions sort first,
        // so the newest version of a row is seen first
        if (currentWriteId != oth.currentWriteId) { return currentWriteId < oth.currentWriteId ? +1 : -1; }
        // Delete events sort before insert events
        if (isDeleteEvent != oth.isDeleteEvent) { return isDeleteEvent ? -1 : +1; }
      } else {
        return -1;
      }
    }
    return sup;
  }
}

Each ReaderKey is then stored in a TreeMap together with its file handle. Thanks to the ordering property of this structure, fetching the first entry each time yields the records in sorted order.

public class OrcRawRecordMerger {
  private TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();

  public boolean next(RecordIdentifier recordIdentifier, OrcStruct prev) {
    Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
    // ...
  }
}

Selecting the files

Note that snapshot files from multiple transactions exist in the transaction table's directory at the same time, so Hive must first select the set of files that reflects the latest transaction results and only then merge them. For example, consider a directory containing the results of a minor compaction, a major compaction, and a delete operation (a hypothetical listing is sketched after the steps below).

The filtering process is as follows:

  • Obtain the list of all successfully committed write transaction ids from Hive Metastore.
  • Parse out the file type, write transaction ID range, and statement ID from the file name;
  • Select the base directory with the largest and valid write transaction ID, if any;
  • Sort delta and delete files:
    • MinWID smaller is preferred;
    • If minWID is equal, the larger maxWID takes precedence;
    • If they are all equal, sort by stmtID; Those without stmtID will come first;
  • Using the write transaction ID of the base directory as the current ID, filter the delta files:
    • If maxWID is greater than the current ID, keep the file and update the current ID with it;
    • If the ID range is the same, the file is kept as well;
    • Repeat the steps above.
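
As a hypothetical illustration of these rules (directory names made up): after two inserts, a minor compaction, a major compaction and a later delete, the directory might contain the files below, annotated with what a reader would pick:

base_0000002                          # chosen: base with the largest committed write ID (2)
delta_0000001_0000001_0000            # skipped: covered by base_0000002
delta_0000002_0000002_0000            # skipped: covered by base_0000002
delta_0000001_0000002_0000            # skipped: the minor-compacted delta, also covered by the base
delete_delta_0000003_0000003_0000     # kept: its write ID (3) is newer than the base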

Conclusion

  1. Hive transactions record each operation in incremental (delta) files, which are then merged with the original files to produce the latest result.
  2. A detailed code analysis will be covered separately in the Hive source code section.