In digital advertising, one of the most important things to be able to deliver to clients is information about how their advertising spend drove results. The more quickly we can provide this, the better. To tie conversions or engagements to the impressions served in an advertising campaign, companies must perform attribution. Attribution can be a fairly expensive process, and running attribution against constantly updating datasets is challenging without the right technology. Traditionally, this has not been an easy problem to solve as there are lots of things to reason about:
- How do we make sure the data can be written at low latency to a read location without corrupting records?
- How can we continuously append to a large, query-able dataset without exploding costs or loss of performance?
- Where and how should we introduce a join for attribution?
Fortunately, Databricks makes this easy with Structured Streaming and Databricks Delta. In this blog post (and associated notebook), we are going to take a quick look at how to use the DataFrame API to build Structured Streaming applications on top of Kinesis (for those using Azure Databricks, you can use Azure EventHubs, Apache Kafka on HDInsight, or Azure Cosmos DB integration), and use Databricks Delta to query the streams in near-real-time. We will also show how you can use the BI tool of your choice to review your attribution data in real-time.
The first thing we need to do is establish the impression and conversion data streams. The impression data stream gives us a real-time view of the attributes associated with the customers who were served the digital ad (impression), while the conversion stream denotes customers who have performed an action (e.g. clicked the ad, purchased an item, etc.) based on that ad.
With Structured Streaming in Databricks, you can quickly plug into the stream, as Databricks supports direct connectivity to Kafka (Apache Kafka, Apache Kafka on AWS, Apache Kafka on HDInsight) and Kinesis, as shown in the following code snippet (this is for impressions; repeat this step for conversions):
// Read impressions stream
val kinesis = spark.readStream
  .format("kinesis")
  .option("streamName", kinesisStreamName)
  .option("region", kinesisRegion)
  .option("initialPosition", "latest")
  .option("awsAccessKey", $awsAccessKeyId$)
  .option("awsSecretKey", $awsSecretKey$)
  .load()
Next, define the schema for the data stream, as shown in the following code snippet.
import org.apache.spark.sql.types._

// Define impressions stream schema
val schema = StructType(Seq(
  StructField("uid", StringType, true),
  StructField("impTimestamp", TimestampType, true),
  StructField("exchangeID", IntegerType, true),
  StructField("publisher", StringType, true),
  StructField("creativeID", IntegerType, true),
  StructField("click", StringType, true),
  StructField("advertiserID", IntegerType, true),
  StructField("browser", StringType, true),
  StructField("geo", StringType, true),
  StructField("bidAmount", DoubleType, true)
))
Finally, we create our streaming impressions DataFrame. With the Databricks display command, we can see the data alongside the input and processing rates in real time.
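import org.apache.spark.sql.functions.from_json

// Define streaming impressions DataFrame:
// parse the Kinesis `data` payload as JSON using the schema, then flatten the fields
val imp = kinesis
  .select(from_json('data.cast("string"), schema) as "fields")
  .select($"fields.*")

// View impressions real-time data
display(imp)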
Sync Streams to Databricks Delta
The impression (imp) and conversion (conv) streams can be synced directly to Databricks Delta, giving us greater flexibility and scalability for this real-time attribution use case. It allows you to quickly write these real-time data streams in Parquet format to S3 / Blob Storage while users read from the same directory simultaneously, without the overhead of managing consistency, transactionality, and performance yourself. As shown in the following code snippet, we capture the raw records from a single source and write them into their own Databricks Delta table.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

// Persist Impression `imp` data to Databricks Delta
imp.withWatermark("impTimestamp", "1 minute")
  .repartition(1)
  .writeStream
  .format("delta")
  .option("path","/tmp/adtech/impressions")
  .option("checkpointLocation","/tmp/adtech/impressions-checkpoints")
  .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 seconds"))
  .start()
It is important to note that with Databricks Delta, you can also:
- Apply additional ETL, analytics, and/or enrichment steps at this point
- Write data from different streams, batch processes, and sources into the same table
Databricks Delta Views for Ad Hoc Reporting
Now that we have created our impression and conversion Databricks Delta tables, we will create named views so we can easily execute our joins in Spark SQL as well as make this data available to query from your favorite BI tool. Let’s first start with creating our Databricks Delta views.
%sql
use adtech;

create or replace view impressionsDelta as
  select * from delta.`/tmp/adtech/impressions`;

create or replace view conversionsDelta as
  select * from delta.`/tmp/adtech/conversions`;
Calculate Real-time Attribution
Now that we have established our Databricks Delta views, we can calculate our last touch attribution on the view and then calculate weighted attribution on the view.
Calculate Last Touch Attribution on View
To calculate the real-time window attribution, as noted in the preceding sections, we need to join two different Delta streams of data: impressions and conversions. As shown in the following code snippet, we first define our Databricks Delta-based impressions and conversions. We also define a window specification that is used by the subsequent dense_rank() statement. The window and rank define our attribution logic.
// Define needed Impression data
val imps = spark.sql("select uid as impUid, advertiserID as impAdv, * from sparksummit.imps").drop("advertiserID")

// Define needed Conversions Data
val convs = spark.sql("select * from sparksummit.convs")

// Define Spark SQL window ordered by Impression Timestamp
// partitioned by Impression Uid and Impression Advertiser
val windowSpec = Window.partitionBy("impUid","impAdv").orderBy(desc("impTimestamp"))

// Calculate real-time attribution by
// joining impression.impUid == conversion.uid
// ensure impression time happened before conversion time
// filtering via dense_rank
val windowedAttribution = convs.join(imps, imps.col("impUid") === convs.col("uid") &&
  imps.col("impTimestamp") ...

Now we can calculate our real-time window attribution by joining the impression and conversion data and keeping only the most recent impression per user ID (as defined by impUid). Finally, we can create a global temporary view (.createGlobalTempView) so this real-time view is accessible by downstream systems. For example, you can view your real-time data using Spark SQL, as in the following code snippet.
%sql select * from global_temp.realTimeAttribution
You can also plug in your favorite BI tool, such as Tableau, to perform ad hoc analysis of your data.
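To make the last-touch rule concrete outside of Spark, here is a minimal sketch on plain Scala collections. It is not the notebook's code: the case classes, the `conversionID` and `convTimestamp` fields, and the `LastTouch` object are hypothetical names chosen for illustration, and the sketch picks the latest prior impression per conversion rather than ranking within a (user, advertiser) window.

```scala
import java.time.Instant

// Hypothetical, simplified records. Only uid, advertiserID, creativeID and
// impTimestamp come from the impressions schema; the rest are assumptions.
final case class Impression(uid: String, advertiserID: Int, creativeID: Int, impTimestamp: Instant)
final case class Conversion(uid: String, conversionID: String, convTimestamp: Instant)
final case class Attribution(conversionID: String, uid: String, creativeID: Int, impTimestamp: Instant)

object LastTouch {
  // For each conversion, keep the most recent impression served to the same
  // user strictly before the conversion time. Conversions with no prior
  // impression produce no attribution row.
  def attribute(imps: Seq[Impression], convs: Seq[Conversion]): Seq[Attribution] = {
    val impsByUser = imps.groupBy(_.uid)
    convs.flatMap { conv =>
      val prior = impsByUser
        .getOrElse(conv.uid, Seq.empty[Impression])
        .filter(_.impTimestamp.isBefore(conv.convTimestamp))
      if (prior.isEmpty) None
      else {
        val last = prior.maxBy(_.impTimestamp.toEpochMilli)
        Some(Attribution(conv.conversionID, conv.uid, last.creativeID, last.impTimestamp))
      }
    }
  }
}
```

Calling `LastTouch.attribute(imps, convs)` with a user who saw three ads and converted after the second one attributes the conversion to the second ad only; the third impression is ignored because it happened after the conversion.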
Calculate Weighted Attribution on View
In the preceding case, we demonstrated a very naive model: isolating all of a user’s impressions prior to conversion and attributing the conversion only to the most recent one. A more sophisticated model might apply attribution windows or weight the impressions by time, as in the code snippet below.
// Define attribution window for impressions
val attrWindow = Window
  .partitionBy("uid")
  .orderBy($"impTimestamp".desc)
val attrRank = dense_rank().over(attrWindow)

// Define ranked window by taking attribution window and
// partition by conversionID
val rankedWindow = Window
  .partitionBy("conversionID")
val numAttrImps = max(col("attrRank")).over(rankedWindow)

// Reference impression table
val imps = spark.sql("select * from sparksummit.imps").withColumn("date", $"impTimestamp".cast(DateType)).drop("advertiserID")

// Reference conversion table
val convs = spark.sql("select * from sparksummit.convs").withColumnRenamed("uid","cuid")

// Join impression and conversions
val joined = imps.join(convs, imps.col("uid") === convs.col("cuid") &&
  imps.col("impTimestamp") ...
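The idea of combining an attribution window with time-based weights can also be sketched on plain Scala collections. The example below uses exponential time decay, which is one common weighting choice and not necessarily the scheme used in the notebook; `Touch`, `TimeDecay`, `lookback`, and `halfLife` are hypothetical names introduced for this illustration.

```scala
import java.time.{Duration, Instant}

// Hypothetical, simplified impression record for one user.
final case class Touch(creativeID: Int, impTimestamp: Instant)

object TimeDecay {
  // Assigns each eligible impression a share of one conversion.
  // Eligible: served before the conversion and within the lookback window.
  // Raw weight halves for every `halfLife` of age; shares are normalized
  // so that they sum to 1 (or the result is empty if nothing is eligible).
  def weights(
      touches: Seq[Touch],
      convTime: Instant,
      lookback: Duration,
      halfLife: Duration
  ): Seq[(Touch, Double)] = {
    val windowStart = convTime.minus(lookback)
    val eligible = touches.filter { t =>
      t.impTimestamp.isBefore(convTime) && !t.impTimestamp.isBefore(windowStart)
    }
    val raw = eligible.map { t =>
      val ageMs = Duration.between(t.impTimestamp, convTime).toMillis.toDouble
      t -> math.pow(0.5, ageMs / halfLife.toMillis.toDouble)
    }
    val total = raw.map(_._2).sum
    if (total == 0.0) Seq.empty else raw.map { case (t, w) => t -> w / total }
  }
}
```

With a one-hour half-life, an impression served one hour before the conversion receives twice the share of one served two hours before it, while impressions outside the lookback window or after the conversion receive nothing.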
In this blog, we have reviewed how Databricks Delta provides a simplified solution for a real-time attribution pipeline. The advantages of using Databricks Delta to sync and save your data streams include (but are not limited to) the ability to:
- Save and persist your real-time streaming data like a data warehouse because Databricks Delta maintains a transaction log that efficiently tracks changes to the table.
- Run your queries and perform your calculations in real time, with results completing in seconds.
- Have multiple writers simultaneously modify a dataset and still see consistent views.
- Modify a dataset without interfering with jobs reading the dataset.
- Avoid the “many small files” problem typical of many big data projects, because automatic file management organizes data into large files that can be read efficiently.
- Speed up reads by 10-100x using statistics, while data skipping avoids reading irrelevant information.
Together with your streaming framework and the Databricks Unified Analytics Platform, you can quickly build and use your real-time attribution pipeline with Databricks Delta to solve your complex display advertising problems in real-time.