Monday, September 12, 2011

Publish-subscribe with JGroups

I've added a new demo program (org.jgroups.demos.PubSub), which shows how to use JGroups channels to do publish-subscribe.

Pub-sub is a pattern where instances subscribe to topics and receive only messages posted to those topics. For example, in a stock feed application, an instance could subscribe to topics "rht", "aapl" and "msft". Stock quote publishers could post to these topics to update a quote, and subscribers would get notified of the updates.

The simplest way to do this in JGroups is for each instance to join a cluster; publishers send topic posts as multicasts, and subscribers discard messages for topics to which they haven't subscribed.

The problem with this is that a lot of multicasts will make it all the way up to the application, only to be discarded there if the topic doesn't match. This means that a message is received by the transport protocols (by all instances in the cluster), passed up through all the protocols, and then handed over to the application. If the application discards the message, then all the work of fragmenting, retransmitting, ordering, flow-controlling, de-fragmenting, uncompressing and so on is unnecessary, resulting in wasted CPU cycles, lock acquisitions, cache and memory accesses, context switching and bandwidth.

A solution to this could be to do topic filtering at the publisher's side: a publisher maintains a hashmap of subscribers and topics they've subscribed to and sends updates only to instances which have a current subscription.

This has two drawbacks though: first the publishers have additional work maintaining those subscriptions, and the subscribers need to multicast subscribe or unsubscribe requests. In addition, new publishers need to somehow get the current subscriptions from an existing cluster member (via state transfer).

Secondly, to send updates only to instances with a subscription, we'd have to resort to unicasts: if 10 instances of a 100 instance cluster are subscribed to "rht", an update message to "rht" would entail sending 10 unicast messages rather than 1 multicast message. This generates more traffic than needed, especially when the cluster size increases.
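
To make the cost of publisher-side filtering concrete, here is a minimal sketch in plain Java (no JGroups classes involved). The class PublisherSideFilter and its methods are hypothetical names used only for illustration, and subscribers are identified by simple strings rather than real addresses.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration, not part of JGroups: a publisher that tracks subscriptions itself.
final class PublisherSideFilter {
    // topic -> subscribers that currently have a subscription for it
    private final Map<String, Set<String>> subscriptions = new HashMap<>();

    void subscribe(String topic, String subscriber) {
        subscriptions.computeIfAbsent(topic, t -> new LinkedHashSet<>()).add(subscriber);
    }

    void unsubscribe(String topic, String subscriber) {
        Set<String> subs = subscriptions.get(topic);
        if (subs != null && subs.remove(subscriber) && subs.isEmpty())
            subscriptions.remove(topic); // drop empty topics so the map doesn't grow forever
    }

    // Number of unicast messages needed to deliver a single update for the topic
    int unicastsNeeded(String topic) {
        Set<String> subs = subscriptions.get(topic);
        return subs == null ? 0 : subs.size();
    }

    public static void main(String[] args) {
        PublisherSideFilter publisher = new PublisherSideFilter();
        for (int i = 0; i < 10; i++)
            publisher.subscribe("rht", "node-" + i);
        System.out.println("unicasts for one rht update: " + publisher.unicastsNeeded("rht")); // 10
    }
}
```

Every publisher would have to keep such a map up to date, and each update costs one unicast per subscriber instead of a single multicast.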

Another solution, and that's the one chosen by PubSub, is to send all updates as multicast messages, but discard them as soon as possible at the receivers when there isn't a match. Instead of having to traverse the entire JGroups stack, a message that doesn't match is discarded directly by the transport, which is the first protocol that receives a message.

This is done by using a shared transport and creating a separate channel for each subscription: whenever a new topic is subscribed to, PubSub creates a new channel and joins a cluster whose name is the topic name. This is not overly costly, as the transport protocol (which contains almost all the resources of a stack, such as the thread pools, timers and sockets) is only created once.

The first channel to join a cluster will create the shared transport. Subsequent channels will only link to the existing shared transport, but won't initialize it. Using reference counting, the last channel to leave the cluster will de-allocate the resources used by the shared transport and destroy it.
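
The reference counting described above can be sketched roughly as follows. This is not the JGroups implementation; RefCountedTransport, acquire() and release() are made-up names, and the "resources" are reduced to a boolean flag.

```java
// Hypothetical stand-in for a shared transport; not the JGroups implementation.
final class RefCountedTransport {
    private int refCount;        // number of channels currently linked to this transport
    private boolean initialized; // true while sockets, thread pools and timers would exist

    // Called when a channel connects. Returns true only for the first channel,
    // which is the one that actually initializes the transport.
    synchronized boolean acquire() {
        boolean created = refCount == 0;
        if (created)
            initialized = true;
        refCount++;
        return created;
    }

    // Called when a channel disconnects. Returns true only for the last channel,
    // which is the one that tears the transport down.
    synchronized boolean release() {
        if (refCount == 0)
            throw new IllegalStateException("release() without matching acquire()");
        refCount--;
        boolean destroyed = refCount == 0;
        if (destroyed)
            initialized = false;
        return destroyed;
    }

    synchronized boolean isInitialized() {
        return initialized;
    }
}
```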

Every channel on top of the same shared transport will join a different cluster, named after the topic. PubSub maintains a hashmap of topic names as keys and channels as values. A "subscribe rht" operation simply creates a new channel (if there isn't one for topic "rht" yet), adds a listener, joins cluster "rht" and adds the topic/channel pair to the hashmap. An "unsubscribe rht" grabs the channel for "rht", closes it and removes it from the hashmap.
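
A rough sketch of that bookkeeping, in plain Java: TopicChannel is a hypothetical interface standing in for a real channel (it is not the JGroups API), and TopicRegistry is a made-up name for the hashmap handling. Adding a listener is left out to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a channel; a real one would sit on top of the shared transport.
interface TopicChannel {
    void connect(String clusterName);
    void close();
}

// Hypothetical illustration of the topic -> channel map described above.
final class TopicRegistry {
    private final Map<String, TopicChannel> channels = new HashMap<>();
    private final Supplier<TopicChannel> channelFactory;

    TopicRegistry(Supplier<TopicChannel> channelFactory) {
        this.channelFactory = channelFactory;
    }

    // Creates a channel and joins the cluster named after the topic, unless already subscribed
    synchronized boolean subscribe(String topic) {
        if (channels.containsKey(topic))
            return false;
        TopicChannel channel = channelFactory.get();
        channel.connect(topic);
        channels.put(topic, channel);
        return true;
    }

    // Closes the channel for the topic and forgets it
    synchronized boolean unsubscribe(String topic) {
        TopicChannel channel = channels.remove(topic);
        if (channel == null)
            return false;
        channel.close();
        return true;
    }

    synchronized int subscriptionCount() {
        return channels.size();
    }
}
```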

When a publisher posts an update for "rht", it essentially sends a multicast to the "rht" cluster.

The important point is that, when an update for "rht" is received by a shared transport, JGroups tries to find the channel which joined cluster "rht" and passes the message up to that channel (through its protocol stack), or discards it if there isn't a channel which joined cluster "rht".

For example, if we have 3 channels A, B and C over the same shared transport TP, and A joined cluster "rht", B joined "aapl" and C joined "msft", then when a message for "ibm" arrives, it will be discarded by TP as there is no cluster "ibm" present. When a message for "rht" arrives, it will be passed up the stack for "rht" to channel A.
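
The dispatching in this example can be modelled in a few lines of plain Java. This is only a sketch of the idea, not the transport code in JGroups: SharedTransportDispatcher is a hypothetical class, and a protocol stack is reduced to a Consumer that receives the payload.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical model of a shared transport that dispatches by cluster name.
final class SharedTransportDispatcher {
    // cluster name -> entry point of the protocol stack of the channel that joined it
    private final Map<String, Consumer<byte[]>> stacks = new ConcurrentHashMap<>();
    private final AtomicLong discarded = new AtomicLong();

    void register(String clusterName, Consumer<byte[]> upHandler) {
        stacks.put(clusterName, upHandler);
    }

    void unregister(String clusterName) {
        stacks.remove(clusterName);
    }

    // Passes the message up the matching stack, or drops it right here if no channel joined
    // that cluster. Returns true if the message was passed up.
    boolean receive(String clusterName, byte[] payload) {
        Consumer<byte[]> upHandler = stacks.get(clusterName);
        if (upHandler == null) {
            discarded.incrementAndGet();
            return false;
        }
        upHandler.accept(payload);
        return true;
    }

    long discardedCount() {
        return discarded.get();
    }
}
```

With handlers registered for "rht", "aapl" and "msft", a call to receive("ibm", ...) returns false without touching any stack, while receive("rht", ...) invokes only the handler of channel A.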

As a non-matching message will be discarded at the transport level, and not the application level, we save the costs of passing the message up the stack, through all the protocols and delivering it to the application.

Note that PubSub uses the properties of IP multicasting, so the stack used by it should have UDP as shared transport. If TCP is used, then there are no benefits to the approach outlined above.

Wednesday, September 07, 2011

Speaking at the OpenBlend conference on Sept 15


I'll be speaking at the OpenBlend conference in Ljubljana on Sept 15.

My talk will be about how to persist data without using a disk, by spreading it over a grid with a customizable degree of redundancy. Kind of the NoSQL stuff everybody and their grandmothers are talking about these days...

I'm excited to visit Ljubljana, as I've never been there before and I like seeing new towns.

The other reason, of course, is to beat Ales Justin's a**s in tennis :-)

If you happen to be in town, come and join us ! I mean not for tennis, but for the conference, or for a beer in the evening !


Thursday, September 01, 2011

Optimizations for large clusters

I've been working on making JGroups more efficient on large clusters. 'Large' is between 100 and 2000 nodes.

My focus has been on making the memory footprint smaller and on reducing the wire size of certain types of messages.

Here are some of the optimizations that I implemented.


Discovery is needed by a new member to find the coordinator when joining. It broadcasts a discovery request, and everybody in the cluster replies with a discovery response.

There were 2 problems with this: first, a cluster of 1000 nodes meant that a new joiner received 1000 messages at the same time, possibly clogging up network queues and causing messages to get dropped.

This was solved by staggering the sending of responses (stagger_timeout).
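
One way to picture staggering: every responder waits for a delay somewhere within stagger_timeout before replying, so the responses arrive spread out over time rather than all at once. The sketch below assumes a uniformly random delay; how JGroups actually picks the delay is not described here, and StaggeredResponse and delayMillis() are hypothetical names.

```java
import java.util.Random;

// Hypothetical illustration of staggered discovery responses; assumes a uniformly random delay.
final class StaggeredResponse {
    // Delay in [0, staggerTimeoutMillis) to wait before sending a discovery response
    static long delayMillis(long staggerTimeoutMillis, Random random) {
        if (staggerTimeoutMillis <= 0)
            return 0; // staggering disabled: respond immediately
        return (long) (random.nextDouble() * staggerTimeoutMillis);
    }

    public static void main(String[] args) {
        Random random = new Random();
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        // 1000 responders, each picking its own delay within a 3 second window
        for (int i = 0; i < 1000; i++) {
            long delay = delayMillis(3000, random);
            min = Math.min(min, delay);
            max = Math.max(max, delay);
        }
        System.out.println("responses spread between " + min + " ms and " + max + " ms");
    }
}
```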

The second problem was that every discovery response included the current view. In a cluster of 1000, this meant that 1000 responses each contained a view of 1000 members !

The solution to this was to send back only the address of the coordinator, as this is all that's needed to send a JOIN request to it. So instead of sending back (with every discovery response) 1000 addresses, we now only send back 1 address.


A digest used to contain the lowest, highest delivered and highest received sequence numbers (seqnos) for every member. Digests are sent back to a new joiner in a JOIN response, and they are also broadcast periodically by STABLE to purge messages delivered by everyone.

The wire size would be 2 longs for every address (UUID), and 3 longs for the 3 seqnos. That's roughly 1000 * 5 * 8 = 40000 bytes for a cluster of 1000 members. Bear in mind that that's the size of one digest; in a cluster of 1000, everyone broadcasts such a digest periodically (STABLE) !

The first optimization was to remove the 'low' seqno; I had to change some code in the retransmitters to allow for that, but - hey - who wouldn't do that to save 8 bytes / STABLE message ? :-)

This reduced the wire (and memory !) size of a 1000-member digest by 8'000 bytes, from 40'000 down to 32'000.

Having only highest delivered (HD) and highest received (HR) seqnos allowed for another optimization: HR is always >= HD, and the difference between HR and HD is usually small.

So the next optimization was to send HR as a delta to HD. So instead of sending 322649 | 322650, we'd send 322649 | 1.

The central optimization underlying that was that seqnos seldom need 8 bytes: a seqno starts at 1 and increases monotonically. If a member sends 5 million messages, the seqno can still be encoded in 4 bytes (saving 4 bytes per seqno). If a member is restarted, the seqno starts again at 1 and can thus be encoded in 1 byte.

So now I could encode an HD/HR pair by sending a byte containing the number of bytes needed for the HD part in the higher 4 bits and the number of bytes needed for the delta in the lower 4 bits. The HD and the delta would then follow. Example: to encode HD=2000000 | HR=2000500, we'd generate the bytes:

| 50 | -128 | -124 | 30 | -12 | 1 |

  • 50 (0x32) encodes a length of 3 for HD and 2 for the delta HR-HD (500)
  • -128, -124 and 30 encode 2'000'000 in 3 bytes
  • -12 and 1 encode the delta (500)

So instead of using 16 bytes for the above sequence, we use only 6 bytes !
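
The encoding can be reproduced with a short, self-contained sketch. The layout assumed here (HD length in the upper 4 bits of the first byte, delta length in the lower 4 bits, least significant byte first) is the one that yields exactly the example bytes shown above; SeqnoPairCodec and its methods are hypothetical names, not the JGroups utility code.

```java
import java.util.Arrays;

// Hypothetical sketch of the HD/HR encoding; layout chosen to match the example bytes.
final class SeqnoPairCodec {
    // Number of bytes needed to hold a non-negative value (at least 1)
    static int bytesRequired(long value) {
        int n = 1;
        while ((value >>>= 8) != 0)
            n++;
        return n;
    }

    // Encodes HD followed by the delta (HR - HD), prefixed by one byte holding both lengths
    static byte[] encode(long hd, long hr) {
        if (hd < 0 || hr < hd)
            throw new IllegalArgumentException("need 0 <= HD <= HR");
        long delta = hr - hd;
        int hdLen = bytesRequired(hd), deltaLen = bytesRequired(delta);
        byte[] out = new byte[1 + hdLen + deltaLen];
        out[0] = (byte) ((hdLen << 4) | deltaLen);
        write(hd, hdLen, out, 1);
        write(delta, deltaLen, out, 1 + hdLen);
        return out;
    }

    // Returns {HD, HR}
    static long[] decode(byte[] in) {
        int hdLen = (in[0] & 0xFF) >>> 4, deltaLen = in[0] & 0x0F;
        long hd = read(in, 1, hdLen);
        long delta = read(in, 1 + hdLen, deltaLen);
        return new long[] {hd, hd + delta};
    }

    // Writes the lowest 'len' bytes of 'value', least significant byte first
    private static void write(long value, int len, byte[] out, int offset) {
        for (int i = 0; i < len; i++)
            out[offset + i] = (byte) (value >>> (8 * i));
    }

    private static long read(byte[] in, int offset, int len) {
        long value = 0;
        for (int i = 0; i < len; i++)
            value |= (in[offset + i] & 0xFFL) << (8 * i);
        return value;
    }

    public static void main(String[] args) {
        byte[] encoded = encode(2000000, 2000500);
        System.out.println(Arrays.toString(encoded)); // [50, -128, -124, 30, -12, 1]
        System.out.println(Arrays.toString(decode(encoded))); // [2000000, 2000500]
    }
}
```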

If we assume that we can encode 2 seqnos on average in 6 bytes, the wire size of a digest is now 1000 * (16 (UUID) + 6) = 22'000, that's down from 40'000 in a 1000 member cluster. In other words, we're saving almost 50% of the wire size of a digest !
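
The size estimates above can be re-derived with a few lines of arithmetic. The sketch below uses the figures from the text (16 bytes per UUID address, 8 bytes per fixed-width seqno, 6 bytes on average for a compacted HD/HR pair); DigestSizes is a hypothetical helper, not JGroups code.

```java
// Hypothetical helper that re-derives the digest size estimates.
final class DigestSizes {
    static final int UUID_BYTES = 16; // 2 longs per address
    static final int LONG_BYTES = 8;

    // Digest with fixed-width seqnos: every member contributes a UUID plus N longs
    static int fixedWidth(int members, int seqnosPerMember) {
        return members * (UUID_BYTES + seqnosPerMember * LONG_BYTES);
    }

    // Digest with compacted HD/HR pairs of a given average size
    static int compacted(int members, int avgPairBytes) {
        return members * (UUID_BYTES + avgPairBytes);
    }

    public static void main(String[] args) {
        System.out.println(fixedWidth(1000, 3)); // 40000: low, HD and HR
        System.out.println(fixedWidth(1000, 2)); // 32000: 'low' seqno removed
        System.out.println(compacted(1000, 6));  // 22000: HD plus delta, 6 bytes on average
    }
}
```

Going from 40'000 to 22'000 bytes is a reduction of 45%.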

Of course, we can not only encode seqno sequences, but also other longs, which is exactly what we did for another optimization. Examples of where this makes sense are:
  • Seqnos in NakackHeaders: every multicast message has such a header, so the savings here are significant
  • Range: this is used for retransmission requests, and is also a seqno sequence
  • RequestCorrelator IDs: used for every RPC
  • Fragmentation IDs (FRAG and FRAG2)
  • UNICAST and UNICAST2: seqnos and ranges
  • ViewId
An example of where this doesn't make sense is UUIDs: they are generated such that the bits are spread out over the entire 8 bytes of each long, so encoding them would make 9 bytes out of 8, and that doesn't help.


A JoinRsp used to contain a list of members twice: once in the view and once in the digest. This duplication was eliminated, and now we're sending the member list only once. This also cut the wire size of a JoinRsp in half.

Further optimizations planned for 3.1 include delta views and better compressed STABLE messages:

Delta views

If we have a view of 1000 members, we always send the full address list with every view change. This is not necessary, as everybody has access to the previous view.

So, for example, when we have P, Q and R joining, and X and Y leaving in V22, then we can simply send a delta view; a view V22={V21+P+Q+R-X-Y}. This means, take the current view V21, remove members X and Y, and add members P, Q and R to the tail of the list, in order to generate a new view V22.

So, instead of sending a list of 1000 members, we simply send 5 members, and everybody creates the new view locally, based on the current view and the delta information.
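
Since delta views are only planned at this point, the following is just a sketch of how a receiver could build the new view locally. DeltaView and apply() are hypothetical names, and members are plain strings instead of addresses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of applying a delta view; members are plain strings here.
final class DeltaView {
    // New view = previous view, minus the members that left, plus the joiners appended at the tail
    static List<String> apply(List<String> previous, List<String> joined, List<String> left) {
        List<String> next = new ArrayList<>(previous);
        next.removeAll(left);
        next.addAll(joined);
        return next;
    }

    public static void main(String[] args) {
        List<String> v21 = Arrays.asList("A", "X", "B", "Y", "C");
        List<String> v22 = apply(v21, Arrays.asList("P", "Q", "R"), Arrays.asList("X", "Y"));
        System.out.println(v22); // [A, B, C, P, Q, R]
    }
}
```

The order of the surviving members is preserved, which matters if the position in the view carries meaning (for example, who the coordinator is).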

Compressed STABLE messages

A STABLE message contains a digest with a list of all members and then the digest seqnos for HD and HR. Since STABLE messages are exchanged between members of the same cluster, they all have the same view, or else they would drop a STABLE message.

Hence, we can drop the View and instead send the ViewId, which is 1 address and a long. Everyone knows that the digest seqnos will be in order of the current view, e.g. seqno pair 1 belongs to the first member of the current view, seqno pair 2 to the second member and so on.

So instead of sending a list of 1000 members for a STABLE message, we only send 1 address.

This will reduce the wire size of a 1000-member digest sent by STABLE from roughly 40'000 bytes to ca. 6'000 bytes !
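
A sketch of how a receiver could rebuild the full digest from such a compressed STABLE message: the seqno pairs are matched to members purely by their position in the current view. This feature is only planned, so PositionalDigest and its methods are hypothetical, and the size estimate assumes a 16-byte address plus an 8-byte long for the ViewId and 6 bytes on average per seqno pair.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a digest whose seqno pairs are ordered like the current view.
final class PositionalDigest {
    // seqnos is laid out as hd0, hr0, hd1, hr1, ... in the order of the current view
    static Map<String, long[]> expand(List<String> currentView, long[] seqnos) {
        if (seqnos.length != 2 * currentView.size())
            throw new IllegalArgumentException("digest doesn't match the current view");
        Map<String, long[]> digest = new LinkedHashMap<>();
        for (int i = 0; i < currentView.size(); i++)
            digest.put(currentView.get(i), new long[] {seqnos[2 * i], seqnos[2 * i + 1]});
        return digest;
    }

    // Rough wire size: ViewId (1 address + 1 long) plus the compacted seqno pairs
    static int approxWireSize(int members, int avgPairBytes) {
        return 16 + 8 + members * avgPairBytes;
    }

    public static void main(String[] args) {
        Map<String, long[]> digest = expand(List.of("A", "B"), new long[] {10, 12, 7, 7});
        System.out.println("B: HD=" + digest.get("B")[0] + ", HR=" + digest.get("B")[1]); // B: HD=7, HR=7
        System.out.println(approxWireSize(1000, 6)); // 6024
    }
}
```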

Download 3.0.0.CR1

The optimizations (excluding delta views and compressed STABLE messages) are available in JGroups 3.0.0.CR1, which can be downloaded from [1].

Enjoy (and feedback appreciated, on the mailing lists...) !