NATS by Example

Limits-based Stream in JetStream

To get started with JetStream, a stream must be created. The mental model for a stream is that it binds a set of subjects, and any message published to one of those subjects is persisted.

A stream is implemented as an implicit server-side service that receives a request (the published message) and replies back once the message has been persisted.
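The request/reply nature of a publish can be sketched with a toy, in-process model. This is not the NATS client or the server implementation: the `PubAck` and `PublishRequest` records, the `publish` helper, and the fiber standing in for the stream are all hypothetical names introduced for illustration.

```crystal
# Toy model only: a fiber plays the role of the server-side stream service.
# All names here are hypothetical and unrelated to the NATS client library.
record PubAck, stream : String, seq : Int64
record PublishRequest, subject : String, data : String, reply : Channel(PubAck)

requests = Channel(PublishRequest).new
stored = [] of {String, String}

# The "stream" persists each incoming message first, then replies with an ack.
spawn do
  loop do
    req = requests.receive
    stored << {req.subject, req.data}
    req.reply.send PubAck.new("EVENTS", stored.size.to_i64)
  end
end

# Publishing is a request: send the message, then wait for the ack.
def publish(requests : Channel(PublishRequest), subject : String, data : String) : PubAck
  reply = Channel(PubAck).new
  requests.send PublishRequest.new(subject, data, reply)
  reply.receive
end

ack = publish(requests, "events.page_loaded", "")
puts "stored in #{ack.stream} at sequence #{ack.seq}"
```

The point of the sketch is the ordering: the publisher only gets its reply after the message has been appended, which is what lets a publisher treat the ack as confirmation of persistence.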

There are a handful of different kinds of streams and configuration options, but we will start with the most basic one, which has a limits-based retention policy. This policy is the default; however, limits also apply to streams with other retention policies.

The stream limit choices include:

  • the maximum number of messages
  • the maximum total size in bytes
  • the maximum age of a message

There is also a specialized maximum messages limit that can be applied at the subject level, but this will be demonstrated in a separate example.

By default, no limits are set, which would require manually managing the ever-growing stream. However, if any of these limits describes how the stream should be truncated, simply turn it on and let the server manage everything.
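To make the three limits concrete, here is a toy, in-memory model of how each one could truncate a stream. It is a sketch under assumptions, not the server's implementation, and it does not use the NATS client: `LimitedStream`, `Message`, and their fields are hypothetical names, and the model assumes old messages are discarded first.

```crystal
# Toy message: sequence number, payload size in bytes, and store time.
record Message, seq : Int64, bytes : Int32, stored_at : Time

# Toy model of a limits-based stream. A limit of nil means "unlimited".
class LimitedStream
  getter messages : Array(Message)

  def initialize(@max_msgs : Int32? = nil,
                 @max_bytes : Int64? = nil,
                 @max_age : Time::Span? = nil)
    @messages = [] of Message
    @last_seq = 0_i64
  end

  # Append a message, then truncate according to the configured limits.
  def publish(bytes : Int32, now : Time = Time.utc) : Nil
    @last_seq += 1
    @messages << Message.new(@last_seq, bytes, now)
    enforce(now)
  end

  # Drop the oldest messages until every configured limit is satisfied.
  def enforce(now : Time = Time.utc) : Nil
    while (oldest = @messages.first?) && over_limit?(oldest, now)
      @messages.shift
    end
  end

  private def over_limit?(oldest : Message, now : Time) : Bool
    if max = @max_msgs
      return true if @messages.size > max
    end
    if max = @max_bytes
      return true if @messages.sum(&.bytes.to_i64) > max
    end
    if max = @max_age
      return true if now - oldest.stored_at > max
    end
    false
  end
end

# At most 3 messages and at most 100 bytes in total.
stream = LimitedStream.new(max_msgs: 3, max_bytes: 100_i64)
[40, 40, 40, 10].each { |size| stream.publish(size) }
p stream.messages.map(&.seq) # => [2, 3, 4]
```

In this run the third 40-byte message pushes the total to 120 bytes, so the first message is dropped by the byte limit before the message-count limit is ever reached.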

In this example, we showcase the behavior of applying these limits and the flexibility of JetStream in supporting changes to the stream configuration on demand.

$ nbe run jetstream/limits-stream/crystal

Code

require "nats"
require "nats/jetstream"

Get the NATS_URL from the environment or fall back to the default. The value can be a comma-separated string of server URLs, so we convert it to an Array(URI) to pass to the NATS client.

servers = ENV.fetch("NATS_URL", "nats://localhost:4222")
  .split(',')
  .map { |url| URI.parse(url) }
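As a quick illustration of the multi-server case, the same conversion can be tried on a hard-coded value. The two addresses below are made up for the example, and the extra `strip` is our own addition to tolerate spaces after the commas.

```crystal
require "uri"

# Hypothetical NATS_URL value listing two servers.
raw = "nats://10.0.0.1:4222, nats://10.0.0.2:4222"

# Split on commas and parse each entry into a URI.
servers = raw.split(',').map { |url| URI.parse(url.strip) }

servers.each { |uri| puts "#{uri.host}:#{uri.port}" }
```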

Create a client connection to an available NATS server.

nats = NATS::Client.new(servers)

When the program exits, we close the NATS client which waits for any pending messages (published or in a subscription) to be flushed.

at_exit { nats.close }

Access the JetStream API through the client connection.

js = nats.jetstream

Here we create the EVENTS stream, which captures all subjects matching events.> (that is, all subjects starting with events.), is stored on the filesystem for durability, and can hold up to 10 messages. The default discard policy discards the oldest messages once that limit is exceeded.

stream = js.stream.create(
  name: "EVENTS",
  subjects: %w[events.>],
  storage: :file,
  max_msgs: 10_i64,
)

We publish 11 messages to our 10-message stream, so we will exceed our limit. As mentioned above, this will discard older messages when new ones come in.

event_types = %w[
  page_loaded
  mouse_clicked
  input_focused
]
11.times do
  type = event_types.sample
  subject = "events.#{type}"
  puts "Publishing #{subject}..."
  js.publish(subject, "")
end

When we fetch the current stream state from the server, we see that there are indeed only 10 messages in the stream, with sequence numbers 2..11. This indicates that the first message in our stream with a capacity of 10 was dropped to make room for the 11th message.

name = stream.config.name
if stream = js.stream.info(name)
  pp stream.state
else
  raise "Could not find stream: #{name}"
end
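The sequence numbers in the stream state follow from simple arithmetic, sketched below without any NATS calls. It assumes sequences start at 1, every publish is stored, and only the oldest messages are discarded; the variable names are our own.

```crystal
max_msgs = 10  # the stream's message limit
published = 11 # how many messages we published

# Sequence numbers keep increasing even when old messages are discarded.
last_seq = published
# The oldest surviving message is max_msgs - 1 positions behind the newest.
first_seq = Math.max(published - max_msgs + 1, 1)
messages = last_seq - first_seq + 1

puts "first_seq=#{first_seq} last_seq=#{last_seq} messages=#{messages}"
# prints: first_seq=2 last_seq=11 messages=10
```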

Output

Publishing events.input_focused...
Publishing events.input_focused...
Publishing events.page_loaded...
Publishing events.input_focused...
Publishing events.mouse_clicked...
Publishing events.mouse_clicked...
Publishing events.page_loaded...
Publishing events.page_loaded...
Publishing events.page_loaded...
Publishing events.page_loaded...
Publishing events.page_loaded...
NATS::JetStream::API::V1::StreamState(
 @bytes=648,
 @consumer_count=0,
 @first_seq=2,
 @first_ts=2023-03-16 13:36:38.572160660 UTC,
 @last_seq=11,
 @last_ts=2023-03-16 13:36:38.573442838 UTC,
 @messages=10)
