mit Camel eine JMS-Queue durchsuchen


Quelle : http://upload.wikimedia.org/

Da schreibt mir Camel eine Nachricht in eine Queue. Soweit so gut… Aber was, wenn es diese Nachricht schon in der Queue gibt? Man bräuchte also ein Bean, dass eine Liste mit eingegangenen Nachrichten untersucht und entscheidet, ob die vermeintlich neue Nachricht schon vorhanden ist (optimalerweise über eine UUID oder CorrelationId) oder nicht. Die Macher von Camel haben sich hierfür einen interessanten Weg ausgedacht: einen browsable endpoint, welcher als „To“ dient und alle Messages speichert und den man einfach zwischen „From“ und dem eigentlichen „To“ schalten kann. In einem anschließenden Bean kann man dann entsprechend die Abfrage einbauen und einen selbstgewählten Header (z.Bsp. nextStep) mit „outbox“ oder „trash“ belegen. Eine Choice-Anweisung dient dann zur Weiterleitung in die richtige outbox-Queue.

die camel-context.xml sieht dann so aus:





	
		
	

	

	

		
			
			
				2000
			
			
			
			
				
					${in.header.nextStep} == 'outbox'
					
				
				
					${in.header.nextStep} == 'trash'
					
				
			
		

	


das ProcessBean selbst wird so implementiert:

package eu.christophburmeister.camel.jms.core;

import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.BrowsableEndpoint;

public class ProcessBean {

	private boolean found;

	public void process(CamelContext ctx, Exchange thisExchange) {
		found = false;
		BrowsableEndpoint browsableEndpoint = ctx.getEndpoint(
				"browse:myBrowserEndpoint", BrowsableEndpoint.class);
		List exchanges = browsableEndpoint.getExchanges();

		String thisCorrelationId = thisExchange.getIn()
				.getHeader("JMSCorrelationId").toString();

		EXCHANGE_TRAVERSING: for (int i = 0; i < exchanges.size() - 1; i++) {

			String correlationId = exchanges.get(i).getIn()
					.getHeader("JMSCorrelationId").toString();

			System.out.println("actual inbox-messages: " + correlationId
					+ "---" + "incoming-message: " + thisCorrelationId);
			if (thisCorrelationId.equals(correlationId)) {
				found = true;
				break EXCHANGE_TRAVERSING;
			}
		}

		if (found) {
			thisExchange.getOut().setHeader("nextStep", "trash");
		} else {
			thisExchange.getOut().setHeader("nextStep", "outbox");
		}
	}
}

folgende Dependencies (aus dem 2.7.0er Camel) werden verwendet:


		
			org.apache.camel
			camel-core
			2.7.0
		
		
			org.apache.camel
			camel-jms
			2.7.0
		
		
			org.apache.camel
			camel-jdbc
			2.7.0
		
		
			org.apache.activemq
			activemq-camel
			5.2.0
		
		
			commons-logging
			commons-logging
			1.1.1
		
		
			log4j
			log4j
			1.2.14
		
		
			org.springframework
			spring-jdbc
			1.2.6
		
		
			mysql
			mysql-connector-java
			5.1.6
		
		
			commons-pool
			commons-pool
			20030825.183949
		
		
			commons-collections
			commons-collections
			20040616
		
		
			org.apache.camel
			camel-jetty
			2.7.0
		
		
			org.slf4j
			slf4j-api
			1.6.1
		
		
			org.slf4j
			log4j-over-slf4j
			1.6.1
		
	

Zum Testen nehme ich einfach meinen primitiven JMS-Client.