Preface:

This article was written some time ago, so the Spark SQL behaviour discussed here is specific to Spark 1.6. Later releases may have fixed or changed some of it.

1. Spark memory leaks

1. Memory leaks under high concurrency

I am sorry to tell you that Spark was not designed for highly concurrent requests. We ran 100 concurrent queries on a cluster with poor network conditions and found a memory leak after 3 days of stress testing.

While running a large number of small SQL tasks, we found that many active jobs stayed in the pending state on the Spark UI and never finished, as shown in the figure below:

The driver's memory filled up:

We analyzed the heap with a memory analysis tool and found the following:

2. Web UI memory leak caused by high concurrency on AsynchronousListenerBus

When a large number of SQL statements are submitted to Spark in a short period, and those statements contain many unions and joins, a large number of event objects are created, pushing the number of queued events past 10000.

Once the queue holds more than 10000 events, new events are discarded. Because these events are what trigger resource reclamation, the resources tied to a discarded event can never be reclaimed. To fix the UI problem, we removed the queue length limit.
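The dropping behaviour described above can be sketched with a bounded queue. This is a minimal Java illustration and not Spark's own code: the class name `BoundedEventBus` and its counters are hypothetical, and only the capacity of 10000 comes from the text.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of a bounded event queue; not Spark's implementation.
class BoundedEventBus {
    private final LinkedBlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedEventBus(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // offer() returns false instead of blocking when the queue is full,
    // so the event is lost; here we at least count the loss.
    boolean post(String event) {
        boolean accepted = queue.offer(event);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    long droppedCount() {
        return dropped.get();
    }

    int queued() {
        return queue.size();
    }
}
```

Creating the queue without a capacity never drops events, but it trades the lost-event problem for unbounded growth whenever the consumer is slower than the producers.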

3. AsynchronousListenerBus itself causes memory leaks

Through packet capture, we found:

These events are submitted through the post method and written to the queue.

However, postToAll is executed by a single thread:

Under high concurrency, the single-threaded postToAll cannot keep up with post, so more and more events pile up in the queue. With a sustained stream of highly concurrent SQL queries, this leads to a memory leak.

Next, we analyze the postToAll method: which path is the slowest, and which logic slows down event processing the most?

Some of you may not believe it, but jstack analysis shows that the program is mostly blocked on logging.

You can speed up event processing by disabling the log at this point.

4. Cleaner memory leak under high concurrency

At this point, the Cleaner design is probably Spark's worst. Spark's ContextCleaner is used to reclaim and clean up broadcast and shuffle data. However, under high concurrency, we found that more and more data accumulates here, until the driver runs out of memory and dies.

Let’s first look at how memory reclamation is triggered:

This logic stops working if we disable System.gc() in the JVM (and many JVM tuning guides recommend disabling System.gc()).
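A GC-driven trigger of this kind can be sketched with weak references and a reference queue. The Java below is a minimal illustration under assumptions, not Spark's code: `GcDrivenCleaner` and its methods are hypothetical names, and the cleanup task is an arbitrary `Runnable`.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of GC-driven cleanup; names are illustrative, not Spark's.
class GcDrivenCleaner {
    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
    // Hold each weak reference strongly so it survives until it is enqueued,
    // and remember which cleanup task belongs to it.
    private final Map<Reference<?>, Runnable> tasks = new ConcurrentHashMap<>();

    // Track `resource`; `cleanup` runs once the collector has enqueued the reference.
    WeakReference<Object> register(Object resource, Runnable cleanup) {
        WeakReference<Object> ref = new WeakReference<>(resource, queue);
        tasks.put(ref, cleanup);
        return ref;
    }

    // Run the cleanup for every reference enqueued so far; returns how many ran.
    int drain() {
        int ran = 0;
        Reference<?> ref;
        while ((ref = queue.poll()) != null) {
            Runnable task = tasks.remove(ref);
            if (task != null) {
                task.run();
                ran++;
            }
        }
        return ran;
    }

    // Number of registered resources whose cleanup has not run yet.
    int pending() {
        return tasks.size();
    }
}
```

Nothing reaches the queue until a collection actually happens. When the JVM runs with `-XX:+DisableExplicitGC`, calls to `System.gc()` become no-ops, so a cleaner that relies on them only makes progress when the JVM decides to collect on its own.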

The cleanup runs on a single thread, and each cleanup has to contact many machines, so it is relatively slow. When SQL concurrency is very high, work arrives faster than it can be cleaned up, and the driver leaks memory. Broadcast data also spills into many small files on local disk when it takes up a lot of memory. In our tests under sustained high concurrency, the local disk directory used by the BlockManager took up 60% of our storage space.

Let’s analyze which logic is slowest in the cleanup:

The real bottleneck is removeBroadcast in blockManagerMaster, because this part of the logic needs to span multiple machines.

To solve this problem, we added SQLWAITING logic in the SQL layer that checks the length of the cleanup backlog. If the backlog exceeds our configured value, we block the execution of new SQL. The backlog limit can be changed by setting ydb.sql.waiting.queue.size in ya100_env_default.sh under the conf directory.
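The idea of holding back new SQL while the backlog is too long can be sketched as a small admission gate. This is a hedged Java illustration, not the YDB implementation: `SqlAdmissionGate`, its method names, and the timeout parameter are all assumptions made for the example.

```java
// Hypothetical admission gate: new SQL waits while the cleanup backlog is too long.
class SqlAdmissionGate {
    private final int maxBacklog;
    private int backlog = 0;

    SqlAdmissionGate(int maxBacklog) {
        this.maxBacklog = maxBacklog;
    }

    // Called when a cleanup task is added to the cleaner's backlog.
    synchronized void cleanupQueued() {
        backlog++;
    }

    // Called when a cleanup task completes; wakes up waiting queries.
    synchronized void cleanupFinished() {
        backlog--;
        notifyAll();
    }

    // Wait until the backlog is at or below the limit.
    // Returns true if the query may run, false on timeout or interruption.
    synchronized boolean awaitAdmission(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (backlog > maxBacklog) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A query would call `awaitAdmission` before it starts, so a growing backlog slows down submissions instead of exhausting driver memory.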

We recommend giving the cluster more bandwidth: cleanup over a 10-gigabit network is much faster than over a gigabit network. Give the cluster a break as well, and don’t keep the concurrency high all the time. To enlarge Spark's thread pools, you can adjust the following values in spark-defaults.conf under the conf directory.

5. Memory leaks caused by thread pools and ThreadLocal

Spark, Hive, and Lucene all use ThreadLocal to manage temporary session objects. These objects are expected to be released automatically once the SQL execution completes. However, Spark also uses thread pools, whose threads are reused rather than terminated, so the ThreadLocal values stay reachable and memory accumulates over time.

To solve this problem, we changed the implementation of Spark's key thread pools so that each pool is forcibly replaced with a new one every hour, which allows the old pool and its threads to be released.
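Replacing a pool so that its old threads, and the ThreadLocal values attached to them, can be released might look like the sketch below. It is a minimal Java illustration under assumptions, not the actual Spark patch: `RotatingPool` and its methods are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper that swaps in a fresh pool so old threads can terminate.
class RotatingPool {
    private final int threads;
    private ExecutorService current;

    RotatingPool(int threads) {
        this.threads = threads;
        this.current = Executors.newFixedThreadPool(threads);
    }

    // Submit work to whichever pool is current.
    synchronized void execute(Runnable task) {
        current.execute(task);
    }

    // Swap in a new pool. The old pool finishes its queued tasks and then its
    // threads exit, which makes their ThreadLocal values unreachable.
    synchronized ExecutorService rotate() {
        ExecutorService old = current;
        current = Executors.newFixedThreadPool(threads);
        old.shutdown();
        return old;
    }

    // Shut down the current pool and wait for running tasks to finish.
    synchronized boolean close(long timeoutMillis) {
        current.shutdown();
        try {
            return current.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

An hourly rotation could then be scheduled with a `ScheduledExecutorService`, for example `scheduler.scheduleAtFixedRate(pool::rotate, 1, 1, TimeUnit.HOURS)`.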

6. File leaks

Some readers may have noticed that as the number of sessions grows, Spark creates a huge number of directories on HDFS and on the local disks. Eventually both the local file system and HDFS break down because they hold too many directories. We added handling for this case as well.

7. deleteOnExit memory leak

Why are these objects held in memory? Let’s look at the source code:

8. JDO memory leak

There are more than 100,000 JDOPersistenceManager instances:

9. Listener memory leak

The debugging tool shows that the post speed of the Spark listener drops as time goes by.

All the threads are stuck in onPostEvent:

The result of jstack is as follows:

Studying the call logic shows that it loops over the listeners, and that each listener's execution is a no-op, which produces the jstack output above:

There are more than 300,000 listeners in memory:

Most of them are the same listener. Let’s check the source code:

Each time a JDBC connection is created, Spark adds a listener. Over time, the listeners accumulate. To solve this problem, I modified a single line of code and started the next round of stress testing.
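The accumulation pattern can be reproduced in a few lines. The source does not show which line was changed, so the Java below is only one possible shape of such a fix: `DemoListenerBus` is a hypothetical class, and registering a shared listener at most once is an assumption made for illustration.

```java
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener bus, used only to illustrate the accumulation pattern.
class DemoListenerBus {
    private final CopyOnWriteArrayList<Runnable> listeners = new CopyOnWriteArrayList<>();

    // Leaky pattern: every call appends, even if the same listener is already registered.
    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    // One possible fix: register a listener only if it is not already present.
    boolean addListenerOnce(Runnable listener) {
        return listeners.addIfAbsent(listener);
    }

    // Every post walks the whole list, so the cost grows with its length.
    void postToAll() {
        for (Runnable listener : listeners) {
            listener.run();
        }
    }

    int size() {
        return listeners.size();
    }
}
```

With the leaky variant, each post has to walk every accumulated entry, which matches the slowdown over time described above.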

2. Tuning the Spark source code

Our tests found that even when there is only one record, a SQL query through Spark takes one second. For many ad hoc queries, a one-second wait makes for a poor user experience. To address this, we tuned parts of the Spark and Hive code, which reduced the response time from 1 second to 200-300 milliseconds.

Here’s what we changed:

1. Creating a directory in SessionState takes time

If you are familiar with Hadoop NameNode HA, you will notice that when the first NameNode listed is the standby, this step is slower and takes more than 1 second. Always list the active node first.

2. The initialization of HiveConf takes too much time

Frequent HiveConf initialization requires reading core-default.xml, hdfs-default.xml, yarn-default.xml, mapreduce-default.xml, hive-default.xml and other XML files, which are embedded in the JAR packages.

First, it takes a lot of time to extract these files from the JARs, and second, it takes time to parse the XML files on every initialization.

3. Serializing the Hadoop Configuration for broadcast is time-consuming

Configuration is serialized with compression, which suffers from a global lock.

Each serialization of Configuration also carries too many items: more than 1000, occupying more than 60 KB. After eliminating the configuration items that do not need to be transferred, we reduced it to 44 items and 2 KB.
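Trimming a configuration down to the entries that actually need to travel can be sketched as a simple key filter. This Java example is illustrative only: it works on a plain `Map` rather than Hadoop's `Configuration`, and `ConfigSlimmer` and the set of needed keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper that keeps only the configuration entries worth broadcasting.
class ConfigSlimmer {
    // Copy only the entries whose key appears in `needed`.
    static Map<String, String> slim(Map<String, String> full, Set<String> needed) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : full.entrySet()) {
            if (needed.contains(entry.getKey())) {
                out.put(entry.getKey(), entry.getValue());
            }
        }
        return out;
    }

    // Rough payload estimate: UTF-8 bytes of all keys and values.
    static int approxBytes(Map<String, String> conf) {
        int total = 0;
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            total += entry.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += entry.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }
}
```

Which keys are safe to drop depends on what the tasks read on the executors, so the list has to be determined for the workload at hand.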

4. Improvement of the Cleaner of Spark broadcast data

Because of the bug SPARK-3015, the Spark cleaner currently reclaims data in single-threaded mode.

Note the following in the Spark source code:

The single-threaded bottleneck is the cleanup of broadcast data, which spans many machines and requires network interaction through Akka.

If the number of concurrent cleanups is very large, they cause network congestion, as described in the SPARK-3015 bug report, resulting in a large number of timeouts.

Why is there so much to reclaim at once? Because the cleaner is essentially driven by System.gc(), which runs periodically: by default the cleaner is triggered only after 30 minutes of accumulation or after a GC. As a result, a large number of Akka calls execute concurrently at one moment and release everything at once, which momentarily paralyzes the network.

However, single-threaded reclamation means the reclamation speed is constant. If query concurrency is very high, reclamation cannot keep up with the rate at which work reaches the cleaner, so the cleaner accumulates a large backlog and the process runs out of memory (YDB has been modified to limit the concurrency of foreground queries).

Neither OOM nor limited concurrency is what we want to see, so the single-threaded reclamation rate is not sufficient for high concurrency.

As for the official approach, we consider it an imperfect cleaner scheme. Concurrent reclamation must be supported, provided the Akka timeout problem can be solved.

So this problem deserves careful analysis. Why does Akka time out? Because the cleaner takes up too many resources. Can we then control the cleaner's concurrency? For example, how about using four concurrent threads instead of using all concurrent threads by default? Wouldn't that address both the cleaner's reclamation speed and the Akka problem?

To address this, we finally chose to modify Spark's ContextCleaner and changed broadcast data reclamation to a multi-threaded mode with a limited number of concurrent threads, which solved the problem.
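Limiting the cleaner to a fixed number of concurrent threads can be sketched with a fixed-size thread pool. The Java below is a minimal illustration and not the modified ContextCleaner: `BoundedCleaner` is a hypothetical name, and each cleanup is modelled as a plain `Runnable`.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: run cleanup tasks with a bounded number of threads.
class BoundedCleaner {
    // Runs all cleanups with at most `maxThreads` in flight at once and
    // returns the peak concurrency that was actually observed.
    static int runAll(List<Runnable> cleanups, int maxThreads, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (Runnable cleanup : cleanups) {
            pool.execute(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    cleanup.run();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

With `maxThreads` set to 4, as suggested in the text, reclamation proceeds faster than a single thread while the number of simultaneous remote calls stays capped.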