Getting Started
Python SDK
Install and use the Qanatix Python SDK for ingestion, search, and data management.
Python SDK
The official Python SDK for Qanatix. One import, typed responses, automatic retry, and both sync and async clients.
Install
pip install qanatixWith pandas support:
pip install qanatix[pandas]Initialize
Two clients — one for your private data (requires API key), one for public data (no auth):
import qanatix
# Private — your data, requires API key
qx = qanatix.Qanatix("sk_live_...")
# Open — public data, no key needed
qx_open = qanatix.QanatixOpen()Self-hosted:
qx = qanatix.Qanatix("sk_live_...", base_url="https://qanatix.your-company.com")Search
# Basic search
results = qx.search("parts_catalog", "stainless M8 bolt")
for r in results.results:
print(r.name, r.score, r.collection_data)
# With filters and sorting
results = qx.search(
"parts_catalog",
"stainless bolt",
filters={"material": "Stainless Steel", "price_eur_max": 1.00},
sort="-score",
limit=10,
)Auto-paginate
for record in qx.search.iter("parts_catalog", "bolt", page_size=50, max_results=500):
print(record.name)Search public data
qx_open = qanatix.QanatixOpen()
results = qx_open.search("suppliers", "ISO 9001 certified Germany")
collections = qx_open.collections.list()Records
# Create
rec = qx.records.create(
"parts_catalog", "fastener", "Hex Bolt M10x50",
data={"part_number": "HB-M10-50", "price_eur": 0.18},
)
# Get
rec = qx.records.get("record-uuid")
# List
recs = qx.records.list(limit=50)
# Update
qx.records.update("record-uuid", data={"price_eur": 0.20})
# Delete
qx.records.delete("record-uuid")
# Bulk
qx.records.bulk_update(["id1", "id2"], status="archived")
count = qx.records.bulk_delete(["id1", "id2", "id3"])Ingest
Batch (JSON)
result = qx.ingest.batch("parts_catalog", "fastener", [
{"name": "Hex Bolt M8x40", "part_number": "HB-M8-40", "price_eur": 0.12},
{"name": "Hex Nut M8", "part_number": "HN-M8", "price_eur": 0.04},
])
print(result.summary.accepted) # 2Batches over 5,000 records are automatically chunked into multiple requests.
File upload
result = qx.ingest.upload("parts_catalog", "fastener", "parts.csv")Supported formats: CSV, JSON, NDJSON, XML.
From DataFrame
import pandas as pd
df = pd.read_csv("parts.csv")
result = qx.ingest.from_dataframe("parts_catalog", "fastener", df, name_column="part_name")
print(f"Accepted: {result.summary.accepted}")Upload status
status = qx.ingest.status(result.upload_id)
errors = qx.ingest.errors(result.upload_id)Poll until complete
result = qx.ingest.batch("col", "type", records)
result.wait(poll_interval=2.0, timeout=120.0)Connectors
# Create a PostgreSQL connector
conn = qx.connectors.create(
name="erp-parts",
connector_type="postgresql",
collection="parts_catalog",
record_type="fastener",
connection_config={
"host": "db.example.com",
"port": 5432,
"database": "erp",
"user": "readonly",
"password": "...",
},
query="SELECT part_number AS name, * FROM parts WHERE updated_at > :last_sync",
schedule="0 */6 * * *", # every 6 hours
)
# List
connectors = qx.connectors.list()
# Trigger pull
qx.connectors.pull(conn.id)
# Delete
qx.connectors.delete(conn.id)Supported connector types: postgresql, mysql, mongodb, neo4j.
Collections
collections = qx.collections.list()
for c in collections:
print(c.collection, c.record_count, c.record_types)API Keys
# Create
key = qx.keys.create("pipeline-key", ["search", "upload"])
print(key.key) # sk_live_... — shown once
# List
keys = qx.keys.list()
# Rotate
new_key = qx.keys.rotate(key.id)
# Revoke
qx.keys.revoke(key.id)Export
response = qx.export("parts_catalog", format="csv")
with open("export.csv", "wb") as f:
for chunk in response.iter_bytes():
f.write(chunk)Async
Every method is available in async via AsyncQanatix and AsyncQanatixOpen:
import qanatix
# Private
async with qanatix.AsyncQanatix("sk_live_...") as qx:
results = await qx.search("parts_catalog", "stainless bolt")
result = await qx.ingest.batch("parts_catalog", "fastener", records)
rec = await qx.records.get("record-uuid")
# Open
async with qanatix.AsyncQanatixOpen() as qx:
results = await qx.search("suppliers", "CNC machining suppliers")
async for record in qx.search.iter("suppliers", "precision casting"):
print(record.name)Clients
| Client | Auth | Available resources |
|---|---|---|
Qanatix("sk_live_...") | API key | records, ingest, search, chat, connectors, webhooks, collections, keys, export |
QanatixOpen() | None | search, collections |
AsyncQanatix("sk_live_...") | API key | Same as Qanatix, async |
AsyncQanatixOpen() | None | Same as QanatixOpen, async |
Error handling
All errors inherit from QanatixError:
from qanatix import QanatixError, AuthenticationError, RateLimitError, NotFoundError
try:
qx.records.get("nonexistent-id")
except NotFoundError:
print("Record not found")
except RateLimitError as e:
print(f"Rate limited — retry after {e.retry_after}s")
except QanatixError as e:
print(f"API error {e.status_code}: {e}")| Exception | HTTP Status |
|---|---|
AuthenticationError | 401 |
PermissionError | 403 |
NotFoundError | 404 |
ValidationError | 422 |
RateLimitError | 429 |
ServerError | 5xx |
The SDK automatically retries on 429, 502, 503, and 504 with exponential backoff (up to 3 attempts).
Webhooks
# Create a webhook
webhook = qx.webhooks.create(
url="https://your-server.com/webhooks/qanatix",
events=["record.created", "upload.complete"],
description="Production webhook",
)
print(webhook.secret) # save this — used to verify signatures
# List webhooks
webhooks = qx.webhooks.list()
# Test a webhook
result = qx.webhooks.test(webhook.id)
print(result) # delivery status
# View deliveries
deliveries = qx.webhooks.deliveries(webhook.id)
# Delete
qx.webhooks.delete(webhook.id)Verify signatures
from qanatix import verify_signature
is_valid = verify_signature(
payload=request.body,
signature=request.headers["X-Qanatix-Signature"],
secret="whsec_...",
)Next steps
- Quickstart — end-to-end example
- Search — filters, sorting, response formats
- Data Import — all ingestion methods
- API Reference — full HTTP API docs