JMS-Consumer für mehrere Queues auf verschiedenen Brokern

Szenario: Ein JMS-Consumer, der mit zwei Brokern verbunden ist und auf jeweils eine Queue der Broker lauscht. Eigentlich ganz einfach 🙂

Die Consumer-Klasse, die die Verbindung zu den Brokern aufbaut. Diesmal ganz ohne jndi.properties-Datei – der JNDI-Kontext wird direkt in Plain Java konfiguriert:

package jmsconsumer;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

/**
 * Stand-alone JMS consumer that listens on one queue on each of two brokers
 * and hands every received message to a single shared {@link QueueListener}.
 *
 * <p>The JNDI environment is assembled programmatically (no
 * {@code jndi.properties} file); connection factory and queue are then looked
 * up through that context.
 */
public class Consumer {

    private static final Logger logger = Logger.getLogger(Consumer.class);
    private static final String CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    private static final String BROKER_MASTER_URL = "tcp://127.0.0.1:61616";
    private static final String BROKER_STANDBY_URL = "tcp://10.33.97.149:61616";
    private static final String BROKER_MASTER_QUEUE_NAME = "outbox_on_master_broker";
    private static final String BROKER_STANDBY_QUEUE_NAME = "outbox_on_standby_broker";
    private static final String QUEUE_CONNECTION_FACTORY_NAME = "QueueConnectionFactory";

    /**
     * Connects to both brokers, registers the shared listener on each queue and
     * then blocks until the main thread is interrupted.
     *
     * @param args ignored
     * @throws JMSException         if a connection, session or receiver cannot be created or started
     * @throws NamingException      if the JNDI context cannot be created or a lookup fails
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws JMSException, NamingException, InterruptedException {

        QueueListener queueListener = new QueueListener();
        QueueConnection masterConnection = null;
        QueueConnection standbyConnection = null;

        try {
            masterConnection = startConsuming(BROKER_MASTER_URL, BROKER_MASTER_QUEUE_NAME, queueListener);
            standbyConnection = startConsuming(BROKER_STANDBY_URL, BROKER_STANDBY_QUEUE_NAME, queueListener);

            // Messages are delivered on provider threads; the main thread only has to
            // stay alive. Joining on itself blocks without burning CPU (unlike an
            // empty while(true) loop) and wakes up only when the thread is interrupted.
            Thread.currentThread().join();
        } finally {
            // Reached when setup fails half-way or the main thread is interrupted.
            closeConnectionQuietly(standbyConnection);
            closeConnectionQuietly(masterConnection);
        }
    }

    /**
     * Builds a JNDI context for the given broker that exposes the given queue.
     *
     * @param brokerUrl provider URL of the broker
     * @param queueName physical queue name; also used as its JNDI name
     * @return the initial context, never {@code null}
     * @throws NamingException if the context cannot be created
     */
    private static Context createContext(String brokerUrl, String queueName) throws NamingException {
        Hashtable<String, String> props = new Hashtable<String, String>();
        props.put(Context.INITIAL_CONTEXT_FACTORY, CONTEXT_FACTORY);
        props.put(Context.PROVIDER_URL, brokerUrl);
        props.put("queue." + queueName, queueName); // Specify the queue --> queue.[jndiName] = [physicalName]
        return new InitialContext(props);
    }

    /**
     * Opens a connection to one broker, attaches the listener to the queue and
     * starts message delivery.
     *
     * <p>The listener is registered <em>before</em> the connection is started so
     * that delivery only begins once setup is complete. If any step fails, the
     * connection opened so far is closed again and the exception is propagated
     * instead of returning {@code null}.
     *
     * @param brokerUrl provider URL of the broker
     * @param queueName name of the queue to consume from
     * @param listener  listener that receives the messages
     * @return the started connection; the caller is responsible for closing it
     * @throws JMSException    if connection, session or receiver setup fails
     * @throws NamingException if the context cannot be created or a lookup fails
     */
    private static QueueConnection startConsuming(String brokerUrl, String queueName, QueueListener listener)
            throws JMSException, NamingException {
        Context context = createContext(brokerUrl, queueName);
        QueueConnection connection = null;
        boolean started = false;
        try {
            QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup(QUEUE_CONNECTION_FACTORY_NAME);
            Queue queue = (Queue) context.lookup(queueName);

            connection = factory.createQueueConnection();
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueReceiver queueReceiver = session.createReceiver(queue);
            queueReceiver.setMessageListener(listener);

            connection.start();
            started = true;
            return connection;
        } finally {
            if (!started) {
                logger.error("Could not start consuming from queue '" + queueName + "' on " + brokerUrl);
                closeConnectionQuietly(connection);
            }
            closeContextQuietly(context);
        }
    }

    /** Closes the connection if it is non-null; a failure to close is only logged. */
    private static void closeConnectionQuietly(QueueConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            logger.warn("Could not close JMS connection", e);
        }
    }

    /** Closes the JNDI context if it is non-null; a failure to close is only logged. */
    private static void closeContextQuietly(Context context) {
        if (context == null) {
            return;
        }
        try {
            context.close();
        } catch (NamingException e) {
            logger.warn("Could not close JNDI context", e);
        }
    }
}

Da hier ein angepasster MessageListener verwendet wird, muss dieser auch erstellt werden:

package jmsconsumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;

public class QueueListener implements MessageListener {

    private static final Logger logger = Logger.getLogger(QueueListener.class);

    public void onMessage(Message msg) {
        TextMessage message = (TextMessage) msg;
        try {
            logger.info(message.getText());
        } catch (JMSException e) {
            logger.error(e.getMessage());
        }
    }

}

Die Maven-Dependencies:

	<!-- Libraries needed to compile and run the Consumer and QueueListener classes. -->
	<dependencies>
		<!-- JMS 1.1 API: provides the javax.jms.* types imported by both classes. -->
		<dependency>
			<groupId>javax.jms</groupId>
			<artifactId>jms</artifactId>
			<version>1.1</version>
		</dependency>
		<!-- ActiveMQ client; presumably supplies the
		     org.apache.activemq.jndi.ActiveMQInitialContextFactory named in Consumer (verify). -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.5.1</version>
		</dependency>
		<!-- log4j: provides org.apache.log4j.Logger used for logging in both classes. -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
		<!-- SLF4J API: not referenced directly by the code shown here;
		     presumably needed by activemq-core at runtime (verify). -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.1</version>
		</dependency>
	</dependencies>