Sunday, July 15, 2012

EIP in action

This section shows a more complex example of integration using EIPs with Apache Camel. I will try to describe how to integrate 3 systems with four different transport protocols: Web Service, JMS (ActiveMQ), file system and SMTP. 
In the diagram below the direction of the arrows indicates the flow of a message transfer, i.e. Point of Sale (PoS) sends messages to Custom Relationship Management (CRM) system etc.
Firstly, it is important to define integration use cases and then to analyze how to implement them using EIPs taking into consideration existing requirements: 
  1. For analitical purposes ERP needs hourly a list of specified users with additional data (e.g. amount of money spent in current week).
  2. ERP upon PoS request generates a report based on the database data, stores it in a file and then sends the information to the specified email address and PoS JMS queue. 
  3.  If required PoS makes a request to CRM for information about a particular customer. 
  4.  For external systems, PoS provides SOAP based services extracting data from JMS.

It is obvious that not all systems are developed on the same platform and don’t use the same communication protocols. Let’s now explain all the available services: 
  • CRM provides SOAP based web services,
  • PoS is a new system and communicates via JMS,
  • ERP is a very old system and does not provide any external interface. The only way to communicate with ERP is inserting data to particular database tables.

So, let’s start with EIP diagrams. FuseSource developed a useful set of Eclipse plugins named Fuse IDE and I’ll use the designer provided with this tool.


Use Case 1

Here is a diagram and camel code that shows integration logic for this case:

<camelContext xmlns=""
     <route id="UpdateCustomersRoute">
            <from uri="timer://everyHour?fixedRate=true&amp;period=360000"
                           id="EveryHourTimer" />
            <setBody id="SelectCustomersQuery">
                   <constant>SELECT * FROM CUSTOMERS c WHERE c.UPDATED = 0</constant>
            <to uri="jdbc:erpdb" id="SelectCustomers" />
            <split parallelProcessing="true" id="SplitCustomers">
                   <setHeader headerName="operationName">
                   <setHeader headerName="operationNamespace">
                   <process ref="CustomerProcessor" />
                   <to uri="cxf://http://localhost:8080/customer-service?wsdlURL=src/main/resources/META-INF/customer-service.wsdl&amp;serviceName={}CustomerService&amp;portName={}CustomerServicePort&amp;dataFormat=PAYLOAD" id="FetchCustomersInfo" />
                   <setHeader headerName="id">
                          <xpath resultType="java.lang.String">//cs:id</xpath>
                   <setHeader headerName="additional">
                          <xpath resultType="java.lang.String">//cs:additional</xpath>
                   <setBody id="UpdateCustomersInfoQuery">
                          <simple>UPDATE CUSTOMERS SET ADDITIONAL = '${header.additional}' , UPDATED = 1 WHERE CUSTOMER_ID = ${}</simple>
                   <to uri="jdbc:erpdb" id="UpdateCustomers" />

As you may note there are a lot of EIP’s and DSL code but don’t be put off and continue reading this article. Most of this code is solely used for technical reasons and is so simple that even a person unfamiliar with Camel will understand it.  In the diagram I underlined 5 with thick red line EIP’s which I consider as really important. 

First (①) is EveryHourTimer which starts the route every hour. Second (②) is SelectCustomer which  executes query, defined in SelectCustomerQuery (this is first strictly technical EIP). Then each of the queried customers needs to be supplemented with additional data. SplitCustomer  (③) splits customers and invokes the web services defined in FetchCustomerInfo (④). CustomerProcessor is written in Java and it not only creates a new POJO with JAXB annotation but also adds additional information to existing data resulting from SelectCustomerQuery.  Below is the code for this simple processor and Customer POJO:
public class CustomerProcessor implements Processor {

