ActiveMQ入门教程

小白程序之路2019-03-20 11:06:09

点击上方 “小白程序之路”,选择 “置顶公众号”


       更多资讯,第一时间送达!

介绍

ActiveMQ

ActiveMQ 是Apache出的,最流行的,功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。

什么是JMS?

Java消息服务(Java Message Service) 即JMS,是一个java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

什么是AMQP?

AMQP(advanced message queuing protocol) 是一个提供统一消息服务的应用层标准层协议,基于此协议的客户端与消息中间件可传递性,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

JMS 和 AMQP 对比
ActiveMQ、RabbitMQ 和 Kafka 简单对比

JMS规范

JMS相关概念
  • 消费者/订阅者 : 接收并处理消息的客户端

  • 消息 : 应用程序之间传递的数据内容

  • 消息模式 : 在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

JMS消息模式—队列模型
  • 客户端包括生产者和消费者

  • 队列中的消息只能被一个消费者消费

  • 消费者可以随时消费队列中的消息

队列模型示意图

JMS消息模式—主题模型
  • 客户端包括生产者和消费者

  • 主题中的消息被所有消费者消费

  • 消费者必须先订阅,才能消费发送到主题中的消息

主题模型示意图

JMS编码接口
  • ConnectionFactory 用于创建连接到消息中间件的连接工厂

  • Connection 代表了应用程序和消息服务器之间的通信链路

  • Destination 指消息发布和接收的地点,包括队列或主题

  • Session 表示一个单线程的上下文,用于发送和接收消息

  • MessageConsumer 由会话创建,用于接收发送到目标的消息

  • MessageProducer 由会话创建,用于发送消息到目标

  • Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体

JMS编码接口之间的关系

安装

  • 查询Docker镜像

$ docker search activemq
  • 下载Docker镜像

$ docker pull webcenter/activemq
  • 创建&运行ActiveMQ容器

$ docker run -d --name myactivemq -p 61616:61616 -p 8161:8161 webcenter/activemq
  • 查看WEB管理页面

浏览器输入http://127.0.0.1:8161/  点击Manage ActiveMQ broker使用默认账号/密码:admin/admin进入查看

队列模式消息和主题模式消息代码演示

队列模式消息演示

生产者 AppProducer.java

public class AppProducer {

   // MQ服务器地址
   private static final String BROKER_URL = "tcp://127.0.0.1:61616";

   // 队列名称
   private static final String QUEUE_NAME = "queue-test";

   public static void main(String[] args) throws JMSException {
       // 1.创建连接工厂
       ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);

       // 2.创建连接
       Connection connection = factory.createConnection();

       // 3.启动连接
       connection.start();

       // 4.创建会话
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

       // 5.创建一个目标
       Destination destination = session.createQueue(QUEUE_NAME);

       // 6.创建一个生产者
       MessageProducer producer = session.createProducer(destination);

       // 循环发送消息
       for (int i = 0; i < 100; i++) {
           // 创建消息
           TextMessage textMessage = session.createTextMessage("【test-activemq-producer】" + i);
           // 发布消息
           producer.send(textMessage);

           System.out.println("消息发送成功:" + textMessage);
       }

       // 关闭连接
       connection.close();
   }
}

消费者 AppConsumer.java (由于消费者的创建流程和生产者非常类似,特意用 ActiveMqHelper 类进行简单封装一下)

public class AppConsumer {

   public static void main(String[] args) throws JMSException {
       // 创建连接
       Connection connection = ActiveMqHelper.createConnection();

       // 创建会话
       Session session = ActiveMqHelper.createSession(connection);

       // 创建一个目标
       Destination destination = ActiveMqHelper.createQueue(session);

       // 创建一个消费者
       MessageConsumer consumer = session.createConsumer(destination);

       // 消费者监听要消费的信息
       consumer.setMessageListener(message -> {
           // 接收到的消息内容c
           TextMessage textMessage = (TextMessage) message;
           try {
               System.out.println("【消费者接收到的消息】" + textMessage.getText());
           } catch (JMSException e) {
               e.printStackTrace();
           }
       });

       // 关闭连接,因为监听是异步操作,
       // 如果事先关闭连接,监听操作还未处理完成,则会收不到消息,
       // 正常处理逻辑是在监听事件处理完成后,再释放连接
       // connection.close();
   }
}

MQ帮助类 ActiveMqHelper.java

public class ActiveMqHelper {

   // MQ服务器地址
   private static final String BROKER_URL = "tcp://127.0.0.1:61616";

   // 队列名称
   private static final String QUEUE_NAME = "queue-test";

   // 主题名称
   private static final String TOPIC_NAME = "topic-test";


   /**
    * 创建连接
    *
    * @return
    * @throws JMSException
    */

   public static Connection createConnection() throws JMSException {
       // 1.创建连接工厂
       ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);

       // 2.创建连接
       Connection connection = factory.createConnection();

       // 3.启动连接
       connection.start();

       return connection;
   }


   /**
    * 创建会话
    *
    * @param connection
    * @return
    * @throws JMSException
    */

   public static Session createSession(Connection connection) throws JMSException {
       // 4.创建会话
       return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   }

   /**
    * 创建队列目标
    *
    * @param session
    * @return
    * @throws JMSException
    */

   public static Destination createQueue(Session session) throws JMSException {
       // 5.创建一个队列目标
       return session.createQueue(QUEUE_NAME);
   }


   /**
    * 创建主题目标
    *
    * @param session
    * @return
    * @throws JMSException
    */

   public static Destination createTopic(Session session) throws JMSException {
       // 5.创建一个主题目标
       return session.createTopic(TOPIC_NAME);
   }
}

代码效果图展示

主题模式消息演示

生产者 AppProducer.java (主题模式的代码示例和队列模式的代码示例基本相同,只有在创建主题目标的时候不同,还请老铁自行实现) 消费者同理参考队列模式代码实现

public class AppProducer {
   public static void main(String[] args) throws JMSException {
       // ....

       // 3.创建主题目标 (与队列模式唯一区别的代码,其它都一样)
       Destination destination = session.createTopic(TOPIC_NAME);

       // ....
    }
}

主题模式注意点
1、先启动AppConsumer.java类(消费者先进行订阅),在启动 AppProvider.java 类(生产者进行生产)这样消费者才能监听到生产者生成的消息进行消费;队列模式没有此要求
2、生产者生产100条消息,若有多个消费者,每个消费者均消费100条消息;而队列模式,则是每个消费者合计起来消费这 100 条消息

代码效果图展示