亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

ActiveMQ入門&ActiveMQ與RocketMQ的對比

標簽:
Java

1. ActiveMQ入门

前面的文章已经写过MQ的相关概念,这里不再赘述。

1.1 ActiveMQ是什么

ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

1.2 ActiveMQ的特点

  • 支持多种语言编写客户端

  • 对Spring的支持,很容易和Spring整合

  • 支持多种传输协议:TCP,SSL,NIO,UDP等

  • 支持Ajax请求

1.3 ActiveMQ的安装

1.3.1 官网下载

http://activemq.apache.org/

5acb3c8700013dc501600160.jpg

5acb3c8700013dc501600160.jpg

5acb3c8700013dc501600160.jpg

解压后的文件夹结构:

5acb3c8700013dc501600160.jpg

1.3.2 启动ActiveMQ

直接双击这个“wrapper.exe”即可

5acb3c8700013dc501600160.jpg

之后可以在浏览器输入http://localhost:8161/

5acb3c8700013dc501600160.jpg

1.3.3 进入管理中心

点击Manage ActiveMQ broker,会弹出身份验证,输入admin,admin即可

5acb3c8700013dc501600160.jpg

5acb3c8700013dc501600160.jpg

1.4 搭建Maven工程框架

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.linkedbear</groupId>    <artifactId>ActiveMQ-Demo</artifactId>    <version>0.0.1-SNAPSHOT</version>   

    <properties>        <activemq.version>5.15.4</activemq.version>    </properties>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.0.0.RELEASE</version>    </parent>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <!-- ActiveMQ -->        <dependency>             <groupId>org.apache.activemq</groupId>             <artifactId>activemq-client</artifactId>             <version>${activemq.version}</version>         </dependency>       

        <!-- 热部署 -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-devtools</artifactId>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <artifactId>maven-compiler-plugin</artifactId>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build></project>

1.5 创建工程目录结构

5acb3c8700013dc501600160.jpg

之前的文章中写过,JMS的消息传递有两种模式,前面的RocketMQ中只写了一对一模式,本篇文章将会编写两种模式。

1.6 一对一模式的Queue

1.6.1 生产者

/**

 * 生产者Controller

 * @Title ProducerQueueController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:49

*/@Controllerpublic class ProducerQueueController {    @RequestMapping("/queueProduceMessage")

    @ResponseBody    public Map<String, Object> queueProduceMessage() throws Exception {

        //JMS的使用比较类似于JDBC与Hibernate        //1. 创建一个连接工厂(类似于JDBC中的注册驱动),需要传入TCP协议的ActiveMQ服务地址        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 创建连接(类似于DriverManager.getConnection)        Connection connection = connectionFactory.createConnection();

        //3. 开启连接(ActiveMQ创建的连接是需要手动开启的)        connection.start(); //注意不是open。。。        //4. 获取session(类似于Hibernate中的session,都是用会话来进行操作)        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建一对一的消息队列        Queue queue = session.createQueue("test_queue");

        //6. 创建一条消息        String text = "test queue message" + Math.random();

        TextMessage message = session.createTextMessage(text);

        //7. 消息需要发送方,要创建消息发送方(生产者),并绑定到某个消息队列上        MessageProducer producer = session.createProducer(queue);

        //8. 发送消息        producer.send(message);

        //9. 关闭连接        producer.close();

        session.close();

        connection.close();

       

        //------显示发送的消息到视图上------        Map<String, Object> map = new HashMap<>();

        map.put("message", text);

        return map;

    }

}

1.6.2 消费者

/**

 * 消费者Controller

 * @Title ConsumerQueueController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:56

*/@Controllerpublic class ConsumerQueueController {    @RequestMapping("/queueGetMessage1")

    public void queueGetMessage1() throws Exception {

        //1. 创建一个连接工厂,需要传入TCP协议的ActiveMQ服务地址        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 创建连接        Connection connection = connectionFactory.createConnection();

        //3. 开启连接        connection.start(); //注意不是open。。。        //4. 获取session        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建一对一的消息队列        Queue queue = session.createQueue("test_queue");

       

        //------------前5步都是相同的,以下为不同----------------        //6. 创建消费者        MessageConsumer consumer = session.createConsumer(queue);

        //7. 使用监听器监听队列中的消息        consumer.setMessageListener(new MessageListener() {

            @Override            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage) message;

                try {

                    String text = textMessage.getText();

                    System.out.println("收到消息:" + text);

                } catch (JMSException e) {

                    e.printStackTrace();

                }

            }

        });

       

        //由于设置监听器后不能马上结束方法,要在这里加一个等待点        System.in.read();

       

        //8. 关闭连接        consumer.close();

        session.close();

        connection.close();

    }



    @RequestMapping("/queueGetMessage2")

    public void queueGetMessage2() throws Exception //(完全相同,不再重复)

}

