Moment For Technology

Distributed parallel computing using Python and Dask

Posted on Aug. 8, 2022, 7:17 p.m. by Christopher Foster
Category: The development tools Tag: python

Reference website:
Copy the code


The dask Dataframe object consists of a number of relatively small pandas Dataframes, partitioned along the index index. The Dask DataFrame object is the higher-level object built on the Delayed object that wraps the Delayed object around the PANDAS DataFrame object. The Dask DataFrame API doesn't require you to write complex functions yourself because it contains a full set of conversion methods, such as: The dask dataframe API is a subset of the PANDAS Dataframe API. Some apis (functions) are not available in dask Dataframe. For example: Pandas Dataframe provides read_csv() and read_excel() apis for reading files. Dask Dataframe provides read_csv() apis for reading files, but does not have read_excel() apis.Copy the code

Why use DataFrame data structures

First, we generally divide data into two types: structured data and unstructured data. Structured data is composed of rows and columns. From simple spreadsheets to complex relational database systems, structured data is an intuitive way to store information. As data scientists, we really like structured data because it's intuitive and easy to put relevant information into a visual space. As for structured data, I don't think you need to bother conceptually, just think of it as a table in a database. Therefore, because of the way structured data is organized and stored, it is easy to think of many different ways to manipulate data. For example, structured data related to people's information, we can easily find the earliest date of birth, filter out people who do not match certain patterns, group people by first and last name, or sort by age, etc. We created three lists, and we definitely know how to turn them into a PANDA DataFrame. It is possible to filter, transform, and aggregate in Python without using the pandas DataFrame data structure. But for structured data, no structure is as intuitive as a DataFrame, so we use DataFrame. In addition to the characteristics of a two-dimensional table, DataFrame has some additional terms: index and axis. Let's take a look at what the above lists would look like as dataframes.Copy the code

Dataframe index and axis

As shown in the figure above, it is important to note the DataFrame index and axis. An index is an index, and there are two axes: horizontal and vertical, and 0 and 1. When aggregating, separating, or concatenating dataframes, note that by default dataframes operate along axis 0, unless you explicitly specify axis 1 (axis=1). This is the case in pandas and in Dask. Index is 0 and columns are 1. If you operate along an axis, you can see that it is telescopic in the direction of the axis. Let's use numpy because the concept of axis 0 and 1 came first in Numpy. As shown below: Two arrays are concatenated together, the number of rows does not change, but the number of columns does, so axis=1Copy the code

A DataFrame also has the index index, which is similar to an index in a database, except that the DataFrame index may not be part of a field. When you print data_df.columns. Tolist (), the name of the index is not in it. For indexes, it is best to be unique (not unique or not, but preferably unique). We can set one or more fields as indexes, or we can use the default increment index. Indexes are very important, as we'll cover later, and introduce common index functions, but first look at how indexes can be used to form partitions.Copy the code

Dask and pandas

As mentioned earlier, PANDAS is very powerful for analyzing structured data, but its biggest limitation is that it was not designed with scalability in mind, meaning that it can only be computed in single-machine mode. Pandas is particularly well suited for handling small structured data and is highly optimized to perform fast and efficient operations on data stored in memory. However, with the large increase of data volume, a single machine will certainly not read, through the cluster way to deal with is the best choice. This is where the Dask DataFrame API comes in: By providing pandas with a wrapper, giant PANDAS DataFrame can be intelligently broken into smaller pieces and spread out over multiple worker nodes, allowing for faster and more reliable manipulation of large data sets. The Dask DataFrame is split into partitions, each of which is a relatively small panda DataFrame(64MB by default) that can be assigned to any worker node and maintain its full lineage if replication is required. We have already seen the operation, which is to operate on each partition individually (or in parallel for multiple machines), and then combine the results. In fact, it can be intuitively inferred that Dask must do this.Copy the code

Manage DataFrame partitions

Because partitioning can have a big impact on performance, you might think that managing partitioning is a chore and a chore. But don't worry, Dask will try to help you get as much performance as possible without manual tuning with some smart defaults and heuristics. For example, when reading data using read_CSV, the default size for each partition is 64MB(which is also the default block size, as was the case in the early days of Hadoop). 64MB seems a little small for a machine that has 16 gigabytes of memory, even for personal laptops, but 64MB isn't for memory, it's because it can be transferred quickly between networks. If the amount of data is too large, a machine may finish its task before the data transfer is complete, and then just wait. So the default for each block is 64MB, which is small but endless. In addition, you can specify the number of partitions by specifying the NPARTITIONS parameter when creating a Dask DataFrame.Copy the code

Dd_df.map_partitions (func_name).compute() # Perform the same function for each partition. If we perform a filter function for each partition, the amount of data remaining in each partition may be different after the filter, which will have an impact on subsequent operations. For example, a large amount of data in a partition causes data skew. So using repartition() to reset the number of partitions, we just need to reset the number of partitions and Dask knows what to do. Dd_df.repartition (npartitions=1) DD_df.npartitions # check how many partitions are availableCopy the code

What is a shuffle

