Tuesday, November 30, 2010

Clustering between different sites / geopgraphic failover

I just completed a new feature in JGroups which allows for transparent bridging of separate clusters, e.g. at different sites.

Let's say we have a (local) cluster in New York (NYC) and another cluster in San Francisco (SFO). They're completely autonomous, and can even have completely different configurations.

RELAY [1] essentially has the coordinators of the local clusters relay local traffic to the remote cluster, and vice versa. The relaying (or bridging) is done via a separate cluster, usually based on TCP, as IP multicasting is typically not allowed between sites.

SFO could be a backup of NYC, or both could be active, or we could think of a follow-the-sun model where each cluster is active during working hours at its site.

If we have nodes {A,B,C} in NYC and {D,E,F} in SFO, then there would be a global view, e.g. {D,E,F,A,B,C}, which is the same across all the nodes of both clusters.

One use of RELAY could be to provide geographic failover in case of site failures. Because all of the data in NYC is also available in SFO, clients can simply fail over from NYC to SFO if the entire NYC site goes down, and continue to work.

Another use case is to have SFO act as a read-only copy of NYC, and run data analysis functions on SFO, without disturbing NYC, and with access to almost real-time data.

As you can guess, this feature is going to be used by Infinispan, and since Infinispan serves as the data replication / distribution layer in JBoss, we hope to be able to provide replication / distribution between sites in JBoss as well...

Exciting times ... stay tuned for more interesting news from the Infinispan team !

Read more on RELAY at [1] and provide feedback !
Cheers,


[1] http://www.jgroups.org/manual/html/user-advanced.html#RelayAdvanced

Tuesday, November 23, 2010

JGroups finally has a logo

After conducting a vote on the logos designed by James Cobb, the vast majority voted for logo #1. So I'm happy to say that, after 12 years, JGroups finally has a logo !

I added the logo and favicon to jgroups.org. Let me know what you think !


There's also swag available on cafepress, check it out !

Friday, October 29, 2010

JGroups 2.11 final released

FYI,

2.11.0.final can be downloaded here. Its main features, optimizations and bug fixes are listed below.

I hope that 2.12 will be the last release before finally going to 3.0 !

2.12 should be very small, currently it contains only 8 issues (mainly optimizations).

However, I also moved RELAY from 3.x to 2.12.

RELAY allows for connecting geographically separate clusters into a large virtual cluster. This will be interesting to apps which need to provide geographic failover. More on this in the next couple of weeks...

Meanwhile ... enjoy 2.11 !

Bela, Vladimir & Richard



Release Notes JGroups 2.11
==========================


Version: $Id: ReleaseNotes-2.11.txt,v 1.2 2010/10/29 11:45:35 belaban Exp $
Author: Bela Ban

JGroups 2.11 is API-backwards compatible with previous versions (down to 2.2.7).

Below is a summary (with links to the detailed description) of the major new features.


New features
============



AUTH: pattern matching to prevent unauthorized joiners
------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-996]

New plugin for AUTH which can use pattern matching against regular expressions to prevent unauthorized
IP addresses to join a cluster.

Blog: http://belaban.blogspot.com/2010/09/cluster-authentication-with-pattern.html



DAISYCHAIN: implementation of daisy chaining
--------------------------------------------
[https://jira.jboss.org/browse/JGRP-1021]

Daisy chaining sends messages around in a ring, improving throughput for non IP multicast networks.

Blog: http://belaban.blogspot.com/2010/08/daisychaining-in-clouds.html



New flow control protocols for unicast (UFC) and multicast (MFC) messages
-------------------------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1154]

MFC and UFC replace FC. They can be used independently, and performance is faster than that of FC only.


