Whether your data pipelines handle real-time event-driven streams, near-real-time streams, or batch processing jobs, when you work with a massive amount of data made up of small files, specifically Parquet files, your system performance will degrade.
A small file is one that is significantly smaller than the storage block size. Yes, even with object stores such as Amazon S3, Azure Blob, etc., there is a minimum block size. A significantly smaller object file can result in wasted space on disk, since the storage is optimized to support fast reads and writes at the minimal block size.
To understand why this happens, you first need to understand how cloud storage works with the Apache Spark engine. In this session, you will learn about Parquet, the storage API calls, how they work together, why small files are a problem, and how you can leverage Delta Lake for a more straightforward, cleaner solution.
Speaker 1: Imagine you’re hanging out with your family and friends. It’s the weekend. Spring is the right season to be outside. You’re really having a good time, sharing and exchanging experiences. Maybe you’re drinking a glass of wine, or if your favorite cocktail is available, you might be drinking that as well. And all of a sudden, you get a phone call from an unrecognized number. You pick up the phone and you hear the metallic voice of an automated alert system. You know that something is happening. It’s some production issue. You don’t yet know how severe the case is. Is it customer-facing? How long will it take you to fix it? Can you fix it? Should you fix it? But you know you are responsible for the cluster health. So you start to investigate.
You open all the right dashboards to monitor the cluster health. You also open the Spark UI, and there you notice there are some failed jobs. So one of the jobs failed, but you still don’t know what exactly that was. You don’t know why it failed. So you start digging. While you continue digging, you realize there are actually a lot of failed tasks. A failed task doesn’t necessarily mean a failed job. It can be that multiple tasks failed, but Spark was still able to recover and finish the job successfully. It does mean something is happening in the cluster that you need to figure out and get to the root cause of. So you start digging into the logs.
In the logs, you see all these timeouts, something related to the node servers, maybe something with the Azure blob not being available for some reason. And you are positive something terrible has happened, and you start blaming the public cloud.
You immediately start to think, this would never happen on prem. It’s only the public cloud that makes my life hell. But you know what? Sometimes these exceptions happen on prem as well. It doesn’t necessarily mean they relate to the storage or to the public cloud. Specifically, Small File Syndrome is an issue that we encounter both on prem and in cloud storage.
Hi, and welcome to today’s session, where we’re going to deep dive into the Small File Syndrome and why it is even a problem. So we’ve already seen the introduction and how it can impact us and create exceptions in our logs. But we want to really understand where it comes from. How can we mitigate it, how can we detect it, and what are the different scenarios that bring it to life? And also, what are the prominent use cases that we need to be aware of in order to mitigate it in time?
So this is what we’re going to talk about. We’re going to talk about the problem, why it happens, detection and mitigation. And in the end, we’re going to see a nice demo of how picking different file formats can really impact our query performance. So why does it even happen? Why do we have these exceptions? Why is it a problem? In order to understand that, we need to take one step back and look at the Apache Spark query lifecycle abstraction. As most of you already know, we have the Spark driver, and in the Spark driver we have the job that starts the whole run. The logic is distributed across many executors and tasks. And wherever each task is handled, it all involves the storage. When we’re thinking about the storage, we really need to understand the difference between local storage, storage that we might have locally on our VM, and the object storage that exists in the cloud.
So today, when you work with the cloud, storage locality is still a major part. But it highly depends on how we build the infrastructure for the data. In many cases, even with storage locality, we’ll still need to make some calls to the storage itself in order to bring that information in and write it out at the end. So what does it look like, and why does it impact our query performance? Let’s take a look at the Azure Data Lake Gen2 architecture. In Azure Data Lake Gen2, we have two different layers. The first one is the hierarchical file system. And the second one is the blob storage. The blob storage at the bottom really helps us with high availability, disaster recovery, data governance and management, RBAC (role-based access control), where we can actually control who is accessing the data, objects of different sizes, and lifecycle and policy management.
And on top of that, we have another layer with security and different authentication options, and performance enhancements, which means there is a caching mechanism. So they can actually cache the data for us, so we can have faster reads and writes. And then there is the scale and cost effectiveness, and all of that. Now let’s understand what happens when I am doing a write operation or a read operation. When I’m writing a file, I’m actually writing three different components. The first one is the data itself. I am going to write some image, some text, some bytecode. The second one is expandable metadata that describes what this data is all about. It can hold properties for management, like who can access it, some statistics on the data, and things like that. And the third one is a globally unique identifier.
And when we say globally unique identifier in the world of distributed systems, it’s a unique identifier across all the distributed systems that are connected together. So having said that, it means we’re creating three different components. It’s not only the data that I’m saving in the system. It’s also the metadata on the data itself, so I will be able to access it and enforce all the different policies. And the global identifier. By the way, without the global identifier, it would be a total mess, and we wouldn’t be able to find our data in the distributed data system. This is the magic behind scaling up, because then we can have multiple machines that connect together and together give us this scalable storage that we like to use. And this is the object storage, and this is basically what it looks like.
So when the tasks in my Spark executors do a read or write call, they’re actually using the blob storage API and going through that hierarchy: the global identifier, the metadata. Can I access it? Am I allowed to access it? And all the way to the right machine, where the data is actually stored on disk. And when we’re dealing with tiny, tiny files, it means we’re making many of these calls. So if we’re running complex analytics using Spark, imagine that all the Spark executors and all the tasks are making these calls to the storage. And we know our storage is scalable. We know our storage is highly available. But we do know that there is some time to [inaudible]. There is some TTL that we can define in order to give our query, our API call, a time limit that defines when we want to kill it.
Because when there is high pressure, high stress on the storage, some throttling mechanism is going to kick in. And the throttling mechanism makes sure all the queries, all the API calls, are going to get an answer, but it might take longer. So if I define the timeout for my query to be something like five milliseconds or 10 milliseconds, I might not get my answer back. And then it’s going to return some exception saying, “This is the TTL you defined. We can’t cater to it right now.” And that’s it. I’m going to get back an exception. And you’re probably asking, so why does file size matter, and why should we care about it when we’re defining our data, our distributed data, when we’re working with [inaudible] Spark? Well, this is the case.
If I have one million tiny, tiny files of 60 bytes each, that equals around 0.06 gigabytes. So if I’m doing a read operation, I’m looking at around one million RPCs, remote procedure calls, that I’m making inside my distributed data system. At least, right? Because we mentioned there are three different components that we create when we’re storing and saving that data. What if I take all of those small object files and combine them into one big file? So instead of having one million files of 60 bytes, I can have one file of 0.06 gigabytes, which is approximately 60 megabytes, which can translate into one RPC, or two or three RPCs. It really depends on how the blocks then move across the network. But at least I don’t create pressure on the server where I’m contacting the storage. So this is where I can get my improvement.
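The arithmetic in this example can be checked with a few lines of Python. This is only a back-of-the-envelope sketch: the assumption of at least one storage request per file is a simplification, and the real number of calls depends on the storage service and the client.

```python
# Back-of-the-envelope arithmetic for the one-million-small-files example.
# Assumption: reading a file costs at least one storage request; real
# request counts depend on the object store and the client library.

def total_size_bytes(file_count: int, bytes_per_file: int) -> int:
    """Total payload size of file_count files of equal size."""
    return file_count * bytes_per_file


def min_read_requests(file_count: int, requests_per_file: int = 1) -> int:
    """Lower bound on storage requests needed to read every file once."""
    return file_count * requests_per_file


small_files = 1_000_000
size_each = 60  # bytes

total = total_size_bytes(small_files, size_each)
print(total)                           # 60000000 bytes
print(total / 1_000_000)               # 60.0 megabytes (decimal units)
print(total / 1_000_000_000)           # 0.06 gigabytes
print(min_read_requests(small_files))  # 1000000 requests for the small files
print(min_read_requests(1))            # 1 request (at least) for one combined file
```

The payload is identical in both layouts; only the number of round trips to the storage changes.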
And this is why the file size matters, because although the network can definitely support this stress, it might take more time, and it will take longer for us to get the information back. And once we understand that, that brings us to the next stage of detection and mitigation. The first step of detecting is understanding where the situation can even happen. The first case is event streams. When I’m combining the two worlds of microservices and big data analytics, I often need to understand how I’m capturing all these events. It’s really great that I’m able to support a huge scale with a huge throughput of events that come from IoT devices, servers, applications, and all of that. But it’s a totally different question how I support querying and analytics of that data at huge scale.
And the reason for it is that the events themselves often translate into kilobyte-scale JSON during the ingestion procedure. And this is where we see most of the issues, in the ingestion part, where I’m building my Spark Structured Streaming job to take that information and bring it into the cluster, updating the format, et cetera. Sometimes each micro-batch in my structured streaming job is processing only one small file, and then I might have a problem. The second case where we see it is when we have over-parallelized Apache Spark jobs. So we know that in Apache Spark we have the executors and we have multiple tasks. And during its reads and writes, each task might be writing its own files. It depends on the logic that we need. It depends on how we define them.
But if we have too many tasks, each processing a tiny, tiny percent of the data, then it means we might be at the stage where we have over-parallelized Apache Spark jobs. So this is the second case. The third case is having over-partitioned Hive tables. So this is another thought about the granularity of how we’re saving the data from these API calls, how we’re saving these events. And as a rule of thumb, if our Hive table has less than 256 megabytes per partition, we need to check and reevaluate. Maybe we need to change the file hierarchy to make sure it works well with what Hive is expecting at that point.
So we understand the cases. We went through three different prominent cases that we see in the industry. So what do we check when we are building our own infrastructure, or what can we check in the existing infrastructure we already have? The first one is data skew. Is my Hive partition file size smaller than what we mentioned before? Do I need to change the hierarchy? Do I need to maybe combine a couple of files? The second one is looking at the Spark job writers in the Spark history server UI. I want to know what the input is and what the output of the data is, how much I’m reading, how much I’m writing, because these translate into small files. And the last one is the ingestion file size. This is where I’m combining the world of microservices and the world of big data analytics, and I want them to work together.
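The partition check described here can be sketched as a small helper that groups a file listing by partition directory and flags partitions holding less than the 256 megabyte rule of thumb. The function name, the paths, and the sizes below are hypothetical, and the listing is assumed to come from whatever storage listing tool you already use.

```python
from collections import defaultdict
from posixpath import dirname

# Rule of thumb from the talk: re-evaluate partitions holding under 256 MB.
PARTITION_THRESHOLD_BYTES = 256 * 1024 * 1024


def undersized_partitions(files, threshold=PARTITION_THRESHOLD_BYTES):
    """Return {partition_dir: (file_count, total_bytes)} for every partition
    whose total size is below the threshold.

    files is an iterable of (path, size_in_bytes) pairs, for example the
    output of a storage listing call (assumed, not shown here).
    """
    stats = defaultdict(lambda: [0, 0])
    for path, size in files:
        entry = stats[dirname(path)]  # the directory acts as the partition key
        entry[0] += 1
        entry[1] += size
    return {
        partition: (count, total)
        for partition, (count, total) in stats.items()
        if total < threshold
    }


# Hypothetical listing: one partition of tiny files, one healthy partition.
listing = [
    ("events/date=2021-05-01/part-0000.parquet", 60_000),
    ("events/date=2021-05-01/part-0001.parquet", 60_000),
    ("events/date=2021-05-01/part-0002.parquet", 60_000),
    ("events/date=2021-05-02/part-0000.parquet", 200 * 1024 * 1024),
    ("events/date=2021-05-02/part-0001.parquet", 200 * 1024 * 1024),
]

flagged = undersized_partitions(listing)
print(flagged)  # {'events/date=2021-05-01': (3, 180000)}
```

A flagged partition is only a prompt to re-evaluate the hierarchy, not proof of a problem; a partition that is small because the data itself is small may be fine.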
Cool. So how do I mitigate? Like we mentioned before, file hierarchy. Define the source, the API type, and then specify maybe by dates. If one minute is too granular, you can make a file for the whole hour. It really depends on the use case of the information, but this is another option to go with. When you’re designing your partitions, and this is probably the hardest part, you need to understand what the usage is. Who are the data scientists who are going to use it? Who are the business analysts who are going to use it? What are the queries that are going to run? How are they going to use it, do they have joins there, or do they have some filter operations there? How are they going to build their queries? So having the design and the usage in mind can really help you when you define the partitions, and there is no rule of thumb there.
Different companies build differently. Different data sources and different data customers use it differently. Right, the third one is repartition versus coalesce. It’s important to understand the difference. Repartition means I’m taking all my data and I’m repartitioning it. So it might be that I’m adding even more partitions than what exists today. Not always. I wouldn’t want to do it always. Sometimes it will work, but sometimes, especially with small files, I don’t want to introduce more small partitions. I would actually want to have fewer partitions. So it’s important to understand the difference. And then we have the coalesce function. Coalesce does not introduce more partitions. So this is often a better function to use. But again, it depends on your needs and on the data, the queries, and your customers.
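One way to picture the choice between the two functions is a helper that picks a partition count from the data size and then calls `coalesce` only when the count goes down. This is a sketch under assumptions: the helper names and the 128 MB target are illustrative, and `df` is expected to be a PySpark DataFrame, whose `coalesce(numPartitions)` and `repartition(numPartitions)` methods are the only Spark calls used.

```python
import math


def target_partition_count(total_bytes: int, target_file_bytes: int) -> int:
    """Number of output partitions so each one is roughly target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


def resize_partitions(df, current_partitions: int, wanted_partitions: int):
    """Reduce with coalesce, grow with repartition, otherwise leave df alone.

    df is assumed to be a PySpark DataFrame; current_partitions can be read
    with df.rdd.getNumPartitions().
    """
    if wanted_partitions < current_partitions:
        # coalesce merges existing partitions without a full shuffle and
        # can only reduce the partition count.
        return df.coalesce(wanted_partitions)
    if wanted_partitions > current_partitions:
        # repartition performs a full shuffle and may add partitions.
        return df.repartition(wanted_partitions)
    return df


# 60 MB of data with an assumed 128 MB target fits in a single partition.
wanted = target_partition_count(60_000_000, 128 * 1024 * 1024)
print(wanted)  # 1
```

On a cluster this might be used as `resize_partitions(df, df.rdd.getNumPartitions(), wanted).write.parquet(path)`, so that the number of files written follows the data size rather than the task count.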
On Databricks, we have auto optimize. With auto optimize, you can set up the properties to optimize during the write and also to optimize using the auto compaction mechanism. So you can already set the Hive data warehouse that exists inside Databricks to constantly make sure it’s optimizing the storage and the files and [inaudible]. The last one is Delta Lake’s optimized performance. Delta Lake is a new storage format that was introduced a couple of years ago. Many of you know it, use it, and are discussing it at the summit. And it does multiple optimizations that work well with Hive. The first one is compaction. So by using a mechanism, then [inaudible], it is able to take multiple small files and compact them into one, two, three, four, five files, depending on what exists there. Z-ordering is another technique that can help us combine information according to columns.
And when I know how my customers use the columns, I can specify the Z-order. It’s a mathematical technique, definitely. If it’s relevant for you, try it out. And then we can define the target file size for Delta. So it will know in advance what the minimum is, or at least what the target is. And then lastly, we have the file size for rewrites. So if you’re rewriting any data, there’s going to be some file size that we’re going to choose. Great.
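The settings mentioned here are exposed on Databricks as Delta table properties. The sketch below only builds the `ALTER TABLE` statement as a string. The property names are the ones I understand the Databricks documentation to use, so verify them against your runtime version; the table name and the 128 MB value are hypothetical.

```python
def delta_tuning_statement(table_name: str, target_file_size: str = "128mb") -> str:
    """Build an ALTER TABLE statement that enables the Delta file-size settings.

    Property names are assumed from the Databricks documentation; check them
    against the runtime you use before relying on them.
    """
    properties = {
        "delta.autoOptimize.optimizeWrite": "true",  # optimize during the write
        "delta.autoOptimize.autoCompact": "true",    # auto compaction after writes
        "delta.targetFileSize": target_file_size,    # target file size for the table
        "delta.tuneFileSizesForRewrites": "true",    # file size tuning for rewrites
    }
    rendered = ", ".join(f"'{key}' = '{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ({rendered})"


# Hypothetical table name; on a cluster you would pass this to spark.sql(...).
statement = delta_tuning_statement("iot.events_delta")
print(statement)
```

Keeping the statement as a plain string makes it easy to review before it is run against a real table.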
Now, it’s the demo. Let’s take a look at the demo.
To better understand how file format and file size affect the performance of querying and analyzing data, we’re going to look into a simplified example of an IoT data journey from transmission to data analytics. Today this example is widely used in factories when they want to mitigate issues and save on expenses by attaching sensors that can later help them monitor everything that happens in the factory. They can also make data-driven decisions based on that, by running some analytics and using tools such as machine learning to extract insights. One of the prominent examples is valve monitoring and valve management, where we can collect events from all the valves in the factory and later on analyze their behavior and what happened with them. This is the good side of things. The challenging side is that often this results in a huge amount of data.
And since we want to be able to respond relatively fast, or to have ad hoc queries that run fast, we want to make sure our data systems are designed for it. So for this demo, I already mimicked IoT event data that is ingested into our system with Structured Streaming. And we’re going to test out two different storage formats and evaluate how they work. So in the factory, imagine that I have my measure and control, and my measure and control is constantly sending events, a telemetry stream, into my analytics platform, into my data warehouse. Later, inside the data warehouse, the data is extracted, processed, organized, and enriched with other enterprise data or external databases, in order for us to extract insights and run analytics on top of it. For capturing the telemetry for data analytics, you can use tools such as Apache Kafka, which is scalable and highly available as well.
So it can really help you capture a high throughput of data. The challenge is that each event can have its own small JSON file that is being captured. And to run the performance evaluation on these JSON files, we’re going to mirror the data that was captured from tools such as Apache Kafka into Parquet format and Delta format. In order to do so, I created two different Spark read streams and two different write streams. One of the write streams is going to take the data and write it into Parquet. And the second one is going to write it into Delta Lake format. In the file itself, I have my events with their schema. It has an event ID and an event name, which can be failed or open or any of such. And I have an event time. A very simple example of the data. Using Hive, which is already built into my Azure Databricks environment, I’m going to create a new table from the Parquet location.
So here I’m creating. If it doesn’t exist, I’m creating a database, and I’m creating table zero one, which is from a Parquet format. And I’m specifying the location. The second table is going to be a Delta table, specifying the Delta format and location. And just to make sure there is no data duplication and no data got lost, I want to make sure they have the same number of rows in each table. And I already know I generated 110 files, so there should be 110. And you can see there’s 110 in the Parquet and in the Delta table. All right. Now I want to see how long it takes me to query that table. So I’m going to run two identical queries that are going to select the event name and event time. The only thing that is different is that I’m going to run that query once from the Parquet table and the second time from the Delta table.
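The table setup and the row-count check from this part of the demo could look roughly like the sketch below. The demo notebook itself is not shown in the transcript, so the database name, table names, and paths are hypothetical; `spark` is assumed to be an active SparkSession on a cluster where the Delta format is available.

```python
def create_demo_tables(spark, database, parquet_path, delta_path):
    """Register one Parquet table and one Delta table over existing locations.

    spark is assumed to be an active SparkSession; the table names
    events_parquet and events_delta are illustrative, not from the demo.
    """
    statements = [
        f"CREATE DATABASE IF NOT EXISTS {database}",
        f"CREATE TABLE IF NOT EXISTS {database}.events_parquet "
        f"USING PARQUET LOCATION '{parquet_path}'",
        f"CREATE TABLE IF NOT EXISTS {database}.events_delta "
        f"USING DELTA LOCATION '{delta_path}'",
    ]
    for statement in statements:
        spark.sql(statement)
    return statements


def row_counts_match(spark, database):
    """Check that both tables hold the same number of rows."""
    counts = [
        spark.sql(f"SELECT COUNT(*) AS n FROM {database}.{table}").collect()[0]["n"]
        for table in ("events_parquet", "events_delta")
    ]
    return counts[0] == counts[1]
```

A call such as `create_demo_tables(spark, "iot", "/mnt/demo/parquet", "/mnt/demo/delta")` followed by `row_counts_match(spark, "iot")` would mirror the sanity check before the timing comparison.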
So running the Parquet one, and this one took me 1.56 seconds to finish. And now I’m going to run my Delta one, and that one took 0.90 seconds. All right. So the Delta one is already faster. I wonder if that’s it, or is there something more I can do about it? For that, and you already know about it, we can use the OPTIMIZE command, which helps me optimize the layout of the Delta Lake data. So I’m going to use OPTIMIZE with ZORDER. Let’s run the ZORDER. Save some space on the data, and let’s see what’s happening. numFilesAdded: one. numFilesRemoved: 101. So what happened behind the scenes? My OPTIMIZE query combined a couple of small files into one file. And the couple of small files is actually 101 files that were removed, and one file that was added. It’s already addressing the issue of small files. All right, can we optimize the Parquet table? I mean, can the OPTIMIZE query run on Parquet?
And unfortunately the answer is no. Analysis exception, Delta table not found. It means it’s only possible with the Delta format. I can run this optimization only with the Delta format. Okay. So after I optimized, after I deleted 101 files and combined them into one file for my Delta Lake storage, I want to see if it made any change. What changed performance-wise? So the Parquet table took 1.1 seconds and my Delta Lake table is taking 0.81 seconds. So yeah, definitely some change. Let’s run it again and see what’s happening. I know that in Delta Lake, the caching mechanism should kick in at some point. So let’s see what’s happening with the optimization here. Yeah, absolutely. Definitely more prominent, 0.74 seconds. Yes. So running the optimization by combining all these 101 files into one file, and then adding the caching mechanism that Delta provides, really helped with improving the query run time. It’s almost… Yeah, it’s half the time of what the Parquet query takes. So the Delta table wins in our case.
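The compaction step and the timing comparison can be sketched as follows. The transcript does not say which column was used for the Z-order, so `event_time` and the table name are assumptions; `OPTIMIZE ... ZORDER BY` is the Delta Lake SQL command, and the timing helper is plain Python.

```python
import time


def optimize_statement(table_name, zorder_columns=()):
    """Build a Delta OPTIMIZE statement, optionally with a ZORDER BY clause."""
    statement = f"OPTIMIZE {table_name}"
    if zorder_columns:
        statement += " ZORDER BY (" + ", ".join(zorder_columns) + ")"
    return statement


def timed(action):
    """Run action() and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


# Hypothetical table and Z-order column.
print(optimize_statement("iot.events_delta", ["event_time"]))
# prints: OPTIMIZE iot.events_delta ZORDER BY (event_time)
```

On a cluster, `spark.sql(optimize_statement(...))` returns the metrics that include the files-added and files-removed counts, and `timed(lambda: spark.sql(query).collect())` gives a rough wall-clock number for each table. A single run is noisy, so repeating the query a few times, as in the demo, gives a fairer comparison.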
Well, after this demo, we definitely see now how Delta Lake can be more efficient than Parquet, and how we can leverage the OPTIMIZE command in order to combine multiple small files. So we’ve seen the problem. We understand what happens. We learned how we can detect and mitigate. We learned about the three prominent scenarios where this happens. And I want to leave you with this quote that I think really represents everything that we do at the Data and AI Summit. This quote is by Albert Einstein, and he says, “Intellectual growth should commence at birth and cease only at death.” So I really hope you learned something new today. I’m always happy to chat and get your DMs over Twitter or over LinkedIn. Please feel free to reach out with any questions, or if you want to share your experience with big data processing and big data analytics. Thank you.
Adi Polak is a Sr. Software Engineer and Developer Advocate in the Azure Engineering organization at Microsoft. Her work focuses on distributed systems, big data analysis, and machine learning pipe...