NATS Logo by Example

JSON for Message Payloads in Messaging

The basic structure of a NATS message modeled in the client libraries includes the subject the message was published to, the application-defined payload, and an optional set of headers (for requests, there is also a reply-to subject). The payload is a sequence of bytes, so it is up to the application to define how to serialize and deserialize the payload.

JSON is ubiquitous and simple data-interchange format that is supported in virtually all programming languages. This example demonstrates how to serialize and deserialize a message payload using a JSON library.

CLI Go Python JavaScript Rust C# Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/json/rust
View the source code or learn how to run this example yourself

Code

use std::env;
use serde_json::json;
use serde::{Deserialize,Serialize};
use futures::stream::StreamExt;

Use the macro to generate the serialize and deserialize methods on a struct with basic types.

#[derive(Serialize, Deserialize)]
struct Payload {
    foo: String,
    bar: u8,
}


#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {

Use the NATS_URL env variable if defined, otherwise fallback to the default.

    let nats_url = env::var("NATS_URL")
        .unwrap_or_else(|_| "nats://localhost:4222".to_string());


    let client = async_nats::connect(nats_url).await?;

Create a subscription that receives two messages. One message will contain a valid serialized payload and the other will not.

    let mut subscriber = client.subscribe("foo").await?.take(2);

Construct a Payload value and serialize it.

    let payload = Payload{foo: "bar".to_string(), bar: 27};
    let bytes = serde_json::to_vec(&json!(payload))?;

Publish the serialized payload.

    client.publish("foo", bytes.into()).await?;
    client.publish("foo", "not json".into()).await?;

Loop through the expected messages and attempt to deserialize the payload into a Payload value. If deserialization into this type fails, alternate handling can be performed, either discarding or attempting to derialize in a more general type (such as a map).

    while let Some(message) = subscriber.next().await {
        if let Ok(payload) = serde_json::from_slice::<Payload>(message.payload.as_ref()) {
            println!("received valid JSON payload: foo={:?} bar={:?}", payload.foo, payload.bar);
        } else {
            println!("received invalid JSON payload: {:?}", message.payload);
        }
    }


    Ok(())
}

Output

received valid JSON payload: foo="bar" bar=27
received invalid JSON payload: b"not json"

Recording

Note, playback is half speed to make it a bit easier to follow.