2 回答

TA貢獻1828條經驗 獲得超3個贊
您想要實現 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
當您這樣做時,您將實現兩種方法:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// called when it's time to save state
myState.clear();
// Update myState with current application state
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// called when things start up, possibly recovering from an error
descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));
myState = context.getKeyedStateStore().getMapState(descriptor);
if (context.isRestored()) {
// restore application state from myState
}
}
您可以在initializeState() 方法而不是open() 中初始化myState 變量。

TA貢獻1900條經驗 獲得超5個贊
我不相信你實際上可以在initializeState()中初始化廣播狀態。修改廣播狀態的唯一方法是通過在 processBroadcastElement 方法中獲得的讀/寫上下文。
但是你可以做的是在initializeState中使用context.isRestored()來確定KeyedBroadcastProcessFunction是否是第一次初始化,并設置一個瞬態局部變量來記錄此信息。然后,第一次調用 processBroadcastElement 方法時,您可以使用此信息來決定在廣播狀態中存儲什么。但您必須在廣播流上發送一些內容才能啟動此操作。
添加回答
舉報