Tutorial 2 · Broadcast Bus¶
In this tutorial you will build an event bus where multiple consumers receive every event the moment it is published.
What you'll learn:
BroadcastChanneland fan-out semantics- Push-based delivery with
subscribe() - Why exclusive reads are prohibited on a broadcast channel
The pattern¶
A broadcast channel is designed for one producer, many consumers. Every subscriber receives every message — there is no claiming, no acknowledgement, no "consumed" state.
producer ──write──► BroadcastChannel ──push──► subscriber A
──push──► subscriber B
──push──► subscriber C
Step 1 — Create the bus¶
import asyncio
from sssn.channels.broadcast import BroadcastChannel
from sssn.core.channel import ChannelMessage
async def main():
bus = BroadcastChannel(id="events", name="Event Bus")
await bus.start()
Step 2 — Register subscribers¶
received_by_a: list[str] = []
received_by_b: list[str] = []
async def handler_a(msg: ChannelMessage):
received_by_a.append(msg.content.data["type"])
print(f"[A] received: {msg.content.data}")
async def handler_b(msg: ChannelMessage):
received_by_b.append(msg.content.data["type"])
print(f"[B] received: {msg.content.data}")
await bus.subscribe("consumer-a", handler_a)
await bus.subscribe("consumer-b", handler_b)
subscribe() registers an async callback. The callback fires synchronously within the write() call — by the time write() returns, all subscribers have been notified.
If a subscriber raises an exception it is logged and does not affect other subscribers.
Step 3 — Publish events¶
await bus.write("service", {"type": "user.login", "user": "alice"})
await bus.write("service", {"type": "user.logout", "user": "bob"})
print(f"A received: {received_by_a}") # ['user.login', 'user.logout']
print(f"B received: {received_by_b}") # ['user.login', 'user.logout']
Step 4 — Late-joining consumers (poll mode)¶
Subscribers that register after some messages have already been published miss those messages. Late-joining consumers can catch up using shared reads:
# Carol subscribes late and polls the history
history = await bus.read("carol", limit=100)
print(f"Carol sees {len(history)} events in history") # 2
After catching up, Carol can optionally subscribe for future events:
async def handler_carol(msg: ChannelMessage):
print(f"[Carol] live: {msg.content.data}")
await bus.subscribe("carol", handler_carol)
await bus.write("service", {"type": "user.purchase"})
# → [Carol] live: {'type': 'user.purchase'}
Step 5 — Unsubscribe¶
await bus.unsubscribe("consumer-a")
await bus.write("service", {"type": "system.reboot"})
# Consumer A no longer receives events
# Consumer B still does
Why exclusive reads are blocked¶
BroadcastChannel raises TypeError if you attempt read(exclusive=True). This is intentional:
- Exclusive reads are for work-queue semantics: one consumer claims a message and others cannot see it.
- A broadcast channel is designed for all consumers to see all messages. Exclusive reads would break this invariant.
try:
await bus.read("reader", exclusive=True)
except TypeError as e:
print(e)
# BroadcastChannel does not support exclusive reads.
Use WorkQueueChannel when you want exclusive claiming.
Complete program¶
import asyncio
from sssn.channels.broadcast import BroadcastChannel
from sssn.core.channel import ChannelMessage
async def main():
bus = BroadcastChannel(id="telemetry", name="Telemetry")
await bus.start()
log: list[tuple[str, str]] = []
async def make_handler(name: str):
async def handler(msg: ChannelMessage):
log.append((name, msg.content.data["metric"]))
return handler
await bus.subscribe("dashboard", await make_handler("dashboard"))
await bus.subscribe("alerting", await make_handler("alerting"))
await bus.subscribe("archiver", await make_handler("archiver"))
metrics = ["cpu", "memory", "disk", "network"]
for m in metrics:
await bus.write("monitor", {"metric": m, "value": 42})
print(f"Total deliveries: {len(log)}") # 12 (4 metrics × 3 subscribers)
assert all(count == 4 for _, count in
[(name, log.count((name, _m)) for _m in metrics)
for name in ["dashboard", "alerting", "archiver"]])
asyncio.run(main())
What's next?¶
Continue to Tutorial 3 → Work Queue to learn about competing consumers with claim-based exclusive reads.