What is Spark Structured Streaming, and how can you put it to use? This article will provide more details to help you get started.
Overview of Data Streaming
Data streaming refers to endless, continuous data generated at high velocity by thousands of data sources, which send their records continuously and in small sizes rather than in large batches.
Streaming data covers a wide variety of data, such as log files generated by customers, e-commerce purchases, information from social networks, and financial trading transactions. It is well suited to time-series analysis and to detecting patterns in continuously flowing data, and it makes business intelligence more valuable by providing fresh insights.
Examples of streaming data include data collected from IoT sensors, gaming data, monitoring applications, and financial transactions:
- Social media (Facebook, Twitter, etc.): Real-time sentiment analysis
- Financial institutions (such as banks): Real-time fraud detection
- Streaming apps (such as Netflix): Provide real-time movie/video recommendations
Spark Streaming
Streaming in Spark was first introduced with Spark Streaming in Spark 0.7. Spark Streaming follows a micro-batch architecture, where data is divided into batches at regular time intervals known as batch intervals. It supports batch intervals from 500 ms to several seconds. As a good practice, processing a batch should take no more than about 80% of the batch interval, so that the next batch does not accumulate while waiting to be processed.
Within a batch, data is further divided into blocks at a given block interval (200 ms by default):
- Each batch contains N blocks, where N = (batch interval / block interval)
- The number of blocks in a batch equals the number of tasks, so increasing or decreasing the number of blocks changes the degree of parallelism, as sketched below
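The sketch below shows where these knobs live, assuming a local socket source; the application name, host, port, and the one-second batch interval are illustrative. The batch interval is fixed when the StreamingContext is created, while the block interval is controlled by the spark.streaming.blockInterval configuration (200 ms by default).

```python
# Minimal DStream setup sketch; app name, socket host/port, and the 1-second
# batch interval are illustrative assumptions.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("dstream-batch-blocks")
# Block interval (default 200ms): with a 1-second batch interval this gives
# 1000ms / 200ms = 5 blocks, i.e. 5 tasks per batch.
conf.set("spark.streaming.blockInterval", "200ms")

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=1)  # batch interval of 1 second

lines = ssc.socketTextStream("localhost", 9999)  # example streaming source
lines.count().pprint()                           # print per-batch record counts

ssc.start()
ssc.awaitTermination()
```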
Spark DStream (Discretized Stream) is the basic abstraction of Spark Streaming and is internally a sequence of RDDs. DStreams improved considerably over time, but several challenges remained, primarily because of the low-level API and because it focuses on how to compute the result rather than what to compute; the Spark engine cannot do much to optimize the processing of the RDDs:
- Assume we have rdd.filter(conditionA).filter(conditionB)
- We know that these two filter operations could be applied in a single transformation using the AND operator, but the RDD API does not perform such query optimizations (see the sketch after this list)
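For illustration, here is roughly what that looks like with the RDD API; the records and the two predicates are made up. The two filters are executed exactly as written, and the API does not rewrite them into one combined predicate.

```python
# Illustrative RDD example; the data and predicates are hypothetical.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
orders = sc.parallelize([
    {"amount": 250, "country": "US"},
    {"amount": 40,  "country": "US"},
    {"amount": 500, "country": "IN"},
])

# Two chained filters (conditionA, then conditionB): the RDD API applies them
# as written and performs no logical rewrite into a single AND predicate.
large_us_orders = (orders
                   .filter(lambda o: o["amount"] > 100)      # conditionA
                   .filter(lambda o: o["country"] == "US"))  # conditionB

print(large_us_orders.collect())  # [{'amount': 250, 'country': 'US'}]
```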
Spark Streaming also uses processing time instead of event time to process the data stream, which can lead to inaccurate results when late or out-of-order data arrives:
- Processing time is the time, tracked by the processing engine, at which data arrives for processing
- Event time specifies the time when data was generated at the source
Because Spark Streaming uses processing time, it cannot tell, while processing, whether the data has arrived from the source out of order.
Structured Streaming
As a solution to the challenges faced in Spark Streaming, Structured Streaming was introduced in the Spark 2.0 release. It treats all arriving data as an unbounded table: each new item in the stream is like a row appended to that table, and there is no explicit batch concept in the API. It is built on the Spark SQL library and based on the DataFrame and Dataset APIs, so any SQL query can easily be applied.
It can process data based on event time, so it handles late or out-of-order data efficiently. What's more, it is optimized by Spark to focus on the what rather than the how, because Structured Streaming benefits from Spark SQL's code and memory optimizations.
Unlike Spark Streaming, here Spark sees that two filters are unnecessary: the same work can be done with a single filter using the AND operator, so it executes only one filter.
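A sketch of the same query with the DataFrame API follows; the column names and rows are hypothetical. The Catalyst optimizer combines the two chained filters into a single filter with an AND condition, which can be seen in the optimized logical plan printed by explain():

```python
# Illustrative DataFrame example; column names and data are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("filter-fusion").getOrCreate()
orders = spark.createDataFrame(
    [(250, "US"), (40, "US"), (500, "IN")],
    ["amount", "country"],
)

# Two chained filters in the code ...
large_us_orders = (orders
                   .filter(col("amount") > 100)
                   .filter(col("country") == "US"))

# ... but the optimized logical plan shows a single Filter with an AND condition.
large_us_orders.explain(True)
```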
Spark Structured Streaming supports the following three output modes:
- Append Mode (default): Only the rows appended to the result table since the last trigger are written to the sink
- Complete Mode: The whole result table is written to the sink after every trigger
- Update Mode: Only the rows updated in the result table since the last trigger are written to the sink (see the sketch after this list)
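The sketch below shows an output mode in use on a streaming word count; the socket source, host, port, and console sink are assumptions. Because this query is an aggregation without a watermark, complete (or update) mode is the appropriate choice here.

```python
# Illustrative streaming word count; socket host/port and console sink are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("output-modes").getOrCreate()

lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

# "complete": the whole result table is written to the sink after every trigger;
# "update" would write only the rows changed since the last trigger.
query = (word_counts.writeStream
         .outputMode("complete")
         .format("console")
         .start())
query.awaitTermination()
```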
DStream API vs. DataFrame/Dataset API
A DStream is a collection of RDDs and provides low-level transformations and processing. The DataFrame/Dataset API, on the other hand, provides a more powerful, SQL-like abstraction, as the comparison below shows.
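Here is a side-by-side sketch of reading the same socket source with both APIs; the host, port, and one-second batch interval are placeholders. Both snippets count occurrences of each incoming line, but the DStream version spells out the RDD operations, while the DataFrame version states the query and leaves the execution plan to Spark SQL.

```python
# Illustrative comparison; host/port and the batch interval are assumptions.
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.appName("api-comparison").getOrCreate()

# DStream API: low-level, RDD-based, tied to a fixed batch interval.
ssc = StreamingContext(spark.sparkContext, batchDuration=1)
dstream_lines = ssc.socketTextStream("localhost", 9999)
dstream_counts = (dstream_lines
                  .map(lambda line: (line, 1))
                  .reduceByKey(lambda a, b: a + b))

# DataFrame API (Structured Streaming): the same counting expressed declaratively.
df_lines = (spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load())
df_counts = df_lines.groupBy("value").count()
```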
Windowing Operations
Window operations are very similar to groupBy operations in that they allow functions to run over a set of streaming data. In groupBy, aggregation is based on the specified group or key, while in window operations, aggregation is based on event-time windows.
Spark supports two types of windows: tumbling window and sliding window.
Tumbling Window
A tumbling window segments a data stream into distinct time segments and performs a function against each of them. Tumbling windows repeat and do not overlap, so an event cannot belong to more than one tumbling window.
One example is a 10-second tumbling window. Here, the window function accepts two parameters: the first is the event-time column coming from the stream of data, and the second is the size of the tumbling window, as in the sketch below.
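The following sketch uses Spark's built-in rate source so that the example is self-contained; the rate source emits rows with a timestamp column (used here as the event time) and a value column.

```python
# Illustrative 10-second tumbling window over the built-in "rate" source.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("tumbling-window").getOrCreate()

events = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 5)
          .load())  # columns: timestamp, value

# window() takes the event-time column and the window size.
tumbling_counts = (events
                   .groupBy(window(col("timestamp"), "10 seconds"))
                   .count())

query = (tumbling_counts.writeStream
         .outputMode("complete")
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()
```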
Sliding Window
As its name suggests, this window slides over the data instead of tumbling. Sliding windows overlap, and events can belong to more than one window. We specify how far the window slides along with the window size.
One example is a five-second sliding window. Unlike the tumbling window, the sliding window accepts three parameters: the first and second parameters are the same as for the tumbling window, and the third defines the slide duration, which is five seconds in the sketch below.
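Here is a sketch of a sliding window on the same rate source; the 10-second window size is an assumption, while the five-second slide matches the example above. Because a new window starts every five seconds, an event can fall into two overlapping windows.

```python
# Illustrative sliding window; the 10-second window size is an assumption,
# and the 5-second slide duration follows the example in the text.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("sliding-window").getOrCreate()

events = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 5)
          .load())  # columns: timestamp, value

# The third argument is the slide duration: a new 10-second window starts every 5 seconds.
sliding_counts = (events
                  .groupBy(window(col("timestamp"), "10 seconds", "5 seconds"))
                  .count())

query = (sliding_counts.writeStream
         .outputMode("complete")
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()
```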
Watermarking in Streaming
With window operations, a Spark application will place late or out-of-order data in its respective window, because Spark maintains the state of the intermediate aggregations. Maintaining this state can incur some processing overhead.
To keep that overhead within acceptable limits, the size of the state data must not grow indefinitely. Since Apache Spark 2.1, watermarking can be used to bound the state size: it handles late data while letting the system discard older records automatically.
The watermark value is calculated as follows:
- T = max event time seen by the engine – late threshold
- For example: If the max event time seen is 12:10 and the late threshold is 10 minutes, then the watermark value is 12:00
- All state related to data older than the watermark value is dropped
Late data within the threshold is aggregated, but data arriving later than the threshold starts getting dropped. Use withWatermark(ts_col, late_threshold) on the streaming DataFrame to handle late or out-of-order data, as sketched below.
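The sketch below combines a watermark with the windowed count from earlier; the 10-minute threshold mirrors the example above, and the rate source's timestamp column stands in for the event-time column.

```python
# Illustrative watermarking example; the 10-minute late threshold mirrors the
# example above, and the window size is an assumption.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("watermark-example").getOrCreate()

events = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 5)
          .load())  # columns: timestamp, value

# Watermark = max(timestamp) seen so far - 10 minutes.
# State for windows older than the watermark is dropped, and data arriving
# later than the threshold is ignored.
late_tolerant_counts = (events
                        .withWatermark("timestamp", "10 minutes")
                        .groupBy(window(col("timestamp"), "10 seconds"))
                        .count())

query = (late_tolerant_counts.writeStream
         .outputMode("update")  # update/append modes work with watermarked aggregations
         .format("console")
         .start())
query.awaitTermination()
```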