The Rise of ZStandard: Apache Spark/Parquet/ORC/Avro

May 27, 2021 11:35 AM (PT)


Zstandard is a fast compression algorithm that you can use in Apache Spark in various ways. In this talk, I briefly summarize the evolution history of Zstandard support in Apache Spark, four main use cases, their benefits, and the next steps:
1) Zstandard can optimize Spark local disk I/O by compressing shuffle files significantly. This is very useful in K8s environments: it helps not only when you use `emptyDir` with the `memory` medium, but also maximizes the OS cache benefit when you use shared SSDs or container local storage. In Spark 3.2, SPARK-34390 takes advantage of the Zstandard buffer pool feature, and its performance gain is impressive, too.
2) Event log compression is another area where you can save storage cost on cloud storage such as S3 and improve usability. SPARK-34503 officially switched the default event log compression codec from LZ4 to Zstandard.
3) Zstandard data file compression can give you more benefits when you use ORC/Parquet files as your input and output. Apache ORC 1.6 already supports Zstandard, and Apache Spark enables it via SPARK-33978. The upcoming Parquet 1.12 will support Zstandard compression.
4) Last, but not least, since Apache Spark 3.0, Zstandard is used to serialize/deserialize MapStatus data instead of Gzip.

There is more community work to use Zstandard to improve Spark. For example, the Apache Avro community also supports Zstandard, and SPARK-34479 aims to support Zstandard in Spark’s Avro file format in Spark 3.2.0.

In this session watch:
Dongjoon Hyun, Software Engineer, Apple
Pang Wu, Software Engineer, Apple



Dongjoon Hyun: Hi, everyone. Welcome to this session. We are going to talk about the rise of Zstandard in Apache Spark 3.2.
My name is Dongjoon Hyun. I’m an Apache Spark PMC member and committer, and I have also been contributing to the Apache ORC and REEF projects. In the Spark community, I focus on the Spark SQL and Kubernetes areas. Kubernetes support became GA in Apache Spark 3.1, and Zstandard is becoming more and more important in Kubernetes environments. My colleague, Pang Wu, will give a brief introduction to Zstandard.

Pang Wu: Hi everyone. My name is Pang Wu. I’m a software engineer at Apple, and I play two roles in my day-to-day job. As a big data engineer, I build data pipelines that solve problems in maps. I also build tools and infrastructure that help other developers on the team scale up their work. This is where Dongjoon’s team and I work closely together, to make sure we deliver the latest Spark features to developers and that feedback from real use cases is heard by the community.
So in this talk, we are trying to give the audience an overview of Zstandard in the Spark ecosystem from the perspective of both a Spark developer and a user. Here’s the agenda: we will start with an introduction to Zstandard, and then Dongjoon will talk about the issues with Zstandard’s Spark integration and its history. Then I will talk about how to use the compression codec and in which scenarios it can best help you improve efficiency. Finally, Dongjoon will talk a little about the limitations and end the talk with a summary.
So, what is Zstandard? Zstandard is a fast compression algorithm that provides a high compression ratio. In Spark, we may already be familiar with multiple compression codecs, for example LZ4, which is ideal for shuffle I/O compression, and Snappy, which is the default compression codec for Parquet. These are high-speed codecs that focus on compression speed. We also have another set of codecs, like GZip, which focus on compression ratio. Zstandard excels in that it gives you Snappy-like compression speed with a GZip-like compression ratio. It also offers a very wide range of speed and compression trade-offs by letting the user set the compression level.
So in the next slides, I’m going to hand over to Dongjoon to talk about the history of Zstandard Spark integration and the issues.

Dongjoon Hyun: Many people try to use Zstandard in production. However, there are some issues. Apache Hadoop’s ZStandardCodec requires Hadoop 2.9 or above, so you cannot use it if your Hadoop version is lower than this. As you know, Apache Spark still provides users with both Hadoop 2.7 and Hadoop 3.2 based distributions. In other words, users of the Hadoop 2.7 based Spark distribution are unable to use Zstandard. In addition, even with Hadoop 3, users need to provide a custom Hadoop environment built specially with the Zstandard library. For example, the latest official Spark 3.1.1 distribution throws an exception when you try to use Zstandard in a vanilla Kubernetes environment.
To use Zstandard without the Hadoop codec limitation, we need to implement and use our own Zstandard codec on top of a library such as zstd-jni or aircompressor. Several Apache projects, including Spark, chose this approach.
Next issue: performance. Although native Zstandard is fast, there is overhead on the Java virtual machine side, so without proper optimization it shows relatively slow compression and decompression speed. Recently, we optimized Apache Spark, Apache Parquet, and Apache Avro by using RecyclingBufferPool. The improvements are impressive. As you see in the graph, compression became up to 3.2 times faster, and decompression up to 1.7 times faster. In addition, efficient buffer management is important not only for the speed-up but also for memory consumption. Previously, we observed heavier memory consumption in Zstandard use cases. For example, if you switch Spark’s I/O compression codec from LZ4 to Zstandard, you may hit an OOMKilled situation like this one. We also recently optimized and improved garbage collection behavior in Apache Spark and Avro.
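Both the speed-up and the lower memory pressure come from reusing buffers instead of allocating fresh ones for every compression stream. The following is a conceptual Python sketch of that recycling idea, assuming fixed-size buffers and a small cap on how many idle buffers are kept. It is not zstd-jni’s actual `RecyclingBufferPool`, which is a Java class with its own policy, and the `simulate_streams` helper is hypothetical.

```python
import threading
from collections import deque


class RecyclingBufferPool:
    """Conceptual sketch of a recycling buffer pool (not zstd-jni's Java class).

    Buffers handed back with release() are reused by later get() calls, so a
    long-running job allocates a handful of buffers instead of one per stream.
    """

    def __init__(self, buffer_size: int, max_cached: int = 8) -> None:
        self.buffer_size = buffer_size
        self.max_cached = max_cached  # maximum number of idle buffers kept for reuse
        self._free = deque()          # idle bytearrays, reused last-in first-out
        self._lock = threading.Lock()
        self.allocations = 0          # fresh allocations, to make reuse visible

    def get(self) -> bytearray:
        with self._lock:
            if self._free:
                return self._free.pop()
            self.allocations += 1
        return bytearray(self.buffer_size)

    def release(self, buf: bytearray) -> None:
        if len(buf) != self.buffer_size:
            raise ValueError("buffer was not obtained from this pool")
        with self._lock:
            if len(self._free) < self.max_cached:
                self._free.append(buf)
        # Buffers beyond max_cached are dropped and left to the garbage collector.


def simulate_streams(pool: RecyclingBufferPool, streams: int) -> int:
    """Hypothetical helper: open and close streams sequentially; return fresh allocations."""
    for _ in range(streams):
        buf = pool.get()
        # ... a real codec would compress one block into buf here ...
        pool.release(buf)
    return pool.allocations
```

With sequential use, `simulate_streams(RecyclingBufferPool(128 * 1024), 1000)` returns 1: a single buffer serves all thousand streams, whereas allocating per stream would create 1,000 short-lived 128 KiB buffers for the garbage collector to reclaim.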
The next issue is Zstandard JNI inconsistency, which here means API incompatibility. Simply put, Zstandard’s JNI library is not a drop-in replacement, even across maintenance version upgrades. Apache Spark, Parquet, Avro, and Kafka all chose the Zstandard JNI library in the same way. However, these are independent projects with different development speeds, and consequently they use different zstd-jni versions. In other words, if you replace the zstd-jni JAR file in a stock distribution, it may break Parquet, Avro, or Kafka functionality. We hit those issues and collaborated with upstream to resolve them. So in general, when it comes to a JNI-related library used by multiple projects, you should test it in all combinations.
The next issue is performance inconsistency. Please note that we are talking about the zstd-jni Java library, not the native Zstandard library: a newer zstd-jni version may have a performance regression in some cases. For example, this happened when zstd-jni introduced a buffer pool and later reverted it: the buffer pool was added as the default in version 1.4.5 and reverted to the NoPool policy in version 1.4.7. As users of this library, we need to control our usage explicitly so that behavior is consistent across Apache Spark versions and all the other Apache projects.
To sum up, to mitigate these inconsistencies and incompatibilities, we collaborated with the Apache Spark, Parquet, Avro, and Kafka communities to upgrade zstd-jni consistently to the same 1.4.9 version and to improve their use cases.
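One inexpensive safeguard before swapping a zstd-jni JAR is to confirm that only one zstd-jni version ends up on the classpath. A minimal Python sketch, assuming the JARs sit in local directories (for example a distribution’s `jars/` folder plus any directory of extra dependencies) and follow the usual `zstd-jni-<version>.jar` naming. The helpers `find_zstd_jni_versions` and `check_single_zstd_jni` are hypothetical; they catch mixed versions but do not replace testing each Parquet, Avro, and Kafka combination.

```python
from __future__ import annotations

import re
from collections import defaultdict
from pathlib import Path

# Matches names such as "zstd-jni-1.4.9-1.jar"; the naming pattern is an
# assumption based on how zstd-jni artifacts are usually published.
ZSTD_JNI_JAR = re.compile(r"^zstd-jni-(?P<version>\d[\w.\-]*)\.jar$")


def find_zstd_jni_versions(*jar_dirs: str) -> dict[str, list[str]]:
    """Hypothetical helper: map each zstd-jni version found to its JAR paths."""
    versions: dict[str, list[str]] = defaultdict(list)
    for jar_dir in jar_dirs:
        for jar in sorted(Path(jar_dir).glob("*.jar")):
            match = ZSTD_JNI_JAR.match(jar.name)
            if match:
                versions[match.group("version")].append(str(jar))
    return dict(versions)


def check_single_zstd_jni(*jar_dirs: str) -> str:
    """Hypothetical helper: return the single zstd-jni version, or raise if mixed or missing."""
    versions = find_zstd_jni_versions(*jar_dirs)
    if not versions:
        raise RuntimeError("no zstd-jni JAR found")
    if len(versions) > 1:
        raise RuntimeError(f"multiple zstd-jni versions on the classpath: {sorted(versions)}")
    return next(iter(versions))
```

For instance, `check_single_zstd_jni("/opt/spark/jars", "/opt/extra-jars")` (both paths hypothetical) returns the version string when exactly one version is present and raises otherwise, which makes it easy to run as a pre-deployment check.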
So, let’s take a look at the brief history from a per-project perspective. First, Apache Spark with Zstandard.
Apache Spark added its first Zstandard compression support in 2.3, three years ago, but it was much slower than LZ4 in general. Spark 2.4 started to support Parquet Zstandard with the Hadoop 3.1 profile. However, it was difficult to use due to the issues I mentioned before. Spark 3.0 started to use Zstandard by default for MapStatus broadcasting internally, and also split event log compression from shuffle compression, which enabled Zstandard for event logs without affecting other Spark operations. Spark 3.1 upgraded Zstandard to 1.4.8, the latest version at that time.
Second, Apache Parquet, ORC, and Avro with Zstandard. The latest releases of all these projects support Zstandard without the Hadoop codec requirement, use the latest Zstandard 1.4.9, and optimize its Java usage. Apache Spark 3.2 will deliver all these improvements on top of Spark’s own Zstandard improvements. So we have addressed all the previously mentioned issues and keep moving forward. To see the progress, please visit the umbrella JIRA issue, SPARK-34651.
Now Pang will talk about when, why, and how to use Zstandard.

Pang Wu: Thank you, Dongjoon. In general, Zstandard can help improve efficiency in three places. The first is event log compression. As Dongjoon mentioned earlier, in Spark 3, Spark’s event log compression can be configured separately using `spark.eventLog.compression.codec`.
Here we compare the size of the event log across three different setups. In the first setup, the event log is enabled but event log compression is not set to true. This is the default setup, and the event log is written as plain text to external storage configured by the user, such as the local file system, HDFS, or S3. In the second setup, we enable event log compression, so Spark picks up the shuffle I/O compression codec, LZ4 in this case, as the event log codec. In the third setup, we set zstd as the event log compression codec. As you can see from the results, the Zstandard-compressed event log is 3X smaller than the LZ4 one and 17X smaller than the raw text.
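The third setup corresponds to a handful of Spark properties. A minimal sketch, assuming you manage them as a Python dict and render them into `spark-defaults.conf` with a hypothetical `to_spark_defaults` helper; the S3 path is a hypothetical placeholder. When `spark.eventLog.compression.codec` is unset in Spark 3.0 and 3.1, Spark falls back to `spark.io.compression.codec`, which is why the second setup used LZ4.

```python
from __future__ import annotations

# Properties for the third setup. spark.eventLog.compression.codec exists since
# Spark 3.0; the event log directory is a hypothetical placeholder.
EVENT_LOG_CONF = {
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "s3a://example-bucket/spark-events",  # hypothetical path
    "spark.eventLog.compress": "true",            # disabled by default in Spark 3.x
    "spark.eventLog.compression.codec": "zstd",   # set independently of the shuffle codec
}


def to_spark_defaults(conf: dict[str, str]) -> str:
    """Hypothetical helper: render properties as aligned `key  value` lines."""
    width = max(len(key) for key in conf)
    lines = [f"{key:<{width}}  {value}" for key, value in sorted(conf.items())]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(to_spark_defaults(EVENT_LOG_CONF), end="")
```

Keeping the properties in one map makes it easy to render the same settings into `spark-defaults.conf` for a cluster or into per-job overrides.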
The second area where Zstandard can help is shuffle I/O. For applications with large shuffles, Zstandard not only reduces the I/O cost significantly but also reduces the chance of hitting local disk limits. In cloud environments, the local disk available to a Spark job is often quite limited, so Zstandard can help improve the robustness of the job.
In our benchmark test, Zstandard yields 44% less shuffle write size and 43% less shuffle read size compared to LZ4. By the way, you can turn on Zstandard by setting the `spark.io.compression.codec` configuration to `zstd`.
Because of the reduced shuffle I/O cost, we also observed query execution running 15% to 20% faster in our benchmark test. The larger the shuffle, the more significant the savings. We observe this in our production jobs as well.
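Switching the shuffle codec is a configuration change. A minimal sketch that turns the relevant properties into `spark-submit --conf` arguments via a hypothetical `spark_submit_conf_args` helper. The level and buffer size shown are Spark’s documented defaults, and `spark.io.compression.zstd.bufferPool.enabled` (added by SPARK-34390 in Spark 3.2, where it is already on by default) is listed only to make the choice explicit. Note that `spark.io.compression.codec` also applies to other internal data such as RDD partitions and broadcast variables, not only shuffle outputs.

```python
from __future__ import annotations

# Shuffle/IO compression with Zstandard. Level 1 and a 32k buffer are Spark's
# defaults; raise the level to trade CPU time for smaller shuffle files.
SHUFFLE_ZSTD_CONF = {
    "spark.io.compression.codec": "zstd",
    "spark.io.compression.zstd.level": "1",
    "spark.io.compression.zstd.bufferSize": "32k",
    "spark.io.compression.zstd.bufferPool.enabled": "true",  # SPARK-34390, Spark 3.2+
}


def spark_submit_conf_args(conf: dict[str, str]) -> list[str]:
    """Hypothetical helper: expand a property map into repeated `--conf key=value` arguments."""
    args: list[str] = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```

For example, `["spark-submit", *spark_submit_conf_args(SHUFFLE_ZSTD_CONF), "job.py"]` could be passed to `subprocess.run`, where `job.py` is a hypothetical application; passing a list avoids shell quoting issues.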
The last area where Zstandard excels is as a compression codec in a storage format, for example Apache Parquet. In our benchmark test, the file size generated by the Zstandard codec is very similar to GZip’s, even slightly smaller, and significantly smaller than Snappy and LZ4. Zstandard gives users a wide range of compression speed and ratio choices, as well as very stable and fast decoding speed, so users can choose a compression level based on the application scenario. For example, if they want to save storage and the data is compressed only once, they can choose a higher compression level, and vice versa.
When Zstandard is used with the Apache ORC format, it generates data sizes very similar to Parquet; in our benchmark test, the ORC files are actually slightly smaller in general.
So, here’s the configuration you can use for Zstandard in these three file formats. You can select the codec by setting `spark.sql.parquet.compression.codec`, `spark.sql.orc.compression.codec`, or `spark.sql.avro.compression.codec`. In Parquet and Avro, you can also specify the compression level and whether to use the buffer pool for compression. In ORC, the compression level and buffer pool options are not yet implemented.
In the next slides, I will hand the presentation over to Dongjoon to talk about the limitations of the Zstandard codecs.
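The per-format settings can be captured in one small hypothetical helper, `zstd_write_conf`. A sketch under stated assumptions: the codec properties and values are the Spark 3.2 ones (Avro spells the codec `zstandard`, while Parquet and ORC use `zstd`), and the Parquet level and buffer-pool keys are assumed to be Parquet 1.12’s `parquet.compression.codec.zstd.*` Hadoop properties, passed through Spark’s `spark.hadoop.` prefix. Avro’s own level and buffer-pool keys are left out because this sketch does not cover them, and ORC has none yet.

```python
from __future__ import annotations

from typing import Optional

# Spark SQL property and value that select Zstandard for each file format.
CODEC_PROPERTY = {
    "parquet": ("spark.sql.parquet.compression.codec", "zstd"),
    "orc": ("spark.sql.orc.compression.codec", "zstd"),
    "avro": ("spark.sql.avro.compression.codec", "zstandard"),
}

# Assumed Parquet 1.12 Hadoop keys, forwarded to the Hadoop configuration by "spark.hadoop.".
PARQUET_LEVEL_KEY = "spark.hadoop.parquet.compression.codec.zstd.level"
PARQUET_POOL_KEY = "spark.hadoop.parquet.compression.codec.zstd.bufferPool.enabled"


def zstd_write_conf(file_format: str, parquet_level: Optional[int] = None) -> dict[str, str]:
    """Hypothetical helper: properties that make Spark write `file_format` with Zstandard."""
    fmt = file_format.lower()
    if fmt not in CODEC_PROPERTY:
        raise ValueError(f"unsupported format: {file_format!r}")
    key, value = CODEC_PROPERTY[fmt]
    conf = {key: value}
    if parquet_level is not None:
        if fmt != "parquet":
            # ORC exposes no level/buffer-pool knobs yet; Avro's are not covered here.
            raise ValueError("this sketch only sets a compression level for Parquet")
        if not 1 <= parquet_level <= 22:
            # Zstandard also has negative "fast" levels; this sketch sticks to 1..22.
            raise ValueError("expected a Zstandard level between 1 and 22")
        conf[PARQUET_LEVEL_KEY] = str(parquet_level)
        conf[PARQUET_POOL_KEY] = "true"
    return conf
```

The `spark.sql.*` entries can be set at runtime with `spark.conf.set(key, value)`, while the `spark.hadoop.*` entries belong in the session’s startup configuration, such as `spark-defaults.conf` or `--conf` arguments. A higher level such as 19 suits data that is written once and read many times, matching the trade-off described above.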

Dongjoon Hyun: Thank you, Pang. Yes, there are some limitations. There are hardware acceleration techniques for Spark workloads from Intel and NVIDIA, but Zstandard is not yet supported by CPU or GPU acceleration. Also, Apache ORC is still using Zstandard 1.3.5 through aircompressor, so we can catch up to the latest zstd-jni library by replacing aircompressor. Finally, Apache Parquet has more room to optimize memory consumption: the Parquet issue PARQUET-2022 is not released yet, but Apache Spark 3.2 will try to bring in this patch.
Let’s wrap up. The Apache Spark community keeps improving Zstandard support for Apache Spark 3.2, revisiting the Zstandard codec in various areas to maximize cluster utilization. For event logs, Apache Spark 3.2 starts to use it by default. In Kubernetes environments, you can use it to work around the size limits of local container storage. For long-term storage, Apache Spark 3.2 supports Zstandard in Parquet, ORC, and Avro without the Hadoop codec limitation.
Thank you.

Dongjoon Hyun

I'm a software engineer and my main focus area is a fast and efficient data processing. At Apple, as an Apache Spark and ORC PMC member, I develop and maintain the internal distributions powered by Ap...

Pang Wu

I am a software engineer at Apple focusing on building large scale data pipelines and the supporting infrastructure/development tools under the context of maps. I work closely with Spark communities i...