Monday, April 6, 2015

Running a Camel Route at Startup once only to load and process PiHexJ's servo configs

Servo Configuration File Loading

The servo calibration data is stored in config files as JSON representation. In the previous post I showed how I capture the config via a REST interface. To keep this simple each servo has it's own file name servo-{channel}.conf eg; servo-0.conf. At some point I'll  update to aggregate the configuration into a single file. Since it makes no difference to the operation of the robot, the simple approach will work for now.

Servo config files are stored in a folder specified by a property. What I wanted is a camel route to load the files and set the configuration of each servo when the PiHexJ main app starts up. The route should run once and only once. After the app is started the config is managed via the rest interface. If the configuration is updated a new conf file is written to persist the settings for then next startup.

servo-0.conf has a single record in json format. This file stores the settings that each servo needs to calibrate for differences in servo models and installation position.
{"channel":1,"range":180,"center":-5,"lowLimit":-80,"highLimit":90,"name":"Leg 0 Femur"}

The camel route to load these files is fairly straight forward

from("file:{{config:com.margic.pihex.servo.conf}}?noop=true&charset=UTF-8&include=.*.conf")
        .routeId("loadServoConfig")
        .autoStartup(false)
        .log(LoggingLevel.INFO, "loading config file ${in.header.CamelFileName}")
        .unmarshal().json(JsonLibrary.Jackson, ServoConfig.class)
        .to("bean:controller")
        .choice()
            .when(exchangeProperty("CamelBatchComplete").isEqualTo(true))
            .log(LoggingLevel.INFO, "Loaded all servo conf files stopping loader")
            .process(new StopProcessor())
        .endChoice();

Notice the route has .autoStartup(false) this prevents this route from starting when Camel starts up. We want to control when this route starts to ensure the system is ready before loading the configuration files.
The route reads all the files in the folder and loads them into the controller. The controller is a bean injected into the camel registry using my guice jndi binding mentioned in an earlier post. This allows me to send the message .to("bean:controller"). At this point in the flow the body of the camel message is a ServoConfig object. Camel's bean component using bean binding to automatically invoke the method on the bean that matches the argument type of a ServoConfig object and processes the config. Bean binding makes this very simple.

Stopping the route when finished loading conf files

The Camel file component is a batch consumer. This means it supports polling multiple messages. In my case camel reads all the files and creates a message for each. The batch consumer uses the following properties to signal the state of the batch.

Exchange Properties

The following properties is set on the Exchange for each Exchange polled in the same batch.
Property
Description
CamelBatchSize
The total number of Exchanges that was polled in this batch.
CamelBatchIndex
The current index of the batch. Starts from 0.
CamelBatchComplete
A boolean indicating the last Exchange in the batch. Is only true for the last entry.
I use these in the choice in my route
        .choice()
            .when(exchangeProperty("CamelBatchComplete").isEqualTo(true))
            .log(LoggingLevel.INFO, "Loaded all servo conf files stopping loader")
            .process(new StopProcessor())
        .endChoice();
The choice checks the property CamelBatchComplete. this property will be true when the message is the last message (last file). We can then instruct the batch to stop. This needs to be done in a separate thread to allow a controlled shutdown. I use a new processor as per the Camel examples.
    class StopProcessor implements Processor{
        Thread stop;
        @Override
        public void process(final Exchange exchange) throws Exception {
            // stop this route using a thread that will stop
            // this route gracefully while we are still running
            if (stop == null) {
                stop = new Thread() {
                    @Override
                    public void run() {
                        try {
                            exchange.getContext().stopRoute("loadServoConfig");
                        } catch (Exception e) {}
                    }
                };
            }
            stop.start();
        }
    }

When this thread runs it grabs the context from the last message and issues a stop route command with the id of the route. Stopping the route ensure it won't run again if the config files change while the robot is running.

Starting the Camel Route after Startup