In fact, you'll be familiar with the concept if you know Spark, because shuffle comes from Spark. Shuffle is a time-consuming operation, and here's why. In distributed computing, shuffle is the process of broadcasting all partitions to all workers. Shuffle is required when we perform sorting, grouping, indexing, etc., because each row of the DataFrame is compared to the other rows to determine the correct relative position. So this is a relatively expensive operation in terms of time, because it requires a large amount of data transfer over the network. For example, if we want to aggregate by a field, obviously each partition cannot be aggregated separately. If we want to increment the salary field by 100, each partition can operate independently of the other. But if the count is calculated according to salary aggregation, I am sorry that it cannot be solved by processing each partition separately. For example, we have five partitions and each partition has a salary value of 8000. At this time, the value needs to be sent between partitions for statistics. So it's an expensive operation. Shuffle is a shuffle operation in which multiple partitions of data are shuffled together. Shuffle is a shuffle operation in which multiple partitions of data are shuffled together. Once the data of multiple partitions need to interact, it means the transmission of data, namely network IO, so it is time-consuming. Or sort, which is obviously a shuffle operation, because it involves comparing all the data. Because we need to perform various operations on the data, it is not realistic to completely avoid shuffle. However, we can do some things to minimize the amount of data to perform shuffle, such as ensuring that the data is stored in order, thus eliminating the need to use Dask to sort the data. If possible, we can sort in the source system (such as a relational database), which is faster and more efficient than sorting in a distributed system. Second, using a sequence as an index of the DataFrame will improve join efficiency. So the lookups after sorting the data are very fast because the partition location of a row can be easily determined using partitions defined on the DataFrame. Finally, if a shuffle operation must be triggered, the result can be persisted if resources permit, thus avoiding removement between data if recalculation is required.Copy the code

Some limitations of Dask DataFrame

Now that you have a good understanding of what the Dask DataFrame API can do, let's finish with some of the limitations of dask DataFrame. First and foremost, the Dask DataFrame does not expose all apis for pandas DataFrame, even if the Dask DataFrame consists of multiple small pandas Dataframes, Pandas has some nice features that are not appropriate for a distributed environment. Pandas dataframe some functions are not available in dask dataframe. For example, insert() does not support functions that change data formats. Pandas' DataFrame is different.Copy the code

Pandas Dataframe supports insert() directly into fields locally, but Dask Dataframe does not, and large data sets are simply not suitable for such clever transformations. But while the Dask dataframe does not support insert(), the POP () function does. More complex windowing operations, such as expanding and EWM methods, are also not supported, as are methods like Stack and unstack, which tend to cause a lot of shuffle. Often these expensive operations do not need to be run on the full original dataset; you should use Dask to do all the regular data preparation, filtering, and transformation before handing the final dataset to PANDAS. You can then perform these operations on the transformed data, which are expensive for distribution (but not expensive for pandas). Interactions between Dask DataFrame and PANDAS DataFrame are very easy. This operation is therefore very useful when analyzing data using the Dask DataFrame. The second limitation is relational operations such as Join, merge, Group BY, and Rolling. Although these operations are also supported by Dask DataFrame, they can involve a lot of shuffling and become a performance bottleneck for the program. So you can have Dask focus on other operations, give pandas Dataframe a reduced amount of data, and have pandas perform those operations. Or when you perform these operations using Dask, only apply them to the index. For example, if you merge two dataframes, one related to people and one related to transactions, and merge the dataframes according to the Person ID, then you can merge the dataframes side by side as the index. This will speed up the merge process significantly. The third limitation is that there are some index challenges. If you want to use a column in a DataFrame as an index instead of the default numeric index (which is incremented by default), the column had better be sorted, otherwise the entire DataFrame will do a lot of shuffling because of it. So the best thing to do is to make sure that the data is organized when we build it, so that we can save a lot of time in computing. Pandas calculates the new sequential index in the entire DataFrame. Pandas calculates the new sequential index in the DataFrame. In Dask DataFrame, reset_index is similar to map_partitions. This means that each partition has its own sequential index starting from 0, and we can see what the Dask DataFrame looks like with reset_index.Copy the code

After reset_index, the index becomes an incremented index starting at 0. But for a Dask DataFrame, it is reset_index on each partition, so both indexes are 0, 1, 2, 3, 4, because they both have 5 rows. Can we reset_index all partitions? Unfortunately, there is no easy way to do this, just as pandas does for all datasets. So be careful when using reset_index. Using reset_index means that you do not intend to use indexes to join, group, sort, etc on DataFrame. Finally, because the Dask DataFrame is composed of multiple PANDAS Dataframes, operations that are inefficient in PANDAS will be equally inefficient in Dask. For example, iterating through apply and iterrows is very inefficient in PANDAS, so following the optimization principles of pandas DataFrame will give you the best performance experience. If you are already familiar with PANDAS, using Dask will be a lot easier and will not only help you understand PANDAS, but also familiarize you with Dask and distribution principles.Copy the code


1Dask DataFrame consists of index, row (axis 0,axis=0), and column (axis 1,axis=1). 2DataFrame operates on axis 0 by default, and even though it operates up and down by default, it can be modified according to axis parameters if desired. 3 The Dask DataFrame divisions provide information on how the DataFrame is partitioned, whether it is balanced, or skewed. 4 Filtering Dask DataFrame results in an unbalanced amount of data between each partition, and the partition size should be consistent for best performance. It is a good practice to repartition a DataFrame after filtering a large amount of data. 5 For best performance, DataFrame should be indexed by logical columns, partitioned by their index, and presorted by index.Copy the code
About (Moment For Technology) is a global community with thousands techies from across the global hang out!Passionate technologists, be it gadget freaks, tech enthusiasts, coders, technopreneurs, or CIOs, you would find them all here.