NATS by Example

Pull Consumers in JetStream

A pull consumer lets the application fetch one or more messages on demand using a subscription bound to the consumer. This gives the application control over the flow of incoming messages, so it can process and ack them in an appropriate amount of time.

A consumer can either be durable or ephemeral. A durable consumer will have its state tracked on the server, most importantly, the last acknowledged message from the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not persist after the primary subscriber unsubscribes: the server automatically cleans up (deletes) the consumer after a period of inactivity.

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.

Note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batch independently. If there is a need to have deterministic partitioning for scalable order processing, learn more here.
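To make the sharing behavior concrete, here is a minimal in-memory sketch. FakePullConsumer, runWorker, and simulate are hypothetical names and do not come from the NATS client; the class only models a consumer that hands out each message once, in stream order, to whichever worker asks next.

```javascript
// In-memory stand-in for a pull consumer. This is NOT the NATS client API;
// it only models on-demand batch fetching from a shared source.
class FakePullConsumer {
  constructor(messages) {
    this.pending = [...messages];
  }

  // Return up to `max` messages and remove them from the pending queue.
  async fetch(max) {
    return this.pending.splice(0, max);
  }
}

// Each worker repeatedly pulls a batch and records it once "processed".
async function runWorker(id, consumer, batchSize, log) {
  while (true) {
    const batch = await consumer.fetch(batchSize);
    if (batch.length === 0) return;
    // Yield to the event loop so that other workers can interleave.
    await new Promise((resolve) => setTimeout(resolve, 0));
    log.push({ worker: id, batch });
  }
}

// Run `workers` workers concurrently against one shared consumer.
async function simulate(total, batchSize, workers) {
  const messages = Array.from({ length: total }, (_, i) => i + 1);
  const consumer = new FakePullConsumer(messages);
  const log = [];
  await Promise.all(
    Array.from(
      { length: workers },
      (_, i) => runWorker(`w${i + 1}`, consumer, batchSize, log),
    ),
  );
  return log;
}
```

Calling simulate(10, 3, 2) hands every message to exactly one worker, and each batch is internally ordered, while the order in which whole batches complete depends on worker timing.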

$ nbe run jetstream/pull-consumer/deno

Code

Import the library. In Node.js, use import {connect, etc} from "nats"; or, if not using modules, const {connect, etc} = require("nats");

import {
  AckPolicy,
  connect,
  millis,
  nuid,
} from "https://deno.land/x/nats@v1.16.0/src/mod.ts";

Get the passed NATS_URL or fall back to the default. This can be a comma-separated string.

const servers = Deno.env.get("NATS_URL") || "nats://localhost:4222";

Create a client connection to an available NATS server.

const nc = await connect({
  servers: servers.split(","),
});
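A plain split(",") keeps surrounding whitespace and empty entries if the variable is written loosely (for example with a space after each comma). As a sketch only, a more forgiving parser could look like the following; parseServers is a hypothetical helper and not part of the NATS client.

```javascript
// Hypothetical helper (not part of the NATS client): turn a comma-separated
// NATS_URL value into a clean list of server URLs.
function parseServers(value, fallback = "nats://localhost:4222") {
  const list = (value ?? "")
    .split(",")
    .map((s) => s.trim()) // drop whitespace around each entry
    .filter((s) => s.length > 0); // drop empty entries such as "a,,b"
  // Fall back to the default when nothing usable was provided.
  return list.length > 0 ? list : [fallback];
}
```

The result could then be passed as the servers option in place of servers.split(",").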

Access JetStream.

const js = nc.jetstream();

CRUD operations in JetStream are performed by the JetStreamManager:

const jsm = await js.jetstreamManager();

Creating the stream

Define the stream configuration and create the stream. No retention policy is specified here, so the stream uses the server default.

Make the stream name and subjects unique so that repeated runs do not collide.

const subj = nuid.next();
const name = `EVENTS_${subj}`;
await jsm.streams.add({
  name,
  subjects: [`${subj}.>`],
});
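The stream binds to a wildcard subject ending in ">", which matches one or more trailing tokens, so every subject published under the unique prefix is captured. The server performs this matching itself; the sketch below only illustrates the rule. subjectMatches is a hypothetical helper, not a client API.

```javascript
// Illustrative matcher for NATS subject wildcards:
//   "*" matches exactly one token, ">" matches one or more trailing tokens.
function subjectMatches(pattern, subject) {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" must be the last pattern token and needs at least one token left.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal mismatch
  }
  // Without ">", both sides must have the same number of tokens.
  return p.length === s.length;
}
```

For example, a pattern of "abc.>" matches "abc.1" and "abc.1.2" but not "abc" on its own.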

Publish a few messages for the example.

await Promise.all([
  js.publish(`${subj}.1`),
  js.publish(`${subj}.2`),
  js.publish(`${subj}.3`),
]);

Consumers in the new consumer API are pull consumers. Let’s create an ephemeral consumer. An ephemeral consumer is reaped by the server after it has been inactive for some time.

let ci = await jsm.consumers.add(name, { ack_policy: AckPolicy.None });

Retrieve the consumer by specifying the name of the stream and the name of the consumer.

const c = await js.consumers.get(name, ci.name);
console.log(
  "ephemeral consumer will live until inactivity of ",
  millis((await c.info(true)).config.inactive_threshold),
  "millis",
);
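The inactivity threshold is reported in nanoseconds, which is why the example passes it through millis() before printing. As a rough sketch of that conversion, assuming plain integer division is all that is needed (nanosToMillis and millisToNanos are hypothetical names, not the library functions):

```javascript
// Sketch of the unit conversion; the names here are illustrative only.
const NANOS_PER_MILLI = 1_000_000;

// Convert a duration in nanoseconds to whole milliseconds.
function nanosToMillis(ns) {
  return Math.floor(ns / NANOS_PER_MILLI);
}

// Convert a duration in milliseconds to nanoseconds.
function millisToNanos(ms) {
  return ms * NANOS_PER_MILLI;
}
```

A threshold of 5,000,000,000 nanoseconds therefore corresponds to 5000 milliseconds.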

You can retrieve messages one at a time with next():

let m = await c.next();
console.log(m.subject);


m = await c.next();
console.log(m.subject);


m = await c.next();
console.log(m.subject);

Let’s create another consumer. This time we’ll use fetch, and we’ll make it a durable consumer.

await jsm.consumers.add(name, {
  ack_policy: AckPolicy.Explicit,
  durable_name: "A",
});

Retrieve the durable consumer by specifying the name of the stream and its durable name.

const c2 = await js.consumers.get(name, "A");


let iter = await c2.fetch({ max_messages: 3 });
for await (const m of iter) {
  console.log(m.subject);
  m.ack();
}
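With AckPolicy.Explicit, each message must be acknowledged individually. The loop above acks unconditionally; a sketch of a variant that negatively acknowledges a message when processing fails might look like this. processBatch and handler are hypothetical names, and the only assumption about the messages is that they expose ack() and nak(), as JetStream messages in the JavaScript client do.

```javascript
// Process every message from an async iterable. Successful messages are
// acked; failed ones are nak'ed so the server can redeliver them.
// `handler` is a hypothetical application callback.
async function processBatch(messages, handler) {
  const result = { acked: 0, nacked: 0 };
  for await (const m of messages) {
    try {
      await handler(m);
      m.ack();
      result.acked++;
    } catch (_err) {
      m.nak();
      result.nacked++;
    }
  }
  return result;
}
```

Because the function accepts any async iterable, it could be handed the iterator returned by fetch in this example, and it can also be exercised with plain fake messages in a test.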

If you know you don’t need to save the state of the consumer, you can delete it:

await c2.delete();

Lastly, we’ll create another consumer, but this time use consume. This consumer will be an ordered consumer: an ephemeral consumer that guarantees that messages are delivered in order. Ordered consumers have a special shortcut: we only need the name of the stream, and the underlying consumer is managed under the covers.

const c3 = await js.consumers.get(name);


iter = await c3.consume({ max_messages: 3 });
for await (const m of iter) {
  console.log(m.subject);
  // If we do not break, consume would keep waiting for messages.
  // We know we have seen all messages when no more are pending.
  if (m.info.pending === 0) {
    break;
  }
}


await nc.drain();
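The stop condition used in the consume loop above can be factored into a small helper. This is only a sketch: collectUntilCaughtUp is a hypothetical name, and it assumes each message exposes subject and info.pending as in the example. It also assumes the stream holds at least one message and that no publisher is still writing; otherwise the loop would keep waiting.

```javascript
// Collect subjects from an async iterable of messages, stopping as soon as
// a message reports that nothing further is pending for the consumer.
async function collectUntilCaughtUp(messages) {
  const subjects = [];
  for await (const m of messages) {
    subjects.push(m.subject);
    if (m.info.pending === 0) {
      break; // leaving the loop also closes the iterator
    }
  }
  return subjects;
}
```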

Output

ephemeral consumer will live until inactivity of  5000 millis
5BBBFRAUSKLDYENVH7E8K8.1
5BBBFRAUSKLDYENVH7E8K8.2
5BBBFRAUSKLDYENVH7E8K8.3
5BBBFRAUSKLDYENVH7E8K8.1
5BBBFRAUSKLDYENVH7E8K8.2
5BBBFRAUSKLDYENVH7E8K8.3
5BBBFRAUSKLDYENVH7E8K8.1
5BBBFRAUSKLDYENVH7E8K8.2
5BBBFRAUSKLDYENVH7E8K8.3
