Data Stream Processing with Apache Flink

Data Streams

Data streams are unbounded sequences of time-varying data elements, representing a continuous flow of information. The most recent data is often the most relevant, reflecting the current state of a dynamic system.

DBMS vs. Data Streaming

A traditional database stores data before it can be queried, and multiple queries can be run repeatedly on the same stored data. In contrast, data streaming processes data on the fly, as soon as it arrives. Queries are continuous (they run indefinitely), and a query cannot be re-executed on data that has already streamed past.

CEP Engines

Two main dimensions characterize Complex Event Processing (CEP) engines:

  • Abstraction: Queries are programmed either by connecting algebraic operators (e.g., map, filter, aggregate) or in SQL-like languages.
  • Parallelization:
    • Inter-query parallelism: Each query runs centralized, but different queries can run in parallel.
    • Intra-query parallelism:
      • Inter-operator parallelism: Each operator can run on a different node. Scales with the number of query operators.
      • Intra-operator parallelism: Each operator can run on a cluster of nodes. Scales with event stream volume.

Types of Windows

  • Tumbling Windows: Assign each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap.
  • Sliding Windows: Configured by window size and slide parameters. The slide determines how frequently a new window starts. If the slide is smaller than the window size, windows overlap, and elements can be assigned to multiple windows.
  • Session Windows: Group elements by sessions of activity. Session windows do not overlap and have no fixed start/end time. A session window closes when it does not receive elements for a defined gap period.
  • Global Windows: Assign all elements with the same key to a single global window. This requires a custom trigger, as there’s no natural end for aggregation.

Types of Time in Flink

Flink supports three notions of time:

  • Event Time: The time the event was created at its source.
  • Processing Time: The time at the machine executing the operation.
  • Ingestion Time: The time the event enters the Flink data flow.

Example: keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(3))).apply(new SimpleSum()); closes a session window once no events arrive within a 3-second gap (in event time) and then invokes the apply function (here, SimpleSum).
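
Event-time windows such as this one only take effect if the environment is configured for event time (legacy, pre-1.12 API); a minimal sketch:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Select the notion of time used by time-based operators; alternatives: ProcessingTime (default), IngestionTime.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);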

KeyBy

KeyBy partitions a stream into disjoint streams based on the values of one or more fields. Each substream can be processed independently by different instances of the next operator.
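
A minimal sketch, assuming a hypothetical stream of (sensorId, value) pairs and an existing StreamExecutionEnvironment env:

// Partition the stream by the sensor id in field f0; each key is then processed independently.
DataStream<Tuple2<String, Double>> readings =
        env.fromElements(Tuple2.of("s1", 20.5), Tuple2.of("s2", 21.0), Tuple2.of("s1", 22.3));
KeyedStream<Tuple2<String, Double>, Tuple> bySensor = readings.keyBy(0);
bySensor.timeWindow(Time.seconds(5)).sum(1);   // e.g., per-sensor sum over 5-second windows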

FlatMap

The FlatMap function transforms elements into zero, one, or more elements. For example, splitting sentences into words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Join Example

sensor1Stream.join(sensor2Stream)                                // assumes each sensor event is (at least) a Tuple3
    .where(value -> value.f0)                                    // join key of the first stream: field f0
    .equalTo(value -> value.f0)                                  // join key of the second stream: field f0
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))   // join within 1-second tumbling windows
    .apply((JoinFunction<Tuple3, Tuple3, Tuple5>) (first, second) ->
            new Tuple5<>(first.f0, first.f1, first.f2, second.f1, second.f2))
    .filter((in) -> /* Filter logic */);

This joins, within each 1-second tumbling window, every pair of events from the two streams whose f0 fields are equal, emits a combined Tuple5, and then filters the results.

Other Operations

  • Map: Transforms one element into another.
  • Filter: Evaluates a predicate for each element, outputting only those where the predicate is true.
  • Tuples: Encapsulate data. Flink provides predefined tuples (Tuple1 to Tuple25).
  • Reduce: Combines the current element with the last reduced value (see the sketch after this list).
  • Aggregations: Rolling aggregations (min, max, sum) on keyed streams.
  • Windows: Split the stream into finite-sized “buckets” for computation.
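
A minimal sketch combining several of these operators, assuming a hypothetical DataStream<Tuple2<String, Integer>> named counts:

counts
    .map(t -> new Tuple2<>(t.f0, t.f1 * 2))                // Map: transform each element one-to-one
    .returns(Types.TUPLE(Types.STRING, Types.INT))         // type hint needed for tuple-producing lambdas
    .filter(t -> t.f1 > 0)                                 // Filter: keep elements where the predicate is true
    .keyBy(0)                                              // partition by the key in field f0
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));    // Reduce: rolling sum per key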

Window Configurations

  • Trigger: Determines when a window is ready to be processed. Each window assigner comes with a default trigger, and built-in alternatives are available (e.g., CountTrigger.of(100)).
  • Evictor: Removes elements from a window after the trigger fires (e.g., CountEvictor.of(10)).
  • Lateness: Manages late events for event-time windows. By default, late events are dropped (see the sketch after this list).
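
A minimal sketch combining the three settings, assuming a keyed stream of (sensorId, value) pairs and event-time windows:

keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(100))       // replace the default event-time trigger: fire every 100 elements
    .evictor(CountEvictor.of(10))        // keep only the last 10 elements of the window when it fires
    .allowedLateness(Time.seconds(5))    // accept events up to 5 seconds late instead of dropping them
    .sum(1);                             // aggregate the value field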

Window Processing Types

  • Processing Time Windows: Based on system time (Tumbling, Sliding, Session). Most efficient but non-deterministic.
  • Event Time Windows: Based on event timestamps (Tumbling, Sliding, Session). Progress depends on events, not the clock. Watermarks indicate no more events for a window.

Watermarks

Watermarks carry timestamps indicating that no more events with earlier timestamps are expected, so windows up to that point can be finalized. They flow through the stream with the data. When a watermark reaches an operator, the operator advances its internal event-time clock and emits a new watermark downstream. An operator with multiple input streams uses the minimum of its input watermarks. Watermarks can be defined directly at the source using collectWithTimestamp and emitWatermark, or via timestamp assigners, which produce timestamped streams.

  • AssignerWithPeriodicWatermarks: Emits watermarks periodically (see the sketch after this list).
  • AssignerWithPunctuatedWatermarks: Emits watermarks based on event properties.
  • stream.assignTimestampsAndWatermarks(...): Attaches an assigner to the stream; with an ascending-timestamps assigner, the element timestamps themselves serve as watermarks.
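
A minimal sketch of a periodic assigner, assuming a hypothetical stream events whose elements are Tuple3<Long, String, Double> with an epoch-millisecond timestamp in field f0 and at most 2 seconds of out-of-orderness (BoundedOutOfOrdernessTimestampExtractor is a built-in AssignerWithPeriodicWatermarks):

DataStream<Tuple3<Long, String, Double>> withTimestamps = events
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, Double>>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(Tuple3<Long, String, Double> element) {
                return element.f0;   // the event carries its own timestamp in milliseconds
            }
        });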

JobManager and TaskManager

  • JobManager: Coordinates distributed execution.
  • TaskManager: Executes tasks. TaskManagers have task slots (at least one) representing fixed subsets of their resources. Each TaskManager is a JVM process (see the sketch after this list).
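
A job's parallelism has to fit into the available task slots; with default slot sharing, a job needs roughly as many slots as its highest operator parallelism. A minimal sketch:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);   // 4 parallel subtasks per operator; with slot sharing, about 4 task slots are needed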

Keyed vs. Non-Keyed Windows

keyBy() splits the stream into logical keyed streams. Without keyBy(), the stream is non-keyed. Keyed streams allow parallel windowed computation. Non-keyed streams are processed by a single task. The window assigner for keyed streams is window(), and windowAll() for non-keyed streams.

Flink Window Code Examples

Keyed Streams

  • Tumbling:
    • keyedStream.timeWindow(Time.seconds(5)): Groups elements by timestamp in 5-second windows.
    • keyedStream.countWindow(1000): Groups elements, per key, into windows of 1000 elements (by count, in arrival order).
  • Sliding:
    • keyedStream.timeWindow(Time.seconds(5), Time.seconds(1)): 5-second windows, sliding by 1 second.
    • keyedStream.countWindow(1000, 100): Windows of 1000 elements, sliding by 100.
  • Using window() with an explicit assigner:
    • stream.window(GlobalWindows.create()): All elements with the same key are assigned to a single global window (requires a custom trigger).
    • stream.window(TumblingEventTimeWindows.of(Time.seconds(1))): Elements assigned to 1-second windows based on their event timestamp.
    • stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))): 5-second windows, sliding by 1 second, based on event timestamp.

Non-Keyed Streams

  • Tumbling:
    • nonKeyedStream.timeWindowAll(Time.seconds(5)): Groups elements by timestamp in 5-second windows.
    • nonKeyedStream.countWindowAll(1000): Groups elements into windows of 1000 elements (by count, in arrival order).
  • Sliding:
    • nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1)): 5-second windows, sliding by 1 second.
  • Session Windows (Keyed and Unkeyed):
    • stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5))): Session ends after 5 minutes of inactivity.

Example Code: Temperature Alert

This example processes a stream of temperature readings and generates an alert if a single measurement exceeds 50.

//Simplified for demonstration
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JulyExam {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        DataStream<String> text = env.readTextFile(params.get("input"));

        SingleOutputStreamOperator<Tuple3<Long, String, Double>> mapStreamA = text.map(new MapFunction<String, Tuple3<Long, String, Double>>() {
            @Override
            public Tuple3<Long, String, Double> map(String s) throws Exception {
                // Parse a CSV line of the form <timestamp>,<sensorId>,<temperature>
                String[] fieldArray = s.split(",");
                return new Tuple3<>(Long.parseLong(fieldArray[0]), fieldArray[1], Double.parseDouble(fieldArray[2]));
            }
        });

        SingleOutputStreamOperator<Tuple3<Long, String, Double>> filterStreamA = mapStreamA.filter(new FilterFunction<Tuple3<Long, String, Double>>() {
            @Override
            public boolean filter(Tuple3<Long, String, Double> in) throws Exception {
                return in.f2 > 50; // keep only readings above 50 (f2 is the temperature)
            }
        });


        //Further code can be added here for windowing and aggregation

        if (params.has("output")) {
            filterStreamA.writeAsText(params.get("output")); // Output the filtered stream
        }

        env.execute("TemperatureAlert");
    }

}

The code reads the input file, maps each CSV line to a (timestamp, sensor, temperature) tuple, filters for temperatures above 50, and optionally writes the result as text. Further windowing and aggregation logic can be added as needed, for example as sketched below.
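
One possible extension (a hypothetical sketch, not part of the original code): key the filtered readings by sensor id and report the maximum temperature per sensor over 10-second tumbling windows (requires the org.apache.flink.streaming.api.windowing.time.Time import).

SingleOutputStreamOperator<Tuple3<Long, String, Double>> maxPerSensor = filterStreamA
        .keyBy(1)                         // key by the sensor id (field f1)
        .timeWindow(Time.seconds(10))     // 10-second tumbling windows (processing time by default)
        .max(2);                          // maximum temperature (field f2) per sensor and window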