Skip to content

Example · Request–Response

A synchronous call-and-reply pattern over two PassthroughChannels, with correlation IDs to match replies to their requests.

This is how SSSN implements RPC-style communication between systems without any framework coupling.


Pattern overview

Requester                    Responder
   │                            │
   │──write(request_channel)──►│
   │     correlation_id=X       │
   │                            │── process ──
   │                            │
   │◄─write(response_channel)──│
   │     correlation_id=X       │
   │                            │
   ▼ poll response_channel      ▼
     until correlation_id=X

Using the built-in helper

sssn.infra.helpers provides request_via_channel() which handles correlation ID generation, writing the request, and polling for the reply:

import asyncio
from sssn.channels.passthrough import PassthroughChannel
from sssn.infra.helpers import request_via_channel

async def main():
    request_ch  = PassthroughChannel(id="compute-request",  name="Compute Request")
    response_ch = PassthroughChannel(id="compute-response", name="Compute Response")

    await request_ch.start()
    await response_ch.start()

    # --- Responder (runs in the background) ---
    async def responder():
        while True:
            reqs = await request_ch.read("compute-service", limit=10)
            for req in reqs:
                result = req.content.data["a"] + req.content.data["b"]
                import time, uuid
                from sssn.core.channel import ChannelMessage, GenericContent
                reply = ChannelMessage(
                    id=str(uuid.uuid4()),
                    timestamp=time.time(),
                    sender_id="compute-service",
                    content=GenericContent(data={"result": result}),
                    correlation_id=req.correlation_id,  # Echo back the correlation ID
                )
                await response_ch.write("compute-service", reply, direct=True)
            await asyncio.sleep(0.05)

    responder_task = asyncio.create_task(responder())

    # --- Requester ---
    response = await request_via_channel(
        request_channel=request_ch,
        response_channel=response_ch,
        sender_id="client-a",
        data={"a": 7, "b": 35},
        timeout=5.0,
    )

    if response:
        print(f"7 + 35 = {response.content.data['result']}")  # → 42
    else:
        print("Request timed out")

    responder_task.cancel()

asyncio.run(main())

Building it manually

The helper is thin — here's the equivalent hand-rolled version to illustrate the full pattern:

import asyncio, time, uuid
from sssn.core.channel import ChannelMessage, GenericContent
from sssn.channels.passthrough import PassthroughChannel

async def call(
    request_ch: PassthroughChannel,
    response_ch: PassthroughChannel,
    sender_id: str,
    payload: dict,
    timeout: float = 10.0,
) -> dict | None:
    corr_id = uuid.uuid4().hex

    # Build and send request
    request = ChannelMessage(
        id=str(uuid.uuid4()),
        timestamp=time.time(),
        sender_id=sender_id,
        content=GenericContent(data=payload),
        correlation_id=corr_id,
        reply_to=response_ch.id,
    )
    await request_ch.write(sender_id, request, direct=True)

    # Poll for matching reply
    deadline = time.monotonic() + timeout
    last_msg_id = None

    while time.monotonic() < deadline:
        kw = {"after": last_msg_id} if last_msg_id else {}
        msgs = await response_ch.read(sender_id, limit=20, **kw)
        for msg in msgs:
            if msg.correlation_id == corr_id:
                return msg.content.data
            last_msg_id = msg.id
        await asyncio.sleep(0.05)

    return None  # Timeout

Typed request/response with MessageContent

For production use, define typed payloads to get validation and IDE support:

from sssn.core.channel import MessageContent

class ComputeRequest(MessageContent):
    a: float
    b: float
    operation: str = "add"

class ComputeResponse(MessageContent):
    result: float
    error: str | None = None

The responder validates the request type:

from pydantic import ValidationError

async def responder():
    while True:
        reqs = await request_ch.read("compute-service", limit=10)
        for req in reqs:
            try:
                req_data = ComputeRequest.model_validate(req.content.model_dump())
                if req_data.operation == "add":
                    result = req_data.a + req_data.b
                elif req_data.operation == "mul":
                    result = req_data.a * req_data.b
                else:
                    raise ValueError(f"Unknown operation: {req_data.operation}")
                content = ComputeResponse(result=result)
            except (ValidationError, ValueError) as e:
                content = ComputeResponse(result=0, error=str(e))

            reply = ChannelMessage(
                id=str(uuid.uuid4()),
                timestamp=time.time(),
                sender_id="compute-service",
                content=content,
                correlation_id=req.correlation_id,
            )
            await response_ch.write("compute-service", reply, direct=True)
        await asyncio.sleep(0.05)

Multiple concurrent requests

The correlation ID pattern handles concurrent requests safely — each requester waits for its own correlation_id:

async def main():
    # ... setup channels and start responder ...

    # Fire 10 requests concurrently
    tasks = [
        request_via_channel(
            request_ch, response_ch,
            sender_id="client",
            data={"a": i, "b": i * 2},
            timeout=5.0,
        )
        for i in range(10)
    ]
    responses = await asyncio.gather(*tasks)
    for i, resp in enumerate(responses):
        print(f"{i} + {i*2} = {resp.content.data['result']}")