Simplify CDC Pipeline with Spark Streaming SQL and Delta Lake

Download Slides

Change Data Capture CDC is a typical use case in Real-Time Data Warehousing. It tracks the data change log -binlog- of a relational database [OLTP], and replay these change log timely to an external storage to do Real-Time OLAP, such as delta/kudu. To implement a robust CDC streaming pipeline, lots of factors should be concerned, such as how to ensure data accuracy , how to process OLTP source schema changed, whether it is easy to build for variety databases with less code. This talk will share the practice for simplify CDC pipeline with SparkStreaming SQL and Delta Lake. Users juest need to write a simple Merge Into Streaming SQL to build a CDC pipeline, which is from relational database to delta lake. Behind this simple Streaming SQL, we cover the data accuracy/auto data schema change detected, also with lots of delta lake improvement, data skipping to improve merge perfermance, streaming job transaction commit conflict with compaction.

Watch more Spark + AI sessions here
Try Databricks for free

Video Transcript

– Good afternoon, everyone. I’m glad to have the opportunity of making the presentation, and thank you for being here. Today’s topic is Simplify CDC pipeline with Spark streaming SQL and Delta Lake.

Simplify CDC pipeline with Spark Streaming SQL and Delta Lake

First of all, please allow me to introduce myself. I am a staff engineer from Alibaba Cloud E-MapReduce, Product Team.

I’m a Spark contributor focused on SparkSQL, and I am also a HiveOnDelta contributor. My presentation is in three parts. Firstly, I’m going to take a look at what is CDC, secondly, I will propose our CDC solution using Spark Streaming SQL and Delta Lake, and finally, I will introduce some future work.

First, let’s take a look at what is CDC.

Change Data Capture

If you are a data engineer, you probably have already come across the term CDC. CDC is short for Change Data Capture. It is an approach to data integration that is based on the checking, capture and delivery of the change to data source interface. CDC can help to load the source table into your data warehouse or Delta Lake. Here is our CDC pipeline for database. There are lots of data stored in the database or application source and we want to analyze this table. It is not probably to run the query against the database directly because it will affect the application performance. So we can use CDC to load the table to the external data warehouse or Delta Lake, apps that we can do ETL or Ad hoc on the target table stored in the data warehouse.

There are a lot of CDC solutions, including incremental import jobs or real time job. Sqoop is an open source tool that transport data between Hadoop and relational database. You can create a daily scheduled Sqoop incremental import job to load the data to our data warehouse, but Sqoop solutions have some drawbacks. For example, it still have load pressure on source database, which will affect the applications performance and the best jobs scheduled hourly and daily can’t meet the need of real time analysis. Additionally, Sqoop increments solution also have some limitation to the source table such as it need a last modified timestamp column.

And the Sqoop can handle delete rows because the subsequent increment job only import update or insert it those newer than those previously importing, it can capture the deleted rows. And we should do some DDL on the HIVE table manually if the schema of the source table change.

Another solution of CDC is using the binlog of the database. The binlog is a set of sequence log files that could then record or insert, update, delete operations. So streaming CDC pipeline using binlog, first, we can use some out source to us, like JSON, Maxwell to sync binlog to Kafka, apps that Spark streaming consume the topic from Kafka sequencing. Sequencing parses up binlog record and is the right to targeted storage system. We support Insert, Update, Delete, like Kudu or data or HBase. If you want to replay binlog to HIVE, you should do more complex merge logic by yourself.

This solution also has some drawbacks, for example, HBase and Kudu both have heavy source and we have lots of operational supports. If the data is too large, Kudu have some petitioner variability problem and HBase can not support high throughput analytics.

We have talked about the above two CDC solutions, wise press mode using Sqoop and another streaming mode using binlog, but they both have drawbacks. Here, we propose our CDC solution using Spark Streaming SQL and Delta Lake. We can drive Streaming SQL to parse the binlog and merge them into Delta Lake.

