SafeGraph is the source of truth for data on non-residential physical places. We provide points of interest, building footprint, and foot traffic data for over 7 million locations in the US, UK, and Canada.
The rapid growth of the business brings several challenges to SafeGraph's tech stack. Compared with companies that serve customers through apps and online services, offering a dataset as a product places unprecedented demands on data versioning, data quality, and the reliability and efficiency of data processing. Additionally, scaling our engineering team to keep up with rapidly increasing customer demand requires a carefully designed internal toolkit. The toolkit not only needs to bring state-of-the-art ML and data infrastructure technology to SafeGraph, but also needs to be minimally intrusive so that it does not interrupt product development.
In this talk, we present what we learned from building the data platform and internal toolkit at SafeGraph. First, we will introduce our experience integrating Delta Lake and MLflow at SafeGraph. Our integration not only improves the tracking and debuggability of our data processing stack, but also significantly improves the productivity of our ML engineers and, most importantly, does so with minimal interruption to their existing workflow. Second, we will cover how we improve the observability of Spark applications and the manageability of our Databricks-based data processing platform with a set of internal tools. Finally, we will share how we identified bottlenecks in Spark and in the Scala standard library, and how we then scaled our Spark applications to handle a complicated data ingestion scenario.
Nan Zhu: Hello everyone. This is Nan from SafeGraph. I’m a software engineer working on the data platform in the company, and today I will present how we build our product, the source-of-truth place data, at a large scale.
So, in today’s presentation, I will first introduce the data engineering and analytics stack in SafeGraph and talk a little bit about the challenges we face in building and maintaining such a stack, and then I will cover three topics. The first is how we seamlessly integrate MLflow and Delta Lake in SafeGraph, and the second is how we manage our data processing platform based on the Databricks cloud-based solution. And the third is how we optimize some of our large-scale data processing jobs, and then finally I will give a short summary of this talk.
I think one of the most important pieces of background information is what we really do in SafeGraph. Well, putting it simply, we build and serve data sets to our customers, and these data sets describe the physical world that we live in with high accuracy. Our data set contains information about seven million locations in the United States, Canada, and the United Kingdom, and customers can leverage our data to do different types of analytics. So, on this page I’m showing you an example which shows the relative visit counts of different brands over the last two years. I think you can clearly observe that due to COVID-19, the visit counts dropped significantly for all the brands last March. And as time went by, for example when the vaccine became available, the visit numbers came back to the normal level.
So, behind the data set that lets us build such a figure, we need a powerful data engineering and analytics stack. We build our data processing layer on top of Databricks, and we have many Spark jobs that read and write data from different storage systems. Also, we have different ways to trigger these Spark applications. We can trigger applications from Airflow and from different services, and engineers also start some jobs on the Databricks platform from their local laptops.
What are the challenges here? To understand this question, let’s compare SafeGraph with, I would say, a typical company that serves customers with its online service and mobile apps. For such a typical company, batch data processing infrastructure is very important, of course, because data scientists can leverage it to analyze the product’s usage and optimize the product. However, the batch data processing infrastructure in such a company has no direct or immediate impact on the availability or correctness of the business. So you have a fairly high tolerance for sub-optimal practices or for compromises in performance and reliability. Even in a bad case where your infrastructure needs hours-long maintenance, that’s not ideal but still okay.
However, the story is different in SafeGraph. We deliver data sets to our customers, and these data sets are directly generated from our processing layer. So we can say that the data processing jobs running in our infrastructure are our online service, and we need to do for our data processing jobs everything a company would do to operate its service infrastructure. Of course, there are a lot of things to do to maintain services. Due to time limitations, I will cover three major aspects today.
So, the first thing is version management. When you have a service, you need to manage the versions of the API, the deployment, and the upstream and downstream services. You also want to ensure the operational health of your service. Of course, there are a lot of topics here; today I will cover how you manage your clients and how to use a monitoring toolkit for different components. Maybe the most important thing is reliability and efficiency. You have a lot of optimization strategies for the availability and latency of your service. But when we map that to data processing jobs, what are the things that we need to do?
So, let’s start with version management. In version management, we need to focus on the version of the data, both the input and the output, the Spark pipeline code, and the configuration. In the last few years, there have been a lot of tools helping us track the versions of data. For example, you can write your data in Delta Lake format, and then you can do time travel over different versions. It’s very convenient. And you can also leverage MLflow to log the code version, artifact version, and configuration version, and then tie everything together. It makes it easier to track that, say, my Spark application read version one, wrote version three, and which version of the JAR was used. I’m not going to cover too much about how to use Delta Lake and MLflow today. There have been a lot of talks and blog posts. The major topic I would like to discuss today is how to introduce Delta Lake and MLflow to your company, and we will share our experience.
I think the most straightforward way might be: okay, let’s just read the official documentation and see what it tells us to do. You can follow the instructions to add different configuration files to your code repo, and you can also tell your engineers, “Oh. Please don’t run the Python script directly. You need to use MLflow commands to do that.” Of course, we can do that and eventually we can achieve the goal. The problem with this approach is that it involves too many changes to your existing workflow and systems. And when you adopt this approach, you essentially push every one of your engineers to the front line to go through the learning curve of the system. Because, of course, MLflow and Delta Lake are great, but they still have different issues or [inaudible] in certain scenarios. You really don’t want every one of your engineers to go through that.
As a company, there are certain things you always need to log, for example the JAR version and some configuration versions. You want to get a consistent logging view in your company, but if you adopt this approach you essentially rely on your engineers to remember to log everything. To resolve this problem in SafeGraph, we developed VersionTracker. It’s our internal library that integrates MLflow and Delta Lake. We have different types of Spark applications, and all of them call the VersionTracker API. Engineers maintaining these applications or developing new ones don’t even need to know what Delta Lake and MLflow are, because reading and writing data and all the logging are implemented within the API. It brings a lot of advantages to us. For example, we had a very fast landing experience with MLflow and Delta Lake with minimal changes, it masks a lot of system complexity, and it also brings a consistent logging view across teams in the company.
So, let’s look at this example. In this example, you can see the only change engineers need to make to use Delta Lake is to replace their Parquet read and write API calls with VersionTracker API calls. And within this API, we have masked a lot of system complexity. For example, we fill language-specific functionality gaps, and when we implemented these APIs we hit different issues, but only the platform engineers experienced them. We have code to hide these issues from the end users. That means our data engineers write this code without explicitly logging anything, but everything still shows up in the MLflow UI. On this page you can see that we have automatically logged the version of the data being read, the version of the data being written, the JAR we used, and the version of that JAR. So our engineers never have to remember to log everything, and we get a consistent logging view across applications. So this is about version management.
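The wrapper idea described here can be sketched in a few lines. This is a minimal illustration, not SafeGraph’s actual VersionTracker: the class names, method names, paths, and the JAR version string are all hypothetical, and two in-memory stand-ins take the place of Delta Lake and MLflow so the example runs without either installed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class InMemoryVersionedStore:
    """Hypothetical stand-in for a versioned table store such as Delta Lake."""
    _versions: Dict[str, List[Any]] = field(default_factory=dict)

    def write(self, path: str, data: Any) -> int:
        # Every write appends a new version; older versions stay readable.
        self._versions.setdefault(path, []).append(data)
        return len(self._versions[path]) - 1

    def read(self, path: str) -> Tuple[Any, int]:
        # Return the latest data together with its version number.
        versions = self._versions[path]
        return versions[-1], len(versions) - 1


@dataclass
class InMemoryRunLog:
    """Hypothetical stand-in for an experiment tracker such as MLflow."""
    params: Dict[str, str] = field(default_factory=dict)

    def log_param(self, key: str, value: Any) -> None:
        self.params[key] = str(value)


class VersionTracker:
    """Wraps reads and writes so version logging happens as a side effect."""

    def __init__(self, store: InMemoryVersionedStore, run_log: InMemoryRunLog,
                 jar_version: str) -> None:
        self._store = store
        self._log = run_log
        # Logged once per application, so no engineer has to remember it.
        self._log.log_param("jar_version", jar_version)

    def read(self, path: str) -> Any:
        data, version = self._store.read(path)
        self._log.log_param(f"read_version:{path}", version)
        return data

    def write(self, path: str, data: Any) -> None:
        version = self._store.write(path, data)
        self._log.log_param(f"write_version:{path}", version)


# Application code only swaps its read/write calls for the tracker's.
store = InMemoryVersionedStore()
store.write("s3://bucket/places", [{"id": 1}, {"id": -1}])  # existing input
run_log = InMemoryRunLog()
tracker = VersionTracker(store, run_log, jar_version="1.4.2")

places = tracker.read("s3://bucket/places")
tracker.write("s3://bucket/places_clean", [p for p in places if p["id"] > 0])
```

The application code at the bottom never mentions logging, yet `run_log.params` ends up holding the JAR version and the data versions read and written, which is the consistent logging view the talk describes.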
The second topic is operational health. On the service side, you need to manage your clients and you need a monitoring toolkit as the foundation of your operational health. When it comes to data processing jobs, we need a unified job launcher, and at SafeGraph we leverage the event-listener-based observability API to develop our observability stack. At the very beginning, we relied on the Databricks UI to manage our jobs. Of course, it is very common to use the UI. However, as time went by we found more and more problems. For example, when you click on the Databricks runtime version, it really gives you a lot of choices, so our DBR versions started diverging. Even when we tried to unify the version, we needed to keep telling people, especially the new engineers, “Okay. Our internal tools are tested with, for example, DBR 8.1, so please don’t use 8.2 or 8.3, even if they are newer and they are there.”
Because we chose a pure UI-based job management interface, we really missed a unified job interface where we could instrument observability and debugging tools. And because of this lack of observability, it was hard for us to answer several questions. For data platform engineers, it was hard to answer questions like: did all applications finish successfully yesterday, are there some very expensive applications that should be optimized, or have we seen similar issues in other applications, so that we can do something to save our engineers’ debugging time. And when data engineers debug a certain application, they need tools that help them quickly answer questions like: did data skew happen, or were too many small files written to S3, causing high latency. Because we didn’t have observability, we could not answer these questions.
To resolve this problem, we developed our Spark Job Launcher. It is the interface between the channels that trigger Spark jobs and Databricks. Through the Spark Job Launcher we can unify the Databricks runtime version, and we can also instrument monitoring and debugging tools. We have different ways to call the Spark Job Launcher. On the Airflow side, we have customized the Airflow Databricks plugin; on the services side, we can call it directly; and in the local environment, we have our internal Spark development tools, which also call the Spark Job Launcher. Because we have now unified the Spark job interface, we can improve our observability.
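A launcher of this kind can be pictured as a single function that every trigger path goes through. The sketch below is an assumption about how such a function might look, not SafeGraph’s code: the function name, listener class names, JAR path, and node type are hypothetical, and the request shape is only loosely modeled on a Databricks one-time run submission, so field names should be checked against the API version in use. `spark.extraListeners` is the standard Spark setting for registering listeners at startup.

```python
from typing import Dict, List, Optional

# The single runtime version the internal tools are tested against (assumed string).
PINNED_SPARK_VERSION = "8.1.x-scala2.12"

# Hypothetical listener classes that the platform attaches to every job.
DEFAULT_LISTENERS = [
    "com.example.MetricsListener",
    "com.example.FailureListener",
]


def build_run_request(
    run_name: str,
    main_class: str,
    jar_uri: str,
    parameters: Optional[List[str]] = None,
    num_workers: int = 4,
    node_type_id: str = "i3.xlarge",
    spark_conf: Optional[Dict[str, str]] = None,
) -> dict:
    """Build one run request; callers cannot choose the runtime version."""
    conf = dict(spark_conf or {})
    requested = [c for c in conf.get("spark.extraListeners", "").split(",") if c]
    # Platform listeners come first, then user listeners, without duplicates.
    listeners = DEFAULT_LISTENERS + [c for c in requested if c not in DEFAULT_LISTENERS]
    conf["spark.extraListeners"] = ",".join(listeners)
    return {
        "run_name": run_name,
        "new_cluster": {
            "spark_version": PINNED_SPARK_VERSION,
            "node_type_id": node_type_id,
            "num_workers": num_workers,
            "spark_conf": conf,
        },
        "libraries": [{"jar": jar_uri}],
        "spark_jar_task": {
            "main_class_name": main_class,
            "parameters": list(parameters or []),
        },
    }


# Airflow, services, and local tools would all call the same function.
request = build_run_request(
    run_name="daily-ingest",
    main_class="com.example.Ingest",
    jar_uri="s3://bucket/jars/app.jar",
    parameters=["--date", "2021-05-01"],
    spark_conf={"spark.extraListeners": "com.example.CustomListener"},
)
```

Because the runtime version is a constant rather than a parameter, version drift cannot start from a caller, and the monitoring listeners are attached even when a job supplies its own.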
Let’s look at how our monitoring and debugging tools work here. Within SafeGraph, each Spark application not only has the business logic, which is defined by our user code; we also instrument it with a metrics listener and a task and job failure event listener. In the metrics listener we dump Spark metrics to Dynatrace, and with the job and task failure event listener we capture certain types of events and send notifications to Slack for our engineers. And besides the applications that build data sets for our business, we also have an observability Spark application that talks to the Databricks API endpoint, builds an overview of all the applications, sends notifications to Slack, and saves the historical records in Amazon S3.
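The listener pattern described here can be illustrated without Spark. In the sketch below, the event type, the listener classes, and the bus are simplified stand-ins for Spark’s listener mechanism, and the two callbacks stand in for a metrics backend and a chat notification. All names are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class TaskEnd:
    """Simplified stand-in for a task-completion event."""
    job_id: int
    task_id: int
    succeeded: bool
    duration_ms: int
    error: str = ""


class MetricsListener:
    """Forwards a metric for every finished task to a metrics sink."""

    def __init__(self, emit: Callable[[str, float], None]) -> None:
        self._emit = emit

    def on_task_end(self, event: TaskEnd) -> None:
        self._emit("spark.task.duration_ms", event.duration_ms)


class FailureListener:
    """Sends a notification only when a task fails."""

    def __init__(self, notify: Callable[[str], None]) -> None:
        self._notify = notify

    def on_task_end(self, event: TaskEnd) -> None:
        if not event.succeeded:
            self._notify(
                f"job {event.job_id} task {event.task_id} failed: {event.error}"
            )


class ListenerBus:
    """Delivers each event to every registered listener, in order."""

    def __init__(self) -> None:
        self._listeners: List[object] = []

    def register(self, listener: object) -> None:
        self._listeners.append(listener)

    def post(self, event: TaskEnd) -> None:
        for listener in self._listeners:
            listener.on_task_end(event)


# In-memory sinks stand in for the metrics backend and the chat channel.
metrics: List[Tuple[str, float]] = []
alerts: List[str] = []

bus = ListenerBus()
bus.register(MetricsListener(lambda name, value: metrics.append((name, value))))
bus.register(FailureListener(alerts.append))

bus.post(TaskEnd(job_id=1, task_id=0, succeeded=True, duration_ms=1200))
bus.post(TaskEnd(job_id=1, task_id=1, succeeded=False, duration_ms=300,
                 error="ExecutorLostFailure"))
```

The user’s business logic never touches the listeners; they are registered by the platform, which is why a unified launcher is a prerequisite for this kind of instrumentation.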
Okay. So this is about operational health, or some part of it, of course; I only covered two aspects here. The last topic is reliability and efficiency. For a service, you always have certain things to do for your availability and latency, but when it comes to data processing jobs, our finding is that it really becomes a case-by-case study. And if you want to optimize your availability, you can always start from your efficiency, because in the context of Spark, given a certain amount of resources, your application usually becomes fragile when it faces resource pressure. So if we want to have a reliable Spark application, the first step might be just focusing on finding the performance bottleneck and improving the efficiency.
So I know we said this is a case-by-case study. I want to share one of the trickiest cases we met in SafeGraph. We have a data ingestion application. Every time it runs, we see thousands of CSV files, and for each of the files we launch a job in parallel to process it, get some result, and perform a dynamic partition overwrite to Parquet on S3. This S3 Parquet data set had accumulated hundreds of thousands of Parquet partitions, and the last step of this application is to output a snapshot of this Parquet data set for downstream usage. The problem with this application is that its running time can be more than 24 hours, and it also failed occasionally. So it is expensive and it is not reliable.
After some debugging, we found two bottlenecks here. The first is that the DAGScheduler in Spark becomes the bottleneck. Because we launched so many jobs, the DAGScheduler cannot serialize the closures and dispatch the tasks in a timely manner. That leads to underutilized resources in our Spark application, and the overall progress of our application is really slow. The second bottleneck is in the last step. When we output the snapshot to downstream, we found that the S3 server becomes the bottleneck, because we have so many partitions, we are reading files from all of them, and all the HTTP requests were stuck there. So how do we optimize that?
First, since we have a bottleneck at the DAGScheduler, we start batching the jobs. Instead of launching a job for every CSV file, we batch 100 CSV files (it’s a configurable number) into a single job, and with this optimization we can shorten the job duration to three hours on average. And since the S3 Parquet data set containing hundreds of thousands of files becomes a bottleneck, let’s just not partition it, because the major reason for us to have a partitioned table is to update the [inaudible] with a [inaudible] partition key. Now that we have the Delta Lake format, we replace the partitioned table with a non-partitioned Delta Lake table and leverage the upsert functionality in Delta Lake to do that. Then, with a limited number of files, we can further shorten the job duration to about just one hour. This is a case of optimizing the reliability and efficiency of our Spark application. We shortened the job duration from 24 hours to about one hour, and everything not only runs faster but also runs much, much more reliably.
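Both optimizations can be illustrated with plain Python. The sketch below is not the production code: the file names and the count of 2,500 files are made up, and a dictionary keyed by record id stands in for the non-partitioned table. In Delta Lake the corresponding operation is a merge; the dictionary only shows the update-or-insert semantics.

```python
from typing import Dict, Iterator, List, Sequence


def batches(items: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive groups of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


def upsert(table: Dict[str, dict], updates: List[dict], key: str) -> None:
    """Update rows whose key already exists and insert the rest."""
    for row in updates:
        table[row[key]] = row


# Batching: one job per 100 files instead of one job per file, which
# reduces the number of jobs the scheduler has to dispatch.
BATCH_SIZE = 100  # configurable, as in the talk
csv_files = [f"s3://bucket/incoming/part-{i:05d}.csv" for i in range(2500)]
jobs = list(batches(csv_files, BATCH_SIZE))

# Upsert: existing key "a" is updated in place, new key "b" is inserted,
# so no per-key partition directory is needed to replace old rows.
snapshot = {"a": {"id": "a", "visits": 1}}
upsert(snapshot, [{"id": "a", "visits": 5}, {"id": "b", "visits": 2}], key="id")
```

With these made-up numbers, 2,500 files turn into 25 jobs, so the scheduler dispatches two orders of magnitude fewer jobs for the same amount of data.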
Okay. In summary, providing data as the product at a very large scale is challenging because now you’re treating your data processing stack as an online service. You need to do everything you would do to operate an online service, and you also need to map those practices to the context of data jobs. The second point is that tools like Delta Lake and MLflow are really great. Besides how to use them, I think a more important consideration is how you want to introduce these tools to your company. Of course, you can follow the official docs, but maybe a better way is to consider how to pursue minimal changes to your existing workflow and your engineers’ habits, so it can bring you a faster landing experience.
And the last thing I would like to mention is that when we build our platform on a cloud-based solution, we need to think about what the solution can bring to us. Of course, a cloud-based solution such as Databricks can bring a very, very critical foundation for your data processing layer, but you still need some company-specific work to make a general cloud-based solution fit your scenario in an optimal way.
Thank you very much for listening to our talk. A very important thing is that we are hiring; we are very, very actively hiring. We are hiring data and machine learning engineers because we are using data and machine learning to understand this physical world. And we are also hiring infra and platform engineers. Our team is growing very, very fast, so we need a well-designed platform and infrastructure to make us 10 times more productive. So if you are interested, ping me or visit our website. Thank you very much again, and please give feedback on our talk. Thank you.
Nan is a Software Engineer in SafeGraph, where he works on building the data platform to support the scaling of business of this data company. He also serves as the PMC member of XGBoost, one of the m...