Consumer Lag in Delta Lake

3 min read · Dec 15, 2021
Image source: https://www.confluent.io/blog/kafka-lag-monitoring-and-metrics-at-appsflyer/

What is Delta Lake?
Delta Lake is an open-source storage layer that brings reliability to data lakes. Delta Lake provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. Delta Lake runs on top of your existing data lake and is fully compatible with Apache Spark APIs. More information about Delta Lake can be found on the Delta Lake home page.
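Because Delta Lake exposes itself through the regular Spark DataFrame APIs, reading and writing a Delta table looks just like any other Spark source. A minimal sketch, assuming the delta-core library is on the classpath (the table path is a placeholder):

import org.apache.spark.sql.SparkSession

object DeltaQuickStart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DeltaQuickStart").getOrCreate()
    import spark.implicits._

    // Write a small batch DataFrame as a Delta table (path is a placeholder)
    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .write.format("delta").mode("append").save("s3://<bucket>/demo_table")

    // Read it back with the same DataFrame API used for other formats
    spark.read.format("delta").load("s3://<bucket>/demo_table").show()

    spark.stop()
  }
}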

What is Consumer Lag?
Consumer lag is the gap between the producer's position and the consumer's position. When the consumer consumes at a slower pace than the producer produces, this lag grows and causes problems in a production system, such as stale data. For example, if the producer has written 1,000 records and the consumer has processed only 900, the consumer lag is 100 records. In an ideal system, consumer lag is zero, which means the capacity of the consumer must always be at least as large as the capacity of the producer.

Streaming systems like Kafka or MSK come with active monitoring for consumer lag. Delta Lake can also be used as a stream source, i.e. as a producer (https://docs.delta.io/latest/delta-streaming.html), but there is no built-in monitoring for consumer lag there.
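For reference, here is a minimal sketch of consuming a Delta table as a streaming source with Structured Streaming, roughly following the linked documentation (paths are placeholders); the checkpointLocation below is where the offsets used later in this post are written:

import org.apache.spark.sql.SparkSession

object DeltaStreamConsumer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DeltaStreamConsumer").getOrCreate()

    // The Delta table acts as the producer; this streaming job is the consumer
    val stream = spark.readStream
      .format("delta")
      .load("s3://<bucket>/source_table")

    // The checkpoint location holds the "offsets" folder discussed below
    stream.writeStream
      .format("delta")
      .option("checkpointLocation", "s3://<bucket>/app/v1/_checkpoint")
      .start("s3://<bucket>/target_table")
      .awaitTermination()
  }
}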

Why is monitoring Consumer Lag so important?
Consumer lag plays a very important role in a streaming system. If we don't monitor it, we can end up with permanent data loss: the data in Kafka or any other streaming source typically has a retention period, and if it is not consumed within that retention window it may be lost permanently.
Even without a finite retention window, data freshness for the consumers (the applications processing the data) is impacted: consumers end up processing data that is increasingly stale because their processing speed cannot keep up with the rate at which new data arrives.
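As an illustration of such a retention window on a Delta source, the amount of transaction-log history that a lagging stream can still replay is bounded by table properties. A hedged sketch only (the table path and durations are placeholders, and a SparkSession named spark is assumed):

// Illustrative only: keep 30 days of Delta transaction-log history and of
// deleted data files, so a consumer lagging by less than 30 days can still catch up
spark.sql(
  """ALTER TABLE delta.`s3://<bucket>/source_table`
    |SET TBLPROPERTIES (
    |  'delta.logRetentionDuration' = 'interval 30 days',
    |  'delta.deletedFileRetentionDuration' = 'interval 30 days'
    |)""".stripMargin)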

We can calculate the lag from the Delta table's logs themselves: Delta Lake keeps the stream's checkpoint offsets in the "_checkpoint/offsets" folder. We can read those offsets for multiple applications and print a log line with the lag for each of them. I have used the application log path on EMR for these log lines:

“/mnt/var/log/hadoop-yarn/containers/application_lags/container_lags/”

After that, we can use a Splunk forwarder to ship these log lines and build dashboards or alerts on top of them.
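For context, each file under "_checkpoint/offsets" is a small text file written by Structured Streaming: typically a version marker line (e.g. v1) followed by JSON lines, one of which carries the Delta source offset including the reservoirVersion field. Lines that are not valid JSON come back with null columns when read as JSON, which is why the program below filters on reservoirVersion. A quick, hedged way to eyeball them (the path is a placeholder, and a SparkSession named spark is assumed):

// Inspect the raw checkpoint offsets for one application (path is a placeholder)
spark.read
  .json("s3://<application_path_to_stream>/App1/v1/_checkpoint/offsets/")
  .select("reservoirVersion")
  .filter("reservoirVersion is not null")
  .show(5, truncate = false)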

A small Scala program to calculate and print the consumer lag:


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DeltaLakeLagCalculator {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName("DeltaLakeStreamLag")
      .getOrCreate()

    import sparkSession.implicits._

    // Map of application name -> _delta_log folder of the Delta table it consumes
    val states = Map(
      "App1" -> "s3://<CheckPoint_location_App1>/_delta_log/",
      "App2" -> "s3://<CheckPoint_location_2>/_delta_log/"
    )

    for ((application, deltaLogLocation) <- states) {

      // YARN writes this container's stdout here, so the println below
      // ends up in this file and can be picked up by the Splunk forwarder
      val logOutputPath =
        s"/mnt/var/log/hadoop-yarn/containers/application_lags/container_lags/$application/stdout"

      // Read the stream's checkpoint offsets and build the paths of the
      // corresponding commit files in the Delta transaction log
      // (versions are zero-padded to 20 digits, e.g. 00000000000000000042.json)
      val offsetFileList =
        sparkSession
          .read.format("json")
          .load(s"s3://<application_path_to_stream>/$application/v1/_checkpoint/offsets/")
          .select("reservoirVersion")
          .filter("reservoirVersion is not null")
          .withColumn("path",
            concat(lit(deltaLogLocation), lpad($"reservoirVersion", 20, "0"), lit(".json")))
          .select("path")
          .collect()
          .map(_.getString(0))
          .toList

      // Read those commit files to get the commit timestamps (in milliseconds)
      // of the versions the consumer has reached, alongside the current time
      val currentOffsets =
        sparkSession
          .read
          .json(offsetFileList: _*)
          .select("commitInfo.timestamp")
          .filter("commitInfo.timestamp is not null")
          .distinct()
          .withColumn("current_timestamp", unix_timestamp() * 1000)

      // Both timestamps are in milliseconds, so dividing the difference
      // by 3,600,000 gives the lag in hours
      val diffMillisCol = col("current_timestamp").cast("long") - col("timestamp").cast("long")
      val lagFromCurrentTime = currentOffsets
        .withColumn("diff_hours", diffMillisCol / 3600000)
        .select("diff_hours")

      val list = lagFromCurrentTime.agg(max($"diff_hours") as "total_lag").collect()

      // One log line per application; these lines land in logOutputPath
      list.foreach(row => println(s"lag=${row.getAs[Double]("total_lag")} Application=$application"))
    }
  }
}
