Tuesday, December 06, 2011

Repondez s'il vous plait !

No, this isn't a post in French (my school French would be too rusty for this !); this is about a new protocol in JGroups, called RSVP :-)

As the name possibly suggests, this feature allows for messages to get ack'ed by receivers before a message send returns. In other words, when A broadcasts a message M to {A,B,C,D}, then JChannel.send() will only return once itself, B, C and D have acknowledged that they delivered M to the application.

This differs from the default behavior of JGroups which always sends messages asynchronously, and guarantees that all non-faulty members will eventually receive the message. If we tag a message as RSVP, then we basically have 2 properties:
  1. The message send will only return when we've received all acks from the current members. Members leaving or crashing during the wait are treated as if they sent an ack. The send() method can also throw a (runtime) TimeoutException if a timeout was defined (in RSVP) and encountered.
  2. If A sent (asynchronous) messages #1-10, and tagged #10 as RSVP, then - when send() returns successfully - A is guaranteed that all members received A's message #10 and all messages prior to #10, that's #1-9.
This can be used for example when completing a unit of work, and needing to know that all current cluster members received all of the messages sent up to now by a given cluster member.
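
Here's a minimal sketch of what a tagged send could look like; the config file name is made up, and it assumes a stack containing the RSVP protocol plus the Message.Flag.RSVP flag from the 3.x API:

JChannel ch=new JChannel("/home/bela/rsvp.xml"); // hypothetical config which includes RSVP
ch.connect("demo-cluster");

Message msg=new Message(null, null, "unit of work done"); // null destination == send to all members
msg.setFlag(Message.Flag.RSVP);                           // tag the message as RSVP
ch.send(msg); // returns only when all current members have ack'ed delivery (or a TimeoutException is thrown)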

This is similar to FLUSH, but less strict in that it is a per-sender flush, there is no reconciliation phase, and it doesn't stop the world.

An alternative is to use a blocking RPC. However, I wanted to add the capability of synchronous messages directly into the base channel.

Note that this also solves another problem: if A sends messages #1-5, but some members drop #5, and A doesn't send more messages for some time, then #5 won't get delivered at those members for quite a while (until stability (STABLE) kicks in).

RSVP will be available in JGroups 3.1. If you want to try it out, get the code from master [2]. The documentation is at [1], section 3.8.8.2.

For questions, I suggest one of the mailing lists.
Cheers,

[1] http://www.jgroups.org/manual-3.x/html/user-channel.html#SendingMessages

[2] https://github.com/belaban/JGroups


Thursday, November 17, 2011

JGroups 3.0.0.Final released

I'm happy to announce that JGroups 3.0.0.Final is here !

While originally intended to make only API changes (some of them queued for years), there are also several optimizations, most of them related to running JGroups in larger clusters.

For instance, the size of several messages has been reduced, and some protocol rounds have been eliminated, making JGroups more memory efficient and less chatty.

For the last couple of weeks, I've been working on making merging of 100-300 cluster nodes faster and making sure a merge never blocks. To this end, I've written a unit test which creates N singleton nodes (= nodes which only see themselves in the cluster), then makes them see each other and waits until a cluster of N has formed.

The test itself was a real challenge because I was hitting the max heap size pretty quickly. For example, with 300 members, I had to increase the heap size to at least 900 MB to make the test complete. This indicates that a JGroups member needs a maximum of roughly 3 MB of heap. Of course, I had to use shared thread pools and timers, and do a fair amount of (memory) tuning on some of the protocols, to accommodate 300 members all running in the same JVM.

Running in such a memory constrained environment led to some more optimizations, which will benefit users, even if they're not running 300 members inside the same JVM ! :-)

One of them is that UNICAST / UNICAST2 maintain a structure for every member they talk to. So if member A sends a unicast to each and every member of a cluster of 300, it'll have 300 connections open.

The change is to close connections that have been idle for a given (configurable) time, and re-establish them when needed.
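
As a sketch, with the programmatic API this could be configured along the following lines; note that the property name (conn_expiry_timeout) and the value are assumptions, so check the 3.x documentation for the exact attribute:

UNICAST unicast=new UNICAST();
unicast.setValue("conn_expiry_timeout", 60000L); // assumed property: close connections idle for more than 60 seconds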

Further optimizations will be made in 3.1.

The release notes for 3.0.0.Final are here: https://github.com/belaban/JGroups/blob/master/doc/ReleaseNotes-3.0.0.txt

JGroups 3.0.0.Final can be downloaded here: https://sourceforge.net/projects/javagroups/files/JGroups/3.0.0.Final

As usual, if you have questions, use one of the mailing lists.

Enjoy !


Monday, September 12, 2011

Publish-subscribe with JGroups

I've added a new demo program (org.jgroups.demos.PubSub), which shows how to use JGroups channels to do publish-subscribe.

Pub-sub is a pattern where instances subscribe to topics and receive only messages posted to those topics. For example, in a stock feed application, an instance could subscribe to topics "rht", "aapl" and "msft". Stock quote publishers could post to these topics to update a quote, and subscribers would get notified of the updates.

The simplest way to do this in JGroups is for each instance to join a cluster; publishers send topic posts as multicasts, and subscribers discard messages for topics to which they haven't subscribed.

The problem with this is that a lot of multicasts will make it all the way up to the application, only to be discarded there if the topic doesn't match. This means that a message is received by the transport protocols (by all instances in the cluster), passed up through all the protocols, and then handed over to the application. If the application discards the message, then all the work of fragmenting, retransmitting, ordering, flow-controlling, de-fragmenting, uncompressing and so on is unnecessary, resulting in wasted CPU cycles, lock acquisitions, cache and memory accesses, context switching and bandwidth.

A solution to this could be to do topic filtering at the publisher's side: a publisher maintains a hashmap of subscribers and topics they've subscribed to and sends updates only to instances which have a current subscription.

This has two drawbacks though: first, the publishers have the additional work of maintaining those subscriptions, the subscribers need to multicast subscribe and unsubscribe requests, and new publishers need to somehow get the current subscriptions from an existing cluster member (via state transfer).

Secondly, to send updates only to instances with a subscription, we'd have to resort to unicasts: if 10 instances of a 100 instance cluster are subscribed to "rht", an update message to "rht" would entail sending 10 unicast messages rather than 1 multicast message. This generates more traffic than needed, especially when the cluster size increases.

Another solution, and that's the one chosen by PubSub, is to send all updates as multicast messages, but discard them as soon as possible at the receivers when there isn't a match. Instead of having to traverse the entire JGroups stack, a message that doesn't match is discarded directly by the transport, which is the first protocol that receives a message.

This is done by using a shared transport and creating a separate channel for each subscription: whenever a new topic is subscribed to, PubSub creates a new channel and joins a cluster whose name is the topic name. This is not overly costly, as the transport protocol - which contains almost all the resources of a stack, such as the thread pools, timers and sockets -  is only created once.

The first channel to join a cluster will create the shared transport. Subsequent channels will only link to the existing shared transport, but won't initialize it. Using reference counting, the last channel to leave the cluster will de-allocate the resources used by the shared transport and destroy it.

Every channel on top of the same shared transport will join a different cluster, named after the topic. PubSub maintains a hashmap of topic names as keys and channels as values. A "subscribe rht" operation simply creates a new channel (if there isn't one for topic "rht" yet), adds a listener, joins cluster "rht" and adds the topic/channel pair to the hashmap. An "unsubscribe rht" grabs the channel for "rht", closes it and removes it from the hashmap.
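
The following is a simplified sketch of this idea, not the actual org.jgroups.demos.PubSub code; "shared.xml" stands for any config whose transport is marked as shared (singleton_name set), and the helper methods are made up for illustration:

Map<String,JChannel> topics=new HashMap<String,JChannel>();

void subscribe(final String topic) throws Exception {
    if(topics.containsKey(topic))
        return;
    JChannel ch=new JChannel("shared.xml");   // re-uses the already created shared transport, if any
    ch.setReceiver(new ReceiverAdapter() {
        public void receive(Message msg) {
            System.out.println("[" + topic + "] " + msg.getObject());
        }
    });
    ch.connect(topic);                        // cluster name == topic name
    topics.put(topic, ch);
}

void unsubscribe(String topic) {
    JChannel ch=topics.remove(topic);
    if(ch != null)
        ch.close();                           // last channel to leave tears down the shared transport
}

void publish(String topic, Object update) throws Exception {
    JChannel ch=topics.get(topic);
    if(ch != null)
        ch.send(new Message(null, null, update)); // multicast to the topic's cluster
}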

When a publisher posts an update for "rht", it essentially sends a multicast to the "rht" cluster.

The important point is that, when an update for "rht" is received by a shared transport, JGroups tries to find the channel which joined cluster "rht" and passes the message up to that channel (through its protocol stack), or discards it if there isn't a channel which joined cluster "rht".

For example, if we have 3 channels A, B and C over the same shared transport TP, and A joined cluster "rht", B joined "aapl" and C joined "msft", then when a message for "ibm" arrives, it will be discarded by TP as there is no cluster "ibm" present. When a message for "rht" arrives, it will be passed up the stack for "rht" to channel A.

As a non-matching message will be discarded at the transport level, and not the application level, we save the costs of passing the message up the stack, through all the protocols and delivering it to the application.

Note that PubSub uses the properties of IP multicasting, so the stack used by it should have UDP as shared transport. If TCP is used, then there are no benefits to the approach outlined above.

Wednesday, September 07, 2011

Speaking at the OpenBlend conference on Sept 15

FYI,

I'll be speaking at the OpenBlend conference in Ljubljana on Sept 15.

My talk will be about how to persist data without using a disk, by spreading it over a grid with a customizable degree of redundancy. Kind of the NoSQL stuff everybody and their grandmothers are talking about these days...

I'm excited to visit Ljubljana, as I've never been there before and I like seeing new towns.

The other reason, of course, is to beat Ales Justin's a**s in tennis :-)

If you happen to be in town, come and join us ! I mean not for tennis, but for the conference, or for a beer in the evening !

Cheers,
Bela

Thursday, September 01, 2011

Optimizations for large clusters

I've been working on making JGroups more efficient on large clusters. 'Large' is between 100 and 2000 nodes.

My focus has been on making the memory footprint smaller, and to reduce the wire size of certain types of messages.


Here are some of the optimizations that I implemented.

Discovery

Discovery is needed by a new member to find the coordinator when joining. It broadcasts a discovery request, and everybody in the cluster replies with a discovery response.

There were 2 problems with this: first, a cluster of 1000 nodes meant that a new joiner received 1000 messages at the same time, possibly clogging up network queues and causing messages to get dropped.

This was solved by staggering the sending of responses (stagger_timeout).

The second problem was that every discovery response included the current view. In a cluster of 1000, this meant that 1000 responses each contained a view of 1000 members !

The solution to this was to only send back the address of the coordinator, as this is all that's needed to send a JOIN request to it. So instead of sending back (with every discovery response) 1000 addresses, we now only send back 1 address.


Digest

A digest used to contain the lowest, highest delivered and highest received sequence numbers (seqnos) for every member. Digests are sent back to a new joiner in a JOIN response, and they are also broadcast periodically by STABLE to purge messages delivered by everyone.

The wire size would be 2 longs for every address (UUID), and 3 longs for the 3 seqnos. That's roughly 1000 * 5 * 8 = 40000 bytes for a cluster of 1000 members. Bear in mind that that's the size of one digest; in a cluster of 1000, everyone broadcasts such a digest periodically (STABLE) !

The first optimization was to remove the 'low' seqno; I had to change some code in the retransmitters to allow for that, but - hey - who wouldn't do that to save 8 bytes / STABLE message ? :-)

This reduced the wire (and memory !) size of a 1000-member digest by another 8'000 bytes, down to 32'000 (from 40'000).

Having only highest delivered (HD) and highest received (HR) seqnos allowed for another optimization: HR is always >= HD, and the difference between HR and HD is usually small.

So the next optimization was to send HR as a delta to HD. So instead of sending 322649 | 322650, we'd send 322649 | 1.

The central optimization underlying that was that seqnos seldom need 8 bytes: a seqno starts at 1 and increases monotonically. If a member sends 5 million messages, the seqno can still be encoded in 4 bytes (saving 4 bytes per seqno). If a member is restarted, the seqno starts again at 1 and can thus be encoded in 1 byte.

So now I could encode an HD/HR pair by sending a byte containing the number of bytes needed for the HD part in the higher 4 bits and the number of bytes needed for the delta in the lower 4 bits. The HD and the delta would then follow. Example: to encode HD=2000000 | HR=2000500, we'd generate the bytes:

| 50 | -128 | -124 | 30 | -12 | 1 |

  • 50 encodes a length of 3 for HD and 2 for HD-HR (500)
  • -128, -124 and 30 encode 2'000'000 in 3 bytes
  • -12 and 1 encode the delta (500)

So instead of using 16 bytes for the above sequence, we use only 6 bytes !

If we assume that we can encode 2 seqnos on average in 6 bytes, the wire size of a digest is now 1000 * (16 (UUID) + 6) = 22'000, that's down from 40'000 in a 1000 member cluster. In other words, we're saving almost 50% of the wire size of a digest !
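
To make the encoding concrete, here's an illustrative sketch of the scheme described above (this is not the actual JGroups marshalling code); it reproduces exactly the byte sequence shown for HD=2000000, HR=2000500:

// one length byte (HD length in the higher 4 bits, delta length in the lower 4 bits),
// followed by HD and the delta, lowest byte first
static byte[] encodeSeqnoPair(long hd, long hr) {
    long delta=hr - hd;                             // hr >= hd, and the delta is usually small
    int hd_len=bytesNeeded(hd), delta_len=bytesNeeded(delta);
    byte[] out=new byte[1 + hd_len + delta_len];
    out[0]=(byte)((hd_len << 4) | delta_len);       // e.g. (3 << 4) | 2 == 50
    writeBytes(hd, hd_len, out, 1);
    writeBytes(delta, delta_len, out, 1 + hd_len);
    return out;                                     // {50, -128, -124, 30, -12, 1} for the example above
}

static int bytesNeeded(long val) {
    return val == 0? 1 : (64 - Long.numberOfLeadingZeros(val) + 7) / 8;
}

static void writeBytes(long val, int len, byte[] buf, int offset) {
    for(int i=0; i < len; i++)
        buf[offset + i]=(byte)(val >> (i * 8));
}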

Of course, we can not only encode seqno sequences, but also other longs, which is exactly what we did for another optimization. Examples of where this makes sense are:
  • Seqnos in NakackHeaders: every multicast message has such a header, so the savings here are significant
  • Range: this is used for retransmission requests, and is also a seqno sequence
  • RequestCorrelator IDs: used for every RPC
  • Fragmentation IDs (FRAG and FRAG2)
  • UNICAST and UNICAST2: seqnos and ranges
  • ViewId
An example of where this doesn't make sense is UUIDs: they are generated such that the bits are spread out over the entire 8 bytes, so encoding them would make 9 bytes out of 8, and that doesn't help.


JoinRsp

A JoinRsp used to contain a list of members twice: once in the view and once in the digest. This duplication was eliminated, and now we're sending the member list only once. This also cut the wire size of a JoinRsp in half.



Further optimizations planned for 3.1 include delta views and better compressed STABLE messages:



Delta views

If we have a view of 1000 members, we always send the full address list with every view change. This is not necessary, as everybody has access to the previous view.

So, for example, when we have P, Q and R joining, and X and Y leaving in V22, then we can simply send a delta view V22={V21+P+Q+R-X-Y}. This means: take the current view V21, remove members X and Y, and add members P, Q and R to the tail of the list, in order to generate the new view V22.

So, instead of sending a list of 1000 members, we simply send 5 members, and everybody creates the new view locally, based on the current view and the delta information.
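
An illustrative sketch of what applying such a delta locally could look like (delta views are only planned for 3.1, so this is not JGroups code):

// every member derives the new view from its current view plus the delta
static List<Address> applyDeltaView(List<Address> current_view,
                                    Collection<Address> leaving, Collection<Address> joining) {
    List<Address> new_view=new ArrayList<Address>(current_view);
    new_view.removeAll(leaving);  // e.g. remove X and Y
    new_view.addAll(joining);     // e.g. append P, Q and R to the tail
    return new_view;              // V22, identical on every member
}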


Compressed STABLE messages

A STABLE message contains a digest with a list of all members and then the digest seqnos for HD and HR. Since STABLE messages are exchanged between members of the same cluster, they all have the same view, or else they would drop a STABLE message.

Hence, we can drop the View and instead send the ViewId, which is 1 address and a long. Everyone knows that the digest seqnos will be in order of the current view, e.g. seqno pair 1 belongs to the first member of the current view, seqno pair 2 to the second member and so on.

So instead of sending a list of 1000 members for a STABLE message, we only send 1 address.

This will reduce the wire size of a 1000-member digest sent by STABLE from roughly 40'000 bytes to ca. 6'000 bytes !



Download 3.0.0.CR1

The optimizations (excluding delta views and compressed STABLE messages) are available in JGroups 3.0.0.CR1, which can be downloaded from [1].

Enjoy (and feedback appreciated, on the mailing lists...) !

[1] https://sourceforge.net/projects/javagroups/files/JGroups/3.0.0.CR1

Tuesday, July 26, 2011

It's time for a change: JGroups 3.0

I'm happy to announce that I just released a first beta of JGroups 3.0 !

It's been a long time since I released version 2.0 (Feb 2002); over 9 years and 77 2.x releases !

We've pushed a lot of API changes off to 3.x over the years, so that 2.x releases could keep providing new features, bug fixes and optimizations while always remaining (API) backwards compatible with previous 2.x releases.

However, now it was time to take that step and make all the changes we've accumulated over the years.

The bad thing is that 3.x will require code changes if you port your 2.x app to it... however I anticipate that those changes will be trivial. Please ask questions regarding porting on the JGroups mailing list (or forums), and also post suggestions for improvements !

The good thing is that I was able to remove a lot of code (ca. 25'000 lines compared to 2.12.1) and simplify JGroups significantly.

Just one example: the getState(OutputStream) callback in 2.x didn't have an exception in its signature, so an implementation would typically look like this:

public void getState(OutputStream output) {
    try {
        marshalStateToStream(output);
    }
    catch(Exception ex) {
         log.error(ex);
    }
}

In 3.x, getState() is allowed to throw an exception, so the code looks like this now:

public void getState(OutputStream output) throws Exception {
    marshalStateToStream(output);
}

First of all, we don't need to catch (and swallow !) the exception. Secondly, a possible exception will now actually be passed to the state requester, so that we know *why* a state transfer failed when we call JChannel.getState().
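
On the requester's side, a minimal sketch could look like this (assuming the 3.x JChannel.getState(Address target, long timeout) signature):

try {
    channel.getState(null, 10000);   // fetch the state from the coordinator, 10 second timeout
}
catch(Exception ex) {
    // ex is (or wraps) the exception thrown by the provider's getState(OutputStream)
    log.error("state transfer failed", ex);
}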

There are many small (or bigger) changes like this, which I hope will make using JGroups simpler. A list of all API changes can be found at [2].

The stability of 3 beta1 is about the same as 2.12.1 (very high), because there were mainly API changes, and only a few bug fixes or optimizations.

I've also created a new 3.x specific set of documentation (manual, tutorial, javadocs), for example see the 3.x manual at [3].

JGroups 3 beta1 can be downloaded from [1]. Please try it out and send me your feedback (mailing lists preferred) !

Enjoy !



[1] https://sourceforge.net/projects/javagroups/files/JGroups/3.0.0.Beta1

[2] https://github.com/belaban/JGroups/blob/JGroups_3_0_16_Final/doc/API_Changes.txt

[3] http://www.jgroups.org/manual-3.x/html/index.html

Friday, April 29, 2011

Largest JGroups cluster ever: 536 nodes !

I just returned from a trip to a customer who's working on creating a large scale JGroups cluster. The largest cluster I've ever created is 32 nodes, due to the fact that I don't have access to a larger lab...

I've heard of a customer who's running a 420 node cluster, but I haven't seen it with my own eyes.

However, this record was surpassed on Thursday April 28 2011: we managed to run a 536 node cluster !

The setup was 130 Celeron based blades with 1GB of memory, each running 4 JVMs with 96MB of heap, plus 4 embedded devices, each also running 4 JVMs. Each blade had two 1Gb NICs set up with IP bonding. Note that the 4 processes per blade are competing for CPU time and network IO, so with more blades or more physical memory available, I'm convinced we could go to 1000+ nodes !

The configuration used was udp-largecluster.xml (with some modifications), recently created and shipped with JGroups 2.12.

We started the processes in batches of 130, then waited for 20 seconds, then launched the second batch and so on. The reason we staggered the startup was to reduce the number of merges, which would have increased the startup time.

Running this a couple of times (plus 50+ times over night), the cluster always formed fine, and most of the time we didn't have any merges at all.

It took around 150-200 seconds (including the 5 sleeps of 20 seconds each) to start the cluster; in the picture at the bottom we see a run that took 176 seconds.

Changes to JGroups

This large scale setup revealed that certain protocols need slight modifications to optimally support large clusters, a few of these changes are:
  • Discovery: the current view is sent back with every discovery response. This is not normally an issue, but if you have a 500+ view, then the size of a discovery response becomes huge. We'll fix this by returning only the coordinator's address and not the view. For discovery requests triggered by MERGE2, we'll return the ViewId instead of the entire view.
  • We're thinking about canonicalizing UUIDs with IDs, so nodes will be assigned unique (short) IDs instead of UUIDs. This means going from 17 bytes (UUID) in memory down to 2 bytes (short).
  • STABLE messages: here, we return an array of members plus a digest (containing 3 longs) for *each* member. This also generates large messages (11K for 260 nodes).
  • The fix in general for these problems is to reduce the data sent, e.g. by compressing the view, or not sending it at all, if possible. For digests, we can also reduce the data sent by sending only diffs, by sending only 1 long and using shorts for diffs, by using bitsets representing offsets to a previously sent value, and so on. 
Ideas are abundant, we now need to see which one is the most efficient.

For now, 536 nodes is an excellent number and - remember - we got to this number *without* the changes discussed above ! I'm convinced we can easily go higher, e.g. to 1000 nodes, without any changes. However, to reach 2000 nodes, the above changes will probably be required.

Anyway, I'm very happy to see this new record !

If anyone has created an even larger cluster, I'd be very interested in hearing about it !
Cheers, and happy clustering,



Friday, April 01, 2011

JBossWorld 2011 around the corner

Wanted to let you know that I've got 2 talks at JBW (Boston, May 3-6).

The first talk [1] is about geographic failover of JBoss clusters. I'll show 2 clusters, one in NYC, the other one in ZRH. Both are completely independent and don't know about each other. However, they're bridged with a JGroups RELAY and therefore appear as if they were one big virtual cluster.

This can be used for geographic failover, but it could also be used for example to extend a private cloud with an external, public cloud without having to use a hardware VPN device.

As always with my talks, this will be demo'ed, so you know this isn't just vapor ware !

The second talk [2] discusses 5 different ways of running a JBoss cluster on EC2. I'll show 2 demos, one of which works only on EC2, the other works on all clouds.

This will be a fun week, followed by a week of biking in the Bay Area ! YEAH !!

Hope to see and meet many of you in Boston !
Cheers,


[1] http://www.redhat.com/summit/sessions/best-of.html#66

[2] http://www.redhat.com/summit/sessions/jboss.html#43

Friday, March 11, 2011

A quick update on performance of JGroups 2.12.0.Final

I forgot to add performance data to the release announcement of 2.12.0.Final, so here it is.

Caveat: this is a quick check to see if we have a performance regression, which I run routinely before a release, and by no means a comprehensive performance test !

I ran this both on my home cluster and our internal lab.


org.jgroups.tests.perf.Test

This test is described in detail in [1]. It forms a cluster of 4 nodes, and every node sends 1 million messages of varying size (1K, 5K, 20K). We measure how long it takes for every node to receive the 4 million messages, and compute the message rate and throughput, per second, per node.

This is my home cluster; it consists of 4 HP ProLiant DL380G5 quad core servers (ca 3700 bogomips), connected to a GB switch, and running Linux 2.6. The JDK is 1.6 and the heap size is 600M. I ran 1 process on every box. The configuration used was udp.xml (using IP multicasting) shipped with JGroups.

Results
  •   1K message size: 140 MBytes / sec / node
  •   5K message size: 153 MBytes / sec / node
  • 20K message size: 154 MBytes / sec / node
 This shows that GB ethernet is saturated. The reason that every node receives more than the limit of GB ethernet (~ 125 MBytes/sec) is that every node loops back its own traffic, and therefore doesn't have to share it with other incoming packets. In theory, the max throughput should therefore be 4/3 * 125 ~= 166 MBytes/sec. We see that the numbers above are not too far away from this.


org.jgroups.tests.UnicastTestRpcDist

This test mimics the way Infinispan's DIST mode works.

Again, we form a cluster of between 1 and 9 nodes. Every node is on a separate machine. The test then has every node invoke 2 unicast RPCs on randomly selected nodes. With a probability of 80% the RPCs are reads, and with a probability of 20% they're writes. The writes carry a payload of 1K, and the reads return a payload of 1K. Every node makes 20'000 RPCs.

The hardware is a bit more powerful than my home cluster; every machine has 5300 bogomips, and all machines are connected with GB ethernet.

Results
  • 1 node:   50'000 requests / sec /node
  • 2 nodes: 23'000 requests / sec / node
  • 3 nodes: 20'000 requests / sec / node
  • 4 nodes: 20'000 requests / sec / node
  • 5 nodes: 20'000 requests / sec / node
  • 6 nodes: 20'000 requests / sec / node
  • 7 nodes: 20'000 requests / sec / node
  • 8 nodes: 20'000 requests / sec / node
  • 9 nodes: 20'000 requests / sec / node
As can be seen, the number of requests per node is the same after 2-3 nodes. The 1 node scenario is somewhat contrived as there is no network communication involved.

This is actually good news, as it shows that aggregate performance grows linearly with the cluster size. As a matter of fact, with increasing cluster size, the chance of more than 2 nodes picking the same target decreases, therefore performance degradation due to (write) access conflicts is likely to decrease.

Caveat: I haven't tested this on a larger cluster yet, but the current performance is already very promising.

[1] http://community.jboss.org/docs/DOC-11594

Wednesday, March 09, 2011

It took me 9 years to go from JGroups 2.0.0 to 2.12.0

Yes, you heard right: I released JGroups 2.0.0, new, shiny and refactored, in Feb 2002.

I just released JGroups 2.12.0.Final, which will be the last minor release on the 2.x branch. (There won't be a 2.13; bug fixes will go into 2.12.x).

Time difference: 9 years and change...:-)

I'm still investigating why it took me so long !

Anyway, 2.12.0.Final is here and it is an important release, as it will be shipped in Infinispan 4.2.1 and JBoss 6.


Below are the major features and bug fixes.

On to 3.0 !
Cheers,




Release Notes JGroups 2.12


JGroups 2.12 is API-backwards compatible with previous versions (down to 2.2.7).



New features



RELAY: connecting local (autonomous) clusters into a large virtual cluster


[https://issues.jboss.org/browse/JGRP-747]

A new protocol to connect 2 geographically separate sites into 1 large virtual cluster. The local clusters are
completely autonomous, but RELAY makes them appear as if they were one.

This can for example be used to implement geographic failover

Blog: http://belaban.blogspot.com/2010/11/clustering-between-different-sites.html



LockService: a new distributed locking service

[https://issues.jboss.org/browse/JGRP-1249]
[https://issues.jboss.org/browse/JGRP-1298]
[https://issues.jboss.org/browse/JGRP-1278]

New distributed lock service, offering a java.util.concurrent.locks.Lock implementation (including conditions)
providing cluster wide locks.

Blog: http://belaban.blogspot.com/2011/01/new-distributed-locking-service-in.html



Distributed ExecutorService

[https://issues.jboss.org/browse/JGRP-1300]

New implementation of java.util.concurrent.ExecutorService over JGroups (contributed by William Burns).
Read the documentation at www.jgroups.org for details.



BPING (Broadcast Ping): new discovery protocol based on broadcasting

[https://issues.jboss.org/browse/JGRP-1269]

This is mainly used for discovery of JGroups on Android based phones. Apparently, IP multicasting is not correctly implemented / supported on Android (2.1), and so we have to resort to UDP broadcasting.

Blog: http://belaban.blogspot.com/2011/01/jgroups-on-android-phones.html



JDBC_PING: new discovery protocol using a shared database


[https://issues.jboss.org/browse/JGRP-1231]

All nodes use a shared DB (e.g. RDS on EC2) to place their location information into, and to read information from.
Thanks to Sanne for coming up with the idea and for implementing this !
Additional info is on the wiki: community.jboss.org/wiki/JDBCPING


FD_SOCK: ability to pick the bind address and port for the client socket

[https://issues.jboss.org/browse/JGRP-1262]



Pluggable address generation


[https://issues.jboss.org/browse/JGRP-1297]

Address generation is now pluggable; JChannel.setAddressGenerator(AddressGenerator) allows for generation of specific implementations of Address. This can for example be used to pass additional information along with every address. Currently used by RELAY to pass the name of the sub cluster around with a UUID.





Optimizations



NAKACK: retransmitted messages don't need to be wrapped


[https://issues.jboss.org/browse/JGRP-1266]

Not serializing retransmitted messages at the retransmitter and deserializing them at the requester saves
1 serialization and 1 deserialization per retransmitted message.


Faster NakReceiverWindow

[https://issues.jboss.org/browse/JGRP-1133]

Various optimizations to reduce locking in NakReceiverWindow:
  • Use of RetransmitTable (array-based matrix) rather than HashMap (reduced memory need, reduced locking, compaction)
  • Removal of double locking






Bug fixes



NAKACK: incorrect digest on merge and state transfer

[https://issues.jboss.org/browse/JGRP-1251]

When calling JChannel.getState() on a merge, the fetched state would overwrite the digest incorrectly.


AUTH: merge can bypass authorization

[https://issues.jboss.org/browse/JGRP-1255]

AUTH would not check creds of other members in case of a merge. This allowed an unauthorized node to join a cluster by triggering a merge.


Custom SocketFactory ignored

[https://issues.jboss.org/browse/JGRP-1276]

A custom SocketFactory, even when set, was ignored.


UFC: crash of depleted member could hang node

[https://issues.jboss.org/browse/JGRP-1274]

The depleted node would wait forever for credits from the crashed member.


Flow control: crash of member doesn't unblock sender


[https://issues.jboss.org/browse/JGRP-1283]
[https://issues.jboss.org/browse/JGRP-1287]
[https://issues.jboss.org/browse/JGRP-1274]

When a sender blocked waiting for credits from P, and P crashed before being able to send those credits,
the sender would block indefinitely.


UNICAST2: incorrect delivery order under stress

[https://issues.jboss.org/browse/JGRP-1267]

UNICAST2 could (in rare cases) deliver messages in incorrect order. Fixed by using the same (proven)
algorithm as NAKACK.


Incorrect conversion of TimeUnit if MILLISECONDS were not used

[https://issues.jboss.org/browse/JGRP-1277]


Check if bind_addr is correct

[https://issues.jboss.org/browse/JGRP-1280]

JGroups now verifies that the bind address is indeed a valid IP address: it has to be either the wildcard
address (0.0.0.0) or an address of a network interface that is up.


ENCRYPT: sym_provider ignored

[https://issues.jboss.org/browse/JGRP-1279]

Property sym_provider was ignored.



Manual


The manual is online at http://www.jgroups.org/manual/html/index.html



The complete list of features and bug fixes can be found at http://jira.jboss.com/jira/browse/JGRP.

Download the new release at https://sourceforge.net/projects/javagroups/files/JGroups/2.12.0.Final.

Bela Ban, Kreuzlingen, Switzerland
Vladimir Blagojevic, Toronto, Canada
Richard Achmatowicz, Toronto, Canada
Sanne Grinovero, Newcastle, Great Britain

March 2011

Saturday, January 22, 2011

JGroups on Android phones

Yann Sionneau recently completed a port of JGroups to Android (2.1+). He took the 2.11 version of JGroups and removed classes which weren't available on Android, and changed some code to make JGroups run on Android.

The QR code for a demo app (based on Draw) is available at [1]. Point a QR code scanner to it, download the app and run it on your Android based phone (I ran it on my HTC Desire). Then start Draw on your local computer, connected to the same wifi network as the phone. The instances, whether run on the phone or computers, should find each other and form a cluster.

It was cool to draw some lines on my HTC and see them getting drawn on all cluster instances as well !

[1] http://sionneau.net/index.php?option=com_content&view=article&id=12%3Atouchsurface-android-app-now-pc-compatible-&catid=3%3Adivers&Itemid=2&lang=en

Friday, January 21, 2011

New distributed locking service in JGroups

I just uploaded JGroups 2.12.0.Beta1, which contains a first version of the new distributed locking service (LockService), which replaces DistributedLockManager.

LockService provides a distributed implementation of java.util.concurrent.locks.Lock. A lock is named and locking granularity is per thread. Here's an example of how to use it:

// lock.xml has to have a locking protocol in it
JChannel ch=new JChannel("/home/bela/lock.xml");
LockService lock_service=new LockService(ch);
Lock lock=lock_service.getLock("mylock");
if(lock.tryLock(2000, TimeUnit.MILLISECONDS)) {
    try {
        // access the resource protected by "mylock"
    }
    finally {
        lock.unlock();
    }
}

If "mylock" is locked by a different thread, it doesn't matter whether inside the same JVM, on the same box, or somewhere in the same cluster, then tryLock() will return false after 2 seconds, else it'll return true.

Lock.newCondition() is currently not implemented - if there's a need for this, let us know on one of the JGroups mailing lists and we'll tackle this. If you have a chance to play with LockService, we're also grateful for feedback.

The new locking service is part of 2.12.0.Beta1, which can be downloaded at [1]. Documentation is at [2].
Cheers,


[1] http://sourceforge.net/projects/javagroups/files/JGroups/2.12.0.Beta1
[2] http://www.jgroups.org/manual/html/index.html, section 4.6

Tuesday, November 30, 2010

Clustering between different sites / geographic failover

I just completed a new feature in JGroups which allows for transparent bridging of separate clusters, e.g. at different sites.

Let's say we have a (local) cluster in New York (NYC) and another cluster in San Francisco (SFO). They're completely autonomous, and can even have completely different configurations.

RELAY [1] essentially has the coordinators of the local clusters relay local traffic to the remote cluster, and vice versa. The relaying (or bridging) is done via a separate cluster, usually based on TCP, as IP multicasting is typically not allowed between sites.

SFO could be a backup of NYC, or both could be active, or we could think of a follow-the-sun model where each cluster is active during working hours at its site.

If we have nodes {A,B,C} in NYC and {D,E,F} in SFO, then there would be a global view, e.g. {D,E,F,A,B,C}, which is the same across all the nodes of both clusters.

One use of RELAY could be to provide geographic failover in case of site failures. Because all of the data in NYC is also available in SFO, clients can simply fail over from NYC to SFO if the entire NYC site goes down, and continue to work.

Another use case is to have SFO act as a read-only copy of NYC, and run data analysis functions on SFO, without disturbing NYC, and with access to almost real-time data.

As you can guess, this feature is going to be used by Infinispan, and since Infinispan serves as the data replication / distribution layer in JBoss, we hope to be able to provide replication / distribution between sites in JBoss as well...

Exciting times ... stay tuned for more interesting news from the Infinispan team !

Read more on RELAY at [1] and provide feedback !
Cheers,


[1] http://www.jgroups.org/manual/html/user-advanced.html#RelayAdvanced

Tuesday, November 23, 2010

JGroups finally has a logo

After conducting a vote on the logos designed by James Cobb, the vast majority voted for logo #1. So I'm happy to say that, after 12 years, JGroups finally has a logo !

I added the logo and favicon to jgroups.org. Let me know what you think !


There's also swag available on cafepress, check it out !

Friday, October 29, 2010

JGroups 2.11 final released

FYI,

2.11.0.final can be downloaded here. Its main features, optimizations and bug fixes are listed below.

I hope that 2.12 will be the last release before finally going to 3.0 !

2.12 should be very small, currently it contains only 8 issues (mainly optimizations).

However, I also moved RELAY from 3.x to 2.12.

RELAY allows for connecting geographically separate clusters into a large virtual cluster. This will be interesting to apps which need to provide geographic failover. More on this in the next couple of weeks...

Meanwhile ... enjoy 2.11 !

Bela, Vladimir & Richard



Release Notes JGroups 2.11
==========================


Version: $Id: ReleaseNotes-2.11.txt,v 1.2 2010/10/29 11:45:35 belaban Exp $
Author: Bela Ban

JGroups 2.11 is API-backwards compatible with previous versions (down to 2.2.7).

Below is a summary (with links to the detailed description) of the major new features.


New features
============



AUTH: pattern matching to prevent unauthorized joiners
------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-996]

New plugin for AUTH which can use pattern matching against regular expressions to prevent nodes with
unauthorized IP addresses from joining a cluster.

Blog: http://belaban.blogspot.com/2010/09/cluster-authentication-with-pattern.html



DAISYCHAIN: implementation of daisy chaining
--------------------------------------------
[https://jira.jboss.org/browse/JGRP-1021]

Daisy chaining sends messages around in a ring, improving throughput on networks without IP multicast.

Blog: http://belaban.blogspot.com/2010/08/daisychaining-in-clouds.html



New flow control protocols for unicast (UFC) and multicast (MFC) messages
-------------------------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1154]

MFC and UFC replace FC. They can be used independently of each other, and their performance is better than that of FC.


API for programmatic creation of channel
----------------------------------------
[https://jira.jboss.org/browse/JGRP-1245]

Allows for programmatic creation of a JChannel, no need for XML config file.

Blog: http://belaban.blogspot.com/2010/10/programmatic-creation-of-channel.html


S3: new features
----------------
[https://jira.jboss.org/browse/JGRP-1234] Allow use of public buckets (no credentials need to be sent)
[https://jira.jboss.org/browse/JGRP-1235] Pre-signed URLs



STOMP: new protocol to allow STOMP clients to talk to a JGroups node
---------------------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1248]

Blog: http://belaban.blogspot.com/2010/10/stomp-for-jgroups.html







Optimizations
=============


NAKACK: simplify and optimize handling of OOB messages
------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1104]


Discovery: reduce number of discovery responses sent in a large cluster
-----------------------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1181]

A new property (max_rank) determines who will and who won't send discovery responses.


New timer implementations
-------------------------
[https://jira.jboss.org/browse/JGRP-1051]

Way more efficient implementations of the timer (TimeScheduler).




Bug fixes
=========

ENCRYPT: encrypt entire message when length=0
---------------------------------------------
[https://jira.jboss.org/browse/JGRP-1242]

ENCRYPT would not encrypt messages whose length = 0


FD_ALL: reduce number of messages sent on suspicion
---------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1241]


FILE_PING: empty files stop discovery
-------------------------------------
[https://jira.jboss.org/browse/JGRP-1246]




Manual
======

The manual is online at http://www.jgroups.org/manual/html/index.html



The complete list of features and bug fixes can be found at http://jira.jboss.com/jira/browse/JGRP.


Bela Ban, Kreuzlingen, Switzerland
Vladimir Blagojevic, Toronto, Canada
Richard Achmatowicz, Toronto, Canada

Nov 2010

Wednesday, October 27, 2010

STOMP for JGroups

FYI,

I've written a new JGroups protocol STOMP, which implements the STOMP protocol. This allows for STOMP clients to connect to any JGroups server node (which has the JGroups STOMP protocol in its configuration).

The benefits of this are:
  •  Clients can be written in any language. For example, I've used stomppy, a Python client, to connect to JGroups server nodes, and successfully subscribed to destinations, and sent and received messages.
  • Sometimes, clients don't want to be peers, i.e. they don't want to join a cluster and become full members. These (light-weight) clients could also be in a different geographic location, and not be able to use IP multicasting.
  • Clients are started and stopped frequently, and there might be many of them. Frequently starting and stopping a full-blown JGroups server node has a cost, and is not recommended. Besides, a high churn rate might move the cluster coordinator around quite a lot, preventing it from doing real work.
  • We can easily scale to a large number of clients. Although every client requires 1 thread on the server side, we can easily support hundreds of clients. Note though that I wouldn't use the current JGroups STOMP protocol to connect thousands of clients...
Let's take a quick look: I started an instance of JGroups with STOMP on the top of the protocol stack (on 192.168.1.5). Then I connected to it with the JGroups client:

JGroups STOMP client

As can be seen, the first response the client received was an INFO with information about the available endpoints (STOMP instances) in the cluster. This is actually used by the StompConnection client to fail over to a different server node should the currently connected server fail.
Next, we subscribe to destination /a using the simplified syntax of the JGroups STOMP client.

Then, a telnet session to 192.168.1.5:8787 was started:

Telnet STOMP client



We get the INFO response with the list of endpoints too here. Then we subscribe to the /a destination. Note that the syntax used here is compliant with the STOMP protocol spec: first is the verb (SUBSCRIBE), then an optional bunch of headers (here just one, defining the destination to subscribe to), a newline and finally the body, terminated with a 0 byte. (SUBSCRIBE does not have a body).
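
As a sketch, the frame the telnet session sends could also be produced by a few lines of plain socket code; the host and port are the ones from the example above, and the exact header spelling follows the STOMP spec rather than being verified against the JGroups parser:

Socket sock=new Socket("192.168.1.5", 8787);
OutputStream out=sock.getOutputStream();
String subscribe="SUBSCRIBE\n" +       // verb
                 "destination:/a\n" +  // headers (just one here)
                 "\n";                 // empty line terminates the headers
out.write(subscribe.getBytes());
out.write(0);                          // the (empty) body is terminated by a 0 byte
out.flush();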

Next, we send a message to all clients subscribed to /a. This is the telnet session itself, as evidenced by the reception of MESSAGE. If you look at the JGroups STOMP client, the message is also received there.

Next the JGroups client also sends a message to destination /a, which is received by itself and the telnet client.

JGroups 2.11.0.Beta2 also ships with a 'stompified' Draw demo, org.jgroups.demos.StompDraw, which is a stripped down version of Draw, using the STOMP protocol to send updates to the cluster.

Let me know what you think of this; feature requests, feedback etc appreciated (preferably on one of the JGroups mailing lists) !



The new protocol is part of JGroups 2.11.0.Beta2, which can be downloaded here.

Documentation is here.

Enjoy !

Wednesday, October 20, 2010

Programmatic creation of a channel

I've committed code which provides programmatic creation of channels. This is a way of creating a channel without XML config files. So instead of writing

JChannel ch=new JChannel("udp.xml");

, I can construct the channel programmatically:


JChannel ch=new JChannel(false);                 // 1
ProtocolStack stack=new ProtocolStack(); // 2
ch.setProtocolStack(stack);              // 3
stack.addProtocol(new UDP().setValue("ip_ttl", 8))
     .addProtocol(new PING())
     .addProtocol(new MERGE2())
     .addProtocol(new FD_SOCK())
     .addProtocol(new FD_ALL().setValue("timeout", 12000))
     .addProtocol(new VERIFY_SUSPECT())
     .addProtocol(new BARRIER())
     .addProtocol(new NAKACK())
     .addProtocol(new UNICAST2())
     .addProtocol(new STABLE())
     .addProtocol(new GMS())
     .addProtocol(new UFC())
     .addProtocol(new MFC())
     .addProtocol(new FRAG2());       // 4
stack.init();                         // 5


First, a JChannel is created (1). The 'false' argument means that the channel must not create its own protocol stack, because we create it (2) and stick it into the channel (3).

Next, all protocols are created and added to the stack (4). This needs to happen in the order in which we want the protocols to be, so the first protocol added is the transport protocol (UDP in the example).

Note that we can use Protocol.setValue(String attr_name, Object attr_value) to configure each protocol instance. We can also use regular setters if available.

Finally, we call init() (5), which connects the protocol list correctly and calls init() on every instance. This also handles shared transports correctly. For an example of how to create a shared transport with 2 channels on top see ProgrammaticApiTest.

I see mainly 3 use cases where programmatic creation of a channel is preferred over declarative creation:
  1. Someone hates XML (I'm not one of them) :-)
  2. Unit tests
  3. Projects consuming JGroups might have their own configuration mechanism (e.g. GUI, properties file, different XML configuration etc.) and don't want to use the XML configuration mechanism shipped with JGroups.
Let me know what you think about this API ! I deliberately kept it simple and stupid, and maybe there are things people like to see changed. I'm open to suggestions !


Cheers,

Friday, October 01, 2010

Confessions of a serial protocol designer

I have a confession to make.

I'm utterly disgusted by my implementation of FD_ALL, and thanks to David Forget for pointing this out !

What's bad about FD_ALL ? It will not scale at all ! After having written several dozen protocols, I thought an amateurish mistake like the one I'm about to show would certainly not happen to me anymore. Boy, was I wrong !

FD_ALL is about detecting crashed nodes in a cluster, and the protocol then lets GMS know so that the crashed node(s) can be excluded from the view.

Let's take a look at the design.
  • Every node periodically multicasts a HEARTBEAT
  • This message is received by everyone in the cluster and a hashmap of nodes and timestamps is updated; for a node P, P's timestamp is set to the current time
  • Another task, run at every node, periodically iterates through the timestamps and checks if any timestamps haven't been updated for a given time. If that's the case, the members with outdated timestamps are suspected
  • A suspicion of P results in a SUSPECT(P) multicast
  • On reception of SUSPECT(P), every node generates a SUSPECT(P) event and passes it up the stack
  • VERIFY_SUSPECT catches SUSPECT(P) and sends an ARE_YOU_DEAD message to P
  • If P is still alive, it'll respond with a I_AM_NOT_DEAD message
  • If the sender doesn't get this message for a certain time, it'll pass the SUSPECT(P) event further up the stack (otherwise it'll drop it), and GMS will exclude P from the view, but if and only if that given node is the coordinator (first in the view)
Can anyone see the flaw in this design ? Hint: it has to do with the number of messages generated...

OK, so let's see what happens if we have a cluster of 100 nodes:
  • Say node P is temporarily slow; it doesn't send HEARTBEATs because a big garbage collection is going on, or the CPU is crunching at 90%
  • 99 nodes multicast a SUSPECT(P) message
  • Every node Q therefore receives 99 SUSPECT(P) messages
    • Q (via VERIFY_SUSPECT) sends an ARE_YOU_DEAD message to P
    • P (if it can) responds with an I_AM_NOT_DEAD back to Q
    • So the total number of messages generated by a single node is 99 * 2
  • This is done on every node, so the total number of messages is 99 * 99 * 2 = 19'602 messages !

Can you imagine what happens to P, which is a bit overloaded and cannot send out HEARTBEATs in time when it receives 19'602 messages ?

If it ain't dead yet, it will die !

Isn't it ironic: by asking a node if it is still alive, we actually kill it !

This is an example of where the effects of using IP multicasts were not taken into account: if we multicast M, and everybody who receives M sends 2 messages in response, then the number of messages generated is a function of the cluster size, which I neglected to see !

So what's the solution ? Simple, elegant and outlined in [1].
  • Everybody sends a HEARTBEAT multicast periodically
  • Every member maintains a suspect list 
  • This list is adjusted on view changes 
  • Reception of a SUSPECT(P) message adds P to the list 
  • When we suspect P because we haven't received a HEARTBEAT (or traffic if enabled): 
    • The set of eligible members is computed as: members - suspected members 
    • If we are the coordinator (first in the list): 
      • Pass a SUSPECT(P) event up the stack, this runs the VERIFY_SUSPECT protocol and eventually passes the SUSPECT(P) up to GMS, which will exclude P from the view

The cost of running the suspicion protocol is (excluding the periodic heartbeat multicasts):
  • 1 ARE_YOU_DEAD unicast to P
  • A potential response (I_AM_NOT_DEAD) from P to the coordinator
TOTAL COST in a cluster of 100: 2 messages (this is always constant), compared to 19'602 messages before !
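
An illustrative sketch of the new suspicion path (this is not the actual FD_ALL implementation; the fields suspects, members and local_addr are assumed):

void onHeartbeatTimeout(Address p) {
    suspects.add(p);                                     // local suspect list
    List<Address> eligible=new ArrayList<Address>(members);
    eligible.removeAll(suspects);                        // members minus suspected members
    boolean coord=!eligible.isEmpty() && eligible.get(0).equals(local_addr);
    if(coord)
        up_prot.up(new Event(Event.SUSPECT, p));         // VERIFY_SUSPECT and GMS take it from here
}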

This is way better than the previous implementation !


[1] https://jira.jboss.org/browse/JGRP-1241

Wednesday, September 22, 2010

JUDCon 2010 Berlin

I'll be giving a talk at JUDCon 2010 (Oct 7 and 8, Berlin) on how to configure JBoss clusters to run optimally in a cloud (EC2).

It would be cool to see some of you, we can discuss JGroups and other topics over a beer !

The agenda is here.

Cheers,

Friday, September 17, 2010

Cluster authorization with pattern matching

I've added a new plugin to AUTH which allows for pattern matching to determine who can join a cluster.

The idea is very simple: if a new node wants to join a cluster, we only admit the node into the cluster if it matches a certain pattern. For example, we could only admit nodes whose IP address starts with 192.168.* or 10.5.*. Or we could only admit nodes whose logical name is "groucho" or "marx".

Currently, the 2 things I match against are IP address and logical name, but of course any attribute of a message could be used to match against.

Let's take a look at an example.

<AUTH auth_class="org.jgroups.auth.RegexMembership"
      match_string="groucho | marx"
      match_ip_address="false"
      match_logical_name="true" />

This example uses the new plugin RegexMembership (derived from FixedMembership). Its match string (which takes any regular expression as value) says that any node whose logical name is "marx" or "groucho" will be able to join. Note that we set match_logical_name to true here.

Note that AUTH has to be placed somewhere below GMS (Group MemberShip) in the configuration.

<AUTH auth_class="org.jgroups.auth.RegexMembership"
      match_string=
      "192.168.[0-9]{1,3}\.[0-9]{1,3}(:.[0-9]{1,5})?"
      match_ip_address="true"
      match_logical_name="false"  />

This example is a bit more complex, but it essentially says that all nodes whose IP address starts with 192.168 are allowed to join the cluster. So 192.168.1.5 and 192.168.1.10:5546 would pass, while 10.1.4.5 would be rejected.

I have to admit, I'm not really an expert in regular expressions, so I guess the above expression could be simplified. For example, I gave up trying to express that hosts starting with either 192.168 or 10.5 may join.
If you know how to do that, please send me the regular expression !
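
For what it's worth, here's one candidate, checked with plain java.util.regex (whether AUTH uses matches() or find() internally is not verified here, so treat it as a sketch):

Pattern p=Pattern.compile("^(192\\.168|10\\.5)\\.[0-9]{1,3}\\.[0-9]{1,3}(:[0-9]{1,5})?$");
System.out.println(p.matcher("192.168.1.10:5546").matches()); // true
System.out.println(p.matcher("10.5.4.7").matches());          // true
System.out.println(p.matcher("10.1.4.5").matches());          // false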

Friday, August 13, 2010

Daisychaining in the clouds

I've been working on a new protocol DAISYCHAIN [1] which is based on research out of EPFL [2].

The idea behind it is that it is inefficient to broadcast a message in clusters where IP multicasting is not available. For example, if we only have TCP available (as is the case in most clouds today), then we have to send a broadcast (or group) message N-1 times. If we want to broadcast M to a cluster of 10, we send the same message 9 times.

Example: if we have {A,B,C,D,E,F}, and A broadcasts M, then it sends it to B, then to C, then to D etc.

If we have a 1 GB switch, and M is 1GB, then sending a broadcast to 9 members takes 9 seconds, even if we parallelize the sending of M. This is due to the fact that the link to the switch only sustains 1GB / sec. (Note that I'm conveniently ignoring the fact that the switch will start dropping packets if it is overloaded, causing TCP to retransmit, slowing things down)...

Let's introduce the concept of a round. A round is the time it takes to send or receive a message. In the above example, a round takes 1 second if we send 1 GB messages.




In the existing N-1 approach, it takes X * (N-1) rounds to send X messages to a cluster of N nodes. So to broadcast 10 messages in a cluster of 10, it takes 90 rounds.


Enter DAISYCHAIN.

The idea is that, instead of sending a message to N-1 members, we only send it to our neighbor, which forwards it to its neighbor, and so on. For example, in {A,B,C,D,E}, D would broadcast a message by forwarding it to E, E forwards it to A, A to B, B to C and C to D. We use a time-to-live field, which gets decremented on every forward, and a message gets discarded when the time-to-live is 0.
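
As an illustrative sketch (the helper methods are made up, this is not the DAISYCHAIN protocol code), the forwarding rule at each member looks roughly like this:

void onMessage(Message msg, short ttl) {
    deliverLocally(msg);                              // pass the message up to the application
    if(--ttl > 0) {
        Address next=nextNeighbor(local_addr, view);  // the member after us in the ring
        forward(msg, next, ttl);                      // the neighbor repeats the same steps
    }
}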

The advantage is that, instead of taxing the link between a member and the switch to send N-1 messages, we distribute the traffic more evenly across the links between the nodes and the switch. Let's take a look at an example, where A broadcasts messages m1 and m2 in cluster {A,B,C,D}, '-->' means sending:

Traditional N-1 approach

Round 1: A(m1) --> B
Round 2: A(m1) --> C
Round 3: A(m1) --> D
Round 4: A(m2) --> B
Round 5: A(m2) --> C
Round 6: A(m2) --> D

It takes 6 rounds to broadcast m1 and m2 to the cluster.


Daisychaining approach

Round 1: A(m1) --> B
Round 2: A(m2) --> B || B(m1) --> C
Round 3: B(m2) --> C || C(m1) --> D
Round 4: C(m2) --> D

In round 1, A sends m1 to B.
In round 2, A sends m2 to B, but B also forwards m1 (received in round 1) to C.
In round 3, A is done. B forwards m2 to C and C forwards m1 to D (in parallel, denoted by '||').
In round 4, C forwards m2 to D.

Switch usage

Let's take a look at this in terms of switch usage: in the N-1 approach, A can only send 125MB/sec, no matter how many members there are in the cluster, so it is constrained by the link capacity to the switch. (Note that A can also receive 125MB/sec in parallel with today's full duplex links).

So the link between A and the switch gets hot.

In the daisychaining approach, link usage is more even: if we look for example at round 2, A sending to B and B sending to C uses 2 different links, so there are no constraints regarding capacity of a link. The same goes for B sending to C and C sending to D.

In terms of rounds, the daisy chaining approach uses X + (N-2) rounds, so for a cluster size of 10 and broadcasting 10 messages, it requires only 18 rounds, compared to 90 for the N-1 approach !


Performance

I ran a quick performance test this morning, with 4 nodes connected to a 1 GB switch, and every node sending 1 million 8K messages, for a total of 32GB received by every node. The config used was tcp.xml.

The N-1 approach yielded a throughput of 73 MB/node/sec, and the daisy chaining approach 107MB/node/sec !

The only change needed to switch from N-1 to daisy chaining was to place DAISYCHAIN directly on top of TCP.

DAISYCHAIN is still largely experimental, but the numbers above show that it has potential to improve performance in TCP based clusters.


[1] https://jira.jboss.org/browse/JGRP-1021
[2] infoscience.epfl.ch/record/149218/files/paper.pdf

Monday, July 12, 2010

JGroups 2.10 final released

I'm happy to announce that JGroups 2.10 final has been released. It can be downloaded from SourceForge and contains the following major new features (for a detailed list of the 80+ issues  check 2.10 in JIRA):

SCOPE: concurrent delivery of messages from the same sender
[https://jira.jboss.org/browse/JGRP-822]

By default, messages from a sender P are delivered in the (FIFO) order in which P sent them (ignoring OOB messages for now). However, sometimes it would be beneficial to deliver unrelated messages concurrently, e.g. modifications sent by P for different HTTP sessions.

SCOPE is a new protocol, which allows a developer to define a scope for a message, and that scope is then used to deliver messages from P concurrently.

See http://www.jgroups.org/manual/html/user-advanced.html#Scopes for details.


Use of factory to create sockets
[https://jira.jboss.org/browse/JGRP-278]

There's now a method Protocol.setSocketFactory(SocketFactory) which allows you to set a socket factory, used to create and close datagram and TCP (client and server) sockets. The default implementation keeps track of open sockets, so
./probe.sh socks
dumps a list of open sockets.


UNICAST2: experimental version of UNICAST based on negative acks
[https://jira.jboss.org/browse/JGRP-1140]

By not sending acks for received messages, we can cut down on the number of acks. As a result, UNICAST2 is ca 20-30% faster than UNICAST. It needs more testing though; currently UNICAST2 is experimental.


Certain IPv4 addresses should be allowed in an IPv6 stack
[https://jira.jboss.org/browse/JGRP-1152]

They will be converted into IPv4-mapped IPv6 addresses (e.g. 192.168.1.5 becomes ::ffff:192.168.1.5). This relaxes the (too restrictive) IP address conformance testing somewhat, and allows more configurations to actually start the stack instead of failing with an exception.


Multiple components using the same channel
[https://jira.jboss.org/browse/JGRP-1177]

This is a new lightweight version of the (old and dreaded !) Multiplexer, which allows channels to be shared between components, for example HAPartition and Infinispan.

*** Only to be used by experts ! ***


MERGE2: fast merge
[https://jira.jboss.org/browse/JGRP-1191]

Fast merge in the case where we receive messages from a member which is not part of our group but has the same group name.


RpcDispatcher / MessageDispatcher: add exclusion list
[https://jira.jboss.org/browse/JGRP-1192]

If an RPC needs to be sent to all nodes in a cluster except one node (e.g. the sender itself), then we can simply exclude the sender. This is done using
RequestOptions.setExclusionList(Address ... excluded_mbrs).
This is simpler than having to create the full destination list and remove the sender from it.
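
Here's a hedged sketch of how this might look with an RpcDispatcher; the exact RequestOptions constructor and response-mode constant differ between JGroups versions, so treat the details as illustrative:

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.RspList;

public class ExcludeSelfExample {
    // Invoke print(String) on every cluster member except ourselves
    static RspList callOthers(JChannel ch, RpcDispatcher disp) throws Exception {
        Address self = ch.getAddress();
        // NOTE: constructor args (mode constant, timeout) are assumptions; adjust to your version
        RequestOptions opts = new RequestOptions(GroupRequest.GET_ALL, 5000);
        opts.setExclusionList(self); // skip the local member
        MethodCall call = new MethodCall("print", new Object[]{"hello"}, new Class[]{String.class});
        return disp.callRemoteMethods(null, call, opts); // null dests == entire cluster
    }
}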


Ability to use keywords instead of IP addresses
[https://jira.jboss.org/browse/JGRP-1204]

Whenever IP addresses (symbolic or dotted-decimal notation) are used, we can now use a keyword instead. Currently, the keywords are "GLOBAL" (public IP address), "SITE_LOCAL" (private IP address), "LINK_LOCAL" (link local), "LOOPBACK" (a loopback address) and "NON_LOOPBACK" (any but a loopback address).
This is useful in cloud environments where IP addresses may not be known beforehand.
Example: java -Djgroups.bind_addr=SITE_LOCAL


GossipRouter: re-introduce pinging to detect crashed clients
[https://jira.jboss.org/browse/JGRP-1213]

When clients are terminated without closing their sockets (e.g. in virtualized environments), their entries would never be removed from the GossipRouter. This was fixed by (re-)introducing pinging.


Feedback is appreciated via the usual channels (mailing list, IRC) !
Enjoy !

Bela Ban
Vladimir Blagojevic
Richard Achmatowicz

Friday, July 09, 2010

mod-cluster webinar: video available on vimeo

On July 7th, I did a webinar on mod-cluster, and it was a huge success: 1215 people signed up and 544 attended the webinar ! I'm told that this is the second highest turnout ever for Red Hat (the highest being an xvirt webinar a couple of years ago, with 600 attendees)...

For those who missed the webex presentation, here's the link to the recorded video. For those who only want to see the demo, it is here.

The demo is really cool: I set up a huge cluster in the cloud, spanning GoGrid, EC2 and Rackspace as clouds, and fronting a JBoss 6 based cluster with mod-cluster.

I showed how cluster nodes dynamically register themselves with httpd, or de-register when shutting down, and how web applications get registered/de-registered.

For those who know mod-jk: no more workers.properties or uriworkermap.properties files are needed !

The coolest part was where I ran a load test, simulating 80 clients, each creating and destroying a session every 30 seconds: initially I ran 2 cluster nodes on EC2, so every node had 40 sessions on average. Then I started another EC2 instance, a GoGrid instance and 2 Rackspace instances, and after a few minutes, there were 3 mod-cluster domains with 3, 1 and 2 servers respectively, and every server had ca 12 sessions on average !

This can be compared to a bookshop which spins up additional servers in the cloud around the holidays to serve increased traffic, and where the servers form a cluster for redundancy (you don't want to lose your shopping cart !).

Enjoy the demo, and give us feedback on mod-cluster on the mailing list or forum.

Bela

Friday, May 07, 2010

JBossWorld in Boston and bike riding in California

I'll be talking about mod-cluster at JBossWorld this June. It was a good talk last year, and I've spiced up the demo even more: I'm going to show 2 Apache httpd instances running in different clouds, and 3 domains of JBoss instances, also running in 3 different clouds (GoGrid, Amazon EC2 and Rackspace).

This will be a fun talk, showing the practical aspects of clouds, and not focusing on the hype (I leave that to marketing :-)).

This led to some changes in JGroups, which I'll talk about in my next blog post.

It would be cool to see some of you at JBW !

After that, I'll fly to the best place in the US: the Bay Area ! I'll be there June 25 until July 2nd and will rent a race bike, to ride my 5 favorite rides (from the time when I lived in San Jose). A friend will join me for some insane riding (he's preparing for the Death Ride), so this will definitely be fun !

Now let's just hope that some unknown volcano in Iceland doesn't stop me from making the trip to the US ! :-)

Saturday, March 27, 2010

Scopes: making message delivery in JGroups more concurrent

In JGroups, messages are delivered in the order in which they were sent by a given member. So when member X sends messages 1-3 to the cluster, then everyone will deliver them in the order X1 -> X2 -> X3 ('->' means 'followed by').

When a different member Y delivers messages 4-6, then they will get delivered in parallel to X's messages ('||' means 'parallel to'):
X1 -> X2 -> X3 || Y4 -> Y5 -> Y6

This is good, but what if X has 100 HTTP sessions and performs session replication ?

All modifications to the sessions are sent to the cluster, and will get delivered in the order in which they were performed.

The problem here is that even updates to different sessions will be ordered, e.g. if X updates sessions A, B and C, then we could end up with the following delivery order (X is omitted for brevity):
A1 -> A2 -> B1 -> A3 -> C1 -> C2 -> C3

This means that update 1 to session C has to wait until updates A1-3 and B1 have been processed; in other words, an update has to wait until all updates ahead of it in the queue have been processed !

This unnecessarily delays updates: since updates to A, B and C are unrelated, we could deliver them in parallel, e.g.:

A1 -> A2 -> A3 || B1 || C1 -> C2 -> C3

This means that all updates to A are delivered in order, but parallel to updates to B and updates to C.

How is this done ? Enter the SCOPE protocol.

SCOPE delivers messages  in the order in which they were sent within a given scope. Place it somewhere above NAKACK and UNICAST (or SEQUENCER).

To give a message a scope, simply use Message.setScope(short). The argument should be as unique as possible, to prevent collisions.
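
A minimal sketch (the mapping of a session id to a short is just an assumption for illustration):

import org.jgroups.JChannel;
import org.jgroups.Message;

public class ScopedReplication {
    // Send a session modification, scoped by session id, so that updates to
    // different sessions can be delivered concurrently at the receivers
    static void replicate(JChannel ch, String sessionId, byte[] diff) throws Exception {
        Message msg = new Message(null, null, diff); // null dest == send to the entire cluster
        msg.setScope((short) sessionId.hashCode());  // should be as unique as possible
        ch.send(msg);
    }
}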

The use case described above is actually for real, and we anticipate using this feature in HTTP session replication / distribution in the JBoss application server !

More detailed documentation of  scopes can be found at [1]. Configuration of the SCOPE protocol is described in [2].

This is yet an experimental feature, so feedback is appreciated !

[1] Scopes
[2] The SCOPE protocol

Friday, March 05, 2010

Status report: performance of JGroups 2.10.0.Alpha2

I've already improved (mainly unicast) performance in Alpha1; here's a short list:

  • BARRIER: moved lock acquired by every up-message out of the critical path
  • IPv6: just running a JGroups channel without any system props (e.g. java.net.preferIPv4Stack=true) now works, as IPv4 addresses are mapped to IPv4-mapped IPv6 addresses under IPv6
  • NAKACK and UNICAST: streamlined marshalling of headers, drastically reducing the number of bytes streamed when marshalling headers
  • TCPGOSSIP: Vladimir fixed a bug in RouterStub which caused GossipRouters to return incorrect membership lists, resulting in JOIN failures
  • TP.Bundler:
    • Provided a new bundler implementation, which is faster than the default one (the new *is* actually the default in 2.10)
    • Sending of message lists (bundling): we don't ship the dest and src address for each message, but only ship them *once* for the entire list
  • AckReceiverWindow (used by UNICAST): I made this almost lock-free, so concurrent messages to the same recipient don't compete for the same lock. Should be a nice speedup for multiple unicasts to the same sender (e.g. OOB messages)
The complete list of features is at [1].

In 2.10.0.Alpha2 (that's actually the current CVS trunk), I replaced strings as header names with IDs [2]. This means that for each header, instead of marshalling "UNICAST" as a moniker for the UnicastHeader, we marshal a short.

The string (assuming a single-byte charset) uses up 9 bytes, whereas the short uses 2 bytes. We usually have 3-5 headers per message, so that's an average of 20-30 bytes saved per message. If we send 10 million messages, those saving accumulate !

Not only does this change make the marshalled message smaller, it also means that a message kept in memory has a smaller footprint: as messages are kept in memory until they're garbage collected by STABLE (or ack'ed by UNICAST), the savings are really nice...

The downside ? It's an API change for protocol implementers: methods getHeader(), putHeader() and putHeaderIfAbsent() in Message changed from taking a string to taking a short. Plus, if you implement headers, you have to register them in jg-magic-map.xml / jg-protocol-ids.xml and implement Streamable...

Now for some performance numbers. This is a quick and dirty benchmark, without many data points...

perf.Test (see [3] for details) has N senders send M messages of S size to all cluster nodes. This exercises the NAKACK code.

On my home cluster (4 blades with 4 cores each), 1GB ethernet, sending 1000-byte messages:
  • 4 senders, JGroups 2.9.0.GA:         128'000 messages / sec / member
  • 4 senders, JGroups 2.10.0.Alpha2: 137'000 messages / sec / member
  • 6 senders, JGroups 2.10.0.Alpha2: 100'000 messages / sec /member
  • 8 senders, JGroups 2.10.0.Alpha2:  78'000 messages / sec / member
2.10.0.Alpha2 is ca 7% faster for 4 members.

There is also a stress test for unicasts, UnicastTestRpcDist. It mimics DIST mode of Infinispan and has every member invoke 20'000 requests on 2 members; 80% of those requests are GETs (simple RPCs) and 20% are PUTs (2 RPCs in parallel). All RPCs are synchronous, so the caller always waits for the result and thus blocks for the round-trip time. Every member has 25 threads invoking the RPCs concurrently.

On my home network, I got the following numbers:
  • 4 members, JGroups 2.9.0.GA:         4'500 requests / sec / member
  • 4 members, JGroups 2.10.0.Alpha2: 5'700 requests / sec / member
  • 6 members, JGroups 2.9.0.GA:         4'000 requests / sec / member
  • 6 members, JGroups 2.10.0.Alpha2: 5'000 requests / sec / member
  • 8 members, JGroups 2.9.0.GA:         3'800 requests / sec / member
  • 8 members, JGroups 2.10.0.Alpha2: 4'300 requests / sec / member

In our Atlanta lab (faster boxes), I got (unfortunately only for 2.10.0.Alpha2):

  • 4 members, JGroups 2.10.0.Alpha2: 10'900 requests / sec / member
  • 6 members, JGroups 2.10.0.Alpha2: 10'900 requests / sec / member
  • 8 members, JGroups 2.10.0.Alpha2: 10'900 requests / sec / member
Since the focus of the first half of 2.10.0 was on improving unicast performance, the numbers above are already pretty good and show (at least for up to 8 members) linear scalability.



[1] https://jira.jboss.org/jira/secure/IssueNavigator.jspa?reset=true&pid=10053&fixfor=12314411
[2] https://jira.jboss.org/jira/browse/JGRP-932
[3] http://community.jboss.org/docs/DOC-11594

Monday, December 21, 2009

JGroups 2.8.0.GA released

I'm happy to announce that JGroups 2.8.0 is finally GA !

It has taken us almost a year since the last major release (2.7 was released in January), but in our defense 2.8.0.GA contains a lot of new features and I think they are worth the wait. We also released a number of 2.6.x versions in 2009, which are used in the JBoss Enterprise Application Platform (EAP).

Before I get into a summary of some of the new features (a detailed list can be found at [1]), I'd like to thank all the developers, users and contributors of JGroups. Without this healthy community, producing code, bug reports, patches, documentation and user stories, JGroups wouldn't be anywhere close to where it is today !

So a big thanks to everyone involved, Happy Holidays and a great start into 2010 !

Here's a short list of features that made it into 2.8.0.GA (here are the release notes):
  • Logical addresses: decouples physical addresses (which can change) from logical ones. Eliminates reincarnation issues. This alone is worth 2.8, as it eliminates a big source of problems !
  • Logical names: allow for meaningful channel names, logical names stay with a channel for its lifetime, even after reconnecting it
  • Improved merging / no more shunning: shunning was replaced by merging. Now we have a much simpler model: JOIN - LEAVE - MERGE. The merging algorithm was improved to take 'weird' (e.g. asymmetric) merges into account
  • Better IPv6 support
  • Better support for defaults for addresses: based on the type of the stack (IPv4, IPv6), we perform sanity checks and set default addresses of the correct type
  • FILE_PING / S3_PING: new discovery protocols, file-based and Amazon S3 based. The latter protocol can be used as a replacement for GossipRouter on EC2
  • Speaking of which: major overhaul of GossipRouter
  • Ability to have multiple protocols of the same class in the same stack
  • Ability to override message bundling on a per-message basis
  • Much improved and faster UNICAST
  • XSD schema for protocol configurations
  • STREAMING_STATE_TRANSFER now doesn't need to use TCP, but can also use the configured transport, e.g. UDP
  • RpcDispatcher: additional methods returning a Future rather than blocking
  • Probe.sh: ability to invoke methods cluster-wide. E.g. run message stability on all nodes: probe.sh invoke=STABLE.runMessageGarbageCollection
  • Logging
    • Removal of commons-logging.jar: JGroups now has ZERO dependencies !
    • Configure logging level at runtime, e.g. through JMX (jconsole) or probe.sh, or programmatically. Use case: set logging for NAKACK from "warn" to "trace" for a unit test, then reset it back to "warn"
    • Ability to set custom log provider. This allows for support of new logging frameworks (JGroups ships with support for log4j and JDK logging)
Enjoy !
Bela, Vladimir and Richard

[1] http://javagroups.cvs.sourceforge.net/viewvc/javagroups/JGroups/doc/ReleaseNotes-2.8.txt?revision=1.10&view=markup&pathrev=Branch_JGroups_2_8

[2] http://community.jboss.org/wiki/Support

Thursday, November 05, 2009

IPv6 addresses in JGroups

I finished code to support scoped IPv6 link-local addresses [1]. A link-local address is an address that's not guaranteed to be unique on a given host (although in most cases it will be), so it can be assigned on different interfaces of the same host.

To differentiate between interfaces, a scope-id can be added, e.g. fe80::216:cbff:fea9:c3b5%en0 or fe80::216:cbff:fea9:c3b5%3, where the %X suffix denotes the interface.

Note that this is only relevant for TCP sockets; multicast and datagram sockets are not affected.

Now, on the server side, we can bind to a scoped or unscoped link-local socket, e.g.

ServerSocket srv_sock=new ServerSocket(7500, 50, InetAddress.getByName("fe80::216:cbff:fea9:c3b5"))

binds to an unscoped link-local address, and

ServerSocket srv_sock=new ServerSocket(7500, 50, InetAddress.getByName("fe80::216:cbff:fea9:c3b5%en0"))

binds to the scoped equivalent.

This is all fine, but on the client side, we cannot use scoped link-local addresses, e.g.

Socket sock=new Socket(InetAddress.getByName("fe80::216:cbff:fea9:c3b5%en0"), 7500)

fails !

The reason is that a scope-id "en0" does not mean anything on a client, which might run on a different host.

The correct code is

Socket sock=new Socket(InetAddress.getByName("fe80::216:cbff:fea9:c3b5"), 7500),

with the scope-id removed.

JGroups runs into this problem, too: whenever we have a bind_addr which is a scoped link-local IPv6 address, certain discovery protocols (e.g. MPING, TCPGOSSIP) will return the scoped addresses, and the joiners will then try to connect to the existing members using the scoped addresses.

To fix this, all Socket.connect() calls in JGroups have been replaced with Util.connect(Socket, SocketAddress, port). This method checks for scoped link-local IPv6 addresses and simply removes the scope-id from the destination address, so the connect() call will work.
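
A simplified sketch of the idea (this is not the actual Util.connect() implementation, just what such a helper might do):

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ScopedConnect {
    // Connect to dest:port, stripping the scope-id from scoped link-local IPv6 addresses
    static void connect(Socket sock, InetAddress dest, int port) throws Exception {
        if (dest instanceof Inet6Address && dest.isLinkLocalAddress()) {
            Inet6Address v6 = (Inet6Address) dest;
            if (v6.getScopeId() != 0 || v6.getScopedInterface() != null)
                dest = InetAddress.getByAddress(v6.getAddress()); // re-create without the scope-id
        }
        sock.connect(new InetSocketAddress(dest, port));
    }
}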

Note that this problem doesn't occur with global IPv6 addresses.

I need to test whether this solution works on other operating systems too, e.g. on Windows, Solaris and MacOS.

OK, I'm off to http://www.davidoffswissindoors.ch, hope to see some good tennis !

[1] http://www.jboss.org/community/wiki/IPv6

Wednesday, October 28, 2009

JGroups 2.8.0.CR3 released

Unfortunately, a little later than estimated, but better late than never ! The reason is that I got sidetracked by EAP 5 performance testing and also by the good feedback from the community (you !) on CR2, and the associated bug reports.

This version contains bug fixes, mostly around IPv6 versus IPv4 addresses. We now try to be smart and attempt to find out the type of stack used, and then default undefined IP addresses to addresses of the correct type. Note that IPv6 support is not yet 100% done; I'm continuing to work on this for either CR4 or GA. More on this topic in a later post...

CR3 also added a new feature, which is marshaller pools in the transport. When we send messages, they're either bundled and sent as a batch of messages, or not. In either case, the marshalling of a message or message list is done in an output buffer for which we have to acquire a lock. When we have heavy message sending, e.g. through multiple sender threads, that lock is heavily contended.

Not that this is a big issue, because the sender side is almost never the culprit in slow performance (the receiver side is !), but I've introduced a marshaller pool which provides N output streams (default=2) rather than 1. The property marshaller_pool_size defines how many output streams we want in the pool, and marshaller_pool_initial_size the initial size (in bytes) of each output stream.
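
For illustration, the transport element might be configured along these lines (the values are arbitrary, and all other transport attributes are omitted):

<UDP marshaller_pool_size="4" marshaller_pool_initial_size="4096"/>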

Note that, for UDP, each output stream can grow up to 65535 bytes (the maximum datagram size), so take that into account when allocating a large number of streams.

In my perf tests, increasing the pool size didn't make a difference, but if you use many threads which send messages concurrently, it does.

2.8.0.CR3 can be downloaded from http://sourceforge.net/projects/javagroups/files/JGroups/2.8.0.CR3.
Enjoy !

Friday, September 18, 2009

JGroups 2.6.13.CR2 released

OK, going from CR1 to CR2 doesn't seem like a big deal, and certainly not worth posting as a blog entry ?

You might wonder if I have nothing better to do (like biking in the French Alps) :-)

But actually, there have been significant changes since CR1, so please read on !

CR2 only contains 3 JIRA issues:
  1. Backport of NAKACK from head
  2. Backport of UNICAST from head and
  3. Removal of UNICAST contention issues
#1 is a partial backport of NAKACK from head (2.8) to the 2.6 branch. This version doesn't acquire locks for incoming messages anymore, but uses a CAS (compare-and-swap) operation to decide whether to process a message, or not.

What used to happen when a message from P was received was that we grabbed the receiver window for P and added the message. Then we grabbed the lock associated with P's window and - once acquired - removed as many messages as possible and passed them up to the application sequentially. Sequential order is always respected unless a message is tagged as OOB (out-of-band).

So here's what happened: say we received 10 multicast messages from B and 3 from A. Both A's and B's messages would be delivered in parallel with respect to each other, but sequentially for a given sender. So A's message #34 would always get delivered before #35 before #36 and so on...

However, say we have to process 10 messages from B: 1 2 3 4 5 6 7 8 9 10:
  • Every message would get into NAKACK on a separate thread
  • All the 10 messages would get added into B's receiver window
  • The thread with message #3 would grab the lock
  • All other threads would block, trying to acquire the lock
  • The thread with the lock would remove #1 and pass it up the stack, then #2, then #3 and so on, until it passed #10 up the stack to the application
  • Now it releases the lock
  • All other 9 threads now compete for the lock, but every single thread will return because there are no more messages in the receiver window
This is a terrible waste: we've wasted 9 threads; for the duration of removing and passing up 10 messages, these threads could have been put to better use, e.g. processing other messages !

For example, if our total thread pool only had 10 threads, and 1 of them was processing messages and 9 were blocked on lock acquisition, if a message from a different sender came in (which could be delivered in parallel to B's messages), then no thread would be available !

So the simple but effective change was to replace the lock on the receiver window with a CAS: when a thread tries to remove messages, it simply tries to set the CAS from false to true. If it succeeds, it goes into the removal loop and sets the CAS back to false when done. Otherwise, the thread simply returns, because it knows that someone else will process the message it just added (see the sketch below).

Result: we've returned 9 threads to the thread pool, ready to serve other messages, without even locking !
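
Here's a condensed sketch of that pattern in plain Java (not the actual NAKACK/AckReceiverWindow code); note that a real implementation also has to guard against a message being added just after the removal loop exits, which this sketch glosses over:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReceiverWindowSketch {
    private final Queue<Object> window = new ConcurrentLinkedQueue<Object>();
    private final AtomicBoolean processing = new AtomicBoolean(false);

    public void add(Object msg) {
        window.offer(msg);
        // Only one thread wins the CAS and becomes the remover; all others return
        // immediately and are free to process messages from other senders
        if (!processing.compareAndSet(false, true))
            return;
        try {
            Object m;
            while ((m = window.poll()) != null)
                deliver(m); // pass up the stack, in order
        }
        finally {
            processing.set(false); // let the next adder become the remover
        }
    }

    protected void deliver(Object msg) {
        // application delivery would happen here
    }
}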

The net effect is faster performance and smaller thread pools. As a rule of thumb, a thread pool's max threads can now be around the number of cluster nodes: if every node sends messages, we only need 1 thread per sender to process all of that sender's messages...


#2 has 2 changes: same as above (locks replaced by a CAS) and the changes outlined in the design document. The latter changes simplify UNICAST a lot and also handle the cases of asymmetrical connection closings. This was also back-ported from head (2.8).


#3 UNICAST contention issues
We used to have 2 big fat locks in UNICAST, which severely impacted performance on high unicast message volumes. The bottleneck was detected as part of our EAP testing for JBoss.

This has been fixed and is getting forward-ported to CVS head.

I guess these 3 changes make 2.6.13.CR2 worth trying out; in some cases they should make a real difference in performance !

Enjoy,