In the previous blog post, we introduced the new built-in Apache Avro data source in Apache Spark and explained how you can use it to build streaming data pipelines with the from_avro and to_avro functions. Apache Kafka and Apache Avro are commonly used together to build scalable, near-real-time data pipelines. In this blog post, we show how to build more reliable pipelines in Databricks by integrating with Confluent Schema Registry. This feature has been available since Databricks Runtime 4.2.
For long-running streaming jobs, the schema of a data stream often changes over time, which makes schema evolution a typical problem in the streaming world. For example, to support a change in business logic, you may need to add new columns to a data stream. Such schema changes can break existing data pipelines and cause a service outage. To avoid stopping, updating, and restarting your pipeline every time the schema evolves, the pipeline design needs to answer the following questions:
- Which schema changes are safe to do?
- How to read data from a data stream in a future-proof way?
- How to track the change history of a data stream?
Schema Registry is the most popular solution for Kafka-based data pipelines. Like an Apache Hive metastore, it records the schema of all the registered data streams, as well as the schema change history. It also defines multiple compatibility levels. For example, you can enforce that only backward-compatible schema changes are allowed.
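As a rough illustration of what "backward-compatible" means, the sketch below checks two schema changes locally with Apache Avro's own SchemaCompatibility helper. It does not call the Schema Registry service, which performs its checks on the server side when a schema is registered. The "Click" record, its fields, and the object and method names are hypothetical and chosen only for this example.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

object BackwardCompatibilityCheck {
  // Each schema gets its own parser, because one parser refuses to redefine "Click".

  // Version 1 of a hypothetical "Click" record.
  val v1: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Click","fields":[
      |  {"name":"user","type":"string"}
      |]}""".stripMargin)

  // Version 2 adds a field WITH a default value.
  val v2: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Click","fields":[
      |  {"name":"user","type":"string"},
      |  {"name":"country","type":"string","default":"unknown"}
      |]}""".stripMargin)

  // Same change, but WITHOUT a default value for the new field.
  val v2NoDefault: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Click","fields":[
      |  {"name":"user","type":"string"},
      |  {"name":"country","type":"string"}
      |]}""".stripMargin)

  // True when a consumer using `reader` can decode data produced with `writer`.
  def canRead(reader: Schema, writer: Schema): Boolean =
    SchemaCompatibility
      .checkReaderWriterCompatibility(reader, writer)
      .getType == SchemaCompatibilityType.COMPATIBLE
}
```

Here canRead(v2, v1) holds, because old records can be read by filling in the default, while canRead(v2NoDefault, v1) does not, because a reader on the new schema has no value to use for the missing field.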
To read a data stream in a future-proof way, each record needs to carry its schema information. Rather than embedding the full schema, each record includes only a schema identifier. Schema Registry provides a custom Avro encoder/decoder, so you can encode and decode Avro records using these schema identifiers.
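To make the idea of a per-record schema identifier concrete, here is a minimal sketch of the framing that Confluent documents for its Avro serializers: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. These layout details are not stated in this post, and the object and method names are hypothetical. In practice the from_avro and to_avro functions or the Confluent serializers handle this framing for you.

```scala
import java.nio.ByteBuffer

object ConfluentWireFormat {
  private val MagicByte: Byte = 0x0

  // Prefix an already Avro-encoded payload with the magic byte and schema ID.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
    ByteBuffer
      .allocate(1 + 4 + avroPayload.length)
      .put(MagicByte)
      .putInt(schemaId) // ByteBuffer writes big-endian by default
      .put(avroPayload)
      .array()

  // Split a framed record back into (schemaId, avroPayload).
  def unframe(record: Array[Byte]): (Int, Array[Byte]) = {
    require(record.length >= 5, "record too short to contain a schema ID")
    val buf = ByteBuffer.wrap(record)
    require(buf.get() == MagicByte, "unknown magic byte")
    val schemaId = buf.getInt()
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (schemaId, payload)
  }
}
```

A decoder would use the extracted schema ID to look up the writer schema in Schema Registry before decoding the payload, which is why each record stays small even when the schema itself is large.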
Databricks has integrated Schema Registry into the from_avro and to_avro functions. You can easily migrate streaming pipelines that are built on Schema Registry to Spark Structured Streaming. Furthermore, the from_avro and to_avro functions can be used in batch queries as well, because Structured Streaming unifies batch and streaming processing in the Spark SQL engine.
Sample Code for Using Schema Registry
Assume you have already deployed Kafka and Schema Registry in your cluster, and there is a Kafka topic "t", whose key and value are registered in Schema Registry as subjects "t-key" and "t-value" of type string and int respectively.
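The following code reads the topic "t" into a Spark DataFrame with schema <key: string, value: int>:

    // Assumes spark.implicits._ and the Databricks from_avro function are in scope;
    // the exact import path depends on the runtime version.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaURL)
      .option("subscribe", "t")
      .load()
      .select(
        // Decode the binary key and value using the schemas registered under each subject.
        from_avro($"key", "t-key", schemaRegistryURL).as("key"),
        from_avro($"value", "t-value", schemaRegistryURL).as("value"))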
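The following code writes a Spark DataFrame with schema <key: string, value: int> to the Kafka topic "t":

    // dataDF must be a streaming DataFrame with columns "key" and "value".
    dataDF
      .select(
        // Encode each column using the schema registered under the given subject.
        to_avro($"key", lit("t-key"), schemaRegistryURL).as("key"),
        to_avro($"value", lit("t-value"), schemaRegistryURL).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("topic", "t")
      // A streaming Kafka sink also needs a checkpointLocation option (path omitted here).
      .start() // streaming writes are started with start(), not save()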