Is Delta Lake streaming production-ready?

AS
4 min read · Jul 14, 2020

Delta Lake comes with awesome features that address the shortcomings of Spark, or of any big data platform, used as a data lake. So what does Delta Lake guarantee to support?

  • ACID transactions on Data Lake
  • Schema Enforcement
  • Time Travel Capabilities

Delta Lake is safe and widely used on many production platforms. In this article I will not talk about the features Delta Lake provides; instead I will highlight some problems, major and minor, that I have faced while using Delta Lake as a streaming solution for my data lake:

  • Consumer lag: how can we calculate consumer lag for the different consumers of a Delta table?
  • How do we handle corrupt records and move forward without the job failing again and again?
  • How do we reset the offset to the latest position rather than reading from the beginning of time?
  • How do we get information about multiple consumers?

As we are all well aware, we can stream directly from S3:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// sc is the existing SparkContext; poll the bucket every 60 seconds
val ssc = new StreamingContext(sc, Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()
ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the stream to terminate

But maintaining all the data in a single S3 folder causes various problems, and the size of the checkpoint directory keeps growing. So while an S3 bucket can be used directly as a streaming source, it is not very useful when the volume of incoming data is large.

What is the difference between using S3 directly as the streaming source and using Delta Lake as the streaming engine?

> We can stream from partitioned data in Delta Lake, which is not supported at all when streaming directly from S3

events.write.partitionBy("date").format("delta").save("/mnt/delta/events")
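
The partitioned table can then be read back as a stream. A minimal sketch, assuming the path written above and a SparkSession named spark:

val eventsStream = spark.readStream
  .format("delta")            // Delta streaming source
  .load("/mnt/delta/events")  // the partitioned table written above

eventsStream.writeStream
  .format("console")
  .start()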

> Other benefits we get with a Delta table are VACUUM and OPTIMIZE, and you can also limit the streaming flow with a per-trigger rate limit (maxFilesPerTrigger)

OPTIMIZE compacts small files into larger ones and improves read performance

OPTIMIZE events ZORDER BY (eventType)

VACUUM manages data retention by removing files that are no longer referenced by the table

VACUUM events RETAIN 24 HOURS -- manage retention
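
The rate limit mentioned above is set on the streaming read. A small sketch, assuming the same table path:

val limited = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 100)  // read at most 100 new files per micro-batch
  .load("/mnt/delta/events")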

Now let's talk about the limitations that arise when using Delta Lake as a streaming solution.

Consumers and consumer lag

1- How do we get information about the consumers that are currently streaming and consuming data from a Delta table?

This can be managed by introducing logging in your consumers, which lets you track which paths are being read and which consumers are reading them.

2- How will you calculate the consumer lag?

Consumer lag in a Delta table arises when data flows in faster than a consumer reads it. To measure it, keep an extra field such as an ingest time on every record written to Delta Lake, and use your logger to record the difference between the current time and that ingest time.
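
A rough sketch of that idea, under the assumption that the producer adds an ingest_time column (for example with current_timestamp()) and that the table lives at the path used earlier:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max

val lagQuery = spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // newest ingest_time seen in this micro-batch
    val newest = batch.agg(max("ingest_time")).head().getTimestamp(0)
    if (newest != null) {
      val lagSeconds = (System.currentTimeMillis() - newest.getTime) / 1000
      println(s"batch $batchId: consumer lag ~ $lagSeconds s") // swap println for a real logger
    }
  }
  .start()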

Ignoring corrupt records

Schema enforcement and evolution are very useful for stopping data corruption in a data lake, but in real-world scenarios we are not always handed good, well-formed data. We often deal with junk and schema-less data, and we still have to extract some meaningful information from it.

A very good article about schema enforcement and evolution: https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html

So rather than failing the entire job when storing data in Delta Lake, we would like to ignore the odd corrupt record and store such records in a separate location that can be inspected later; the show must go on without requiring a production fix ASAP, because we cannot let a production job keep failing.

This can be achieved by filtering the data after reading it. Delta Lake lacks some of the knobs that plain Parquet reads provide, such as ignoring corrupt files and merging schemas:

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

Resetting the offset

Now a problem arises when I want to reset the offset of a consumer. Suppose I have been writing data into Delta Lake for about a month, and at this point I realise I need to start streaming data from this table to another sink. I want to start from the latest offset instead of reading and streaming everything from the beginning of time.

This problem can also be addressed by introducing an ingest time in the data and writing filtering logic in the consumer, but that comes at an extra computation cost.
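
A minimal sketch of that filter, again assuming an ingest_time column; note that the old files are still read and only then discarded, which is where the extra cost comes from:

import java.sql.Timestamp
import org.apache.spark.sql.functions.col

// Only keep records ingested after this consumer was started,
// effectively "resetting" the stream to the latest data
val startedAt = new Timestamp(System.currentTimeMillis())

val fromLatest = spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .filter(col("ingest_time") >= startedAt)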

Getting information about multiple consumers

Now, if a Delta table is consumed by more than one consumer, how do we keep records about those consumers? When you are running a data lake as a solution for different teams, how can you audit the multiple consumers of a table?
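
There is no built-in answer in Delta Lake. One sketch of the logging approach mentioned earlier is to give every consumer a query name and register a StreamingQueryListener that records who is reading what:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"consumer '${event.name}' started (id=${event.id})")  // use a real logger in production
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"consumer '${event.progress.name}' read ${event.progress.numInputRows} rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"consumer ${event.id} terminated")
})

// Each team names its stream so it shows up in the audit log, e.g.
// spark.readStream.format("delta").load("/mnt/delta/events")
//   .writeStream.queryName("team-a-events").format("console").start()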

So, to conclude this discussion: Delta Lake is a zero-cost streaming solution with various useful features. But before jumping into Delta Lake, I would recommend thinking about your schema and some basic logging up front, so that you can overcome the small problems you may run into later.


AS

Software engineer at Expedia Group. Passionate about data science and Big Data technologies.