API for programmatic creation of channel
----------------------------------------
[https://jira.jboss.org/browse/JGRP-1245]

Allows for programmatic creation of a JChannel, no need for XML config file.

Blog: http://belaban.blogspot.com/2010/10/programmatic-creation-of-channel.html


S3: new features
----------------
[https://jira.jboss.org/browse/JGRP-1234] Allow use of public buckets (no credentials need to be sent)
[https://jira.jboss.org/browse/JGRP-1235] Pre-signed URLs



STOMP: new protocol to allows STOMP clients to talk to a JGroups node
---------------------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1248]

Blog: http://belaban.blogspot.com/2010/10/stomp-for-jgroups.html







Optimizations
=============


NAKACK: simplify and optimize handling of OOB messages
------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1104]


Discovery: reduce number of discovery responses sent in a large cluster
-----------------------------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1181]

A new propery (max_rank) determines who will and who won't send discovery responses.


New timer implementations
-------------------------
[https://jira.jboss.org/browse/JGRP-1051]

Way more effecient implementations of the timer (TimeScheduler).




Bug fixes
=========

ENCRYPT: encrypt entire message when length=0
---------------------------------------------
[https://jira.jboss.org/browse/JGRP-1242]

ENCRYPT would not encrypt messages whose length = 0


FD_ALL: reduce number of messages sent on suspicion
---------------------------------------------------
[https://jira.jboss.org/browse/JGRP-1241]


FILE_PING: empty files stop discovery
-------------------------------------
[https://jira.jboss.org/browse/JGRP-1246]




Manual
======

The manual is online at http://www.jgroups.org/manual/html/index.html



The complete list of features and bug fixes can be found at http://jira.jboss.com/jira/browse/JGRP.


Bela Ban, Kreuzlingen, Switzerland
Vladimir Blagojevic, Toronto, Canada
Richard Achmatowicz, Toronto, Canada

Nov 2010

Wednesday, October 27, 2010

STOMP for JGroups

FYI,

I've written a new JGroups protocol STOMP, which implements the STOMP protocol. This allows for STOMP clients to connect to any JGroups server node (which has the JGroups STOMP protocol in its configuration).

The benefits of this are:
  •  Clients can be written in any language. For example, I've used stomppy, a Python client, to connect to JGroups server nodes, and successfully subscribed to destinations, and sent and received messages.
  • Sometimes, clients don't want to be peers, ie. they don't want to join a cluster and become full members. These (light-weight) clients could also be in a different geographic location, and not be able to use IP multicasting.
  • Clients are started and stopped frequently, and there might be many of them. Frequently starting and stopping a full-blown JGroups server node has a cost, and is not recommended. Besides, a high churn rate might move the cluster coordinator around quite a lot, preventing it from doing real work.
  • We can easily scale to a large number of clients. Although every client requires 1 thread on the server side, we can easily support hundreds of clients. Note though that I wouldn't use the current JGroups STOMP protocol to connect thousands of clients...
Let's take a quick look: I started an instance of JGroups with STOMP on the top of the protocol stack (on 192.168.1.5). Then I connected to it with the JGroups client:

JGroups STOMP client

As can be seen, the first response the client received was an INFO with information about the available endpoints (STOMP instances) in the cluster. This is actually used by the StompConnection client to failover to a different server node should the currently connected to server fail.
Next, we subscribe to destination /a using the simplified syntax of the JGroups STOMP client.

Then, a telnet session to 192.168.1.5:8787 was started:

Telnet STOMP client



We get the INFO response with the list of endpoints too here. Then we subscribe to the /a destination. Note that the syntax used here is compliant with the STOMP protocol spec: first is the verb (SUBSCRIBE), then an optional bunch of headers (here just one, defining the destination to subscribe to), a newline and finally the body, terminated with a 0 byte. (SUBSCRIBE does not have a body).

Next, we send a message to all clients subscribed to /a. This is the telnet session itself, as evidenced by the reception of MESSAGE. If you look at the JGroups STOMP client, the message is also received there.

Next the JGroups client also sends a message to destination /a, which is received by itself and the telnet client.

JGroups 2.11.0.Beta2 also ships with a 'stompified' Draw demo, org.jgroups.demos.StompDraw, which is a stripped down version of Draw, using the STOMP protocol to send updates to the cluster.

Let me know what you think of this; feature requests, feedback etc appreciated (preferably on one of the JGroups mailing lists) !



The new protocol is part of JGroups 2.11.0.Beta2, which can be downloaded here.

Documentation is here.

Enjoy !

Wednesday, October 20, 2010

Programmatic creation of a channel

I've committed code which provides programmatic creation of channels. This is a way of creating a channel without XML config files. So instead of writing

JChannel ch=new JChannel("udp.xml");

, I can construct the channel programmatically:


JChannel ch=new JChannel(false);                 // 1
ProtocolStack stack=new ProtocolStack(); // 2
ch.setProtocolStack(stack);              // 3
stack.addProtocol(new UDP().setValue("ip_ttl", 8));
     .addProtocol(new PING())
     .addProtocol(new MERGE2())
     .addProtocol(new FD_SOCK())
     .addProtocol(new FD_ALL().setValue("timeout", 12000));
     .addProtocol(new VERIFY_SUSPECT())
     .addProtocol(new BARRIER())
     .addProtocol(new NAKACK())
     .addProtocol(new UNICAST2())
     .addProtocol(new STABLE())
     .addProtocol(new GMS())
     .addProtocol(new UFC())
     .addProtocol(new MFC())
     .addProtocol(new FRAG2());       // 4
stack.init();                         // 5


First, a JChannel is created (1). The 'false' argument means that the channel must not create its own protocol stack, because we create it (2) and stick it into the channel (3).

Next, all protocols are created and added to the stack (4). This needs to happen in the order in which we want the protocols to be, so the first protocol added is the transport protocol (UDP in the example).

Note that we can use Protocol.setValue(String attr_name, Object attr_value) to configure each protocol instance. We can also use regular setters if available.

Finally, we call init() (5), which connects the protocol list correctly and calls init() on every instance. This also handles shared transports correctly. For an example of how to create a shared transport with 2 channels on top see ProgrammaticApiTest.

I see mainly 3 use cases where programmatic creation of a channel is preferred over declarative creation:
  1. Someone hates XML (I'm not one of them) :-)
  2. Unit tests
  3. Projects consuming JGroups might have their own configuration mechanism (e.g. GUI, properties file, different XML configuration  etc) and don't want to use the XML cofiguration mechanism shipped with JGroups.
Let me know what you think about this API ! I deliberately kept it simple and stupid, and maybe there are things people like to see changed. I'm open to suggestions !


Cheers,

Friday, October 01, 2010

Confessions of a serial protocol designer

I have a confession to make.

I'm utterly disgusted by my implementation of FD_ALL, and thanks to David Forget for pointing this out !

What's bad about FD_ALL ? It will not scale at all ! After having written several dozen protocols, I thought an amateurish mistake like the one I'm about to show would certainly not happen to me anymore. Boy, was I wrong !

FD_ALL is about detecting crashed nodes in a cluster, and the protocol then lets GMS know so that the crashed node(s) can be excluded from the view.

Let's take a look at the design.
  • Every node periodically multicasts a HEARTBEAT
  • This message is received by everyone in the cluster and a hashmap of nodes and timestamps is updated; for a node P, P's timestamp is set to the current time
  • Another task run at every node periodcially iterates through the timestamps and checks if any timestamps haven't been updated for a given time. If that's the case, the members with outdated timestamps are suspected
  • A suspicion of P results in a SUSPECT(P) multicast
  • On reception of SUSPECT(P), every node generates a SUSPECT(P) event and passes it up the stack
  • VERIFY_SUSPECT catches SUSPECT(P) and sends an ARE_YOU_DEAD message to P
  • If P is still alive, it'll respond with a I_AM_NOT_DEAD message
  • If the sender doesn't get this message for a certain time, it'll pass the SUSPECT(P) event further up the stack (otherwise it'll drop it), and GMS will exclude P from the view, but if and only if that given node is the coordinator (first in the view)
Can anyone see the flaw in this design ? Hint: it has to do with the number of messages generated...

OK, so let's see what happens if we have a cluster of 100 nodes:
  • Say node P is temporarily slow; it doesn't send HEARTBEATs because a big garbage collection is going on, or the CPU is crunching at 90%
  • 99 nodes multicast a SUSPECT(P) message
  • Every node Q therefore receives 99 SUSPECT(P) messages
    • Q (via VERIFY_SUSPECT) sends a ARE_YOU_DEAD message to P
    • P (if it can) responds with an I_AM_NOT_DEAD back to Q
    • So the total number of messages generated by a single node is 99 * 2
  • This is done on every node, so the total number of messages is 99 * 99 * 2 = 19'602 messages !

Can you imagine what happens to P, which is a bit overloaded and cannot send out HEARTBEATs in time when it receives 19'602 messages ?

It it aint dead yet, it will die !

Isn't it ironic: by asking a node if it is still alive, we actually kill it !

This is an example of where the effects of using IP multicasts were not taken into account: if we multicast M, and everybody who receives M sends 2 messages, I neglected to see that the number of messages sent is a function of the cluster size !

So what's the solution ? Simple, elegant and outlined in [1].
  • Everybody sends a HEARTBEAT multicast periodically
  • Every member maintains a suspect list 
  • This list is adjusted on view changes 
  • Reception of a SUSPECT(P) message adds P to the list 
  • When we suspect P because we haven't received a HEARTBEAT (or traffic if enabled): 
    • The set of eligible members is computed as: members - suspected members 
    • If we are the coordinator (first in the list): 
      • Pass a SUSPECT(P) event up the stack, this runs the VERIFY_SUSPECT protocol and eventually passes the SUSPECT(P) up to GMS, which will exclude P from the view

The cost of running the suspicion protocol is (excluding the periodic heartbeat multicasts):
  • 1 ARE_YOU_DEAD unicast to P
  • A potential response (I_AM_NOT_DEAD) from P to the coordinator
TOTAL COST in a cluster of 100: 2 messages (this is always constant), compared to 19'602 messages before !

This is way better than the previous implementation !


[1] https://jira.jboss.org/browse/JGRP-1241

Wednesday, September 22, 2010

JUDCon 2010 Berlin

I'll be giving a talk at JUDCon 2010 (Oct 7 and 8, Berlin) on how to configure JBoss clusters to run optimally in a cloud (EC2).

It would be cool to see some of you, we can discuss JGroups and other topics over a beer !

The agenda is here.

Cheers,

Friday, September 17, 2010

Cluster authorization with pattern matching

I've added a new plugin to AUTH which allows for pattern matching to determine who can join a cluster.

The idea is very simple: if a new node wants to join a cluster, we only admit the node into the cluster if it matches a certain pattern. For example, we could only admit nodes whose IP address starts with 192.168.* or 10.5.*. Or we could only admit nodes whose logical name is "groucho" or "marx".

Currently, the 2 things I match against are IP address and logical name, but of course any attribute of a message could be used to match against.

Let's take a look at an example.

<AUTH auth_class="org.jgroups.auth.RegexMembership"
      match_string="groucho | marx"
      match_ip_address="false"
      match_logical_name="true" />

This example uses the new plugin RegexMembership (derived from FixedMembership). Its match string (which takes any regular expression as value) says that any node whose logical name is "marx" or "groucho" will be able to join. Note that we set match_logical_name to true here.

Note that AUTH has to be placed somewhere below GMS (Group MemberShip) in the configuration.

<AUTH auth_class="org.jgroups.auth.RegexMembership"
      match_string=
      "192.168.[0-9]{1,3}\.[0-9]{1,3}(:.[0-9]{1,5})?"
      match_ip_address="true"
      match_logical_name="false"  />

This example is a bit more complex, but it essentially says that all nodes whose IP address starts with 192.168 are allowed to join the cluster. So 192.168.1.5 and 192.168.1.10:5546 would pass, while 10.1.4.5 would be rejected.

I have to admit, I'm not really an expert in regular expression, so I guess the above expression could be simplified. For example, I gave up trying to define that hosts starting either with 192.168 or 10.5 could join.
If you know how to do that, please send me the regular expression !

Friday, August 13, 2010

Daisychaining in the clouds

I've been working on a new protocol DAISYCHAIN [1] which is based on research out of EPFL [2].

The idea behind it is that it is inefficient to broadcast a message in clusters where IP multicasting is not available. For example, if we only have TCP available (as is the case in most clouds today), then we have to send a broadcast (or group) message N-1 times. If we want to broadcast M to a cluster of 10, we send the same message 9 times.

Example: if we have {A,B,C,D,E,F}, and A broadcasts M, then it sends it to B, then to C, then to D etc.

If we have a 1 GB switch, and M is 1GB, then sending a broadcast to 9 members takes 9 seconds, even if we parallelize the sending of M. This is due to the fact that the link to the switch only sustains 1GB / sec. (Note that I'm conveniently ignoring the fact that the switch will start dropping packets if it is overloaded, causing TCP to retransmit, slowing things down)...

Let's introduce the concept of a round. A round is the time it takes to send or receive a message. In the above example, a round takes 1 second if we send 1 GB messages.




In the existing N-1 approach, it takes X * (N-1) rounds to send X messages to a cluster of N nodes. So to broadcast 10 messages a the cluster of 10, it takes 90 rounds.


Enter DAISYCHAIN.

The idea is that, instead of sending a message to N-1 members, we only send it to our neighbor, which forwards it to its neighbor, and so on. For example, in {A,B,C,D,E}, D would broadcast a message by forwarding it to E, E forwards it to A, A to B, B to C and C to D. We use a time-to-live field, which gets decremented on every forward, and a message gets discarded when the time-to-live is 0.

The advantage is that, instead of taxing the link between a member and the switch to send N-1 messages, we distribute the traffic more evenly across the links between the nodes and the switch. Let's take a look at an example, where A broadcasts messages m1 and m2 in cluster {A,B,C,D}, '-->' means sending:

Traditional N-1 approach

Round 1: A(m1) --> B
Round 2: A(m1) --> C
Round 3: A(m1) --> D
Round 4: A{m2) --> B
Round 5: A(m2} --> C
Round 6: A(m2) --> D

It takes 6 rounds to broadcast m1 and m2 to the cluster.


Daisychaining approach

Round 1: A(m1) --> B
Round 2: A(m2) --> B || B(m1) --> C
Round 3: B(m2) --> C || C(m1) --> D
Round 4: C(m2) --> D

In round 1, A send m1 to B.
In round 2, A sends m2 to B, but B also forwards m1 (received in round 1) to C.
In round 3, A is done. B forwards m2 to C and C forwards m1 to D(in parallel, denoted by '||').
In round 4, C forwards m2 to D.

Switch usage

Let's take a look at this in terms of switch usage: in the N-1 approach, A can only send 125MB/sec, no matter how many members there are in the cluster, so it is constrained by the link capacity to the switch. (Note that A can also receive 125MB/sec in parallel with today's full duplex links).

So the link between A and the switch gets hot.

In the daisychaining approach, link usage is more even: if we look for example at round 2, A sending to B and B sending to C uses 2 different links, so there are no constraints regarding capacity of a link. The same goes for B sending to C and C sending to D.

In terms of rounds, the daisy chaining approach uses X + (N-2) rounds, so for a cluster size of 10 and broadcasting 10 messages, it requires only 18 rounds, compared to 90 for the N-1 approach !


Performance

I ran a quick performance test this morning, with 4 nodes connected to a 1 GB switch; and every node sending 1 million 8K messages, for a total of 32GB received by every node. The config used was tcp.xml.

The N-1 approach yielded a throughput of 73 MB/node/sec, and the daisy chaining approach 107MB/node/sec !

The change to switch from N-1 to daisy chaining was to place DAISYCHAIN  directly on top of TCP.

DAISYCHAIN is still largely experimental, but the numbers above show that it has potential to improve performance in TCP based clusters.


[1] https://jira.jboss.org/browse/JGRP-1021
[2] infoscience.epfl.ch/record/149218/files/paper.pdf

Monday, July 12, 2010

JGroups 2.10 final released

I'm happy to announce that JGroups 2.10 final has been released. It can be downloaded from SourceForge and contains the following major new features (for a detailed list of the 80+ issues  check 2.10 in JIRA):

SCOPE: concurrent delivery of messages from the same sender
[https://jira.jboss.org/browse/JGRP-822]

By default, messages from a sender P are delivered in the (FIFO) order in which P sent them (ignoring OOB messages for now). However, sometimes it would be beneficial to deliver unrelated messages concurrently, e.g. modifications sent by P for different HTTP sessions.

SCOPE is a new protocol, which allows a developer to define a scope for a message, and that scope is then used to deliver messages from P concurrently.

See http://www.jgroups.org/manual/html/user-advanced.html#Scopes for details.


Use of factory to create sockets
[https://jira.jboss.org/browse/JGRP-278]

There's now a method Protocol.setSocketFactory(SocketFactory) which allows to set a socket factory, used to create and close datagram and TCP (client and server) sockets. The default implementation keeps track of open sockets, so
./probe.sh socks
dumps a list of open sockets.


UNICAST2: experimental version of UNICAST based on negative acks
[https://jira.jboss.org/browse/JGRP-1140]

By not sending acks for received messages, we can cut down on the number of acks. UNICAST2 is ca 20-30% faster than UNICAST as a result. Needs more testing though, currently UNICAST2 is experimental.


Certain IPv4 addresses should be allowed in an IPv6 stack
[https://jira.jboss.org/browse/JGRP-1152]

They will be converted into IPv6 mapped IPv4 addresses. This relaxes the (too restrictive) IP address conformance testing somewhat, and allows for more configurations to actually start the stack and not fail with an exception.


Multiple components using the same channel
[https://jira.jboss.org/browse/JGRP-1177]

This is a new light weight version of the (old and dreaded !) Multiplexer, which allows for sharing of channels between components, such as for example HAPartition and Infinispan.

*** Only to be used by experts ! ***


MERGE2: fast merge
[https://jira.jboss.org/browse/JGRP-1191]

Fast merge in case where we receive messages from a member which is not part of our group, but has the same group name.


RpcDispatcher / MessageDispatcher: add exclusion list
[https://jira.jboss.org/browse/JGRP-1192]

If an RPC needs to be sent to all nodes in a cluster except one node (e.g. the sender itself), then we can simply exclude the sender. This is done using
RequestOptions.setExclusionList(Address ...  xcluded_mbrs).
This is simpler than having to create the full list, and remove the sender.


Ability to use keywords instead of IP addresses
[https://jira.jboss.org/browse/JGRP-1204]

Whenever IP addresses (symbolic or dotted-decimal notation) are used, we can now use a keyword instead. Currently, the keywords are "GLOBAL" (public IP address), "SITE_LOCAL" (private IP address), "LINK_LOCAL" (link local), "LOOPBACK" (a loopback address) and "NON_LOOPBACK" (any but a loopback address).
This is useful in cloud environments where IP address may not be known beforehand.
Example: java -Djgroups.bind_addr=SITE_LOCAL
Example:



GossipRouter: re-introduce pinging to detect crashed clients
[https://jira.jboss.org/browse/JGRP-1213]

When clients are terminated without closing of sockets (e.g. in virtualized environments), they'd cause their
entries to not be removed in GossipRouter. This was changed by (re-)introducing pinging.








Feeback is appreciated via the usual channels (mailing list, IRC) !
Enjoy !

Bela Ban
Vladimir Blagojevic
Richard Achmatowicz

Friday, July 09, 2010

mod-cluster webinar: video available on vimeo

On July 7th, I did a webinar on mod-cluster, and it was a huge success: 1215 people signed up and 544 attended the webinar ! I'm told that this is the second highest turnout ever for Red Hat (the highest being an xvirt webinar a couple of years ago, with 600 attendees)...

For those who missed the webex presentation, here's the link to the recorded video. For those who only want to see the demo, it is here.

The demo is really cool: I set up a huge cluster in the cloud, spanning GoGrid, EC2 and Rackspace as clouds, and fronting a JBoss 6 based cluster with mod-cluster.

I showed how cluster nodes dynamically register themselves with httpd, or de-register when shutting down, and how web applications get registered/de-registered.

For those who know mod-jk: no more workers.properties or uriworkmap.properties are needed !

The coolest part was where I ran a load test, simulating 80 clients, each creating and destroying a session every 30 seconds: initially I ran 2 cluster nodes on EC2, so every node had 40 sessions on average. Then I started another EC2 instance, a GoGrid instance and 2 Rackspace instances, and after a few minutes, there were 3 mod-cluster domains with 3, 1 and 2 servers respectively, and every server had ca 12 sessions on average !

This can be compared to a bookshop, which spins up additional servers in the cloud around the holidays to serve increased traffic, and where the servers form a cluster for redundancy (don't want to lose your shoppig cart !).

Enjoy the demo, and give us feedback on mod-cluster on the mailing list or forum.

Bela

Friday, May 07, 2010

JBossWorld in Boston and bike riding in California

I'll be talking about mod-cluster at JBossWorld this June. It was a good talk last year, and I've spiced up the demo even more: I'm going to show 2 Apache httpd instances running in different clouds, and 3 domains of JBoss instances, also running in 3 different clouds (GoGrid, Amazon EC2 and Rackspace).

This will be a fun talk, showing the practical aspects of clouds, and not focusing on the hype (I leave that to marketing :-)).

This led to some changes in JGroups, which I'll talk about in my next blog post.

It would be cool to see some of you at JBW !

After that, I'll fly to the best place in the US: the Bay Area ! I'll be there June 25 until July 2nd and will rent a race bike, to ride my 5 favorite rides (from the time when I lived in San Jose). A friend will join me for some insane riding (he's preparing for the Death Ride), so this will definitely be fun !

Now let's just hope that some unknown volcano in Iceland doesn't stop me from making the trip to the US ! :-)

Saturday, March 27, 2010

Scopes: making message delivery in JGroups more concurrent

In JGroups, messages are delivered in the order in which they were sent by a given member. So when member X sends messages 1-3 to the cluster, then everyone will deliver them in the order X1 -> X2 -> X3 ('->' means 'followed by').

When a different member Y delivers messages 4-6, then they will get delivered in parallel to X's messages ('||' means 'parallel to'):
X1 -> X2 -> X3 || Y4 -> Y5 -> Y6

This is good, but what if X has 100 HTTP sessions and performs session replication ?

All modifications to the sessions are sent to the cluster, and will get delivered in the order in which they were performed.

The problem here is that even updates to different sessions will be ordered, e.g. if X updates sessions A, B and C, then we could end up with the following delivery order (X is omitted for brevity):
A1 -> A2 -> B1 -> A3 -> C1 -> C2 -> C3

This means that update 1 to session C has to wait until updates A1-3 and B1 have been processed; in other words, an update has to wait until all updates ahead of it in the queue have been processed !

This unnecessarily delays updates: since updates to A, B and C and unrelated, we could deliver them in parallel, e.g.:

A1 -> A2 -> A3 || B1 || C1 -> C2 -> C3

This means that all updates to A are delivered in order, but parallel to updates to B and updates to C.

How is this done ? Enter the SCOPE protocol.

SCOPE delivers messages  in the order in which they were sent within a given scope. Place it somewhere above NAKACK and UNICAST (or SEQUENCER).

To give a message a scope, simply use Message.setScope(short). The argument should be as unique as possible, to prevent collisions.

The use case described above is actually for real, and we anticipate using this feature in HTTP session replication / distribution in the JBoss application server !

More detailed documentation of  scopes can be found at [1]. Configuration of the SCOPE protocol is described in [2].

This is yet an experimental feature, so feedback is appreciated !

[1] Scopes
[2] The SCOPE protocol

Friday, March 05, 2010

Status report: performance of JGroups 2.10.0.Alpha2

I've already improved (mainly unicast) performance in Alpha1, a short list is:

  • BARRIER: moved lock acquired by every up-message out of the critical path
  • IPv6: just running a JGroups channel without any system props (e.g. java.net.preferIPv4Stack=true) now works, as IPv4 addresses are mapped to IP4-mapped IPv6 addresses under IPv6
  • NAKACK and UNICAST: streamlined marshalling of headers, drastically reducing the number of bytes streamed when marshalling headers
  • TCPGOSSIP: Vladimir fixed a bug in RouterStub which caused GossipRouters to return incorrect membership lists, resulting in JOIN failures
  • TP.Bundler:
    • Provided a new bundler implementation, which is faster than the default one (the new *is* actually the default in 2.10)
    • Sending of message lists (bundling): we don't ship the dest and src address for each message, but only ship them *once* for the entire list
  • AckReceiverWindow (used by UNICAST): I made this almost lock-free, so concurrent messages to the same recipient don't compete for the same lock. Should be a nice speedup for multiple unicasts to the same sender (e.g. OOB messages)
The complete list of features is at [1].

In 2.10.0.Alpha2 (that's actually the current CVS trunk), I replaced strings as header names with IDs [2]. This means that for each header, instead of marshalling "UNICAST" as a moniker for the UnicastHeader, we marshal a short.

The string (assuming a single-byte charset) uses up 9 bytes, whereas the short uses 2 bytes. We usually have 3-5 headers per message, so that's an average of 20-30 bytes saved per message. If we send 10 million messages, those saving accumulate !

Not only does this change make the marshalled message smaller, it also means that a message kept in memory has a smaller footprint: as messages are kept in memory until they're garbage collected by STABLE (or ack'ed by UNICAST), the savings are really nice...

The downside ? It's an API change for protocol implementers: methods getHeader(), putHeader() and putHeaderIfAbsent() in Message changed from taking a string to taking a short. Plus, if you implement headers, you have to register them in jg-magic-map.xml / jg-protocol-ids.xml and implement Streamable...

Now for some performance numbers. This is a quick and dirty benchmark, without many data points...

perf.Test (see [3] for details) has N senders send M messages of S size to all cluster nodes. This exercises the NAKACK code.

On my home cluster (4 blades with 4 cores each), 1GB ethernet, sending 1000-byte messages:
  • 4 senders, JGroups 2.9.0.GA:         128'000 messages / sec / member
  • 4 senders, JGroups 2.10.0.Alpha2: 137'000 messages / sec / member
  • 6 senders, JGroups 2.10.0.Alpha2: 100'000 messages / sec /member
  • 8 senders, JGroups 2.10.0.Alpha2:  78'000 messages / sec / member
2.10.0.Alpha2 is ca 7% faster for 4 members.

There is also a stress test for unicasts, UnicastTestRpcDist. It mimicks DIST mode of Infinispan and has every member invoke 20'000 requests on 2 members; 80% of those requests are GETs (simple RPCs) and 20% are PUTs (2 RPCs in parallel). All RPCs are synchronous, so the caller always waits for the result and thus blocks for the roud trip time. Every member has 25 threads invoking the RPCs concurrently.

On my home network, I got the following numbers:
  • 4 members, JGroups 2.9.0.GA:         4'500 requests / sec / member
  • 4 members, JGroups 2.10.0.Alpha2: 5'700 requests / sec / member
  • 6 members, JGroups 2.9.0.GA:         4'000 requests / sec / member
  • 6 members, JGroups 2.10.0.Alpha2: 5'000 requests / sec / member
  • 8 members, JGroups 2.9.0.GA:         3'800 requests / sec / member
  • 8 members, JGroups 2.10.0.Alpha2: 4'300 requests / sec / member

In our Atlanta lab (faster boxes), I got (unfortunately only for 2.10.0.Alpha2):

  • 4 members, JGroups 2.10.0.Alpha2: 10'900 requests / sec / member
  • 6 members, JGroups 2.10.0.Alpha2: 10'900 requests / sec / member
  • 8 members, JGroups 2.10.0.Alpha2: 10'900 requests / sec / member
Since the focus of the first half of 2.10.0 was on improving unicast performance, the numbers above are already pretty good and show (at least for up to 8 members) linear scalability.



[1] https://jira.jboss.org/jira/secure/IssueNavigator.jspa?reset=true&pid=10053&fixfor=12314411
[2] https://jira.jboss.org/jira/browse/JGRP-932
[3] http://community.jboss.org/docs/DOC-11594