Concurrent Message Processing in Messaging
By default, when a subscription is created, each message that is received it process sequentially. There can be multiple subscriptions setup in a queue group in which case the NATS server will distribute messages to each member of the group.
However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.
$ nbe run messaging/concurrent/csharpView the source code or learn how to run this example yourself
Code
using NATS.Net;
NATS_URL
environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://127.0.0.1:4222";
Connect to NATS server. Since connection is disposable at the end of our scope, we should flush our buffers and close the connection cleanly.
await using var nc = new NatsClient(url);
using var cts = new CancellationTokenSource();
Subscribe to a subject and start waiting for messages in the background and start processing messages in parallel.
var subscription = Task.Run(async () =>
{
await Parallel.ForEachAsync(nc.SubscribeAsync<string>("greet", cancellationToken: cts.Token), (msg, _) =>
{
Console.WriteLine($"Received {msg.Data}");
return ValueTask.CompletedTask;
});
});
Give some time for the subscription to start.
await Task.Delay(TimeSpan.FromSeconds(1));
for (int i = 0; i < 50; i++)
{
await nc.PublishAsync("greet", $"hello {i}");
}
Give some time for the subscription to receive all the messages.
await Task.Delay(TimeSpan.FromSeconds(1));
await cts.CancelAsync();
await subscription;
That’s it!
Console.WriteLine("Bye!");
Output
Received hello 1 Received hello 7 Received hello 8 Received hello 9 Received hello 10 Received hello 2 Received hello 12 Received hello 13 Received hello 14 Received hello 15 Received hello 16 Received hello 4 Received hello 5 Received hello 6 Received hello 11 Received hello 17 Received hello 0 Received hello 3 Received hello 18 Received hello 19 Received hello 21 Received hello 22 Received hello 24 Received hello 23 Received hello 27 Received hello 20 Received hello 28 Received hello 29 Received hello 25 Received hello 31 Received hello 34 Received hello 35 Received hello 36 Received hello 37 Received hello 26 Received hello 38 Received hello 40 Received hello 39 Received hello 30 Received hello 41 Received hello 42 Received hello 43 Received hello 44 Received hello 45 Received hello 46 Received hello 32 Received hello 33 Received hello 47 Received hello 48 Received hello 49 Bye!
Install NuGet package
NATS.Net