A large-scale end-to-end data analytics and AI pipeline usually involves data processing frameworks such as Apache Spark for massive data preprocessing, and ML/DL frameworks for distributed training on the preprocessed data. A conventional approach is to use two separate clusters and glue multiple jobs together. Other solutions include running deep learning frameworks in an Apache Spark cluster, or using workflow orchestrators like Kubeflow to stitch distributed programs together. All these options have their own limitations. We introduce Ray as a single substrate for distributed data processing and machine learning. We also introduce RayDP, which allows you to start an Apache Spark job on Ray in your Python program and utilize Ray’s in-memory object store to efficiently exchange data between Apache Spark and other libraries. We will demonstrate how this makes building an end-to-end data analytics and AI pipeline simpler and more efficient.
Carson Wang: Hi, welcome to this session. This is Carson Wang from Intel. Today, Xianyang and I will talk about how to build a large-scale data analytics and AI pipeline using RayDP.
This is today’s agenda. I will first talk about the background, how people are integrating big data with AI, and what challenges we are facing. Next, I will introduce the Ray and RayDP projects and how we can build an end-to-end pipeline with these projects more easily and efficiently. Then Xianyang will talk about the RayDP API and the architectural design. Finally, he will also show you some examples.
Let’s get started. First, as we know, big data and AI have been two different communities. On one side, Apache Spark is one of the leading big data frameworks, and it has been evolving quickly in the past 13 years. On the other side, many machine learning and deep learning frameworks have appeared and become very popular.
There are also more and more intersections between these two communities. One reason is that massive data is actually very critical for better AI. The model is important. However, to get better AI, we actually need a large amount of high-quality data. And with more data being used in training, the model is becoming more complex. A single node actually cannot meet the computing requirements, so it’s also obvious that distributed training will be the norm.
We have seen many projects that try to integrate big data and AI. There are projects like Horovod on Spark and TensorFlowOnSpark, which try to run deep learning frameworks on [inaudible], and there are also projects that try to store the Spark output in [inaudible] file formats that can be read by the deep learning frameworks.
Next, let’s take a look at [inaudible] that try to integrate Spark with machine learning and deep learning frameworks. The first approach is to use separate Spark and AI clusters. This is the traditional one. So in your end-to-end pipeline with [inaudible] and model training, you first need to write a Spark application, and [inaudible] makes that [inaudible] Spark cluster, and then you store the [inaudible] HDFS.
And in the next step, for the [inaudible] the deep [inaudible] framework made [inaudible] database, or you will need to copy the data [inaudible] to the learning cluster, and then do the model training.
So there are a few challenges here. First, there are data movements between clusters, and secondly there are [inaudible] managing these two clusters. If we look at the end-to-end pipeline, this will actually be [inaudible] application. So you will need a lot of glue code to stitch together multiple programs, or you will need [inaudible] to do that.
The second approach [inaudible] on Spark. There are a few projects supported on Spark, and this is useful if you have an existing Spark cluster and you want to [inaudible] resource in the cluster to do model training. However, there are also some challenges here, because this is actually very specific [inaudible] Spark, and it requires the [inaudible] frameworks supported on Spark. So if you have an end-to-end pipeline, but there is a framework not supported on Spark, or you do not have Spark in your pipeline, then it will not work for you.
In this kind of solution, a distributed file system is usually also used to exchange the data [inaudible] Spark and the deep [inaudible] framework, so it also adds latency.
Today there are also more and more organizations moving to a [inaudible] managed by Kubernetes. By using a workflow orchestration framework like Kubeflow, we can also implement an end-to-end data analytics [inaudible] pipeline on a single Kubernetes cluster.
However, there are also a few challenges here. First, the pipeline must be written as multiple programs and [inaudible] files. So you probably need to write the [inaudible] file using the image, and [inaudible] Spark program, and also the [inaudible] program, and then construct the end-to-end pipeline.
This is not as simple as writing a single Python program. Secondly, it will also usually require a distributed file system to do the data exchange between these frameworks, so this will also add latency here.
So the question is, can we have a general-purpose framework that can be used as a single substrate for both data processing and model training and [inaudible] and also [inaudible]? And we want to make sure we can develop an end-to-end pipeline easily and efficiently, for example, in a single Python program. Also, instead of using a distributed file system to do data exchange, can we have in-memory data exchange to do that more efficiently?
So the answer is to use Ray and RayDP. What is Ray? Ray is a general-purpose framework that provides a simple, universal API for building distributed applications. In Ray [inaudible], it provides some simple APIs, like task and actor, which [inaudible] to a function and a class in our single-node programming.
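The mapping described here (task to function, actor to class) can be sketched as follows. This is a minimal illustration, not code from the talk: `square`, `Counter`, `run_locally` and `run_on_ray` are hypothetical names, and the Ray part assumes the `ray` package is installed. Ray is imported lazily so the single-node version runs without it.

```python
def square(x):
    """A plain function; wrapped with ray.remote it becomes a Ray task."""
    return x * x


class Counter:
    """A plain class; wrapped with ray.remote it becomes a Ray actor."""

    def __init__(self):
        self.total = 0

    def add(self, value):
        self.total += value
        return self.total


def run_locally(values):
    """Single-node version: ordinary function calls and one object."""
    squares = [square(v) for v in values]
    counter = Counter()
    for s in squares:
        counter.add(s)
    return squares, counter.total


def run_on_ray(values):
    """Distributed version of the same logic (assumes ray is installed)."""
    import ray  # imported lazily so run_locally works without Ray

    ray.init(ignore_reinit_error=True)
    try:
        remote_square = ray.remote(square)    # function -> task
        remote_counter = ray.remote(Counter)  # class -> actor
        # Each .remote() call returns a future (ObjectRef) immediately.
        squares = ray.get([remote_square.remote(v) for v in values])
        counter = remote_counter.remote()
        # Calls from one caller to one actor run in submission order.
        totals = ray.get([counter.add.remote(s) for s in squares])
        return squares, totals[-1]
    finally:
        ray.shutdown()
```

Calling `run_locally([0, 1, 2, 3])` and `run_on_ray([0, 1, 2, 3])` should produce the same pair of results; the only difference is where the work executes.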
These APIs are simple, but powerful enough for us to build distributed applications and distributed [inaudible]. Today Ray is packaged with a few native [inaudible], including Ray Tune for model tuning, RLlib for reinforcement learning, RaySGD for distributed [inaudible], and Ray Serve for model serving. There are also more and more [inaudible] libraries being supported on Ray, including [inaudible]. Ray can also be deployed on multiple cloud service providers, and can be deployed by many resource managers like [inaudible].
So there are a few options on Ray to do better [inaudible] for them [inaudible] provides in the Slack API on Ray. There are also many [inaudible] cases that are using Spark, either the major [inaudible] framework [inaudible] learning pipeline. So we created the RayDP project to provide simple APIs for running Spark on Ray and integrating Spark with distributed machine learning and deep learning frameworks.
When we [inaudible] Spark on Ray, we treat Ray as a resource manager, and we want to [inaudible] in the Ray Java actor. So this next Spark [inaudible] on Ray. To integrate Spark with distributed machine learning and deep learning frameworks, we provide two approaches, and the simplest one is to use a [inaudible]. So you can simply [inaudible] by passing the optimizer, the model, the loss function, and a few more [inaudible]. And then you can directly fit the [inaudible] with the Spark [inaudible], and we will take care of everything else and scale your training on the Ray cluster.
However, if you prefer to use RaySGD or [inaudible] for the model training, we also provide another approach to convert your Spark DataFrame to a Ray [inaudible]. That is a distributed [inaudible] on Ray, stored in the [inaudible] store. It can be used to connect with many libraries, like Horovod [inaudible]. And this is how we can use that to connect with these machine [inaudible] frameworks.
So with Ray as a single substrate, and by using RayDP to run Spark on Ray, together with a few other frameworks and [inaudible] on Ray, we can actually easily implement an end-to-end pipeline in a single Python program. We can also utilize Ray’s in-memory object store to do efficient data exchange between these libraries.
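To make the contrast between file-based and in-memory exchange concrete, here is a small sketch under stated assumptions. A local temporary directory stands in for the distributed file system, `consume` is a hypothetical stand-in for a training step, and the object store path assumes the `ray` package is installed (it is imported lazily). It illustrates the idea only and is not RayDP's internal mechanism.

```python
import json
import os
import tempfile


def consume(batch):
    """Stand-in for a training step: sum one numeric field."""
    return sum(row["value"] for row in batch)


def exchange_via_files(records):
    """Producer writes to shared storage, consumer reads it back.

    A temporary local directory plays the role of the distributed
    file system here; every exchange pays for a write and a read.
    """
    with tempfile.TemporaryDirectory() as shared_dir:
        path = os.path.join(shared_dir, "preprocessed.json")
        with open(path, "w", encoding="utf-8") as handle:
            json.dump(records, handle)
        with open(path, "r", encoding="utf-8") as handle:
            return consume(json.load(handle))


def exchange_via_object_store(records):
    """Producer puts data in Ray's object store, a task consumes it."""
    import ray  # imported lazily; assumes ray is installed

    ray.init(ignore_reinit_error=True)
    try:
        ref = ray.put(records)                # producer side
        remote_consume = ray.remote(consume)  # consumer runs as a task
        # Passing the reference lets Ray hand the object to the task.
        return ray.get(remote_consume.remote(ref))
    finally:
        ray.shutdown()
```

Both functions should return the same value for the same input; the second avoids the round trip through storage when the data fits in memory.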
So in a typical end-to-end pipeline, we can now use [inaudible] to read the data and use the data [inaudible] API to do data [inaudible], and we can also use Spark MLlib to do feature engineering. After that we can just store the [inaudible] data in the in-memory [inaudible] in [inaudible] formats.
Next we can use any deep learning or machine learning framework like PyTorch, TensorFlow, [inaudible] and also [inaudible] to do the model training. We can also integrate that with [inaudible] for hyperparameter tuning. Once we have [inaudible] the model, we can also use Ray Serve to do the model serving. So everything runs [inaudible] a single platform, Ray, and we can do this in a single Python program, which is much easier and more efficient.
With a Ray program we can also scale from your laptop to the cloud or [inaudible] seamlessly and easily. We can start with a small [inaudible] and develop the Ray program on a local laptop. Once it’s ready, we can scale that to a Ray cluster. Ray provides a cluster [inaudible], so we can use that to launch a Ray cluster in a cloud or [inaudible] cluster. Ray also supports auto-scaling, so we can start with some more [inaudible] instance, and when your application requires more resources, it can scale out with more nodes. This means that we can scale our end-to-end pipeline from our laptop to the cloud or Kubernetes seamlessly, without any [inaudible] change.
To summarize, here are the benefits of using Ray and RayDP. First, you will get increased productivity. It simplifies how to build and manage an end-to-end pipeline. You can use multiple libraries and frameworks in a single Python program, including Spark, XGBoost, TensorFlow, PyTorch, [inaudible] and more, so you don’t need to write separate programs and stitch them together, or use a workflow orchestration framework.
Secondly, it will also provide better performance. Instead of only using a distributed file system to do the data exchange, we can also use the in-memory object store to do that, which will be more efficient if the data can fit in memory. We also plan to integrate some more Spark optimizations for that, and we can also use Ray [inaudible] in the shuffle.
Finally, you will also get increased resource utilization. Ray supports auto-scaling at [inaudible], so it can do auto-scaling at the cluster level, and Spark also supports dynamic resource allocation, so by combining these two we can actually do the scaling at both the cluster level and the application level. With this, you will get better resource utilization, and save your cluster.
Next, let’s welcome Xianyang Liu to give you an introduction to the RayDP API and [inaudible].
Xianyang Liu: Hello everyone, I’m Xianyang, and I’m also from Intel. My colleague Carson has given some background about Ray and RayDP. Now I will give you more [inaudible] about the RayDP implementation and also some examples of how to use RayDP and some other Ray components to write [inaudible] solutions based on Ray.
First, let me say how [inaudible] cluster on top of Ray. First you need to connect to the Ray cluster with ray.init, and then you could [inaudible] cluster with raydp.init_spark. There are some parameters [inaudible]. The first one is the app name, and you also need to specify the number of executors you want to request, and each executor’s cores and each executor’s memory. You could also pass in the [inaudible] configurations for the [inaudible], for example, Spark local [inaudible] configurations. Then you could [inaudible] instance.
After that, you could do [inaudible] data processing with the Spark session. [inaudible]. After that, you could stop the Spark cluster with raydp.stop_spark.
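The start, process, stop sequence described above might look like the sketch below. The function names `raydp.init_spark` and `raydp.stop_spark` and the keyword names are taken from the RayDP project's public API as I understand it, not from the audio, so check them against the RayDP version you install. `spark_on_ray_settings`, `count_rows_with_spark_on_ray` and the example configuration key are hypothetical choices for illustration. Ray and RayDP are imported lazily, so the settings helper can be used on its own.

```python
def spark_on_ray_settings(app_name, num_executors, cores_per_executor,
                          memory_per_executor, extra_configs=None):
    """Collect the parameters the speaker lists into keyword arguments."""
    return {
        "app_name": app_name,
        "num_executors": num_executors,
        "executor_cores": cores_per_executor,
        "executor_memory": memory_per_executor,
        # Extra Spark configurations, passed through as a plain dict.
        "configs": dict(extra_configs or {}),
    }


def count_rows_with_spark_on_ray(settings, num_rows):
    """Start Spark on Ray, run a trivial job, then stop Spark.

    Assumes ray, raydp, pyspark and a Java runtime are installed.
    """
    import ray
    import raydp

    ray.init(ignore_reinit_error=True)      # connect to (or start) Ray
    try:
        spark = raydp.init_spark(**settings)  # returns a SparkSession
        try:
            return spark.range(num_rows).count()
        finally:
            raydp.stop_spark()              # release the Spark executors
    finally:
        ray.shutdown()


# Example settings only; building them starts nothing.
example_settings = spark_on_ray_settings(
    app_name="raydp-sketch",
    num_executors=2,
    cores_per_executor=1,
    memory_per_executor="1GB",
    extra_configs={"spark.sql.shuffle.partitions": "8"},
)
```

With a working installation, `count_rows_with_spark_on_ray(example_settings, 100)` should run a small Spark job on Ray and then tear Spark down again.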
This page [inaudible] about how Spark is running on top of Ray. As you know, Ray has [inaudible] a new process, which is [inaudible] process, which [inaudible] Ray actor. [inaudible] Ray actor we could also have [inaudible] other actors. So in RayDP we [inaudible] other resource manager, and Ray [inaudible] master and the Spark [inaudible] actor. So the first [inaudible] we need to [inaudible] application master is the Java actor. After the application master has started up, I [inaudible] each [inaudible] application master. When the application master receives the request, it will [inaudible] actor, with the [inaudible] resources [inaudible], for example, its cores, memory, and [inaudible].
And then, when we are running the Spark [inaudible] in the Java actor. So the Spark [inaudible] is running in the Java actor. That means the Spark [inaudible] also has the ability of [inaudible] the object store. The [inaudible] object store is [inaudible] memory service, which [inaudible] data exchange [inaudible] the process.
We also have [inaudible] on top of Java STT, and it’s the [inaudible] directly on the [inaudible] on the line, the data exchange between Spark and the digital [inaudible] we will introduce on the next page.
Here is an example of how to use the [inaudible]. Unfortunately, only the [inaudible] in the number of workers that [inaudible] how much after the [inaudible] correction for the deep learning model training, and you need to [inaudible] the model, the optimizer, the loss function, and the [inaudible] is the column [inaudible] Spark [inaudible] for the model [inaudible].
And the [inaudible] column is the name of the [inaudible] column in the Spark DataFrame. There are also some other necessary parameters [inaudible] to pass the [inaudible]. After that you [inaudible] the estimator instance, then you could [inaudible] directly.
The Ray MLDataset is a distributed collection of [inaudible]. In RayDP we provide a Ray MLDataset to create [inaudible] from the [inaudible]. Once you create the MLDataset, you could do some transformations with [inaudible] functions, and you could also [inaudible] DataFrame to the [inaudible], which creates a [inaudible] dataset. When you call to_torch, the distributed collection [inaudible]. We [inaudible] the distributed collection [inaudible] tensor.
The torch tensor could be used to create [inaudible] used for the torch model [inaudible]. Here we can [inaudible] Torch MLDataset and the current [inaudible]. In the underlying [inaudible] transformation, it’s [inaudible] in that it’s [inaudible] coding in the [inaudible]. In the [inaudible] the data is exchanged between Spark and the MLDataset [inaudible] framework as a binary [inaudible].
First, we need to store the DataFrame into the [inaudible]. We need to store the [inaudible] DataFrame to the [inaudible]; it will [inaudible] DataFrame into a collection of [inaudible]. Then the MLDataset where we start the data, and then after we [inaudible] the MLDataset [inaudible] transformation [inaudible] tensor. That way we could [inaudible] those data with the torch framework.
Of course, all the data is stored in the [inaudible] so that [inaudible] very efficient. We could [inaudible] with zero copies.
Now, let’s look at some examples of how to use RayDP and some other Ray components. This example shows how to integrate Spark and XGBoost on top of Ray. The left side is the data processing part, and the right side is the model training part. The process of [inaudible] to connect to the [inaudible] Spark, and also [inaudible] raydp.init_spark. [inaudible] data processing with Spark. Here we [inaudible] the data from the [inaudible], and then the way [inaudible] in two parts. The first part is the [inaudible] for the [inaudible] model training, and [inaudible] for the model test.
After that, we try to [inaudible] from the [inaudible] DataFrame, and [inaudible] dataset. On the right side, we [inaudible] the matrix from the [inaudible] dataset. [inaudible] we could [inaudible] the model with the [inaudible] created [inaudible] matrix, and all the data is stored in the [inaudible]. So the data exchange is also efficient.
This is an example showing how to integrate Spark and Horovod [inaudible] on Ray. Horovod is a popular distributed model training framework. It’s also supported [inaudible] in Ray. The left [inaudible] is almost [inaudible] before: [inaudible] data processing, then try to [inaudible] dataset from the [inaudible] DataFrame, and then [inaudible] into a [inaudible] dataset. And the [inaudible] it first [inaudible] define the [inaudible] train function. And the function [inaudible] data [inaudible] from the [inaudible] dataset. Then we try to [inaudible]. Then we could [inaudible] model training with Horovod on top of Ray.
This example shows how to integrate Spark [inaudible] on top of Ray. Ray Tune is a very popular and powerful hyperparameter search framework. The [inaudible] is almost [inaudible] dataset, and then the [inaudible] model, and the [inaudible] function. Then we use the distributed [inaudible] creator to come up to the [inaudible] function. It’s attributable by Ray Tune. Then we could do our [inaudible] search with the [inaudible] and the [inaudible].
If you want to know more about Ray Tune, you could visit [inaudible].
Next we’ll do a summary of Ray and RayDP. Ray is a general-purpose framework that can be used as a single substrate for end-to-end data analytics and AI pipelines.
RayDP provides simple APIs for running Spark on top of Ray and integrating Spark with distributed machine learning and deep learning frameworks.
For more information, you could visit our [inaudible] for examples showing how to [inaudible] with RayDP and other [inaudible] on top of Ray.
Okay, that’s all, thanks.
Carson Wang is a software engineering manager in Intel data analytics software group, where he focuses on optimizing popular big data and machine learning frameworks, driving the efforts of building c...
Xianyang is a software engineer at Intel. He has over 4 years of experience with big data and distributed systems. He is also an active contributor for Apache Spark, Ray, and other distributed framewo...