Thursday, December 27, 2012

ActiveMQ: High Performance Messaging Using LevelDB

ActiveMQ is already built for high performance and is one of the fastest brokers available, but did you know it can be even faster when configured with the LevelDB message store?  LevelDB is a fast, lightweight key-value storage system which maps byte keys to byte values.
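To make that data model concrete, here is a minimal in-memory sketch of the contract LevelDB exposes: byte-array keys mapped to byte-array values, kept in lexicographic key order.  This is purely illustrative and has nothing to do with LevelDB's actual log-structured implementation.

```java
import java.util.Comparator;
import java.util.TreeMap;

// Illustrative model of LevelDB's interface: byte[] keys -> byte[] values,
// ordered lexicographically by unsigned byte comparison (as LevelDB does
// with its default comparator). Not LevelDB's actual implementation.
public class ByteKVStore {
    private static final Comparator<byte[]> LEX = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    };

    private final TreeMap<byte[], byte[]> map = new TreeMap<>(LEX);

    public void put(byte[] key, byte[] value) { map.put(key, value); }

    public byte[] get(byte[] key) { return map.get(key); }

    public void delete(byte[] key) { map.remove(key); }

    // Smallest key in lexicographic order
    public byte[] firstKey() { return map.firstKey(); }
}
```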

Not too long ago I came across an issue where the ActiveMQ broker seemed to be experiencing a performance problem.  The issue arose from a very specific use case; nonetheless, it highlighted the exceptional performance of the LevelDB store.  The use case involved durable topic subscribers with selectors: one subscriber with no selector to receive all messages, one subscriber with a selector of "true", and one subscriber with a selector of "false".  A producer was then configured to alternate the property set on the message between "true" and "false".

Using the KahaDB store I saw that it would take upwards of 7 hours for 200,000 messages to be published to the broker.  Out of curiosity I switched to the LevelDB store and saw that the exact same scenario took only 70 seconds.  This was absolutely amazing!

If you are interested in enabling the LevelDB store, it can be configured in a couple of simple steps.  The instructions differ depending on the version of the broker you are running.

Fuse Distributions

First, download the fuse-leveldb-1.3-uber.jar (which is the latest at the time of this writing) and copy the JAR into the broker's lib directory.  Then, in the broker configuration file (activemq.xml) update the persistence adapter to use the LevelDB store as follows:

  <persistenceAdapter>  
   <bean xmlns="http://www.springframework.org/schema/beans" id="levelDB" class="org.fusesource.mq.leveldb.LevelDBStore">  
    <property name="directory" value="${activemq.base}/data/leveldb"/>  
    <property name="logSize" value="107374182"/>  
   </bean>  
  </persistenceAdapter>  

Apache Distributions

The Apache distribution of ActiveMQ 5.7 and greater come with the LevelDB library so all you need to do is enable the persistence adapter in the broker configuration:

  <levelDB directory="${activemq.base}/data/leveldb" logSize="107374182"/>  

That's all there is to it.  For more information on the ActiveMQ LevelDB store and performance metrics check out the GitHub project page at FuseMQ-LevelDB.  For a list of all the available binaries for the fuse-leveldb library have a look at this fusesource repository.

Monday, November 5, 2012

ActiveMQ: Securing the ActiveMQ Web Console in Tomcat

This post will demonstrate how to secure the ActiveMQ Web Console with a username and password when deployed in the Apache Tomcat web server.  The Apache ActiveMQ documentation on the Web Console provides a good example of how this is done for Jetty, the default web server shipped with ActiveMQ; this post shows how it is done when deploying the web console in Tomcat.

To demonstrate, the first thing you will need to do is grab the latest distribution of ActiveMQ.  For the purpose of this demonstration I will be using the 5.5.1-fuse-09-16 release, which can be obtained via the Red Hat Support Portal or via the FuseSource repository.  Once you have the distribution, extract it and start the broker.  If you don't already have Tomcat installed you can grab that as well; I am using Tomcat 6.0.36 in this demonstration.  Next, create a directory called activemq-console in the Tomcat webapps directory and extract the ActiveMQ Web Console war into it using the jar -xf command.
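If you prefer doing that last step programmatically, the jar -xf step is just unpacking a zip archive (a .war is a zip).  Here is a hypothetical helper, not part of ActiveMQ or Tomcat, that does the equivalent:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical equivalent of running `jar -xf console.war` inside the
// webapps/activemq-console directory: walk the zip entries and write them out.
public class WarExtractor {
    public static void extract(Path war, Path destDir) throws IOException {
        try (ZipInputStream zin = new ZipInputStream(Files.newInputStream(war))) {
            for (ZipEntry entry; (entry = zin.getNextEntry()) != null; ) {
                Path out = destDir.resolve(entry.getName()).normalize();
                if (!out.startsWith(destDir.normalize())) {
                    continue; // skip entries that would escape destDir ("zip slip")
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    Files.copy(zin, out, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }
}
```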

With all the binaries installed and our broker running, we can begin configuring our web app and Tomcat to secure the Web Console.  First, open the ActiveMQ Web Console's web descriptor, which can be found at activemq-console/WEB-INF/web.xml, and add the following configuration:

   <security-constraint>  
     <web-resource-collection>  
       <web-resource-name>Authenticate entire app</web-resource-name>  
       <url-pattern>/*</url-pattern>  
     </web-resource-collection>  
     <auth-constraint>  
       <role-name>activemq</role-name>  
     </auth-constraint>  
     <user-data-constraint>  
       <!-- transport-guarantee can be CONFIDENTIAL, INTEGRAL, or NONE -->  
       <transport-guarantee>NONE</transport-guarantee>  
     </user-data-constraint>  
   </security-constraint>  
   <login-config>  
     <auth-method>BASIC</auth-method>  
   </login-config>  
   <security-role>  
     <role-name>activemq</role-name>  
   </security-role>  

This configuration enables the security constraint on the entire application, as noted by the /* url-pattern.  Another point to notice is the auth-constraint, which has been set to the activemq role; we will define this shortly.  And lastly, note that this is configured for basic authentication.  This means the username and password are Base64 encoded but not truly encrypted.  To improve the security further you could enable a secure transport such as SSL.
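To see why basic authentication is encoding rather than encryption, note that the Authorization header a browser sends is just the Base64 encoding of username:password, which anyone who captures the request can decode:

```java
import java.util.Base64;

// Demonstrates how an HTTP Basic authentication header is formed: the
// username:password pair is Base64 encoded, which is trivially reversible.
public class BasicAuthDemo {
    public static String header(String username, String password) {
        String credentials = username + ":" + password;
        String encoded = Base64.getEncoder().encodeToString(credentials.getBytes());
        return "Authorization: Basic " + encoded;
    }

    // Anyone on the wire can recover the credentials with a single decode call
    public static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded));
    }
}
```

This is why pairing basic authentication with SSL is recommended for anything beyond a demo.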

Now let's configure the Tomcat server to validate the activemq role we just specified in the web app.  Out of the box, Tomcat is configured to use the UserDatabaseRealm; this is configured in [TOMCAT_HOME]/conf/server.xml.  It instructs the web server to validate against the tomcat-users.xml file, which can also be found in [TOMCAT_HOME]/conf.  Open the tomcat-users.xml file and add the following:

 <role rolename="activemq"/>  
 <user username="admin" password="admin" roles="activemq"/>  

This defines our activemq role and configures a user with that role.

The last thing we need to do before starting our Tomcat server is add the required configuration to communicate with the broker.  First, copy the activemq-all jar into the Tomcat lib directory.  Next, open the startup script and add the following configuration to initialize the JAVA_OPTS variable:

 JAVA_OPTS="-Dwebconsole.jms.url=tcp://localhost:61616 -Dwebconsole.jmx.url=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi -Dwebconsole.jmx.user= -Dwebconsole.jmx.password="  

Now we are ready to start the Tomcat server.  Once started, you should be able to access the ActiveMQ Web Console at http://localhost:8080/activemq-console, where you should be prompted with a basic authentication login dialog.

Once you enter the username and password you should be logged into the ActiveMQ Web Console.  As mentioned before, the username and password are Base64 encoded and each request is authenticated against the UserDatabaseRealm.  The browser retains your credentials in memory, so you will need to exit the browser to end the session.

What you have seen so far is simple authentication using the UserDatabaseRealm, which maintains a list of users in a text file.  Next we will configure the ActiveMQ Web Console to use a JDBCRealm, which authenticates against users stored in a database.

Let's first create a new database in MySQL:

 mysql> CREATE DATABASE tomcat_users;  
 Query OK, 1 row affected (0.00 sec)  

Provide the appropriate permissions for this database to a database user:

 mysql> GRANT ALL ON tomcat_users.* TO 'activemq'@'localhost';  
 Query OK, 0 rows affected (0.02 sec)  

Then you can login to the database and create the following tables:

 mysql> USE tomcat_users;  
 Database changed  
 mysql> CREATE TABLE tomcat_users (  
   -> user_name varchar(20) NOT NULL PRIMARY KEY,  
   -> password varchar(32) NOT NULL  
   -> );  
 Query OK, 0 rows affected (0.10 sec)  
 mysql> CREATE TABLE tomcat_roles (  
   -> role_name varchar(20) NOT NULL PRIMARY KEY  
   -> );  
 Query OK, 0 rows affected (0.05 sec)  
 mysql> CREATE TABLE tomcat_users_roles (  
   -> user_name varchar(20) NOT NULL,  
   -> role_name varchar(20) NOT NULL,  
   -> PRIMARY KEY (user_name, role_name),  
   -> CONSTRAINT tomcat_users_roles_foreign_key_1 FOREIGN KEY (user_name) REFERENCES tomcat_users (user_name),  
   -> CONSTRAINT tomcat_users_roles_foreign_key_2 FOREIGN KEY (role_name) REFERENCES tomcat_roles (role_name)  
   -> );  
 Query OK, 0 rows affected (0.06 sec)  

Next seed the tables with the user and role information:

 mysql> INSERT INTO tomcat_users (user_name, password) VALUES ('admin', 'dbpass');  
 Query OK, 1 row affected (0.00 sec)  
 mysql> INSERT INTO tomcat_roles (role_name) VALUES ('activemq');  
 Query OK, 1 row affected (0.00 sec)  
 mysql> INSERT INTO tomcat_users_roles (user_name, role_name) VALUES ('admin', 'activemq');  
 Query OK, 1 row affected (0.00 sec)  

Now we can verify the information in our database:

 mysql> select * from tomcat_users;  
 +-----------+----------+  
 | user_name | password |  
 +-----------+----------+  
 | admin     | dbpass   |  
 +-----------+----------+  
 1 row in set (0.00 sec)  
 mysql> select * from tomcat_users_roles;  
 +-----------+-----------+  
 | user_name | role_name |  
 +-----------+-----------+  
 | admin     | activemq  |  
 +-----------+-----------+  
 1 row in set (0.00 sec)  

If you left the Tomcat server running from the first part of this demonstration, shut it down now so we can change the configuration to use the JDBCRealm.  In the server.xml file, located in [TOMCAT_HOME]/conf, we need to comment out the existing UserDatabaseRealm and add the JDBCRealm:

    <!--  
    <Realm className="org.apache.catalina.realm.UserDatabaseRealm"  
           resourceName="UserDatabase"/>  
    -->  
    <Realm className="org.apache.catalina.realm.JDBCRealm"  
        driverName="com.mysql.jdbc.Driver"  
        connectionURL="jdbc:mysql://localhost:3306/tomcat_users?user=activemq&amp;password=activemq"  
        userTable="tomcat_users" userNameCol="user_name" userCredCol="password"  
        userRoleTable="tomcat_users_roles" roleNameCol="role_name" />  

Looking at the JDBCRealm, you can see we are using the mysql JDBC driver, the connection URL is configured to connect to the tomcat_users database using the specified credentials, and the table and column names used in our database have been specified.
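One thing worth noting is that this realm stores and compares plaintext passwords.  Tomcat realms can instead compare digests (by setting a digest="MD5" attribute on the Realm, with the values generated by Tomcat's digest.sh utility), and the 32-character password column happens to fit an MD5 hex digest exactly.  As a rough illustration of what such a stored value looks like (a sketch, not Tomcat's actual code):

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Computes an MD5 hex digest of a password, the form a digest-enabled
// Tomcat realm compares against. Illustrative sketch only.
public class PasswordDigest {
    public static String md5Hex(String password) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(password.getBytes());
        // %032x pads with leading zeros so the result is always 32 hex chars
        return String.format("%032x", new BigInteger(1, digest));
    }
}
```

You would then INSERT the digest instead of 'dbpass' into the tomcat_users table.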

Now the Tomcat server can be started again.  This time, when you log in to the ActiveMQ Web Console, use the username and password specified when loading the database tables.

That's all there is to it; you now know how to configure the ActiveMQ Web Console to use Tomcat's UserDatabaseRealm and JDBCRealm.


Tuesday, October 30, 2012

ActiveMQ: KahaDB Journal Files - More Than Just Message Content Bits

I recently came across an issue where the ActiveMQ KahaDB journal files were continually rolling despite the fact that only a small number of small persistent messages were occasionally being stored by the broker.  This behavior seemed very strange given that the persisted messages were only a couple of kilobytes each and there was a relatively small number of messages actually on the queue.  In this scenario something was filling up the 32MB journal files, but I wasn't quite sure what it could be.  Were there other messages somewhere in the broker?  Did a corrupted index cause messages to be written across multiple journal files?  It was pretty strange behavior, but it can be explained fairly easily.  This post describes the actual cause, and I have written it to remind myself in the future that there is more in the journal file than just the message content bits.

The KahaDB journal files are used to store persistent messages that have been sent to the broker.  In addition to the message content, the journal files also store KahaDB command and transactional information.  There are several commands for which information is stored: KahaAddMessageCommand, KahaCommitCommand, KahaPrepareCommand, KahaProducerAuditCommand, KahaRemoveDestinationCommand, KahaRemoveMessageCommand, KahaRollbackCommand, KahaSubscriptionCommand, and KahaTraceCommand.  In this particular case, it was the KahaProducerAuditCommand that was responsible for the observed behavior.  This command stores information about producer ids and message ids, which is used for duplicate detection.  The information is kept in a map object that grows over time, and it is written to the journal file each time a checkpoint runs, which by default is every 5 seconds.  Over time this can use up the space allocated to the journal file, causing low-volume, smaller messages to roll to the next journal file, which in turn prevents the broker from cleaning up journal files that still hold referenced messages.  Eventually this situation can lead to Producer Flow Control being triggered by the broker's store limit, which prevents producers from sending new messages to the broker.

This behavior can occur under the following conditions:
  • Persistent messages are being sent to a queue
  • The messages are not being consumed on a regular basis
  • The rate of messages being sent to the broker is low
These conditions allow this behavior to be observed fairly easily.  As the persistent messages do not get consumed, they remain referenced and prevent the journal files from being cleaned up.  The low message rate allows time to pass between each new message being stored in the journal file; in the meantime, checkpoints run and the KahaProducerAuditCommand information spaces out the actual messages within the journal file.

For this use case you can disable duplicate detection, essentially limiting the growth of the KahaProducerAuditCommand, using the following configuration on the persistence adapter in the broker configuration:

       <kahaDB directory="${activemq.base}/data/kahadb" failoverProducersAuditDepth="0" maxFailoverProducersToTrack="0"/>  
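To build intuition for what these attributes bound, here is a toy model of a producer audit.  This is a hypothetical sketch, much simpler than ActiveMQ's actual implementation: it remembers the last sequence id per producer for duplicate detection, evicts the least recently used producer beyond a maxProducersToTrack limit, and tracks nothing at all when the limit is 0 (as in the configuration above).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy producer-audit model (NOT ActiveMQ's implementation): tracks the last
// sequence id seen per producer, bounded to maxProducersToTrack entries.
// A limit of 0 disables tracking, mirroring maxFailoverProducersToTrack="0".
public class ProducerAudit {
    private final int maxProducersToTrack;
    private final Map<String, Long> lastSeq;

    public ProducerAudit(final int maxProducersToTrack) {
        this.maxProducersToTrack = maxProducersToTrack;
        // access-order LinkedHashMap evicts the least recently used producer
        this.lastSeq = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > ProducerAudit.this.maxProducersToTrack;
            }
        };
    }

    // Returns true if this (producer, sequence) was already seen.
    public boolean isDuplicate(String producerId, long seq) {
        if (maxProducersToTrack == 0) {
            return false; // tracking disabled, nothing audited or stored
        }
        Long last = lastSeq.get(producerId);
        if (last != null && seq <= last) {
            return true;
        }
        lastSeq.put(producerId, seq);
        return false;
    }

    public int trackedProducers() {
        return lastSeq.size();
    }
}
```

In the real broker it is this per-producer state that gets serialized into the journal at every checkpoint, which is why bounding (or zeroing) it shrinks the KahaProducerAuditCommand.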

This is something to think about when designing your system.  Under normal circumstances, if you have consumers available to consume the persistent messages, this condition would probably never occur: as the journal files roll and messages are consumed, the broker can clean up old journal files.

There is currently an enhancement request at Apache which will also help resolve this issue.  AMQ-3833 has been opened to enhance the broker so it will only write the KahaProducerAuditCommand if a change has occurred since the last checkpoint.  This will help reduce the amount of data that is written to the journal files in between message storage.

Wednesday, August 1, 2012

Android: MQTT with ActiveMQ

I have been wanting to create a simple demo for a while that sends a message from an Android device to ActiveMQ.  With ActiveMQ 5.6 the broker gained support for the MQTT protocol.  MQTT is a very lightweight publish/subscribe messaging protocol that is ideal for portable devices such as phones and tablets, where a small footprint is needed and network bandwidth may be limited or unreliable.  So I decided to have a look at the mqtt-client library that Hiram Chirino has been working on and build a little demo app that can be used to publish and subscribe to a JMS topic.  This demo is just the basics and provides a starting point for building something such as a mobile chat application.

Building the Code:

The client library I used for my demo application is the FuseSource MQTT client.  This is a very nice library that supports Blocking, Future, and Callback/Continuation Passing based APIs.  The source for this library is available at the following github repo: mqtt-client.  To run the Android MQTT demo you'll need to clone this repo and build the mqtt-client.

 git clone https://github.com/fusesource/mqtt-client.git  

For the purpose of this demo we are only interested in the mqtt-client module, so you can change into the mqtt-client directory and run:

 mvn clean install  

You should then see the build was completed successfully.

Now that you have the required library built, let's go ahead and download the Android MQTT Client.  The source for this demo is available in the android-mqtt-demo github repo, which can be cloned in the same way.

Once you have the android-mqtt-demo cloned, you can build the source in Eclipse using the Android SDK.  To open the project in Eclipse, use the new project wizard to create a new Android project and select "Android Project from Existing Code".  Browse to the source files you just cloned and click Finish.

Now you will need to add the mqtt-client library to a libs directory in the android-mqtt-demo project so the library will be deployed to the emulator or Android device.  With the mqtt-client library added to the libs directory, the project should build successfully.  You can now deploy this to an Android device or run it from the Android emulator.  Note that this may vary depending on the ADT version you are running; I am running version 20.0.1.  In previous versions you could add the library to the build path and it would automatically get deployed.  For more information, see the thread Dealing with dependencies in Android projects.

Running the Demo:

Okay, so now for the fun part: running the demo.  The first thing we need to do is start an ActiveMQ 5.6 broker instance that has the MQTT protocol enabled.  This can be done by modifying the broker's configuration file, conf/activemq.xml, as follows to add an MQTT transport connector:
   <transportConnector name="openwire" uri="tcp://"/>  
   <transportConnector name="mqtt" uri="mqtt+nio://"/>  

If you would also like to enable security, you could add the simpleAuthenticationPlugin configuration as follows:
   <plugins>  
     <simpleAuthenticationPlugin>  
       <users>  
         <authenticationUser username="system" password="manager" groups="users,admins"/>  
         <authenticationUser username="user" password="password" groups="users"/>  
         <authenticationUser username="guest" password="password" groups="guests"/>  
       </users>  
     </simpleAuthenticationPlugin>  
   </plugins>  

Now you can start the broker and the Android MQTT Demo.  When the activity loads, enter the URL address for the MQTT connector, enter the username and password if needed, and click Connect.  Once connected, enter a message and click Send.  The application is listening for messages on the same topic, so you should see the message appear in the Received text box.

This should also work with the Apollo message broker, which has an MQTT implementation.

Saturday, June 30, 2012

ServiceMix: Configuring the Sift Appender with a Rolling Log File

I have come across several issues where people were having trouble configuring the Sift file appender in ServiceMix to enable per-bundle logging.  Specifically, issues arose when trying to configure a rolling log file appender for Sift.  This is rather simple and straightforward, as you will see in the configuration below.  You simply need to use org.apache.log4j.RollingFileAppender as you would for any log4j appender; the important part is ensuring the proper configuration when applying the rolling file appender to Sift.

Let's have a look at the org.ops4j.pax.logging.cfg of the latest distribution of Fuse ESB, which is currently 4.4.1-fuse-03-06.  The Sift appender comes configured as follows:
 # Sift appender  
 log4j.appender.sift=org.apache.log4j.sift.MDCSiftingAppender  
 log4j.appender.sift.key=bundle.name  
 log4j.appender.sift.default=karaf  
 log4j.appender.sift.appender=org.apache.log4j.FileAppender  
 log4j.appender.sift.appender.layout=org.apache.log4j.PatternLayout  
 log4j.appender.sift.appender.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n  
 log4j.appender.sift.appender.file=${karaf.data}/log/$\{bundle.name\}.log  
 log4j.appender.sift.appender.append=true  

To enable a rolling log file we need to change the nested appender class from org.apache.log4j.FileAppender to:

 log4j.appender.sift.appender=org.apache.log4j.RollingFileAppender  

Then we need to add the following properties to enable the rolling log:

 log4j.appender.sift.appender.maxFileSize=10MB  
 log4j.appender.sift.appender.maxBackupIndex=10  

Of course you would configure these properties as required by your application.

Now the Sift appender configured for rolling log files should look as follows:
 # Sift appender  
 log4j.appender.sift=org.apache.log4j.sift.MDCSiftingAppender  
 log4j.appender.sift.key=bundle.name  
 log4j.appender.sift.default=karaf  
 log4j.appender.sift.appender=org.apache.log4j.RollingFileAppender  
 log4j.appender.sift.appender.maxFileSize=10MB  
 log4j.appender.sift.appender.maxBackupIndex=10  
 log4j.appender.sift.appender.layout=org.apache.log4j.PatternLayout  
 log4j.appender.sift.appender.layout.ConversionPattern=%d{ABSOLUTE} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n  
 log4j.appender.sift.appender.file=${karaf.data}/log/$\{bundle.name\}.log  
 log4j.appender.sift.appender.append=true  

That's all there is to it.  You now have a Sift appender configured for rolling log files.  Just remember that you also need to have the rootLogger configured as follows to ensure the sift appender is activated.

Note: This is found at the top of the org.ops4j.pax.logging.cfg file.
 log4j.rootLogger=INFO, sift, osgi:VmLogAppender  

Wednesday, May 30, 2012

CamelOne: Check Out The Presentations

Early this month I was fortunate enough to attend the second annual CamelOne event in Boston, MA presented by FuseSource.  The event had three tracks full of great presentations and it was difficult to decide which presentations to attend.  Luckily, all presentations were recorded and will be posted to the CamelOne site.  If you were unable to attend the event, or even if you attended and wanted to see some of the presentations you missed, have a look at the site.

There were some stellar presentations, for instance Large Scale Deployments of Apache Camel in the Cloud by James Strachan, in which the Fuse IDE product is highlighted.  If you are looking to get a glimpse into the future of open source messaging, have a look at the Next Generation Open Source Messaging with Apollo presentation by Hiram Chirino.  Another presentation that was particularly interesting was Develop Real Time Applications - HTML 5 using WebSockets, Apache Camel, and ActiveMQ by Charles Moulliard.  There were many more interesting topics, including Large Scale Messaging with ActiveMQ for Particle Accelerators at CERN, so make sure you check out the full list of presentations at the CamelOne presentation site.

Monday, April 30, 2012

ActiveMQ: How to Start/Stop Camel Routes on an ActiveMQ Slave

Do you have a business requirement in which you need ActiveMQ to deploy your Camel routes, but you have come to realize that in a Master/Slave configuration the Camel context is always started on the slave broker?

In this example I will show you how to configure ActiveMQ to deploy Camel routes as well as how to control when those routes are started.  We will have a master broker whose routes start when the broker starts, and a slave broker whose routes we only want to start when the slave becomes the master.

I am currently using the apache-activemq-5.5.1-fuse-04-01, which is the latest release of ActiveMQ from FuseSource at the time of this writing.  You can grab the binaries/source from the following link: apache-activemq-5.5.1-fuse-04-01

So you may be wondering how you might accomplish this, right?  Well luckily, we can have something working with just a little code and a little configuration.


The code we need to implement is rather simple.  We just need to create a class that implements the ActiveMQ Service interface.  Below is the simple example I created to demonstrate how this works:

 package com.fusesource.example;  
   
 import org.apache.activemq.Service;  
 import org.apache.camel.spring.SpringCamelContext;  
 import org.slf4j.Logger;  
 import org.slf4j.LoggerFactory;  
   
 /**  
  * Example used to start and stop the camel context using the ActiveMQ Service interface  
  */  
 public class CamelContextService implements Service {  
   
   private final Logger LOG = LoggerFactory.getLogger(CamelContextService.class);  
   
   SpringCamelContext camel;  
   
   public void start() throws Exception {  
     try {  
       camel.start();  
     } catch (Exception e) {  
       LOG.error("Unable to start camel context: " + camel, e);  
     }  
   }  
   
   public void stop() throws Exception {  
     try {  
       camel.stop();  
     } catch (Exception e) {  
       LOG.error("Unable to stop camel context: " + camel, e);  
     }  
   }  
   
   public SpringCamelContext getCamel() {  
     return camel;  
   }  
   
   public void setCamel(SpringCamelContext camel) {  
     this.camel = camel;  
   }  
 }  

The magic behind all this is in the Service interface.  When this class is registered as a service with the broker, the start method will be called when the broker is fully initialized.  Remember to copy the jar file to the lib directory of the broker where you want this code to be invoked.


First let's have a look at how we deploy Camel routes from ActiveMQ's broker configuration file.  In the installation directory of the broker you will find the conf directory, which holds multiple examples of different broker configuration files.  One such file, camel.xml, defines a simple route.  In our example we will import this file in our broker's activemq.xml as follows, which will start the camel context and its associated route.

 <import resource="camel.xml"/>  

This should be added just after the closing broker element, where you will see that the configuration already imports jetty.xml.

Now that we have added a Camel route to the master broker, it can be started.  Once started, you should see that the Camel context was picked up and one route was started:

  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camel) is starting  
  INFO | JMX enabled. Using ManagedManagementStrategy.  
  INFO | Found 3 packages with 14 @Converter classes to load  
  INFO | Loaded 163 core type converters (total 163 type converters)  
  INFO | Loaded 3 @Converter classes  
  INFO | Loaded additional 4 type converters (total 167 type converters) in 0.006 seconds  
  WARN | Broker localhost not started so using broker1 instead  
  INFO | Connector vm://localhost Started  
  INFO | Route: route1 started and consuming from: Endpoint[activemq://example.A]  
  INFO | Total 1 routes, of which 1 is started.  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camel) started in 0.532 seconds  

So now that we have the master broker up and running with a Camel route deployed, we are going to do the same for the slave broker.  This time, however, we will edit camel.xml slightly: set the Camel context id to camelBackup and, most importantly, add an autoStartup attribute set to false to prevent the route from starting when the Camel context is discovered:

 <camelContext id="camelBackup" xmlns="" autoStartup="false">  

One last thing we need to do to the slave broker is configure the new service we created from the above code.  Copy the following configuration to your slave broker's activemq.xml:

   <bean xmlns="http://www.springframework.org/schema/beans" class="com.fusesource.example.CamelContextService">  
      <property name="camel" ref="camelBackup"/>  
   </bean>  

From the above configuration and sample code, you can see Spring is being used to inject the camel property into our Service class.  Additionally, notice the ref has been set to camelBackup which is the id we used for the CamelContext in the slave's camel.xml file.

Additionally, the broker has been configured as a slave, so the broker will only be fully initialized when the master broker fails.  If you want more information on configuring ActiveMQ Master/Slave brokers, take a look at one of my earlier posts on Master/Slave Broker Configuration.

If you haven't done it already, copy the jar created by packaging the example code above to the slave broker's lib directory.

Note:  In a production system you might want to configure this on both the master and slave broker to keep the configurations mirrored, as the routes will be started on the master as well once it is fully initialized. In this post I am keeping the required configuration to the slave broker just to demonstrate the behavior.

Test Run

Now that we have the code and configuration done, let's give this a test run by starting up the slave broker.

  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) is starting  
  INFO | JMX enabled. Using ManagedManagementStrategy.  
  INFO | Found 3 packages with 14 @Converter classes to load  
  INFO | Loaded 163 core type converters (total 163 type converters)  
  INFO | Loaded 3 @Converter classes  
  INFO | Loaded additional 4 type converters (total 167 type converters) in 0.007 seconds  
  INFO | Total 1 routes, of which 0 is started.  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) started in 0.512 seconds  

Looking at the output from the slave broker you can see the Camel context is still started; however, the route is not (remember we set autoStartup="false").  Now, in the terminal where the master broker is running, issue a kill to stop the broker.

If you have a look at the slave broker's output again, you can see the connector was started (openwire in this case) and the Camel route is now started.

  ERROR | Network connection between vm://broker2#0 and tcp://localhost/ shutdown: null  
      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(  
      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(  
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(  
  WARN | Master Failed - starting all connectors  
  INFO | Listening for connections at: tcp://macbookpro-251a.home:62616  
  INFO | Connector openwire Started  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) is starting  
  WARN | Broker localhost not started so using broker2 instead  
  INFO | Connector vm://localhost Started  
  INFO | Route: route1 started and consuming from: Endpoint[activemq://example.A]  
  INFO | Total 1 routes, of which 1 is started.  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) started in 0.036 seconds  


That's all there is to it, and I think you would agree that starting a route from an ActiveMQ slave is rather simple and easy to implement.  I'd also like to thank Hiram Chirino, an Apache ActiveMQ founder, for pointing me in the direction of the ActiveMQ Service interface.