ActiveMQ-Message Acknowledgement mit Auto-Reconnect

Franz-von-Assisi-Kirche Wien, by Christoph Burmeister (own photo)

Franz-von-Assisi-Kirche Wien, by Christoph Burmeister (own photo)

Der Unterschied zwischen synchron und asynchron liegt ja bekanntlich im Auge des Betrachters 🙂 Wir gehen davon aus, dass eine Message synchron versendet wird, wenn der Sender solange blockiert ist, bis der EmpfĂ€nger eine BestĂ€tigung ĂŒber den Erhalt der Message zurĂŒcksendet. JMS ist ja fĂŒr sein asynchrones Verhalten bekannt, aber bei genauerer Betrachtung, gibt es hier auch BestĂ€tigungen… explizit und implizit. Bei der Erstellung der Session wird entweder ein CLIENT_ACKNOWLEDGEMENT oder ein AUTO_ACKNOWLEDGEMENT gesetzt. Beim ersten Ack muss explizit vom Receiver (oder Listener) angegeben werden, dass die eingegangene Message soweit verarbeitet ist, dass sie aus der Queue entnommen werden kann. das geschieht ĂŒber acknowledge(). Die zweite Möglichkeit setzt automatisch ein Ack, sobald der Receiver die Message erhĂ€lt.
Ganz nebenbei ĂŒberwachen wir den Status der Verbindung zum Broker und setzen mit dem failover-Transport von ActiveMQ ein Auto-Reconnect ein, der bei Nicht-VerfĂŒgbarkeit des Brokers kontinuierlich versucht, sich wiederzuverbinden.

zuerst die Version in „pure Java“ und einem Listener, der auf Messages lauscht und diese verarbeitet und bestĂ€tigt:

package jmsconsumer;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.log4j.Logger;

public class JmsConsumer {

    private static final Logger logger = Logger.getLogger(JmsConsumer.class);
    private static final String CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    private static final String BROKER_MASTER_URL = "failover:(tcp://127.0.0.1:61616)?maxReconnectDelay=50000"; // reconnection-delay=5s
    private static final String BROKER_MASTER_QUEUE_NAME = "receivedMessages";
    private static final String QUEUE_CONNECTION_FACTORY_NAME = "QueueConnectionFactory";

    private static QueueListener queueListener;
    private static ConnectionListener connectionListener;

    public static void main(String[] args) throws JMSException, NamingException, InterruptedException {

        queueListener = new QueueListener();
        connectionListener = new ConnectionListener();

        QueueReceiver masterQueueReceiver = createQueueReceiver(BROKER_MASTER_URL, BROKER_MASTER_QUEUE_NAME);

        masterQueueReceiver.setMessageListener(queueListener);

        while (true) {
            logger.info("ping"); // just to show "yepp, i'm alive"
            Thread.sleep(1000);
        }
    }

    private static Context createContext(String brokerUrl, String queueName) {
        Hashtable<string , String> props = new Hashtable</string><string , String>();
        props.put(Context.INITIAL_CONTEXT_FACTORY, CONTEXT_FACTORY);
        props.put(Context.PROVIDER_URL, brokerUrl);
        props.put("queue." + queueName, queueName); // Specify the queue --> queue.[jndiName] = [physicalName]
        try {
            return new InitialContext(props);
        } catch (NamingException e) {
            logger.error(e.getMessage());
            return null;
        }
    }

    private static QueueReceiver createQueueReceiver(String brokerUrl, String queueName) {
        try {
            Context context = createContext(brokerUrl, queueName);
            QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup(QUEUE_CONNECTION_FACTORY_NAME);
            //            QueueConnection             connection = factory.createQueueConnection(); // for all jms-providers
            ActiveMQConnection connection = (ActiveMQConnection) factory.createQueueConnection(); // activemq-specific     
            QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); // the client acks the reception of the messages!
            Queue queue = (Queue) context.lookup(queueName);
            QueueReceiver queueReceiver = session.createReceiver(queue);

            connection.addTransportListener(connectionListener);// activemq-specific
            connection.start();
            return queueReceiver;
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return null;
    }
}

Der QueueListener:

package jmsconsumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

public class QueueListener implements MessageListener {

    private static final Logger logger = Logger.getLogger(QueueListener.class);

    public void onMessage(Message msg) {
        ObjectMessage queueObject = (ObjectMessage) msg;
       
        // do sth. with the queueObject
        
// 		finally ack the message so that it can be removed from queue, 
//              because the session is CLIENT_ACKNOWLEDGE
//        try {
//            msg.acknowledge();
//        } catch (JMSException e) {
//            logger.error(e.getMessage());
//        }
    }

}

Die Implementierung des TransportListeners, die auf den Verbindungsstatus zum Broker lauscht:

package jmsconsumer;

import java.io.IOException;

