2 Answers

Contributed 1780 experience points, received more than 5 likes
After spending some time looking for a better solution, I settled on the following:
package test;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

// TestDto and CustomQueueMessageHandler live in this same package, so they need no import.
@Component
public class TestQueue {

    private static final String QUEUE_NAME = "TestQueue";
    private static final Logger log = LoggerFactory.getLogger(TestQueue.class);

    public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
        // Build a dedicated SQS client from the credentials and region passed in
        AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(regionProvider.getRegion())
                .build();

        // custom QueueMessageHandler to initialize only this queue
        CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
        queueMessageHandler.init(this);
        queueMessageHandler.afterPropertiesSet();

        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(client);
        factory.setQueueMessageHandler(queueMessageHandler);

        SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);

        // Call the bean lifecycle methods by hand, since Spring does not manage this container
        try {
            simpleMessageListenerContainer.afterPropertiesSet();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        simpleMessageListenerContainer.start();
    }

    @SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
        log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
    }
}
And the custom QueueMessageHandler:
package test;

import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;

public class CustomQueueMessageHandler extends QueueMessageHandler {

    // Exposes the protected detectHandlerMethods so a single object can be scanned
    public void init(Object handler) {
        detectHandlerMethods(handler);
    }
}
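The only purpose of CustomQueueMessageHandler is to pass in the object that should be scanned for SQS annotations. Since I don't start it through the Spring context, it does not search every bean for the @SqsListener annotation. But all of the initialization is hidden behind protected methods, which is why I need to subclass the class to get access to those init methods.
I don't think this is a very elegant solution, since it creates all of the AWS client objects by hand and calls the bean init methods manually. But it is the only solution I could find that still gives access to all the features of the AWS SQS library, such as conversion of incoming messages and notifications, deletion policies, queue polling (including failure handling), and so on.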
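
To make the idea of "scan only the object I hand over" more concrete, here is a minimal plain-Java sketch that needs neither Spring nor the AWS SDK. It is not how QueueMessageHandler is implemented. The @QueueListener annotation, the ManualHandlerRegistry class and the TestQueueHandler class are all hypothetical names invented for this illustration; they only mimic the pattern of registering one explicitly supplied handler object instead of searching every bean.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ManualHandlerDetectionSketch {

    /** Hypothetical stand-in for @SqsListener; not part of any library. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface QueueListener {
        String value();
    }

    /** Hypothetical registry that scans only the object it is given. */
    static class ManualHandlerRegistry {
        private final Map<String, Method> methodsByQueue = new HashMap<>();
        private final Map<String, Object> targetsByQueue = new HashMap<>();

        /** Finds annotated methods on this one object; nothing else is searched. */
        void register(Object handler) {
            for (Method method : handler.getClass().getDeclaredMethods()) {
                QueueListener listener = method.getAnnotation(QueueListener.class);
                if (listener != null) {
                    method.setAccessible(true);
                    methodsByQueue.put(listener.value(), method);
                    targetsByQueue.put(listener.value(), handler);
                }
            }
        }

        /** Invokes the listener registered for the queue; returns false if there is none. */
        boolean dispatch(String queue, String payload) {
            Method method = methodsByQueue.get(queue);
            if (method == null) {
                return false;
            }
            try {
                method.invoke(targetsByQueue.get(queue), payload);
                return true;
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Listener invocation failed", e);
            }
        }
    }

    /** Hypothetical handler that records what it receives. */
    static class TestQueueHandler {
        final List<String> received = new ArrayList<>();

        @QueueListener("TestQueue")
        public void receiveMessage(String payload) {
            received.add(payload);
        }
    }

    public static void main(String[] args) {
        TestQueueHandler handler = new TestQueueHandler();
        ManualHandlerRegistry registry = new ManualHandlerRegistry();
        registry.register(handler); // plays the role of init(this) in the answer

        registry.dispatch("TestQueue", "hello");
        System.out.println(handler.received); // prints [hello]
    }
}
```

In the real code above, the registration step corresponds to queueMessageHandler.init(this), while the polling, message conversion and deletion policy are still handled by the library's own container.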

Contributed 1744 experience points, received more than 4 likes
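You can declare separate @Bean definitions, each with its own @Qualifier, for the accounts you want to use. Suppose you have two SQS queues in two different accounts. Then declare two beans of type AmazonSQS: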
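// The two bean methods need distinct names, otherwise the class does not compile.
// credentialsProvider() and anotherCredentialsProvider() are assumed to be defined elsewhere.
@Qualifier("first")
@Bean
public AmazonSQS firstAmazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(credentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

@Qualifier("second")
@Bean
public AmazonSQS secondAmazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(anotherCredentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}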
Then, in your service, you can inject them with @Autowired:
@Autowired @Qualifier("second") private AmazonSQS sqsSecond;