Large Scale Lakehouse Implementation Using Structured Streaming

May 27, 2021 05:00 PM (PT)


Business leads, executives, analysts, and data scientists rely on up-to-date information to make business decisions, adjust to the market, meet the needs of their customers, and run effective supply chain operations.


Come hear how Asurion used Delta, Structured Streaming, Auto Loader and SQL Analytics to improve production data latency from day-minus-one to near real time. Asurion's technical team will share battle-tested tips and tricks you only get at a certain scale. Asurion's data lake executes 4,000+ streaming jobs and hosts over 4,000 tables in a production data lake on AWS.


In this session watch:
Tomasz Magdanski, Director of Engineering, Asurion



Tomasz Magdanski: Hello. What I'm about to tell you, you won't find in any books or blogs. I will share with you the real obstacles you will run into when trying to build a large-scale data lake. I will briefly talk about Asurion, how we got here, how to run a scalable and cost-effective lakehouse, and most importantly, the lessons we have learned along the way.
We are a large insurance and support company. Every day, over 10,000 of our experts engage in support sessions to help over 300 million people around the world, and our services span from technical support to same-day device repair and replacement. Just to give you an idea about the size of this project, our lakehouse ingests over 4,000 tables from 100-plus databases. We create seven and a half thousand tables inside the data lake, including the L1 and L2 tables that I will talk about later. We ingest streaming data from Kafka, Kinesis, SNS and SQS, as well as flat files and data from APIs.
We even ingest data from other cloud providers. Our database sources range from SQL Server, Oracle, PostgreSQL and MySQL to DynamoDB and Redshift. In our data warehouse, we combine thousands of those tables to produce over 300 data models and 600 data marts. And then finally, in our consumption layer, we have over 10,000 data views and over 2,000 reports. Our previous architecture was based on the Lambda architecture. As you may know, the Lambda architecture consists of a speed layer and a batch layer, and there are several issues with that. First of all, you have to process everything twice. You also have to validate the data twice, and often you have to use different technologies to do so. You have to deal with late data in two different ways, and you have to worry about reprocessing data in mostly immutable storage. You have to worry about scheduling, about rewrites and the queries. Data updates are also really difficult, which makes data compliance more difficult.
We didn't really have compute-storage separation, which made scalability hard and expensive. Our data latency was mostly D-1, and we also had a very wide technology stack: from Redshift to Lambda, Kinesis, Kinesis Firehose, EMR, RDS, Hive, Spectrum. It was difficult to manage. Then we looked at the lakehouse architecture, where we only had a single pipeline. We had near real-time data latency capabilities, the scalability of Apache Spark, a very highly integrated ecosystem, and a very narrow technology stack. It was very, very promising.
Another important advancement that helped development was using production data directly. We were aiming to minimize data movement as much as possible and utilize compute-storage separation. When developing data platforms, having access to real production data to identify and fix many nuanced issues, as well as dealing with actual scale, is something we couldn't reproduce in a dev environment. So this outcome was very desirable for us. We use IAM roles and mount points to transparently connect pre-production compute clusters to production data in read-only mode, and we were also writing data back to production buckets. Therefore, data never actually resided in the lower environment, in true compute and storage separation fashion. This picture is significant, and we'll come back to it in one of the lessons learned.
Now, in our previous architecture, we had one ETL mapping job per table, which made the platform really rigid. We had effectively 4,000 mappings, and if you needed to make a change across many mappings, it was a lot of work, and the platform was kind of resisting change because of the complexity of everything that had to change. In the lakehouse, we wanted to create a single Spark ingestion job that was capable of streaming and batching, reading from all the sources, and being fully configurable, and we've written this job in Scala using design patterns and a lot of dependency injection, allowing for very rich configurability. We picked Structured Streaming to take advantage of checkpoints and exactly-once or at-least-once semantics, and we have unified our interfaces, our landing zones, to S3 and Kafka. So the database CDC pipelines, APIs and flat files were all uploaded to S3, and SNS, SQS and Kinesis were all loaded into Kafka. Then we scheduled our ingestion jobs using Databricks ephemeral jobs and ephemeral clusters. All together, this allowed for highly testable, easy to maintain and very flexible code.
All the tables in our data lake are Delta tables. As part of the data lake, we need to keep slowly changing dimension type 2 append-only tables to track changes to all rows and all columns at the source, and we call these L1 tables. But we also need to store the latest version of each row, like a slowly changing dimension type 1, by merging changes into the target tables, and we call those L2 tables. So we had a choice: do we use Structured Streaming to stream data from the landing zone to L1 and then from L1 to L2? That would effectively double the number of our jobs from 4,000 to 8,000. So instead, after extensive testing, we decided to utilize the power of the foreachBatch API to write both tables at the same time. The code on this slide is greatly simplified; we don't actually use naked Spark. We have readers and writers that wrap around Spark. We also decorate our data heavily by adding mandatory meta columns inside the foreachBatch.
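The dual-write pattern described above can be sketched in pure Python. This is not real Spark or Delta code; the table stand-ins and field names (`l1_table`, `l2_table`, `batch_id`, `id`) are hypothetical, and the real job issues Delta append and merge operations inside `foreachBatch`:

```python
import itertools

# In-memory stand-ins for the Delta tables; in the real job these are Delta
# writes issued inside Structured Streaming's foreachBatch.
l1_table = []               # L1: append-only history of every change (SCD2 style)
l2_table = {}               # L2: latest version per primary key (merge target)
_batch_ids = itertools.count(1)

def process_micro_batch(rows, key="id"):
    """Handle one micro-batch: decorate rows with a batch id (a mandatory meta
    column), append them all to L1, then merge (upsert) into L2 in one pass."""
    batch_id = next(_batch_ids)
    decorated = [{**row, "batch_id": batch_id} for row in rows]
    l1_table.extend(decorated)          # append every version to L1
    for row in decorated:               # merge the latest version into L2
        l2_table[row[key]] = row
    return batch_id
```

Writing both tables from the same micro-batch is what avoids doubling the job count from 4,000 to 8,000: one checkpointed stream feeds both layers.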
The next choice we had to make was around our trigger, and the naive way to approach the problem was to take the ingestion job, which was a streaming job, and send it as an ephemeral job. The challenge with that was, out of the box we had 4,000 tables to run, and Databricks only allows 1,000 jobs in a shard. On top of that, each of those ephemeral clusters would have to have a driver and at least two nodes, which would put us in a 12,000-node environment. That was a little bit cost prohibitive, and we didn't feel it was the best use of the resources. So the next step was to look at combining many of those streaming jobs together in a notebook and running them as one ephemeral job, and we found the sweet spot for a driver to handle was around 40 streams.
Of course, the clusters had to be larger to handle all the streams, but we also noticed quite a high degree of compute waste, because within those 40 streams there could be streams that don't really have data very often, but we keep them running all the time. Another problem with this approach was that if we wanted to disable a single stream, we effectively had to stop the job, and that would stop all 40 of those streams. So we wanted to find a better way to switch off one job, or move it from notebook to notebook or from cluster to cluster, without having to stop all the other jobs. So finally, we settled on Trigger.Once. For those of you who might not know, the Trigger.Once option in Structured Streaming will collect all the data up to your max number of files or bytes per trigger, process that data, checkpoint the progress, and then terminate the job.
So you're really responsible for scheduling the micro-batches when you want them to run. That means no continuous execution, and this allowed us to put hundreds of jobs in a notebook on a single ephemeral cluster. Now, because those jobs are running on a schedule, we could actually migrate them between the notebooks in between rounds. We could also refresh the configs at each round, so any config changes we made took effect immediately without any restarts. And we're also using machine learning to rebalance the jobs across different clusters, to make sure we meet our data SLAs. We have five types of notebooks that we run as ephemeral jobs, and those notebooks have a group assigned to them. They go to the configuration database, collect all the jobs that belong to that group to be processed, and then they run those jobs either in series or, by using parallel collections, we can parallelize the job execution all the way up to the number of cores on the driver, which typically means between 16 and 32 jobs running at the same time.
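One round of that group-based scheduling can be sketched as follows. This is an illustrative Python simulation, not the actual Scala code: the config store, group names, and the `run_job` callable are all hypothetical, and `ThreadPoolExecutor` stands in for Scala parallel collections on the driver:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical configuration store: group name -> ingestion jobs assigned to it.
JOB_CONFIG = {
    "frequent": ["orders", "claims", "devices"],
    "infrequent": ["zip_codes"],
}

def run_group_round(group, run_job, max_parallel=16):
    """One scheduling round for a notebook: re-read the config (so changes take
    effect with no restart), then run every job in the group with bounded
    parallelism, up to the number of cores available on the driver."""
    jobs = JOB_CONFIG.get(group, [])        # refreshed at the start of each round
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        results = list(pool.map(run_job, jobs))
    return dict(zip(jobs, results))
```

Because the job list is fetched fresh each round, moving a table between groups or disabling it is just a config change, with no cluster restart.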
We have five flavors of those notebooks. We have one for tables that are updated very frequently, where we can put around 60 tables per notebook; our goal is to finish all of the updates and merges within 60 minutes. We have less frequently updated tables, where we can put 300 to 500 of them, and then infrequently updated tables, where we can put up to a thousand. The benefit of that approach is that we can take a table that's growing in update frequency and move it between those groups seamlessly, and rebalance those jobs seamlessly, without resource impact to any other job running within that group.
We can also achieve what we call pseudo-streaming by having a notebook that runs the job in a while loop. We've tested that, and the performance is very close to just running a processing-time trigger and letting Spark handle the execution of micro-batches, with the exception that we can actually check the config at the top of each while loop. That means if we have five jobs running and we want to disable one of them, we just disable it, and the next iteration simply won't run it, with no restarts to the other four.
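The pseudo-streaming loop can be sketched like this. Again a pure-Python illustration under assumed names (`enabled` as the config flags, `run_once` as one Trigger.Once micro-batch), not real Spark code:

```python
def pseudo_stream(jobs, enabled, run_once, rounds):
    """Pseudo-streaming: run Trigger.Once micro-batches in a loop, re-checking
    the enabled-flag config at the top of every iteration, so one job can be
    disabled without restarting the others."""
    runs = {job: 0 for job in jobs}
    for _ in range(rounds):
        for job in jobs:
            if enabled.get(job, False):     # config check on every loop pass
                run_once(job)               # one Trigger.Once micro-batch
                runs[job] += 1
    return runs
```

Disabling one job mid-run takes effect on the next iteration, while the remaining jobs keep going untouched.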
Okay. So let's move ahead to the lessons we've learned while building such a large-scale lake environment. Well, first of all, let's talk about cloud files. For those of you who don't know, cloud files is a part of Auto Loader, offered by Databricks, and when you're using AWS and try to read from a folder, Databricks will automatically create an S3 notification for the folder that goes into an SNS topic, and then it will create an SQS queue that subscribes to that SNS topic. Now, first of all, AWS only allows 100 notifications per bucket. So you're going to have to write some sort of automation to know when you have saturated the notifications in the backend, and you might need to deploy the next type of job into the next bucket.
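The saturation automation could look something like this simplified sketch. The allocator, bucket names, and counting scheme are all hypothetical; a real implementation would query the S3 API for the existing notification configurations:

```python
# AWS allows a limited number of event notification configurations per S3
# bucket (100 at the time of the talk), so saturation must be tracked.
BUCKET_NOTIFICATION_LIMIT = 100

def assign_notification(counts, buckets):
    """Place a new folder notification on the first bucket with headroom, and
    return None when every bucket is saturated and a new landing bucket must
    be provisioned."""
    for bucket in buckets:
        if counts.get(bucket, 0) < BUCKET_NOTIFICATION_LIMIT:
            counts[bucket] = counts.get(bucket, 0) + 1
            return bucket
    return None
```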
Next, the SQS and SNS resources are not tagged by default, at least today. So if you're like us and require all of your resources to be tagged, you're going to have to write some sort of Lambda function that detects that Databricks created resources in your account and goes and tags them appropriately. And then finally, AWS SNS has hard limits on the ListSubscriptions and ListSubscriptionsByTopic APIs, and Databricks uses these APIs to check if the SNS topic already has an SQS subscription. If you run enough jobs, like the thousands and thousands we run at the same time, you will hit those limits, and we have seen our jobs fail because of it. The only option today to deal with this is to look for very slow tables that don't change a lot, where we could either disable notifications or just spread them out more in time. But at some point, we will hit a scale where we're just going to keep running into these issues.
Okay. Another lesson we learned about cloud files. I'm going back to that data-compute segregation slide I talked about earlier. When pre-prod compute from another account makes a request for data from the production account, Databricks will set up a notification, an SNS topic, and then an SQS queue. But if you notice, the SNS and SQS live in the account of the compute, not in the account of the storage.
So this works, your testing works, and great, you're ready to deploy to production. You deploy your job to the production environment, you make the same request for data, and this time Databricks creates an SQS queue only, because an SNS topic already exists, right? So now, yes, your production data works too, but the notifications and the whole cloud files flow run through the pre-prod environment, and that's of course problematic. So what we have to do during our deployment is clean the notifications, run the production job first, let the production job set up the SNS and SQS, and then start the pre-production job to subscribe to that SNS topic. That's an extra step we have to do and automate during our deployment process.
Okay, next. We bring data from thousands and thousands of database tables through AWS DMS, and we're using the CDC, change data capture, flow. When you first go and try to enable CDC on a table, you have to do two things: you have to enable a load and CDC. Load means a snapshot of the table at that given time, and CDC is the process that tails the transaction log of the database to keep following the changes to each row. Now, the challenge we found with this setup is that load files can take hours, while CDC changes start being tracked as soon as you start your job. So there is a chance that an object in the CDC portion of the pipeline will have a timestamp from before the load file completed, while the load file is assigned the timestamp of now.
So you could have a load-file version of the row with a timestamp that's ahead of a CDC-updated version of the same row. We reset our DMS timestamps to zero for the load files to avoid this kind of race condition. Another lesson we've learned with DMS and CDC is about data type conversion. Because DMS can connect to almost any database, we found it sometimes doesn't convert the data types correctly. For example, from SQL Server, a TINYINT is converted to UINT, and we've seen overflow on that, so we had to apply the rule you see at the bottom of the slide to force it back to an integer. And in Oracle, NUMERIC is converted to DECIMAL(38, 10). You have an option to set it all the way to (38, 38), but in our Oracle databases, a numeric column had a precision of 50, so there was no way for us to bring the data in. We had to set the setting called numberDataTypeScale to minus two, which effectively converts the column into a string.
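The load-versus-CDC race fix described above can be illustrated in a few lines of Python. This is a sketch with hypothetical field names (`source`, `ts`), not DMS or Spark code: by ranking load rows as timestamp zero, any CDC change, even one captured while the hours-long load was still running, wins the merge:

```python
def winning_version(versions):
    """Pick the version of a row that should land in the merged table.
    Load (full snapshot) rows are ranked as timestamp zero, so a CDC change
    captured during the load outranks the snapshot, even though the snapshot
    was stamped with the later 'now' timestamp."""
    def rank(row):
        return 0 if row["source"] == "load" else row["ts"]
    return max(versions, key=rank)
```

A naive max over raw timestamps would pick the stale snapshot here; the reset makes the CDC update win.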
Another lesson learned: load files can be large, and they can skew data when you read them, so you might need to do salting of some sort. DMS files are not partitioned, so either think about compaction, or just be aware that Spark is going to take a long time to read your DMS buckets; if you're restarting after a few months, there are going to be thousands and thousands of small files. We set DMS to remove all the files when the task starts, just to have a clean slate, to ease this problem and minimize data duplication. Some database sources can also have large transactions.
So let me take you back a second. When we have multiple updates to a single row within one micro-batch, we have to determine which one is the latest, because that is the one we want to merge into the target table. Initially, we thought we could use timestamps, because typically they go to milliseconds or microseconds, but what we found is that if you manually open a large transaction in the database, put a lot of updates to the same row into that transaction, and close the transaction, all of those updates come with exactly the same timestamp. So it's impossible to determine which is the latest. We had to bring in the LSN for all the databases through DMS to make sure we have a deterministic way to merge and get to the latest row.
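The LSN tie-break can be shown in a one-line sketch, again with hypothetical field names rather than the real schema. Sorting on the (timestamp, LSN) pair makes the winner deterministic even when a whole transaction shares one timestamp:

```python
def latest_in_batch(versions):
    """Deduplicate multiple updates to the same row within one micro-batch.
    Updates committed in one large transaction share the same timestamp, so
    the LSN (log sequence number) is the deterministic tie-breaker."""
    return max(versions, key=lambda row: (row["ts"], row["lsn"]))
```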
Now, another lesson we learned: if a database has no primary keys, but there is a unique constraint you can use to merge, and that constraint contains null values because nulls are allowed at the source, Databricks' Delta merge will not match null to null, and it will insert a new row each time. So what we had to do is replace all the nulls with the string "null" or empty strings for more deterministic merging.
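A minimal sketch of that null-replacement step, with an assumed sentinel value; in the real pipeline this would be a column expression applied before the merge:

```python
NULL_SENTINEL = "null"   # assumed sentinel; an empty string works as well

def normalize_merge_keys(row, key_columns):
    """Replace NULLs in merge-key columns with a sentinel string. In SQL merge
    semantics NULL = NULL does not evaluate to true, so null keys never match
    and every merge would insert a duplicate row."""
    out = dict(row)
    for column in key_columns:
        if out.get(column) is None:
            out[column] = NULL_SENTINEL
    return out
```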
A lesson learned about Kafka. If you're using Kafka for some sort of CDC pipeline pattern, which is also possible, you may have tables that don't really have a lot of traffic, and you don't want to configure a high number of partitions and waste resources on Kafka for a small table. But the initial load of data may actually bring millions of rows into that topic. So now you have a topic that doesn't have a large number of partitions, but it has a lot of data, and you don't necessarily want to repartition that topic just for one read. So we recommend setting the minPartitions and maxOffsetsPerTrigger options to high numbers. In our case, I think we set the partitions to 4,000 and the offsets to 10,000, to force Spark to parallelize the read of this topic and speed it up without needing to repartition.
We also use the first L1 table that we write: we optimize that table inside foreachBatch, and then we use it as the source for our L2 merge. That is to avoid a second action going back to the source, because the source could be slow; either DMS could be slow because of the number of files, or Kafka, in this particular case, could be slow too. So in general, we adopted the pattern where we add a batch ID column to the data frame, write to L1, optimize if needed, and then use the same batch ID to filter data back from L1 for merging. We found this pattern to be much faster than going back to the source, and caching, if you have a large amount of data, is also going to be slower.
Other lessons we've learned with Kafka. Well, we wanted to use Trigger.Once on everything as much as possible, so we have options, and Kinesis and other sources don't support this out of the box. So we moved all of our SNS, SQS and Kinesis data into Kafka using Kafka Connect, and then we could use Kafka with Trigger.Once, making these jobs work exactly the same way as, for example, the DMS ones.
Lessons learned about Delta. When you bring data in for the first time, optimize your table manually, because you will get faster [inaudible], and then enable Delta Optimized Writes, because merging rewrites a lot of data. Optimized Writes is really good at figuring out the size of files during merges, so you don't have to run compaction optimization that often. Move your batch ID and merge columns to the front of your data frame.
Databricks collects statistics on the first columns of your data frame. If you have a very wide data frame, you're not going to get statistics on the columns towards the end, especially if you just added the batch ID, which would be the last column, and you need that batch ID for filtering, for example. So move everything you're going to merge on and search by towards the front of the data frame. If your merge columns are incremental, like auto-incrementing numbers, you can also use Z-ordering to co-locate data there and further reduce the number of files that need to be read, thanks to data skipping. We also always recommend using partitions and using the i3 instance types with IO caching for Delta.
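The column-reordering advice can be sketched as a small helper; the column names are hypothetical, and in practice this decides the `select` order before writing the Delta table:

```python
def front_load_columns(columns, priority):
    """Reorder a schema so merge keys and the batch id sit at the front, inside
    the leading columns Delta collects min/max statistics on, so data skipping
    works on exactly the columns used for merging and filtering."""
    front = [c for c in priority if c in columns]
    rest = [c for c in columns if c not in front]
    return front + rest
```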
Some other lessons learned. If you have any other tool that needs to read Delta, like Presto in our case, you have to write S3 paths when you register your Delta tables in Hive, because Presto is not going to understand DBFS paths. So we have to manually create table definitions and put S3 paths into them. If you want your Delta files to be read by things like Athena, or Hive in general, or maybe Spectrum, you need to generate the manifest files and enable auto-updates to those manifest files. This way, technologies that are not aware of the Delta transaction log can still use the manifest files to read the Parquet files.
Presto and Spark views are not compatible at this time; just something to be aware of, because if you create a view in Spark, Presto is not going to be able to utilize it, and the other way around. We also found that extracting Delta statistics, like row count and last modified time, into a hot cache is very desirable, because we have workloads that require many dependent tables to be updated first. Let's say you have an ETL that requires 30 or 40 tables to be updated first. Having a running cluster and issuing 40 DESCRIBE commands just to determine if you can run your job is slow. By submitting those statistics to a hot cache at the end of each run, the next job just reads the cache and determines: all my tables are updated, I'm good to go, I can schedule the job now.
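The hot-cache gate can be sketched like this. The cache here is just a dict; a real deployment would presumably use something like Redis, and the function and table names are illustrative only:

```python
# Hypothetical hot cache; in a real deployment this could be Redis or similar.
stats_cache = {}

def publish_stats(table, row_count, last_modified):
    """Each ingestion job pushes its table statistics here at the end of a run."""
    stats_cache[table] = {"row_count": row_count, "last_modified": last_modified}

def ready_to_run(dependencies, since):
    """A downstream job checks the cache instead of issuing dozens of slow
    DESCRIBE commands, to decide whether all its source tables are fresh."""
    return all(
        table in stats_cache and stats_cache[table]["last_modified"] >= since
        for table in dependencies
    )
```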
Okay. And finally, Delta and Spark work together to stream data out of a Delta table, but currently, as of today, it only works for appends. That makes sense, because when you append something to a Delta table, you insert new files into the table and into the transaction log, and anyone who listens to that table knows there are new files, and those new files are read. It makes perfect sense. Now, with merging, this is a little bit more complex, because merging rewrites a lot of the data. So yes, there is a new file created, and that file might have a million rows, but we only updated a thousand rows in it. So if you emit that file down the pipeline, you have to filter the million rows down to the thousand rows that actually changed. And how are we doing this?
Again, we're utilizing the batch ID we added earlier on: we know the new batch is, say, 27, so we only have to pick up the data for batch 27, and therefore filter this massive file down to a small one. It's not the most efficient, but it's working pretty well.
And finally, SQL Analytics, just a word about how we're using it. Our data marts are built from a collection of over 1,000 SQL queries and statements. We needed a way for the data marts to be lifted and shifted from the previous platform to this platform, so we needed a good, scalable SQL execution engine. Of course, Spark is that, and we wanted to utilize an existing framework we have in place and submit the SQL statements through a JDBC connector to a Spark cluster. The first option was to use interactive clusters, but they are fairly expensive.
So we were looking more towards maybe EMR with open-source Spark and Delta, when the SQL Analytics product came into our scope, and it supports JDBC connections, so it was a perfect fit for us to send those SQL queries and create those data marts. Some lessons learned so far, because it's an early product: we have to collect all the metrics from the APIs ourselves and put them in a Delta table for monitoring and performance. You are only allowed one metastore per SQL workspace, which means if you have multiple different SQL endpoints, they are all going to share a metastore. That's different from clusters, where we can configure a different metastore on each cluster. So we're a little bit constrained in terms of separating compute and storage across accounts and using the metastore to really tie it all together.
It also doesn't support UDFs. So if you need a [inaudible], you're still going to have to fall back on an interactive Spark SQL cluster where you can attach jars, because at this time SQL Analytics comes without jar support. And finally, you will still have to learn to troubleshoot Spark. You're still going to have to understand DAGs and the Jobs and SQL views in the Spark UI, because this is still just a Spark job underneath. To be effective in finding bottlenecks in your queries, you're still going to have to do that.
All right, thank you very much. It's time for Q&A. Please provide your feedback; we want to hear back, and we want to improve the quality of the content we're sharing with you. If you have any questions, or if you want to contact me offline, please feel free to reach out to me on LinkedIn. Thank you.

Tomasz Magdanski

Tomasz is a seasoned technology leader specializing in real world implementations of Big Data, Real-Time applications and Machine Learning technologies. He has deployed and managed production applicat...