import org.apache.activemq.transport.TransportListener;
import org.apache.log4j.Logger;

public class ConnectionListener implements TransportListener {

    private static final Logger logger = Logger.getLogger(ConnectionListener.class);

    public void onCommand(Object command) {
        logger.info("command:" + command.toString());
    }

    public void onException(IOException error) {
        logger.info("exception" + error.toString());
    }

    public void transportInterupted() {
        logger.info("interrupted");
    }

    public void transportResumed() {
        logger.info("resumed");
    }
}

ein paar Dependencies:

<dependencies>
		<dependency>
			<groupid>javax.jms</groupid>
			<artifactid>jms</artifactid>
			<version>1.1</version>
		</dependency>
		
		<dependency>
			<groupid>org.apache.activemq</groupid>
			<artifactid>activemq-core</artifactid>
			<version>5.5.1</version>
		</dependency>
		
		<!-- logging stuff -->

		<dependency>
			<groupid>log4j</groupid>
			<artifactid>log4j</artifactid>
			<version>1.2.14</version>
		</dependency>
		<dependency>
			<groupid>org.slf4j</groupid>
			<artifactid>slf4j-log4j12</artifactid>
			<version>1.6.2</version>
		</dependency>
		<dependency>
			<groupid>org.slf4j</groupid>
			<artifactid>slf4j-api</artifactid>
			<version>1.6.2</version>
		</dependency>
	</dependencies>

Jetzt noch die Spring-Version:

package jmsconsumer;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class JmsConsumerSpring {
	public static void main(String[] args) {
		ApplicationContext appContext = new ClassPathXmlApplicationContext("application-context.xml");
                while (true){
                    //
                }
        }	
}

die application-context.xml

< ?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

	<bean id="jmsContainer"
		class="org.springframework.jms.listener.SimpleMessageListenerContainer">
		<!-- the connectionFactory -->
		<property name="connectionFactory" ref="activeMQConnectionFactory"></property>
		<!-- the destination (Queue or Topic) -->
		<property name="destination" ref="destination"></property>
		<!-- the custom MessageListener-implementation -->
		<property name="messageListener" ref="queueListener"></property>
		<!-- "sessionTransacted" set to "true": Transactional acknowledgment after 
			successful listener execution; guaranteed redelivery in case of exception 
			thrown. -->
		<property name="sessionTransacted" value="true"></property>
		<!-- is it synchronized? Client_Ack means that the client has to ack every 
			message to delete it from destination. -->
		<!-- http://static.springsource.org/spring/docs/2.0.x/api/org/springframework/jms/listener/AbstractMessageListenerContainer.html -->
		<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property> <!-- doesn't work as expected 🙁 s. https://jira.springsource.org/browse/SPR-3796 -->

		<!-- <property name="recoveryInterval" value="10000" /> --> <!-- just works with DefaultMessageListenerContainer! -->
		<property name="concurrentConsumers" value="10"></property>
	</bean>

	<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="failover:tcp://127.0.0.1:61616"></property>
		<property name="transportListener" ref="connectionListener"></property><!-- just for the failover-stuff -->
	</bean>

	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor -arg value="receivedMessages"></constructor>
	</bean>

	<bean id="queueListener" class="jmsconsumer.QueueListener"></bean>
	<bean id="connectionListener" class="jmsconsumer.ConnectionListener"></bean>

</beans>

Die beiden Listener-Implementierungen sind die gleichen wie bei der Nicht-Spring-Variante

Beim Test stellt sich aber heraus, dass die Nicht-Spring-Variante die Messages in der Queue behĂ€lt, wenn kein acknowledge() darauf aufgerufen wird… klar, ist ja auch richtig. Das gleiche Verhalten hĂ€tte ich mir auch bei der Spring-Variante vorgestellt. Jedoch nehmen die Kollegen von Spring dem Entwickler wieder eine ganze Menge Arbeit ab… leider ein bisschen zuviel, denn obwohl in der Spring-Konfig CLIENT_ACKNOWLEDGEMENT verwendet wird, werden die Messages aus der Queue entfernt. In einem Jira-Ticket von Spring () wird dieses Verhalten erklĂ€rt:

This is a misunderstanding of the semantics of CLIENT_ACKNOWLEDGE in the context of a Spring message listener container. Quoting from the javadoc of AbstractMessageListenerContainer, here is how the semantics are defined:

„sessionAcknowledgeMode“ set to „AUTO_ACKNOWLEDGE“ (default): Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.
„sessionAcknowledgeMode“ set to „CLIENT_ACKNOWLEDGE“: Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.

This means that the message listener container will always acknowledge your message; you may just refine when exactly the acknowledgment is going to happen. If you need to skip a message, use transacted delivery („sessionTransacted“=“true“) and throw an exception from your listener if message processing failed.

Vielleicht gibt’s ja noch eine andere Möglichkeit… mal schauen.