NATS by Example

Limits-based Stream in JetStream

To get started with JetStream, a stream must be created. The mental model for a stream is that it binds a set of subjects for which messages published to those subjects will be persisted.
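To make the subject binding concrete, here is a minimal sketch of NATS-style subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The function name `subject_matches` is hypothetical and written only for illustration; the server performs this matching itself.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subject pattern matches a concrete subject."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' must be the last pattern token and must cover at least one token.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)


for subject in ("events.page_loaded", "events.mouse.clicked", "orders.created"):
    print(subject, subject_matches("events.*", subject))
```

Under this rule, a stream bound to `events.*` would store `events.page_loaded` but not a deeper subject such as `events.mouse.clicked`.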

A stream is implemented as an implicit server-side service that receives a request (the published message) and replies back once the message has been persisted.
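The request/reply shape of a publish can be sketched with a toy in-memory model. `ToyStream` and its `PubAck` are hypothetical stand-ins, not the server's implementation: the point is only that the message is stored first and the reply then reports where it was stored.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PubAck:
    stream: str  # name of the stream that stored the message
    seq: int     # sequence number assigned to the stored message


@dataclass
class ToyStream:
    name: str
    last_seq: int = 0
    messages: List[Tuple[int, str, bytes]] = field(default_factory=list)

    def publish(self, subject: str, data: bytes) -> PubAck:
        # Persist first, then reply: the ack is only built after the append.
        self.last_seq += 1
        self.messages.append((self.last_seq, subject, data))
        return PubAck(stream=self.name, seq=self.last_seq)


stream = ToyStream("events")
first = stream.publish("events.page_loaded", b"")
second = stream.publish("events.mouse_clicked", b"")
print(first)
print(second)
```

In the Python client used in this example, the value returned by `await js.publish(...)` plays the role of this acknowledgement.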

There are a handful of different kinds of streams and configuration options, but we will start with the most basic one, which has a limits-based retention policy. This policy is the default; however, limits still apply to streams with other retention policies.

The stream limit choices include:

  • the maximum number of messages
  • the maximum total size in bytes
  • the maximum age of a message

There is also a specialized maximum messages limit that can be applied at the subject level, but this will be demonstrated in a separate example.

By default, no limits are set, which means the stream grows without bound and must be managed manually. However, if any of these limits describes how the stream should be truncated, simply turn it on and let the server manage everything.
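The three limits can be pictured with a toy model that drops the oldest messages until every enabled limit holds. This is a simplified sketch, not the server's algorithm: `enforce_limits` and `size_of` are hypothetical names, the byte count ignores headers and per-message overhead, and the "disabled" values (`-1` for counts and bytes, `0` for age) are assumed to mirror the JetStream convention.

```python
from collections import deque
from typing import Deque, Tuple

Message = Tuple[float, str, bytes]  # (stored_at_seconds, subject, payload)


def size_of(msg: Message) -> int:
    # Simplification: only subject and payload bytes are counted here.
    _, subject, payload = msg
    return len(subject) + len(payload)


def enforce_limits(
    messages: Deque[Message],
    now: float,
    max_msgs: int = -1,
    max_bytes: int = -1,
    max_age: float = 0.0,
) -> int:
    """Drop oldest messages until all enabled limits hold; return how many were dropped."""
    dropped = 0
    total = sum(size_of(m) for m in messages)
    while messages:
        oldest = messages[0]
        too_many = max_msgs >= 0 and len(messages) > max_msgs
        too_big = max_bytes >= 0 and total > max_bytes
        too_old = max_age > 0 and now - oldest[0] > max_age
        if not (too_many or too_big or too_old):
            break
        total -= size_of(oldest)
        messages.popleft()
        dropped += 1
    return dropped


# Six empty messages stored at t = 0..5 seconds, inspected at t = 6.
messages = deque((float(t), "events.x", b"") for t in range(6))
print(enforce_limits(messages, now=6.0), len(messages))
print(enforce_limits(messages, now=6.0, max_msgs=4), len(messages))
print(enforce_limits(messages, now=6.0, max_msgs=4, max_bytes=20), len(messages))
```

Each call prints the number of messages dropped and the number remaining, so tightening a limit on an existing stream truncates it immediately in this model.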

In this example, we showcase the behavior of applying these limits and the flexibility JetStream offers in changing the stream configuration dynamically, on demand.

$ nbe run jetstream/limits-stream/python

Code

import os
import asyncio
import time


import nats




servers = os.environ.get("NATS_URL", "nats://localhost:4222").split(",")




async def main():
    
    

Connect to NATS server

    nc = await nats.connect(servers=servers)
    
    

Get a context to produce and consume messages from NATS JetStream

    js = nc.jetstream()
    
    

Create a stream named ‘events’ that binds subjects matching ‘events.*’. This is the stream that all events will be sent to. The storage parameter can be set to ‘file’ for file-based storage or ‘memory’ for memory-based storage. Any stream left over from a previous run is deleted first; delete_stream raises NotFoundError when the stream does not exist, so that error is ignored.

    try:
        await js.delete_stream('events')
    except nats.js.errors.NotFoundError:
        pass
    await js.add_stream(name='events', subjects=['events.*'], storage='file')
    
    

Publish 6 messages to the stream

    await js.publish('events.page_loaded', b'')
    await js.publish('events.mouse_clicked', b'')
    await js.publish('events.mouse_clicked', b'')
    await js.publish('events.page_loaded', b'')
    await js.publish('events.mouse_clicked', b'')
    await js.publish('events.input_focused', b'')
    print("published 6 messages", "\n")

Check the number of messages in the stream using streams_info. The StreamState in each result includes the total number of messages in the stream.

    print(await js.streams_info(),"\n")

Update the ‘events’ stream to have a maximum of 10 messages

    await js.update_stream(name='events', subjects=['events.*'], max_msgs=10)
    print("set max messages to 10","\n")
    
    

Check the number of messages in the stream using streams_info. The StreamState in each result includes the total number of messages in the stream.

    print(await js.streams_info(),"\n")
    
    

Update the ‘events’ stream to have a maximum of 300 bytes

    await js.update_stream(name='events', subjects=['events.*'], max_msgs=10, max_bytes=300)
    print("set max bytes to 300","\n")
    
    

Check the number of messages in the stream using streams_info. The StreamState in each result includes the total number of messages in the stream.

    print(await js.streams_info(),"\n")
    
    

Update the ‘events’ stream to have a maximum age of 0.1 seconds

    await js.update_stream(name='events', subjects=['events.*'], max_msgs=10, max_bytes=300, max_age=0.1)
    print("set max age to 0.1 seconds","\n")
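Each update_stream call in this listing repeats the limits set by the earlier calls, which suggests that an update carries the full desired configuration rather than a single changed field. A minimal sketch of that pattern, using a hypothetical `ToyStreamConfig` dataclass (not the client's own config type) whose field names mirror the keyword arguments used here:

```python
from dataclasses import asdict, dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class ToyStreamConfig:
    name: str
    subjects: Tuple[str, ...]
    max_msgs: int = -1    # assumed convention: -1 means no limit
    max_bytes: int = -1   # assumed convention: -1 means no limit
    max_age: float = 0.0  # seconds; assumed convention: 0 means no limit


# Derive each configuration from the previous one so earlier limits are kept.
base = ToyStreamConfig(name="events", subjects=("events.*",))
with_msgs = replace(base, max_msgs=10)
with_bytes = replace(with_msgs, max_bytes=300)
with_age = replace(with_bytes, max_age=0.1)

for cfg in (base, with_msgs, with_bytes, with_age):
    print(asdict(cfg))
```

Deriving every step from the previous configuration avoids accidentally dropping a limit that an earlier update had set.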
    
    

Check the number of messages in the stream using streams_info. The StreamState in each result includes the total number of messages in the stream.

    print(await js.streams_info(),"\n")
    
    

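Sleep for 10 seconds to allow messages to expire. asyncio.sleep is used so the event loop, and with it the client connection, keeps running while waiting.

    await asyncio.sleep(10)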
    
    

Check the number of messages in the stream using streams_info. The StreamState in each result includes the total number of messages in the stream.

    print(await js.streams_info())
    
    

Delete the ‘events’ stream

    await js.delete_stream('events')




if __name__ == '__main__':
    asyncio.run(main())

Output

Traceback (most recent call last):
  File "//main.py", line 74, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "//main.py", line 22, in main
    await js.delete_stream('events')
  File "/usr/local/lib/python3.10/site-packages/nats/js/manager.py", line 120, in delete_stream
    resp = await self._api_request(
  File "/usr/local/lib/python3.10/site-packages/nats/js/manager.py", line 362, in _api_request
    raise APIError.from_error(resp['error'])
  File "/usr/local/lib/python3.10/site-packages/nats/js/errors.py", line 84, in from_error
    raise NotFoundError(**err)
nats.js.errors.NotFoundError: nats: NotFoundError: code=404 err_code=10059 description='stream not found'
exit status 1
