NATS Logo by Example

Request-Reply in Messaging

The request-reply pattern allows a client to send a message and expect a reply of some kind. In practice, the request message will either be a command, which is an intention for service to carry out some work that results in a state change, or a query, which is a request for information.

Unlike request-reply constrained protocols like HTTP, NATS is not limited to a strict point-to-point interaction between a client and server. The request-reply pattern is built on top of the core publish-subscribe model.

By default, this means that any one of subscribers could be a responder and reply to the client. However, because NATS is not limited to point-to-point interactions, the client could indicate to NATS that multiple replies should be allowed.

This example shows the basics of the request-reply pattern including the standard “no responders” error if there are no subscribers available to handle and reply to the requesting message.

CLI Go Python JavaScript Rust C# C#2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/request-reply/rust
View the source code or learn how to run this example yourself

Code

use futures::StreamExt;
use std::{env, str::from_utf8, time::Duration};


#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {

Use the NATS_URL env variable if defined, otherwise fallback to the default.

    let nats_url = env::var("NATS_URL")
        .unwrap_or_else(|_| "nats://localhost:4222".to_string());


    let client = async_nats::connect(nats_url).await?;

In addition to vanilla publish-request, NATS supports request-reply interactions as well. Under the covers, this is just an optimized pair of publish-subscribe operations. The requests is just a subscription that responds to a message sent to it. This kind of subscription is called a service.

    let mut requests = client.subscribe("greet.*").await.unwrap();

Spawn a new task, so we can respond to incoming requests. Usually request/response happens across clients and network and in such scenarios, you don’t need a separate task.

    tokio::spawn({
        let client = client.clone();
        async move {

Iterate over requests.

            while let Some(request) = requests.next().await {

Check if message we got have a reply to which we can publish the response.

                if let Some(reply) = request.reply {

Publish the response.

                    let name = &request.subject[6..];
                    client
                        .publish(reply, format!("hello, {}", name).into())
                        .await?;
                }
            }
            Ok::<(), async_nats::Error>(())
        }
    });

As there is a Subscriber listening to requests, we can sent those. We’re leaving the payload empty for these examples.

    let response = client.request("greet.sue", "".into()).await?;
    println!("got a response: {:?}", from_utf8(&response.payload)?);


    let response = client.request("greet.john", "".into()).await?;
    println!("got a response: {:?}", from_utf8(&response.payload)?);

If we don’t want to endlessly wait until response is returned, we can wrap it in tokio::time::timeout.

    let response = tokio::time::timeout(
        Duration::from_millis(500),
        client.request("greet.bob", "".into()),
    )

first ? is Err if timeout occurs, second is for actual response Result.

    .await??;
    println!("got a response: {:?}", from_utf8(&response.payload)?);


    Ok(())
}

Output

got a response: "hello, sue"
got a response: "hello, john"
got a response: "hello, bob"

Recording

Note, playback is half speed to make it a bit easier to follow.