       public void process(Exchange exchange) throws Exception {
             Map body = (Map) exchange.getIn().getBody();
             Customer customer = new Customer();
             customer.setId(((Integer) body.get("CUSTOMER_ID")).toString());          


@XmlType(name = "", propOrder = {
@XmlRootElement(name = "Customer")
public class Customer {

    @XmlElement(required = true)
    protected String id;
    protected String additional;

    //setter and getters

Web service operation name with its namespace is placed in setHeader markup (another technical EIP).
Last action that needs to be performed is to update customers data in ERP database which is configured in UpdateCustomers (⑤). Of course before executing the updating query we need to extract additional data using XPath (setHeader with xpath markups) and place it in message headers. The same applies to the query itself but in this case “storage” is placed in message body (setBody markup).
I can tell you that this is one of the most complex routes that I have ever written so please try to analyze it thoroughly and I’m sure that you will derive from it a lot of practical information.


Use Case 2

The second use case will be much simpler. As I have already written, ERP is a very old monolithic system which communicates only through database tables. PoS doesn’t have access to this database. Fortunately integration platform has such rights, so the architect decided to create very simple REST service which takes customer and places it in proper database table which acts as a queue. ERP every 5 minutes reads this table, generates the report file and saves file in FTP server.  Integration platform is responsible for polling FTP server and sending report to a particular email address and PoS queue.

REST service:


public class ErpRestService {
    public Response generateReport(@PathParam("customerId") String customerId) {
        return null;


EIP Diagrams:

<route id="RestErpProxy">
    <from uri="cxfrs://http://localhost:80/services/task/?" id="ErpRest" />
    <log loggingLevel="INFO" message="${body}"/>
    <to uri="ErpReportProcessor" id="HeaderProcessor"/>
            <simple>${in.header.operationName} == 'generateReport'</simple>
            <setBody id="GenerateReportQuery">
                <simple>INSERT INTO gen_report (CUSTOMER_ID) VALUES (${in.header.customerId})</simple>
            <to uri="jdbc:erpdb" id="GenerateReportQueryExecute"/>
            <log loggingLevel="ERROR" message="Unsupported operation: ${in.header.operationName}"/>
<route id="ReportSender">
    <from id="PollFromFTP" uri=";binary=true&amp;delay=60000"/>
    <to id="SendMail" uri="smtp://"/>
    <to id="SendToQueue" pattern="InOnly"  uri="activemq:queue:ErpReports?jmsMessageType=Bytes" />

Processor which sets customerId header.

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.cxf.message.MessageContentsList;

public class ErpReportPRocessor implements Processor {

       public void process(Exchange exchange) throws Exception {
             Message inMessage = exchange.getIn();
             MessageContentsList body = (MessageContentsList) inMessage.getBody();
             String customerId = (String) body.get(0);
             inMessage.setHeader("customerId", customerId);



Use Case 3

Most of  web services developed  in applications don’t require any mediation. Integration platform serves as a proxy and provides a central point of access to all services. Therefore this example shows HTTP proxy that intercepts the message and sends it forward.  All systems need to know only access address to all services.
EIP Diagram:

<route id="HttpProxy">
    <from uri="jetty:http://localhost:80/services?matchOnUriPrefix=true" id="JettyHttpProxy" />
    <log id="LogBody" loggingLevel="INFO" message="${body}"/>
    <to uri="" id="CrmBridgeEndpoint"/>


Use Case 4

This last case is similar to the third one. But this time, this requires the creation of WSDL’s. Service sends the message to a particular JMS queue and waits for the response from queue referred in JMS-ReplyTo. The last action that needs to be performed is to send the message to the web service client.
CXF Web service definition (cxf namespace is defined as  xmlns:cxf=
<cxf:cxfEndpoint id="PosWSProxy" address="http://localhost:80/pos/services/proxy" endpointName="pos:PosService" serviceName="pos:PosService" wsdlURL="src/main/resources/META-INF/pos-service.wsdl" xmlns:pos="" />

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<wsdl:definitions xmlns:pos=""
       xmlns:soap="" xmlns:wsdl=""
       xmlns:xsd="" name="PosService"
             <xsd:schema targetNamespace="">
                    <xsd:import schemaLocation="req-resp.xsd" namespace="" />

       <wsdl:message name="Request">
             <wsdl:part element="pos:Request" name="parameters" />
       <wsdl:message name="Response">
             <wsdl:part element="pos:Response" name="parameters" />

       <wsdl:portType name="PosService">
             <wsdl:operation name="sendRequest">
                    <wsdl:input message="pos:Request" />
                    <wsdl:output message="pos:Response" />

       <wsdl:binding name="PosServiceSOAP" type="pos:PosService">
             <soap:binding style="document"
                    transport="" />

             <wsdl:operation name="sendRequest">
                    <soap:operation soapAction="" />
                           <soap:body use="literal" />

                           <soap:body use="literal" />


       <wsdl:service name="PosService">
             <wsdl:port binding="pos:PosServiceSOAP" name="PosServicePort">
                    <soap:address location="http://localhost:8080/pos-service" />

XML Schema:
<?xml version="1.0" encoding="UTF-8"?>
<schema xmlns="" xmlns:xsd=""
       targetNamespace="" xmlns:tns=""

       <xsd:element name="Request">
                           <xsd:element name="content" type="xsd:anyType" />
       <xsd:element name="Response">
                           <xsd:element name="content" type="xsd:anyType" />


EIP Diagram:

<route id="WSQueueProxy">
    <from uri="cxf:bean:PosWSProxy?dataFormat=PAYLOAD" id="WSProxy"/>
    <log loggingLevel="INFO" message="${body}" id="LogInputBody"/>
    <to uri="activemq:queue:PosQueue?jmsMessageType=Bytes" id="SendMessageToQueue"/>
    <log loggingLevel="INFO" message="${body}" id="LogOutputBody"/>

ActiveMQ component:
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:61616" />

No comments:

Post a Comment