Toggle navigation
Home
org.apache.flink.streaming.api.datastream.GroupedDataStream to org.apache.flink.streaming.api.datastream.KeyedStream
No. of Instances - 1
No. of Commits - 1
No. of Projects - 1 (flink)
Hierarchy/Composition: -
Primitive Info: -
NameSpace: Internal -> Internal
Mapping:
Add or Remove Method invocation
stream.min(1).map(new OnceFailingIdentityMapFunction(NUM_INPUT)).groupBy(0)
to
stream.min(1).map(new OnceFailingIdentityMapFunction(NUM_INPUT)).keyBy(0)
stream.reduce(new ReduceFunction<Tuple2<Integer,Long>>(){ @Override public Tuple2<Integer,Long> reduce( Tuple2<Integer,Long> value1, Tuple2<Integer,Long> value2) throws Exception { return Tuple2.of(value1.f0,value1.f1 + value2.f1); } } ).groupBy(0)
to
stream.reduce(new ReduceFunction<Tuple2<Integer,Long>>(){ @Override public Tuple2<Integer,Long> reduce( Tuple2<Integer,Long> value1, Tuple2<Integer,Long> value2) throws Exception { return Tuple2.of(value1.f0,value1.f1 + value2.f1); } } ).keyBy(0)
stream.fold(Tuple2.of(0,0L),new FoldFunction<Tuple2<Integer,Long>,Tuple2<Integer,Long>>(){ @Override public Tuple2<Integer,Long> fold( Tuple2<Integer,Long> accumulator, Tuple2<Integer,Long> value) throws Exception { return Tuple2.of(value.f0,accumulator.f1 + value.f1); } } ).groupBy(0)
to
stream.fold(Tuple2.of(0,0L),new FoldFunction<Tuple2<Integer,Long>,Tuple2<Integer,Long>>(){ @Override public Tuple2<Integer,Long> fold( Tuple2<Integer,Long> accumulator, Tuple2<Integer,Long> value) throws Exception { return Tuple2.of(value.f0,accumulator.f1 + value.f1); } } ).keyBy(0)