Abstract: CarbonData acts as an intermediary service between Apache Spark and the storage system, providing four key features for Spark.

Make Apache Spark Better With CarbonData

This article is from “Make Apache Spark Better With Carbondata.”

Spark is undoubtedly a powerful processing engine and a distributed cluster computing framework for fast processing. Unfortunately, Spark also falls short in several areas. Combining Apache Spark with Apache CarbonData can overcome these disadvantages:

  1. ACID transactions are not supported
  2. There is no data quality (schema) enforcement
  3. Small file problem
  4. Inefficient data skipping

What is ACID?

Spark and ACID

ATOMICITY

The A in ACID stands for atomicity. Basically, this means all or nothing. Therefore, when you use the Spark DataFrame writer API, it should write either the full data or none at all. Let’s take a quick look at the Spark documentation, which says: “It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.”

While the whole situation looks a bit scary, it’s actually not that bad. The Spark DataFrame API performs job-level commits internally, which helps to achieve a degree of atomicity. This works for the “append” mode using Hadoop’s FileOutputCommitter. However, the default implementation incurs a performance overhead, especially if cloud storage (S3/OBS) is used instead of HDFS.

A simple two-job experiment shows that Spark’s overwrite is not atomic and may cause data corruption or data loss. The first part of the code mimics Job 1 by creating 100 records and saving them to the ACIDpath directory. The second part of the code mimics Job 2, which attempts to overwrite the existing data but throws an exception in the process. The result of these two jobs is data loss: in the end, we lose the data that the first job created.

Because of the exception, the job-level commit does not occur, so the new file is not saved. Because Spark has already deleted the old files, we lose the existing data. The Spark DataFrame writer API is not atomic, but it behaves like an atomic operation for append operations.
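The delete-then-write behaviour described above can be illustrated without Spark at all. The following is a minimal plain-Python simulation, not Spark code: the function names, the `_temporary` staging directory, and the `fail_at` parameter are assumptions made for illustration, loosely modelled on the idea of a job-level commit.

```python
import os
import shutil
import tempfile


def overwrite_non_atomic(path, rows, fail_at=None):
    """Simulated overwrite: delete the old data first, then write the new data."""
    shutil.rmtree(path, ignore_errors=True)      # the old data is gone from here on
    staging = os.path.join(path, "_temporary")   # hypothetical staging directory
    os.makedirs(staging)
    staged = os.path.join(staging, "part-00000.txt")
    with open(staged, "w") as out:
        for i, row in enumerate(rows):
            if i == fail_at:
                raise RuntimeError("simulated failure in the middle of the job")
            out.write(f"{row}\n")
    # Simulated job-level commit: only reached when every row was written.
    os.rename(staged, os.path.join(path, "part-00000.txt"))
    shutil.rmtree(staging)


def count_rows(path):
    """Count rows in committed files only (names starting with '_' are skipped)."""
    total = 0
    for name in os.listdir(path):
        if name.startswith("_"):
            continue
        with open(os.path.join(path, name)) as f:
            total += sum(1 for _ in f)
    return total


acid_path = os.path.join(tempfile.mkdtemp(), "ACIDpath")

# Job 1: write 100 records.
overwrite_non_atomic(acid_path, range(100))
rows_after_job1 = count_rows(acid_path)

# Job 2: try to overwrite with 50 records, but fail partway through.
try:
    overwrite_non_atomic(acid_path, range(50), fail_at=10)
except RuntimeError:
    pass
rows_after_job2 = count_rows(acid_path)

print(rows_after_job1, rows_after_job2)
```

In this sketch the first count is 100 and the second is 0: the old file was removed before the failed job could commit anything, which is the data loss the experiment describes.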

CONSISTENCY

Distributed systems are usually built on machines with lower availability, so consistency is a key issue in highly available systems. A system is consistent if all nodes see and return the same data at the same time. There are several consistency models; the most common in distributed systems are strong consistency, weak consistency, and eventual consistency. We learned that the Spark writer API’s overwrite mode removes old files before placing new ones. Between these two states, therefore, there is a period when no data is available, and if the job fails, we lose data. In other words, there is no smooth transition between the two operations. This is the classic atomicity problem of Spark’s overwrite operation, and it also breaks the consistency of the data. The Spark write modes therefore do not support consistency.

Isolation and Durability in Spark

Isolation means separation from any other concurrent operation. Suppose we are writing to a dataset that is not yet committed while another concurrent process is reading from or writing to the same dataset. Isolation requires that the uncommitted write not affect the other process. Typical databases provide different isolation levels, such as read committed and serializable. Although Spark has task-level and job-level commits, it does not provide proper isolation because its write operations are not atomic.

Finally, durability means that committed state/data is saved by the system so that it is available in the correct state even after a failure and a system restart. Durability is provided by the storage layer; for Spark applications, that is the role of HDFS and S3/OBS. However, because Spark does not provide proper commits due to its lack of atomicity, we cannot count on durability.

If we look closely, all of these ACID properties are related. Because of the lack of atomicity, we lose consistency and isolation, and because of the lack of isolation, we lose durability.

Lack of Schema Enforcement

We know that Spark applies schema on read. Therefore, when we write any data, it does not throw an exception if there is a schema mismatch. Let’s try to understand this with an example. Suppose we have an input CSV file with a few records, including an integer “Cost” column. A program reads the CSV and converts it to a DataFrame.

The program reads from the CSV file, writes the data back in Parquet format, and displays the data.

Let’s read another input CSV file, where the “Cost” column has decimal values instead of integers, and append it to the data written above.

In this case, our program reads the CSV and writes to the Parquet format without any exception. It throws an error only when we try to display the DataFrame.

This is because Spark never validates the schema during a write operation. The type of the “Cost” column is inferred to be integer during the first load, and the second write appends double-precision data without any problem. When we read the appended data and call an action, it throws an error due to the schema incompatibility.
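The failure mode above, where a mismatched append succeeds and the error surfaces only on read, can be mimicked in plain Python. This is a simplified simulation under stated assumptions, not Spark or Parquet code: the `append_batch` and `read_table` helpers, the CSV part files, and the sample records are all hypothetical.

```python
import csv
import os
import tempfile


def append_batch(table_dir, rows):
    """Append a batch as a new file; nothing is checked against earlier batches."""
    os.makedirs(table_dir, exist_ok=True)
    name = f"part-{len(os.listdir(table_dir)):05d}.csv"
    with open(os.path.join(table_dir, name), "w", newline="") as f:
        csv.writer(f).writerows(rows)


def read_table(table_dir, schema):
    """Apply the schema to every file only now, at read time."""
    result = []
    for name in sorted(os.listdir(table_dir)):
        with open(os.path.join(table_dir, name), newline="") as f:
            for record in csv.reader(f):
                result.append([cast(value) for cast, value in zip(schema, record)])
    return result


table = os.path.join(tempfile.mkdtemp(), "items")
schema = [str, int]  # hypothetical schema: item name, integer Cost

# First batch: Cost values are integers, so reading works.
append_batch(table, [["pen", 10], ["book", 25]])
first_read = read_table(table, schema)

# Second batch: Cost is a decimal value. The write itself raises nothing.
append_batch(table, [["bag", 99.5]])

# The mismatch only shows up when the data is read back.
try:
    read_table(table, schema)
    read_error = None
except ValueError as exc:
    read_error = str(exc)

print(first_read)
print(read_error)
```

The point of the sketch is the timing of the error: the bad batch is already stored alongside the good data by the time anything complains.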

How to overcome the above drawbacks of Spark

If we plug CarbonData into Apache Spark as an additional layer on top of our storage solution, we can address the issues above.

What is CarbonData

Because the Hadoop Distributed File System (HDFS) and object stores behave like file systems, they are not designed to provide transactional support. Implementing transactions in a distributed processing environment is a challenging problem. For example, implementations often have to lock access to the storage system, at the expense of overall throughput. Storage solutions such as Apache CarbonData address these ACID requirements of the data lake by pushing the transaction semantics and rules into the file format itself or into a combination of metadata and file formats. CarbonData acts as an intermediary service between Apache Spark and the storage system, so CarbonData is now responsible for ACID compliance. The underlying storage system can be anything from HDFS to Huawei OBS to Amazon S3 to Azure Blob Storage. The key features CarbonData provides for Spark are:

  1. ACID transactions.
  2. Schema enforcement/Schema validation.
  3. Enables Updates, Deletes and Merge.
  4. Automatic data indexing.

CarbonData in Apache Spark: ACID



Recall the earlier experiment: the first part of the code mimics Job-1, creating 100 records and saving them to the ACIDpath directory. The second part of the code mimics Job-2, which tries to overwrite the existing data but throws an exception during the operation.

The result of these two jobs is data loss: in the end, we lose the data that the first job created. Now let’s change the code to use CarbonData.

Run the first job and count the rows. As expected, you get 100 rows. If you check the data directory, you will see a Snappy-compressed CarbonData file. The data file holds the 100 rows in a columnar encoded format. You will also see a metadata directory that contains the TableStatus file. Now run the second job. What do you expect from it? As mentioned earlier, the job should attempt to do the following.

  1. Delete the previous file.
  2. Create a new file and start writing records.
  3. Throw a runtime exception in the middle of a job.

Without CarbonData, as observed above, the job-level commit did not occur because of the exception, and we lost the existing data.

With CarbonData, the second job still throws an exception when you run it. Now count the rows: you get 100, so the old records are not lost. It looks like CarbonData has made the overwrite atomic. Take a look at the data directory and you will find two CarbonData files.

One file was created by the first job and the other by the second job. Instead of deleting the old file, the second job simply creates a new file and starts writing data to it. This approach leaves the old data state unchanged, which is why we did not lose the old data. The new, incomplete file is also there, but its data is not read. This logic is hidden in the metadata directory and managed through the TableStatus file. The second job could not create a successful entry in the TableStatus file because it failed in the middle. The read API does not read files whose entries in the TableStatus file are marked for deletion.

This time, let’s write code that does not throw an exception and overwrites the old 100 records with 50 records.

Now the record count shows 50, as expected. You have overwritten the older data set of 100 rows with a new data set of 50 rows.
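The behaviour walked through above (a new file per write, a status file that decides what readers see, and a final successful overwrite) can be sketched in plain Python. This is only an illustration of the idea of a metadata-driven commit: it is not CarbonData’s implementation, and the `tablestatus.json` name, its JSON layout, and the status strings are assumptions made for the example.

```python
import json
import os
import tempfile
import uuid

STATUS_FILE = "tablestatus.json"  # hypothetical name and format


def load_status(table_dir):
    path = os.path.join(table_dir, STATUS_FILE)
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)


def save_status(table_dir, entries):
    tmp = os.path.join(table_dir, STATUS_FILE + ".tmp")
    with open(tmp, "w") as f:
        json.dump(entries, f)
    os.replace(tmp, os.path.join(table_dir, STATUS_FILE))  # single swap = commit


def overwrite(table_dir, rows, fail_at=None):
    """Write a new data file; old files stay untouched until the commit."""
    os.makedirs(table_dir, exist_ok=True)
    entries = load_status(table_dir)
    segment = f"segment-{uuid.uuid4().hex}.txt"
    with open(os.path.join(table_dir, segment), "w") as out:
        for i, row in enumerate(rows):
            if i == fail_at:
                raise RuntimeError("simulated failure in the middle of the job")
            out.write(f"{row}\n")
    # Commit: retire the old files and publish the new one in one status update.
    for entry in entries:
        entry["status"] = "MARKED_FOR_DELETE"
    entries.append({"file": segment, "status": "SUCCESS"})
    save_status(table_dir, entries)


def count_rows(table_dir):
    """Readers only look at files that the status file lists as successful."""
    total = 0
    for entry in load_status(table_dir):
        if entry["status"] == "SUCCESS":
            with open(os.path.join(table_dir, entry["file"])) as f:
                total += sum(1 for _ in f)
    return total


table = os.path.join(tempfile.mkdtemp(), "ACIDpath")

overwrite(table, range(100))                      # Job 1
count_after_job1 = count_rows(table)

try:
    overwrite(table, range(50), fail_at=10)       # Job 2 fails midway
except RuntimeError:
    pass
count_after_failed_job = count_rows(table)
data_files_after_failure = len(
    [n for n in os.listdir(table) if n.startswith("segment-")]
)

overwrite(table, range(50))                       # Job 3 succeeds
count_after_job3 = count_rows(table)

print(count_after_job1, count_after_failed_job,
      data_files_after_failure, count_after_job3)
```

In this sketch the counts are 100, then still 100 after the failed job (with two data files on disk), then 50 after the successful overwrite. The design choice worth noticing is that the only step that changes what readers see is the replacement of the status file.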

CarbonData solves the data consistency problem by bringing metadata management to Apache Spark and making the Spark data writer API atomic. With the consistency issues resolved, CarbonData is able to provide updates and deletes.

Spark With CarbonData: Schema Enforcement

Let’s consider a simple user scenario where data arrives in multiple batches for conversion. For the sake of simplicity, let’s assume that there are only two batches of data, with the second batch carrying some column data of a different type than the first batch.

To begin the experiment, let’s read the data from Table 1 and write the data with and without CarbonData. We can use the “Overwrite” mode to write data with and without CarbonData.

Now let’s read the second table, whose cost column holds double type data, and then write the DataFrames to Parquet and to CarbonData tables (note: _c2 is an integer type and we are trying to append double type data). Parquet has no problem appending data that does not match the schema, but when the program tries to append the same data to a CarbonData table, it throws an error:

Based on the above experiment, we can see that CarbonData validates the schema before writing to the underlying storage; in other words, CarbonData enforces the schema on write. If the types are incompatible, CarbonData cancels the transaction. This helps to catch the problem at the start, rather than letting bad data mix with good data and then having to hunt for the root cause.
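Schema validation on write, as described above, can be sketched in a few lines of plain Python. This is an illustrative model only, not CarbonData’s API: the `ValidatingTable` class, the `SchemaMismatchError` exception, and the column names are hypothetical.

```python
class SchemaMismatchError(Exception):
    """Raised when a batch does not match the table schema (hypothetical)."""


class ValidatingTable:
    """In-memory table that checks every batch against its schema before writing."""

    def __init__(self, schema):
        self.schema = schema  # list of (column name, expected Python type)
        self.rows = []

    def append(self, batch):
        # Validate the whole batch first so a bad batch is rejected as a unit.
        for row in batch:
            if len(row) != len(self.schema):
                raise SchemaMismatchError(f"expected {len(self.schema)} columns: {row}")
            for (name, expected), value in zip(self.schema, row):
                if type(value) is not expected:
                    raise SchemaMismatchError(
                        f"column {name!r} expects {expected.__name__}, "
                        f"got {type(value).__name__}"
                    )
        self.rows.extend(batch)  # reached only when every row is valid


table = ValidatingTable([("item", str), ("cost", int)])

# First batch: integer cost values are accepted.
table.append([("pen", 10), ("book", 25)])

# Second batch: a double cost value is rejected at write time.
try:
    table.append([("bag", 99.5)])
    write_error = None
except SchemaMismatchError as exc:
    write_error = str(exc)

print(len(table.rows))
print(write_error)
```

Here the table still holds only the two rows of the first batch, and the error names the offending column at the moment of the write rather than at some later read.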

English link: https://brijoobopanna.medium…

Author: Brijoobopanna
