Skip to main content

Apache Spark 1.1: Bringing Hadoop Input/Output Formats to PySpark

Engineering blog

Published: September 17, 2014

Open Source5 min read

Two powerful features of Apache Spark include its native APIs provided in Scala, Java and Python, and its compatibility with any Hadoop-based input or output source. This language support means that users can quickly become proficient in the use of Spark even without experience in Scala, and furthermore can leverage the extensive set of third-party libraries available (for example, the many data analysis libraries for Python).

Built-in Hadoop support means that Spark can work "out of the box" with any data storage system or format that implements Hadoop's InputFormat and OutputFormat interfaces, including HDFS, HBase, Cassandra, Elasticsearch, DynamoDB and many others, as well as various data serialization formats such as SequenceFiles, Parquet, Avro, Thrift and Protocol Buffers.

Previously, Hadoop InputFormat/OutputFormat support was provided only in Scala or Java. To access such data sources in Python, other than simple text files, users would need to first read the data in Scala or Java, and write it out as a text file for reading again in Python. With the release of Apache Spark 1.1, Python users can now read and write their data directly from and to any Hadoop-compatible data source.

An Example: Reading SequenceFiles

SequenceFile is the standard binary serialization format for Hadoop. It stores records of Writable key-value pairs, and supports splitting and compression. SequenceFiles are a commonly used format in particular for intermediate data storage in Map/Reduce pipelines, since they are more efficient than text files.

Spark has long supported reading SequenceFiles natively using the sequenceFile method available on a SparkContext instance, which also utilizes Scala features to allow specifying the key and value type in the method call parameters. For example, to read a SequenceFile with Text keys and DoubleWritable values in Scala, we would do the following:

val rdd = sc.sequenceFile[String, Double](path)

Spark takes care of converting Text to String and DoubleWritable to Double for us automatically.

The new PySpark API functionality exposes a sequenceFile method on a Python SparkContextinstance that works in much the same way, with the key and value types being inferred by default. The saveAsSequenceFile method available on a PySpark RDD allows users to save an RDD of key-value pairs as a SequenceFile. For example, we can create an RDD from a Python collection, save it as a SequenceFile, and read it back using the following code snippet:

rdd = sc.parallelize([('key1', 1.0), ('key2', 2.0), ('key3', 3.0)])
rdd.saveAsSequenceFile('/tmp/pysequencefile/')
...

sc.sequenceFile('/tmp/pysequencefile/').collect()
[(u'key1', 1.0), (u'key2', 2.0), (u'key3', 3.0)]

Under the Hood

This feature is built on top of the existing Scala/Java API methods. For it to work in Python, there needs to be a bridge that converts Java objects produced by HadoopInputFormats to something that can be serialized into pickled Python objects usable by PySpark (and vice versa).

For this purpose, a Converter trait is introduced, along with a pair of default implementations that handle the standard Hadoop Writables.

Custom Hadoop Converters

While the default converters handle the most common Writable types, users need to supply custom converters for custom Writables, or for serialization frameworks that do not produce Writables. To see an illustration of this, some additional converters for HBase and Cassandra, together with related PySpark scripts, are included in the Apache Spark 1.1 example sub-project.

A More Detailed Example: Custom Converters for Avro

For those who want to dive deeper, we will show how to write more complex custom PySpark converters, using the Apache Avro serialization format as an example.

One thing to consider is what input data the converter will be getting. In our case, we intend to use the converter with AvroKeyInputFormat and the input data will be Avro records wrapped in an AvroKey. Since we want to work with all 3 Avro data mappings (Generic, Specific and Reflect), for each Avro schema type, we need to handle all possible data types produced by those mappings. For example, for Avro BYTES type, both Generic and Specific mappings output java.nio.ByteBuffer, while Reflect mapping outputs Array[Byte]. So our unpackBytes method needs to handle both cases.

def unpackBytes(obj: Any): Array[Byte] = {
val bytes: Array[Byte] = obj match {
case buf: java.nio.ByteBuffer => buf.array()
case arr: Array[Byte] => arr
case other => throw new SparkException(
s"Unknown BYTES type ${other.getClass.getName}")
}
val bytearray = new Array[Byte](bytes.length)
System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
bytearray
}

Another thing to consider is what data types the converter will output, or equivalently, what data types PySpark will see. For example, for the Avro ARRAY type, the Reflect mapping may produce primitive arrays, Object arrays or java.util.Collection depending on its input. We convert all of these tojava.util.Collection, which are in turn serialized into instances of a Python List.

def unpackArray(obj: Any, schema: Schema): java.util.Collection[Any] = obj match {
case c: JCollection[_] =>
c.map(fromAvro(_, schema.getElementType))
case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
arr.toSeq
case arr: Array[_] =>
arr.map(fromAvro(_, schema.getElementType)).toSeq
case other => throw new SparkException(
s"Unknown ARRAY type ${other.getClass.getName}")
}

Finally, we need to handle nested data structures. This is done by recursively calling between individual unpack* methods and the central switch fromAvro, which handles the dispatching for all Avro schema types.

def fromAvro(obj: Any, schema: Schema): Any = {
if (obj == null) {
return null
}
schema.getType match {
case UNION => unpackUnion(obj, schema)
case ARRAY => unpackArray(obj, schema)
case FIXED => unpackFixed(obj, schema)
case MAP => unpackMap(obj, schema)
case BYTES => unpackBytes(obj)
case RECORD => unpackRecord(obj)
case STRING => obj.toString
case ENUM => obj.toString
case NULL => obj
case BOOLEAN => obj
case DOUBLE => obj
case FLOAT => obj
case INT => obj
case LONG => obj
case other => throw new SparkException(
s"Unknown Avro schema type ${other.getName}")
}
}

The complete source code for AvroWrapperToJavaConverter can be found in the Spark examples, in AvroConverters.scala, while the related PySpark script for using the converter can be found here.

Conclusion and Future Work

We're excited to bring this new feature to PySpark in the 1.1 release, and look forward to seeing how users make use of both the built-in functionality, and custom converters.

One limitation of the current Converter interface is that there is no way to set custom configuration options. A future improvement could be to allow converters to take a Hadoop Configuration that would allow configuration at runtime.

Never miss a Databricks post

Subscribe to our blog and get the latest posts delivered to your inbox