NATS by Example

Protobuf for Message Payloads in Messaging

NATS message payloads are byte slices, so any kind of serialization strategy can be applied. This example shows a simple way to define message types using the Protocol Buffers IDL, generate code for the target language, and then use it with NATS.

If you are new to Protobuf, you can get started using one of the official tutorials.

The protobuf file for this example looks as follows:

  syntax = "proto3";

  option go_package = ".;main";

  package main;

  message GreetRequest {
    string name = 1;
  }

  message GreetReply {
    string text = 1;
  }

Click the link to the example’s source code to view the generated code.
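
To make the link between the .proto definition and the bytes carried in a NATS payload concrete, here is a minimal sketch that hand-encodes a GreetRequest whose name is "bob". It uses no Protobuf library. EncodeStringField and WriteVarint are hypothetical helpers written only for this illustration; real code would rely on the generated classes.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: encode one string field the way the protobuf
// wire format lays it out (tag, length prefix, UTF-8 bytes).
static byte[] EncodeStringField(int fieldNumber, string value)
{
    const int lengthDelimited = 2; // wire type used for strings and bytes
    var utf8 = Encoding.UTF8.GetBytes(value);
    var bytes = new List<byte>();
    WriteVarint(bytes, (uint)((fieldNumber << 3) | lengthDelimited)); // tag
    WriteVarint(bytes, (uint)utf8.Length);                            // length prefix
    bytes.AddRange(utf8);                                             // string bytes
    return bytes.ToArray();
}

// Base-128 varint: 7 bits per byte, high bit set on every byte but the last.
static void WriteVarint(List<byte> output, uint value)
{
    while (value >= 0x80)
    {
        output.Add((byte)(value | 0x80));
        value >>= 7;
    }
    output.Add((byte)value);
}

// Field number 1 matches "string name = 1;" in the definition above.
var payload = EncodeStringField(1, "bob");
Console.WriteLine(BitConverter.ToString(payload)); // 0A-03-62-6F-62
```

The five bytes are the tag (0A), the length (03), and the UTF-8 text "bob". Note that this sketch always writes the field, whereas generated proto3 code typically omits a string field that is empty.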

$ nbe run messaging/protobuf/dotnet2

Code

Install the NuGet packages NATS.Net, Google.Protobuf, and Microsoft.Extensions.Logging.Console.

using System;
using System.Buffers;
using System.Threading.Tasks;
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;


using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

The NATS_URL environment variable can be used to pass the location of the NATS server. If it is not set, a local server on the default port is assumed.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to the NATS server. Since the connection is disposable, it is disposed at the end of our scope, which should flush our buffers and close the connection cleanly. Notice the use of a custom serializer registry.

var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    SerializerRegistry = new MyProtoBufSerializerRegistry(),
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);

Subscribe to a subject and start waiting for messages in the background. Notice that we are using a custom serializer for the subscription.

var sub = Task.Run(async () =>
{
    logger.LogInformation("Waiting for messages...");
    await foreach (var msg in nats.SubscribeAsync<GreetRequest>(subject: "greet", serializer: MyProtoBufSerializer<GreetRequest>.Default))
    {
        if (msg.Data is null)
        {
            logger.LogInformation("Received empty payload: End of messages");
            break;
        }
        var request = msg.Data;
        var reply = new GreetReply { Text = $"hello {request.Name}"};
        await msg.ReplyAsync(reply, serializer: MyProtoBufSerializer<GreetReply>.Default);
    }
});

This request uses the connection's default serializer, which comes from the serializer registry assigned in the connection options above. Alternatively, we could have passed individual serializers to the request method.

var reply = await nats.RequestAsync<GreetRequest, GreetReply>(subject: "greet", new GreetRequest { Name = "bob" });
logger.LogInformation("Response = {Response}...", reply.Data.Text);

Send an empty message to indicate we are done.

await nats.PublishAsync("greet");

Now that the end-of-messages signal has been sent, we can wait for the subscriber. Breaking out of the message loop disposes the subscription, so the background task should exit cleanly.

await sub;

That’s it!

logger.LogInformation("Bye!");

Serializer Registry

public class MyProtoBufSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => MyProtoBufSerializer<T>.Default;

    public INatsDeserialize<T> GetDeserializer<T>() => MyProtoBufSerializer<T>.Default;
}

Serializer

public class MyProtoBufSerializer<T> : INatsSerializer<T>
{
    public static readonly INatsSerializer<T> Default = new MyProtoBufSerializer<T>();

    // Any Protobuf message can write itself to the buffer.
    public void Serialize(IBufferWriter<byte> bufferWriter, T value)
    {
        if (value is IMessage message)
        {
            message.WriteTo(bufferWriter);
        }
        else
        {
            throw new NatsException($"Can't serialize {typeof(T)}");
        }
    }

    // Deserialization needs a concrete parser, so each supported
    // message type is listed explicitly.
    public T? Deserialize(in ReadOnlySequence<byte> buffer)
    {
        if (typeof(T) == typeof(GreetRequest))
        {
            return (T)(object)GreetRequest.Parser.ParseFrom(buffer);
        }

        if (typeof(T) == typeof(GreetReply))
        {
            return (T)(object)GreetReply.Parser.ParseFrom(buffer);
        }

        throw new NatsException($"Can't deserialize {typeof(T)}");
    }
}

Protobuf Messages

The following messages would normally be generated by protoc. For the sake of the example, we define simplified versions here. Usually you would define your messages in .proto files and use the Grpc.Tools NuGet package to generate the code in a separate project. See the gRPC documentation and ASP.NET Core Tooling Support for more details.

public class GreetRequest : IMessage<GreetRequest>, IBufferMessage
{
    public static readonly MessageParser<GreetRequest> Parser = new(() => new GreetRequest());

    public string Name { get; set; }

    public void MergeFrom(GreetRequest message) => Name = message.Name;

    public void MergeFrom(CodedInputStream input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            // Tag 10 is field number 1 with the length-delimited wire type: (1 << 3) | 2.
            if (tag == 10)
                Name = input.ReadString();
        }
    }

    public void WriteTo(CodedOutputStream output)
    {
        output.WriteRawTag(10);
        output.WriteString(Name);
    }

    // Length prefix and UTF-8 bytes of the string, plus one byte for the tag.
    public int CalculateSize() => CodedOutputStream.ComputeStringSize(Name) + 1;

    // This simplified version provides no descriptor.
    public MessageDescriptor Descriptor => null!;

    public bool Equals(GreetRequest other) => string.Equals(other?.Name, Name);

    public GreetRequest Clone() => new() { Name = Name };

    public void InternalMergeFrom(ref ParseContext input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            if (tag == 10)
                Name = input.ReadString();
        }
    }

    public void InternalWriteTo(ref WriteContext output)
    {
        output.WriteRawTag(10);
        output.WriteString(Name);
    }
}


public class GreetReply : IMessage<GreetReply>, IBufferMessage
{
    public static readonly MessageParser<GreetReply> Parser = new(() => new GreetReply());

    public string Text { get; set; }

    public void MergeFrom(GreetReply message) => Text = message.Text;

    public void MergeFrom(CodedInputStream input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            // Tag 10 is field number 1 with the length-delimited wire type: (1 << 3) | 2.
            if (tag == 10)
                Text = input.ReadString();
        }
    }

    public void WriteTo(CodedOutputStream output)
    {
        output.WriteRawTag(10);
        output.WriteString(Text);
    }

    // Length prefix and UTF-8 bytes of the string, plus one byte for the tag.
    public int CalculateSize() => CodedOutputStream.ComputeStringSize(Text) + 1;

    // This simplified version provides no descriptor.
    public MessageDescriptor Descriptor => null!;

    public bool Equals(GreetReply other) => string.Equals(other?.Text, Text);

    public GreetReply Clone() => new() { Text = Text };

    public void InternalMergeFrom(ref ParseContext input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            if (tag == 10)
                Text = input.ReadString();
        }
    }

    public void InternalWriteTo(ref WriteContext output)
    {
        output.WriteRawTag(10);
        output.WriteString(Text);
    }
}
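
The hand-written messages above rely on two literal values: the tag 10 and the extra 1 in CalculateSize. The following standalone sketch shows where those numbers come from under the Protobuf wire format. SplitTag, StringFieldSize, and VarintSize are hypothetical helpers for illustration only and are not part of Google.Protobuf.

```csharp
using System;
using System.Text;

// Split a wire-format tag into its field number and wire type.
// The low three bits hold the wire type, the rest the field number.
static (int FieldNumber, int WireType) SplitTag(uint tag) =>
    ((int)(tag >> 3), (int)(tag & 0x7));

// Number of bytes a value occupies as a base-128 varint.
static int VarintSize(uint value)
{
    var size = 1;
    while (value >= 0x80)
    {
        value >>= 7;
        size++;
    }
    return size;
}

// Encoded size of a string field whose tag fits in a single byte:
// 1 tag byte + varint length prefix + UTF-8 bytes.
static int StringFieldSize(string value)
{
    var byteCount = Encoding.UTF8.GetByteCount(value);
    return 1 + VarintSize((uint)byteCount) + byteCount;
}

var (field, wireType) = SplitTag(10);
Console.WriteLine($"field={field}, wireType={wireType}"); // field=1, wireType=2
Console.WriteLine(StringFieldSize("bob"));                // 5
```

Wire type 2 means length-delimited, which is how strings are carried, and field number 1 matches the field numbers declared in the .proto file. The single tag byte is the reason both classes add 1 to the computed string size.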

Output

info: NATS-by-Example[0]
      Waiting for messages...
info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NBG2DC6XCA2465Y3XIFCGH4YL7NFAEK4RLAKJD54G3YPD277HEEGD3W2, Name = NBG2DC6XCA2465Y3XIFCGH4YL7NFAEK4RLAKJD54G3YPD277HEEGD3W2, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.21.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
      Response = hello bob...
info: NATS-by-Example[0]
      Received empty payload: End of messages
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
