Monday, September 12, 2011

Publish-subscribe with JGroups

I've added a new demo program (org.jgroups.demos.PubSub), which shows how to use JGroups channels to do publish-subscribe.

Pub-sub is a pattern where instances subscribe to topics and receive only messages posted to those topics. For example, in a stock feed application, an instance could subscribe to topics "rht", "aapl" and "msft". Stock quote publishers could post to these topics to update a quote, and subscribers would get notified of the updates.

The simplest way to do this in JGroups is for all instances to join the same cluster. Publishers send topic posts as multicasts, and subscribers discard messages for topics they haven't subscribed to.
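As an illustration of this naive approach, here is a minimal plain-Java simulation. It is not JGroups API: the `Subscriber` class and its `receive()` method are hypothetical stand-ins for an application-level receiver that sees every multicast and filters by topic itself.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation (not JGroups API): every instance receives every
// multicast posted to the single shared cluster and filters by topic in the application.
class AppLevelFilter {
    static final class Subscriber {
        private final Set<String> topics = new HashSet<>();
        int delivered = 0;  // messages the application actually used
        int discarded = 0;  // messages that travelled up the whole stack for nothing

        void subscribe(String topic) { topics.add(topic); }

        // Called for every multicast that reaches the application.
        void receive(String topic, String payload) {
            if (topics.contains(topic)) {
                delivered++;
            } else {
                discarded++;
            }
        }
    }

    public static void main(String[] args) {
        Subscriber s = new Subscriber();
        s.subscribe("rht");
        String[] posts = {"rht", "aapl", "msft", "rht", "ibm"};
        for (String topic : posts) {
            s.receive(topic, "update");
        }
        // prints "delivered=2, discarded=3"
        System.out.println("delivered=" + s.delivered + ", discarded=" + s.discarded);
    }
}
```

Every message counted in `discarded` has already paid the full cost of the protocol stack before the application throws it away.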

The problem with this is that many multicasts make it all the way up to the application, only to be discarded there if the topic doesn't match. Each message is received by the transport protocol (on every instance in the cluster), passed up through all the protocols, and then handed over to the application. If the application discards the message, all the work of fragmenting, retransmitting, ordering, flow control, defragmenting, decompressing and so on was unnecessary. The result is wasted CPU cycles, lock acquisitions, cache and memory accesses, context switches and bandwidth.

One solution is to filter topics at the publisher's side. A publisher maintains a hashmap of subscribers and the topics they've subscribed to, and sends updates only to instances that have a current subscription.

This has two drawbacks, though. First, publishers have the additional work of maintaining those subscriptions, and subscribers need to multicast subscribe and unsubscribe requests. In addition, new publishers somehow need to obtain the current subscriptions from an existing cluster member (via state transfer).

Second, to send updates only to instances with a subscription, we'd have to resort to unicasts. If 10 instances of a 100-instance cluster are subscribed to "rht", an update to "rht" would require 10 unicast messages rather than 1 multicast message. This generates more traffic than necessary, especially as the cluster grows.
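The publisher-side bookkeeping can be sketched in plain Java. This is a hypothetical model, not JGroups code: member addresses are plain strings, and `onSubscribe()`/`onUnsubscribe()` stand in for the handlers a publisher would run when it receives the multicast subscription requests. The size of the destination set is the number of unicasts each update costs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of publisher-side filtering: the publisher keeps a
// subscription table and turns each update into one unicast per subscriber.
class PublisherSideFilter {
    // topic -> addresses of the members currently subscribed to it
    private final Map<String, Set<String>> subscriptions = new HashMap<>();

    // Would run when a member multicasts a "subscribe" request.
    void onSubscribe(String member, String topic) {
        subscriptions.computeIfAbsent(topic, t -> new HashSet<>()).add(member);
    }

    // Would run when a member multicasts an "unsubscribe" request.
    void onUnsubscribe(String member, String topic) {
        Set<String> members = subscriptions.get(topic);
        if (members != null) {
            members.remove(member);
            if (members.isEmpty()) {
                subscriptions.remove(topic);
            }
        }
    }

    // Destinations of the unicasts needed for a single update to this topic.
    Set<String> destinationsFor(String topic) {
        return subscriptions.getOrDefault(topic, Collections.emptySet());
    }

    public static void main(String[] args) {
        PublisherSideFilter pub = new PublisherSideFilter();
        // 10 of the 100 cluster members subscribe to "rht"
        for (int i = 0; i < 10; i++) {
            pub.onSubscribe("member-" + i, "rht");
        }
        int unicasts = pub.destinationsFor("rht").size();
        // prints: unicasts per "rht" update: 10 (vs. 1 multicast)
        System.out.println("unicasts per \"rht\" update: " + unicasts + " (vs. 1 multicast)");
    }
}
```

The table itself is the state that a new publisher would have to fetch from an existing member.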

Another solution, and the one chosen by PubSub, is to send all updates as multicast messages but discard them as early as possible at receivers that have no matching subscription. Instead of traversing the entire JGroups stack, a non-matching message is discarded directly by the transport, which is the first protocol to receive a message.

This is done by using a shared transport and creating a separate channel for each subscription. Whenever a new topic is subscribed to, PubSub creates a new channel and joins a cluster named after the topic. This is not overly costly, because the transport protocol, which holds almost all the resources of a stack (thread pools, timers and sockets), is created only once.

The first channel to connect creates the shared transport. Subsequent channels only link to the existing shared transport and don't initialize it again. The shared transport is reference counted: when the last channel using it leaves its cluster, that channel de-allocates the resources used by the shared transport and destroys it.
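The reference counting can be modelled in a few lines. This is a hypothetical plain-Java sketch: `SharedTransport`, `acquire()` and `release()` are illustrative names, not JGroups classes or methods, and the `started` flag stands in for creating and destroying sockets, thread pools and timers.

```java
// Hypothetical model of a reference-counted shared transport; names are
// illustrative only and do not correspond to JGroups classes.
class SharedTransportRefCount {
    static final class SharedTransport {
        private int refCount = 0;
        private boolean started = false;

        // Called when a channel on top of this transport connects.
        synchronized void acquire() {
            if (refCount++ == 0) {
                started = true;   // first user: create sockets, thread pools, timers
            }
        }

        // Called when a channel on top of this transport disconnects.
        synchronized void release() {
            if (refCount == 0) {
                throw new IllegalStateException("transport is not in use");
            }
            if (--refCount == 0) {
                started = false;  // last user: release all resources
            }
        }

        synchronized boolean isStarted() { return started; }

        synchronized int users() { return refCount; }
    }

    public static void main(String[] args) {
        SharedTransport tp = new SharedTransport();
        tp.acquire();  // channel for "rht" creates the transport
        tp.acquire();  // channel for "aapl" only links to it
        tp.release();  // "aapl" leaves: transport stays up
        System.out.println("after first release: started=" + tp.isStarted()); // true
        tp.release();  // "rht" leaves: transport is destroyed
        System.out.println("after last release: started=" + tp.isStarted());  // false
    }
}
```

Making both methods `synchronized` keeps the count consistent if channels for different topics connect and disconnect from different threads.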

Every channel on top of the same shared transport joins a different cluster, named after the topic. PubSub maintains a hashmap with topic names as keys and channels as values. A "subscribe rht" operation simply creates a new channel (if there isn't one for topic "rht" yet), adds a listener, joins cluster "rht" and adds the topic/channel pair to the hashmap. An "unsubscribe rht" grabs the channel for "rht", closes it and removes it from the hashmap.
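The topic/channel bookkeeping can be sketched as follows. The sketch does not reproduce the actual PubSub demo. `FakeChannel` is a hypothetical stand-in so the example runs without JGroups. In a real program it would be a JGroups channel on the shared transport that joins the topic's cluster via `connect(topic)` and leaves it via `close()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of a topic -> channel map; FakeChannel stands in for a
// JGroups channel running on the shared transport.
class TopicChannelMap {
    static final class FakeChannel {
        final String cluster;
        final Consumer<String> listener;
        boolean open = true;

        FakeChannel(String cluster, Consumer<String> listener) {
            this.cluster = cluster;  // models joining the cluster named after the topic
            this.listener = listener;
        }

        void close() { open = false; }
    }

    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();

    // "subscribe rht": create and join a channel only if none exists for the topic yet.
    FakeChannel subscribe(String topic, Consumer<String> listener) {
        return channels.computeIfAbsent(topic, t -> new FakeChannel(t, listener));
    }

    // "unsubscribe rht": remove the channel from the map and close it.
    boolean unsubscribe(String topic) {
        FakeChannel ch = channels.remove(topic);
        if (ch == null) {
            return false;
        }
        ch.close();
        return true;
    }

    boolean isSubscribed(String topic) { return channels.containsKey(topic); }

    public static void main(String[] args) {
        TopicChannelMap pubsub = new TopicChannelMap();
        FakeChannel first = pubsub.subscribe("rht", msg -> System.out.println("rht: " + msg));
        FakeChannel again = pubsub.subscribe("rht", msg -> { });
        System.out.println("same channel reused: " + (first == again));  // true
        pubsub.unsubscribe("rht");
        System.out.println("subscribed: " + pubsub.isSubscribed("rht")
                + ", open: " + first.open);                            // false, false
    }
}
```

`computeIfAbsent` on a `ConcurrentHashMap` makes sure that two concurrent "subscribe rht" calls end up sharing one channel. In this sketch, a second subscription to the same topic keeps the first listener.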

When a publisher posts an update for "rht", it essentially sends a multicast to the "rht" cluster.

The important point is that, when an update for "rht" is received by a shared transport, JGroups tries to find the channel which joined cluster "rht" and passes the message up to that channel (through its protocol stack), or discards it if there isn't a channel which joined cluster "rht".

For example, if we have 3 channels A, B and C over the same shared transport TP, and A joined cluster "rht", B joined "aapl" and C joined "msft", then when a message for "ibm" arrives, it will be discarded by TP as there is no cluster "ibm" present. When a message for "rht" arrives, it will be passed up the stack for "rht" to channel A.
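The A/B/C example can be reproduced with a small simulation of a shared transport that demultiplexes incoming messages by cluster name. The classes are hypothetical and only mirror the behaviour described above, not JGroups' internal code. The sketch assumes each message carries the name of the cluster it was sent to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a shared transport dispatching messages by cluster name.
class TransportDispatch {
    static final class Channel {
        final String name;
        final List<String> received = new ArrayList<>();

        Channel(String name) { this.name = name; }

        // Stands in for passing the message up this channel's stack to the application.
        void up(String payload) { received.add(payload); }
    }

    static final class SharedTransport {
        private final Map<String, Channel> byCluster = new HashMap<>();
        int dropped = 0;

        void register(String cluster, Channel ch) { byCluster.put(cluster, ch); }

        // Called for every multicast arriving on the socket; assumes the
        // message carries the name of the cluster it was sent to.
        void receive(String cluster, String payload) {
            Channel ch = byCluster.get(cluster);
            if (ch == null) {
                dropped++;        // no local channel joined this cluster: discard right here
                return;
            }
            ch.up(payload);       // only matching messages travel up a stack
        }
    }

    public static void main(String[] args) {
        SharedTransport tp = new SharedTransport();
        Channel a = new Channel("A"), b = new Channel("B"), c = new Channel("C");
        tp.register("rht", a);
        tp.register("aapl", b);
        tp.register("msft", c);

        tp.receive("ibm", "ibm update");  // no "ibm" cluster: dropped by the transport
        tp.receive("rht", "rht update");  // passed up to channel A only

        // prints: A=[rht update] B=[] C=[] dropped=1
        System.out.println("A=" + a.received + " B=" + b.received
                + " C=" + c.received + " dropped=" + tp.dropped);
    }
}
```

The lookup is a single hash-map access per message. This is why discarding at the transport is so much cheaper than discarding in the application.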

Because a non-matching message is discarded at the transport level rather than at the application level, we save the cost of passing it up the stack through all the protocols and delivering it to the application.

Note that PubSub relies on IP multicasting, so its stack should use UDP as the shared transport. If TCP is used instead, the approach outlined above provides no benefit.


  1. One thing I didn't mention is that publish-subscribe could be extended to include (STOMP) clients [1], which are outside the pubsub cluster and may even be written in a different language.
    This would allow for pub-sub in large clusters consisting of heterogeneous server and client nodes.


  2. A discussion has been started about the merits of using a shared transport for publish-subscribe; see [1] for details.