Camel-Router von SNMP via JMS auf JDBC


Quelle : http://www.enterpriseintegrationpatterns.com

„Basically a router just consumes Message exchanges from some Endpoint, then sends them on to some other Endpoint using some kind of Enterprise Integration Patterns.“
– direkt von der Camel-Website.

Also wird auch nix anderes gemacht. In diesem Fall wird das Beispiel aus einem vorigen Post einfach erweitert. Diesmal wird die Message, die im ActiveMQ schlummert nicht in eine Datei, sondern in eine Datenbank geschrieben. Dafür wird erstmal ein Bean benötigt, dass aus dem JMS-Body ein annehmbares SQL-Insert für den JDBC-Endpoint von Camel erzeugt:

package eu.christophburmeister.camel.snmprouter.core;

import org.apache.camel.language.XPath;

public class OrderToSqlBean {
    public String toSql(
            @XPath("//entry[oid='1.3.6.1.2.1.1.3.0']/value/text()") String corId,
            @XPath("//entry[oid='1.3.6.1.4.1.42.1.1']/value/text()") String msg) {
    	
        StringBuilder sb = new StringBuilder();
        sb.append("insert into snmp_traps ");
        sb.append("values (");
        sb.append("'").append(corId).append("', ");
        sb.append("'").append(msg).append("') ");
        return sb.toString();
    }
}

Dann muss noch der von uns erweiterte SnmpViaJmsToDatabaseRoute-RouteBuilder angepasst werden. Eigentlich ist das nur die zweite Route, die elegant mit der Java-DSL umgeschrieben wird, sodass … naja, eigentlich kann man das ja schon rauslesen 🙂

        from(jmsQueue).
            to("bean:jmsOrderToSqlBean").
            to("jdbc:jdbcDataSource");

Das eigentlich entscheidende findet in der Start-Klasse statt. Hier werden zunächst die Bean-Komponente JmsOrderToSqlBean und die Datasource ds einer JNDI-Registry übergeben (hier gibts mehrere Möglichkeiten, wie etwa einer Simple-Registry oder OSGI-Registry etc.) und anschließend wird der CamelContext von dieser JNDI-Registry ausgehend instanziiert. Das Ergebnis sieht dann so aus:

package eu.christophburmeister.camel.snmprouter.core;

import javax.naming.Context;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

public class Start {

    private final static String JMS_BROKER_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws Exception {

        // creating the context from jndi-registry
        CamelContext camelContext = new DefaultCamelContext(getJndiContext());
        
        // creating the routes
        SnmpViaJmsToFileRoute snmpTrapRoute = new SnmpViaJmsToDatabaseRoute();

        // adding some components to the context
        camelContext.addComponent("jms", getJmsComponent());
        camelContext.addRoutes(snmpTrapRoute);

        // start the context
        camelContext.start();

        while (true) {
            // just a server-wait!
        }
    }

    private static Context getJndiContext() throws Exception {
        JndiContext jndiContext = new JndiContext();
        jndiContext.bind("jmsOrderToSqlBean", new JmsOrderToSqlBean());
        jndiContext.bind("jdbcDataSource", getJdbcDatasource());
        return jndiContext;
    }

    private static DataSource getJdbcDatasource() {
        DriverManagerDataSource ds = new DriverManagerDataSource();
        ds.setDriverClassName("com.mysql.jdbc.Driver");
        ds.setUrl("jdbc:mysql://localhost:3306/testdb");
        ds.setUsername("user23");
        ds.setPassword("geheim33");
        return ds;
    }

    private static JmsComponent getJmsComponent() {
        JmsComponent comp = new JmsComponent();
        comp.setConnectionFactory(new ActiveMQConnectionFactory(JMS_BROKER_URL));
        return comp;
    }
}

Hat man alles up-and-running, werden die vom SNMP-Client gesendeten Messages zuerst in die JMS-Queue „incomingSnmpTraps“ persistiert und von dort durch den zweiten Router-Eintrag durch das Prozessor-Bean in einen hübschen Insert-String umgewandelt, wobei der TimeStamp, der per Xpath bereits im Header der JMS-Message gelandet ist, einfachheitshalber als CorrelationId verwendet wird. Die eigentliche Message, die in der Datenbank stehen soll, kann dann irgendein beliebiger Wert sein, der (in diesem Fall) mit Hilfe von ein bisschen XPath-Magie vom Key „1.3.6.1.4.1.42.1.1“ stammt. Wie das ganze mit Spring viiiiieeeel einfacher geht/gehen soll, zeige ich demnächst…