NATS by Example

Object-Store Intro in Object-Store

The object-store (OS) capability in NATS is an abstraction over a stream that models message subjects as keys, similar to KV, but with payloads that span multiple chunks. This allows larger assets to be stored, and these are typically written and read as streams.

$ nbe run os/intro/java

Code

package example;


import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import io.nats.client.support.Digester;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ThreadLocalRandom;


public class Main {
  public static void main(String[] args) {
    String natsURL = System.getenv("NATS_URL");
    if (natsURL == null) {
      natsURL = "nats://127.0.0.1:4222";
    }


    Options options = Options.builder().server(natsURL).errorListener(new ObjectStoreIntroErrorListener()).build();


    try (Connection nc = Nats.connect(options)) {

The utility of the Object Store is that it can store objects that are larger than the server's max message payload.

      System.out.println("The server max payload is configured as " + nc.getServerInfo().getMaxPayload() + " bytes.");

Bucket basics

An object store (OS) bucket is created by specifying a bucket name. The Java client returns an ObjectStoreStatus object upon creation.

      ObjectStoreManagement osm = nc.objectStoreManagement();


      ObjectStoreConfiguration osc = ObjectStoreConfiguration.builder()
          .name("my-bucket")
          .description("bucket with my stuff in it")
          .storageType(StorageType.Memory)
          .compression(true)
          .build();

When you create a bucket, you get a status object in return.

      ObjectStoreStatus objectStoreStatus = osm.create(osc);

Retrieve the Object Store context by name once the bucket is created.

      ObjectStore os = nc.objectStore("my-bucket");


      System.out.println("Before put, the object store has " + objectStoreStatus.getSize() + " bytes of data and meta-data stored.");


      int bytes = 10_000_000;
      byte[] data = new byte[bytes];
      ThreadLocalRandom.current().nextBytes(data);
      ByteArrayInputStream in = new ByteArrayInputStream(data);

PUT

There are multiple ways to “put” an object into the store. This example shows putting an input stream, like you might get from a file. It also shows how to customize how the object is put into the store, in this case in chunks of 32K instead of the default 128K. Chunk size is important to consider if, for instance, you know you are on a connection with limited bandwidth or your server has been configured to accept only smaller messages.

      ObjectMeta objectMeta = ObjectMeta.builder("my-object")
          .description("My Object Description")
          .chunkSize(32 * 1024)
          .build();
      ObjectInfo oInfo = os.put(objectMeta, in);
      System.out.println("ObjectInfo from put(...) " + Helper.toString(oInfo)
          + "\nwas successfully put in bucket '" + oInfo.getBucket() + "'.");


      objectStoreStatus = os.getStatus();
      System.out.println("After put, the object store has " + objectStoreStatus.getSize() + " bytes of data and meta-data stored.");


      oInfo = os.getInfo("my-object");
      System.out.println("ObjectInfo from get(...) " + oInfo);
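The effect of the chunk size on the number of stored chunks can be checked with plain arithmetic. The following is a minimal sketch that does not call the NATS client at all; the class and method names are made up for illustration, and it assumes an object is simply split into consecutive chunks of at most the configured size.

```java
// Sketch only: plain arithmetic, no NATS client calls.
// ChunkCountSketch, chunkCount and lastChunkSize are hypothetical names.
class ChunkCountSketch {

  // Number of chunks needed for totalBytes when each chunk holds at most chunkSize bytes.
  static long chunkCount(long totalBytes, int chunkSize) {
    if (totalBytes < 0 || chunkSize <= 0) {
      throw new IllegalArgumentException("totalBytes must be >= 0 and chunkSize must be > 0");
    }
    return (totalBytes + chunkSize - 1) / chunkSize; // ceiling division
  }

  // Size of the final chunk, which is usually shorter than the others.
  static long lastChunkSize(long totalBytes, int chunkSize) {
    if (totalBytes < 0 || chunkSize <= 0) {
      throw new IllegalArgumentException("totalBytes must be >= 0 and chunkSize must be > 0");
    }
    if (totalBytes == 0) {
      return 0;
    }
    long remainder = totalBytes % chunkSize;
    return remainder == 0 ? chunkSize : remainder;
  }

  public static void main(String[] args) {
    long objectSize = 10_000_000L; // same size as the object put above
    System.out.println("32K chunks:  " + chunkCount(objectSize, 32 * 1024));
    System.out.println("128K chunks: " + chunkCount(objectSize, 128 * 1024));
    System.out.println("last 32K chunk holds " + lastChunkSize(objectSize, 32 * 1024) + " bytes");
  }
}
```

With 32K chunks the 10,000,000 byte object needs 306 chunks, which agrees with the chunks value reported in the output of this example. With the 128K default mentioned above it would need 77.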

GET

When we “get” an object, we need an output stream to deliver the bytes to. It could be a FileOutputStream to store the object directly to disk. In this case we will just write it to memory.

      ByteArrayOutputStream out = new ByteArrayOutputStream();

The get reads all the message chunks.

      os.get("my-object", out);
      byte[] outBytes = out.toByteArray();


      System.out.println("We received " + outBytes.length + " bytes.");

Manually check the bytes

      for (int x = 0; x < outBytes.length; x++) {
        if (data[x] != outBytes[x]) {
          System.out.println("Input should be exactly output.");
          return;
        }
      }
      System.out.println("We received exactly the same bytes we put in!");
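The loop above walks the received bytes only, so it relies on both arrays having the same length: a shorter result would pass unnoticed and a longer one would throw an ArrayIndexOutOfBoundsException. As a possible alternative, the standard library's Arrays.equals compares length and content in one call. The class and method names in this sketch are hypothetical.

```java
import java.util.Arrays;

// Sketch only: compares two byte arrays by length and content.
// ByteCompareSketch and sameBytes are hypothetical names.
class ByteCompareSketch {

  // True only when both arrays have the same length and identical bytes.
  static boolean sameBytes(byte[] expected, byte[] actual) {
    return Arrays.equals(expected, actual);
  }

  public static void main(String[] args) {
    byte[] original = {1, 2, 3, 4};
    byte[] identical = {1, 2, 3, 4};
    byte[] truncated = {1, 2, 3};

    System.out.println("identical copy matches: " + sameBytes(original, identical));
    System.out.println("truncated copy matches: " + sameBytes(original, truncated));
  }
}
```

In the example above this would reduce the check to a single call such as sameBytes(data, outBytes).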

Use a digester to check the bytes

      Digester d = new Digester();
      d.update(outBytes);
      String outDigest = d.getDigestEntry();
      System.out.println("The received bytes has a digest of: '" + outDigest + "'");
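To see roughly what such a digest entry contains, the same kind of string can be built with the JDK alone. This is a sketch under an assumption: judging by the output of this example, the entry looks like the prefix SHA-256= followed by the URL-safe Base64 encoding of the SHA-256 hash. It is not the client's Digester, and the names DigestSketch and digestEntry are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Sketch only: builds a "SHA-256=<base64url>" string with the JDK.
// The entry format is an assumption based on the example output.
class DigestSketch {

  static String digestEntry(byte[] bytes) {
    try {
      MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
      byte[] hash = sha256.digest(bytes); // 32 bytes
      return "SHA-256=" + Base64.getUrlEncoder().encodeToString(hash);
    } catch (NoSuchAlgorithmException e) {
      // Every Java runtime is required to provide SHA-256.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] sample = "hello object store".getBytes(StandardCharsets.UTF_8);
    System.out.println(digestEntry(sample));
  }
}
```

A string built this way from the received bytes can be compared with the digest value shown in the ObjectInfo output, which avoids a byte-by-byte comparison when the original data is no longer at hand.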

Watching for changes

Although one could subscribe to the stream directly, it is more convenient to use an ObjectStoreWatcher, which provides a dedicated API and types for tracking changes over time.

      ObjectStoreWatcher uoWatcher = new ObjectStoreIntroWatcher("UPDATES_ONLY");
      System.out.println("About to watch [UPDATES_ONLY]");
      os.watch(uoWatcher, ObjectStoreWatchOption.UPDATES_ONLY);


      byte[] simple = new byte[100_000];
      ThreadLocalRandom.current().nextBytes(simple);
      os.put("simple object", simple);
      System.out.println("Simple object has been put.");


      ObjectStoreWatcher ihWatcher = new ObjectStoreIntroWatcher("INCLUDE_HISTORY");
      System.out.println("About to watch [INCLUDE_HISTORY]");
      os.watch(ihWatcher, ObjectStoreWatchOption.INCLUDE_HISTORY);


      System.out.println("About to delete...");
      os.delete("simple object");
      os.delete("my-object");

Sleep this thread a little so the program has time to receive all the messages before the program quits.

      Thread.sleep(500);
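A fixed sleep is simple but either waits longer than needed or, on a slow machine, not long enough. One possible alternative, when the number of expected notifications is known in advance, is to count them down with a CountDownLatch and wait with a timeout. The sketch below uses only the JDK; CountingListener is a hypothetical stand-in for a watcher, the producer thread merely simulates incoming notifications, and the expected count is an assumption that the caller has to supply.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch only: waits for a known number of callbacks instead of sleeping.
// No NATS client calls are made; all names here are hypothetical.
class LatchWaitSketch {

  // Stand-in for a watcher: counts down once per notification.
  static class CountingListener {
    private final CountDownLatch remaining;

    CountingListener(int expectedEvents) {
      this.remaining = new CountDownLatch(expectedEvents);
    }

    void onEvent(String description) {
      System.out.println("received " + description);
      remaining.countDown();
    }

    // Returns true if all expected events arrived before the timeout.
    boolean awaitAll(long timeoutMillis) {
      try {
        return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  // Simulates deliveredEvents notifications arriving on another thread.
  static boolean deliverAndWait(int expectedEvents, int deliveredEvents, long timeoutMillis) {
    CountingListener listener = new CountingListener(expectedEvents);
    Thread producer = new Thread(() -> {
      for (int i = 1; i <= deliveredEvents; i++) {
        listener.onEvent("event " + i);
      }
    });
    producer.setDaemon(true);
    producer.start();
    return listener.awaitAll(timeoutMillis);
  }

  public static void main(String[] args) {
    System.out.println("all received: " + deliverAndWait(4, 4, 500));
  }
}
```

In a real watcher the countDown call would sit inside the watch callback, and the main thread would call awaitAll where the sleep is now. The timeout still bounds the wait if a notification never arrives.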


      objectStoreStatus = os.getStatus();
      System.out.println("After deletes, the object store has " + objectStoreStatus.getSize() + " bytes of data and meta-data stored.");


    } catch (InterruptedException | IOException | JetStreamApiException e) {
      e.printStackTrace();
    }
    catch (NoSuchAlgorithmException e) {

The NoSuchAlgorithmException comes from the Digester if the runtime does not have the SHA-256 algorithm available, so it should never actually happen.

      throw new RuntimeException(e);
    }
  }

ObjectStoreWatcher

This is just a simple implementation.

  static class ObjectStoreIntroWatcher implements ObjectStoreWatcher {
    String name;


    public ObjectStoreIntroWatcher(String name) {
      this.name = name;
    }


    @Override
    public void watch(ObjectInfo objectInfo) {
      System.out.println("Watcher [" + name + "] received watch" + Helper.toString(objectInfo));
    }


    @Override
    public void endOfData() {
    }
  }


  static class Helper {
    public static String toString(ObjectInfo oi) {
      return "\n  name=" + oi.getObjectName() +
          ", size=" + oi.getSize() +
          ", chunks=" + oi.getChunks() +
          ", deleted=" + oi.isDeleted() +
          ", digest=" + oi.getDigest();
    }
  }

Custom Error listener

Since the object store currently uses a push consumer under the covers, there may be some flow control warnings, depending on the size of the object and how quickly the client can process the incoming messages. These are just warnings. In a custom error listener you can turn off printing or logging of that warning.

  static class ObjectStoreIntroErrorListener extends ErrorListenerConsoleImpl {


    @Override
    public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String id, FlowControlSource source) {

do nothing, just a warning

    }
  }
}

Output

The server max payload is configured as 1048576 bytes.
Before put, the object store has 0 bytes of data and meta-data stored.
ObjectInfo from put(...) 
  name=my-object, size=10000000, chunks=306, deleted=false, digest=SHA-256=U_40yxhKLPMn3kHxhYzHqhcJf9FhxYfSj4flpC8b8Fk=
was successfully put in bucket 'my-bucket'.
After put, the object store has 10016545 bytes of data and meta-data stored.
ObjectInfo from get(...) ObjectInfo{bucket='my-bucket', nuid='Kun9uZ59WYGZRwOq4xyzvC', size=10000000, modified=2024-07-29T20:18:24.232963594Z[GMT], chunks=306, digest='SHA-256=U_40yxhKLPMn3kHxhYzHqhcJf9FhxYfSj4flpC8b8Fk=', deleted=false, objectMeta=ObjectMeta{objectName='my-object', description='My Object Description', headers?0, objectMetaOptions=ObjectMetaOptions{link=null, chunkSize=32768}}}
We received 10000000 bytes.
We received exactly the same bytes we put in!
The received bytes has a digest of: 'SHA-256=U_40yxhKLPMn3kHxhYzHqhcJf9FhxYfSj4flpC8b8Fk='
About to watch [UPDATES_ONLY]
Simple object has been put.
About to watch [INCLUDE_HISTORY]
Watcher [UPDATES_ONLY] received watch
  name=simple object, size=100000, chunks=1, deleted=false, digest=SHA-256=qJDoaGJda-o-1GdOr6hcWt-HiCM5IqtF936CYLbeuc4=
Watcher [INCLUDE_HISTORY] received watch
  name=my-object, size=10000000, chunks=306, deleted=false, digest=SHA-256=U_40yxhKLPMn3kHxhYzHqhcJf9FhxYfSj4flpC8b8Fk=
Watcher [INCLUDE_HISTORY] received watch
  name=simple object, size=100000, chunks=1, deleted=false, digest=SHA-256=qJDoaGJda-o-1GdOr6hcWt-HiCM5IqtF936CYLbeuc4=
About to delete...
Watcher [INCLUDE_HISTORY] received watch
  name=simple object, size=0, chunks=0, deleted=true, digest=null
Watcher [UPDATES_ONLY] received watch
  name=simple object, size=0, chunks=0, deleted=true, digest=null
Watcher [INCLUDE_HISTORY] received watch
  name=my-object, size=0, chunks=0, deleted=true, digest=null
Watcher [UPDATES_ONLY] received watch
  name=my-object, size=0, chunks=0, deleted=true, digest=null
After deletes, the object store has 481 bytes of data and meta-data stored.
