Spark is already a distributed computing platform and should not handle parallel/asynchronous tasks manually. However, I recently implemented a Spark task that required writing data from dozens of partitions at a time. Although these partitions are completely independent of each other, the write data interface provided by the basic data platform only supports synchronous writing data from one partition at a time. As a result, even though I have multiple compute nodes and the RDD is distributed across each node, I have to wait for one partition to finish writing before writing the next partition: the write partition task is blocked synchronously.

partitions
  .map(part =>  writeToDisk(data.filter(part), part))
Copy the code



The introduction of the Future


This is thanks to scala’s Future solution. It is convenient to wrap synchronous blocking operations into asynchronous operations delivered in parallel.

With await. ready to wait for all futures to complete, we can rewrite the above code as:

partitions
  .map(part => Future { writeToDisk(data.filter(data.part == part), part) })
  .map(f => Await.ready(f, Duration.Inf))
Copy the code





Avoid double-counting data


In Spark, we know that using cache/persist can avoid double-counting data flows. In this case, the Future will need to cache/persist data.

But still! No! Enough!

What we want to happen here is that the data is evaluated (only once) before the future, and then asynchronously distributed to write the corresponding partition.

However, due to the lazy computing feature of Spark, after a Future is used, multiple jobs are delivered in parallel. When each job is executed, data is not calculated. Therefore, no cache data is available. 2. To bounce about on the Jobs page on the Spark UI, several jobs appear to be executed in parallel, but the cache operation does not bring about tasks.

In this case, we need to force the data to be computed and cached before the future. In this case, you only need to call some action operators that do not affect the data, such as data.count().

The end result, after using the improvements above, was that my Spark task execution time was reduced by about 60%.


Do not copy a new friend

Reprint please indicate the source: blog.guoyb.com/2018/04/21/…

Please use wechat to scan the qr code below and follow my wechat official account TechTalking. Technology · Life · Thinking:

Back end technology little black room