Flink Streaming Windows – A Comprehensive Guide


1. Objective – Flink Streaming

This Flink Streaming tutorial explains streaming windows in Apache Flink with examples. It covers why Big Data streams need windowing and describes tumbling windows, sliding windows, global windows and session windows in Flink. You will also learn about Flink window assigners, triggers, evictors, window functions and watermarks, and about event time, processing time and ingestion time in Apache Flink.

The tutorial also covers the types of window functions in Flink: the reduce function, the fold function and the window function. Finally, you will learn how to deal with late data using allowed lateness and how non-keyed windowing works in the Flink API.

So, let’s start the Flink Streaming Tutorial.

2. Introduction to Flink Streaming Windows

Let us first understand what a window means in Flink.
Apache Flink is a stream processor with a very flexible mechanism for building and evaluating windows over continuous data streams. To process an infinite DataStream, we divide it into finite slices based on some criterion, such as the timestamps of the elements. These slices are called windows in Flink. This division is required whenever a transformation needs to aggregate elements. Flink's flexible window definitions set it apart from other open source stream processors and differentiate it from Spark and Hadoop MapReduce.
We need to specify a key, a window assigner and a window function for a windowed transformation. The key creates logical keyed streams from the infinite, non-keyed stream, while the window assigner assigns elements to finite per-key windows. Finally, the window function processes the elements of each window.
The basic structure of a windowed transformation is thus as follows:

DataStream<T> input = ...;
input.keyBy(<key selector>)
.window(<window assigner>)
.trigger(<trigger>)
.<windowed transformation>(<window function>);

Let us look at each component of a windowed transformation in detail below:

3. Window Assigners in Flink Streaming

A window assigner specifies how stream elements are divided into finite slices. Flink ships with pre-implemented window assigners for tumbling windows, sliding windows, session windows and global windows, and it also lets you implement your own by extending the WindowAssigner class. Except for global windows, all built-in window assigners assign elements based on processing time or event time.

  • The processing-time assigners assign elements based on the current clock of the worker machines.
  • The event-time assigners assign elements to windows based on the timestamps of the elements.
  • Ingestion time is a hybrid of processing time and event time: it assigns wall-clock timestamps to records as they arrive in the system and then continues processing with event-time semantics based on the attached timestamps.

But how do these Flink windows differ? Let us look at them one by one.

a. Global windows

Here the elements are not subdivided into windows. Instead, each element is assigned to a single per-key global window. This is useful only if a custom trigger is specified; without one, no computation is performed, because a global window has no natural end at which it could be evaluated.
Specify global windows as below:

data.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);

b. Tumbling Windows

Here a window size is specified, and elements are assigned to non-overlapping windows of that size. For example, if the window size is 2 minutes, all elements that fall within the same 2-minute interval end up in one window for processing. Tumbling windows are disjoint; applications that require smoothed aggregates need overlapping windows instead.
You can specify tumbling event-time windows of 5 seconds as below:

data.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
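To make the assignment rule concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp can be mapped to the tumbling window that contains it. It assumes non-negative millisecond timestamps and windows aligned to time 0 with no offset. The class and method names are hypothetical and are not part of the Flink API.

```java
// Hypothetical helper, not part of the Flink API: maps a timestamp to the
// tumbling window that contains it. Assumes timestamps >= 0 and windows
// aligned to time 0 (no offset).
final class TumblingWindowSketch {

    // Inclusive start of the window containing the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Exclusive end of the window containing the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because every timestamp maps to exactly one start value, each element lands in exactly one window, which is what makes tumbling windows non-overlapping.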

c. Sliding Windows

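Sliding windows have a fixed size, as tumbling windows do, but they can overlap, so one element can be assigned to multiple windows. The user-specified window slide parameter controls how frequently a new window starts; windows overlap whenever the slide is smaller than the window size.
For example, with a window size of 10 minutes and a slide of 5 minutes, a new window starts every 5 minutes and each element belongs to two windows. The snippet below specifies sliding event-time windows with a size of 10 seconds and a slide of 5 seconds: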

data.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
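The following plain-Java sketch (no Flink dependency) illustrates why one element can belong to several sliding windows. It lists the start timestamps of every window that contains a given timestamp, assuming windows aligned to time 0 and non-negative timestamps. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Flink API: enumerates the sliding
// windows [start, start + size) that contain a timestamp.
final class SlidingWindowSketch {

    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers it.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s: the element at 12 s falls into two windows.
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L)); // prints [10000, 5000]
    }
}
```

When the slide equals the size, the loop yields a single window, so the sketch degenerates to the tumbling case.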

d. Session Windows

Session windows are a good fit when window boundaries need to adjust to the incoming data. With session windows, windows can start at individual points in time for each key and end after a certain period of inactivity. The configuration parameter is the session gap, which specifies how long to wait for new data before considering a session closed.
You can specify processing-time session windows with a gap of 10 minutes as below:

data.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
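As an illustration of the session gap, the plain-Java sketch below (no Flink dependency) groups the timestamps of a single key into sessions. It assumes the timestamps are already sorted in ascending order and treats an element that arrives no later than the current session end as part of that session; this boundary choice and all names are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Flink API: groups ascending timestamps
// of one key into sessions. Each session is a {start, end} pair, where
// end = last timestamp in the session + gap.
final class SessionWindowSketch {

    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sortedTimestamps) {
            if (current != null && ts <= current[1]) {
                current[1] = ts + gap;                 // activity extends the session
            } else {
                current = new long[] {ts, ts + gap};   // inactivity: new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000L, 3_000L, 20_000L}, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a gap of 5 seconds, the elements at 1 s and 3 s share one session, while the element at 20 s opens a new one.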

4. Flink Streaming – Trigger

A trigger determines when a window is ready for processing. Each window assigner comes with a default trigger; for global windows the default trigger never fires, so you must supply your own. We can call trigger() with a given Trigger to override the default.
EventTimeTrigger fires based on the progress of event time as measured by the watermark, ProcessingTimeTrigger fires based on processing time, and CountTrigger fires once the number of elements in a window reaches the given limit.
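To show the idea behind a count-based trigger, here is a simplified plain-Java sketch (no Flink dependency). It keeps a counter for a single window, signals a firing once the counter reaches the limit, and then resets. The class is hypothetical and models only the decision logic, not Flink's Trigger interface.

```java
// Hypothetical model of a count-based trigger for one window; it is not
// Flink's Trigger interface, only the firing decision.
final class CountTriggerSketch {

    private final long maxCount;
    private long count;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per arriving element; returns true when the window should fire.
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // start counting again for the next firing
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("element " + i + " fires: " + trigger.onElement());
        }
    }
}
```

A trigger of this kind is what makes a global window usable, since it supplies the evaluation point that the window itself lacks.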

5. Flink Streaming – Evictor

When a trigger fires, the list of window elements can be handed to an optional evictor. The evictor can iterate through the list and remove some of the elements, starting with those that entered the window first. The remaining elements are then given to the evaluation function. If the user does not define an evictor, the trigger hands all the window elements directly to the evaluation function.
You can specify an optional evictor as below:

data.keyBy(<key selector>)
.window(<window assigner>)
.evictor(<evictor>)
.<windowed transformation>(<window function>);
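The plain-Java sketch below (no Flink dependency) illustrates one possible eviction policy: keep only the most recent elements and drop those that entered the window first. It is similar in spirit to a count-based evictor, but the class and method names are hypothetical and it does not implement Flink's Evictor interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not Flink's Evictor interface: keeps at most maxCount
// elements, removing the ones that entered the window first.
final class CountEvictorSketch {

    // elements are assumed to be in arrival order (oldest first).
    static <T> List<T> evict(List<T> elements, int maxCount) {
        int from = Math.max(0, elements.size() - maxCount);
        return new ArrayList<>(elements.subList(from, elements.size()));
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(1, 2, 3, 4, 5);
        // Only the three newest elements reach the evaluation function.
        System.out.println(evict(window, 3)); // prints [3, 4, 5]
    }
}
```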

6. Window Functions in Flink

We use window functions to process the elements of each window once the system determines that the window is ready for processing. The window function can be a ReduceFunction, a FoldFunction or a WindowFunction.
What are these Flink window functions, and what do they do?

a. Reduce Function

A ReduceFunction specifies how two values are combined to form one output element. It is used to incrementally aggregate the elements of a window.
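Incremental aggregation means that a window only has to keep one running value instead of all its elements. The plain-Java sketch below (no Flink dependency) shows this with a standard BinaryOperator standing in for the reduce logic; the class name is hypothetical.

```java
import java.util.function.BinaryOperator;

// Hypothetical model of incremental reduction for one window: only the
// running result is stored, never the individual elements.
final class IncrementalReduceSketch<T> {

    private final BinaryOperator<T> reducer;
    private T current; // null until the first element arrives

    IncrementalReduceSketch(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    // Combines the new element with the running result.
    void add(T element) {
        current = (current == null) ? element : reducer.apply(current, element);
    }

    // Result emitted when the window fires (null for an empty window).
    T result() {
        return current;
    }

    public static void main(String[] args) {
        IncrementalReduceSketch<Integer> sum = new IncrementalReduceSketch<>(Integer::sum);
        sum.add(3);
        sum.add(4);
        sum.add(5);
        System.out.println(sum.result()); // prints 12
    }
}
```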

b. Fold Function

A FoldFunction specifies how elements from the input are added to an initial accumulator value, such as an empty string. Note that newer Flink releases deprecate FoldFunction in favor of AggregateFunction.
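Unlike a reduce, a fold can produce a result whose type differs from the input type, because it starts from an explicit accumulator. The plain-Java sketch below (no Flink dependency) folds integers into a string, starting from an empty string. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.function.BiFunction;

// Hypothetical helper, not part of the Flink API: folds input elements into
// an accumulator that may have a different type than the elements.
final class FoldSketch {

    static <ACC, IN> ACC fold(ACC initial, Iterable<IN> elements,
                              BiFunction<ACC, IN, ACC> folder) {
        ACC accumulator = initial;
        for (IN element : elements) {
            accumulator = folder.apply(accumulator, element);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        // Integers are appended to an initially empty string.
        String joined = fold("", Arrays.asList(1, 2, 3), (acc, n) -> acc + n);
        System.out.println(joined); // prints 123
    }
}
```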

c. Window Function

A WindowFunction provides the highest flexibility at the cost of performance. It receives an Iterable containing all the elements of the window being processed, so the elements must be buffered until the window fires. You can combine a WindowFunction with either a ReduceFunction or a FoldFunction to incrementally aggregate elements as they arrive in the window. When the window closes, the WindowFunction is provided with the aggregated result. This permits incremental computation of windows while retaining access to the additional window meta information of the WindowFunction.

7. Dealing with Late Data

When working with event-time windowing, elements might arrive late, i.e. the watermark that Flink uses to track the progress of event time is already past the end timestamp of the window to which an element belongs. You can specify how a windowed transformation should deal with late elements and how much lateness is allowed. This parameter is called allowed lateness, and it specifies by how much time elements can be late.
Elements that arrive within the allowed lateness are still put into windows and are considered when computing window results. Elements that arrive after that are dropped. Flink makes sure that any state held by the windowing operation is garbage collected once the watermark passes the end of a window plus the allowed lateness.
Note: The allowed lateness is 0 by default.
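The decision of whether a late element is still accepted can be sketched in plain Java (no Flink dependency). This simplified model treats a window as expired once the watermark reaches the last timestamp of the window plus the allowed lateness; the exact boundary handling inside Flink may differ, and all names here are hypothetical.

```java
// Hypothetical, simplified model of the allowed-lateness check; it is not
// Flink's implementation, and the boundary handling is an assumption.
final class LatenessSketch {

    // windowEndExclusive: end of the element's window (exclusive).
    // Returns true if an element of that window is still accepted.
    static boolean isAccepted(long windowEndExclusive, long allowedLateness, long watermark) {
        long lastTimestamp = windowEndExclusive - 1;
        long cleanupTime = lastTimestamp + allowedLateness;
        return watermark < cleanupTime;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;
        // Watermark is 1 s past the window end; lateness of 2 s still admits the element.
        System.out.println(isAccepted(windowEnd, 2_000L, 11_000L));
        // With the default lateness of 0 the same element is dropped.
        System.out.println(isAccepted(windowEnd, 0L, 11_000L));
    }
}
```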

8. Non-Keyed Windowing in Flink Streaming

You can omit keyBy() when specifying the windowed transformation, using windowAll() in place of window(). This turns the transformation into a non-parallel operation.
Note: Non-keyed windows have the disadvantage that the work cannot be distributed across the cluster, because windows cannot be computed independently per key. This can have severe performance implications.

So, this was all in the Flink Streaming tutorial. We hope you liked our explanation.

9. Conclusion

Hence, in this Flink Streaming tutorial, we discussed the introduction to Flink streaming windows. We also learned about window assigners, triggers and evictors in Flink Streaming. Moreover, we looked at window functions, late data handling and non-keyed windowing in Flink. If you have any query regarding Flink Streaming, ask in the comment tab.
