Get ready to revolutionize the way you handle real-time data! 🔥
In this insight, we will show you how to create and set up an efficient streaming pipeline using Microsoft Fabric. No more struggling with slow and inaccurate data streams - with our step-by-step process, you can easily set up a robust and lightning-fast pipeline that delivers real-time insights like never before. Whether you're a seasoned pro or just getting started, this guide will help you take your data-handling skills to the next level.
Fabric offers two options to achieve this. The first uses its Real-Time Analytics service, and the second relies on Spark. This discussion focuses on the second option and covers the stream's ingestion, transformation, and storage aspects.
Let's explore the possibilities! 🚀🚀
As we set up the stream, we will uncover the underlying technology, Spark Structured Streaming, drawing parallels with familiar environments like Databricks. We will also introduce the concept of a “Spark Job Definition”, a unique aspect of Fabric's streaming architecture.
The subsequent sections detail the five critical steps in creating a streaming pipeline within Fabric, from scripting the logic to executing and monitoring the Spark Job Definition.
In conclusion, you will find a summary of what we liked and what we consider to be the weaknesses of streaming with Fabric using Spark Structured Streaming.
Getting things ready for the trial
First and foremost, we need a streaming source. To stay close to a real-life example, we decided to pull data from a REST API, and to simulate a dynamic, data-rich environment we settled on the API of the popular web forum Reddit, which allows about 100 API calls per minute.
The API calls can be made from wherever is most convenient. In our example, we used a Fabric notebook, which allows us to easily and securely drop the raw data from the API into a folder in Fabric’s OneLake, where the streaming pipeline takes over. Each API call results in one parquet file.
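To make this concrete, here is a minimal sketch of what such an ingestion cell could look like when run in a Fabric notebook with a default Lakehouse attached. The subreddit, the selected columns, and the folder layout are illustrative assumptions, not our exact code:

# Hypothetical ingestion cell (Fabric notebook with a default Lakehouse attached).
import datetime as dt
import os

import pandas as pd
import requests


def fetch_posts(subreddit: str = "dataengineering", limit: int = 100) -> pd.DataFrame:
    # Reddit's public JSON listing endpoint; a descriptive User-Agent is required.
    url = f"https://www.reddit.com/r/{subreddit}/new.json?limit={limit}"
    response = requests.get(url, headers={"User-Agent": "fabric-streaming-demo"}, timeout=30)
    response.raise_for_status()
    posts = [child["data"] for child in response.json()["data"]["children"]]
    return pd.DataFrame(posts)[["id", "title", "author", "score", "created_utc"]]


# One API call results in one parquet file, dropped in a dated folder so that the
# stream's wildcard path (Files/Posts/*/*/*/*) can pick it up later.
now = dt.datetime.utcnow()
target_dir = f"/lakehouse/default/Files/Posts/{now:%Y}/{now:%m}/{now:%d}"
os.makedirs(target_dir, exist_ok=True)
fetch_posts().to_parquet(f"{target_dir}/posts_{now:%H%M%S}.parquet", index=False)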
Here, specifically, the stream will check for new files in the path we have specified. Whenever a new file is detected, the stream kicks in and the new data is processed.
Other possible sources Spark Structured Streaming can listen to include, for example, Kafka or a Delta table.
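For reference, reading from those alternative sources looks roughly like this (the broker address, topic, and table path are placeholders, the Kafka connector must be available on the cluster, and spark is the notebook's built-in session):

# Hypothetical alternative sources for the same kind of stream.
kafka_stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "reddit_posts")
    .load()
)

delta_stream = spark.readStream.format("delta").load("Tables/posts")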
Those who have been developing streaming pipelines within Databricks or on more barebones Apache Spark environments will recognize the Spark Structured Streaming library throughout the rest of this insight.
Setting up the Stream
According to Microsoft’s documentation (Get streaming data into lakehouse and access with SQL analytics endpoint) there are 5 steps involved in the creation of a streaming pipeline:
- 🐍 Create the Python script (where the actual logic behind the stream execution is defined).
- 🏡 Create a Lakehouse.
- ⭐ Create a Spark Job Definition.
- ➿ Set a retry policy for the Spark Job Definition.
- 🔎 Execute and monitor the Spark Job Definition.
Here we already find a noticeable difference between streaming within Databricks and within Fabric: the notion of a “Spark Job Definition”.
Normally (i.e. in Databricks), we would simply have the stream coded in a notebook and run that notebook. It is still possible to do so within Fabric, but while playing around with multiple streams we quickly understood why it is necessary to use a Spark Job Definition. We’ll explain this later.
However, it is best to start developing the stream 🐍 in a notebook anyway. The Lakehouse 🏡 can be created at the same time.
For simplicity, the only thing this stream does is read the new parquet files and then update existing records in a Delta table, or insert new rows when no previous data is available for a record (i.e. upsert or merge the data).
import pyspark.sql.utils
from pyspark.sql import SparkSession
from delta.tables import DeltaTable


def foreach_batch_function(df, epoch_id, delta_table):
    # Deduplicate the micro-batch on the post id, then upsert it into the target table.
    df = df.dropDuplicates(["id"])
    delta_table.alias("target_table").merge(
        source=df.alias("updates"),
        condition="target_table.id = updates.id",
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


if __name__ == "__main__":
    spark = SparkSession.builder.appName("Reddit_API_Stream").getOrCreate()

    # This code block creates the target table if it doesn't exist yet.
    # You would normally do this in a separate notebook, but it is useful to have it
    # here during development (because you may need to recreate this table
    # multiple times until you get things working).
    # get_first_file() is a small helper (not shown here) that returns the path of one
    # raw parquet file; it is only used to infer the schema and bootstrap the table.
    df_sample = spark.read.format("parquet").load(
        get_first_file().replace("/lakehouse/default/", "")
    )
    delta_table_path = "abfss://9f359e65-96b2-472c-9dfc-9924aa90e917@onelake.dfs.fabric.microsoft.com/6801e0e0-1336-42ae-883a-3075a19a49cb/Tables/posts"
    try:
        delta_table = DeltaTable.forPath(spark, delta_table_path)
    except pyspark.sql.utils.AnalysisException:
        df_sample.write.format("delta").mode("overwrite").save(delta_table_path)
        delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Same as with Databricks: readStream can simply pick up new files from a file system
    # and feed them directly into small DataFrames that are processed on the fly
    # (i.e. micro-batches).
    df = (
        spark.readStream.format("parquet")
        .schema(df_sample.schema)
        .load("Files/Posts/*/*/*/*")
    )

    # The readStream part doesn't actually start anything. You need to specify the sink and
    # start the actual stream with ".start()" as well as ".awaitTermination()".
    # This last part was not necessary in Databricks, but appears to be necessary in Fabric.
    # The option cleanSource (set to "delete" here) doesn't appear to work in Fabric at the
    # moment. However, it is necessary to handle the source with either deletion or
    # archiving (i.e. moving the file to an archive folder after processing).
    # If this is not done, each read will take a bit more time for every new file that
    # accumulates in the source.
    df.writeStream.trigger(processingTime="1 seconds").outputMode(
        "append"
    ).option("checkpointLocation", "Files/Streaming/Bronze/reddit_api").option(
        "cleanSource", "delete"
    ).foreachBatch(
        lambda df, epoch_id: foreach_batch_function(df, epoch_id, delta_table)
    ).start().awaitTermination()
These are all the elements necessary for the streaming to take place. We chose to use writeStream’s foreachBatch method to have maximum flexibility concerning the transformations applied to the stream. This allows you to apply any function you desire to each batch and write to any sink you like. Other methods are available and can be found here.
For those who are surprised by the term “batch” employed here, note that Spark Structured Streaming processes data batch by batch: new incoming data is grouped into a DataFrame that is then processed. You can tune these micro-batches with options on spark.readStream (e.g. the maximum number of files read in a batch) and with the trigger set on writeStream (e.g. fire every n seconds, or process whatever is currently available and stop).
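As a small sketch (the option and trigger names are standard Spark Structured Streaming ones; the values are arbitrary):

# File-source option: cap how many new files go into each micro-batch.
df = (
    spark.readStream.format("parquet")
    .schema(df_sample.schema)
    .option("maxFilesPerTrigger", 5)
    .load("Files/Posts/*/*/*/*")
)

# Trigger choices are set on the writer side:
#   .trigger(processingTime="30 seconds")  -> fire at a fixed interval
#   .trigger(availableNow=True)            -> process everything available, then stop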
If you run the notebook, it will work without any extra steps.
However, once you start working on other tasks, ⚠️ you will get an error such as the one below:
That is because, as of 30/01/2024, you can only run a limited number of notebooks at the same time. For an F64 capacity, the number of concurrent notebooks is 3 (check here for more info: Fabric Notebook Concurrency Explained).
One option to continue developing in notebooks is to run the notebooks in a high-concurrency session, but that is only good for development.
To get to production, you should use the Spark Job Definition ⭐ mentioned before. This allows you to change the “Job Type” submitted to your compute and change the behaviour of concurrent runs, allowing up to 64 concurrent jobs for an F64 capacity.
It is a bit odd to aim for a “batch” job in a “streaming” context, but that is how it is: in practice, you launch a single “batch” containing the streaming code, and this batch then runs indefinitely.
A Spark Job Definition job requires a Python file. You can simply download your notebook from the “Home” tab in .py format.
To create the Spark Job Definition, you can continue following this documentation: Get streaming data into lakehouse and access with SQL analytics endpoint.
The only 2 elements that absolutely need to be set are the Python file and the Lakehouse reference. The rest can be left with default settings or tweaked according to your needs.
One thing to pay attention to (but that Microsoft should fix soon) is that ⚠️ you can’t upload a Python script with whitespace or symbols in the name (except for the dot in the .py extension). The upload kept failing without any explanation under the name “Streaming - Bronze to Silver - Reddit API data.py” but succeeded once the file was renamed “reddit_api_stream.py”.
If you are lucky, this will work directly. But in our case, we still needed to make adjustments to the script for it to work, as a Spark Job is not exactly the same as a Notebook run.
In a notebook, a bunch of settings are taken care of for you for a smoother development flow. One example is that you can access files with a relative path such as “/Files/myfile.parquet”.
With the Spark Job ⚠️ you will need to specify a full abfss path (e.g. abfss://9f3123456-96b2-472c-9dfc-9924123456@onelake.dfs.fabric.microsoft.com/4563e0e0-1336-42ae-883a-3075a19a49cb/Files/Posts/*/*/*/*). Wildcards do work even with the abfss paths.
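A minimal way to handle this, assuming you parameterize the workspace and lakehouse IDs yourself (the placeholders below are not real values), is to build the full OneLake paths once at the top of the script:

# Hypothetical path handling for the Spark Job Definition version of the script.
ONELAKE = "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>"

source_path = f"{ONELAKE}/Files/Posts/*/*/*/*"
checkpoint_path = f"{ONELAKE}/Files/Streaming/Bronze/reddit_api"
delta_table_path = f"{ONELAKE}/Tables/posts"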
Another related surprise was the Python “os” library not working (there might be other issues like this - we are aiming to create a full list of these issues).
Until you actually get everything right, ⚠️ do not set a retry policy as suggested in Microsoft’s tutorial, as it will slow you down in getting to the error messages.
You will need to continue investigating the potential issues by checking the logs of each failed run.
Until you figure it all out! You want to see the job continuously “Running” here. If you see “Succeeded”, that means your job ended successfully, or in other words that your stream ended, and additional data won't be processed.
Now our stream is working. Looking at the data we can see it is being updated. Our goal is thus achieved. Below is a view of the complete streaming flow:
However, we are still missing some features in this setup that are available in Databricks.
Indeed, we find no interface where we can see the performance 🔎 of our streaming pipeline, whereas in Databricks we would have nice visuals telling us how long each micro-batch took to process, how many rows were included in each micro-batch, etc. See below:
What is available in Fabric is the detail of the job execution. We would have expected to find the performance information here, but for some reason every metric is set to 0 (processed, data read, data written, on the right of the table).
Querying our sink table in a loop, however, shows that the stream is running and updating/appending rows to a silver table.
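For illustration, a crude check along these lines (run from a separate notebook attached to the same Lakehouse; the table name matches our sink) is enough to see the row count grow:

# Hypothetical monitoring cell: poll the sink table while the stream is running.
import time

for _ in range(10):
    spark.sql("SELECT COUNT(*) AS row_count FROM posts").show()
    time.sleep(30)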
We find that this information is precious when developing streaming pipelines as those are often business-critical, and we have had cases where data ended up being processed with a latency of about 10 minutes, which obviously needs to be addressed. This is difficult to spot without the proper data and/or dashboards at hand.
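This is not part of the setup described above, but one way to partially compensate for the missing visuals is to keep the query handle returned by .start() (instead of chaining .awaitTermination() directly) and log Spark's built-in progress metrics yourself; stream_writer below stands for the df.writeStream... chain shown earlier:

# Sketch of manual progress logging for the streaming query.
import time

query = stream_writer.start()
while query.isActive:
    progress = query.lastProgress  # dict with batchId, numInputRows, durationMs, ...
    if progress:
        print(progress["batchId"], progress["numInputRows"], progress["durationMs"])
    time.sleep(30)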
Another point of attention comes with the need to upload a Python file to the Spark Job Definition. This means that ⚠️ normal versioning and CI/CD for notebooks cannot be used there, and a flow should be developed to enable this with Spark Job Definitions.
Conclusions
Microsoft Fabric's Spark Structured Streaming is an efficient way to make streaming more accessible, particularly for those who don't have an advanced analytics data platform like Databricks.
However, the development flow could be improved, as moving from notebooks to Spark Job Definitions involves manual steps and time-consuming re-dos. Moreover, the lack of reliable monitoring makes it somewhat riskier than Databricks, which is far ahead in that domain. Additionally, having to set up new CI/CD pipelines for Spark Job Definitions is also a disadvantage. Despite these limitations, it's impressive to witness Fabric's team's efforts in delivering a working solution for streaming.
Since it's a fairly new product in the Microsoft Fabric suite, we're looking forward to seeing how Microsoft will bridge the gap for the missing features and are eager to experiment with those enhancements soon 💪