Limits-based Stream in JetStream
To get started with JetStream, a stream must be created. Conceptually, a stream binds a set of subjects, and any message published to one of those subjects is persisted.
A stream is implemented as an implicit server-side service that receives a request (the published message) and replies back once the message has been persisted.
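To make the request/reply model concrete, here is a minimal std-only sketch. It is not how the server is implemented: a thread plays the role of the stream, storing each incoming message and only then replying with an ack. The names `Ack`, `PublishRequest`, `spawn_stream`, and `publish` are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical ack: the sequence number assigned to the stored message.
#[derive(Debug, PartialEq)]
struct Ack {
    sequence: u64,
}

/// A publish request: the message plus a channel on which to send the ack.
struct PublishRequest {
    subject: String,
    payload: Vec<u8>,
    reply: mpsc::Sender<Ack>,
}

type Log = Vec<(String, Vec<u8>)>;

/// Spawns a toy "stream" thread that stores each message and then acks it.
fn spawn_stream() -> (mpsc::Sender<PublishRequest>, thread::JoinHandle<Log>) {
    let (tx, rx) = mpsc::channel::<PublishRequest>();
    let handle = thread::spawn(move || {
        let mut log: Log = Vec::new();
        for req in rx {
            // "Persist" first, then reply, so an ack implies the message is stored.
            log.push((req.subject, req.payload));
            let _ = req.reply.send(Ack { sequence: log.len() as u64 });
        }
        log
    });
    (tx, handle)
}

/// Sends one request and blocks until the ack arrives.
fn publish(stream: &mpsc::Sender<PublishRequest>, subject: &str, payload: &[u8]) -> Ack {
    let (reply, ack_rx) = mpsc::channel();
    stream
        .send(PublishRequest {
            subject: subject.to_string(),
            payload: payload.to_vec(),
            reply,
        })
        .expect("stream thread is running");
    ack_rx.recv().expect("stream replies after storing")
}

fn main() {
    let (stream, handle) = spawn_stream();
    let first = publish(&stream, "events.page_loaded", b"");
    let second = publish(&stream, "events.mouse_clicked", b"");
    println!("acks: {:?} {:?}", first, second);
    drop(stream); // closing the sender lets the stream thread finish
    let log = handle.join().unwrap();
    println!("stored {} messages", log.len());
}
```

The point of the sketch is the ordering: the reply is sent only after the message is in the log, which is why a received ack can be treated as confirmation of persistence.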
There are a handful of different kinds of streams and configuration options, but we will start with the most basic one, which has a limits-based retention policy. This policy is the default; however, limits still apply to streams with other retention policies.
The stream limit choices include:
- the maximum number of messages
- the maximum total size in bytes
- the maximum age of a message
There is also a specialized maximum messages limit that can be applied at the subject level, but this will be demonstrated in a separate example.
By default, no limits are set, so an ever-growing stream would have to be managed manually. However, if any of these limits describes how the stream should be truncated, simply turn it on and let the server manage everything.
In this example, we showcase the behavior of applying these limits and the flexibility of JetStream in supporting changes to the stream configuration on demand.
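The three limits can be pictured with a small in-memory model. This is a simplified sketch under stated assumptions, not the server's algorithm: `ToyStream`, `Limits`, and `Stored` are hypothetical names, timestamps are passed in explicitly as milliseconds to keep the model deterministic, and a message is treated as expired once its age reaches the limit.

```rust
use std::collections::VecDeque;

/// Hypothetical limit settings; `None` means "no limit" (the default).
#[derive(Default, Clone, Copy)]
struct Limits {
    max_messages: Option<usize>,
    max_bytes: Option<usize>,
    max_age_ms: Option<u64>,
}

/// A stored message, reduced to the fields the limits care about.
struct Stored {
    sequence: u64,
    timestamp_ms: u64,
    size: usize,
}

#[derive(Default)]
struct ToyStream {
    limits: Limits,
    messages: VecDeque<Stored>,
    bytes: usize,
    last_sequence: u64,
}

impl ToyStream {
    /// Appends a message of `size` bytes at time `now_ms`, then applies the limits.
    fn append(&mut self, size: usize, now_ms: u64) {
        self.last_sequence += 1;
        self.messages.push_back(Stored {
            sequence: self.last_sequence,
            timestamp_ms: now_ms,
            size,
        });
        self.bytes += size;
        self.enforce(now_ms);
    }

    /// Drops the oldest messages until every configured limit is satisfied.
    fn enforce(&mut self, now_ms: u64) {
        while let Some(oldest) = self.messages.front() {
            let size = oldest.size;
            let age_ms = now_ms.saturating_sub(oldest.timestamp_ms);
            let (count, bytes) = (self.messages.len(), self.bytes);
            let too_many = self.limits.max_messages.map_or(false, |max| count > max);
            let too_big = self.limits.max_bytes.map_or(false, |max| bytes > max);
            let too_old = self.limits.max_age_ms.map_or(false, |max| age_ms >= max);
            if !(too_many || too_big || too_old) {
                break;
            }
            self.bytes -= size;
            self.messages.pop_front();
        }
    }

    /// Returns (message count, total bytes, first sequence).
    fn state(&self) -> (usize, usize, u64) {
        let first = self
            .messages
            .front()
            .map_or(self.last_sequence + 1, |m| m.sequence);
        (self.messages.len(), self.bytes, first)
    }
}

fn main() {
    let mut stream = ToyStream::default();
    for t in 0..12 {
        stream.append(50, t); // twelve 50-byte messages, no limits yet
    }
    println!("{:?}", stream.state()); // (12, 600, 1)

    stream.limits.max_messages = Some(10);
    stream.enforce(12);
    println!("{:?}", stream.state()); // (10, 500, 3)

    stream.limits.max_bytes = Some(300);
    stream.enforce(12);
    println!("{:?}", stream.state()); // (6, 300, 7)

    stream.limits.max_age_ms = Some(1000);
    stream.enforce(1011); // more than a second after the last message
    println!("{:?}", stream.state()); // (0, 0, 13)
}
```

In this model the limits are independent checks, and the oldest message is dropped whenever any one of them is violated. Tightening a limit on an existing stream therefore truncates from the front.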
$ nbe run jetstream/limits-stream/rust
Code
use std::{future::IntoFuture, time::Duration};
use async_nats::jetstream::{self, stream::StorageType};
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
Use the env variable if running in the container, otherwise use the default.
let nats_url =
std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
Create an unauthenticated connection to NATS.
let client = async_nats::connect(nats_url).await?;
Access jetstream::context to use the JetStream APIs.
let jetstream = jetstream::new(client);
We will declare the initial stream configuration by specifying the name and subjects. Stream names are commonly uppercased to visually differentiate them from subjects, but this is not required. A stream can bind one or more subjects, which almost always include wildcards. In addition, no two streams can have overlapping subjects; otherwise, the primary messages would be persisted twice. There are options to replicate messages in various ways, but those will be explained in later examples.
let mut stream_config = jetstream::stream::Config {
name: "EVENTS".to_string(),
subjects: vec!["events.>".to_string()],
..Default::default()
};
JetStream provides both file and in-memory storage options. For durability of the stream data, file storage must be chosen to survive crashes and restarts. This is the default for the stream, but we can still set it explicitly.
stream_config.storage = StorageType::File;
Finally, let’s add/create the stream with the default (no) limits. We’re cloning the config as we will use it later in the example.
let mut stream = jetstream.create_stream(stream_config.clone()).await?;
println!("created the stream");
Let’s publish a few messages, which are received by the stream since they match the subject bound to the stream. jetstream.publish() sends a request to the stream and returns an ack future that may be awaited at any time. Let’s first publish a few messages, immediately awaiting each ack.
Send the publish request.
jetstream
.publish("events.page_loaded", "".into())
.await?
And wait for acknowledgement.
.await?;
jetstream
.publish("events.input_blurred", "".into())
.await?
.await?;
jetstream
.publish("events.mouse_clicked", "".into())
.await?
.await?;
jetstream
.publish("events.page_loaded", "".into())
.await?
.await?;
jetstream
.publish("events.mouse_clicked", "".into())
.await?
.await?;
jetstream
.publish("events.input_focused", "".into())
.await?
.await?;
println!("published 6 messages");
Acks can also be processed later. There are many possible patterns for this in Rust, and the best approach depends on your needs. The example below is one of the most straightforward: it publishes the messages, gathers all the ack futures in a Vec, and then awaits them all.
Create a vector of ack futures.
let mut acks = Vec::new();
Publish the messages and push each ack future into the Vec.
acks.push(
jetstream
.publish("events.input_changed", "".into())
.await?
.into_future(),
);
acks.push(
jetstream
.publish("events.input_blurred", "".into())
.await?
.into_future(),
);
acks.push(
jetstream
.publish("events.key_pressed", "".into())
.await?
.into_future(),
);
acks.push(
jetstream
.publish("events.input_focused", "".into())
.await?
.into_future(),
);
acks.push(
jetstream
.publish("events.input_changed", "".into())
.await?
.into_future(),
);
acks.push(
jetstream
.publish("events.input_blurred", "".into())
.await?
.into_future(),
);
Await all acks to complete. This call fails if any ack fails.
match futures::future::try_join_all(acks).await {
Ok(_acks) => println!("published 6 messages"),
Err(err) => panic!("failed to ack all messages: {}", err),
}
Checking out the stream info, we can see how many messages we have.
println!("{:#?}", stream.info().await?);
Stream configuration can be dynamically changed. For example, we can set the max messages limit to 10 and it will truncate the two initial events in the stream.
stream_config.max_messages = 10;
jetstream.update_stream(stream_config.clone()).await?;
println!("set max messages to 10");
Checking out the info, we see there are now 10 messages and the first sequence and timestamp are based on the third message.
println!("{:#?}", stream.info().await?);
Limits can be combined, and whichever one is reached first is applied to truncate the stream. For example, let’s set a maximum number of bytes for the stream.
stream_config.max_bytes = 300;
jetstream.update_stream(stream_config.clone()).await?;
println!("set max bytes to 300");
Inspecting the stream info, we now see that more messages have been truncated to ensure the size is not exceeded.
println!("{:#?}", stream.info().await?);
Finally, for the last primary limit, we can set the max age.
stream_config.max_age = Duration::from_secs(1);
jetstream.update_stream(stream_config.clone()).await?;
println!("set max age to one second");
Looking at the stream info, we still see all the messages…
println!("{:#?}", stream.info().await?);
until a second passes.
println!("sleeping one second...");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("{:#?}", stream.info().await?);
Ok(())
}
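The stream above binds the wildcard subject `events.>`. As a rough illustration of how such a binding selects messages, the sketch below matches dot-separated subjects against a pattern in which `*` stands for exactly one token and a trailing `>` stands for one or more remaining tokens. The function name `subject_matches` is hypothetical; this is a simplified model that does not validate subjects and is not the server's matcher.

```rust
/// Returns true if `subject` matches `pattern`.
/// `*` matches exactly one token; `>` (only valid as the last token)
/// matches one or more remaining tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one remaining subject token and must end the pattern.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal.
            _ => return false,
        }
    }
}

fn main() {
    for subject in ["events.page_loaded", "events.input.changed", "events", "orders.created"] {
        println!("events.> matches {}: {}", subject, subject_matches("events.>", subject));
    }
}
```

Under this model every subject published in the example, such as `events.page_loaded` or `events.key_pressed`, falls under `events.>`, while a bare `events` or a subject under another prefix does not.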
Output
created the stream
published 6 messages
published 6 messages
Info { config: Config { name: "EVENTS", max_bytes: -1, max_messages: -1, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: Limits, max_consumers: -1, max_age: 0ns, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 120s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:18:59.351245173 +00:00:00, state: State { messages: 12, bytes: 594, first_sequence: 1, first_timestamp: 2023-10-23 16:18:59.352574881 +00:00:00, last_sequence: 12, last_timestamp: 2023-10-23 16:18:59.354391923 +00:00:00, consumer_count: 0, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NAVVSZMRCBDROOPHNFSXWDBQPJWQ36BU46EV3WZCD3SGTWKPGHQ3RAPK", ), replicas: [], }, ), mirror: None, sources: [], }
set max messages to 10
Info { config: Config { name: "EVENTS", max_bytes: -1, max_messages: 10, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: Limits, max_consumers: -1, max_age: 0ns, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 120s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:18:59.351245173 +00:00:00, state: State { messages: 10, bytes: 496, first_sequence: 3, first_timestamp: 2023-10-23 16:18:59.353378756 +00:00:00, last_sequence: 12, last_timestamp: 2023-10-23 16:18:59.354391923 +00:00:00, consumer_count: 0, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NAVVSZMRCBDROOPHNFSXWDBQPJWQ36BU46EV3WZCD3SGTWKPGHQ3RAPK", ), replicas: [], }, ), mirror: None, sources: [], }
set max bytes to 300
Info { config: Config { name: "EVENTS", max_bytes: 300, max_messages: 10, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: Limits, max_consumers: -1, max_age: 0ns, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 120s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:18:59.351245173 +00:00:00, state: State { messages: 6, bytes: 298, first_sequence: 7, first_timestamp: 2023-10-23 16:18:59.354376215 +00:00:00, last_sequence: 12, last_timestamp: 2023-10-23 16:18:59.354391923 +00:00:00, consumer_count: 0, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NAVVSZMRCBDROOPHNFSXWDBQPJWQ36BU46EV3WZCD3SGTWKPGHQ3RAPK", ), replicas: [], }, ), mirror: None, sources: [], }
set max age to one second
Info { config: Config { name: "EVENTS", max_bytes: 300, max_messages: 10, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: Limits, max_consumers: -1, max_age: 1s, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 1s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:18:59.351245173 +00:00:00, state: State { messages: 6, bytes: 298, first_sequence: 7, first_timestamp: 2023-10-23 16:18:59.354376215 +00:00:00, last_sequence: 12, last_timestamp: 2023-10-23 16:18:59.354391923 +00:00:00, consumer_count: 0, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NAVVSZMRCBDROOPHNFSXWDBQPJWQ36BU46EV3WZCD3SGTWKPGHQ3RAPK", ), replicas: [], }, ), mirror: None, sources: [], }
sleeping one second...
Info { config: Config { name: "EVENTS", max_bytes: 300, max_messages: 10, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: Limits, max_consumers: -1, max_age: 1s, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 1s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:18:59.351245173 +00:00:00, state: State { messages: 0, bytes: 0, first_sequence: 13, first_timestamp: 1970-01-01 0:00:00.0 +00:00:00, last_sequence: 12, last_timestamp: 2023-10-23 16:18:59.354391923 +00:00:00, consumer_count: 0, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NAVVSZMRCBDROOPHNFSXWDBQPJWQ36BU46EV3WZCD3SGTWKPGHQ3RAPK", ), replicas: [], }, ), mirror: None, sources: [], }
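The `messages`, `bytes`, and `first_sequence` values in the output above can be reproduced with a little arithmetic. The sketch below rests on one loudly stated assumption: each empty-payload message is accounted as its subject length plus 30 bytes. That overhead figure is inferred from the numbers in this output (594 bytes for these 12 subjects) and is not a documented constant. The helper names are hypothetical.

```rust
/// Subjects published by the example, in publish order.
const SUBJECTS: [&str; 12] = [
    "events.page_loaded",
    "events.input_blurred",
    "events.mouse_clicked",
    "events.page_loaded",
    "events.mouse_clicked",
    "events.input_focused",
    "events.input_changed",
    "events.input_blurred",
    "events.key_pressed",
    "events.input_focused",
    "events.input_changed",
    "events.input_blurred",
];

/// ASSUMPTION: per-message overhead inferred from the output above,
/// not taken from any documentation.
const ASSUMED_OVERHEAD: usize = 30;

/// Accounted size of each example message under the assumption above.
fn example_sizes() -> Vec<usize> {
    SUBJECTS.iter().map(|&s| s.len() + ASSUMED_OVERHEAD).collect()
}

/// Drops the oldest messages until both limits hold.
/// Returns (messages kept, bytes kept, first sequence).
fn truncate(sizes: &[usize], max_messages: usize, max_bytes: usize) -> (usize, usize, usize) {
    let mut first = 0; // index of the oldest message still kept
    let mut bytes: usize = sizes.iter().sum();
    while sizes.len() - first > max_messages || bytes > max_bytes {
        bytes -= sizes[first];
        first += 1;
    }
    (sizes.len() - first, bytes, first + 1)
}

fn main() {
    let sizes = example_sizes();
    // No limits: everything is kept.
    println!("{:?}", truncate(&sizes, usize::MAX, usize::MAX)); // (12, 594, 1)
    // max_messages = 10: the two oldest messages are dropped.
    println!("{:?}", truncate(&sizes, 10, usize::MAX)); // (10, 496, 3)
    // max_messages = 10 and max_bytes = 300: only the six newest fit.
    println!("{:?}", truncate(&sizes, 10, 300)); // (6, 298, 7)
}
```

Note that the byte limit, not the message limit, decides the final state here: six messages total 298 bytes, and keeping a seventh would exceed 300.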