Apache Spark is a general-purpose big data execution engine. You can work with different data sources with the same set of APIs in both batch and streaming mode. Such flexibility is great if you are an experienced Spark developer solving a complicated data engineering problem, which might include ML or streaming. At Airbnb, 95% of all data pipelines are daily batch jobs which read from Hive tables and write to Hive tables. For such jobs, you would like to trade some flexibility for more extensive functionality around writing to Hive or orchestrating multi-day processing. Another advantage of reducing flexibility is that it lets you establish best practices, which can be followed by less experienced data engineers.
At Airbnb we've created a framework called 'Sputnik' which tries to address these issues. Data engineers extend the Sputnik base class and write code for their data transformation without worrying about filtering input to the dates for which the job runs. End users do not read from or write to Hive directly; they use Sputnik wrappers for Hive. The read wrapper filters input data based on parameters from the console, including the time frame. The write wrapper gets information about the result table from case class annotations, writes meta-information about the table, runs verifications on the data, and much more. The core idea of the framework is that all functionality of a job consists of job-specific logic and run-specific logic. Job-specific logic is the transformation defined by the data engineer plus meta-information about the tables. Run-specific logic is filtering input data based on the current date and writing data to Hive. The data engineer specifies the job-specific logic, and Sputnik handles all run-specific logic based on assumptions about the right way to operate daily Hive batch jobs. https://github.com/airbnb/sputnik
– Hi, my name is Egor and I'm a software engineer at Airbnb, and today I'd like to talk to you about Sputnik, our framework on top of Apache Spark, which we built at Airbnb to help with data engineering.
Let's talk about the typical Spark jobs
which we're running at Airbnb.
We're reading data from Hive, we're writing data to Hive, and we're processing one day's worth of data. If we tried to process all historical data on every daily run, it would be too heavy a job to run daily. So each day, we run a job which takes the data for the previous day, processes it, and writes the result into a result Hive table, in a partition for that particular date.
And if you look at how such a typical Spark job looks, you would see that it has a lot of logic which has nothing to do with the business logic of our application. This logic is more about the orchestration of running the job. On this slide, you can see that we need to parse arguments from the console to understand what kind of run of the job we are doing. We need to create a Spark context so we can run our Spark application. We need to filter the data down to the date we are working with.
After that, we need to write our business logic and process this data. And then, we write our data to the result table. As you can see, percentage-wise, the code which represents our business logic is just a fraction of the job, and that creates problems for maintaining a big number of such pipelines in an organization. You start to get duplication between different pipelines because they are all very similar in the way they parse arguments and filter data. Another problem with this job is that it relies on the idea that you can process only one date at a time, which is not very useful if you need to backfill three years' worth of data but a run of the job is like two minutes. To be able to run this job for a bigger period of time, you would actually need to go into the Spark job and change the code of the job. And that's just one of the problems. More generally, the problem is that when you have a data engineer writing a pipeline, it's not ideal that data engineers should implement orchestration and infrastructure logic and not only business logic. A data engineer, at the end of the day, is someone very good at understanding business requirements and translating those requirements into code which processes data according to them.
The reason for that problem is that Spark is a general-purpose execution engine, which is great if you have a big diversity of applications within the organization. It's great for an execution engine because it supports multiple data sources, multiple languages, and multiple execution models like Spark SQL, machine learning, and graph processing. Given the very different requirements of your organization, it allows you to use a single execution engine to satisfy all of them, and that's a big feature of Spark. However, it is not particularly great if the majority of your pipelines follow a certain pattern. The pattern we have at Airbnb for 99% of our pipelines is that they're written in Scala, they use either the DataFrame API or Spark SQL, and they read and write their data from Hive. It means that all the flexibility Spark is great for is not very useful when you are thinking about providing a good environment for every data engineer at Airbnb to write jobs, because this flexibility creates ambiguity, and it doesn't really suggest good data engineering practices or easy guidelines about what your data pipelines should consist of.
So when we think about what data engineers should actually do on a day-to-day basis, you would see that data engineers should implement something like simple transformation code.
It should take some dataframe as input, do all the transformation necessary according to the business logic to produce the desired output, and that should really be all that's required from a data engineer in terms of building the pipeline.
And to get to that state, we need to understand what falls under the data engineering domain and what falls under the infrastructure domain.
Let's properly break down all the logic within an application into two big buckets: there is the job logic and there is the run logic. Job logic is something the data engineer should define. It's business logic, for example counting new visits for every URL; some pipeline can do that. And the job needs to specify what the input table is, what the output table is, what the partitioning schema for this table is, and it should provide some information about how to validate the result before writing to the table. Run logic, on the other hand, can be handled by some piece of infrastructure, some framework, and we wouldn't want to require a data engineer to implement it. It's logic like: when running a job for a specific day, the job should retrieve input only for that date from a table; and when we try to write data to some table which does not exist, we should create this table in Hive.
And with this separation in mind, we created Sputnik. It is a framework which tries to take on all the orchestration logic and leave the data engineer only the business logic to implement. So what does a typical Sputnik job look like? You get your data from Hive, you write some business logic, and you write your result. When you get your data, you are not reading directly through the Spark API or the Hive API. You read through HiveTableReader, which is part of Sputnik. This table reader takes into account console parameters which were passed for this run of the job, for example the date for which we are running the application. Sputnik reads the data from the Hive table taking into account what kind of run of the job we are making. When you write your result, again, you do not write directly to Hive. You write through a table writer which is part of Sputnik, and a parameter which can be taken into account there is, for example, which environment we are writing in. Is it a developer environment? Is it a production environment? So when the table writer writes to Hive, it does all the logic you would otherwise have to do yourself if you didn't have Sputnik as your framework. So typical Sputnik job code looks like this.
So you have some job, which is an object. You extend SputnikJob, which is a trait. You read some data through HiveTableReader, passing the name of the table you are reading from. You process your result, which is the business logic you implement, and you write your data with HiveTableWriter, where you pass your result and your table name. As you can see, you do not do anything about creating a Spark context, filtering input by date, or deciding how exactly to write your data to Hive. All that logic is handled by Sputnik.
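The shape of such a job can be sketched with toy stand-ins. The trait and the reader/writer below are simplified in-memory substitutes I made up for illustration, not the real Sputnik API, which wires these to Spark and Hive:

```scala
// Simplified stand-ins for the Sputnik API, for illustration only.
// A "table" here is just an entry in an in-memory map.
object ToySputnik {
  type Row = Map[String, String]
  val tables = scala.collection.mutable.Map[String, Seq[Row]]()

  trait SputnikJob {
    def hiveTableReader(table: String): Seq[Row] =
      tables.getOrElse(table, Seq.empty)
    def hiveTableWriter(result: Seq[Row], table: String): Unit =
      tables(table) = result
    def run(): Unit // business logic goes here
  }

  // The job only expresses its transformation; reading, writing,
  // and date filtering would be handled by the framework.
  object VisitCountJob extends SputnikJob {
    def run(): Unit = {
      val input = hiveTableReader("visits")
      val result = input
        .groupBy(_("url"))
        .map { case (url, rows) => Map("url" -> url, "count" -> rows.size.toString) }
        .toSeq
      hiveTableWriter(result, "visit_counts")
    }
  }
}
```

The point of the shape is that `run()` contains nothing but the transformation; everything around it belongs to the framework.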
You run your job the same way you would run any other Spark application. Sputnik has SputnikJobRunner, and you pass as parameters the class of your job, as well as the parameters for that particular run.
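A launch might look like the following; the jar name, package path, and flag name are illustrative guesses, not taken from Sputnik's documentation:

```shell
# Launch like any other Spark application; the job class is an
# argument to the runner. Jar name, package path, and the --ds
# flag below are illustrative, not Sputnik's documented interface.
spark-submit \
  --class com.airbnb.sputnik.SputnikJobRunner \
  my-pipelines.jar \
  com.example.VisitCountJob \
  --ds 2020-06-01
```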
When you are writing your data in Sputnik, there are two ways to do it. There is the DataFrame approach, which we've already seen on previous slides, where you pass your DataFrame and the name of the table you are writing to.
And there is the dataset API. There is a problem in data engineering which is not addressed by any existing piece of infrastructure, and it has to do with schema management. Our data engineers used to do the following to run their pipeline for the first time. They would go to Hive, create the table they were writing to, and add information about this table, like a description of the table, comments, and how the table is partitioned. Then they would run the job and write the data to that table. The problem with that approach is that you have some DDL script which often is not even checked into Git. Even when it is checked into Git, it doesn't get updated when someone runs DDL to change the table definition. So, effectively, we don't have a single source of truth for our schema definition: you have your dataframe as part of your Sputnik application, and you have a real table in Hive. Annotations on case classes in Sputnik try to address this issue. You annotate your case class, which becomes the source of truth for schema management, and Sputnik relies on these annotations to create tables in Hive with Create Table statements and all the rest of the DDL. Annotations consist of, obviously, a table name (which table you are writing to), a table description (information about what this table is about), the table format (how you want to store your data), comments on particular fields, and other information which lets you add more context to your table. Partitioning schema: you want to specify how you partition your data as part of the specification of the schema of that data. And another piece of functionality is field formatting. You see, in Hive the convention is snake_case formatting, while in Scala and Java it's camelCase. It's not a big deal, but a lot of our internal users at Airbnb really asked us to translate the names of the fields from one convention to the other while reading and writing the data.
It's not a big deal, and it sometimes creates additional problems when you try to debug your job and understand what's going on. But generally, it allows you to follow the conventions of the language you are working with.
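The renaming itself is a small pure function. A minimal sketch of both directions follows; the exact rules Sputnik applies are an assumption here:

```scala
// Field-name translation between Scala/Java camelCase and Hive
// snake_case conventions. A minimal sketch, not Sputnik's code.
object FieldFormatting {
  // camelCase -> snake_case, for writing Scala-named fields to Hive
  def toSnakeCase(name: String): String =
    name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase

  // snake_case -> camelCase, for reading Hive columns back into case classes
  def toCamelCase(name: String): String = {
    val parts = name.split("_")
    (parts.head +: parts.tail.map(_.capitalize)).mkString
  }
}
```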
Let's talk about what HiveTableWriter does,
so we can understand why we actually need this intermediate layer between us and Spark plus Hive. First of all, it creates the table with a Create Table statement based on either our DataFrame or the case class we pass as part of the dataset API.
And we do that only if the table does not exist. It updates table meta-information with descriptions and comments, like we described before. It normalizes the dataframe schema according to the output Hive table. There is actually a bug we ran into a lot when we had applications writing directly to Hive. The problem is that, if you change the order of the fields in your result dataframe and they have the same type, you do not get any exception that something is going wrong, but at the same time you are writing the data for one column into another column. So Sputnik also has logic which looks at the result Hive table and adjusts the dataframe according to that table.
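The alignment can be illustrated over plain rows. This is a sketch of the idea under my own simplifications; the real implementation works on Spark schemas and DataFrames:

```scala
// Align rows to a target column order before writing, so that a
// reordered dataframe can never silently write column A's data
// into column B. A sketch over plain maps, not Sputnik's code.
object SchemaNormalize {
  type Row = Map[String, Any]

  // Reorder (and select) each row's fields to match the table's
  // column order, failing loudly if a required column is missing.
  def align(rows: Seq[Row], tableColumns: Seq[String]): Seq[Seq[Any]] =
    rows.map { row =>
      tableColumns.map { col =>
        row.getOrElse(col, sys.error(s"missing column: $col"))
      }
    }
}
```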
There is a problem which used to be very important in the last version (it has become less important with Spark 3, but it still has cost implications), which is repartitioning before writing your data so you can reduce the number of small files. If you don't do anything and you have some wide operations, you get the same number of files as the number of reducers in your final stage, which for all of our jobs is a big number like 2,000, even when the amount of data is not very big. That resulted in us having many files one megabyte, or even kilobytes, in size. So we actually repartition our result before writing to Hive, so we can reduce the number of result files on disk. We also run checks on the result before writing it, and I will talk a little more about that in the next slides.
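The repartition arithmetic can be sketched as follows; the target file size and the use of an estimated byte count are my assumptions, not necessarily Sputnik's exact policy:

```scala
// Pick a partition count from the estimated output size so each
// file lands near a target size, instead of inheriting the ~2,000
// reducers of the final stage. Target size here is an assumption.
object Repartition {
  def numPartitions(estimatedBytes: Long,
                    targetFileBytes: Long = 256L * 1024 * 1024): Int =
    math.max(1, math.ceil(estimatedBytes.toDouble / targetFileBytes).toInt)
}
```

With this, a 1 GiB result would be written as a handful of files rather than thousands of tiny ones.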
And we support the notion of environments. We have a staging environment, a production environment, and a testing environment, and we change the name of the table based on that; it's the responsibility of the table writer to handle it. When you are reading your data, you again can read either through the DataFrame API, where you get just a dataframe and need to pass your table name, or through the dataset API, where you pass your case class and some information about adjustments to filtering the ds partitions from your input table.
As I mentioned before, Sputnik implicitly filters the data from an input table based on date. But sometimes you need to adjust the logic of this filter. For example, you aggregate and process data for the last seven days to compute seven-day averages. Sputnik allows you to make these modifications through DateBoundsOffset. And the logic we talked about in the scope of HiveTableWriter, running your job in an environment-specific way by adjusting table names, works for HiveTableReader as well. If you run your job and specify the environment, be it testing or production, the reads are adjusted too.
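The date arithmetic behind such an offset is small enough to sketch with java.time; the real DateBoundsOffset API may look different, so treat the names and the signature here as assumptions:

```scala
import java.time.LocalDate

// For a run date ds and a lookback of N days, the input window is
// [ds - (N - 1), ds]: a 7-day average reads 7 daily partitions.
// Illustrative only; not Sputnik's DateBoundsOffset signature.
object DateBounds {
  def inputRange(ds: LocalDate, lookbackDays: Int): (LocalDate, LocalDate) =
    (ds.minusDays(lookbackDays - 1L), ds)
}
```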
Spark is really great for unit testing your application. It provides a Spark local context which allows you to run your Spark job in a single JVM, which is very useful when you are running your unit tests during the build stage: within one JVM, without spinning up any Docker containers or going to any cluster, you can run your whole job. This feature is a very big advantage compared to Hive, where you need to go through some trouble to really run your SQL script, and it was one of the big reasons why, at Airbnb, we decided to move the majority of our clients from Hive to Spark. Another big advantage of testing in Spark is that you write your pipelines in Scala. It means that you can break down your pipeline into small pieces of logic and unit test these pieces separately. But there are some limitations to what Spark has to offer in terms of unit testing. There is this nice project, Spark Testing Base, by Holden Karau, which was a big influence on the testing functionality within Sputnik. What it provides is a singleton Spark session, so multiple tests don't create their own Spark sessions and we don't have too much overhead launching Spark while running multiple tests. Sputnik provides dataframe comparison, and it provides loading data from CSV/JSON; we found that, for us, it's easier to store input and output data in resource files than to add it to Scala code. And we clean up Hive between runs. So a typical test for your Sputnik job looks like this: you read your input data from CSV/JSON and write this data to a Hive table. Then you run the job, and then you compare the result of your job, which is again in Hive, with some expected result.
When you run your job in a test, you can pass the specific date for which you're running it. You can pass all the parameters you would have passed when running your job in production. So it tries to make a test run resemble real-life scenarios as closely as possible: reading from Hive, writing to Hive, and taking into account all the parameters you can pass to your job.
Another piece of Sputnik functionality is working with configurations. It serves two purposes. First, it's a good way to extract all your Spark configurations, so you can have job-specific Spark configurations without putting them in your Scala file. Second, if you use the DataFrame API, you can specify all the meta-information about your Hive table there, and you can pass configs which are job-specific. In the example on the slide, you can see the config on the right with a specified value for a certain key, and within the Spark job on the left, you can get access to this value.
When we write the data to our output, a failed job is not the worst thing that can happen. The worst thing that can happen is writing wrong data, because that would trigger downstream jobs to start processing it. So when we find and fix our issue, it's not enough to just rerun our job. We need to communicate to all downstream consumers of our job that this happened, and then they need either to rerun their jobs, or maybe do even more to fix the issue. That means it's very important to check results before writing to the result table. So in Sputnik there is a Check trait which you implement to define a check you want to perform on your result data, and Sputnik comes with a set of predefined checks for typical scenarios. An example would be the NotEmpty check: you check that the result of one of your jobs is not empty.
And it does that by just reading the DataFrame which is the result of your job and running a simple count on it. It returns an Option: either an error message if something went wrong, or nothing if everything is fine and the check has passed. To run these checks in your job, you pass them to the table writer, which runs them after your job and before writing. Another big problem when you run your job for a historical period of time is that a daily run of your job can be small, again, five to ten minutes, but you need to backfill years' worth of data. To address that, you pass to Sputnik a start date and an end date instead of the single ds parameter. When it reads data from the input table, it takes the data for that whole period of time, from start date to end date. It means that, if you rely on this functionality, you cannot assume that the data coming into your job is just one day's worth of data. That assumption can matter for certain types of applications where, for example, you identify users you've seen for the first time: you rely on previous runs of the application and on the assumption that you are running just for that date. For those kinds of scenarios, you either need to specify within Sputnik that this job can be run for only one day at a time, or you need your logic to take into account the fact that your job can be run for a longer period of time.
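A check in this style boils down to a small trait. Here is a self-contained sketch over plain rows; Sputnik's actual Check trait operates on DataFrames, so the names and signatures here are illustrative:

```scala
// A check inspects the result and returns Some(error) on failure,
// None when it passes; the job writes only if all checks pass.
// A sketch over plain rows, not Sputnik's DataFrame-based trait.
object Checks {
  type Row = Map[String, Any]

  trait Check {
    def check(result: Seq[Row]): Option[String]
  }

  object NotEmptyCheck extends Check {
    def check(result: Seq[Row]): Option[String] =
      if (result.isEmpty) Some("result table is empty") else None
  }
}
```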
And when you're processing three years' worth of data, it can be a very big amount of data which you cannot process in a single run. You would like to break it down into chunks which you can process one at a time; the perfect scenario is something like a two-hour job. So when you run your backfill job, you have a stepSize parameter which specifies how many days at a time you are processing. Here we see stepSize three, which means you process three days at a time, even though you are backfilling a longer period.
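The stepSize behavior amounts to splitting the inclusive [startDate, endDate] range into fixed-size chunks, which can be sketched as follows (the function itself is illustrative, not Sputnik's code):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Split an inclusive date range into consecutive chunks of at most
// stepSize days, so a multi-year backfill runs as many small jobs.
object Backfill {
  def chunks(start: LocalDate, end: LocalDate,
             stepSize: Int): Seq[(LocalDate, LocalDate)] = {
    val days = ChronoUnit.DAYS.between(start, end).toInt + 1
    (0 until days by stepSize).map { offset =>
      val chunkStart = start.plusDays(offset.toLong)
      val chunkEnd = start.plusDays(math.min(offset + stepSize - 1, days - 1).toLong)
      (chunkStart, chunkEnd)
    }
  }
}
```

For example, a seven-day range with stepSize 3 splits into chunks of three, three, and one day.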
And we already talked about environments, and Sputnik helps you manage them. In your production environment, the table from which you read and the table to which you write are exactly the tables you specified, either when writing the dataframe or via the table name in your annotation. But if you run your job in the developer environment, Sputnik automatically adds a Dev suffix to the result table name, which allows you to run your job against production data without overwriting production tables. And you can override Sputnik's logic about what kind of transformation to apply to the result table name in a testing environment: you can override this logic and change the namespace name, or make other kinds of changes. Right now, three environments are defined in Sputnik, Production, Dev, and Stage, and you can see the standard suffixes we add for each of them.
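Resolving a physical table name per environment is a pure function. A sketch of the idea follows, where the exact suffix strings are assumptions (the talk names the environments but not the suffixes):

```scala
// Map a logical table name to a physical one per environment, so a
// dev run can never overwrite production data. The suffix strings
// are illustrative assumptions, not Sputnik's actual defaults.
object Environments {
  sealed trait Env
  case object Production extends Env
  case object Dev extends Env
  case object Stage extends Env

  def tableName(base: String, env: Env): String = env match {
    case Production => base
    case Dev        => base + "_dev"
    case Stage      => base + "_stage"
  }
}
```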
Just to quickly iterate over the other parameters which Sputnik understands: you can set the drop-result-tables parameter to true. That's for when you run your job on the cluster, you've changed some schema or something else about your job, and you don't want a previous run of the application to affect this run. When it is set, we drop the result tables before writing to them. We can sample the data; the sampling logic is nothing clever, it just takes 10% of all input data. Regarding partitions, we can state either that we want a partitioned table or that we don't. If your job takes some arguments, you can pass them through the jobArguments parameter. And if you have several different scenarios in which you run your job, and the scenarios differ in which Spark configuration you run with or what kind of parameters you pass, you can pass a configPath pointing to some config on the machine on which you're running the Sputnik application, and that config would be used as the source of Spark configurations as well as all other configurations. And this is it: Sputnik was open-sourced in September of last year. It's on GitHub. You can look it up.
And one other thing which I really wanted to underline is that, even if you don't use Sputnik as a framework, you still need to think about what kind of work the data engineers in your organization are doing which can be automated, which can be extracted into some piece of infrastructure or some framework, so that data engineers work less on operational code and more on business logic. And this is it.
Egor is a Spark contributor and Senior Software Engineer at Airbnb, where he works on infrastructure to simplify creating and managing Spark pipelines. Before joining Airbnb, he worked at Apple on configurable, high-load streaming and batch pipelines. Egor led the engineering team at AnchorFree responsible for a data solution on top of Hadoop; this solution included an in-house DSL for defining DAGs of Spark jobs, Apache Zeppelin, Impala, and Tableau. Egor has been working with Apache Spark since version 0.9.