NATS Logo by Example

Protobuf for Message Payloads in Messaging

NATS message payloads are byte slices, so any kind of serialization strategy can be applied. This example shows a simple way to define message types using the Protocol Buffers IDL, generate code for the target language, and then use it with NATS.

If you are new to Protobuf, you can get started using one of the official tutorials.

The protobuf file for example looks as follows:

  syntax = "proto3";

  option go_package = ".;main";

  package main;

  message GreetRequest {
    string name = 1;
  }

  message GreetReply {
    string text = 1;
  }

Click the link to the example’s source code to view the generated code.

CLI Go Python JavaScript Rust C# Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/protobuf/csharp
View the source code or learn how to run this example yourself

Code

Install NuGet packages NATS.Net and Google.Protobuf

using System.Buffers;
using Google.Protobuf;
using Google.Protobuf.Reflection;
using NATS.Client.Core;

NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://127.0.0.1:4222";

Connect to NATS server. Since connection is disposable at the end of our scope, we should flush our buffers and close the connection cleanly. Notice the use of custom serializer registry.

var opts = new NatsOpts
{
    Url = url,
    SerializerRegistry = new MyProtoBufSerializerRegistry(),
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);

Subscribe to a subject and start waiting for messages in the background. Notice that we are using a custom serializer for the subscription.

var sub = Task.Run(async () =>
{
    Console.WriteLine("Waiting for messages...");
    await foreach (var msg in nats.SubscribeAsync<GreetRequest>(subject: "greet", serializer: MyProtoBufSerializer<GreetRequest>.Default))
    {
        if (msg.Data is null)
        {
            Console.WriteLine("Received empty payload: End of messages");
            break;
        }
        var request = msg.Data;
        var reply = new GreetReply { Text = $"hello {request.Name}"};
        await msg.ReplyAsync(reply, serializer: MyProtoBufSerializer<GreetReply>.Default);
    }
});

This request uses the default serializer for the connection assigned to connection options above. Alternatively, we could’ve passed the individual serializer to the request method.

var reply = await nats.RequestAsync<GreetRequest, GreetReply>(subject: "greet", new GreetRequest { Name = "bob" });
Console.WriteLine($"Response = {reply.Data?.Text}...");

Send an empty message to indicate we are done.

await nats.PublishAsync("greet");

We can unsubscribe now all orders are published. Unsubscribing or disposing the subscription should complete the message loop and exit the background task cleanly.

await sub;

That’s it!

Console.WriteLine("Bye!");

Serializer Registry

public class MyProtoBufSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => MyProtoBufSerializer<T>.Default;


    public INatsDeserialize<T> GetDeserializer<T>() => MyProtoBufSerializer<T>.Default;
}

Serializer

public class MyProtoBufSerializer<T> : INatsSerializer<T>
{
    public static readonly INatsSerializer<T> Default = new MyProtoBufSerializer<T>();


    public void Serialize(IBufferWriter<byte> bufferWriter, T value)
    {
        if (value is IMessage message)
        {
            message.WriteTo(bufferWriter);
        }
        else
        {
            throw new NatsException($"Can't serialize {typeof(T)}");
        }
    }


    public T? Deserialize(in ReadOnlySequence<byte> buffer)
    {
        if (buffer.Length == 0)
        {
            /* serializers define what to do with empty payloads. */
            return default;
        }
        
        
        if (typeof(T) == typeof(GreetRequest))
        {
            return (T)(object)GreetRequest.Parser.ParseFrom(buffer);
        }


        if (typeof(T) == typeof(GreetReply))
        {
            return (T)(object)GreetReply.Parser.ParseFrom(buffer);
        }


        throw new NatsException($"Can't deserialize {typeof(T)}");
    }


    public INatsSerializer<T> CombineWith(INatsSerializer<T> next)
    {
        throw new NotImplementedException();
    }
}

Protobuf Messages

The following messages would normally be generated using protoc. For the sake of example, we defined simplified versions here. Usually you would use .proto files to define your messages and generate the code using Grpc.Tools NuGet package to generate them in a separate project. See gRPC documentation and ASP.NET Core Tooling Support for more details.

public class GreetRequest : IMessage<GreetRequest>, IBufferMessage
{
    public static readonly MessageParser<GreetRequest> Parser = new(() => new GreetRequest());
    
    
    public string? Name { get; set; }


    public void MergeFrom(GreetRequest message) => Name = message.Name;


    public void MergeFrom(CodedInputStream input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            if (tag == 10)
                Name = input.ReadString();
        }
    }


    public void WriteTo(CodedOutputStream output)
    {
        output.WriteRawTag(10);
        output.WriteString(Name);
    }


    public int CalculateSize() => CodedOutputStream.ComputeStringSize(Name) + 1;


    public MessageDescriptor Descriptor => null!;


    public bool Equals(GreetRequest? other) => string.Equals(other?.Name, Name);


    public GreetRequest Clone() => new() { Name = Name };


    public void InternalMergeFrom(ref ParseContext input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            if (tag == 10)
                Name = input.ReadString();
        }
    }


    public void InternalWriteTo(ref WriteContext output)
    {
        output.WriteRawTag(10);
        output.WriteString(Name);
    }
}


public class GreetReply : IMessage<GreetReply>, IBufferMessage
{
    public static readonly MessageParser<GreetReply> Parser = new(() => new GreetReply());
    
    
    public string? Text { get; set; }


    public void MergeFrom(GreetReply message) => Text = message.Text;


    public void MergeFrom(CodedInputStream input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            if (tag == 10)
                Text = input.ReadString();
        }
    }


    public void WriteTo(CodedOutputStream output)
    {
        output.WriteRawTag(10);
        output.WriteString(Text);
    }


    public int CalculateSize() => CodedOutputStream.ComputeStringSize(Text) + 1;


    public MessageDescriptor Descriptor => null!;


    public bool Equals(GreetReply? other) => string.Equals(other?.Text, Text);


    public GreetReply Clone() => new() { Text = Text };
    
    
    public void InternalMergeFrom(ref ParseContext input)
    {
        uint tag;
        while ((tag = input.ReadTag()) != 0) {
            if (tag == 10)
                Text = input.ReadString();
        }
    }


    public void InternalWriteTo(ref WriteContext output)
    {
        output.WriteRawTag(10);
        output.WriteString(Text);
    }
}

Output

Waiting for messages...
Response = hello bob...
Received empty payload: End of messages
Bye!

Recording

Note, playback is half speed to make it a bit easier to follow.