NATS by Example

Concurrent Message Processing in Messaging

By default, when a subscription is created, each message it receives is processed sequentially. Multiple subscriptions can be set up in a queue group, in which case the NATS server distributes each message to one member of the group.

However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.

$ nbe run messaging/concurrent/java

Code

package example;


import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;


public class Main {
    public static void main(String[] args) {
        String natsURL = System.getenv("NATS_URL");
        if (natsURL == null) {
            natsURL = "nats://127.0.0.1:4222";
        }

Initialize a connection to the server. Because the connection is AutoCloseable, the try-with-resources block closes it on exit.

        try (Connection nc = Nats.connect(natsURL)) {


            int total = 50;
            CountDownLatch latch = new CountDownLatch(total);

Create four message dispatchers, each subscribed to the same subject in the same queue group, so that messages are handled in separate threads.

            for (int i = 0; i < 4; i++) {
                Dispatcher dispatcher = nc.createDispatcher((msg) -> {
                    System.out.printf("Received %s\n",
                            new String(msg.getData(), StandardCharsets.UTF_8));
                    latch.countDown();
                });


                dispatcher.subscribe("greet", "queue");
            }


            for (int i = 0; i < total; i++) {
                nc.publish("greet", String.format("hello %s", i).getBytes(StandardCharsets.UTF_8));
            }

Wait until the dispatcher threads have received all the messages before the program exits.

            latch.await();


        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}
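
The program above gets its concurrency from several dispatchers in one queue group. A different pattern keeps a single callback and hands each message to a thread pool, which is one way to process messages concurrently within a single subscription. The following is a minimal sketch of that idea using only the Java standard library. It does not connect to NATS: the `Consumer<byte[]>` is a stand-in for a message handler, the names `HandOffSketch`, `processAll`, and `onMessage` are hypothetical, and the five-second timeout is an arbitrary assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HandOffSketch {

    // Delivers `total` payloads to a single callback, which hands each one
    // to a pool of `workers` threads. Returns the distinct payloads handled.
    static Set<String> processAll(int total, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch latch = new CountDownLatch(total);
        Set<String> seen = ConcurrentHashMap.newKeySet();

        // Hypothetical stand-in for a subscription callback. It does no work
        // itself; it only schedules a task, so the delivering thread is free
        // to accept the next message.
        Consumer<byte[]> onMessage = data -> pool.execute(() -> {
            seen.add(new String(data, StandardCharsets.UTF_8));
            latch.countDown();
        });

        // Simulate sequential delivery from one subscription.
        for (int i = 0; i < total; i++) {
            onMessage.accept(String.format("hello %s", i).getBytes(StandardCharsets.UTF_8));
        }

        try {
            // Bounded wait (assumed 5 seconds) so a missing message cannot
            // block the program forever.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for messages");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        Set<String> seen = processAll(50, 4);
        System.out.printf("Handled %d distinct messages%n", seen.size());
    }
}
```

As with the dispatcher version, the order in which the worker threads finish is not deterministic, so any handler used this way must not depend on message order.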

Output

Received hello 0
Received hello 4
Received hello 6
Received hello 1
Received hello 2
Received hello 7
Received hello 18
Received hello 5
Received hello 11
Received hello 21
Received hello 3
Received hello 19
Received hello 20
Received hello 8
Received hello 10
Received hello 22
Received hello 23
Received hello 12
Received hello 13
Received hello 9
Received hello 14
Received hello 24
Received hello 27
Received hello 28
Received hello 32
Received hello 25
Received hello 31
Received hello 35
Received hello 15
Received hello 29
Received hello 34
Received hello 37
Received hello 38
Received hello 36
Received hello 39
Received hello 40
Received hello 45
Received hello 43
Received hello 44
Received hello 46
Received hello 42
Received hello 47
Received hello 16
Received hello 17
Received hello 26
Received hello 49
Received hello 30
Received hello 33
Received hello 41
Received hello 48
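
The lines above arrive out of order because the four dispatcher threads print independently, yet each of the fifty messages appears exactly once. A small checker like the one below can confirm that for any run. This is a sketch, not part of the original example: `OutputCheck` and `missing` are hypothetical names, and it assumes every line has the form `Received hello <n>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

class OutputCheck {

    // Returns the indices in [0, total) that never appear in `lines`.
    // Throws if a line is malformed or a message index is repeated.
    static List<Integer> missing(List<String> lines, int total) {
        String prefix = "Received hello ";
        TreeSet<Integer> seen = new TreeSet<>();
        for (String line : lines) {
            if (!line.startsWith(prefix)) {
                throw new IllegalArgumentException("unexpected line: " + line);
            }
            int n = Integer.parseInt(line.substring(prefix.length()).trim());
            if (!seen.add(n)) {
                throw new IllegalArgumentException("duplicate message: " + n);
            }
        }
        List<Integer> absent = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            if (!seen.contains(i)) {
                absent.add(i);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        // A shortened, out-of-order sample in the same shape as the output above.
        List<String> sample = Arrays.asList(
                "Received hello 0", "Received hello 4", "Received hello 1",
                "Received hello 2", "Received hello 3");
        System.out.println("missing: " + missing(sample, 5));
    }
}
```

An empty result means every expected message was seen once, regardless of the order in which the lines were printed.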
