At Adobe Experience Platform, we ingest TBs of data every day and manage PBs of data for our customers as part of the Unified Profile Offering. At the heart of this is the complex ingestion of a mix of normalized and denormalized data with various linkage scenarios, powered by a central Identity Linking Graph. This helps power various marketing scenarios that are activated in multiple platforms and channels like email, advertisements, etc. We will go over how we built a cost-effective and scalable data pipeline using Apache Spark and Delta Lake and share our experiences.
Yeshwanth Vijay…: Hi guys. This is Yeshwanth Vijayakumar. I’m a senior engineering manager and architect at Adobe Experience Platform, specifically the Unified Profile team. So today we are going to talk about Massive Data Processing in Adobe using Delta Lake.
Agenda-wise, we’re going to introduce what kind of data we’re storing and what kind of challenges we have with its representation, along with other issues that we’ll talk about with Delta Lake. And then we’re going to talk about the strategies that we use to mitigate them. And hopefully we have enough time to show some performance numbers to say that what we’re doing is actually working and useful.
But before we dive any deeper into the architecture, I think we need to understand what the data flow looks like for our platform. So we have a lot of internal solutions and external solutions which are piping in a lot of marketing data. We have solutions like Adobe Campaign, which is for email campaign orchestration, and Adobe Analytics, which is one of the largest vendors of analytics software in the world. So you have a fire hose of data coming in, in a variety of formats.
The formats can be JSON, Parquet, whatever, you name it. Now, all of this gets converted into the Experience Data Model, or as we call it, XDM. This is a standardized JSON Schema tailor-made for marketing use cases and other use cases, but today we’re just going to focus on the marketing side.
We use a lot of Spark to gather this data and get it into the Unified Profile system, which gives you a 360 degree view of your customer’s details. Now every single tenant, that is every client, has their own Unified Profile Store. And the data from the Unified Profile Store feeds into a fire hose of changes, the change feed, which again, you can use for streaming statistics generation, and so on. The biggest value add of the Unified Profile is that you can make extremely complex queries, for example: everybody who did a trial of Illustrator in the last seven days and added Photoshop into the cart, but has not purchased it yet. This is just touching the surface of its capabilities. We can do a lot more. But performance at this magnitude of scale is the thing to keep in mind.
At the heart of Unified Profile, in order to give the 360 degree view, we need to maintain the identity graph. Identities of personas can come from online data or offline data. The identities themselves can be anonymous IDs, like cookie IDs, et cetera, or known IDs, like email addresses, phone numbers, loyalty IDs, et cetera.
So all of these IDs are put together in this graph generator, and we get a graph representation of the identities that we need to store in the Unified Profile. Now, we’re going to see why this is the exact element that actually complicates the storage and the processing.
There are so many different ways of storing a graph, but storing a graph at scale is a problem of its own and it’s very well researched. What we are going to try to do is dumb down the entire problem into a very simple representation that we have here on the slide. Let’s take the primary ID. The primary ID is the ID with which the data was generated, let’s say a cookie ID related to a click on a website. The related IDs refer to the graph edges, as in the graph nodes that it’s connected to. The fields, field 1 to 1000, are just a whole lot of fields, and they are related to the record.
So you can imagine that each record in this table represents a click, and the primary ID tells you who generated the click. The related IDs tell you who that ID is related to. Let’s actually focus our attention on the last two rows. One says 789 is linked to 101. And we also have a row for a record that is generated by 101, and we see that its related IDs are 789 and 101 again. So the idea is we need to keep track of the related IDs as the data is coming in. It can be immediately or somewhat after, but when we are doing the processing, that is when we need to run reports or segmentation queries for activation and targeting, the related IDs field is extremely important.
Now, let’s assume the same scenario where a new record comes in, and this causes a new linkage, and this causes a graph change. So this record is generated by the ID 103, and it links 789 and 101 along with 103. We don’t need to focus on the fields. The linkage is the main problem that we have. What this does is cause a cascading change in the rows of 789 and 101. So all the previous rows generated by 789, 101 or 103 have to be updated with this new related IDs field. Maybe not immediately, but when we read the data in for processing, we need to understand that this is a change that has happened.
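The cascading change described here can be sketched in plain Python. This is only an illustration, not the graph generator from the talk: the `IdentityGraph` class and its methods are hypothetical names, and union-find is a technique swapped in to show how one new link changes the related IDs of every member of a group.

```python
# Illustrative sketch only: a union-find over identity IDs.
# IdentityGraph, link and related_ids are hypothetical names.
class IdentityGraph:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        """Return the representative ID of the group that x belongs to."""
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def link(self, a, b):
        """Record that IDs a and b belong to the same persona."""
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a

    def related_ids(self, x):
        """All IDs currently in the same group as x."""
        root = self.find(x)
        return sorted(i for i in self.parent if self.find(i) == root)


graph = IdentityGraph()
graph.link("789", "101")             # the existing linkage from the table
before = graph.related_ids("789")
graph.link("103", "789")             # the new record generated by 103
after = graph.related_ids("101")     # rows of 101 now need 103 as well
```

Every row previously written for 789 or 101 carries the old related IDs, which is why the change has to be reflected when the data is read for processing.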
So what is the main access pattern that we’re trying to solve here? I gave you a complex segmentation query that is, in fact, the application. But the primary use case that we’re trying to optimize for is that we want to run multiple queries over one single row. What do I mean by that? Since this is a Spark conference, we’ll go a bit into this part. Now assume that we have this piece of code. What we do is take the raw records that we saw in the data earlier, and we do a group by on the related IDs. In this step, what happens is we get a list of related IDs and a sequence of records, put together as a profile. Now for every profile or identity, like a merged identity, we have gotten the set of all activities that have occurred for it.
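The slide code is not part of the transcript, so the following is only a plain-Python sketch of the pattern, not the Spark job itself. The record layout, the event fields and the two example queries are assumptions made for illustration: records are grouped by their related IDs into one profile row, and several queries are then evaluated against that row in a single pass.

```python
from collections import defaultdict

# Hypothetical raw records: (primary_id, related_ids, event).
records = [
    ("789", ("101", "789"), {"type": "trial", "product": "Illustrator"}),
    ("101", ("101", "789"), {"type": "add_to_cart", "product": "Photoshop"}),
    ("555", ("555",), {"type": "purchase", "product": "Photoshop"}),
]

# Step 1: group events by merged identity (the set of related IDs).
profiles = defaultdict(list)
for _primary_id, related_ids, event in records:
    profiles[frozenset(related_ids)].append(event)


def tried_illustrator(events):
    return any(e["type"] == "trial" and e["product"] == "Illustrator" for e in events)


def carted_not_bought(events):
    carted = any(e["type"] == "add_to_cart" for e in events)
    bought = any(e["type"] == "purchase" for e in events)
    return carted and not bought


# Step 2: run every query against each profile row in one pass over the data.
queries = {"tried_illustrator": tried_illustrator, "carted_not_bought": carted_not_bought}
results = {
    ids: {name for name, query in queries.items() if query(events)}
    for ids, events in profiles.items()
}
```

The point of the shape is that the expensive part (reading and grouping) happens once, while the number of queries per row can grow.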
Now, what we want to do is execute queries, multiple queries, not one, on top of this record. Why we do this kind of a pattern is that we have a system, a customized query engine, which is built for high performance use cases like targeting and all that stuff, which can take and compile a huge number of queries, thousands to tens of thousands of these queries. And we can run them on top of individual rows. For the use case that we have, people would be like, “Hey, why can’t you just run SQL queries with the fair scheduler on?”
This doesn’t work because even if you have fair scheduling and, say, an effective parallelism of three queries at a time, a thousand queries is still going to take an enormous amount of time.
We’ll still be in, essentially, a waterfall model. So this is the hill that we have chosen to die on, and you need to take my word for it that this is the best approach that we found. So given that this is the problem that we want to solve, we’ll park that part for now and dive slightly into the complexities of the data. Our data, like I said before, is JSON. With JSON, you have a hell of a lot of nested fields. The nestedness makes schema inference itself extremely hard. The next part is arrays. We have a large number of arrays, so we need to support them. And then comes the MapType. So what happens is, every single time we have such a schema, a nested schema with arrays, just representing thousands or tens of thousands of fields in the schema as a Spark StructType puts a ton of burden on the driver.
So this is a problem that I wish no one else has, but sadly, we do have it. And we’ve benchmarked it, and we see it as a huge bottleneck for the driver and for the application as such. And adding to this complexity, every tenant or client has a different schema. Client A might be, say, a retail client, and they will have one schema. Client B could be, say, a B2B client, and they will have a different schema. We cannot take commonalities out of this, so we need to treat everyone individually. And adding to this, the schema keeps evolving constantly. Fields can get deleted or updated. We would like to have a protobuf kind of way, where we have only additions going on but no subtractions. But in a real life scenario, people are modeling data, bringing data into the system, and evolution happens.
So we can’t get away from it. This is a problem that we need to acknowledge and try to solve for. And on top of that, we also have a multi-source problem. What do I mean by multi-source? We have data coming in from various sources, like from streaming sources and from batch sources. As soon as you make a click on a site, or you push a button on your app, the data starts flowing. So it’s instantaneous. That would be the streaming side. The batch side could be a client that has a huge data processing system, which is generating summarized data and putting it into the system.
So we need to be able to build a system that can handle multiple sources and multiple channels. What kind of scale are we talking about? Because whatever problems I have stated right now, if the scale was small enough, I could throw it into PostgreSQL and call it a day. I wouldn’t need to do anything more. But the problem is, and it’s a good problem to have, that the tenants have tens of billions of rows, and we have petabytes of data that we need to manage. And on top of that, millions of requests per second are being processed by the system. And this flow that I’m talking about triggers multiple downstream applications: segmentation, and activation like email campaigns, push notifications, et cetera.
So what is Delta Lake? This is a Spark conference, so I’m going to assume that a good number of people know what Delta Lake is. But for people who don’t, it’s an open source project that helps us achieve the lakehouse architecture on top of existing data lake storage systems like S3 and ADLS. Key features: we’re not going to go over everything, because this is not a Delta Lake-specific lecture. The key thing that I want to focus on is the ACID transactions. So we get eventually consistent writes on top of this system, between various writing systems, even across various customers.
And then you also get the benefit of the data versioning system. So you can go back to a snapshot of the data, like say, five versions back. The good thing is it’s not trying to reinvent the wheel by creating a new storage format. It uses Parquet, and it makes up for the shortcomings of Parquet with things like its own transaction log.
And schema enforcement and evolution is also a key part. Audit history is also a very good feature that is necessary for a system like ours. But the most important thing, and I saved it for last, is the update and delete support. Now, if you put together what we went over initially, the data flow that we were seeing, you can see that we need to do a lot of updates. Inserts go without saying, but we also need to do a lot of updates and deletes to keep the system consistent with the data that’s coming in, to reflect the state of an identity or merged identity. Currently, what we do is use a hot store, and it’s extremely expensive to get this capability. So if you’re trying to support petabytes of data, it’s not good, cost-wise, to be doing all this on a hot store.
So that’s where Delta Lake comes into the picture, to try to save us from a huge hot store cost. So in practice, what does Delta Lake look like from an API point of view? For everyone who uses Parquet, instead of using .format("parquet") we just use .format("delta"). That’s the major thing. The key point that we’re going to be focusing on is the updates. The update workflow has an API like this, where you’re able to specify the staged data or the update data, for example a batch of new data. And you can do a join with the old data that already exists on the Delta Lake. And it gives good paradigms, like, okay, when it’s matched, do you want to do an update? When it’s not matched, do you want to do an insert, or do you want to do a no-op? So this makes for a very nice way to apply the data.
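The merge paradigm described above (update when matched, insert when not matched) can be illustrated without Spark. The sketch below is a plain-Python stand-in for those semantics, not the Delta Lake API: the `merge` function, the `id` join key and the row layout are assumptions chosen for the example.

```python
def merge(target, staged, key="id"):
    """Apply staged rows to target rows, joined on `key`.

    Matched rows are updated field by field; unmatched rows are inserted.
    Rows are plain dicts here, standing in for table rows.
    """
    merged = {row[key]: dict(row) for row in target}
    for row in staged:
        if row[key] in merged:
            merged[row[key]].update(row)      # "when matched": update
        else:
            merged[row[key]] = dict(row)      # "when not matched": insert
    return list(merged.values())


old_data = [
    {"id": "789", "related_ids": ["101", "789"], "email_opens": 3},
    {"id": "101", "related_ids": ["101", "789"], "email_opens": 1},
]
new_data = [
    {"id": "101", "related_ids": ["101", "103", "789"]},                    # matched
    {"id": "103", "related_ids": ["101", "103", "789"], "email_opens": 0},  # not matched
]
result = merge(old_data, new_data)
```

In the real system the same decision is made per matched or unmatched row by the merge operation on the table, rather than in application code like this.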
The good thing is it’s also SQL compatible. So you don’t need to worry about all the intricacies of the Scala programmatic API. If you just want to write SQL, you’re good with that too. This is extremely important when you have multiple consumers in the system, and not everyone needs to be proficient in Scala or the Scala programmatic API. Now, we went over Delta Lake and all the niceties of it, but there is no free lunch. We still need to do some work on our data to make sure that the introduction of Delta Lake in our ecosystem is going to help us without creating a new set of problems. And what we noticed was that there is a new set of problems. It’s not a magic bullet that wipes everything away. So the first one is the concurrency conflicts.
So we went over how we have multiple sources and multiple channels trying to write into the profile store at different throughputs as well as frequencies. This chart is from the Delta Lake docs. The key thing to notice is the second row. It talks about how updates, deletes and MERGE INTO can conflict with other inserts or mutations that are happening at the same time. Delta uses optimistic concurrency. The very nature of optimistic concurrency is that it assumes the chances of such a collision happening are low. But in reality, with multiple writers and [inaudible] the situation that I described, what we notice is that it does conflict. Most of the time Delta is able to recover, but once your throughput increases, the recovery process itself might not work sometimes. So we need to bake this into the architecture, instead of just saying, “Oh, this might work,” and hoping we’re good to go.
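To make the optimistic concurrency point concrete, here is a deliberately simplified plain-Python sketch. It is not how Delta Lake detects conflicts (the real check is finer-grained than a single table version); the `Table` class, `ConflictError` and `write_with_retry` are hypothetical names. It shows a writer that reads a version, prepares changes, and has to retry because another writer committed in between.

```python
class ConflictError(Exception):
    """Raised when the table changed between read and commit."""


class Table:
    def __init__(self):
        self.version = 0
        self.rows = {}

    def commit(self, read_version, changes):
        # Optimistic check: succeed only if nobody committed since we read.
        if read_version != self.version:
            raise ConflictError(f"read v{read_version}, table is at v{self.version}")
        self.rows.update(changes)
        self.version += 1


def write_with_retry(table, make_changes, max_attempts=3, interfere=None):
    """Try to commit, re-reading and retrying on conflict."""
    for attempt in range(1, max_attempts + 1):
        read_version = table.version
        changes = make_changes(dict(table.rows))
        if interfere is not None:
            interfere(attempt)          # simulate a concurrent writer
        try:
            table.commit(read_version, changes)
            return attempt
        except ConflictError:
            continue
    raise ConflictError("gave up after repeated conflicts")


table = Table()


def other_writer(attempt):
    if attempt == 1:                    # collides with our first attempt only
        table.commit(table.version, {"101": "written by another writer"})


attempts = write_with_retry(table, lambda rows: {"789": "our update"}, interfere=other_writer)
```

With a single competing writer the retry succeeds on the second attempt; as the number and frequency of writers grows, retries can keep colliding, which is the situation the architecture is meant to avoid.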
The next one is the column size. This is not so much a Delta-specific one; it’s more from a Parquet perspective. If an individual column’s data exceeds 2 GB, we see a degradation in writes or OOMs. This is not necessarily a problem that everyone would face. But you always get that one hell of a client who decides to do things their way, and you can’t blame them for it. They might have a legitimate reason to stuff in some binary of 2 GB because it’s necessary for them. The other thing is the update frequency. Every single writing source has a different throughput. So if you’re going to make frequent updates all the time, it can cause underlying file store metadata issues, on stores like S3 or ADLS Gen2. The modern file systems are pretty resilient, but you never know when you’re going to get [inaudible], because it’s not only about storing petabytes and exabytes of data.
Sometimes you might store millions of files to represent the petabytes of data. So that’s where some limits might get hit. And the internal reason why this actually matters is that every single transaction that you do on an individual Parquet partition causes a copy-on-write: Delta has to take the entire partition, make the mutation, and then write it back. So the more updates and transactions that you do, the more rewrites there can be on the HDFS. So if your underlying file system is expensive for the blocks of writes that you do, you might end up in a very tricky situation. And the other thing is you might end up with too many small files, but Delta does have some things put in place, like auto compaction and optimized writes, which try to help us.
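A rough way to see why update frequency matters under copy-on-write is to count how many file rewrites a stream of updates causes when it is committed one record at a time versus in larger batches. The model below is an assumption made for illustration (ten data files, records assigned to files by a modulo, every commit rewrites each file it touches); `files_rewritten` and `file_of` are hypothetical names.

```python
def files_rewritten(updates, batch_size, file_of):
    """Count file rewrites when `updates` are committed in batches.

    Under copy-on-write, each commit rewrites every file that contains
    at least one updated record, regardless of how many records changed.
    """
    total = 0
    for start in range(0, len(updates), batch_size):
        batch = updates[start:start + batch_size]
        total += len({file_of(record_id) for record_id in batch})
    return total


def file_of(record_id):
    return record_id % 10               # hypothetical layout: 10 data files


updates = list(range(1000))             # 1000 updated record IDs

per_record = files_rewritten(updates, batch_size=1, file_of=file_of)
batched = files_rewritten(updates, batch_size=1000, file_of=file_of)
```

In this toy model, committing each record separately rewrites a file 1000 times, while one batched commit rewrites each of the 10 files once.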
We’re going to look at them a little bit. So our existing change data capture system is this: you have a bunch of apps that can mutate data, like batch ingestion, streaming ingestion, or an API-based call that can make changes to data. They send requests to the hot store, and once we get an acknowledgement back that the write succeeded, we emit a CDC message on the firehose to say, “This is a logical change that’s happened.” What we want to do here is replicate the data from this hot store into the cold store, or the Delta store, in order to be able to do scans on top of that for the use case that I was talking about before. So let’s look at how this flow will look. We have the familiar table that we saw earlier, with the primary ID, the related IDs and all the fields, residing in the hot store.
We have a batch process to backfill the data up to the [inaudible] time boundary into a Delta Lake table. Let’s call it the raw table, for all intents and purposes from now on. A very important distinction between what you see on the left-hand side and the right-hand side is that the primary ID and related IDs are maintained, but all the fields, 1 to 1000 and more, have disappeared into a single column, a JSON string. So what we decided to do was latch onto this interesting anti-pattern. A lot of people will be like, “Wait, what the hell are you doing? This is not how Parquet should be used.” But we will actually go a bit into why we are doing this anti-pattern. So that’s the biggest and most glaring thing that I would focus your attention on. The other thing is that once the backfill is done, you have the change feed notifications expressing the changes coming in.
And we have a long running Spark Structured Streaming application. It’s called the CDC dumper. It dumps data into a global multi-tenant staging table. The key thing to note about this staging table is that it’s append-only. And it’s partitioned by tenant and by 15 minute time intervals. I have a diagram showing what the 15 minute interval looks like in a second. So, again, the key thing to notice is that this is append-only. We’re eliminating the problem that we were talking about, with multiple writers conflicting with each other. You can, in effect, have multiple instances of the CDC dumper trying to modify the same partition. But since it’s append-only, an insert will not conflict with another insert. So the high-throughput writes are taken care of by writing them into an append-only staging table, and that solves that issue.
The next thing is we have a long running processor that checks for work every so often. The interval is configurable. It asks, “Hey, do I have any work to do?” It’s going to ask the staging table, “Since the last time I saw you, do I have work to do for a particular tenant? If so, give me the records back.” Then it takes them and does an upsert and delete into the individual tenant’s raw table. So now you see why we needed the upsert and delete functionality on Parquet, which we get from Delta Lake.
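The processor step can be sketched in plain Python as follows. This is an illustration under assumptions, not the production job: the CDC message shape (`id`, `op`, `ts`, `json`) and the `apply_cdc` function are hypothetical, and a dict stands in for a tenant's raw table. It applies one staged batch of upserts and deletes in timestamp order.

```python
def apply_cdc(raw_table, staged_messages):
    """Apply one staged batch of CDC messages to a tenant's raw table.

    raw_table maps primary ID -> JSON string. Messages are applied in
    timestamp order so that the latest change for an ID wins.
    """
    for message in sorted(staged_messages, key=lambda m: m["ts"]):
        if message["op"] == "delete":
            raw_table.pop(message["id"], None)
        else:  # "upsert"
            raw_table[message["id"]] = message["json"]
    return raw_table


raw_table = {"789": '{"f1": 1}', "101": '{"f1": 2}'}
staged = [
    {"id": "103", "op": "upsert", "ts": 30, "json": '{"f1": 9}'},
    {"id": "101", "op": "delete", "ts": 20, "json": None},
    {"id": "789", "op": "upsert", "ts": 10, "json": '{"f1": 5}'},
]
apply_cdc(raw_table, staged)
```

Because the batch is read from the staging table rather than applied message by message, the raw table sees one larger write per cycle instead of many small ones.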
The next question can be like, “Hey, but you did say that simultaneous mutations can cause an issue, right? So what if the processor and the backfill are running at the same time?” That would be a very likely scenario. And that is why we have a tenant lock put in place on the raw table, so that only a single process can actually write into the raw table. So the key difference that we’re going to see is that we’re doing the fan-in pattern versus the fan-out. The fan-out is extremely easy. You have multiple different microservices each trying to do its own thing. But the problem is you’re going to run into cost issues, because everyone’s trying to do their own thing, and you’re not going to be able to optimize for cost. So by centralizing all the reads from the change data capture, we are guaranteed that all writes, what is it?
Writes to the hot store generate a CDC message, and since the staging table is in an append-only mode, we will not have conflicts for it. The next thing is the size problem that I mentioned. If we end up having some bad data, there are some enforcements that we need to do, and we can filter it out based on thresholds before it makes it into the raw table. The next thing I was talking about is the frequency of updates. Since we are batching the writes by reading larger blocks of data from the staging table, the staging table is acting like a message buffer. So we can set SLAs, like, okay, in 15 minutes the data will have been replicated from the hot store to the cold store or the Delta Lake store. So we are able to take care of the too-many-small-files problem, at least in the raw table, which is our main source of scans or the middle of the [inaudible] that we are worried about.
If you remember, I was talking about the 15 minute time interval partitioning on the staging table. We do this because we get a lot of bursty traffic. Suddenly you might have a huge amount of traffic. So we need a way to bunch the data together, even in the staging table, before it gets to the raw table. So what we do is the staging table is partitioned in 15 minute intervals for every hour, so we have at max four intervals per hour. And we keep track of how much progress has been made on the staging table through the progress map. So, for example, tenant one has finished up to the 9:15 TSKey, et cetera. This gives you a logical view of how we are representing data in the staging table, because all the CDC records are partitioned in this fashion. Now comes the big question: why are we using the JSON string format?
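The 15 minute bucketing and the progress map can be sketched like this in plain Python. The key format and the names `ts_key`, `progress` and `pending_buckets` are assumptions for illustration; the talk does not show how the TSKey is actually encoded.

```python
from datetime import datetime


def ts_key(ts: datetime) -> str:
    """Floor a timestamp to its 15 minute bucket (hypothetical key format)."""
    floored_minute = ts.minute - ts.minute % 15
    floored = ts.replace(minute=floored_minute, second=0, microsecond=0)
    return floored.strftime("%Y-%m-%dT%H:%M")


# Progress map: tenant -> last bucket that has been fully processed.
progress = {"tenant1": "2021-05-01T09:15"}


def pending_buckets(tenant, available_buckets):
    """Buckets in the staging table that still need processing for a tenant."""
    done = progress.get(tenant, "")
    # ISO-style keys sort chronologically, so string comparison is enough.
    return sorted(key for key in available_buckets if key > done)


key = ts_key(datetime(2021, 5, 1, 9, 22, 41))
todo = pending_buckets(
    "tenant1",
    ["2021-05-01T09:00", "2021-05-01T09:15", "2021-05-01T09:30", "2021-05-01T09:45"],
)
```

A processor that records the last finished bucket per tenant can always ask for exactly the buckets after it, which is what makes bursty traffic easy to absorb.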
So like I said before, this is an anti-pattern. What we want to deal with is schema evolution. I spent half a slide talking about the problems of schema evolution, the nestedness and so on. What we saw to begin with was that nested schema evolution, as of early 2020, was not supported on an update. Since we do have a good amount of nested data here, and Delta did not have it at that time, we were forced to look at other alternatives. It is supported with the latest version though, so that’s a good thing. But then we noticed another side of it. Before we apply changes or patches into, say, the hot store, we have a stored procedure that resolves and merges the conflicts between the new data and the old data.
So how would we do something like this in Spark? The answer would be a UDF. Now, UDFs are very strict on types. So if every tenant has its own schema, and if we have to maintain that evolving schema and the UDFs for it, it is a maintenance nightmare to try to maintain UDFs dynamically for so many tenants. With the JSON string, we just have one single UDF for the resolve and merge. And it’s just as simple as a JSON patch, with our own sprinkling of custom stuff. The other thing is we use json-iter, which is an extremely fast library. The main thing that it gives us is lazy parsing, so unless you’re actually using a field, that field is not parsed, which is extremely useful in our cases.
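The idea of one schema-agnostic merge function over JSON strings can be sketched in plain Python. This is not the team's UDF: the merge rule used here (new values overwrite old ones, nested objects are merged recursively) is an assumption, and `deep_merge` and `resolve_and_merge` are hypothetical names. Note also that Python's `json` module parses eagerly, unlike the lazy parsing attributed to json-iter above.

```python
import json


def deep_merge(old, new):
    """Merge `new` into `old` in place: nested dicts recurse, other values overwrite."""
    for field, value in new.items():
        if isinstance(value, dict) and isinstance(old.get(field), dict):
            deep_merge(old[field], value)
        else:
            old[field] = value
    return old


def resolve_and_merge(old_json: str, new_json: str) -> str:
    """One function for every tenant: it never needs to know the schema."""
    merged = deep_merge(json.loads(old_json), json.loads(new_json))
    return json.dumps(merged, sort_keys=True)


merged = resolve_and_merge(
    '{"person": {"name": "A", "city": "X"}, "loyalty": 1}',
    '{"person": {"city": "Y"}, "segment": "b2b"}',
)
```

Because the function only sees strings, a tenant adding or removing fields does not require regenerating typed code for that tenant.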
The next question that people usually ask me is, “Hey, don’t you lose predicate pushdown?” We did pull out all the fields that are necessary for our existing workloads, like timestamp, recordType, ID, et cetera, into their own columns. For the rest, we did a comparison where we read the whole JSON string from the data lake, parsed it, and did the processing. And then we also read just 20% of all the fields from the hot store. Every time, the data lake one won, without even a hint of a competition.
Schema on read seems to be a much more future-safe approach, at least the way that we are proceeding right now, and it seems to be much more manageable from a code point of view and a performance point of view. Any downstream tables that we want to materialize will have stats, so we don’t have to worry too much about that. The partitioning scheme as such is very simple.
The main thing that I would pay attention to is the z-order on the primary ID, and also the timestamp being used. Sometimes not all data has a timestamp, so we default to a high default value. The z-order definitely helps, specifically on the update, because we want very quick access to find out which partition a particular ID is in so that we can update it. The most important thing with our structure, with the JSON column, is the dataSkippingNumIndexedCols setting. Delta by default generates statistics on the columns of the table. This helps it do data skipping effectively while the query is happening. But a JSON string is just a massive binary blob. We don’t want it to do all that analysis. It wastes a lot of time in ingestion, so turning it off for that column makes a huge difference in our ingestion.
Monitoring the system is also extremely important. We have two ways to monitor the system. One is the CDC lag from Kafka, which is upstream. We need to be able to tell how much more work needs to be done for us to write to the staging table. The second is how we track lag on a per tenant basis. If I take a tenant, I need to be able to say, “Okay, how much more time will it take for me to catch up to the current data?” So what we do is we have internal record keeping tracking the max timestamp in the CDC per tenant, and we also track the max timestamp processed by the processor. So we are able to use the difference of these to give a rough lag for the application, to see how much work has been done or how much is left. That definitely helps monitor the health of the system.
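The per-tenant lag described above is a simple difference of two tracked timestamps. A minimal plain-Python sketch, assuming timestamps are epoch seconds and that both maps are keyed by tenant (`tenant_lag_seconds` is a hypothetical name):

```python
def tenant_lag_seconds(max_cdc_ts, max_processed_ts):
    """Rough per-tenant lag: newest CDC timestamp minus newest processed timestamp.

    Returns None for a tenant that has CDC records but nothing processed yet.
    """
    lag = {}
    for tenant, newest in max_cdc_ts.items():
        processed = max_processed_ts.get(tenant)
        lag[tenant] = None if processed is None else max(0, newest - processed)
    return lag


lag = tenant_lag_seconds(
    max_cdc_ts={"tenant1": 1_000_900, "tenant2": 1_000_300},
    max_processed_ts={"tenant1": 1_000_000},
)
```

Here tenant1 is 900 seconds (15 minutes) behind, and tenant2 has not been processed at all yet.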
Now, after talking about all of this, we need to understand, okay, for our workloads, how did the merge and upsert performance do? Because we all know Parquet reads are extremely efficient, but the bread and butter here is the update and delete, so how is that doing? So we simulated it. We took [inaudible] and we analyzed the Parquet files or the [inaudible] for them. The question is: how long does it take X CDC messages to get updated or upserted into the raw table? What we noticed was that 170,000 CDC records, which map to around 100K rows that we need to update, took just 15 seconds with the very minimal cluster that we have. The second one was 1.7 million CDC records, like change records, which map to around one million rows that we upsert, and that took just a little over a minute.
So this is fantastic with respect to scalability, even for the upsert to get into the raw table. And this is after the staging table, right? So we have already grouped a good amount of data. This helps us cater to a good amount of throughput coming from the sources.
There are two important feature settings available. One is the partitioning, but the ones that are extremely important to note are auto compact and optimize write. These two make a huge difference in the performance that you get for ingestion. Now, the read. I was talking about the use case where we need to execute multiple queries. For that, we took a particular tenant. 1 TB of data on the hot store becomes just 64 GB of data on the Delta Lake; of course, Parquet compression is going to help for sure. The number of partitions in the NoSQL store was just 80. The number of partitions on the Delta Lake is 189. This is extremely useful, because you can have a 112 core cluster, but with only 80 partitions you wouldn’t be able to utilize the core count efficiently. Now, because of the larger number of partitions, we are able to utilize it much better.
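The utilization argument can be checked with a little arithmetic. The model below is a simplification and an assumption, not a measurement from the talk: one task per partition, one task per core at a time, and all tasks taking equally long, so tasks run in "waves" of up to `cores` tasks. `core_utilization` is a hypothetical helper.

```python
import math


def core_utilization(partitions: int, cores: int) -> float:
    """Fraction of core time that is busy, assuming equal-length tasks run in waves."""
    waves = math.ceil(partitions / cores)
    return partitions / (waves * cores)


# 80 partitions on 112 cores: one wave, with 32 cores idle the whole time.
utilization_80 = core_utilization(80, 112)
idle_cores = 112 - 80
```

Under this model 80 partitions keep at most about 71% of a 112 core cluster busy; the same function can be applied to any other partition count.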
So what we saw is that the job time went from three hours to 25 minutes. For some tenants it’s a massive improvement, even more than what we are showing you here. So takeaways-wise, what I would say is that the scan or read speed from the data lake is always much better than from the hot store. This is stating the obvious, but sometimes it just needs to be said. We’re able to get reasonably fast, eventually consistent replication within minutes. So by trading off to minutes instead of instantaneous seconds, we are able to reduce the cost by a massive [inaudible]. More partitions means better Spark executor core utilization. We just saw that in action in the previous slide. And this also gives us a lot more ability to aggressively remove data from the hot store. What data do we actually need there?
Can we keep just the more recent data? The main thing that I would say came out of this is an incremental computation framework, because the staging table tells you what has changed between time T1 and T2. So now we have a programmatic way to fetch those changes and do incremental processing. So hopefully this was useful to you guys. Delta Lake has made a huge difference in our workloads. So yeah, I’m hoping to get some feedback with respect to how it’s working for you guys too, or any comments to improve our own workloads. Thank you.
I am a Sr Engineering Manager/Architect on the Unified Profile Team in the Adobe Experience Platform; it’s a PB scale store with a strong focus on millisecond latencies and Analytical abilities and ...