Graph Database Patterns with KuzuDB for Entity Analysis
Using KuzuDB as an embedded graph database for entity relationship traversal, ownership chain analysis, Cypher-like queries, and visualization data generation.
Tomas Korcak (korczis)
Prismatic Platform
Intelligence analysis is fundamentally about relationships: who owns what, who knows whom, and which entities share addresses, directors, or financial flows. Relational databases handle entity storage well, but multi-hop traversals there depend on recursive joins that become harder to write and slower to run as depth grows, which is where a graph database fits better. The Prismatic Platform uses KuzuDB, an embedded graph database with a Cypher-compatible query language, for entity relationship analysis.
Why KuzuDB
KuzuDB is an embedded, columnar graph database optimized for analytical workloads. Unlike Neo4j, it runs in-process without a separate server, which simplifies deployment in an umbrella application.
Integration Architecture
The KuzuDB integration is isolated in the prismatic_storage_kuzudb umbrella app. It provides a GenServer-based connection manager and a query builder:
defmodule PrismaticStorageKuzudb.Connection do
  @moduledoc """
  KuzuDB connection manager.

  Maintains a persistent connection to the KuzuDB database
  file and provides query execution with structured results.
  Single-writer serialization through GenServer.
  """

  use GenServer
  require Logger

  @type query_result :: {:ok, [map()]} | {:error, term()}

  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @spec query(String.t(), map()) :: query_result()
  def query(cypher, params \\ %{}) do
    GenServer.call(__MODULE__, {:query, cypher, params}, 30_000)
  end

  @spec write(String.t(), map()) :: :ok | {:error, term()}
  def write(cypher, params \\ %{}) do
    GenServer.call(__MODULE__, {:write, cypher, params}, 30_000)
  end

  @impl GenServer
  def init(opts) do
    db_path = Keyword.fetch!(opts, :path)
    {:ok, db} = Kuzu.Database.new(db_path)
    {:ok, conn} = Kuzu.Connection.new(db)
    {:ok, %{db: db, conn: conn}}
  end

  @impl GenServer
  def handle_call({:query, cypher, params}, _from, %{conn: conn} = state) do
    result = execute_query(conn, cypher, params)
    {:reply, result, state}
  end

  @impl GenServer
  def handle_call({:write, cypher, params}, _from, %{conn: conn} = state) do
    result = execute_write(conn, cypher, params)
    {:reply, result, state}
  end

  defp execute_query(conn, cypher, params) do
    case Kuzu.Connection.query(conn, cypher, params) do
      {:ok, result} -> {:ok, Kuzu.Result.to_maps(result)}
      {:error, reason} -> {:error, reason}
    end
  rescue
    e in RuntimeError -> {:error, Exception.message(e)}
  end

  defp execute_write(conn, cypher, params) do
    case Kuzu.Connection.query(conn, cypher, params) do
      {:ok, _result} -> :ok
      {:error, reason} -> {:error, reason}
    end
  rescue
    e in RuntimeError -> {:error, Exception.message(e)}
  end
end
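The single-writer behaviour comes from the GenServer mailbox rather than from KuzuDB itself. The following is a minimal, self-contained sketch of that idea. `SerializedCounter` is a hypothetical module, not part of the platform, and a plain integer stands in for the database handle.

```elixir
defmodule SerializedCounter do
  @moduledoc "Hypothetical stand-in: an integer plays the role of the KuzuDB connection."
  use GenServer

  def start_link(initial \\ 0), do: GenServer.start_link(__MODULE__, initial)

  # Every caller goes through GenServer.call/2, so writes are applied one at a time.
  def write(pid, amount), do: GenServer.call(pid, {:write, amount})
  def read(pid), do: GenServer.call(pid, :read)

  @impl GenServer
  def init(initial), do: {:ok, initial}

  @impl GenServer
  def handle_call({:write, amount}, _from, state), do: {:reply, :ok, state + amount}
  def handle_call(:read, _from, state), do: {:reply, state, state}
end

{:ok, pid} = SerializedCounter.start_link()

# 50 concurrent callers; the process mailbox orders their writes without explicit locks.
1..50
|> Enum.map(fn _ -> Task.async(fn -> SerializedCounter.write(pid, 1) end) end)
|> Enum.each(&Task.await/1)

IO.inspect(SerializedCounter.read(pid), label: "total")
```

The trade-off of this design is that reads queue behind writes as well, since `query/2` and `write/2` in the connection manager share the same process.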
Schema Definition
Graph schemas define node tables (entities) and relationship tables (edges). The schema is created during application startup:
defmodule PrismaticStorageKuzudb.Schema do
  @moduledoc """
  Graph schema definition for entity relationship analysis.

  Defines node tables for entities (companies, persons, addresses)
  and relationship tables for ownership, directorship, and
  address connections.
  """

  alias PrismaticStorageKuzudb.Connection

  @spec create_schema() :: :ok
  def create_schema do
    # Node tables
    Connection.write("""
    CREATE NODE TABLE IF NOT EXISTS Company (
      id STRING, name STRING, ico STRING, country STRING,
      risk_score DOUBLE, status STRING, PRIMARY KEY (id)
    )
    """)

    Connection.write("""
    CREATE NODE TABLE IF NOT EXISTS Person (
      id STRING, name STRING, birth_date DATE, country STRING,
      risk_score DOUBLE, PRIMARY KEY (id)
    )
    """)

    Connection.write("""
    CREATE NODE TABLE IF NOT EXISTS Address (
      id STRING, street STRING, city STRING, postal_code STRING,
      country STRING, PRIMARY KEY (id)
    )
    """)

    # Relationship tables
    Connection.write("""
    CREATE REL TABLE IF NOT EXISTS OWNS (
      FROM Person TO Company,
      share_pct DOUBLE, since DATE, verified BOOLEAN
    )
    """)

    Connection.write("""
    CREATE REL TABLE IF NOT EXISTS DIRECTS (
      FROM Person TO Company,
      role STRING, since DATE, until DATE
    )
    """)

    Connection.write("""
    CREATE REL TABLE IF NOT EXISTS REGISTERED_AT (
      FROM Company TO Address, since DATE
    )
    """)

    Connection.write("""
    CREATE REL TABLE IF NOT EXISTS SUBSIDIARY_OF (
      FROM Company TO Company,
      share_pct DOUBLE, since DATE
    )
    """)

    :ok
  end
end
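`create_schema/0` ignores the return value of each `Connection.write/1` call, so a failed statement would go unnoticed at startup. One possible way to surface such failures is to run the statements in order and stop at the first error. The sketch below is self-contained: `SchemaRunnerSketch` and the fake writer are hypothetical names, and the writer function stands in for `Connection.write/1`.

```elixir
defmodule SchemaRunnerSketch do
  @moduledoc "Hypothetical helper: runs DDL statements in order and stops at the first failure."

  @spec run([String.t()], (String.t() -> :ok | {:error, term()})) ::
          :ok | {:error, {String.t(), term()}}
  def run(statements, write_fun) do
    Enum.reduce_while(statements, :ok, fn statement, :ok ->
      case write_fun.(statement) do
        :ok -> {:cont, :ok}
        # Return the failing statement alongside the reason to ease debugging.
        {:error, reason} -> {:halt, {:error, {statement, reason}}}
      end
    end)
  end
end

# A fake writer that rejects one statement, standing in for Connection.write/1.
fake_write = fn
  "CREATE REL TABLE BROKEN" -> {:error, :syntax_error}
  _statement -> :ok
end

ok_result =
  SchemaRunnerSketch.run(["CREATE NODE TABLE A", "CREATE NODE TABLE B"], fake_write)

error_result =
  SchemaRunnerSketch.run(
    ["CREATE NODE TABLE A", "CREATE REL TABLE BROKEN", "CREATE NODE TABLE C"],
    fake_write
  )

IO.inspect({ok_result, error_result})
```

Because every statement uses `IF NOT EXISTS`, rerunning the full list after fixing a failure should be safe.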
Entity Relationship Traversal
The core analytical queries traverse the graph to discover ownership chains, shared directors, and address co-location patterns:
defmodule PrismaticStorageKuzudb.Analysis do
  @moduledoc """
  Graph analysis queries for entity relationship traversal.

  Provides ownership chain discovery, shared director detection,
  co-location analysis, and shortest path computation between
  entities in the graph.
  """

  alias PrismaticStorageKuzudb.Connection

  @doc """
  Discover the ownership chain for a company, traversing up through
  its parent companies via SUBSIDIARY_OF edges. Person-level
  beneficial owners (OWNS edges) are not part of this query.
  """
  @spec ownership_chain(String.t(), pos_integer()) :: {:ok, [map()]} | {:error, term()}
  def ownership_chain(company_id, max_depth \\ 5)
      when is_integer(max_depth) and max_depth >= 1 do
    # max_depth is interpolated into the query text, so the guard restricts
    # it to a positive integer; company_id stays a bound parameter.
    # A subsidiary points at its parent, so the pattern follows outgoing edges.
    Connection.query("""
    MATCH path = (c:Company {id: $company_id})-[:SUBSIDIARY_OF*1..#{max_depth}]->(parent:Company)
    RETURN
      nodes(path) AS chain,
      [r IN relationships(path) | r.share_pct] AS share_percentages,
      length(path) AS depth
    ORDER BY depth ASC
    """, %{company_id: company_id})
  end

  @doc """
  Find persons who direct multiple companies (shared directors).
  Useful for detecting undisclosed relationships between entities.
  """
  @spec shared_directors(non_neg_integer()) :: {:ok, [map()]} | {:error, term()}
  def shared_directors(min_companies \\ 2) do
    Connection.query("""
    MATCH (p:Person)-[:DIRECTS]->(c:Company)
    WITH p, collect(c) AS companies, count(c) AS company_count
    WHERE company_count >= $min_companies
    RETURN p.name AS director, p.id AS person_id,
           company_count,
           [comp IN companies | comp.name] AS company_names
    ORDER BY company_count DESC
    LIMIT 100
    """, %{min_companies: min_companies})
  end

  @doc """
  Find companies registered at the same address.
  Shell company detection heuristic.
  """
  @spec address_colocation(String.t()) :: {:ok, [map()]} | {:error, term()}
  def address_colocation(address_id) do
    Connection.query("""
    MATCH (c:Company)-[:REGISTERED_AT]->(a:Address {id: $address_id})
    RETURN c.id AS company_id, c.name AS company_name,
           c.risk_score AS risk_score, c.status AS status,
           a.street AS street, a.city AS city
    ORDER BY c.risk_score DESC
    LIMIT 100
    """, %{address_id: address_id})
  end

  @doc """
  Shortest path between two entities of any type.
  Useful for discovering indirect connections.
  """
  @spec shortest_path(String.t(), String.t()) :: {:ok, [map()]} | {:error, term()}
  def shortest_path(source_id, target_id) do
    # Neo4j-style shortestPath(). KuzuDB's documentation writes shortest
    # paths as a recursive pattern with the SHORTEST keyword, for example
    # -[* SHORTEST 1..6]-, so adjust the pattern if this form is rejected.
    Connection.query("""
    MATCH path = shortestPath(
      (source {id: $source_id})-[*1..6]-(target {id: $target_id})
    )
    RETURN nodes(path) AS entities,
           relationships(path) AS connections,
           length(path) AS distance
    """, %{source_id: source_id, target_id: target_id})
  end
end
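The `share_percentages` column returned by `ownership_chain/2` lists one stake per hop. A common convention for a single chain is to multiply the per-hop fractions to get the effective indirect stake. The sketch below assumes that convention and a string-keyed row shape. `EffectiveOwnershipSketch` and the sample rows are hypothetical, and the calculation ignores cases where several paths lead to the same parent.

```elixir
defmodule EffectiveOwnershipSketch do
  @moduledoc "Hypothetical helper: combines per-hop share percentages into one effective stake."

  @spec effective_pct([number()]) :: float()
  def effective_pct(share_percentages) do
    # Convert each hop to a fraction, multiply along the chain, convert back to percent.
    fraction = Enum.reduce(share_percentages, 1.0, fn pct, acc -> acc * pct / 100 end)
    fraction * 100
  end
end

# Assumed row shape, mirroring the share_percentages and depth columns of ownership_chain/2.
rows = [
  %{"share_percentages" => [60.0], "depth" => 1},
  %{"share_percentages" => [60.0, 50.0], "depth" => 2}
]

for row <- rows do
  pct = EffectiveOwnershipSketch.effective_pct(row["share_percentages"])
  IO.puts("depth #{row["depth"]}: #{Float.round(pct, 2)}% effective stake")
end
```

Under this convention a 60% stake in a company that in turn holds 50% of another amounts to roughly a 30% effective stake in the second company.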
Visualization Data Generation
Graph query results are transformed into visualization-ready data structures compatible with D3.js force-directed graphs and Chart.js:
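defmodule PrismaticStorageKuzudb.Visualization do
  @moduledoc """
  Transforms graph query results into visualization-ready
  data structures for D3.js and Chart.js rendering.
  """

  alias PrismaticStorageKuzudb.Connection

  @type graph_data :: %{
          nodes: [%{id: String.t(), label: String.t(), type: String.t(), risk: float()}],
          edges: [%{source: String.t(), target: String.t(), label: String.t(), weight: float()}]
        }

  @spec entity_network(String.t(), pos_integer()) :: {:ok, graph_data()} | {:error, term()}
  def entity_network(entity_id, depth \\ 2) do
    case Connection.query(network_query(depth), %{entity_id: entity_id}) do
      {:ok, results} -> {:ok, transform_to_graph(results)}
      {:error, reason} -> {:error, reason}
    end
  end

  # Not shown in the original; an assumed query shape that returns the
  # "entities" and "connections" columns transform_to_graph/1 expects,
  # mirroring the column names used by Analysis.shortest_path/2.
  defp network_query(depth) when is_integer(depth) and depth >= 1 do
    """
    MATCH path = (e {id: $entity_id})-[*1..#{depth}]-(other)
    RETURN nodes(path) AS entities,
           relationships(path) AS connections
    """
  end

  defp transform_to_graph(results) do
    nodes =
      results
      |> Enum.flat_map(fn row -> row["entities"] end)
      |> Enum.uniq_by(& &1["id"])
      |> Enum.map(fn entity ->
        %{
          id: entity["id"],
          label: entity["name"],
          type: entity["_label"],
          risk: entity["risk_score"] || 0.0
        }
      end)

    edges =
      results
      |> Enum.flat_map(fn row -> row["connections"] end)
      # Overlapping paths repeat the same relationship, so de-duplicate edges too.
      |> Enum.uniq_by(fn rel -> {rel["_src"], rel["_dst"], rel["_label"]} end)
      |> Enum.map(fn rel ->
        %{
          source: rel["_src"],
          target: rel["_dst"],
          label: rel["_label"],
          weight: rel["share_pct"] || 1.0
        }
      end)

    %{nodes: nodes, edges: edges}
  end
end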
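To make the output shape of `transform_to_graph/1` concrete, the following self-contained sketch runs a copy of the transform on hand-written rows. The `_label`, `_src`, and `_dst` keys follow the module above. The sample data and the `GraphShapeSketch` name are made up, and the keys a real KuzuDB binding returns may differ. The copy also de-duplicates edges, which matters when paths overlap.

```elixir
defmodule GraphShapeSketch do
  @moduledoc "Hypothetical copy of the node/edge transform, fed with hand-written rows."

  def to_graph(rows) do
    nodes =
      rows
      |> Enum.flat_map(fn row -> row["entities"] end)
      |> Enum.uniq_by(fn e -> e["id"] end)
      |> Enum.map(fn e ->
        %{id: e["id"], label: e["name"], type: e["_label"], risk: e["risk_score"] || 0.0}
      end)

    edges =
      rows
      |> Enum.flat_map(fn row -> row["connections"] end)
      |> Enum.uniq_by(fn r -> {r["_src"], r["_dst"], r["_label"]} end)
      |> Enum.map(fn r ->
        %{source: r["_src"], target: r["_dst"], label: r["_label"], weight: r["share_pct"] || 1.0}
      end)

    %{nodes: nodes, edges: edges}
  end
end

person = %{"id" => "p1", "name" => "Jane Doe", "_label" => "Person", "risk_score" => 0.2}
acme = %{"id" => "c1", "name" => "Acme s.r.o.", "_label" => "Company", "risk_score" => nil}
holding = %{"id" => "c2", "name" => "Acme Holding", "_label" => "Company", "risk_score" => 0.7}

owns = %{"_src" => "p1", "_dst" => "c1", "_label" => "OWNS", "share_pct" => 51.0}
directs = %{"_src" => "p1", "_dst" => "c2", "_label" => "DIRECTS"}

# Two overlapping paths: the second one repeats the person, Acme, and the OWNS edge.
rows = [
  %{"entities" => [person, acme], "connections" => [owns]},
  %{"entities" => [holding, person, acme], "connections" => [directs, owns]}
]

graph = GraphShapeSketch.to_graph(rows)
IO.inspect(graph, pretty: true)
```

The resulting map has atom keys `:nodes` and `:edges`, which is close to the node/link structure a D3.js force layout typically consumes once encoded as JSON.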
KuzuDB's embedded architecture removes network round-trips and the operational overhead of running a separate database server. For an intelligence platform where relationship traversal is a core analytical capability, an in-process columnar graph database can offer clear advantages over both recursive CTEs in a relational database and client-server graph databases.