Back to Blog
Engineering March 15, 2026 | 9 min read

Graph Database Patterns with KuzuDB for Entity Analysis

Using KuzuDB as an embedded graph database for entity relationship traversal, ownership chain analysis, Cypher-like queries, and visualization data generation.

Tomas Korcak (korczis)

Prismatic Platform

Intelligence analysis is fundamentally about relationships. Who owns what, who knows whom, which entities share addresses, directors, or financial flows. Relational databases handle entity storage well, but traversing multi-hop relationships efficiently requires a graph database. The Prismatic Platform uses KuzuDB, an embedded graph database with a Cypher-compatible query language, for entity relationship analysis.


Why KuzuDB


KuzuDB is an embedded, columnar graph database optimized for analytical workloads. Unlike Neo4j, it runs in-process without a separate server, which simplifies deployment in an umbrella application:


FeatureKuzuDBNeo4jPostgreSQL (recursive CTE)

|---------|--------|-------|---------------------------|

DeploymentEmbedded (NIF)ServerServer Query languageCypher-compatibleCypherSQL Multi-hop traversalNative, optimizedNative, optimizedRecursive CTE (slow) Memory modelColumnar, disk-backedHeap-basedRow-based ConcurrencySingle-writer, multi-readerFull MVCCFull MVCC Operational overheadZeroHighMedium

Integration Architecture


The KuzuDB integration is isolated in the prismatic_storage_kuzudb umbrella app. It provides a GenServer-based connection manager and a query builder:



defmodule PrismaticStorageKuzudb.Connection do

@moduledoc """

KuzuDB connection manager.


Maintains a persistent connection to the KuzuDB database

file and provides query execution with structured results.

Single-writer serialization through GenServer.

"""


use GenServer

require Logger


@type query_result :: {:ok, [map()]} | {:error, term()}


@spec start_link(keyword()) :: GenServer.on_start()

def start_link(opts) do

GenServer.start_link(__MODULE__, opts, name: __MODULE__)

end


@spec query(String.t(), map()) :: query_result()

def query(cypher, params \\ %{}) do

GenServer.call(__MODULE__, {:query, cypher, params}, 30_000)

end


@spec write(String.t(), map()) :: :ok | {:error, term()}

def write(cypher, params \\ %{}) do

GenServer.call(__MODULE__, {:write, cypher, params}, 30_000)

end


@impl GenServer

def init(opts) do

db_path = Keyword.fetch!(opts, :path)

{:ok, db} = Kuzu.Database.new(db_path)

{:ok, conn} = Kuzu.Connection.new(db)

{:ok, %{db: db, conn: conn}}

end


@impl GenServer

def handle_call({:query, cypher, params}, _from, %{conn: conn} = state) do

result = execute_query(conn, cypher, params)

{:reply, result, state}

end


@impl GenServer

def handle_call({:write, cypher, params}, _from, %{conn: conn} = state) do

result = execute_write(conn, cypher, params)

{:reply, result, state}

end


defp execute_query(conn, cypher, params) do

case Kuzu.Connection.query(conn, cypher, params) do

{:ok, result} -> {:ok, Kuzu.Result.to_maps(result)}

{:error, reason} -> {:error, reason}

end

rescue

e in RuntimeError -> {:error, Exception.message(e)}

end


defp execute_write(conn, cypher, params) do

case Kuzu.Connection.query(conn, cypher, params) do

{:ok, _result} -> :ok

{:error, reason} -> {:error, reason}

end

rescue

e in RuntimeError -> {:error, Exception.message(e)}

end

end


Schema Definition


Graph schemas define node tables (entities) and relationship tables (edges). The schema is created during application startup:



defmodule PrismaticStorageKuzudb.Schema do

@moduledoc """

Graph schema definition for entity relationship analysis.


Defines node tables for entities (companies, persons, addresses)

and relationship tables for ownership, directorship, and

address connections.

"""


alias PrismaticStorageKuzudb.Connection


@spec create_schema() :: :ok

def create_schema do

# Node tables

Connection.write("""

CREATE NODE TABLE IF NOT EXISTS Company (

id STRING, name STRING, ico STRING, country STRING,

risk_score DOUBLE, status STRING, PRIMARY KEY (id)

)

""")


Connection.write("""

CREATE NODE TABLE IF NOT EXISTS Person (

id STRING, name STRING, birth_date DATE, country STRING,

risk_score DOUBLE, PRIMARY KEY (id)

)

""")


Connection.write("""

CREATE NODE TABLE IF NOT EXISTS Address (

id STRING, street STRING, city STRING, postal_code STRING,

country STRING, PRIMARY KEY (id)

)

""")


# Relationship tables

Connection.write("""

CREATE REL TABLE IF NOT EXISTS OWNS (

FROM Person TO Company,

share_pct DOUBLE, since DATE, verified BOOLEAN

)

""")


Connection.write("""

CREATE REL TABLE IF NOT EXISTS DIRECTS (

FROM Person TO Company,

role STRING, since DATE, until DATE

)

""")


Connection.write("""

CREATE REL TABLE IF NOT EXISTS REGISTERED_AT (

FROM Company TO Address, since DATE

)

""")


