Saturday, October 19, 2013

Java on Parallella


Many of our algorithms are implemented in a way that enables them to execute many calculations simultaneously. It is called parallelization. In my short IT career I have seen many erroneous thinking “If 8 simultaneous threads speeded up my algorithm 6 times than I'll increase number of threads and this will give me even better results! Great!” Unfortunately, Nope …
 
If more than N threads are trying to run in parallel on N core processor this is physically impossible. In this case (most often) operating system “cuts” your algorithm in a few pieces trying to switch between them giving impression that we can create arbitrary number of threads with impunity.
Unfortunately this switching (named context switching) comes with a cost:


Ideal multitasking

multitasking with context switching


All of us can easily understand that context switching involves some time consuming logic but why the hell tasks with context switching are running little bit longer? Anter alia because context switch clears TLB (http://en.wikipedia.org/wiki/Translation_lookaside_buffer) and “dirty” CPU caches. That's why we often should take a look at hardware and “mechanical sympathy”.

So, what we really want is more cores/processors but companies like Intel are selling us in reasonable price maximum 8 cores. These processors are fast but sometimes we want to have little bit slower but two or four times more of them. A few months ago I started looking for cheap multimulti processor computer and I found Parallella (http://www.parallella.org/). 

Parallella supercomputer

Parallella computer costs 99$ and contains:
- 16 cores Epiphany coprocessor (64 cores en route),
- 2 cores ARM-A9 host processor (with FPGA logic),
- 1 GB SDRAM,
- Gigabit Ethernet, 2x uUSB, uHDMI, uSD.
Impressed? I was.


Epiphany architecture

Heart of Parallela is Epiphany coprocessor. It consists of 16/64 RISC Processors connected together by mesh network-on-chip.


The main challenge in programming on Epiphany is to create the correct algorithm to offload host processor. You have multiple CPUs available but currently no operating system and what is more important no memory fences (http://en.wikipedia.org/wiki/Memory_barrier). Memory access architecture is referred as “weak ordering of loads and stores”. The guarantees are:
  • Load operations complete before the returned data is used by a subsequent instruction,
  • Load operations using data previously written use the updated values,
  • Store operations eventually propagate to their ultimate destination.

Why the hell am I writing about this? Because the Java Memory Model (http://www.slideshare.net/michalwarecki/java-memory-model-23207253) requires specified rules when using monitors and volatile keyword. E.g. x86 processors contains locked instruction which allows us to be sure that sequential write and read to the same variable will be executed in this order.
Epiphany doesn't have such instructions but if we want to be Java Language Specification (JLS) compatible we need to implement it somehow. Such JLS requirements will be the hardest to implement.

Epiphany SDK

Epiphany comes with very good Software Development Kit which makes programming easier. It consists of:
  • Epiphany Multicore Development IDE (based on Eclipse),
  • C/C++ Compiler (E-GCC),
  • Assembler (E-AS),
  • Linked (E-LD),
  • Instruction set simulator (E-RUN),
  • Hardware Connection Server (E-SRVER),
  • Loader Utility (E-LOADER),
  • Debugger (E-DBG),
  • Utility libraries.
This is enough to create powerful programs.


Great but can I run Java on it?

Not yet but we can help to make it work. Java has support for many architectures but Epiphany is not enough popular to interest large companies like Oracle or Redhat. I'm a big fun of Java and all hardware novelties so I decided to try implementing it on my own. Currently, I'm working on engineering Epiphany assembly code generator which is based on OpenJDK Graal JiT Compiler. Another challenge is to create efficient code dispatcher and Java API which would make programming easier and safer. It has to be designed for heterogeneous architectures like Parallella. Here comes OpenJDK Sumatra project started by AMD to support GPU/APU offload handling.
Implementation of communication by a shared memory buffers between main Java process and offloaded code will be just a pleasure :-)

OpenJDK Graal

Purpose of Graal project is to exposure functionalities to make dynamic compilers and interpreters in pure Java. Currently, there is possibility to generate assembly code for architectures:
  • AMD64,
  • Sparc,
  • PTX,
  • HSAIL.
I want to make it possible to generate Epiphany assembly code. People often ask me, “why Graal?” so the simplest answers are:
  • because it will be much more quicker to implement such code generator (e. g. why would we need to reinvent the weel?),
  • because it has a very good architectural design,
  • because I can refer to the existing implementations,
  • because it is supported by Oracle and AMD.


Code example for HSAIL:
 
Java code:
public static void testMulThreeArrays(int[] out, int[] ina, int[] inb,
                                      int gid) {
        out[gid] = ina[gid] * inb[gid];
}


Generated HSAIL code:
kernel &run (  
 kernarg_u64 %_arg0, 
 kernarg_u64 %_arg1, 
 kernarg_u64 %_arg2 
 ) { 
 ld_kernarg_u64  $d6, [%_arg0]; 
 ld_kernarg_u64  $d2, [%_arg1]; 
 ld_kernarg_u64  $d1, [%_arg2]; 
 workitemabsid_u32 $s8, 0; 
                                    
@L0: 
 ld_global_s32 $s0, [$d2 + 12]; 
 cmp_ge_b1_u32 $c0, $s8, $s0; 
 cbr $c0, @L1; 
@L2: 
 ld_global_s32 $s0, [$d1 + 12]; 
 cmp_ge_b1_u32 $c0, $s8, $s0; 
 cbr $c0, @L3; 
@L4: 
 ld_global_s32 $s0, [$d6 + 12]; 
 cmp_ge_b1_u32 $c0, $s8, $s0; 
 cbr $c0, @L5; 
@L6: 
 cvt_s64_s32 $d0, $s8; 
 mul_s64 $d0, $d0, 4; 
 add_u64 $d2, $d2, $d0; 
 ld_global_s32 $s0, [$d2 + 16]; 
 cvt_s64_s32 $d3, $s8; 
 mul_s64 $d3, $d3, 4; 
 add_u64 $d1, $d1, $d3; 
 ld_global_s32 $s3, [$d1 + 16]; 
 mul_s32 $s3, $s3, $s0; 
 cvt_s64_s32 $d8, $s8; 
 mul_s64 $d8, $d8, 4; 
 add_u64 $d6, $d6, $d8; 
 st_global_s32 $s3, [$d6 + 16]; 
 ret; 
@L1: 
 ret; 
@L3: 
 ret; 
@L5: 
 ret; 
};


Cool! I'm not familiar with HSAIL but it is very readable. How to generate such code:
private void test(String snippet) {
        StructuredGraph graph = parse(snippet);
        HSAILCompilationResult compResult = HSAILCompilationResult
            .getHSAILCompilationResult(graph);
        compResult.dumpCompilationResult();
}


where snippet is a method name. Nice and simple. I'll skip parse implementation because it is not necessary here.
Main Graal compiler internal components:
  • architecture: in this component we define how hardware architecture (registers etc.) looks like. For example this is my draft implementation of Epiphany:
/**
 * Represents the Epiphany architecture.
 */
public class Epiphany extends Architecture {

    /**
     * Epiphany architecture does not provide any implicit memory barrier.
     */
    public static final int NO_MEM_BAR = 0x0000;

    /**
     * The general purpose registers can all be used in an integral context as well as in a floating-point context.
     * There is no distinction per FPU and CPU so CPU category is used.
     */
    public static final Register.RegisterCategory CPU = new Register.RegisterCategory("CPU");
    public static final Register.RegisterCategory FPU = new Register.RegisterCategory("FPU");

    //Argument / result / scratch registers
    public static final Register a1  = new Register(1,  1,  "r0",  CPU);
    public static final Register a2  = new Register(2,  2,  "r1",  CPU);
    public static final Register a3  = new Register(3,  3,  "r2",  CPU);
    public static final Register a4  = new Register(4,  4,  "r3",  CPU);

    //Registers variable
    public static final Register v1  = new Register(5,  5,  "r4",  CPU);
    public static final Register v2  = new Register(6,  6,  "r5",  CPU);
    public static final Register v3  = new Register(7,  7,  "r6",  CPU);
    public static final Register v4  = new Register(8,  8,  "r7",  CPU);
    public static final Register v5  = new Register(9,  9,  "r8",  CPU);
    public static final Register v6  = new Register(10, 10,  "r9",  CPU);
    public static final Register v7  = new Register(11, 11,  "r10",  CPU);

    //Variable Register / Frame Pointer
    public static final Register v8  = new Register(12, 12,  "r11",  CPU);

    //Intra - procedure call scratch register
    public static final Register pcs  = new Register(13, 13,  "r12",  CPU);

    //Stack Pointer
    public static final Register sp  = new Register(14, 14,  "r13",  CPU);

    //Link Register
    public static final Register lr  = new Register(15, 15,  "r14",  CPU);

    //General use
    public static final Register r15  = new Register(16, 16,  "r15",  CPU);
    public static final Register r16  = new Register(17, 17,  "r16",  CPU);
    public static final Register r17  = new Register(18, 18,  "r17",  CPU);
    public static final Register r18  = new Register(19, 19,  "r18",  CPU);
    public static final Register r19  = new Register(20, 20,  "r19",  CPU);
    public static final Register r20  = new Register(21, 21,  "r20",  CPU);
    public static final Register r21  = new Register(22, 22,  "r21",  CPU);
    public static final Register r22  = new Register(23, 23,  "r22",  CPU);
    public static final Register r23  = new Register(24, 24,  "r23",  CPU);
    public static final Register r24  = new Register(25, 25,  "r24",  CPU);
    public static final Register r25  = new Register(26, 26,  "r25",  CPU);
    public static final Register r26  = new Register(27, 27,  "r26",  CPU);
    public static final Register r27  = new Register(28, 28,  "r27",  CPU);

    //Reserved for constants
    public static final Register r28  = new Register(29, 29,  "r28",  CPU);
    public static final Register r29  = new Register(30, 30,  "r29",  CPU);
    public static final Register r30  = new Register(31, 31,  "r30",  CPU);
    public static final Register r31  = new Register(32, 32,  "r31",  CPU);

    //General use
    public static final Register r32  = new Register(33, 33,  "r32",  CPU);
    public static final Register r33  = new Register(34, 34,  "r33",  CPU);
    public static final Register r34  = new Register(35, 35,  "r34",  CPU);
    public static final Register r35  = new Register(36, 36,  "r35",  CPU);
    public static final Register r36  = new Register(37, 37,  "r36",  CPU);
    public static final Register r37  = new Register(38, 38,  "r37",  CPU);
    public static final Register r38  = new Register(39, 39,  "r38",  CPU);
    public static final Register r39  = new Register(40, 40,  "r39",  CPU);
    public static final Register r40  = new Register(41, 41,  "r40",  CPU);
    public static final Register r41  = new Register(42, 42,  "r41",  CPU);
    public static final Register r42  = new Register(43, 43,  "r42",  CPU);
    public static final Register r43  = new Register(44, 44,  "r43",  CPU);
    public static final Register r44  = new Register(45, 45,  "r44",  CPU);
    public static final Register r45  = new Register(46, 46,  "r45",  CPU);
    public static final Register r46  = new Register(47, 47,  "r46",  CPU);
    public static final Register r47  = new Register(48, 48,  "r47",  CPU);
    public static final Register r48  = new Register(49, 49,  "r48",  CPU);
    public static final Register r49  = new Register(50, 50,  "r49",  CPU);
    public static final Register r50  = new Register(51, 51,  "r50",  CPU);
    public static final Register r51  = new Register(52, 52,  "r51",  CPU);
    public static final Register r52  = new Register(53, 53,  "r52",  CPU);
    public static final Register r53  = new Register(54, 54,  "r53",  CPU);
    public static final Register r54  = new Register(55, 55,  "r54",  CPU);
    public static final Register r55  = new Register(56, 56,  "r55",  CPU);
    public static final Register r56  = new Register(57, 57,  "r56",  CPU);
    public static final Register r57  = new Register(58, 58,  "r57",  CPU);
    public static final Register r58  = new Register(59, 59,  "r58",  CPU);
    public static final Register r59  = new Register(60, 60,  "r59",  CPU);
    public static final Register r60  = new Register(61, 61,  "r60",  CPU);
    public static final Register r61  = new Register(62, 62,  "r61",  CPU);
    public static final Register r62  = new Register(63, 63,  "r62",  CPU);
    public static final Register r63  = new Register(64, 64,  "r63",  CPU);

    public static final Register[] gprRegisters = {
        r15, r16, r17, r18, r19, r20, r21, r22, r23, r24, r25, r26, r27,
        r32, r33, r34, r35, r36, r37, r38, r39, r40, r41, r42, r43, r44, r45, r46, r47, r48,
        r49, r50, r51, r52, r53, r54, r55, r56, r57, r58, r59, r60, r61, r62, r63
    };

    public static final Register[] allRegisters = {
        a1, a2, a3, a4, v1, v2, v3, v4, v5, v6, v7, v8, pcs, sp, lr,
        r15, r16, r17, r18, r19, r20, r21, r22, r23, r24, r25, r26, r27, r28, r29, r30, r31,
        r32, r33, r34, r35, r36, r37, r38, r39, r40, r41, r42, r43, r44, r45, r46, r47, r48,
        r49, r50, r51, r52, r53, r54, r55, r56, r57, r58, r59, r60, r61, r62, r63
    };

    public Epiphany() {
        /**
         * About machine code call displacement offset:
         * Epiphany has relative as well as absolute branching. Each offers a 16-bit and 32-bit opcodes.
         * For relative branch, the offset is either 8-bit or 24-bit (signed) accordingly.
         * It can also branch based on an absolute register value, where the offset can be 32-bit.
         */
        super("Epiphany", 4, ByteOrder.LITTLE_ENDIAN, false, allRegisters, NO_MEM_BAR, 1, r63.encoding + 1, 4);
    }

    @Override
    public boolean canStoreValue(Register.RegisterCategory category, PlatformKind platformKind) {
        if (!(platformKind instanceof Kind)) {
            return false;
        }
        Kind kind = (Kind) platformKind;
        if (category == CPU) {
            switch (kind) {
                case Boolean:
                case Byte:
                case Char:
                case Short:
                case Int:
                case Long:
                case Object:
                    return true;
            }
        } else if (category == FPU) {
            switch (kind) {
                case Float:
                case Double:
                    return true;
            }
        }
        return false;
    }

    @Override
    public PlatformKind getLargestStorableKind(Register.RegisterCategory category) {
        if (category == CPU) {
            return Kind.Long;
        } else if (category == FPU) {
            return Kind.Double;
        } else {
            return Kind.Illegal;
        }
    }
}
  • compiler: as the name implies, this components is used for code compilation,
  • ASM: generates architecture-specific assembly code,
  • LIR: aims to manage intermediate representation graph.

OpenJDK Sumatra

This project is even more interesting. The primary purpose of it is to use graphics processing unit (GPU) and accelerated processing unit (APU – i.e. Epiphany) in JVM. It is so awesome that it creeps me out :-).
Graphics cards have computations power of thousands of GFlops, so not using this power is just a waste of money. But heterogeneous architecture is not so easy to implement. Guys from AMD and Oracle wants to make it easier to offload logic written in JVM based languages to GPUs and APUs. Currently, it is possible to redirect code but in the future such offload will be possible dynamically (i.e. using C2).
Example of redirecting Java code:
IntStread.range(0, players.length).parallel().forEach(n → {
    Player p = players[n];
    if(p.getAb() > 0) {
        p.setBa((float)p.getHits() / (float) p.getAb());
     } else {
        p.setBa((float)0.0);
     }
}); 
I this example, we can see that each calculation of player batting average is executed in parallel on different processing unit where n is the unique identifier of this unit.

Summary

If you like what I do and would like to participate, please send me a message. It will be motivating!

Wednesday, August 15, 2012

Monitoring and managing Apache Camel using JMX


Generally speaking, Java Management Extension (JMX) is technology, which provides ability to manage and monitor applications and devices. In order to explore and understand its details it is recommended to visit the following website: http://www.oracle.com/technetwork/java/javase/tech/javamanagement-140525.html

This blog post will describe the capabilities of Apache Camel management module, for example:  
  • monitoring Camel artifacts i.e. routes, components, consumers, thread pools etc. , 
  •  managing runtime i.e. stopping particular routes or increasing size of particular thread pool,
  •  creating custom Management Bean (MBean) to provide other useful data, 
  • listening to JMX notifications.

For the first two afore-mention functionalities jConsole and FuseIDE will be used. By default jConsole is delivered with JRE and JDK in directory JAVA_HOME/bin/. FuseIDE provides console with more useful features like drag & drop messages from static file to ActiveMQ queue or topic. Unfortunately this tool is available only for subscribers (http://fusesource.com/enterprise-support/subscription-center/).

For the purpose of this article, maven archetype org.apache.archetypes / camel.archetype.spring will be used. The following code shows how to create this archetype from the command line:

mvn archetype:generate \ 
  -DarchetypeGroupId=org.apache.camel.archetypes \ 
  -DarchetypeArtifactId=camel.archetype.spring \ 
  -DarchetypeVersion=2.9.0 \ 
  -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots-group

To run Camel Maven project type: mvn camel:run.

Basics

Let’s start with jConsole. When you open it, you should see New Connection creator similar to the one shown below:


Select  org.codehouse.plexus … and click Connect. You should see six tabs: 
  •  Overview,
  • Memory,
  • Threads,
  • Classes, 
  • VM Summary and
  • MBeans.
Go to the last tab on the list and expand org.apache.camel tree:


Try to rummage a bit of the tree and see how much information can be found, i.e.:
  • there is only one component (“file”),
  • there is only one consumer (“FileConsumer”),
  • if there is no specified error handler, then “DefaultErrorHandlerBuilder” is to be used,
  • there are five processors (“choice1”, “log1”, “log2”, “to1”, “to2”). 
  

Assume that on “production stage” one of the routes has a bug. This problem should be resoved as soon as possible and during this time, route must be stopped. You can’t stop the whole application so there is one very quick way to stop specified route: use JMX. In org.apache.camel tree, expand routes node --> find the buggy route --> expand Operations node --> find operation named stop() and “click” on it.


Another example of very useful MBean method is browsing body of messages consumed by particular endpoint. Picture below shows first file which was moved to target/messages/uk directory.


As you can see there are tens of MBeans methods that may be extremely useful during the monitoring process.

Fuse IDE is based on Eclipse and provides not only similar to jConsole JMX MBean explorer but also graphical equivalent which facilitates exploring Camel applications in runtime. Additionally, Diagram View shows message flow of a particular route. Fuse IDE can be downloaded from the following website: http://fusesource.com/products/fuse-ide/ .
When you start Fuse IDE, switch to Fuse Integration perspective, expand service called maven and search for route1. Below is the screenshot of this view.



Custom MBean

Camel JMX provides a lot of useful MBeans but sometimes there is a need to create custom MBean that will provide application specific data.
Take for example that we need the average and total number of characters in the files. The class as shown below shows implementation of such processor:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;

@ManagedResource(description = "Statictics Processor")
public class StatisticsProcessor implements Processor {
      
    private int totalRequests;
   
    private int totalCharacters;

    @ManagedAttribute
    public int getAvgChar() {
        return totalCharacters / totalRequests;
    }
   
    @ManagedAttribute
    public int getTotalChar() {
        return totalCharacters;
    }

    public void process(Exchange exchange) throws Exception {
       ++totalRequests;
       totalCharacters += ((String) exchange.getIn().getBody(String.class)).length();
    }
}


@ManagerResource is a metadata annotation that informs Camel that this class provides MBean methods and attributes.
@ManagedAttribute indicates which one of a class attributes/methods are MBeans attributes/methods. In this case getAvgChar and getTotalChar are such methods.
Now, you have to modify Camel route as follows:

  <bean id="StatisticsProcessor" class="com.blogspot.michalwarecki.camel.simple.StatisticsProcessor" />

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <description>here is a sample which processes the input files
         (leaving them in place - see the 'noop' flag)
         then performs content based routing on the message using XPath</description>
        <from uri="file:src/data?noop=true"/>
        <process ref="StatisticsProcessor"/>
        <choice>
            <when>
                <xpath>/person/city = 'London'</xpath>
                <log message="UK message"/>
                <to uri="file:target/messages/uk"/>
            </when>
            <otherwise>
                <log message="Other message"/>
                <to uri="file:target/messages/others"/>
            </otherwise>
        </choice>
    </route>
</camelContext>


Run the applications and check available processors and its methods in jConsole. As you can see there is processor named processor1 with methods getAvgChar and getTotalChar as the end.
Try to invoke them and check results:





Broadcasting notifications

JMX provides the ability to broadcast custom notifications to subscribers. These notifications may indicate a threat to proper functioning of integration platform. For example, you can create notifier which will send information about inactive web page which is required by the application.
Firstly, you need to implement custom event object which extends EventObject. Data about this event will be displayed based on toString method. Example of such class is shown below:

import java.util.EventObject;

public class MyCustomEvent extends EventObject {
      
       private static final long serialVersionUID = 7062663129376271978L;

       public MyCustomEvent(String source) {
        super(source);
    }
      
       @Override
    public String toString() {
        return "Event message: " + getSource();
       }

}

Secondly, you have to notify all subscribers that event has occurred. Apache Camel context has a few registered notifiers, of which one of them is org.apache.camel.management.JmxNotificationEventNotifier which provides ability to broadcast events. The code below shows processor which iterates over all notifiers and sends custom event to each of them.

import java.util.EventObject;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.management.JmxNotificationEventNotifier;
import org.apache.camel.spi.EventNotifier;

public class EventNotifierProcessor implements Processor {

       @Override
       public void process(Exchange exchange) throws Exception {
             List<EventNotifier> eventNotifiers =
                           exchange.getContext().getManagementStrategy().getEventNotifiers();
            
             for(EventNotifier eventNotifier : eventNotifiers) {
                    EventObject eventObject = new MyCustomEvent("CustomEventNotification");
                    eventNotifier.notify(eventObject);
             }
       }
}

Thirdly, place JmxNotificationEventNotifier in Spring configuration and add EventNotiferProcessor to the following route:

<bean id="EventNotifierProcessor" class="com.blogspot.michalwarecki.camel.simple.EventNotifierProcessor" />
 
  <bean id="JmxEventNotifier" class="org.apache.camel.management.JmxNotificationEventNotifier">
       <property name="source" value="MyCamelApplication" />
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <description>here is a sample which processes the input files
         (leaving them in place - see the 'noop' flag)
         then performs content based routing on the message using XPath</description>
        <from uri="file:src/data?noop=true"/>
        <process ref="StatisticsProcessor"/>
        <process ref="EventNotifierProcessor"/>
        <choice>
            <when>
                <xpath>/person/city = 'London'</xpath>
                <log message="UK message"/>
                <to uri="file:target/messages/uk"/>
            </when>
            <otherwise>
                <log message="Other message"/>
                <to uri="file:target/messages/others"/>
            </otherwise>
        </choice>
    </route>

</camelContext>


Now, you can check all notifications in jConsole. Expand eventnotifers , select JmxEventNotifier and click Subscribe on Notifications screen:



As you can see there are two events of type MyCustomEvent with message “Event message: CustomEventNotification”.

Receiving notifications

Sometimes there is a need to handle notifications automatically. In order to do this, you have to create a class which implements javax.management.NotificationListener interface. In handleNotification method you can invoke custom business logic to handle received event.

import javax.management.Notification;
import javax.management.NotificationListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomNotificationListener implements NotificationListener {
      
       private static Logger logger = LoggerFactory.getLogger(CustomNotificationListener.class);
      
       @Override
       public void handleNotification(Notification notification, Object handback) {
             logger.info("Get the notification : " + notification);
       }

}

To register notification listener you have to add it in Camel MBean Server. In Spring DSL there is no other possibility to access Camel route context before initialization then in custom route policy.
The code below shows the implementation of CustomRoutePolicy which registers above mentioned notification listener.

import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.ObjectName;

import org.apache.camel.Route;
import org.apache.camel.impl.RoutePolicySupport;

public class CustomRoutePolicy extends RoutePolicySupport {
      
       @SuppressWarnings("serial")
       @Override
       public void onInit(Route route) {
             ObjectName on = null;
             try {
                    on = ObjectName.getInstance("org.apache.camel:context=MichalPC/camel-1,type=eventnotifiers,name=JmxEventNotifier");
             } catch (MalformedObjectNameException e) {
                    throw new RuntimeException(e);
             } catch (NullPointerException e) {
                    throw new RuntimeException(e);
             }
       
        CustomNotificationListener listener = new CustomNotificationListener();  
        try {
                    route.getRouteContext().getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer().addNotificationListener(on,
                        listener, new NotificationFilter() {

                                  public boolean isNotificationEnabled(Notification notification) {
                                return true;
                            }
                        }, null);
             } catch (InstanceNotFoundException e) {
                    throw new RuntimeException(e);
             }
       }

}

Camel route modification:

<bean id="CustomRoutePolicy" class="com.blogspot.michalwarecki.camel.simple.CustomRoutePolicy" />

<route routePolicyRef="CustomRoutePolicy">