Scaling your Data Pipelines with Apache Spark on Kubernetes

May 28, 2021 11:40 AM (PT)


There is no doubt Kubernetes has emerged as the next generation of cloud native infrastructure to support a wide variety of distributed workloads. Apache Spark has evolved to run both machine learning and large scale analytics workloads. There is growing interest in running Apache Spark natively on Kubernetes. By combining the flexibility of Kubernetes with scalable data processing in Apache Spark, you can run any data and machine learning pipelines on this infrastructure while effectively utilizing the resources at your disposal.

In this talk, Rajesh Thallam and Sougata Biswas will share how to effectively run your Apache Spark applications on Google Kubernetes Engine (GKE) and Google Cloud Dataproc, and orchestrate the data and machine learning pipelines with managed Apache Airflow on GKE (Google Cloud Composer). The following topics will be covered: understanding key traits of Apache Spark on Kubernetes; things to know when running Apache Spark on Kubernetes, such as autoscaling; and a demonstration of running analytics pipelines on Apache Spark orchestrated with Apache Airflow on a Kubernetes cluster.

In this session watch:
Rajesh Thallam, Machine Learning Specialist, Google
Sougata Biswas, Analytics Architect, Google



Rajesh Thallam: Hey everyone. Thanks for joining. This is our Data and AI Summit 2021 conference and we are here for a session around scaling data pipelines and beyond with Apache Spark on Kubernetes on Google Cloud. Before I begin, I would like to remind you that we are available for chat during the talk. So as questions come up, please type them in and we’ll respond. And for introductions, this is Rajesh Thallam, I’m a machine learning specialist at Google Cloud, and I have my colleague and co-presenter, Sougata Biswas, data analytics specialist at Google Cloud. Next slide.
Today we’ll actually share with you how to effectively run Apache Spark applications on Kubernetes, specifically Google Kubernetes Engine and Google Cloud Dataproc. We’ll start with understanding key traits of Apache Spark on Kubernetes and how to run these Spark jobs on Cloud Dataproc on GKE, and things to know when running Spark jobs on Kubernetes. And finally we’ll show you a couple of implementation patterns for how Spark and Kubernetes fit in your data and machine learning pipelines. Handing it over to Sougata.

Sougata Biswas: Thank you, Rajesh, for the introduction. Hi there, I am Sougata from GCP. Before getting into the core topic of running Spark on Kubernetes on GCP, let me spend some time explaining why Spark on Kubernetes? Why should you care? Well, Spark jobs can be orchestrated on various cluster managers, like YARN, Mesos and Kubernetes. Kubernetes comes with some unique benefits. First, cluster scaling. Scaling containers is much faster than scaling VMs. Second, isolation. Packaging all the dependencies and libraries of a Spark application in the container provides a great way to isolate your workload, and teams can run their workloads independently without blocking each other. Third, portability. Containerization of a Spark application gives you the ability to run the application on the cloud as well as on-prem. Last, but not the least, cost optimization. You can utilize your existing Kubernetes infrastructure to run Spark alongside other applications, without maintaining separate big data infrastructure like Hadoop clusters.
Out of all cluster managers, YARN is the most popular cluster manager for Spark. So let’s do a quick comparison between YARN and Kubernetes. Kubernetes is relatively new. YARN has been supported since the inception of Spark, while Kubernetes support started as an experimental feature in Spark 2.3 and went GA this year with Spark 3.1. In terms of features, Kubernetes is lagging a little bit, but catching up fast. YARN is a cluster manager for all big data applications, while Kubernetes can handle any application as long as containerization makes sense. YARN runs on VMs or physical machines, so scaling is slower compared to containers.
Now coming to the core topic of Spark on Kubernetes on GCP. Let’s briefly discuss two Google Cloud products, which we’ll use heavily during the presentation. One, Dataproc, and the other, GKE. So Dataproc is a managed service for running big data workloads on GCP. You can run many open source big data technologies, like Spark, [inaudible]. It has a very fast spin-up time; you can spin up a new cluster in about 90 seconds. It’s very cost-effective when using autoscaling with preemptible VMs. It comes with enterprise security, and it also gives you the ability to run managed jobs, like workflow templates and Spark on GKE, the one we are going to show you today.
In my mind, Kubernetes is a new age operating system. It helps manage your applications: scheduling workloads, monitoring health, scaling up and down based on resources, like an OS does. But all this is done on a distributed set of machines instead of a single machine. Google invented this and open sourced it in 2014. However, managing a Kubernetes cluster can be a daunting task. Google Kubernetes Engine, or GKE, is a managed version of Kubernetes. GKE is the easiest way to get started with Kubernetes; with a few clicks, you can have a cluster up and running. GKE is a fully managed product, meaning Google will handle all the work of keeping your cluster updated and healthy, with SLAs. And lastly, GKE is integrated deeply with GCP. You can take advantage of not only the underlying infrastructure, networking and storage on GCP, but also a lot of higher level services [inaudible], like Dataproc, the one we’re going to talk about here.
Now on the core topic of how you can run Spark on Kubernetes on GCP, there are a few options and we are going to discuss them. However, Dataproc on GKE is probably the easiest way for a GCP customer to get started. Currently it’s in beta. It provides a simple API to run Spark jobs on GKE, which is the same API that can be used to run jobs on Dataproc itself. It comes with out-of-the-box security controls, logging and monitoring features. It also provides the flexibility to run a custom container image. Let’s say you want to run a job with specific dependencies and libraries; you can do that, and Rajesh is going to show you how in a later part of the presentation. How does this work? When Dataproc gets registered with GKE, it installs an agent on the GKE nodes. This agent makes API calls to the Kubernetes API server. Then Kubernetes schedules Spark driver and executor pods on as many nodes as needed, as shown in the diagram in the background. The logs are pushed to the GCP logging service.
You can also run Spark on GKE directly using spark-submit, as Kubernetes is supported as a Spark native cluster manager. Or you can use the Spark Operator for Kubernetes, which was open sourced by Google and [inaudible]. However, Dataproc on GKE makes it really simple to set up and run jobs, especially for existing Dataproc customers, as it uses the standard Dataproc API, which comes with all the built-in features for security, logging and monitoring. Let’s see a demo of Spark on GKE using the Dataproc API. It’s only four steps; I’ve already recorded it to save some time.
Step one is to set up the GKE cluster. You can use your existing GKE cluster as well, but I’m setting up a new one with autoscaling enabled, as you can see on the screen. Step two is to create and register the Dataproc cluster with the GKE cluster which I prepared in step one. I have also added a separate Kubernetes namespace, so that the Spark job can run under that namespace, isolated from the rest of the applications running on the same GKE cluster. Step three is invoking the Dataproc API and submitting a Spark job. I’m running a job which reads a BigQuery table from the BigQuery public datasets and does a word count. You can see in the GKE console that the driver and executor logs are showing the outputs. Step four, how do you monitor? As I said, the logs are pushed to the GCP logging service, so you can go to the GCP logging service and see the logs. You can also apply filters for drivers and executors to see specific logs, as I’m showing on the screen.
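The demo steps above can be sketched as gcloud invocations, assembled here in Python so the commands are easy to inspect. Flag names for the Dataproc-on-GKE beta (such as `--gke-cluster` and the namespace flag) are illustrative of the beta API described in the talk and may differ in current releases; project, region and cluster names are placeholders.

```python
# Sketch of demo steps two and three: register a Dataproc cluster against an
# existing GKE cluster, then submit a Spark job via the standard Dataproc API.
# Flag names are illustrative of the beta API; names are placeholders.

def create_dataproc_on_gke_cmd(name, region, gke_cluster, namespace):
    """Step 2: create a Dataproc cluster registered against a GKE cluster,
    under its own Kubernetes namespace for isolation."""
    return (
        f"gcloud beta dataproc clusters create {name} --region {region} "
        f"--gke-cluster {gke_cluster} --gke-cluster-namespace {namespace}"
    )

def submit_pyspark_job_cmd(cluster, region, main_py):
    """Step 3: submit a PySpark job through the standard Dataproc jobs API,
    exactly as you would against a regular Dataproc cluster."""
    return (
        f"gcloud dataproc jobs submit pyspark {main_py} "
        f"--cluster {cluster} --region {region}"
    )
```

Because the cluster is registered through Dataproc, step four (monitoring) needs nothing extra: driver and executor logs land in the GCP logging service automatically.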
So in the case of a long-running job, you can also do port-forwarding to see the Spark Web UI, as I’m showing on the screen. As I have shown, it’s pretty easy to run Spark on GKE on GCP using the Dataproc API. But there are a few things that you should know, and Rajesh is going to talk about those next. So Rajesh, handing it over to you.

Rajesh Thallam: I hope Sougata was able to convince you why you may want to run Spark on Kubernetes. But there are a few things to know to run Spark jobs effectively on Kubernetes, starting with autoscaling of Spark jobs. So if you’re familiar with Cloud Dataproc, you may know that Dataproc has its own autoscaler to scale Spark jobs dynamically. However, when running Spark jobs on Dataproc on GKE, the Dataproc autoscaler is not supported yet. So instead, you would actually look at GKE cluster autoscaling, which automatically resizes the node pools of a GKE cluster based on workload demand.
So unlike open source Kubernetes, where the cluster autoscaler works with monolithic clusters, GKE uses node pools for its cluster autoscaling. The cluster autoscaler lets GKE use right-sized instances, avoiding nodes that are too big or too small and utilizing the available capacity completely. The GKE cluster autoscaler automatically resizes the number of nodes in a given node pool based on the demands of your workloads. So you don’t really need to manually add or remove nodes, or over-provision your node pools. Instead, you just specify a minimum and maximum size for your node pool and the rest is automated. You can also combine the GKE cluster autoscaler with horizontal pod autoscaling or vertical pod autoscaling. As you can notice in this snippet, you specify those autoscaling options when creating the GKE cluster.
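As a rough sketch of the snippet described above, the command below enables the cluster autoscaler on a node pool by giving it only a min/max size. Pool, cluster and zone names are placeholders, and exact flags may vary by gcloud version.

```python
# Hedged sketch: create an autoscaled GKE node pool for Spark executors.
# Only the min/max bounds are specified; GKE resizes within them on demand.

def autoscaled_node_pool_cmd(pool, cluster, zone, min_nodes, max_nodes):
    """Build a gcloud command for a node pool with cluster autoscaling on."""
    return (
        f"gcloud container node-pools create {pool} "
        f"--cluster {cluster} --zone {zone} "
        f"--enable-autoscaling --min-nodes {min_nodes} --max-nodes {max_nodes}"
    )
```

With this in place, scheduling more Spark executor pods than the pool can hold triggers a scale-up, and idle nodes are removed down to the minimum.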
So moving on. Shuffle in Spark on Kubernetes. So shuffle refers to the data exchange that happens when your data is rearranged between partitions within a Spark job. This is required when a Spark transformation needs information from other partitions, such as aggregations or joins. Spark gathers the required data from each of the partitions and combines it into a new partition, likely on a different executor. During a shuffle, data is written to disk and transferred across the network. So as you will realize, it’s actually an expensive operation, and shuffle performance typically depends on the disk IOPS as well as the network throughput between the nodes. And specifically when you’re running Spark on Kubernetes, Spark actually supports spilling shuffle data to PVCs, which stands for persistent volume claims, or you can also write your shuffle data to local volumes or scratch space on the node.
So you would actually get better performance when using local SSDs compared to persistent disks. The code here shows how to configure local SSDs on a cluster, mount them on the nodes, and then set the corresponding properties in the Spark job. We’d also like to see how shuffle plays along with dynamic resource allocation; it’s a closely related topic. So Spark actually provides a mechanism to dynamically adjust the resources your application occupies based on the workload, through Spark job autoscaling, which is also referred to as Spark dynamic allocation. The concept here is that your application may give resources back to the cluster if they are no longer used, and request them again later when there is demand. So it’s particularly useful if multiple applications share your Spark cluster resources.
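The Spark side of the local SSD setup described above could look roughly like the properties below. The `spark-local-dir-` volume name prefix follows Spark’s documented convention for Kubernetes local directories; the SSD mount path `/mnt/disks/ssd0` is a placeholder for wherever GKE mounts the local SSD on the node.

```python
# Hedged sketch: Spark-on-Kubernetes properties that point shuffle and
# scratch space at a node-local SSD exposed to executor pods via hostPath.
ssd_path = "/mnt/disks/ssd0"  # placeholder: actual GKE local-SSD mount path

shuffle_props = {
    # Mount the node's local SSD into each executor pod...
    "spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.mount.path": ssd_path,
    "spark.kubernetes.executor.volumes.hostPath.spark-local-dir-1.options.path": ssd_path,
    # ...and tell Spark to spill shuffle/scratch data there instead of
    # the slower persistent-disk-backed default.
    "spark.local.dir": ssd_path,
}
```

These properties are passed to the job like any other Spark configuration, for example via `--properties` on a Dataproc job submission.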
So Spark works in tandem with an external shuffle service, and the shuffle service is mandatory for autoscaling to work. With dynamic allocation on YARN, you can enable the external shuffle service and safely enable dynamic allocation without the risk of losing shuffle data when downscaling the cluster. However, on Kubernetes this is not possible today, because Spark 3.0 does not support the external shuffle service yet. In the meantime, there’s something called soft dynamic allocation, which is available in Spark 3.0. The way it works is that the Spark driver tracks the shuffle files and only allows evicting executors that do not store active shuffle files. This actually works great in practice, although on average the scale-down of a Spark application is a bit slower compared to YARN. So this is actually a major cost optimization technique as well on Kubernetes.
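The soft dynamic allocation described above boils down to a small set of Spark 3.0+ properties: dynamic allocation is turned on, and shuffle tracking stands in for the missing external shuffle service. The executor bounds below are illustrative placeholders.

```python
# Soft dynamic allocation on Spark 3.0+ / Kubernetes: shuffle tracking lets
# the driver avoid evicting executors that still hold active shuffle files.
dynamic_allocation_props = {
    "spark.dynamicAllocation.enabled": "true",
    # Required on Kubernetes in place of the external shuffle service.
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    # Placeholder bounds: tune per workload.
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "10",
}
```

As executors idle out and are released, the GKE cluster autoscaler can then reclaim the freed nodes, which is where the cost savings come from.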
So when a Spark application requests a new executor, it takes a few seconds if there is room in the cluster. If there’s no room in the cluster, the cluster actually scales up and gets a new node. As a consequence, if you are willing to trade off cost optimization for lower latency, you can actually reserve those extra resources on your Kubernetes cluster, either with over-provisioning, or by fine-tuning your GKE horizontal pod autoscaling, or even by adding pause pods to the cluster.
Now, running Spark jobs on preemptible VMs. So if you want to optimize the cost of running Spark jobs on Kubernetes, you may want to look at running Spark jobs on a node pool with preemptible VMs in the GKE cluster. Preemptible VMs, or PVMs, are excess Compute Engine capacity, which lasts for a max of 24 hours, and usually there is no availability guarantee. So if your jobs are fault-tolerant and can withstand possible instance preemptions, then preemptible instances can reduce your cost significantly. The code snippet here shows you how to create a GKE cluster, or a node pool within a GKE cluster, with preemptible VMs, and also the properties that you would typically set to request Spark driver and executor pods to run on PVMs.
So there are a couple of caveats to note when running Spark jobs on PVMs. First, PVMs can shut down inadvertently, which will cause the pods to be rescheduled to a new node, and that can add to your latency. Second, when Spark executors with active shuffle files are shut down, those tasks will be recomputed, which can also add to the latency. So just be aware of the trade-off between cost and performance when running Spark jobs on PVMs.
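The PVM setup described above has two halves: a preemptible node pool on the GKE side, and a node selector on the Spark side that steers pods onto it via the node label GKE applies to preemptible nodes. Pool and cluster names are placeholders, and note that `spark.kubernetes.node.selector.*` applies to driver and executor pods alike, so only set it if the driver can also tolerate preemption.

```python
# Hedged sketch: a preemptible, autoscaled node pool plus the Spark node
# selector that targets it (names are placeholders).

def pvm_node_pool_cmd(pool, cluster, zone, min_nodes=0, max_nodes=10):
    """Create an autoscaled node pool of preemptible VMs."""
    return (
        f"gcloud container node-pools create {pool} --cluster {cluster} "
        f"--zone {zone} --preemptible "
        f"--enable-autoscaling --min-nodes {min_nodes} --max-nodes {max_nodes}"
    )

# GKE labels preemptible nodes with cloud.google.com/gke-preemptible=true;
# this Spark property schedules the job's pods onto those nodes.
pvm_props = {
    "spark.kubernetes.node.selector.cloud.google.com/gke-preemptible": "true",
}
```

Setting the pool's minimum to zero lets the autoscaler drain it entirely when no Spark job is running, which is where most of the savings come from.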
All right. Coming to the flexibility of Dataproc on GKE. So when you’re using a managed service like Cloud Dataproc for Spark jobs, you may want to bring in your own Docker images or extend the default images provided by Dataproc. By default, when you create a Dataproc cluster on GKE, the default Dataproc image is used, based on the image version that you specify. But there is the flexibility to bring in your own image, or extend the default image, as the container image to use when running your Spark jobs. What this means is that you can include your own packages or even applications in a custom image when you need to. So that way you are not bound to using the default Dataproc images.
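Pointing a job at a custom image as described above is, at the Spark level, a single property. The registry path below is a placeholder, and the small helper just shows one way such properties end up on a gcloud submission as a `--properties` flag.

```python
# Hedged sketch: run the job's driver and executors from a custom container
# image (placeholder registry path) instead of Dataproc's default image.
custom_image_props = {
    "spark.kubernetes.container.image": "gcr.io/my-project/my-spark-image:latest",
}

def properties_flag(props):
    """Render a dict of Spark properties as a gcloud --properties flag."""
    return "--properties " + ",".join(f"{k}={v}" for k, v in props.items())
```

The custom image would typically be built FROM the default Dataproc image, layering on extra packages, so the Dataproc agent and Spark distribution stay intact.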
Also, coming to the integration with Google services like Google Cloud Storage and BigQuery. So if your Spark jobs have Google Cloud Storage or BigQuery as sources or sinks, you are actually covered. You can use the built-in Cloud Storage connector to store your data in Google Cloud Storage and access it directly; you don’t need to transfer it to HDFS. And you actually have the added benefit of storing data in Cloud Storage, which enables seamless interoperability between Spark and Google services. And specifically when you are working with BigQuery as a source, you would use the Spark BigQuery connector, which is an optimized connector that utilizes the BigQuery Storage Read API to read Google BigQuery tables into Spark DataFrames. The connector also supports writing these DataFrames back to BigQuery.
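A BigQuery read/write with the connector, as described above, could look roughly like the sketch below. It assumes the spark-bigquery connector jar is on the job's classpath and the cluster has GCP credentials; the source is a real BigQuery public table, while the sink table and staging bucket are placeholders.

```python
# Hedged sketch of a word count over a BigQuery public table using the
# Spark BigQuery connector (reads go through the BigQuery Storage Read API).

def word_count(spark, sink_table, staging_bucket):
    """Read a public table into a DataFrame, aggregate, write back to BigQuery.

    `spark` is an active SparkSession on a cluster with the connector jar;
    `sink_table` ("project.dataset.table") and `staging_bucket` are
    placeholders for your own destination and temporary GCS bucket.
    """
    df = (spark.read.format("bigquery")
          .option("table", "bigquery-public-data.samples.shakespeare")
          .load())
    counts = df.groupBy("word").sum("word_count")
    (counts.write.format("bigquery")
        .option("table", sink_table)
        .option("temporaryGcsBucket", staging_bucket)
        .save())
```

Because reads use the Storage Read API, there is no intermediate export to GCS on the read path; the temporary bucket is only needed for the indirect write.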
So summarizing all the things that we saw at a glance. Starting with autoscaling, where you automatically resize GKE cluster node pools. Then shuffle, and dynamic allocation for working with shuffle data within Spark jobs. Then preemptible VMs for cost optimization, using custom images for flexibility, and using the built-in connectors to integrate with Google services. Now, let’s see how this all plays together when looking into the use cases or architectural patterns for working with Spark on GKE.
Now that you know why you want to run Spark on Kubernetes and how to run it effectively, the next question is, how do you use it when you want to implement those patterns? For that, let’s look at a unified architecture for data pipelines and machine learning pipelines, alongside any backend applications. At Google, we have used a system called Borg to unify all of our processing, whether it’s a data analytics workload, a website, or anything else. Borg’s architecture was the basis for Kubernetes, which you can use to remove the need for big data silos as well. By migrating Spark jobs to a single cluster manager, you can focus on modern workload management in Kubernetes. At Google, having that single cluster manager system has led to better utilization of resources and provided a unified logging and management framework. This same capability is true with Dataproc on GKE.
So think of Kubernetes as not just another resource negotiator for big data processing. It’s an entirely new way of approaching big data applications that can greatly improve the reliability and management of your workloads. Now, what you see here is really key, because it enables you to implement patterns like the one on the slide here, where you have data pipelines with Spark jobs running on different versions to process your data, Airflow for orchestrating the pipelines, and perhaps a Kubeflow cluster for your machine learning tasks, everything running on a single cluster manager, Kubernetes.
So before we go and look at the architecture patterns, one of the key components we’ll be using in them is Cloud Composer. Cloud Composer is a Google-managed Apache Airflow service on Google Cloud for scheduling and managing workflows. It runs on GKE as a backend, and Composer is actually more than a wrapper around Airflow. It allows you to author end-to-end workflows on Google Cloud, with integrations with Google services and beyond, and it also gives you enterprise security for workflows. The key is, you don’t really have to think about managing the infrastructure once you deploy your Composer environment.
Well, the first pattern here: orchestrating Apache Spark jobs from Cloud Composer. This revolves around orchestrating your data pipelines running on Apache Spark through Cloud Composer. You would write a DAG to submit a Spark job to a Dataproc cluster running on GKE. This actually saves time by not creating and tearing down an ephemeral Dataproc cluster. The benefit here is that you have a single cluster manager to orchestrate and process jobs, so you can efficiently use resources by having the Composer environment and Dataproc both running on GKE. Think of optimizing the cost while at the same time having better visibility and reliability into your jobs.
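The DAG task described above boils down to submitting a job payload through the standard Dataproc jobs API. The builder below follows the Dataproc v1 API shape for a PySpark job; the commented Airflow wiring assumes the `apache-airflow-providers-google` package, and all names, projects and URIs are placeholders.

```python
# Hedged sketch: build the `job` dict that Airflow's DataprocSubmitJobOperator
# (or a direct Dataproc API call) submits to the Dataproc-on-GKE cluster.

def pyspark_job_payload(cluster_name, main_py, args=()):
    """Dataproc v1 job payload targeting an existing (long-lived) cluster."""
    return {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": main_py,
            "args": list(args),
        },
    }

# In the DAG file (sketch, placeholder names):
# from airflow.providers.google.cloud.operators.dataproc import (
#     DataprocSubmitJobOperator,
# )
# transform = DataprocSubmitJobOperator(
#     task_id="transform",
#     project_id="my-project",
#     region="us-central1",
#     job=pyspark_job_payload("spark-on-gke", "gs://my-bucket/jobs/transform.py"),
# )
```

Because the cluster already exists on GKE, the DAG needs no create/delete-cluster tasks around the submission, which is exactly the time saving mentioned above.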
Now let’s actually see how you can expand that use case beyond data pipelines, to machine learning workflows. What you see here is a typical life cycle outlining the major stages a data science project goes through. It’s not a linear process; it’s an iterative process that runs through multiple times. It starts with capturing data from your business processes, then combines this big data with massive processing to create machine learning models, then validates those models for accuracy, and then deploys them into production. Now, a single run of such a workflow may actually take a few hours to complete. So in the meantime, data scientists may want to modify the variables, models or even model parameters to select the best combinations, and then iterate through their ideas.
Now, let’s see how Kubernetes actually fits in this life cycle. But before that, let’s talk about MLflow. MLflow is an open source platform to manage the machine learning life cycle, including experimentation, reproducibility and deployment, along with a central model registry. MLflow offers four components: tracking, projects, models and registry. Reproducibility and observability are extremely critical for iterating on your machine learning pipelines, and MLflow is a widely used tool for that.
So expanding from that first pattern of orchestrating data pipelines. Here we are using Cloud Composer, or Airflow, to orchestrate both data pipelines and machine learning pipelines. Composer runs on a GKE backend and integrates with the other services through Airflow operators or sensors, which may be reading data from Cloud Storage or BigQuery acting as both sources and sinks. For data processing, you would trigger a Spark job running on Dataproc on GKE for tasks like feature engineering or data transformations, followed by a machine learning job. You would run those machine learning jobs to train the models either on a Kubeflow cluster running on GKE, or you can go serverless with Google Cloud’s AI Platform. As we mentioned earlier, experimentation is a key aspect of the machine learning life cycle, so MLflow running on GKE would allow data scientists to track experiments and deploy the best performing model for serving or inference purposes. So as you can see here, Kubernetes, or GKE, becomes the infrastructure backbone, unifying machine learning and data pipeline deployments.
So wrapping up, let’s summarize what we shared with you today. As Sougata mentioned previously, why, and when, you may actually want to run Spark on Kubernetes: if you have apps running on Kubernetes clusters today, or if those clusters are underutilized; or if you have pain managing multiple cluster managers beyond Kubernetes, like YARN or Mesos; or when you have difficulties managing Spark job dependencies or Spark versions. Then you would actually want to look at running Spark on Kubernetes. With this, you get the same benefits that your applications running on Kubernetes already get: think multi-tenancy, autoscaling and fine-grained access controls.
Now, how does Dataproc on GKE fit here, and why exactly is it the solution for running Spark jobs on Kubernetes on Google Cloud? There are three key reasons, I would say: faster scaling and reliability, enterprise security controls, and the inherent benefits that you get out of a managed infrastructure. And with Dataproc on GKE, you can optimize costs with effective resource sharing as well. Finally, we have put together some resources here, like blog posts and some of our cloud documentation, along with some open source documentation, to help guide you through how to set up and run your Spark jobs effectively on Dataproc on GKE. Before you leave, your feedback is important to us. Let us know how we did, so that we can improve our sessions. Thanks a lot for coming to our session.

Rajesh Thallam

Rajesh Thallam is a Machine Learning Specialist at Google Cloud enabling customers to build data science platforms, deploy machine learning pipelines and integrating them with data and analytics servi...

Sougata Biswas

Sougata Biswas is Data and Analytics Architect at Google. He helps Google Cloud customers to architect large scale data analytics platforms with best of breed technologies from GCP and open source lik...