Hive transactions

1. The background

Hive originally did not support transactions because it stores its data as files on HDFS, and HDFS files can only be created, appended to, and deleted; they cannot be updated in place. That is why Hive long lacked UPDATE and DELETE functionality.

Hive started supporting transactions in Hive 0.14. For a table to support UPDATE and DELETE, it must be an ACID (transactional) table, which requires all of the following:

1. The format of the table must be STORED AS ORC.

2. The table property transactional must be set to true: TBLPROPERTIES ('transactional'='true').

3. The following configuration items must be set. They can also be configured in hive-site.xml, but that is not recommended; prefer setting them per session.

The Client side:

set hive.support.concurrency=true
set hive.enforce.bucketing=true -- (no longer required from Hive 2.0)
set hive.exec.dynamic.partition.mode=nonstrict
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

Server:

set hive.compactor.initiator.on=true
set hive.compactor.worker.threads=1

2. What is ACID and why is it used?

ACID stands for the four properties of database transactions: Atomicity (an operation either succeeds completely or fails completely, never leaving partial data behind), Consistency (once an operation completes, its results are visible to every subsequent operation), Isolation (one user's incomplete operations do not unexpectedly affect other users), and Durability (once an operation completes, its effects are preserved even in the face of machine or system failure). These properties have long been considered part of the transaction capabilities of database systems.

As of Hive 0.13, atomicity, consistency, and durability were provided only at the partition level. Isolation could be provided by turning on one of the available locking mechanisms (ZooKeeper or in-memory). With the addition of transactions in Hive 0.13, full ACID semantics can now be provided at the row level, so that one application can add rows while another reads the same partition without interfering with each other.

The feature of adding ACID semantic transactions to Hive can be used in the following scenarios:

  1. Stream data collection. Many users use tools such as Apache Flume, Apache Storm, or Apache Kafka to stream data to their Hadoop clusters. While these tools can write data at a rate of hundreds or more rows per second, Hive can only add partitions every 15 minutes to an hour. Adding partitions at shorter intervals results in a large number of partitions in the table. These tools can stream data to an existing partition, but this will result in dirty reads (that is, seeing the data that will be written after the query is started) and leaving many small files in the directory, which will stress the NameNode. With this transaction capability, this scenario is supported while ensuring that reads get a consistent view of the data and avoid excessive files.
  2. Slowly changing dimensions. In a typical star-schema data warehouse, dimension tables change slowly over time. For example, a retailer opens new stores that need to be added to the store table, or an existing store changes its square footage or some other tracked attribute. These changes result in inserting individual records or updating existing ones, depending on the strategy chosen. Starting with Hive 0.14, Hive can support this.
  3. Data updates. Starting with Hive 0.14, these use cases can be supported through INSERT, UPDATE, and DELETE.
  4. Bulk updates using the SQL MERGE statement (an illustrative sketch follows this list).
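
As an illustration of item 4, here is a minimal, hedged sketch of a MERGE-based batch update (MERGE is available from Hive 2.2 on ACID tables). The dim_store target table and the staging_store source table, along with their columns, are made up for the example:

MERGE INTO dim_store AS t
USING staging_store AS s
ON t.store_id = s.store_id
WHEN MATCHED AND s.is_closed = true THEN DELETE                               -- drop stores flagged as closed in the staging data
WHEN MATCHED THEN UPDATE SET floor_area = s.floor_area                        -- update a slowly changing attribute
WHEN NOT MATCHED THEN INSERT VALUES (s.store_id, s.store_name, s.floor_area); -- add newly opened stores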

3. Limitations

  1. All DML operations are committed automatically, and BEGIN, COMMIT, and ROLLBACK are not yet supported. Support for these features is planned for future releases.
  2. In the first release only the ORC file format was supported. The transaction feature was built to allow you to use any storage format that determined how to update or delete the base record (basically, with explicit or implicit row ids), but so far only integration for ORCs has been done.
  3. By default, transactions are configured to be OFF. For a discussion of configuration values, see the configuration section below.
  4. ACID tables cannot be read or written from a non-ACID session. In other words, to operate on ACID tables the session's transaction manager must be set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.
  5. Only the snapshot isolation level is currently supported: a query is given a consistent snapshot of the data as of the moment it starts. Dirty read (READ UNCOMMITTED), READ COMMITTED, REPEATABLE READ, and SERIALIZABLE are not supported. BEGIN was introduced with the intention of supporting snapshot isolation for the duration of a transaction rather than just a single query. Additional isolation levels may be added depending on user demand.
  6. Existing ZooKeeper and memory lock managers are incompatible with transactions. We have no intention of dealing with the problem. For a discussion of how to store locks for transactions, see basic design below.
  7. ACID tables do not support schema changes via ALTER TABLE; see HIVE-11421 for details. This defect was fixed in Hive 1.3.0 / Hive 2.0.0.
  8. When Oracle is used as the Metastore DB with datanucleus.connectionPoolingType=BONECP, intermittent "No such lock..." and "No such transaction..." errors may occur. In that case it is recommended to set datanucleus.connectionPoolingType=DBCP.
  9. The LOAD DATA ... statement is not supported on transactional tables; this was not properly enforced until HIVE-16732.

4. Syntax changes

  1. Starting with Hive 0.14, INSERT ... VALUES, UPDATE, and DELETE have been added to the SQL syntax. See the Language Manual DML for details.
  2. Several new commands have been added to Hive's DDL to support ACID and transactions, and some existing DDL statements have been modified.
  3. Add a new command SHOW TRANSACTIONS. For details, see SHOW TRANSACTIONS.
  4. Added a new command SHOW COMPACTIONS, see SHOW COMPACTIONS.
  5. Changed the SHOW LOCKS command to provide information about LOCKS and transactions. If you’re using Zookeeper or a memory lock manager, you’ll notice no difference in the output of this command. See Show Locks for details.
  6. ALTER TABLE gains a new option for merging (compacting) a table or partition. In general, users do not need to request a merge, because the system detects the need and starts the merge itself. However, if merging is disabled for a table, or a user wants to merge a table at a time the system would not choose, ALTER TABLE can initiate the merge. See Alter Table/Partition Compact for details; the statement puts the task in the merge queue and returns. You can use SHOW COMPACTIONS to watch the progress of the merge.
  7. Add a new command to terminate transactions: ABORT TRANSACTIONS; see ABORT TRANSACTIONS. (Illustrative examples of these statements follow this list.)
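
Here are a few illustrative examples of the statements mentioned above (the transaction id passed to ABORT TRANSACTIONS is a made-up value):

SHOW TRANSACTIONS;        -- list open and recently aborted transactions
SHOW LOCKS;               -- with DbTxnManager this now also reports lock/transaction information
ABORT TRANSACTIONS 1234;  -- terminate the transaction with id 1234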

5. Implementation principle

Hive stores its data as files on HDFS, so it does not update those files in place when a transaction modifies data.

Instead, the existing HDFS files serve as the base data and delta files act as an operation log: newly inserted, updated, and deleted records are written to incremental (delta) files. When Hive data is read, the base files and delta files are merged so that the query returns the latest data. Each transaction (or each batch of transactions from a streaming agent such as Flume or Storm) creates a new set of delta files for the changed table or partition. At read time, the base and delta files are merged and the update and delete changes are applied.

5.1 Demonstration

Let's create a transactional table and insert some data:

CREATE TABLE ods.employee (
    id int, name string, salary int)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    STORED AS ORC TBLPROPERTIES ('transactional' = 'true');
INSERT INTO employee VALUES (1, 'kingcall', 10000);
INSERT INTO employee VALUES (1, 'kingcall', 10000);
INSERT INTO employee VALUES (1, 'kingcall', 10000);

Then we’ll check it out on HDFS

An in-progress transaction is kept in a staging folder and ends up as a delta folder when it completes; each executed transaction leaves its own delta folder.

Next we perform an update operation to give the employee a salary adjustment

UPDATE employee SET salary=100000 WHERE id=1;

You can see that every INSERT statement creates a delta directory. The UPDATE statement also creates a delta directory, but it first creates a delete directory; in other words the row is deleted and then re-inserted. The delete directory's prefix is delete_delta.

Each transaction folder contains two files: a data file and a transaction information file.

You can query the data directly in Hive, you can inspect the transaction information through the row__id virtual column, and you can also examine it with the ORC file dump tool.
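
For example, a hedged sketch of inspecting that metadata (the exact field names inside row__id, such as writeid versus transactionid, depend on the Hive version):

SELECT row__id, id, name, salary   -- row__id is a struct carrying the transaction/write id, bucket id, and row id
FROM employee
WHERE id = 1;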

5.2 Compactor

The compactor is a set of background processes running inside the Metastore that support the ACID system. It consists of the Initiator, the Worker, the Cleaner, the AcidHouseKeeperService, and a few other components.

Incremental file merge

As the table is modified, more and more incremental files are created, as you can see above, which need to be merged to maintain adequate performance. There are two types of merges, minor and major:

1. A minor compaction merges all the delta files into a single delta file and all the delete_delta files into a single delete_delta file; the name of the resulting file contains the range of write transaction IDs it covers. It can be requested manually with ALTER TABLE employee COMPACT 'minor';
2. A major compaction merges all the files (base and delta) into a single file. A major merge is more expensive but more effective.

All merges are done behind the scenes without preventing concurrent reads and writes of data. After the merge, the system deletes the old files after all the read operations are complete.
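
For example, a minimal sketch of requesting a merge manually on the employee table from the demonstration above and monitoring it:

ALTER TABLE employee COMPACT 'minor';   -- or 'major'; the request is queued and the statement returns
SHOW COMPACTIONS;                       -- shows the queued/working/succeeded state of merge tasks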

Initiator

This module is responsible for discovering which tables or partitions need to be merged. It runs inside the Metastore and is enabled with hive.compactor.initiator.on. The transaction configuration table below contains several *.threshold properties that control when a merge task is created and which type of merge is performed. Each merge task handles one partition (or the entire table if the table is not partitioned). If merging a given partition fails more than hive.compactor.initiator.failed.compacts.threshold times in a row, automatic merge scheduling for that partition stops. For more information, see the configuration parameter table.

Worker

Each Worker handles a single merge task. A merge task is a MapReduce job whose name has the form <hostname>-compactor-<db>.<table>.<partition>.

Cleaner

A process that deletes deltas after a merge determines that they are no longer needed.

AcidHouseKeeperService

This process looks for transactions that have not sent a heartbeat within hive.txn.timeout and aborts them. The system assumes that a client that initiated a transaction and then stopped heartbeating has crashed, and that the resources it locked should be released.

SHOW COMPACTIONS

This command displays the currently running merges and recent merge history (with a configurable retention period). Merge history is displayed starting from HIVE-12353.

For more information about this command and output, see LanguageManual DDL#Show Compactions. The parameters that affect the output of this command are described in the Transaction New Configuration Parameters/Merge History configuration properties. The system keeps the last N entries for each type: failure, success, and attempt (where N is configurable for each type).

Transaction/lock manager

Hive adds a new logical concept called the "transaction manager", which subsumes the old concept of a "database/table/partition lock manager" (hive.lock.manager, whose default is org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager). The transaction manager is now additionally responsible for managing transaction locks. The default DummyTxnManager mimics the behavior of older Hive versions: it provides no transactions and uses the hive.lock.manager property to create a lock manager for tables, partitions, and databases. The newly added DbTxnManager uses DbLockManager to manage all locks and transactions in the Hive Metastore (transactions and locks are persistent in the event of a server failure). This means that when transactions are enabled, locking no longer goes through ZooKeeper as it did before. To prevent a client that dies from leaving a transaction or lock dangling, lock holders and transaction initiators send periodic heartbeats to the Metastore; if a heartbeat is not received within the configured time, the lock or transaction is aborted.

Starting with Hive 1.3.0, the length of time that DbLockManager keeps trying to acquire a lock can be controlled via hive.lock.numretries and hive.lock.sleep.between.retries. When DbLockManager cannot acquire a lock (because of an existing competing lock), it backs off and retries after a certain period. To support short-lived ad-hoc queries without putting too much pressure on the Metastore, DbLockManager doubles the wait time after each retry. The initial back-off is 100 ms and is capped by hive.lock.sleep.between.retries; hive.lock.numretries is the total number of times a lock request is retried. So the total time that a lock-acquisition call can block (given the defaults of 100 retries and a 60 s sleep cap) is (100 ms + 200 ms + 400 ms + ... + 51200 ms + 60 s + 60 s + ... + 60 s) = 91 min 42 s 300 ms.

For details on the locks used in the lock manager, see Show LOCKS.

Note that the lock manager used by DbTxnManager acquires locks on all tables, even those without the "transactional=true" property. By default, an INSERT into a non-transactional table acquires an exclusive lock, which blocks other inserts and reads. While technically correct, this differs from how Hive traditionally worked (i.e., without a lock manager). For backward compatibility, hive.txn.strict.locking.mode (see the table below) is provided; in non-strict mode the lock manager acquires only a shared lock for inserts into non-transactional tables. This restores the previous semantics while still providing the benefits of a lock manager, such as preventing a table from being dropped while it is being read.

Note that inserts always acquire shared locks for transactional tables because these tables implement the MVCC architecture at the storage tier and provide strong consistency of reads (snapshot isolation) even if concurrent modification operations exist.
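
For example, a hedged sketch of relaxing the locking mode for the current session only (hive.txn.strict.locking.mode is available from Hive 2.2.0):

SET hive.txn.strict.locking.mode=false;  -- INSERTs into non-transactional tables now take a shared lock instead of an exclusive one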

8. Summary of transaction parameters

The following properties enable transaction support. If you are just learning, it is recommended not to enable them globally; enable them in a session only when needed:

<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
</property>
<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>

Many new configuration parameters have been added to the system to support transactions.

| Configuration key | Default | Location | Notes |
| --- | --- | --- | --- |
| hive.txn.manager | see notes | Client/HiveServer2 | Default: org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager. To support transactions it must be set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager. DummyTxnManager replicates pre-Hive-0.13 behavior and provides no transaction support. |
| hive.txn.strict.locking.mode | true | Client/HiveServer2 | In strict mode non-ACID resources use standard read/write lock semantics, e.g. an INSERT acquires an exclusive lock. In non-strict mode an INSERT into a non-ACID resource only acquires a shared lock, which allows two concurrent writes to the same partition while still letting the lock manager prevent DROP TABLE etc. while the table is being written to (as of Hive 2.2.0). |
| hive.txn.timeout | 300 | Client/HiveServer2/Metastore | How long (in seconds) before a transaction is declared aborted if the client does not send a heartbeat. It is critical that this property has the same value for all components/services. (Note 5) |
| hive.txn.heartbeat.threadpool.size | 5 | Client/HiveServer2 | Number of threads used for heartbeats (as of Hive 1.3.0 and 2.0.0). |
| hive.timedout.txn.reaper.start | 100s | Metastore | Delay after Metastore start before the first run of the reaper (the process that aborts timed-out transactions) (as of Hive 0.13). Controls the AcidHouseKeeperService mentioned above. |
| hive.timedout.txn.reaper.interval | 180s | Metastore | Interval describing how often the reaper (the process that aborts timed-out transactions) runs (as of Hive 0.13). Controls the AcidHouseKeeperService mentioned above. |
| hive.txn.max.open.batch | 1000 | Client | Maximum number of transactions that can be obtained in a single call to open_txns(). (Note 1) |
| hive.max.open.txns | 100000 | HiveServer2/Metastore | Maximum number of open transactions. If the number of currently open transactions reaches this limit, new open-transaction requests are rejected until the number falls below the limit (as of Hive 1.3.0 and 2.1.0). |
| hive.count.open.txns.interval | 1s | HiveServer2/Metastore | Interval (in seconds) at which the open-transaction count is checked (as of Hive 1.3.0 and 2.1.0). |
| hive.txn.retryable.sqlex.regex | "" (empty string) | HiveServer2/Metastore | Comma-separated list of regular expressions describing the SQL state, error code, and error message of retryable SQLExceptions, as appropriate for the Hive Metastore database (as of Hive 1.3.0 and 2.1.0). See Configuration Properties for an example. |
| hive.compactor.initiator.on | false | Metastore | Default false; must be true to support transactions. Whether to run the initiator and cleaner threads on this Metastore instance. Before Hive 1.3.0 it was critical to enable this on exactly one standalone Metastore instance (not yet enforced). From Hive 1.3.0 on, this property may be enabled on any number of standalone Metastore instances. |
| hive.compactor.worker.threads | 0 | Metastore | Number of merge worker threads running on this Metastore instance. (Note 2) Default 0; it must be greater than 0 on at least one Metastore instance to support transactions. |
| hive.compactor.worker.timeout | 86400 | Metastore | How long (in seconds) a merge job may run before it is declared failed and requeued. |
| hive.compactor.cleaner.run.interval | 5000 | Metastore | Interval (in milliseconds) at which the cleaner thread runs (Hive 0.14.0 and later). |
| hive.compactor.check.interval | 300 | Metastore | Interval (in seconds) at which tables or partitions are checked for needed merges. (Note 3) |
| hive.compactor.delta.num.threshold | 10 | Metastore | Number of delta directories in a table or partition that triggers a minor merge. |
| hive.compactor.delta.pct.threshold | 0.1 | Metastore | Percentage size of the delta files relative to the base file that triggers a major merge. 1 = 100%, default 0.1 = 10%. |
| hive.compactor.abortedtxn.threshold | 1000 | Metastore | Number of aborted transactions involving a given table or partition that triggers a major merge. |
| hive.compactor.max.num.delta | 500 | Metastore | Maximum number of delta files that a single merge job will attempt to handle (as of Hive 1.3.0). (Note 4) |
| hive.compactor.job.queue | "" (empty string) | Metastore | Name of the Hadoop queue to which merge jobs are submitted. If set to an empty string, Hadoop chooses the queue (as of Hive 1.3.0). |

Merge history:

| Configuration key | Default | Location | Notes |
| --- | --- | --- | --- |
| hive.compactor.history.retention.succeeded | 3 | Metastore | Number of successful merge entries (per partition) to keep in history. |
| hive.compactor.history.retention.failed | 3 | Metastore | Number of failed merge entries (per partition) to keep in history. |
| hive.compactor.history.retention.attempted | 2 | Metastore | Number of attempted merge entries (per partition) to keep in history. |
| hive.compactor.initiator.failed.compacts.threshold | 2 | Metastore | Number of consecutive merge failures for a given partition after which the Initiator stops scheduling automatic merges. Merges can still be started manually with ALTER TABLE; once a manually initiated merge succeeds, automatic merging resumes. Note that this value must be less than hive.compactor.history.retention.failed. |
| hive.compactor.history.reaper.interval | 2m | Metastore | How often the process that purges merge history runs. |

Note 1: hive.txn.max.open.batch controls how many transactions streaming agents such as Flume or Storm open at once. The streaming agent then writes that many entries into a single file (per Flume agent or Storm bolt). Increasing this value therefore decreases the number of delta files created by the streaming agent, but it also increases the number of open transactions Hive must track at any given time, which may negatively affect read performance.

Note 2: The worker thread generates a MapReduce job to perform the merge. They do not perform the merge themselves. Increasing the number of worker threads reduces the amount of time that tables or partitions need to be merged. It will also increase the background load on the Hadoop cluster as more MapReduce jobs run in the background. Each merge can process one partition (or the entire table if there are no partitions).

Note 3: Decreasing this value reduces the time it takes to start a merge for a table or partition that needs one. However, checking whether a merge is needed requires calls to the NameNode for each table or partition that has had a transaction since the last major merge, so decreasing this value increases the load on the NameNode.

Note 4: If the merge program detects a very large number of delta files, it first runs several smaller partial merges (currently run sequentially) and then performs the merge that was actually requested.

Note 5: If the values are not the same everywhere, an active transaction may be judged to have "timed out" and be aborted. It can also lead to errors such as "No such transaction...", "No such lock...", and so on.

Table-level merge options can be set in TBLPROPERTIES, for example:

CREATE TABLE table_name (
  id                int,
  name              string
)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true",
  "compactor.mapreduce.map.memory.mb"="2048",                   -- Specify the properties of the merge job's map tasks
  "compactorthreshold.hive.compactor.delta.num.threshold"="4",  -- Trigger a minor merge if there are more than 4 delta directories
  "compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- Trigger a major merge if the ratio of delta file size to base file size is greater than 50%
);

Request-level merge options can also be passed via TBLPROPERTIES in the ALTER TABLE ... COMPACT statement, for example:

ALTER TABLE table_name COMPACT 'minor'
   WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="3072");  -- Specify the properties of the merge job's map tasks

ALTER TABLE table_name COMPACT 'major'
   WITH OVERWRITE TBLPROPERTIES ("tblprops.orc.compress.size"="8192");         -- Change any other Hive table properties

Conclusion

  1. All INSERT statements create a delta directory. An UPDATE statement also creates a delta directory, but it first creates a delete directory; in other words the row is deleted first and then re-inserted. The delete directory's prefix is delete_delta.
  2. Each transaction folder contains two files: a data file and a transaction information file.