We can't find the internet
Attempting to reconnect
Something went wrong!
Attempting to reconnect
API Security and Rate Limiting: Token Bucket Implementation in Phoenix
Protecting Phoenix APIs with token bucket rate limiting, per-client quotas, DDoS mitigation patterns, and Plug-based middleware for comprehensive API security.
Tomas Korcak (korczis)
Prismatic Platform
Why Rate Limiting Is Non-Negotiable
Every public API endpoint is a potential attack vector. Without rate limiting, a single client can exhaust server resources, scrape entire databases, brute-force authentication, or amplify denial-of-service attacks. Rate limiting is not just about preventing abuse β it is about maintaining fair access, protecting downstream services, and ensuring system stability under adversarial conditions.
The token bucket algorithm is the industry standard for API rate limiting. It provides smooth rate enforcement with burst tolerance, is simple to implement correctly, and maps naturally to per-client quotas. Our implementation uses ETS for lock-free, sub-microsecond token checks that add negligible latency to request processing.
Token Bucket Algorithm
The token bucket works by maintaining a virtual bucket of tokens for each client. Tokens are added at a fixed rate (the refill rate) up to a maximum capacity (the burst limit). Each request consumes one token. If the bucket is empty, the request is rejected:
defmodule Prismatic.RateLimiter.TokenBucket do
@moduledoc """
ETS-backed token bucket rate limiter.
Provides per-client rate limiting with configurable burst and refill rates.
Sub-microsecond check latency through ETS read_concurrency.
"""
use GenServer
require Logger
@default_capacity 100
@default_refill_rate 10
@default_refill_interval_ms 1_000
@cleanup_interval_ms 60_000
defstruct [:table, :capacity, :refill_rate, :refill_interval_ms]
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(opts) do
table = :ets.new(:rate_limiter_buckets, [
:set, :public, :named_table,
read_concurrency: true, write_concurrency: true
])
state = %__MODULE__{
table: table,
capacity: Keyword.get(opts, :capacity, @default_capacity),
refill_rate: Keyword.get(opts, :refill_rate, @default_refill_rate),
refill_interval_ms: Keyword.get(opts, :refill_interval_ms, @default_refill_interval_ms)
}
schedule_cleanup()
{:ok, state}
end
@spec check(String.t(), pos_integer()) :: :ok | {:error, :rate_limited, map()}
def check(client_id, cost \\ 1) do
GenServer.call(__MODULE__, {:check, client_id, cost})
end
@impl true
def handle_call({:check, client_id, cost}, _from, state) do
now = System.monotonic_time(:millisecond)
{tokens, last_refill} =
case :ets.lookup(state.table, client_id) do
[{^client_id, tokens, last_refill}] -> {tokens, last_refill}
[] -> {state.capacity, now}
end
elapsed = now - last_refill
refill_cycles = div(elapsed, state.refill_interval_ms)
new_tokens = min(tokens + refill_cycles * state.refill_rate, state.capacity)
new_last_refill = if refill_cycles > 0, do: now, else: last_refill
if new_tokens >= cost do
:ets.insert(state.table, {client_id, new_tokens - cost, new_last_refill})
{:reply, :ok, state}
else
:ets.insert(state.table, {client_id, new_tokens, new_last_refill})
retry_after_ms = state.refill_interval_ms - rem(elapsed, state.refill_interval_ms)
{:reply, {:error, :rate_limited, %{
remaining: new_tokens,
limit: state.capacity,
retry_after_ms: retry_after_ms
}}, state}
end
end
@impl true
def handle_info(:cleanup, state) do
cutoff = System.monotonic_time(:millisecond) - 300_000
:ets.foldl(fn {client_id, _tokens, last_refill}, acc ->
if last_refill < cutoff do
:ets.delete(state.table, client_id)
end
acc
end, nil, state.table)
schedule_cleanup()
{:noreply, state}
end
defp schedule_cleanup do
Process.send_after(self(), :cleanup, @cleanup_interval_ms)
end
end
Plug-Based Middleware
The rate limiter integrates into Phoenix's plug pipeline. It extracts the client identifier from the API key or IP address and applies the appropriate limits:
defmodule PrismaticWeb.Plugs.RateLimiter do
@moduledoc """
Phoenix Plug for API rate limiting.
Identifies clients by API key or IP address and enforces per-client limits.
Returns standard rate limit headers on every response.
"""
import Plug.Conn
@behaviour Plug
@impl true
def init(opts), do: opts
@impl true
def call(conn, opts) do
client_id = extract_client_id(conn)
cost = Keyword.get(opts, :cost, 1)
case Prismatic.RateLimiter.TokenBucket.check(client_id, cost) do
:ok ->
conn
|> put_rate_limit_headers(client_id)
{:error, :rate_limited, info} ->
retry_after_seconds = div(info.retry_after_ms, 1000) + 1
conn
|> put_resp_header("retry-after", to_string(retry_after_seconds))
|> put_resp_header("x-ratelimit-remaining", "0")
|> put_resp_header("x-ratelimit-limit", to_string(info.limit))
|> put_resp_content_type("application/json")
|> send_resp(429, Jason.encode!(%{
error: "rate_limited",
message: "Too many requests. Retry after #{retry_after_seconds} seconds.",
retry_after: retry_after_seconds
}))
|> halt()
end
end
defp extract_client_id(conn) do
case get_req_header(conn, "authorization") do
["Bearer " <> token] ->
"key:" <> String.slice(token, 0, 10)
_ ->
ip = conn.remote_ip |> :inet.ntoa() |> to_string()
"ip:" <> ip
end
end
defp put_rate_limit_headers(conn, client_id) do
case Prismatic.RateLimiter.TokenBucket.get_bucket_info(client_id) do
{:ok, info} ->
conn
|> put_resp_header("x-ratelimit-limit", to_string(info.capacity))
|> put_resp_header("x-ratelimit-remaining", to_string(info.remaining))
|> put_resp_header("x-ratelimit-reset", to_string(info.reset_at))
_ ->
conn
end
end
end
Per-Client Quota Configuration
Different client tiers receive different rate limits. API key holders get higher limits than anonymous IP-based clients:
|------------|----------------|----------------|-------------|----------------|
defmodule Prismatic.RateLimiter.QuotaManager do
@moduledoc """
Manages per-client rate limit quotas based on API key tier.
"""
@tier_limits %{
anonymous: %{capacity: 10, refill_rate: 30, daily_quota: 1_000},
free: %{capacity: 20, refill_rate: 60, daily_quota: 10_000},
standard: %{capacity: 50, refill_rate: 300, daily_quota: 100_000},
premium: %{capacity: 200, refill_rate: 1_000, daily_quota: :unlimited},
internal: %{capacity: 1_000, refill_rate: 5_000, daily_quota: :unlimited}
}
@spec limits_for(String.t()) :: map()
def limits_for("key:" <> key_prefix) do
case lookup_key_tier(key_prefix) do
{:ok, tier} -> Map.get(@tier_limits, tier, @tier_limits.anonymous)
:not_found -> @tier_limits.anonymous
end
end
def limits_for("ip:" <> _ip) do
@tier_limits.anonymous
end
defp lookup_key_tier(prefix) do
case :ets.lookup(:api_key_tiers, prefix) do
[{^prefix, tier}] -> {:ok, tier}
[] -> :not_found
end
end
end
DDoS Mitigation Patterns
Rate limiting alone is not sufficient against distributed attacks. Our defense-in-depth approach layers multiple protections:
defmodule PrismaticWeb.Plugs.DDoSGuard do
@moduledoc """
DDoS mitigation plug with connection tracking and anomaly detection.
Operates independently from per-client rate limiting.
"""
import Plug.Conn
@behaviour Plug
@global_rps_limit 10_000
@suspicious_patterns [
~r/(?:union|select|drop|insert|update|delete)\s/i,
~r/<script[^>]*>/i,
~r/\.\.\//
]
@impl true
def init(opts), do: opts
@impl true
def call(conn, _opts) do
cond do
global_limit_exceeded?() ->
conn
|> send_resp(503, "Service temporarily unavailable")
|> halt()
suspicious_request?(conn) ->
log_suspicious(conn)
conn
|> send_resp(400, "Bad request")
|> halt()
true ->
increment_global_counter()
conn
end
end
defp suspicious_request?(conn) do
path = conn.request_path
query = conn.query_string || ""
combined = path <> "?" <> query
Enum.any?(@suspicious_patterns, &Regex.match?(&1, combined))
end
defp global_limit_exceeded? do
case :ets.lookup(:global_rate, :current_second) do
[{:current_second, count, _ts}] -> count > @global_rps_limit
[] -> false
end
end
defp increment_global_counter do
now = System.system_time(:second)
case :ets.lookup(:global_rate, :current_second) do
[{:current_second, count, ^now}] ->
:ets.insert(:global_rate, {:current_second, count + 1, now})
_ ->
:ets.insert(:global_rate, {:current_second, 1, now})
end
end
defp log_suspicious(conn) do
ip = conn.remote_ip |> :inet.ntoa() |> to_string()
:telemetry.execute(
[:prismatic, :security, :suspicious_request],
%{count: 1},
%{ip: ip, path: conn.request_path, method: conn.method}
)
end
end
Response Headers and Client Communication
Standard rate limit headers communicate quota status to well-behaved clients:
|--------|---------|---------|
X-RateLimit-Limit300X-RateLimit-Remaining247X-RateLimit-Reset1711324800Retry-After12The complete API security stack β token bucket rate limiting, per-client quotas, DDoS mitigation, and request validation β forms a layered defense that protects backend services while maintaining fair access for legitimate clients. The ETS-backed implementation ensures that security checks add sub-millisecond latency, making protection transparent to normal API consumers.