Now I have a way to load the files and stop the route I need a way to trigger the startup of the route after the camel context is fully started. Camel provides an interface StartupListener to provide a event handler on camel startup.

In PiHexJ I used an eventbus to handle events in the application. So to trigger the startup event I added a listener to my custom camel context that implements StartupListener and posts an event to the eventbus.

public class StartupListener implements org.apache.camel.StartupListener {
    private static final Logger log = LoggerFactory.getLogger(StartupListener.class);

    @Override
    public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
        log.info("Camel startup listener, alreadyStarted: {}", alreadyStarted);
        if(!alreadyStarted){
            log.debug("Initial Startup Trigger Event on Event Bus");
            EventBus eventBus = context.getRegistry().lookupByNameAndType("eventBus", EventBus.class);
            if(eventBus == null){
                throw new Exception("Failed to lookup eventbus in camel registry. Can't initialize properly");
            }
            eventBus.post(new StartupEvent());
        }
    }
}

Now I have an event I use in another route to start the actual servo calibration conf file loader. Having this intermediate route will allow me to do multiple startup actions at a later point if required.
These two routes look like this:

from("guava-eventbus:{{config:com.margic.pihex.camel.eventBusName}}?listenerInterface=com.margic.pihex.camel.route.EventBusEvents")
        .routeId("eventBusRoute")
        .choice()
            .when(body().isInstanceOf(Servo.class))
                .to("seda:updateServo")
            .when(body().isInstanceOf(ControlEvent.class))
                .to("bean:controller?method=handleControlEvent")
            .when(body().isInstanceOf(StartupEvent.class))
                .to("direct:handleStartupEvent")
        .endChoice();

from("direct:handleStartupEvent")
        .routeId("handleStartupEventRoute")
        .log(LoggingLevel.INFO, "Starting loadServoConfigRoute")
        .to("controlbus:route?routeId=loadServoConfig&action=start");

Camel provides a controlbus component that allows control over routes. In my example I receive the event on the event bus and when that event is a StartupEvent I send the message to the handleStartupEvent route. All that route does is sent the message to the control bus to start the loadServoConfig route. I could send the message directly to the control bus but as mentioned the handleStartupEvent route will allow me to add additional startup steps at a later point.

Now we have a way to start a route on startup and shut it down once it's run.

