NATS by Example

Limits-based Stream in JetStream

To get started with JetStream, a stream must be created. The mental model for a stream is that it binds a set of subjects, and messages published to those subjects are persisted.

A stream is implemented as an implicit server-side service that receives a request (the published message) and replies back once the message has been persisted.

There are a handful of different kinds of streams and configuration options, but we will start with the most basic one, which has a limits-based retention policy. This policy is the default; however, limits still apply to streams with other retention policies.

The stream limit choices include:

  • the maximum number of messages
  • the maximum total size in bytes
  • the maximum age of a message

There is also a specialized maximum messages limit that can be applied at the subject level, but this will be demonstrated in a separate example.

By default, no limits are set, so the stream grows without bound and must be managed manually. However, if any of these limits describe how the stream should be truncated, simply turn them on and let the server manage everything.

In this example, we showcase the behavior of applying these limits and the flexibility JetStream offers by allowing the stream configuration to be changed dynamically, on demand.

$ nbe run jetstream/limits-stream/dotnet2

Code

Install the NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.

using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;


using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

The NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to the NATS server. Because the connection is declared with await using, it is disposed at the end of our scope, which should flush the buffers and close the connection cleanly.

var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);

Create a JetStream context, which provides methods to create streams and consumers as well as convenience methods for publishing to streams and consuming messages from them.

var js = new NatsJSContext(nats);

We declare the initial stream configuration by specifying the name and subjects. Stream names are commonly uppercase to visually differentiate them from subjects, but this is not required. A stream can bind one or more subjects, which almost always include wildcards. In addition, no two streams can have overlapping subjects; otherwise, the primary messages would be persisted twice.

var config = new StreamConfig(name: "EVENTS", subjects: new [] { "events.>" });
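
To make the wildcard binding concrete, here is a minimal sketch of NATS subject matching: `*` matches exactly one token and `>` matches one or more trailing tokens. The helper `SubjectMatches` is hypothetical and written only for illustration; it is not part of NATS.Net, and the server performs this matching itself.

```csharp
using System;

// Hypothetical helper (not a NATS.Net API): returns true when `subject`
// matches `pattern` under NATS wildcard rules.
bool SubjectMatches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');
    for (var i = 0; i < p.Length; i++)
    {
        if (p[i] == ">")
        {
            // `>` must be the last pattern token and must cover at least one subject token.
            return i == p.Length - 1 && s.Length > i;
        }
        if (i >= s.Length)
        {
            return false; // the subject ran out of tokens
        }
        if (p[i] != "*" && p[i] != s[i])
        {
            return false; // literal token mismatch
        }
    }
    return p.Length == s.Length;
}

Console.WriteLine(SubjectMatches("events.>", "events.page_loaded")); // True
Console.WriteLine(SubjectMatches("events.>", "events.ui.clicked"));  // True
Console.WriteLine(SubjectMatches("events.>", "events"));             // False
Console.WriteLine(SubjectMatches("events.*", "events.ui.clicked"));  // False
```

Under these rules, every subject published in this example falls under the single binding "events.>".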

JetStream provides both file and in-memory storage options. For durability of the stream data, file storage must be chosen to survive crashes and restarts. This is the default for the stream, but we can still set it explicitly.

config.Storage = StreamConfigStorage.File;

Finally, let’s add/create the stream with the default (no) limits.

var stream = await js.CreateStreamAsync(config);

Let’s publish a few messages, which are received by the stream since they match the subject bound to the stream. The js.PublishAsync method is a convenience for sending a request and waiting for the acknowledgement.

for (var i = 0; i < 2; i++)
{
    await js.PublishAsync<object>(subject: "events.page_loaded", data: null);
    await js.PublishAsync<object>(subject: "events.mouse_clicked", data: null);
    await js.PublishAsync<object>(subject: "events.mouse_clicked", data: null);
    await js.PublishAsync<object>(subject: "events.page_loaded", data: null);
    await js.PublishAsync<object>(subject: "events.mouse_clicked", data: null);
    await js.PublishAsync<object>(subject: "events.input_focused", data: null);
    logger.LogInformation("Published 6 messages");
}
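
The acknowledgement returned by a publish can also be inspected. The following is a minimal sketch, assuming a JetStream-enabled server is reachable and a stream bound to "events.>" already exists. It also assumes that the installed NATS.Net version exposes `EnsureSuccess()`, `Stream`, and `Seq` on the acknowledgement; verify these against the package version in use.

```csharp
using System;
using NATS.Client.Core;
using NATS.Client.JetStream;

// Assumption: a JetStream-enabled server is running and the EVENTS stream exists.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
await using var nats = new NatsConnection(new NatsOpts { Url = url });
var js = new NatsJSContext(nats);

// PublishAsync completes once the server has replied with an acknowledgement.
var ack = await js.PublishAsync<object>(subject: "events.page_loaded", data: null);

// Assumed extension method: throws if the acknowledgement reports a problem
// instead of a successfully persisted message.
ack.EnsureSuccess();

// Assumed properties: the stream that stored the message and its sequence number.
Console.WriteLine($"Stored in stream {ack.Stream} at sequence {ack.Seq}");
```

Checking the acknowledgement is what distinguishes a JetStream publish from a plain core NATS publish, which receives no reply.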

Checking out the stream info, we can see how many messages we have.

await PrintStreamStateAsync(stream);


var configUpdate = new StreamUpdateRequest { Name = config.Name, Subjects = config.Subjects, Storage = config.Storage };

Stream configuration can be changed dynamically. For example, we can set the max messages limit to 10, and the server will truncate the two oldest events in the stream.

configUpdate.MaxMsgs = 10;
await js.UpdateStreamAsync(configUpdate);
logger.LogInformation("set max messages to 10");

Checking out the info, we see there are now 10 messages and the first sequence and timestamp are based on the third message.

await PrintStreamStateAsync(stream);

Limits can be combined, and whichever one is reached first is applied to truncate the stream. For example, let’s set a maximum number of bytes for the stream.

configUpdate.MaxBytes = 300;
await js.UpdateStreamAsync(configUpdate);
logger.LogInformation("set max bytes to 300");

Inspecting the stream info, we now see that more messages have been truncated to ensure the size limit is not exceeded.

await PrintStreamStateAsync(stream);
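
The truncation arithmetic for the message and byte limits can be reproduced with a small in-memory model. This sketch does not talk to a server. It assumes the oldest messages are dropped first, and that each empty-payload message occupies 30 bytes plus the length of its subject. That size figure is inferred from the numbers in the Output section, not taken from documented storage internals. All helper names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumption inferred from the example output: a message with an empty
// payload costs 30 bytes of overhead plus the length of its subject.
long StoredSize(string subject) => 30 + subject.Length;

long TotalBytes(List<(ulong Seq, string Subject)> messages) =>
    messages.Sum(m => StoredSize(m.Subject));

// Drops the oldest messages until both limits hold.
// In this sketch a limit of 0 means "unlimited".
void ApplyLimits(List<(ulong Seq, string Subject)> messages, int maxMsgs, long maxBytes)
{
    while (messages.Count > 0 &&
           ((maxMsgs > 0 && messages.Count > maxMsgs) ||
            (maxBytes > 0 && TotalBytes(messages) > maxBytes)))
    {
        messages.RemoveAt(0);
    }
}

void Report(string label, List<(ulong Seq, string Subject)> messages) =>
    Console.WriteLine($"{label}: {messages.Count} messages, {TotalBytes(messages)} bytes, first sequence {messages[0].Seq}");

// The same six subjects the example publishes, in the same order.
var batch = new[]
{
    "events.page_loaded", "events.mouse_clicked", "events.mouse_clicked",
    "events.page_loaded", "events.mouse_clicked", "events.input_focused",
};

var log = new List<(ulong Seq, string Subject)>();
ulong seq = 0;
for (var round = 0; round < 2; round++)
{
    foreach (var name in batch)
    {
        log.Add((++seq, name));
    }
}

Report("no limits", log);        // no limits: 12 messages, 592 bytes, first sequence 1
ApplyLimits(log, maxMsgs: 10, maxBytes: 0);
Report("max messages 10", log);  // max messages 10: 10 messages, 494 bytes, first sequence 3
ApplyLimits(log, maxMsgs: 10, maxBytes: 300);
Report("max bytes 300", log);    // max bytes 300: 6 messages, 296 bytes, first sequence 7
```

The model shows why a 300-byte limit leaves six messages rather than seven: a seventh message would bring the total to 346 bytes.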

Finally, for the last primary limit, we can set the max age.

configUpdate.MaxAge = (long)TimeSpan.FromSeconds(1).TotalNanoseconds;
await js.UpdateStreamAsync(configUpdate);
logger.LogInformation("set max age to one second");
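
The max age above is passed as a number of nanoseconds. `TimeSpan.TotalNanoseconds` is only available from .NET 7 onwards, so on older runtimes the same value can be derived from ticks. The helper `ToNanoseconds` below is hypothetical and only illustrates the conversion.

```csharp
using System;

// A .NET tick is 100 ns, so multiplying ticks by 100 gives an exact
// nanosecond count; `checked` surfaces overflow for extreme durations.
long ToNanoseconds(TimeSpan duration) => checked(duration.Ticks * 100);

Console.WriteLine(ToNanoseconds(TimeSpan.FromSeconds(1))); // 1000000000
Console.WriteLine(ToNanoseconds(TimeSpan.FromMinutes(5))); // 300000000000
```

Staying in integer arithmetic also avoids the cast from double that `TotalNanoseconds` requires.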

Looking at the stream info, we still see all the messages...

await PrintStreamStateAsync(stream);

until a second passes.

logger.LogInformation("sleeping one second...");
await Task.Delay(TimeSpan.FromSeconds(1));


await PrintStreamStateAsync(stream);
    
    

That’s it!

logger.LogInformation("Bye!");


async Task PrintStreamStateAsync(INatsJSStream jsStream)
{
    await jsStream.RefreshAsync();
    var state = jsStream.Info.State;
    logger.LogInformation("Stream has {Messages} messages using {Bytes} bytes", state.Messages, state.Bytes);
}

Output

info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NCK4CIW6RWLVL4BHIPQ2QHQHEE62XZGXDY2EHCLMJHHKUKFWJLLB7SDR, Name = NCK4CIW6RWLVL4BHIPQ2QHQHEE62XZGXDY2EHCLMJHHKUKFWJLLB7SDR, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 192.168.128.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
      Published 6 messages
info: NATS-by-Example[0]
      Published 6 messages
info: NATS-by-Example[0]
      Stream has 12 messages using 592 bytes
info: NATS-by-Example[0]
      set max messages to 10
info: NATS-by-Example[0]
      Stream has 10 messages using 494 bytes
info: NATS-by-Example[0]
      set max bytes to 300
info: NATS-by-Example[0]
      Stream has 6 messages using 296 bytes
info: NATS-by-Example[0]
      set max age to one second
info: NATS-by-Example[0]
      Stream has 6 messages using 296 bytes
info: NATS-by-Example[0]
      sleeping one second...
info: NATS-by-Example[0]
      Stream has 0 messages using 0 bytes
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
