• How to Run Parallel Data Analysis in Python using Dask Dataframes
  • Luciano Strika
  • The Nuggets translation Project
  • Permanent link to this article: github.com/xitu/gold-m…
  • Translator: Starriers
  • Proofread by SNPMYn

Multi-dimensional thinking. Source: Pixabay

Sometimes you open a large data set with Python’s Pandas and try to get some metrics, but the whole process can come to a sudden halt. If you use Pandas to handle big data, a simple sequence average may take a minute to complete, and we won’t even call Apply. And that’s just the millions of rows! When your data is in the hundreds of millions, you’re better off using Spark or something else.

I discovered this tool a while ago: a way to speed up data analysis in Python without needing a better infrastructure or transformation language. If the dataset is too large, its final optimization may be somewhat limited, but it still scales better than regular Pandas, and may fit your problem scenario better — especially if you don’t do a lot of index rewriting.

What is Dask?

Dask is an open source project that provides abstractions for NumPy arrays, Pandas Dataframes, and regular lists, allowing you to run their operations in parallel using multi-core processors.

Here’s an excerpt from the tutorial:

Dask provides higher-level collections of Array, Bag, and DataFrame that mimic NumPy, List, and Pandas, but allow parallel operations on data sets that do not fit into main memory. For large data sets, Dask’s high-level collections can replace NumPy and Pandas.

That sounds good! For this article, I tried out Dask Dataframs and ran a few benchmarks on them.

Read the documentation

I first read the official documentation to see the precise recommendations in Dask’s documentation, rather than the regular Dataframse. Here’s part of the official document:

  • Manipulate large data sets, even if the data is not available in memory
  • Use as many cores as possible to speed up long-run calculations
  • Pandas performs distributed processing of computations on large data sets through standard operations such as clustering, joins, and time series computation.

Next, it lists some quick scenarios, but only if you’re using Dask data:

  • Arithmetic operations (multiplying or adding sequences)
  • Common aggregates (mean, minimum, maximum, sum, etc.)
  • Call apply (as long as it is an index and not groupby(‘ y ‘), where y is not an index)
  • Call Value_counts (), Drop_Duplicates (), or corr().
  • Filter with Loc, ISIN, and line-by-line selection

If you find it useful, just do a quick scan of the data filtering.

# by reference only, return lines x >5 (write changes according to the original df)
df2 = df.loc[df['x'5]] >By reference, only rows whose x is 0, 1, 2, 3, or 4 are returned
df3 = df.x.isin(range(4))
# return only lines x > 5 by read-only reference (cannot be written)
df4 = df[df['x'5]] >Copy the code

How to use Dask Dataframes

Dask Dataframes has a similar API to Pandas Dataframes, except that aggregation and apply are lazy calculations that you need to compute by calling the compute method. To generate a Dask Dataframe, you can simply call the read_CSV method as in Pandas, or just call the given one PANDA Dataframe df.

dd = ddf.from_pandas(df, npartitions=N)
Copy the code

DDF is the name you import with DASK Dataframes, and nparitions is a parameter that tells the Dataframe how you want it partitioned.

StackOverflow recommends dividing dataframes into partitions with the same number of cores on your computer, or several times that number, because each partition will run on different threads, and if there are too many threads, they will become too expensive to interconnect.

Start: Benchmark!

I developed a Jupyter note to try out the framework and posted it on Github so you can see the details and even run it yourself.

The benchmarks I ran are available on GitHub, and here are the main ones:

def get_big_mean():
    return dfn.salary.mean().compute()
def get_big_mean_old():
    return df3.salary.mean()

def get_big_max():
    return dfn.salary.max().compute()
def get_big_max_old():
    return df3.salary.max()

def get_big_sum():
    return dfn.salary.sum().compute()
def get_big_sum_old():
    return df3.salary.sum()

def filter_df():
    df = dfn[dfn['salary']>5000]
