NATS Logo by Example

Intro in Services Framework

NATS services have always been straightforward to write. However, with the services framework, the NATS client library further simplifies the building, discovery and monitoring of services. The framework automatically places all subscriptions in a queue group and provides functionality for building subject hierarchies and their handlers.

Without any additional effort, the library enables automatic service discovery and status reporting. The NATS CLI nats micro command provides a simple way to query and report all the services using this framework.

CLI Go Python JavaScript Rust C# Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run services/intro/elixir
View the source code or learn how to run this example yourself

Code

Set up the dependencies for this script. Ordinarily you would have this set of dependencies declared in your mix.exs file.

Mix.install([

For documentation on the Gnat library, see https://hexdocs.pm/gnat/readme.html

  {:gnat, "~> 1.7.1"},
  {:jason, "~> 1.0"}
])


url = System.get_env("NATS_URL", "nats://0.0.0.0:4222")
uri = URI.parse(url)

Call start_link on Gnat to start the Gnat application supervisor

{:ok, gnat} = Gnat.start_link(%{host: uri.host, port: uri.port})

The Gnat.ConnectionSupervisor is a process that monitors your NATS connection. If connection is lost, this process will retry according to its backoff settings to re-establish a connection.

gnat_supervisor_settings = %{
  name: :gnat,
  backoff_period: 4_000,
  connection_settings: [
    %{host: uri.host, port: uri.port}
  ]
}


{:ok, _conn} = Gnat.ConnectionSupervisor.start_link(gnat_supervisor_settings)

Give the connection time to establish (this is only needed for these examples and not in production)

if Process.whereis(:gnat) == nil do
  Process.sleep(300)
end



Now let’s set up a consumer supervisor. Instead of manually specifying subjects on which to subscribe, we’ll supply the metadata required to expose the service to consumers.

consumer_supervisor_settings = %{
  connection_name: :gnat,

This is the name of a module defined below

  module: ExampleService,
  service_definition: %{
    name: "exampleservice",
    description: "This is an example service",

This service version needs to conform to the semver specification

    version: "0.1.0",
    endpoints: [

Each endpoint has a mandatory name, an optional group, and optional metadata

      %{
        name: "add",
        group_name: "calc",
      },
      %{
        name: "sub",
        group_name: "calc"
      }
    ]
  }
}

In this service definition, we have a single service, exampleservice. It has two endpoints, add and sub, each of which belong to the calc group. This means that the service will default to responding on calc.add and calc.sub. Let’s create the consumer supervisor for this

{:ok , _pid} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)



This is a module that conforms to the Gnat.Services.Server behavior. The name of this module matches the module field in the consumer supervisor settings.

defmodule ExampleService do
  use Gnat.Services.Server

This handler is matching just on the subject

  def request(%{topic: "calc.add", body: body}, _endpoint, _group) do
    IO.puts "Calculator adding...#{inspect(body)}"
    {:reply, "42"}
  end

Simulate an error occurring in a handler

  def request(%{body: body}, _, _) when body == "failthis" do
    {:error, "woopsy"}
  end

This handler matches on endpoint and group respectively

  def request(%{body: body}, "sub", "calc") do
    IO.puts "Calculator subtracting...#{inspect(body)}"


    {:reply, "24"}
  end

In case of an error, we can manually craft a response (remember this is a NATS reply)

  def error(_msg, e) do
    {:reply, "service error: #{inspect(e)}"}
  end
end



Let’s invoke the service a few times to generate some statistics.

{:ok, %{body: res}} = Gnat.request(:gnat, "calc.add", "add this!")
IO.puts("Add result: #{res}")
{:ok, %{body: res2}} = Gnat.request(:gnat, "calc.sub", "subtract this!")
IO.puts("Subtract result: #{res2}")

This simulates invoking an endpoint that returned an error

res3 = Gnat.request(:gnat, "calc.sub", "failthis")
IO.puts("Fail result: #{inspect(res3)}")

Get service stats. When you scroll down to the output of this demo, you’ll see that each of the endpoints has a request count of 1, and the sub endpoint has an error count of 1, and all of the endpoints have been keeping track of execution time

{:ok, %{body: stats}} = Gnat.request(:gnat, "$SRV.STATS.exampleservice", "")
jstats = Jason.decode!(stats)


IO.puts("Service Stats:")
IO.inspect(jstats)
:timer.sleep(50)

Output

* creating /root/.mix/archives/hex-2.0.6
Resolving Hex dependencies...
Resolution completed in 0.048s
New:
  cowlib 2.12.1
  ed25519 1.4.1
  gnat 1.7.1
  jason 1.4.1
  nimble_parsec 1.3.1
  nkeys 0.2.2
  telemetry 1.2.1
* Getting gnat (Hex package)
* Getting jason (Hex package)
* Getting cowlib (Hex package)
* creating /root/.mix/elixir/1-15/rebar3
* Getting nimble_parsec (Hex package)
* Getting nkeys (Hex package)
* Getting telemetry (Hex package)
* Getting ed25519 (Hex package)
You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more
==> ed25519
Compiling 2 files (.ex)
Generated ed25519 app
==> nkeys
Compiling 2 files (.ex)
Generated nkeys app
==> nimble_parsec
Compiling 4 files (.ex)
Generated nimble_parsec app
===> Analyzing applications...
===> Compiling telemetry
==> jason
Compiling 10 files (.ex)
Generated jason app
===> Analyzing applications...
===> Compiling cowlib
==> gnat
Compiling 11 files (.ex)
Generated gnat app

00:31:40.857 [debug] connecting to %{port: 4222, host: "nats"}
Calculator adding..."add this!"
Add result: 42
Calculator subtracting..."subtract this!"
Subtract result: 24
Fail result: {:ok, %{body: "service error: \"woopsy\"", gnat: #PID<0.688.0>, topic: "_INBOX.bOwa3Sr6ehoybiWX.W2zyLIUHXMXMC3db", reply_to: nil}}
Service Stats:
%{
  "endpoints" => [
    %{
      "average_processing_time" => 914000,
      "name" => "add",
      "num_errors" => 0,
      "num_requests" => 1,
      "processing_time" => 914000,
      "queue_group" => "q",
      "subject" => "calc.add"
    },
    %{
      "average_processing_time" => 263500,
      "name" => "sub",
      "num_errors" => 1,
      "num_requests" => 1,
      "processing_time" => 527000,
      "queue_group" => "q",
      "subject" => "calc.sub"
    }
  ],
  "id" => "7B5SUkuPcoVZj7Fb",
  "metadata" => nil,
  "name" => "exampleservice",
  "started" => "2023-10-25T00:31:41.165877Z",
  "type" => "io.nats.micro.v1.stats_response",
  "version" => "0.1.0"
}

Recording

Note, playback is half speed to make it a bit easier to follow.