NATS Logo by Example

Core Publish-Subscribe in Messaging

This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:

  • Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
  • There are two circumstances when a published message won’t be delivered to a subscriber:
    • The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
    • There is a network interruption where the message is ultimately dropped
  • Messages are published to subjects which can be one or more concrete tokens, e.g. greet.bob. Subscribers can utilize wildcards to show interest on a set of matching subjects.
CLI Go Python JavaScript Rust C# C#2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/pub-sub/dotnet
View the source code or learn how to run this example yourself

Code

using System;
using System.Text;
using NATS.Client;


string natsUrl = Environment.GetEnvironmentVariable("NATS_URL");
if (natsUrl == null)
{
    natsUrl = "nats://127.0.0.1:4222";
}

Create a new connection factory to create a connection.

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Url = natsUrl;

Creates a connection to nats server at the natsUrl An IConnection is IDisposable so it can be used within a using statement.

ConnectionFactory cf = new ConnectionFactory();
IConnection c = cf.CreateConnection(opts);

Here are some of the accessible properties from the Msg object:

  • Msg.Data
  • Msg.Reply
  • Msg.Subject
  • Msg.Header
  • Msg.MetaData

Setup an event handler to process incoming messages. An anonymous delegate function is used for brevity.

EventHandler<MsgHandlerEventArgs> handler = (sender, args) =>
{
    Msg m = args.Message;
    string text = Encoding.UTF8.GetString(m.Data);
    Console.WriteLine($"Async handler received the message '{text}' from subject '{m.Subject}'");
};

Subscriptions will only receive messages that are published after the subscription is made, so for this example, we will publish a message before we are subscribed which will not be received.

c.Publish("greet.joe", Encoding.UTF8.GetBytes("hello joe 1"));

The simple way to create an asynchronous subscriber is to simply pass the handler in. Messages will start arriving immediately. We are subscribing to anything that matches the greet.* pattern. You will see that this subscription will not receive the “hello joe 1” message

IAsyncSubscription subAsync = c.SubscribeAsync("greet.*", handler);

Simple synchronous subscriber. In this case we are only subscribing to greetings to pam.

ISyncSubscription subSync = c.SubscribeSync("greet.pam");

Lets publish to two different greeting messages

c.Publish("greet.pam", Encoding.UTF8.GetBytes("hello pam 1"));
c.Publish("greet.joe", Encoding.UTF8.GetBytes("hello joe 2"));

Using a synchronous subscriber, try to get some messages, waiting up to 1000 milliseconds (1 second).

try
{
    Msg m = subSync.NextMessage(1000);
    string text = Encoding.UTF8.GetString(m.Data);
    Console.WriteLine($"Sync subscription received the message '{text}' from subject '{m.Subject}'");
    m = subSync.NextMessage(100);
}

Catching the NATSTimeoutException will let you know there were no messages available. In this case, there was only one message, so the second NextMessage timed out.

catch (NATSTimeoutException)
{
    Console.WriteLine($"Sync subscription no messages currently available");
}

Calling drain directly on the connection also closes the connection. If you want to keep the connection open and do other things, you could call drain directly on the subscription, in this example either subAsync.Drain(); or subSync.Drain();

c.Drain();

Output

Async handler received the message 'hello pam 1' from subject 'greet.pam'
Sync subscription received the message 'hello pam 1' from subject 'greet.pam'
Async handler received the message 'hello joe 2' from subject 'greet.joe'
Sync subscription no messages currently available
DisconnectedEvent, Connection: 4
ClosedEvent, Connection: 4

Recording

Note, playback is half speed to make it a bit easier to follow.