NATS Logo by Example

Request-Reply in Messaging

The request-reply pattern allows a client to send a message and expect a reply of some kind. In practice, the request message will either be a command, which is an intention for service to carry out some work that results in a state change, or a query, which is a request for information.

Unlike request-reply constrained protocols like HTTP, NATS is not limited to a strict point-to-point interaction between a client and server. The request-reply pattern is built on top of the core publish-subscribe model.

By default, this means that any one of subscribers could be a responder and reply to the client. However, because NATS is not limited to point-to-point interactions, the client could indicate to NATS that multiple replies should be allowed.

This example shows the basics of the request-reply pattern including the standard “no responders” error if there are no subscribers available to handle and reply to the requesting message.

CLI Go Python JavaScript Rust C# C#2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/request-reply/elixir
View the source code or learn how to run this example yourself

Code

Set up the dependencies for this script. Ordinarily you would have this set of dependencies declared in your mix.exs file.

Mix.install([

For documentation on the Gnat library, see https://hexdocs.pm/gnat/readme.html

  {:gnat, "~> 1.7.1"},
  {:jason, "~> 1.0"}
])


url = System.get_env("NATS_URL", "nats://0.0.0.0:4222")
uri = URI.parse(url)

Call start_link on Gnat to start the Gnat application supervisor

{:ok, gnat} = Gnat.start_link(%{host: uri.host, port: uri.port})

The Gnat.ConnectionSupervisor is a process that monitors your NATS connection. If connection is lost, this process will retry according to its backoff settings to re-establish a connection. You can communicate with NATS without this, but we recommend using supervised connections and consumers

gnat_supervisor_settings = %{
  name: :gnat,
  backoff_period: 4_000,
  connection_settings: [
    %{host: uri.host, port: uri.port}
  ]
}


{:ok, _conn} = Gnat.ConnectionSupervisor.start_link(gnat_supervisor_settings)

Give the connection time to establish (this is only needed for these examples and not in production)

if Process.whereis(:gnat) == nil do
  Process.sleep(300)
end



Now let’s set up a consumer supervisor. We use the subscription_topics field in the configuration map to set up a list of subscriptions which can optionally have queue names.

consumer_supervisor_settings = %{
  connection_name: :gnat,

This is the name of a module defined below

  module: ExampleService,
  subscription_topics: [
    %{topic: "request.demo"},
  ]


}

Starting the consumer supervisor will create the subscription and will monitor the connection to re-establish subscriptions in case of failure

{:ok , _pid} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)



This is a module that conforms to the Gnat.Server behavior. The name of this module matches the module field in the consumer supervisor settings.

defmodule ExampleService do
  use Gnat.Server

This handler will simulate an error

  def request(%{topic: "request.demo", body: body}) when body == "failure" do
    {:error, "something went wrong!"}
  end

This handler is matching just on the subject

  def request(%{topic: "request.demo", body: body}) do
    IO.puts "Demo request received message: #{inspect(body)}"
    {:reply, "This is a demo"}
  end

Defining an error handler is optional, the default one will just call Logger.error for you

   def error(%{gnat: gnat, reply_to: reply_to}, error) do
    Gnat.pub(gnat, reply_to, "An error occurred: #{inspect(error)}")
  end
end

Now that we know there’s an active subscription on the request.demo subject, we can make a request and process the reply using Gnat.request

{:ok, %{body: res}} = Gnat.request(:gnat, "request.demo", "input data")
IO.puts("First result: #{res}")


{:ok, %{body: res}} = Gnat.request(:gnat, "request.demo", "more data")
IO.puts("Second result: #{res}")

Now cause a failure so we can see error responses

{:ok, %{body: res}} = Gnat.request(:gnat, "request.demo", "failure")
IO.puts("Failure result: #{res}")

Output

* creating /root/.mix/archives/hex-2.0.6
Resolving Hex dependencies...
Resolution completed in 0.051s
New:
  cowlib 2.12.1
  ed25519 1.4.1
  gnat 1.7.1
  jason 1.4.1
  nimble_parsec 1.4.0
  nkeys 0.2.2
  telemetry 1.2.1
* Getting gnat (Hex package)
* Getting jason (Hex package)
* Getting cowlib (Hex package)
* creating /root/.mix/elixir/1-15/rebar3
* Getting nimble_parsec (Hex package)
* Getting nkeys (Hex package)
* Getting telemetry (Hex package)
* Getting ed25519 (Hex package)
You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more
==> ed25519
Compiling 2 files (.ex)
Generated ed25519 app
==> nkeys
Compiling 2 files (.ex)
Generated nkeys app
==> nimble_parsec
Compiling 4 files (.ex)
Generated nimble_parsec app
===> Analyzing applications...
===> Compiling telemetry
==> jason
Compiling 10 files (.ex)
Generated jason app
===> Analyzing applications...
===> Compiling cowlib
==> gnat
Compiling 11 files (.ex)
Generated gnat app

18:23:21.015 [debug] connecting to %{port: 4222, host: "nats"}
Demo request received message: "input data"
First result: This is a demo
Demo request received message: "more data"
Second result: This is a demo
Failure result: An error occurred: "something went wrong!"

Recording

Note, playback is half speed to make it a bit easier to follow.