In a world where compute is paramount, it is all too easy to overlook the importance of storage and IO in the performance and optimization of Spark jobs. In reality, the choice of file format has drastic implications to everything from the ongoing stability to compute cost of compute jobs. These file formats also employ a number of optimization techniques to minimize data exchange, permit predicate pushdown, and prune unnecessary partitions. This session aims to introduce and concisely explain the key concepts behind some of the most widely used file formats in the Spark ecosystem – namely Parquet, ORC, and Avro. We’ll discuss the history of the advent of these file formats from their origins in the Hadoop / Hive ecosystems to their functionality and use today. We’ll then deep dive into the core data structures that back these formats, covering specifics around the row groups of Parquet (including the recently deprecated summary metadata files), stripes and footers of ORC, and the schema evolution capabilities of Avro. We’ll continue to describe the specific SparkConf / SQLConf settings that developers can use to tune the settings behind these file formats. We’ll conclude with specific industry examples of the impact of the file on the performance of the job or the stability of a job (with examples around incorrect partition pruning introduced by a Parquet bug), and look forward to emerging technologies (Apache Arrow).
After this presentation, attendees should understand the core concepts behind the prevalent file formats, the relevant file-format specific settings, and finally how to select the correct file format for their jobs. This presentation is relevant to Spark+AI summit because as more AI/ML workflows move into the Spark ecosystem (especially IO intensive deep learning) leveraging the correct file format is paramount in performant model training.
– Hi everyone, and welcome to the Spark File Format Ecosystem talk here at Spark Summit 2020.
My name is Vinoo Ganesh and I’m the Chief Technology Officer at Veraset.
To frame this session, here’s our high level agenda. We’ll start with my company Veraset and why this presentation is directly relevant for us. We’ll continue to discuss the goals of this session On-disk storage, OLTP and OLAP workflows and specific file formats. Finally, we’ll look at a few case studies, the format ecosystem and conclude by looking forward at some of the advancements in the industry. Let’s start with some quick background. Again, my name is Vinoo and I’m the CTO of Veraset. Previously, I was the lead of compute and Spark at Palantir Technologies. Veraset is a Data-as-a-Service or DaaS company focused on delivering high quality anonymized geospatial data to power innovation across a number of articles. Most recently, our datasets have been heavily used during the COVID-19 investigations to measure the effectiveness of non-pharmaceutical interventions and social distancing policies. Each year we process, cleanse, optimize and deliver over two petabytes of data. As a DaaS company, data itself is our product. We don’t build analytical tools or visualizations. In other words, we’re just data. Everyday, we think about the bottlenecks associated with operationalizing our data and how we can reduce them. To achieve our mission, it is pivotal that our data is optimized from a workflow, specific storage, retrieval and processing perspective. As an obligatory plug, if you find our mission or any of the content that we’re discussing today motivating, we are hiring. My email is in the upper right hand corner.
The singular goal of this session is to do a whirlwind tour of frequently used file formats in the Spark ecosystem. We’ll look into the On-disk storage strategies of these formats and build frameworks for selecting a format based on the data access workflows. For each format, we’ll talk through the feature set of the format, provide a visual inspection of a file, and look at some frequently used configuration settings. Finally, we’ll look forward at some of the new developments in the format ecosystem.
A format is just a specification for how bits are used to encode and decode data. You’ve undoubtedly seen many of these before. For our purposes, we’re gonna categorize them into three groups. Unstructured, Semi-structured and Structured. Text files are about as unstructured as you can get. CSVs and TSVs while they have structured, don’t have the structure programmatically enforced in the way that some of the other semi-structured data formats do. For this reason, we can consider CSVs or TSVs as either structured or unstructured data. Semi-structured data is data that contains tags or markers that delineate semantic elements, but aren’t as rigid as structured data. Typical examples of semi-structured data include XML and JSON. In this session, we’ll look at JSON as an example semi-structured data. Structured data formats, rigidly enforce schema and data type rules. Many leverages this knowledge about the data to provide optimizations at query time right out of the box. We’ll spend the most time in this session talking about structured data through the lens of Apache Avro, Apache ORC and Apache Parquet.
To frame this conversation, we need to understand what happens On-disk. On-disk data is stored in chunks known as blocks. A block is the smallest amount of data that can be read during a single read operation. Meaning that if you only want to read one byte of data on a hard drive with a 512 byte block size, you would need to read the full 512 bytes. As the disk spins, the head reads data. To read data from elsewhere on the desk, the head physically has to move and reposition in a process known as a random seek. As you can guess, reading sequential data is much easier than reading fragmented data. To ensure optimal reads we wanna read only what we absolutely have to while minimizing random seeks. The notion of spinning disks or blocks doesn’t just exist in spinning disks. Even technologies, such as HDFS have a block size of 128 megs. That leads us to a big insight. Your goal as an application developer, should be to lay data out On-disk in a format that is optimized for your workflow. Later in this talk, we’ll cover OLTP and OLAP. Two common categorizations for these workflows.
To understand how different file formats may lay data out On-disk, let’s look at some example data. Our sample data will consist of four rows and three columns and will be in tabular form. We’ll then break the data into pieces to understand the specific storage methodologies as shown in this slide.
In row oriented or a row wise stored methodology, rows are stored contiguously on disk. As you can see, A0, B0 and C0 are stored one after another inside of block one. This type of storage methodology can be great if your goal is to access full rows at a time. For example, reading the third row of data, A2, B2, C2 is a pretty trivial operation in the row oriented model only requires me to read block two. As you can infer, writing additional rows is a pretty trivial operation as well. However, let’s try something slightly different. Let’s say I’d like to reconstruct column B. Well, this would require a linear scan of the entire data set On-disk in every single one of the blocks. Not only that, but as part of this process, we’d be reading data that we don’t necessarily care about for the operation. Specifically, data from columns A and C. Well, that seems not great. Let’s try a different solution and see if we can fix this.
In the columnar storage methodology, columns are stored contiguously On-disk. As you can see, A0, A1, A2 and A3 are stored one after another. This type of storage methodology is great for the column query that I posed earlier. But now, trying to recreate the rows themselves has become the fairly intensive process. It’s not only that, but let’s say I have a new row of data that I wanna write. Well, that process just became a lot more expensive. Our takeaway here is that the columnar storage model allows us to quickly do column level operations. But we’d really struggle with workflows that are write heavy. If only there was a way we could get the best of both worlds. Well, lucky for us, turns out there is.
The hybrid storage model is a combination of both the row-wise and the columnar model. In this model, we first select the groups of rows that we intend to lay out. For our purposes, and to be consistent with the naming scheme of Parquet. As we’ll see later on. We’ll call these row groups. We’ll then apply the columnar layout inside each of the row groups. So let’s break down what you’re looking at. In row group one, in the bottom half of the picture, you can see we’ve logically grouped together the first two rows of the table. What we’ve implemented a columnar partitioning scheme inside of the group. Now what would this look like On-disk.
In the On-disk layout, we employ the row based partitioning strategy, but aim to fit one row group inside one block. This allows us to recreate individual rows on one singular read. I will allow us to do effective column level manipulations when using conjunction with some of the features of the format’s that will soon discuss. I wanna call this out explicitly. Matching the block to the row group size isn’t always possible as shown in this slide. But if you’re able to do it, it can lead to big performance ones.
In summary, we’ve seen that row wise storage formats are best for write-heavy workflows that involve frequent row additions, or deletions, which we’ll call transactional workflows. On the other hand, columnar formats are best optimized for read-heavy workflows, which we’ll call analytical workflows. Hybrid formats combine these for the best of both worlds. And for that reason, we’ll see a number of formats that use the hybrid approach. We’ve been speaking about the difference between transactional and analytical workflows for a bit of time now. And luckily for us, there are characterizations using the industry to differentiate them. The most common ones are called OLTP and OLAP.
The OLTP, or Online Transaction Processing workflow generally involves a data processing focused workflow. It operates with short lived transactions, and is geared towards row based processing rather than column based processing. Meaning data is generally updated and deleted on a per record basis much more often. We can best reason that row wise storage formats are generally better for OLTP workflows. The OLAP or Online Analytical Processing workflow generally involves an analytics focused workflow. These are generally workflows that involve column based analytics. Since the queries here are generally more complex, the transaction times are longer as well.
These types of workflows are best stored by columnar storage formats. This leads us to yet another insight. Our data storage and access patterns should inform the selection of our file format. Let’s put this in practice with some example data.
We’re going to use this data throughout the rest of the examples. It is notional data that contains information about different student’s scores in different classes. You’ll notice a few things immediately. First, we have three columns, all with different data types. The first column contains an integer which describes the ID of the student. The second one contains a string which contains a subject. And finally, the third column contains a double describing the student’s score in that subject. Additionally, the data types contained in these columns are relatively simple data types. Meaning we don’t have any structs, arrays, or enumerations here. Finally, each of these columns have pretty clearly defined minimum and maximum values. Whether through mathematical comparison or string lexicographical comparison. With this in mind, let’s kick off our whirlwind tour of file formats. Starting with our good old friend, the CSV. CSV stands for Comma Separated Value. It is a format that delineates columns with a comma and rows with a new line. The CSV format was originally developed by IBM in 1972 and was largely motivated by its ease of use with punched cards. You can see an example of the format on the right hand side where I cut out my table dot CSV. The CSV format is a flexible row based in human readable format. Its flexibility can come at a cost though, without stringent file format rules. It’s easy to have corrupt or unpassable CSV files floating around as a I’m sure many folks on data engineering teams here can attest to. The CSV format is compressible, and when that data is either in the raw or compressed format with the splittable format like (indistinct), it’s also splittable. Spark has a native CSV reader and writer making it both fast and easy to write CSV files. CSV is a pretty standard format in the ecosystem, and can be especially powerful when working on smaller data sets that don’t need some of the query time optimizations that would be beneficial for larger data sets. Now, let’s talk about a ubiquitous semi-structured data format, JSON.
The JSON format was originally specified in the early 2000s. It’s the first file format that we discussed that falls in the category of self-describing. A self-describing file is a file that contains all of the necessary information to read and interpret the structure and contents of a file inside of the file itself. It generally includes attributes like the file schema or the data types of the columns. The JSON format was specified not only as a storage format, but also as a data exchange format, which makes it a bit more verbose than CSV. However, similar to the CSV format, it is row based and human readable. As you can see in the cutted out file on the right hand side, JSON contains the column name as part of each row. However, we’re still relying on Spark schema inference to select the correct data types of the columns. JSON is both compressible and is generally splittable. Save one exception involving the whole file read operation in Spark. JSON is also natively supported in Spark and has the benefit of supporting complex data types like arrays and nested columns. Finally, it’s pretty fast from a write perspective as well. The JSON format can be a powerful format, especially when you have highly nested values. However, it lacks some of the optimization of some of the structured data formats (indistinct) students see. Now, let’s talk about the heavy hitters. The heavy hitters in specifically the Spark file format space. Starting with Apache Avro. Avro is unique in that it is both a data format as well as a serialization format. In other words, Avro can be used both for writing data to disk, and for sending data over the wire. It’s a self-describing format that has a huge feature known as schema evolution. This unique feature gives users the ability to update the schema of a data set without requiring a full rewrite of all of the files inside that data set. The actual schematics of how schema evolution works is beyond the scope of this presentation. But suffice to say that there are a complex set of governing rules to ensure things work properly. In schema evolution, both column additions and deletions are supported with full backwards compatibility with previous schema versions. Avro is a row based and binary format. It’s the first format that we’ve spoken about this far that is not easily human readable. Now there are ways to inspect the file as we’ll shortly see, but it won’t be as easy as just cutting the file out. Avro is both compressible and splittable. Reading and writing Avro files is supported using an external library in Spark. Finally, Avro supports rich data structures like arrays, sub-records and even enumerated types.
Using Avro tools,
we can dump a file to inspect its contents. The first section shows the JSON representation of the data inside of the Avro file.
You’ll notice information such as the column name,
data types, and actual values are all contained inside of the file.
Equally interestingly, we can dump the metadata of the same file to view the schema and compression codec of this file. I wanna draw your attention to the blue values specified as the type in the fields block. These are complex types called union types that specify the value of the field may either be the data type specified or null. This is how we specify whether a field is nullable in the schema. You can use Avro tools to manipulate and understand the contents of Avro files, very powerful ways.
Here’s some configurable Spark settings for Avro. Both of these settings control compression options, including the implementation in deflate level. We can see that Avro has powerful features, but it’s row oriented nature generally leads it to better support OLTP workflows.
Let’s now look at a little bit more into the data types that are better supported for OLTP workflows. Starting with ORC.
The Optimized Row Columnar or ORC file format has its origins in the Hive RC format. It was created in 2013 as part of the Stinger initiative to speed up Hive. It is a self-describing hybrid format that groups rows into structures called stripes. Similar to Avro, it is a binary format that will require us to use special tools to inspect or visualize the file contents. ORC is both compressible and splittable and is natively supported in Spark. ORC also supports rich data structures and Hive data types such as structs, lists, maps and union. To understand the power behind ORC, we’ll need to delve a little bit more into its file structure.
As I mentioned, ORC stores row group information in structures known as stripes that are by default 250 megs. Each stripe contains three sections. Index data, row data, and the stripe footer. The index data section contains metadata about values inside of the data. That section contains information about the minimum and maximum values on a per column basis, as well as the row offsets within the data. The actual data itself lives in the row group section of the stripe. As you can see in the image on the right, the columns are stored in a columnar manner inside the row data section. Similar to what we saw in the hybrid storage method earlier. The stripe footer contains a directory of stream locations. Finally, each ORC file has a postscript that contains compression information. To get a more concrete understanding of the data contained in these files, let’s actually inspect one of them.
Just as we use Avro tools before, we can use ORC tools to inspect an ORC file. You can see some of the structures that we spoke about here. From top pop up, we can see that this file has four rows. It was compressed by Snappy, has a compression size of 262,000 bytes, or about 262 kilobytes. The schema and data type following the type section as well. Under the stripe statistics block, you can see that we have one stripe that appears to have four columns. Our actual data is stored in the latter three. We can see that each column in a stripe has four values, no null values, how many bytes on disk the column takes up, and the minimum and maximum values for each column. On the right hand side, we can see detailed information about the index data inside the stripe. We can see where the row index for each column starts, as well as the data itself. Finally, the bottom four lines show the encoding that is being used. Direct the first RLE V1 or Run-length Encoding V1, and direct V2 or first RLE V2 or Run-length Encoding V2. Looking at just this output, the wealth of information empowered contained in structured data formats like ORC should be apparent.
Spark has a few ORC config related settings.
The out of the box set defaults are mostly same so I would recommend being very deliberate and mindful before changing these. There are a few settings that I’d like to encourage you to be particularly mindful of. The first is the columnar reader batch size. This setting controls a number of rows included in the batch vectorize. Be mindful of this value as tuning it incorrectly could (indistinct).
The second setting I’d like to call out
is the new merge schema functionality available in Spark 3.0 that allows on the fly merging of schemas of all of the files in the data set. It’s off by default.
But if you have a use case for it, it is available for use. ORC has large step in the file format ecosystem, and specifically the structured file format ecosystem. Next, let’s turn to Apache Parquet.
Apache Parquet is one of the most prevalent formats
in the Spark and big data ecosystem. And the majority of you have probably used or at a minimum, at least seen it before. Parquet was originally built as a collaborative effort between Twitter and Cloudera. It’s a self-describing hybrid binary format that is both compressible and splittable and it is natively supported in Spark. If these lists of attributes look familiar from the ORC slide, don’t be too surprised. The format’s are actually fairly similar conceptually.
In Parquet, each row group is a container for a set of column chunks. The column chunks are made up of a set of contiguous pages of columns. As we can see, just like ORC there’s information contained about the columns inside of the metadata.
That information includes the column type, path, encodings, and codec, as well as the number of values, data and index, page offsets, compression sizes. Any additional metadata. As we saw in the hybrid storage format one Parquet file can have multiple row groups inside of it, as you can see in the image on the right hand side.
We can inspect Parquet files using Parquet tools. Let’s start by looking at the metadata of the file first using the meta command. As you’d expect, we have information about the schema, row groups and columns inside of the row groups. We can see the data type in blue, compression in read, encoding in yellow, and the minimum and maximum along with number of nulls value in green.
We can also dump the file with Parquet tools to see the actual values contained in the data as shown on the right hand side. So I know there’s a lot of content on these inspecting slides. I’ll pause for a few seconds, but keep in mind that these slides will also be available afterwards.
Parquet has similar config settings to ORC. As such, I’d like to mention the columnar reader batch sizes setting again, as remindful to be mindful when tuning in. One thing we’ve seen a few times is that columns containing metadata have information about min and max values. And these may just be nice pieces of information to have. But how do they actually help us? The answer to that is known as predicate push down or filter push down. The metadata that exists on columns allows for pruning of unnecessary all irrelevant data. Meaning for example, if I have a query that’s looking for column values above a certain number, through the min and max attributes. I can completely ignore or prune out certain row groups. Parquet actually has a wealth of these settings, called filter push down. You can see a full list of the types you can push down on, on the right hand side. And these optimizations are available to you right out of the box. The structured format ecosystem brings a number of optimizations to data analytics, which can drastically decrease query times. To demonstrate this, let’s take our discussion out of the abstract and move it into the concrete by looking at a case study about Veraset company I work at.
Veraset data pipeline processed over three terabytes of data daily. The numbers actually slightly higher now. Our entire tabular pipeline was previously written in GZip CSV. It took nearly six hours to run and any type of analysis of the data was an exercise in Zen patience. So recognizing that something had to be done, we revisited our workflow fundamentals. Veraset receives and writes data in bulk twice a day. We have a number of users, including ourselves frequently doing analytics on individual columns in the data. Finally, we found that queries we were running tended to be fairly long running and increasingly complex. This led to the obvious but fitting conclusion that we were using an OLTP system for an OLAP workflow. We need to change the system to be consistent with our workflow.
Additionally, as a data company, schema changes our breaking changes. So, we can effectively treat all of our schemas as fixed. No need for schema evolution capabilities. We observed most of our data is hot data, making Snappy a much better choice for compression than GZip. Given all of this, we made the decision that our pipeline was much better suited to be in Snappy Parquet, rather than GZip CSV. Once we made these changes, which were actually two lines of changes in the code, our pipeline dropped to a little over two hours. This seems like a seemingly minor change resulted in massive compute savings as well as engineering time savings. Not only that, but it was laundered across our customer base, who also saw large drops in data processing and analytics. Focused on both the cost and time perspective. One of our customer pipelines actually dropped from 11 day processing period to a two day processing period. So, clearly file formats are the greatest thing since sliced bread. Right? However, I do have one huge disclaimer, file formats are software and software can have bugs. To illustrate this, let’s look at a recent Parquet bug.
We’ve already covered the benefits and functionality of partition pruning already.
Prior to Parquet-1246 however, there was actually a pretty scary issue hidden in the code. This issue resolved a bug where a lack of sort order for negative 0.0 or positive 0.0 or net NaN values would result in incorrect partition pruning. Meaning entire row groups will inadvertently be pruned out of your query, returning incorrect results. Of course, once the issue was identified, it was swiftly fixed. I use this case today to bring up an important lesson that all developers should know. Keeping libraries up to date is imperative. New software releases bring bug fixes and performance improvements and releases of data formats are no exception.
As we look forward in the format ecosystem, I wanted to briefly touch on Apache Arrow. Arrow is an in memory data format that complements the On-disk formats that we just spoke about. Its goal is to provide interoperability between different formats. Arrow is a columnar layout in memory uses Zero-copy reads to minimize SerDe Overhead. This cache-efficient OLAP workflows and allows for SIMD optimizations on modern CPUs. Finally it has a flexible data model. Many are already seeing big performance ones in prod with Arrow. If your workflow frequently involves data exchange, for example, from Python to the JVM, you may wanna experiment with it. Other sessions at Spark Summit 2020 focus exclusively on Arrow and if you’re interested in learning more, I recommend checking out some of those talks.
To wrap up, it’s important to think critically about your workflows and your file formats. File format selection can have a large performance implications. So, it’s also important to test your format changes before going to prod. Compression is a topic we didn’t cover too much in this session. But as we’ve seen in our case, it also can have a pretty large impact on your data processing pipeline. Finally, keep your libraries up to date. You never know what bug fixes or performance improvements you may get for free. And with that, thank you for your time and attention. If you’re interested in learning more or have more questions, feel free to reach out or ask questions.
Veraset
Vinoo is CTO at Veraset, a data-as-a-service startup focused on understanding the world from a geospatial perspective. Vinoo led the compute team at Palantir Technologies, tasked with managing Spark and its interaction with HDFS, S3, Parquet, YARN, and Kubernetes across the company (including a Datasource V2 implementation and the External Shuffle Service SPIP). Vinoo is also an experienced startup advisor, advising Databand on helping solve data observability problems across the stack, and advising Horangi on building and enhancing their existing best-in-class cybersecurity product, Warden.