by Xinrong Meng, Takuya Ueshin and Allan Folting
There are many factors in a PySpark program's performance. PySpark supports various profiling tools that expose the tight loops of your program and help you make performance-improvement decisions. However, memory, one of the key factors in a program's performance, had been missing from PySpark profiling. A PySpark program on the Spark driver can be profiled with Memory Profiler like any normal Python process, but there was no easy way to profile memory on Spark executors.
PySpark UDFs, one of the most popular Python APIs, are executed by Python worker subprocesses spawned by Spark executors. They are powerful because they enable users to run custom code on top of the Apache Spark™ engine. However, it is difficult to optimize UDFs without understanding their memory consumption. To help optimize PySpark UDFs and reduce the likelihood of out-of-memory errors, the PySpark memory profiler provides information about total memory usage. It pinpoints which lines of code in a UDF contribute the most to memory usage.
Implementing memory profiling on executors is challenging. Because executors are distributed across the cluster, the resulting memory profiles have to be collected from each executor and aggregated properly to show the total memory usage. Meanwhile, a mapping between memory consumption and each line of source code has to be provided for debugging and pruning purposes. In Databricks Runtime 12.0, PySpark overcame these technical difficulties and enabled memory profiling on executors. In this blog, we provide an overview of user-defined functions (UDFs) and demonstrate how to use the memory profiler with UDFs.
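The aggregation step can be illustrated with a small, self-contained sketch. It is not the PySpark implementation: the per-line record layout `(increment_mib, mem_usage_mib, occurrences)` and the helper names `merge_profiles` and `aggregate` are hypothetical, loosely modeled on the columns that Memory Profiler prints. In this sketch, increments and occurrence counts are summed across executors, while the memory-usage column keeps the largest value any executor reported.

```python
from typing import Dict, List, Tuple

# Hypothetical per-line record: (increment_mib, mem_usage_mib, occurrences).
LineStats = Tuple[float, float, int]
# Hypothetical per-UDF profile: source line number -> LineStats.
Profile = Dict[int, LineStats]


def merge_profiles(p1: Profile, p2: Profile) -> Profile:
    """Merge two per-line memory profiles of the same UDF.

    Increments and occurrence counts are summed, while the memory-usage
    column keeps the largest value seen on any executor.
    """
    merged: Profile = dict(p1)
    for line, (inc2, usage2, occ2) in p2.items():
        if line in merged:
            inc1, usage1, occ1 = merged[line]
            merged[line] = (inc1 + inc2, max(usage1, usage2), occ1 + occ2)
        else:
            merged[line] = (inc2, usage2, occ2)
    return merged


def aggregate(executor_profiles: List[Profile]) -> Profile:
    """Fold the profiles reported by all executors into one total profile."""
    total: Profile = {}
    for profile in executor_profiles:
        total = merge_profiles(total, profile)
    return total


# Illustrative numbers from two executors running the same UDF (lines 3 and 4).
executor_a: Profile = {3: (0.5, 40.0, 1), 4: (2.0, 42.0, 1)}
executor_b: Profile = {3: (0.25, 39.0, 1), 4: (1.5, 45.0, 1)}
total = aggregate([executor_a, executor_b])
for line, (inc, usage, occ) in sorted(total.items()):
    print(f"line {line}: +{inc} MiB, max usage {usage} MiB, {occ} occurrences")
```

Because the merge is associative and commutative (it only uses sums and maxima), partial profiles can be combined in any order. That is the property Spark requires of accumulator merge functions; shipping profiles back to the driver through an accumulator is an assumption of this sketch.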
There are two main categories of UDFs supported in PySpark: Python UDFs and Pandas UDFs.
Pandas UDFs have several variations, categorized by their input and output types: Series to Series, Series to Scalar, and Iterator to Iterator. On top of the Pandas UDF implementation, PySpark also provides the Pandas Function APIs Map (i.e., mapInPandas) and (Co)Grouped Map (i.e., applyInPandas), as well as an Arrow Function API, mapInArrow. The memory profiler applies to all UDF types mentioned above unless the function takes or returns an iterator.
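To make these categories concrete, here is a sketch of the function shapes involved, written as plain Python and pandas functions so they can be tried without a cluster. The function names are hypothetical. In PySpark, the row-at-a-time function would be wrapped with `udf`, the pandas functions with `pandas_udf` (which infers the variant from the type hints), and the DataFrame-iterator function passed to `DataFrame.mapInPandas`. The two iterator-based functions are the cases the memory profiler does not cover.

```python
from typing import Iterator

import pandas as pd


# Python UDF: called once per row with plain Python objects.
# In PySpark: udf(plus_one_row, "long")
def plus_one_row(v: int) -> int:
    return v + 1


# Pandas UDF, Series to Series: one output value per input row, per batch.
# In PySpark: pandas_udf(plus_one, "long")
def plus_one(s: pd.Series) -> pd.Series:
    return s + 1


# Pandas UDF, Series to Scalar: reduces a whole column (or group) to one value.
# In PySpark: pandas_udf(mean_value, "double")
def mean_value(s: pd.Series) -> float:
    return float(s.mean())


# Pandas UDF, Iterator to Iterator: consumes and yields batches lazily.
# In PySpark: pandas_udf(plus_one_iter, "long"); not covered by the memory profiler.
def plus_one_iter(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for s in batches:
        yield s + 1


# Pandas Function API, Map: iterator of DataFrames in, iterator of DataFrames out.
# In PySpark: sdf.mapInPandas(keep_even, schema=sdf.schema); not covered either.
def keep_even(dfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for df in dfs:
        yield df[df["id"] % 2 == 0]


# Local smoke test on plain pandas data, without Spark.
s = pd.Series([1, 2, 3])
print(plus_one_row(1), plus_one(s).tolist(), mean_value(s))
print([b.tolist() for b in plus_one_iter(iter([s]))])
```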
To enable memory profiling on a cluster, we should install the Memory Profiler library and set the Spark config "spark.python.profile.memory" to "true" as shown below.

[Figure: the "spark.python.profile.memory" Spark configuration.]