A basic requirement of many stream processing programs is the ability to group data that shares a common attribute. The DataStream API provides an abstraction called KeyedStream, which logically partitions a DataStream: all events in a partition share the same key value, and the partitions are processed independently of each other.
Stateful operations on a KeyedStream can read and write the state associated with the key of the current event. This means that all events with the same key access the same state, so these events can be processed together.
Be careful with stateful operations and key-based aggregations. If the number of distinct keys keeps growing, for example when the key is an order ID, the state of keys that are no longer needed must be cleared in time to avoid memory problems.
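To make the memory concern concrete, here is a plain-Java sketch (not Flink code) of per-key state kept in a map, where the state for an order is dropped as soon as the order finishes. The event shape and the `orderClosed` flag are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class PerKeyStateSketch {
    // Per-key running state, analogous to keyed state in Flink.
    public static final Map<String, Integer> stateByOrderId = new HashMap<>();

    // Fold one event into the state for its key; clear the entry once the
    // order is final, so the map does not grow without bound as ever more
    // order IDs appear.
    public static void onEvent(String orderId, int amount, boolean orderClosed) {
        stateByOrderId.merge(orderId, amount, Integer::sum);
        if (orderClosed) {
            stateByOrderId.remove(orderId); // analogous to clearing keyed state
        }
    }
}
```

Without the removal step, every order ID ever seen would stay in the map forever, which is exactly the failure mode the warning above describes.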
A KeyedStream can still be processed with operators such as map, flatMap, and filter. Next, we will use the keyBy operator to convert a DataStream into a KeyedStream, and explain the key-based transformations: rolling aggregations and the reduce operator.
Keyed stream operator - keyBy
keyBy converts a DataStream into a KeyedStream by specifying a key. Events in the stream are assigned to partitions based on their keys: all events with the same key are processed in the same subtask of the next operator, while events with different keys may be handled by the same task. An operator can only access the state corresponding to the key of the current event.
Events with the same key are always sent to the same task slot, while events with different keys may also be sent to the same task slot for processing.
As shown in the figure, the color of the input event is used as the key, the black event is output to one partition, and the other colors are output to another partition.
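How a key maps to a subtask can be sketched outside Flink. Flink itself murmur-hashes key.hashCode() into a key group and maps key groups onto operator subtasks; the simplified stand-in below only preserves the invariant that matters here, namely that equal keys always land on the same subtask index:

```java
public class KeyPartitionSketch {
    // Simplified stand-in for Flink's key -> subtask assignment. Flink's
    // real scheme goes key -> murmur hash -> key group -> subtask, but the
    // guarantee is the same: equal keys always map to the same index.
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```

Because the assignment depends only on the key, two events with the same key can never end up in different subtasks, which is what makes per-key state safe to use.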
The keyBy() method accepts a parameter that specifies one or more keys, and there are many different ways to specify them.
The following example shows how to use the keyBy operator to group events by the user field:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByOperator {
    // group by user
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // keyBy: distribute the data with the same key to the same logical partition
        env.addSource(new ClickSource())
                // first generic type: stream element type
                // second generic type: key type
                .keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.user;
                    }
                })
                .print();

        env.execute();
    }
}
Basic Rolling Aggregation Operators
A rolling aggregation operator is called on a KeyedStream and produces an aggregated DataStream; examples are sum, min, and max. The operator keeps an aggregated value for each key it has observed. For each input event, it updates the saved aggregate and emits an event with the updated value to the downstream operator. A rolling aggregation does not require a user-defined function, but it takes a parameter that specifies which field to aggregate on. The DataStream API provides the following rolling aggregation methods.
Rolling aggregation operators are applied directly to a KeyedStream and update the aggregate on every incoming event; they do not require a window.
- sum(): Perform a rolling addition operation on the specified field on the input stream.
- min(): Computes a rolling minimum of the specified field on the input stream; the other fields of the emitted event keep the values of the first event seen for that key.
- max(): Computes a rolling maximum of the specified field on the input stream; the other fields of the emitted event keep the values of the first event seen for that key.
- minBy(): Returns the complete event that contains the minimum value of the specified field observed so far.
- maxBy(): Returns the complete event that contains the maximum value of the specified field observed so far.
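The difference between min()/max() and minBy()/maxBy() is easy to miss, so here is a plain-Java sketch (not Flink code) of the two behaviors over a list of events; the Event type below stands in for a Tuple2<String, Integer>:

```java
import java.util.List;

public class MinVsMinBySketch {
    // A simple two-field event, standing in for a Flink Tuple2<String, Integer>.
    public static class Event {
        public final String user;
        public final int value;
        public Event(String user, int value) { this.user = user; this.value = value; }
    }

    // min(): only the aggregated field is updated; the other fields stay as
    // they were in the first event seen for the key.
    public static Event min(List<Event> events) {
        Event acc = events.get(0);
        for (Event e : events.subList(1, events.size())) {
            if (e.value < acc.value) {
                acc = new Event(acc.user, e.value); // keep old user, take new minimum
            }
        }
        return acc;
    }

    // minBy(): the whole event holding the smallest field value is kept.
    public static Event minBy(List<Event> events) {
        Event acc = events.get(0);
        for (Event e : events.subList(1, events.size())) {
            if (e.value < acc.value) {
                acc = e; // replace with the complete winning event
            }
        }
        return acc;
    }
}
```

For the input [("a", 3), ("b", 1)], min() yields ("a", 1), an event that never actually occurred, while minBy() yields the real event ("b", 1).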
Rolling aggregation operators cannot be chained together; each computation can use only a single rolling aggregation operator.
A rolling aggregation operation keeps one piece of state per key. Because this state is never cleared, rolling aggregation operators should only be used on streams with a bounded number of keys.
The following example keys a stream of Tuple2 elements by the first field and then applies the aggregation methods:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AGGReduceOperator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 3),
                Tuple2.of("b", 3),
                Tuple2.of("b", 4)
        );

        // the aggregated field can be specified by position or by name
        stream.keyBy(r -> r.f0).sum(1).print("sum: ");
        stream.keyBy(r -> r.f0).sum("f1").print("sum -> f1: ");
        stream.keyBy(r -> r.f0).max(1).print("max: ");
        stream.keyBy(r -> r.f0).max("f1").print("max -> f1: ");
        stream.keyBy(r -> r.f0).min(1).print("min: ");
        stream.keyBy(r -> r.f0).min("f1").print("min -> f1: ");
        stream.keyBy(r -> r.f0).maxBy(1).print("maxBy: ");
        stream.keyBy(r -> r.f0).maxBy("f1").print("maxBy -> f1: ");
        stream.keyBy(r -> r.f0).minBy(1).print("minBy: ");
        stream.keyBy(r -> r.f0).minBy("f1").print("minBy -> f1: ");

        env.execute();
    }
}

res (the interleaving across subtasks varies between runs):
minBy: :11> (a,1)
minBy -> f1: :3> (b,3)
minBy -> f1: :3> (b,3)
sum -> f1: :11> (a,1)
min: :3> (b,3)
min: :3> (b,3)
maxBy: :3> (b,3)
max -> f1: :3> (b,3)
sum -> f1: :11> (a,4)
maxBy: :11> (a,1)
maxBy: :3> (b,4)
maxBy: :11> (a,3)
maxBy -> f1: :11> (a,1)
max: :3> (b,3)
max -> f1: :11> (a,1)
minBy -> f1: :11> (a,1)
min: :11> (a,1)
minBy -> f1: :11> (a,1)
min: :11> (a,1)
minBy: :11> (a,1)
max -> f1: :3> (b,4)
min -> f1: :3> (b,3)
min -> f1: :11> (a,1)
min -> f1: :3> (b,3)
maxBy -> f1: :3> (b,3)
min -> f1: :11> (a,1)
minBy: :3> (b,3)
max -> f1: :11> (a,3)
max: :3> (b,4)
maxBy -> f1: :11> (a,3)
max: :11> (a,1)
sum -> f1: :3> (b,3)
max: :11> (a,3)
sum: :11> (a,1)
sum -> f1: :3> (b,7)
minBy: :3> (b,3)
maxBy -> f1: :3> (b,4)
sum: :3> (b,3)
sum: :11> (a,4)
sum: :3> (b,7)
Rolling aggregation operator - Reduce
The reduce operator is a generalization of rolling aggregation. It applies a ReduceFunction to a KeyedStream, combining each input event with the current reduced value. Reduce does not change the event type of the stream: the output stream has the same data type as the input stream.
A reduce function is created by implementing the ReduceFunction interface, which defines a reduce() method that takes two input events and returns one event of the same type.
The following example adds up the scores of the same users:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceOperator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                        Tuple2.of("Alick", 98),
                        Tuple2.of("Alick", 69),
                        Tuple2.of("Bob", 91),
                        Tuple2.of("Tom", 87),
                        Tuple2.of("Bob", 69)
                )
                // key by the user name (f0), not the score
                .keyBy(elem -> elem.f0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

        env.execute();
    }
}

res (subtask indices omitted; ordering across keys may vary):
(Alick,98)
(Alick,167)
(Bob,91)
(Bob,160)
(Tom,87)
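The pairwise logic of the ReduceFunction above can also be sketched in plain Java (outside Flink), which makes it easy to see that input and output share one type and that each new event is folded into the running aggregate for its key:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceSketch {
    // Stand-in for the Tuple2<String, Integer> used in the Flink example.
    public static class Score {
        public final String user;
        public final int points;
        public Score(String user, int points) { this.user = user; this.points = points; }
    }

    // The same pairwise reduce: two Scores in, one Score out, keeping the
    // user of the current aggregate and summing the points.
    public static final BinaryOperator<Score> SUM =
            (a, b) -> new Score(a.user, a.points + b.points);

    // Apply the reduce per key, as a keyed stream would.
    public static Map<String, Score> reduceByUser(Score... events) {
        Map<String, Score> acc = new LinkedHashMap<>();
        for (Score s : events) {
            acc.merge(s.user, s, SUM);
        }
        return acc;
    }
}
```

Feeding in (Alick, 98), (Alick, 69), and (Bob, 91) leaves Alick with an aggregate of 167 and Bob with 91, matching what the Flink reduce produces per key.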