JMS – Beispiel mit Topic und TempQueue

Overview of scenario, by Christoph Burmeister (own figure)

Overview of scenario, by Christoph Burmeister (own figure)


Szenario: Ein Server sendet kontinuierlich Notifications an ein Topic (publish), der Client ist auf dem Topic eingeschrieben (subscribe). Der Client sendet über eine Queue für Requests Anfragen an den Server mit einem JMSReplyTo-Feld auf eine Temporäre Queue. Diese ist nur für die aktuelle Session des Client verfügbar. Über das JMSReplyTo wird eine Request/Response-Kommunikation über JMS realisiert.
zuerst der Server:

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import listener.RequestListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class Server {

	private static final Logger logger = Logger.getLogger(Server.class);

	private QueueConnectionFactory queueConnectionFactory;
	private QueueConnection queueConnection;
	private QueueSession queueSession;
	private Queue requestQueue;
	private QueueReceiver queueReceiver;
	private QueueSender queueSender;

	private TopicConnectionFactory topicConnectionFactory;
	private TopicConnection topicConnection;
	private TopicSession topicSession;
	private Topic topic;
	private TopicPublisher topicPublisher;

	public Server(String topicName, String requestQueueName) {
		topicConnectionFactory = (TopicConnectionFactory) new ActiveMQConnectionFactory("tcp://localhost:61616");
		queueConnectionFactory = (QueueConnectionFactory) new ActiveMQConnectionFactory("tcp://localhost:61616");

		try {
			queueConnection = queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			requestQueue = queueSession.createQueue(requestQueueName);
			queueReceiver = queueSession.createReceiver(requestQueue);
			queueSender = queueSession.createSender(null);
			queueReceiver.setMessageListener(new RequestListener(queueSession, queueSender));

			topicConnection = topicConnectionFactory.createTopicConnection();
			topicConnection.start();
			topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // must not be transacted!!!
			topic = topicSession.createTopic(topicName);
			topicPublisher = topicSession.createPublisher(topic);
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}

	public void publish(String notificationContent) {
		TextMessage notification;
		try {
			notification = topicSession.createTextMessage(notificationContent);
			topicPublisher.publish(notification);
			logger.info("publish notifiaction: " + notificationContent);
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}

	public void shutdown() {
		try {
			topicSession.close();
			topicConnection.stop();
			topicConnection.close();

			queueSession.close();
			queueConnection.stop();
			queueConnection.close();
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}
}

der zugehörige Request-Listener

package listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;

public class RequestListener implements MessageListener {

	private static final Logger logger = Logger.getLogger(RequestListener.class);
	private QueueSession queueSession;
	private QueueSender queueSender;
	

	public RequestListener(QueueSession queueSession, QueueSender queueSender) {
		this.queueSession = queueSession;
		this.queueSender = queueSender;
	}

	public void onMessage(Message message) {
		try {
		TextMessage request = (TextMessage) message;
		TextMessage response = queueSession.createTextMessage();
			logger.info("incoming request: " + request.getText());
			response.setJMSCorrelationID(request.getJMSCorrelationID());
			response.setText("Reply to " + request.getText());
			Queue responseQueue = (Queue) request.getJMSReplyTo();
			queueSender.send(responseQueue, response);
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}
}

und die Start-Klasse für den Server

import org.apache.log4j.Logger;

public class StartServer {
	/** The logger. */
	private static final Logger logger = Logger.getLogger(StartServer.class);

	/** Main-method to instantiate the server, call the publish() and shutdown the server. */
	public static void main(String[] args) {
		Server server = new Server("notifications", "requestQueue");

		for (int i = 0; i < 30; i++) {
			server.publish("notification-" + i);
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				logger.error(e.getMessage());
			}
		}

		server.shutdown();
	}
}
&#91;/sourcecode&#93;

der Client
&#91;sourcecode language="java"&#93;
import java.util.UUID;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import listener.NotificationListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class Client {

	private static final Logger logger = Logger.getLogger(Client.class);

	private QueueConnectionFactory queueConnectionFactory;
	private QueueConnection queueConnection;
	private QueueSession queueSession;
	private Queue requestQueue;
	TemporaryQueue responseQueue;
	private QueueSender queueSender;
	private QueueReceiver queueReceiver;

	private TopicConnectionFactory topicConnectionFactory;
	private TopicConnection topicConnection;
	private TopicSession topicSession;
	private Topic topic;
	private TopicSubscriber topicSubscriber;

	public Client(String topicName, String requestQueueName) {
		topicConnectionFactory = (TopicConnectionFactory) new ActiveMQConnectionFactory("tcp://localhost:61616");
		queueConnectionFactory = (QueueConnectionFactory) new ActiveMQConnectionFactory("tcp://localhost:61616");

		try {
			queueConnection = queueConnectionFactory.createQueueConnection();
			queueConnection.start();
			queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			requestQueue = queueSession.createQueue(requestQueueName);
			queueSender = queueSession.createSender(requestQueue);
			queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			responseQueue = queueSession.createTemporaryQueue();
			queueReceiver = queueSession.createReceiver(responseQueue);
			// queueReceiver.setMessageListener(new ResponseListener()); // this
			// is for asynch!

			topicConnection = topicConnectionFactory.createTopicConnection();
			topicConnection.start();
			topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
			topic = topicSession.createTopic(topicName);
			topicSubscriber = topicSession.createSubscriber(topic);
			topicSubscriber.setMessageListener(new NotificationListener());
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}

	public void sendRequest(String requestContent) {
		try {
			TextMessage request = queueSession.createTextMessage();
			request.setText(requestContent);
			request.setJMSReplyTo(responseQueue);
			String correlationId = UUID.randomUUID().toString();
			request.setJMSCorrelationID(correlationId);
			queueSender.send(request);
			logger.info("outgoing request:" + request.getText());
			TextMessage response = (TextMessage) queueReceiver.receive(); // for sync																			
			logger.info("incoming response: " + response.getText());
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}

	public void shutdown() {
		try {
			topicSession.close();
			topicConnection.stop();
			topicConnection.close();

			queueSession.close();
			queueConnection.stop();
			queueConnection.close();
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}
}
&#91;/sourcecode&#93;

der zugehörige Notification-Listener
&#91;sourcecode language="java"&#93;
package listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;


public class NotificationListener implements MessageListener {

	private static final Logger logger = Logger.getLogger(NotificationListener.class);
	
	public void onMessage(Message message) {
		TextMessage notification = (TextMessage) message;
		try {
			logger.info("incoming notification: " + notification.getText());
		} catch (JMSException e) {
			logger.error(e.getMessage());
		}
	}
}
&#91;/sourcecode&#93;

und die Start-Klasse für den Client
&#91;sourcecode language="java"&#93;
import org.apache.log4j.Logger;

public class StartClient {
	/** The logger. */
	private static final Logger logger = Logger.getLogger(StartClient.class);

	/** Main-method to instantiate the server, call the publish() and shutdown the server. */
	public static void main(String&#91;&#93; args) {
		Client client = new Client("notifications", "requestQueue");

		for (int i = 0; i < 30; i++) {
			
			try {
				client.sendRequest("request " + i); 
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				logger.error(e.getMessage());
			}
		}

		client.shutdown();
	}
}
&#91;/sourcecode&#93;

ein paar Dependencies:
&#91;sourcecode language="xml"&#93;
<dependencies>
		<dependency>
			<groupid>org.apache.activemq</groupid>
			<artifactid>activemq-core</artifactid>
			<version>5.5.1</version>
		</dependency>
		<dependency>
			<groupid>javax.jms</groupid>
			<artifactid>jms</artifactid>
			<version>1.1</version>
		</dependency>
		
		<dependency>
			<groupid>log4j</groupid>
			<artifactid>log4j</artifactid>
			<version>1.2.14</version>
		</dependency>
		<dependency>
			<groupid>org.slf4j</groupid>
			<artifactid>slf4j-api</artifactid>
			<version>1.6.1</version>
		</dependency>
		<dependency>
			<groupid>org.slf4j</groupid>
			<artifactid>slf4j-log4j12</artifactid>
			<version>1.6.1</version>
		</dependency>
	

und das ganze Projekt: