NATS by Example

Pull Consumers in JetStream

A pull consumer lets the application fetch one or more messages on demand using a subscription bound to the consumer. The application controls the flow of incoming messages, so it can process and ack them within an appropriate amount of time.

A consumer can be either durable or ephemeral. A durable consumer has its state tracked on the server, most importantly the last message acknowledged by the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not persist after the primary subscriber unsubscribes: the server automatically cleans up (deletes) the consumer after a period of time.

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.
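The sharing behavior described above can be modeled without a NATS server. The following is a minimal sketch using only the Rust standard library; `SharedConsumer`, `fetch`, and `run_workers` are hypothetical names invented for illustration and are not part of the async_nats API. It assumes a simplified consumer that hands each pending message to exactly one requester.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a pull consumer: a shared queue of pending messages.
struct SharedConsumer {
    pending: Mutex<VecDeque<u32>>,
}

impl SharedConsumer {
    fn new(count: u32) -> Self {
        Self {
            pending: Mutex::new((0..count).collect()),
        }
    }

    /// Hand out up to `max` messages, modeling a single fetch request.
    fn fetch(&self, max: usize) -> Vec<u32> {
        let mut pending = self.pending.lock().unwrap();
        let n = max.min(pending.len());
        pending.drain(..n).collect()
    }
}

/// Start `workers` threads that each fetch batches until nothing is pending.
/// Returns, per worker, the list of batches that worker handled.
fn run_workers(total: u32, workers: usize, batch: usize) -> Vec<Vec<Vec<u32>>> {
    let consumer = Arc::new(SharedConsumer::new(total));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let consumer = Arc::clone(&consumer);
            thread::spawn(move || {
                let mut batches = Vec::new();
                loop {
                    let b = consumer.fetch(batch);
                    if b.is_empty() {
                        break;
                    }
                    batches.push(b);
                }
                batches
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let per_worker = run_workers(10, 3, 4);
    for (id, batches) in per_worker.iter().enumerate() {
        println!("worker {} handled batches {:?}", id, batches);
    }
}
```

Which worker receives which batch varies from run to run, but in this model every message is delivered to exactly one worker and each batch is internally ordered.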

Note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batches independently, so there is no ordering guarantee across subscribers. If ordered processing needs to scale out, deterministic partitioning is required, which is a separate topic.
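To make the idea of deterministic partitioning concrete, here is a minimal client-side sketch. It assumes each message carries a key (for example an order ID) and maps that key to a fixed partition number, so all messages for one key reach the same worker and stay in order. The functions `fnv1a` and `partition_for` are hypothetical helpers, and the choice of the FNV-1a hash is an assumption made for illustration, not a description of how NATS itself partitions subjects.

```rust
/// 32-bit FNV-1a hash. Chosen here only because it is simple and stable
/// across runs and platforms (an illustrative assumption).
fn fnv1a(bytes: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c9dc5; // FNV offset basis
    for b in bytes {
        hash ^= *b as u32;
        hash = hash.wrapping_mul(0x01000193); // FNV prime
    }
    hash
}

/// Map a key to one of `partitions` buckets. The same key always yields
/// the same bucket, which is what keeps per-key ordering intact.
fn partition_for(key: &str, partitions: u32) -> u32 {
    assert!(partitions > 0, "at least one partition is required");
    fnv1a(key.as_bytes()) % partitions
}

fn main() {
    let partitions = 3;
    // "order-1" appears twice and lands on the same partition both times.
    for key in ["order-1", "order-2", "order-3", "order-1"].iter() {
        println!("{} -> partition {}", key, partition_for(key, partitions));
    }
}
```

With a mapping like this, each partition could be served by its own consumer, trading some parallelism for per-key ordering.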

$ nbe run jetstream/pull-consumer/rust

Code

use std::{env, str::from_utf8};


use async_nats::jetstream::{self, consumer::PullConsumer};
use futures::StreamExt;


#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {

Use the NATS_URL environment variable if defined, otherwise fall back to the default.

    let nats_url = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());

Create an unauthenticated connection to NATS.

    let client = async_nats::connect(nats_url).await?;

Access the JetStream Context for managing streams and consumers as well as for publishing and subscription convenience methods.

    let jetstream = jetstream::new(client);


    let stream_name = String::from("EVENTS");

Create a stream and a consumer. We can chain the methods. First we create a stream and bind to it.

    let consumer: PullConsumer = jetstream
        .create_stream(jetstream::stream::Config {
            name: stream_name,
            subjects: vec!["events.>".to_string()],
            ..Default::default()
        })
        .await?

Then, on that stream, call create_consumer to create a durable pull consumer and bind to it as well.

        .create_consumer(jetstream::consumer::pull::Config {
            durable_name: Some("consumer".to_string()),
            ..Default::default()
        })
        .await?;

Publish a few messages for the example.

    for i in 0..10 {
        jetstream
            .publish(format!("events.{}", i), "data".into())
            .await?;
    }

Attach to the messages iterator for the consumer. The iterator does its best to optimize retrieval of messages from the server. Since the example published ten messages, take(10) ends the stream after ten have been received.

    let mut messages = consumer.messages().await?.take(10);

Iterate over messages.

    while let Some(message) = messages.next().await {
        let message = message?;
        println!(
            "got message on subject {} with payload {:?}",
            message.subject,
            from_utf8(&message.payload)?
        );

Acknowledge the message so the server does not redeliver it.

        message.ack().await?;
    }


    Ok(())
}

Output

got message on subject events.0 with payload "data"
got message on subject events.1 with payload "data"
got message on subject events.2 with payload "data"
got message on subject events.3 with payload "data"
got message on subject events.4 with payload "data"
got message on subject events.5 with payload "data"
got message on subject events.6 with payload "data"
got message on subject events.7 with payload "data"
got message on subject events.8 with payload "data"
got message on subject events.9 with payload "data"
