Push Consumers (legacy) in JetStream
A push consumer provides alternate control to a pull consumer. Rather than fetching messages in controlled (and appropriately sized) batches to be processed, with a push consumer, the server will proactively push as many messages to the active subscription up to the consumer’s max ack pending limit.
In the happy path, the subscription receiving these messages can keep up and acknowledge the message prior to the ack wait time has elapsed.
Where push consumers can get unwieldy and confusing is when the subscriber cannot keep up, message processing errors start occurring, or the active subscription gets interrupted. Messages start getting redelivered and being interleaving with new messages pushed from the stream.
In practice, ephemeral push consumers can be a lightweight and useful way to do one-off consumption of a subset of messages in a stream. However, if you have a durable use case, it is recommended to access pull consumers first which provides more control and implicit support for scaling out consumption.
Code
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
func main() {
Use the env variable if running in the container, otherwise use the default.
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
Create an unauthenticated connection to NATS.
nc, _ := nats.Connect(url)
defer nc.Drain()
Access the JetStreamContext for managing streams and consumers as well as for publishing and subscription convenience methods.
js, _ := nc.JetStream()
Declare a simple limits-based stream and populate the stream with a few messages.
streamName := "EVENTS"
js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"events.>"},
})
js.Publish("events.1", nil)
js.Publish("events.2", nil)
js.Publish("events.3", nil)
Ephemeral
The JetStreamContext provides a simple way to create an ephemeral push consumer, simply provide a subject that overlaps with the bound subjects on the stream and this helper method will do the stream look-up automatically and create the consumer. We will also make ack-ing explicit rather than relying on the default implicit ack on receive.
fmt.Println("# Ephemeral")
sub, _ := js.SubscribeSync("events.>", nats.AckExplicit())
An ephemeral consumer has a name generated on the server-side. Since there is only one consumer so far, let’s just get the first one.
ephemeralName := <-js.ConsumerNames(streamName)
fmt.Printf("ephemeral name is %q\n", ephemeralName)
Since this is a push consumer, messages will be sent by the server
and pre-buffered by this subscription. We can observe this by using
the Pending()
method. Messages are buffered asynchronously, so
this pending count may or may not be three.
queuedMsgs, _, _ := sub.Pending()
fmt.Printf("%d messages queued\n", queuedMsgs)
The maximum number of messages that will be queued is defined by the
MaxAckPending
option set on a consumer. The default is 1,000.
Let’s observe this by publishing a few more events and then check
the pending status again.
js.Publish("events.4", nil)
js.Publish("events.5", nil)
js.Publish("events.6", nil)
Let’s check if we buffered some more.
queuedMsgs, _, _ = sub.Pending()
fmt.Printf("%d messages queued\n", queuedMsgs)
To receive a message, call NextMsg
with a timeout. The timeout
applies when pending count is zero and the consumer has fully caught
up to the available messages in the stream. If no messages become
available, this call will only block until the timeout.
msg, _ := sub.NextMsg(time.Second)
fmt.Printf("received %q\n", msg.Subject)
By default, the underlying consumer requires explicit acknowledgements, otherwise messages will get redelivered.
msg.Ack()
Let’s receive and ack another.
msg, _ = sub.NextMsg(time.Second)
fmt.Printf("received %q\n", msg.Subject)
msg.Ack()
Checking out our pending information, we see there are no more than four remaining.
queuedMsgs, _, _ = sub.Pending()
fmt.Printf("%d messages queued\n", queuedMsgs)
Unsubscribing this subscription will result in the ephemeral consumer being deleted. Note, even if this is omitted and the process ends or is interrupted, the server will eventually clean-up the ephemeral when it determines the subscription is no longer active.
sub.Unsubscribe()
Durable (Helper)
We can use the same SubscribeSync
method to create a durable
consumer as well by passing nats.Durable()
. This will implicitly
create the durable if it does not exist, otherwise it will bind to
an existing one if it exists.
fmt.Println("\n# Durable (Helper)")
sub, _ = js.SubscribeSync("events.>", nats.Durable("handler-1"), nats.AckExplicit())
Let’s check out pending messages again. We should have some queued up already.
queuedMsgs, _, _ = sub.Pending()
fmt.Printf("%d messages queued\n", queuedMsgs)
Let’s receive one and ack it.
msg, _ = sub.NextMsg(time.Second)
msg.Ack()
One nuance of implicitly creating a durable using this helper
method is that if Unsubscribe
or Drain
is called, the consumer
will actually be deleted. So these helpers should only be used
if neither of those methods are called.
sub.Unsubscribe()
If we try to get the consumer info, we will see it no longer exists.
_, err := js.ConsumerInfo("EVENTS", "handler-1")
fmt.Println(err)
Durable (AddConsumer)
A more explicit and safer way to create durables is using js.AddConsumer
.
For push consumers, we must provide a DeliverSubject
which is the
subject messages will be published to (pushed) for a subscription to
receive them.
fmt.Println("\n# Durable (AddConsumer)")
consumerName := "handler-2"
js.AddConsumer(streamName, &nats.ConsumerConfig{
Durable: consumerName,
DeliverSubject: "handler-2",
AckPolicy: nats.AckExplicitPolicy,
AckWait: time.Second,
})
Now that the consumer is created, we need to bind a client subscription
to it which will receive and process the messages. This can be done
using the nats.Bind
subscription option which requires the consumer
to have been pre-created. The subject can be omitted since that was
already defined on the consumer. Subscriptions to consumers cannot
independently define their own subject to filter on.
sub, _ = js.SubscribeSync("", nats.Bind(streamName, consumerName))
The next step is to receive a message which can be done using
the NextMsg
method. The passed duration is the amount of time
to wait before until a message is received. This is received because
SubscribeSync
is the synchronous form of a push consumer
subscription. There is also the Subscribe
variant which takes
a nats.MsgHandler
function to receive and process messages
asynchronously, but that will be described in a different example.
msg, _ = sub.NextMsg(time.Second)
fmt.Printf("received %q\n", msg.Subject)
Let’s ack the message and check out the pending count which will have a few buffered as shown above.
msg.Ack()
queuedMsgs, _, _ = sub.Pending()
fmt.Printf("%d messages queued\n", queuedMsgs)
If we unsubscribe, what happens to these pending messages?
From the client’s perspective they are effectively dropped. This behavior
would be true if the client crashed for some reason.
From the server’s perspective it is going to wait until AckWait
before attempting to re-deliver them. However, it will only re-deliver
if there is an active subscription.
sub.Unsubscribe()
If we check out the consumer info, we can pull out a few interesting bits of information. The first one is that the consumer tracks the sequence of the last message in the stream that a delivery was attempted for. The second is that it maintains its own sequence to track delivery attempts. These should not be treated as correlated since the consumer sequence for a given message will increment on each delivery attempt. The “num ack pending” indicates how many messages have been delivered and awaiting an acknowledgement. Since we ack’ed one already, there are five remaining. The final one to note here are the number of redeliveries. Since these messages have been only delivered once (so far) for this consumer this value is zero.
info, _ := sub.ConsumerInfo()
fmt.Printf("max stream sequence delivered: %d\n", info.Delivered.Stream)
fmt.Printf("max consumer sequence delivered: %d\n", info.Delivered.Consumer)
fmt.Printf("num ack pending: %d\n", info.NumAckPending)
fmt.Printf("num redelivered: %d\n", info.NumRedelivered)
If we create a new subscription and attempt to get a message before the AckWait, we will get a timeout since the messages are still pending.
sub, _ = js.SubscribeSync("", nats.Bind(streamName, consumerName))
_, err = sub.NextMsg(100 * time.Millisecond)
fmt.Printf("received timeout? %v\n", err == nats.ErrTimeout)
Let’s try again and wait a bit longer beyond the AckWait. We can also see that the delivery attempt on the message is now 2.
msg, _ = sub.NextMsg(time.Second)
md, _ := msg.Metadata()
fmt.Printf("received %q (delivery #%d)\n", msg.Subject, md.NumDelivered)
msg.Ack()
We can see how the numbers changed by viewing the consumer info again.
info, _ = sub.ConsumerInfo()
fmt.Printf("max stream sequence delivered: %d\n", info.Delivered.Stream)
fmt.Printf("max consumer sequence delivered: %d\n", info.Delivered.Consumer)
fmt.Printf("num ack pending: %d\n", info.NumAckPending)
fmt.Printf("num redelivered: %d\n", info.NumRedelivered)
}
Output
# Ephemeral ephemeral name is "M7FXHHUF" 3 messages queued 6 messages queued received "events.1" received "events.2" 4 messages queued # Durable (Helper) 0 messages queued nats: consumer not found # Durable (AddConsumer) received "events.1" 5 messages queued max stream sequence delivered: 6 max consumer sequence delivered: 6 num ack pending: 5 num redelivered: 0 received timeout? true received "events.2" (delivery #2) max stream sequence delivered: 6 max consumer sequence delivered: 11 num ack pending: 4 num redelivered: 4