亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

SpringBoot集成RabbitMQ-動態注入Bean方式

標簽:
Spring

实现Direct,Fanout,Topic和死信转发方式实现的延迟队列

一个让处女座程序员很难受的问题:
每次申明一个队列,都需要用@Bean注解在config类里面显式的往容器里面注入一个Queue Bean和Binding Bean,十几个队列下来,那场面简直不能忍.
怎么解决呢,思路:
通过遍历枚举的方式,统一往spring容器中注入bean.废话不多说,上代码

一 使用场景说明
1.Direct
根据routekey精确匹配消费,只消费一次
2.Fanout
广播消息队列,同交换机内的所有消费者,都接收到消息
3.Topic
支持模糊匹配,可匹配到多个.配合AnonymousQueue队列可实现集群内多点同一业务集群消费.如:修改集群内所有应用内存中配置
4.TTL
延迟队列,实现消息延迟指定时间消费

二 关键代码

  1. 配置类:

import com.google.common.collect.Maps;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.beans.BeansException;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.beans.factory.support.BeanDefinitionBuilder;import org.springframework.beans.factory.support.BeanDefinitionRegistry;import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.annotation.Configuration;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.List;import java.util.Map;/**
 * @author onlinever
 * @date 2018/09/06
 */@Servicepublic class RabbitQueueBeanRegister implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {    private ApplicationContext applicationContext;    private BeanDefinitionRegistry beanDefinitionRegistry;    private String adapterSuffix = "Adapter";    private Map<RabbitQueueEnum, Queue> topicQueues = Maps.newHashMap();    private List<TopicConsumer> topicConsumers;    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {        this.beanDefinitionRegistry = beanDefinitionRegistry;        //声明交换机
        declareExchange();        //声明队列和绑定
        declareQueueAndBinding();        //奇怪的执行顺序
        if (haveTopicQueue()) {
            declareTopicMessageListenerAdapter();
            declareTopicMessageListenerContainer();
        }
    }    private boolean haveTopicQueue() {        try {
            topicConsumers = new ArrayList<>(applicationContext.getBeansOfType(TopicConsumer.class).values());            return !topicConsumers.isEmpty();
        } catch (Exception e) {
            System.out.println(e.getMessage());            return false;
        }
    }    /**
     * 声明交换机
     */
    private void declareExchange() {        for (RabbitExchangeEnum rabbitExchangeEnum : RabbitExchangeEnum.values()) {            switch (rabbitExchangeEnum.getRabbitExchangeTypeEnum()) {                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () -> (FanoutExchange) ExchangeBuilder
                            .fanoutExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());                    break;                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(TopicExchange.class, () -> (TopicExchange) ExchangeBuilder
                            .topicExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());                    break;                default:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class, () -> (DirectExchange) ExchangeBuilder
                            .directExchange(rabbitExchangeEnum.getExchangeName())
                            .durable(true)
                            .build()).getBeanDefinition());                    break;
            }
        }
    }    /**
     * 声明队列和绑定
     */
    private void declareQueueAndBinding() {
        String bindingSuffix = "Binding";        for (RabbitQueueEnum rabbitQueueEnum : RabbitQueueEnum.values()) {            //注册所有队列
            beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName(), BeanDefinitionBuilder.genericBeanDefinition(Queue.class, () -> {
                Queue queue;                switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {                    case TTL_QUEUE:
                        queue = QueueBuilder
                                .durable(rabbitQueueEnum.getRouteKey())                                // 配置到期后转发的交换
                                .withArgument("x-dead-letter-exchange", rabbitQueueEnum.getRabbitQueueEnum().getExchangeName())                                // 配置到期后转发的路由键
                                .withArgument("x-dead-letter-routing-key", rabbitQueueEnum.getRabbitQueueEnum().getRouteKey())
                                .build();                        break;                    case TOPIC_QUEUE:
                        queue = new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy(StringUtils.getTopicQueueNamePrefix(rabbitQueueEnum.getRouteKey())));
                        topicQueues.put(rabbitQueueEnum, queue);                        break;                    default:
                        queue = new Queue(rabbitQueueEnum.getRouteKey());                        break;
                }                return queue;
            }).getBeanDefinition());            //注册队列与交换机的绑定
            switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), FanoutExchange.class))).getBeanDefinition());                    break;                case NORMAL_QUEUE:                case TTL_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), DirectExchange.class))
                            .with(rabbitQueueEnum.getRouteKey())).getBeanDefinition());                    break;                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + bindingSuffix, BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                            .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                            .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), TopicExchange.class))
                            .with(StringUtils.getTopicQueueRoute(rabbitQueueEnum.getRouteKey()))).getBeanDefinition());                    break;                default:                    break;
            }
        }
    }    /**
     * 声明Topic消息监听适配器
     */
    private void declareTopicMessageListenerAdapter() {
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix,
                BeanDefinitionBuilder.genericBeanDefinition(MessageListenerAdapter.class, () -> new MessageListenerAdapter(topicConsumer)).getBeanDefinition()));
    }    /**
     * 声明Topic消息监听容器
     */
    private void declareTopicMessageListenerContainer() {
        String containerSuffix = "Container";
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(topicConsumer.getQueueEnum().getBeanName() + containerSuffix,
                BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class, () -> {
                    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                    container.setQueues(topicQueues.get(topicConsumer.getQueueEnum()));
                    container.setConnectionFactory(applicationContext.getBean("rabbitConnectionFactory", ConnectionFactory.class));
                    container.setMessageListener(applicationContext.getBean(topicConsumer.getQueueEnum().getBeanName() + adapterSuffix));
                    container.setRabbitAdmin(applicationContext.getBean(RabbitAdmin.class));                    return container;
                }).getBeanDefinition()));
    }    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;
    }
}
  1. 枚举类
    2.1 交换机类型枚举

