The modern data customer wants data now. Batch workloads are not going anywhere, but at Scribd the future of our data platform requires more and more streaming data sets. As such our new data platform built around AWS, Delta Lake, and Databricks must simultaneously support hundreds of batch workloads, in addition to dozens of new data streams, stream processing, and stream/ad-hoc workloads. In this session we will share the progress of our transition into a streaming cloud-based data platform, and how some key technology decisions like adopting Delta Lake have unlocked previously unknown capabilities our internal customers enjoy. In the process, we’ll share some of the pitfalls and caveats from what we have learned along the way, which will help your organization adopt more data streams in the future.
– Welcome to my session. I’m Tyler Croy. I’m here to share some of the lessons that we’ve learned at Scribd as we’ve built out our new data platform that’s been very heavily oriented around streaming. I wanna make sure that before we get into this talk, I mean, there’s a lot of sessions that Spark and AI Summit that covered Delta Lake and Databricks and different things that you can do with Spark. I’m assuming that you understand a little bit of what Delta Lake is at maybe a high level, what Apache Spark is obviously, and a little bit about what Databricks as a platform offers. I should also mention that this talk is based on the experiences that we’ve had at Scribd in building out this platform. I don’t necessarily think that this is the perfect way to do it, I’m not saying that this is the only and the right way to do it. Most of what I’m trying to convey during the session is that this is how we’ve built things out at Scribd.
So I am the director of platform engineering here at Scribd. I lead two teams, the data engineering team, and a team called Core Platform. And a lot of what we do is build out infrastructure to support our data platform and the tools that sit on top of it. Part of the reason why I wanted to give this talk is my perspective is a little bit different than I think a lot of people that come into, I would say data platform infrastructure, data engineering teams, in that this is not where most of my professional background is. Most of my professional background is actually in developing backend web services and data services that are serving production end users. Think high throughput API surfaces. And so I bring I’d say a fairly production oriented mindset to the work that we do on the data platform here at Scribd.
As a company, Scribd is arguably one of the largest digital libraries on the planet. We’ve been around for 12 or 13 years and in that time we’ve accumulated a lot of really great books and audio books and users have uploaded a tremendous amount of content, whether that’s scientific papers, manuals for toasters and things like that, or other legal documents and all sorts of other interesting things that you might find in a sort of PDF or a presentation type form. And what that means for us as a company is that we have a really rich data set that we’re hoping to use to achieve our underlying mission in all of this, which is to change the way that the world reads. And for us, that means bringing the best contents that we can possibly find that’s gonna be the most interesting, the most relevant to our end users. And in order to power that we really need a very strong, very robust data platform underpinning that. And that’s where the platform engineering team comes in, which is the organization that I lead.
At a high level, we’re not actually delivering services that are servicing scribd.com users. What we’re doing is really delivering internal data services that other teams like the recommendations team or the applied research team, or business analytics research, building tools for them so that they can build out more infrastructure. I wanted to make sure that I also highlighted a number of the individuals in the platform engineering organization whose work I’m really presenting here. There’s very little of what’s in this presentation, are my ideas per se, definitely not my implementations in a lot of cases. So the list of people that you see here on the screen, those are the individuals that helped make the realtime data platform a reality here at Scribd.
So in order to talk about the real time data platform, I really shouldn’t start by explaining where we began with our data platform. My history at Scribd, I’ve only been here since January of 2019, which honestly it’s felt a lot longer than that, but when I joined Scribd, we had a very batch oriented data platform.
And what that meant for us is that we have, and frankly, this is still a system that’s in place in a lot of ways, very, very different tools that have been adopted organically over the years running on this one big batch data platform we have two versions of Spark. So we still have Spark One running around, which I’m really looking forward to getting rid of, we use high very heavily, we have some internal customers that are relying on Impala and probably the worst offender is we have some Ruby Scribds that are running around that are querying hive and HDFS directly. And all of this really comes together to make for a very difficult to manage and difficult to scale up a data platform.
But the tools themselves are not the biggest problem with our data platform. When we run our batches, we’re really only running them nightly. So that means for our data customers, that they are getting their data tomorrow in the best case. If there’s been any issues, say one of those sort of poorly developed Ruby Scribds fails and causes a lot of downstream jobs to fail, then we might have to run a recovery and then they might get their data 24, 48 hours, up to 72 hours in some cases where we’ve had significant problems from now. I should also mention that we’re also on top of an on premise data center for this data platform. We’ve been moving things to AWS over last six to eight months, but the legacy, I’ll call it, batch data platform is still on-premise, which means we’re also running a lot of infrastructure that we’re not very good at running. For our users this also means that there’s a lot of contention for the resources that we have in that data platform as well. So while we have the nightly running, the nightly is what we call batch jobs, while we have the nightly running, that means that ad hoc users or development users are competing with cluster resources at the same time. And that causes a lot of problems for us. And when I joined Scribd, I decided I didn’t wanna ask sort of what could we improve with this infrastructure partially because when all you’ve seen is batch what’s improvement to you might be faster batch, and I didn’t think that that was the direction we needed to go in. So I started to ask people, what can’t you do based on the performance or the tooling that we have? And what came back in a lot of cases was we can’t work with data or I can’t get answers in minutes or hours based on questions that I’m posing of our data, which is a big problem. Underneath all of this data platform what I consider the most important part of any data platform is the storage layer. And in our case, that meant HDFS. And our HDFS infrastructure we had at the time, last time I counted, almost 60 to 70% of our files in HDFS were small files. So that means the files were significantly smaller than the HDFS block size. And if you’re not familiar with the small files problem in HDFS, it’s basically, it totally kills any performance that you could possibly get out of the cluster, because for all of your reads that jobs have to do, they have to collect a tremendous amount of data. Our infrastructure that’s deployed in this on-premise environment, we have in some machines we will have 10 gigabit NICs so 10 gigabit network interfaces and in others we’ll have 25 gigabit network interfaces. During the nightly we are saturating those interfaces between some of these nodes. When we started actually survey some of the data last year that we had in HDFS, we also found that it was a collection of very, very different file types. So in some cases we were actually writing parquet and in some cases we were compressing files very well, but for the most part, we had RC file, formatted files and even plain text files floating around in unoptimized forms. And all of this came together to really, I would say, adversely affect the performance of the data platform. And when we started to talk more about streaming and streaming workloads, it was just inconceivable that we could develop a streaming platform on top of this underlying storage layer.
So last year we started to work on a prototype of what a new streaming infrastructure would look like, what that would look like in AWS. And we’ve started to look at a lot of different pieces of infrastructure that we hadn’t worked with before, but we had a feeling that they might solve their problems.
Central to, I think any streaming platform is gonna be Apache Kafka, which is, if you’re not familiar with Apache Kafka, go check it out. It is by far one of the most useful pieces of technology in any streaming infrastructure, from my perspective. But we were also uncertain on how much arc was gonna be part of our world versus Kafka streams or any other stream processing tool. We knew that we were gonna be putting this on top of S3 because we’re deploying into AWS and S3 is certainly a great storage layer underneath that, but we also wanted to adopt AWS glue catalog which is basically, in hive you’ve got H Catalog, AWS Glue Catalog is basically the same thing, except it’s run by AWS. Part of the reason we wanted to do this is by running Glue Catalog, we were confident we would be able to integrate with AWS Athena or Presto or any other tools that sort of live in the AWS Glue or EMR environment. So the team spent some time working on a prototype that pulled some of these pieces of technology together. And this is a diagram that I shared with some of our internal customers around, I would say June, July of last year for our prototype implementation. Now, if you look at the, not sure top left or top right for you, but for me it’s top left. If you look at the top left where we actually tried to use one data stream, so in our case, mobile analytics and sort of float take that and make that go all the way through a streaming data platform where we would do inline processing this batch and validation, or debatching, excuse me, and validation workloads, and then bring that back into Kafka. So we had done our initial processing on it. And then implemented a number of archival tasks to get that into S3. And our expectation at the time was that ad hoc workloads and Spark Streaming workloads that needed to get streaming data, were going to be hitting Kafka and anything that needed to hit to work in a batch mode would hit S3 directly. And so there were some complications here I would say.
It really… When you squinted at it, it looked pretty good, it showed promise. There was a lot of problems that we had with the prototype though. Probably the biggest one is we had a lot of questions and unanswered problems around how we would deal with the consistency model in S3. S3 is eventually consistent, which is very important to know if you’re building on top industry. What that means is if you have one job writing objects into a bucket, if it writes an object, it can read that object out immediately, but if another object is, or another job, excuse me, is doing a list operation or trying to query that bucket, It may not see writes as they happen. So when you’re having a series of jobs that are depending on data, being written consistently to the bucket at a certain time, and then the next job takes over, there can be problems there. S3Guard is a common tool that people will use to solve this. We didn’t end up deploying that because we basically found a better solution, which we’ll talk about in a second. But we also had a couple of problems with the difference between stream data and batch data. So this model of all streaming consumers will just have to go to Kafka to get their data, meant that we would have maybe about a seven day window of streaming data that’s available. And if you don’t read it from there, you have to go to the batch archival data store, which was S3 and that sort of like split our work, or split our implementation to where we’d have to have, any streaming application would have to be able to work with both. So sometime after we had finished our prototype, we discovered Delta Lake. And if you’re not familiar with Delta Lake it’s actually pretty simple conceptually. It’s parquet files plus a transaction log. And for us, both of those are sitting on top of the S3.
And when we looked at it for our uses, it really actually simplified a lot of things. And because storage for us is the foundation of our data platform, what that meant was that we had a very, very strong place to start from with our streaming data platform.
The number one thing that we get with Delta Lake, which I think is a fantastic feature of Delta Lake are the transactions. That transaction log really solves some of your data consistency problems. And what that allows us to do is we can have one job writing, complete a transaction, and then another job can come along and read the transaction log and see when data has been written in order to pick up after that, after that data has been written. And when we were looking at solving this for ourself, we were sort of looking up at this big, giant problem and thinking, oh, crap, we’re gonna have to spend a lot of time figuring this out. Delta like gives us that for free, which is pretty fantastic. there’s also this optimized command, which I think is one of the hidden bonus features of Delta Lake that really changes the game when you adopt it. When you think about a streaming application, a streaming application is gonna be bringing data sets in, and we’ll go from a Spark Streaming perspective, we take one batch of data, and then we write it into S3. So we’ve got one little parquet file. Say our batch site batches, every minute we’re writing every minute, writing a another little parquet file. When another job comes in and reads from that parquet or from that S3 bucket, it’s going to see hundreds or thousands of little parquet files. The optimized command allows us to basically start a transaction, take a bunch of those files, smash them together and create a new file that is optimized, joined together. And it basically removes any small files problem that you might have, which when you’re building on top of S3 and when you’re streaming data into those buckets, that’s actually very, very important.
For us, what that has meant is that the small files problem disappeared and we also didn’t have to think about it at all. So just by adopting Delta Lake, as the foundation for our platform, we had one of our biggest problems with our existing data platform, just go away entirely. The performance of Delta Lake is also something that should be mentioned.
Because Delta Lake is responsible for writing every file, they’re always compressed parquet, which for us, where we had this cacophony of different files floating around in HDFS, that’s a huge benefit for us. That also means that you’re getting probably the most bang for your buck out of the storage and the performance that you get out of that storage, because it’s optimized all the time, your optimized writes.
It’s also important to mention that every write is a new edition. Every time you do a mutation, it’s a write, it’s not an update. So you’re not updating files in place, which means that we can just stream data into Delta Lake as fast as we possibly can, and I’ll show you an example in a little while, and it all just works fine. There’s no problem writing a lot of data into S3 for us which is fantastic from a performance standpoint.
One of the features we didn’t know about, or we didn’t fully appreciate, I would say when we started with Delta Lake, was the syncs being a streaming sync and a streaming source. What this means for us is that we can use the same table to stream data into and have all of the downstream jobs that might also need to access that stream data feed directly off of that Delta Table, rather than having data come into a Spark Streaming job and then go back out to Kafka and then come back to another Spark Streaming job and then go out to Kafka. We’re basically cascading data through a series of Delta Lake tables and our N plus one streaming jobs are just feeding off of Delta tables as opposed to everybody going to Kafka, which is pretty fantastic. And what this means for us in a very real practical sense is that we have one table, one table to rule them all that sort of thing. But we have one table that all of our streaming consumers can feed off of and all of our batch workloads. So this is an example of a data stream that I’ll demonstrate a little bit more in detail later, but this is me running a query to grab data, grab today’s data. So data that’s been streaming in.
Creating logs that Fastly in this case. And when I look at the table, I can get the latest information, but I can also go back in time. So this is another query where I’m just going back to January and grabbing data that has been written back in January and for our users, that means they’re always going to one consistent place for their data. They don’t have to figure out if I’m going to one place for stream versus a batch or a historical data lookup, which for me, that’s incredible. But I should mention that Delta Lake is not perfect. It’s certainly got some caveats and a lot of the caveats that we’ve encountered come into play when you’re working with Delta Lake as a streaming source. Compaction, for example, can cause downstream consumers to receive events again.
There’s also some cases where if you optimize on a merged target table, you might have transaction conflicts going on. There’s some of these examples, I’ll make sure that we post on the Scribd tech blog to give you a little bit more detail, but suffice it to say that if you’re using streaming sources, definitely reading your streaming sources and syncs as part of your streaming architecture, definitely read the documentation thoroughly and pay attention to some of the caveats and be very aware of where you might have concurrent rights between different jobs to the same Delta Lake table. There’s some caveats that you need to be aware of there. But all that said, I’m still a huge fan of Delta Lake. It’s by far been a tremendous piece of technology for us to build our streaming data platform on top of. The other half of the equation for us, however, is Databricks, which we’re a Databricks customer, we evaluated EMR as part of our work last year, and then we discovered Databricks.
And there’s a lot of great things that I can say about Databricks. It has definitely solved a lot of the problems that we both knew we had, and that we didn’t know we had.
When we were originally thinking about what our data platform was going to look like, we were thinking about a production data platform. So something where jobs were running, that developers weren’t accessing. We weren’t really thinking about the development story and the notebooks and some of the collaboration that we can do inside of Databricks, excuse me. That actually accelerated some of our development as we began to launch streaming workloads into Databricks, which was an unexpected win, which I’m pretty happy with. I also wanna call out Delta caching because Delta caching was not a feature that we knew about until we really started to get invested in the Databricks platform and Delta Lake. When we were imagining our EMR environment, we knew that we had poorly performing jobs, we knew that we had a lot of jobs that frankly were not doing optimized reads from their storage, I mean, obviously with our HDFS storage, but they weren’t doing optimized reads. And so we were considering some approaches where we would launch EMR clusters with a local HDFS installation inside that EMR cluster to perform basically local caching for the cluster while it was doing its work. So we had load data in from S3 to HDFS, do a lot of the work inside of the cluster and then report our results back out into S3. Delta caching is effectively that, but it’s a check box that you check when you launch a cluster, which is fantastic. And it actually makes not only some poorly performing jobs run faster, but it also helps a tremendous amount with high concurrency clusters where you might have a lot of different jobs running at the same time, but also ad hoc workloads. We have some clusters that ad hoc users are basically sharing and the Delta cashing ensures that if you have, let’s say 10 data scientists that are all pulling a couple of common tables from Delta Lake, that we’re cashing that in the cluster, and it gives them faster results while they’re doing their work, which is pretty phenomenal. The challenges with Databricks though, and I don’t wanna spend too much time talking about every specific one, ’cause some of these are on roadmaps with the product team at Databricks. We’ve had a very good relationship in giving them feedback and them giving us some help on how we structure our streaming platform. But the big one that I wanna talk about is really monitoring of production workloads, which is where if you’re deploying a streaming platform and streaming infrastructure on top of Databricks, that’s probably where you’re gonna have the most work to do that you might not be anticipating. Out of the box, Databricks doesn’t really support you. There’s no turnkey integrations for metrics getting out of Databricks or logs getting out of Databricks, or sending alerts. So in our world, we think about Datadog for metrics and logs, we think about Sentry for exceptions and things like that and we think about PagerDuty for alerting. In order for us to get what we need out of the platform and get what we need for these production streaming jobs, we have to implement all of that ourselves, which was not something we were hoping to do, but that’s where we are. I know that Databricks is going to be addressing this in the future, but right now, just about the extent of what you get is this. It sends an email alert when something goes wrong, which is not nothing, (laughs) but it’s not great either. If you come from the perspective of production readiness, or if you talk with your infrastructure team about what they consider requirements for a production service, there’s just a lot of things that you have to go build yourself if you’re deploying streaming workloads on top of the Databricks platform. In all fairness, we were gonna have to build those to ourselves if we went over to EMR or any other streaming infrastructure that we might deploy, but these are things that you should be aware of if you’re gonna be building out infrastructure on top of Databricks. All of that said, however, still a great, great piece of technology that we’ve built on top of. The combination of Delta Lake and Databricks has actually unlocked a lot of workloads and a lot of features that we weren’t originally planning that have already accelerated the work that some of our internal data customers have been doing. And that is a fantastic win for us. So I wanna give you an example of one of the streaming pipelines that we have up in production right now. And this is using Delta Lake and running in Databricks literally right now. So this streaming pipeline, it’s different from the example that I showed you before when we did our prototype, which was using mobile analytics events. This example is a production pipeline where we’re pulling in logs from Fastly, which is fronting a lot of our Scribd.com traffic. And getting that into do analytics work and store logs to make those available to internal customers that need those logs and compute ratings and some other stuff that isn’t really relevant to the slide. But suffice it to say it’s a very important data source, but because it is access logs from our production web end point, the rate of data that comes through this is maybe I’m gonna guesstimate between four and 6,000 logs per second. So it’s a very high throughput pipeline. We bring that data into Kafka directly. For us, any streaming data source goes to Kafka first, and then we work with it from there. Our first hop after that is a streaming job that is basically taking data from Kafka and mapping that into a Delta Lake table. This is a fairly simple thing to do, but it does require some mapping of what one schema has to the Delta Lake or Delta tables table schema, which is important. When we write that data into this Fastly table, this is the logs Fastly table in blue, we actually decided to set up additional streaming jobs off of the Delta Lake table directly. So this view logs table is reading from the Fastly or view logs job, excuse me, is reading from the Fastly table. And then that’s doing some computation and persisting those results into a view logs table. And there’s another extreme job that is waiting for results, computed results on that view logs table and so on and so forth. All the while these two tables are accessible to ad hoc workloads that can basically query information as it’s coming in. But one thing I should mention, and I think this is an important thing to consider for any streaming workload going into Delta Lake is you need to run Optimize. At some point you have to run Optimize. And in our case, we have a batch job that is triggered by airflow, which is periodically optimizing some of these tables. This is something where you sort of have to play with it to see what the right cadence is for optimizing depending on your own query times or query needs, but you have to come in and optimize to make sure that you’re getting the best performance from your reads as you’re streaming data into those tables. And when I call back to the queers I was showing you earlier, I didn’t point this out at the time because I wanted to explain the data pipeline. If you’ll note the time difference between when the data was queried and when it was actually persisted into Delta Lake, that is a nine second difference. And so a log generated in Fastly is showing up for our customers in a Delta table in nine seconds, which is phenomenal, which means if you think about a customer that was here 12 months ago, they were waiting 24 plus hours to get results. We can start to give them results within seconds. For me, it’s still sort of mind blowing, but for the users of this data, they never anticipated this. They’re over the moon happy. I can’t tell you how happy some of these users are. And one of the things that is funny about this query to me, and I wanted to point this out as well, if you look at the bottom left of the cell in the notebook, the actual query took 23 seconds. The time it takes for us to run the query against the Delta table is actually longer than it takes us to get this data pipeline running to bring the data in. Because the writes are really, really fast. The reads aren’t as fast because this table has not yet been optimized. And so the batch workload that I… Not the batch workload, excuse me, the historical workload that I showed you earlier where we’re looking at data from January, that got data in 20 seconds instead of 23 or 30 seconds, which is about the norm for accessing this live data here. You might see a difference in how fast your data shows up in Delta Lake, depending on the trigger interval in the Spark Streaming job. So you can configure this if you have more real time requirements, you can set no true interval, which is what we have here, or if every minute is fine or whatever is appropriate for your organization, you can certainly change that yourself. But for us, there was no trigger interval. And that’s all running in production right now, which I cannot tell you how proud I am of the work that we’ve done to get from. you’ll get your data tomorrow to 10 seconds from the data being generated you have it. So with that said, it’s super awesome. I can’t tell you enough good things about Delta Lake and Databricks, but I wanted to talk a little bit about where we’re going next, because that’s just one data stream and we have so much more work to do in our streaming platform. We actually have a lot of different streams in our infrastructure, and we actually are bringing a lot into Kafka. What we’re pulling out of Kafka in this nightly batch process. And so over the next few months, we’re gonna be moving all of those into a very similar looking streaming pipeline, going into Delta tables with Spark Streaming jobs, which we’re fairly confident it’s gonna change the equation for a lot of our data customers on how they access the data and how they work with it and what they can do.
When we start to bring a lot more of these streaming workloads into Spark Streaming and into Delta Lake, we also have to do a lot more tuning. When you’re running in optimized, you’re basically, in our case, we’re spinning up a Spark cluster that is running the optimized command on the Delta table, and that is costing money. So we have to figure out what’s the right balance of running optimize too frequently, versus not frequently enough to give us the performance that we want out of this. And we also don’t have a good sense right now on how to structure our streaming jobs themselves, whether a single topic in Kafka should map to a single streaming job or whether we should co-locate a number of different data streams into one Spark Streaming job that’s running in one Spark cluster inside of Databricks.
But probably most important is there’s more production work or productionization if you will. And this is what I was alluding to earlier, the stuff that you have to do yourself. I am not pleased with the amount of production readiness that we have in our current Spark Streaming pipeline. So a lot of the work that we have lined up over the next few months is basically hardening that and making it more resistant to failures and more resistant to data problems or anything like that.
Overall, I think Delta Lake and Databricks are a phenomenal choice if you’re building out a streaming data platform, or if you’re building out batch workloads. Both of them together allow us to have the flexibility to do both. And that’s probably the most important takeaway, is the future for us is very, very streaming oriented, but our batch or periodic workloads, they don’t go away. There’s minor things that you’re gonna have to keep in mind and maybe do some manual implementations or do some things yourself that you’re not going to get for free from either Delta Lake or Spark running on top of Databricks, but when I take the whole picture together, some of the wins that you get from adopting this platform are just insane. They’re crazy big wins. I can’t even tell you how transformative Delta Lake and Databricks alone have been for the way we view data internally. All of a sudden we have people who are very aggressively getting into our data and starting to think, well, what other questions can I answer? I used to be able to get results in two days, so I didn’t ask too many questions and now they can get results in seconds or minutes. And so we have already started to see usage of our data infrastructure go up and up and up over time because people can, all of a sudden ask one question, get an answer, ask another question, get an answer and go through a really tight development cycle as they’re developing machine learning models, AB tests or anything that’s involving the data that we’re working with here at Scribd. That said, if you’ve got any questions, I’m happy to answer them now, but please, if you go to tech.scribd.com, you’ll find a little bit more that we’ve written up about our streaming data platform and about the work that we do here in platform engineering. And we’re also hiring. So if you’re curious about joining the Scribd team, you can find out more on the tech blog as well. But with that said thank you for your time.
Scribd
R. Tyler Croy leads the Platform Engineering organization at Scribd and has been an open source developer for over 14 years. His open source work has been in the FreeBSD, Python, Ruby, Puppet, Jenkins, and now Delta Lake communities. The Platform Engineering team at Scribd has invested heavily in Delta and has been building new open source projects to expand the reach of Delta Lake across the organization.