Back to Blog
Architecture March 13, 2026 | 10 min read

Phoenix PubSub at Scale: Real-Time Event Architecture

Designing event-driven systems with Phoenix PubSub

Prismatic Engineering

Prismatic Platform

Event-Driven Architecture in Elixir


Phoenix PubSub provides a lightweight publish-subscribe mechanism built on top

of Erlang's distributed process groups. Unlike external message brokers (Kafka,

RabbitMQ), PubSub is in-process, zero-latency, and requires no infrastructure

beyond your BEAM node.


The Prismatic Platform uses PubSub as the backbone for real-time features across

94 umbrella applications, enabling loose coupling between domains that would

otherwise require direct dependencies.


Topic Naming Conventions


Consistent topic naming is critical when dozens of subsystems publish events.

The platform follows a hierarchical naming scheme:


domain:resource:action


Examples from production:


TopicPublisherPurpose

|-------|----------|---------|

dd:pipelineprismatic_ddDD pipeline stage transitions dd:entity:updatedprismatic_ddEntity data changes system_eventsprismatic_telemetryPlatform-wide system events error_patternsprismatic_quality_intelligenceError pattern detection osint:run:{id}prismatic_osint_coreIndividual OSINT tool execution

The {id} suffix pattern creates scoped topics for individual operations.

When a user runs an OSINT tool, a unique topic is created for that execution.

Only the requesting LiveView subscribes, preventing broadcast storms.


Subscriber Patterns


LiveView Subscribers


The most common subscriber is a LiveView process that updates its UI in

real-time:



def mount(_params, _session, socket) do

if connected?(socket) do

Phoenix.PubSub.subscribe(Prismatic.PubSub, "dd:pipeline")

end

{:ok, assign(socket, events: [])}

end


def handle_info({:pipeline_event, event}, socket) do

events = [event | socket.assigns.events] |> Enum.take(50)

{:noreply, assign(socket, events: events)}

end


The Enum.take(50) is important: without bounding the event list, a busy

pipeline could accumulate thousands of events in the LiveView's memory.


GenServer Subscribers


Background processes subscribe to events for aggregation, logging, or

triggering side effects:



def init(state) do

Phoenix.PubSub.subscribe(Prismatic.PubSub, "error_patterns")

{:ok, state}

end


def handle_info({:pattern_detected, pattern}, state) do

state = update_statistics(state, pattern)

maybe_trigger_alert(pattern)

{:noreply, state}

end


Publishing Patterns


Structured Events


All PubSub messages use tagged tuples for pattern matching:



# Good: structured, matchable

Phoenix.PubSub.broadcast(

Prismatic.PubSub,

"dd:pipeline",

{:stage_completed, %{stage: :fetch, group: "ares", count: 42}}

)


# Avoid: unstructured maps are harder to match

Phoenix.PubSub.broadcast(

Prismatic.PubSub,

"dd:pipeline",

%{type: "stage_completed", stage: "fetch"}

)


Broadcast vs Direct Send


PubSub broadcasts to all subscribers. For one-to-one communication, use

Phoenix.PubSub.direct_child_spec/1 or simply send/2 if you have the PID:



# Broadcast: all subscribers receive the message

Phoenix.PubSub.broadcast(pubsub, topic, message)


# Local broadcast: only subscribers on this node

Phoenix.PubSub.local_broadcast(pubsub, topic, message)


local_broadcast/3 is useful when events are node-specific (like LiveView

updates) and should not cross cluster boundaries.


Backpressure Handling


PubSub has no built-in backpressure. If a publisher sends faster than

subscribers can process, subscriber mailboxes grow unbounded. The platform

handles this at three levels:


1. Publisher Rate Limiting



defmodule PrismaticDD.Pipeline.Broadcaster do

use GenServer


def broadcast_batch(events) do

GenServer.cast(__MODULE__, {:broadcast_batch, events})

end


def handle_cast({:broadcast_batch, events}, state) do

events

|> Enum.chunk_every(10)

|> Enum.each(fn chunk ->

Phoenix.PubSub.broadcast(Prismatic.PubSub, "dd:pipeline", {:batch, chunk})

Process.sleep(10)

end)

{:noreply, state}

end

end


2. Subscriber Debouncing


LiveView subscribers debounce rapid updates to avoid excessive re-renders:



def handle_info({:entity_updated, _entity}, socket) do

if socket.assigns[:debounce_timer] do

Process.cancel_timer(socket.assigns.debounce_timer)

end

timer = Process.send_after(self(), :flush_updates, 100)

{:noreply, assign(socket, debounce_timer: timer)}

end


3. Mailbox Monitoring


A dedicated process monitors subscriber mailbox sizes and logs warnings

when they exceed thresholds, enabling proactive intervention before

memory pressure becomes critical.


Testing PubSub


Testing PubSub interactions requires subscribing in the test process:



test "pipeline broadcasts stage completion" do

Phoenix.PubSub.subscribe(Prismatic.PubSub, "dd:pipeline")


PrismaticDD.Pipeline.run_stage(:fetch, "ares")


assert_receive {:stage_completed, %{stage: :fetch, group: "ares"}}, 5000

end


The assert_receive macro with a timeout ensures tests do not hang if the

event is never published.


Tags

pubsub phoenix events real-time architecture