我正在使用 flink 來消耗 kafka 并寫入 redis。這是我對redis的接收器功能: .addSink(new RichSinkFunction<MobilePageEvent>() { @Override public void invoke(MobilePageEvent event, Context context) { JEDIS_CLUSTER.zadd(..); } }) .name("redis sink");雖然我可以從 redis 命令行獲取數據,但指標顯示 sink 函數的輸出為零:我怎樣才能增加這個指標?
1 回答

qq_笑_17
TA貢獻1818條經驗 獲得超7個贊
numRecordsIn 和 numRecordsOut 指標僅計算在 Flink 作業本身內流動的流記錄,不包括與外部系統的通信。所以換句話說,源不報告任何記錄進來,匯不報告任何記錄出去。
在我看來,您有幾個選擇:
使用接收器上的 numRecordsIn 指標作為您想知道的近似值
fork 或擴展 RedisSink 并添加您想要的指標
此處顯示了添加計數器度量的模式。
在 redis sink 的情況下,您可以在 open() 方法中初始化一個 Counter,并在 invoke() 中遞增它。但這似乎毫無意義,因為這只會反映 numRecordsIn 指標。如果您的 redis 接收器正在執行緩沖批量寫入,那么等待增加指標直到數據實際發送到 redis 可能更有意義——在這種情況下,您可能更愿意使用 Meter 而不是 Counter。
添加回答
舉報
0/150
提交
取消