NATS Logo by Example

Limits-based Stream in JetStream

To get started with JetStream, a stream must be created. The mental model for a stream is that it binds a set of subjects for which messages published to those subjects will be persisted.

A stream is implemented as an implicit server-side service that receives a request (the published message) and replies back once the message has been persisted.

There are handful of different kinds of streams and configuration options, but we will start with the most basic one having a limits-based retention policy. This policy is the default, however, limits still apply to streams with other retention policies.

The stream limit choices include:

  • the maximum number of messages
  • the maximum total size in bytes
  • the maximum age of a message

There is also a specialized maximum messages limit that can be applied at the subject level, but this will be demonstrated in a separate example.

By default, no limits are set which would require manually managing the ever-growing stream. However, if any of these limits satisfy how the stream should be truncated, simply turn these limits on and let the server manage everything.

In this example, we showcase the behavior or applying these limits and the flexibility of JetStream supporting dynamically changing the stream configuration on-demand.

CLI Go Python JavaScript Rust C# Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run jetstream/limits-stream/deno
View the source code or learn how to run this example yourself

Code

import the library - in node.js import {connect, etc} from "nats"; or if not doing a module, const {connect, etc} = require("nats");

import {
  connect,
  nanos,
} from "https://deno.land/x/nats@v1.16.0/src/mod.ts";

Get the passed NATS_URL or fallback to the default. This can be a comma-separated string.

const servers = Deno.env.get("NATS_URL") || "nats://localhost:4222";

Create a client connection to an available NATS server.

const nc = await connect({
  servers: servers.split(","),
});



Access the JetStream manager which provides the methods for managing streams and consumers.

const jsm = await nc.jetstreamManager();

Declare the initial stream config. A stream can bind one or more subjects that are not overlapping with other streams. By default, a stream will have one replica and use file storage.

const cfg = {
  name: "EVENTS",
  subjects: ["events.>"],
};

Add/create the stream.

await jsm.streams.add(cfg)
console.log("created the stream")

Access the JetStream client for publishing and subscribing to streams.

const js = nc.jetstream();

Publish a series of messages and wait for each one to be completed.

await js.publish("events.page_loaded");
await js.publish("events.mouse_clicked");
await js.publish("events.mouse_clicked");
await js.publish("events.page_loaded");
await js.publish("events.mouse_clicked");
await js.publish("events.input_focused");
console.log("published 6 messages");


const events = [
  "events.input_changed",
  "events.input_blurred",
  "events.key_pressed",
  "events.input_focused",
  "events.input_changed",
  "events.input_blurred",
];

Map over the events which returns a set of promises as a batch. Then wait until all of them are done before proceeding.

const batch = events.map((e) => js.publish(e));
await Promise.all(batch);
console.log("published another 6 messages");

Get the stream state to show 12 messages exist.

let info = await jsm.streams.info(cfg.name);
console.log(info.state);

Let’s update the stream config and set the max messages to 10. This can be done with a partial config passed to the update method.

await jsm.streams.update(cfg.name, {max_msgs: 10});

Once applied, we can check out the stream state again to see that the first two messages were truncated.

info = await jsm.streams.info(cfg.name);
console.log(info.state);

Updating the stream config again, we can set the max bytes of the stream overall.

await jsm.streams.update(cfg.name, {max_bytes: 300});

This will prune the messages down some more…

info = await jsm.streams.info(cfg.name);
console.log(info.state);

Finally the last limit of max_age can be applied. Note the age is in nanoseconds, so 1000 milliseconds (1 second) converted to nanos.

await jsm.streams.update(cfg.name, {max_age: nanos(1000), duplicate_window: nanos(1000)});

Sleep for a second to ensure the message age in the stream has lapsed.

await new Promise(r => setTimeout(r, 1000));


info = await jsm.streams.info(cfg.name);
console.log(info.state);

Finally we drain the connection which waits for any pending messages (published or in a subscription) to be flushed.

await nc.drain();

Output

created the stream
published 6 messages
published another 6 messages
{
  messages: 12,
  bytes: 594,
  first_seq: 1,
  first_ts: "2023-03-16T13:36:44.263785233Z",
  last_seq: 12,
  last_ts: "2023-03-16T13:36:44.273875923Z",
  num_subjects: 6,
  consumer_count: 0
}
{
  messages: 10,
  bytes: 496,
  first_seq: 3,
  first_ts: "2023-03-16T13:36:44.266984573Z",
  last_seq: 12,
  last_ts: "2023-03-16T13:36:44.273875923Z",
  num_subjects: 6,
  consumer_count: 0
}
{
  messages: 6,
  bytes: 298,
  first_seq: 7,
  first_ts: "2023-03-16T13:36:44.273813303Z",
  last_seq: 12,
  last_ts: "2023-03-16T13:36:44.273875923Z",
  num_subjects: 4,
  consumer_count: 0
}
{
  messages: 0,
  bytes: 0,
  first_seq: 13,
  first_ts: "1970-01-01T00:00:00Z",
  last_seq: 12,
  last_ts: "2023-03-16T13:36:44.273875923Z",
  consumer_count: 0
}

Recording

Note, playback is half speed to make it a bit easier to follow.