Using advisory queues to monitor activity on ActiveMQ

In my last post on ActiveMQ, I mused on using mirrored queues to keep a message store for varying reasons. It is a naive way of doing this but it showed how such a thing can be done quickly. As part of some ongoing explorations into operations and governance of MQ systems, I have also been looking at the ActiveMQ.Advisory.Queue and the ActiveMQ.Advisory.Topci topics which monitor which queues and topics exist on a broker. As I am using PHP, I have been using a STOMP consumer to dig out the varying types of messages coming across.

The first thing that I did was to dump the message object:

object(StompFrame)#5 (3) {
[“command”]=>
string(7) “MESSAGE”
[“headers”]=>
array(9) {
[“message-id”]=>
string(51) “ID:machineid-49424-1356875704891-1:1:0:0:21”
[“originBrokerName”]=>
string(9) “localhost”
[“type”]=>
string(8) “Advisory”
[“destination”]=>
string(30) “/topic/ActiveMQ.Advisory.Queue”
[“timestamp”]=>
string(1) “0”
[“expires”]=>
string(1) “0”
[“priority”]=>
string(1) “0”
[“originBrokerURL”]=>
string(29) “tcp://machineid:61616”
[“originBrokerId”]=>
string(44) “ID:machineid-49424-1356875704891-0:1”
}
[“body”]=>
string(290) “{“DestinationInfo”: {
“commandId”: 0,
“responseRequired”: false,
“connectionId”: {
“value”: “ID:machineid-49424-1356875704891-2:1”
},
“destination”: {
“@class”: “ActiveMQQueue”,
“string”: “example.A”,
“null”: {}
},
“operationType”: 0,
“timeout”: 0
}}”
}

The headers show the message id, its current destination and details of the broker which gives us a chance to begin noting which broker(s) are active and what they are sending data to, whether it has priority, timestamps or an expiry date for this queue. The body contains the destination information which allows monitoring systems to show the known topics and queues that have been created on that broker .

Used together, this information can be used to view the state of a queues or topics on a broker, and to view what queues/topics are being created at any one time remotely. If stored, which I have been playing with, then usage can be seen over time. So why would you want to view systems over time?

In one state, you may need to know what queues are being used to connect to systems to see if new queues are being added randomly and to allows operations and administrators to know the state of queues/topics on a broker. Does the queue need scaling or some sort of back pressure applied to producers if new queues are being added? (Flow control can be implemented in ActiveMQ using the config file but that is a separate issue to this post). Whilst there is a web console for ActiveMQ, you may have reasons for not using it or want to concatenate the queue and topic managers from many systems into one database or view by consuming from different brokers.

Since the mirrored topic does not have an originBrokerURL or originBrokerName, then this could be used to find out what queues are coming from which broker. A sensible queue architecture should not duplicate these across brokers (unless they are in a Highly Available configuration) so it can allow a view of traffic to be built up.

Using the ActiveMQ.Advisory.Connection topic, the connections can be viewed either connecting or disconnecting. If an object connects, then it spits out the following body:

{“ConnectionInfo”: {
“commandId”: 0,
“responseRequired”: true,
“connectionId”: {
“value”: “ID:machineid-49424-1356875704891-4:23”
},
“clientId”: “ID:machineid-49424-1356875704891-4:23”,
“userName”: “”,
“password”: “”,
“brokerMasterConnector”: false,
“manageable”: false,
“clientMaster”: true,
“faultTolerant”: false,
“failoverReconnect”: false
}}

and its disconnect object is:

{“RemoveInfo”: {
“commandId”: 0,
“responseRequired”: true,
“objectId”: {
“@class”: “ConnectionId”,
“value”: “ID:machineid-49424-1356875704891-4:23”
},
“lastDeliveredSequenceId”: 0
}}

I am not sure that this is directly useful in the operations that I was looking at but thinking laterally about it, the ids could be useful in monitoring which IPs or machines are connecting to the system and alerts set up if a machine that should be connected stop or a connection which ought to have died is still alive – effectively becoming a zombie process which can clog up the network and either slow delivery or worse, prevent delivery of a message to a consumer. The lastDeliveredSequenceId might be of use if the connection is dealing with messages that need to be sent in order or if any go missing. These could be written out to log files using something like Log4PHP to manage this.

In some ways, these mirror the data that is being presented in the web console but that it can be more convenient to set up listeners to the various advisory connections to view and monitor the systems and set them up for alerts.

The last thing that I would like to explore on this part of ActiveMQ is the queue/topic statistics and extracting these from the XML for live updates without needing to go the web console. Again this would be for having updates paused out if there are issues such as queues building up to quickly or the ingress/egress becoming wildly out of synch and to be able to deal with issues proactively rather than when a developer or business unit discovers that something is wrong which might take hours.