Abstract
Many organisations now face the challenge of choosing and setting up the right messaging infrastructure: it must be able to scale and handle massive numbers of parallel connections. This challenge often emerges in IoT and Big Data projects, where a massive number of sensors are potentially connected, producing messages that need to be processed and stored.
An important thing to bear in mind when speaking of scaling the messaging infrastructure is that one must consider two distinct capabilities:
- scaling up when there is heavy load
- scaling down to save costs when compute power is not needed
This post explains how to address this challenge using the concept of networks of brokers in JBoss A-MQ.
At the time of writing, the use cases in this post have been tested with JBoss A-MQ 6.3.0.
Choosing the right topology
Choosing the right broker network topology is a crucial step.
There are many examples of network topologies in the official JBoss A-MQ documentation:
https://access.redhat.com/documentation/en/red-hat-jboss-a-mq/6.3/paged/using-networks-of-brokers/chapter-6-network-topologies
To have a better understanding, we need to keep the following concepts in mind.
Each broker can have:
- one or more transport connectors to accept client connections (consumers or producers) and network connections from other brokers (i.e. to receive messages from them). In our sample here, we only have one transport connector per broker instance
- one or more network connectors that allow messages to be forwarded to other brokers
In this article, we will take the example of an IoT use case where the main focus is on:
- handling a large number of connected sensors
- securing a central system that collects and processes messages
In such a scenario, we would like to set up a concentrator topology as shown below.
This topology allows us to establish stable and controlled connections from the brokers of layer 1 towards layer 2. The central brokers do not have to deal with sensors connecting and disconnecting.
The green arrows represent network connectors. Note that these have a direction: brokers from layer 1 can forward messages to all brokers in layer 2.
We also allow messages to travel between the brokers of layer 2. We will see later that this allows the network to better balance the workload and facilitates scaling up and down. The sketch below shows how these connectors could be declared.
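Here is a minimal sketch, assuming illustrative hostnames and ports (c01, c02, 61616, 61620 are my own placeholders, not values from the official documentation), of how these connectors could be declared in each broker's XML configuration:

<!-- broker s01 (layer 1): one transport connector for the sensors -->
<!-- and a one-way network connector towards each central broker -->
<transportConnectors>
  <transportConnector name="sensors" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
<networkConnectors>
  <networkConnector name="s01-to-c01" uri="static:(tcp://c01:61620)"/>
  <networkConnector name="s01-to-c02" uri="static:(tcp://c02:61620)"/>
</networkConnectors>

<!-- broker c01 (layer 2): a network connector towards its neighbor c02; -->
<!-- c02 declares the mirror connector towards c01 -->
<networkConnectors>
  <networkConnector name="c01-to-c02" uri="static:(tcp://c02:61620)"/>
</networkConnectors>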
How to scale up?
Scaling up is pretty straightforward in this setup:
- To handle more connections, brokers can be added in layer 1
- If the overall volume grows, extend the number of brokers in layer 2 to absorb the workload
Usually when scaling up, we want the clients already connected to be redistributed among the newly available brokers. This is especially important for the clients connected to layer 2, as we might not want to statically bind them to one broker.
To achieve this, we can set the following options on the transport connectors of the central brokers:
- rebalanceClusterClients: allows the broker to disconnect clients and force them to reconnect to a less busy broker
- updateClusterClients: updates client connections when a broker joins the network. This way, clients do not need to be aware of all the available brokers when connecting for the first time; they only need to be able to connect to at least one
- updateClusterClientsOnRemove: updates client connections when a broker leaves the network
- updateClusterFilter: a regular expression that restricts which joining brokers trigger client updates. In our case, for example, we only want brokers from layer 2 to trigger reconnects
<transportConnectors>
  <transportConnector name="clients"
                      rebalanceClusterClients="true"
                      updateClusterClients="true"
                      updateClusterClientsOnRemove="true"
                      updateClusterFilter="amq-c.*"
                      uri="tcp://localhost:61620" />
</transportConnectors>
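Note that these options only take effect for clients that connect through the failover transport, since it is the failover layer on the client side that receives the updated broker list and performs reconnects. A client would for instance use a connection URL such as failover:(tcp://localhost:61620) (matching the port assumed above), even if it initially knows only one broker of the network.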
Further options for the transport connector can be found in the official documentation.
How to scale down?
Scaling down is a bit more challenging, because when shutting down an instance there might still be messages remaining in the broker.
The strategy we can apply to properly shut down a broker (named c01 in this example) in a network is the following:
- Stop the transport connector of broker c01 through JMX/Jolokia (see the example after this list). This has the effect of disconnecting all consumers and producers from broker c01, as well as all incoming network connectors.
- The clients (producers and consumers) will fail over to an active broker instance
- Unconsumed messages in broker c01 will be forwarded to another broker instance, since the network connector of broker c01 towards its neighbor is still active
- Once all messages in broker c01 are consumed, we can shut down the entire instance
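As a sketch of the first step: ActiveMQ exposes each transport connector as a JMX MBean with start and stop operations, so stopping the connector named clients on broker c01 boils down to invoking stop on that MBean. With Jolokia, and assuming an endpoint at http://c01:8181/jolokia (host, port, path and the broker/connector names are assumptions that depend on your installation), the request could look like:

http://c01:8181/jolokia/exec/org.apache.activemq:type=Broker,brokerName=c01,connector=clientConnectors,connectorName=clients/stop

Check the actual MBean tree (for example in the hawtio console) before relying on this exact object name.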
There might be situations where this procedure is not enough: messages may remain when no active consumer for them exists anywhere in the network. In this case, I recommend looking at Josh Reagan's blog post:
http://joshdreagan.github.io/2016/08/22/decommissioning_jboss_a-mq_brokers/
It describes how to read the message store of the broker to be decommissioned and forward all remaining messages.
How is the load distributed?
Deactivate conduit subscriptions on queues for better load balancing
When messages are sent to brokers in layer 1, they are by default distributed in a round-robin manner to the brokers in layer 2.
For better load balancing on queues, conduit subscriptions can be deactivated on the brokers of layer 1. If consumers are not evenly distributed on layer 2, this allows the brokers of layer 1 to be aware of it and to apply the round-robin algorithm across the individual consumers rather than across the brokers they are connected to, as shown in the sketch below.
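A minimal sketch of this setting, applied to the network connectors of a layer 1 broker (with the same assumed hostnames and ports as above):

<networkConnectors>
  <networkConnector name="s01-to-c01" conduitSubscriptions="false" uri="static:(tcp://c01:61620)"/>
  <networkConnector name="s01-to-c02" conduitSubscriptions="false" uri="static:(tcp://c02:61620)"/>
</networkConnectors>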
To gain a deeper understanding, you can refer to the official documentation on conduit subscriptions.
Message hopping on layer 2 to avoid slow consumers
If you run the tests provided in the sample code (see below), you might notice that sometimes a message that reaches a broker in layer 2 is forwarded to its neighbor on layer 2 instead of being consumed by the local client.
This is a very powerful feature of ActiveMQ that allows messages to be consumed as fast as possible within the network.
This usually happens when the local consumer is slower. A good messaging system is one that tends to contain zero messages (all messages are consumed as soon as they are produced). So what we observe here is the network of brokers doing its best to move messages around so that they are consumed as fast as possible.
The consequence, though, is that messages might get stuck at some point. There is a rule saying that a message cannot go back to a broker it has already visited. So if the fast consumer suddenly disconnects, our message here will get stuck in c01.
How to avoid stuck messages?
To avoid messages getting stuck, we need to add the replayWhenNoConsumers policy to the queues that are used within the network.
<policyEntry queue="sensor.events" queuePrefetch="1">
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayDelay="1000"
                                           replayWhenNoConsumers="true" />
  </networkBridgeFilterFactory>
</policyEntry>
This allows messages that are on a broker without any local consumer to revisit a previous broker.
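The replayDelay attribute, in milliseconds, gives local consumers a short window to reconnect before messages are replayed back into the network; the value of 1000 above is just an illustrative starting point.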
Other important configurations
To ensure that messages prefer the shortest path through the network, the following options need to be set on the network connectors (see the sketch after this list):
- decreaseNetworkConsumerPriority="true"
- suppressDuplicateQueueSubscriptions="true"
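Combined with the conduit subscription setting from the previous section, the network connectors of s01 could then look like this (hostnames and ports remain illustrative assumptions):

<networkConnectors>
  <networkConnector name="s01-to-c01"
                    conduitSubscriptions="false"
                    decreaseNetworkConsumerPriority="true"
                    suppressDuplicateQueueSubscriptions="true"
                    uri="static:(tcp://c01:61620)"/>
  <networkConnector name="s01-to-c02"
                    conduitSubscriptions="false"
                    decreaseNetworkConsumerPriority="true"
                    suppressDuplicateQueueSubscriptions="true"
                    uri="static:(tcp://c02:61620)"/>
</networkConnectors>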
Sample code & configurations
Nothing is better than a working example. A sample project can be found in the following GitHub repo: https://github.com/alainpham/amq-broker-network
This project sets up 3 AMQ instances configured as a network of brokers similar to what is described in the sections above:
- s01 is the instance the producers connect to (layer 1). We only need one of those to understand the different concepts. This instance has 2 network connectors, one towards c01 and another one towards c02
- c01 and c02 are the instances the central consumers connect to in order to process messages (layer 2). They each have a network connector towards the other
The following test cases can be run with the command
mvn test
- nominalSend1000Msg
- create the network of 3 brokers with 1 producer and 2 consumers
- send 1000 messages
- each consumer receives roughly half of the messages
- some messages hop between c01 and c02 when consumers are a bit slow
[ main] TestAMQNetwork INFO Received on collector01 500
[ main] TestAMQNetwork INFO Received on collector02 500
[ main] TestAMQNetwork INFO Total received 1000
[ main] TestAMQNetwork INFO amqs01 enqueue 1000
[ main] TestAMQNetwork INFO amqs01 dequeue 1000
[ main] TestAMQNetwork INFO amqs01 dispatch 1000
[ main] TestAMQNetwork INFO amqc01master enqueue 501
[ main] TestAMQNetwork INFO amqc01master dequeue 501
[ main] TestAMQNetwork INFO amqc01master dispatch 501
[ main] TestAMQNetwork INFO amqc02master enqueue 501
[ main] TestAMQNetwork INFO amqc02master dequeue 501
[ main] TestAMQNetwork INFO amqc02master dispatch 501
- scaleDown
- create the network of 3 brokers with 1 producer and 2 consumers
- send 500 messages
- stop transport connector on c01
- consumer on c01 reconnects to c02, remaining messages on c01 are drained
- send another 500 messages
- each consumer receives roughly half of the messages
############### before scaleDown #########################
[ main] TestAMQNetwork INFO Received on collector01 250
[ main] TestAMQNetwork INFO Received on collector02 250
[ main] TestAMQNetwork INFO Total received 500
[ main] TestAMQNetwork INFO amqs01 enqueue 500
[ main] TestAMQNetwork INFO amqs01 dequeue 500
[ main] TestAMQNetwork INFO amqs01 dispatch 500
[ main] TestAMQNetwork INFO amqc01master enqueue 250
[ main] TestAMQNetwork INFO amqc01master dequeue 250
[ main] TestAMQNetwork INFO amqc01master dispatch 250
[ main] TestAMQNetwork INFO amqc02master enqueue 250
[ main] TestAMQNetwork INFO amqc02master dequeue 250
[ main] TestAMQNetwork INFO amqc02master dispatch 250
############### scaleDown #########################
[ main] TestAMQNetwork INFO Received on collector01 501
[ main] TestAMQNetwork INFO Received on collector02 499
[ main] TestAMQNetwork INFO Total received 1000
[ main] TestAMQNetwork INFO amqs01 enqueue 1000
[ main] TestAMQNetwork INFO amqs01 dequeue 1000
[ main] TestAMQNetwork INFO amqs01 dispatch 1000
[ main] TestAMQNetwork INFO amqc01master enqueue 250
[ main] TestAMQNetwork INFO amqc01master dequeue 250
[ main] TestAMQNetwork INFO amqc01master dispatch 250
[ main] TestAMQNetwork INFO amqc02master enqueue 750
[ main] TestAMQNetwork INFO amqc02master dequeue 750
[ main] TestAMQNetwork INFO amqc02master dispatch 750
- scaleUp
- c01 is not started at the beginning, only s01 and c02 are up.
- send 500 msgs
- start c01
- one of the consumers is forced to reconnect to c01
- send another 500 msgs
- each consumer receives roughly half of the messages
- c01 starts to broker messages once it's up
############### before scaleUp #########################
[ main] TestAMQNetwork INFO Received on collector01 250
[ main] TestAMQNetwork INFO Received on collector02 250
[ main] TestAMQNetwork INFO Total received 500
[ main] TestAMQNetwork INFO amqs01 enqueue 500
[ main] TestAMQNetwork INFO amqs01 dequeue 500
[ main] TestAMQNetwork INFO amqs01 dispatch 500
[ main] TestAMQNetwork INFO amqc01master enqueue 0
[ main] TestAMQNetwork INFO amqc01master dequeue 0
[ main] TestAMQNetwork INFO amqc01master dispatch 0
[ main] TestAMQNetwork INFO amqc02master enqueue 500
[ main] TestAMQNetwork INFO amqc02master dequeue 500
[ main] TestAMQNetwork INFO amqc02master dispatch 500
############### scaleUp #########################
[ main] TestAMQNetwork INFO Received on collector01 500
[ main] TestAMQNetwork INFO Received on collector02 500
[ main] TestAMQNetwork INFO Total received 1000
[ main] TestAMQNetwork INFO amqs01 enqueue 1000
[ main] TestAMQNetwork INFO amqs01 dequeue 1000
[ main] TestAMQNetwork INFO amqs01 dispatch 1000
[ main] TestAMQNetwork INFO amqc01master enqueue 252
[ main] TestAMQNetwork INFO amqc01master dequeue 252
[ main] TestAMQNetwork INFO amqc01master dispatch 252
[ main] TestAMQNetwork INFO amqc02master enqueue 752
[ main] TestAMQNetwork INFO amqc02master dequeue 752
[ main] TestAMQNetwork INFO amqc02master dispatch 752