
Using Apache Flink With Delta Lake

Incorporating Flink DataStreams into your Lakehouse architecture

As with all parts of our platform, we are constantly raising the bar and adding new features to enhance developers’ abilities to build the applications that will make their Lakehouse a reality. Building real-time applications on Databricks is no exception. Features like asynchronous checkpointing, session windows, and Delta Live Tables allow organizations to build even more powerful, real-time pipelines on Databricks using Delta Lake as the foundation for all the data that flows through the Lakehouse.

However, for organizations that leverage Flink for real-time transformations, it might appear that they are unable to take advantage of some of the great Delta Lake and Databricks features, but that is not the case. In this blog we will explore how Flink developers can build pipelines to integrate their Flink applications into the broader Lakehouse architecture.

A stateful Flink application

Let’s use a credit card company to explore how we can do this.

For credit card companies, preventing fraudulent transactions is table stakes for a successful business. Credit card fraud poses both reputational and revenue risk to a financial institution, so credit card companies must have systems in place that remain constantly vigilant against fraudulent transactions. Many organizations implement these monitoring systems using Apache Flink, a distributed event-at-a-time processing engine with fine-grained control over streaming application state and time.

Below is a simple example of a fraud detection application in Flink. It monitors transaction amounts over time and sends an alert if a small transaction is immediately followed by a large transaction within one minute for any given credit card account. By combining Flink’s ValueState data type with a KeyedProcessFunction, developers can implement business logic that triggers downstream alerts based on both event contents and timers.


import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.entity.Transaction

object FraudDetector {
  val SMALL_AMOUNT: Double = 1.00
  val LARGE_AMOUNT: Double = 500.00
  val ONE_MINUTE: Long     = 60 * 1000L
}

@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  @transient private var flagState: ValueState[java.lang.Boolean] = _
  @transient private var timerState: ValueState[java.lang.Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    flagState = getRuntimeContext.getState(flagDescriptor)

    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
    timerState = getRuntimeContext.getState(timerDescriptor)
  }

  override def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    // Get the current state for the current key
    val lastTransactionWasSmall = flagState.value

    // Check if the flag is set
    if (lastTransactionWasSmall != null) {
      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
        // Output an alert downstream
        val alert = new Alert
        alert.setId(transaction.getAccountId)

        collector.collect(alert)
      }
      // Clean up our state
      cleanUp(context)
    }

    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
      // Set the flag, and start a one-minute timer for this account
      flagState.update(true)

      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
      context.timerService.registerProcessingTimeTimer(timer)
      timerState.update(timer)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
      out: Collector[Alert]): Unit = {
    // One minute has passed without a large transaction; clear the flag
    timerState.clear()
    flagState.clear()
  }

  @throws[Exception]
  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
    // Delete the pending timer and clear all state for this key
    val timer = timerState.value
    ctx.timerService.deleteProcessingTimeTimer(timer)

    timerState.clear()
    flagState.clear()
  }
}
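
For completeness, here is how the detector might be wired into a running Flink job. This is a sketch following the official Flink fraud detection walkthrough, using its TransactionSource and AlertSink helper classes:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.source.TransactionSource

object FraudDetectionJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the transaction stream and key it by account ID so that
    // flagState and timerState are scoped to each individual account
    val transactions = env
      .addSource(new TransactionSource)
      .name("transactions")

    val alerts = transactions
      .keyBy(transaction => transaction.getAccountId)
      .process(new FraudDetector)
      .name("fraud-detector")

    alerts
      .addSink(new AlertSink)
      .name("send-alerts")

    env.execute("Fraud Detection")
  }
}

Because the stream is keyed by account ID, each account gets its own copy of the ValueState variables, and timers fire independently per account.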

In addition to sending alerts, most organizations will want the ability to perform analytics on all the transactions they process. Fraudsters constantly evolve their techniques in the hopes of remaining undetected, so a simple heuristic-based fraud detection application like the one above is unlikely to catch all fraudulent activity. Organizations leveraging Flink for alerting will also need to combine disparate data sets to create advanced fraud detection models that analyze more than just transactional data, incorporating data points such as demographic information about the account holder, previous purchasing history, and the time and location of transactions.
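
One way to make those transactions available for analytics in the Lakehouse is to have Flink write them to cloud object storage, where a tool such as Databricks Auto Loader can incrementally ingest them into a Delta table. Below is a minimal sketch using Flink’s FileSink; the bucket path is hypothetical, and transactions is assumed to be the DataStream[Transaction] from the job above:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

// Hypothetical landing zone in object storage; an ingestion job (e.g., Auto Loader)
// would be pointed at this path to stream new files into a Delta table
val rawSink: FileSink[String] = FileSink
  .forRowFormat(
    new Path("s3://example-bucket/transactions/"),
    new SimpleStringEncoder[String]("UTF-8"))
  .build()

// Serialize each transaction as a JSON line. Row-format FileSink finalizes
// part files on checkpoint, so checkpointing must be enabled for the job.
transactions
  .map(tx => s"""{"accountId":${tx.getAccountId},"amount":${tx.getAmount},"timestamp":${tx.getTimestamp}}""")
  .sinkTo(rawSink)

With raw transactions landing continuously in object storage, downstream Databricks jobs can ingest them into Delta Lake and join them with the demographic and historical data sets described above.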