Getting Started with the Prismatic Python SDK
Complete guide to the Prismatic Python SDK covering async queries, pandas integration for data analysis, Jupyter notebook workflows, and batch operations.
Tomas Korcak (korczis)
Prismatic Platform
The Prismatic Python SDK offers an async-first interface to the platform API, with native pandas DataFrame integration for analytical workflows. Whether you are screening entities in a Jupyter notebook or building automated pipelines, the SDK handles authentication, pagination, and type validation.
Installation
Install from PyPI with optional extras for data analysis:
pip install prismatic-sdk
# With pandas and Jupyter integration
pip install "prismatic-sdk[analysis]"
The SDK requires Python 3.11+ and uses httpx for async HTTP with connection pooling.
Client Configuration
The client supports both synchronous and asynchronous usage. The async client is recommended for production workloads:
import os
from prismatic import PrismaticClient, AsyncPrismaticClient
# Synchronous client (scripts, notebooks)
client = PrismaticClient(
    base_url=os.environ.get("PRISMATIC_API_URL", "https://api.prismatic.local"),
    api_key=os.environ["PRISMATIC_API_KEY"],
    timeout=30.0,
)

# Async client (production services)
async_client = AsyncPrismaticClient(
    base_url=os.environ["PRISMATIC_API_URL"],
    api_key=os.environ["PRISMATIC_API_KEY"],
    max_connections=20,
)
On the server side, the API key validation uses an ETS-backed registry for sub-millisecond lookups:
defmodule PrismaticWeb.Auth.ApiKeyRegistry do
  @moduledoc """
  ETS-backed API key registry for O(1) key validation.
  Keys are loaded at startup and refreshed every 60 seconds
  from the database to handle revocations.
  """
  use GenServer
  require Logger

  @table :api_key_registry
  @refresh_interval :timer.seconds(60)

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @spec validate(String.t()) :: {:ok, User.t()} | {:error, :invalid_key}
  def validate(api_key) do
    case :ets.lookup(@table, hash_key(api_key)) do
      [{_key, user_id, _scopes}] -> {:ok, Users.get!(user_id)}
      [] -> {:error, :invalid_key}
    end
  end

  @impl GenServer
  def init(opts) do
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    send(self(), :refresh)
    {:ok, opts}
  end

  @impl GenServer
  def handle_info(:refresh, state) do
    # Reload active key hashes from the database (query elided), then reschedule
    Process.send_after(self(), :refresh, @refresh_interval)
    {:noreply, state}
  end

  defp hash_key(key), do: :crypto.hash(:sha256, key)
end
Async Query Patterns
The async client uses Python's asyncio for concurrent operations. This is particularly effective for multi-source intelligence gathering:
import asyncio
from prismatic import AsyncPrismaticClient, EntityType
async def multi_source_search(names: list[str]) -> list[dict]:
    async with AsyncPrismaticClient() as client:
        tasks = [
            client.osint.search(
                query=name,
                entity_types=[EntityType.COMPANY],
                sources=["czech_business_registry", "sanctions_eu", "sanctions_us"],
            )
            for name in names
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = [r for r in results if not isinstance(r, Exception)]
        failed = [r for r in results if isinstance(r, Exception)]
        if failed:
            print(f"Warning: {len(failed)} queries failed")
        return successful
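The gather(..., return_exceptions=True) pattern is plain asyncio and can be tried without the SDK. The sketch below uses a stand-in coroutine (not a Prismatic API) and adds a semaphore to cap the number of in-flight requests, a useful complement when query counts exceed the client's connection pool:

```python
import asyncio

async def bounded_gather(coros, limit: int = 5) -> list:
    """Run coroutines concurrently, at most `limit` in flight at a time."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros), return_exceptions=True)

async def fake_search(name: str) -> dict:
    """Stand-in for client.osint.search(); raises on an empty query."""
    await asyncio.sleep(0)
    if not name:
        raise ValueError("empty query")
    return {"query": name, "hits": 1}

results = asyncio.run(bounded_gather([fake_search(n) for n in ["Acme", "", "Globex"]]))
successful = [r for r in results if not isinstance(r, Exception)]
print(f"{len(successful)} of {len(results)} succeeded")  # 2 of 3 succeeded
```

Because return_exceptions=True preserves ordering, failed queries can be matched back to their input names by index.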
The three client entry points are PrismaticClient (scripts and notebooks), AsyncPrismaticClient (concurrent production workloads), and PrismaticClient.batch (large server-side jobs).
Pandas Integration
The SDK provides built-in DataFrame conversion for analytical workflows. Every response object has a .to_dataframe() method:
import pandas as pd
from prismatic import PrismaticClient
client = PrismaticClient()
# Search returns a typed response with .to_dataframe()
results = client.osint.search(
    query="Progresus",
    entity_types=["company"],
    sources=["all"],
    limit=100,
)
# Convert to DataFrame for analysis
df = results.to_dataframe()
print(df.columns.tolist())
# ['id', 'name', 'type', 'confidence', 'risk_score', 'sources', 'created_at']
# Filter high-risk entities
high_risk = df[df["risk_score"] > 0.7].sort_values("risk_score", ascending=False)
print(f"High-risk entities: {len(high_risk)}")
# Group by source for coverage analysis
source_coverage = df.explode("sources").groupby("sources").size()
print(source_coverage)
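The filtering and coverage analysis above is ordinary pandas and can be exercised offline. This sketch uses synthetic rows shaped like the columns listed earlier; the entity names and scores are invented for illustration:

```python
import pandas as pd

# Synthetic rows shaped like results.to_dataframe() output
df = pd.DataFrame({
    "name": ["Alpha s.r.o.", "Beta a.s.", "Gamma Ltd"],
    "risk_score": [0.91, 0.42, 0.78],
    "sources": [
        ["czech_business_registry", "sanctions_eu"],
        ["czech_business_registry"],
        ["sanctions_us"],
    ],
})

# Threshold filter, highest risk first
high_risk = df[df["risk_score"] > 0.7].sort_values("risk_score", ascending=False)
print(high_risk["name"].tolist())  # ['Alpha s.r.o.', 'Gamma Ltd']

# explode() yields one row per (entity, source) pair before counting
source_coverage = df.explode("sources").groupby("sources").size()
print(source_coverage["czech_business_registry"])  # 2
```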
Jupyter Notebook Workflows
The SDK includes rich display formatters for Jupyter environments. Entities, cases, and search results render as interactive HTML tables:
# In a Jupyter notebook cell
from prismatic import PrismaticClient
from prismatic.display import configure_notebook
# Enable rich display formatting
configure_notebook()
client = PrismaticClient()
# This renders as an interactive table in Jupyter
case_details = client.dd.get_case("case-123", include=["entities", "findings"])
case_details # Rich HTML display with expandable sections
Analytical Pipeline Example
Build a complete screening pipeline in a notebook:
import pandas as pd
from prismatic import PrismaticClient
client = PrismaticClient()
# Step 1: Load entity list from CSV
entities_df = pd.read_csv("screening_list.csv")
names = entities_df["company_name"].tolist()
# Step 2: Batch screening
screening = client.osint.batch_search(
    queries=[{"query": n, "sources": ["all"]} for n in names],
    concurrency=5,
)
# Step 3: Aggregate results into DataFrame
results_df = screening.to_dataframe()
results_df["original_name"] = names
# Step 4: Risk classification
results_df["risk_category"] = pd.cut(
    results_df["risk_score"],
    bins=[0, 0.3, 0.6, 0.85, 1.0],
    labels=["low", "medium", "high", "critical"],
)
# Step 5: Summary statistics
summary = results_df.groupby("risk_category").agg(
    count=("id", "size"),
    avg_confidence=("confidence", "mean"),
    avg_risk=("risk_score", "mean"),
).round(3)
print(summary)
print(summary)
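The classification and summary steps can be tested without API credentials. This stand-alone sketch mirrors steps 4 and 5 on invented data; observed=True is passed to groupby so only categories that actually occur are counted:

```python
import pandas as pd

# Invented screening results, one row per entity
results_df = pd.DataFrame({
    "id": ["e1", "e2", "e3", "e4"],
    "risk_score": [0.10, 0.50, 0.70, 0.90],
})

# Bucket continuous scores into ordered categories
results_df["risk_category"] = pd.cut(
    results_df["risk_score"],
    bins=[0, 0.3, 0.6, 0.85, 1.0],
    labels=["low", "medium", "high", "critical"],
)

summary = results_df.groupby("risk_category", observed=True).agg(
    count=("id", "size"),
    avg_risk=("risk_score", "mean"),
).round(3)
print(summary["count"].to_dict())  # {'low': 1, 'medium': 1, 'high': 1, 'critical': 1}
```

With the default observed=False, empty categories would still appear in the summary with a count of zero, which can be useful for fixed-format reports.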
Batch Operations
For large-scale screening jobs, the batch API processes entities server-side using Broadway pipelines. The Python SDK handles chunking and progress tracking:
from prismatic import PrismaticClient
client = PrismaticClient()
# Screen 1000+ entities with progress tracking
with client.osint.batch_context(concurrency=10) as batch:
    for company in large_company_list:
        batch.add(query=company["name"], sources=["all"])
    results = batch.execute(
        on_progress=lambda done, total: print(f"{done}/{total}"),
        on_error=lambda name, err: print(f"Failed: {name}: {err}"),
    )
print(f"Screened {results.total} entities")
print(f"Flagged: {results.flagged_count}")
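If you need to pre-chunk a very large list yourself, for example to keep each submission under a payload limit before handing chunks to the batch API, a plain slicing generator is enough. This helper is not part of the SDK:

```python
from collections.abc import Iterator

def chunked(items: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

companies = [{"name": f"Company {i}"} for i in range(1, 11)]
chunks = list(chunked(companies, 4))
print([len(c) for c in chunks])  # [4, 4, 2]
```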
The server-side Broadway pipeline that processes these batches:
defmodule PrismaticOsint.BatchPipeline do
  @moduledoc """
  Broadway pipeline for concurrent batch OSINT screening.
  Processes entity batches with configurable concurrency,
  rate limiting per source, and result aggregation.
  """
  use Broadway

  alias Broadway.Message

  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {Broadway.DummyProducer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: Keyword.get(opts, :concurrency, 10)]
      ],
      batchers: [
        default: [batch_size: 50, batch_timeout: 5_000]
      ]
    )
  end

  @impl Broadway
  def handle_message(_processor, message, _context) do
    case OsintMesh.search(message.data.query, message.data.sources) do
      {:ok, results} -> Message.put_data(message, results)
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  @impl Broadway
  def handle_batch(:default, messages, _batch_info, _context) do
    # Aggregate and persist batch results here; returning the messages acks them
    messages
  end
end
Error Handling
The SDK raises typed exceptions with structured error details:
from prismatic.exceptions import (
    PrismaticApiError,
    RateLimitError,
    ValidationError,
    AuthenticationError,
)

try:
    result = client.osint.search(query="test")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
    print(f"Invalid params: {e.details}")
except AuthenticationError:
    print("API key invalid or expired")
except PrismaticApiError as e:
    print(f"API error {e.status_code}: {e.message}")
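A common pattern on top of RateLimitError is a capped retry loop that honors retry_after. The sketch below defines a stand-in exception so it runs without the SDK installed; in real code you would catch prismatic.exceptions.RateLimitError instead:

```python
import time

class StubRateLimitError(Exception):
    """Stand-in for prismatic.exceptions.RateLimitError."""
    def __init__(self, retry_after: float):
        self.retry_after = retry_after

def with_rate_limit_retry(fn, max_retries: int = 3):
    """Call fn(), sleeping retry_after seconds after each rate-limit hit."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except StubRateLimitError as e:
            if attempt == max_retries:
                raise  # budget exhausted; surface the error
            time.sleep(e.retry_after)

calls = {"n": 0}

def flaky_search():
    """Fails twice with a rate limit, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise StubRateLimitError(retry_after=0.01)
    return {"status": "ok"}

result = with_rate_limit_retry(flaky_search)
print(result)  # {'status': 'ok'}
```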
Besides the exceptions shown above, the SDK also defines NotFoundError and ServerError; RateLimitError is the only one that carries a retry_after value, expressed in seconds.
Configuration Reference
Environment variables supported by the SDK:
PRISMATIC_API_URL=https://api.prismatic.local
PRISMATIC_API_KEY=psk_live_...
PRISMATIC_TIMEOUT=30
PRISMATIC_MAX_RETRIES=3
PRISMATIC_LOG_LEVEL=WARNING
The Python SDK is designed for both interactive exploration in Jupyter and production automation pipelines. Combined with pandas, it provides a powerful analytical layer over the platform's intelligence capabilities.