Structured Streaming Internals
With the Lakehouse as the future of data architecture, Delta is becoming the de facto storage format for data pipelines. By using Delta to build curated data lakes, users achieve efficiency and reliability end to end. Curated data lakes involve multiple hops in the end-to-end data pipeline, which are executed regularly (mostly daily) depending on the need. As data travels through each hop, its quality improves and it becomes suitable for end-user consumption. Real-time capabilities, on the other hand, are key for any business, and Delta integrates seamlessly with Structured Streaming, which makes it easy for users to achieve real-time capability using Delta. Overall, Delta Lake as a streaming source is a marriage made in heaven for various reasons, and we are already seeing a rise in adoption among our users.
In this talk, we will discuss the functional components of Structured Streaming with Delta as a streaming source. We will take a deep dive into Query Progress Logs (QPL) and their significance for operating streams in production, and show how to track the progress of any streaming job and map it to the source Delta table using QPL. We will also look at what exactly gets persisted in the checkpoint directory, map its contents to the QPL metrics, and explain the significance of those contents with respect to Delta streams.
Shashidhar Eran…: Hi, everyone. Welcome to this session. This session is about Delta Lake Streaming Under the Hood, and we’ll discuss a lot more about Structured Streaming internals. I’m Shashidhar, a senior resident solution engineer, part of the Resident Solutions Architects group in Databricks EMEA. I work out of the Databricks office in the Netherlands.
Let’s get started with a sample stream. If you take this example stream here, I’m reading from a Delta table and writing to a Delta table with the checkpoint location specified. Whenever you submit this code to the Spark execution engine, what really happens is that Spark reads from your source table, processes the data and writes it to your output sink table. While doing so, in each micro-batch or trigger, it also tracks the progress of your streaming job in a location called the checkpoint location, which the user has specified. As new data arrives at your source Delta table, Spark keeps ingesting it incrementally over time, processes it and writes it to your output table.
If you think about a stream from the perspective of its physical components, in terms of what really makes streaming possible, we need a source from which Spark can read the input data incrementally, and we need to define a sink where the output is stored. We also need to specify a checkpoint location where the progress of your streaming query is persisted as each micro-batch is processed. But here, we are talking about Delta Lake as a streaming source. So what is the component of Delta Lake that makes it an efficient streaming source? The answer is the transaction log, or the Delta table history.
This is what the Spark Structured Streaming APIs keep monitoring and also use as the mechanism to read incremental data from your source Delta table and process it in a Structured Streaming fashion. In this talk, we’ll focus on how your transaction logs relate to the contents of the checkpoint location, and how we can use this information to understand what is really happening inside streaming jobs that use Delta as a streaming source. As part of this, there are three sections I want to go into in more detail, to understand the correlation between the transaction logs and the checkpoint directory and turn it into actionable insight. Here, I have listed three topics: Structured Streaming internals, Delta table properties, and the common issues that users face along with the mitigation strategies that we advise for them.
In this particular presentation, we will focus only on the first aspect, which is Structured Streaming internals. So let’s get started. As part of Structured Streaming internals, we will focus on these three things with, of course, Delta as a source and Delta as a sink. First, we will look at Query Progress Logs, which are a set of metrics that Spark Structured Streaming emits after completion of every micro-batch. The next section is about streaming semantics with Delta Lake. Here, we’ll see how we can use the Query Progress Logs and correlate them back to your Delta table history to understand the query progress efficiently and effectively. Finally, we’ll also inspect the checkpoint directory itself to see what exactly gets persisted there and how that information correlates back to your Query Progress Logs and also back to your Delta table history.
Let’s get started with the Query Progress Log. What is a Query Progress Log? It is a JSON log that Spark Structured Streaming generates after completion of every micro-batch. It is a set of metrics that give a lot of execution details about your Structured Streaming job itself, and these are the same internal metrics that Databricks uses to display insights from your Structured Streaming queries, if you have ever run Structured Streaming jobs inside Databricks notebooks. So this is how it normally looks. Basically you have two tabs here. One is the dashboard, on the left-hand side, where we pick up some of the key metrics from the JSON log that we get from the Spark streaming engine and plot them graphically so users can understand how their stream is behaving. But if you really want to see the raw metrics in JSON format, you can look at the right-hand side: click on the raw data, and you will see the JSON itself displayed in the notebook UI.
Let us see the different types of metrics that we normally see inside these Query Progress Logs. At a high level, we can divide these metrics into five categories. The first one is micro-batch execution. This is a set of metrics that describe the micro-batch itself: what the ID of your streaming query is, which micro-batch it is, when it was triggered, things like that. Then we have another set of metrics called source and sink metrics, which represent the state of your source and sink, so you can correlate back and understand how much data was pulled from the source in that particular micro-batch and what exactly went into the sink. Third, we have stream performance metrics. These are some of the key metrics you can use to understand the behavior of your stream and, if needed, to monitor and optimize your job if your stream is lagging behind.
We also have another set of metrics called batch duration metrics. These metrics give you granular details of the time spent on different types of activities in your stream execution. Finally, we have streaming state metrics, which capture all your state information if you are using any stateful operations. We’ll focus on some of these categories, the ones we’ll use in our demo later to correlate the Delta table history with the Query Progress Logs.
The first category of metrics is batch execution metrics. Here, I have highlighted two key metrics, id and batchId. The id is a unique ID that gets generated for every Structured Streaming query when you first start the query. Once it is generated, it is persisted inside the checkpoint directory, and until you change the checkpoint directory, this unique ID won’t change. On the right-hand side, you can see in the notebook UI where I’m highlighting the ID for a given stream.
The second metric is batchId, which is the micro-batch ID. It tells you which micro-batch the Query Progress Log you are reading belongs to. On the bottom right, you can see the JSON representation of the two metrics I have highlighted. Of course, there are other metrics, but these are the key ones I’ll be using to correlate the Query Progress Logs with the Delta table history. The next set of metrics are source and sink metrics. On the right-hand side, you can see the complete JSON of the source metrics and sink metrics when you’re using Delta as a source and Delta as a sink. As you can see, the sink contains little other than the location of the sink Delta table. So we’ll ignore the sink part and focus only on the source metrics.
If you look at the source metrics, we can divide them at a high level into two groups: startOffset and endOffset. The startOffset, as the name suggests, represents the state of the source Delta table at which this particular micro-batch started. Similarly, the endOffset represents the state of your source Delta table at which that micro-batch ended. Inside these offsets, startOffset and endOffset, there are three fields that are important. One is reservoirVersion. This is the Delta transaction ID, that is, the version of your source Delta table that the micro-batch is operating on. The second one is index. This represents the file index inside that reservoirVersion or Delta transaction ID. If you remember the way Delta transactions work, you can have multiple files committed to a Delta table in a single transaction. So the index identifies each individual file in that transaction.
And then we have isStartingVersion, which is a Boolean field. It is set to true if the reservoirVersion is the version of your source Delta table at which this stream first started. So it can happen that you start a stream from a Delta table that already has a lot of data. In those cases isStartingVersion stays true for a long time. But if you start from an empty Delta table, isStartingVersion flips to false very quickly. Finally, we also have numInputRows, which represents how many rows were ingested in that particular micro-batch.
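The fields described so far can be pulled out of a progress record with a few lines of Python. This is only a minimal sketch: the JSON below is a hand-written, trimmed-down record that uses the field names mentioned in the talk (a real record carries more fields), the values are the ones quoted for the first micro-batch later in the talk, and the id value, the description string and the helper name `summarize_progress` are made up for illustration.

```python
import json

# Hand-written sample shaped like the fields discussed in the talk.
# The id and the source description are placeholders, not real output.
SAMPLE_PROGRESS = """
{
  "id": "00000000-0000-0000-0000-000000000000",
  "batchId": 0,
  "numInputRows": 10240,
  "sources": [
    {
      "description": "hypothetical Delta source",
      "startOffset": null,
      "endOffset": {"reservoirVersion": 0, "index": 4, "isStartingVersion": true},
      "numInputRows": 10240
    }
  ]
}
"""


def summarize_progress(progress_json: str) -> dict:
    """Return the handful of fields used to map a batch to the source table."""
    progress = json.loads(progress_json)
    source = progress["sources"][0]  # assumes a single streaming source
    start = source["startOffset"]    # None (JSON null) for the very first batch
    end = source["endOffset"]
    return {
        "stream_id": progress["id"],
        "batch_id": progress["batchId"],
        "start": None if start is None else (start["reservoirVersion"], start["index"]),
        "end": (end["reservoirVersion"], end["index"]),
        "is_starting_version": end["isStartingVersion"],
        "rows": source["numInputRows"],
    }


summary = summarize_progress(SAMPLE_PROGRESS)
print(summary)
```

Reading `end` as a (version, file index) pair is what makes it possible to line a micro-batch up against a row of the source table history.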
Next, we move to performance metrics. Here, I have two more metrics, inputRowsPerSecond and processedRowsPerSecond. On the right-hand side, you can see how we display these metrics inside the Databricks notebook UI, and I’m also showing the complete JSON values that we get from the Structured Streaming execution itself. What inputRowsPerSecond represents is the rate at which data is pulled from your source Delta table into the Spark execution engine. It does not represent the rate at which you are ingesting data into your source Delta table. And then we have processedRowsPerSecond, which is the rate at which you are processing the data inside your application, depending on your application code.
As you can see, if the input rate remains higher than the processing rate for a duration of time, maybe a couple of hours or a couple of days, that normally indicates your stream is lagging behind, and you might have to go back and figure out what really happened. Do you have a spike in your input data? If so, you can adjust your cluster capacity, or optimize or tune your job to process your input data efficiently. So with an understanding of this set of metrics, let us go to the next section, which is about streaming semantics with Delta Lake. Here, we’ll take an example stream and see how the Query Progress Log changes for each micro-batch, given a source Delta table and its table history, and we’ll try to correlate the Query Progress Logs with the Delta table history.
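A simple way to act on these two rates is to flag a stream when rows are pulled in faster than they are processed for several micro-batches in a row. The sketch below is one possible check, not a Databricks or Spark feature: the function name `is_falling_behind`, the three-batch window and the sample numbers are all assumptions chosen for illustration.

```python
def is_falling_behind(progress_records, min_batches=3):
    """True if each of the last `min_batches` records has input rate > processing rate."""
    recent = progress_records[-min_batches:]
    if len(recent) < min_batches:
        return False  # not enough history to call it a trend
    return all(
        r["inputRowsPerSecond"] > r["processedRowsPerSecond"] for r in recent
    )


# Made-up rates: the first stream processes slower than it reads, the second keeps up.
lagging = [
    {"batchId": 7, "inputRowsPerSecond": 500.0, "processedRowsPerSecond": 320.0},
    {"batchId": 8, "inputRowsPerSecond": 510.0, "processedRowsPerSecond": 300.0},
    {"batchId": 9, "inputRowsPerSecond": 495.0, "processedRowsPerSecond": 310.0},
]
healthy = [
    {"batchId": 7, "inputRowsPerSecond": 170.0, "processedRowsPerSecond": 900.0},
    {"batchId": 8, "inputRowsPerSecond": 171.0, "processedRowsPerSecond": 880.0},
    {"batchId": 9, "inputRowsPerSecond": 169.0, "processedRowsPerSecond": 910.0},
]

print(is_falling_behind(lagging), is_falling_behind(healthy))
```

Requiring several consecutive batches avoids reacting to a single slow micro-batch; in practice the window would be sized in hours or days, as suggested in the talk.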
Here, I have an example stream where I’m reading from a Delta table called delta_keyval and writing to a Delta table called delta_stream. I have set the property maxFilesPerTrigger to five, which is the maximum number of files that Spark ingests in each micro-batch. If you don’t specify this, Spark falls back to the default value of 1,000. So keep in mind that even if you are not setting this property explicitly, it is always affecting your stream. I also have a trigger of 60 seconds, by which we are telling Spark to trigger a micro-batch every 60 seconds. If you don’t set this, Spark falls back to the default trigger, or what we call no trigger, where Spark starts a new micro-batch as soon as the previous micro-batch has finished. This is also what we call the micro-batch mode of execution. So this will be the example streaming job that we’ll use for our demo.
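The slide code is not part of the transcript, so the following is a reconstruction of what such a job might look like in PySpark rather than the speaker's actual code. It assumes Spark 3.1 or later (for `table` and `toTable`), that both tables are registered by name, and that `spark` is an existing SparkSession; the function name and the checkpoint path in the usage comment are hypothetical.

```python
def start_example_stream(spark, checkpoint_path):
    """Start a delta_keyval -> delta_stream query like the one described above."""
    source = (
        spark.readStream
        .format("delta")
        .option("maxFilesPerTrigger", 5)  # at most five files per micro-batch
        .table("delta_keyval")
    )
    return (
        source.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)  # where progress is tracked
        .trigger(processingTime="60 seconds")  # start a micro-batch every 60 seconds
        .toTable("delta_stream")
    )


# Usage, given a live SparkSession (the path is a placeholder):
# query = start_example_stream(spark, "/tmp/checkpoints/delta_stream")
```

Dropping the `option("maxFilesPerTrigger", 5)` and `trigger(...)` calls would leave the stream on the defaults described in the talk.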
Then we have a Delta source table here. On the left-hand side, you can see the data that exists in the source table. We have three columns here, ID, key and value, but ignore what exactly they contain. The main thing to keep in mind is the count: 16,384 records are present in this source Delta table. On the right-hand side, you can see the table history. It shows version zero, which means I have committed eight files into this Delta table in one write operation. That is what you can observe in the operation metrics in the third column of the table history, where you can see numFiles equal to eight and numOutputRows equal to 16,384. At the bottom, I am showing in a more granular way how the files in a given version are treated for streaming. Here, we have version zero and we have eight files, right? So the index for the files starts at zero and ends at seven, because we have eight files.
We’ll use this graphical representation going forward to understand the Query Progress Logs and how they map to your table history. We now have this source table, and we have this application code with maxFilesPerTrigger set to five. After completion of the first micro-batch, we can see that all three columns are populated in the destination table. That is expected, because we are not transforming anything in the stream itself. But the interesting thing is, if you observe the count of the destination table after the first micro-batch, it shows 10,240. So let us see from the history perspective what really happened here. We have the source table history. If you remember, I showed it earlier: version zero with numFiles eight. And this is the Query Progress Log that we get after execution of the first batch. As I was saying earlier about the different types of metrics, here we have startOffset and endOffset.
As you can observe, we have a startOffset of null. Because this is the first batch, we don’t have any startOffset. If you look at the endOffset, the reservoirVersion is zero, which maps exactly to version zero of your source Delta table. We also see that the isStartingVersion Boolean flag is set to true, because this is the version at which the stream initially started. If you look at the top, batchId is set to zero. And numInputRows is 10,240, which is the count that we saw in the destination table earlier. If we look at the destination table history after the first micro-batch, we have two versions. Version zero you can ignore, because it is the create-table operation that happens when we start the stream writing to an empty location.
Version one represents the commit that came from the Structured Streaming job. Here, you can see the number of added files is five and the number of output rows is 10,240, which maps to the 10,240 that we see inside the Query Progress Logs. To understand the index, let’s take a step back and change the representation to a more graphical one. Here, I have the Query Progress Log for batch ID zero. I have startOffset, endOffset and the number of input rows. These are the same values I was showing in the previous slide. At the bottom, you have the source table history, where we have version zero, and zero to seven representing the eight files that we have in our source table.
This is how we can understand the index four. As you can see, in the first batch we have ingested file zero to file four, because we earlier set maxFilesPerTrigger to five, and index four represents the fifth file in version zero of your source Delta table. So what really happens in the next batch? Here, we have micro-batch ID one, where the startOffset metrics are now populated and are exactly the same as the endOffset metrics of the previous batch. We have index four in the startOffset and index seven in the endOffset, which means this batch covers files five, six and seven in your source Delta table history. Now, you might think: we have set maxFilesPerTrigger to five, but we are processing only three files. That is because only three files were left to be processed, so Spark just pulls all the remaining files in this micro-batch.
In the third batch, if you look, startOffset and endOffset are the same: index is set to seven, reservoirVersion is zero and isStartingVersion is still true. The number of input rows is now zero, because we have processed all the data that exists in our source Delta table and there is nothing left to process. Now what I’ll do is add another set of eight files to my source Delta table, which becomes version one of the source Delta table. Earlier, we had only version zero, so now we have version one and again, the file index runs from zero to seven. Now, let us see what really happens with respect to our stream.
If you keep the stream running after the addition of the new set of eight files, as you can see, the startOffset is the same. We have reservoirVersion zero, index seven and isStartingVersion true. But if you observe the endOffset, the reservoirVersion has changed to one, the index has changed to four and isStartingVersion is set to false, because reservoirVersion one is not the version at which this stream initially started. And the number of input rows is 10,240, because going from index seven to index four covers file zero to file four in version one of your source Delta table. In the subsequent batch, we see both startOffset and endOffset with reservoirVersion one and the index going from four to seven, and we have around 6,000 records being processed. This is quite expected, but the interesting part is the next batch, the last batch here. As you can see, both startOffset and endOffset now have the reservoirVersion set to two and the index set to minus one.
If you think about what we had in our source table, we only had two versions, right? Version zero and version one. But Spark has automatically advanced the version to the next possible version of your source Delta table and set the index to minus one, so that whenever a new file arrives, it will have index zero and will be considered for processing. If you recall, the index did not change to minus one last time when we had no files to process. That is because isStartingVersion was set to true. But when isStartingVersion is false and we don’t have any files to process, Spark automatically advances the reservoirVersion to the next version and resets the index to minus one. So this is how the indexes are currently managed, depending on the Boolean flag isStartingVersion.
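The endOffset sequence from this walkthrough can be reproduced with a small toy model. It is not the Delta source implementation: it only encodes the rules stated in the talk (read at most maxFilesPerTrigger files, stay put on an idle trigger while isStartingVersion is true, otherwise move to the next version with index minus one). The names `Offset`, `next_batch` and `ROWS_PER_FILE` are invented here, and the model assumes the 16,384 rows are spread evenly over the eight files, which is consistent with 10,240 rows for five files. How startOffset is reported on idle triggers is not modeled.

```python
from typing import NamedTuple, Optional

ROWS_PER_FILE = 2048  # assumption: 16,384 rows spread evenly over 8 files


class Offset(NamedTuple):
    reservoir_version: int
    index: int
    is_starting_version: bool


def next_batch(files_per_version, start: Optional[Offset], starting_version, max_files=5):
    """Return (end_offset, files_read) for one trigger of the toy model.

    files_per_version[v] is the number of files committed in table version v.
    """
    if start is None:
        version, index = starting_version, -1  # nothing read yet
    else:
        version, index = start.reservoir_version, start.index

    files_read = 0
    end_version, end_index = version, index
    while files_read < max_files and version < len(files_per_version):
        if index + 1 < files_per_version[version]:
            index += 1
            files_read += 1
            end_version, end_index = version, index
        else:
            version, index = version + 1, -1  # move on to the next commit

    if files_read == 0:
        if start is not None and start.is_starting_version:
            return start, 0  # idle while still on the starting version: stay put
        # Idle otherwise: point just before the first file of the next version.
        return Offset(len(files_per_version), -1, False), 0
    return Offset(end_version, end_index, end_version == starting_version), files_read


history = [8]  # version 0 holds eight files
offset = None
log = []
for step in range(6):
    if step == 3:
        history.append(8)  # a second write adds version 1 with eight more files
    offset, files = next_batch(history, offset, starting_version=0)
    log.append((step, offset, files * ROWS_PER_FILE))

for entry in log:
    print(entry)
```

Each printed entry is (trigger number, endOffset, rows read), so the six entries can be compared one by one with the endOffset values quoted in the talk.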
The next section is the streaming checkpoint. Here, we’ll see the contents of the checkpoint directory. Before checking those contents, let’s take a step back and recall what we had initially. We had this stream reading from a Delta table, processing using Spark, writing to a Delta table and also persisting the progress of the stream into the checkpoint location. If you consider the different actions that happen underneath, we can divide them into three steps. The first is construction of the micro-batch, the second is processing the micro-batch, and the third is committing the micro-batch. We try to display these steps inside the notebook UI, if you’re running streams in Databricks notebooks.
So this is how it looks. Here, I have a stream which is actually stopped. That’s why it is showing the stream as inactive, but if you have the stream up and running, you will see the different stages of the stream, such as waiting for the next trigger or processing new data, depending on the step it is currently in at that particular moment. So with this, let us quickly get into action and see what really gets committed into the checkpoint location. Here, we have three different contents: offsets, metadata, and commits, and all the details in the checkpoint directory are stored in JSON format. First, let’s take the Offsets folder. Here you can see that one file is generated inside the Offsets folder for every micro-batch, and it is generated when your batch starts, which is step one. We basically have two kinds of details here, the batch and streaming state details and the source details, but we’ll focus only on the source details in this talk.
To understand more, what I’m doing here is taking an example Offset file, Offsets/zero, which is nothing but the Offset file generated for micro-batch ID zero. Then, we take the content of this file. Here, I have the content that was written to the checkpoint location. If we take the Query Progress Log for this particular micro-batch, we have reservoirVersion zero, which maps exactly to the content of the Offsets file, and the index and isStartingVersion also match. So they directly map to each other and represent the same thing we discussed with respect to the Query Progress Logs. As you can see, we had a startOffset of null. That is not the reason we are not seeing anything for startOffset here: Spark always stores only the contents of the endOffset. It never stores the content of the startOffset, because that would be redundant. You can always go back and look at the previous micro-batch’s Offset file if you really want it.
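The comparison between an Offsets file and the endOffset of the matching Query Progress Log can be scripted. The sketch below makes an assumption about the file layout that the talk does not spell out: a version marker line followed by one JSON document per line, with the source offset as the last document. The sample text is hand-written and trimmed to the three fields discussed in the talk, the timestamp values are placeholders, and both helper names are hypothetical.

```python
import json

# Hand-written stand-in for the contents of offsets/0 (trimmed, placeholder values).
SAMPLE_OFFSET_FILE = """v1
{"batchWatermarkMs": 0, "batchTimestampMs": 0}
{"reservoirVersion": 0, "index": 4, "isStartingVersion": true}
"""

# endOffset of batch 0 as quoted from the Query Progress Log in the talk.
QPL_END_OFFSET = {"reservoirVersion": 0, "index": 4, "isStartingVersion": True}


def read_source_offset(text: str) -> dict:
    """Return the last JSON object in the file, assumed to be the source offset."""
    docs = [
        json.loads(line)
        for line in text.splitlines()
        if line.lstrip().startswith("{")  # skips the version marker line
    ]
    return docs[-1]


def matches_end_offset(offset: dict, end_offset: dict) -> bool:
    """Compare only the three fields discussed in the talk."""
    keys = ("reservoirVersion", "index", "isStartingVersion")
    return all(offset[k] == end_offset[k] for k in keys)


offset = read_source_offset(SAMPLE_OFFSET_FILE)
print(offset, matches_end_offset(offset, QPL_END_OFFSET))
```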
The next content is the metadata. It is nothing but the stream ID itself, which gets generated when your streaming query is started for the first time. This is the unique ID I was talking about earlier. The next one is the Commits folder. This is written in step three of the execution and, again, one file per micro-batch is created under the Commits folder. This file doesn’t contain much. It just represents the completion of the micro-batch. If this file is not generated, that normally means the micro-batch has failed, and Spark has to handle the previously failed batch first, before starting a new batch. This is one of the reasons why, when you change the configuration options for your stream, they might not kick in as soon as you restart the stream. So wait for the first batch to complete, and check whether all your new configs are picked up from the second batch onward.
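The rule described here, that a batch with an Offsets file but no Commits file still has to be handled first, can be checked against a checkpoint directory with a short script. This is a minimal sketch that builds a throwaway directory with the `offsets` and `commits` folder names from the talk; the helper names and the file contents are placeholders, and a real checkpoint holds more than this.

```python
import tempfile
from pathlib import Path
from typing import Optional


def batch_ids(folder: Path) -> set:
    """Batch IDs are taken to be the purely numeric file names in the folder."""
    return {int(p.name) for p in folder.iterdir() if p.name.isdigit()}


def uncommitted_batch(checkpoint: Path) -> Optional[int]:
    """Return the latest batch that has an offsets file but no commits file."""
    offsets = batch_ids(checkpoint / "offsets")
    commits = batch_ids(checkpoint / "commits")
    if not offsets:
        return None
    latest = max(offsets)
    return None if latest in commits else latest


# Build a throwaway layout: batches 0-2 were planned, only 0-1 were committed.
with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp)
    for name, ids in (("offsets", (0, 1, 2)), ("commits", (0, 1))):
        (checkpoint / name).mkdir()
        for batch_id in ids:
            (checkpoint / name / str(batch_id)).write_text("{}")
    pending = uncommitted_batch(checkpoint)

print(pending)  # prints 2
```

A non-empty result before a restart is a hint that the first micro-batch after the restart will be a replay of the pending one.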
That brings us to the summary. So what have we really understood? From the Query Progress Logs, we understood the different metrics that explain the stream execution patterns and, with respect to Delta streaming, how we can map the Delta transaction logs to the Query Progress Logs. With respect to stream checkpoints, we understood the importance of Offsets and Commits and how Spark uses them to track the stream progress and also to provide fault tolerance. Before I conclude, I just want to leave you with what is coming next. In the next part of the Delta Lake Streaming Under the Hood series, I’ll be talking about other variations of Delta streams. There are different options like TriggerOnce and maxBytesPerTrigger, and we’ll be talking more about Delta table properties, the common issues that users face when running streams in production, and the mitigation strategies we advise for them. So, that’s it from my side. I hope you enjoyed it. Enjoy the summit and don’t forget to give feedback. Thank you.
Shasidhar is part of Resident Solutions Architects team at Databricks. He is an expert in designing and building batch and streaming applications at scale using Apache Spark. At Databricks he works...