NATS by Example

Pull Consumers in JetStream

A pull consumer lets the application fetch one or more messages on demand through a subscription bound to the consumer. The application therefore controls the flow of incoming messages, so it can process and ack them in an appropriate amount of time.

A consumer can either be durable or ephemeral. A durable consumer will have its state tracked on the server, most importantly, the last acknowledged message from the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not persist after the primary subscriber unsubscribes. The server will automatically clean up (delete) the consumer after a period of time.

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.

Note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batch independently. Deterministic partitioning for scalable ordered processing requires additional configuration that this example does not cover.
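
To make the idea of deterministic partitioning concrete, here is a minimal sketch in plain C. It is not the mechanism NATS provides and does not use the NATS client; `fnv1a` and `partitionFor` are hypothetical helpers that only show how hashing an ordering key sends every message for that key to the same worker.

```c
#include <stdint.h>

/* FNV-1a hash over a NUL-terminated key (a simple, widely used string hash). */
static uint32_t fnv1a(const char *key)
{
    uint32_t h = 2166136261u;
    for (; *key != '\0'; key++)
    {
        h ^= (uint8_t)*key;
        h *= 16777619u;
    }
    return h;
}

/* Hypothetical helper: map an ordering key to one of `partitions` workers.
   `partitions` must be greater than zero. */
static unsigned partitionFor(const char *key, unsigned partitions)
{
    return fnv1a(key) % partitions;
}
```

Because the result depends only on the key and the partition count, messages that share a key are always handled by the same worker and keep their relative order.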

$ nbe run jetstream/pull-consumer/c

Code

#include <stdio.h>
#include <stdlib.h>
#include <nats.h>


#define STREAM_NAME "EVENTS"
#define CONSUMER_NAME "event-consumer"
#define SUBSCRIBE_SUBJECT "event.>"
#define SUBJECT_PREFIX "event."
#define NUM_MESSAGES 5


static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts);
static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts);
static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts);
static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts);


typedef natsStatus (*examplef)(jsCtx *js, jsOptions *jsOpts);


int main()
{
    natsStatus s = NATS_OK;
    natsConnection *nc = NULL;
    jsOptions jsOpts;
    jsCtx *js = NULL;
    examplef examples[] = {exampleFetch, exampleFetchRequest, exampleNamedConsumer};
    int i;
    int N = sizeof(examples) / sizeof(examples[0]);

Initialize the NATS connection and JetStream context.

    s = init(&nc, &js, &jsOpts);

Run the examples.

    for (i = 0; (i < N) && (s == NATS_OK); i++)
    {
        examplef f = examples[i];
        s = f(js, &jsOpts);
    }

Cleanup and finish.

    jsCtx_Destroy(js);
    natsConnection_Destroy(nc);
    if (s != NATS_OK)
    {
        nats_PrintLastErrorStack(stderr);
        return 1;
    }
    return 0;
}

Initialize the NATS connection and JetStream context, create the stream, and publish NUM_MESSAGES test messages.

static natsStatus init(natsConnection **newnc, jsCtx **newjs, jsOptions *jsOpts)
{
    natsStatus s = NATS_OK;
    natsOptions *opts = NULL;
    natsConnection *nc = NULL;


    jsCtx *js = NULL;
    jsStreamConfig cfg;
    jsStreamInfo *si = NULL;
    jsErrCode jerr = 0;
    char subject[] = SUBJECT_PREFIX "99999999999999";
    int i;

Use the env variable if running in the container, otherwise use the default.

    const char *url = getenv("NATS_URL");
    s = natsOptions_Create(&opts);
    if (s == NATS_OK && url != NULL)
    {
        s = natsOptions_SetURL(opts, url);
    }

Create an unauthenticated connection to NATS.

    if (s == NATS_OK)
        s = natsConnection_Connect(&nc, opts);
    natsOptions_Destroy(opts);

Access JetStream for managing streams and consumers as well as for publishing and consuming messages to and from the stream.

    if (s == NATS_OK)
        s = jsOptions_Init(jsOpts);
    if (s == NATS_OK)
        s = natsConnection_JetStream(&js, nc, jsOpts);

Add a simple limits-based stream.

    if (s == NATS_OK)
        s = jsStreamConfig_Init(&cfg);
    if (s == NATS_OK)
    {
        cfg.Name = STREAM_NAME;
        cfg.Subjects = (const char *[1]){SUBSCRIBE_SUBJECT};
        cfg.SubjectsLen = 1;
        s = js_AddStream(&si, js, &cfg, jsOpts, &jerr);
        jsStreamInfo_Destroy(si);
    }
    if (s == NATS_OK)
        printf("Created a stream named '%s' with 1 subject '%s'\n", STREAM_NAME, SUBSCRIBE_SUBJECT);

Publish NUM_MESSAGES messages for the examples.

    for (i = 0; (s == NATS_OK) && (i < NUM_MESSAGES); i++)
    {
        snprintf(subject, sizeof(subject), "%s%d", SUBJECT_PREFIX, i + 1);
        s = js_Publish(NULL, js, subject, "01234567890123456789012345678901234567890123456789", 50, NULL, &jerr);
    }
    if (s == NATS_OK)
        printf("Published %d messages for the example\n", NUM_MESSAGES);


    if (s == NATS_OK)
    {
        *newnc = nc;
        *newjs = js;
    }
    else
    {
        jsCtx_Destroy(js);
        natsConnection_Destroy(nc);
    }


    return s;
}
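
The init function sizes its subject buffer from a placeholder string so that any int suffix fits. As a sketch of the same formatting step with an explicit bounds check, the hypothetical helper below (`formatSubject` is not part of the example or of the NATS client) reports truncation instead of relying on the buffer size being right.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: write "<prefix><n>" into buf.
   Returns 0 on success, -1 if the result would not fit in len bytes. */
static int formatSubject(char *buf, size_t len, const char *prefix, int n)
{
    int written = snprintf(buf, len, "%s%d", prefix, n);
    return (written >= 0 && (size_t)written < len) ? 0 : -1;
}
```

With a 16-byte buffer, the prefix "event." and n = 1 yield "event.1"; a 4-byte buffer makes the helper return -1.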

Create a pull consumer subscription and use natsSubscription_Fetch to receive messages.

static natsStatus exampleFetch(jsCtx *js, jsOptions *jsOpts)
{
    natsStatus s = NATS_OK;
    natsSubscription *sub = NULL;
    jsErrCode jerr = 0;
    natsMsgList list = {0};
    jsSubOptions so;
    int c, i, ibatch;

Create a pull consumer subscription. The durable name (4th parameter) is not supplied, so the consumer will be removed after InactiveThreshold (defaults to 5 seconds) is reached when not actively consuming messages.

    s = jsSubOptions_Init(&so);
    if (s == NATS_OK)
    {
        printf("exampleFetch: create a pull consumer and use natsSubscription_Fetch to receive messages\n");
        so.Stream = STREAM_NAME;
        s = js_PullSubscribe(&sub, js, SUBSCRIBE_SUBJECT, NULL, jsOpts, &so, &jerr);
    }

Use natsSubscription_Fetch to fetch the messages. Here we attempt to fetch a batch of up to 2 messages with a 5 second timeout, and we stop trying once the expected NUM_MESSAGES messages are successfully fetched.

Note: natsSubscription_Fetch does not wait for the timeout while messages are already available. In that case it returns within a few milliseconds.

Note: each fetched message must be acknowledged.

Note: natsMsgList_Destroy will destroy the fetched messages.

    for (ibatch = 0, c = 0; (s == NATS_OK) && (c < NUM_MESSAGES); ibatch++)
    {
        int64_t start = nats_Now();
        s = natsSubscription_Fetch(&list, sub, 2, 5000, &jerr);
        if (s == NATS_OK)
        {
            c += list.Count;
            printf("exampleFetch: batch #%d (%d messages) in %dms\n", ibatch, list.Count, (int)(nats_Now() - start));
        }
        else
        {
            printf("exampleFetch: error: %d:\n", s);
            nats_PrintLastErrorStack(stderr);
        }
        for (i = 0; (s == NATS_OK) && (i < list.Count); i++)
        {
            s = natsMsg_Ack(list.Msgs[i], jsOpts);
            if (s == NATS_OK)
                printf("exampleFetch: received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i]));
        }
        natsMsgList_Destroy(&list);
    }

Attempt to fetch more messages. This time the call waits for the full 500ms timeout because no messages are pending, and it returns NATS_TIMEOUT with no messages, which the example treats as expected.

    if (s == NATS_OK)
    {
        int64_t start = nats_Now();
        s = natsSubscription_Fetch(&list, sub, 2, 500, &jerr);
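        printf("exampleFetch: extra natsSubscription_Fetch returned status %d and %d messages in %dms\n",
               s, list.Count, (int)(nats_Now() - start));
        /* A timeout is expected here: release anything fetched and reset the status. */
        natsMsgList_Destroy(&list);
        s = NATS_OK;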
    }

Cleanup.

    natsSubscription_Destroy(sub);


    return s;
}
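
The control flow of the fetch loop above can be modeled without a server. The sketch below is a simulation only: `fakeConsumer`, `fakeFetch` and `drain` are hypothetical stand-ins, not NATS client calls. It shows why 5 pending messages fetched in batches of up to 2 arrive as batches of 2, 2 and 1, and why a further fetch comes back empty.

```c
#include <stdio.h>

/* Hypothetical stand-in for a pull consumer with `pending` messages left. */
typedef struct
{
    int pending;
} fakeConsumer;

/* Hand out up to `batch` messages; 0 models a fetch that timed out. */
static int fakeFetch(fakeConsumer *c, int batch)
{
    int n = (c->pending < batch) ? c->pending : batch;
    c->pending -= n;
    return n;
}

/* Fetch until `expected` messages have arrived; return the number of batches. */
static int drain(fakeConsumer *c, int batch, int expected)
{
    int received = 0;
    int batches = 0;
    while (received < expected)
    {
        int n = fakeFetch(c, batch);
        if (n == 0)
            break; /* nothing left: the real call would report a timeout */
        printf("batch #%d: %d message(s)\n", batches, n);
        received += n;
        batches++;
    }
    return batches;
}
```

Starting from 5 pending messages, `drain` with a batch size of 2 finishes after 3 batches, which matches the batch numbering in the example output.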

Create another similar pull consumer subscription and use natsSubscription_FetchRequest to receive messages with more precise control.

static natsStatus exampleFetchRequest(jsCtx *js, jsOptions *jsOpts)
{
    natsStatus s = NATS_OK;
    natsSubscription *sub = NULL;
    jsErrCode jerr = 0;
    natsMsgList list = {0};
    jsSubOptions so;
    int c, i, ibatch;

Create another similar pull consumer, same scenario but using natsSubscription_FetchRequest for precise control.

    s = jsSubOptions_Init(&so);
    if (s == NATS_OK)
    {
        printf("exampleFetchRequest: create pull consumer and use natsSubscription_FetchRequest to receive messages\n");
        so.Stream = STREAM_NAME;
        s = js_PullSubscribe(&sub, js, SUBSCRIBE_SUBJECT, NULL, jsOpts, &so, &jerr);
    }

Use natsSubscription_FetchRequest to fetch the messages. We set the batch size to 1000 but limit MaxBytes to 300, so each request returns only 2 messages.

Note: Setting .NoWait causes the request to return as soon as some messages are available, not necessarily the entire batch. By default, the request waits up to .Expires if there are not enough messages to fill the batch.

    for (ibatch = 0, c = 0; (s == NATS_OK) && (c < NUM_MESSAGES); ibatch++)
    {
        int64_t start = nats_Now();
        jsFetchRequest fr = {
            .Batch = 1000,
            /* .NoWait = true, */
            .Expires = 500 * 1000 * 1000, /* 500ms, expressed in nanoseconds */
            .MaxBytes = 300,
        };
        s = natsSubscription_FetchRequest(&list, sub, &fr);
        if (s == NATS_OK)
        {
            c += list.Count;
            printf("exampleFetchRequest: batch #%d (%d messages) in %dms\n", ibatch, list.Count, (int)(nats_Now() - start));
        }
        else
        {
            printf("exampleFetchRequest: error: %d:\n", s);
            nats_PrintLastErrorStack(stderr);
        }
        for (i = 0; (s == NATS_OK) && (i < list.Count); i++)
        {
            s = natsMsg_Ack(list.Msgs[i], jsOpts);
            if (s == NATS_OK)
                printf("exampleFetchRequest: received and acked message on %s\n", natsMsg_GetSubject(list.Msgs[i]));
        }
        natsMsgList_Destroy(&list);
    }


    natsSubscription_Destroy(sub);


    return s;
}
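
The fetch request sets .Expires with 500 * 1000 * 1000, that is, in nanoseconds, while natsSubscription_Fetch takes its timeout in milliseconds. The literal product is evaluated as an int, which is fine for 500ms but would overflow a 32-bit int once the timeout exceeds roughly 2.1 seconds. A small conversion helper avoids that. `msToNanos` is a hypothetical name, not part of the NATS client.

```c
#include <stdint.h>

/* Hypothetical helper: convert milliseconds to nanoseconds using
   64-bit arithmetic so that multi-second timeouts do not overflow. */
static int64_t msToNanos(int64_t ms)
{
    return ms * INT64_C(1000000);
}
```

The request in the example could then use `.Expires = msToNanos(500)`, and a 5 second timeout would be `msToNanos(5000)`.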

Create a pull consumer, then bind 2 subscriptions to it.

static natsStatus exampleNamedConsumer(jsCtx *js, jsOptions *jsOpts)
{
    natsStatus s = NATS_OK;
    jsConsumerInfo *ci = NULL;
    jsConsumerConfig cfg;
    natsSubscription *sub1 = NULL;
    natsSubscription *sub2 = NULL;
    jsErrCode jerr = 0;
    natsMsgList list = {0};
    jsSubOptions so;
    int i;
    bool done = false;


    s = jsConsumerConfig_Init(&cfg);
    if (s == NATS_OK)
    {
        cfg.Name = CONSUMER_NAME;
        s = js_AddConsumer(&ci, js, STREAM_NAME, &cfg, jsOpts, &jerr);
    }
    if (s == NATS_OK)
    {
        printf("exampleNamedConsumer: create a pull consumer named '%s'\n", CONSUMER_NAME);
    }
    jsConsumerInfo_Destroy(ci);

Create a named pull consumer explicitly and subscribe to it twice.

Note: no subject is passed to js_PullSubscribe since the subscription binds to the consumer by name.

Note: subscriptions are “balanced” in that each message is processed by one or the other.

    if (s == NATS_OK)
        s = jsSubOptions_Init(&so);


    if (s == NATS_OK)
    {
        printf("exampleNamedConsumer: bind 2 subscriptions to the consumer\n");
        so.Stream = STREAM_NAME;
        so.Consumer = CONSUMER_NAME;
        s = js_PullSubscribe(&sub1, js, NULL, NULL, jsOpts, &so, &jerr);
    }
    if (s == NATS_OK)
        s = js_PullSubscribe(&sub2, js, NULL, NULL, jsOpts, &so, &jerr);


    int64_t start = nats_Now();
    for (i = 0; (s == NATS_OK) && (!done); i++)
    {
        natsSubscription *sub = (i % 2 == 0) ? sub1 : sub2;
        const char *name = (i % 2 == 0) ? "sub1" : "sub2";
        start = nats_Now();


        s = natsSubscription_Fetch(&list, sub, 1, 100, &jerr);
        if ((s == NATS_OK) && (list.Count == 1))
        {
            printf("exampleNamedConsumer: fetched from %s subject '%s' in %dms\n",
                   name, natsMsg_GetSubject(list.Msgs[0]), (int)(nats_Now() - start));
            s = natsMsg_Ack(list.Msgs[0], jsOpts);
        }
        else if ((s == NATS_OK) && (list.Count != 1))
        {
            printf("exampleNamedConsumer: fetched wrong number of messages from %s: expected 1, got %d\n", name, list.Count);
            s = NATS_ERR;
        }
        else if (s == NATS_TIMEOUT && list.Count == 0)
        {
            printf("exampleNamedConsumer: got NATS_TIMEOUT from %s in %dms, no more messages for now\n", name, (int)(nats_Now() - start));
            s = NATS_OK;
            done = true;
        }
        else
        {
            printf("exampleNamedConsumer: error: %d:\n", s);
            nats_PrintLastErrorStack(stderr);
        }


        natsMsgList_Destroy(&list);
    }

Cleanup.

    natsSubscription_Drain(sub1);
    natsSubscription_Drain(sub2);
    natsSubscription_Destroy(sub1);
    natsSubscription_Destroy(sub2);


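    natsStatus ds = js_DeleteConsumer(js, STREAM_NAME, CONSUMER_NAME, jsOpts, &jerr);
    if (ds == NATS_OK)
    {
        printf("exampleNamedConsumer: deleted consumer '%s'\n", CONSUMER_NAME);
    }
    /* Keep an earlier failure instead of masking it with the delete status. */
    if (s == NATS_OK)
        s = ds;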


    return s;
}
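
The alternation between the two subscriptions can be sketched without a server. The code below is a simulation only: `alternateFetch` is a hypothetical helper, not a NATS client call. It applies the same `i % 2` rule as the loop above and counts how many messages each subscriber handles.

```c
#include <stdio.h>

#define SUBSCRIBERS 2

/* Simulate `total` messages fetched one at a time by alternating subscribers.
   counts[k] receives the number of messages handled by subscriber k. */
static void alternateFetch(int total, int counts[SUBSCRIBERS])
{
    int i;
    for (i = 0; i < SUBSCRIBERS; i++)
        counts[i] = 0;
    for (i = 0; i < total; i++)
    {
        int who = i % SUBSCRIBERS; /* same rule as (i % 2 == 0) ? sub1 : sub2 */
        counts[who]++;
        printf("sub%d handles message %d\n", who + 1, i + 1);
    }
}
```

For 5 messages the first subscriber handles 3 and the second handles 2, and each message is handled exactly once, as in the example output.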

Output

Created a stream named 'EVENTS' with 1 subject 'event.>'
Published 5 messages for the example
exampleFetch: create a pull consumer and use natsSubscription_Fetch to receive messages
exampleFetch: batch #0 (2 messages) in 1ms
exampleFetch: received and acked message on event.1
exampleFetch: received and acked message on event.2
exampleFetch: batch #1 (2 messages) in 1ms
exampleFetch: received and acked message on event.3
exampleFetch: received and acked message on event.4
exampleFetch: batch #2 (1 messages) in 1ms
exampleFetch: received and acked message on event.5
exampleFetch: extra natsSubscription_Fetch returned status 26 and 0 messages in 491ms
exampleFetchRequest: create pull consumer and use natsSubscription_FetchRequest to receive messages
exampleFetchRequest: batch #0 (2 messages) in 1ms
exampleFetchRequest: received and acked message on event.1
exampleFetchRequest: received and acked message on event.2
exampleFetchRequest: batch #1 (2 messages) in 1ms
exampleFetchRequest: received and acked message on event.3
exampleFetchRequest: received and acked message on event.4
exampleFetchRequest: batch #2 (1 messages) in 492ms
exampleFetchRequest: received and acked message on event.5
exampleNamedConsumer: create a pull consumer named 'event-consumer'
exampleNamedConsumer: bind 2 subscriptions to the consumer
exampleNamedConsumer: fetched from sub1 subject 'event.1' in 1ms
exampleNamedConsumer: fetched from sub2 subject 'event.2' in 1ms
exampleNamedConsumer: fetched from sub1 subject 'event.3' in 1ms
exampleNamedConsumer: fetched from sub2 subject 'event.4' in 1ms
exampleNamedConsumer: fetched from sub1 subject 'event.5' in 1ms
exampleNamedConsumer: got NATS_TIMEOUT from sub2 in 91ms, no more messages for now
exampleNamedConsumer: deleted consumer 'event-consumer'