import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.TopicExchange;/**
 * @author onlinever
 * @date 2018/09/06
 */public enum RabbitExchangeTypeEnum {    /**
     * 死信转发方式延迟队列
     */
    TTL_QUEUE(1, DirectExchange.class),    /**
     * 正常队列
     */
    NORMAL_QUEUE(2, DirectExchange.class),    /**
     * 广播队列
     */
    FANOUT_QUEUE(3, FanoutExchange.class),    /**
     * topic队列
     */
    TOPIC_QUEUE(4, TopicExchange.class);    /**
     * 队列routeKey
     */
    private int index;    /**
     * 交换机class
     */
    private Class exchangeClazz;


    RabbitExchangeTypeEnum(int index, Class exchangeClazz) {        this.index = index;        this.exchangeClazz = exchangeClazz;
    }    public int getIndex() {        return index;
    }    public Class getExchangeClazz() {        return exchangeClazz;
    }
}

2.2 交换机枚举

/**
 * @author onlinever
 * @date 2018/09/06
 */public enum RabbitExchangeEnum {    /**
     * rabbit交换机名称
     * 默认一个应用设置一个交换机
     * exchange.{0}.{1}
     * 0: 交换机类型 direct、topic、fanout、headers
     * 1: 应用名称
     */
    DIRECT_EXCHANGE("directExchange", "exchange.direct.gateway", RabbitExchangeTypeEnum.NORMAL_QUEUE),
    FANOUT_EXCHANGE("fanoutExchange", "exchange.fanout.gateway", RabbitExchangeTypeEnum.FANOUT_QUEUE),
    TOPIC_EXCHANGE("topicExchange", "exchange.topic.gateway", RabbitExchangeTypeEnum.TOPIC_QUEUE),;    /**
     * 交换机beanName
     */
    private String beanName;    /**
     * 交换机key
     */
    private String exchangeName;    /**
     * 交换机类型
     */
    private RabbitExchangeTypeEnum rabbitExchangeTypeEnum;

    RabbitExchangeEnum(String beanName, String exchangeName, RabbitExchangeTypeEnum rabbitExchangeTypeEnum) {        this.beanName = beanName;        this.exchangeName = exchangeName;        this.rabbitExchangeTypeEnum = rabbitExchangeTypeEnum;
    }    public String getExchangeName() {        return exchangeName;
    }    public String getBeanName() {        return beanName;
    }    public RabbitExchangeTypeEnum getRabbitExchangeTypeEnum() {        return rabbitExchangeTypeEnum;
    }
}

2.3 队列枚举

/**
 * @author onlinever
 * @date 2018/09/06
 */public enum RabbitQueueEnum {

    ;    /**
     * 队列BeanName
     */
    private String beanName;    /**
     * 队列routeKey
     */
    private String routeKey;    /**
     * 交换机
     */
    private RabbitExchangeEnum exchangeEnum;    /**
     * 死信转发到队列
     */
    private RabbitQueueEnum rabbitQueueEnum;


    RabbitQueueEnum(String beanName, String routeKey, RabbitExchangeEnum exchangeEnum, RabbitQueueEnum rabbitQueueEnum) {        this.beanName = beanName;        this.routeKey = routeKey;        this.exchangeEnum = exchangeEnum;        this.rabbitQueueEnum = rabbitQueueEnum;
    }    public String getRouteKey() {        return routeKey;
    }    public RabbitExchangeEnum getExchangeEnum() {        return exchangeEnum;
    }    public String getExchangeName() {        return exchangeEnum.getExchangeName();
    }    public String getBeanName() {        return beanName;
    }    public RabbitQueueEnum getRabbitQueueEnum() {        return rabbitQueueEnum;
    }
}
  1. Topic消费者接口

/**
 * topic队列消费者
 *
 * @author onlinever
 * @date 2018/8/17
 */public interface TopicConsumer {    /**
     * 消费的队列
     *
     * @return 队列
     */
    RabbitQueueEnum getQueueEnum();    /**
     * 具体消费者的实现
     *
     * @param message 消息
     */
    void handleMessage(String message);
}
  1. 其他消费者使用@RabbitListener方式



作者:七岁能杀猪
链接:https://www.jianshu.com/p/e78341054ebc


點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消