Brief introduction: Best practices, using Alibaba Cloud Data Lake Analytics (DLA) as an example. DLA is committed to helping customers build low-cost, easy-to-use, and elastic data platforms that cost at least 50% less than a traditional Hadoop deployment. Among its components, DLA Meta supports a unified view over 15+ cloud data sources (OSS, HDFS, DB, DW), introduces multi-tenancy and metadata discovery, pursues near-zero marginal cost, and is provided free of charge. DLA Lakehouse is built on Apache Hudi; its primary goal is to provide an efficient lake warehouse that supports CDC and incremental message ingestion, and it is currently being intensively productized. DLA Serverless Presto is built on Apache PrestoDB and is mainly used for federated interactive queries and lightweight ETL.

Background

Data lakes are currently a hot topic both in China and abroad. Market research from MarketsandMarkets (https://www.marketsandmarkets.com/Market-Reports/data-lakes-market-213787749.html) projects that the data lake market will grow from $7.9 billion in 2019 to $20.1 billion in 2024. Some enterprises have already built their own cloud native data lake solutions to effectively address business pain points; many more are building or planning to build their own data lakes. According to a 2020 Gartner report (https://www.gartner.com/smarterwithgartner/the-best-ways-to-organize-your-data-structures/), 39% of users are already using a data lake, and 34% would consider using one within a year. With the maturity of cloud native storage technologies such as object storage, people first land structured, semi-structured, image, video, and other data in object storage. When this data needs to be analyzed, they choose Hadoop or a cloud native data lake analytics service such as Alibaba Cloud DLA for data processing. Compared with locally deployed HDFS, object storage is at a certain disadvantage in analysis performance, and the industry has done extensive exploration and implementation around this problem.

I. Challenges of Object-based Storage Analysis

1. What is a data lake

According to Wikipedia, a data lake is a system or repository of data stored in its natural/raw format, usually as object blobs or files, including copies of the raw data produced by source systems and transformed data used for various tasks. This covers structured data from relational databases (rows and columns), semi-structured data (such as CSV, logs, XML, JSON), and unstructured data (such as email, documents, PDF, images, audio, and video).

It can be concluded from the above that the data lake has the following characteristics:

  • Data source: original data, converted data
  • Data types: structured data, semi-structured data, unstructured data, binary data
  • Data Lake Storage: Scalable mass data storage service

2. Data lake analysis solution architecture

The data lake analysis solution mainly includes five modules:

  • Data source: raw data storage module, including structured data (Database, etc.), semi-structured (File, log, etc.), unstructured (audio and video, etc.);
  • Data integration: to unify data into the data lake for storage and management, data integration currently takes three main forms: external table association, ETL, and asynchronous metadata construction.
  • Data lake storage: data lake storage in the industry today includes object storage and self-built HDFS. With the evolution of cloud native technology, object storage has been heavily optimized for scalability, cost, and operations-free usage. At present, customers increasingly choose cloud native object storage as the data lake storage foundation rather than self-built HDFS.
  • Metadata management: metadata management serves as the bus connecting data integration, storage, and the analysis engines;
  • Data analytics engines: There are plenty of analytics engines out there, such as Spark, Hadoop, Presto, etc.

3. Challenges of analysis on object storage

Compared with HDFS, object storage uses flat metadata management in order to guarantee high scalability; because it does not maintain a directory tree, the metadata service can scale horizontally and does not have the single-point bottleneck of HDFS's NameNode. At the same time, object storage is operations-free and is stored and read on demand, so a fully storage-compute-separated architecture can be built on top of it. However, it also brings some problems for analysis and computation:

  • List is slow: why is listing a directory ("/") on object storage so much slower than on HDFS?
  • Too many requests: why does the request cost of object storage even exceed the compute cost?
  • Rename is slow: why do Spark and Hadoop jobs that write data keep getting stuck in the commit phase?
  • Reads are slow: why is analyzing 1TB of data so much slower than on a self-built HDFS cluster?
  • …

4. Current state of object storage analysis optimization in the industry

These are some of the typical problems you encounter when building a data lake analysis solution based on object storage. Solving these problems requires an understanding of the architectural differences between object storage and traditional HDFS and targeted optimization. At present, the industry has done a lot of exploration and practice:

  • JuiceFS: maintains an independent metadata service and uses object storage as the storage medium. The independent metadata service provides efficient file management semantics such as List and Rename. However, an additional service has to be deployed, and every analysis job that reads the object store depends on it;
  • Hadoop: since Hadoop and Spark write data through the two-phase commit protocol of OutputCommitter, OutputCommitter V1 performs two renames, in commitTask and commitJob. On object storage a rename is implemented as a copy of the object followed by a delete, which is expensive. OutputCommitter V2 was therefore proposed; it performs only one rename, but dirty data is left behind if the commitJob process is interrupted (see the configuration sketch after this list);
  • Alluxio: deploys an independent cache service that caches remote object storage files on local disks, so that analysis and computation read local data and are accelerated;
  • Hudi: table formats such as Hudi, Delta Lake, and Iceberg maintain the file metadata of a dataset themselves to avoid List operations, and at the same time provide ACID semantics and read/write isolation similar to traditional databases;
  • Alibaba Cloud native data lake analytics service DLA: the DLA service has made many optimizations for reading and writing the object storage OSS, including Rename optimization, InputStream optimization, Data Cache, and so on.
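For reference, the Hadoop FileOutputCommitter algorithm version mentioned above is selected through a standard configuration key. A minimal sketch of switching between the two algorithms on a plain Hadoop Configuration (actual job setup will differ):

```java
import org.apache.hadoop.conf.Configuration;

public class CommitterAlgorithmExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Algorithm 1 (the classic default): commitTask renames the task attempt
        // directory into the job attempt directory, and commitJob renames again
        // into the final output path -- two renames, each a copy & delete on OSS.
        // Algorithm 2: commitTask moves task output directly into the final output
        // directory, so only one rename happens, but an interrupted commitJob can
        // leave partial (dirty) data visible.
        conf.setInt("mapreduce.fileoutputcommitter.algorithm.version", 2);
    }
}
```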

II. DLA Architecture Optimization for Object Storage OSS

Because of the problems described above in analysis scenarios on object storage, DLA builds a unified DLA FS layer to solve problems such as object storage metadata access, Rename, and slow reads. DLA FS supports ETL reads and writes from DLA Serverless Spark, interactive queries on data from DLA Serverless Presto, and efficient reads for Lakehouse lake ingestion and warehouse building. The architecture optimization for object storage OSS is divided into four layers:

  • Data lake storage OSS: stores structured, semi-structured, and unstructured data, as well as data in Hudi format ingested into the lake and built into a warehouse through DLA Lakehouse;
  • DLA FS: a unified layer that solves the analysis optimization problems of object storage OSS, including Rename optimization, Read Buffer, Data Cache, File List optimization, etc.;
  • Analysis workloads: DLA Serverless Spark mainly reads data from OSS for ETL and writes it back to OSS, while Serverless Presto mainly runs interactive queries on the data built up on OSS;
  • Business scenarios: the DLA dual engines Spark and Presto can support business scenarios with multiple workload patterns.

III. Analysis of DLA FS Optimization Techniques for Object Storage OSS

The following introduces the optimization techniques of DLA FS for object storage OSS:

1. Rename optimization

The OutputCommitter interface is used in the Hadoop ecosystem to ensure data consistency during the write process, which is similar to the two-phase commit protocol.

Open source Hadoop provides a Hadoop FileSystem implementation for reading and writing OSS files, and its default OutputCommitter implementation is FileOutputCommitter. To ensure data consistency and keep users from seeing intermediate results, each task first writes its output to a temporary working directory; only after all tasks confirm that their output is complete does the driver rename the temporary working directory to the production data path, as illustrated in the diagram below:
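To complement the diagram, here is a simplified sketch of where the two renames happen in FileOutputCommitter algorithm 1; this is not the actual Hadoop source, only an illustration, and the paths are hypothetical:

```java
// Simplified illustration of the classic FileOutputCommitter (algorithm 1)
// two-phase commit. On object storage each Files.move below corresponds to
// an expensive copy & delete rather than a cheap metadata update.
import java.nio.file.Files;
import java.nio.file.Path;

public class FileOutputCommitterSketch {

    // Each task writes its output into its own temporary attempt directory.
    static Path taskAttemptDir(Path output, int taskId) {
        return output.resolve("_temporary/0/task_" + taskId);
    }

    // commitTask: rename the task attempt directory into the job's committed area.
    static void commitTask(Path output, int taskId) throws Exception {
        Path src = taskAttemptDir(output, taskId);
        Path dst = output.resolve("_temporary/0/committed_task_" + taskId);
        Files.move(src, dst);                              // rename #1 (per task)
    }

    // commitJob: the driver renames every committed task's output to the final path.
    static void commitJob(Path output, int numTasks) throws Exception {
        for (int i = 0; i < numTasks; i++) {
            Path src = output.resolve("_temporary/0/committed_task_" + i);
            Files.move(src, output.resolve("part-" + i));  // rename #2 (single point)
        }
        Files.createFile(output.resolve("_SUCCESS"));      // mark the job as done
    }
}
```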

The Rename operation on OSS is much more expensive than on HDFS: it is a copy & delete operation, whereas on HDFS it is only a metadata operation on the NameNode. If the analysis engines in DLA kept using the open source Hadoop FileOutputCommitter, performance would be poor. To solve this problem, we decided to introduce the OSS Multipart Upload feature in DLA FS to optimize write performance.

1.1 DLA FS supports writing OSS objects in Multipart Upload mode

Alibaba Cloud OSS supports the Multipart Upload feature: a file is divided into multiple data slices that are uploaded concurrently, and after the upload the user can, at a time of their choosing, call the Multipart Upload complete interface to merge the slices into the original file, thereby improving the throughput of writing files to OSS. Because Multipart Upload controls when a file becomes visible to the user, we can use it to replace the Rename operation and optimize the performance of DLA FS when writing to OSS in OutputCommitter scenarios.
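As background on this primitive, here is a minimal sketch of the Multipart Upload flow using the Aliyun OSS Java SDK; the endpoint, credentials, bucket, and key are placeholders. The key point is that the object only becomes visible when CompleteMultipartUpload is called:

```java
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.*;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

public class MultipartUploadSketch {
    public static void main(String[] args) {
        OSS oss = new OSSClientBuilder().build(
                "https://oss-cn-hangzhou.aliyuncs.com", "<accessKeyId>", "<accessKeySecret>");
        String bucket = "my-bucket", key = "warehouse/part-00000";

        // 1. Initiate the upload; nothing is visible to readers yet.
        String uploadId = oss.initiateMultipartUpload(
                new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

        // 2. Upload parts (possibly in parallel from different threads or executors).
        List<PartETag> partETags = new ArrayList<>();
        byte[] part = "example data".getBytes();
        UploadPartRequest req = new UploadPartRequest();
        req.setBucketName(bucket);
        req.setKey(key);
        req.setUploadId(uploadId);
        req.setPartNumber(1);
        req.setInputStream(new ByteArrayInputStream(part));
        req.setPartSize(part.length);
        partETags.add(oss.uploadPart(req).getPartETag());

        // 3. Complete (commit): only now does the object appear under `key`.
        //    Calling abortMultipartUpload instead would discard all parts.
        oss.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));

        oss.shutdown();
    }
}
```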

The overall flow of the OutputCommitter implemented on top of Multipart Upload is as follows:

Using OSS Multipart Upload has the following advantages:

  • Writing a file no longer requires multiple copies. As shown, the expensive Rename operation is no longer needed, and writing a file no longer involves copy & delete. In addition, OSS's CompleteMultipartUpload interface is a very lightweight operation compared with Rename.
  • The chance of data inconsistency is smaller. Although CompleteMultipartUpload is still not atomic when multiple files are committed at once, compared with the original Rename, which copies data, its time window is much shorter and the probability of data inconsistency is much smaller, which meets the needs of most scenarios.
  • The file metadata operations involved in Rename are no longer required. According to our statistics, the number of metadata operations per file can be reduced from 13 to 6 for algorithm 1, and from 8 to 4 for algorithm 2.

The interfaces in OSS Multipart Upload that control visibility to the user are CompleteMultipartUpload and AbortMultipartUpload, whose semantics are similar to commit/abort. The standard Hadoop FileSystem interface does not provide commit/abort semantics.

To solve this problem, we introduced the Semi-Transaction layer in DLA FS.

1.2 DLA FS introduces the Semi-Transaction layer

As mentioned earlier, OutputCommitter is similar to a two-phase commit protocol, so we can abstract this process into a distributed transaction. The Driver starts a global transaction, and each Executor starts its own local transaction. The Driver commits the global transaction when it receives information that all local transactions have completed.

Based on this abstraction, we introduce a Semi-Transaction layer (we do not implement full transaction semantics), in which interfaces such as Transaction are defined. Under this abstraction, we encapsulate a consistency guarantee mechanism adapted to the OSS Multipart Upload feature. We also implemented OSSTransactionalOutputCommitter, which implements the OutputCommitter interface; upper-layer compute engines such as Spark interact with the Semi-Transaction layer of DLA FS through it. The structure is as follows:

Taking DLA Serverless Spark as an example, the general process of using the DLA FS OSSTransactionalOutputCommitter is as follows (a hypothetical interface sketch follows the list):

  1. SetupJob. The Driver opens a GlobalTransaction. During initialization, the GlobalTransaction creates a hidden working directory on OSS that belongs to this GlobalTransaction and is used to store the file metadata of this job.
  2. SetupTask. The Executor uses the GlobalTransaction deserialized from the Driver to generate a LocalTransaction, which listens for the completion status of file writes.
  3. The Executor writes files. The file metadata is captured by the LocalTransaction and stored in local RocksDB. Remote calls to OSS are time-consuming, so keeping the metadata in local RocksDB until the subsequent commit saves remote call time.
  4. CommitTask. When the Executor calls commit on the LocalTransaction, the LocalTransaction uploads the Task's metadata to the OSS working directory and stops listening for file completion status.
  5. CommitJob. The Driver calls commit on the GlobalTransaction; the GlobalTransaction reads the list of files pending commit from all the metadata in the working directory and calls the OSS CompleteMultipartUpload interface to make all the files visible to users.
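To make the abstraction more concrete, the following is a hypothetical sketch of what such a Semi-Transaction layer could look like. The names follow the description above, but the actual DLA FS interfaces are not public and will differ:

```java
// Hypothetical sketch of a Semi-Transaction abstraction as described above.
// Names and signatures are illustrative, not the actual DLA FS interfaces.
import java.io.Serializable;
import java.util.List;

interface GlobalTransaction extends Serializable {
    void setup();                               // SetupJob: create a hidden working directory on OSS
    LocalTransaction newLocalTransaction();     // one LocalTransaction per task/executor
    void commit();                              // CommitJob: read metadata, CompleteMultipartUpload every file
    void abort();                               // AbortMultipartUpload every pending file
}

interface LocalTransaction {
    void setup();                               // SetupTask: start listening for file writes
    void onFileWritten(FilePendingCommit meta); // record file metadata locally (e.g. RocksDB)
    void commit();                              // CommitTask: ship the task's metadata to the OSS working directory
    void abort();
}

// Metadata needed to finish (or abort) one multipart upload later.
class FilePendingCommit implements Serializable {
    String bucket;
    String key;
    String uploadId;
    List<String> partETags;
}
```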

Introducing the Semi-Transaction layer in DLA FS has two benefits:

  • It does not depend on the interface of any particular compute engine, so it can easily be ported to other compute engines later; for example, its implementation can be adapted for use by Presto.
  • Additional implementations can be added under the Transaction semantics. For example, in partition-merge scenarios, MVCC features can be added so that data is merged without affecting its online use.

2. InputStream optimization

Users reported that the OSS request cost was high, even exceeding the DLA cost (OSS request cost = number of requests × unit price per 10,000 requests ÷ 10,000). Investigation found that this is because the open source OSSFileSystem reads data with a read-ahead unit of 512KB. For example, if a user sequentially reads a 1MB file, two calls to OSS are made: the first request reads the first 512KB, and the second request reads the following 512KB. This implementation causes many requests when reading large files. In addition, because the read-ahead data is cached in memory, reading many files at the same time also puts pressure on memory.

Therefore, in the DLA FS implementation we removed the read-ahead operation. When the user calls Hadoop's read, the underlying layer requests OSS to read the entire range from the current position to the end of the file, and then reads the data the user asked for from the stream returned by OSS and returns it. In this way, if the user reads sequentially, the next read call naturally reads from the same stream without initiating a new request. Even the sequential read of a large file needs only one call to OSS.

In addition, for small seeks, the DLA FS implementation reads the skipped data from the current stream and discards it, so no new request is needed. Only a large jump closes the current stream and issues a new request (because reading and discarding across a large jump would cause a long seek delay). This implementation ensures that DLA FS also reduces the number of calls for columnar file formats such as ORC and Parquet.
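A simplified sketch of this read/seek strategy is shown below; it is not the actual DLA FS code, and the 1MB threshold and the openRange hook are illustrative assumptions:

```java
// Simplified sketch of the read strategy described above: open one range
// request to the end of the object, serve sequential reads from it,
// skip-and-discard small forward seeks, and reopen only on large jumps.
import java.io.IOException;
import java.io.InputStream;

abstract class LazyRangeInputStream extends InputStream {
    private static final long SEEK_THRESHOLD = 1 << 20; // 1MB, illustrative

    private InputStream wrapped;   // current OSS range stream
    private long pos;              // logical position in the object

    /** Open an OSS GET with "Range: bytes=offset-" (to the end of the object). */
    protected abstract InputStream openRange(long offset) throws IOException;

    @Override
    public int read() throws IOException {
        if (wrapped == null) {
            wrapped = openRange(pos);          // one call covers all sequential reads
        }
        int b = wrapped.read();
        if (b >= 0) pos++;
        return b;
    }

    public void seek(long target) throws IOException {
        long delta = target - pos;
        if (wrapped != null && delta >= 0 && delta <= SEEK_THRESHOLD) {
            // Small forward seek: read and discard, keeping the existing connection.
            long skipped = 0;
            while (skipped < delta) {
                long n = wrapped.skip(delta - skipped);
                if (n <= 0) break;
                skipped += n;
            }
            pos += skipped;
            if (skipped == delta) return;
        }
        // Backward or large jump: drop the stream; the next read issues a new range call.
        if (wrapped != null) wrapped.close();
        wrapped = null;
        pos = target;
    }

    @Override
    public void close() throws IOException {
        if (wrapped != null) wrapped.close();
    }
}
```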

3. Data Cache acceleration

In the storage-compute separation architecture based on object storage OSS, reading data from remote storage over the network is still a costly operation and often brings a performance penalty. DLA FS introduces a local caching mechanism for cloud native data lake analysis that caches hot data on local disks, shortening the distance between data and computation, reducing the latency and IO limitations of reading data from the remote end, and achieving lower query latency and higher throughput.

3.1 Local Cache Architecture

We encapsulate the cache processing logic in DLA FS. If the data to be read exists in the cache, it is returned directly from the local cache without pulling data from OSS; if the data is not in the cache, it is read directly from OSS and asynchronously cached to the local disk.
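A minimal sketch of that cache-or-fetch decision is shown below; it is illustrative only, the real DLA FS cache also handles eviction, concurrency, and block-level granularity, and the readFromOss hook is a placeholder:

```java
// Illustrative sketch of the local data cache behaviour described above:
// serve reads from local disk when cached, otherwise read from OSS and
// populate the cache asynchronously, off the read path.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LocalDataCache {
    private final Path cacheDir;
    private final ExecutorService cacheWriter = Executors.newSingleThreadExecutor();

    LocalDataCache(Path cacheDir) {
        this.cacheDir = cacheDir;
    }

    /** Read one object (or block), preferring the local disk cache. */
    byte[] read(String objectKey) throws IOException {
        Path cached = cacheDir.resolve(objectKey.replace('/', '_'));
        if (Files.exists(cached)) {
            return Files.readAllBytes(cached);   // cache hit: no OSS request at all
        }
        byte[] data = readFromOss(objectKey);    // cache miss: remote read
        cacheWriter.submit(() -> {               // populate the cache asynchronously
            try {
                Files.write(cached, data);
            } catch (IOException ignored) {
                // Caching is best-effort; a failed write only loses the speedup.
            }
        });
        return data;
    }

    // Placeholder for the actual OSS read (e.g. via the OSS SDK or DLA FS).
    private byte[] readFromOss(String objectKey) throws IOException {
        throw new UnsupportedOperationException("wire up an OSS client here");
    }
}
```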

3.2 Data Cache Hit Rate Increase Strategy

Here we use DLA Serverless Presto to show how the local cache hit rate of DLA FS is improved. Presto's default split scheduling policy is NO_PREFERENCE; under this policy the main consideration is worker load, so which worker a split is assigned to is largely random. In DLA Presto we instead use the SOFT_AFFINITY scheduling policy: when Hive splits are submitted, the hash value of each split is computed so that the same split is assigned to the same worker as far as possible, improving the cache hit rate (see the sketch after the list below).

With the SOFT_AFFINITY policy, split scheduling works as follows:

  1. Determine the preferred worker and the alternative worker of a split from the split's hash value.
  2. If the preferred worker is idle, submit the split to the preferred worker.
  3. If the preferred worker is busy, submit the split to the alternative worker.
  4. If the alternative worker is also busy, submit the split to the least busy worker.
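A hash-based preferred/alternative selection of this kind can be sketched as follows; this illustrates the idea rather than Presto's actual SOFT_AFFINITY implementation, and the Worker interface and busyness threshold are assumptions:

```java
// Illustrative sketch of hash-based soft affinity scheduling: the same split
// hashes to the same preferred worker, falls back to an alternative worker,
// and finally to the least busy worker when both are overloaded.
import java.util.Comparator;
import java.util.List;

class SoftAffinityScheduler {
    interface Worker {
        int pendingSplits(); // rough measure of how busy the worker is
    }

    private final int maxPendingSplitsPerWorker;

    SoftAffinityScheduler(int maxPendingSplitsPerWorker) {
        this.maxPendingSplitsPerWorker = maxPendingSplitsPerWorker;
    }

    Worker assign(String splitPath, List<Worker> workers) {
        int hash = Math.floorMod(splitPath.hashCode(), workers.size());
        Worker preferred = workers.get(hash);                       // step 1
        Worker alternative = workers.get((hash + 1) % workers.size());

        if (preferred.pendingSplits() < maxPendingSplitsPerWorker) {
            return preferred;                                       // step 2
        }
        if (alternative.pendingSplits() < maxPendingSplitsPerWorker) {
            return alternative;                                     // step 3
        }
        return workers.stream()                                     // step 4
                .min(Comparator.comparingInt(Worker::pendingSplits))
                .orElse(preferred);
    }
}
```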

IV. Value brought by DLA FS

1. Rename optimization effect in ETL write scenarios

When using DLA, customers usually use DLA Serverless Spark for large-scale ETL. For the write test we used the orders table from the TPC-H 100GB data set and created a new table orders_test partitioned by the o_ordermonth field, then executed the following SQL in Spark: "insert overwrite table tpc_h_test.orders_test select * from tpc_h_test.orders". The results were compared under the same resource configuration, with one run on open source Spark and the other on DLA Serverless Spark.

It can be concluded from the figure that:

  • This optimization brings a large improvement to both algorithm 1 and algorithm 2.
  • Both algorithm 1 and algorithm 2 improve when this feature is turned on, but the improvement is more obvious for algorithm 1. This is because algorithm 1 performs rename twice, and one of the renames happens at a single point on the driver, whereas in algorithm 2 each Executor performs its distributed rename only once.
  • At the current data volume, the difference between algorithm 1 and algorithm 2 is no longer obvious once this feature is turned on. Neither needs a Rename operation any more; the only difference is whether CompleteMultipartUpload is executed at a single point on the driver (for algorithm 2, we execute CompleteMultipartUpload at commitTask), which may still matter at very large data volumes.

2. InputStream optimization effect in interactive scenarios

DLA customers use DLA Serverless Presto to analyze data in various formats such as Text, ORC, and Parquet. The following compares the number of access requests for 1GB of data in Text and ORC formats, using DLA FS versus the community OSS FileSystem.

Comparison of request counts for analyzing a 1GB Text file

  • The number of calls for the Text format is reduced to about 1/10 of the open source implementation;
  • The number of calls for the ORC format is reduced to about 1/3 of the open source implementation;
  • On average, 60% to 90% of the OSS call cost can be saved.

3. Data Cache effect in interactive scenarios

We ran performance tests comparing community PrestoDB with DLA. We chose PrestoDB 0.228 and added support for the OSS data source by copying JAR packages and modifying the configuration. The comparison was between a community cluster with 512 cores and 2048GB of memory and the DLA Presto CU edition.

We chose the TPC-H 1TB data set for the query tests. Since most TPC-H queries are not IO intensive, we only selected queries that meet the following two criteria for comparison:

  1. The query contains a scan of the largest table, LINEITEM, so the volume of data scanned is large enough that IO may become a bottleneck.
  2. The query does not involve joins across multiple tables, so large volumes of data do not participate in the computation, and computation does not become a bottleneck before IO does.

Based on these two criteria, we selected Q1 and Q6, which query the LINEITEM table alone, and Q4, Q12, Q14, Q15, Q17, Q19, and Q20, which join the LINEITEM table with one other table.

You can see that Cache acceleration delivers gains on all of these queries.

V. Cloud native data lake best practices

Best practices, using DLA as an example. DLA is committed to helping customers build low-cost, easy-to-use, and elastic data platforms that cost at least 50% less than a traditional Hadoop deployment. DLA Meta supports a unified view over 15+ cloud data sources (OSS, HDFS, DB, DW), introduces multi-tenancy and metadata discovery, pursues near-zero marginal cost, and is provided free of charge. DLA Lakehouse is built on Apache Hudi; its primary goal is to provide an efficient lake warehouse that supports CDC and incremental message ingestion, and it is currently being intensively productized. DLA Serverless Presto is built on Apache PrestoDB and is mainly used for federated interactive queries and lightweight ETL. DLA Serverless Spark is mainly used for large-scale ETL on the lake and also supports stream computing and machine learning; it offers a 300% price-performance improvement over traditional self-built Spark, and moving batch workloads from ECS self-built Spark or Hive to DLA Spark saves 50% of the cost. An integrated data processing solution based on DLA can support business scenarios such as BI reporting, data dashboards, data mining, machine learning, IoT analysis, and data science.
