NATS by Example

Request-Reply in Messaging

The request-reply pattern allows a client to send a message and expect a reply of some kind. In practice, the request message is either a command, which expresses an intention for a service to carry out some work that results in a state change, or a query, which is a request for information.

Unlike protocols that are constrained to request-reply, such as HTTP, NATS is not limited to a strict point-to-point interaction between a client and a server. The request-reply pattern is built on top of the core publish-subscribe model.

By default, this means that any one of the subscribers could be a responder and reply to the client. However, because NATS is not limited to point-to-point interactions, the client could indicate to NATS that multiple replies should be allowed.

This example shows the basics of the request-reply pattern, including the standard “no responders” error that is returned if there are no subscribers available to handle and reply to the request.
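To make the relationship to publish-subscribe concrete, here is a minimal sketch of a request expressed as a publish that carries a unique reply subject. It uses a toy in-memory broker, not NATS or nats-py: `ToyBroker`, its methods, the `_INBOX.` prefix, and the use of `LookupError` for the no-responders case are all assumptions made for illustration.

```python
import uuid
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, not the NATS API)."""

    def __init__(self):
        self.subs = defaultdict(list)  # subject -> list of callbacks

    def subscribe(self, subject, callback):
        self.subs[subject].append(callback)

    def unsubscribe(self, subject, callback):
        self.subs[subject].remove(callback)

    def publish(self, subject, data, reply=None):
        # Deliver to every subscriber and report how many received the message.
        callbacks = list(self.subs[subject])
        for callback in callbacks:
            callback(subject, data, reply)
        return len(callbacks)

    def request(self, subject, data, max_replies=1):
        # A request is a publish that carries a unique reply subject.
        inbox = f"_INBOX.{uuid.uuid4().hex}"
        replies = []

        def collect(_subject, payload, _reply):
            replies.append(payload)

        self.subscribe(inbox, collect)
        try:
            delivered = self.publish(subject, data, reply=inbox)
        finally:
            self.unsubscribe(inbox, collect)
        if delivered == 0:
            raise LookupError("no responders")
        return replies[:max_replies]


broker = ToyBroker()


def make_responder(name):
    def respond(_subject, _data, reply):
        broker.publish(reply, f"{name}: hello".encode())
    return respond


broker.subscribe("greet", make_responder("a"))
broker.subscribe("greet", make_responder("b"))

first = broker.request("greet", b"")                       # keep the first reply only
all_replies = broker.request("greet", b"", max_replies=2)  # allow multiple replies

try:
    broker.request("nobody.home", b"")
except LookupError as err:
    outcome = str(err)

print(first, all_replies, outcome)
```

In this sketch every subscriber receives the request and the requester decides how many replies to keep, which mirrors the idea that request-reply is layered on publish-subscribe rather than being a separate point-to-point channel.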

$ nbe run messaging/request-reply/python

Code

import os
import asyncio


import nats
from nats.errors import TimeoutError, NoRespondersError

Get the list of servers.

servers = os.environ.get("NATS_URL", "nats://localhost:4222").split(",")


async def main():

Create the connection to NATS which takes a list of servers.

    nc = await nats.connect(servers=servers)

In addition to vanilla publish-subscribe, NATS supports request-reply interactions as well. Under the covers, this is just an optimized pair of publish-subscribe operations. The request handler is a subscription that responds to a message sent to it. This kind of subscription is called a service. We can use the cb argument for asynchronous handling.

    async def greet_handler(msg):

Parse out the second token in the subject (everything after greet.) and use it as part of the response message.

        name = msg.subject[6:]
        reply = f"hello, {name}"
        await msg.respond(reply.encode("utf8"))


    sub = await nc.subscribe("greet.*", cb=greet_handler)

Now we can use the built-in request method to make the service request. We simply pass an empty body since it is not being used right now. In addition, we need to specify a timeout, since with a request we are waiting for the reply and we likely don’t want to wait forever.

    rep = await nc.request("greet.joe", b'', timeout=0.5)
    print(f"{rep.data}")


    rep = await nc.request("greet.sue", b'', timeout=0.5)
    print(f"{rep.data}")


    rep = await nc.request("greet.bob", b'', timeout=0.5)
    print(f"{rep.data}")

What happens if the service is unavailable? We can simulate this by draining the subscription from above, which unsubscribes our handler. Now if we make a request, we expect a “no responders” error.

    await sub.drain()


    try:
        await nc.request("greet.joe", b'', timeout=0.5)
    except NoRespondersError:
        print("no responders")


    await nc.drain()




if __name__ == '__main__':
    asyncio.run(main())
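The handler above extracts the name by slicing the subject at a fixed offset, which relies on the prefix being exactly greet. followed by one token. As an alternative, a token-based version might look like the sketch below. `name_from_subject` is a hypothetical helper, not part of nats-py, and returning None for a non-matching subject is an assumption made for illustration.

```python
def name_from_subject(subject, prefix="greet"):
    """Return the token after `prefix.`, or None if the subject does not match."""
    tokens = subject.split(".")
    # The wildcard in "greet.*" matches exactly one token, so expect two in total.
    if len(tokens) != 2 or tokens[0] != prefix or not tokens[1]:
        return None
    return tokens[1]


print(name_from_subject("greet.joe"))
print(name_from_subject("greet"))
print(name_from_subject("hello.joe"))
```

Splitting on the token separator keeps the parsing correct if the prefix is later renamed, at the cost of a few more lines than the slice.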

Output

b'hello, joe'
b'hello, sue'
b'hello, bob'
no responders
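The last line of output comes from the case where no subscriber exists at all. The timeout passed to each request covers a different case: a responder exists but does not reply in time. The sketch below illustrates that bounded wait using only asyncio. It does not use NATS or nats-py; `slow_responder` and `request_with_timeout` are hypothetical names, and returning None on timeout is an assumption made for illustration.

```python
import asyncio


async def slow_responder(delay):
    # Hypothetical responder that takes `delay` seconds to produce a reply.
    await asyncio.sleep(delay)
    return b"hello, joe"


async def request_with_timeout(delay, timeout):
    # Bound the wait so the caller cannot block forever on a missing reply.
    try:
        return await asyncio.wait_for(slow_responder(delay), timeout)
    except asyncio.TimeoutError:
        return None


fast = asyncio.run(request_with_timeout(delay=0.01, timeout=0.5))
slow = asyncio.run(request_with_timeout(delay=0.5, timeout=0.05))
print(fast, slow)
```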
