Wednesday, August 15, 2012

Monitoring and managing Apache Camel using JMX


Generally speaking, Java Management Extension (JMX) is technology, which provides ability to manage and monitor applications and devices. In order to explore and understand its details it is recommended to visit the following website: http://www.oracle.com/technetwork/java/javase/tech/javamanagement-140525.html

This blog post will describe the capabilities of Apache Camel management module, for example:  
  • monitoring Camel artifacts i.e. routes, components, consumers, thread pools etc. , 
  •  managing runtime i.e. stopping particular routes or increasing size of particular thread pool,
  •  creating custom Management Bean (MBean) to provide other useful data, 
  • listening to JMX notifications.

For the first two afore-mention functionalities jConsole and FuseIDE will be used. By default jConsole is delivered with JRE and JDK in directory JAVA_HOME/bin/. FuseIDE provides console with more useful features like drag & drop messages from static file to ActiveMQ queue or topic. Unfortunately this tool is available only for subscribers (http://fusesource.com/enterprise-support/subscription-center/).

For the purpose of this article, maven archetype org.apache.archetypes / camel.archetype.spring will be used. The following code shows how to create this archetype from the command line:

mvn archetype:generate \ 
  -DarchetypeGroupId=org.apache.camel.archetypes \ 
  -DarchetypeArtifactId=camel.archetype.spring \ 
  -DarchetypeVersion=2.9.0 \ 
  -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots-group

To run Camel Maven project type: mvn camel:run.

Basics

Let’s start with jConsole. When you open it, you should see New Connection creator similar to the one shown below:


Select  org.codehouse.plexus … and click Connect. You should see six tabs: 
  •  Overview,
  • Memory,
  • Threads,
  • Classes, 
  • VM Summary and
  • MBeans.
Go to the last tab on the list and expand org.apache.camel tree:


Try to rummage a bit of the tree and see how much information can be found, i.e.:
  • there is only one component (“file”),
  • there is only one consumer (“FileConsumer”),
  • if there is no specified error handler, then “DefaultErrorHandlerBuilder” is to be used,
  • there are five processors (“choice1”, “log1”, “log2”, “to1”, “to2”). 
  

Assume that on “production stage” one of the routes has a bug. This problem should be resoved as soon as possible and during this time, route must be stopped. You can’t stop the whole application so there is one very quick way to stop specified route: use JMX. In org.apache.camel tree, expand routes node --> find the buggy route --> expand Operations node --> find operation named stop() and “click” on it.


Another example of very useful MBean method is browsing body of messages consumed by particular endpoint. Picture below shows first file which was moved to target/messages/uk directory.


As you can see there are tens of MBeans methods that may be extremely useful during the monitoring process.

Fuse IDE is based on Eclipse and provides not only similar to jConsole JMX MBean explorer but also graphical equivalent which facilitates exploring Camel applications in runtime. Additionally, Diagram View shows message flow of a particular route. Fuse IDE can be downloaded from the following website: http://fusesource.com/products/fuse-ide/ .
When you start Fuse IDE, switch to Fuse Integration perspective, expand service called maven and search for route1. Below is the screenshot of this view.



Custom MBean

Camel JMX provides a lot of useful MBeans but sometimes there is a need to create custom MBean that will provide application specific data.
Take for example that we need the average and total number of characters in the files. The class as shown below shows implementation of such processor:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;

@ManagedResource(description = "Statictics Processor")
public class StatisticsProcessor implements Processor {
      
    private int totalRequests;
   
    private int totalCharacters;

    @ManagedAttribute
    public int getAvgChar() {
        return totalCharacters / totalRequests;
    }
   
    @ManagedAttribute
    public int getTotalChar() {
        return totalCharacters;
    }

    public void process(Exchange exchange) throws Exception {
       ++totalRequests;
       totalCharacters += ((String) exchange.getIn().getBody(String.class)).length();
    }
}


@ManagerResource is a metadata annotation that informs Camel that this class provides MBean methods and attributes.
@ManagedAttribute indicates which one of a class attributes/methods are MBeans attributes/methods. In this case getAvgChar and getTotalChar are such methods.
Now, you have to modify Camel route as follows:

  <bean id="StatisticsProcessor" class="com.blogspot.michalwarecki.camel.simple.StatisticsProcessor" />

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <description>here is a sample which processes the input files
         (leaving them in place - see the 'noop' flag)
         then performs content based routing on the message using XPath</description>
        <from uri="file:src/data?noop=true"/>
        <process ref="StatisticsProcessor"/>
        <choice>
            <when>
                <xpath>/person/city = 'London'</xpath>
                <log message="UK message"/>
                <to uri="file:target/messages/uk"/>
            </when>
            <otherwise>
                <log message="Other message"/>
                <to uri="file:target/messages/others"/>
            </otherwise>
        </choice>
    </route>
</camelContext>


Run the applications and check available processors and its methods in jConsole. As you can see there is processor named processor1 with methods getAvgChar and getTotalChar as the end.
Try to invoke them and check results:





Broadcasting notifications

JMX provides the ability to broadcast custom notifications to subscribers. These notifications may indicate a threat to proper functioning of integration platform. For example, you can create notifier which will send information about inactive web page which is required by the application.
Firstly, you need to implement custom event object which extends EventObject. Data about this event will be displayed based on toString method. Example of such class is shown below:

import java.util.EventObject;

public class MyCustomEvent extends EventObject {
      
       private static final long serialVersionUID = 7062663129376271978L;

       public MyCustomEvent(String source) {
        super(source);
    }
      
       @Override
    public String toString() {
        return "Event message: " + getSource();
       }

}

Secondly, you have to notify all subscribers that event has occurred. Apache Camel context has a few registered notifiers, of which one of them is org.apache.camel.management.JmxNotificationEventNotifier which provides ability to broadcast events. The code below shows processor which iterates over all notifiers and sends custom event to each of them.

import java.util.EventObject;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.management.JmxNotificationEventNotifier;
import org.apache.camel.spi.EventNotifier;

public class EventNotifierProcessor implements Processor {

       @Override
       public void process(Exchange exchange) throws Exception {
             List<EventNotifier> eventNotifiers =
                           exchange.getContext().getManagementStrategy().getEventNotifiers();
            
             for(EventNotifier eventNotifier : eventNotifiers) {
                    EventObject eventObject = new MyCustomEvent("CustomEventNotification");
                    eventNotifier.notify(eventObject);
             }
       }
}

Thirdly, place JmxNotificationEventNotifier in Spring configuration and add EventNotiferProcessor to the following route:

<bean id="EventNotifierProcessor" class="com.blogspot.michalwarecki.camel.simple.EventNotifierProcessor" />
 
  <bean id="JmxEventNotifier" class="org.apache.camel.management.JmxNotificationEventNotifier">
       <property name="source" value="MyCamelApplication" />
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <description>here is a sample which processes the input files
         (leaving them in place - see the 'noop' flag)
         then performs content based routing on the message using XPath</description>
        <from uri="file:src/data?noop=true"/>
        <process ref="StatisticsProcessor"/>
        <process ref="EventNotifierProcessor"/>
        <choice>
            <when>
                <xpath>/person/city = 'London'</xpath>
                <log message="UK message"/>
                <to uri="file:target/messages/uk"/>
            </when>
            <otherwise>
                <log message="Other message"/>
                <to uri="file:target/messages/others"/>
            </otherwise>
        </choice>
    </route>

</camelContext>


Now, you can check all notifications in jConsole. Expand eventnotifers , select JmxEventNotifier and click Subscribe on Notifications screen:



As you can see there are two events of type MyCustomEvent with message “Event message: CustomEventNotification”.

Receiving notifications

Sometimes there is a need to handle notifications automatically. In order to do this, you have to create a class which implements javax.management.NotificationListener interface. In handleNotification method you can invoke custom business logic to handle received event.

import javax.management.Notification;
import javax.management.NotificationListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomNotificationListener implements NotificationListener {
      
       private static Logger logger = LoggerFactory.getLogger(CustomNotificationListener.class);
      
       @Override
       public void handleNotification(Notification notification, Object handback) {
             logger.info("Get the notification : " + notification);
       }

}

To register notification listener you have to add it in Camel MBean Server. In Spring DSL there is no other possibility to access Camel route context before initialization then in custom route policy.
The code below shows the implementation of CustomRoutePolicy which registers above mentioned notification listener.

import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.ObjectName;

import org.apache.camel.Route;
import org.apache.camel.impl.RoutePolicySupport;

public class CustomRoutePolicy extends RoutePolicySupport {
      
       @SuppressWarnings("serial")
       @Override
       public void onInit(Route route) {
             ObjectName on = null;
             try {
                    on = ObjectName.getInstance("org.apache.camel:context=MichalPC/camel-1,type=eventnotifiers,name=JmxEventNotifier");
             } catch (MalformedObjectNameException e) {
                    throw new RuntimeException(e);
             } catch (NullPointerException e) {
                    throw new RuntimeException(e);
             }
       
        CustomNotificationListener listener = new CustomNotificationListener();  
        try {
                    route.getRouteContext().getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer().addNotificationListener(on,
                        listener, new NotificationFilter() {

                                  public boolean isNotificationEnabled(Notification notification) {
                                return true;
                            }
                        }, null);
             } catch (InstanceNotFoundException e) {
                    throw new RuntimeException(e);
             }
       }

}

Camel route modification:

<bean id="CustomRoutePolicy" class="com.blogspot.michalwarecki.camel.simple.CustomRoutePolicy" />

<route routePolicyRef="CustomRoutePolicy">