NATS by Example

Iterating Over Multiple Subscriptions in Messaging

Core NATS subscriptions support a flexible subject model with tokens and wildcards, but some cases require setting up separate subscriptions (e.g. a user wants transport.cars, transport.planes and transport.ships, but not transport.spaceships).

Such an approach works and is performant, but it is not very convenient when reading the messages, because each subscription has to be consumed separately. This example shows how to achieve the required behaviour without sacrificing usability.
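
To make the motivation concrete, here is a minimal sketch of NATS-style subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. `SubjectMatches` is a hypothetical helper written for illustration only; it is not part of NATS.Net, and the server performs the real matching.

```csharp
using System;

// Hypothetical helper (not part of NATS.Net): checks whether a subject matches a pattern.
// `*` matches exactly one token; `>` matches one or more trailing tokens.
static bool SubjectMatches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');

    for (var i = 0; i < p.Length; i++)
    {
        // `>` must be the last pattern token and needs at least one subject token left.
        if (p[i] == ">")
            return i == p.Length - 1 && s.Length > i;

        // The subject ran out of tokens before the pattern did.
        if (i >= s.Length)
            return false;

        // `*` accepts any single token; anything else must match literally.
        if (p[i] != "*" && p[i] != s[i])
            return false;
    }

    // All pattern tokens matched; the subject must not have extra tokens.
    return p.Length == s.Length;
}

foreach (var subject in new[] { "transport.cars", "transport.planes", "transport.ships", "transport.spaceships" })
    Console.WriteLine($"{subject}: {SubjectMatches("transport.*", subject)}");
```

All four subjects print `True`, including transport.spaceships, which is why a single wildcard subscription cannot express "cars, planes and ships only".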

$ nbe run messaging/iterating-multiple-subscriptions/csharp

Code

Install the NuGet packages NATS.Net and System.Interactive.Async (the latter provides AsyncEnumerableEx.Merge).

using NATS.Net;

The NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://127.0.0.1:4222";

Connect to the NATS server. Since the connection is declared with await using, it is disposed at the end of our scope, which should flush our buffers and close the connection cleanly.

await using var nc = new NatsClient(url);


await nc.ConnectAsync();


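// A single cancellation source is shared by all four subscriptions so they can be stopped together.
using var cts = new CancellationTokenSource();


// Each call returns an IAsyncEnumerable of messages for one subject.
var s1 = nc.SubscribeAsync<int>("s1", cancellationToken: cts.Token);
var s2 = nc.SubscribeAsync<int>("s2", cancellationToken: cts.Token);
var s3 = nc.SubscribeAsync<int>("s3", cancellationToken: cts.Token);
var s4 = nc.SubscribeAsync<int>("s4", cancellationToken: cts.Token);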


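// Total number of messages expected across all four subjects.
const int total = 80;


// Consume all four subscriptions as a single merged stream on a background task.
var subs = Task.Run(async () =>
{
    var count = 0;
    await foreach (var msg in AsyncEnumerableEx.Merge(s1, s2, s3, s4))
    {
        Console.WriteLine($"Received {msg.Subject}: {msg.Data}");

        // Once every expected message has arrived, cancel the subscriptions to end the loop.
        if (++count == total)
            await cts.CancelAsync();
    }
});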


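// Give the subscriptions a moment to be established before publishing.
await Task.Delay(1000);


// Publish the same sequence of integers to each of the four subjects.
for (int i = 0; i < total / 4; i++)
{
    await nc.PublishAsync("s1", i);
    await nc.PublishAsync("s2", i);
    await nc.PublishAsync("s3", i);
    await nc.PublishAsync("s4", i);
    await Task.Delay(100);
}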


// Wait for the consumer task to finish reading all messages.
await subs;

That’s it!

Console.WriteLine("Bye!");
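
For readers curious what merging several async streams involves, the following is a rough sketch that needs neither a NATS server nor System.Interactive.Async. `FakeSubscription` and `MergeAll` are hypothetical names introduced here; this is not how AsyncEnumerableEx.Merge is implemented, only one simple way to get similar interleaving with System.Threading.Channels. It leaves out cancellation and cleanup when the consumer stops early.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a subscription: yields `count` values for a subject, with a delay between them.
static async IAsyncEnumerable<(string Subject, int Data)> FakeSubscription(string subject, int count, int delayMs)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Delay(delayMs);
        yield return (subject, i);
    }
}

// Hypothetical helper: forwards every source into one unbounded channel and reads the channel back.
static async IAsyncEnumerable<T> MergeAll<T>(params IAsyncEnumerable<T>[] sources)
{
    var channel = Channel.CreateUnbounded<T>();

    // One pump task per source writes its items into the shared channel.
    var pumps = sources.Select(async source =>
    {
        await foreach (var item in source)
            await channel.Writer.WriteAsync(item);
    }).ToArray();

    // Complete the channel once every source is exhausted, passing along any failure.
    _ = Task.WhenAll(pumps).ContinueWith(t => channel.Writer.TryComplete(t.Exception));

    await foreach (var item in channel.Reader.ReadAllAsync())
        yield return item;
}

var received = new List<(string Subject, int Data)>();
await foreach (var msg in MergeAll(FakeSubscription("s1", 3, 10), FakeSubscription("s2", 3, 15)))
{
    Console.WriteLine($"Received {msg.Subject}: {msg.Data}");
    received.Add(msg);
}
```

Items from different sources may interleave in any order, as in the output of the real example, while the order within each source is preserved.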

Output

Received s3: 0
Received s1: 0
Received s2: 0
Received s4: 0
Received s1: 1
Received s2: 1
Received s3: 1
Received s4: 1
Received s1: 2
Received s2: 2
Received s3: 2
Received s4: 2
Received s1: 3
Received s2: 3
Received s3: 3
Received s4: 3
Received s1: 4
Received s2: 4
Received s3: 4
Received s4: 4
Received s1: 5
Received s2: 5
Received s3: 5
Received s4: 5
Received s2: 6
Received s1: 6
Received s3: 6
Received s4: 6
Received s1: 7
Received s2: 7
Received s4: 7
Received s3: 7
Received s1: 8
Received s2: 8
Received s3: 8
Received s4: 8
Received s1: 9
Received s2: 9
Received s3: 9
Received s4: 9
Received s1: 10
Received s2: 10
Received s3: 10
Received s4: 10
Received s1: 11
Received s2: 11
Received s3: 11
Received s4: 11
Received s1: 12
Received s2: 12
Received s3: 12
Received s4: 12
Received s1: 13
Received s2: 13
Received s3: 13
Received s4: 13
Received s1: 14
Received s2: 14
Received s3: 14
Received s4: 14
Received s1: 15
Received s2: 15
Received s3: 15
Received s4: 15
Received s1: 16
Received s2: 16
Received s3: 16
Received s4: 16
Received s1: 17
Received s2: 17
Received s3: 17
Received s4: 17
Received s1: 18
Received s2: 18
Received s3: 18
Received s4: 18
Received s1: 19
Received s2: 19
Received s3: 19
Received s4: 19
Bye!
