What is Auto Loader?
Auto Loader is a Databricks feature that offers this out of the box: it lets you incrementally load data as soon as it lands on cloud storage. Auto Loader is a relatively new feature and a very simple add-on to your existing Spark jobs and processes. Getting started with it is as simple as using its dedicated cloud file source within your Spark code.
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .schema(customSchema) \
    .load("/mnt/bronze/landing")  # illustrative path; customSchema is defined further below
Under the hood (in Azure Databricks), running Auto Loader will automatically set up Azure Event Grid and Queue Storage services. Through these services, Auto Loader uses the storage queue to discover new files, pass them to Spark, and thus load the data with low latency and at low cost within your streaming or batch jobs. Auto Loader also keeps track of which files were processed, which guarantees exactly-once processing of the incoming data.
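Whether Auto Loader provisions this notification infrastructure or simply lists the input directory is controlled by the cloudFiles.useNotifications option (see the Databricks documentation for details). Below is a minimal sketch; the path and schema are purely illustrative, and on Azure the notification mode additionally needs the connection and service principal options described further down in this post.

from pyspark.sql.types import StructType, StructField, StringType

# Illustrative schema and path, replace with your own.
schema = StructType([StructField("value", StringType(), True)])

# With useNotifications set to "true", Auto Loader is told about new files
# through the Event Grid/queue setup instead of listing the directory itself.
# (On Azure this also requires the cloudFiles connection/service principal
# options shown in the configuration section below.)
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.useNotifications", "true") \
    .schema(schema) \
    .load("/mnt/bronze/landing")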
Why is Auto Loader relevant for your data lake?
The data ingestion part is an important step when creating data pipelines in a data analytics platform. Varying use cases, from reporting to data analysis to machine learning, demand the integration of various data sources. This can be data coming from cloud or on-premises databases, IoT devices, applications, public datasets, etc. For this reason, a lot of organizations are already building centralized data lakes.
To take away some limitations of a classical data lake, we already recommend using Delta Lake (read more about Delta Lake here), but the challenge of handling data incrementally remains, especially when multiple data sources are ingested.
If not for Auto Loader, you would likely build a custom setup where:
- You would stream the data to an Event Hub/Kafka/Kinesis and from there load it into the Delta Lake. This, however, creates an extra step/job and thus increases complexity
- Or you would store the data directly in the data lake and then – for example in batch mode – list all the files in that directory to find the new ones and ingest the latest data (a sketch of this approach follows below). But this is hardly efficient…
- Or you would create time-based partitions in the data lake (the most common approach) and process the data per partition on a schedule.
The problems above are what Auto Loader helps solve.
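To make the second option concrete, a hand-rolled incremental load typically looks something like the sketch below: list everything in the landing directory, compare it with a record of files that were already processed, and ingest only the difference. All paths and the bookkeeping table here are hypothetical.

# Hypothetical do-it-yourself incremental ingestion, shown only for comparison.
landing_path = "/mnt/bronze/landing"            # illustrative landing directory
log_path = "/mnt/bronze/_processed_files_log"   # illustrative bookkeeping Delta table with a "path" column

all_files = {f.path for f in dbutils.fs.ls(landing_path)}
already_processed = {row.path for row in spark.read.format("delta").load(log_path).collect()}
new_files = sorted(all_files - already_processed)

if new_files:
    df = spark.read.csv(new_files, header=True)
    # ... write df to the target Delta table, then append new_files to the log ...

Every run has to list the entire directory and maintain the log itself, which becomes slower and more error-prone as the number of files grows; this bookkeeping is exactly what Auto Loader takes off your hands.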
Getting started with Auto Loader
Auto Loader is a free feature within Databricks that can easily be enabled by using a specific cloud file source. To make use of Auto Loader when processing new data, you can:
- Use Structured Streaming to process the latest data in streaming mode
- Use the trigger once mode from Structured Streaming to process the latest data in batch mode
# Streaming mode: process new files continuously as they arrive
df.writeStream \
    .format("delta") \
    .foreachBatch(upsert_data) \
    .option("checkpointLocation", "/mnt/silver/demos/covid19/checkpoints") \
    .start("/mnt/silver/demos/covid19/data")
# Batch mode: trigger once processes everything new and then stops
df.writeStream \
    .format("delta") \
    .foreachBatch(upsert_data) \
    .trigger(once=True) \
    .option("checkpointLocation", "/mnt/silver/demos/covid19/checkpoints") \
    .start("/mnt/silver/demos/covid19/data")
A practical example
To demonstrate Auto Loader end-to-end, we will see how raw data arriving in a “bronze” container in an Azure Data Lake is incrementally processed by Auto Loader in Databricks and automatically stored in a Delta table in the “silver” zone.
Example data: The data used in this demo is taken from https://epistat.sciensano.be/covid and contains daily confirmed cases of Covid-19 per region, sex, age for Belgium.
Define your raw schema for your input data
from pyspark.sql.types import StructType, StructField, DateType, StringType, DoubleType

customSchema = StructType([
    StructField("DATE", DateType(), True),
    StructField("PROVINCE", StringType(), True),
    StructField("REGION", StringType(), True),
    StructField("AGEGROUP", StringType(), True),
    StructField("SEX", StringType(), True),
    StructField("CASES", DoubleType(), True)
])
Configure Auto Loader
Before you start loading data from your data lake using Auto Loader, you must set up some configurations. These configurations are specific to the “cloudFiles” Structured Streaming source provided by Auto Loader:
- cloudFiles.format – specifies the format of the files you are trying to load
- cloudFiles.connectionString – is a connection string for the storage account
- cloudFiles.resourceGroup – is the resource group that contains the storage account and in which the Event Grid resources will be created
- cloudFiles.subscriptionId – is the subscription where the resource group is located
- cloudFiles.tenantId – is the tenant where the service principal is created (the service principal should have “contributor” rights on the storage account)
- cloudFiles.clientId – is the client ID of the service principal
- cloudFiles.clientSecret – is the client secret of the service principal
Important note: When starting a stream for the first time, you can also load the files that already exist in the data lake by setting the cloudFiles.includeExistingFiles option to “true”.
Detailed explanation of all the required configurations can be found in the documentation.
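Putting these options together, the read side of this example could look roughly like the sketch below. The secret scope, secret key names, resource identifiers and the bronze path are placeholders for illustration; customSchema is the schema defined above, and the secrets are read with Databricks' dbutils.secrets utility rather than hard-coded.

# Minimal sketch of the cloudFiles source with the file notification options.
# Scope/key names, resource identifiers and the input path are placeholders.
connection_string = dbutils.secrets.get(scope="demo-scope", key="storage-connection-string")
client_secret = dbutils.secrets.get(scope="demo-scope", key="sp-client-secret")

df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.connectionString", connection_string) \
    .option("cloudFiles.resourceGroup", "my-resource-group") \
    .option("cloudFiles.subscriptionId", "00000000-0000-0000-0000-000000000000") \
    .option("cloudFiles.tenantId", "00000000-0000-0000-0000-000000000000") \
    .option("cloudFiles.clientId", "00000000-0000-0000-0000-000000000000") \
    .option("cloudFiles.clientSecret", client_secret) \
    .option("header", "true") \
    .schema(customSchema) \
    .load("/mnt/bronze/demos/covid19")  # assuming the CSV files contain a header row; path is illustrative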
Write the transformation function that is applied to each new file
While streaming, we apply a function to each file (micro-batch) that deduplicates the data in case the same records are contained in multiple files.
from delta.tables import DeltaTable

def upsert_data(df, epochId):
    # Merge each micro-batch into the silver Delta table; only rows whose
    # key (DATE, PROVINCE, AGEGROUP, SEX) is not present yet are inserted.
    deltaTable = DeltaTable.forPath(spark, "/mnt/silver/demos/covid19/data")
    deltaTable.alias("data").merge(
        df.alias("newData"),
        "data.DATE = newData.DATE and data.PROVINCE = newData.PROVINCE "
        "and data.AGEGROUP = newData.AGEGROUP and data.SEX = newData.SEX") \
        .whenNotMatchedInsertAll() \
        .execute()
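Note that this merge only inserts rows whose key is not yet present in the Delta table; matching rows are left untouched. If your source can also deliver corrections, for example updated case counts for an existing combination, you could extend the merge with an update clause, roughly like this (a hypothetical variant, not part of the original pipeline):

# Hypothetical variant that also updates rows which already exist in the table.
deltaTable.alias("data").merge(
    df.alias("newData"),
    "data.DATE = newData.DATE and data.PROVINCE = newData.PROVINCE "
    "and data.AGEGROUP = newData.AGEGROUP and data.SEX = newData.SEX") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()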
Finally, use Auto Loader to ingest the data in your Delta Lake in a streaming manner
With the code below, we can ingest the data in streaming mode, meaning that every time a new file lands in the data lake it is processed immediately.
In case your data arrives in the data lake at certain intervals, like once per day or every couple of hours, you can use Auto Loader in batch mode. To load data in batch mode, simply add the trigger once option .trigger(once=True) to the code below.
df.writeStream \
    .format("delta") \
    .foreachBatch(upsert_data) \
    .option("checkpointLocation", "/mnt/silver/demos/covid19/checkpoints") \
    .start("/mnt/silver/demos/covid19/data")
Auto Loader in action
The full code shown in this insight can be found at the following link.
How would an architecture like this look in Azure?
What we like about Auto Loader
- Efficient because it ingests data incrementally from many sources; no need to develop extra mechanisms to keep track of which files were already processed and which are new
- Highly scalable because it can handle millions of files
- User friendly because it doesn’t require creating additional services or mechanisms to handle new files and can be easily set up