Spark Streaming SQL

Next, I will introduce Spark Streaming SQL first. SQL is a declarative language. Almost all of the data engineers have SQL skills especially in database and data warehouse, such as MySQL, HIVESQL, SparkSQL, etc. SQL make data analysts more easily and effectively. User can focus on your business logic, there’re lots of streaming engines provide SQL language, like Create SQL, Stream SQL, etc. By using Stream SQL, even if the user are not familiar with Spark Streaming or the user now learn Java or Scala programming language, they can also easily develop streaming processing.

Additionally, it is also low cost if you want to migrate from best SQL job to a Streaming SQL job.

In Sparks community, the ISO related JIRA, which discussed to support Streaming SQL in Spark and our team also get involved in this SPIP. Furthermore, we have implement our watching of Streaming SQL and we have integrate into our product, EMI in Alibaba Cloud.

The figure on the left hand shows the Streaming SQL for racing on the Spark engine stack. It is on the top of structure streaming. We provide some DDL and DML. For DDL, we support Create table, Create table and select, Create scan and Create stream. For DML, we support insert into, Merge into, and we also support some other operators, like select, where, group by, join, union all. And the UDF is also supported. Above all, structure streaming supports lots of sync and source. And a way add more sync at the source both, such as waste boats, some storage product in Alibaba Cloud like log hub, a tapes’ toy, etc, and others like to do already.

We instill some new keywords in DDL and DML. Next, I will pick some of them to explain why it needs them and how to use them.

The first advice, Create scan, create scan syntax define how to read a table. Why should we introduce scan to define how to read a table instead of reading the create the, data source tables directly? Because table created by Spark SQL is just a definition of data source. If we support both streaming SQL and batch SQL, spout count, determinate, whether this table related SQL is a batch query or streaming query, so we introduced the create scan syntax, take Kafka for example, as you know that Kafka data source table can be both used in streaming and batch processing.

In batch mode, we can use Spark SQL to query these Kafka data source table directly, but for streaming SQL, we must create a scan to tell the spark engine that it is a stream type query.

Here is the scan details index, create scan table named alias in our data source table and tell the query type by the keyword using, additionally, we can also send longtime parameters of this scanning option clause, such as max-offset per trigger. Create scan is a temporary runtime view on top of the data source table. It is not saved in our master, it will all be deleted after the Spark session exit. Create scan can be used to define a target table.

So if we want to use batch mode, we can query on the data source table directly, or we can create a batch scan on top of the table. If we want to use streaming mode, we must create a stream scan on top of the table. After create a stream scan on top of Kafka data source table, then we can use DML SQL to process the streaming data source. For example, we can select from the data source and insert it into the target table like data.

Spark for stream, Spark for streaming job, there are also longtime job parameters like checkpoint, location, output mode, etc. How should we set these parameters for this job through SQL? Here, we use create stream syntax, a stream represent our struck streaming job, the job is the, the job parameters together with DML SQL. Here is the detail syntax, question Kafka text, stream job with options clause, which is used to set the runtime stream job parameters, and the DML operations on the Kafka test scan. Additionally, it is allowed to run multiple stream jobs in one application, so we use both to create multiple streams in one SQL file. Every create a stream has own job parameters on the his own option clause, this is very effective and clearly.

Merge into statement is the combination of insert, update, and delete, you can specify a condition to determine whether to update data or insert data to the target table.

Here is the merge into syntax, merge into a targeted table from the source data defined in the using clause. The source in using clause can be created stream scan or a sub query, as and we should also provide the merge condition to determine whether to update, delete, insert to the target table.

We can use merge into to replay binlog, currently, we use both data and Kudu as a target table.

This command can simplify the implement of CDC in streaming mode and this is a simple example.

