Composable Data Processing with Apache Spark


As the usage of Apache Spark continues to ramp up within the industry, a major challenge has been scaling our development. Too often we find that developers are re-implementing a similar set of cross-cutting concerns, sprinkled with some variation of use-case-specific business logic, as a concrete Spark App. The consequences of this anti-pattern are significant. Cross-cutting logic is re-implemented again and again. Each isolated Spark App is responsible for its own resiliency, scalability, monitoring, and error handling. Attempting to weave together data as it flows across these Apps is highly inefficient: pipelining data through one or more of these apps requires multiple rounds of loading and saving data to disk, increasing the overall cost and risk of failure.

In addition, there is no consolidated error handling when chaining multiple Spark Apps. In this talk we will walk through the problems that led us to an extensible plugin framework, SIP, implemented to address these issues. SIP is used extensively in Adobe’s Experience Platform (AEP) for data processing. The framework enables us to support a number of complex use-cases by composing one or more simpler data conversion and/or validation operations. SIP is hosted internally, allowing a community of engineers to plug in code and benefit from the resiliency, scaling, and monitoring invested in existing infrastructure. Finally, we will dive deep into SIP’s detailed error reporting and how it enables us to provide a much improved user experience to our customers.

Video Transcript

– Hello, my name is Shone Sadler. I’m here with Dilip Biswal to talk about composable data processing. First, an intro: I’m a Principal Scientist at Adobe Systems, and I’ve been working on the Adobe Experience Platform for about two years.

Composable Data Processing

Dilip started more recently, in 2019, as an architect, and he’s also a contributor to Apache Spark.

So we’re here today to tell a story about scalability, not in terms of throughput, but rather in terms of the output of our engineering team. First, we’ll talk about the why: we’ll provide some context around the Adobe Experience Platform, around our team, and around some of the problems we’ve faced. We’ll talk about the what by walking through the design of Composable Data Processing (CDP). Then we’ll get into the how: Dilip will talk about the nuts and bolts of the different data processing techniques that we’re using in CDP. And finally, we’ll talk about the impact CDP has had on our team, on the platform, and on our customers. So AEP is a platform for driving real-time personalized experiences.

Adobe Experience Platform (AEP) Zen Statement

And we do that by making data actionable and you can see that in the diagram above. First we have data, which is about people, places, events and things within your domain. Then we have semantics and control. That’s data governance, about controlling and managing how your data’s used. Third is intelligence and that’s about custom or out-of-the-box machine learning models to enrich your data. And finally, we have action, whether it’s providing targeted ad campaigns or a better web experience, Adobe AEP provides applications to enable those experiences.

Adobe Experience Cloud Evolution

Now how do we do that? We have a suite of applications, from Adobe Analytics, Audience Manager, Campaign, Target, Experience Manager, Magento, Marketo and more, that are built on a set of intelligent services. Below that we have Adobe Sensei, and then we have the data that’s collected through Adobe Experience Platform. It’s that last piece that both Dilip and I work on.

Data Landing (aka Siphon)

So we’re both part of the data-ingest team, ingesting data into the platform through a service called Siphon. The zen of Siphon is about enabling producers to send chunks of data, called batches, that we make available to downstream consumers. But it’s not all about moving bits around. We also need to apply transformations to that data, taking it from raw form into an optimized format like Parquet.

It’s also about validation: we need to validate data against a target schema, which we call the Experience Data Model (XDM).

It’s also about partitioning, ensuring optimal access for different data access patterns.

Compaction supports sources that produce high-frequency small files. It’s also about providing an exactly-once guarantee, to make the data durable without duplication.

And finally, lineage tracking so we can track the who, what, where and when of the data that’s been produced.

Now even though we’re still early on, we’re already processing about one million batches per day.

That’s 13 terabytes, which equates to 32 billion events.

Siphon’s Cross Cutting Features

So our focus has been on cross-cutting features, by which I mean the features you would expect to find in any big data application: things like queuing and throttling to handle spikes in workload, and scaling based on backlog and/or utilization. We also need to isolate workloads across different bulkheads, to handle back pressure and/or poison pills that could bring the system down. We support logging and custom metrics for monitoring, alerting, and notifying downstream consumers that data has arrived. And then there is the ability to execute these distributed big data applications on a runtime like Databricks.

And finally, resilience: we need to ensure that those one million batches are taken to completion, or that we’re alerted within a given SLA. Our focus in 2019 has been on hardening these features, our minimal feature set.

Siphon’s Data Processing (aka Ingest Pipeline)

But 2020 is changing things: we’re being asked for a lot more features. Things like supporting a number of additional ingest formats beyond CSV, JSON and Parquet, and providing additional ETL transformation capabilities to go from raw data to XDM.

We’re being asked to provide additional validation capabilities, such as format checks, range checks, and even context-specific checks based on where the data is gonna land. We’re being asked to provide additional diagnostic and reporting tools, and alternatives for how and where the data gets written.

And finally, there are many opportunities around tracking, merging, and mining failed or bad data in the system, and around providing auto-correction features for it. Considering all these different concerns, we could have different sets of people, different teams, working on each of them. Unfortunately, there is only a single team working on these data processing features, and that’s Siphon.

Engineering Bottleneck

We’re stuck in a zero-sum game: our resources are working on cross-cutting features, and if we move them to data processing, we have to take them away from the hardening work we’re doing there. Unfortunately, requirements are coming in on both sides of the scale. The challenge is how to increase the number of data processing features we deliver without compromising the stability of the system.

Option A: Path of Least Resistance

So we really have several options. Option A is the path of least resistance: we just write all the code ourselves. But it means that we need to deprioritize the hardening work that we’re doing. It also means that developers would be working on both the cross-cutting features and the data processing features, resulting in context switching, which is overhead we would prefer not to incur.

Overlapping those concerns can also lead to spaghetti code, making the system increasingly difficult to test and to maintain over time.

Option B: “Delegate” the Problem

Therefore, we also looked at Option B, which is more about passing the buck. Rather than having a single pipeline, we break it up into multiple pipelines, leaving the initial Siphon pipeline untouched. However, this leads to a lack of reuse: all the cross-cutting concerns we had in Siphon would now be duplicated in each one of these services. It also leads to a lack of consistency, because each of these services may have different SLAs, different delivery guarantees, or different validations. This makes end-to-end testing more complex, results in more complex monitoring as well, and that complexity builds up over time. And since there are multiple steps and multiple output paths involved, we end up with increased latency and increased costs.

Option C: Composable Data Processing

So this led us to Option C, Composable Data Processing; we kinda wanna have our cake and eat it too here. So we asked ourselves the question: what if we allowed these other teams to plug into Siphon?

This would allow us to scale our engineering effort and modularize our design and code, and it would also provide a clear separation of responsibilities.

It will be easier to test the system and easier to maintain it, and it will maximize reuse. And by having a single step, a single pass over the data, and a single output, we end up minimizing complexity, latency, and cost. So what we’ve done is take that zero-sum game and make it a win-win game.

This leads us to the what.

And we’re gonna talk more about Option C. We need to implement a framework that enables different teams to extend Siphon’s data ingest pipeline.

At a high level, what we need as input is raw data, shown on the left-hand side, and a target schema, shown on the right-hand side.

Modularizing the Pipeline

The intent of the framework is to transform that raw data into something that fits the target schema. What makes it over needs to go to the data lake, and what doesn’t make it over needs to be quarantined. So we need to do things like map name over to firstName and lastName, map bday over to birthDate, of type date, and map level over to rewardsLevel, which has an Enum constraint defined on it. This is gonna require multiple steps.

1. Parsing

The first stage is Parsing. We take the raw data as input and translate it into a raw data frame. However, not all the data makes it over: we can see that one record fails because it is corrupt; it’s missing a closing bracket. We need to record that fact in a separate error data frame.
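The Parsing stage can be modeled in a few lines of plain Python. This is not Spark code (in SIP, Spark itself detects the corrupt records, as described later in the talk); it only illustrates splitting one input into a parsed output and an error output. The helper name `parse_batch`, the `row_id` field (the line's position in the batch), and the sample records are hypothetical.

```python
import json

def parse_batch(lines):
    """Split raw JSON lines into parsed records and parse errors.

    Plain-Python model of the Parsing stage; 'row_id' is the line's
    position in the batch (a hypothetical identifier used for later joins).
    """
    parsed, errors = [], []
    for row_id, line in enumerate(lines, start=1):
        try:
            parsed.append({"row_id": row_id, **json.loads(line)})
        except json.JSONDecodeError as exc:
            # Record the failure instead of aborting the whole batch.
            errors.append({"row_id": row_id, "stage": "parse", "error": exc.msg})
    return parsed, errors

raw = [
    '{"name": "Ada Lovelace", "bday": "1815-12-10", "level": "gold"}',
    '{"name": "Alan Turing", "bday": "1912-06-23", "level": "silver"',  # missing closing brace
]
good, bad = parse_batch(raw)
```

The corrupt second line ends up in `bad` with its position and the decoder message, while the first line is parsed normally.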

2. Conversion

That gets us to the second stage, Conversion. The input to this is the raw data frame, which we translate into the target data frame based on the target schema. You can see that name has now been split out into firstName and lastName, and bday has been mapped over to birthDate. Once again, not all the data has made it over: the second row, with Id three, didn’t make it over because it has an invalid date-time string. We need to record that in the error data frame as well.
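A plain-Python model of the Conversion stage follows (again, not Spark code). It assumes the raw fields are called `name`, `bday` and `level`, as on the slide, and that `bday` uses ISO `YYYY-MM-DD` format; the `convert` helper and the sample records are hypothetical. Any row whose name cannot be split or whose date cannot be parsed goes to the error list instead of the target list.

```python
from datetime import date

def convert(records):
    """Map raw records onto the target schema; return (success, errors)."""
    success, errors = [], []
    for rec in records:
        try:
            # Unpacking fails with ValueError if there is no space in the name.
            first, last = rec["name"].split(" ", 1)
            success.append({
                "id": rec["id"],
                "firstName": first,
                "lastName": last,
                "birthDate": date.fromisoformat(rec["bday"]),  # ValueError on bad dates
                "rewardsLevel": rec["level"],
            })
        except (KeyError, ValueError) as exc:
            errors.append({"id": rec.get("id"), "stage": "convert", "error": str(exc)})
    return success, errors

raw_rows = [
    {"id": 1, "name": "Jane Doe", "bday": "1990-04-01", "level": "gold"},
    {"id": 3, "name": "John Roe", "bday": "not-a-date", "level": "silver"},
    {"id": 4, "name": "Ann Poe", "bday": "1985-07-15", "level": "platinum"},
]
success, errors = convert(raw_rows)
```

Note that the row with Id four survives conversion: its invalid `rewardsLevel` is a validation concern, not a conversion one.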

3. Validation

The third stage is Validation. The input here is the target data frame, but it contains some invalid data that we also need to filter out. We can see that it’s the second row here, with Id four, that’s invalid, because the rewardsLevel field has an Enum constraint, so its value needs to be one of bronze, silver or gold.
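Because its input already matches the target schema, the Validation stage only needs to report failures. The sketch below is plain Python with a hypothetical `validate_rewards_level` helper and sample rows. It checks the Enum constraint and then keeps the rows that produced no error.

```python
REWARDS_LEVELS = {"bronze", "silver", "gold"}  # Enum constraint from the target schema

def validate_rewards_level(records):
    """Return only the error rows; valid rows are not copied."""
    return [
        {"id": r["id"], "stage": "validate",
         "error": f"rewardsLevel {r['rewardsLevel']!r} not in {sorted(REWARDS_LEVELS)}"}
        for r in records
        if r["rewardsLevel"] not in REWARDS_LEVELS
    ]

target = [
    {"id": 1, "rewardsLevel": "gold"},
    {"id": 4, "rewardsLevel": "platinum"},
]
errors = validate_rewards_level(target)
rejected = {e["id"] for e in errors}
valid = [r for r in target if r["id"] not in rejected]
```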

In the fourth stage, we now have the target data frame with valid data, so we can simply save it to the data lake.

4. Persisting the Good

And in the fifth stage, things get a little more interesting.

5. Quarantining the Bad

We now have a set of error data frames that need to be unioned together. We want to join them to the initial raw data frame and write that data out to quarantine, so that clients can diagnose issues and/or correct the data.

Weaving It All Together

Finally, we’ve looked at all the components and their various responsibilities. These are pieces that run within the Spark driver, on top of data frames. I’m gonna pass it over to Dilip to give more details around the plug-in runtime. – Thank you, Shone, for giving a very good high-level overview of the problem that we are trying to solve. Now we’ll look at some of the implementation challenges in detail. I will start by discussing the DSL that drives the execution of the various plug-ins, which are responsible for either converting or validating the data. Then we’ll go through the APIs, the contracts between SIP and the plug-ins. Next, we’ll look at the processing of the three different kinds of errors: parsing, conversion, and validation errors. We’ll then go through a small illustration that shows how the three different kinds of errors are consolidated into a custom error schema. Finally, we’ll look into the details of how these errors are generated and captured at a low level.

Domain Specific Language

So in SIP, the tasks are governed by a DSL. We can think of it as a sequence of computations, based on which SIP performs all its actions. In this case, we can see that the processing of the input is defined as a series of stages: first it is fed through a parser, then through a set of converters and validators, while the errors are captured at each stage. At the end, the surviving rows, that is, the rows that are free of errors, are written to the dataSink, and the cumulative errors are routed to the errorSink. The dataSink and the errorSink are basically persistent stores like ADLS (mumbles).
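The talk does not show SIP’s actual DSL syntax, so the following is only a plain-Python sketch of the idea. It uses a hypothetical spec (a dict naming a parser, converters, validators, and sinks) and hypothetical plug-in registries. It follows the flow described above: parse, run the converters in order (each one passes only its good rows on), run every validator over the converted rows, then send error-free rows to the data sink and all errors to the error sink.

```python
# Hypothetical plug-ins; in SIP these would be Spark-based plug-ins.
def parse_name_age(lines):
    """Parser: 'name,age' lines; anything else is a parse error."""
    ok, err = [], []
    for row_id, line in enumerate(lines):
        parts = line.split(",")
        if len(parts) == 2:
            ok.append({"row_id": row_id, "name": parts[0], "age": parts[1]})
        else:
            err.append({"row_id": row_id, "error": "parse: expected 2 columns"})
    return ok, err

def age_to_int(rows):
    """Converter: string age -> int; returns (success, errors)."""
    ok, err = [], []
    for r in rows:
        try:
            ok.append({**r, "age": int(r["age"])})
        except ValueError:
            err.append({"row_id": r["row_id"], "error": "convert: age is not an integer"})
    return ok, err

def age_positive(rows):
    """Validator: returns error rows only."""
    return [{"row_id": r["row_id"], "error": "validate: age must be positive"}
            for r in rows if r["age"] <= 0]

REGISTRY = {"name_age": parse_name_age, "age_to_int": age_to_int, "age_positive": age_positive}
SINKS = {"lake": [], "quarantine": []}

# Hypothetical DSL document describing one pipeline.
SPEC = {
    "parser": "name_age",
    "converters": ["age_to_int"],
    "validators": ["age_positive"],
    "dataSink": "lake",
    "errorSink": "quarantine",
}

def run(spec, lines):
    rows, errors = REGISTRY[spec["parser"]](lines)
    for name in spec["converters"]:
        rows, conv_errors = REGISTRY[name](rows)   # only good rows move on
        errors.extend(conv_errors)
    for name in spec["validators"]:
        errors.extend(REGISTRY[name](rows))        # every validator sees every converted row
    rejected = {e["row_id"] for e in errors}
    SINKS[spec["dataSink"]].extend(r for r in rows if r["row_id"] not in rejected)
    SINKS[spec["errorSink"]].extend(errors)

run(SPEC, ["ann,30", "bob", "cy,abc", "dee,-5"])
```

Each of the three bad lines fails at a different stage (parse, convert, validate), and only `ann` reaches the data sink.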

Converter Interface

In the interest of time, I’m not gonna get into the details of the APIs and the interfaces, but I’m gonna touch on some of the key points. First, both the converter and the validator take a data frame as input and return the results. The converter returns both the success and the error data frames, whereas the validator returns just the error data frame. The reason for this difference is that the output of the converter may have a different schema than the input, whereas the validator works on data that is already converted, that is, data that is already aligned with the target schema. I’ll discuss this more later in the talk.
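The asymmetry between the two contracts can be sketched with Python abstract base classes. This is a hypothetical illustration over lists of dicts, not SIP’s actual (data-frame-based) API. The class and method names are assumptions, and the two tiny plug-ins exist only to show a converter that changes the schema and a validator that reports errors only.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

class Converter(ABC):
    """Hypothetical converter contract; the output schema may differ from the input."""

    @abstractmethod
    def convert(self, rows: List[Row]) -> Tuple[List[Row], List[Row]]:
        """Return (success_rows, error_rows)."""

class Validator(ABC):
    """Hypothetical validator contract; rows already conform to the target schema."""

    @abstractmethod
    def validate(self, rows: List[Row]) -> List[Row]:
        """Return error rows only; the caller drops those rows from its input."""

class SplitName(Converter):
    """Changes the schema: 'name' becomes 'firstName' and 'lastName'."""

    def convert(self, rows):
        ok, err = [], []
        for r in rows:
            parts = r["name"].split(" ", 1)
            if len(parts) == 2:
                ok.append({"id": r["id"], "firstName": parts[0], "lastName": parts[1]})
            else:
                err.append({"id": r["id"], "error": "expected first and last name"})
        return ok, err

class CapitalizedFirstName(Validator):
    """Works on target-schema rows, so it only needs to report failures."""

    def validate(self, rows):
        return [{"id": r["id"], "error": "firstName must be capitalized"}
                for r in rows if not r["firstName"][:1].isupper()]

rows = [{"id": 1, "name": "Jane Doe"}, {"id": 2, "name": "Cher"}, {"id": 3, "name": "bob Roe"}]
converted, conversion_errors = SplitName().convert(rows)
validation_errors = CapitalizedFirstName().validate(converted)
```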

Parsing Errors

So with this background, let’s look at each kind of error in detail. First, the parsing errors. One thing to note here is that parsing errors are applicable only to text-based formats such as CSV and JSON.

They are processed by SIP at the very beginning of the process, because we want to get rid of parsing errors as early as possible. Here, we rely completely on Spark to detect the parsing errors, as Spark is the one (mumbles) that parses the data. So, with that, let’s take a look at some examples.

So, we have two examples here, one for JSON and one for CSV. The CSV case is actually more interesting, because we need to pass some extra information to Spark, whereas the JSON case is pretty straightforward. So let’s focus on the CSV case in the interest of time. We have two rows. The second row is a problematic row because it has an additional column that is not in the schema; the schema has two columns, name and age. In order to get the parsing errors, we need to provide Spark with the right schema. Here we see that we have added one extra column, _corrupt_record, which Spark will use to store the parsing error. And in the reader options, we see that we have instructed Spark to use the permissive mode of parsing, and we have also supplied the name of the corrupt-record column. With these two extra pieces of information, when we read this file, Spark produces a data frame like the one on the right. Here we see that we have one additional column, _corrupt_record, which captures the parsing error. With this data frame, we can apply a simple predicate on the corrupt-record column to separate the good records from the bad records.

So now to conversion and validation errors. Once we have processed the parsing errors, the surviving rows are subjected to conversions and validations. Here again, using the DSL as input, SIP determines the list of converters and validators to invoke and calls upon them in sequence. A converter plug-in computes both output rows and error rows, and only the good rows pass to the next plug-in in the sequence. This is because if a row cannot be converted to conform to the target schema, it is not worth examining that row any further.

This is different in the case of validator plug-ins: even though a row is rejected by one validator plug-in, it is still examined by the next validator plug-in in the chain, (mumbles) to collect and report all the possible errors in the data. This minimizes the number of round trips users have to make to get their data ingested into our system. So with that, let’s look at a small illustration that shows how we consolidate these different kinds of errors.
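Going back to the CSV parsing example: in Spark, this is reader configuration. You supply a schema that includes the extra `_corrupt_record` string column, set the `mode` option to `PERMISSIVE`, and set `columnNameOfCorruptRecord` to `_corrupt_record`. After that, good and bad rows can be separated with a filter such as `_corrupt_record IS NULL`. The plain-Python sketch below only models that behavior with the standard `csv` module so the logic is easy to follow. Following the talk’s example, it treats a row with the wrong number of fields as corrupt and nulls all of its fields. That is a simplification, not a statement of Spark’s exact malformed-record rules. The function name and sample data are hypothetical.

```python
import csv

SCHEMA = ["name", "age"]        # columns expected by the target schema
CORRUPT = "_corrupt_record"     # extra column that receives the raw malformed line

def read_permissive(text):
    """Model of permissive parsing: keep every row, flag malformed ones."""
    rows = []
    for line in text.splitlines():
        # Parses one physical line at a time, so quoted multi-line fields are not supported.
        fields = next(csv.reader([line]))
        if len(fields) == len(SCHEMA):
            rows.append({**dict(zip(SCHEMA, fields)), CORRUPT: None})
        else:
            # Simplification: null every field and keep the raw line.
            rows.append({**{c: None for c in SCHEMA}, CORRUPT: line})
    return rows

rows = read_permissive("alice,30\nbob,25,unexpected\n")
good = [r for r in rows if r[CORRUPT] is None]       # like: _corrupt_record IS NULL
bad = [r for r in rows if r[CORRUPT] is not None]    # like: _corrupt_record IS NOT NULL
```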

Error consolidation (contd)

So we start with the input data and apply the expression monotonically_increasing_id, which assigns a logical identifier to each row. Next, imagine we have a conversion like the one shown in the mapping table. Here, we have a mapping rule which says that we have to concatenate the first name and last name columns, and the target column is gonna be full name.

Here, I would like to say that this is a very simple example of what a converter may look like. In real life, a different plug-in, from one of the different components that Shone mentioned, would implement the converters using its own custom rules and mappings. So, once we apply this mapping to the input, we see that we produce two data frames: a success data frame and an error data frame. The row with row_id two is rejected because the last name field is null, and that is not accepted in our system in this example. The success data frame has two rows, row_id one and row_id three. But we do see a problem with the second of these rows, because its age column is negative; that is not a concern of the converter at this point. We’ll see how the validator processes this row.
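In PySpark, the row identifier described here can be added with `df.withColumn("row_id", monotonically_increasing_id())`. Those ids are unique but not necessarily consecutive. The plain-Python model below uses consecutive integers instead and implements the example mapping rule: concatenate first and last name into `fullName`, rejecting rows where either part is null. The names `add_row_ids` and `full_name_converter` and the sample rows are hypothetical.

```python
def add_row_ids(rows):
    """Attach a unique identifier to every input row (stand-in for monotonically_increasing_id)."""
    return [{"row_id": i, **r} for i, r in enumerate(rows, start=1)]

def full_name_converter(rows):
    """Mapping rule: fullName = first + ' ' + last; a null part is a conversion error."""
    success, errors = [], []
    for r in rows:
        if r["first"] is None or r["last"] is None:
            errors.append({"row_id": r["row_id"], "error": "name parts must not be null"})
        else:
            success.append({"row_id": r["row_id"],
                            "fullName": f"{r['first']} {r['last']}",
                            "age": r["age"]})   # age is passed through unchecked
    return success, errors

inputs = add_row_ids([
    {"first": "Jane", "last": "Doe", "age": 34},
    {"first": "John", "last": None, "age": 41},
    {"first": "Ann", "last": "Lee", "age": -7},
])
success, errors = full_name_converter(inputs)
```

As in the talk, the negative age on row 3 passes conversion; catching it is left to a validator.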

Now we take the output of the converter, the good rows, and apply the target schema on top of it. In the target schema, we have defined a constraint which says that age has to be positive. So when we apply this target schema to the data, we get an error data frame.

The record with row_id three is rejected because age cannot be negative. At this point, we have both the conversion and validation errors, so we can apply a simple additional operation to combine these two sets of errors. One thing to note here is that, just like we standardized the APIs and interfaces that we talked about earlier, we also have a standardized error schema. That means all the plug-ins and components return their errors using the same schema. Therefore, we can combine these two data frames using the union operation.
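Because every plug-in reports errors in the same shape, combining them is a plain union. In PySpark that would be `DataFrame.union` (which matches columns by position) or `unionByName`. The sketch below models the shared error schema as a `NamedTuple`. Its fields (`row_id`, `stage`, `message`) are assumptions, since the talk does not spell out SIP’s actual error schema.

```python
from typing import List, NamedTuple, Optional

class ErrorRecord(NamedTuple):
    """Hypothetical shared error schema returned by every plug-in."""
    row_id: int
    stage: str                 # e.g. "parse", "convert" or "validate"
    message: Optional[str]

def union_errors(*frames: List[ErrorRecord]) -> List[ErrorRecord]:
    """Concatenate error sets that share one schema, like DataFrame.union."""
    combined: List[ErrorRecord] = []
    for frame in frames:
        if not all(isinstance(e, ErrorRecord) for e in frame):
            raise TypeError("every plug-in must report errors as ErrorRecord")
        combined.extend(frame)
    return combined

conversion_errors = [ErrorRecord(2, "convert", "last name is null")]
validation_errors = [ErrorRecord(3, "validate", "age must be positive")]
cumulative = union_errors(conversion_errors, validation_errors)
```

The type check stands in for the point made above: the union is only safe because all plug-ins agree on one error schema.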

So we now have the cumulative errors, and if we apply two basic join operations, we can compute the final error data frame and the final (mumbles) data frame. We take the original set of rows, where we generated the row identifier, and join them with the cumulative error data frame, with the join column being the (mumbles) identifier; this produces the final error data frame. We can see that the final error data frame contains all the input columns as they were sent by the user. Because the data that users send us may not have an actual row identifier, including all the input column values hopefully provides enough context for our users to identify the problematic rows, fix them, and resend them. To generate the success data frame, we take the success data frame that we got from our converter and do an anti-join. Why do we do an anti-join? Because the second row, the one with the negative age column, was rejected by the subsequent validator, and we must exclude that row from our output. That’s why we need an anti-join between the converter’s success data frame and the cumulative error data frame, on the same join column, the row identifier, to get our final success data frame. Once we have these two data frames, like Shone mentioned, we can write them to the data sink and the error sink. So at this point, we have looked at the three different kinds of errors, how they are consolidated, and how we compute the success and error data frames.
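The two joins can be modeled as follows. In PySpark they would correspond roughly to `input_df.join(errors_df, "row_id")` (an inner join) for the final error data frame, and `success_df.join(errors_df, "row_id", "left_anti")` for the final success data frame. The plain-Python version below uses dictionaries and sets. The sample rows continue the hypothetical name/age example, and the helper names are illustrative.

```python
inputs = [
    {"row_id": 1, "first": "Jane", "last": "Doe", "age": 34},
    {"row_id": 2, "first": "John", "last": None, "age": 41},
    {"row_id": 3, "first": "Ann", "last": "Lee", "age": -7},
]
converter_success = [
    {"row_id": 1, "fullName": "Jane Doe", "age": 34},
    {"row_id": 3, "fullName": "Ann Lee", "age": -7},
]
cumulative_errors = [
    {"row_id": 2, "message": "last name is null"},
    {"row_id": 3, "message": "age must be positive"},
]

def join_errors_to_input(inputs, errors):
    """Inner join on row_id: each error row carries all original input columns."""
    by_id = {r["row_id"]: r for r in inputs}
    return [{**by_id[e["row_id"]], "message": e["message"]}
            for e in errors if e["row_id"] in by_id]

def anti_join(rows, errors):
    """Keep only rows whose row_id never appears in the error set."""
    rejected = {e["row_id"] for e in errors}
    return [r for r in rows if r["row_id"] not in rejected]

final_errors = join_errors_to_input(inputs, cumulative_errors)
final_success = anti_join(converter_success, cumulative_errors)
```

Row 3 passed the converter but was rejected by the validator, so the anti-join removes it from the success output, while the error output shows it with the user’s original columns.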

Error Trapping

But how are these errors generated in the first place? Most of our existing conversions and validations make use of UDFs. Prior to introducing this plug-in architecture, we did data validations and data conversions, but we worked in a fail-fast mode: when the first error occurred, we failed the ingestion and reported that very first error to the users. That is what we are trying to solve as part of this work.

So we needed a way to trap these errors at runtime and proceed to examine the next row in the sequence. The second thing is that our data is based on heavily nested types, so we had to write nested UDFs.

And currently, it is not possible to capture the errors from nested UDFs the way we want. That’s why we introduced custom expressions that help us trap the errors. Now, let’s look at what these custom expressions look like.

As we can see, it’s a simple expression that wraps another expression, which means this expression has one child, the wrapped expression. Second, the output of this expression is a StructType with an input column, an output column, and the error information. The input column is the input that is passed to the child expression, and the output column is the output of the child expression. The error information, obviously, holds any error that happened during conversion or validation. Now let’s look at the evaluate method, which again is very straightforward. We call the child’s evaluate method, but we wrap that call in a (mumbles) block so that we can trap errors. When we see an error, we save it in the variables error message and error name, and finally we assemble the result into an InternalRow and return it as the value of this expression. Okay, now let’s look at an example.
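The real implementation is a Spark expression whose source the talk does not show in full. As a model only, the Python sketch below captures the same idea. It evaluates the wrapped child function inside a try/except and always returns a struct-like record with the input value, the output value, and error information, instead of letting the exception fail the job. The names `TrappedResult` and `trap_errors` are hypothetical.

```python
from typing import Any, Callable, NamedTuple, Optional

class TrappedResult(NamedTuple):
    """Struct returned for every row: input, output and error information."""
    input: Any
    output: Optional[Any]
    error_name: Optional[str]
    error_message: Optional[str]

def trap_errors(child: Callable[[Any], Any]) -> Callable[[Any], TrappedResult]:
    """Wrap `child` so that an exception is recorded rather than propagated."""
    def evaluate(value: Any) -> TrappedResult:
        try:
            return TrappedResult(value, child(value), None, None)
        except Exception as exc:  # trap any error raised by the child
            return TrappedResult(value, None, type(exc).__name__, str(exc))
    return evaluate

safe_int = trap_errors(int)
ok = safe_int("42")      # output populated, error fields None
failed = safe_int("x")   # output None, error fields populated
```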

Error Trapping – Example

We have an input data frame with two rows and two columns, name and age. When we print it out, we see a problem with the second row. We’re gonna see how we are able to trap that error. We take a small UDF that simulates what a validation UDF may look like: it takes an integer input and checks it. If the value is negative, it throws an exception. And so that we can show the output value, the output column of this error-trapping expression, the UDF multiplies the input age by two.

Okay, here we create this custom expression, and we can see that its child is this validating UDF, the age UDF. Now we map the age column: we attach the newly created expression to the age column by calling the withColumn API, and then execute this data frame. We see that the error is trapped in the second row. In the first row, we see both the input and output columns, input being 30 and output 60 because we multiplied by two, but the error information is null because we could successfully convert this row. In the second row, we had a problem converting, so the output column value is null, but we do have the error information. Then I print the schema, just to illustrate that the age column’s type has changed from int to struct, and under the struct we store the input, the output, and the error information.
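The same example in a plain-Python model: a hypothetical `age_udf` raises on negative ages and otherwise doubles its input (the doubling only makes the output visible, as in the talk). It is wrapped by a small hypothetical `trap` helper that returns an `{input, output, error}` struct per row. The last two lines split the rows on whether the error field is null. In Spark, that would be a filter on the error field nested inside the `age` struct.

```python
from typing import Any, Callable, Dict, List, Optional

def age_udf(age: int) -> int:
    """Stand-in validation UDF: reject negative ages, otherwise double the value."""
    if age < 0:
        raise ValueError(f"age must not be negative: {age}")
    return age * 2

def trap(child: Callable[[Any], Any], value: Any) -> Dict[str, Optional[Any]]:
    """Return a struct-like dict {input, output, error} instead of raising."""
    try:
        return {"input": value, "output": child(value), "error": None}
    except Exception as exc:
        return {"input": value, "output": None, "error": str(exc)}

people = [{"name": "alice", "age": 30}, {"name": "bob", "age": -1}]

# Analogue of withColumn("age", <error-trapping expression>): age becomes a struct.
trapped: List[Dict[str, Any]] = [{**p, "age": trap(age_udf, p["age"])} for p in people]

good = [r for r in trapped if r["age"]["error"] is None]
bad = [r for r in trapped if r["age"]["error"] is not None]
```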

So with the data frame schema we just saw, we can easily compute the error and success data frames.

Error Trapping – Example Continued (RAS)

To find the good rows, all we have to do is look for rows where the error code in the age struct is null. Similarly, to find the bad rows, we look for rows where the error code in the age struct is not null. So that’s how we trap the errors and separate good rows from bad rows. With that, I’m gonna hand it over to Shone to share his final thoughts. Thank you, Shone. – So, remember that zero-sum game, where we had to trade off between data processing features and stability? That is no longer a problem.

With CDP, we now have other teams, three other teams in fact, contributing not just code but components that enhance the plug-in framework for Siphon.

And this leads to many benefits for us on the Siphon team. We have scalable engineering, and a clear separation of responsibilities between those working on cross-cutting concerns (us) and the other teams, who are now working on data prep and data validation. In the end, we have more readable code, more testable code, and code that is easier to maintain.

If we look at the benefits for the platform, we need to take a step back, because we initially took Option B, passing the buck, where we had multiple connected pipelines. What we noticed is that because we passed over the data multiple times, latency increased, and we also had significant storage and compute costs; it just wasn’t tenable. With CDP, we’ve solved that problem.

Now for our customers, this means that we can deliver more features faster: ETL, validation, and error reporting. More importantly, for the average file size going through these more complicated flows, execution time went down from minutes to seconds.

So this ends our session. We hope you get two things out of it. First, think about extensibility in your software projects and how it can help you scale your project and your development. Second, leverage some of the data processing concepts and techniques that Dilip talked about for dealing with failed data.

About Dilip Biswal

Adobe, Inc.

Dilip Biswal is a Software Architect at Adobe working on Adobe Experience Platform. He is an active Apache Spark contributor and works in the open source community. He is experienced in Relational Databases, Distributed Computing and Big Data Analytics. He has extensively worked on SQL engines like Informix, Derby, and Big SQL.

About Shone Sadler

Adobe, Inc.

Shone is a glorified plumber (aka Principal Scientist) at Adobe Systems responsible for siphoning data into Adobe's Digital Marketing Cloud. Back in the day, he was a chief architect at Q-Link Systems, a leader in Business Process Management. Shone joined Adobe in 2004 as an Architect of its LiveCycle Document Platform, helping lead Adobe's initial foray into the enterprise. Shone received his Masters in MIS from DePaul University in 2000 and subsequently a Masters in Programming Languages from Georgia Institute of Technology.