asynchrone Notifications mit CXF und JUnit simulieren

Nicht jeder Webservice besteht aus einem einfachen Request mit einfacher Response. Klar, Google anwählen und per GET eine Antwort mit Suchergebnissen bekommen. Das ist dann synchron. Aber produktive Webservices, mit denen man Methoden auf entfernten Servern aufruft, laufen häufig auch asynchron. Nach einem Request folgte eine kurze Response mit „Auftrag erhalten“. Dann rödelt die Serverbox ein wenig und nach ein paar Sekunden, Minuten etc. schickt der Server als Broker einen Notification-Request zurück an den Client, auf dem ein Notification-Server gestartet sein muss. Das ist so nicht wirklich möglich, denn irgendwo muss der Broker ja die Adresse des Clients herhaben. Beispielsweise über eine statische Tabelle oder über eine Topic-Queue. Nehmen wir mal an, der ausführende Server (der Notification-Broker) weiß, wohin er die Nachricht schicken muss. Wenn man für dieses Szenario jetzt die implementierte Webclient-Methode testen soll, muss man das auch irgendwie mocken können. Dazu ein kleines Beispiel :

Das Verzeichnis :

die zugehörige POM:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>eu.christophburmeister</groupId>
	<artifactId>notification</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<cxf.version>2.2.3</cxf.version>
	</properties>
	<build>
		<sourceDirectory>src</sourceDirectory>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.5</source>
					<target>1.5</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<dependencies>
		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-frontend-jaxws</artifactId>
			<version>2.2.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-transports-http</artifactId>
			<version>2.2.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-transports-http-jetty</artifactId>
			<version>2.2.3</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.0</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.16</version>
		</dependency>
	</dependencies>
</project>

die zu implementierenden Interfaces für a) den eigentlichen Webservice und b) den Notification-Service

package interfaces;

import javax.jws.WebParam;
import javax.jws.WebService;
 
@WebService
public interface IWebService {
    double calculatePowerOf(
    		@WebParam(name = "base") int base, 
    		@WebParam(name = "exp") int exp);
}
package interfaces;

import javax.jws.WebParam;
import javax.jws.WebService;

@WebService
public interface INotificationService {
	String notifyConsumer(
			@WebParam(name = "notificationMessage") String notificationMessage);
}

Die Implementierung des Web-Services :

package server;

import interfaces.IWebService;

public class WebServiceImpl implements IWebService {
	/**
	 * calculates the power-of-method, where x=base^exp.
	 */
	public double calculatePowerOf(int base, int exp) {
		return Math.pow(base, exp);
	}
}

Die Implementierung des Notification-Services:

package client;

import interfaces.INotificationService;

public class NotificationServiceImpl implements INotificationService {
	public String notifyConsumer(String notificationMessage) {
		return notificationMessage;
	}
}

Der Webservice-Consumer, also der Client des angebotenen Webservices

package client;

import interfaces.IWebService;

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;


public class WebserviceConsumer {
	/**
	 * executes the operation, while sending the request with the params to the
	 * webservice-provider
	 * 
	 * @param target
	 *            the target e.g. "http://127.0.0.1:3333"
	 * @param base
	 *            the basis
	 * @param exponent
	 *            the exponent
	 */
	public void executeOperation(String target, int base, int exponent) {
		JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
		factory.setAddress(target);
		factory.getInInterceptors().add(new LoggingInInterceptor());
		factory.getOutInterceptors().add(new LoggingOutInterceptor());
		factory.setServiceClass(IWebService.class);
		IWebService service = (IWebService) factory.create();
		Client client = ClientProxy.getClient(service);
		HTTPConduit http = (HTTPConduit) client.getConduit();
		HTTPClientPolicy policy = new HTTPClientPolicy();
		policy.setReceiveTimeout(1000);
		http.setClient(policy);
		service.calculatePowerOf(base, exponent);
	}
}

Der Webservice-Provider, also der Anbieter der Webservices

package server;

import interfaces.IWebService;

import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;

public class WebserviceProvider {

	/** the cxf-server of the webservice-provider*/
	Server server;

	/**
	 * 
	 * @param target
	 *            e.g. "http://127.0.0.1:3333"
	 * @param webServiceImplementation
	 *            the implementation of the interface
	 */
	public WebserviceProvider(String target) {
		WebServiceImpl webServiceImplementation = new WebServiceImpl();
		JaxWsServerFactoryBean svrFactory = new JaxWsServerFactoryBean();
		svrFactory.setServiceBean(webServiceImplementation);
		svrFactory.setServiceClass(IWebService.class);
		String targetAdress = target;
		svrFactory.setAddress(targetAdress);
		server = svrFactory.create();
	}

	public void startWebserviceProvider() {
		this.server.start();
	}

	public void stopWebserviceProvider() {
		this.server.stop();
	}
}

Der Notification-Provider, der Webservice, der auf dem Client gestartet wird, um eingehende Requests zu ermöglichen

package client;

import interfaces.INotificationService;