UDF is a important part of data analysis. It can extend more ability to process data using SQL, Spark streaming SQL also supports Spark SQL UDF or HIVE UDF, and it is also supports some window functions, window functions to both streaming to do aggregation of event time window and we use both two window type, wise tumbling window which means all the windows in the stream job not overlapped it and otherwise, hopping window or sliding window which means windows in the streaming job can be overlaptive.

And watermark is used to resolve the problem of late data. You can define the watermark of our query by specify the event time column. And the shares code on how late data is expected to be.

In the frame API, we can use this watermark function to set it. In our streaming SQL, we can put a delay UDF in the event statement.

Here’s our example, query data from Kafka to do average operator over a tumbling window with two minutes watermark.

Key now, I have introduced Spark Streaming SQL, we can use SQL to implement or struck streaming job instead of Scala, or Java API, it is more simple and more effective.

Next, API introduce Delta Lake and our work on it. Delta Lake is a open source storage layer, fabrics described it as a lake house, which is a new data management project.

It combines the advantage of Delta Lake and data warehouse. This figure shows the key feature of the Delta Lake. Delta Lake is built on the perquet format and provides some features to support more complex data analysis requirements. For example, Delta Lake has its own metadata management, which can handle, but bad at scale tables with billions of partitions and fails at ease. The most important is that it’s both CDC transaction, then we can unify bets and streaming, that is we can use Spark streaming to sync data to data table, and at the same time, we can query, we can do a batch query on the same table.

So, it is easy to build a real time data pipeline. We can also do the update, delete of mode of ratio on the data table. Besides, you can support schema enforcement and evolution which can provide better data quality and data management. Time travel provides snapshots of data, then we can query any earlier worsening of the data. Turn the lead, only Spark can write data through data, including batch mode and streaming mode, and Presto how spark can query data from data.

Our team also do loss or improvement of Delta Lakes. Firstly, we provide SQL support for some operations of Delta Lake, like update, delete, optimize and vacuum, etc, and provides asumistical related DDL and DML.

Additionally, we support HIVE and Presto to query data tables, which is different from the data, from the implement of data community. For example, once we create a table, once we create our own data table, we get queried by Presto or HIVE are in the community. Presto and HIVE both have their own tables.

Here’s our example to show how we use Presto and HIVE to query data table.

CDC solution using Spark Streaming SQL & Delta Lak

Above all, I have introduced our work on Spark Streaming SQL and Delta Lake. Next, I will introduce our CDC solution using Spark streaming SQL and the Delta Lake.

We can get some benefit from this solution. Firstly, binlog can eliminate the load pressure on the source database. Secondly, Delta Lake is just a static job. It has no extra servers to operate, secondly, Spark Streaming SQL can simplify the implement of replacing binlog.

There’s no need to write Java or Scala code, and last, the CDC pipeline provide low-latency data analysis. To change the data on the source table can be sync to Delta Lake with minutes latency, as the user can query the targeted data table immediately.

Next, I will show you how we build the CDC pipeline in detail.

First of all, we should have sync the binlog of the table to Kafka using double zoom or other similar products. If you’re listening to us, how different binlog format, so the binlog parser are also different, then we can use Spark Streaming SQL to consume the binlog from Kafka, and parse the binlog to Ganzel roll record data and the operation type of this record, like insert, update or delete, then we can merge this past record data to Delta Lake. Here is the example to show how to use Spark Streaming SQL to build the pipeline step by step. Step one, we should create two tables, one source, Kafka table, and another is target data table. Step two, we create a streaming scan on top of the Kafka table and set some parameters in options clause, like studying offsets, max offset per trigger. Step three is major logic of the CDC pipeline. We create a screen to wrap the merge into statements and the job parameters.

The merge into contain a using clause which define a sub query on the Kafka source table. For example, parsing have been locked, and it also specify the condition to determine whether to insert, update, delete data to the target data table. For example, when the record from the source table has our merged record in the target table, and the operation type in update, then we should update the target table with new record data. Step four, we can use Streaming SQL command to launch the SQL file. This command will launch a young client mode streaming job. After that, we have viewed CDC streaming pipeline and we can query the data table in the outer link, if there are some data changes in the source database table.

