实现Direct、Fanout、Topic队列,以及通过死信转发方式实现的延迟队列
一个让处女座程序员很难受的问题:
每次声明一个队列,都需要用@Bean注解在config类里面显式地往容器里面注入一个Queue Bean和Binding Bean,十几个队列下来,那场面简直不能忍.
怎么解决呢,思路:
通过遍历枚举的方式,统一往spring容器中注入bean.废话不多说,上代码
一 使用场景说明
1.Direct
根据routekey精确匹配消费,只消费一次
2.Fanout
广播消息队列,同交换机内的所有消费者,都接收到消息
3.Topic
支持模糊匹配,可匹配到多个队列.配合AnonymousQueue可实现集群内每个节点都消费同一条业务消息.如:修改集群内所有应用内存中的配置
4.TTL
延迟队列,实现消息延迟指定时间消费
二 关键代码
配置类:
import com.google.common.collect.Maps;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.beans.BeansException;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.beans.factory.support.BeanDefinitionBuilder;import org.springframework.beans.factory.support.BeanDefinitionRegistry;import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.annotation.Configuration;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.List;import java.util.Map;/** * @author onlinever * @date 2018/09/06 */@Servicepublic class RabbitQueueBeanRegister implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware { private ApplicationContext applicationContext; private BeanDefinitionRegistry beanDefinitionRegistry; private String adapterSuffix = "Adapter"; private Map<RabbitQueueEnum, Queue> topicQueues = Maps.newHashMap(); private List<TopicConsumer> topicConsumers; @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException { this.beanDefinitionRegistry = beanDefinitionRegistry; //声明交换机 declareExchange(); //声明队列和绑定 declareQueueAndBinding(); //奇怪的执行顺序 if (haveTopicQueue()) { declareTopicMessageListenerAdapter(); declareTopicMessageListenerContainer(); } } private boolean haveTopicQueue() { try { topicConsumers = new ArrayList<>(applicationContext.getBeansOfType(TopicConsumer.class).values()); return !topicConsumers.isEmpty(); } catch (Exception e) { 
System.out.println(e.getMessage()); return false; } } /** * 声明交换机 */ private void declareExchange() { for (RabbitExchangeEnum rabbitExchangeEnum : RabbitExchangeEnum.values()) { switch (rabbitExchangeEnum.getRabbitExchangeTypeEnum()) { case FANOUT_QUEUE: beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () -> (FanoutExchange) ExchangeBuilder .fanoutExchange(rabbitExchangeEnum.getExchangeName()) .durable(true) .build()).getBeanDefinition()); break; case TOPIC_QUEUE: beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(TopicExchange.class, () -> (TopicExchange) ExchangeBuilder .topicExchange(rabbitExchangeEnum.getExchangeName()) .durable(true) .build()).getBeanDefinition()); break; default: beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class, () -> (DirectExchange) ExchangeBuilder .directExchange(rabbitExchangeEnum.getExchangeName()) .durable(true) .build()).getBeanDefinition()); break; } } } /** * 声明队列和绑定 */ private void declareQueueAndBinding() { String bindingSuffix = "Binding"; for (RabbitQueueEnum rabbitQueueEnum : RabbitQueueEnum.values()) { //注册所有队列 beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(Queue.class, () -> { Queue queue; switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) { case TTL_QUEUE: queue = QueueBuilder .durable(rabbitQueueEnum.getRouteKey()) // 配置到期后转发的交换 .withArgument("x-dead-letter-exchange", rabbitQueueEnum.getRabbitQueueEnum().getExchangeName()) // 配置到期后转发的路由键 .withArgument("x-dead-letter-routing-key", rabbitQueueEnum.getRabbitQueueEnum().getRouteKey()) .build(); break; case TOPIC_QUEUE: queue = new AnonymousQueue(new 
AnonymousQueue.Base64UrlNamingStrategy(StringUtils.getTopicQueueNamePrefix(rabbitQueueEnum.getRouteKey()))); topicQueues.put(rabbitQueueEnum, queue); break; default: queue = new Queue(rabbitQueueEnum.getRouteKey()); break; } return queue; }).getBeanDefinition()); //注册队列与交换机的绑定 switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) { case FANOUT_QUEUE: beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class)) .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), FanoutExchange.class))).getBeanDefinition()); break; case NORMAL_QUEUE: case TTL_QUEUE: beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class)) .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), DirectExchange.class)) .with(rabbitQueueEnum.getRouteKey())).getBeanDefinition()); break; case TOPIC_QUEUE: beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class)) .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), TopicExchange.class)) .with(StringUtils.getTopicQueueRoute(rabbitQueueEnum.getRouteKey()))).getBeanDefinition()); break; default: break; } } } /** * 声明Topic消息监听适配器 */ private void declareTopicMessageListenerAdapter() { topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix, BeanDefinitionBuilder.genericBeanDefinition(MessageListenerAdapter.class, 
() -> new MessageListenerAdapter(topicConsumer)).getBeanDefinition())); } /** * 声明Topic消息监听容器 */ private void declareTopicMessageListenerContainer() { String containerSuffix = "Container"; topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + containerSuffix, BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class, () -> { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setQueues(topicQueues.get(topicConsumer.getQueueEnum())); container.setConnectionFactory(applicationContext.getBean("rabbitConnectionFactory", ConnectionFactory.class)); container.setMessageListener(applicationContext.getBean(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix)); container.setRabbitAdmin(applicationContext.getBean(RabbitAdmin.class)); return container; }).getBeanDefinition())); } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
枚举类
2.1 交换机类型枚举
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;

