NATS Logo by Example

Iterating Over Multiple Subscriptions in Messaging

Core NATS subscription support flexible subject model with tokens and wildcards, but there are cases that require setting up separate subscriptions (i.e. user want transport.cars, transport.planes and transport.ships, but not transport.spaceships).

Such approach works and is performant, but not very convenient when reading the messages. This example shows how to achieve required behaviour without sacrificing usability.

CLI Go Python JavaScript Rust C# Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/iterating-multiple-subscriptions/java
View the source code or learn how to run this example yourself

Code

package example;


import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;


public class Main {
    public static void main(String[] args) {
        String natsURL = System.getenv("NATS_URL");
        if (natsURL == null) {
            natsURL = "nats://127.0.0.1:4222";
        }

Initialize a connection to the server. The connection is AutoCloseable on exit.

        try (Connection nc = Nats.connect(natsURL)) {


            int total = 80;
            CountDownLatch latch = new CountDownLatch(total);

Create a message dispatcher. A dispatcher is a process that runs on its own thread, receives incoming messages via a FIFO queue, for subjects registered on it. For each message it takes from the queue, it makes a blocking call to the MessageHandler passed to the createDispatcher call.

            Dispatcher dispatcher = nc.createDispatcher((msg) -> {
                System.out.printf("Received %s: %s\n",
                        msg.getSubject(),
                        new String(msg.getData(), StandardCharsets.UTF_8));
                latch.countDown();
            });

Subscribe directly on the dispatcher for multiple subjects.

            dispatcher.subscribe("s1");
            dispatcher.subscribe("s2");
            dispatcher.subscribe("s3");
            dispatcher.subscribe("s4");


            for (int i = 0; i < total / 4; i++) {
                nc.publish("s1", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                nc.publish("s2", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                nc.publish("s3", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                nc.publish("s4", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                Thread.sleep(100);
            }

Await the dispatcher thread to have received all the messages before the program quits.

            latch.await();


        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}

Output

Received s1: 0
Received s2: 0
Received s3: 0
Received s4: 0
Received s1: 1
Received s2: 1
Received s3: 1
Received s4: 1
Received s1: 2
Received s2: 2
Received s3: 2
Received s4: 2
Received s1: 3
Received s2: 3
Received s3: 3
Received s4: 3
Received s1: 4
Received s2: 4
Received s3: 4
Received s4: 4
Received s1: 5
Received s2: 5
Received s3: 5
Received s4: 5
Received s1: 6
Received s2: 6
Received s3: 6
Received s4: 6
Received s1: 7
Received s2: 7
Received s3: 7
Received s4: 7
Received s1: 8
Received s2: 8
Received s3: 8
Received s4: 8
Received s1: 9
Received s2: 9
Received s3: 9
Received s4: 9
Received s1: 10
Received s2: 10
Received s3: 10
Received s4: 10
Received s1: 11
Received s2: 11
Received s3: 11
Received s4: 11
Received s1: 12
Received s2: 12
Received s3: 12
Received s4: 12
Received s1: 13
Received s2: 13
Received s3: 13
Received s4: 13
Received s1: 14
Received s2: 14
Received s3: 14
Received s4: 14
Received s1: 15
Received s2: 15
Received s3: 15
Received s4: 15
Received s1: 16
Received s2: 16
Received s3: 16
Received s4: 16
Received s1: 17
Received s2: 17
Received s3: 17
Received s4: 17
Received s1: 18
Received s2: 18
Received s3: 18
Received s4: 18
Received s1: 19
Received s2: 19
Received s3: 19
Received s4: 19

Recording

Note, playback is half speed to make it a bit easier to follow.