These figures show the online logic of merge into statement. As you can see, for each batch of the streaming, we call the data’s merge function to merge the parse binlog record into the target data table.

Long Running Stability Improvement How to handle small files?

After we launched our CDC stream job, there are some issues which will affect the job long running stability. The first one is how to handle the small files because we code that has much function, but each batch and commonly the batch interval is about one or several minutes, then we’ll get more and more small files after the job run many days, so we must take some action to handle this small files during the job running, for example, we can increase the batch interval to reduce the coding of the data merge function. And compaction is a important tool to merge the data small files to large. Compaction not only change the data, it’s just changed the data layout. We use both Spark SQL to run the optimized command to do compaction. Additionally, because data has much function, where large Spark job to do join. So we can use the adaptive execution mode to reduce the reducer tasks. This can also significantly reduce small files.

Next, I will introduce how we do compaction on the long running stream job. There’re two ways to do compaction, we can launch a scheduled compaction best job, hourly or daily. This job is just a simple optimized SQL command, but when scheduled compaction job run, it may cause the streaming job fail because our data transaction complete.

Here’s the left screen goes to the timeline of the transaction come in between the streaming batch and compacting batch job.

First, the stream batch job read the data table, the compacting job through the transaction commit. After that, the streaming batch job will do his own transact commit. It will adjust the committed complete with COBOL compaction transaction commits.

If the complete check failed, the streaming job will fail. The drive figure show how data through the logic of transaction commit complete check. There are three complete type, concurrent append exception, concurrent to delete read exception, concurrent delete, delete exception. For example, according to the left figure, the streaming best job and compaction job, read the same source data files and the combating job will delete the source data files and re-write to new files but not change the data.

Use our streaming best job to the merge logic and also delete the same source data file. This will cause concurrent delete, delete exception, because the deleted data by streaming the best job has been rewritten to new files, this data is not really deleted.

So to prevent the streaming pipeline file from transacting committed check, we also do some work on this. Why is that we fix a bug that when the streaming batch only contained, insert the binlog, it should be always succeeded to do transaction commit. Another is that we added retry for the file test job if the complete target failed.

Another way to do compaction is auto compaction. We pulled out compaction operations between the streaming batch. There is no complete because of sequential execution.

Currently, we support file comes strategy to determine whether we should do a compaction during the stream running. For example, if we found the files count is greater than our shared code setting, we will trigger compaction.

The third way to reduce small files is using adaptive execution, adaptive execution can auto merge small partitions to just decrease the number of reducers and decrease the number of output files.

Above all, I have introduced the small files issue and provides some solutions. Next, I will introduce another issue that is a performance issue, because the implement of data mode if you are launched to join jobs to do the merge logic, if the target data table size get larger, the join operation will take more time. This will decrease the performance and effect the long running stability. The longtime filter can help to improve the performance of join especially the streaming batch source data is small. Here’s the (murmurs) we summit.

And last, I will introduce the future works of the CDC solution.

Why schema change auto detected. We can auto detect the schema change and handle the schema during the job running. So there’s no need to stop the streaming job to handy. Another is the performance improvement, we want to implement the read mode feature, we can reduce the merge cost of the read side to prevent the processing time of the batch from increasing.

We also want to simplify the user experience by sync statement. We can hide the replay binlog logic, the user just tell us the source binlog format type and the target data table.

Watch more Spark + AI sessions here
Try Databricks for free
« back
About Jun Song


Jun Song, a senior engineer and big data expert @Alibaba, focusing on Spark area, especially Spark Core and Spark SQL. He is also an Apache Spark contributor and a winner of CloudSort Benchmark Competion 2016 using Spark as a compute engine. Additional he also submit benchmark report to TPC-DS website(, top one ranking, which is accomplished by lots of optimize for SparkSQL.