我正在嘗試配置以下流程:當消息到達 Rabbit 隊列時嘗試獲取鎖,查詢遠程文件服務器以獲取某些文件,并為找到的每個文件向另一個隊列發送新消息,并在發送所有文件后釋放鎖文件。IntegrationFlows.from(Amqp.inboundGateway(container) .messageConverter(messageConverter)) .filter(m -> lockService.acquire()) .transform(m -> remoteFileTemplate.list(inputDirectory)) .split() .handle(Amqp.outboundAdapter(amqpTemplate) .exchangeName("") .routingKey(routingKey) .aggregate() .handle(m -> { log.info("Releasing lock"); lock.release(); }) .get();問題是流在第.handle一種方法之后停止(老實說,正如預期的那樣),我無法弄清楚如何配置它來做我想做的事?我嘗試使用.wireTapand.publishSubscribeChannel但這使得 2 個流相互不依賴,并且我的鎖在文件實際發送之前被釋放。如果有人可以幫助我解釋如何使用 DSL 修復它會很棒,因為我正在動態創建這些流......
1 回答
米脂
TA貢獻1836條經驗 獲得超3個贊
拆分后的 pub/sub,在一個子流上使用 AMQP 處理程序,而在另一個子流上使用聚合器應該可以正常工作。
每個都將在同一個線程上連續調用,最后一條消息導致從聚合器釋放,再次在同一個線程上。
話雖如此,您將需要在入站網關上進行一些 errorChannel 處理,以在發生錯誤時釋放鎖。
編輯
一個不太復雜的解決方案是ChannelInterceptor在轉換之前而不是過濾器之前在通道上進行自定義,以鎖定preSend()并釋放它afterSendCompleted()(成功和失敗都調用它)。
添加回答
舉報
0/150
提交
取消
