Simple message storing with ActiveMQ

I have been doing some digging into monitoring queues on ActiveMQ and pushing the messages into a message store for what ever reason. The main reason for doing this would be monitoring error messages and giving developers and operations a way of exploring what is happening in a queue if data at a recipient system does not look quite right. In the first place, I am going to look at ActiveMQ since this is part of what I use at work but I also hope to get a few minutes to dive into RabbitMQ as it is still an MQ love of mine. Sometimes, you just cannot forget your first…

ActiveMQ has a concept called Mirrored Queues which is an easy way of creating the WireTap pattern with a minimal amount of intrusion. The complete configuration in the .xml file in /conf is:


<destinationInterceptors>
<mirroredQueue copyMessage="true" postfix=".qmirror" prefix=""/>
</destinationInterceptors>

Place that in the appropriate position and restart ActiveMQ. That code simply takes a copy of the incoming message and copies it to a topic. In the example, the topic has a prefix of .qmirror but you can also set up  a prefix using the attributes in the mirroredQueue element.

Why do this?

It means that you can set up listeners on the topic and set up activity monitoring on all queues or store them. Using the prefix and/or postfix, a listener can be set up with a wildcard pattern to listen to certain set of queues or just listen to a certain queue.

Of course topics do have the issue that if the consumer fails, and it is not durable, then the copied message will be lost. One way of preventing this is to set up durable subscribers, i.e. subscriptions with an id so if the consumer or server need restarting, the consumer can have messages being sent to it straight away and any missing messages will be resent. Great idea, but it also means that the server has to, potentially, store many ids permanently.

ActiveMQ provides a way of dealing with this with Virtual Destinations, a way of virtualising the topics which allows ActiveMQ to re-route messages internally. As of writing, this seems an incredibly powerful way of targeting queues for wiretaps but it would appear that a route is needed for each configured queue. It does allow for load balancing against the virtual queue though.

Anyhow, that is going off the original topic.

I have been using STOMP since the main code that will produce and consume is written in PHP. As part of the exploration that I was carrying out, I dumped out the raw message from the mirrored queue and got the following:

object(StompFrame)#2 (3) {

[“command”]=>  string(7) “MESSAGE”

[“headers”]=>  array(5) {

[“message-id”]=>    string(52) “ID:machineid-49295-1355686372652-4:9:-1:1:14”

[“destination”]=>    string(20) “/topic/notes.qmirror”    [“timestamp”]=>    string(13) “1355686651714”

[“expires”]=>    string(1) “0”

[“priority”]=>    string(1) “4”

}

[“body”]=>  string(17) “test1355686651 13”

Using a virtual queue, I saw the following:

object(StompFrame)#3 (3) {
[“command”]=>
string(7) “MESSAGE”
[“headers”]=>
array(5) {
[“message-id”]=>
string(52) “ID:machineid-49759-1355690002164-4:2:-1:1:15”
[“destination”]=>
string(20) “/queue/Notifications”
[“timestamp”]=>
string(13) “1355690012570”
[“expires”]=>
string(1) “0”
[“priority”]=>
string(1) “4”
}
[“body”]=>
string(17) “test1355690012 14”
}

Nominally the messages are the same but the destination is different. In the mirrored queue, it is set up in /topic/notes.qmirror, signifying that it is a mirrored queue and the originating queue. However the virtual queue changes this to /queue/Notifications which is what I set the queue to be. Now what I could do is to set up the queue to the <originating queue>.mirror and set up routes that way (which is very Camel like). I suspect that with some practice, I could get this more how I want it but it may cause issues if you trying to set up a monitoring system with transparency on where an issue appeared. Something to think about when trying to work out why a message has failed.

Using both, I set up a consumer which is parses the message that it receives and then pushes into MySQL. NoSQL could equally be used but I tend to work in a LAMP environment so writing it into MySQL solves some problems. Thinking about this, a message store could either by passive, in the sense of having to be queried manually, or active, have a mechanism of pushing data out according to rules. In one sense, this could be done at the consumer level with rules being used to send notifications on certain conditions.

The next thing to look at is the message store itself for processing, better ways of using Virtual Topics and also playing with RabbitMQ as well, although a quick peak suggests that you need to do this outside of the broker.