[Big Data] Understanding and using Time, Window, and Window Function in Flink

Reminder: the sample code in this article is based on Flink 1.13. When discussing window usage, we also point out which APIs have been deprecated in this Flink version.

1, Introduction to Time

Flink's streaming API defines several notions of time for stream processing: Event Time, Ingestion Time, and Processing Time.

Event Time: refers to the time when an event is generated, such as the time when a piece of data in the business database is generated, the time when a piece of log data is generated, etc.

Ingestion Time: refers to the time when Flink receives the data.

Processing Time: refers to the time when the data is processed by a Flink operator.

In real business code development, we most often use Event Time and Processing Time.
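To use Event Time, Flink has to be told which field of a record carries the event timestamp. The following is a minimal sketch, not a definitive setup: the `SensorReading` case class, its field names, and the 5-second out-of-orderness bound are all assumptions made for illustration.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

// Hypothetical event type: `timestamp` is the event time in epoch milliseconds.
case class SensorReading(id: String, timestamp: Long, value: Double)

object EventTimeSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A small in-memory source, used here instead of a real data source.
    val readings: DataStream[SensorReading] = env.fromElements(
      SensorReading("s1", 1000L, 1.0),
      SensorReading("s1", 2000L, 2.0))

    // Tell Flink where the event time lives and how much disorder to tolerate
    // (the 5 second bound is an assumed value).
    val strategy = WatermarkStrategy
      .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long =
          element.timestamp
      })

    readings.assignTimestampsAndWatermarks(strategy).print()

    env.execute("EventTimeSketch")
  }
}
```

Processing Time needs no such setup, because the operator simply reads the machine clock when it handles a record.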

2, The concept of Window

Flink's streaming engine is designed to process unbounded data sets, that is, data sets that keep growing because data arrives continuously. A window cuts the unbounded data stream into bounded pieces for processing: elements of the stream are assigned to buckets of finite size, and the computation runs on each bucket.

3, Type of Window

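1. Classification
  • TimeWindow: generates windows according to time (for example, every 5 seconds).
  • CountWindow: generates windows according to a specified number of elements, independent of time (for example, every 20 elements).
  • Tumbling Window
  • Sliding Window
  • Session Window

The relationship between them: TimeWindow and CountWindow describe what drives a window (time or element count), while tumbling, sliding, and session describe how windows are laid out. A time window can be a tumbling, sliding, or session window; a count window can be a tumbling or sliding window.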

2. Tumbling Windows

A tumbling window slices the data according to a fixed window length. It is characterized by time alignment, fixed window length, and no overlap. For example, if you specify a 5-minute tumbling window, a new window is created every 5 minutes, and each element belongs to exactly one window.


It is suitable for aggregations over each time period, such as counting the hot search words of users every 5 minutes.
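The time alignment described above can be sketched in plain Scala, without Flink. The helper `windowStart` is a hypothetical name introduced here, and the sketch assumes non-negative timestamps in milliseconds and no window offset.

```scala
object TumblingWindowSketch {

  // Start of the tumbling window that contains `timestamp`.
  // Assumes non-negative timestamps (milliseconds) and no window offset.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  def main(args: Array[String]): Unit = {
    val fiveMinutes = 5 * 60 * 1000L
    // Three events, expressed as milliseconds since some reference point.
    val events = Seq(60000L, 299000L, 300000L)
    events.foreach { ts =>
      val start = windowStart(ts, fiveMinutes)
      println(s"event $ts -> window [$start, ${start + fiveMinutes})")
    }
  }
}
```

The first two events fall into the window starting at 0, and the third opens the window starting at 300000, so no element is ever counted twice.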

3. Sliding Windows

A sliding window is a generalization of the tumbling window. It is defined by a fixed window length and a slide interval, and is characterized by time alignment, fixed window length, and possible overlap between windows. For example, with a 10-minute window and a 5-minute slide, a window is produced every 5 minutes and contains the data generated in the last 10 minutes.


It is suited to statistics over the most recent time period, for example computing the failure rate of an interface over the last 5 minutes to decide whether to raise an alarm.
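Because sliding windows overlap, one element can belong to several windows. A minimal plain-Scala sketch of that assignment follows; `windowStarts` is a hypothetical helper, and the sketch assumes non-negative timestamps in milliseconds and no window offset.

```scala
object SlidingWindowSketch {

  // Start times of every sliding window that contains `timestamp`, newest first.
  // Assumes non-negative timestamps (milliseconds) and no window offset.
  // For very early timestamps some starts can be negative.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestamp - (timestamp % slide)
    // Walk backwards in steps of `slide` while the window still covers the timestamp.
    (lastStart until (timestamp - size) by -slide).toList
  }

  def main(args: Array[String]): Unit = {
    val size = 10 * 60 * 1000L
    val slide = 5 * 60 * 1000L
    // An event at minute 12 belongs to the windows starting at minute 10 and minute 5.
    println(windowStarts(12 * 60 * 1000L, size, slide))
  }
}
```

With a window length equal to the slide interval, every element belongs to exactly one window, which is the tumbling case.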

4. Session Windows

A session window groups a series of events that are separated from other events by a timeout gap of a specified length, similar to a session in a web application: if no new data arrives for a period of time, the current window ends and the next element starts a new window. Its characteristics are that the window size is determined by the data itself and that it has no fixed start or end time. The stream is divided into windows according to the session gap: when a window receives no new data for longer than the session gap, the window is closed. For example, if the gap is set to 5 seconds, the window is triggered when the difference between adjacent records is >= 5 seconds.


It suits scenarios such as computing the average page access time of each user within a single session, where two sessions are separated by an interval of 15 minutes.
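The gap rule can be illustrated in plain Scala, independent of Flink. In this sketch `sessions` is a hypothetical helper that expects timestamps already sorted in ascending order; it only illustrates the grouping rule and is not how Flink implements session windows internally.

```scala
object SessionWindowSketch {

  // Groups ascending timestamps into sessions: a new session starts when the
  // distance to the previous element is greater than or equal to `gap`.
  def sessions(sortedTimestamps: Seq[Long], gap: Long): List[List[Long]] =
    sortedTimestamps.foldLeft(List.empty[List[Long]]) {
      // `current.head` is the most recent timestamp of the open session.
      case (current :: done, ts) if ts - current.head < gap =>
        (ts :: current) :: done
      case (acc, ts) =>
        List(ts) :: acc
    }.map(_.reverse).reverse

  def main(args: Array[String]): Unit = {
    // Gap of 5 seconds: the jump from 3000 to 9000 starts a second session.
    println(sessions(Seq(0L, 1000L, 3000L, 9000L, 10000L), 5000L))
  }
}
```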

4, Use of windows

The window operator APIs fall into two groups, keyed and non-keyed. They are written as follows:

Keyed Windows: call keyBy(...) on the stream and then window(...).

Non-Keyed Windows: call windowAll(...) directly on the stream.
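A minimal sketch of the two forms side by side, assuming a socket source on a placeholder host and port (`localhost`, 9999) and a 5-second tumbling processing-time window:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object KeyedVsNonKeyedSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed source: host and port are placeholders.
    val words: DataStream[(String, Int)] = env
      .socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Keyed window: keyBy(...) followed by window(...); each key gets its own windows.
    words
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    // Non-keyed window: windowAll(...) directly on the stream; all elements share one window.
    words
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute("KeyedVsNonKeyedSketch")
  }
}
```

Keyed windows can be processed in parallel across keys, whereas a non-keyed window is processed by a single task, so the keyed form is usually preferred for large streams.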

The following sections explain the APIs according to the window classification above. The examples use Keyed Windows; Non-Keyed Windows work analogously with the corresponding API.

1. Time Window

For time windows, older versions of Flink commonly used the timeWindow method on a keyed stream.


Passing a single argument, such as Time.seconds(n), creates a tumbling window; passing two arguments (window size and slide interval) creates a sliding window. Note that timeWindow takes its time type from the environment's time characteristic: Processing Time was the default in older versions, while Flink 1.12 and later default to Event Time.

In Flink 1.13 this method is deprecated: it still works, but it is not recommended. Use the general window method instead.

The window type and the time type are specified through the window assigner passed to window. For example, window(TumblingProcessingTimeWindows.of(Time.seconds(5))) uses a tumbling window with Processing Time.

2. Count Window

For count windows, Flink commonly uses countWindow (not deprecated in newer versions). With one argument it creates a tumbling window; with two arguments it creates a sliding window.
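A minimal sketch of both forms of countWindow, assuming a socket source on a placeholder host and port and illustrative window sizes (5 elements, and 10 elements sliding by 5):

```scala
import org.apache.flink.streaming.api.scala._

object CountWindowSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed source: host and port are placeholders.
    val words: DataStream[(String, Int)] = env
      .socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Tumbling count window: fires each time a key has received 5 elements.
    words
      .keyBy(_._1)
      .countWindow(5)
      .sum(1)
      .print()

    // Sliding count window: covers the last 10 elements of a key, evaluated every 5 elements.
    words
      .keyBy(_._1)
      .countWindow(10, 5)
      .sum(1)
      .print()

    env.execute("CountWindowSketch")
  }
}
```

On a keyed stream the count applies per key, so a word that appears only four times never triggers the tumbling window above.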

3. Custom Window

Both Keyed Windows and Non-Keyed Windows expose customization hooks. You can define trigger, evictor, allowedLateness, sideOutputLateData, and so on to customize the window.
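The sketch below shows how some of these hooks might be combined on an event-time window. The input format ("word epochMillis" per line), the host and port, the tag name `late-events`, and all durations are assumptions made for illustration. `EventTimeTrigger` is already the default trigger for event-time windows and is set explicitly here only to show where a trigger goes.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

object CustomWindowSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed input: one "<word> <epochMillis>" pair per line; host and port are placeholders.
    val events: DataStream[(String, Long, Int)] = env
      .socketTextStream("localhost", 9999)
      .map { line =>
        val parts = line.split(" ")
        (parts(0), parts(1).toLong, 1)
      }
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[(String, Long, Int)](Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long, Int)] {
            override def extractTimestamp(e: (String, Long, Int), ts: Long): Long = e._2
          }))

    // Tag under which elements that arrive too late are emitted.
    val lateTag = OutputTag[(String, Long, Int)]("late-events")

    val counts = events
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .trigger(EventTimeTrigger.create())  // when the window fires
      .allowedLateness(Time.seconds(3))    // keep window state 3s past the watermark
      .sideOutputLateData(lateTag)         // route even later elements to a side output
      .sum(2)

    counts.print()
    counts.getSideOutput(lateTag).print()

    env.execute("CustomWindowSketch")
  }
}
```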

4. Examples
4.1 Example of a tumbling window

Requirement: every 5s, count the words that appeared in the last 5s

code:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

/**
  * Time-based tumbling window application -- Tumbling Windows
  *
  * Tumbling window: every 5s, count the words that appeared in the last 5s
  */
object TestTimeWindowByTumbling {

  def main(args: Array[String]): Unit = {
    // 1. Get the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Get the data source
    val socketTextStream: DataStream[String] = env.socketTextStream("companynode01",19999)

    // 3. Process the data
    socketTextStream.flatMap(x=>x.split(" "))
      .map(x=>(x, 1))
      .keyBy(_._1)
      // .timeWindow(Time.seconds(5)) // deprecated
      // tumbling window
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    // 4. Start the job
    env.execute("TestTimeWindowByTumbling")

  }
}

4.2 Example of a sliding window

Requirement: every 5s, count the words that appeared in the last 10s

code:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Time-based sliding window application -- Sliding Windows
  *
  * Sliding window: every 5s, count the words that appeared in the last 10s
  */
object TestTimeWindowBySliding {

  def main(args: Array[String]): Unit = {
    // 1. Get the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Get the data source
    val socketTextStream: DataStream[String] = env.socketTextStream("companynode01",19999)

    // 3. Process the data
    socketTextStream.flatMap(x=>x.split(" "))
      .map(x=>(x, 1))
      .keyBy(_._1)
      // .timeWindow(Time.seconds(10),Time.seconds(5)) // deprecated
      // sliding window: size 10s, slide 5s
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
      .sum(1)
      .print()

    // 4. Start the job
    env.execute("TestTimeWindowBySliding")

  }
}

5, Window Function

1. Classification

The window function defines the calculation operation to be performed on the data collected in the window. It can be divided into two categories:

  • Incremental aggregation functions: the computation runs on each element as it arrives, and only a small state is kept. Typical incremental aggregation functions are ReduceFunction and AggregateFunction.
  • Full window functions: all the data of the window is collected first and then traversed when the window is evaluated. Typical full window functions are WindowFunction (used through apply) and ProcessWindowFunction (used through process).
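The difference between the two categories can be sketched in plain Scala, without Flink. Both hypothetical helpers below compute an average: the first keeps only a running (sum, count) pair, the second buffers every value before computing.

```scala
object AggregationStylesSketch {

  // Incremental style: fold each value into a small (sum, count) state as it arrives.
  def incrementalAverage(values: Iterator[Int]): Double = {
    val (sum, count) = values.foldLeft((0L, 0L)) {
      case ((s, c), v) => (s + v, c + 1)
    }
    sum.toDouble / count
  }

  // Full-window style: collect every value first, then traverse the buffer.
  def fullWindowAverage(values: Iterator[Int]): Double = {
    val buffered = values.toVector
    buffered.sum.toDouble / buffered.size
  }

  def main(args: Array[String]): Unit = {
    println(incrementalAverage(Iterator(1, 2, 3, 4, 5, 6)))
    println(fullWindowAverage(Iterator(1, 2, 3, 4, 5, 6)))
  }
}
```

Both produce the same result, but the incremental version needs constant memory, while the buffer in the full-window version grows with the number of elements in the window.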

2. Incremental aggregation statistics

Statistics are updated every time an element is added to the window. Common incremental aggregation operators include reduce(reduceFunction), aggregate(aggregateFunction), sum(), min(), max(), etc.

Example:

Requirement: receive words from a socket and count the occurrences of each word every 5 seconds

Code using reduce

import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * Incremental aggregation function
 * Receives words from a socket and counts each word every 5 seconds
 * Incremental aggregation of time window data based on the 'reduce' function
 */
object TestReduceOfTimeWindow {

  def main(args: Array[String]): Unit = {
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val socketTextStream: DataStream[String] = env.socketTextStream("companynode01", 19999)

    socketTextStream.flatMap(x => x.split(" "))
      .map(x=>(x, 1))
      // .keyBy(0) is deprecated; use a key selector function instead
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((c1, c2) => (c1._1, c1._2+c2._2))
      .print()

    env.execute()
  }

}

Code using aggregate

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction

/**
 * Incremental aggregate function
 * Incremental aggregation of timing window data based on 'aggregate' function
 */
object TestAggregateOfTimeWindow {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val socketTextStream: DataStream[String] = env.socketTextStream("companynode01", 19999)

    socketTextStream.flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .aggregate(new MyAggregateFunction)
      .print()

    env.execute("TestAggregateOfTimeWindow")
  }
}

// Custom AggregateFunction: input (word, 1), accumulator (word, count), output (word, count)
class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {

  // Initial accumulator: no word seen yet, count 0
  override def createAccumulator(): (String, Int) = ("", 0)

  // Add one element to the accumulator
  override def add(in: (String, Int), acc: (String, Int)): (String, Int) = {
    (in._1, acc._2 + in._2)
  }

  // Final result of the aggregation
  override def getResult(acc: (String, Int)): (String, Int) = acc

  // Merge two accumulators of the same key (needed when windows are merged, e.g. session windows)
  override def merge(acc: (String, Int), acc1: (String, Int)): (String, Int) = {
    // Either accumulator may still carry the empty initial key
    (if (acc._1.nonEmpty) acc._1 else acc1._1, acc._2 + acc1._2)
  }
}

3. Full-window aggregation statistics

Statistics are computed only when the window fires, after all the data of the window has been collected.

The commonly used full-window aggregation operators are apply(windowFunction) and process(processWindowFunction), of which processWindowFunction provides more context information than windowFunction.

Example:

Requirement: receive numeric values from a socket and compute the average of the values received in each 5-second window.

Code implemented using the apply function:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestApplyOfTimeWindow {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2. Obtain the data source, for example, enter the following data source
    /**
     * 1
     * 2
     * 3
     * 4
     * 5
     * 6
     */
    val socketStream: DataStream[String] = env.socketTextStream("companynode01", 19999)

    socketStream.flatMap(x => x.split(" "))
      .map(x => ("countAvg", x.toInt))
      // .keyBy(0) is deprecated; use a key selector function instead
      // The key is a constant placeholder so that all values share one window; it is not output
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply(new MyApplyWindowFunction)
      .print()


    env.execute("TestApplyOfTimeWindow")
  }

}

class MyApplyWindowFunction extends WindowFunction[(String, Int), Double, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[Double]): Unit = {
    // Number of values in the window
    var totalNum = 0
    // Sum of the values in the window
    var countNum = 0

    for(elem <- input) {
      totalNum += 1
      countNum += elem._2
    }

    // Convert to Double first; otherwise integer division truncates the average
    out.collect(countNum.toDouble / totalNum)

  }

}

Code implemented using the process function:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Full aggregation of timing window data based on 'process' function
 */
object TestProcessOfTimeWindow {

  def main(args: Array[String]): Unit = {
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val socketStream: DataStream[String] = env.socketTextStream("companynode01", 19999)

    socketStream.flatMap(x => x.split(" "))
      .map(x => ("countAvg", x.toInt))
      .keyBy(x => x._1)
      // timeWindow is deprecated; use window(...) with an explicit processing-time assigner
      // .timeWindow(Time.seconds(5))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction)
      .print()
    env.execute("TestProcessOfTimeWindow")
  }

}

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), Double, String, TimeWindow] {
  override def process(key: String, context: Context,
                       elements: Iterable[(String, Int)],
                       out: Collector[Double]): Unit = {

    // Number of values in the window
    var totalNum = 0
    // Sum of the values in the window
    var countNum = 0

    for (elem <- elements) {
      totalNum += 1
      countNum += elem._2
    }

    // Calculate the average; toDouble avoids integer division
    out.collect(countNum.toDouble / totalNum)

  }
}
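
The two styles can also be combined: an AggregateFunction pre-aggregates incrementally, and a ProcessWindowFunction receives only the final value together with the window context. The sketch below is one possible way to do this for the average requirement. The class names `AverageAggregate` and `AttachWindowBounds`, the host, and the port are assumptions introduced for illustration.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Incrementally maintains (sum, count) and emits the average when the window fires.
class AverageAggregate extends AggregateFunction[(String, Int), (Long, Long), Double] {
  override def createAccumulator(): (Long, Long) = (0L, 0L)
  override def add(in: (String, Int), acc: (Long, Long)): (Long, Long) =
    (acc._1 + in._2, acc._2 + 1)
  override def getResult(acc: (Long, Long)): Double = acc._1.toDouble / acc._2
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}

// Receives the single pre-aggregated average and attaches the window start and end.
class AttachWindowBounds
    extends ProcessWindowFunction[Double, (Long, Long, Double), String, TimeWindow] {
  override def process(key: String, context: Context,
                       elements: Iterable[Double],
                       out: Collector[(Long, Long, Double)]): Unit = {
    out.collect((context.window.getStart, context.window.getEnd, elements.head))
  }
}

object AggregateWithProcessSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed source: host and port are placeholders; input is whitespace-separated integers.
    env.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(x => ("countAvg", x.toInt))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .aggregate(new AverageAggregate, new AttachWindowBounds)
      .print()

    env.execute("AggregateWithProcessSketch")
  }
}
```

This keeps the small state of incremental aggregation while still giving access to window metadata, which a plain aggregate call does not expose.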

Tags: flink Windows time

Posted by baranwalpk on Thu, 14 Apr 2022 10:24:33 +0930