Queue Push Consumers (legacy) in JetStream
A queue push consumer is analogous to a core NATS queue group, but designed to work with streams. Unlike a standard push consumer which only supports a single bound subscription at any time, a queue-based one supports multiple subscriptions bound to the consumer. Messages from the consumer will be distributed randomly among active subscribers which can freely come and go.
In this example, we will demonstrate how to create a durable queue push consumer and how to bind subscriptions to receive and process messages.
Note that as of NATS server v2.8.4, ephemeral queue push consumers are not supported. This means that the server does not currently keep track of these and will auto-cleanup if no active subscriptions are bound. You can, of course, create a durable and then delete once you are done with it, but if the deletion fails to happen (program crashes), you will need to be sure to check when the starts up again.
It is recommended to review the standard push consumer example in order to understand the general concepts. The queue-based variant is a small extension to this type of consumer.
$ nbe run jetstream/queue-push-consumer/javaView the source code or learn how to run this example yourself
Code
package example;
import io.nats.client.*;
import io.nats.client.api.*;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}
Initialize a connection to the server. The connection is AutoCloseable on exit.
try (Connection nc = Nats.connect(natsURL)) {
Access JetStream
and JetStreamManagement
which provide methods to create
streams and consumers as well as convenience methods for publishing
to streams and consuming messages from the streams.
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();
Declare a simple stream.
String streamName = "EVENTS";
StreamConfiguration config = StreamConfiguration.builder()
.name(streamName)
.subjects("events.>")
.build();
StreamInfo stream = jsm.addStream(config);
Durable (implicit)
Like the standard push consumer, the JetStream
context provides
a simple way to create a queue push consumer. The only additional
argument is the “group name”.
System.out.println("# Durable (implicit)");
PushSubscribeOptions pso = ConsumerConfiguration.builder()
.durable("durable")
.deliverGroup("event-processor")
.buildPushSubscribeOptions();
JetStreamSubscription sub1 = js.subscribe("events.>", "event-processor", pso);
If we inspect the consumer info, we will notice a property that
was not defined for the non-queue push consumer. The DeliverGroup
is the unique name of the group of subscribers. Internally, this
corresponds to a core NATS queue group name which we will see
below.
ConsumerInfo info = jsm.getConsumerInfo(streamName, "durable");
ConsumerConfiguration consumerConfig = info.getConsumerConfiguration();
System.out.printf("Deliver group: '%s'\n", consumerConfig.getDeliverGroup());
Using the same helper method, we can create another subscription in the group. This method implicitly checks for whether the consumer has been created and binds to it based on the subject and group name.
JetStreamSubscription sub2 = js.subscribe("events.>", "event-processor", pso);
As noted above, a queue push consumer relies on a core NATS
queue group for distributing messages to active members. As
a result, we can bind a subscription by using the DeliverSubject
and the DeliverGroup
Since messages are publishing to a dedicated subject and is
part of a group, we can also create a core NATS subscription
to join the group. As a reminder, the DeliverSubject
is
randomly generated by default, but this can be set explicitly
in the consumer config if desired.
Subscription sub3 = nc.subscribe(consumerConfig.getDeliverSubject(), consumerConfig.getDeliverGroup());
System.out.printf("Deliver subject: '%s'\n", consumerConfig.getDeliverSubject());
Now we can publish some messages to the stream to observe how they are distributed to the subscribers.
js.publish("events.1", null);
js.publish("events.2", null);
js.publish("events.3", null);
As noted in the push consumer example, subscriptions
enqueue messages proactively. When there are a group of
subscriptions, each will receive a different subset of the messages.
When calling nextMessage
this means, messages can be processed out
of order. There is no correlation with message order and
subscription creation order 😉. In fact, it is possible that
not all subscriptions will necessarily get a message.
Message msg = sub1.nextMessage(Duration.ofSeconds(1));
if (msg != null) {
System.out.printf("sub1: received message '%s'\n", msg.getSubject());
msg.ack();
} else {
System.out.println("sub1: receive timeout");
}
msg = sub2.nextMessage(Duration.ofSeconds(1));
if (msg != null) {
System.out.printf("sub2: received message '%s'\n", msg.getSubject());
msg.ack();
} else {
System.out.println("sub2: receive timeout");
}
msg = sub3.nextMessage(Duration.ofSeconds(1));
if (msg != null) {
System.out.printf("sub3: received message '%s'\n", msg.getSubject());
msg.ack();
} else {
System.out.println("sub3: receive timeout");
}
Since we created this consumer using the helper method, when we unsubscribe
(or call drain
), the consumer will be deleted.
sub1.unsubscribe();
sub2.unsubscribe();
sub3.unsubscribe();
Durable (explicit)
To create a (safe) durable consumer, use the addOrUpdateConsumer
method. Although it may seem redundant, a durable name and
the deliver group name must be defined. This is simply because
the durable name is used for all consumer types, while the
deliver group is exclusive to the queue push consumer. In this
example, the same name is used as convention which is what the helper
method above did as well.
System.out.println("\n# Durable (explicit)");
String consumerName = "event-processor";
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(consumerName)
.deliverSubject("my-subject")
.deliverGroup(consumerName)
.ackPolicy(AckPolicy.Explicit)
.build();
info = jsm.addOrUpdateConsumer(streamName, cc);
Wait for all 6 messages to be received before exiting.
CountDownLatch latch = new CountDownLatch(6);
For this part, we will use async core NATS queue subscriptions. Since
core NATS subscriptions are JetStream-unaware, we must call msg.ack
explicitly to notify the server that the message has been processed.
Dispatcher dispatcher = nc.createDispatcher();
dispatcher.subscribe("my-subject", "event-processor", m -> {
System.out.printf("sub1: received message '%s'\n", m.getSubject());
m.ack();
latch.countDown();
});
dispatcher.subscribe("my-subject", "event-processor", m -> {
System.out.printf("sub2: received message '%s'\n", m.getSubject());
m.ack();
latch.countDown();
});
dispatcher.subscribe("my-subject", "event-processor", m -> {
System.out.printf("sub3: received message '%s'\n", m.getSubject());
m.ack();
latch.countDown();
});
Publish some more messages.
js.publish("events.4", null);
js.publish("events.5", null);
js.publish("events.6", null);
js.publish("events.7", null);
js.publish("events.8", null);
js.publish("events.9", null);
latch.await();
} catch (InterruptedException | IOException | JetStreamApiException e) {
e.printStackTrace();
}
}
}
Output
# Durable (implicit) Deliver group: 'event-processor' Deliver subject: '_INBOX.UG16KTAeW94CBpUY0Fdin1' sub1: received message 'events.1' sub2: receive timeout sub3: received message 'events.3' # Durable (explicit) sub1: received message 'events.1' sub1: received message 'events.2' sub1: received message 'events.3' sub3: received message 'events.4' sub3: received message 'events.5' sub2: received message 'events.6' sub3: received message 'events.7' sub1: received message 'events.8' sub2: received message 'events.9'