Building a Distributed Collaborative Data Pipeline with Apache Spark


The COVID-19 pandemic has spotlighted, as never before, the many shortcomings of the world’s data management workflows. The lack of established ways to exchange and access data was a widely recognized contributing factor in our poor response to the pandemic. On multiple occasions we have witnessed how poor practices around reproducibility and provenance completely sidetracked major vaccine research efforts, prompting many calls to action from the scientific and medical communities to address these problems.

Breaking down silos, reproducibility, and provenance are all complex problems that will not disappear overnight; solving them requires a continuous process of incremental improvements. Unfortunately, we believe that our workflows are not suited even for that. Modern data science encourages routine copying of data, with every transformation step producing data that is disjoint from its source. It’s hard to tell where most data comes from or how it was altered, and there is no practical way to verify that no malicious or accidental alterations were made. All of our common data workflows are in complete contradiction with the essential prerequisites for collaboration and trust, meaning that even when results are shared they often cannot be easily reused.

This talk is the result of 2 years of R&D work in taking a completely different perspective on data pipeline design. We demonstrate what happens if the prerequisites for collaboration such as repeatability, verifiability, and provenance are chosen as core properties of the system. We present a new open standard for decentralized and trusted data transformation and exchange that leverages the latest advancements of modern data processing frameworks like Apache Spark and Apache Flink to create a truly global data pipeline. We also present a prototype tool that implements this standard and show how its core ideas can scale from a laptop to a data center, and into a worldwide data processing network that encourages reuse and collaboration.

What you will learn:
– Shortcomings of the modern data management workflows and tools
– The important role of the temporal dimension in data
– How the latest data modeling techniques in OLTP, OLAP, and stream processing converge together
– How bitemporal data modeling ideas apply to data streams
– How by combining these ideas we can satisfy all preconditions for trust and collaboration
– A summary of the proposed “Open Data Fabric” protocol for decentralized exchange and transformation of data

Speaker: Sergii Mikhtoniuk


– Hello everyone. Thank you for tuning in. My name is Sergii and I’m the founder of Kamu Data. In my presentation today, I would like to take a step back from advanced machine learning algorithms and things like petabyte-scale clusters, and focus more on the fundamentals of how we manage our data, how data is exchanged and processed, and, most importantly, not focus on just one company or organization, but take a more holistic view and see how data is managed worldwide and how we collaborate on improving data. I’d like to introduce this topic with an example. A lot of you have probably heard about the drug called hydroxychloroquine. It’s been all over the news for all the wrong reasons, but you probably missed this bit of the news. Way back in May, there was a study published by an influential journal called The Lancet, and this was a purely data-science study. It included data on more than 96,000 COVID patients worldwide, and as a data source it was using a proprietary database called Surgisphere. The finding of this study was interesting: it claimed that the use of this drug was actually increasing the risk of in-hospital mortality for the patients taking it. Obviously, this kind of massive study very early into the pandemic was treated very seriously by everyone, and clinical trials throughout the world were completely halted as a result. Just a little bit after that, the publication was actually retracted, after the journal was flooded with numerous complaints about data inconsistencies and because the provenance of the data used for the study could not be established. Surgisphere basically refused to open up the data to be analyzed by other researchers, and they decided to shut the database down. So what we have here as a result is a severe data management issue that was ignored for years and derailed potentially life-saving efforts at a very critical time.
And this example is just one of many; a lot of other studies, also related to COVID, were using this database, and this database existed for many years prior to that. So what we have here is that the guarantee of reproducibility was violated. Reproducibility is the foundation of the scientific method, but if you look at the results of a survey published way back in 2016 by Nature, around 90% of the more than 1,500 researchers surveyed said that there is a reproducibility crisis in their field. As I will show in my presentation today, I think the problem originates in how we manage data, and I will show that with current practices, reproducibility is nearly impossible to achieve in practice. So let’s take a quick look at how reproducibility and verifiability look in modern data science. To give this a bit more structure, I’m going to differentiate two kinds of data. On one hand, we’ll talk about source data. This data is generated by some system or originates as a result of observations; the simplest kind you can think of is weather data. As a rule of thumb, this data cannot be reproduced if it is lost. Publishers in this case hold full authority over this data and are responsible for its validity. On the other hand, derivative data is data produced from some other data through transformations. This encompasses all the aggregations and summaries, and even machine learning models, I think, are part of this category. This data usually can be reconstructed if it is lost, as long as we still know the transformations that were applied. So its validity depends on both the source data that was used and the quality of those transformations. What are our expectations when dealing with source data? I think they can actually be summarized in just one sentence: two unrelated parties at different times should be able to access the same data and validate that this data comes unaltered from the trusted source.
So in technical terms, this means stable references and also a validation mechanism; you can think of something like checksums or digital signatures. In reality, however, a lot of data publishers these days are still publishing datasets in non-temporal formats. This is extremely prevalent among GIS data publishers, most of which currently provide just the current state of the domain. When you go download the data, all you ever see is a snapshot of what they think the domain looks like at the current time. When the data is updated, it is usually updated destructively and in place. So what we have here is a constant loss of history, and we cannot even begin to talk about stable references when even the history is being lost. Temporal datasets, on the other hand, are better because they store events and observations, and typically they are append-only. But those publishers are still doing in-place updates, so the new data replaces the old one, and the same URL will yield different data at different times. So, due to this and some other more technical problems, stable references are nearly impossible to achieve. For derivative data, my expectations would be the following: repeating all transformation steps produces the same results as the original, and, as a reciprocal of that, if you have the data you should be able to verify that the declared transformations were in fact truthful and that there were no accidental or malicious alterations made to the data. This means transparency, knowing which transformations were applied, and determinism, so that redoing those transformations yields the same result. In reality, however, reproducibility is a very manual process; it is currently a burden on the shoulders of data scientists, achieved through various workflows and compliance routines.
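To make the snapshot-versus-temporal distinction above concrete, here is a minimal sketch, not from the talk, contrasting destructive in-place updates with an append-only temporal log. All names and values are illustrative.

```python
from datetime import date

# Non-temporal publishing: the dataset is a snapshot, updated in place.
# History is destroyed on every update, so the same URL downloaded on
# two different days can yield different data.
snapshot = {"Vancouver": 5.2}
snapshot["Vancouver"] = 7.1  # yesterday's observation is gone forever

# Temporal publishing: the dataset is an append-only log of observations.
# Old records are never modified, so history is preserved.
log = []
log.append({"city": "Vancouver", "event_time": date(2020, 11, 1), "temp_c": 5.2})
log.append({"city": "Vancouver", "event_time": date(2020, 11, 2), "temp_c": 7.1})

assert len(log) == 2  # both observations survive the update
```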
If you were to try to implement reproducibility in your project, you would have to start with a stable reference to source data, which, as we discussed, is very hard to achieve. You would also need to create reproducible environments: versioning your code, versioning all the libraries you are using and their transitive dependencies, all the frameworks in use, and ideally going down to the level of operating systems and hardware. You would also need to make your project self-contained. This means no external API calls: if your project is using something like a Google API for geolocation, for example, you cannot rely on them not changing that API and returning different data the day after you finish your project, or shutting the API down altogether and leaving your project stranded and non-reproducible. So what I’m getting at here is that at the first time pressure, reproducibility goes out of the window, because of how hard it is to achieve. Here are some examples. It is becoming more and more popular for people to share datasets on platforms like Kaggle and GitHub. The idea here is that a person goes to a trusted data publisher, they download the data, they see that a lot of the data is probably not very clean, so they clean it up, repackage it into a more usable format, and share it with other people on Kaggle or GitHub. The goal here is to collaborate on data cleaning worldwide, but the reality is that nobody will use these datasets in a serious project, because they have no facilities to verify that there were no malicious or accidental alterations made to this data. So the data cannot be trusted. The same goes for data hubs and portals, which have the noble goal of improving discoverability and breaking down silos, but in fact they just re-upload the publishers’ data without any mechanism for establishing its validity. Therefore, most of these hubs end up more like data graveyards.
The copy-and-version approach is how enterprise data science deals with the problem of reproducibility in source data. This involves a person from a company fetching the data from the trusted publisher, slapping a version on it, and re-uploading it into some persistent storage where it will never be changed, something like S3. But this of course works only within the enterprise bubble; this approach cannot work when data needs to be exchanged between different parties. So, in summary, I think we are mismanaging data in the most fundamental way, and collaboration on data is currently impossible. We are constantly losing forward progress, because every data science project wants to begin with trusted data from a publisher, but it produces data that cannot be trusted. And I think we reached the limit of workflows a long time ago; we need a technical solution. So, is there a better way? This is what we set out to find, and we decided to design a new kind of data supply chain that would be built with reproducibility and verifiability at its core, along with some other traits like low latency of propagating data through the system, complete provenance, and something that would encourage data reuse and collaboration. The result is called Open Data Fabric, and this is the project that we recently open-sourced; this is the first time I’m presenting it in public. Open Data Fabric is a protocol specification, so you can go to GitHub and see the protocol spec for yourself. The protocol specifies how to reliably exchange data between parties and transform it along the way. To summarize it in one sentence, I would call it the world’s first peer-to-peer data pipeline. Before I talk about the details of Open Data Fabric, I need to clarify what we mean by data here.
We decided that we cannot build this system for data as the term is currently used, because it is very wide and all-encompassing and usually brings all the negative properties of data that we would like to avoid. What we decided is to narrow down the definition of what data means in ODF. Here are the steps that we took to do this. First of all, we say that there is no such thing as current state. Current state is an infinitely small time interval, and I find it quite interesting that a lot of software engineering, like OLTP and web systems, focuses on this infinitely small time interval. Instead, we decided to focus on history. We store history, therefore data needs to be temporal. History doesn’t change, therefore our data is immutable. Future decision-making relies on history, so the more history we preserve, the better decision-making we can have in the future; therefore data must have infinite retention. Also, time is relative, especially in data systems: it takes time to propagate data through many steps, and there is a delay to this propagation, so we use bitemporality to reflect that. So, overall, our data model is as follows: a dataset is a potentially infinite stream of events and observations, and every event has two timestamps associated with it. Event time is the time when the event occurred in the outside world, and system time is when the event was first observed by the system. Because system time is monotonically increasing, all you need to do to get a stable reference is to decide on a rightmost bound of system time. This one timestamp alone already achieves the goal of stable references: you just truncate the data using it, and you get a stable reference. Validity can be established by taking the same system time and hashing the data preceding it. This is how data flows in Open Data Fabric.
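As an aside, the truncate-and-hash idea just described can be sketched in a few lines. This is an illustrative sketch, not the actual ODF implementation; all field and function names are hypothetical.

```python
import hashlib
import json

# Every record carries an event_time (when it happened in the world) and
# a system_time (when the system first observed it).
events = [
    {"event_time": "2020-10-01", "system_time": 1, "value": 10},
    {"event_time": "2020-10-03", "system_time": 2, "value": 12},
    {"event_time": "2020-10-02", "system_time": 3, "value": 11},  # late arrival
]

def stable_slice(events, as_of_system_time):
    """A stable reference: truncate on the monotonically increasing
    system_time. The same bound always yields the same records."""
    return [e for e in events if e["system_time"] <= as_of_system_time]

def slice_hash(slice_):
    """Validity check: hash a canonical encoding of the slice."""
    encoded = json.dumps(slice_, sort_keys=True).encode()
    return hashlib.sha256(encoded).hexdigest()

ref = stable_slice(events, as_of_system_time=2)
assert len(ref) == 2

# Appending new events never changes an existing reference or its hash.
more = events + [{"event_time": "2020-10-04", "system_time": 4, "value": 13}]
assert slice_hash(ref) == slice_hash(stable_slice(more, 2))
```

Note that late arrivals (event time out of order) do not break references, because references are anchored on system time, not event time.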
So ODF is basically a graph of datasets, where every dataset can have its own maintainer, a person who is responsible for the good quality of that dataset. The data flows from left to right, from publishers to consumers. If a publisher is ODF-compliant, meaning that they comply with our data model and all the qualities that we would like source data to have, then ODF can extend to them and simply use data from their so-called root datasets. This is where we store source data, and all the derivative datasets for derivative data can be built directly from it. If a publisher is non-compliant, which means one hundred percent of publishers at the moment, we can employ different change data capture techniques to retroactively give the data the properties that we need. To keep this more grounded, this is how we would define a dataset in ODF. It’s just a YAML file, and for a root dataset here we are specifying the URL of the external data publisher, in this case the Vancouver open data portal, where the data will be fetched from, how to parse it, and, most importantly, since we pull the data over and over, how to merge the data we have already seen with the data arriving into the dataset. On the derivative dataset side, all you specify is the input dataset IDs and some transformation; in this case, we have an SQL query that does a join over these two datasets. One of the important pillars of ODF is that our metadata itself is temporal as well. Here you can see the transformation between dataset A and dataset B and the metadata related to both. The most common metadata event is, of course, that data has been added to the dataset, so we have those on both sides, but here, as you can see, in the middle of dataset A’s lifetime there was a schema change event. This is what we call lifecycle metadata events: important things that can affect the schema or how the queries are executed.
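The shape of the root and derivative dataset definitions described above can be sketched as plain data structures. This is an illustrative sketch only: the real ODF manifests are YAML files with their own schema, and every field name, ID, and URL here is hypothetical.

```python
# Hypothetical shape of a root dataset: where to fetch, how to parse,
# and how to merge repeated pulls with the history already stored.
root_dataset = {
    "id": "ca.vancouver.opendata.parcels",   # illustrative ID
    "root": {
        "fetch": {"url": "https://example.org/parcels.csv"},  # placeholder URL
        "read": {"format": "csv"},
        "merge": {"strategy": "snapshot"},   # turn snapshots into events
    },
}

# Hypothetical shape of a derivative dataset: only input IDs plus a
# declared (deterministic) transformation, here an SQL join.
derivative_dataset = {
    "id": "com.example.parcel-tax-analysis",
    "derivative": {
        "inputs": ["ca.vancouver.opendata.parcels",
                   "ca.vancouver.opendata.tax-reports"],
        "transform": {
            "engine": "spark",
            "query": "SELECT p.id, t.value FROM parcels p JOIN taxes t ON p.id = t.id",
        },
    },
}

# Root datasets ingest external data; derivatives only reference other datasets.
assert "fetch" in root_dataset["root"]
assert "inputs" in derivative_dataset["derivative"]
```

The key design point is that a derivative dataset contains no fetch logic at all: everything it holds is reproducible from its inputs and the declared query.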
These lifecycle events are chained together between the metadata of derivative and root datasets. Here, the maintainer of dataset B has a chance to react to the schema change in dataset A. This is what enables us to accommodate dataset evolution over time; if we did not support it, you would have to reconstruct the entire graph over and over whenever a schema change occurred in a root dataset. It is also a key enabler of reproducibility and verifiability, and, as you will see further, it gives some great properties for data sharing. What we are currently working on is achieving fine-grained provenance using the metadata model: being able to take any data cell in a derivative dataset and say exactly where it came from, what its ultimate source was, and which transformations were applied. This is something you don’t see in any of the enterprise pipelines today. Under the hood, this is how the metadata chain looks. It’s very blockchain-like: a lot of blocks chained together, where blocks are individually cryptographically secured and also cryptographically linked to the data associated with them. The format is very extensible, so in the future we see it extending to semantics, ontology, governance, licensing, and security concerns. The idea here is to take all the workflows and compliance routines that are currently very manual and codify them, removing this burden from the maintainer. This is how a dataset looks on disk, and it’s very Git-like in structure: every metadata block is an individual file, we have references, which are pointers to these blocks, there are checkpoints, which I’ll talk about later, and the data is stored in individual part files in the Parquet format, so no surprises here. We spent a lot of time thinking about how to describe the transformations in derivative datasets, and the conclusion we arrived at is that batch processing is simply unfit for this purpose.
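The blockchain-like metadata chain described above can be sketched as follows. This is a toy illustration, not the actual ODF block format; all field names are hypothetical.

```python
import hashlib
import json

def block_hash(block):
    """Cryptographically secure a block by hashing its canonical encoding."""
    return hashlib.sha256(json.dumps(block, sort_keys=True).encode()).hexdigest()

def append_block(chain, data_slice):
    """Each block links to the previous block and to its data slice,
    so tampering anywhere invalidates every block after it."""
    chain.append({
        "prev_block_hash": block_hash(chain[-1]) if chain else None,
        "data_hash": hashlib.sha256(data_slice).hexdigest(),
    })
    return chain

def verify(chain):
    """Walk the chain and check that every link still holds."""
    return all(cur["prev_block_hash"] == block_hash(prev)
               for prev, cur in zip(chain, chain[1:]))

chain = []
append_block(chain, b"records 0..100")
append_block(chain, b"records 100..150")
assert verify(chain)

chain[0]["data_hash"] = "tampered"  # any alteration breaks the links
assert not verify(chain)
```

This is the same property that makes the metadata act as a "digital passport" later in the talk: integrity of the whole history is checkable from the chain alone.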
So if you look at the picture on the right here, imagine that you are joining two living datasets, so data is still being periodically added to both. If dataset B is updated on a cadence of once a year, and dataset A is updated on a cadence of once a day, you would need to accommodate all these temporal differences in your batch transformation code. This would be extremely error-prone, especially with more complex scenarios like backfills, out-of-order arrivals, and corrections. The way batch transformations deal with these problems is basically by ignoring them: they wait it out until the temporal problems disappear, relying on the simplicity of non-temporal data instead of facing the problems. If you were to write out these batch transformations, they would be extremely error-prone, and they would be impossible to audit. What’s the use of having transparency and knowing which transformations take place if they are so complex that you cannot understand them? And so this is where we looked at stream processing. Stream processing is essentially designed for these temporal data problems. We have things like event-time processing, which is basically the bitemporality I described earlier, and other great mechanisms like watermarks, windowing, and different kinds of joins. There is a misconception that stream processing is only useful for near real-time processing, when data arrives live from, say, mobile devices, but I think that’s not correct. We actually found it very useful even for datasets that are updated on a cadence like once a year. So we decided to make stream processing our primary transformation method. One of the biggest benefits is that it makes transformations agnostic of how and how often data arrives: data can arrive once a year in gigabyte batches, or it can arrive every minute in smaller batches, and it doesn’t affect the way you would write the transformation. This is very powerful.
It’s also declarative and expressive, so it is easier to audit, and it is better for determinism and reproducibility. And of course it gives us the minimal possible latency, because you write your transformation once and it basically runs forever, with no human element. On the negative side, of course, there is unfamiliarity, because it is pretty new, and the limited framework support. Speaking about how these transformations are performed: what we have here is a coordinator component. The coordinator in ODF is responsible for handling everything related to metadata. When you initiate a transform, the coordinator reads the metadata and figures out which slice of the input dataset has not yet been processed, that is, not yet seen by the output. It takes that slice and passes it down to the engine. ODF is actually framework-agnostic, so we have several implementations of engines. Our main workhorse is the Apache Spark framework, but we also extended support to Apache Flink. The engine can be any data processing framework, focused mainly on stream processing, that has an ODF adapter on top of it. The framework does the processing; it can read and write the checkpoints stored alongside the output dataset, and it produces the output slice that the coordinator stores in the output dataset. Very importantly, the engines run in a so-called sandbox environment, in a Docker container, and this achieves two properties. First, we strictly version every container, which means that any transformation, when repeated, is going to be performed by exactly the same version of the engine that performed it originally. Second, the sandboxing ensures that the engine cannot access any external resources; this is how we disallow any kind of API calls, for reproducibility purposes. This approach has interesting implications for data sharing. For root datasets there are no surprises here, since, as we discussed, this data cannot be reconstructed.
If it is lost, we recommend durable and highly available storage. For derivative datasets, however, you can use any cheap and unreliable storage, or no storage at all if you like, because all transformations are reproducible. Derivative datasets are basically a form of caching: it doesn’t matter how deep a transformation graph you have, you can always reconstruct the data by having only the root data and the metadata associated with it. Metadata here acts as a kind of digital passport of the data; it is used to verify the data’s integrity once you download it. But it can also be used for validating the work of another peer. Let’s say you grabbed the metadata from someone else: you see the peer doing a transformation and sharing the data, and you can rerun those transformations and compare the data hashes that you computed with what they are publishing. By doing this, you can spot malicious activity, such as a publisher that incorrectly declares the transformations they performed or tries to modify the data. So now I’m going to talk about kamu-cli. Kamu-cli is actually the tool where Open Data Fabric originated from. In ODF terms, it is a coordinator component. It’s just a single binary app written in Rust that handles tasks like ingesting the data, transforming it, and sharing it. It comes with the prototype engines I mentioned, based on Apache Spark and Apache Flink, and it has some convenience features and is being actively developed. It has a very Git-like interface. Here you can see me adding a dataset definition into my workspace from a YAML file and pulling the dataset; this is where the tool ingests the data from the external publisher. I can list the datasets in my workspace and see that some records were added, and I can also use kamu log to see the metadata blocks, just like git log shows the commits associated with your source code.
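The peer-validation flow just described can be sketched as follows. This is a hypothetical illustration of the idea, not kamu-cli’s actual code: because engines are deterministic and sandboxed, any peer can re-run a declared transformation on the same stable input and compare hashes. All function names are made up.

```python
import hashlib

def transform(input_records):
    """The publicly declared (deterministic) transformation,
    here a trivial filter standing in for a real SQL query."""
    return [r for r in input_records if r % 2 == 0]

def data_hash(records):
    """Hash of a canonical encoding of the output slice."""
    return hashlib.sha256(repr(records).encode()).hexdigest()

source = [1, 2, 3, 4, 5, 6]              # stable reference to source data
published_hash = data_hash(transform(source))  # what the peer advertises

# A verifier re-runs the same declared transformation on the same input:
assert data_hash(transform(source)) == published_hash

# A publisher who silently altered the output is caught:
tampered = transform(source) + [99]
assert data_hash(tampered) != published_hash
```

The determinism guarantee carries the whole scheme: without sandboxed, versioned engines, two runs could legitimately differ and verification would be impossible.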
It also has an SQL shell, as you would expect, for exploring the data; it is also based on the Apache Spark SQL shell, so no surprises here. We also integrated Jupyter Notebooks. With one command, you can start a Jupyter server that is linked to Apache Spark using the Apache Livy framework, and this gives you the convenience of using SQL, or even Apache Spark code, to massage the data into the format that you would like, then transfer it locally into your notebook and use any visualization library that you like to actually display the data. In conclusion, here is how I would summarize Open Data Fabric. It is a data pipeline designed around the properties that we deemed essential for collaboration, and it tries to undo decades of stagnation in data management. It admittedly imposes a much stricter model: we prevent you from doing any sort of API calls or black-box transformations. So it requires a bit of a mindset shift in how we define what data is and how we treat it, and it encourages embracing temporal problems head-on instead of trying to avoid them. But overall, I think the strict model definitely pays off if you look at the properties that this framework gives you. Taking a broader view, I see Open Data Fabric becoming one of the pillars of digital democracy, if you are familiar with the term, and of the next generation of decentralized IT popularized by protocols like Dat and IPFS and blockchain. The way I see it, it becomes the factual data supply for something like blockchain smart contracts: a network where trusted data can be stored with no central authority, yet is still resistant to any kind of malicious behavior. This can also serve as a foundation for things like data monetization, providing incentives to publishers for publishing very high-quality and recent data, and it can be a data provider for the next generation of web applications.
These are the references where you can find more information. I definitely recommend you check out kamu-cli; it has a lot of examples that I didn’t have time to get into today, so definitely take a look. You can also read the spec; I think it’s interesting to read. Or you can check out our blog, or, if you have any questions, shoot me an email or just post a question on any of the repositories. Hopefully, Open Data Fabric is our response to the calls for action from the scientific community to address these problems of reproducibility, and I have a call for action of my own in response. I think we need to treat data as our modern-age history book, so we should stop modifying it and copying it around. I think versioning data is not the right solution; bitemporal data modeling addresses the problems here in a much better way. I think data publishers will have to take ownership of reproducibility, because reproducibility has to start from the source data, so we need to provide them with good standards and good tools, and Open Data Fabric can help with that. I think non-temporal data is really a huge local optimum, so temporal data should definitely be your default choice, and in a similar way, batch processing is a local optimum, and stream processing, I think, will very soon displace batch. So I think we should collaborate on improving tools instead of oversimplifying problems. Please check Open Data Fabric out and let us know what you think. I will ask you to not forget to rate the session, and I am looking forward to your questions.

About Sergii Mikhtoniuk

Kamu Data Inc.

Sergii is a Software Architect and a polyglot engineer with experience ranging from hardware design, compiler development, and computer graphics, to highly responsive scalable distributed systems and data pipelines. In his role at Activision Blizzard he is responsible for technical direction of the online platform that powers many of the world's most popular video games. He is also a long-time personal analytics and open data enthusiast and a founder of Kamu - a company that is helping the world to make sense of its growing supply of data.