/**
 * Queue categories, each paired with a numeric index and the exchange class it uses.
 *
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeTypeEnum {
    /**
     * Delay queue implemented through dead-letter forwarding
     */
    TTL_QUEUE(1, DirectExchange.class),
    /**
     * Normal queue
     */
    NORMAL_QUEUE(2, DirectExchange.class),
    /**
     * Broadcast queue
     */
    FANOUT_QUEUE(3, FanoutExchange.class),
    /**
     * Topic queue
     */
    TOPIC_QUEUE(4, TopicExchange.class);

    /**
     * Numeric index of the type
     */
    private final int index;

    /**
     * Exchange class used by this type
     */
    private final Class<?> exchangeClazz;

    RabbitExchangeTypeEnum(int index, Class<?> exchangeClazz) {
        this.index = index;
        this.exchangeClazz = exchangeClazz;
    }

    /**
     * @return the numeric index of the type
     */
    public int getIndex() {
        return index;
    }

    /**
     * @return the exchange class used by this type
     */
    public Class<?> getExchangeClazz() {
        return exchangeClazz;
    }
}
2.2 交换机枚举
/**
 * Exchanges declared by the application.
 *
 * <p>Exchange names follow the pattern {@code exchange.{0}.{1}}:
 * <ul>
 *   <li>0: exchange type — direct, topic, fanout, headers</li>
 *   <li>1: application name</li>
 * </ul>
 * By default one exchange is set up per application.
 *
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeEnum {

    DIRECT_EXCHANGE("directExchange", "exchange.direct.gateway", RabbitExchangeTypeEnum.NORMAL_QUEUE),
    FANOUT_EXCHANGE("fanoutExchange", "exchange.fanout.gateway", RabbitExchangeTypeEnum.FANOUT_QUEUE),
    TOPIC_EXCHANGE("topicExchange", "exchange.topic.gateway", RabbitExchangeTypeEnum.TOPIC_QUEUE);

    /** Bean name of the exchange. */
    private final String beanName;

    /** Name (key) of the exchange. */
    private final String exchangeName;

    /** Type of the exchange. */
    private final RabbitExchangeTypeEnum rabbitExchangeTypeEnum;

    RabbitExchangeEnum(String bean, String exchange, RabbitExchangeTypeEnum type) {
        this.beanName = bean;
        this.exchangeName = exchange;
        this.rabbitExchangeTypeEnum = type;
    }

    /** @return the bean name of the exchange */
    public String getBeanName() {
        return beanName;
    }

    /** @return the name (key) of the exchange */
    public String getExchangeName() {
        return exchangeName;
    }

    /** @return the type of the exchange */
    public RabbitExchangeTypeEnum getRabbitExchangeTypeEnum() {
        return rabbitExchangeTypeEnum;
    }
}
2.3 队列枚举
/**
 * Queues declared by the application. No constants are defined here; each constant supplies
 * a bean name, a route key, the exchange it belongs to and, optionally, the queue that
 * receives its dead-lettered messages.
 *
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitQueueEnum {
    ;

    /** Bean name of the queue. */
    private final String beanName;

    /** Route key of the queue. */
    private final String routeKey;

    /** Exchange the queue belongs to. */
    private final RabbitExchangeEnum exchangeEnum;

    /** Queue that dead-lettered messages are forwarded to. */
    private final RabbitQueueEnum rabbitQueueEnum;

    RabbitQueueEnum(String bean, String route, RabbitExchangeEnum exchange, RabbitQueueEnum deadLetterQueue) {
        this.beanName = bean;
        this.routeKey = route;
        this.exchangeEnum = exchange;
        this.rabbitQueueEnum = deadLetterQueue;
    }

    /** @return the bean name of the queue */
    public String getBeanName() {
        return beanName;
    }

    /** @return the route key of the queue */
    public String getRouteKey() {
        return routeKey;
    }

    /** @return the exchange the queue belongs to */
    public RabbitExchangeEnum getExchangeEnum() {
        return exchangeEnum;
    }

    /** @return the name of the exchange the queue belongs to */
    public String getExchangeName() {
        return exchangeEnum.getExchangeName();
    }

    /** @return the queue that dead-lettered messages are forwarded to */
    public RabbitQueueEnum getRabbitQueueEnum() {
        return rabbitQueueEnum;
    }
}
Topic消费者接口
/**
 * Consumer of a topic queue.
 *
 * @author onlinever
 * @date 2018/8/17
 */
public interface TopicConsumer {

    /**
     * The queue this consumer reads from.
     *
     * @return the queue
     */
    RabbitQueueEnum getQueueEnum();

    /**
     * The concrete consumer implementation.
     *
     * @param message the message
     */
    void handleMessage(String message);
}
其他消费者使用@RabbitListener方式
作者:七岁能杀猪
链接:https://www.jianshu.com/p/e78341054ebc
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