2-Retail_DLT_CDC_sql(SQL)


Implement CDC In DLT Pipeline: Change Data Capture


By Morgan Mazouchi

%python
slide_id = '10Dmx43aZXzfK9LJvJjH1Bjgwa3uvS2Pk7gVzxhr3H2Q'
slide_number = 'id.p9'
 
displayHTML(f'''<iframe
 src="https://docs.google.com/presentation/d/{slide_id}/embed?slide={slide_number}&rm=minimal"
  frameborder="0"
  width="75%"
  height="600"
></iframe>
''')

Importance of Change Data Capture (CDC)

Change Data Capture (CDC) is the process of capturing changes made to records in a data store such as a database or data warehouse. These changes usually correspond to operations like deleting, adding, and updating data.

A straightforward way to replicate data is to take a database dump that exports the database and then import it into a Lakehouse/Data Warehouse/Data Lake, but this is not a scalable approach.

Change Data Capture, by contrast, captures only the changes made to the source database and applies them to the target. CDC reduces the overhead and supports real-time analytics. It enables incremental loading and eliminates the need for bulk-load updates.

CDC Approaches

1- Develop in-house CDC process:

Complex Task: CDC data replication is not a one-time, easy solution. Because of differences between database providers, varying record formats, and the inconvenience of accessing log records, CDC is challenging.

Regular Maintenance: Writing a CDC process script is only the first step. You need to regularly maintain a customized solution that can keep up with the aforementioned changes. This takes a lot of time and resources.

Overburdening: Developers in companies already face the burden of public queries. Additional work to build a customized CDC solution will affect existing revenue-generating projects.

2- Using CDC tools such as Debezium, Hevo Data, IBM InfoSphere, Qlik Replicate, Talend, Oracle GoldenGate, or StreamSets.

In this demo repo we are using CDC data coming from a CDC tool. Since the CDC tool reads database logs, we are no longer dependent on developers updating a certain column.

A CDC tool like Debezium takes care of capturing every changed row. It records the history of data changes in Kafka logs, from where your application consumes them.

Setup/Requirements:

Prior to running this notebook as a pipeline, make sure to include a path to the 1-CDC_DataGenerator notebook in your DLT pipeline, so that this notebook runs on top of the generated CDC data.

How to synchronize your SQL Database with your Lakehouse?

CDC flow with a CDC tool, autoloader and DLT pipeline:

  • A CDC tool reads database logs, produces JSON messages that include the changes, and streams the records with the change descriptions to Kafka
  • Kafka streams the messages, which hold the INSERT, UPDATE and DELETE operations, and stores them in cloud object storage (an S3 folder, ADLS, etc.)
  • Using Autoloader, we incrementally load the messages from cloud object storage and store them in the Bronze table, which holds the raw messages
  • Next we can perform APPLY CHANGES INTO on the cleaned Bronze layer table to propagate the most up-to-date data downstream to the Silver table

Here is the flow we'll implement, consuming CDC data from an external database. Note that the incoming data could be in any format, including from a message queue such as Kafka.

Make all your data ready for BI and ML

What does the output of a CDC tool like Debezium look like?

A JSON message describing the changed data has fields similar to the list below (an illustrative example follows the lists):

  • operation: an operation code (DELETE, APPEND, UPDATE, CREATE)
  • operation_date: the date and timestamp at which the record arrived, for each operation action

Some other fields that you may see in Debezium output (not included in this demo):

  • before: the row before the change
  • after: the row after the change
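
To make the shape of these messages concrete, the query below builds one illustrative record with the fields described above. This is only a sketch: the values are invented and the query is not part of the pipeline.

-- Illustrative only: a single hand-crafted row showing the fields a parsed
-- CDC message might contain once ingested. All values are made up.
SELECT
  '00001'               AS id,
  'Quinn'               AS firstname,
  'Example'             AS lastname,
  '1 Main Street'       AS address,
  'quinn@example.com'   AS email,
  'UPDATE'              AS operation,
  '2023-01-01 10:00:00' AS operation_date;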

To learn more about the expected fields, check out this reference.

-- `${source}` below is provided through the DLT pipeline configuration
SET spark.source;

CREATE OR REFRESH STREAMING LIVE TABLE customer_bronze (
  address string,
  email string,
  id string,
  firstname string,
  lastname string,
  operation string,
  operation_date string,
  _rescued_data string
)
TBLPROPERTIES ("quality" = "bronze")
COMMENT "New customer data incrementally ingested from cloud object storage landing zone"
AS SELECT *
FROM cloud_files(
  "${source}/customers",
  "json",
  map("cloudFiles.inferColumnTypes", "true")
);
 
(Running this cell interactively only validates the query: "This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table.")
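
After the pipeline has populated the table, a quick sanity check is to look for rows where Auto Loader could not fit the incoming JSON into the declared schema; such values land in the _rescued_data column. Below is a minimal ad-hoc sketch, run outside the pipeline, assuming the pipeline publishes its tables to a target schema named retail_cdc (adjust to your own pipeline settings):

-- Ad-hoc query (not part of the DLT pipeline): inspect records whose fields
-- could not be parsed into the declared bronze schema.
-- The schema name `retail_cdc` is an assumption; use your pipeline's target schema.
SELECT operation, operation_date, _rescued_data
FROM retail_cdc.customer_bronze
WHERE _rescued_data IS NOT NULL
LIMIT 10;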

CREATE OR REFRESH TEMPORARY STREAMING LIVE TABLE customer_bronze_clean_v(
  CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_address EXPECT (address IS NOT NULL),
  CONSTRAINT valid_operation EXPECT (operation IS NOT NULL) ON VIOLATION DROP ROW
)
TBLPROPERTIES ("quality" = "silver")
COMMENT "Cleansed bronze customer view (i.e. what will become Silver)"
AS SELECT * 
FROM STREAM(LIVE.customer_bronze);
CREATE OR REFRESH STREAMING LIVE TABLE customer_silver
TBLPROPERTIES ("quality" = "silver")
COMMENT "Clean, merged customers";
APPLY CHANGES INTO LIVE.customer_silver
FROM stream(LIVE.customer_bronze_clean_v)
  KEYS (id)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY operation_date -- event timestamp used to order the change events
  COLUMNS * EXCEPT (operation, operation_date, _rescued_data);
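
The customer_silver table above keeps only the latest version of each row (SCD Type 1 behavior). If you also want to keep the full history of changes, APPLY CHANGES INTO supports SCD Type 2 targets. The sketch below is illustrative and not part of this demo pipeline; the table name customer_silver_scd2 is just an example:

CREATE OR REFRESH STREAMING LIVE TABLE customer_silver_scd2
TBLPROPERTIES ("quality" = "silver")
COMMENT "Customer change history (SCD Type 2), illustrative example";

APPLY CHANGES INTO LIVE.customer_silver_scd2
FROM stream(LIVE.customer_bronze_clean_v)
  KEYS (id)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY operation_date
  COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
  STORED AS SCD TYPE 2; -- DLT adds __START_AT and __END_AT columns to track row validity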

Next step: create the DLT pipeline, add a path to this notebook, and add a configuration entry setting applychanges to true. For more detail, see the notebook "PipelineSettingConfiguration.json".

After running the pipeline, check "3. Retail_DLT_CDC_Monitoring" to monitor log events and lineage data.
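
For reference, the monitoring notebook works on top of the DLT event log, which the pipeline writes as a Delta table under its storage location (in system/events). The query below is only a sketch of the kind of inspection you can do; the storage path is a placeholder to replace with your pipeline's storage location:

-- Ad-hoc query against the DLT event log.
-- Replace <pipeline-storage-path> with your pipeline's storage location.
SELECT timestamp, event_type, message
FROM delta.`<pipeline-storage-path>/system/events`
ORDER BY timestamp DESC
LIMIT 20;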