ActiveMQ 使用介绍

1. ActiveMQ 的消息模型分为 Queue(点对点)和 Topic(发布/订阅)两种

2. 下面先介绍 Queue 的用法,并通过 Spring 进行集成

build.gradle

// Spring JMS support and the ActiveMQ client/broker library
compile group: 'org.springframework', name: 'spring-jms', version: '3.2.1.RELEASE'
compile group: 'org.apache.activemq', name: 'activemq-core', version: '5.7.0'
// Logback logging modules
compile group: 'ch.qos.logback', name: 'logback-core', version: '1.0.9'
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.0.9'
compile group: 'ch.qos.logback', name: 'logback-access', version: '1.0.9'
// Test-only dependency
testCompile group: 'junit', name: 'junit', version: '4.10'

applicationContext.xml

 <description>Spring公共配置</description>

    <!-- Scan the net.tt64 package for annotated components and register them as beans; this also checks that @Required / @Autowired properties have been injected -->
    <context:component-scan base-package="net.tt64"/>
    <!-- ActiveMQ connection factory: connects to the broker named "cheyoushuo" through the vm:// (in-JVM) transport, with asynchronous sends enabled -->
    <bean id="advancedConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://cheyoushuo" />
        <property name="useAsyncSend" value="true" />
    </bean>
    
    <!-- Spring caching connection factory: wraps advancedConnectionFactory and uses a session cache size of 10 -->
    <bean id="advancedCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="advancedConnectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>
    
    <!-- Queue definition: the ActiveMQ queue named "test.jms" -->
    <bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="test.jms" />
    </bean>

    <!-- Spring JMS template, sending through the caching connection factory -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="advancedCachingConnectionFactory" />
        <!-- Required so that the deliveryMode, priority and timeToLive settings take effect -->
        <property name="explicitQosEnabled" value="true" />
        <!-- Delivery mode: true = PERSISTENT (which is also the default); set to false for NON_PERSISTENT -->
        <property name="deliveryPersistent" value="true" />
        <!-- Message priority (the default is 4) -->
        <property name="priority" value="9" />
    </bean>
    
    <!-- Listener container that receives messages from the queue asynchronously and hands them to the jmsService bean -->
    <bean id="advancedQueueContainer" depends-on="jmsService" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="advancedConnectionFactory" />
        <property name="destination" ref="notifyQueue" />
        <property name="messageListener" ref="jmsService" />
        <!-- Exactly one consumer (initial = 1, maximum = 1), so messages are handled one at a time; raise both values for concurrent consumption -->
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="1" />
        <!-- Use CLIENT_ACKNOWLEDGE as the session acknowledge mode -->
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
    </bean>

jms sender 

/** JMS template used by send() to publish messages; injected by Spring. */
    @Autowired
    private JmsTemplate jmsTemplate;
    /** Destination that send() publishes to; injected by type (presumably the notifyQueue bean - confirm against the context). */
    @Autowired
    private Destination dest;

    public void send(final Serializable obj) {
        jmsTemplate.send(dest, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage msg = session.createObjectMessage(obj);
                return msg;
            }
        });
    }

jms receiver

 @Override
    public void onMessage(Message message) {
        if (message instanceof ObjectMessage) {
            ObjectMessage om = (ObjectMessage) message;
            try {
                System.out.println(om.getObject().toString() + " recieved");
                Thread.currentThread().sleep(1000);
                System.out.println(om.getObject().toString() + "  process over");
            } catch (InterruptedException ex) {
            } catch (JMSException ex) {
            }
        }
    }
Main
 public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        JmsSender sender = ctx.getBean(JmsSender.class);
        for (int i = 0; i < 100; i++) {
            System.out.println("message " + i + "send");
            sender.send("message" + i);
        }
    }
这里将 concurrentConsumers 和 maxConcurrentConsumers 都设为 1,监听容器只会启动一个消费者,消息按顺序逐条处理,这就是单任务队列。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。