Skip to content

Example · Service Discovery

Systems register their channels and capabilities at startup. Other systems query the registry at runtime to find what's available — no hardcoded addresses.


Setup

import asyncio
from sssn.channels.discovery import DiscoveryChannel
from sssn.channels.broadcast import BroadcastChannel
from sssn.core.channel import Visibility, ChannelInfo
from sssn.core.system import BaseSystem


# ---------------------------------------------------------------------------
# A service that publishes itself to the registry
# ---------------------------------------------------------------------------

class TemperatureService(BaseSystem):
    async def setup(self):
        self.feed = BroadcastChannel(
            id="temperature-feed",
            name="Temperature Feed",
            description="Live temperature readings from all sensors",
            visibility=Visibility.PUBLIC,
        )
        self.add_channel(self.feed)

    async def announce(self, registry: DiscoveryChannel):
        await registry.register_channel(self.feed.info)
        await registry.register_system(
            self._service_descriptor.model_dump()
        )
        print(f"[{self.id}] registered with discovery")

    async def step(self):
        import time
        await self.write_channel("temperature-feed", data={"celsius": 22.4, "ts": time.time()})


# ---------------------------------------------------------------------------
# A client that discovers the service at runtime
# ---------------------------------------------------------------------------

class DataConsumer(BaseSystem):
    def __init__(self, registry: DiscoveryChannel, **kwargs):
        super().__init__(**kwargs)
        self._registry = registry
        self._connected = False

    async def setup(self):
        pass  # Connected lazily on first step

    async def step(self):
        if not self._connected:
            await self._discover_and_connect()

        msgs = await self.read_channel("temperature-feed", limit=20)
        for msg in msgs:
            print(f"[Consumer] temp={msg.content.data.get('celsius')}°C")

    async def _discover_and_connect(self):
        # Find public channels
        channels = await self._registry.find_channels(visibility="public")
        temp_channels = [c for c in channels if "temperature" in c.name.lower()]

        if not temp_channels:
            print("[Consumer] no temperature channels found yet, retrying...")
            return

        for info in temp_channels:
            # In a real system this would use ChannelClient.connect_remote() with HTTP
            # For this in-process demo, we look up the channel directly
            ch = _channel_registry.get(info.id)
            if ch:
                self.client.connect(info.id, ch)
                print(f"[Consumer] connected to '{info.name}' ({info.id})")

        self._connected = True


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

# Simple in-process channel lookup for demo purposes
_channel_registry: dict = {}

async def main():
    # Shared discovery channel (TTL = 5 minutes for demo)
    registry = DiscoveryChannel(
        id="registry",
        name="Service Registry",
        registration_ttl=300.0,
    )
    await registry.start()

    # Service
    svc = TemperatureService(id="temp-svc", name="Temperature Service")
    await svc.setup()
    _channel_registry["temperature-feed"] = svc.feed
    await svc.announce(registry)

    # Consumer
    consumer = DataConsumer(id="consumer", name="Data Consumer", registry=registry)
    await consumer.setup()

    # Run for 5 seconds
    async def run_limited(system, tick_rate, duration):
        try:
            await asyncio.wait_for(system.run(tick_rate=tick_rate), timeout=duration)
        except asyncio.TimeoutError:
            system.stop()

    await asyncio.gather(
        run_limited(svc,      tick_rate=2.0, duration=5),
        run_limited(consumer, tick_rate=2.0, duration=5),
    )

asyncio.run(main())

Finding systems by capability

# Find systems that expose a "process_image" service
systems = await registry.find_systems(capability="process_image")
for sys_info in systems:
    print(f"Found: {sys_info['system_name']} ({sys_info['system_id']})")
    print(f"  services: {[s['name'] for s in sys_info['services']]}")

find_systems(capability=X) filters by checking whether any of the system's declared service names match X. Systems are registered via register_system(descriptor.model_dump()).


TTL-based expiry

Registrations carry an expires_at timestamp. on_maintain() purges expired entries:

registry = DiscoveryChannel(
    id="registry",
    name="Registry",
    registration_ttl=60.0,       # Entries expire after 60 seconds
    maintenance_interval_seconds=30.0,  # Check every 30 seconds
)

To keep a long-running service registered, re-register before TTL expires:

async def heartbeat(svc: TemperatureService, registry: DiscoveryChannel, interval: float):
    while True:
        await svc.announce(registry)
        await asyncio.sleep(interval)

# Refresh every 45 seconds when TTL is 60
asyncio.create_task(heartbeat(svc, registry, interval=45))

Point lookups

# Get a specific channel by ID
info: ChannelInfo | None = await registry.get_channel("temperature-feed")
if info:
    print(f"Channel: {info.name}, period={info.period}s, visibility={info.visibility}")

# Get a specific system
sys_desc = await registry.get_system("temp-svc")
if sys_desc:
    print(f"System: {sys_desc['system_name']}")