Connection.write("""

CREATE REL TABLE IF NOT EXISTS SUBSIDIARY_OF (

FROM Company TO Company,

share_pct DOUBLE, since DATE

)

""")


:ok

end

end


Entity Relationship Traversal


The core analytical queries traverse the graph to discover ownership chains, shared directors, and address co-location patterns:



defmodule PrismaticStorageKuzudb.Analysis do

@moduledoc """

Graph analysis queries for entity relationship traversal.


Provides ownership chain discovery, shared director detection,

co-location analysis, and shortest path computation between

entities in the graph.

"""


alias PrismaticStorageKuzudb.Connection


@doc """

Discover the full ownership chain for a company, traversing

up through parent companies and ultimate beneficial owners.

"""

@spec ownership_chain(String.t(), non_neg_integer()) :: {:ok, [map()]} | {:error, term()}

def ownership_chain(company_id, max_depth \\ 5) do

Connection.query("""

MATCH path = (c:Company {id: $company_id})<-[:SUBSIDIARY_OF*1..#{max_depth}]-(parent:Company)

RETURN

nodes(path) AS chain,

[r IN relationships(path) | r.share_pct] AS share_percentages,

length(path) AS depth

ORDER BY depth ASC

""", %{company_id: company_id})

end


@doc """

Find persons who direct multiple companies (shared directors).

Useful for detecting undisclosed relationships between entities.

"""

@spec shared_directors(non_neg_integer()) :: {:ok, [map()]} | {:error, term()}

def shared_directors(min_companies \\ 2) do

Connection.query("""

MATCH (p:Person)-[:DIRECTS]->(c:Company)

WITH p, collect(c) AS companies, count(c) AS company_count

WHERE company_count >= $min_companies

RETURN p.name AS director, p.id AS person_id,

company_count,

[comp IN companies | comp.name] AS company_names

ORDER BY company_count DESC

LIMIT 100

""", %{min_companies: min_companies})

end


@doc """

Find companies registered at the same address.

Shell company detection heuristic.

"""

@spec address_colocation(String.t()) :: {:ok, [map()]} | {:error, term()}

def address_colocation(address_id) do

Connection.query("""

MATCH (c:Company)-[:REGISTERED_AT]->(a:Address {id: $address_id})

RETURN c.id AS company_id, c.name AS company_name,

c.risk_score AS risk_score, c.status AS status,

a.street AS street, a.city AS city

ORDER BY c.risk_score DESC

LIMIT 100

""", %{address_id: address_id})

end


@doc """

Shortest path between two entities of any type.

Useful for discovering indirect connections.

"""

@spec shortest_path(String.t(), String.t()) :: {:ok, [map()]} | {:error, term()}

def shortest_path(source_id, target_id) do

Connection.query("""

MATCH path = shortestPath(

(source {id: $source_id})-[*1..6]-(target {id: $target_id})

)

RETURN nodes(path) AS entities,

relationships(path) AS connections,

length(path) AS distance

""", %{source_id: source_id, target_id: target_id})

end

end


Visualization Data Generation


Graph query results are transformed into visualization-ready data structures compatible with D3.js force-directed graphs and Chart.js:



defmodule PrismaticStorageKuzudb.Visualization do

@moduledoc """

Transforms graph query results into visualization-ready

data structures for D3.js and Chart.js rendering.

"""


@type graph_data :: %{

nodes: [%{id: String.t(), label: String.t(), type: String.t(), risk: float()}],

edges: [%{source: String.t(), target: String.t(), label: String.t(), weight: float()}]

}


@spec entity_network(String.t(), non_neg_integer()) :: {:ok, graph_data()} | {:error, term()}

def entity_network(entity_id, depth \\ 2) do

case Connection.query(network_query(depth), %{entity_id: entity_id}) do

{:ok, results} -> {:ok, transform_to_graph(results)}

{:error, reason} -> {:error, reason}

end

end


defp transform_to_graph(results) do

nodes =

results

|> Enum.flat_map(fn row -> row["entities"] end)

|> Enum.uniq_by(& &1["id"])

|> Enum.map(fn entity ->

%{

id: entity["id"],

label: entity["name"],

type: entity["_label"],

risk: entity["risk_score"] || 0.0

}

end)


edges =

results

|> Enum.flat_map(fn row -> row["connections"] end)

|> Enum.map(fn rel ->

%{

source: rel["_src"],

target: rel["_dst"],

label: rel["_label"],

weight: rel["share_pct"] || 1.0

}

end)


%{nodes: nodes, edges: edges}

end

end


Query TypeAvg LatencyMax DepthTypical Result Size

|-----------|-------------|-----------|-------------------|

Ownership chain5-15ms5 hops10-50 nodes Shared directors10-30ms1 hop50-200 persons Address co-location3-8ms1 hop5-50 companies Shortest path15-50ms6 hops2-12 nodes Entity network20-80ms2 hops20-200 nodes

KuzuDB's embedded architecture eliminates network round-trips and deployment complexity. For an intelligence platform where relationship traversal is a core analytical capability, the performance characteristics of an in-process columnar graph database provide significant advantages over both relational recursive CTEs and client-server graph databases.

Tags

kuzudb graph-database cypher entity-resolution visualization

Related Glossary Terms