JMS-Consumer: blockierend und nicht-blockierend

Folgendes Szenario: Camel liest aus einem Outbox-Verzeichnis eingehende XML-Dateien ein und steckt die in eine Outbox-JMS-Queue. Ein JMS-Consumer kann diese dann zum einen blockierend lesen (MessageReceiver) und nichtblockierend über den Eingang von Nachrichten informiert werden (MessageListener).

camel.xml:

<beans
   xmlns="http://www.springframework.org/schema/beans"  
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  
  	<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route>            
            <from uri="file://D:/outbox" />		
			<to uri="log:incomingOrder_xml?level=INFO" />
			<!-- convert the file (BytesMessage) to javax.jms.TextMessage while sending to queue -->
            <to uri="activemq:outbox?jmsMessageType=Text"/> 
        </route>
    </camelContext>
	
    <!-- configure the camel activemq component to use the current broker -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
        <property name="connectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
            <property name="userName" value="${activemq.username}"/>
            <property name="password" value="${activemq.password}"/>
          </bean>
        </property>
    </bean>
</beans>

Der nicht-blockierende Consumer:

package eu.christophburmeister.jmstest.core;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

public class Start {

	private static final Logger logger = Logger.getLogger(Start.class);

	public static void main(String[] args) throws JMSException, NamingException, InterruptedException {
		String queueName = "inbox";

		Context jndiContext = new InitialContext();

		QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) jndiContext
				.lookup("QueueConnectionFactory");

		Queue queue = (Queue) jndiContext.lookup(queueName);
		QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
		QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		QueueReceiver queueReceiver = queueSession.createReceiver(queue);
		queueConnection.start();
		queueReceiver.setMessageListener(new MessageListener() {
			public void onMessage(Message msg) {
				TextMessage message = (TextMessage) msg;
				try {
					logger.info(message.getText());
				} catch (JMSException e) {
					logger.error(e.getMessage());
				}
			}
		});

		while (true) {
			logger.info("spongebob"); // just to show the external-threaded loop
			Thread.sleep(1000);
		}
	}
}

und die Variante mit dem blockierenden Empfang:

package eu.christophburmeister.jmstest.core;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

public class Start {

	private static final Logger logger = Logger.getLogger(Start.class);

	public static void main(String[] args) throws JMSException, NamingException, InterruptedException {
		String queueName = "inbox";

		Context jndiContext = new InitialContext();

		QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) jndiContext
				.lookup("QueueConnectionFactory");

		Queue queue = (Queue) jndiContext.lookup(queueName);
		QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
		QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		QueueReceiver queueReceiver = queueSession.createReceiver(queue);
		queueConnection.start();

		while (true){
		     TextMessage message = (TextMessage) queueReceiver.receive();
		     logger.info(message.getText());
		     logger.info("spongebob");
		}
	}
}

die jndi.properties:

# name the factory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
 
# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616
 
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.inbox = outbox
log4j.rootLogger=INFO, console
 
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

und die pom.xml für den JMS-Consumer:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>eu.christophburmeister</groupId>
	<artifactId>jmstest</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>javax.jms</groupId>
			<artifactId>jms</artifactId>
			<version>1.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.5.1</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.1</version>
		</dependency>
	</dependencies>
</project>