亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Apache Flink - 事件時間窗口

Apache Flink - 事件時間窗口

米琪卡哇伊 2021-08-19 17:44:20
我想在 Apache flink 中創建鍵控窗口,以便每個鍵的窗口在鍵的第一個事件到達后 n 分鐘執行。是否可以使用事件時間特性來完成(因為處理時間取決于系統時鐘,并且不確定第一個事件何時到達)。如果可能,請解釋事件時間和水印的分配也給事件,并解釋如何在 n 分鐘后調用進程窗口函數。以下是一部分代碼,可以讓您了解我目前在做什么:            //Make keyed events so as to start a window for a key            KeyedStream<SourceData, Tuple> keyedEvents =                     env.addSource(new MySource(configData),"JSON Source")                    .assignTimestampsAndWatermarks(new MyTimeStamps())                    .setParallelism(1)                    .keyBy("service");            //Start a window for windowTime time            DataStream<ResultData> resultData=                    keyedEvents                    .timeWindow(Time.minutes(winTime))                    .process(new ProcessEventWindow(configData))                    .name("Event Collection Window")                    .setParallelism(25);那么,我將如何分配事件時間和水印,以便窗口遵循第一個事件的事件時間作為起點并在 10 分鐘后執行(第一個事件的開始時間對于不同的鍵可能不同)。任何幫助將非常感激。        /------------ ( window of 10 minutes )Streams          |------------ ( window of 10 minutes )            \------------ ( window of 10 minutes )編輯:我用于分配時間戳和水印的類public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {    @Override    public long extractTimestamp(SourceData element, long previousElementTimestamp) {          //Will return epoch of currentTime        return GlobalUtilities.getCurrentEpoch();    }    @Override    public Watermark getCurrentWatermark() {        // TODO Auto-generated method stub        //Will return epoch of currentTime + 10 minutes        return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));    }}
查看完整描述

2 回答

?
阿晨1998

TA貢獻2037條經驗 獲得超6個贊

我認為對于您的用例,最好使用ProcessFunction。您可以做的是在第一個事件到來時注冊一個 EventTimeTimer。比在onTimer方法中發出結果。


就像是:


public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {


    @Override

    public void processElement(SourceData value, Context ctx, Collector<ResultData> out)

        throws Exception {


        // retrieve the current aggregate

        ResultData current = state.value();

        if (current == null) {

            // first event arrived

            current = new ResultData();

            // register end of window

            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);

        }


        // update the state's aggregate

        current += value;


        // write the state back

        state.update(current);

    }


    @Override

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)

        throws Exception {


        // get the state for the key that scheduled the timer

        ResultData result = state.value();


        out.collect(result);


        // reset the window state

        state.clear();

    }

}


查看完整回答
反對 回復 2021-08-19
?
皈依舞

TA貢獻1851條經驗 獲得超3個贊

不久前我有一個關于事件時間窗口的類似問題。這是我的流的樣子


val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


//Consumer Setup


val stream = env.addSource(consumer)

  .assignTimestampsAndWatermarks(new WMAssigner)


// Additional Setup here


stream

  .keyBy { data => data.findValue("service") }

  .window(TumblingEventTimeWindows.of(Time.minutes(10)))

  .process { new WindowProcessor }


  //Sinks go here

我的 WMAssigner 類看起來像這樣(注意:這允許 1 分鐘的亂序事件發生,如果您不想延遲,您可以擴展不同的時間戳提取器):


class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode] (Time.seconds(60)) {

  override def extractTimestamp(element: ObjectNode): Long = {

    val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")

    tsStr.toLong

  }

}

我想用于水印的時間戳是 data.ts 字段。


我的窗口處理器:


class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {

    val out = ""

    elements.foreach( value => {

      out = value.findValue("data").findValue("outData")

    }

    out.collect(out)

  }

}

如果有任何不清楚的地方,請告訴我


查看完整回答
反對 回復 2021-08-19
  • 2 回答
  • 0 關注
  • 190 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號