NATS by Example

JSON for Message Payloads in Messaging

The basic structure of a NATS message modeled in the client libraries includes the subject the message was published to, the application-defined payload, and an optional set of headers (for requests, there is also a reply-to subject). The payload is a sequence of bytes, so it is up to the application to define how to serialize and deserialize the payload.

JSON is a ubiquitous and simple data-interchange format that is supported in virtually all programming languages. This example demonstrates how to serialize and deserialize a message payload using a JSON library.
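Because the payload is just bytes, the round trip can be tried without a NATS server at all. The following is a minimal sketch using only System.Text.Json; the `Person` record is a hypothetical type introduced for illustration and is not part of the example itself.

```csharp
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

// Serialize a value to the UTF-8 bytes that would travel as the message payload.
var original = new Person { Id = 1, Name = "Bob" };
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(original);
Console.WriteLine(Encoding.UTF8.GetString(payload));

// Deserialize the same bytes back into a typed value, as a receiver would.
Person? decoded = JsonSerializer.Deserialize<Person>(payload);
Console.WriteLine(decoded);

// Hypothetical payload type used only for this sketch.
public record Person
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}
```

This sketch uses the reflection-based serializer for brevity, whereas the example itself relies on a source-generated serializer context.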

$ nbe run messaging/json/csharp

Code

Install the NATS.Net NuGet package.

using System.Text;
using System.Text.Json.Serialization;
using NATS.Client.Core;
using NATS.Net;

The NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://127.0.0.1:4222";

Connect to the NATS server. Because the client is declared with await using, it is disposed at the end of our scope, which should flush our buffers and close the connection cleanly.

await using var nc = new NatsClient(url);


await nc.ConnectAsync();

When subscribing or publishing, you can use the generated JSON serializer to serialize and deserialize the JSON payload. This example also demonstrates how to work with the raw JSON payload and with binary data. For more information about the serializers, see our documentation.

var mySerializer = new NatsJsonContextSerializer<MyData>(MyJsonContext.Default);


// Subscriber 1: receive messages deserialized into MyData by the generated serializer.
var subTask1 = Task.Run(async () =>
{
    Console.WriteLine("Waiting for messages...");
    await foreach (var msg in nc.SubscribeAsync<MyData>("data", serializer: mySerializer))
    {
        // A null value means the payload was empty, which marks the end of messages.
        if (msg.Data is null)
        {
            Console.WriteLine("Received empty payload: End of messages");
            break;
        }
        var data = msg.Data;
        Console.WriteLine($"Received deserialized object {data}");
    }
});


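// Subscriber 2: receive the same messages as raw bytes, without deserializing.
var subTask2 = Task.Run(async () =>
{
    Console.WriteLine("Waiting for messages...");
    await foreach (var msg in nc.SubscribeAsync<NatsMemoryOwner<byte>>("data"))
    {
        // Dispose the memory owner once this message has been handled.
        using var memoryOwner = msg.Data;

        // A zero-length payload marks the end of messages.
        if (memoryOwner.Length == 0)
        {
            Console.WriteLine("Received empty payload: End of messages");
            break;
        }

        // Decode the UTF-8 bytes to see the JSON text as it was sent.
        var json = Encoding.UTF8.GetString(memoryOwner.Span);
        Console.WriteLine($"Received raw JSON {json}");
    }
});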
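Once the raw bytes are in hand, they do not have to be mapped onto a typed model. As a small sketch, assuming the payload has already been copied into a `byte[]` (the hard-coded bytes below stand in for a received message), System.Text.Json's `JsonDocument` can inspect individual fields directly:

```csharp
using System.Text;
using System.Text.Json;

// Stand-in for the bytes of a received message (an assumption for this sketch).
byte[] payload = Encoding.UTF8.GetBytes("""{"id":2,"name":"Joe"}""");

// An empty payload is not valid JSON, so check the length before parsing.
if (payload.Length > 0)
{
    using var doc = JsonDocument.Parse(payload);
    var root = doc.RootElement;

    // Read individual properties without defining a matching class.
    var id = root.GetProperty("id").GetInt32();
    var name = root.GetProperty("name").GetString();
    Console.WriteLine($"id={id}, name={name}");
}
```

This approach can be handy when a subscriber only needs one or two fields, or when the shape of the JSON is not known ahead of time.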

Give the subscriptions some time to start.

await Task.Delay(1000);


// Publish a typed object; the generated serializer turns it into JSON.
await nc.PublishAsync<MyData>(subject: "data", data: new MyData{ Id = 1, Name = "Bob" }, serializer: mySerializer);

// Publish JSON text that is already encoded as UTF-8 bytes.
await nc.PublishAsync<byte[]>(subject: "data", data: Encoding.UTF8.GetBytes("""{"id":2,"name":"Joe"}"""));

// Publish from a buffer writer: encode the JSON text directly into its memory.
var alice = """{"id":3,"name":"Alice"}""";
var bw = new NatsBufferWriter<byte>();
var byteCount = Encoding.UTF8.GetByteCount(alice);
var memory = bw.GetMemory(byteCount);
Encoding.UTF8.GetBytes(alice, memory.Span);
bw.Advance(byteCount);
await nc.PublishAsync<NatsBufferWriter<byte>>(subject: "data", data: bw);
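The GetMemory, GetBytes, Advance sequence above follows the general buffer-writer pattern in .NET. As a sketch of that pattern in isolation, the snippet below uses the standard library's `ArrayBufferWriter<byte>` as a stand-in for `NatsBufferWriter<byte>`; this substitution is an assumption made so the code runs without the NATS package.

```csharp
using System.Buffers;
using System.Text;

var text = """{"id":3,"name":"Alice"}""";
var writer = new ArrayBufferWriter<byte>();

// Request a span with room for at least the encoded size of the text.
var byteCount = Encoding.UTF8.GetByteCount(text);
Span<byte> span = writer.GetSpan(byteCount);

// Encode straight into the writer's buffer, then commit the bytes written.
int written = Encoding.UTF8.GetBytes(text, span);
writer.Advance(written);

// Only the committed bytes are exposed through WrittenSpan.
Console.WriteLine(writer.WrittenCount);
Console.WriteLine(Encoding.UTF8.GetString(writer.WrittenSpan));
```

Writing into the buffer this way avoids allocating an intermediate byte array for the encoded text; bytes that are written but never committed with Advance are not part of the written data.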

Publish an empty payload to signal the end of messages.

await nc.PublishAsync(subject: "data");


// Wait until both subscribers have seen the empty payload and exited their loops.
await Task.WhenAll(subTask1, subTask2);

That’s it!

Console.WriteLine("Bye!");

Serializer generator

[JsonSerializable(typeof(MyData))]
internal partial class MyJsonContext : JsonSerializerContext;


public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }


    [JsonPropertyName("name")]
    public string? Name { get; set; }
}
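A source-generated context like this one can also be used with System.Text.Json directly, which is a convenient way to check what JSON it produces. The sketch below is standalone and repeats the record so that it compiles on its own; the context name `SketchJsonContext` is hypothetical, and the `Default.MyData` property is assumed to be emitted by the System.Text.Json source generator that ships with the .NET SDK.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Serialize using the generated type metadata instead of reflection.
var json = JsonSerializer.Serialize(new MyData { Id = 1, Name = "Bob" }, SketchJsonContext.Default.MyData);
Console.WriteLine(json);

// Deserialize with the same generated metadata.
MyData? back = JsonSerializer.Deserialize(json, SketchJsonContext.Default.MyData);
Console.WriteLine(back);

// Hypothetical context for this sketch; the source generator fills in the partial class.
[JsonSerializable(typeof(MyData))]
internal partial class SketchJsonContext : JsonSerializerContext
{
}

public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}
```

The JsonPropertyName attributes are what produce the lowercase `id` and `name` keys seen in the raw JSON output.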

Output

Waiting for messages...
Waiting for messages...
Received raw JSON {"id":1,"name":"Bob"}
Received raw JSON {"id":2,"name":"Joe"}
Received raw JSON {"id":3,"name":"Alice"}
Received empty payload: End of messages
Received deserialized object MyData { Id = 1, Name = Bob }
Received deserialized object MyData { Id = 2, Name = Joe }
Received deserialized object MyData { Id = 3, Name = Alice }
Received empty payload: End of messages
Bye!
