In the diagram
below the direction of the arrows indicates the flow of a message transfer,
i.e. Point of Sale (PoS) sends messages to Custom Relationship Management (CRM)
system etc.
Firstly, it
is important to define integration use cases and then to analyze how to
implement them using EIPs taking into consideration existing requirements:
- For analitical purposes ERP needs hourly a list of specified users with additional data (e.g. amount of money spent in current week).
- ERP upon PoS request generates a report based on the database data, stores it in a file and then sends the information to the specified email address and PoS JMS queue.
- If required PoS makes a request to CRM for information about a particular customer.
- For external systems, PoS provides SOAP based services extracting data from JMS.
It is obvious that not all systems are developed on the same platform and don’t use the same communication protocols. Let’s now explain all the available services:
- CRM provides SOAP based web services,
- PoS is a new system and communicates via JMS,
- ERP is a very old system and does not provide any external interface. The only way to communicate with ERP is inserting data to particular database tables.
So, let’s start with EIP diagrams. FuseSource developed a useful set of Eclipse plugins named Fuse IDE and I’ll use the designer provided with this tool.
Use Case 1
Here is a
diagram and camel code that shows integration logic for this case:
<camelContext xmlns="http://camel.apache.org/schema/spring"
xmlns:cs="http://crm.mw.com/">
<route id="UpdateCustomersRoute">
<from uri="timer://everyHour?fixedRate=true&period=360000"
id="EveryHourTimer" /> ①
<setBody id="SelectCustomersQuery">
<constant>SELECT * FROM CUSTOMERS c WHERE c.UPDATED = 0</constant>
</setBody>
<to uri="jdbc:erpdb" id="SelectCustomers"
/> ②
<split parallelProcessing="true" id="SplitCustomers"> ③
<simple>${body}</simple>
<setHeader headerName="operationName">
<constant>findCustomer</constant>
</setHeader>
<setHeader headerName="operationNamespace">
<constant>http://crm.mw.com/</constant>
</setHeader>
<process ref="CustomerProcessor" />
<to uri="cxf://http://localhost:8080/customer-service?wsdlURL=src/main/resources/META-INF/customer-service.wsdl&serviceName={http://crm.mw.com/}CustomerService&portName={http://crm.mw.com/}CustomerServicePort&dataFormat=PAYLOAD" id="FetchCustomersInfo" /> ④
<setHeader headerName="id">
<xpath resultType="java.lang.String">//cs:id</xpath>
</setHeader>
<setHeader headerName="additional">
<xpath resultType="java.lang.String">//cs:additional</xpath>
</setHeader>
<setBody id="UpdateCustomersInfoQuery">
<simple>UPDATE CUSTOMERS SET ADDITIONAL = '${header.additional}' , UPDATED = 1
WHERE CUSTOMER_ID = ${header.id}</simple>
</setBody>
<to uri="jdbc:erpdb" id="UpdateCustomers"
/> ⑤
</split>
</route>
</camelContext>
</camelContext>
As you may note there are a lot of EIP’s and DSL code but don’t be put off and continue reading this article. Most of this code is solely used for technical reasons and is so simple that even a person unfamiliar with Camel will understand it. In the diagram I underlined 5 with thick red line EIP’s which I consider as really important.
First (①) is EveryHourTimer
which starts the route every hour. Second (②) is SelectCustomer
which executes query, defined in SelectCustomerQuery (this is first strictly
technical EIP). Then each of the queried
customers needs to be supplemented with additional data. SplitCustomer (③) splits
customers and invokes the web services defined in FetchCustomerInfo (④). CustomerProcessor
is written in Java and it not only creates a new POJO with JAXB annotation
but also adds additional information to existing data resulting from SelectCustomerQuery. Below is the code for this simple processor
and Customer POJO:
public class
CustomerProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map body = (Map)
exchange.getIn().getBody();
Customer customer = new Customer();
customer.setId(((Integer)
body.get("CUSTOMER_ID")).toString());
exchange.getIn().setBody(customer);
}
}
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "", propOrder =
{
"id",
"additional"
})
@XmlRootElement(name = "Customer")
public class Customer {
@XmlElement(required = true)
protected String id;
protected String additional;
//setter and getters
}
Web service
operation name with its namespace is placed in setHeader markup (another technical EIP).
Last action
that needs to be performed is to update customers data in ERP database which is
configured in UpdateCustomers (⑤). Of
course before executing the updating query we need to extract additional data
using XPath (setHeader with xpath markups) and place it in message
headers. The same applies to the query itself but in this case “storage” is
placed in message body (setBody markup).
I can tell
you that this is one of the most complex routes that I have ever written so
please try to analyze it thoroughly and I’m sure that you will derive from it a
lot of practical information.
Use Case 2
The second
use case will be much simpler. As I have already written, ERP is a very old
monolithic system which communicates only through database tables. PoS doesn’t
have access to this database. Fortunately integration platform has such rights,
so the architect decided to create very simple REST service which takes
customer and places it in proper database table which acts as a queue. ERP
every 5 minutes reads this table, generates the report file and saves file in FTP
server. Integration platform is
responsible for polling FTP server and sending report to a particular email address
and PoS queue.
REST
service:
package
com.blogspot.mw.camel.inaction;
import
javax.ws.rs.Consumes;
import
javax.ws.rs.POST;
import
javax.ws.rs.Path;
import
javax.ws.rs.PathParam;
import
javax.ws.rs.core.Response;
@Path("/erp-service/")
public class
ErpRestService {
@POST
@Path("/generate-report/{customerId}")
@Consumes("text/xml")
public Response generateReport(@PathParam("customerId") String customerId)
{
return null;
}
}
EIP Diagrams:
Code:
<route id="RestErpProxy">
<from uri="cxfrs://http://localhost:80/services/task/?resourceClasses=com.blogspot.mw.camel.inaction.ErpRestService"
id="ErpRest" />
<log loggingLevel="INFO"
message="${body}"/>
<to uri="ErpReportProcessor"
id="HeaderProcessor"/>
<choice>
<when>
<simple>${in.header.operationName} == 'generateReport'</simple>
<setBody id="GenerateReportQuery">
<simple>INSERT INTO gen_report (CUSTOMER_ID) VALUES (${in.header.customerId})</simple>
</setBody>
<to uri="jdbc:erpdb"
id="GenerateReportQueryExecute"/>
</when>
<otherwise>
<log loggingLevel="ERROR"
message="Unsupported operation:
${in.header.operationName}"/>
</otherwise>
</choice>
</route>
<route id="ReportSender">
<from id="PollFromFTP"
uri="ftp://login@ftp.crm.mw.com/public/reports?password=pass&binary=true&delay=60000"/>
<to id="SendMail"
uri="smtp://smtp.crm.mw.com?username=boss@crm.mw.com"/>
<to id="SendToQueue"
pattern="InOnly" uri="activemq:queue:ErpReports?jmsMessageType=Bytes"
/>
</route>
Processor
which sets customerId header.
package
com.blogspot.mw.camel.inaction;
import
org.apache.camel.Exchange;
import
org.apache.camel.Message;
import
org.apache.camel.Processor;
import
org.apache.cxf.message.MessageContentsList;
public class ErpReportPRocessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message inMessage =
exchange.getIn();
MessageContentsList
body = (MessageContentsList) inMessage.getBody();
String customerId =
(String) body.get(0);
inMessage.setHeader("customerId", customerId);
}
}
Use Case 3
Most of web services developed in applications don’t require any mediation. Integration
platform serves as a proxy and provides a central point of access to all
services. Therefore this example shows HTTP proxy that intercepts the message
and sends it forward. All systems need
to know only access address to all services.
EIP
Diagram:
Code:
<route id="HttpProxy">
<from uri="jetty:http://localhost:80/services?matchOnUriPrefix=true"
id="JettyHttpProxy" />
<log id="LogBody"
loggingLevel="INFO" message="${body}"/>
<to uri="http://www.crm.mw.com:8081/services?bridgeEndpoint=true"
id="CrmBridgeEndpoint"/>
</route>
Use Case 4
This last
case is similar to the third one. But this time, this requires the creation of
WSDL’s. Service sends the message to a particular JMS queue and waits for the response
from queue referred in JMS-ReplyTo. The
last action that needs to be performed is to send the message to the web
service client.
CXF Web service definition (cxf
namespace is defined as xmlns:cxf=http://camel.apache.org/schema/cxf):
<cxf:cxfEndpoint id="PosWSProxy" address="http://localhost:80/pos/services/proxy"
endpointName="pos:PosService" serviceName="pos:PosService" wsdlURL="src/main/resources/META-INF/pos-service.wsdl"
xmlns:pos="http://pos.mw.com/" />
WSDL:
<?xml version="1.0" encoding="UTF-8"
standalone="no"?>
<wsdl:definitions xmlns:pos="http://pos.mw.com/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
name="PosService"
targetNamespace="http://pos.mw.com/">
<wsdl:types>
<xsd:schema targetNamespace="http://pos.mw.com/">
<xsd:import schemaLocation="req-resp.xsd" namespace="http://pos.mw.com/" />
</xsd:schema>
</wsdl:types>
<wsdl:message name="Request">
<wsdl:part element="pos:Request" name="parameters" />
</wsdl:message>
<wsdl:message name="Response">
<wsdl:part element="pos:Response" name="parameters" />
</wsdl:message>
<wsdl:portType name="PosService">
<wsdl:operation name="sendRequest">
<wsdl:input message="pos:Request" />
<wsdl:output message="pos:Response" />
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="PosServiceSOAP"
type="pos:PosService">
<soap:binding style="document"
transport="http://schemas.xmlsoap.org/soap/http"
/>
<wsdl:operation name="sendRequest">
<soap:operation soapAction="http://pos.mw.com/pos-service/sendRequest"
/>
<wsdl:input>
<soap:body use="literal" />
</wsdl:input>
<wsdl:output>
<soap:body use="literal" />
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="PosService">
<wsdl:port binding="pos:PosServiceSOAP" name="PosServicePort">
<soap:address location="http://localhost:8080/pos-service" />
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
XML Schema:
<?xml version="1.0" encoding="UTF-8"?>
<schema xmlns="http://www.w3.org/2001/XMLSchema" xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://pos.mw.com/"
xmlns:tns="http://pos.mw.com/"
elementFormDefault="qualified">
<xsd:element name="Request">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="content" type="xsd:anyType"
/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="Response">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="content" type="xsd:anyType"
/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
</schema>
EIP
Diagram:
Code:
<route id="WSQueueProxy">
<from uri="cxf:bean:PosWSProxy?dataFormat=PAYLOAD"
id="WSProxy"/>
<log loggingLevel="INFO"
message="${body}" id="LogInputBody"/>
<to uri="activemq:queue:PosQueue?jmsMessageType=Bytes"
id="SendMessageToQueue"/>
<log loggingLevel="INFO"
message="${body}" id="LogOutputBody"/>
</route>
ActiveMQ component:
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL"
value="tcp://localhost:61616" />
</bean>