2 回答

TA貢獻2037條經驗 獲得超6個贊
我認為對于您的用例,最好使用ProcessFunction。您可以做的是在第一個事件到來時注冊一個 EventTimeTimer。比在onTimer方法中發出結果。
就像是:
public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {
@Override
public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
throws Exception {
// retrieve the current aggregate
ResultData current = state.value();
if (current == null) {
// first event arrived
current = new ResultData();
// register end of window
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
}
// update the state's aggregate
current += value;
// write the state back
state.update(current);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
throws Exception {
// get the state for the key that scheduled the timer
ResultData result = state.value();
out.collect(result);
// reset the window state
state.clear();
}
}

TA貢獻1851條經驗 獲得超3個贊
不久前我有一個關于事件時間窗口的類似問題。這是我的流的樣子
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//Consumer Setup
val stream = env.addSource(consumer)
.assignTimestampsAndWatermarks(new WMAssigner)
// Additional Setup here
stream
.keyBy { data => data.findValue("service") }
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process { new WindowProcessor }
//Sinks go here
我的 WMAssigner 類看起來像這樣(注意:這允許 1 分鐘的亂序事件發生,如果您不想延遲,您可以擴展不同的時間戳提取器):
class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode] (Time.seconds(60)) {
override def extractTimestamp(element: ObjectNode): Long = {
val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
tsStr.toLong
}
}
我想用于水印的時間戳是 data.ts 字段。
我的窗口處理器:
class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
val out = ""
elements.foreach( value => {
out = value.findValue("data").findValue("outData")
}
out.collect(out)
}
}
如果有任何不清楚的地方,請告訴我
添加回答
舉報