我有一個在 Flink 中維護配置的用例,但我真的不知道如何處理。假設我在某處存儲了一些配置,并且我需要它來進行處理。在Flink作業初始化時,我想加載所有配置。這個配置也可以在Flink作業運行期間修改,所以我必須在內存中保存這個配置的狀態,并在需要時更新它。配置的更新可以從 KafkaSource 訪問。這就是我所擁有的:我有一個函數可以加載整個配置,將其保持在某種狀態并將其與我的數據流關聯:public class MyConfiguration extends RichFlatMapFunction<Row, Row>{ private transient MapState<String, MyConfObject> configuration; @Override public void open(MyConfiguration config) throws Exception{ MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>( "configuration", BasicTypeInfo.STRING_TYPE_INFO, ... ); configuration = getRuntimeContext().getMapState(descriptor); configuration.putAll(...); // Load configuration from somewhere } @Override public void flatMap(Row value, Collector<Row> out) throws Exception { MyConfObject conf = configuration.get(...); ... // Associate conf with data out.collect(value); }}我的管道看起來像這樣:DataStream<Row> dataStream = ...; // My data streamDataStream<Map<String, MyConfObject> streamConf = env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates .map(...); return dataStream .assignTimestampsAndWatermarks(...) .flatMap(new MyConfiguration()) ... //Do some processing .map(m -> { ObjectMapper objectMapper = new ObjectMapper(); String json = objectMapper.writeValueAsString(m); return json.getBytes(); });我想要的是使用配置更新流streamConf來更新平面地圖函數內的 State 變量MyConfiguration。我怎樣才能做到這一點 ?
1 回答

江戶川亂折騰
TA貢獻1851條經驗 獲得超5個贊
我建議您編寫一個源代碼,從 Kafka 讀取配置信息,然后通過廣播流將配置更改廣播到映射函數。映射函數將以持久狀態存儲完整的當前配置,而廣播流意味著映射函數的所有實例都將獲得所有配置更改。
添加回答
舉報
0/150
提交
取消