1.6.3 运行结果

先执行两个消息的消费者

http://localhost:8080/queueGetMessage1

http://localhost:8080/queueGetMessage2

执行http://localhost:8080/queueProduceMessage

5acb3c8700013dc501600160.jpg

但是只收到一条消息

0ee8fb0d76af8b19d2f9266d89c1775c21d.jpg

1.7 一对多模式的Topic

1.7.1 生产者

/**

 * 生产者Controller

 * @Title ProducerTopicController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:49

*/@Controllerpublic class ProducerTopicController {    @RequestMapping("/topicProduceMessage")

    @ResponseBody    public Map<String, Object> topicProduceMessage() throws Exception {

        //JMS的使用比较类似于JDBC与Hibernate        //1. 创建一个连接工厂(类似于JDBC中的注册驱动),需要传入TCP协议的ActiveMQ服务地址        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 创建连接(类似于DriverManager.getConnection)        Connection connection = connectionFactory.createConnection();

        //3. 开启连接(ActiveMQ创建的连接是需要手动开启的)        connection.start(); //注意不是open。。。        //4. 获取session(类似于Hibernate中的session,都是用会话来进行操作)        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建一对多的消息广播        Topic topic = session.createTopic("test_topic");

        //6. 创建一条消息        String text = "test topic message" + Math.random();

        TextMessage message = session.createTextMessage(text);

        //7. 消息需要发送方,要创建消息发送方(生产者),并广播到某个消息广播端上        MessageProducer producer = session.createProducer(topic);

        //8. 发送消息        producer.send(message);

        //9. 关闭连接        producer.close();

        session.close();

        connection.close();

       

        //------显示发送的消息到视图上------        Map<String, Object> map = new HashMap<>();

        map.put("message", text);

        return map;

    }

}

1.7.2 消费者

/**

 * 消费者Controller

 * @Title ConsumerTopicController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:56

*/@Controllerpublic class ConsumerTopicController {    @RequestMapping("/topicGetMessage")

    public void topicGetMessage() throws Exception {

        //1. 创建一个连接工厂,需要传入TCP协议的ActiveMQ服务地址        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 创建连接        Connection connection = connectionFactory.createConnection();

        //3. 开启连接        connection.start(); //注意不是open。。。        //4. 获取session        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 创建一对多的消息广播        Topic topic = session.createTopic("test_topic");

       

        //------------前5步都是相同的,以下为不同----------------        //6. 创建消费者        MessageConsumer consumer = session.createConsumer(topic);

        //7. 使用监听器监听队列中的消息        consumer.setMessageListener(new MessageListener() {

            @Override            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage) message;

                try {

                    String text = textMessage.getText();

                    System.out.println("收到消息:" + text);

                } catch (JMSException e) {

                    e.printStackTrace();

                }

            }

        });

       

        //由于设置监听器后不能马上结束方法,要在这里加一个等待点        System.in.read();

       

        //8. 关闭连接        consumer.close();

        session.close();

        connection.close();

    }



    @RequestMapping("/topicGetMessage2")    public void topicGetMessage2() throws Exception //(完全相同,不再重复)

}

1.7.3 运行结果

先执行两个消息的消费者

http://localhost:8080/topicGetMessage1

http://localhost:8080/topicGetMessage2

执行http://localhost:8080/topicProduceMessage

378469c62f5b4a39f29eeae8f92eee96ad1.jpg

这次收到了两条消息

a28d20a54f2c16246cfe87d185f65a64a10.jpg

2. RocketMQ与ActiveMQ的对比

从这两种消息中间件的编写过程来看,两种产品的区别是比较大的,下面就这两种产品进行多方面对比。

参考文章:https://blog.csdn.net/jasonhui512/article/details/53231566

比较项

RocketMQ

ActiveMQ

语言支持

只支持Java

多语言,Java为主

可用性

分布式

主从

JMS规范

常用的使用方式没有遵循JMS

严格遵循JMS规范

消息持久化

硬盘

内存,硬盘,数据库

部署方式

独立部署

独立部署、嵌入应用,可以与Spring很好的整合

社区活跃

活跃

不很活跃


點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
JAVA開發工程師
手記
粉絲
23
獲贊與收藏
51

關注作者,訂閱最新文章

閱讀免費教程

  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消