亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

初始化MapState的內容

初始化MapState的內容

qq_遁去的一_1 2023-10-12 14:39:29
我實現了一個RichFunction具有以下結構的 Flink:public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {    private MapState<String, MyState> myState;                  @Override    public void open(Configuration conf)throws Exception{        myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));    }    @Override    public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {        MyState state = myState.get(value.ID());        // Do things    }    @Override    public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {        state.put(value.ID(), value.state());   // Update the mapState with value from broadcast    }    // retrieve all the state values and put them in the MapState    private void initialState() throws Exception{       Map<String, MyState> initialValues = ...;       this.cameras.putAll(initialValues);    }}該mapState變量存儲通過BroadcastedStream. 更新是在processBroadcastElement()函數中完成的。在作業開始時,我想mapState使用該initialState()函數來初始化。問題是我無法在函數中使用它open()(請參閱此處原因)在這種情況下初始化的正確方法是什么mapState?(在所有使用 RichFunctions 的情況下)
查看完整描述

2 回答

?
子衿沉夜

TA貢獻1828條經驗 獲得超3個贊

您想要實現 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction


當您這樣做時,您將實現兩種方法:


@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {


    // called when it's time to save state


    myState.clear();


        // Update myState with current application state 


}


@Override

public void initializeState(FunctionInitializationContext context) throws Exception {


    // called when things start up, possibly recovering from an error


    descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));


    myState = context.getKeyedStateStore().getMapState(descriptor);


    if (context.isRestored()) {


        // restore application state from myState  


    }       


}

您可以在initializeState() 方法而不是open() 中初始化myState 變量。


查看完整回答
反對 回復 2023-10-12
?
梵蒂岡之花

TA貢獻1900條經驗 獲得超5個贊

我不相信你實際上可以在initializeState()中初始化廣播狀態。修改廣播狀態的唯一方法是通過在 processBroadcastElement 方法中獲得的讀/寫上下文。

但是你可以做的是在initializeState中使用context.isRestored()來確定KeyedBroadcastProcessFunction是否是第一次初始化,并設置一個瞬態局部變量來記錄此信息。然后,第一次調用 processBroadcastElement 方法時,您可以使用此信息來決定在廣播狀態中存儲什么。但您必須在廣播流上發送一些內容才能啟動此操作。


查看完整回答
反對 回復 2023-10-12
  • 2 回答
  • 0 關注
  • 115 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號