Over the past year, YipitData spearheaded a full migration of its data pipelines to Apache Spark via the Databricks platform. Databricks now empowers its 40+ data analysts to independently create data ingestion systems, manage ETL workflows, and produce meaningful financial research for our clients. Today, YipitData analysts own production data pipelines end-to-end that interact with over 1,700 databases and 51,000 tables without dedicated data engineers. This talk explains how to identify key areas of data infrastructure that can be abstracted with Databricks and PySpark to allow data analysts to own production workflows. At YipitData, we pinpointed sensitive steps in our data pipelines to build powerful abstractions that let our analyst team easily and safely transform, store, and clean data. Attendees will find code snippets of utilities built with Databricks and Spark APIs that provide data analysts a clear interface to run reliable table/schema operations, reusable data transformations, scheduled jobs on Spark clusters, and secure processes to import third-party data and export data to clients.
The talk will also showcase our system of integrating Apache Airflow with Databricks, so analysts can rapidly construct and deploy robust ETL workflows within the Databricks workspace. System administrators and engineers will also learn to utilize Databricks and Airflow metadata to discover large-scale optimizations of pipelines managed by analysts and create business value. Attendees will walk away with concrete strategies, tools, and architecture to drive their data analyst team to own production data pipelines and as a result, scale their engineering team and business.
– Hello everyone. Today I am really excited to talk to you about how to use Databricks as an analysis platform.
And to give you a little more color about what we’re going to be talking about, I’m going to be providing you with various sets of strategies and techniques to really abstract away a lot of your data operations to scale your analyst and engineering organizations. And to do that, I’m gonna first make the case for why a platform-level solution really makes sense to solve these problems at scale and how we’ve been successful at YipitData in handling this. And then after that, I’m gonna go into a deep dive in terms of our data platform at YipitData and show you how we approach various challenges with respect to data ingestion, data transformation, ETL workflow automation, and even platform visibility.
So as always, feedback is very welcome, so please feel free to rate and review this session.
And so, now I wanna get things started by giving you a little bit of context about what we do at YipitData.
And so at YipitData, we answer key investor questions. We have about 70 plus research products that cover US and international companies, and so our clients, who include over 200 investment funds and Fortune 500 companies, can expect to receive on a daily basis email reports, Excel files, and data downloads that cover key performance indicators and provide unique insight into the performance of these companies and the general market outlook. And so we source this information using transaction data, web data, app data, target interviews, and we’re constantly evaluating new forms of data sets to ingest and provide a unique window into these companies. And so the team behind these products and the data platform we’re talking about today includes 53 data analysts and three data engineers. Now we do have 22 engineers total across our platform, but these are gonna be the people who are using the platform and maintaining the platform that we’re gonna be talking about today.
And so a little bit about myself, I’m a senior software engineer at YipitData. I helped manage the core platform for ETL workloads, and so I’m very invested in kind of automating various transformation work that our analysts are going to be doing using the platform we’re discussing today and I’m based out of New York City. If you ever wanna get in touch, feel free to shoot me a message on LinkedIn.
So to give you the overarching vision for why we wanted a data platform for our company: we want our data analysts to really own our product from start to finish. Our product is extremely data driven, and as a result, we’ve identified key verticals in our product development process, and that includes data collection, where we’re ingesting and storing raw data, data exploration, where we start identifying patterns and trends in this data set, and once we have that insight determined, we can set up recurring pipelines to transform this data into clean tables using ETL workflows. And finally, those clean tables can feed into those report assets that go out to our clients. And so while these steps are somewhat independent in their own nuances and requirements, what we’ve recognized is that there’s a lot of deep technical intuition required to build successful products, and that intuition needs to be applied in each one of these verticals, and so these steps are actually quite interconnected. For example, understanding how our reports are faring with our clients will help inform what data we should be collecting in the future, and understanding what kind of data we’re collecting and how we’re collecting it really shapes how to explore that data and set up these recurring workflows. So as a result, we recognize that we actually want a singular group involved in this product development process from start to finish, and our data analysts are best positioned to do that because they are the experts on these data sets that we work with on a daily basis. And so to have them own this process from start to finish might seem a little surprising to you.
And that’s because, you’re rightly so in thinking that it’s kind of crazy to have one group own this process from start to finish, there are all kinds of implementation details, with kind of ingestion, transformation and even report generation, how do you expect one individual or group of individuals to own this from start to end? It might be even antithetical to how a lot of companies operate today. And that’s really where our platform-level solution kind of takes hold. We want our analysts to own this process from start to finish because they have that technical intuition and they can be the best people positioned to answer the questions our clients care about and build must-have products. But we don’t want them burdened by all of the implementation details, all of the data operations that make this product development life cycle time consuming. And that’s where our engineers really step in and provide a platform. They’re going to be building really powerful abstractions and provide them to our analysts, so that they can use these tools and utilities to do this product development on their own, without direct engineering support and so that way, they can develop these products efficiently, reliably, and scalably, without needing direct engineering guidance on doing so, or being bottlenecked by our engineering team.
And so to really succinctly put what our platform looks like, it’s a Python library built on top of the Databricks ecosystem. So, Databricks is really the foundational element for our platform, because it provides a great notebook interface for analysts to jump in and start exploring data and generating insight. However, even with this great interface, there’s a lot of work that needs to be done by our analysts to get to a production-ready product. And as a result, what we have done is built a custom Python library using PySpark and the Databricks REST API that allows our analysts to have all of these helpful utility functions to abstract away these common data operations that are typically owned by engineers. So they can find utilities to create a table, set up a Spark job, set up an ETL workflow, or even really monitor everything that’s going on in their system, and that can all happen within this library. So for the duration of this presentation, I’m gonna be talking about this library in detail and how we approach our solutions, but the key thing here is that it all comes down to this library. And there’s gonna be an appendix of code slides at the end of this presentation deck, so feel free to check that out to see how we really implement these functions.
So to get things started with our platform and what we approach, it’s really important to start with how we ingest data, because that’s really the entry point. And to give you an idea of what kind of data we work with on a daily basis, we have about a petabyte of compressed Parquet that is constantly growing. And while that’s not massive by any means, our real challenge is that, we have so much variety in our datasets. We deal with 60,000 tables and 1,700 databases, and so that wide range of table sizes and schemas really forced us to approach our solutions at a platform level and provide generalized solutions. So any kind of utility function that we provide, needs to be able to work in the general case and work with any kind of data set our analysts work with.
And so, a majority of our storage layer actually comes from raw web scrape data that we ingest ourselves using an in-house software-as-a-service platform called Readypipe. We offer this both internally and externally, but what our analysts get is a notebook interface where they can write web scraping code. And if you’ve ever done web scraping before, it’s essentially an exercise of repeatedly capturing a snapshot of a website and analyzing that time series data. The challenge here is that websites frequently change, and so being able to handle that data flexibly and making that data quickly available for analysis is really what’s critical to having Readypipe be such an important component of our data platform. And so while there are many components to Readypipe, I’m gonna specifically talk about how we do that data ingestion.
So, when you save data in Readypipe and take that raw web data, what happens under the hood is that Readypipe sends that data to Kinesis Firehose, which is an Amazon service that allows you to stream data to S3. And so all of that raw input data gets stored in S3, in a bucket as JSON files. And we use JSON files because, the key challenge with our web scraping is that we don’t really know the schema of our data ahead of time. We want our analysts to be able to capture whatever data points they can from websites, not be burdened by thinking about what kind of columns they have, what kind of column types, all of that stuff should be handled kind of on the backend, so they can focus on what matters, which is capturing that information.
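To make that flow concrete, here is a minimal sketch of how a scraped record might be shipped to Kinesis Firehose as newline-delimited JSON. The stream name, the `encode_record` helper, and the boto3 usage are illustrative assumptions, not Readypipe's actual code:

```python
import json

def encode_record(record):
    """Serialize one scraped record as a newline-delimited JSON blob,
    the format Firehose will land in the S3 bucket."""
    return (json.dumps(record, sort_keys=True) + "\n").encode("utf-8")

# On the scraping side, this would be shipped with boto3 (sketch):
# firehose = boto3.client("firehose")
# firehose.put_record(
#     DeliveryStreamName="readypipe-raw-json",  # hypothetical stream name
#     Record={"Data": encode_record({"url": url, "title": title})},
# )
```

Because the payload is schemaless JSON, the scraper can add or drop fields at any time without coordinating a schema change upfront.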
So from there, what we have to do is, we have to convert this JSON data and make it available and query-able in our data lake.
And to do that, we convert this JSON data to Parquet, which is a much more efficient and performant file format in Spark. So what we do for this conversion is we actually have a Spark cluster that is running and subscribed to S3 events on this JSON bucket. So any time an object is written to that bucket, Spark is aware of that event and will be able to convert that JSON data into Parquet files. And as this process is happening, the unique thing here is that we’re going to be inspecting this JSON data and seeing what new columns are appearing, and when those columns appear, we update our metastore in Glue to see those new columns. As a result, our analysts are able to quickly jump into this data and see the full picture of their data set as their schemas are evolving from that website.
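As a rough illustration of the schema-evolution step, the sketch below compares incoming JSON records against the columns already registered in the metastore. The `find_new_columns` helper and the Glue call in the comments are assumptions for illustration, not the talk's exact implementation:

```python
import json

def find_new_columns(known_columns, json_records):
    """Return column names present in the incoming JSON records
    but not yet registered in the metastore."""
    seen = set()
    for record in json_records:
        seen.update(record.keys())
    return sorted(seen - set(known_columns))

# Example: a scraper starts capturing a new "price" field mid-stream.
records = [json.loads(line) for line in [
    '{"url": "https://example.com/a", "title": "A"}',
    '{"url": "https://example.com/b", "title": "B", "price": "9.99"}',
]]
new_cols = find_new_columns(["url", "title"], records)
# At this point we would add the new columns to the Glue table
# (e.g. via boto3's glue.update_table) and write the batch out as Parquet.
```

The key design choice is that schema discovery happens on the backend, so analysts never declare columns upfront.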
So even with Parquet data, you may run into bottlenecks reading this data, and that presents a problem for analysts because we want them to query this information.
And so what we do is a process called compaction, which is essentially reading in these Parquet files and compacting them down to a smaller file count, because what happens with Spark is, if you have a lot of small Parquet files, you can still be bottlenecked at S3 doing I/O operations listing and reading those files. And so what compaction helps with is decreasing that file count and getting you to much larger files which can be more efficiently read. So we typically target 200 megabytes to a gigabyte per file, and this entire exercise, which we have an example of in the appendix, essentially takes a Spark job reading in partitions of that underlying Parquet data, coalescing them together, and outputting that data into a new S3 location. And when we do that, we update that partition information in Glue, so queries use these compacted files instead. The key thing with compaction is that it’s important to get this process and logic down correctly. If you don’t pick the right number of files, you can compact suboptimally and end up with poor performance. And so how we do this is we actually figure out the input file size by looking at the number of columns in each Parquet file, figuring out the bytes per column to get an estimate of how many bytes each row takes, and multiplying that by the number of rows of our input Parquet data. That gets us to an accurate file size estimate that allows us to compact accurately. So I really just want to flag that as something important when you think about doing this.
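The sizing logic described above can be sketched roughly as follows. The function name, the 512 MB midpoint target, and the `coalesce` usage in the comments are illustrative assumptions; the talk's actual heuristics may differ:

```python
def target_file_count(row_count, est_bytes_per_row, target_bytes=512 * 1024 * 1024):
    """Estimate how many output files a partition should be coalesced into.

    We aim for files between 200 MB and 1 GB, so 512 MB is used here as a
    middle-of-the-road per-file target."""
    total_bytes = row_count * est_bytes_per_row
    return max(1, round(total_bytes / target_bytes))

# e.g. 50M rows at ~200 estimated bytes/row is about 10 GB of data,
# which works out to 19 output files of roughly 512 MB each.
n_files = target_file_count(50_000_000, 200)

# The compaction job itself would then be something like (sketch, on a cluster):
#   df = spark.read.parquet("s3://bucket/raw/date=2020-06-01/")
#   df.coalesce(n_files).write.parquet("s3://bucket/compacted/date=2020-06-01/")
# followed by pointing the Glue partition at the new S3 location.
```

Getting `est_bytes_per_row` from the Parquet column metadata, rather than file sizes alone, is what keeps the estimate accurate across very different schemas.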
So with web data, we have a lot of great abstraction in place, but with third-party data, we have a different set of challenges. While third-party data is a growing portion of our storage layer, it is really an exercise of getting this information uniformly available to our analysts. And so the providers that we work with provide data in various file formats. We may have permissioning challenges accessing that data, tracking that data throughout our products can also be quite challenging, and then refreshing these data sets as the provider sends updates adds yet another hurdle, and all of these are really difficult for analysts to own on their own. Lucky for us, Databricks really helps us manage these third-party data sets by giving us a lot of utilities to analyze and handle this data.
And so, out of the box, the Databricks ecosystem has an add data tool that allows our analysts, who may receive files directly from a provider, to upload these files into our storage layer and then access them within notebooks. But if our analysts need programmatic access, for example to an external S3 bucket that we don’t own, this is very easy to set up, because what we can do is set up a template notebook that allows them to assume a different IAM role within the Databricks ecosystem and start reading in that data. And that way our analysts don’t need console access or access to the CLI to ingest this data on their own. And what we do is we have template code in place to help guide them to converting that data into Parquet and adding some additional metadata so that they can track it later on. And so now that we’ve spent all this time ingesting data and getting it into a usable format, our analysts’ work is just getting started. And so a lot of the time that they spend is actually transforming and analyzing this raw data into a clean and usable format, and so what they’re gonna be doing is creating intermediary tables to essentially take this input data and output it after some sort of transformation has happened.
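A template notebook like the one described might look roughly like this. The role ARN, bucket path, and `s3a_conf_from_credentials` helper are placeholders; only the STS assume-role call and the Hadoop `fs.s3a.*` settings are standard:

```python
# import boto3  # available on the cluster; commented out here as a sketch

def s3a_conf_from_credentials(creds):
    """Map temporary STS credentials onto the Hadoop s3a settings Spark needs
    to read an external bucket with a different IAM role."""
    return {
        "fs.s3a.access.key": creds["AccessKeyId"],
        "fs.s3a.secret.key": creds["SecretAccessKey"],
        "fs.s3a.session.token": creds["SessionToken"],
        "fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
    }

# In the notebook (sketch):
# creds = boto3.client("sts").assume_role(
#     RoleArn="arn:aws:iam::123456789012:role/provider-read-only",  # placeholder
#     RoleSessionName="analyst-ingestion",
# )["Credentials"]
# for key, value in s3a_conf_from_credentials(creds).items():
#     spark.conf.set(key, value)
# df = spark.read.json("s3a://provider-bucket/drop/2020-06-01/")  # placeholder path
```

From the analyst's perspective, they just run the template top to bottom; the role and permissioning details stay inside the notebook.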
And from their perspective, what they really wanna do is set up some transformation code using Spark SQL or PySpark, and then output that data to a new table and database. And because we recognize those are the inputs that matter to analysts, what we did is create a helper function to essentially do that table creation process for them. Now you may be thinking, why go to the trouble of writing a helper function to create a table? PySpark has native APIs that already read Parquet files, do those transformations, and then output to a new location. So why go through the exercise of writing a helper function?
The challenge is, when you’re working with 53 data analysts and 60,000 tables, you need to have a lot more structured organization in place to make sure things happen reliably. And so having good table hygiene is critical. You need to be able to validate table names so that they meet your product requirements. You need to have your storage layer organized so you don’t accidentally overwrite any data. Even having table versioning in place, in case something does go wrong or analysts need to look at a previous version of that data set, is all very critical, and dealing with this number of tables in our metastore and our storage layer pretty much obligates having automatic table maintenance in place to really control that administrative overhead. And while this is all great in concept from an infrastructure perspective, our team is really focused on analysis.
So, our analysts want to be focusing on the business-value-creating work that is generating answers for our clients, and they don’t wanna be burdened by these implementation details of structuring their storage layer. As a result, we took all of these best practices and integrated them into create_table, and so in the appendix you’ll find an example of this create_table function: how we implement table versioning, table validation, and storage layer organization, all automatically for our analysts whenever they use this helper function, so they can focus on what matters to them, but we still get all of the benefits from the infrastructure perspective as well.
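To make the idea concrete, here is a minimal sketch of what such a helper could look like. The naming rules, bucket name, and path layout are assumptions for illustration; the real create_table in the talk's appendix will differ in its details:

```python
import re
from datetime import datetime

def validate_table_name(name):
    """Enforce naming hygiene: lowercase snake_case, no leading digits."""
    if not re.fullmatch(r"[a-z][a-z0-9_]*", name):
        raise ValueError(f"invalid table name: {name!r}")
    return name

def versioned_location(database, table, bucket="analyst-tables", now=None):
    """Build a versioned S3 prefix so a new write never overwrites old data,
    and a previous version can still be inspected if something goes wrong."""
    version = (now or datetime.utcnow()).strftime("%Y%m%d%H%M%S")
    return f"s3://{bucket}/{database}/{table}/v{version}/"

def create_table(df, database, table):
    """Write df as Parquet to a fresh versioned location and register it."""
    validate_table_name(table)
    location = versioned_location(database, table)
    df.write.parquet(location)  # df is a PySpark DataFrame on the cluster
    # ...then repoint the metastore at `location`, e.g. (sketch):
    # spark.sql(f"CREATE TABLE {database}.{table} USING PARQUET LOCATION '{location}'")
```

The analyst only ever calls `create_table(df, "my_db", "my_table")`; versioning and validation happen for free.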
So now that we have this great way to transform data, we also need a way to power these transformations using Spark clusters. Now, if you’ve ever worked with Spark before, there’s a wide range of options for Spark clusters. There are all kinds of settings at the hardware layer, the permissioning layer, even Spark configuration, so that you can access your metastore and your S3 data and have your Spark jobs run efficiently.
The problem here is that this is an overwhelming number of settings for our analysts to use, and it really detracts from their mind share for that business-value-creating exercise. So instead, in our platform, we decided to take a more abstracted approach and offer t-shirt sizes for our clusters. So our analysts can see that there is a small, medium, and large cluster, a set menu of clusters available to them, and all that changes between these clusters is the number and sizes of workers, essentially the compute resources available to power that workload. We also add a few bells and whistles: for small jobs, a pool of warm instances so that these jobs, which run only a few minutes long, are able to process quickly, whereas our large clusters, which handle the heavy-duty workloads, have additional EBS volumes to cache those query results and really process these large jobs. The key thing here is that between all of these clusters, there is a standard IAM role, metastore, and S3 access. So our analysts have a consistent experience using any one of these clusters, and they should only expect the query run time to change.
And so from here, what happens is that we are able to launch these Spark jobs very simply. Our analysts have this helper function called create_job, which launches the Spark job by taking a Databricks notebook as an input, some email notifications, and that cluster size we were referring to. And so under the hood, this cluster size essentially maps to a JSON configuration, which includes all of the details for what the Spark cluster has in terms of the instances, our configuration, the runtime. All of that stuff is buried away in the library and actually feeds directly into the Databricks REST API, which can programmatically launch this job.
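As a sketch, the mapping might look like the following. The payload shape follows the Databricks Runs Submit API (`/api/2.0/jobs/runs/submit`), but the specific instance types, worker counts, and runtime version in the size menu are illustrative assumptions, not YipitData's actual settings:

```python
# Hypothetical t-shirt-size menu: each size is just a different amount of compute.
CLUSTER_SIZES = {
    "small":  {"node_type_id": "i3.xlarge",  "num_workers": 2},
    "medium": {"node_type_id": "i3.2xlarge", "num_workers": 8},
    "large":  {"node_type_id": "i3.4xlarge", "num_workers": 24},
}

def create_job(notebook_path, cluster_size, emails=()):
    """Build the Runs Submit payload for a notebook job of a given size."""
    size = CLUSTER_SIZES[cluster_size]
    return {
        "run_name": notebook_path,
        "new_cluster": {
            "spark_version": "6.6.x-scala2.11",  # illustrative runtime
            "node_type_id": size["node_type_id"],
            "num_workers": size["num_workers"],
            # the standard IAM role / metastore / S3 settings would be merged in here
        },
        "notebook_task": {"notebook_path": notebook_path},
        "email_notifications": {"on_failure": list(emails)},
    }

payload = create_job("/Users/analyst/etl/clean_orders", "medium",
                     emails=["analyst@example.com"])
# The library then POSTs this to the REST API (sketch):
# requests.post(f"{host}/api/2.0/jobs/runs/submit", headers=auth, json=payload)
```

The analyst only ever picks "small", "medium", or "large"; everything else in the payload is a library default.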
So when we do launch that job, using Databricks,
it starts doing the heavy lifting. So it’s going to take that JSON configuration and provision the appropriate compute resources from our cloud provider. And then it’s going to monitor those instances as the job is running, so if the cluster is under heavy load, it can even autoscale for additional compute, or make sure those instances are available for the duration of the job. And then on top of all of this, it’s going to apply a wide range of Spark optimizations. So what we really see here is a very reliable experience launching Spark jobs for our analysts. They can be in full control of this process without direct engineering support. And so now that our analysts have these building blocks to transform data and then actually power those transformations with clusters, they wanna be able to sequence these transformations into recurring workflows.
And for these kinds of ETL jobs, Airflow is our preferred tool, and if you aren’t familiar with Airflow, it’s essentially an open source Python library that allows you to describe these workflows. Airflow provides a construct called a DAG, which essentially allows you to define these various jobs and the relationships between jobs in terms of how to structure their order. And from there, Airflow is able to execute these tasks. The great thing here is it works with Databricks right out of the box, so Airflow has full ability to launch Databricks clusters. The challenge here is that it requires someone to manage this code, and so essentially, as their ETL workflows evolve, an individual has to come into this file, add additional code for these new jobs, and then deploy it to our Airflow servers, and this presents somewhat of a bottleneck for our analysts, because we don’t want them burdened by having to maintain this. So what we offered instead is essentially a way to programmatically create these DAGs, these Python files, using the Databricks API. So how does that look under the hood?
What we recognize is that our analysts are trying to sequence various notebooks, various Spark jobs, one after another. So to provide that interface in the Databricks workspace, we define a DAG as a folder in the Databricks workspace. And inside that folder is a series of notebooks, which are the individual tasks or Spark jobs of that DAG, and so within each one of those notebooks, our analysts have the option of specifying that dependency graph, and this maps very well to what the Airflow API does.
So if they need to run a, b, and c in order, in notebook b they can simply say that it depends on notebook a, and notebook c would depend on notebook b, and then they also specify the cluster for the Spark job, using the same naming conventions we were describing earlier. And the remainder of the notebook code includes all of the transformation logic that they need for their job. And so here, they’re going to be using create_table, any kind of PySpark API, or our utilities to really do that transformation logic. And so once our analysts define all of these notebooks, what we’re able to do is use the Databricks API to fetch all these notebooks and build out this dependency graph that we can use in our Airflow DAGs. So we’re basically able to extract the notebook paths, the cluster sizes, and that dependency information.
And so once we have this graph structure in place, we can build a Python file around it to essentially have this Airflow code ready to go and deploy it. And from there, once we’ve deployed this Airflow code, we have these pipelines running without direct engineering guidance.
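The rendering step described above can be sketched like this. In practice the notebook list and each notebook's dependency line would come from the Databricks Workspace API; here they are hard-coded, and `cluster_json` is a hypothetical helper (assumed to expand a t-shirt size into a cluster spec) referenced only inside the rendered text:

```python
def render_dag(dag_id, notebooks):
    """Render an Airflow DAG file from notebook specs.

    notebooks: {task_name: {"depends_on": [...], "cluster_size": "..."}}
    """
    lines = [
        "from airflow import DAG",
        "from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator",
        f"dag = DAG({dag_id!r}, schedule_interval='@daily')",
    ]
    # One operator per notebook in the workspace folder.
    for name, spec in notebooks.items():
        lines.append(
            f"{name} = DatabricksSubmitRunOperator(task_id={name!r}, dag=dag, "
            f"json=cluster_json({spec['cluster_size']!r}, {name!r}))"
        )
    # Wire up the dependency graph the analysts declared in their notebooks.
    for name, spec in notebooks.items():
        for upstream in spec["depends_on"]:
            lines.append(f"{upstream} >> {name}")
    return "\n".join(lines)

dag_file = render_dag("daily_orders", {
    "a": {"depends_on": [], "cluster_size": "small"},
    "b": {"depends_on": ["a"], "cluster_size": "medium"},
    "c": {"depends_on": ["b"], "cluster_size": "medium"},
})
# The rendered file is then deployed to the Airflow servers.
```

Because the DAG file is generated, analysts never touch Airflow code directly; editing a notebook's dependency line is enough to restructure the pipeline on the next render.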
And so, Airflow is gonna be spinning up these Spark jobs one after another, orchestrating them in that dependency graph, and we get all of the robust logging and error handling that Airflow provides. And it’s very easy for an analyst to own this process, because they can quickly add new notebooks to that Databricks folder and change that dependency graph with just one-line modifications, and so all of their changes and edits happen within the Databricks ecosystem, and they’re really able to control this process on their own.
And so, now that we have this tool to build recurring workflows and all of these operations that are happening underneath, we need some visibility in place to make sure that all of our systems go as expected.
And so for our analysts, what we recognize is, because we’re running about 2,000 Airflow tasks a day, having good monitoring in place for these ETL workflows is critical. And what we did is we took the built-in alerting system Airflow provides and built custom email notifications that are tailored to the Databricks experience, and so our analysts can receive error alerts whenever these ETL workflows fail and quickly link to the job run, the Databricks notebook, even a one-click link to restart the workflow, all from their inbox. And if they want to tune their workflows over time, they receive ongoing summary emails that give them an idea of all the clusters that are running and how long they took, so that they have an understanding of their usage and can fine-tune these workflows periodically.
But on top of that, we recognize that with Databricks, Readypipe, and even Airflow, there’s all of this rich metadata that can help answer questions for analysts in case something goes wrong, and even be used for administrative purposes. So what we do is we actually store these logs as Parquet tables by ingesting them within Databricks. By having these tables available, our analysts are able to troubleshoot any issues they have and really understand what’s going on with their system.
But even with that granular data, it can be somewhat overwhelming, so to really streamline this monitoring, what we do is we actually take all of that granular data and visualize it using Databricks’ built-in charting tools. And so there are built-in dashboards and email reports that plot out the results of what’s happening within the various systems within our platform. And these are delivered to our analysts, and they can quickly visualize and make sure that things are going as expected. And if something is wrong, then they jump into that granular data we were discussing. So I want to end here by talking about why we invested in all of these various tools and components of our platform, because the most rewarding part of this is that it starts to invite new solutions on its own. And so what we’ve seen is that our analysts, because they have all of these building blocks for their product development, are able to abstract away more of their work, and they’re able to standardize queries and notebooks so that they have consistent ways to analyze these products and share them across the organization. We’ve even seen our analysts really automate their workflows, triggering one ETL workflow after another, or even starting their reporting process right after their ETL workflow finishes, so that their entire process from end to end is really automated. And that’s really exciting as engineers, to see these patterns emerge, because then we can step in and provide greater abstractions and more utilities in our platform to continually scale the solutions available to our analysts. So hopefully through this presentation you have seen a lot of strategies and techniques to really scale your data platform. And if you’re interested in working with us and find what we do at YipitData very exciting, we are hiring. So go to yipitdata.com/careers or reach out to me. Thank you very much for your time.
Anup is a Senior Software Engineer at YipitData, a fast-growing fin-tech startup that answers investor questions through alternative data analysis and research. At YipitData, Anup has helped migrate existing data infrastructure to the Databricks platform, managed tooling for ETL workflows through Apache Airflow, and led various projects focused on application visibility and software reliability. Previously, Anup worked in investment banking at Citigroup and studied at Indiana University. In his free time, Anup enjoys swimming and is interested in data privacy issues and regulation.