def filter_df_old():
    df = df3[df3['salary'] > 5000]Copy the code

This is a regular DF3 with 25 million lines of content generated using a script from the previous article (the column names randomly selected from the list are Name, surname, and Salary). I took 50 rows of data and connected them half a million times because I was only interested in the time it took to run, not in analyzing Per SE.

DFN is a Dask Dataframe based on DF3.

The results of the first batch: not so good

First, I tried to test with three partitions because I only had four kernels and didn’t want to overuse my PC. My results with Dask were not very good and I had to wait a long time to get the results, which I worried might be because I did too little partitioning:

204.313940048 seconds forGet_big_mean 39.7543280125 secondsforGet_big_mean_old 131.600986004 secondsforGet_big_max 43.7621600628 secondsforGet_big_max_old 120.027213097 secondsforGet_big_sum 7.49701309204 secondsforGet_big_sum_old 0.581165790558 secondsforFilter_df 226.700095892 secondsfor filter_df_old
Copy the code

As you can see, most operations are much slower when I’m using Dask. This gives me a hint that I may have to use more partitions. The amount of time it takes to generate delayed estimates is also negligible (less than half a second in some cases), and if I reuse them, they are not amortized over time.

I also tested it using the Apply method:

def f(x):
    return (13*x+5)%7

def apply_random_old():
    df3['random']= df3['salary'].apply(f)
    
def apply_random():
    dfn['random']= dfn['salary'].apply(f).compute()
Copy the code

The results made no difference:

369.541605949 seconds forApply_random 157.643756866 secondsfor apply_random_old
Copy the code

So, in general, most operations are still twice as fast, even though the filters are much faster. I’m worried that maybe I should call this function compute as well, so I’m going to use this as a comparison.

More partitions: Amazing speed

After this frustrating result, I thought maybe I wasn’t using enough partitions. The point of doing this is parallelism, maybe I need more parallelism? So I ran the same test on all eight partitions, and here’s what I got (I’m ignoring the non-parallel dataframes because they’re basically the same) :

3.08352184296 seconds forGet_big_mean 1.3314101696 secondsforGet_big_max 1.21639800072 secondsforGet_big_sum 0.228978157043 secondsforFilter_df 112.135010004 secondsforApply_random 50.2007009983 secondsfor value_count_test
Copy the code

That’s right, most operations run more than 10 times faster than regular Dataframe, and Apply gets faster! I also run the value_count method on the Salary sequence. For context, remember that when I ran this test on a regular Dataframe, I waited 10 minutes before I had to stop the process, this time in 50 seconds! Basically, I just used the wrong tool, and very quickly. Much faster than regular Dataframes.

conclusion

Given that I was running 250 million lines a minute on a very old quad-core PC, I thought it would be very useful in practical applications. So I suggest you consider using this framework the next time you’re dealing with data sets locally or from a single AWS instance, it’s really efficient.

I hope you found this post useful or interesting! It took longer to write than I expected because some benchmarks took too long. Remember to tell me if you knew about Dask before you read it, or if you used it at work or on a project. Also, let me know if there are other awesome features, I’m not checking to see if I’m doing something wrong! Your feedback and comments are one of the big reasons I write, because we all grow from it.

If you enjoyed this post, feel free to continue supporting me.Can continue to support my writing. You can also learn more about Python tutorials, tips and tricks here!

If you find any mistakes in your translation or other areas that need to be improved, you are welcome to the Nuggets Translation Program to revise and PR your translation, and you can also get the corresponding reward points. The permanent link to this article at the beginning of this article is the MarkDown link to this article on GitHub.


The Nuggets Translation Project is a community that translates quality Internet technical articles from English sharing articles on nuggets. The content covers Android, iOS, front-end, back-end, blockchain, products, design, artificial intelligence and other fields. If you want to see more high-quality translation, please continue to pay attention to the Translation plan of Digging Gold, the official Weibo, Zhihu column.