- 1. Objective of Windowing in Apache Flink
- 2. Introduction to Streaming Windows in Apache Flink
- 3. Window Assigners
- 4. Trigger in Flink
- 5. Evictor in Flink
- 6. Window Functions
- 7. Dealing with Late Data
- 8. Non-Keyed Windowing
1. Objective of Windowing in Apache Flink
This tutorial explains streaming windows in Apache Flink with examples. It covers why windowing is needed for big data streams and the built-in window types: tumbling, sliding, global and session windows. It also covers window assigners, triggers, evictors and watermarks, along with the three notions of time in Flink: event time, processing time and ingestion time. The window functions discussed are ReduceFunction, FoldFunction and WindowFunction. Finally, you will learn how to deal with late data using allowed lateness, and how non-keyed windowing works in the Flink API.
2. Introduction to Streaming Windows in Apache Flink
Let us first understand what a window means in Flink.
Apache Flink is a stream processor with a very flexible mechanism for building and evaluating windows over continuous data streams. To process an infinite DataStream, Flink divides it into finite slices based on some criterion, such as the timestamps of the elements. These slices are called windows. The division is required whenever a transformation needs to aggregate elements. Flink's flexible window definitions set it apart from other open source stream processors and are one of the differences between Flink, Spark and Hadoop MapReduce.
A windowed transformation requires a key, a window assigner and a window function. The key is used to create logical keyed streams from the infinite, non-keyed stream, while the window assigner assigns elements to finite per-key windows. Finally, the window function processes the elements of each window.
The basic structure of a windowed transformation is thus as follows:
    DataStream<T> data = ...;
    data.keyBy(<key selector>)
        .window(<window assigner>)
        .trigger(<trigger>)
        .<windowed transformation>(<window function>);
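To make the roles of the key, the window assigner and the window function concrete, here is a minimal plain-Java sketch that does not use the Flink API. The class `KeyedWindowSketch`, its method and the `key@windowStart` slot naming are hypothetical, and the sketch assumes non-negative millisecond timestamps, fixed-size non-overlapping windows and a sum as the window function.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: mimics key -> window assigner -> window function
// with plain collections instead of Flink operators.
final class KeyedWindowSketch {

    /** Sums values per (key, window start); element i is (keys[i], timestamps[i], values[i]). */
    static Map<String, Long> sumPerKeyAndWindow(
            String[] keys, long[] timestamps, long[] values, long sizeMs) {
        Map<String, Long> sums = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            // "Window assigner": fixed-size window that contains the timestamp.
            long windowStart = timestamps[i] - (timestamps[i] % sizeMs);
            // "Key" plus window identify the per-key window the element belongs to.
            String slot = keys[i] + "@" + windowStart;
            // "Window function": here a running sum per slot.
            sums.merge(slot, values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "a"};
        long[] timestamps = {1_000L, 2_000L, 4_000L, 6_000L};
        long[] values = {1L, 2L, 3L, 4L};
        System.out.println(sumPerKeyAndWindow(keys, timestamps, values, 5_000L));
    }
}
```

In a real job Flink keeps this per-key, per-window state for you and evaluates it when the trigger fires; the sketch only shows how the three pieces fit together.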
Let us look at each component of a windowed transformation in detail below:
3. Window Assigners
A window assigner specifies how stream elements are divided into finite slices. Flink ships with pre-implemented window assigners for tumbling windows, sliding windows, session windows and global windows, and you can implement your own by extending the WindowAssigner class. All built-in window assigners except global windows assign elements based on processing time or event time.
- Processing-time assigners assign elements based on the current clock of the worker machines.
- Event-time assigners assign elements to windows based on the timestamps of the elements.
- Ingestion time is a hybrid of processing time and event time: it assigns wall-clock timestamps to records as they arrive in the system and then continues processing with event-time semantics based on the attached timestamps.
Let us look at how these window types differ, one by one.
a. Global windows
Here elements are not subdivided into windows. Instead, every element is assigned to one single per-key global window. This is useful only if a custom trigger is specified: a global window has no end at which computation could take place, so without a trigger no computation is ever performed.
Specify global windows as below:
    data.keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>);
b. Tumbling Windows
Here a window size is specified and elements are assigned to non-overlapping windows. For example, if the window size is 2 minutes, all elements that fall within the same 2-minute interval are placed in one window for processing. Some applications, however, require smoothed aggregates, for which the windows must not be disjoint.
You can specify tumbling event-time windows, here with a size of 5 seconds, as below:
    data.keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
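The assignment of an element to a tumbling window comes down to simple arithmetic on its timestamp. The following plain-Java sketch illustrates the idea; it is not Flink's implementation. The class `TumblingWindowSketch` and its methods are hypothetical, and the sketch assumes non-negative millisecond timestamps and no window offset.

```java
// Illustrative only: computes the [start, end) bounds of the tumbling
// window that contains a given timestamp.
final class TumblingWindowSketch {

    /** Inclusive start of the window containing timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // Round the timestamp down to the nearest multiple of the window size.
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window containing timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 5_000L; // 5-second windows, as in the snippet above
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs)
                    + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

Because every timestamp maps to exactly one start value, the windows never overlap and each element lands in exactly one window.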
c. Sliding Windows
Sliding windows have a fixed size, as tumbling windows do, but they can overlap, so one element can be assigned to multiple windows. The overlap is determined by the user-specified window slide parameter.
For example, you can specify a window size of 10 minutes with a slide of 5 minutes. The snippet below specifies sliding event-time windows with a size of 10 seconds and a slide of 5 seconds:
    data.keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
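To see why one element can belong to several sliding windows, the following plain-Java sketch lists every window that contains a given timestamp. It is an illustration under stated assumptions, not Flink's code: the class `SlidingWindowSketch` and its method are hypothetical, timestamps are non-negative milliseconds, and no window offset is applied.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: enumerates the [start, end) sliding windows
// that contain a given timestamp.
final class SlidingWindowSketch {

    /** Returns {start, end} pairs, newest window first. */
    static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that began at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s, element at 12 s.
        for (long[] w : windowsFor(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a size of 10 seconds and a slide of 5 seconds, each element falls into size / slide = 2 windows.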
d. Session Windows
Session windows suit cases where window boundaries need to adjust to the incoming data. With session windows, windows can start at individual points in time for each key and end after a certain period of inactivity. The configuration parameter is the session gap, which specifies how long to wait for new data before a session is considered closed.
You can specify processing-time session windows, here with a gap of 10 minutes, as below:
    data.keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
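The effect of the session gap can be illustrated with a small plain-Java sketch that groups the timestamps of a single key into sessions. The class `SessionWindowSketch` and its method are hypothetical. The sketch assumes the timestamps are already sorted in ascending order and starts a new session when the distance to the previous element is larger than the gap; how Flink treats an element that arrives exactly at the gap boundary is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: splits the sorted timestamps of one key into sessions
// separated by periods of inactivity longer than the gap.
final class SessionWindowSketch {

    static List<List<Long>> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            // A pause longer than the gap closes the current session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {0L, 3 * minute, 8 * minute, 25 * minute, 30 * minute};
        // Gap of 10 minutes, as in the snippet above.
        System.out.println(sessions(timestamps, 10 * minute));
    }
}
```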
4. Trigger in Flink
A trigger determines when a window is ready for processing. Every window assigner except global windows comes with a default trigger. To specify a trigger explicitly, call trigger() with the desired Trigger.
EventTimeTrigger fires based on the progress of event time as measured by the watermark, ProcessingTimeTrigger fires based on processing time, and CountTrigger fires once the number of elements in a window reaches the given limit.
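The behavior of a count-based trigger can be sketched in a few lines of plain Java. This is only an illustration of the idea and does not implement Flink's Trigger interface: the class `CountTriggerSketch` and its `onElement` method are hypothetical, and the sketch assumes the trigger fires when the count reaches the limit and then starts counting again.

```java
// Illustrative only: a per-window counter that signals "fire" every
// maxCount elements.
final class CountTriggerSketch {
    private final long maxCount;
    private long count;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Called for each element added to the window; returns true when the window should fire. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // reset so the next batch of elements is counted from zero
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " fires: " + trigger.onElement());
        }
    }
}
```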
5. Evictor in Flink
When a trigger fires, the list of window elements can be passed to an optional evictor. The evictor can iterate through the list and remove some of the elements that entered the window first. The remaining elements are then passed to the evaluation function. If no evictor is defined, the trigger hands all window elements directly to the evaluation function.
You can specify an optional evictor as below:
    windowed = windowed.evictor(myEvictor);  // myEvictor is an Evictor<IN, WINDOW>
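As an illustration of what an evictor does, the plain-Java sketch below keeps only the most recent elements of a window and drops those that entered first. It does not implement Flink's Evictor interface; the class `CountEvictorSketch` and the method `keepLast` are hypothetical, and the list is assumed to be ordered by arrival.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: evicts the oldest elements so that at most
// maxCount elements reach the evaluation function.
final class CountEvictorSketch {

    static <T> List<T> keepLast(List<T> windowElements, int maxCount) {
        // Index of the first element to keep; 0 if the window is small enough.
        int from = Math.max(0, windowElements.size() - maxCount);
        return new ArrayList<>(windowElements.subList(from, windowElements.size()));
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(keepLast(window, 3));
    }
}
```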
6. Window Functions
Window functions process the elements of each window once the system determines that the window is ready for processing. The window function can be a ReduceFunction, a FoldFunction or a WindowFunction.
Let us look at what each of these window functions does.
a. Reduce Function
A ReduceFunction specifies how two values are combined to form one output element. It is used to incrementally aggregate the elements in a window.
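The idea of combining two values into one, applied element by element, can be shown with a plain-Java sketch. It uses `java.util.function.BinaryOperator` as a stand-in for the reduce logic and is not Flink's ReduceFunction interface; the class `ReduceSketch` and its method are hypothetical.

```java
import java.util.Arrays;
import java.util.function.BinaryOperator;

// Illustrative only: folds the elements of a window into one value
// by repeatedly combining the running result with the next element.
final class ReduceSketch {

    /** Returns the reduced value, or null for an empty window. */
    static <T> T reduceWindow(Iterable<T> elements, BinaryOperator<T> reducer) {
        T result = null;
        boolean first = true;
        for (T element : elements) {
            if (first) {
                result = element; // the first element is the initial result
                first = false;
            } else {
                result = reducer.apply(result, element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(reduceWindow(Arrays.asList(3, 5, 7), Integer::sum));
    }
}
```

Because only the running result is kept, the window does not need to store every element, which is what makes this style of aggregation incremental.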
b. Fold Function
A FoldFunction specifies how elements from the input are added to an initial accumulator value, for example an empty string.
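A fold differs from a reduce in that the accumulator can have a different type from the input elements. The plain-Java sketch below appends integers to an initially empty string. It uses `java.util.function.BiFunction` as a stand-in and is not Flink's FoldFunction interface; the class `FoldSketch` and its method are hypothetical.

```java
import java.util.Arrays;
import java.util.function.BiFunction;

// Illustrative only: adds each window element to an accumulator that
// starts from a caller-supplied initial value.
final class FoldSketch {

    static <T, ACC> ACC foldWindow(
            Iterable<T> elements, ACC initial, BiFunction<ACC, T, ACC> folder) {
        ACC accumulator = initial;
        for (T element : elements) {
            accumulator = folder.apply(accumulator, element);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        // Start from an empty string and append every element.
        String joined = foldWindow(Arrays.asList(1, 2, 3), "", (acc, value) -> acc + value);
        System.out.println(joined);
    }
}
```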
c. Window Function
A WindowFunction provides the highest flexibility at the cost of performance, because all elements must be buffered until the window is evaluated. It receives an Iterable containing all the elements of the window being processed. You can combine a WindowFunction with either a ReduceFunction or a FoldFunction to incrementally aggregate elements as they arrive in the window; the WindowFunction then receives only the aggregated result when the window closes. This permits incremental computation of windows while retaining access to the additional window meta-information that the WindowFunction provides.
7. Dealing with Late Data
When working with event-time windowing, elements can arrive late: the watermark that Flink uses to track the progress of event time may already be past the end timestamp of the window to which an element belongs. You can specify how a windowed transformation should deal with late elements and how much lateness is allowed. The parameter is called allowed lateness, and it specifies by how much time elements can be late.
Elements arriving within the allowed lateness are put into windows and are considered when computing window results. Elements arriving after the allowed lateness will be dropped. Flink makes sure that any state held by the windowing operation is garbage collected once the watermark passes the end of a window plus the allowed lateness.
Note: The allowed lateness is 0 by default.
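The rules above can be summarized as a small decision function. The plain-Java sketch below is a simplified model and not Flink's internal logic: the class `LatenessSketch`, the enum `Outcome` and the method `classify` are hypothetical, all values are in milliseconds, the window end is treated as exclusive, and exact boundary handling inside Flink may differ by a millisecond.

```java
// Illustrative only: decides what happens to an element that belongs to a
// window ending at windowEndMs, given the current watermark.
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    static Outcome classify(long windowEndMs, long watermarkMs, long allowedLatenessMs) {
        if (watermarkMs < windowEndMs) {
            // The watermark has not yet reached the end of the window.
            return Outcome.ON_TIME;
        }
        if (watermarkMs < windowEndMs + allowedLatenessMs) {
            // Late, but still within the allowed lateness.
            return Outcome.LATE_BUT_ACCEPTED;
        }
        // Window state has been cleaned up; the element is discarded.
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;
        long lateness = 2_000L;
        for (long watermark : new long[] {9_000L, 11_000L, 12_000L}) {
            System.out.println("watermark " + watermark + " -> "
                    + classify(windowEnd, watermark, lateness));
        }
    }
}
```

With the default allowed lateness of 0, the middle case never occurs in this model: an element is either on time or dropped.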
8. Non-Keyed Windowing
You can leave out keyBy() when specifying a windowed transformation; the window is then specified with windowAll() instead of window(). This turns the transformation into a non-parallel operation.
Note: Non-keyed windows have the disadvantage that work cannot be distributed in the cluster as windows cannot be computed independently per key. This can have severe performance implications.