Zeus: Uber’s Highly Scalable and Distributed Shuffle as a Service


Zeus is an efficient, highly scalable, distributed shuffle as a service that powers all data processing (Spark and Hive) at Uber. Uber runs one of the largest Spark and Hive clusters on top of YARN in the industry, which leads to many issues such as hardware failures (burned-out disks) and reliability and scalability challenges. Zeus is built from the ground up to support hundreds of thousands of jobs and millions of containers shuffling petabytes of data. Zeus changes the paradigm of the current external shuffle, resulting in far better shuffle performance: although the shuffle data is written remotely, performance is better than or on par with local shuffle for most jobs. In this talk we’ll take a deep dive into the Zeus architecture and describe how it’s deployed at Uber. We will then describe how it’s integrated to run shuffle for Spark, and contrast it with Spark’s built-in sort-based shuffle mechanism. We will also contrast Zeus performance numbers against external shuffle backed by different storage systems, e.g. NFS and HDFS. Finally, we will talk about the future roadmap and plans for Zeus.


Video Transcript

– Hello, everyone. This is Mayank Bansal. I work in the Data Infra group at Uber, and I’m co-presenting with my colleague, Bo Yang, who also works in Data Infra at Uber. We are here to present Zeus, Uber’s highly scalable and distributed shuffle as a service.

Uber is a global company. We have completed over 15 billion trips, and we handle 18 million trips per day. We operate on six continents, in 69 countries and 10,000 cities. We have 103 million monthly active users and 5 million active drivers.

So data and ML are really the backbone of a lot of what Uber does. There are many use cases that rely on the data and ML platforms to provide the user experience: Uber Eats, ETAs, self-driving vehicles, forecasting, maps, and many more. I’m going to talk about a few of them in the next few slides.

So, ETAs. ETA is a very important use case for Uber. When you open the app, you see that your car is three minutes away, and when you request a ride, it gives you a route and an estimate of how much time the trip will take. All of this comes from the ETA functionality. ETAs are generated by route-based algorithms, and there are many, many ML models that crunch lots of data to produce them.

If there is any error in the ETAs, it is taken as feedback and fed back into processing for the next rides. So this is a very important use case, and it is powered by the ML and data platforms.

The next use case I’ll talk about is the driver/rider match. Once you open the app and request a ride, there is demand and there is supply, right?

Then, at runtime, machine learning models predict whether you are going to take a ride and, if so, which cars are nearby, and then they match the rider with a driver. That is all driven by the ML and data platforms, so this is a very important use case for Uber.

Another line of business Uber has is Uber Eats, which is growing very fast, given this…

All the Eats functionality is driven by data and ML models. Models are used for ranking restaurants, estimating delivery times, and search ranking. There are hundreds and hundreds of models whose predictions render the homepage and suggest the dishes you might like. The Eats app and the models predict that at runtime, and the models are trained on the data and ML platforms. This has been a very big use case for Uber, for (indistinct).

Another very interesting use case is self-driving cars. Uber has a self-driving car unit where we are building self-driving cars and the software for them. For that, we need very big data and ML models to predict the routes, the signs, and the pedestrians, and then visualize the whole path. So this is a very big use case for the data and ML platforms.

So let’s talk a little bit about what Uber’s data stack looks like. There are tiered data lakes. We have in-memory databases like Pinot and AresDB, then we use HDFS for hot and warm storage, and then we have archival storage in the cloud for disaster-recovery purposes. On the left side, there are mobile app events coming from your apps, device telemetry, micro-service events, database events, and third-party feeds. So there are lots of events being emitted in the Uber stack.

So all these events pipe through Kafka, and then they go to the tiered data lake.

Then there is a compute fabric on top of this tiered data lake, which is YARN, and Mesos plus Peloton; Peloton is our internal scheduler. So we use YARN and Mesos plus Peloton as the compute fabric for all of Uber. This compute fabric powers stream processing with Flink, which mostly runs on YARN, and batch processing, where most of the Spark applications, Tez, and MapReduce jobs run. On top of that, there are realtime pre-aggregated query engines, ad hoc query engines, and complex batch query engines: the pre-aggregated realtime query engine is AthenaX; the ad hoc, interactive engines are Presto and Vertica; and then there is the Hive query engine on top. (clears throat) Above that, we use many BI and analytics tools and dashboards. We have Piper and uWorc, our workflow engines, and dashboards built on top of them. Then there are ad hoc queries, which we run in QueryBuilder, as well as tools like DSW and Tableau.

So now let’s talk about Uber’s ML stack. There are many stages in Uber machine learning. There is data preparation, where we use stream processing with Flink, powered by Kafka; all the events come through Kafka. For batch processing, we use Hive and Spark, and Hive and Tez, and that data comes from the HDFS data lake. Once data preparation is done, we start prototyping, mostly using Jupyter Notebooks and Spark Magic. For training, we use a lot of technologies such as TensorFlow, PyTorch, XGBoost, and SparkML. We train the model, and then for realtime inference we use the Realtime Prediction Service, and there are batch prediction jobs as well. We have a Feature Store, a Model Store, and a Metrics Store. This is all part of Michelangelo, our ML platform at Uber.

So let’s talk about Apache Spark at Uber.

Apache Spark is the primary analytics execution engine we use at Uber. Most of the batch workloads, around 95% of them, run on top of Apache Spark. Apache Spark runs on both YARN and on Peloton with Mesos.

For shuffle, we use the external shuffle service on both platforms. Let’s talk briefly about how the external shuffle service works.

We all know we have mappers and reducers. Mappers run inside the executors. When a mapper writes its map output, it creates partitions, and those partitions are written to files on the local disk through the local shuffle service; that is what the top of this diagram shows. Then, when a reducer task wants to read the mapper output, it goes to each machine where a mapper ran, reads each mapper’s data for its one partition, aggregates all the data on the reducer side, and produces output based on the business logic. That is how the whole shuffle paradigm, or whole shuffle service, works: mappers write the data, and reducers read the data through the shuffle service.

There are some challenges with using the external shuffle service, and some of them are here. One is the SSD wearing-out issue. We are using SSDs on our compute fabric, and there is a concept for SSDs called DWPD, drive writes per day, which for our drives is mostly one: you can write the full capacity of the disk about once a day. However, at Uber we have so much workload that we actually write the whole disk around 3.5x to 4x per day, which reduces the life of the disk to a third or a quarter. So a disk that should go bad in three to four years actually goes bad in eight to twelve months, and we have to keep replacing SSDs, disks, and other hardware. That is a big cost for us, and this is one of the major issues we are facing. The second is reliability.

The external shuffle service depends on the local disk space, which many executors share, and there is no isolation of space or IO. If many applications run on the same machine and one application is chattier than the others, it will fill up the disk, and most of the applications running on that machine will fail. So this is one of the major reliability concerns for us from a shuffle service perspective.

From a compute perspective, we are migrating towards Kubernetes, and for dynamic allocation on Kubernetes we need an external shuffle service. Another main reason is co-location: we want to co-locate stateful, stateless, and batch workloads together on the same machine. To do that, we need to remove shuffle IO from the local machine, so that shuffle data is no longer stored locally and the schedulers can work together. So this is another reason we wanted to spend our time on something that can solve all of these issues.
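As background for why local shuffle files block this kind of co-location, here is a minimal sketch of the classic setup, in which dynamic allocation leans on a node-local external shuffle service; both configuration keys below are standard Spark settings, and the sketch is illustrative rather than Uber’s actual configuration.

```scala
import org.apache.spark.SparkConf

// Classic setup: executors can be released under dynamic allocation only
// because a node-local external shuffle service keeps serving their shuffle
// files from the host's disks. This is the coupling that ties shuffle IO
// (and therefore the workload) to the machine.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // node-local external shuffle service
```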

So we actually tried different approaches before we came up with the final architecture. One of the things we did was abstract out the Shuffle Manager and plug in external storage such as NFS and HDFS, writing through synchronous writes. The NFS interface was 2x slower, and HDFS was about 5x slower. Then we tried a different approach: semi-asynchronous writes in the Shuffle Manager, which batches the data and tries to write it into different files in parallel. However, that did not perform as well as we thought either, so that experiment also (indistinct).
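For illustration, here is a minimal sketch of the kind of synchronous remote write that experiment amounted to, using the standard Hadoop FileSystem API; the function and path are hypothetical, and the actual ShuffleManager plumbing is not shown.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Every map-side write blocks on the remote file system, which is why the
// synchronous NFS/HDFS experiments came out roughly 2x-5x slower than local shuffle.
def writePartitionChunk(data: Array[Byte], pathStr: String): Unit = {
  val fs  = FileSystem.get(new Configuration())
  val out = fs.create(new Path(pathStr), true) // overwrite if the file exists
  try out.write(data)
  finally out.close()
}
```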

So then we said, “Okay, these things are not working, so let’s try streaming writes.” We wrote a very basic version of a shuffle service to see whether streaming writes would perform. The first backend we used was HDFS. Streaming writes to HDFS were about 1.5x slower, which was better, but still not what we wanted. Then we tried streaming writes to local disk, which gave performance similar to the external shuffle service. We had to change a few things, which I’ll talk about in the next slides, but streaming writes to local storage is what actually helped us. So for the remote shuffle service, what we zeroed in on is streaming writes to local storage on the shuffle servers. However, we had to change many, many things to get to similar performance. We actually changed the whole MapReduce paradigm as it works today: mappers produce output on their local machines and reducers fetch that data from all the machines, and that had to change. We also streamlined the whole process: records are streamed to the shuffle service and go directly to disk, which gave us a lot of performance improvement. And there is one more important thing: there is no spill on the executor side anymore, which was actually one of the main reasons for slowness. Because there is no spill, the performance is much better.

So let’s look at the high-level architecture of the whole remote shuffle service. On the upper side of the diagram, there is a host running an executor with map tasks, and there is a shuffle manager that is plugged into our remote shuffle service and abstracted away from Spark’s shuffle manager. Each map task knows which partitions it is writing to, and the paradigm change is that all map tasks write data for the same partition to the same remote shuffle server.

There is server discovery, which we’ll talk about later, but server discovery ensures that for a given partition of an application, all the mappers go to the same server. So all the mappers write to the same shuffle server for the same partition, and that data is written sequentially. On the reducer side, we know that the data for a given partition lives on one particular remote shuffle server, so the reducer goes to that one shuffle server and reads the data sequentially; it does not need to go to all the machines. This is the change in the paradigm: mappers write a given partition to one machine, and the reducer reads from only one machine. (clears throat) That gave us a lot of performance improvement. On the shuffle server side, we have partitioners that partition the data into different files, and we’ll talk about this in the next few slides.
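Here is a minimal sketch of that routing idea; the modulo assignment and the names are assumptions for illustration, not Zeus’s actual server-assignment logic.

```scala
// All records of one partition go to one shuffle server, so the reducer for
// that partition later reads a single, sequentially written file from one host.
final case class ShuffleServer(host: String, port: Int)

def serverForPartition(partitionId: Int, servers: IndexedSeq[ShuffleServer]): ShuffleServer =
  servers(partitionId % servers.length)

// Mapper side: route each record to serverForPartition(partitionIdOfKey, servers).
// Reducer side: contact only serverForPartition(reducePartitionId, servers).
```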

So let’s do the deep dive. My colleague Bo will take it over from here. Thank you. – Thank you, Mayank. Hey, everyone. My name is Bo. I work in Data Infra at Uber. I will go through some details about how we designed and implemented the remote shuffle service.

Yeah, we did a lot of experiments and iterations, and we summarized our design principles as follows. The first one is scaling out horizontally. (indistinct) has many executors, and we may have thousands of machines in (indistinct), so we want the whole system to be able to scale out very easily.

Because we store data on a remote server, network latency matters; it is not trivial. So we work around network latency using a lot of techniques. At the same time, the network bandwidth is very high, so we do not focus on bandwidth too much; instead, we focus on latency a lot. We also take performance very seriously. We did a lot of performance optimization, and we found that the majority of Spark applications succeed in (indistinct) environment, so we optimized for those applications, which covers most of our cases. For failure cases, we rely on Spark and YARN retries to support recovery.

Okay. We will talk about this in three groups. The first one is scale out.

We achieve horizontal scalability by making each shuffle server work independently, without depending on the others. The whole cluster has hundreds of shuffle servers, so different applications can share some shuffle servers or use different shuffle servers; it is pretty flexible. And because there is no shared state among the shuffle servers, we can just add a new server without any bottleneck.

Here is one example of how we assign shuffle servers to a Spark application. A Spark application has mappers and reducers, its data has different partitions, and there are all kinds of factors we need to consider when we assign shuffle servers to tasks. In this example, we have three shuffle servers and four mappers. Each mapper connects to every shuffle server, because a mapper has data for different partitions, and different partitions reside on different shuffle servers. Basically, each mapper decides, based on the key, which partition each record belongs to, and then sends the data to the shuffle server that holds that partition. That is why each mapper connects to every shuffle server. The reducer side is pretty simple, because all of a partition’s data resides on a single shuffle server, so the reducer only connects to that shuffle server and downloads the whole file directly. So we simplify the reducer design a lot. On the next slide, we give a generic calculation of how many network connections we have, using some numbers. Say we have M mappers, R reducers, and S servers. The total number of connections on the mapper side is M times S, and the reducer side adds R connections. This is different from the Spark external shuffle service: there, the mapper side is simple, since it writes to local disk, but each reducer connects to the different executors to read data, so there are a lot of connections on the reducer side. Here, we have more connections on the mapper side and fewer on the reducer side, but the overall number of connections is similar. The rough comparison is sketched below.
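As a rough sketch of that arithmetic (M, R, and S follow the talk’s symbols; N is my label for the number of hosts holding map output in the external-shuffle case):

```latex
% Connection counts per application (illustrative).
\[
  C_{\text{remote}} = \underbrace{M \cdot S}_{\text{mapper side}} + \underbrace{R}_{\text{reducer side}},
  \qquad
  C_{\text{external}} \approx \underbrace{0}_{\text{mapper side (local writes)}} + \underbrace{R \cdot N}_{\text{reducer side}}
\]
```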

Now, let’s talk about network latency. We use Netty on the server side. As we know, Netty is a very high-performance, asynchronous server framework, so it can achieve very high speed when transferring data. We use two thread groups on the server side: the first thread group accepts socket connections, and the second thread group reads socket data and writes it to files. So they do not impact each other and can work in parallel. This design matters because, when we did load testing, we found that when clients write a lot of data, the second thread group gets used heavily.

With this design, we still have another thread group that can accept new connections, so the server can keep running. For the network protocol, we use a binary protocol we designed ourselves. We can achieve very high encoding efficiency, and we can also do compression on our side; we will talk about that later with a little more detail.
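Here is a minimal sketch, against the standard Netty 4 API, of the two-thread-group arrangement described above; the handler body, class names, and port are placeholders rather than Zeus’s actual server code.

```scala
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.{ChannelHandlerContext, ChannelInitializer, SimpleChannelInboundHandler}

object ShuffleServerSketch {
  def main(args: Array[String]): Unit = {
    val acceptGroup = new NioEventLoopGroup(1) // only accepts new connections
    val ioGroup     = new NioEventLoopGroup()  // reads socket data and hands it to the file-writing path
    try {
      new ServerBootstrap()
        .group(acceptGroup, ioGroup)
        .channel(classOf[NioServerSocketChannel])
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit =
            ch.pipeline().addLast(new SimpleChannelInboundHandler[ByteBuf] {
              override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
                // decode the binary shuffle protocol here and append the bytes
                // to the right partition file (omitted in this sketch)
              }
            })
        })
        .bind(19190) // arbitrary port for the sketch
        .sync()
        .channel()
        .closeFuture()
        .sync()
    } finally {
      ioGroup.shutdownGracefully()
      acceptGroup.shutdownGracefully()
    }
  }
}
```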

We write directly to disk files, straight to the OS, with no application-level buffering. This is very fast because the OS has its own buffer, so we do not need another one. We also do zero copy; this is a common technique used by many servers, and we do it as well, so we transfer data from the shuffle server to the client side without any user-level memory copy. And we write and read data sequentially, with no random disk IO, so it is very fast.
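As an illustration of the zero-copy read path, here is a sketch using the JDK’s FileChannel.transferTo, which lets the kernel move bytes from the page cache to the socket without passing through user-space buffers; the function name is hypothetical.

```scala
import java.nio.channels.{FileChannel, WritableByteChannel}
import java.nio.file.{Paths, StandardOpenOption}

// Stream a sequentially written partition file to a reducer's socket without
// copying the bytes through application buffers.
def sendPartitionFile(path: String, socket: WritableByteChannel): Unit = {
  val file = FileChannel.open(Paths.get(path), StandardOpenOption.READ)
  try {
    val size     = file.size()
    var position = 0L
    while (position < size) {
      position += file.transferTo(position, size - position, socket)
    }
  } finally file.close()
}
```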

We mentioned that we use a binary network protocol, and we do compression on the client side: the shuffle client compresses and decompresses the data. That reduces the network transfer size, and it also reduces CPU usage on the server side, since the server does not need to be involved in compression or decompression at all. It can also potentially support client-side encryption: we would do encryption and decryption on the client side as well, so the server side, again, does not need to be involved, which makes the whole system pretty secure.
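A minimal sketch of client-side compression follows; the codec here is the JDK’s Deflater purely for illustration (the talk does not name a codec), and the server simply stores and serves the already-compressed bytes.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater

// Compress a batch of serialized shuffle records in the client before streaming
// it to the shuffle server; the matching decompression also runs in the client
// on the read path, so the server never touches the codec.
def compressRecordBatch(data: Array[Byte]): Array[Byte] = {
  val deflater = new Deflater(Deflater.BEST_SPEED)
  deflater.setInput(data)
  deflater.finish()
  val buffer = new Array[Byte](4096)
  val out    = new ByteArrayOutputStream()
  while (!deflater.finished()) {
    val n = deflater.deflate(buffer)
    out.write(buffer, 0, n)
  }
  deflater.end()
  out.toByteArray
}
```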

Another technique we use is parallelizing serialization and network IO. In Spark shuffle, data gets serialized: each shuffle record is basically a Java object, so it needs to be serialized to bytes, and that serialization happens a lot and takes time. So we split the work into two steps: we use a background thread to take the serialized data and send it to the server side, so serialization and network IO run side by side and do not block each other. That improved performance in our production. We also use a connection pool; it is pretty simple, just reusing connections when possible, so we will not talk about that very much.
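Here is a minimal sketch of that producer/consumer split; the queue size and the send function are assumptions, not the actual client internals. The task thread serializes records and enqueues the bytes, while a background thread drains the queue and does the network IO, so the two overlap instead of alternating.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Task thread: serialize records, then call write() (blocks if the queue is full).
// Background thread: drain the queue and push the bytes to the shuffle server.
class AsyncShuffleWriter(send: Array[Byte] => Unit, queueSize: Int = 128) {
  private val queue = new ArrayBlockingQueue[Array[Byte]](queueSize)
  @volatile private var done = false

  private val sender = new Thread(() => {
    while (!done || !queue.isEmpty) {
      val batch = queue.poll(10, TimeUnit.MILLISECONDS)
      if (batch != null) send(batch)
    }
  }, "shuffle-network-sender")
  sender.start()

  def write(serializedRecords: Array[Byte]): Unit = queue.put(serializedRecords)

  def close(): Unit = { done = true; sender.join() }
}
```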

Okay, we also did a lot of other optimization on our side, so I want to share that as well. We do asynchronous shuffle data commit. This comes from an observation we made: for a map task, when we stream data to the server side, the server needs to flush the data to make it persistent. The flushing takes time, and we do not want the map task to wait for the flush operation, so we decouple these two steps: the map task tells the server it has finished sending data, and the server does the flush, which we also call commit, in the background.

So the next map task does not need to wait for the previous map task’s data to be flushed. This brings up an issue on the reducer side: the reducer needs to know that all the data has been flushed before it reads it. So on the reducer side, we query data availability before fetching. The reducer asks the server, “Hey, is the data available?” If it is available, it fetches the data from the server; if not, it waits a little bit, retries, and fetches the data again.
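A minimal sketch of that reducer-side availability check follows; the two callbacks are hypothetical stand-ins for the RSS client calls.

```scala
// Poll the shuffle server until the partition's data has been committed
// (flushed), backing off briefly between attempts, then fetch it.
def fetchWhenCommitted(isCommitted: () => Boolean,
                       fetch: () => Array[Byte],
                       maxAttempts: Int = 60,
                       waitMillis: Long = 500): Array[Byte] = {
  var attempt = 0
  while (!isCommitted()) {
    attempt += 1
    if (attempt >= maxAttempts)
      throw new RuntimeException("shuffle data was not committed in time")
    Thread.sleep(waitMillis)
  }
  fetch()
}
```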

We also support fault tolerance in our design. First, we use ZooKeeper for server discovery and health checks, which is pretty common.
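Here is a minimal sketch, using Apache Curator, of how a shuffle server might register itself for discovery and health checking; the znode path layout and payload are assumptions, not Zeus’s actual scheme.

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

// The ephemeral znode disappears automatically if the server's ZooKeeper session
// dies, so clients can treat its presence as a health check. The Curator client
// must stay open for the lifetime of the server for the node to persist.
def registerShuffleServer(zkConnect: String, serverId: String, hostPort: String): Unit = {
  val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()
  client.create()
    .creatingParentsIfNeeded()
    .withMode(CreateMode.EPHEMERAL)
    .forPath(s"/shuffle/servers/$serverId", hostPort.getBytes("UTF-8"))
}
```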

We do data replication. When a map task writes data, it writes to multiple servers at the same time, so when one server is down, it can still write data. On the reducer side, because we have multiple replicas, the reducer just picks one replica and reads from it; again, if that server goes down, it switches to another replica.
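A minimal sketch of the reducer-side failover; the fetch function is a hypothetical stand-in for the RSS client call.

```scala
import scala.util.{Success, Try}

// Try the replicas in order and return the first successful read; if a server
// is down, simply move on to the next replica.
def readPartitionWithFailover(replicaEndpoints: Seq[String],
                              partitionId: Int,
                              fetch: (String, Int) => Array[Byte]): Array[Byte] =
  replicaEndpoints.iterator
    .map(endpoint => Try(fetch(endpoint, partitionId)))
    .collectFirst { case Success(bytes) => bytes }
    .getOrElse(throw new RuntimeException(s"all replicas failed for partition $partitionId"))
```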

The server side does have some local state. As we mentioned, we do not have centralized shared state, but each shuffle server does have local state, (indistinct) which map tasks have finished or committed. We flush this kind of local state in batches; we try to avoid lots of small flushes. Normally, when a shuffle stage finishes, we flush the state, so we keep flush operations to a minimum.

Now let’s talk about the production status in our current environment. The first thing is that our remote shuffle service is compatible with open-source Apache Spark: you do not need to change anything, and you do not need to change your internal Spark code, to use it. You just set the shuffle manager plugin to our remote shuffle manager (indistinct) names, and it will work with the whole (indistinct) system. On the other side, we use the standard MapStatus and MapOutputTracker to maintain some state, for example the shuffle server connection information, which we embed in the MapStatus so the reducer can retrieve it when it tries to connect to the servers.
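Here is a minimal sketch of what that looks like from the application’s point of view. spark.shuffle.manager is a standard Spark setting; the shuffle-manager class name and the registry option shown here are illustrative and may not match the released code.

```scala
import org.apache.spark.sql.SparkSession

// An unmodified Spark build picks up the remote shuffle manager purely through
// configuration; neither the application nor Spark itself needs code changes.
val spark = SparkSession.builder()
  .appName("zeus-shuffle-example")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")      // illustrative class name
  .config("spark.shuffle.rss.serviceRegistry.zookeeper.servers", "zk1:2181,zk2:2181") // illustrative option
  .getOrCreate()
```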

We collect a lot of metrics. Uber has the open-source M3 metrics library, so we use that. We find that some of the most important metrics on our side are network connections, file descriptors, and disk utilization, so we monitor these metrics very closely.

We do a lot of testing to make it production quality: unit tests and also stress tests. We have a special tool that randomly generates data and random map tasks, and it also randomly kills servers during runtime, so we can test a lot of failure scenarios. This helped us find a lot of issues. We also sample production Hive queries and convert these queries to Spark applications, so we can replay a real production load and test our system.

This is currently running in our production. How much data do we have? As the graph shows, we have around 500 terabytes of data shuffled every day, and that is not even the whole amount; we have even more data than this now. It has been running for many months so far, and so far everything is good. We have hundreds of thousands of applications running every day. Most importantly, we find that job latency is very similar to the external shuffle service, so we do not introduce any degradation.

And also, because we removed the spill files, we actually reduced file IO dramatically, by half.

Hopefully, we will open-source it very soon.

We still have a lot of work to do. Here is our roadmap. The first item is to support Hive on Spark as well. We want to add quotas so it can run well in a multi-tenancy environment. Load balancing is also important, of course; when we have more time, we can fine-tune the load distribution.

Also, in the Spark community there is a discussion about a new shuffle metadata API to support all kinds of different remote shuffle services, so once that comes out, we want to integrate with it as well.

Yeah. That’s all. So thank you everyone.

About Mayank Bansal

Uber Inc

Mayank Bansal is currently working as a Staff Engineer at Uber on the data infrastructure team. He is a co-author of Peloton. He is an Apache Hadoop committer and an Oozie PMC member and committer. Previously, he worked at eBay on the Hadoop platform team, leading the YARN and MapReduce efforts. Prior to that, he worked at Yahoo on Oozie.

About Bo Yang

Uber

Bo is a Sr. Software Engineer II at Uber, working on the Spark team. In the past, he worked on many streaming technologies.