NATS by Example

Confirmed Message Ack in JetStream

A confirmed message ack means that the client waits for the server to confirm that the ack was received and processed. Clients expose this functionality under the following names:

Name        Clients
ack ack     JavaScript
double ack  Rust, C# .NET V2
ack sync    Go, Python, Java, C
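
The difference between a plain ack and a confirmed ack can be illustrated without a server. The following is a minimal simulation, not the NATS client API: `FakeConsumerState` and `FakeMsg` are hypothetical stand-ins, and the 10 ms delay is an arbitrary placeholder for network and server processing time.

```typescript
// In-memory stand-in for the server side of one consumer (hypothetical, not the NATS API).
class FakeConsumerState {
  ackPending = 1; // one message delivered and awaiting its ack

  // Handles an ack after a simulated delay. When `confirm` is given, it plays
  // the role of the reply the server sends once the ack has been processed.
  receiveAck(confirm?: () => void): void {
    setTimeout(() => {
      this.ackPending -= 1;
      if (confirm) confirm();
    }, 10);
  }
}

// Stand-in for a delivered message (hypothetical).
class FakeMsg {
  private server: FakeConsumerState;

  constructor(server: FakeConsumerState) {
    this.server = server;
  }

  // Fire-and-forget: send the ack and return immediately.
  ack(): void {
    this.server.receiveAck();
  }

  // Confirmed: resolve only after the simulated server has processed the ack.
  ackAck(): Promise<boolean> {
    return new Promise((resolve) => {
      this.server.receiveAck(() => resolve(true));
    });
  }
}

async function demo(): Promise<{ afterAck: number; afterAckAck: number }> {
  const s1 = new FakeConsumerState();
  new FakeMsg(s1).ack();
  const afterAck = s1.ackPending; // read before the simulated server handled the ack

  const s2 = new FakeConsumerState();
  await new FakeMsg(s2).ackAck();
  const afterAckAck = s2.ackPending; // read after the confirmation arrived

  return { afterAck, afterAckAck };
}

demo().then(({ afterAck, afterAckAck }) => {
  console.log(`ack pending right after ack(): ${afterAck}`); // 1
  console.log(`ack pending right after ackAck(): ${afterAckAck}`); // 0
});
```

In this sketch the state read after `ack()` is stale because nothing waited for the server, while the read after `ackAck()` reflects the processed ack.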

$ nbe run jetstream/ack-ack/deno

Code

import {
  AckPolicy,
  nuid,
  connect
} from "https://deno.land/x/nats@v1.28.0/src/mod.ts";


const servers = Deno.env.get("NATS_URL")?.split(",");


const nc = await connect({ servers });


const jsm = await nc.jetstreamManager();
const js = nc.jetstream();

Create the stream

const subj = nuid.next();
const name = `EVENTS_${subj}`;
await jsm.streams.add({
  name,
  subjects: [subj]
});

Publish a couple of messages so we can look at the consumer state

await js.publish(subj);
await js.publish(subj);

Consume a message with two different consumers. The first consumer uses a regular ack() without confirmation. The second consumer uses ackAck(), which confirms that the ack was handled by the server.

Consumer 1 will use ack()

const ci1 = await jsm.consumers.add(name, {
  name: "consumer1",
  filter_subject: subj,
  ack_policy: AckPolicy.Explicit
});
console.log("Consumer 1");
console.log("  Start");
console.log(`    pending messages: ${ci1.num_pending}`);
console.log(`    messages with ack pending: ${ci1.num_ack_pending}`);


const consumer1 = await js.consumers.get(name, "consumer1");


try {
  const m = await consumer1.next();
  if (m) {
    let ci1 = await consumer1.info(false);
    console.log("  After received but before ack");
    console.log(`    pending messages: ${ci1.num_pending}`);
    console.log(`    messages with ack pending: ${ci1.num_ack_pending}`);


    // Fire-and-forget: returns without waiting for the server.
    m.ack();
    ci1 = await consumer1.info(false);
    console.log("  After ack");
    console.log(`    pending messages: ${ci1.num_pending}`);
    console.log(`    messages with ack pending: ${ci1.num_ack_pending}`);
  }
} catch (err) {
  console.log(`consume failed: ${err.message}`);
}



Consumer 2 will use ackAck()

const ci2 = await jsm.consumers.add(name, {
  name: "consumer2",
  filter_subject: subj,
  ack_policy: AckPolicy.Explicit
});
console.log("Consumer 2");
console.log("  Start");
console.log(`    pending messages: ${ci2.num_pending}`);
console.log(`    messages with ack pending: ${ci2.num_ack_pending}`);


const consumer2 = await js.consumers.get(name, "consumer2");


try {
  const m = await consumer2.next();
  if (m) {
    let ci2 = await consumer2.info(false);
    console.log("  After received but before ack");
    console.log(`    pending messages: ${ci2.num_pending}`);
    console.log(`    messages with ack pending: ${ci2.num_ack_pending}`);


    // Resolves once the server has confirmed the ack.
    await m.ackAck();
    ci2 = await consumer2.info(false);
    console.log("  After ack");
    console.log(`    pending messages: ${ci2.num_pending}`);
    console.log(`    messages with ack pending: ${ci2.num_ack_pending}`);
  }
} catch (err) {
  console.log(`consume failed: ${err.message}`);
}


await nc.close();

Output

Consumer 1 can still report one message with ack pending right after ack(), because ack() does not wait for the server and the consumer info may be read before the ack is processed. Consumer 2 reports zero, because ackAck() resolves only after the server has confirmed the ack.

Consumer 1
  Start
    pending messages: 2
    messages with ack pending: 0
  After received but before ack
    pending messages: 1
    messages with ack pending: 1
  After ack
    pending messages: 1
    messages with ack pending: 1
Consumer 2
  Start
    pending messages: 2
    messages with ack pending: 0
  After received but before ack
    pending messages: 1
    messages with ack pending: 1
  After ack
    pending messages: 1
    messages with ack pending: 0
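
In the output above, consumer 1's ack is processed asynchronously, so its ack-pending count should settle a little later. One way to observe that is to poll the consumer info until the count drops. The sketch below assumes a hypothetical helper, `waitForAckPending`, with arbitrary default timeout and interval values; it is driven here by a fake info source so it runs without a server.

```typescript
// Minimal shape of the field used here; the real consumer info has many more.
interface AckState {
  num_ack_pending: number;
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: polls `getInfo` until num_ack_pending is at most
// `target` or `timeoutMs` has elapsed, and returns the last state observed.
async function waitForAckPending(
  getInfo: () => Promise<AckState>,
  target = 0,
  timeoutMs = 1000,
  intervalMs = 20,
): Promise<AckState> {
  const deadline = Date.now() + timeoutMs;
  let info = await getInfo();
  while (info.num_ack_pending > target && Date.now() < deadline) {
    await sleep(intervalMs);
    info = await getInfo();
  }
  return info;
}

// Fake info source standing in for a real consumer: reports 1 twice, then 0.
const samples = [1, 1, 0];
let calls = 0;
const fakeInfo = async (): Promise<AckState> => ({
  num_ack_pending: samples[Math.min(calls++, samples.length - 1)],
});

waitForAckPending(fakeInfo).then((info) => {
  console.log(`messages with ack pending: ${info.num_ack_pending}`); // 0
});
```

With the consumer from the example, the info source could be `() => consumer1.info(false)`. The helper returns the last observed state rather than throwing on timeout, so the caller decides how to treat a count that never settles.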
