Apache Spark and its ecosystem provide many instrumentation points, metrics, and monitoring tools that you can use to improve the performance of your jobs and understand how your Spark workloads are utilizing the available system resources. Spark 3.0 comes with several important additions and improvements to the monitoring system. This talk will cover the new features, review some readily available solutions to use them, and will provide examples and feedback from production usage at the CERN Spark service. Topics covered will include Spark executor metrics for fine-grained memory monitoring and extensions to the Spark monitoring system using Spark 3.0 Plugins. Plugins allow us to deploy custom metrics extending the Spark monitoring system to measure, among other things, I/O metrics for cloud file systems like S3, OS metrics, and custom metrics provided by external libraries.
Speaker: Luca Canali
– Hello and welcome to this presentation. My name is Luca Canali, and my talk today is “What’s New with Spark Performance Monitoring in Apache Spark 3.0”. We will start with some introduction and motivation, and a recap of Spark monitoring. Then I will talk about the Spark 3.0 improvements and wrap up with some thoughts. I am a data engineer at CERN. I work on Spark, big data platforms and database services. CERN is home to the world’s largest particle accelerator, the Large Hadron Collider (LHC). The computing power used to process and analyze the data produced by the LHC experiments is at the moment of the order of one million cores and one exabyte. And this is expected to grow fast in the coming years. On a smaller scale, CERN and its communities make use of big data technologies, and in particular Apache Spark. We run Spark on Hadoop clusters and we run Spark on cloud containers using Kubernetes. There are many use cases for Spark at CERN. One that I want to highlight here is the usage of Spark in our analytics platform. Our users can attach their Jupyter notebooks to Spark clusters, on cloud containers or on Hadoop, and also make use of specialized high energy physics software. And, most importantly, they can connect to the various storage systems: the CERN storage systems for the experiments, which contain the experiments’ data, and also HDFS.
Monitoring data is very important for troubleshooting and also for root cause analysis. The idea is that we want to use monitoring data from Spark workloads as part of contextual knowledge, together with application knowledge and information on the computing environment, process this, and produce insights and also actions to improve the performance of the system. That’s why having good performance monitoring data is very important for us. So before diving into the new Spark 3.0 features for monitoring, I want to recap some of the standard features so that we are all on the same page.
So the web UI is the first and the main point of access for Spark monitoring data. I guess that most of you are already familiar with that. The good news for Spark newbies is that in 3.0 there is a new chapter in the documentation that covers all the details. Spark metrics are also available using the REST API. There is an example in this slide; I’ll leave the details to the link to the documentation. Developers can attach to Spark monitoring data using the developer API. This will most likely go through a Spark listener. So you will write a custom class that extends SparkListener. Then you can write methods that react to events and collect and process data as you want. And then you attach this custom listener to the Spark session, either on the fly or using a configuration as shown in this slide.
As an example of this, I have a tool called sparkMeasure, but first let’s review the architecture of Spark task metrics. The executors execute tasks and collect the task metrics. These metrics are sent back to the driver via the Spark listener bus, and typically they are sent with the heartbeat, every ten seconds. These metrics are then used by the Spark web UI, and also by the Spark history server via the Spark event log file, if configured. Then you can use these metrics with your own tools that use the Spark listener interface, and sparkMeasure is one such tool. I will show you some examples of how it can be used, and then you can have a look at it and use it as example code for your own instrumentation. sparkMeasure is on Maven Central, so you can just use it with --packages as shown here. Then, after the initialization, you will just instantiate, for example, the stage metrics, which are metrics aggregated by stage. That’s part of what sparkMeasure will do for you. And then any workload that you want can be instrumented with a simple API call, as shown in this slide.
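
The listener pattern described above can be illustrated with a toy model. This is a minimal sketch in plain Python, not the Spark API: `TaskEnd`, `StageAggregator` and `on_task_end` are hypothetical names standing in for a custom listener that reacts to task-end events and sums task metrics per stage, which is roughly the kind of report sparkMeasure produces. The metric values are made up.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class TaskEnd:
    """Hypothetical stand-in for a task-end event carrying task metrics."""
    stage_id: int
    executor_run_time_ms: int
    executor_cpu_time_ms: int


class StageAggregator:
    """Toy listener: reacts to task-end events and sums metrics per stage."""

    def __init__(self):
        self.totals = defaultdict(lambda: {"tasks": 0, "run_ms": 0, "cpu_ms": 0})

    def on_task_end(self, event):
        stage = self.totals[event.stage_id]
        stage["tasks"] += 1
        stage["run_ms"] += event.executor_run_time_ms
        stage["cpu_ms"] += event.executor_cpu_time_ms

    def report(self):
        # Plain dict keyed by stage id, sorted for stable output.
        return {sid: dict(m) for sid, m in sorted(self.totals.items())}


listener = StageAggregator()
# In Spark, the listener bus on the driver would deliver these events.
for ev in [TaskEnd(0, 1200, 1150), TaskEnd(0, 1300, 1250), TaskEnd(1, 400, 100)]:
    listener.on_task_end(ev)

stage_report = listener.report()
print(stage_report)
```

In a real deployment the equivalent class would extend Spark’s listener interface on the JVM side; the sketch only shows the event-driven aggregation idea.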
So sparkMeasure will display the metrics aggregated per stage. The most basic metric is the elapsed time. This is what you normally get by just measuring how much time it takes to run a certain workload. Then there are more detailed metrics that come from the task metrics, in particular the executor run time. This is the elapsed time summed over all the executors that ran a certain workload, and the executor CPU time is the CPU time measured on those tasks. Here we can see that the two numbers are almost the same, meaning that this workload was CPU-bound, as expected if you look at the query on the previous slide. A more complex workload will have additional metrics, for example I/O or shuffle metrics, which are basically zero in this example.
Another instrumentation in Spark is the Spark metrics system. This is less well known than the Spark task metrics, but it’s also very powerful and very useful for monitoring Spark workloads. Here Spark components, including the executors and the driver, can sink metrics directly into one of the available sinks, for example Graphite. And then these metrics can be picked up, for example, with a Grafana dashboard. The metrics that are available for executors in the Spark metrics system, which uses the Dropwizard metrics library for transport, include the task metrics, but there are more: for example, Spark internal metrics like the number of active tasks, and metrics related to the interaction with the system, like the amount of CPU usage and I/O. To use the Spark metrics system you can configure it with the metrics.properties file or with Spark configuration parameters with the prefix spark.metrics.conf, as displayed in this slide, which for example sinks to a Graphite sink. Then you can use InfluxDB, for example, as the Graphite endpoint. Most importantly, these metrics can be picked up into a Grafana dashboard that gives you a lot of information on the performance and the metrics of your Spark workload.
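
The configuration by parameters mentioned above can be sketched as follows. This is a small illustration, assuming a Graphite-compatible endpoint is reachable; the host name, port and prefix are hypothetical values, while the `spark.metrics.conf.*.sink.graphite.*` keys and the `GraphiteSink` class are the ones documented for the Spark metrics system.

```python
def graphite_sink_conf(host, port, prefix, period_seconds=10):
    """Return Spark configuration entries for a Graphite sink as a dict."""
    base = "spark.metrics.conf.*.sink.graphite."
    return {
        base + "class": "org.apache.spark.metrics.sink.GraphiteSink",
        base + "host": host,
        base + "port": str(port),
        base + "period": str(period_seconds),
        base + "unit": "seconds",
        base + "prefix": prefix,
    }


def to_submit_args(conf):
    """Render a conf dict as a flat list of `--conf key=value` arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical endpoint and prefix, for illustration only.
conf = graphite_sink_conf("influxdb.example.org", 2003, "mySparkApp")
submit_args = to_submit_args(conf)
print(" ".join(submit_args))
```

When pasting the printed arguments into a shell, it is safer to quote each `key=value` pair, since the keys contain a `*`.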
This is something that we offer to our users at CERN, and we found it very useful. For example, one of my favorite metrics is the number of active tasks, which gives you information on how much parallelism Spark is using at a given point in time, and so on whether the cores you allocated are being used or not. These are the type of questions you can ask. Then you have CPU time, HDFS reads. And most importantly, if you look at the right of this figure, you can see graphs that show the evolution of the metrics over time, as time series, and you can correlate the metric values with the various stages of your workload, for example with the SQL ID, the job ID and the stage ID that is running.
Let’s now dive into the new features in Spark 3.0. There are some exciting, very powerful new features here for Spark monitoring. The first one that I want to talk about is memory usage monitoring in Spark 3.0. The problem we want to solve here is that memory is key for the performance of data engines, including Apache Spark, but sometimes it’s difficult to configure. In particular, many of us have been through troubleshooting Java out-of-memory errors, which is quite difficult. Another thing that can happen is that, to avoid those errors, we oversize the amount of memory that we give to the executors, which may be a waste. And sometimes we want to tune our systems; in particular, we want to optimize how the memory is used. So to get all these results, one way forward is to measure the amount of memory and see in detail, in the internals of Spark, how this memory is used. The executor metrics instrumentation in Spark 3.0 does just that. It measures details of memory usage per memory component and also provides the peak values of the measurements. Because, if you think about it, you get an out-of-memory error on a peak allocation, not on an average. And it’s also integrated with the Spark metrics system, so these values can be seen as time series.
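
As an illustration of the per-executor peak values, here is a minimal sketch that parses the executor list returned by the Spark REST API and extracts one peak memory metric per executor. The sample payload is made up and trimmed to the fields used here, and the application id is hypothetical; the endpoint path and the `peakMemoryMetrics` field follow the Spark monitoring documentation.

```python
import json


def executors_url(base_url, app_id):
    """Build the REST endpoint that lists executors for one application."""
    return f"{base_url.rstrip('/')}/api/v1/applications/{app_id}/executors"


def peak_memory_by_executor(payload, metric="JVMHeapMemory"):
    """Map executor id -> peak value (bytes) of one memory metric.

    Executors without a peakMemoryMetrics entry are skipped.
    """
    peaks = {}
    for executor in json.loads(payload):
        metrics = executor.get("peakMemoryMetrics")
        if metrics and metric in metrics:
            peaks[executor["id"]] = metrics[metric]
    return peaks


# Made-up sample response, trimmed to the fields used here.
sample = json.dumps([
    {"id": "driver", "peakMemoryMetrics": {"JVMHeapMemory": 300_000_000}},
    {"id": "1", "peakMemoryMetrics": {"JVMHeapMemory": 2_100_000_000}},
    {"id": "2"},
])

url = executors_url("http://localhost:4040/", "app-123")  # hypothetical app id
peaks = peak_memory_by_executor(sample)
worst = max(peaks, key=peaks.get)
print(url)
print(worst, peaks[worst])
```

Against a live application the payload would come from an HTTP GET on the printed URL instead of the in-line sample.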
This is how it works through the REST API: using the web URL that you see in this slide, you can get the metrics per executor. For each executor you can get, for example, the memory metrics. On the right of this slide you have a list of the metrics that you can get. Some of them come from the OS, for example the procfs metrics, which are optional and which we have on Linux. Some of them come from the JVM and some from Spark internals, like the unified memory details. This is quite useful.
If you’re now a bit confused by the complexity of this, I think the next slide will help. Here you can see a diagram of the various memory areas that you allocate and can configure in the executor when you run Spark. This applies to Spark 3.0 and also to 2.4. Typically you just set the Spark executor memory to a reasonable size and Spark takes care of the defaults, but you can look at the details, for example the unified memory pool, which is a key memory area that Spark allocates and that typically is 60% of what you allocate as executor memory size. This is also divided in two parts, the storage and the execution memory. It can also be tuned if need be; normally the defaults are okay, but with the knowledge of these internals you can make much more sense of the instrumentation, in particular the executor metrics. What I find very useful, again, is to have dashboards and graphs, so that I see the metrics as time series, and I can also see the metric values per executor. So the visualization shows me how the memory usage evolves through time. I can typically see peaks, which indeed can be precursors of out-of-memory errors. And I can also see the maximum amount of memory that is used, which is an indication that maybe I can save some memory in the allocation.
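
The memory layout described above can be turned into a small worked calculation. This is a sketch under stated assumptions: it uses the default values of `spark.memory.fraction` (0.6) and `spark.memory.storageFraction` (0.5) and the 300 MiB of reserved memory described in the Spark tuning documentation, and it treats the configured heap size as the JVM heap, which is only an approximation of what the JVM reports at run time.

```python
RESERVED_BYTES = 300 * 1024 * 1024  # fixed reserved memory (300 MiB)


def unified_memory_layout(heap_bytes, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate sizes (bytes) of the unified pool and its two regions.

    memory_fraction  ~ spark.memory.fraction
    storage_fraction ~ spark.memory.storageFraction
    """
    usable = heap_bytes - RESERVED_BYTES
    if usable <= 0:
        raise ValueError("heap is smaller than the reserved memory")
    unified = int(usable * memory_fraction)
    storage = int(unified * storage_fraction)
    return {
        "usable": usable,
        "unified": unified,
        "storage": storage,
        "execution": unified - storage,
    }


layout = unified_memory_layout(4 * 1024**3)  # a 4 GiB executor heap
for name, size in layout.items():
    print(f"{name:>9}: {size / 1024**2:8.1f} MiB")
```

The storage and execution regions are soft boundaries: each side can borrow from the other at run time, so the numbers are starting points for reading the executor metrics rather than hard limits.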
And I can also compare these metrics with other metrics, like CPU, number of active tasks, et cetera.
One feature in Spark 3.0 that I particularly like for monitoring is the possibility of using Spark 3.0 plugins to extend monitoring. Spark 3.0 plugins are a way of executing user-provided code at the startup of the executors and the driver. Plugins also allow you to extend the Spark metrics system: as we will see, there is a hook to the metrics system that is given to the plugin. The type of problem that these plugins allow you to solve is the monitoring of cloud storage, for example S3A, something that is not done by default by Apache Spark. There is also the possibility to use this for improved HDFS monitoring. OS metrics are useful for Spark on Kubernetes, and there are also all sorts of custom application monitoring. This is how plugins work. On executor startup the plugin is executed, and the code in the plugin is given a hook to the monitoring system. So it can actually look at the external packages that run on top of Apache Spark and that also have their own monitoring. The plugin can add additional instrumentation and put it into the common metrics sinks, together with the other metrics that we have seen in the dashboard before. So Spark plugins provide an API for integrating custom instrumentation with the rest of Spark monitoring. If you want to develop Spark plugins, you have to become familiar with the API, as reported at the top of this slide, and you end up writing a class that extends SparkPlugin, with an executor plugin and also a driver plugin if you want, where an init method can be written that gets a hook to the Spark metrics registry. And then one can register any metrics that one wants. Spark plugins can also be used for other applications. Here we care mostly about extending monitoring.
If you want to get started with Spark plugins, you can look at the code reported here in this slide, where I have written some demo plugins and also the plugins that we’ll discuss here, for monitoring cloud I/O and the OS. For example, this snippet of code shows you how to use this test package, which I also uploaded to Maven Central, so you can use it with --packages, and then you specify the plugins that you want to run. For example, here there are two plugins, the run OS command plugin and the demo metrics plugin. These are demos, as the names state, but they will run a piece of code that is executed at startup.
In terms of useful examples, the first one that I want to talk about is how to measure container metrics, so OS metrics from cgroup instrumentation. This is particularly useful for Spark on Kubernetes, which is something that is seeing more and more adoption, at CERN and in the Apache Spark community. Since Spark on Kubernetes runs on containers and cgroups, there is extra instrumentation that comes from there, and we just pick it up with the plugin. By default Apache Spark only instruments the JVM CPU. So this is how you would run this plugin to measure cgroup metrics. Again, I’m referring to this package, by the way. You need to send the jars, or put the jars that contain the code of the plugin in the container used by Kubernetes. And then you have to specify which plugins you want to run, in this case the cgroup metrics plugin. The cgroup metrics plugin implements the CPU time in nanoseconds metric. This gives the CPU time used in the cgroup, so that would be the JVM CPU, but also other CPU. For example, if you’re running Python then you get that too, which is something that normally you don’t have in the Spark instrumentation. Then you have memory details and also, interestingly, details about the network usage. We’ll focus on that in the next example.
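
To make the cgroup metrics more concrete, here is a minimal sketch of how such values can be read and interpreted. It assumes the cgroup v1 file formats (`cpuacct.usage` holding cumulative CPU time in nanoseconds, `memory.stat` holding `name value` lines); how the plugin itself reads them is not stated in the talk, and the file contents below are made up.

```python
def parse_cpuacct_usage(text):
    """cpuacct.usage holds one integer: cumulative CPU time in nanoseconds."""
    return int(text.strip())


def parse_memory_stat(text, keys=("rss", "cache", "swap")):
    """memory.stat holds `name value` lines; keep only the requested keys."""
    stats = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[0] in keys:
            stats[parts[0]] = int(parts[1])
    return stats


def cpu_cores_used(usage_start_ns, usage_end_ns, wall_seconds):
    """Average number of cores kept busy between two cpuacct.usage samples."""
    return (usage_end_ns - usage_start_ns) / (wall_seconds * 1e9)


# Made-up file contents, standing in for reads under /sys/fs/cgroup (v1).
cpu_ns = parse_cpuacct_usage("52000000000\n")
mem = parse_memory_stat("cache 1048576\nrss 2097152\nmapped_file 0\nswap 0\n")
cores = cpu_cores_used(42_000_000_000, cpu_ns, 5)
print(cpu_ns, mem, cores)
```

Because the counter covers every process in the cgroup, the derived core usage includes Python workers and other non-JVM processes, which is the point made above.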
So the first line, as you can see here, starts a container where I packaged InfluxDB and Grafana with the sample dashboards. This simplifies the usage, or at least the testing, of this type of instrumentation. Then the instrumentation is pointed to it by all the spark.metrics.conf parameters that you see at the end, while the plugin (again, I’m using the Spark plugins package) is configured with spark.plugins, and it’s called CgroupMetrics. If you load this and run some test workload, you are able to see many different metrics in the Grafana dashboard, and the one we want to focus on here is the network bytes in. So you can see the bytes coming into the cgroup for each executor. And then you can see the sum for all the executors’ cgroups that ran the workload. This would be the number of bytes that we are reading from storage, or bytes that we are reading in for a shuffle operation.
Another type of instrumentation that you can do using plugins, and that is not available in Apache Spark without these add-ons, is instrumenting a cloud file system, for example S3A. Apache Spark by default implements only HDFS and local file system instrumentation. When we use a cloud file system in Spark, we anyway use the Hadoop API. So these file systems are wrapped into Hadoop-compatible file systems, where we have metrics like bytes read, bytes written, read operations and write operations. So the plugin CloudFSMetrics, with the additional parameter cloudFsName, allows us to monitor these metrics for any cloud file system, for example S3A, GS and others. Then, from the Grafana dashboard, we will see, for example, the number of bytes read from S3A over time, where different colors represent different executors. So we can also drill down on that. And if we move the mouse in Grafana over the data, we can see the exact values and the time when they were measured.
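
The plugin configuration described above can be sketched as a small helper. `spark.plugins` is the Spark 3.0 setting and takes a comma-separated list of plugin class names. The fully qualified class names and the `spark.cernSparkPlugin.cloudFsName` key used in the example calls are assumptions: the transcript only gives the short names, so check them against the plugin package documentation before use.

```python
def plugin_conf(plugin_classes, extra=None):
    """Build Spark conf entries that enable one or more plugins.

    `spark.plugins` takes a comma-separated list of plugin class names.
    """
    if not plugin_classes:
        raise ValueError("at least one plugin class is required")
    conf = {"spark.plugins": ",".join(plugin_classes)}
    conf.update(extra or {})
    return conf


# ASSUMPTION: the class names and the cloudFsName key below are not spelled
# out in the transcript; verify them against the plugin package docs.
cloud_fs = plugin_conf(
    ["ch.cern.CloudFSMetrics"],
    extra={"spark.cernSparkPlugin.cloudFsName": "s3a"},
)
both = plugin_conf(["ch.cern.CgroupMetrics", "ch.cern.CloudFSMetrics"])
print(cloud_fs)
print(both["spark.plugins"])
```

The resulting entries would be passed as `--conf key=value` pairs, together with the metrics sink parameters and the jar or package that contains the plugin code.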
And we can also correlate this, although it’s not shown here, with the SQL execution ID, the job ID or the stage ID.
And now one last example on this topic is the possibility to measure and instrument custom libraries. One of the works I did for benchmarking and troubleshooting was to instrument S3A with time instrumentation, where I implemented metrics like read time, seek time and get object metadata time for S3A. In the past, when I used this with Spark, I had to do a custom Spark compilation to get these metrics in. Now I don’t need to do that anymore, because I can just use a plugin and get those metrics with this special plugin called S3A time instrumentation. In general, if you have any libraries that run on top of Apache Spark and you want to get the instrumentation of this library or code into the main Apache Spark instrumentation, plugins are the way to go.
I also want to mention improvements in SQL monitoring. The SQL and DataFrame API is now the main API for Spark usage, at least here at CERN, but I guess in the community in general. So it’s very nice to see improvements in the monitoring of SQL. One thing that we have, also in terms of documentation, is that the SQL metrics that instrument the execution of SQL and DataFrame operations are now being documented. And we can see them in the SQL tab. For example, when we have a shuffle operation we can see more details than we had in Spark 2.x. Also improved is the instrumentation for the very important whole-stage code generation, where you can see the metrics better described, and we can find which stage has the maximum value of a metric, and also the codegen ID, which can be useful when there are complex plans. Something that is new and very interesting is the new Structured Streaming UI, which basically appears as a streaming tab in the web UI and is very useful for streaming troubleshooting.
And there is experimental support for Prometheus in the REST API. And as I mentioned before, in terms of documentation there is a new web UI doc and an improved monitoring doc, where you can find most of the details of what I’ve been discussing so far.
Okay, I would like to wrap up with some thoughts on Spark monitoring and the community. Each version of Spark in the last few years has improved Spark monitoring with more metrics and more features, but there is still work to do. So if you’re a developer willing to work on this, there’s much to do, like sinks for InfluxDB and Prometheus, further instrumentation of the Spark core system, like I/O time and Python UDF time instrumentation, et cetera, and also development and sharing of plugins like the ones I’ve shown here. I think there is room for many more, to instrument libraries and also to get OS metrics, like GPU metrics. I also see that this type of instrumentation, in particular with the dashboard and the Spark metrics system, is not so popular at the moment. And the adoption of tools and methods for systematic troubleshooting and root-cause analysis still has a way to go in the community. So if you’re running, or are responsible for running, platforms inside your organization, this is something where you could work on getting more experience and sharing it with your users. And I also want to mention what I think is the holy grail of monitoring: building automated systems that can perform root-cause analysis and general troubleshooting on monitoring data in an automatic way. I think we’ll get there at some point, but this is a very hard problem, and whoever solves it will have done something great.
In Spark 3.1 you will already find quite some improvements in this infrastructure. You will find, for example, a new SQL REST API, and plugins are also getting improved. So there are various JIRAs that I mention here that we can expect next year in Spark 3.1.
And there are some that are still a work in progress, and there’s also some work on Hadoop that is quite interesting and can then be picked up by Apache Spark in terms of I/O instrumentation.
In conclusion, I would like to say that monitoring and instrumentation improvements are one more reason to upgrade to Apache Spark 3.0. In Spark 3.0 you will find memory monitoring with executor metrics, which will help you troubleshoot and prevent out-of-memory errors, but also size your memory better. Spark plugins are also a new feature in Spark 3.0, and you can use them to measure cloud file system I/O, OS and container metrics, and to build your own custom metrics for your own applications. And there’s much more that can be done with this technology; it is quite exciting and it is evolving. The web UI is also improved, with better streaming and Spark SQL monitoring. With this, I would like to thank my colleagues at CERN and the Apache Spark community. And I would like to thank you all for your attention.
Luca is a data engineer at CERN with the Hadoop, Spark, streaming, and database services. Luca has 20+ years of experience with designing, deploying, and supporting enterprise-level database and data services with a special interest in methods and tools for performance troubleshooting. Luca is active in developing and supporting platforms for data analytics and ML for the CERN community, including the LHC experiments, the accelerator sector, and CERN IT. He enjoys sharing experience and knowledge with data communities in science and industry at large.