Consumer Lag in Delta Lake

Image source: https://www.confluent.io/blog/kafka-lag-monitoring-and-metrics-at-appsflyer/

What is Delta Lake?
Delta Lake is an open-source storage layer that brings reliability to data lakes. It provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. Delta Lake runs on top of your existing data lake and is fully compatible with the Apache Spark APIs. More information can be found on the Delta Lake home page.
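As a quick illustration of that Spark compatibility, a table can be written and read with the standard DataFrame APIs just by switching the format to "delta". The snippet below is a minimal sketch, assuming an existing SparkSession named spark (e.g. in spark-shell) with the Delta Lake library on the classpath; the S3 path is a placeholder.

import spark.implicits._

// Write a small DataFrame as a Delta table (placeholder path)
Seq(1, 2, 3).toDF("id")
  .write
  .format("delta")
  .mode("overwrite")
  .save("s3://<bucket>/tables/sample_delta_table")

// Read it back as a normal batch DataFrame
spark.read.format("delta")
  .load("s3://<bucket>/tables/sample_delta_table")
  .show()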

What is Consumer Lag?
Consumer lag is the gap between the producer and the consumer. When the consumer consumes at a slower pace than the producer produces, this gap grows and leads to problems in the production system, such as stale data. This gap is what we call consumer lag. In an ideal system consumer lag is zero, which means the capacity of the consumer must always be at least as large as the capacity of the producer.
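Conceptually, the lag is just the producer's position minus the consumer's position. A tiny sketch with made-up numbers (for a Delta source, the "position" is the table version the consumer has committed):

// All values here are hypothetical and only illustrate the arithmetic
val latestProducedVersion = 120L // newest version written to the source table
val lastConsumedVersion   = 112L // newest version the consumer has committed
val versionLag = latestProducedVersion - lastConsumedVersion // consumer is 8 commits behind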

Streaming systems like Kafka or Amazon MSK come with active monitoring for consumer lag. Delta Lake can also be used as a streaming source, i.e. as a producer (https://docs.delta.io/latest/delta-streaming.html), but no such consumer-lag monitoring is available for it.
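For context, this is roughly what such a consumer looks like: it reads the Delta table as a stream and persists its progress under a checkpoint location, which is exactly the folder we will inspect below. A minimal sketch, assuming an existing SparkSession named spark; all paths are placeholders.

// Consume a Delta table as a streaming source and write to another Delta table.
// The checkpointLocation is where the offsets used later in this post are stored.
val query = spark.readStream
  .format("delta")
  .load("s3://<source_table_path>")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "s3://<application_path_to_stream>/App1/v1/_checkpoint")
  .start("s3://<sink_table_path>")

query.awaitTermination()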

Why is monitoring Consumer Lag so important?
Consumer lag plays a very important role in a streaming system. If we don't monitor it, we can end up with permanent data loss: data in Kafka or any other streaming source usually has a retention period, and if it is not consumed within that retention window it may be lost for good.
Also, even if the data retention window is not finite, data freshness for the consumers (the applications processing the data) is impacted: consumers end up processing data that is increasingly old, and the backlog keeps growing as long as the producer outpaces the consumer.

We can calculate the lag from the logs of the Delta table itself. Delta Lake keeps the checkpoint offsets of a streaming query in the “_checkpoint/offsets” folder, so we can read those offsets for multiple applications and print a log line with the lag. I have used the same application path location for the application logs on EMR:

“/mnt/var/log/hadoop-yarn/containers/application_lags/container_lags/”
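Before wiring up the full job, we can peek at those offsets directly to see which table version a consumer has already committed. A quick sketch, assuming the checkpoint layout described above (existing SparkSession named spark, placeholder paths):

import org.apache.spark.sql.functions._

// Each offsets file records the reservoirVersion (the Delta table version)
// the streaming query has reached; take the highest one as the consumer position
val consumedVersions = spark.read.json(
    "s3://<application_path_to_stream>/App1/v1/_checkpoint/offsets/")
  .select("reservoirVersion")
  .filter("reservoirVersion is not null")

consumedVersions.agg(max("reservoirVersion").as("last_consumed_version")).show()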

Once the lag log lines are written to that location, we can use a Splunk forwarder to ship them and build dashboards or alerts on top of them.

A small Scala program to calculate and print consumer lag:


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DeltaLakeLagCalculator {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName("DeltaLakeStreamLag")
      .getOrCreate()

    import sparkSession.implicits._

    // Application name -> _delta_log location of the Delta table it consumes
    val states = Map(
      "App1" -> "s3://<CheckPoint_location_App1>/_delta_log/",
      "App2" -> "s3://<CheckPoint_location_2>/_delta_log/"
    )

    for ((application, deltaLogLocation) <- states) {

      // stdout of this job; on EMR it ends up under the YARN container log path
      val log_output_path =
        s"/mnt/var/log/hadoop-yarn/containers/application_lags/container_lags/${application}/stdout"

      // Read the streaming checkpoint offsets and build the path of the Delta
      // commit file (zero-padded, 20-digit version) for every consumed version
      val offsetFileList =
        sparkSession
          .read.format("json")
          .load(s"s3://<application_path_to_stream>/${application}/v1/_checkpoint/offsets/")
          .select("reservoirVersion")
          .filter("reservoirVersion is not null")
          .withColumn("path",
            concat(lit(deltaLogLocation), lpad($"reservoirVersion", 20, "0"), lit(".json")))
          .select("path")
          .collect()
          .map(_(0).toString).toList

      // Commit timestamps (in milliseconds) of the consumed versions,
      // together with the current time in milliseconds
      val currentOffsets =
        sparkSession
          .read
          .json(offsetFileList: _*)
          .select("commitInfo.timestamp")
          .filter("commitInfo.timestamp is not null")
          .distinct()
          .withColumn("current_timestamp", unix_timestamp() * 1000)

      // Both columns are in milliseconds, so dividing by 3600000 gives hours
      val diff_millis_col = col("current_timestamp").cast("long") - col("timestamp").cast("long")
      val lag_from_current_time = currentOffsets
        .withColumn("diff_hours", diff_millis_col / 3600000)
        .select("diff_hours")

      val list = lag_from_current_time.agg(max($"diff_hours") as "total_lag").collect()

      list.foreach(element => println(s"lag=${element.get(0)} Application=${application}"))
    }
  }
}
