Tuesday, December 03, 2024

JGroups 5.4 released

I'm happy to announce the release of 5.4! 

 

It has roughly 60 bug fixes, performance improvements and new features. Details can be found here: [1].

The most important new features / enhancements are described below.

Enjoy!

 

NAKACK4

NAKACK2 used to have unbounded message buffers (for retransmission and delivery). This required STABLE to purge messages seen by all members from the retransmission buffers, and MFC (multicast flow control) to prevent fast senders from making receiver message buffers grow too large.

NAKACK4 uses fixed-size buffers with a maximum capacity. When senders attempt to add messages to a full send-buffer, they block until there is space; receivers simply drop a message when there is no space for it. Receivers remove messages that have been delivered and send ACKs back to the sender. When a sender receives ACKs, it computes the minimum acknowledged sequence number and purges all messages below it from its send-buffer.

Because stability is implemented by ACKing, STABLE can be removed from a configuration that has NAKACK4. Because senders block on full send-buffers, MFC can also be removed.

Memory pressure should be lower with NAKACK4 than with NAKACK2: the former doesn't need to dynamically grow and shrink its receive- and send-buffers, but always uses the same buffers for messages and only moves the read- and write-pointers.
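To illustrate the idea (a sketch only, not the actual NAKACK4 code; all names below are made up), a fixed-capacity send-buffer could look like this in Java: senders block while the buffer is full, and incoming ACKs purge everything up to the lowest acknowledged sequence number and unblock them:

// Illustrative sketch only; not the NAKACK4 implementation.
public class FixedSendBuffer<T> {
    private final Object[] buf;   // ring buffer with fixed capacity
    private long low;             // lowest seqno still in the buffer
    private long high;            // next seqno to be assigned

    public FixedSendBuffer(int capacity) {
        this.buf = new Object[capacity];
    }

    /** Adds a message; blocks while the buffer is full (back-pressure instead of MFC). */
    public synchronized long add(T msg) throws InterruptedException {
        while (high - low >= buf.length)
            wait();                              // sender blocks until ACKs free up space
        long seqno = high++;
        buf[(int) (seqno % buf.length)] = msg;
        return seqno;
    }

    /** Called when an ACK arrives; 'acked' is the minimum seqno ACKed by all receivers. */
    public synchronized void purge(long acked) {
        while (low <= acked) {
            buf[(int) (low % buf.length)] = null; // free the slot
            low++;
        }
        notifyAll();                              // wake up blocked senders
    }
}

Back-pressure through blocking is what makes MFC redundant, and ACK-based purging is what makes STABLE redundant.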

Examples of the new configuration are ./conf/udp-new.xml and ./conf/tcp-new.xml.

https://issues.redhat.com/browse/JGRP-2780 

Documentation: http://www.jgroups.org/manual5/index.html#NAKACK4

 

UNICAST4

Like NAKACK4, UNICAST4 also uses fixed-size buffers. When used in a configuration, unicast flow control (UFC) can be removed.

https://issues.redhat.com/browse/JGRP-2843 

Documentation: http://www.jgroups.org/manual5/index.html#UNICAST4

 

JDBC_PING2

JDBC_PING2 is a refactored version of JDBC_PING. Besides the refactoring, the major changes are:

  • Text (rather than binary) column types. This makes the table contents legible when looking at the DB; for example, a simple SELECT lists all members of a given cluster (see the sketch after this list).
  • Support for stored procedures. This makes discovery faster.
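
As an illustration of that SELECT (a sketch only: the JDBC URL, table and column names are placeholders and depend on how JDBC_PING2 is configured):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch only: adjust URL, table and column names to your JDBC_PING2 configuration.
public class ListClusterMembers {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection("jdbc:postgresql://db/jgroups", "user", "pwd");
             PreparedStatement ps = con.prepareStatement(
                     "SELECT name, address, ip FROM jgroups WHERE cluster = ?")) {
            ps.setString(1, "mycluster");
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next())
                    System.out.printf("%s (%s) -> %s%n",
                                      rs.getString("name"), rs.getString("address"), rs.getString("ip"));
            }
        }
    }
}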

https://issues.redhat.com/browse/JGRP-2795

Documentation: http://www.jgroups.org/manual5/index.html#JDBC_PING2

 

Better time units in attributes

Time values in protocols can now be defined in a simpler way, e.g.:

  • "240000" -> "4m" // 4 minutes
  • "500" -> "0.5s" // 0.5 seconds

https://issues.redhat.com/browse/JGRP-2811 

Documentation: http://www.jgroups.org/manual5/index.html#AttributeValues

 

Measuring of round-trip times between members

Round-trip times, measured at the transport of a given stack, can now be fetched. This can be done programmatically, or via probe.

https://issues.redhat.com/browse/JGRP-2812

 

Faster processing of loopback messages

Loopback messages are messages from A -> A or from A -> all. (In the latter case, a message sent to all members is also looped back up to the sender).

Until now, loopback messages were always passed to the thread pool and delivered one by one on a separate thread. This was inefficient, increased lock contention with messages received from other members, and forfeited message bundling/batching.

This change passes loopback messages down to the message bundler, which queues and subsequently passes up message batches rather than individual messages.

In other words, message bundling is now applied to loopback messages, too.

https://issues.redhat.com/browse/JGRP-2831

 

More efficient Average

Classes Average and AverageMinMax are used to collect statistics. The change introduces a fixed-size array of samples, avoiding a costly multiplication and overflow check.

This change should speed up the collection of averages (used by many protocol stats).
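
A minimal sketch of the idea (not the actual Average/AverageMinMax code): keep a fixed array of samples and average over them, instead of maintaining a running total that requires overflow checks:

// Sketch only; the real Average/AverageMinMax classes differ in detail.
public class SampleAverage {
    private final long[] samples;
    private int index;   // next slot to write
    private int count;   // number of valid samples (<= samples.length)

    public SampleAverage(int capacity) {
        samples = new long[capacity];
    }

    public synchronized void add(long value) {
        samples[index] = value;
        index = (index + 1) % samples.length;  // overwrite the oldest sample when full
        if (count < samples.length)
            count++;
    }

    public synchronized double average() {
        if (count == 0) return 0;
        long sum = 0;
        for (int i = 0; i < count; i++)
            sum += samples[i];
        return (double) sum / count;
    }
}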

https://issues.redhat.com/browse/JGRP-2832

 

FastArray improvements

Class FastArray is used internally instead of ArrayList. When elements are removed during iteration, ArrayList compacts the underlying array, whereas FastArray only nulls the removed slots. Since removing elements while iterating is a common scenario when handling message batches, this leads to a performance increase.
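
A sketch contrasting the two removal strategies (illustrative only, not the FastArray source): nulling a slot is O(1), whereas compaction shifts all subsequent elements:

import java.util.ArrayList;
import java.util.List;

// Illustrative comparison only; not the FastArray implementation.
public class RemoveDemo {
    public static void main(String[] args) {
        // ArrayList: remove(i) shifts all elements after i (compaction)
        List<String> list = new ArrayList<>(List.of("m1", "m2", "m3"));
        list.remove(1);                 // O(n): m3 is shifted left

        // FastArray-style: just null the slot, keep the array as-is
        String[] arr = {"m1", "m2", "m3"};
        arr[1] = null;                  // O(1): iteration simply skips nulls
        for (String s : arr)
            if (s != null)
                System.out.println(s);
    }
}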

https://issues.redhat.com/browse/JGRP-2834


[1] https://issues.redhat.com/projects/JGRP/versions/12425884

Tuesday, September 12, 2023

JGroups 5.3 released

I just released JGroups 5.3.

The 5.2 branch (last stable release: 5.2.19) is stable and will only be modified when bug fixes are backported from 5.3. All new development will be done on the 5.3 branch.

The major new feature of the 5.3 release is RELAY3, which provides asymmetric routing between sites. This means that a given site doesn't need to be connected to all other sites, e.g.:

A <--> B <--> C <--> D

Site A is connected to site B, B to C and C to D. If a member in site D wants to send a message M to a member in site A, then M needs to be forwarded to C, then to B which forwards M to the member in A.

RELAY3 accepts the same configuration as RELAY2, but cannot be used with RELAY2.

The documentation is at [2].

Enjoy!


[1] https://issues.redhat.com/projects/JGRP/versions/12343297

[2] http://www.jgroups.org/manual5/index.html#Relay3Advanced


Friday, April 28, 2023

Support for TLS/SSL in TCP

In version 5.2.15 (to be released soon), TLS can be enabled in TCP via a simple configuration change:

<TCP
     tls.enabled="true"
     tls.client_auth="NEED"
     tls.keystore_path="good-server.jks"
     tls.keystore_password="password"
     tls.keystore_alias="server"
...
/>

This installs an SSLSocketFactory into TCP, creating SSLSockets instead of Sockets and SSLServerSockets instead of ServerSockets.
 
This is an alternative to SYM_ENCRYPT.
 
Details can be found in [1].
 
Cheers,



Wednesday, May 26, 2021

JGroups 5.1.7 released

I'm happy to announce that 5.1.7 has been released!

The major new features are FD_SOCK2 [1] and VERIFY_SUSPECT2 [2].

The complete list of features and bug fixes is at [4].

Here's a short description of the major changes/additions:


FD_SOCK2

This is a rewrite of FD_SOCK, which was created 20 (!) years ago. The old protocol has worked surprisingly well, given its brittle and complex design. FD_SOCK2 should be much more robust, as I've eliminated the cache of member-to-port mappings and the code which maintains it.

Also, FD_SOCK2 (re-)uses NioServer, which means that we'll use 1 (select) thread instead of 3 in FD_SOCK.

Compared to FD_SOCK's 1235 LOC, FD_SOCK2 has 723 LOC with the same functionality.


VERIFY_SUSPECT2

The major change over VERIFY_SUSPECT is that VERIFY_SUSPECT2 bundles SUSPECT events sent up the stack. This reduces the problem where view installation runs into a timeout waiting for acks from crashed members.

When X crashes, and Y crashes a few milliseconds later, VERIFY_SUSPECT sends up the events SUSPECT(X) and then SUSPECT(Y), whereas VERIFY_SUSPECT2 sends up a single SUSPECT(X,Y) *if* X and Y crashed within the same time window (1s by default).

This speeds up the installation of the new view, especially when multiple members have crashed.


No need to use jmx= or op= in probe

This is only syntactic sugar, but now we can shorten probe.sh jmx=UDP.bind to probe.sh UDP.bind, and probe.sh op=TCP.printConnections to probe.sh TCP.printConnections. This comes in handy when switching between attributes and operations. JIRA: [3]


[1] https://issues.redhat.com/browse/JGRP-2521

[2] https://issues.redhat.com/browse/JGRP-2558

[3] https://issues.redhat.com/browse/JGRP-2413

[4] https://issues.redhat.com/projects/JGRP/versions/12355552


Tuesday, December 22, 2020

Running jgroups-raft as a service

This is a short tutorial on running a Raft cluster [1] in Kubernetes. It shows how to run a jgroups-raft cluster of 3 nodes, then connects to it with a client.

Running the jgroups-raft cluster

This is very simple with Kubernetes:

kubectl apply -f https://raw.githubusercontent.com/belaban/jgroups-raft/master/conf/rsm.yaml

This downloads belaban/jgroups-raft:blog and starts 3 StatefulSet instances. The instances are named jgroups-raft-0, jgroups-raft-1 and jgroups-raft-2. The persistent data is stored in /mnt/data. Note that the load balancer fronting the 3 instances is listening on port 1965:

netstat -na -f inet |grep 1965
tcp46      0      0  *.1965                 *.*                    LISTEN 

We can look at the cluster with probe:

kubectl exec jgroups-raft-2 probe.sh
#1 (176 bytes):
local_addr=jgroups-raft-2
physical_addr=10.1.0.207:58801
view=[jgroups-raft-1|2] (3) [jgroups-raft-1, jgroups-raft-0, jgroups-raft-2]
cluster=rsm
version=5.1.3.Final (Stelvio)

#2 (176 bytes):
local_addr=jgroups-raft-1
physical_addr=10.1.0.206:35596
view=[jgroups-raft-1|2] (3) [jgroups-raft-1, jgroups-raft-0, jgroups-raft-2]
cluster=rsm
version=5.1.3.Final (Stelvio)

#3 (176 bytes):
local_addr=jgroups-raft-0
physical_addr=10.1.0.205:46824
view=[jgroups-raft-1|2] (3) [jgroups-raft-1, jgroups-raft-0, jgroups-raft-2]
cluster=rsm
version=5.1.3.Final (Stelvio)

This shows the 3 instances, all with the same view (jgroups-raft-1|2), confirming that the cluster has formed correctly.

Running the client

This is a bit more involved. We could clone the jgroups-raft repo and build the client from source, but for this tutorial, we'll simply download the relevant JARs (jgroups-raft, JGroups) from maven central.

mkdir lib

curl -o ./lib/jgroups.jar https://repo1.maven.org/maven2/org/jgroups/jgroups/5.1.3.Final/jgroups-5.1.3.Final.jar

curl -o ./lib/raft.jar https://repo1.maven.org/maven2/org/jgroups/jgroups-raft/1.0.1.Final/jgroups-raft-1.0.1.Final.jar

java -cp "./lib/*" org.jgroups.raft.client.ReplicatedStateMachineClient

The client connects to the load balancer listening on port 1965, which redirects the request to one of the 3 instances. It can be used to modify/view the replicated state maintained by jgroups-raft, e.g.:

[1] add [2] get [3] remove [4] show all [5] dump log [6] snapshot [v] view [x] exit

1
key: name
value: Bela
[1] add [2] get [3] remove [4] show all [5] dump log [6] snapshot [v] view [x] exit

1
key: id
value: 500
[1] add [2] get [3] remove [4] show all [5] dump log [6] snapshot [v] view [x] exit

4
[1] add [2] get [3] remove [4] show all [5] dump log [6] snapshot [v] view [x] exit

{name=Bela, id=500}
5
[1] add [2] get [3] remove [4] show all [5] dump log [6] snapshot [v] view [x] exit


index (term): command
---------------------
21 (11379): put(name, Bela)
22 (11379): put(id, 500)

v
[1] add [2] get [3] remove [4] show all [5] dump log [6] snapshot [v] view [x] exit

local address: jgroups-raft-0
view: [jgroups-raft-1|2] (3) [jgroups-raft-1, jgroups-raft-0, jgroups-raft-2]

[1] adds a key and value to the replicated state, [4] shows the entire state and [5] shows the log. Press 'v' to see the cluster view.

 

Conclusion

Using Kubernetes is a quick way to run a 3-node jgroups-raft cluster as a service. The demo above showed ReplicatedStateMachine, but - of course - other services are possible, too. For instance, one could easily write a YAML file which starts a replicated counter service.

On the client side, a simple protocol to set/get and remove data was implemented. A more sophisticated client could for example use gRPC for the communication between client and service.
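
For example, the replicated state can also be accessed programmatically from within the cluster via jgroups-raft's ReplicatedStateMachine building block. A rough sketch (assuming the org.jgroups.raft.blocks.ReplicatedStateMachine API and a raft-enabled configuration such as raft.xml on the classpath):

import org.jgroups.JChannel;
import org.jgroups.raft.blocks.ReplicatedStateMachine;

// Rough sketch; assumes a raft-enabled stack (e.g. raft.xml) and the ReplicatedStateMachine block.
public class RsmExample {
    public static void main(String[] args) throws Exception {
        try (JChannel ch = new JChannel("raft.xml")) {
            ch.connect("rsm");
            ReplicatedStateMachine<String,String> rsm = new ReplicatedStateMachine<>(ch);
            rsm.put("name", "Bela");             // replicated via Raft consensus
            rsm.put("id", "500");
            System.out.println(rsm.get("name")); // read from the local state machine
        }
    }
}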

Questions and feedback please to the mailing list [3].

Enjoy!


[1] http://belaban.github.io/jgroups-raft/

[2] https://hub.docker.com/repository/docker/belaban/jgroups-raft

[3] https://groups.google.com/g/jgroups-raft

 

Tuesday, November 24, 2020

I hate distributed locks!

I hate distributed locks!

Adding distributed locks to JGroups has been the second biggest mistake that I've made (next to MuxRpcDispatcher). Unfortunately, a lot of people are using them in JGroups...

Distributed locks don't work. Or, at least, don't work the way developers expect them to.

TL;DR

  • Assumption that distributed locks have the same semantics as local locks
  • Multiple cluster members in JGroups can hold the same lock when there is a partition
  • The try-lock-finally-unlock pattern is unsuitable for taking locks away from a holder
  • Some scenarios are better handled by using transactions instead
  • Even consensus-based implementations have their share of problems

Distributed locks

The simplest distributed lock implementation is a (fixed) lock server (DLM) which cluster members contact to acquire or release locks. The server may persist locks in a database, so it still knows which members hold locks on a restart. This may be okay for applications that can tolerate unavailability of the lock server and/or the database. If your application is fine with this, then there's no need to read on!

However, the single lock server may quickly become a bottleneck when many members want to acquire or release locks. It is also a single point of failure. This is not the distributed lock implementation I'm talking about (although it works).

What I'm talking about are clustered distributed locks; in today's distributed systems, lock information is typically replicated to all or a subset of the cluster members. This avoids the database bottleneck, but brings with it its own slew of problems.

The biggest problem is that distributed systems can have partitions.

Partitions occur when cluster members are separated from each other; when members falsely suspect each other of having crashed, for example because of a long GC cycle, an exhausted thread pool (so heartbeats are dropped), a flaky NIC, or a router dropping packets.

The example I'll use is a cluster {A,B,C} being partitioned into {A} and {B,C}. A suspects and removes members B and C, because it thinks they died, and members B and C remove A for the same reason. Note that B and C can still talk to each other.

 

Distributed locking in JGroups

How does the JGroups lock service [2] handle partitions? Well, it doesn't!

Let's assume A holds lock L before the partition. C wants to acquire L, but has to wait until A releases it, or crashes. Now partition {A} | {B,C} occurs. B becomes coordinator in {B,C} and C is able to acquire L. A remains coordinator in {A}.

This means that both A and C now hold lock L!

This is because the lock service implementation in JGroups favors availability over consistency (AP, see [1] for details).

This may be acceptable for some applications, but I suspect it's not ok for most. Typically, locks are used to make sure only 1 thread in the cluster accesses a shared resource, or performs an action that modifies some shared state. Unless these actions are idempotent, running them multiple times may wreak havoc.

Even worse, what happens when the partition heals? Now A and C are both holding lock L. The Lock interface mandates code like this:

mylock.lock();
try {
    // do some work
}
finally {
    mylock.unlock();
}
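
For context, mylock above would typically be obtained from JGroups' LockService. A minimal sketch, assuming a stack that contains a locking protocol such as CENTRAL_LOCK (the config file name below is a placeholder):

import java.util.concurrent.locks.Lock;
import org.jgroups.JChannel;
import org.jgroups.blocks.locking.LockService;

// Minimal sketch; the stack config must contain a locking protocol (e.g. CENTRAL_LOCK).
public class LockExample {
    public static void main(String[] args) throws Exception {
        try (JChannel ch = new JChannel("config-with-locking.xml")) {
            ch.connect("lock-demo");
            LockService lockService = new LockService(ch);
            Lock mylock = lockService.getLock("L");  // cluster-wide lock named "L"
            mylock.lock();
            try {
                // do some work on the shared resource
            }
            finally {
                mylock.unlock();
            }
        }
    }
}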

Because both A and C might be doing some work in the try-clause, the following issues arise:

  1. How do you stop one of the two (the member which is not supposed to hold the lock anymore)?
  2. Do you interrupt the thread? 
  3. What happens to the work that has already been done, i.e. state changes?

Point #1 shows that the try-lock-finally-unlock pattern is not a good one when it comes to taking locks away (forcefully) from a member. Actually, I'm not sure a good interface exists at all for forcefully removing locks from a member!

Point #3 asks what should be done with the work performed up to the point of lock removal. Should it be rolled back? Have other threads already seen the changes?

The try-lock-finally-unlock pattern does not guarantee ACIDity, so perhaps people who are using the lock service in this manner should replace it with distributed transactions, and roll back the transaction on lock removal (a.k.a transaction abort)?


Consensus to the rescue?

Perhaps we should prevent multiple members from being able to acquire the same lock in the first place, instead of taking away locks? How about using a consensus-based system like jgroups-raft [3]? This is a Raft [4] implementation built on top of JGroups.

Consensus means that a change to the system can only be made when the majority of the members agree. In the case of {A,B,C}, at least 2 members have to agree on a change, otherwise the change won't be applied ('committed' in Raft terms).

In terms of CAP [1], this means that consistency (CP) is favored over availability.

We now introduce a compare-and-swap command to acquire a lock, which acquires lock L only if it is null. When a majority of the members is able to commit the command, then all members will record the outcome either as successful or failed.

For example, if "L" is not set, A is able to acquire the lock by calling compareAndSwap("L", null, "A"). This means atomically set "L" to "A" if "L" is null.

When the partition {A} | {B,C} occurs, A still holds the lock but is not able to release it, because the compareAndSwap("L", "A", null) operation will fail as it doesn't get a majority.

In the other partition {B,C}, B may become leader, but nobody will be able to acquire the lock as the compare-and-swap operation will fail because "L" is not null, but set to "A".

This means that members which want to acquire L have to wait until the partition has healed and A releases the lock. Even worse: if A crashed, it would have to be restarted in order to release the lock! This is no better than using the DistributedLockManager (DLM) described at the top of this post!

We have to ask ourselves whether locks make sense in a consensus-based system. If we make changes to a shared state, then - instead of using locks - we should use the consensus-based system directly to make changes. After all, consensus will serialize state changes, which is what locks promise.

 

Conclusion

AP (JGroups lock service) and CP (jgroups-raft) lie at the opposite ends of the reliability spectrum, and applications have to choose between them.

It's a choice between the plague and cholera: with the lock service in JGroups and its AP properties, we can have multiple holders of the same lock.

With jgroups-raft and its CP properties, only 1 member holds a lock at any given time, but members trying to acquire a lock may potentially be blocked for a long time.


[1] https://en.wikipedia.org/wiki/CAP_theorem

[2] http://www.jgroups.org/manual5/index.html#LockService

[3] http://belaban.github.io/jgroups-raft/

[4] https://raft.github.io/


Friday, September 04, 2020

One size fits all JGroups?

We're getting one step closer to having just a single JGroups program that runs in any environment! There are 3 things that need to be done to make this possible:

  1. Multiple discovery protocols: this allows for multiple discovery protocols to be present in the same configuration. For example, DNS_PING or KUBE_PING to run in Kubernetes environments, MPING when IP multicasting is available, TCPPING for a static list of members etc. DONE: [1]
  2. Multiple transports: this can run a UDP and TCP transport side by side. If IP multicasting is not available, we can fall back to TCP. Or, even if multicasting is available, use TCP for one-to-one messages and UDP for one-to-all messages. NOT DONE yet: [2]
  3. Use GraalVM to compile this down to a native executable. This could be shipped in a Docker image, so it could be run anywhere Docker/Kubernetes is available. NOT DONE yet.

Step #3 is optional, but would help for quick startup times.

Step #2 is not really needed if we know that all environments run in a cloud where IP multicasting is not supported, so we can ship configs with TCP as transport. But if we know that some customers deploy locally, where IP multicasting is available, and others in environments where multicasting is disabled, or in clouds, then multiple transports will be helpful, as we can ship and support a single configuration.

Step #1 is probably the most important one: there are ~13-15 discovery protocols available today, reflecting the wide range of different environments. Being able to ship a config that includes multiple discovery protocols allows us to support a single configuration for many different customers.

In the future, we could think of code that looks at unused/inactive discovery protocols, or even transports, and removes them after some time. Kind of like just-in-time (JIT) optimizations in the JVM...

Feature [1] will be in 5.1. If you want to try this out today, head over to Github [3], clone the JGroups repo and generate your own JAR.

Cheers,


[1] https://issues.redhat.com/browse/JGRP-2230

[2] https://issues.redhat.com/browse/JGRP-1424

[3] https://github.com/belaban/JGroups

 


Thursday, August 06, 2020

JGroups 5.0.0.Final released

I'm happy to announce that JGroups 5.0.0.Final has been released!

The new features are described in [1]. Below's a list of the major JIRAs:
  • https://issues.redhat.com/browse/JGRP-2218: this is the most important change in 5.0.0: it changes Message into an interface and allows for different implementations of Message
  • https://issues.redhat.com/browse/JGRP-2450: support for virtual threads (fibers). If the JDK (probably 16 and higher) supports virtual threads, then they can be enabled by setting use_fibers to true in the transport. This will effectively bypass the thread pool(s) and use virtual threads instead. See [2] for details.
  • https://issues.redhat.com/browse/JGRP-2451: FD_ALL3 is a more efficient failure detection protocol; it counts messages received from P as heartbeats, and P suppresses heartbeats when it is sending messages. This should reduce traffic on the network
  • https://issues.redhat.com/browse/JGRP-2462: implementation of the Random Early Drop (RED) protocol, which starts dropping messages on the send side when the queue becomes full. This prevents message storms (caused by unneeded retransmission requests when messages are not received) and/or blocking
  • https://issues.redhat.com/browse/JGRP-2402: new protocol SOS, which captures vital stats and dumps them to a file periodically
  • https://issues.redhat.com/browse/JGRP-2401: versioned configuration. Stacks won't start if the versions of JGroups and the configuration differ (micro versions are ignored). This prevents the use of old/outdated configurations with a newer JGroups release
  • https://issues.redhat.com/browse/JGRP-2476: more efficient marshalling of classes. Reduces size of RPCs in RpcDispatcher
The documentation can be found at [3].
Enjoy!




Friday, July 17, 2020

Double your performance: virtual threads (fibers) and JDK 15/16!

If you use UDP as transport and want to double your performance: read on!

If you use TCP, your performance won't change much. You might still be interested in what more recent JDKs and virtual threads (used to be called 'fibers') will bring to the table.

Virtual threads


Virtual threads are lightweight threads, similar in concept to the old Green Threads, and are managed by the JVM rather than the kernel. Many virtual threads can map to the same OS native (carrier) thread (only one at a time, of course), so we can have millions of virtual threads.

Virtual threads are implemented with continuations, but that's a detail. What's important is that all blocking calls in the JDK (LockSupport.park() etc) have been modified to yield rather than block. This means that we don't waste the precious native carrier thread, but simply go to a non-RUNNING state. When the block is over, the thread is simply marked as RUNNABLE again and the scheduler continues the continuation where it left off.

Main advantages:
  • Blocking calls don't need to be changed, e.g. into reactive calls
  • No need for thread pools: simply create a virtual thread
  • Fewer context switches (reduced/eliminated blocking calls)
  • We can have lots of virtual threads
It will be a while until virtual threads show up in your JDK, but JGroups has already added support for them: just set use_fibers="true" in the transport. If the JVM supports virtual threads, they will be used; otherwise we fall back to regular native threads.
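
For illustration, creating a virtual thread directly looks like this on a Loom-enabled JDK (the API below is the one that was eventually finalized in Java 21; early Loom builds used slightly different factory methods):

// Requires a JDK with virtual threads (Java 21, or a Loom build with a similar API).
public class VirtualThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread vt = Thread.ofVirtual().name("handler-1").start(() -> {
            // blocking calls here park the virtual thread, not the carrier thread
            System.out.println("running on " + Thread.currentThread());
        });
        vt.join();
    }
}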


UDP: networking improvements 

While virtual threads bring advantages to JGroups, the other performance increase can be had by trying a more recent JDK.

Starting in JDK 15, the implementation of DatagramSockets and MulticastSockets has been changed to delegate to DatagramChannels and MulticastChannels. In addition, virtual threads are supported.

This increases the performance of UDP which uses DatagramChannels and MulticastChannels.

The combination of networking code improvements and virtual threads leads to astonishing results for UDP, read below.

Performance

I used UPerf for testing on a cluster of 8 (physical) boxes (1 GBit ethernet), with JDKs 11, 16-ea5 and 16-loom+2-14. The first two use native threads, the latter uses virtual threads.

As can be seen in [1], UDP's performance goes from 44'691 on JDK 11 to 81'402 on JDK 16-ea5; that's a whopping 82% increase! Enabling virtual threads increases the performance between 16-ea5 and 16-loom+2-14 to 88'252, that's another 8%!

The performance difference between JDK 11 and 16-loom is 97%!

The difference in TCP's performance is minuscule; I guess because the TCP code was already optimized in JDK 11.

Running in JDK 16-loom+2-14 shows that UDP's performance is now on par with TCP, as a matter of fact, UDP is even 3% faster than TCP!

If you want to try for yourself: head over to the JGroups Github repo and create the JAR (ant jar). Or wait a bit: I will soon release 5.0.0.Final which contains the changes.

Not sure if I want to backport the changes to the 4.x branch...

Enjoy!

[1] https://drive.google.com/file/d/1Ars1LOM7cEf6AWpPwZHeIfu_kKLa9gv0/view?usp=sharing

[2] http://openjdk.java.net/jeps/373

Tuesday, June 30, 2020

New Netty transport

I'm happy to announce that Baizel Mathew and Steven Wong have written a new transport protocol using Netty!

Read Baizel's announcement here: [1]; for the code look here: [2].

I anticipate that the Netty transport will replace TCP_NIO2 over time.

The maven coordinates are:

<dependency>
  <groupId>org.jgroups</groupId>
  <artifactId>jgroups-netty</artifactId>
  <version>1.0.0.Alpha2</version>
</dependency>


Thanks, Baizel and Steven, for your contribution!


[1] https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/jgroups-dev/R3yxmfhcqMk/ugked7zaAgAJ
[2] https://github.com/jgroups-extras/jgroups-netty

Monday, April 20, 2020

Hybrid clouds with JGroups and Skupper

This is a follow-up post on [1], which showed how to connect two Kubernetes-based hybrid clouds (Google GKE and AWS EKS) with JGroups' TUNNEL and GossipRouter.

Meanwhile, I've discovered Skupper, which (1) simplifies this task and (as a bonus) (2) encrypts the data exchanged between different clouds.

In this post, I'm going to provide step-by-step instructions on how to connect a Google Kubernetes Engine (GKE) cluster with a cluster running on my local box.

To run the demo yourself, you must have Skupper installed and a GKE account. However, any other cloud provider works, too.

For the local cloud, I'm using docker-desktop. Alternatively, minikube could be used.

So let's get cracking, and start the GKE cluster. To avoid having to switch contexts with kubectl all the time, I suggest starting 2 separate shells and setting KUBECONFIG for the public (GKE) cloud to a copy of the config:

Shell 1 (GKE): cp .kube/config .kube/gke; export KUBECONFIG=$HOME/.kube/gke

Now start a GKE cluster (in shell 1):
gcloud container clusters create gke  --num-nodes 4

NOTE: if you use a different cloud, simply start your cluster and set kubectl's context to point to your cluster. The rest of the instructions below apply regardless of the specific cloud.

Creating the cluster also sets the Kubernetes context; verify it in shell 1:
kubectl config current-context
gke_ispnperftest_us-central1-a_gke


In shell 2, confirm that the context is local:
kubectl config current-context
docker-desktop

              

This shows Kubernetes is pointing to docker-desktop.

Let's now start a GossipRouter in both clouds. To do this, we have to modify the YAML used in [1] slightly:
curl https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/gossiprouter.yaml > gossiprouter.yaml

Now comment lines 42-43:
spec:
#  type: LoadBalancer
#  externalTrafficPolicy: Local


This is needed by Skupper which requires a service to be exposed as a ClusterIP and not a LoadBalancer.

Now deploy it in both shells:
kubectl apply -f gossiprouter.yaml
deployment.apps/gossiprouter created
service/gossiprouter created


Now it is time to initialize Skupper in both shells:
skupper init
Waiting for LoadBalancer IP or hostname...
Skupper is now installed in namespace 'default'.  Use 'skupper status' to get more information.


This installs some pods and services/proxies:
kubectl get po,svc
NAME                                           READY   STATUS    RESTARTS   AGE
pod/gossiprouter-6d6dcd6d79-q9p2f              1/1     Running   0          4m6s
pod/skupper-proxy-controller-dcf99c6bf-whns4   1/1     Running   0          86s
pod/skupper-router-7976948d9f-b58wn            1/1     Running   0          2m50s

NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP      PORT(S)                           AGE
service/gossiprouter         ClusterIP      10.27.252.196   <none>           8787/TCP,9000/TCP,12001/TCP       4m6s
service/kubernetes           ClusterIP      10.27.240.1     <none>           443/TCP                           27m
service/skupper-controller   LoadBalancer   10.27.241.112   35.223.80.171    8080:30508/TCP                    2m49s
service/skupper-internal     LoadBalancer   10.27.243.17    35.192.126.100   55671:30671/TCP,45671:31522/TCP   2m48s
service/skupper-messaging    ClusterIP      10.27.247.95    <none>           5671/TCP                          2m49s


Next, we create a connection token in one of the clouds. This creates a file containing a certificate and keys that allows a Skupper instance in one cluster to connect to a Skupper instance in another cluster.

Note that this file must be kept secret as it contains the private keys of the (server) Skupper instance!

We only need to connect from one cloud to the other; Skupper will automatically create a bi-directional connection.

Let's pick the public cloud (shell 1):
skupper connection-token gke.secret
Connection token written to gke.secret
 

We now need to copy this file to the other (local) cloud. In my example, I'm using the home directory, but in real life, this would have to be done securely.

The local Skupper instance now uses this file to connect to the Skupper instance in the public cluster and establish an encrypted VPN tunnel:
skupper connect gke.secret
Skupper is now configured to connect to 35.192.126.100:55671 (name=conn1)


Now, we have to expose the GossipRouter service in each cloud to Skupper, so Skupper can create a local proxy of the service that transparently connects to the other cloud, via  a symbolic name:
Shell 1:
skupper expose deployment gossiprouter --port 12001 --address gossiprouter-1
Shell 2:
skupper expose deployment gossiprouter --port 12001 --address gossiprouter-2

The symbolic names gossiprouter-1 and gossiprouter-2 are now available to any pod in both clusters.
Traffic sent from the local cluster to gossiprouter-1 in the public cluster is transparently forwarded (and encrypted) by Skupper between the sites!

This means we can set TUNNEL_INITIAL_HOSTS (as used in the bridge cluster) to
gossiprouter-1[12001],gossiprouter-2[12001].

This is used in bridge.xml:
<TUNNEL bind_addr="match-interface:eth0,site-local"
        gossip_router_hosts="${TUNNEL_INITIAL_HOSTS:127.0.0.1[12001]}"
...


Let's now run RelayDemo in the public and local clusters. This is the same procedure as in [1].

Shell 1: 
curl https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/nyc.yaml > public.yaml

Shell 2:
curl https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/sfc.yaml > local.yaml

In both YAML files, change the number of replicas to 3 and the value of TUNNEL_INITIAL_HOSTS to "gossiprouter-1[12001],gossiprouter-2[12001]".

Then start 3 pods in the public (NYC) and local (SFC) clusters:
Shell 1:
kubectl apply -f public.yaml
deployment.apps/nyc created
service/nyc created


Shell 2:
kubectl apply -f local.yaml
deployment.apps/sfc created
service/sfc created


Verify that there are 3 pods running in each cluster.

Let's now run RelayDemo on the local cluster:
Shell 2: 
> kubectl get pods |grep sfc-
sfc-7f448b7c94-6pb9m         1/1     Running   0          2m44s
sfc-7f448b7c94-d7zkp         1/1     Running   0          2m44s
sfc-7f448b7c94-ddrhs         1/1     Running   0          2m44s


> kubectl exec -it sfc-7f448b7c94-6pb9m bash
bash-4.4$ relay.sh -props sfc.xml -name Local

-------------------------------------------------------------------
GMS: address=Local, cluster=RelayDemo, physical address=10.1.0.88:7801
-------------------------------------------------------------------
View: [sfc-7f448b7c94-6pb9m-4056|3]: sfc-7f448b7c94-6pb9m-4056, sfc-7f448b7c94-ddrhs-52643, sfc-7f448b7c94-d7zkp-11827, Local
: hello
: << hello from Local
<< response from sfc-7f448b7c94-6pb9m-4056
<< response from sfc-7f448b7c94-ddrhs-52643
<< response from sfc-7f448b7c94-d7zkp-11827
<< response from Local
<< response from nyc-6b4846f777-g2gqk-7743:nyc
<< response from nyc-6b4846f777-7jm9s-23105:nyc
<< response from nyc-6b4846f777-q2wrl-38225:nyc



We're first listing all pods, then exec into one of them.

Next, we run RelayDemo and send a message to all members of the local and remote clusters. We get a response from self (Local) and from the other 3 members of the local (SFC) cluster, and we also get responses from the 3 members of the remote public cluster (NYC).

JGroups load-balances messages across the two GossipRouters. Whenever the chosen router is remote, Skupper forwards the traffic transparently over its VPN tunnel to the other site.


[1] http://belaban.blogspot.com/2019/12/spanning-jgroups-kubernetes-based.html
[2] https://skupper.io/


Tuesday, January 28, 2020

First alpha of JGroups 5.0

Howdy folks!

Today I'm very happy to announce the first alpha version of JGroups 5.0!

JGroups 5.0 has major API changes and I'd like people to try it out and give feedback before we release final.

Note that there might still be more API changes before the first beta.

So what's new in 5?

The biggest change is that Message is now an interface [1] and we have a number of message classes implementing it, e.g.:
  • BytesMessage: this is the replacement for the old 4.x Message class, having a byte array as payload.
  • ObjectMessage: accepts an object as payload.
  • NioMessage: has an NIO ByteBuffer as payload.
  • EmptyMessage: this class has *no* payload at all! Useful when sending around messages that have only headers, e.g. heartbeats. Used mainly by JGroups internally. This class has a smaller memory footprint.
  • CompositeMessage: message type which carries other messages
The advantage of different message types is that, rather than having to marshal payloads into a byte array as in 4.x Messages, the payload is now added to the message without marshalling. Marshalling is done only just before sending the message on the network.

This late marshalling saves one memory allocation.

The other advantage is that applications can register their own messages types. This means that we can control how a message is created, e.g. using off-heap memory rather than heap memory.

Other changes include:
  • I've removed a lot of deprecated cruft, e.g. several AuthToken implementations, SASL, S3_PING and GOOGLE_PING (they have better replacements).
  • Java 11 is now the baseline. The current Alpha1 still runs under Java 8, but I expect this to change, perhaps only with 5.1. But at least, I reserve the right to use Java 11 specific language features, so be warned :-)
The full list of changes in 5.0 is here: [2].

I still have a few JIRAs to resolve before releasing 5.0.0.Final, and then I'll add new functionality (without API changes) in a bunch of minor releases. I've planned 5.1 - 5.3 so far.

The documentation is here: [3].

For feedback please use the mailing list [4].

Enjoy!



[1] http://www.jgroups.org/manual5/index.html#Message
[2] https://issues.redhat.com/projects/JGRP/versions/12334686
[3] http://www.jgroups.org/manual5/index.html
[4] http://groups.google.com/forum/#!forum/jgroups-dev

Tuesday, December 31, 2019

Spanning JGroups Kubernetes-based clusters across Google and Amazon clouds

In this (long!) post, I'll provide step-by-step instructions on how to create JGroups clusters in Google Kubernetes Engine (GKE) and Amazon (EKS) clusters, and connect them into one virtual cluster using RELAY2.

Each local cluster is called a site. In this tutorial, we'll call the sites NYC and SFC. We'll start 5 nodes in NYC and 3 in SFC.

The sample deployments and services are defined in YAML and we're using Kubernetes to create the clusters.

To try this yourself, you'll need kubectl, eksctl and gcloud installed, and accounts on both EKS and GKE.

The demo is RelayDemo [1]. It is a simple chat, started in a pod: every typed line appears in all pods across all sites, and every pod sends a response back to the sender, which displays all responses. This way, we know who received our chat message.


Architecture

The setup of this tutorial is as follows:



On the left, we have nodes A,B,C,D,E in site NYC (Amazon EKS) and on the right, X,Y,Z in SFC (Google GKE).

A in NYC and X in SFC assume the role of site master (see [2]). This means they join a separate JGroups cluster, called the bridge cluster, which connects the two sites, and they relay messages between the sites.

A site master is not a dedicated node, but any node can assume the role of site master. For example, when A leaves or crashes, B will take over the site master role, join the bridge cluster and relay messages between sites NYC and SFC.

The problem with Kubernetes / OpenShift is that a pod cannot directly connect to a pod in a different cluster, region, or cloud provider, at least not without resorting to specific container network interface (CNI) implementations.

To overcome this problem, the above setup uses a GossipRouter and TUNNEL [3]: this way, A and X can communicate across different regions or (in this case) even different cloud providers.

The way this is done is simple: the configuration of the bridge cluster includes TUNNEL as transport and a list of GossipRouters, in this case the ones in NYC and SFC (more details later).

A and X connect to both GossipRouters via TCP, under their respective cluster names. So A connects to GR-NYC and GR-SFC and X connects to its local GR, and the remote one in NYC.

When A wants to send a message to X, it can use either its local GossipRouter, or the one in SFC (by default, JGroups load-balances requests between the GossipRouters). In any case, the ingress TCP connection established by X to a GossipRouter is used to send egress traffic to X.

This means, we can send messages to any member of the bridge cluster, as long as all GossipRouters are publicly accessible and the members of the bridge cluster can connect to them.

But now let's get cracking! We'll do the following in the next sections:
  • Set up an EKS cluster (NYC)
  • Set up a GKE cluster (SFC)
  • Deploy a GossipRouter service in both sites
  • Deploy 5 pods in NYC and 3 pods in SFC
  • Use one of the pods in each site to talk to the other site with RelayDemo

Set up the NYC cluster in EKS

This can be done via the GUI, the AWS CLI or eksctl [4]. For simplicity, I chose the latter.
To create a cluster "nyc" in the us-east-1 region, execute:

eksctl create cluster --name nyc --region us-east-1 --nodegroup-name nyc-nodegroup --node-type t3.small --nodes 2 --nodes-min 1 --nodes-max 4 --managed

This will take 10-15 minutes.

The local kubeconfig should now point to the AWS cluster. This can be seen with kubectl config get-contexts. If this is not the case, use the AWS CLI to change this, e.g.:

aws eks --region us-east-1 update-kubeconfig --name nyc

This makes kubectl access the NYC cluster by default.

Let's now deploy the GossipRouter in NYC:

kubectl apply -f https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/gossiprouter.yaml

The YAML file contains a deployment of the GossipRouter and a LoadBalancer service: [5]. The public address of the GossipRouter service can be seen as follows:

kubectl get svc gossiprouter
NAME           TYPE           CLUSTER-IP     EXTERNAL-IP                                                                     PORT(S)                                         AGE
gossiprouter   LoadBalancer   10.100.28.38   a6abc71e42b2211ea9c3716e7fa74966-862f92ba6a28fd36.elb.us-east-1.amazonaws.com   8787:31598/TCP,9000:30369/TCP,12001:31936/TCP   2m56s


We can see that the public address is a6abc71e42b2211ea9c3716e7fa74966-862f92ba6a28fd36.elb.us-east-1.amazonaws.com. Write this down somewhere, as we'll need to add it to our TUNNEL configuration later.


Set up SFC cluster in GKE

To create a cluster on GKE, execute:

gcloud container clusters create sfc  --num-nodes 2

This will create a cluster in the default region configured in gcloud.

Note that this added a new context to the kube config, and switched to it. If not, manually switch to it, e.g.

kubectl config use-context gke_ispnperftest_us-central1-a_sfc

Now deploy the GossipRouter in SFC (same as above, for NYC):
kubectl apply -f https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/gossiprouter.yaml

Now get the public IP address of the GossipRouter:
kubectl get svc gossiprouter
NAME           TYPE           CLUSTER-IP      EXTERNAL-IP     PORT(S)                                         AGE
gossiprouter   LoadBalancer   10.19.247.254   35.232.92.116   8787:30150/TCP,9000:32534/TCP,12001:32455/TCP   101s


The public IP is 35.232.92.116. Take a note of this, as we'll need it later.
We're now ready to deploy the cluster nodes in NYC and SFC.

Deploy the pods in NYC

We'll deploy 5 pods in NYC. To do this, we first need to switch the context back to NYC, e.g. by executing
kubectl config use-context jgroups@nyc.us-east-1.eksctl.io

Next, download the 2 YAML files for NYC and SFC locally (we need to make changes):
mkdir tmp ; cd tmp
curl https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/nyc.yaml > nyc.yaml
curl https://raw.githubusercontent.com/belaban/jgroups-docker/master/yaml/sfc.yaml > sfc.yaml

Now edit both YAML files and replace the TUNNEL_INITIAL_HOSTS system variable "load-balancer-1[12001],load-balancer-2[12001]" with
"a6abc71e42b2211ea9c3716e7fa74966-862f92ba6a28fd36.elb.us-east-1.amazonaws.com[12001],35.232.92.116[12001]".

This points the TUNNEL protocol to the two publicly accessible GossipRouters in NYC and SFC:

<TUNNEL port_range="${PORT_RANGE:0}"
        gossip_router_hosts="${TUNNEL_INITIAL_HOSTS:127.0.0.1[12001]}"/>

This means that TUNNEL will establish 2 TCP connections, one to the GossipRouter in NYC and the other one to the GossipRouter in SFC.

Now deploy the NYC pods:
> kubectl apply -f tmp/nyc.yaml
deployment.apps/nyc created
service/nyc created


This shows that 1 pod has been created:
> kubectl get pods -o wide
NAME                           READY   STATUS    RESTARTS   AGE   IP              NODE                             NOMINATED NODE   READINESS GATES
gossiprouter-f65bb6858-jks8q   1/1     Running   0          25m   192.168.36.19   ip-192-168-38-111.ec2.internal   <none>           <none>
nyc-5f4964d444-9v5dm           1/1     Running   0          73s   192.168.26.87   ip-192-168-8-51.ec2.internal     <none>           <none>


Next, scale this to 5:
> kubectl scale --replicas=5 deployment nyc
deployment.extensions/nyc scaled


Listing the pods shows 5 'nyc' pods:
> kubectl get pods
NAME                           READY   STATUS    RESTARTS   AGE
gossiprouter-f65bb6858-jks8q   1/1     Running   0          27m
nyc-5f4964d444-2ttfp           1/1     Running   0          49s
nyc-5f4964d444-4lccs           1/1     Running   0          49s
nyc-5f4964d444-8622d           1/1     Running   0          49s
nyc-5f4964d444-9v5dm           1/1     Running   0          3m21s
nyc-5f4964d444-tm5h5           1/1     Running   0          49s


Let's exec into one of them and make sure that the local cluster formed:
> kubectl exec nyc-5f4964d444-2ttfp probe.sh
#1 (307 bytes):
local_addr=nyc-5f4964d444-2ttfp-24388
physical_addr=192.168.53.43:7800
view=[nyc-5f4964d444-9v5dm-21647|4] (5) [nyc-5f4964d444-9v5dm-21647, nyc-5f4964d444-tm5h5-64872, nyc-5f4964d444-2ttfp-24388, nyc-5f4964d444-8622d-63103, nyc-5f4964d444-4lccs-4487]
cluster=RelayDemo
version=4.1.9-SNAPSHOT (Mont Ventoux)

1 responses (1 matches, 0 non matches)


This shows a view of 5, so the 5 pods did indeed form a cluster.

Deploy the pods in SFC

Let's now switch the kubeconfig back to SFC (see above) and deploy the SFC cluster:
> kubectl apply -f tmp/sfc.yaml
deployment.apps/sfc created
service/sfc created

>  kubectl scale --replicas=3 deployment/sfc
deployment.extensions/sfc scaled

> kubectl get pods
NAME                            READY   STATUS    RESTARTS   AGE
gossiprouter-6cfdc58df5-7jph4   1/1     Running   0          21m
sfc-5d6774b647-25tk5            1/1     Running   0          50s
sfc-5d6774b647-sgxsk            1/1     Running   0          50s
sfc-5d6774b647-sjt9k            1/1     Running   0          88s



This shows that we have 3 pods in SFC running.

Run the demo

So, now we can run RelayDemo to see if the virtual cluster across the two clouds is working correctly. To do this, we run a bash in one of the pods:
> kubectl get pods
NAME                            READY   STATUS    RESTARTS   AGE
gossiprouter-6cfdc58df5-7jph4   1/1     Running   0          28m
sfc-5d6774b647-25tk5            1/1     Running   0          7m50s
sfc-5d6774b647-sgxsk            1/1     Running   0          7m50s
sfc-5d6774b647-sjt9k            1/1     Running   0          8m28s
> kubectl exec -it sfc-5d6774b647-sgxsk bash

bash-4.4$ 

The RelayDemo can be started with relay.sh:
relay.sh -props sfc.xml -name Temp

-------------------------------------------------------------------
GMS: address=Temp, cluster=RelayDemo, physical address=10.16.1.6:7801
-------------------------------------------------------------------
View: [sfc-5d6774b647-sjt9k-37487|9]: sfc-5d6774b647-sjt9k-37487, sfc-5d6774b647-sgxsk-6308, sfc-5d6774b647-25tk5-47315, Temp


We can see that our cluster member named 'Temp' has joined the cluster.

When we send a message, we can see that all 3 members of the (local) SFC cluster and the 5 members of the (remote) NYC cluster are replying (we're also getting a reply from self):
hello
: << response from sfc-5d6774b647-sgxsk-6308
<< response from sfc-5d6774b647-sjt9k-37487
<< response from sfc-5d6774b647-25tk5-47315
<< hello from Temp
<< response from Temp
<< response from nyc-5f4964d444-9v5dm-21647:nyc
<< response from nyc-5f4964d444-2ttfp-24388:nyc
<< response from nyc-5f4964d444-tm5h5-64872:nyc
<< response from nyc-5f4964d444-8622d-63103:nyc
<< response from nyc-5f4964d444-4lccs-4487:nyc


The topology can be shown by typing 'topo' ('help' lists more commands):
: topo

nyc
  nyc-5f4964d444-9v5dm-21647 (192.168.26.87:7800) (me) // site master
  nyc-5f4964d444-tm5h5-64872 (192.168.30.27:7800)
  nyc-5f4964d444-2ttfp-24388 (192.168.53.43:7800)
  nyc-5f4964d444-8622d-63103 (192.168.62.83:7800)
  nyc-5f4964d444-4lccs-4487 (192.168.40.102:7800)

sfc
  sfc-5d6774b647-sjt9k-37487 (10.16.1.5:7800) (me) // site master
  sfc-5d6774b647-sgxsk-6308 (10.16.1.6:7800)
  sfc-5d6774b647-25tk5-47315 (10.16.0.10:7800)
  Temp (10.16.1.6:7801)


This shows the members of both sites, plus their (internal) IP addresses and who the site masters are.

Dump the contents of the GossipRouters

This can be done via a utility program shipped with JGroups:
> java -cp jgroups.jar org.jgroups.tests.RouterStubGet -host 35.232.92.116 -cluster bridge
1: null:nyc, name=_nyc-5f4964d444-9v5dm-21647, addr=192.168.26.87:45275, server
2: null:sfc, name=_sfc-5d6774b647-sjt9k-37487, addr=10.16.1.5:42812, server


This shows the members of the bridge cluster, which registered with both GossipRouters.

Alternatively, the other GossipRouter can be used; it should list the same members.

Add firewall/ingress rules to make the GossipRouter publicly available

If the GossipRouters cannot be accessed by the above command, then a firewall/ingress rule has to be added to allow ingress traffic to port 12001.

Cross-site replication

The RelayDemo sample application is very basic and not very useful by itself, but the setup can be used for other types of applications, e.g. replication between data centers.

If we have in-memory data in NYC, and use SFC as a backup for NYC (and vice versa), then a total loss of the NYC cluster will not lose all the data, but clients can be failed over to SFC and will continue to work with the data.

This can be done for example by Red Hat Data Grid [6] and cross-site replication; as a matter of fact, all that needs to be done is to change the configuration, as explained in this post!

As usual, send questions and feedback to the JGroups mailing list.

Enjoy!



[1] https://github.com/belaban/JGroups/blob/master/src/org/jgroups/demos/RelayDemo.java
[2] http://www.jgroups.org/manual4/index.html#Relay2Advanced
[3] http://www.jgroups.org/manual4/index.html#TUNNEL_Advanced
[4] https://docs.aws.amazon.com/eks/latest/userguide/create-cluster.html
[5] https://github.com/belaban/jgroups-docker/blob/master/yaml/gossiprouter.yaml
[6] https://access.redhat.com/documentation/en-us/red_hat_data_grid/7.3/html/red_hat_data_grid_user_guide/x_site_replication


Wednesday, July 03, 2019

Compiling JGroups to native code with Quarkus/GraalVM

I'm happy to announce the availability of a JGroups extension for Quarkus!


What?


Quarkus is a framework that (among other things) compiles Java code down to native code (using GraalVM [4]), removing code that's not needed at run time.

Quarkus analyzes the code in a build phase, and removes code that's not used at run time, in order to have a small executable that starts up quickly.

This means that reflection cannot be used at run time, as all classes that are not used are removed at build time. However, reflection can be used at build time.

The other limitations that affect JGroups are threads and the creation of sockets. Both cannot be done at build time, but have to be done at run time. (More limitations of JGroups under Quarkus are detailed in [5]).

So what's the point of providing a JGroups extension for Quarkus?

While a JGroups application can be compiled directly to native code (using GraalVM's native-image), it is cumbersome, and the application has to be restructured (see [6] for an example) to accommodate the limitations of native compilation.

In contrast, the JGroups extension provides a JChannel that can be injected into the application. The channel has been created according to a configuration file and connected (= joined the cluster) by the extension. The extension takes care of doing the reflection, the socket creation and the starting of threads at the right time (build- or run-time), and the user doesn't need to worry about this.

How?

So let's take a look at a sample application (available at [2]).

The POM includes the extension: groupId=org.jgroups.quarkus.extension and artifactId=quarkus-jgroups. This provides a JChannel that can be injected. The main class is ChatResource:

@ApplicationScoped 
@Path("/chat")
public class ChatResource extends ReceiverAdapter implements Publisher<String> {
    protected final Set<Subscriber<? super String>> subscribers=new HashSet<>();

    @Inject JChannel channel;

    protected void init(@Observes StartupEvent evt) throws Exception {
        channel.setReceiver(this);
        System.out.printf("-- view: %s\n", channel.getView());
    }

    protected void destroy(@Observes ShutdownEvent evt) {
        Util.close(channel);
        subscribers.forEach(Subscriber::onComplete);
        subscribers.clear();
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/send/{msg}")
    public String sendMessage(@PathParam("msg") String msg) throws Exception {
        channel.send(null, Objects.requireNonNull(msg).getBytes());
        return String.format("message \"%s\" was sent on channel \n", msg);
    }

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @Path("/subscribe")
    public Publisher<String> greeting() {
        return this;
    }

    public void receive(Message msg) {
        onNext(msg);
    }

    public void receive(MessageBatch batch) {
        for(Message msg: batch)
            onNext(msg);
    }

    public void viewAccepted(View view) {
        System.out.printf("-- new view: %s\n", view);
    }

    public void subscribe(Subscriber<? super String> s) {
        if(s != null)
            subscribers.add(s);
    }

    protected void onNext(Message msg) {
        String s=new String(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
        System.out.printf("-- from %s: %s\n", msg.src(), s);
        subscribers.forEach(sub -> sub.onNext(s));
    }
}
 
It has a JChannel channel which is injected by Arc (the dependency injection mechanism used in Quarkus). This channel is fully created and connected when it is injected.

The receive(Message) and receive(MessageBatch) methods receive messages sent by itself or other members in the cluster. It in turn publishes them via the Publisher interface. All subscribers will therefore receive all messages sent in the cluster.

The sendMessage() method is invoked when a URL of the form http://localhost:8080/chat/send/mymessage is received. It takes the string parameter ("mymessage") and uses the injected channel to send it to all members of the cluster.

The URL http://localhost:8080/chat/subscribe (or http://localhost:8080/streaming.html in a web browser) can be used to subscribe to messages being received by the channel.

Demo

Let's now run a cluster of 2 instances: open 2 shells and type the following commands (output has been edited for brevity):

Shell1:
[belasmac] /Users/bela/quarkus-jgroups-chat$ mvn compile quarkus:dev
...
[INFO] --- quarkus-maven-plugin:0.18.0:dev (default-cli) @ quarkus-jgroups-chat ---
2019-07-03 14:12:05,025 DEBUG [org.jgr.qua.ext.JChannelTemplate] (main) creating channel based on config config=chat-tcp.xml, bind_addr=, initial_hosts=, cluster=quarkus-jgroups-chat

 
-------------------------------------------------------------------
GMS: address=belasmac-19612, cluster=quarkus-jgroups-chat, physical address=127.0.0.1:7800
-------------------------------------------------------------------
-- view: [belasmac-19612|0] (1) [belasmac-19612]


Shell2:
[belasmac] /Users/bela/quarkus-jgroups-chat$ mvn compile quarkus:dev -Dquarkus.http.port=8081
...
[INFO] --- quarkus-maven-plugin:0.18.0:dev (default-cli) @ quarkus-jgroups-chat ---
2019-07-03 14:15:02,463 DEBUG [org.jgr.qua.ext.JChannelTemplate] (main) creating channel based on config config=chat-tcp.xml, bind_addr=, initial_hosts=, cluster=quarkus-jgroups-chat

-------------------------------------------------------------------
GMS: address=belasmac-25898, cluster=quarkus-jgroups-chat, physical address=127.0.0.1:7801
-------------------------------------------------------------------
-- view: [belasmac-19612|1] (2) [belasmac-19612, belasmac-25898]

The system property quarkus.http.port=8081 is needed, or else there would be a port collision, as the default port of 8080 has already been taken by the first application (both apps are started on the same host).

The output shows that there's a cluster of 2 members.

We can now post a message by invoking curl http://localhost:8080/chat/send/hello%20world and curl http://localhost:8081/chat/send/message2.

Both shells show that they received both messages:
-- view: [belasmac-19612|1] (2) [belasmac-19612, belasmac-25898]
-- from belasmac-19612: hello world
-- from belasmac-25898: message2


Of course, we could also use a web browser to send the HTTP GET requests.

When subscribing to the stream of messages in a web browser (http://localhost:8081/streaming.html), this would look as follows:
 


Note that both channels bind to the loopback (127.0.0.1) address. This can be changed by changing bind_addr and initial_hosts in application.properties:

quarkus.channel.config=chat-tcp.xml

quarkus.channel.cluster=quarkus-jgroups-chat

# quarkus.channel.bind_addr=192.168.1.105

# quarkus.channel.initial_hosts=192.168.1.105[7800]

Uncomment the 2 properties and set them accordingly.

Alternatively, we can pass these as system properties, e.g.:

[belasmac] /Users/bela/quarkus-jgroups-chat$ mvn compile quarkus:dev -Dbind_addr=192.168.1.105 -Dinitial_hosts=192.168.1.105[7800],192.168.1.105[7801]
...
[INFO] --- quarkus-maven-plugin:0.18.0:dev (default-cli) @ quarkus-jgroups-chat ---
2019-07-03 14:38:28,258 DEBUG [org.jgr.qua.ext.JChannelTemplate] (main) creating channel based on config config=chat-tcp.xml, bind_addr=, initial_hosts=, cluster=quarkus-jgroups-chat

-------------------------------------------------------------------
GMS: address=belasmac-10738, cluster=quarkus-jgroups-chat, physical address=192.168.1.105:7800
-------------------------------------------------------------------
-- view: [belasmac-10738|0] (1) [belasmac-10738]


Native compilation

To compile the application to native code, the mvn package -Pnative command has to be executed:

[belasmac] /Users/bela/quarkus-jgroups-chat$ mvn package -Pnative
[INFO] Building jar: /Users/bela/quarkus-jgroups-chat/target/quarkus-jgroups-chat-1.0.0-SNAPSHOT.jar
[INFO]
[INFO] --- quarkus-maven-plugin:0.18.0:build (default) @ quarkus-jgroups-chat ---
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Beginning quarkus augmentation
[INFO] [org.jboss.threads] JBoss Threads version 3.0.0.Beta4
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 1343ms
[INFO] [io.quarkus.creator.phase.runnerjar.RunnerJarPhase] Building jar: /Users/bela/quarkus-jgroups-chat/target/quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner.jar
[INFO]
[INFO] --- quarkus-maven-plugin:0.18.0:native-image (default) @ quarkus-jgroups-chat ---
[INFO] [io.quarkus.creator.phase.nativeimage.NativeImagePhase] Running Quarkus native-image plugin on OpenJDK 64-Bit Server VM
[INFO] [io.quarkus.creator.phase.nativeimage.NativeImagePhase] /Users/bela/graalvm/Contents/Home/bin/native-image -J-Djava.util.logging.manager=org.jboss.logmanager.LogManager --initialize-at-build-time= -H:InitialCollectionPolicy=com.oracle.svm.core.genscavenge.CollectionPolicy$BySpaceAndTime -jar quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner.jar -J-Djava.util.concurrent.ForkJoinPool.common.parallelism=1 -H:FallbackThreshold=0 -H:+ReportUnsupportedElementsAtRuntime -H:+ReportExceptionStackTraces -H:+PrintAnalysisCallTree -H:-AddAllCharsets -H:EnableURLProtocols=http -H:-SpawnIsolates -H:+JNI --no-server -H:-UseServiceLoaderFeature -H:+StackTrace
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]    classlist:   6,857.25 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]        (cap):   4,290.72 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]        setup:   6,430.30 ms
14:43:05,540 INFO  [org.jbo.threads] JBoss Threads version 3.0.0.Beta4
14:43:06,468 INFO  [org.xnio] XNIO version 3.7.2.Final
14:43:06,528 INFO  [org.xni.nio] XNIO NIO Implementation Version 3.7.2.Final
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]   (typeflow):  17,331.26 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]    (objects):  24,511.12 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]   (features):   1,194.16 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]     analysis:  44,204.65 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]     (clinit):     579.00 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]     universe:   1,715.40 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]      (parse):   3,315.80 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]     (inline):   4,563.11 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]    (compile):  24,906.58 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]      compile:  34,907.28 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]        image:   4,557.78 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]        write:   2,531.16 ms
[quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner:93574]      [total]: 109,858.54 ms
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:58 min
[INFO] Finished at: 2019-07-03T14:44:40+02:00
 


This uses GraalVM's native-image to generate a native executable. After a while, the resulting executable is in the ./target directory:

Its size is only 27MB and we can see that it is a macOS native executable:
[belasmac] /Users/bela/quarkus-jgroups-chat/target$ ls -lh quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner
-rwxr-xr-x  1 bela  staff    27M Jul  3 14:44 quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner
[belasmac] /Users/bela/quarkus-jgroups-chat/target$ file quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner
quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner: Mach-O 64-bit executable x86_64


To run it:

[belasmac] /Users/bela/quarkus-jgroups-chat/target$ ./quarkus-jgroups-chat-1.0.0-SNAPSHOT-runner

-------------------------------------------------------------------
GMS: address=belasmac-55106, cluster=quarkus-jgroups-chat, physical address=127.0.0.1:7800
-------------------------------------------------------------------
-- view: [belasmac-55106|0] (1) [belasmac-55106]
 


When you run this yourself, you will notice the quick startup time of the second and subsequent members. Why not the first member? The first member has to wait for GMS.join_timeout millis (defined in chat-tcp.xml) to see if it discovers any other members, and so it always runs into this timeout.

To change bind_addr and initial_hosts, application.properties has to be changed before compiling to native code.



Caveats

The quarkus-jgroups extension depends on JGroups-4.1.2-SNAPSHOT, which Maven may not find unless a snapshot repository has been added to the POM (or settings.xml). Alternatively, git clone https://github.com/belaban/JGroups.git ; cd JGroups ; mvn install will generate and install this artifact in your local maven repo.

Currently, only TCP is supported (chat-tcp.xml). UDP will be supported once MulticastSockets are properly supported by GraalVM (see [5] for details).

For some obscure reason,
<enableJni>true</enableJni>

had to be enabled in the POM, or else native compilation would fail. I hope to remove this once I understand why...

Outlook

This was a relatively quick port of JGroups to native code. For feedback and questions please use the JGroups mailing list.

The following things are on my todo list for further development:
  • Provide more JGroups classes via extensions, e.g. RpcDispatcher (to make remote method calls)
  • Provide docker images with native executables
  • Implement support for UDP
  • Trim down the size of the executable even more

Enjoy!



[1] https://github.com/jgroups-extras/quarkus-jgroups
[2] https://github.com/jgroups-extras/quarkus-jgroups-chat
[3] https://quarkus.io
[4] https://www.graalvm.org
[5] https://github.com/belaban/JGroups/blob/master/doc/design/PortingToGraalVM.txt
[6] https://github.com/belaban/JGroups/blob/master/tests/perf/org/jgroups/tests/perf/ProgrammaticUPerf2.java