import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;

public class NotificationProvider {
	/** the cxf-server */
	Server server;

	/**
	 * 
	 * @param target
	 *            e.g. "http://127.0.0.1:4444"
	 * @param notificationServiceImpl
	 *            the implementation of the interface
	 */
	public NotificationProvider(String target) {
		NotificationServiceImpl notificationServiceImplementation = new NotificationServiceImpl();
		JaxWsServerFactoryBean svrFactory = new JaxWsServerFactoryBean();
		svrFactory.setServiceBean(notificationServiceImplementation);
		svrFactory.setServiceClass(INotificationService.class);
		String targetAdress = target;
		svrFactory.setAddress(targetAdress);
		server = svrFactory.create();
	}

	/**
	 * starts the internal cxf-server of the Notification-provider
	 */
	public void startNotificationProvider() {
		this.server.start();
	}

	/**
	 * stops the internal cxf-server of the Notification-provider
	 */
	public void stopNotificationProvider() {
		this.server.stop();
	}
}

Und der Notification-Client, der auf dem Webservice-Provider sitzt und nach erfolgreicher Operation asynchron eine Nachricht an den Notification-Provider (beim Client) sendet.


import interfaces.INotificationService;

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;


public class NotificationClient {
	/**
	 * executes the operation, while sending the request with the params to the
	 * webservice-provider
	 * 
	 * @param target
	 *            the target e.g. "http://127.0.0.1:4444"
	 * @param notificationMessage
	 * 				the message, which shall be sent with notification
	 */
	public void executeOperation(String target, String notificationMessage) {
		JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
		factory.setAddress(target);
		factory.getInInterceptors().add(new LoggingInInterceptor());
		factory.getOutInterceptors().add(new LoggingOutInterceptor());
		factory.setServiceClass(INotificationService.class);
		INotificationService service = (INotificationService) factory.create();
		Client client = ClientProxy.getClient(service);
		HTTPConduit http = (HTTPConduit) client.getConduit();
		HTTPClientPolicy policy = new HTTPClientPolicy();
		policy.setReceiveTimeout(1000);
		http.setClient(policy);
		
		service.notifyConsumer(notificationMessage);
	}
}

Das ganze verpacken wir noch in einen kleinen Test und schauen uns das gestellt Szenario an :

/**
 * 
 */
package client;


import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import server.NotificationClient;
import server.WebserviceProvider;

/**
 * @author Christoph
 *
 */
public class WebserviceConsumerTest {
 
	/** the logger */
	private static final Logger logger = Logger.getLogger(WebserviceConsumerTest.class);
	/** the cxf-server for the webservice */
	private WebserviceProvider webserviceProvider;
	/** the cxf-server for the notification */
	private NotificationProvider notificationProvider;
	/** the target-endpoint for the webservice-provider */
	private static final String WEBSERVICE_PROVIDER_URL_PORT = "http://127.0.0.1:3333";
	/** the target-endpoint for the notification-provider */
	private static final String NOTIFICATION_PROVIDER_URL_PORT = "http://127.0.0.1:4444";

	/** @throws java.lang.Exception */
	@Before
	public void setUp() throws Exception {
		logger.info("creating webservice-provider");
		webserviceProvider = new WebserviceProvider(WEBSERVICE_PROVIDER_URL_PORT);
		logger.info("creating notification-provider");
		notificationProvider = new NotificationProvider(NOTIFICATION_PROVIDER_URL_PORT);
		
		logger.info("starting webservice-provider");
		webserviceProvider.startWebserviceProvider();
		logger.info("starting notification-provider");
		notificationProvider.startNotificationProvider();
	}

	/** @throws java.lang.Exception */
	@After
	public void tearDown() throws Exception {
		logger.info("stopping webservice-provider");
		webserviceProvider.stopWebserviceProvider();
		logger.info("stopping notification-provider");
		notificationProvider.stopNotificationProvider();
	}
	
	@Test
	public void executeOperationTest() throws InterruptedException{
		logger.info("creating webserviceClient");
		WebserviceConsumer webserviceClient = new WebserviceConsumer();
		logger.info("creating notificationClient");
		NotificationClient notificationClient = new NotificationClient();
		
		logger.info("exectuting webservice-request through webservice-client");
		webserviceClient.executeOperation(WEBSERVICE_PROVIDER_URL_PORT, 5, 3);
		
		Thread.sleep(5*1000); // wait 5 sec for notification
		
		logger.info("exectuting notification-request through notification-client");
		notificationClient.executeOperation(NOTIFICATION_PROVIDER_URL_PORT, "I-FEEL-HAPPY-NOTIFICATION");		
	}
}

Das Logging ist übrigens absichtlich nur auf Client-Seite aktiviert, damit die vielen Chains nicht verwirren…

Fazit : Soviel zu den Grundlagen, wenn alle Parameter bekannt sind. In den nächsten Posts werde ich mich ein bisschen mit Timeout-Behandlung und Receiver-Management beschäftigen.