A best-practice Modern Data Platform with Azure Databricks and Delta

At element61 we love to work with your data. Our philosophy is to provide you with a Modern Data Platform in which you can do real-time analytics and big-data batch analytics, as well as apply Machine Learning and AI.

On the Azure Cloud, one way of setting up a Modern Data Platform is with Databricks and Delta.

Databricks is an Azure partner providing a fully managed Spark environment running on top of Azure, called ‘Azure Databricks’.

 

Delta (Delta Lake) is an open-source storage layer for Spark that allows us to unify streaming & batch analytics. Delta runs in Databricks and offers major benefits in setting up a Data Lake. Read more about its benefits here.

 

 

A Modern Data Platform architecture with Azure Databricks

The below architecture is element61’s view on a best-practice modern data platform using Azure Databricks. Modern means it covers modern business needs:

  • We can handle real-time data from Azure Event Hub
  • We can leverage our Data Lake – e.g. Azure Data Lake Store
  • We can do both big data compute and real-time AI analytics using Spark & Python

In this demo we want to show this set-up end-to-end with data:

  • Data comes in through a real-time Event Hub
  • It then gets saved into the bronze layer of our data lake
  • We then pick it up, in real time, and clean it into a silver layer in the data lake
  • Next, we apply business logic in real time and write the result to a data warehouse

Let’s get into the details of how to set this up.

(Architecture diagram: a best-practice Modern Data Platform with Azure Databricks and Delta)

 

The basics

In the above design we leverage one of our overall best practices in Data Lake engineering: organizing the data lake in different zones. We create 3 zones (a minimal folder layout is sketched right after the list):

  • Bronze zone – keeps the raw data coming directly from the ingesting sources
  • Silver zone – keeps clean, filtered and augmented data
  • Gold zone – keeps the business-value data
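
In practice these zones typically map to folders on the mounted data lake. A minimal sketch of such a layout (the paths are assumptions, matching the mount point used later in this article; the gold path is purely illustrative):

# Hypothetical zone layout on the mounted data lake
bronze_path = '/mnt/deltatables/bronze/'   # raw data, exactly as ingested
silver_path = '/mnt/deltatables/silver/'   # cleaned, filtered and augmented data
gold_path   = '/mnt/deltatables/gold/'     # business-value data, ready for consumption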

Now, let’s take a look at the technical data flow from start to end (incl. code) on how to capture real-time data and orchestrate it into a data lake using Azure Databricks and Delta.

 

Step 1: Sourcing real-time data from Azure Event Hub into Azure Databricks

Before connecting to Event Hub, we have to make sure that the “azure-eventhubs-spark” library is installed on the cluster. Then, using the “Connection string – primary key” from the Event Hub’s shared access policies, we can create a Spark Structured Streaming read stream from Azure Event Hub with the code below.

# Connection string to connect to Azure Event Hubs
# (event_hub_connection_string holds the 'Connection string - primary key' value
#  from the Event Hub's shared access policies)
connectionString = event_hub_connection_string
ehConf = {
   'eventhubs.connectionString' : connectionString,
   'eventhubs.consumerGroup' : 'spark'
}

# read the Azure Event Hub stream using Spark Structured Streaming
streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
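
The records delivered by the Event Hub connector carry the payload in a binary body column. As a minimal sketch (the schema and field names below are assumptions, not the actual demo schema), the payload can be decoded and parsed before writing it onwards:

# Sketch: decode the binary Event Hub payload and parse it with an assumed JSON schema
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# hypothetical schema - replace with the fields of your own events
event_schema = StructType([
    StructField("station_id", StringType()),
    StructField("vehicle_id", StringType()),
    StructField("delay", StringType())
])

parsed_df = (streaming_df
    .withColumn("body", col("body").cast("string"))
    .withColumn("event", from_json(col("body"), event_schema)))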

 

Step 2: Mounting Azure Blob Storage to Azure Databricks

To save our delta tables to our existing storage account, we need to create a mount point so we can directly access the storage from inside the notebooks. The snippet below shows how to mount a storage container in the notebook. The account key is stored in Azure Key Vault and is accessed through a secret scope. This link explains how to create a secret scope: https://docs.azuredatabricks.net/user-guide/secrets/secret-scopes.html.

# Access the storage key from our scoped secrets in Databricks
storage_key = dbutils.secrets.get(scope = "keys", key = "storagekey")

# Mount our Azure Blob Storage container on our Databricks cluster
dbutils.fs.mount(
    source = SOURCE,  # in format "wasbs://container_name@account_name.blob.core.windows.net"
    mount_point = "/mnt/deltatables",
    extra_configs = {"fs.azure.account.key.{account}.blob.core.windows.net".format(account="nmbsdatalake"): storage_key}
)
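
Note that dbutils.fs.mount fails if the mount point already exists, so you may want to guard the call. A small, purely illustrative sketch re-using the same mount call:

# Illustrative guard: only mount if /mnt/deltatables is not mounted yet
if not any(m.mountPoint == "/mnt/deltatables" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = SOURCE,
        mount_point = "/mnt/deltatables",
        extra_configs = {"fs.azure.account.key.{account}.blob.core.windows.net".format(account="nmbsdatalake"): storage_key}
    )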

 

Step 3: Streaming data to our first Delta Table (Bronze layer)

After creating the “streaming_df” dataframe which reads from the Event Hub, we want to write the raw streaming data to a bronze delta table in blob storage. For this purpose, we define a path on the mounted blob storage where we will keep the raw data, and a path where we will keep the checkpoints.

By specifying the option “checkpointLocation” on the write stream, we make sure that we keep checkpoints so that, in case of failure, the stream can resume from where it failed. This guarantees data consistency and ensures fault tolerance of our streaming application.

By specifying .format(“delta”), we define that the incoming streaming data should be saved in Delta format at the specified path of the delta table.
The code for streaming from Event Hub to a bronze delta table:

checkpoint_path_bronze='/mnt/deltatables/checkpoints/bronze/'
bronze_path='/mnt/deltatables/bronze/'

streaming_df.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", checkpoint_path_bronze) \
  .start(bronze_path)

If all runs well, you should see the Spark Structured Streaming job start.
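
Once the stream is running, the bronze delta table can also be inspected with a plain batch read as a quick sanity check (not part of the pipeline itself):

# Quick sanity check: read the bronze delta table as a batch dataframe
bronze_df = spark.read.format("delta").load(bronze_path)
display(bronze_df.limit(10))   # display() is available in Databricks notebooks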


Step 4: Streaming data from one Delta Table to another Delta Table (Bronze to Silver layer)

Same as before, we specify a path for the checkpoints. Here, however, the source of the stream is the bronze table with the raw events, so the path to the bronze table has to be given in the load() call. In this stream we want to clean the raw JSON events and apply some aggregations, so the transformation function applied to the stream consists of the individual steps that perform these actions.

To do this, we can use foreachBatch, which takes micro-batches of events and feeds each micro-batch as input to the function that performs the transformation. In this stream we do not write the data directly to a location; instead, the function applied via foreachBatch aggregates the data and merges it into a silver delta table. With this merge we also make sure that there are no duplicate records in case of replayed events. The function for merging the data and the snippet for streaming from bronze to silver are shown below.

Partitioning the data in a Delta Lake table is advised, as it allows you to build the data lake in a more structured way and makes the data more accessible and faster to query. The last snippet shows how to create the silver delta table partitioned by date; note that this table has to exist before the merge in clean_events can run against it.

 

bronze_path='/mnt/deltatables/bronze/'
checkpoint_path_silver='/mnt/deltatables/checkpoints/silver/'

# Function applied to every micro-batch: merge the cleaned events into the silver table.
# (The cleaning / aggregation steps are omitted here for brevity; the ON condition of the
#  merge also prevents duplicate records in case of replayed events.)
def clean_events(df, epochId):
    df.createOrReplaceTempView("df")
    spark.sql("""MERGE INTO silver
                 USING df
                 ON df.start_station = silver.start_station and
                    df.departure_timestamp = silver.departure_timestamp and
                    df.vehicle_id = silver.vehicle_id and
                    df.end_station = silver.end_station
                 WHEN NOT MATCHED
                 THEN INSERT (retrieval_timestamp, retrieval_date, station_id,
                              start_station, longitude, latitude,
                              departure_id, cancelled, delay,
                              departure_connection, is_extra, left,
                              platform_id, platform_normal, end_station,
                              vehicle_id, departure_date, departure_timestamp)
                      VALUES (df.retrieval_timestamp, df.retrieval_date, df.station_id,
                              df.start_station, df.longitude, df.latitude,
                              df.departure_id, df.cancelled, df.delay,
                              df.departure_connection, df.is_extra, df.left,
                              df.platform_id, df.platform_normal, df.end_station,
                              df.vehicle_id, df.departure_date, df.departure_timestamp)""")

# Stream from the bronze delta table and apply clean_events to every micro-batch
(spark.readStream
      .format("delta")
      .load(bronze_path)
      .writeStream
      .foreachBatch(clean_events)
      .option("checkpointLocation", checkpoint_path_silver)
      .start()
)


silver_path='/mnt/deltatables/silver/'  # path for the silver zone on the mounted lake

spark.sql("""
  CREATE TABLE silver
  (
  retrieval_timestamp STRING,
  retrieval_date STRING,
  station_id STRING,
  start_station STRING,
  longitude STRING,
  latitude STRING,
  departure_id STRING,
  cancelled STRING,
  delay STRING,
  departure_connection STRING,
  is_extra STRING,
  left STRING,
  platform_id STRING,
  platform_normal STRING,
  end_station STRING,
  vehicle_id STRING,
  departure_date STRING,
  departure_timestamp STRING,
  DelayMinutes STRING,
  DelayBins STRING
)
USING DELTA
PARTITIONED BY (retrieval_date)
LOCATION '{}' """.format(silver_path))
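
Because the table is partitioned by retrieval_date, queries that filter on that column only read the matching partitions (partition pruning). A small illustrative query (the date value is just an example):

# Illustrative: filtering on the partition column prunes to the matching partitions
spark.sql("""
  SELECT start_station, avg(cast(delay AS double)) AS avg_delay
  FROM silver
  WHERE retrieval_date = '2019-06-01'
  GROUP BY start_station
""").show()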

 

How to re-use this streaming pipeline in batch?

The great thing about Delta is the ability to use the same functions for both real-time and batch pipelines. We have already applied the function that cleans the data and aggregates it to the real-time stream with foreachBatch, and now we can reuse the same function for a batch job.
Here, we can choose the source to be the raw events from the bronze table, or we can use the silver delta table to, for example, enrich the data once per day and insert it into a DWH. If we choose the bronze table as a source, we can re-apply the functions for cleaning and aggregating the data and potentially apply additional transformations.
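
A minimal sketch of what such a batch job could look like, assuming the clean_events function from the previous step is available in the batch notebook as well:

# Hypothetical daily batch job re-using the same function as the streaming pipeline
bronze_batch_df = spark.read.format("delta").load(bronze_path)

# foreachBatch passes (micro-batch dataframe, epoch id); in batch we simply call the
# function ourselves on the full dataframe
clean_events(bronze_batch_df, epochId=None)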

Conclusion

Any organization dealing with large amounts of unstructured data, or with the need to combine real-time and batch transformations, should consider adding Delta Lake on top of their data lake as part of their data architecture. Some of the benefits are:

  • One (!) code base for both batch and real-time
  • One compute service to handle both pipelines
  • Same transformations for batch and streaming
  • Data consistency
  • Optimized performance
  • Efficiency when performing updates and deletions

 

element61 experience

We believe that Delta Lake will become an essential part of the modern data platform that will enable organizations to build data pipelines both efficiently and at scale.
Our team is very familiar with all the challenges that come with big data and data lakes, and we have already helped customers overcome many of them by implementing Delta Lake in their organizations. We have the knowledge to help you integrate this as part of your existing data architecture.

 

Want to learn more about Spark, Azure Databricks and Delta?

Continue reading our other Azure Databricks and Spark articles