
Preface

This article is part of a translated series of technical articles by Databricks on the data lake technology Delta Lake. Databricks is known for leading hot open-source big data projects such as Apache Spark, Delta Lake, and MLflow, and Delta Lake, as a core storage engine solution for data lakes, brings many advantages to enterprises.

In addition, Ali Cloud has partnered with Databricks, the original creator of Apache Spark and Delta Lake, to launch Databricks Data Insight, an enterprise-grade fully managed Spark product on Ali Cloud. The product natively integrates the enterprise edition of the Delta Engine and provides high-performance computing capabilities without additional configuration. Interested readers can search for `Databricks Data Insight` or `Ali Cloud Databricks` to find the product page, or go directly to www.aliyun.com/product/big… to learn more.

Peng Zhang is a technical expert in the Ali Cloud Computing Platform Business Division.

Delta Lake Technology Series – Features

Use Delta Lake’s stable features to reliably manage your data

Contents

  • Chapter-01 Why Use Delta Lake MERGE?

  • Chapter-02 Simple, reliable update and delete operations on Delta Lake tables using the Python API

  • Chapter-03 Time Travel for large-scale data lakes

  • Chapter-04 Easily clone your Delta Lake tables for testing, data sharing, and reproducible machine learning

  • Chapter-05 Enable Spark SQL DDL and DML in Delta Lake on Apache Spark 3.0

Content of this article

The Delta Lake ebook series, published by Databricks and translated by the Big Data Ecosystem Enterprise team of the Ali Cloud Computing Platform Division, aims to help leaders and practitioners understand the full capabilities of Delta Lake and the scenarios it fits. This installment, **Delta Lake Series – Features**, highlights the features of Delta Lake.

What you will learn

By the end of this article, you will understand not only what Delta Lake offers, but also how these features can lead to substantial performance improvements.

What is Delta Lake?

Delta Lake is a unified data management system that brings data reliability and fast analytics to cloud data lakes. Delta Lake runs on top of your existing data lake and is fully compatible with the Apache Spark APIs.

At Databricks, we have seen how Delta Lake brings reliability, performance, and lifecycle management to data lakes. Our customers have confirmed that Delta Lake solves the challenges of wrangling complex data formats, the difficulty of deleting data on demand, and the problems associated with modifying data for change data capture.

By using Delta Lake, you can speed up the delivery of high-quality data to the data Lake, and your team can quickly use that data on a secure and scalable cloud service.

Chapter-01 Why Use Delta Lake MERGE?

Delta Lake, a next-generation engine built on top of Apache Spark, supports the MERGE command, which allows you to efficiently upsert and delete records in your data lake.

The MERGE command greatly simplifies the way many common data pipelines are built – all the inefficient and complex multi-hop steps of rewriting an entire partition can now be replaced by simple MERGE queries.

This fine-grained update capability simplifies how you build big data pipelines for a variety of use cases, from change data capture to GDPR. You no longer need to write complex logic to overwrite tables or work around the lack of snapshot isolation.

Another important capability when data changes is the ability to roll back after bad writes. Delta Lake provides rollback through its Time Travel feature, so if you perform a bad merge, you can easily roll back to an earlier version.

In this chapter, we discuss common use cases in which existing data must be updated or deleted. We also explore the challenges inherent in upserts and show how MERGE solves them.

When do you need upserts?

In many common scenarios, existing data in the data lake needs to be updated or deleted:

  • Compliance with the General Data Protection Regulation (GDPR): With the introduction of the right to be forgotten (also known as data erasure) in the GDPR, organizations must delete a user's information upon request. Data erasure also means deleting that user's information from the data lake.

  • Change data capture from traditional databases: In service-oriented architectures, typical web and mobile applications use microservices that are usually built on traditional SQL/NoSQL databases with low-latency performance. One of the biggest challenges organizations face is connecting many isolated data systems, so data engineers build pipelines that consolidate all data sources into a central data lake to speed up analytics. These pipelines must periodically read the changes made to the traditional SQL/NoSQL tables and apply them to the corresponding tables in the data lake. Such changes can take many forms: slowly changing dimension tables, change data from inserts/updates/deletes, and so on.

  • Sessionization: Grouping multiple events into a single session is a common use case in many areas, from product analytics to targeted advertising to predictive maintenance. It is difficult to build continuous applications that track sessions and write the results to a data lake, because data lakes are usually optimized for appending data.

  • Deduplication: A common data pipeline use case is to collect system logs into a Delta Lake table by appending data. However, data sources often generate duplicate records, and downstream deduplication steps are needed to deal with them (see the sketch after this list).
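As an illustration of the deduplication case, the sketch below (PySpark) assumes a target Delta table named logs, an incoming batch registered as the temporary view newDedupedLogs, and a unique eventId column; all of these names are hypothetical.

# Insert-only merge: rows whose eventId already exists in logs are ignored,
# so re-processing the same batch does not create duplicates.
spark.sql("""
  MERGE INTO logs
  USING newDedupedLogs
  ON logs.eventId = newDedupedLogs.eventId
  WHEN NOT MATCHED THEN INSERT *
""")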

Why upserts have traditionally been challenging for data lakes

Because data lakes are fundamentally file-based, they are optimized for appending data rather than changing existing data. Building the above use cases on a data lake has therefore always been challenging.

Users typically read the entire table (or a subset of partitions) and then overwrite it. As a result, every organization tries to reinvent the wheel by writing complicated queries in SQL, Spark, and so on to meet its needs. This approach is:

  • Inefficient: Reading and rewriting entire partitions (or entire tables) to update a few records makes pipelines slow and costly. Manually tuning the table layout and optimizing queries is tedious and requires deep domain knowledge.

  • Error-prone: Handwritten code that modifies data is prone to logical and human errors. For example, multiple pipelines modifying the same table simultaneously without any transaction support can lead to unpredictable data inconsistencies and, in the worst case, data loss. Often, even a single handwritten pipeline can corrupt data due to errors in the business logic.

  • Hard to maintain: Fundamentally, this kind of handwritten code is difficult to understand, track, and maintain. In the long run, this alone significantly increases organizational and infrastructure costs.

Introducing the MERGE command in Delta Lake

With Delta Lake, you can easily address the above use cases using the MERGE command, without any of the problems listed above.

Let’s walk through a simple example to see how to use MERGE. Suppose you have a slowly changing user data table that maintains user information such as addresses. In addition, you have a new address table for existing and new users. To merge all the new addresses into the primary user table, run the following command:

MERGE INTO users
USING updates
ON users.userId = updates.userId
WHEN MATCHED THEN
  UPDATE SET address = updates.address
WHEN NOT MATCHED THEN
  INSERT (userId, address) VALUES (updates.userId, updates.address)

The syntax is intuitive: for existing users (the MATCHED clause) it updates the address column, and for new users (the NOT MATCHED clause) it inserts all columns. For tables with terabytes of data, Delta Lake MERGE is orders of magnitude faster than overwriting entire partitions or tables, because Delta Lake reads only the relevant files and updates them. Specifically, Delta Lake's MERGE command has the following advantages:

  • Fine-grained: The operation rewrites data at the granularity of files rather than partitions, which eliminates all the complexity of rewriting partitions, updating the Hive metastore with MSCK, and so on.

  • Efficient: Delta Lake's data skipping makes MERGE efficient at finding the files to rewrite, eliminating the need to hand-tune your pipeline. In addition, Delta Lake optimizes all I/O and processing, so MERGE reads and writes data significantly faster than similar operations in Apache Spark.

  • Transactional: Delta Lake uses optimistic concurrency control to ensure that concurrent writers update data correctly using ACID transactions, while concurrent readers always see a consistent snapshot of the data.

Below is a visual comparison of MERGE versus handwritten pipelines.

Use MERGE to simplify use cases

Deleting data in compliance with the GDPR

Complying with the GDPR's "right to be forgotten" provision for data in your data lake could not be simpler. You can set up a simple scheduled job, such as the sample code below, to delete all users who have opted out of your service.

MERGE INTO users
USING opted_out_users
ON opted_out_users.userId = users.userId 
WHEN MATCHED THEN DELETE

Applying change data from databases

You can easily apply all data changes (updates, deletes, inserts) from external databases to Delta Lake tables using the MERGE syntax, as shown below:

MERGE INTO users
USING (
  SELECT userId, latest.address AS address, latest.deleted AS deleted FROM (
    SELECT userId, MAX(struct(TIME, address, deleted)) AS latest
    FROM changes GROUP BY userId
  )
) latestChange
ON latestChange.userId = users.userId
WHEN MATCHED AND latestChange.deleted = TRUE THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET address = latestChange.address
WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN
  INSERT (userId, address) VALUES (userId, address)

Updating session information from streaming pipelines

If you have streaming event data coming in and want to sessionize the event data while incrementally updating the sessions and storing them in a Delta Lake table, you can do this using Structured Streaming's foreachBatch together with MERGE. For example, suppose you have a Structured Streaming DataFrame that computes updated session information for each user. You can start a streaming query that applies all the session updates to a Delta Lake table, as shown below (in Scala).

streamingSessionUpdatesDF.writeStream
  .foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) =>
    microBatchOutputDF.createOrReplaceTempView("updates")
    microBatchOutputDF.sparkSession.sql(s"""
      MERGE INTO sessions
      USING updates
      ON sessions.sessionId = updates.sessionId
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  }.start()

Chapter-02 Simple, reliable update and delete operations on Delta Lake tables using the Python API

In this chapter, we will demonstrate how to use Python and the new Python API in Delta Lake with a flight on-time performance scenario. We will show how to upsert and delete data, how to query older versions of data with the Time Travel feature, and how to clean up older versions.

Getting started with Delta Lake

The Delta Lake package can be installed via PySpark's --packages option. In our example, we will also demonstrate the ability to execute Delta Lake SQL commands, such as VACUUM, within Apache Spark. Since this is a short demonstration, we will also enable the following configurations:

spark.databricks.delta.retentionDurationCheck.enabled=false

This allows us to vacuum files with a retention period shorter than the default of 7 days. Note that this is only required for the SQL command VACUUM.

spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

This enables Delta Lake SQL commands within Apache Spark; it is not required for Python or Scala API calls.

./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

Load and save Delta Lake data

We will use on-time flight performance or departure delay data generated from the RITA BTS Flight Departure Statistics; examples of prior work with this data include 2014 Flight Departure Performance via d3.js Crossfilter and on-time flight performance with GraphFrames for Apache Spark. Start by reading the dataset in PySpark.

# Location variables
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"

# Read flight delay data
departureDelays = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .csv(tripdelaysFilePath)

Next, we save the departure delay data to a Delta Lake table. By saving it in this format, we gain its benefits, including ACID transactions, unified batch and streaming, and Time Travel.

# Save flight delay data into Delta Lake format 
departureDelays \
.write \
.format(“delta”) \
.mode(“overwrite”) \ 
.save(“departureDelays.delta”)

Note that this approach is similar to the common way of saving Parquet data; you now specify format("delta") instead of format("parquet"). If you look at the underlying file system, you will notice that four files have been created for the departureDelays Delta Lake table.

/departureDelays.delta$ ls -l
.
..
_delta_log 
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet 
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet 
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet 
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet

Now, let's reload the data, this time reading it in the Delta Lake format.

# Load flight delay data in Delta Lake format
delays_delta = spark \
  .read \
  .format("delta") \
  .load("departureDelays.delta")

# Create temporary view
delays_delta.createOrReplaceTempView("delays_delta")

# How many flights are between Seattle and San Francisco
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Running the query shows that, in this dataset, there are 1,698 flights from Seattle to San Francisco.

Convert to Delta Lake in place

If you have existing Parquet tables, you can convert them to the Delta Lake format in place, eliminating the need to rewrite the table. To convert a table, run the following command.

from delta.tables import *

# Convert non partitioned parquet table at path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

Delete our flight data

To remove data from a traditional data lake table, you will need:

  1. Select all data from the table, excluding rows to be deleted

  2. Create a new table based on the above query

  3. Delete original table

  4. Rename the new table to the original table name for downstream dependencies.

Instead of performing all of these steps, Delta Lake lets us simplify the process by running a DELETE statement. To demonstrate this, let's delete all flights that arrived early or on time (that is, delay < 0).

from delta.tables import *
from pyspark.sql.functions import *

# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

# Delete all on-time and early flights
deltaTable.delete("delay < 0")

# How many flights are between Seattle and San Francisco
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

As you can see from the query above, we removed all on-time and early flights (more on this below), and there are now 837 delayed flights from Seattle to San Francisco. If you look at the file system, you will notice that there are more files even though some data was deleted.

/departureDelays.delta$ ls -l
_delta_log 
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet 
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet 
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet 
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet 
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet 
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet 
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet 
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet

In a traditional data lake, a delete is performed by rewriting the entire table, excluding the values to be deleted. With Delta Lake, deletes are instead performed by selectively writing new versions of the files that contain the data to be deleted, and only marking the previous files as removed. This is because Delta Lake uses multiversion concurrency control (MVCC) to perform atomic operations on the table: for example, while one user is deleting data, another user may be querying a previous version of the table. This multiversion model also allows us to travel back in time (i.e., Time Travel) and query previous versions, a feature we will see later.

Update our flight data

To update data in a traditional data lake table, you need:

  1. Select all data from the table, excluding the rows you want to modify.

  2. Modify the rows that need to be updated/changed

  3. Merge the two tables to create a new table

  4. Delete original table

  5. Rename the new table to the original table name for downstream dependencies

Instead of performing all of the above steps, with Delta Lake we can simplify the process by running an UPDATE statement. To show this, let's update all flights originating from Detroit so that they originate from Seattle.

# Update all flights originating from Detroit to now be originating from Seattle
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" })

# How many flights are between Seattle and San Francisco
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

With the Detroit flights now tagged as Seattle flights, we have 986 flights from Seattle to San Francisco. If you list the departure delays folder (i.e. $.. /departureDelays/ ls -l), you will notice that there are now 11 files (instead of 8 after the delete and 4 after the table creation).

Merge our flight data

When working with a data lake, it is common to continuously append data to a table. This often results in duplicate data (rows you do not want to insert into the table again), new rows that need to be inserted, and some rows that need to be updated. With Delta Lake, all of this can be achieved with the MERGE operation, which is similar to the SQL MERGE statement.

Let's start with a sample dataset that you will update, insert, or de-duplicate with the following query.

SQL (" select * from delays_delta where origin = 'SEA' and Destination = 'SFO' and date like '1010%' LIMIT 10 ").show()Copy the code

The output of this query is shown in the following table. Note that color coding has been added to clearly identify which rows are deleted duplicates (blue), updated data (yellow), and inserted data (green).

Next, let’s generate our own Merge_table that contains duplicate data that will be inserted, updated, or deleted. Take a look at the following code snippet

items = [(1010710, 31, 590, 'SEA', 'SFO'),
         (1010521, 10, 590, 'SEA', 'SFO'),
         (1010832, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()

In the above table (merge_table), there are three rows with different date values:

  1. 1010521: This row needs to update the flights table with a new delay value (yellow).

  2. 1010710: This row is a duplicate (blue).

  3. 1010832: This is a new row to be inserted (green).

With Delta Lake, this can be done easily with merge statements, as shown in the code snippet below.

Alias (" flights ") \. Merge (merge_table. Alias (" updates "), "flights.date = Updates.date ") \. WhenMatchedUpdate (set = {" delay ": "Updates.delay"}) \. WhenNotMatchedInsertAll () \.execute() # What flights between SEA and SFO for these periods Spark. SQL (" select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' LIMIT 10 "). The show ()Copy the code

A single statement can effectively delete duplicate data, update data, and insert data.

View the data table history

As mentioned earlier, after each transaction we do (delete, update), more files are created in the file system. This is because there are different versions of the Delta Lake table for each transaction.

You can see this by using the deltaTable.history() method, as shown below.
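For example, reusing the deltaTable object created earlier in this chapter, a minimal sketch:

# Show the full history of the Delta Lake table (one row per transaction)
deltaTable.history().show()

# Or show only the most recent operation
deltaTable.history(1).show()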

Note that you can also perform the same task using SQL:

spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

As you can see, for each operation (create table, delete, and update) there are three rows representing the different versions of the table (below is an abridged version to make it easier to read):

Travel back to an earlier version of the table

With Time Travel, you can view a Delta Lake table as of a specific version or timestamp. To view historical data, specify the version or timestamp option; in the following code snippet, we specify the version option.

# Load DataFrames for each version
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# Calculate the SEA to SFO flight counts for each version of history
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# Print out the values
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

# Output
# SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

Whether for governance, risk management, and compliance (GRC) or for rolling back errors, Delta Lake tables contain both the metadata (for example, the fact that a delete was performed and by whom) and the data (for example, the rows that were actually deleted). But how do we remove the data files for compliance or size reasons?

Clean up old table versions with VACUUM

By default, the Delta Lake vacuum method deletes all files that are no longer referenced by the table and are older than the 7-day retention period. If you look at the file system, you will notice that the table currently has 11 files.

/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet 
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet 
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet 
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet 
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet 
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet 
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet 
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet 
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet 
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet 
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

To delete all old files so that only a snapshot of the current data remains, you can pass a small retention value to the vacuum method (instead of the default of 7 days).

# Remove all files older than 0 hours old
deltaTable.vacuum(0)

# Note, you can perform the same task via SQL syntax:
# Remove all files older than 0 hours old
spark.sql("VACUUM '" + pathToEventsTable + "' RETAIN 0 HOURS")

When you view the file system after the cleanup is complete, you will see fewer files because the historical data has been deleted.

/departureDelays.delta$ ls -l
_delta_log 
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet 
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet 
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet 
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

Note that after you run vacuum, you lose the ability to time travel back to versions older than the retention period.

Chapter-03 Time Travel for large-scale data lakes

Delta Lake offers Time Travel. Delta Lake is an open-source storage layer that brings reliability to data lakes. It provides ACID transactions, scalable metadata handling, and unified batch and streaming data processing. Delta Lake runs on top of your existing data lake and is fully compatible with the Apache Spark APIs.

With this feature, Delta Lake automatically versions the big data that you store in your data lake, and you can access any historical version of that data. This versioned data management simplifies your data pipeline by making it easy to audit data changes, roll back data after accidental writes or deletes, and reproduce experiments and reports.

Your organization can finally standardize on a clean, centralized, versioned repository of big data on the cloud for analysis.

Common challenges with changing data

  • Auditing data changes: Auditing data changes is critical both for data compliance and for simple debugging, to understand how data has changed over time. Organizations moving from traditional data systems to big data technologies and the cloud struggle in such scenarios.

  • Reproducing experiments and reports: During model training, a data scientist runs various experiments with different parameters on a given dataset. When the scientist revisits the experiments later to reproduce a model, the source data has often been modified by upstream pipelines. Frequently they are not even aware of those upstream changes, so it is hard to reproduce their experiments. Some data scientists and engineers work around this by creating multiple copies of the data, which increases storage costs. The same is true for analysts producing reports.

  • Rollbacks: Data pipelines sometimes write dirty data for downstream consumers. This can happen because of infrastructure instability, messy data, or bugs in the pipeline. Pipelines that simply append to directories or tables can be rolled back easily with date-based partitioning, but with updates and deletes this can become very complicated, and data engineers often have to design complex pipelines to handle such scenarios.

Use the Time Travel feature

Delta Lake’s Time Travel feature simplifies the data pipeline construction for the above use cases. Time Travel in Delta Lake has greatly increased developer productivity. It helps:

  • Data scientists can better manage experiments

  • Data engineers simplify pipelines while rolling back dirty data

  • Data analysts can easily analyze reports

Enterprises can eventually standardize on big data repositories in clean, centralized, versioned cloud storage on which to base data analysis. We are glad that you will be able to use this feature to get things done.

When you write to a Delta Lake table or directory, each action is automatically versioned. You can access different versions of data in two different ways:

Using timestamps

Scala syntax

You can provide a timestamp or date string as an option in a DataFrame reader:

Format (" delta ").option (" timestampAsOf ", "2019-01-01").load(" /path/to/my/table ") df = spark.read \. Format (" delta ") \. Option (" timestampAsOf ", "2019-01-01") \.load(" /path/to/my/table ") SQL Syntax SELECT count(*) FROM my_table TIMESTAMP AS OF "2019-01-01" SELECT count(*) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) SELECT count(*) FROM my_table TIMESTAMP AS OF "2019-01-01 01:30:00.000"Copy the code

If you cannot modify the reader's code base (for example, a library that takes an input path and reads the data for you), you can still roll back the data by passing a timestamp in yyyyMMddHHmmssSSS format as part of the table path:

// Scala
val inputPath = "/path/to/my/table@20190101000000000"
val df = loadData(inputPath)

// Function in a library that you don't have access to
def loadData(inputPath: String): DataFrame = {
  spark.read
    .format("delta")
    .load(inputPath)
}

# Python
inputPath = "/path/to/my/table@20190101000000000"
df = loadData(inputPath)

# Function in a library that you don't have access to
def loadData(inputPath):
  return spark.read \
    .format("delta") \
    .load(inputPath)

Using version numbers

In Delta Lake, each write is given a version number, which you can also use for backtracking.

Scala syntax

Format (" delta ").option(" versionAsOf ", 5238).load(/path/to/my/table) val df = spark.read. Format (" delta ").load(" /path/to/my/table@v5238 ")Copy the code

Python syntax

df = spark.read \
  .format("delta") \
  .option("versionAsOf", 5238) \
  .load("/path/to/my/table")

df = spark.read \
  .format("delta") \
  .load("/path/to/my/table@v5238")

SQL syntax

SELECT count(*) FROM my_table VERSION AS OF 5238

Audit data changes

You can view the history of table changes using the DESCRIBE HISTORY command or through the UI.
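A minimal sketch in PySpark, assuming a table named my_table:

# SQL command
spark.sql("DESCRIBE HISTORY my_table").show()

# Equivalent DeltaTable API call
from delta.tables import DeltaTable
DeltaTable.forName(spark, "my_table").history().show()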

Reproduce experiments and reports

Time Travel also plays an important role in machine learning and data science. Reproducibility of models and experiments is a key consideration for data scientists, because they often create hundreds of models before putting one into production, and during that time-consuming process they may want to go back to earlier models. But because data management is often separated from data science tools, this can be difficult to achieve.

Databricks solves this reproducibility problem by integrating Delta Lake's Time Travel feature with MLflow, an open-source platform for the machine learning lifecycle. For reproducible machine learning training, you can simply log the timestamped URL path (or version) of the data as an MLflow parameter to track which data version was used for each training job, as sketched below.
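A minimal sketch of this pattern; the table path, version number, and parameter name are illustrative:

import mlflow

# Pin the exact snapshot of the training data by version (or timestamp)
data_version = 5238
training_df = spark.read \
    .format("delta") \
    .option("versionAsOf", data_version) \
    .load("/path/to/my/table")

# Record the data version as an MLflow parameter so the run can be reproduced later
with mlflow.start_run():
    mlflow.log_param("data_version", data_version)
    # ... train and log your model against training_df ...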

This allows you to go back to earlier Settings and data sets to reproduce earlier models. You don’t have to coordinate data with upstream teams or worry about cloning data for different experiments. This is the power of unified analytics, where data science and data engineering are closely intertwined.

Rollbacks

Time Travel also makes it easy to roll back in case of dirty writes. For example, if your GDPR pipeline job has a bug that accidentally deletes user information, you can easily fix the pipeline as follows:

INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111

You can also fix incorrect updates as follows:

MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *

If you just want to roll back to a previous version of the table, you can do so using either of the following commands:

RESTORE TABLE my_table VERSION AS OF [version_number] 
RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]

Pinned view of a continuously updating Delta Lake table across multiple downstream jobs

With AS OF queries, you can now pin the snapshot of a continuously updating Delta Lake table for multiple downstream jobs. Consider a situation where a Delta Lake table is being continuously updated, say every 15 seconds, and a downstream job periodically reads from the Delta Lake table and updates different destination tables. In such cases, you usually want a consistent view of the source Delta Lake table so that all destination tables reflect the same state.

Now, you can easily handle the situation as follows:

SQL (" SELECT Max (version) FROM (DESCRIBE HISTORY my_table) ").collect() # Will use the latest version of The table for all operations below data = spark.table(" my_table@v%s "% version[0][0] data.where(" event_type = E1 ").write.jdbc(" table1 ") data.where(" event_type = e2 ").write.jdbc(" table2 ")... Data. The where (" the event_type = e10 "). Write. JDBC (" table10 ")Copy the code

Time series analysis queries become simple

Time Travel also simplifies time-series analytics. For example, if you want to know how many new customers you added over the last week, the query can be as simple as:

SELECT count(distinct userId) - (
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)) 
FROM my_table

Chapter-04 Easily clone your Delta Lake tables for testing, data sharing, and reproducible machine learning

Delta Lake has a table clone feature that makes it easy to test, share, and recreate tables for repeated ML training. Creating copies of tables in a data lake or data warehouse has several practical uses. However, given the volume of data in data lake tables and how fast it grows, making physical copies of tables is an expensive operation.

With table cloning, Delta Lake is now making the process simpler and more cost-effective.

What is cloning?

A clone is a copy of the source table at a given point in time. It has the same metadata as the source table: the same schema, constraints, column descriptions, statistics, and partitioning. However, it is a separate table with a separate lineage and history. Any changes made to a clone affect only the clone, not the source table. Because of snapshot isolation, changes that happen to the source table during or after the cloning process are also not reflected in the clone. In Delta Lake there are two types of clones: shallow and deep.

Shallow clone

Shallow clone (also known as zero copy) copies only the metadata of the table to be cloned; The data files of the table itself are not copied. This type of clone minimizes storage costs by not creating another physical copy of the data. Shallow clones are cheap and very fast to create.

These cloned tables are not self-contained: they depend on the source files they were cloned from as their source of data. If the source files that a cloned table depends on are removed, for example by VACUUM, a shallow clone may become unusable. Therefore, shallow clones are typically used for short-lived use cases such as testing and experimentation.

Deep clone

Shallow clones are great for short-lived use cases, but some scenarios require a separate, independent copy of the table's data. A deep clone copies the source table's metadata as well as its data files. In that sense it is similar in function to copying with a CTAS command (CREATE TABLE.. AS … SELECT…). But it is simpler, because it makes a copy of the source table at a specified version and you do not need to re-specify partitioning, constraints, and other information as you do with CTAS. In addition, it is faster, more robust, and can work incrementally against failures.

With a deep clone, we also copy additional metadata, such as streaming application transactions and COPY INTO transactions, so you can continue running your ETL applications after a deep clone. A minimal sketch follows.
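As a sketch, using the same Python clone API that appears later in this chapter, with an assumed source table prod.events and an assumed target path, a deep clone could be requested with isShallow=False:

from delta.tables import DeltaTable

# Deep clone: copies both the metadata and the data files of prod.events to the target
DeltaTable.forName(spark, "prod.events").clone("/some/archive/location", isShallow=False)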

Where can cloning help?

Sometimes I wish I had a clone to help with my chores or magic tricks. However, we are not talking about human clones here. In many scenarios you need a copy of a dataset, to explore or share it, or to test ML models or analytical queries. Below are some examples of customer use cases.

Testing and experimentation with a production table

When users need to test a new version of their data pipeline, they often have to rely on sample test datasets that are quite different from the data in their production environment. Data teams may also want to experiment with various indexing techniques to improve query performance against massive tables. Running these experiments and tests in the production environment risks affecting production data and users.

Copying a production table for a test or development environment can take hours or even days. In addition, the development environment incurs extra storage costs to hold all the duplicated data, so setting up a test environment that reflects production data can be expensive. With a shallow clone, this becomes trivial:

-- SQL
CREATE TABLE delta.`/some/test/location` SHALLOW CLONE prod.events

# Python
DeltaTable.forName(spark, "prod.events").clone("/some/test/location", isShallow=True)

// Scala
DeltaTable.forName(spark, "prod.events").clone("/some/test/location", isShallow=true)

After creating a shallow clone of your table in a matter of seconds, you can start running a copy of your pipeline to test your new code, or try optimizing the table along different dimensions and see how query performance improves. These changes affect only your shallow clone, not the original table.

Staging major changes to a production table

Sometimes, you may need to make some major changes to the production table. These changes can involve many steps, and you don’t want other users to see the changes you’ve made until you’re done. Shallow cloning can help you here:

-- SQL
CREATE TABLE temp.staged_changes SHALLOW CLONE prod.events;
DELETE FROM temp.staged_changes WHERE event_id is null;
UPDATE temp.staged_changes SET change_date = current_date() WHERE change_date is null;
...
-- Perform your verifications

Once you are satisfied with your results, you have two options. If no changes have been made to the source table, you can replace the source table with a clone. If changes are made to the source table, the changes can be merged into the source table.

-- If no changes have been made to the source 
REPLACE TABLE prod.events CLONE temp.staged_changes; 
-- If the source table has changed
MERGE INTO prod.events USING temp.staged_changes
ON events.event_id <=> staged_changes.event_id
WHEN MATCHED THEN UPDATE SET *;
-- Drop the staged table
DROP TABLE temp.staged_changes;

Machine learning result reproducibility

Training an effective ML model is an iterative process. While tuning different parts of the model, data scientists need to evaluate the model's accuracy against a fixed dataset.

This is difficult to do, especially in systems where data is constantly being loaded or updated. A snapshot of the data is needed to train and test the model. This snapshot supports repetitive training and model governance of ML models.

We recommend using Time Travel to run multiple experiments on a snapshot; An actual example can be seen in Machine Learning Data Lineage With MLflow and Delta Lake.

When you are happy with the results and want to archive the data for later retrieval (for example, the next Black Friday), you can use deep cloning to simplify the archiving process. MLflow is very well integrated with Delta Lake, and the automatic logging feature (mlflow.spark.autolog() method) will tell you which data table version was used to run a set of experiments.

# Run your ML workloads using Python and then
DeltaTable.forName(spark, "feature_store").cloneAtVersion(128, "feature_store_bf2020")

Data migration

You may need to move a massive table to a new, dedicated storage system for performance or governance reasons. The original table will no longer receive new updates and will be deactivated and deleted at a later point in time. Deep clones make the replication of massive tables more robust and scalable.

-- SQL
CREATE TABLE delta.`zz://my-new-bucket/events` CLONE prod.events; 
ALTER TABLE prod.events SET LOCATION 'zz://my-new-bucket/events';

Because a deep clone copies your streaming application transactions and COPY INTO transactions, you can continue your ETL applications from exactly where they left off after the migration!

Data sharing

Within an organization, users from different departments often look for datasets they can use to enrich their analyses or models, and you may want to share your data with users across the organization. Instead of building complex pipelines to move data to yet another store, it is often easier and cheaper to create copies of the relevant datasets that users can browse and test to see whether the data fits their needs, without affecting your own production systems. Here again, deep clones play a key role.

-- The following code can be scheduled to run at your convenience
CREATE OR REPLACE TABLE data_science.events CLONE prod.events;

Data archiving

For regulatory or archiving purposes, all data in a table may need to be kept for a certain number of years, while the active table retains data for only a few months. If you want your data to be updated as quickly as possible but are required to keep data for several years, storing all of this data in a single table and relying on Time Travel can become prohibitively expensive.

In this case, archiving data on a daily, weekly, or monthly basis is a better solution. The incremental cloning capabilities of deep cloning will really help you here.

-- The following code can be scheduled to run at your convenience
CREATE OR REPLACE TABLE archive.events CLONE prod.events;

Note that the archived table has a history separate from the source table, so Time Travel queries on the source table and the clone may return different results depending on how frequently you archive.

It looks great! Any questions?

Just to reiterate some of the pitfalls mentioned above, note the following:

  • Cloning is done on your snapshot. Changes to the source table after the start of a clone are not reflected in the clone.
  • Shallow clones are not self-contained tables like deep clones. If data is removed from the source table (for example via VACUUM), your shallow clone may not be available.
  • Clones and source tables have separate histories. Time Travel queries on source and clone tables may not return the same results.
  • Shallow clones do not copy streaming transactions or COPY INTO metadata. Use deep clones to migrate such tables so that ETL processing can pick up where it left off.

How do I use it?

Shallow and deep clones support new ways for data teams to test and manage their modern cloud data lakes and warehouses. Table clones can help your team run production-level tests of their pipelines, fine-tune indexing for optimal query performance, and create copies of tables for sharing, all with minimal overhead and cost. If your organization needs any of this, we would love for you to try table cloning and give us your feedback; we look forward to hearing about your new use cases and extensions in the future.

Chapter-05 Enable Spark SQL DDL and DML in Delta Lake on Apache Spark 3.0

The release of Delta Lake 0.7.0 coincided with the release of Apache Spark 3.0, enabling a new set of features that make it simpler to work with Delta Lake using SQL. Here are some of the key features.

Support for SQL DDL commands to define tables in the Hive metastore

You can now define Delta tables in the Hive metastore and use the table name in all SQL operations when creating (or replacing) tables.

Create or replace tables

-- Create table in the metastore
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/delta/events'

-- If a table with the same name already exists, the table is replaced with the new configuration, else it is created
CREATE OR REPLACE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/delta/events'

Explicitly change the table schema

-- Alter table and schema
ALTER TABLE table_name ADD COLUMNS (
    col_name data_type
        [COMMENT col_comment] 
    [FIRST|AFTER colA_name],
...)

You can also use the Scala/Java/Python API:

  • The DataFrame.saveAsTable(tableName) and DataFrameWriterV2 APIs.

  • DeltaTable.forName(tableName): this API creates an io.delta.tables.DeltaTable instance, which is useful for performing Update/Delete/Merge operations in Scala/Java/Python. A short sketch follows.
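A minimal sketch (PySpark), assuming an existing DataFrame df and a metastore table named events; the delete condition is purely illustrative:

from delta.tables import DeltaTable

# Register (or append to) a Delta table in the metastore via the DataFrameWriter API
df.write.format("delta").mode("append").saveAsTable("events")

# Obtain a DeltaTable instance by table name and run a fine-grained operation on it
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.delete("eventType = 'obsolete'")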

Support for SQL INSERT, DELETE, UPDATE, and MERGE

One of the most common questions from the Delta Lake Tech Talks is when DML operations such as DELETE, UPDATE, and MERGE will be available in Spark SQL. Wait no longer: these operations are now available in SQL! Below are examples of how you can write delete, update, and merge (insert, update, delete, and de-duplicate operations) using Spark SQL.

-- Using append mode, you can atomically add new data to an existing Delta table
INSERT INTO events SELECT * FROM newEvents

-- To atomically replace all of the data in a table, you can use overwrite mode
INSERT OVERWRITE events SELECT * FROM newEvents

-- Delete events
DELETE FROM events WHERE date < '2017-01-01'

-- Update events
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

-- Upsert data to a target Delta table using merge
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

It is worth noting that the merge operation in Delta Lake supports more advanced syntax than the standard ANSI SQL syntax. For example, MERGE supports:

  • Delete actions – delete the target row when it matches a source row. For example, "... WHEN MATCHED ... THEN DELETE ..."

  • Multiple matched actions with clause conditions – more flexibility when target and source rows match. For example:

    ... WHEN MATCHED AND events.shouldDelete THEN DELETE
        WHEN MATCHED THEN UPDATE SET events.data = updates.data ...

  • Star syntax – shorthand for setting the values of target columns from source columns with the same name. For example:

    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    -- equivalent to updating/inserting with events.date = updates.date, events.eventId = updates.eventId, events.data = updates.data

Automatic and incremental Presto/Athena manifest generation

As described in the article Query Delta Lake Tables from Presto and Athena, Improved Operations Concurrency, and Merge Performance, Delta Lake supports other processing engines reading Delta Lake data through manifest files; a manifest file contains the list of files that made up the latest version of the table when the manifest was generated. As described in that article, you need to:

  • Generate the Delta Lake manifest file (see the sketch after this list)
  • Configure Presto or Athena to read the generated manifests
  • Manually re-generate (update) the manifest file
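A sketch of the manual generation step using the Python API (the table path is illustrative):

from delta.tables import DeltaTable

# Generate the symlink-format manifest files that Presto/Athena read
deltaTable = DeltaTable.forPath(spark, "/path/to/deltaTable")
deltaTable.generate("symlink_format_manifest")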

New to Delta Lake 0.7.0 is the ability to automatically update manifest files with the following command:

ALTER TABLE delta.`pathToDeltaTable` 
SET TBLPROPERTIES(
    delta.compatibility.symlinkFormatManifest.enabled=true 
)

Configure tables via table properties

With ALTER TABLE SET TBLPROPERTIES you can set table properties on a table, which lets you enable, disable, or configure many Delta Lake features, such as the automatic manifest generation above. For example, the table property delta.appendOnly=true prevents data in a Delta table from being deleted or updated, as sketched below.
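A minimal sketch, assuming an existing Delta table named events registered in the metastore:

# Make the table append-only: subsequent UPDATE and DELETE operations will be rejected
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.appendOnly' = 'true')")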

You can also easily control the history retained by the Delta Lake table with the following properties:

  • delta.logRetentionDuration: controls how long the table history (that is, the transaction log history) is kept. By default the history is retained for 30 days, but you may need to change this value to suit your requirements (for example, for GDPR historical context).

  • delta.deletedFileRetentionDuration: controls how long ago a file must have been deleted before it becomes a candidate for VACUUM. By default, VACUUM removes data files older than 7 days.

Starting with Delta Lake 0.7.0, you can configure these properties using ALTER TABLE SET TBLPROPERTIES.

ALTER TABLE delta.`pathToDeltaTable`
SET TBLPROPERTIES(
  delta.logRetentionDuration = "interval "
  delta.deletedFileRetentionDuration = "interval "
)

Support for adding user-defined metadata in Delta Lake table commits

You can specify user-defined strings as metadata in commits made by Delta Lake table operations, either by using the DataFrameWriter option userMetadata or by setting the SparkSession configuration spark.databricks.delta.commitInfo.userMetadata.

In the following example, we delete a user (1xsdf1) from our data lake per a user request. To make sure we associate the user's request with the deletion, we also add the DELETE request ID to the userMetadata.

SET spark.databricks.delta.commitInfo.userMetadata = {"GDPR": "DELETE Request 1x891jb23"};
DELETE FROM user_table WHERE user_id = '1xsdf1'

When viewing the history of operations on the user table (user_table), the associated delete request can easily be identified in the transaction log, as sketched below.
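A minimal sketch of that lookup, assuming the user_table from the example above:

# The userMetadata column of the table history carries the custom commit metadata
spark.sql("DESCRIBE HISTORY user_table") \
    .select("version", "operation", "userMetadata") \
    .show(truncate=False)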

Other highlights

Other highlights of Delta Lake 0.7.0 include:

  • Support for Azure Data Lake Storage Gen2 – Spark 3.0 supports the Hadoop 3.2 libraries, which enables support for Azure Data Lake Storage Gen2.

  • Improved support for streaming one-time triggers – with Spark 3.0, we now ensure that Trigger.Once processes all outstanding data in a Delta Lake table in a single micro-batch, even if rate limits are set with the DataStreamReader option maxFilesPerTrigger.

During the AMA, there were many more questions about Structured Streaming and the use of Trigger.Once; a minimal sketch is shown below.
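A minimal sketch of a Trigger.Once pipeline in PySpark; the source path, target path, and checkpoint location are illustrative:

# Process everything outstanding from a Delta source in a single micro-batch, then stop
(spark.readStream
    .format("delta")
    .load("/delta/events")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/events_copy/_checkpoint")
    .trigger(once=True)
    .start("/delta/events_copy"))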

For more information, some useful resources to explain this concept include:

  • Running streaming jobs once a day for 10x cost savings

  • Beyond Lambda: Introducing the Delta Architecture (in particular, the cost vs. latency trade-off)

What's next

Now that you have learned about Delta Lake and its features, and how to optimize performance, this series also includes:

  • Delta Lake Technology Series – Basics and Performance
  • Delta Lake Technology Series -Lakehouse
  • Delta Lake Technology Series -Streaming
  • Delta Lake Technology Series – Customer Use Cases

The original link

This article is the original content of Aliyun and shall not be reproduced without permission.