Iterating Over Multiple Subscriptions in Messaging
Core NATS subscription support flexible subject model with tokens and wildcards,
but there are cases that require setting up separate subscriptions
(i.e. user want transport.cars
, transport.planes
and transport.ships
, but not transport.spaceships
).
Such approach works and is performant, but not very convenient when reading the messages. This example shows how to achieve required behaviour without sacrificing usability.
$ nbe run messaging/iterating-multiple-subscriptions/csharpView the source code or learn how to run this example yourself
Code
using NATS.Net;
NATS_URL
environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://127.0.0.1:4222";
Connect to NATS server. Since connection is disposable at the end of our scope, we should flush our buffers and close the connection cleanly.
await using var nc = new NatsClient(url);
await nc.ConnectAsync();
using var cts = new CancellationTokenSource();
var s1 = nc.SubscribeAsync<int>("s1", cancellationToken: cts.Token);
var s2 = nc.SubscribeAsync<int>("s2", cancellationToken: cts.Token);
var s3 = nc.SubscribeAsync<int>("s3", cancellationToken: cts.Token);
var s4 = nc.SubscribeAsync<int>("s4", cancellationToken: cts.Token);
const int total = 80;
var subs = Task.Run(async () =>
{
var count = 0;
await foreach (var msg in AsyncEnumerableEx.Merge(s1, s2, s3, s4))
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}");
if (++count == total)
await cts.CancelAsync();
}
});
await Task.Delay(1000);
for (int i = 0; i < total / 4; i++)
{
await nc.PublishAsync("s1", i);
await nc.PublishAsync("s2", i);
await nc.PublishAsync("s3", i);
await nc.PublishAsync("s4", i);
await Task.Delay(100);
}
await subs;
That’s it!
Console.WriteLine("Bye!");
Output
Received s3: 0 Received s1: 0 Received s2: 0 Received s4: 0 Received s1: 1 Received s2: 1 Received s3: 1 Received s4: 1 Received s1: 2 Received s2: 2 Received s3: 2 Received s4: 2 Received s1: 3 Received s2: 3 Received s3: 3 Received s4: 3 Received s1: 4 Received s2: 4 Received s3: 4 Received s4: 4 Received s1: 5 Received s2: 5 Received s3: 5 Received s4: 5 Received s2: 6 Received s1: 6 Received s3: 6 Received s4: 6 Received s1: 7 Received s2: 7 Received s4: 7 Received s3: 7 Received s1: 8 Received s2: 8 Received s3: 8 Received s4: 8 Received s1: 9 Received s2: 9 Received s3: 9 Received s4: 9 Received s1: 10 Received s2: 10 Received s3: 10 Received s4: 10 Received s1: 11 Received s2: 11 Received s3: 11 Received s4: 11 Received s1: 12 Received s2: 12 Received s3: 12 Received s4: 12 Received s1: 13 Received s2: 13 Received s3: 13 Received s4: 13 Received s1: 14 Received s2: 14 Received s3: 14 Received s4: 14 Received s1: 15 Received s2: 15 Received s3: 15 Received s4: 15 Received s1: 16 Received s2: 16 Received s3: 16 Received s4: 16 Received s1: 17 Received s2: 17 Received s3: 17 Received s4: 17 Received s1: 18 Received s2: 18 Received s3: 18 Received s4: 18 Received s1: 19 Received s2: 19 Received s3: 19 Received s4: 19 Bye!
Recording
Note, playback is half speed to make it a bit easier to follow.
Install NuGet packages
NATS.Net
andSystem.Interactive.Async