[Thread-0] INFO com.margic.pihex.camel.context.CustomCamelContext - Total 10 routes, of which 9 is started.
[Thread-0] INFO com.margic.pihex.camel.context.CustomCamelContext - Apache Camel 2.15.0 (CamelContext: camel-1) started in 0.664 seconds
[pool-1-thread-1] INFO handleStartupEventRoute - Starting loadServoConfigRoute
[pool-1-thread-1] DEBUG com.margic.pihex.camel.context.CustomCamelContext - Warming up route id: loadServoConfig having autoStartup=true
[pool-1-thread-1] DEBUG com.margic.pihex.camel.context.CustomCamelContext - Route: loadServoConfig >>> EventDrivenConsumerRoute[Endpoint[file:///Users/paulcrofts/conf/?charset=UTF-8&include=.*.conf&noop=true] -> Pipeline[[Channel[Log(loadServoConfig)[loading config file ${in.header.CamelFileName}]], Channel[Unmarshal[org.apache.camel.component.jackson.JacksonDataFormat@467b7308]], Channel[sendTo(Endpoint[bean://controller])], Channel[sendTo(Endpoint[bean://controller])], Channel[choice{when Filter[if: exchangeProperty{CamelBatchComplete} == true do: Pipeline[[Channel[Log(loadServoConfig)[Loaded all servo conf files stopping loader]], Channel[DelegateSync[com.margic.pihex.camel.route.StartupRouteBuilder$StopProcessor@2c2734a9]]]]]}]]]]
[pool-1-thread-1] DEBUG com.margic.pihex.camel.context.CustomCamelContext - Starting consumer (order: 1010) on route: loadServoConfig
[pool-1-thread-1] INFO com.margic.pihex.camel.context.CustomCamelContext - Route: loadServoConfig started and consuming from: Endpoint[file:///Users/paulcrofts/conf/?charset=UTF-8&include=.*.conf&noop=true]
[pool-1-thread-1] INFO org.apache.camel.component.controlbus.ControlBusProducer - ControlBus task done [start route loadServoConfig] with result -> void
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] INFO loadServoConfig - loading config file servo-0.conf
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] DEBUG com.margic.pihex.ServoImpl - µS/deg: 5.555555555555555 for range: 180
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] DEBUG com.margic.adafruitpwm.AdafruitServoDriver - Updating servo position: com.margic.pihex.ServoImpl@7600c818[angle=0,servoConfig=com.margic.pihex.model.ServoConfig@31db1724[name=Leg 0 Coxa,channel=0,range=180,center=5,lowlimit=-80,highLimit=80]], count: 313
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] INFO loadServoConfig - loading config file servo-1.conf
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] DEBUG com.margic.pihex.ServoImpl - µS/deg: 5.555555555555555 for range: 180
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] DEBUG com.margic.adafruitpwm.AdafruitServoDriver - Updating servo position: com.margic.pihex.ServoImpl@1cbcac67[angle=0,servoConfig=com.margic.pihex.model.ServoConfig@7aea0edd[name=Leg 0 Femur,channel=1,range=180,center=-5,lowlimit=-80,highLimit=90]], count: 301
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] INFO loadServoConfig - loading config file servo-2.conf
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] DEBUG com.margic.pihex.ServoImpl - µS/deg: 5.555555555555555 for range: 180
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] DEBUG com.margic.adafruitpwm.AdafruitServoDriver - Updating servo position: com.margic.pihex.ServoImpl@37a2866f[angle=0,servoConfig=com.margic.pihex.model.ServoConfig@44a3e854[name=Leg 0 Tibia,channel=2,range=180,center=10,lowlimit=-90,highLimit=90]], count: 319
[Camel (camel-1) thread #1 - file:///Users/paulcrofts/conf/] INFO loadServoConfig - Loaded all servo conf files stopping loader
[Thread-11] INFO org.apache.camel.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Route: loadServoConfig shutdown complete, was consuming from: Endpoint[file:///Users/paulcrofts/conf/?charset=UTF-8&include=.*.conf&noop=true]
[Thread-11] INFO org.apache.camel.impl.DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 0 seconds
[Thread-11] INFO com.margic.pihex.camel.context.CustomCamelContext - Route: loadServoConfig is stopped, was consuming from: Endpoint[file:///Users/paulcrofts/conf/?charset=UTF-8&include=.*.conf&noop=true]

As you can see the Camel context starts up with 9 of the 10 routes running. Then the event is triggered starting the load route. Each conf is loaded. I have only 3 in this example. The servo configurations are loaded and the servo configurations are updated. Finally the loader route is gracefully shutdown.

1 comment:

  1. Hi I have a problem with your Stopping code, I need to execute the route once only. I use your code like example


    from("file:{{config:com.margic.pihex.servo.conf}}?noop=true&charset=UTF-8&include=.*.conf")
    .routeId("loadServoConfig")
    .autoStartup(false)
    .log(LoggingLevel.INFO, "loading config file ${in.header.CamelFileName}")
    .unmarshal().json(JsonLibrary.Jackson, ServoConfig.class)
    .filter(
    XXXXXXXXXXXX
    )
    .to("bean:controller")
    .choice()
    .when(exchangeProperty("CamelBatchComplete").isEqualTo(true))
    .log(LoggingLevel.INFO, "Loaded all servo conf files stopping loader")
    .process(new StopProcessor())
    .endChoice();


    If I use a filter, the Choice is never execute, only if the last exchange match the filter properties.

    Is another way to stop the route?

    Thanks

    ReplyDelete