Ray (ray.io) is an open-source, distributed framework from U.C. Berkeley’s RISELab that easily scales Python applications from a laptop to a cluster. It was developed to solve the general challenges of reinforcement learning, but it is flexible enough for any demanding distributed workload.
Ray has been used for reinforcement learning, hyperparameter tuning, model serving, and other applications in clusters of up to thousands of nodes. I’ll discuss examples that illustrate how Ray can be used with Spark to build robust, scalable data applications for enterprises, when to use Ray versus alternative choices, and how to adopt it in your projects.
– Hi everyone, my name is Dean Wampler from Anyscale, and today I’m gonna talk to you about Ray, an enterprise-grade library for writing distributed Python applications. We’d like to get your feedback on the talk and on the conference in general. Please make sure to rate the sessions; I’d love to hear your thoughts.
So here’s the agenda. I’m going to discuss why Ray exists: why it was created and what problems it solves. I’ll go through a demo to show you what it’s like to work with Ray and how to integrate it with Spark, or one way to do it, anyway. Then I’ll finish with some thoughts about when you would want to use Spark versus when you would want to use Ray, and a few thoughts about how to get started with Ray.
First, let’s talk about why Ray. This is our little Ray icon.
So it really came about in response to a couple of big trends we’ve been observing over the last five or six years in our industry. The first is that machine learning model sizes, especially for neural networks, have been growing enormously. Moore’s Law gives us roughly 2x growth every 18 months; these model sizes have been growing by a factor of 35 every 18 months. That’s just outstripping Moore’s Law, so we have to go distributed if we’re going to keep up and have reasonable training times and performance. At the same time, Python has grown enormously as it’s become the de facto language for data science, including machine learning. The two things together made it necessary for us to have a robust, easy-to-use way of doing distributed Python, especially for people who don’t wanna have to think about distributed computing; they just want something relatively intuitive that does the vast majority of what they need without a lot of special hand-holding.
So let me talk in particular about the challenges of doing reinforcement learning, which was really the biggest motivator for Ray. Although it’s a general-purpose framework, reinforcement learning was really what drove its creation.
If you don’t know what reinforcement learning is, here’s the gist of it. You have some sort of environment, and an agent that’s trying to work its way through that environment, making intelligent decisions to maximize its reward. The agent observes the state of the environment and the short-term reward it received after its last decision and the action it took. Then it tries to make a new decision, take a new action, that will, over time, maximize the cumulative reward. That’s the whole idea of reinforcement learning.
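That observe-act-update loop can be sketched in a few lines of plain Python. This is a toy epsilon-greedy multi-armed bandit of my own, not anything from Ray or RLlib; the environment is just three slot-machine arms with hidden payoff probabilities, and the agent’s “policy” is a table of running value estimates.

```python
import random

def run_bandit(n_steps=1000, seed=42):
    """Toy RL loop: an epsilon-greedy agent repeatedly picks one of three
    arms, observes a 0/1 reward, and updates its value estimates."""
    random.seed(seed)
    true_means = [0.2, 0.5, 0.8]   # hidden reward probability of each arm
    estimates = [0.0, 0.0, 0.0]    # agent's running value estimate per arm
    counts = [0, 0, 0]             # how many times each arm was pulled
    total_reward = 0.0
    for _ in range(n_steps):
        # Action: explore a random arm 10% of the time, else exploit
        # the arm with the highest current estimate.
        if random.random() < 0.1:
            arm = random.randrange(3)
        else:
            arm = estimates.index(max(estimates))
        # Observation: the environment emits a short-term reward.
        reward = 1.0 if random.random() < true_means[arm] else 0.0
        # Update: incremental mean of rewards seen for the chosen arm.
        counts[arm] += 1
        estimates[arm] += (reward - estimates[arm]) / counts[arm]
        total_reward += reward
    return estimates, total_reward
```

Real deep RL replaces the estimate table with a neural network and the slot machine with a game or physics simulator, but the loop structure is the same.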
Well, it became really famous a few years ago when the DeepMind team from Google successfully beat the world’s best Go player, Lee Sedol, using reinforcement learning. This was considered a holy grail in a lot of ways. The reason is that the game space, you know, the possible moves and the states they can take you to, is enormous in Go. It’s actually much bigger than chess, so this was something people didn’t expect to be doable for a long time, and yet it was done a few years ago with so-called deep reinforcement learning.
And it looked something like this. They had a neural network behind the scenes that learned key structures of the board and key moves that had maximal advantage. Going back to that original diagram of reinforcement learning: in this case, the observations are the board state, the actions are where you place stones, and the rewards are pretty simple, either you win or you lose.
Zooming into this a little bit, you can see it’s a fairly involved neural network with a lot of different kinds of layers. In fact, one of the challenges is finding the optimal architecture and then training it to do its job.
So, going back to the motivation for Ray.
So we have a number of things going on here. We have a simulator, which is just an arbitrary application with arbitrary CPU and memory access patterns. It could be a game simulator; if you’re training a robot, or modeling a factory floor, there are simulators for those too. These are very general-purpose compute problems, but they need to be run a lot, because you’re gonna be training a lot, and they need to be efficient. The agent itself could also be somewhat complex. At the same time, you’ve got the traditional neural network stuff, and we have a lot of tools for that, like PyTorch and TensorFlow. You’d like a system that integrates with those tools rather than rewriting all of that from scratch.
And in reinforcement learning, like most machine learning systems, you’re gonna be training a lot. In the case of reinforcement learning, you’ll be playing these games over and over again to try to achieve the maximal cumulative reward, so you need this to be really efficient, and you need to run it over a cluster so it doesn’t take forever. The combination of all these things creates a lot of demand for very flexible compute, from small scale to large scale; lots of flexibility is required for this to be optimal. And that’s what drove the creation of Ray.
So let’s talk about the ecosystem a little bit.
Most of you may never actually use Ray directly. You may use one of these libraries on top of Ray, each of which implements support for one of the typical scenarios in a machine learning ecosystem, whether it’s hyperparameter tuning, distributed training, or serving models. And we just talked about the very first library, which was RLlib, for reinforcement learning. You may have to dip down into the Ray API, for example to program your game simulators, but in a lot of cases you don’t need to drop to that level. So you pick the level that you need and work with it.
And finally, I’d like to talk a little bit about using this from microservices, and about the view we have of how it can really help you build applications and run them. So microservices were created for a lot of reasons, and they have a lot of benefits that I won’t go into, but they have a couple of challenges. In particular, when you’re actually deploying these things, you’re gonna be spinning up an arbitrary number of instances of each microservice. You do that for a couple of reasons: one is to achieve scalability beyond what a single node can give you, but also to give you resilience in case a node crashes. But all of this has to be managed explicitly, and it’s a bit of a challenge. In fact, it’s led to some blowback against the whole idea of microservices, because it can be challenging to run these things. Well, one of the great things about Ray is that it lets you go back to the simplicity of having one logical instance per microservice, while behind the scenes Ray transparently scales your compute over a cluster. You’re getting the resilience of running across a cluster, as well as scalability beyond single-node boundaries. So we think this is a really nice way that Ray supports general development, as well as specific cases of various machine learning problems.
Okay, let me demo this a little bit for you. – [Instructor] In this demo we’ll see how to use Ray in PySpark, or at least one way you can do it. This is part of my talk for Spark + AI Summit 2020. We’re actually going to embed Ray into a UDF for PySpark, the advantage being that we avoid the overhead of HTTP and other RESTful calls. Sometimes it’s better to decouple your services like that, for the usual reasons, but other times you might want the greater efficiency of having the code co-resident in your UDF when possible, even though, in the case of both Spark and Ray, it might be running over a cluster and there may be some communication across nodes. Hopefully that will be optimized in ways that are a little harder to do when you have discrete hard-coded services, as it were. Again, the disadvantage is coupling the code together more tightly than you would with a service. Anyway, you can find out more about Ray at ray.io. What I’ve done is start a Ray cluster with the command ray start --head, and I’m gonna connect to it and run a little Spark job here. So let’s evaluate these notebook cells. First we need to make sure that we have Java 8 running. Sure enough. We’ll do some imports, including PySpark and Ray, and imports of types from the PySpark SQL API, although we don’t actually need all of these. And then we’ll define the UDF.
So the sample app I’m gonna demonstrate is a simple approach to data governance. Data governance is a big topic; it’s really about managing the lineage and legacy of data, who’s seen it, who has access to it, what transformations have been done on it, all kinds of stuff. What we’re gonna focus on is just one little use case: I wanna record the IDs of every record that I’ve seen in this job, basically for auditing purposes. So we have a little Python class, and I’ll explain the ray.remote annotation in a moment. I’m calling it, pretentiously, a data governance system; it could be a hook that actually talks to something like Apache Atlas or whatever. The main API method is gonna be log, which I’ll call to log these record IDs. I’ll have methods here to get the IDs, to get the count of them, to reset the list, and to measure the time that the system’s been running. The ray.remote decorator will turn this from a normal Python class into something we call a Ray actor, which is a stateful thing that’s gonna be running somewhere in the cluster, and you can make calls to it. This is Ray’s way of handling distributed state: actors encapsulate that state. And if you just need functions that don’t need to have any state attached to them, we can do that too, with the same ray.remote decorator; those are called tasks. I also need these get IDs and get count methods because once this thing’s an actor, I can’t just reach in and read fields in the class; I have to have getter methods for that. So that’s what those are for. It’s mostly just a wrapper around a dictionary, or sorry, a list. Okay, now let’s define a basic record type, and all this really is, is just a thing that has an ID and some opaque data that we’ll use. We’ll use this as just a tool for the demo. And now we’re gonna initialize Ray inside our application here.
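A class like the one just described might look roughly like this. The class and method names are my reconstruction from the talk, not the actual notebook code, and I show it as a plain Python class so the logic stands on its own; in the demo it carries Ray’s `@ray.remote` decorator (indicated in the comment), which is what turns it into an actor.

```python
import time

# In the demo this class is decorated to become a Ray actor, i.e.:
#
#   @ray.remote
#   class DataGovernanceSystem: ...
#
# The body itself is ordinary Python; Ray adds the remote-call machinery.
class DataGovernanceSystem:
    """Toy governance hook: records the ID of every record seen."""

    def __init__(self):
        self.ids = []              # encapsulated state (a list, not a dict!)
        self.start = time.time()

    def log(self, record_id):
        """Main API method: remember one record ID; return the new count."""
        self.ids.append(record_id)
        return len(self.ids)

    # Getters are required once this becomes an actor: you can't reach
    # into an actor and read its fields directly.
    def get_ids(self):
        return list(self.ids)

    def get_count(self):
        return len(self.ids)

    def reset(self):
        self.ids = []

    def uptime(self):
        return time.time() - self.start
```

With Ray installed and the decorator applied, you would create and call it through remote handles, e.g. `dgs = DataGovernanceSystem.remote()` and then `dgs.log.remote(42)`.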
It’s already running as a cluster; basically we’re attaching to it. This is the analog of creating the Spark context or Spark session. The address equals auto argument says, “Connect to the running cluster.” I got this little warning because I’ve actually run this notebook already; that’s what the ignore re-init error flag is about. It just says, don’t make a big deal about it, keep going. And there’s this convenience tool called the Dashboard. If I click into that, I can see that I’ve got several tasks already running. If this were a big cluster, there could be hundreds of them around the cluster, and I could use this to see how things are performing, to profile things, and so forth, though I won’t show it any further. All right, back to the notebook. Now we’re gonna create something called a detached actor. This is sort of like a name service: something I can reach out and ask the system to give me a hook to, without having to retain a reference to it in my Python code. This is convenient for UDFs, which are gonna be running across a cluster, completely decoupled from this particular notebook; we need to be able to get at this object that’s out there. So we’re gonna create a named detached actor, and I have a little logic here to check if it’s already created, and if so, don’t try again. Sure enough, we’ve already got this thing registered, so we can just go on. And just as a test, we’ll go ahead and try it out. We’ll use this method called ray.util.get_actor to retrieve it, and then we’ll write a few records to it. That succeeded, and I have a little helper function that we can use to see what the status is. Sure enough, we have a bunch of records, actually more than three, from a previous run, but we’re about to reset that. So if I reset it, we’re back to zero. We have a clean slate and can start over. Now here’s the function that’ll actually be a UDF.
What this is gonna do first is the same initialization we just did, but in the local task that’s running for whatever partition Spark has assigned to this particular process. So we’ll initialize Ray if we need to; we’ll only have to do this once. Then we’ll do the same trick of asking Ray for the actor and logging the ID with it. And there’s this one method in here, you’ll see ray.get on the second-to-last line. It turns out that when you call something.remote, which is how we’ve been invoking these methods, it’s actually an asynchronous call. It just returns a handle, a future, that you can use to get the value later. If you just want it to run asynchronously, fire and forget, you can move on, and you won’t be blocked waiting for it to complete. In this case, we wanna make sure it worked, so we’ll call ray.get, which blocks until the result is returned from that actor call. So dgs.log is the method that logs these IDs; adding the .remote is necessary because it’s an actor now; and ray.get will retrieve, well, it actually returns the current count, which is something we’ll return as part of our UDF. Okay, now we’ll do the usual Spark stuff. We’ll initialize Spark and create our UDF, which is gonna return a MapType. Recall that the logging function returns a dictionary indicating whether or not it successfully initialized; that will turn into a JSON object, which gets mapped to this MapType with a string as the key and an integer as the value. Okay. So let’s write 50 records to this thing. We’ll create a DataFrame from those records, and then we’ll run a query that returns the fields, but also calls this UDF, aliased to a new field called logged. So here’s our new schema, and here’s the result. You can see that we’ve successfully added things asynchronously. Notice that the counts are not in order.
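The fire-and-forget versus blocking-get distinction isn’t unique to Ray: the standard library’s concurrent.futures has the same shape, which makes a handy stand-in sketch if you don’t have a Ray cluster running. In this analogy, which is mine rather than anything from the talk, `submit` plays the role of `.remote()` and `Future.result()` plays the role of `ray.get()`.

```python
from concurrent.futures import ThreadPoolExecutor

def log_id(record_id):
    # Stand-in for the actor's log method.
    return f"logged {record_id}"

with ThreadPoolExecutor(max_workers=4) as pool:
    # Like dgs.log.remote(42): returns a handle immediately, doesn't block.
    future = pool.submit(log_id, 42)
    # We could fire-and-forget here and simply move on without waiting.
    # Like ray.get(handle): block until the result is actually available.
    result = future.result()
```

The difference, of course, is that a Ray handle can refer to work running on another machine in the cluster, not just another thread.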
That’s because we’re actually running at least three of these parallel tasks. We can tell it’s three because we initialized three times, here and here, but each only had to do it once; then the rest of the data was written. So it worked. And we can check the status: we now have 50 records, as we would expect. We can reset, and then, this part is optional, whenever the driver process, as it’s called, exits, like when we close the notebook and shut it down, Ray is automatically shut down, or at least the local connection to the Ray cluster is, but we could do that explicitly here. Okay, that’s it. – Okay, we saw a little bit about how to use Ray with an actual demo, one possible way you can integrate it with Spark. Let me finish by talking a little bit about the strengths of Spark versus the strengths of Ray, and when you would use one or the other.
Well, I’m preaching to the choir here, as it were; all of you know that Spark is really amazing for massive dataset problems, especially when you have uniform records with a schema. Even when you’re doing things like natural language processing with so-called unstructured data, you almost immediately transform it into structure of some kind, and Spark is just really great for this kind of efficient, parallelized transformation and analysis of data. It has a wonderful SQL API on top for people who don’t wanna think about lower-level programming. It’s great for doing this kind of analytics in batch mode as well as in stream processing. And it gives you really intuitive, higher-level abstractions for all kinds of data science and engineering tasks. It’s famous because it’s really great at what it does.
But I mentioned that Ray emerged to solve some new, emerging problems, and these are the things where we think Ray excels: highly non-uniform graphs of either data or compute. Think about a typical object model when you’re writing an application. You have this graph of structures that are all sort of arbitrarily put together, which may, in some cases, need to be distributed over a cluster yet still be accessible, and you have to be able to handle this distributed state intuitively. Ray tries to solve that with Python classes. The compute you might be doing can also be highly non-uniform and highly distributed, everything from small tasks to very large, compute-intensive tasks over different kinds of resources. We think Ray provides a really intuitive API for the 90 to 95% of cases where you don’t need really tight control over this kind of non-uniform, highly distributed computation. You just want something reasonably intuitive that extends concepts you already understand from Python, and then lets you work as you normally would, but now in a distributed setting. As we saw and discussed earlier, it’s really good for the sort of amorphous computation that complements the capabilities of Spark. And that’s where some of these libraries emerged, like our libraries for reinforcement learning, hyperparameter optimization, stochastic gradient descent, and so forth.
So if you wanna get started with Ray, just a few resources to check out.
First, if you’re already using some of the libraries that are popular in the Python world for either multi-threading or multiprocessing on a single node, like asyncio, Joblib, and multiprocessing’s Pool, Ray has drop-in replacements that either integrate with these or replace them, so that you can now break the one-node boundary but continue to use the software you’re used to. Usually this just involves a single change to an import statement, something like this for the multiprocessing Pool case.
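Here is roughly what that one-line swap looks like. The stdlib version below actually runs; the commented-out line shows Ray’s documented drop-in replacement, which keeps the same Pool API but distributes the work across a Ray cluster. The helper function is just an illustration of mine.

```python
from multiprocessing import Pool
# One-line swap to go from one node to a Ray cluster:
# from ray.util.multiprocessing import Pool

def square(x):
    return x * x

def parallel_squares(n, workers=4):
    """Map a function over inputs with a pool of workers; with the Ray
    import above, the same code fans out across the cluster instead."""
    with Pool(processes=workers) as pool:
        return pool.map(square, range(n))
```

Because the API is unchanged, existing Pool-based code needs no other edits to scale out.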
If you wanna learn more about Ray, go to ray.io. You can find links to the documentation, examples, and tutorials that I’ve been developing. In fact, those tutorials are starting to be rolled out at anyscale.com/academy. You can see the Ray source code on GitHub if you want. We’ve also got a bunch of events about Ray topics and machine learning this summer, which you can check out at anyscale.com/events. Thank you very much for listening. I hope you enjoyed this. Please provide feedback to let me know what you thought about this talk.
Dean Wampler (@deanwampler) is an expert in streaming systems, focusing on ML/AI. He is Head of Evangelism at Anyscale.io, which is developing Ray for distributed Python. Previously, he was an engineering VP at Lightbend, where he led the development of Lightbend CloudFlow, an integrated system for streaming data applications with popular open source tools. Dean has written books for O'Reilly and contributed to several open source projects. He is a frequent conference speaker and tutorial teacher, and a co-organizer of several conferences and user groups in Chicago. Dean has a Ph.D. in Physics from the University of Washington.