NATS by Example

Consumer - Fetch Messages in JetStream

The new JetStream API provides simplified semantics for JetStream asset management and message consumption. It removes the complexity of Subscribe() in favor of a more explicit separation between creating consumers and consuming messages.

This example demonstrates how to Fetch messages with the new API.

$ nbe run jetstream/consumer-fetch-messages/java

Code

package example;


import io.nats.client.*;
import io.nats.client.api.*;


import java.io.IOException;


public class Main {
  public static void main(String[] args) {
    String natsURL = System.getenv("NATS_URL");
    if (natsURL == null) {
      natsURL = "nats://127.0.0.1:4222";
    }


    try (Connection conn = Nats.connect(natsURL)) {
      System.out.println("\nA. Prepare Example Stream and Consumers");

The JetStream and JetStreamManagement contexts

The JetStreamManagement context provides the ability to create and manage streams. The JetStream context provides the ability to publish messages.

      JetStream js = conn.jetStream();
      JetStreamManagement jsm = conn.jetStreamManagement();

Create a stream and populate the stream with a few messages.

      String streamName = "fetch";
      jsm.addStream(StreamConfiguration.builder()
          .name(streamName)
          .storageType(StorageType.Memory)
          .subjects("events.>")
          .build());

Publish some messages to the stream.

      js.publish("events.1", "e1m1".getBytes());
      js.publish("events.2", "e2m1".getBytes());
      js.publish("events.1", "e1m2".getBytes());
      js.publish("events.2", "e2m2".getBytes());

Although you can make consumers on the fly, typically consumers will be created ahead of time.

      ConsumerConfiguration cc = ConsumerConfiguration.builder().name("onlyEvents1").filterSubject("events.1").build();
      jsm.addOrUpdateConsumer(streamName, cc);
      cc = ConsumerConfiguration.builder().name("allEvents").filterSubject("events.*").build();
      jsm.addOrUpdateConsumer(streamName, cc);
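
The two consumers above differ only in their filter subject, which is why "onlyEvents1" later reports 2 pending messages and "allEvents" reports 4. As a rough illustration of the NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens), here is a minimal standalone sketch. The class and method names (`SubjectFilterSketch`, `matches`, `countMatches`) are hypothetical helpers written for this illustration; they are not part of the NATS client, and the server's actual matching code is not shown here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of the NATS Java client.
class SubjectFilterSketch {

    // Returns true when `subject` matches `filter` under NATS wildcard rules:
    // '*' matches exactly one token, '>' matches one or more trailing tokens.
    static boolean matches(String filter, String subject) {
        String[] f = filter.split("\\.");
        String[] s = subject.split("\\.");
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals(">")) {
                // '>' must be the last filter token and needs at least one subject token left.
                return i == f.length - 1 && s.length > i;
            }
            if (i >= s.length) {
                return false; // subject ran out of tokens before the filter did
            }
            if (!f[i].equals("*") && !f[i].equals(s[i])) {
                return false; // literal token mismatch
            }
        }
        // Without a trailing '>', both must have the same number of tokens.
        return f.length == s.length;
    }

    // Counts how many of the given subjects a filter would select.
    static long countMatches(String filter, List<String> subjects) {
        return subjects.stream().filter(s -> matches(filter, s)).count();
    }

    public static void main(String[] args) {
        // The four subjects published in this example, in order.
        List<String> published = Arrays.asList("events.1", "events.2", "events.1", "events.2");
        System.out.println("events.1 selects " + countMatches("events.1", published));
        System.out.println("events.* selects " + countMatches("events.*", published));
        System.out.println("events.> selects " + countMatches("events.>", published));
    }
}
```

In this example every subject has exactly two tokens, so `events.*` and the stream's own `events.>` select the same messages; they would differ for a deeper subject such as `events.1.created`.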

Simplified JetStream API

The simplified API has a StreamContext for accessing existing streams, creating consumers, and getting a ConsumerContext. The StreamContext can be created from the Connection similar to the legacy API.

      System.out.println("\nB. Use Simplification StreamContext");
      StreamContext streamContext = conn.getStreamContext(streamName);
      StreamInfo streamInfo = streamContext.getStreamInfo(StreamInfoOptions.allSubjects());


      System.out.println("   Stream Name: " + streamInfo.getConfiguration().getName());
      System.out.println("   Stream Subjects: " + streamInfo.getStreamState().getSubjects());
      System.out.println("   Stream Message Count: " + streamInfo.getStreamState().getMsgCount());

Creating a consumer from the stream context

To create an ephemeral consumer, the createOrUpdateConsumer method can be used with a bare ConsumerConfiguration object.

Getting a consumer from the stream context

If your consumer already exists as a durable consumer, you can get a ConsumerContext for it from the stream context, or directly from the connection by providing the stream and consumer names.

      System.out.println("\nC. Simplification Consumer Context");
      ConsumerContext consumerContext1 = streamContext.getConsumerContext("onlyEvents1");
      ConsumerInfo consumerInfo1 = consumerContext1.getCachedConsumerInfo();


      System.out.println("   The ConsumerContext for \"" + consumerInfo1.getName() + "\" was loaded from the StreamContext for \"" + consumerInfo1.getStreamName() + "\"");
      System.out.println("   The consumer has " + consumerInfo1.getNumPending() + " messages available.");


      ConsumerContext consumerContext2 = streamContext.getConsumerContext("allEvents");
      ConsumerInfo consumerInfo2 = consumerContext2.getCachedConsumerInfo();


      System.out.println("\n   The ConsumerContext for \"" + consumerInfo2.getName() + "\" was loaded from the StreamContext for \"" + consumerInfo2.getStreamName() + "\"");
      System.out.println("   The consumer has " + consumerInfo2.getNumPending() + " messages available.");

Retrieving messages on demand with fetch

FetchConsumer

A FetchConsumer is returned when you call one of the fetch methods on ConsumerContext. You then call nextMessage on that object to read the batch. Notice that there is no stop method on the FetchConsumer interface; the fetch stops by itself. The new version of fetch is very similar to the old iterate, in that the call returns without waiting for the entire batch to arrive.
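
To make the two behaviours described here concrete (the fetch call returns right away, and reading continues until nextMessage yields null), the following is a toy in-memory model using only the Java standard library. `ToyFetch` and `drain` are hypothetical names invented for this sketch; this is not how the NATS client implements FetchConsumer, and it leaves out expiration timeouts, acknowledgements and server status handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FetchModelSketch {

    // Hypothetical toy stand-in for a fetch handle; this is NOT the jnats FetchConsumer.
    static final class ToyFetch {
        // An empty Optional marks the end of the batch.
        private final BlockingQueue<Optional<String>> buffer = new LinkedBlockingQueue<>();
        private boolean done = false;

        // Returns immediately; a background thread delivers up to batchSize messages.
        ToyFetch(List<String> available, int batchSize) {
            int count = Math.min(batchSize, available.size());
            List<String> batch = new ArrayList<>(available.subList(0, count));
            Thread feeder = new Thread(() -> {
                for (String m : batch) {
                    buffer.add(Optional.of(m));
                }
                buffer.add(Optional.empty());
            });
            feeder.setDaemon(true);
            feeder.start();
        }

        // Blocks until a message or the end marker arrives; null means the fetch is finished.
        String nextMessage() throws InterruptedException {
            if (done) {
                return null;
            }
            Optional<String> next = buffer.take();
            if (!next.isPresent()) {
                done = true;
                return null;
            }
            return next.get();
        }
    }

    // Same loop shape as the example: keep reading until nextMessage returns null.
    static List<String> drain(ToyFetch fetch) {
        List<String> seen = new ArrayList<>();
        try {
            String msg = fetch.nextMessage();
            while (msg != null) {
                seen.add(msg);
                msg = fetch.nextMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop reading
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("e1m1", "e2m1", "e1m2", "e2m2");
        // Ask for a batch of 2 out of the 4 available messages.
        System.out.println(drain(new ToyFetch(stream, 2)));
    }
}
```

The point of the model is the calling pattern: construction does not wait for messages, and there is nothing to stop because the handle ends the batch on its own.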

      System.out.println("\nD. FetchConsumer");
      System.out.println("   The consumer name is \"" + consumerInfo1.getName() + "\".");
      System.out.println("   The consumer has " + consumerInfo1.getNumPending() + " messages available.");


      long start = System.currentTimeMillis();
      long elapsed;
      try (FetchConsumer fetchConsumer = consumerContext1.fetchMessages(2)) {
        elapsed = System.currentTimeMillis() - start;
        System.out.println("   The 'fetch' method call returned in " + elapsed + "ms.");

nextMessage returns null once there are no more messages to consume in this fetch.

        try {
          Message msg = fetchConsumer.nextMessage();
          while (msg != null) {
            String data = new String(msg.getData());
            System.out.println("   Processing " + msg.getSubject() + " '" + data);
            msg.ack();
            msg = fetchConsumer.nextMessage();
          }
        }
        catch (JetStreamStatusCheckedException se) {
          System.out.println("   JetStreamStatusCheckedException: " + se.getMessage());
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
      elapsed = System.currentTimeMillis() - start;
      System.out.println("   Fetch complete in " + elapsed + "ms.");


      System.out.println("\n   The consumer name is \"" + consumerInfo2.getName() + "\".");
      System.out.println("   The consumer has " + consumerInfo2.getNumPending() + " messages available.");


      start = System.currentTimeMillis();
      try (FetchConsumer fetchConsumer = consumerContext2.fetchMessages(2)) {
        elapsed = System.currentTimeMillis() - start;
        System.out.println("   The 'fetch' method call returned in " + elapsed + "ms.");

As before, nextMessage returns null once there are no more messages to consume in this fetch.

        try {
          Message msg = fetchConsumer.nextMessage();
          while (msg != null) {
            elapsed = System.currentTimeMillis() - start;
            String data = new String(msg.getData());
            System.out.println("   Processing " + msg.getSubject() + " '" + data);
            msg.ack();
            msg = fetchConsumer.nextMessage();
          }
        }
        catch (JetStreamStatusCheckedException se) {
          System.out.println("   JetStreamStatusCheckedException: " + se.getMessage());
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
      elapsed = System.currentTimeMillis() - start;
      System.out.println("   Fetch complete in " + elapsed + "ms.");
    }
    catch (JetStreamApiException | IOException | InterruptedException e) {
      // JetStreamApiException: the stream or consumer did not exist
      // IOException: problem making the connection
      // InterruptedException: thread interruption in the body of the example
      System.err.println(e);
    }
  }
}

Output

A. Prepare Example Stream and Consumers

B. Use Simplification StreamContext
   Stream Name: fetch
   Stream Subjects: [Subject{name='events.1', count=2}, Subject{name='events.2', count=2}]
   Stream Message Count: 4

C. Simplification Consumer Context
   The ConsumerContext for "onlyEvents1" was loaded from the StreamContext for "fetch"
   The consumer has 2 messages available.

   The ConsumerContext for "allEvents" was loaded from the StreamContext for "fetch"
   The consumer has 4 messages available.

D. FetchConsumer
   The consumer name is "onlyEvents1".
   The consumer has 2 messages available.
   The 'fetch' method call returned in 7ms.
   Processing events.1 'e1m1
   Processing events.1 'e1m2
   Fetch complete in 16ms.

   The consumer name is "allEvents".
   The consumer has 4 messages available.
   The 'fetch' method call returned in 2ms.
   Processing events.1 'e1m1
   Processing events.2 'e2m1
   Fetch complete in 5ms.
