Phoenix PubSub at Scale: Real-Time Event Architecture
Designing event-driven systems with Phoenix PubSub
Prismatic Engineering
Prismatic Platform
Event-Driven Architecture in Elixir
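Phoenix PubSub provides a lightweight publish-subscribe mechanism. Its default
adapter delivers messages to local subscribers through a Registry and relays
broadcasts to other nodes via Erlang's distributed process groups. Unlike
external message brokers (Kafka, RabbitMQ), PubSub runs inside the BEAM: local
delivery is an ordinary process message with no network hop, and it requires no
infrastructure beyond your BEAM nodes.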
The Prismatic Platform uses PubSub as the backbone for real-time features across
94 umbrella applications, enabling loose coupling between domains that would
otherwise require direct dependencies.
Topic Naming Conventions
Consistent topic naming is critical when dozens of subsystems publish events.
The platform follows a hierarchical naming scheme:
domain:resource:action
Examples from production:
  dd:pipeline
  dd:entity:updated
  system_events
  error_patterns
  osint:run:{id}

The {id} suffix pattern creates scoped topics for individual operations.
When a user runs an OSINT tool, a unique topic is created for that execution.
Only the requesting LiveView subscribes, preventing broadcast storms.
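One way to keep topic names consistent is to build them through a single helper rather than writing string literals at every call site. The sketch below assumes a hypothetical TopicNames module (not part of the platform or of Phoenix) that joins segments with ":" and offers a shortcut for the scoped OSINT run topic.

```elixir
defmodule TopicNames do
  @moduledoc "Hypothetical helper for hierarchical PubSub topic names."

  # Joins segments (atoms, strings, or integers) into "domain:resource:action".
  def build(segments) when is_list(segments) do
    Enum.map_join(segments, ":", &to_string/1)
  end

  # Scoped topic for a single OSINT run, following the osint:run:{id} pattern.
  def osint_run(id), do: build(["osint", "run", id])
end

entity_topic = TopicNames.build([:dd, :entity, :updated])
run_topic = TopicNames.osint_run(42)
```

A LiveView would then subscribe to the value returned by TopicNames.osint_run/1 for its own run, so publisher and subscriber cannot drift apart on spelling.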
Subscriber Patterns
LiveView Subscribers
The most common subscriber is a LiveView process that updates its UI in
real-time:
def mount(_params, _session, socket) do
  if connected?(socket) do
    Phoenix.PubSub.subscribe(Prismatic.PubSub, "dd:pipeline")
  end

  {:ok, assign(socket, events: [])}
end

def handle_info({:pipeline_event, event}, socket) do
  events = [event | socket.assigns.events] |> Enum.take(50)
  {:noreply, assign(socket, events: events)}
end
The Enum.take(50) is important: without bounding the event list, a busy
pipeline could accumulate thousands of events in the LiveView's memory.
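The effect of the bound can be checked in isolation. This is a minimal sketch using a hypothetical EventBuffer module that applies the same prepend-then-take step to a plain list; the limit of 50 mirrors the handler above.

```elixir
defmodule EventBuffer do
  @max_events 50

  # Prepends the newest event and drops everything beyond the limit.
  def push(events, event), do: Enum.take([event | events], @max_events)
end

# Simulate a busy pipeline emitting 1,000 events.
events = Enum.reduce(1..1_000, [], fn n, acc -> EventBuffer.push(acc, n) end)
```

After the reduce, the list holds only the 50 most recent events, newest first, regardless of how many were pushed.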
GenServer Subscribers
Background processes subscribe to events for aggregation, logging, or
triggering side effects:
def init(state) do
  Phoenix.PubSub.subscribe(Prismatic.PubSub, "error_patterns")
  {:ok, state}
end

def handle_info({:pattern_detected, pattern}, state) do
  state = update_statistics(state, pattern)
  maybe_trigger_alert(pattern)
  {:noreply, state}
end
Publishing Patterns
Structured Events
All PubSub messages use tagged tuples for pattern matching:
# Good: structured, matchable
Phoenix.PubSub.broadcast(
  Prismatic.PubSub,
  "dd:pipeline",
  {:stage_completed, %{stage: :fetch, group: "ares", count: 42}}
)

# Avoid: unstructured maps are harder to match
Phoenix.PubSub.broadcast(
  Prismatic.PubSub,
  "dd:pipeline",
  %{type: "stage_completed", stage: "fetch"}
)
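On the receiving side, tagged tuples let each event type get its own function clause. The sketch below uses a hypothetical PipelineEvents module to show the dispatch on its own; in a subscriber the same clauses would be handle_info/2 heads. The catch-all clause is an assumption about desired behavior: a process subscribed to a shared topic will receive every message broadcast there, so it needs some way to ignore tags it does not handle.

```elixir
defmodule PipelineEvents do
  # One clause per tag; the payload is destructured in the function head.
  def describe({:stage_completed, %{stage: stage, count: count}}) do
    "stage #{stage} completed with #{count} items"
  end

  def describe({:batch, events}) when is_list(events) do
    "batch of #{length(events)} events"
  end

  # Anything else on the topic is ignored instead of raising.
  def describe(_other), do: "ignored"
end

summary =
  PipelineEvents.describe({:stage_completed, %{stage: :fetch, group: "ares", count: 42}})
```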
Broadcast vs Direct Send
PubSub broadcasts to every subscriber of a topic. For one-to-one communication,
use send/2 if you have the PID; Phoenix.PubSub.direct_broadcast/4 targets the
subscribers on a single named node. The two broadcast variants differ in scope:
# Broadcast: all subscribers receive the message
Phoenix.PubSub.broadcast(pubsub, topic, message)
# Local broadcast: only subscribers on this node
Phoenix.PubSub.local_broadcast(pubsub, topic, message)
local_broadcast/3 is useful when events are node-specific (like LiveView
updates) and should not cross cluster boundaries.
Backpressure Handling
PubSub has no built-in backpressure. If a publisher sends faster than
subscribers can process, subscriber mailboxes grow without bound. The platform
handles this at three levels:
1. Publisher Rate Limiting
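defmodule PrismaticDD.Pipeline.Broadcaster do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def broadcast_batch(events) do
    GenServer.cast(__MODULE__, {:broadcast_batch, events})
  end

  @impl true
  def init(opts), do: {:ok, opts}

  @impl true
  def handle_cast({:broadcast_batch, events}, state) do
    # Send at most 10 events per message and pause 10 ms between chunks.
    # The sleep blocks this GenServer, so queued batches wait their turn.
    events
    |> Enum.chunk_every(10)
    |> Enum.each(fn chunk ->
      Phoenix.PubSub.broadcast(Prismatic.PubSub, "dd:pipeline", {:batch, chunk})
      Process.sleep(10)
    end)

    {:noreply, state}
  end
end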
2. Subscriber Debouncing
LiveView subscribers debounce rapid updates to avoid excessive re-renders:
def handle_info({:entity_updated, _entity}, socket) do
  if socket.assigns[:debounce_timer] do
    Process.cancel_timer(socket.assigns.debounce_timer)
  end

  timer = Process.send_after(self(), :flush_updates, 100)
  {:noreply, assign(socket, debounce_timer: timer)}
end
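The handler above only schedules :flush_updates; something must also handle that message. The following is a minimal sketch of the full cycle as a plain GenServer, so it runs without Phoenix. DebouncedCounter, its notify pid, and the {:flushed, count} message are hypothetical names for illustration; a LiveView would reload its data and re-render in the flush clause instead.

```elixir
defmodule DebouncedCounter do
  use GenServer

  @debounce_ms 100

  def start_link(notify_pid), do: GenServer.start_link(__MODULE__, notify_pid)

  @impl true
  def init(notify_pid), do: {:ok, %{notify: notify_pid, pending: 0, timer: nil}}

  @impl true
  def handle_info({:entity_updated, _entity}, state) do
    # Restart the timer on every update so a burst produces a single flush.
    if state.timer, do: Process.cancel_timer(state.timer)
    timer = Process.send_after(self(), :flush_updates, @debounce_ms)
    {:noreply, %{state | pending: state.pending + 1, timer: timer}}
  end

  def handle_info(:flush_updates, state) do
    # One flush for the whole burst; report how many updates it covered.
    send(state.notify, {:flushed, state.pending})
    {:noreply, %{state | pending: 0, timer: nil}}
  end
end

{:ok, pid} = DebouncedCounter.start_link(self())

# Five updates arrive in quick succession, well inside the 100 ms window.
for n <- 1..5, do: send(pid, {:entity_updated, n})
```

The calling process receives one {:flushed, 5} message about 100 ms after the last update, rather than five separate notifications.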
3. Mailbox Monitoring
A dedicated process monitors subscriber mailbox sizes and logs warnings
when they exceed thresholds, enabling proactive intervention before
memory pressure becomes critical.
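The core of such a monitor can be sketched with Process.info/2, which reports a local process's mailbox length. The MailboxCheck module, its return shapes, and the thresholds below are assumptions for illustration; the platform's actual monitor is not shown in this article.

```elixir
defmodule MailboxCheck do
  # Returns {:ok, len}, {:warn, len} when over the threshold,
  # or :dead if the process no longer exists.
  def check(pid, threshold) when is_pid(pid) do
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, len} when len > threshold -> {:warn, len}
      {:message_queue_len, len} -> {:ok, len}
      nil -> :dead
    end
  end
end

# A process that never reads its mailbox stands in for a slow subscriber.
slow = spawn(fn -> Process.sleep(:infinity) end)
for n <- 1..20, do: send(slow, {:event, n})

over_limit = MailboxCheck.check(slow, 10)
under_limit = MailboxCheck.check(slow, 100)
```

A monitoring process could run this check on a timer for each known subscriber pid and log a warning whenever it returns a {:warn, len} tuple.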
Testing PubSub
Testing PubSub interactions requires subscribing in the test process:
test "pipeline broadcasts stage completion" do
  Phoenix.PubSub.subscribe(Prismatic.PubSub, "dd:pipeline")
  PrismaticDD.Pipeline.run_stage(:fetch, "ares")
  assert_receive {:stage_completed, %{stage: :fetch, group: "ares"}}, 5000
end
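The assert_receive macro waits up to the given timeout (100 ms by default) and
fails the test if no matching message arrives. The explicit 5000 ms here gives
the stage time to finish before the assertion gives up.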