NATS Logo by Example

Multi-Stream Consumption (legacy) in JetStream

There may be use cases where a fan-in of messages across streams may be desired. One way to achieve this is to create a push consumer per stream and specify the same DeliverSubject and, optionally, DeliverGroup. This will result in each consumer delivering messages to clients subscribed to the subject and/or part of a queue group.

This example will demonstrate how to configure the consumers and subscription to achieve this fan-in consumption.

CLI Go Python JavaScript Rust C# Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run jetstream/multi-stream-consumption/go
View the source code or learn how to run this example yourself

Code

package main


import (
	"fmt"
	"os"
	"time"


	"github.com/nats-io/nats.go"
)


func main() {
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = nats.DefaultURL
	}


	nc, _ := nats.Connect(natsURL)
	js, _ := nc.JetStream()

Create a stream for each region.

	js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS-EU",
		Subjects: []string{"events.eu.>"},
	})


	js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS-US",
		Subjects: []string{"events.us.>"},
	})

Create a consumer for each stream. Both publish to the same deliver subject. This is a straightforward way to do this in a single account. It is recommended a user is created with specific permissions to subscribe to this subject.

	js.AddConsumer("EVENTS-EU", &nats.ConsumerConfig{
		Durable:        "processor",
		DeliverSubject: "push.events",
		DeliverGroup:   "processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})


	js.AddConsumer("EVENTS-US", &nats.ConsumerConfig{
		Durable:        "processor",
		DeliverSubject: "push.events",
		DeliverGroup:   "processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})

Publish messages to each stream.

	js.Publish("events.eu.page_loaded", nil)
	js.Publish("events.eu.input_focused", nil)
	js.Publish("events.us.page_loaded", nil)
	js.Publish("events.us.mouse_clicked", nil)
	js.Publish("events.eu.mouse_clicked", nil)
	js.Publish("events.us.input_focused", nil)

Subscribe to the deliver subject with core NATS subscription. Observe that messages from both streams are being received and can be ack’ed.

	sub, _ := nc.QueueSubscribeSync("push.events", "processor")
	defer sub.Drain()


	for {
		msg, err := sub.NextMsg(time.Second)
		if err == nats.ErrTimeout {
			break
		}


		fmt.Println(msg.Subject)
		msg.Ack()
	}

Confirm the consumer state is updated.

	info1, _ := js.ConsumerInfo("EVENTS-EU", "processor")
	fmt.Printf("eu: last delivered: %d, num pending: %d\n", info1.Delivered.Stream, info1.NumPending)
	info2, _ := js.ConsumerInfo("EVENTS-US", "processor")
	fmt.Printf("us: last delivered: %d, num pending: %d\n", info2.Delivered.Stream, info2.NumPending)
}

Output

events.us.page_loaded
events.us.mouse_clicked
events.us.input_focused
events.eu.page_loaded
events.eu.input_focused
events.eu.mouse_clicked
eu: last delivered: 3, num pending: 0
us: last delivered: 3, num pending: 0

Recording

Note, playback is half speed to make it a bit easier to follow.