Concurrent Message Processing in Messaging
By default, when a subscription is created, each message that is received it process sequentially. There can be multiple subscriptions setup in a queue group in which case the NATS server will distribute messages to each member of the group.
However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.
$ nbe run messaging/concurrent/pythonView the source code or learn how to run this example yourself
Code
import asyncio
import os
import random
import nats
async def main():
Use the NATS_URL env variable if defined, otherwise fallback to the default.
nats_url = os.getenv("NATS_URL", "nats://localhost:4222")
client = await nats.connect(nats_url)
subscription.messages
returns an asynchronous iterator and allows us to not
wait for time-consuming operation and receive next message immediately.
messages = (await client.subscribe("greet.*", max_msgs=50)).messages
Publish set of messages, each with order identifier.
for i in range(50):
await client.publish("greet.joe", f"hello {i}".encode())
Iterate over messages concurrently.
25
is a limit for concurrent operations.
semaphore = asyncio.Semaphore(25)
async def process_message(message):
async with semaphore:
await asyncio.sleep(random.uniform(0, 0.5))
print(f"received message: {message.data.decode()!r}")
await asyncio.gather(*[process_message(msg) async for msg in messages])
if __name__ == "__main__":
asyncio.run(main())
Output
received message: 'hello 11' received message: 'hello 13' received message: 'hello 22' received message: 'hello 8' received message: 'hello 14' received message: 'hello 24' received message: 'hello 19' received message: 'hello 16' received message: 'hello 28' received message: 'hello 21' received message: 'hello 32' received message: 'hello 25' received message: 'hello 34' received message: 'hello 10' received message: 'hello 5' received message: 'hello 36' received message: 'hello 2' received message: 'hello 23' received message: 'hello 1' received message: 'hello 42' received message: 'hello 12' received message: 'hello 9' received message: 'hello 26' received message: 'hello 45' received message: 'hello 20' received message: 'hello 15' received message: 'hello 41' received message: 'hello 18' received message: 'hello 0' received message: 'hello 7' received message: 'hello 29' received message: 'hello 27' received message: 'hello 3' received message: 'hello 4' received message: 'hello 17' received message: 'hello 30' received message: 'hello 37' received message: 'hello 6' received message: 'hello 33' received message: 'hello 40' received message: 'hello 39' received message: 'hello 31' received message: 'hello 35' received message: 'hello 44' received message: 'hello 43' received message: 'hello 49' received message: 'hello 38' received message: 'hello 46' received message: 'hello 48' received message: 'hello 47'