博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ专题1: 入门实例
阅读量:6572 次
发布时间:2019-06-24

本文共 6410 字,大约阅读时间需要 21 分钟。

好久没有写博客了,最近真的是可以说是忙成狗了。项目的事和自己的终身大事忙得焦头烂额,好在是一切都是越来越好了......趁着项目今天唯一的一点喘息时间,加上项目开始接触到的mq,开始写一篇amq的入门专题

AMQ入门实例

下载导入源码:

  • 下载地址: http://activemq.apache.org/activemq-5155-release.html (可以同时下载安装包和源码,我这里为了和项目中使用的保持一致,下载的是5.8.0版本)
  • 源码导入方式: 先通过mvn编译,然后导入

管理后台

  • 通过bin目录下的activemq.bat启动之后,就可以通过: http://localhost:8161/admin 来访问activemq的管理后台了
  • 默认的用户名和密码都是: admin. 用户名和密码配置在conf目录下的jetty-realm.properties文件中
  • 访问的端口配置在conf目录下的jetty.xml文件中

入门使用实例

1. 引入mq依赖

org.apache.activemq
activemq-all
5.8.0

2. 启动activemq服务

3. 简单的服务端实现

public class SimpleProducer {    public static void main(String[] args) {        // STEP1: 得到连接工厂        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);                Connection connection = null;        Session session = null;        Destination destination = null;        MessageProducer producer = null;        MessageProducer topicProducer = null;        Destination topicDestination = null;        try {            // STEP2: 从连接工厂得到连接并且启动连接            connection = connectionFactory.createConnection();            connection.start();                        // STEP3: 获取会话            /**             * 第一个参数表示是否开启事务:             * 当第一个参数为true的时候,会忽略第二个参数,无论第二个参数为啥,都需要显示调用 session.commit() 消息才会提交到MQ             * 当第一个参数为false的时候,第二个参数不能为:Session.SESSION_TRANSACTED。 且当第二个参数为其他合法值时,都不需要调用 session.commit(),消息都会发送到MQ             * 第二个参数表示当未开启事务的时候,消费者或者客户端在什么时候发送确认消息             */            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                        // STEP4: 创建目标队列、主题             /**             * 队列和主题的区别在于:             * 1、 队列是点对点的,队列中的消息只会被消费一次             * 2、 主题类似于广播机制,只要订阅了该主题的消费者都可以对该消息进行消费             * 3、 一般来说如果生产者在消费者启动之前创建了主题,那么消费者启动后接收不到主题。             */            destination = session.createQueue("KiDe-Demo");            topicDestination = session.createTopic("KiDe-Demo");                        // STEP5: 创建消息生产者            producer = session.createProducer(destination);            topicProducer = session.createProducer(topicDestination);                        /**             * 参数表示生产者发送的消息是否进行持久化             */            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      // 设置不持久化            topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     // 设置不持久化 (不管最终设置的是持久化还是不持久化,只要生产者在消费者之前启动,主题消息都会丢失)                        // STEP6: 发送消息            for (int i=0; i<20; i++) {                TextMessage message = session.createTextMessage("Producer message:" + i);                producer.send(message);                topicProducer.send(message);            }                        // STEP7: 如果开启了事务 ,此时需要调用session提交操作            // session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            if (connection != null) {                try {                    connection.close();                } catch (JMSException e) {                }            }        }    }}

简单消费者实现

package com.rampage.learning.activemq;import java.util.concurrent.TimeUnit;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 简单的队列消费者 *  * @author ziyuqi * */public class SimpleConsumer {    public static void main(String[] args) {        // STEP1: 创建连接工厂        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);        Connection connection = null;        Session session = null;        Destination destination = null;        Destination topicDestination = null;        MessageConsumer consumer = null;        MessageConsumer topicConsumer = null;        try {            // STEP2: 从连接工厂得到连接并且启动连接            connection = connectionFactory.createConnection();            connection.start();            // STEP3: 获取会话            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            // STEP4: 创建目标队列            destination = session.createQueue("KiDe-Demo");            topicDestination = session.createTopic("KiDe-Demo");                        // STEP5: 创建消费者            consumer = session.createConsumer(destination);            topicConsumer = session.createConsumer(topicDestination);                        // STEP6: 设置消息接收者接收消息 也可以通过死循环接收消息            /*while (true) {                TextMessage textMessage = (TextMessage) consumer.receive(1000);                System.out.println(textMessage.getText());            }*/            consumer.setMessageListener(new MessageListener() {                                @Override                public void onMessage(Message paramMessage) {                    TextMessage message = (TextMessage) paramMessage;                    try {                        System.out.println("消费者接收到队列消息:" + message.getText());                    } catch (JMSException e) {                        e.printStackTrace();                    }                }            });            topicConsumer.setMessageListener(new MessageListener() {                                @Override                public void onMessage(Message paramMessage) {                    TextMessage message = (TextMessage) paramMessage;                    try {                        System.out.println("消费者接收到主题消息:" + message.getText());                    } catch (JMSException e) {                        e.printStackTrace();                    }                }            });            TimeUnit.SECONDS.sleep(200);    // 睡眠20秒,使得客户端可以接收到对应消息        } catch (Exception e) {        } finally {            if (connection != null) {                try {                    connection.close();                } catch (JMSException e) {                }            }        }    }}

代码说明

​ 从上面的代码可以看出,生产者和消费者的处理流程大致相同。存在很多重复代码,不难发现可以抽取出公共的代码来使得代码更加简洁。

运行结果说明

img_688595f06318baddebae5375bb1df163.png

img_dbced8c87ea3171f806dee17c9c7b7aa.png

img_ae05550d73cc855247a6a897ab4d72c5.png

我这里运行了producer后,运行了两个consumer。不难发现,topic中的每条消息会被每个consumer完全消费,而queue中的消息,每一条消息只会被两个consumer中的一个消费。

黎明前最黑暗,成功前最绝望!

转载地址:http://wcmjo.baihongyu.com/

你可能感兴趣的文章
让您的电脑在任意目录可以支持图片的粘贴,试试看呗~
查看>>
Jenkins+QTP自动化测试框架
查看>>
文件下载
查看>>
《Node.js In Action》笔记之流程控制
查看>>
C++类和对象
查看>>
3518EV200 SDK学习1
查看>>
JavaScript初学者应注意的七个细节
查看>>
1163: 零起点学算法70——Yes,I can!
查看>>
zookeeper原理及作用
查看>>
[ZJOI2015]诸神眷顾的幻想乡
查看>>
oracle之 ORA-12557: TNS: 协议适配器不可加载
查看>>
2018-2019-2 网络对抗技术 20165318 Exp1 PC平台逆向破解
查看>>
关于图片或者文件在数据库的存储方式归纳
查看>>
存储过程和SQL语句比较及存储过程在C#中调用方法
查看>>
C#开发移动应用系列(1.环境搭建)
查看>>
hihocoder 1014 Trie树
查看>>
ADO.NET笔记——使用DataSet返回数据
查看>>
【Spark篇】---SparkSQL on Hive的配置和使用
查看>>
【机器学习】--关联规则算法从初识到应用
查看>>
windows 下nginx php安装
查看>>