QANATIX
Getting Started

Python SDK

Install and use the Qanatix Python SDK for ingestion, search, and data management.

Python SDK

The official Python SDK for Qanatix. One import, typed responses, automatic retry, and both sync and async clients.

Install

pip install qanatix

With pandas support:

pip install qanatix[pandas]

Initialize

Two clients — one for your private data (requires API key), one for public data (no auth):

import qanatix

# Private — your data, requires API key
qx = qanatix.Qanatix("sk_live_...")

# Open — public data, no key needed
qx_open = qanatix.QanatixOpen()

Self-hosted:

qx = qanatix.Qanatix("sk_live_...", base_url="https://qanatix.your-company.com")

Search

# Basic search
results = qx.search("parts_catalog", "stainless M8 bolt")
for r in results.results:
    print(r.name, r.score, r.collection_data)

# With filters and sorting
results = qx.search(
    "parts_catalog",
    "stainless bolt",
    filters={"material": "Stainless Steel", "price_eur_max": 1.00},
    sort="-score",
    limit=10,
)

Auto-paginate

for record in qx.search.iter("parts_catalog", "bolt", page_size=50, max_results=500):
    print(record.name)

Search public data

qx_open = qanatix.QanatixOpen()
results = qx_open.search("suppliers", "ISO 9001 certified Germany")
collections = qx_open.collections.list()

Records

# Create
rec = qx.records.create(
    "parts_catalog", "fastener", "Hex Bolt M10x50",
    data={"part_number": "HB-M10-50", "price_eur": 0.18},
)

# Get
rec = qx.records.get("record-uuid")

# List
recs = qx.records.list(limit=50)

# Update
qx.records.update("record-uuid", data={"price_eur": 0.20})

# Delete
qx.records.delete("record-uuid")

# Bulk
qx.records.bulk_update(["id1", "id2"], status="archived")
count = qx.records.bulk_delete(["id1", "id2", "id3"])

Ingest

Batch (JSON)

result = qx.ingest.batch("parts_catalog", "fastener", [
    {"name": "Hex Bolt M8x40", "part_number": "HB-M8-40", "price_eur": 0.12},
    {"name": "Hex Nut M8", "part_number": "HN-M8", "price_eur": 0.04},
])
print(result.summary.accepted)  # 2

Batches over 5,000 records are automatically chunked into multiple requests.

File upload

result = qx.ingest.upload("parts_catalog", "fastener", "parts.csv")

Supported formats: CSV, JSON, NDJSON, XML.

From DataFrame

import pandas as pd

df = pd.read_csv("parts.csv")
result = qx.ingest.from_dataframe("parts_catalog", "fastener", df, name_column="part_name")
print(f"Accepted: {result.summary.accepted}")

Upload status

status = qx.ingest.status(result.upload_id)
errors = qx.ingest.errors(result.upload_id)

Poll until complete

result = qx.ingest.batch("col", "type", records)
result.wait(poll_interval=2.0, timeout=120.0)

Connectors

# Create a PostgreSQL connector
conn = qx.connectors.create(
    name="erp-parts",
    connector_type="postgresql",
    collection="parts_catalog",
    record_type="fastener",
    connection_config={
        "host": "db.example.com",
        "port": 5432,
        "database": "erp",
        "user": "readonly",
        "password": "...",
    },
    query="SELECT part_number AS name, * FROM parts WHERE updated_at > :last_sync",
    schedule="0 */6 * * *",  # every 6 hours
)

# List
connectors = qx.connectors.list()

# Trigger pull
qx.connectors.pull(conn.id)

# Delete
qx.connectors.delete(conn.id)

Supported connector types: postgresql, mysql, mongodb, neo4j.

Collections

collections = qx.collections.list()
for c in collections:
    print(c.collection, c.record_count, c.record_types)

API Keys

# Create
key = qx.keys.create("pipeline-key", ["search", "upload"])
print(key.key)  # sk_live_... — shown once

# List
keys = qx.keys.list()

# Rotate
new_key = qx.keys.rotate(key.id)

# Revoke
qx.keys.revoke(key.id)

Export

response = qx.export("parts_catalog", format="csv")
with open("export.csv", "wb") as f:
    for chunk in response.iter_bytes():
        f.write(chunk)

Async

Every method is available in async via AsyncQanatix and AsyncQanatixOpen:

import qanatix

# Private
async with qanatix.AsyncQanatix("sk_live_...") as qx:
    results = await qx.search("parts_catalog", "stainless bolt")
    result = await qx.ingest.batch("parts_catalog", "fastener", records)
    rec = await qx.records.get("record-uuid")

# Open
async with qanatix.AsyncQanatixOpen() as qx:
    results = await qx.search("suppliers", "CNC machining suppliers")
    async for record in qx.search.iter("suppliers", "precision casting"):
        print(record.name)

Clients

| Client | Auth | Available resources |
|---|---|---|
| Qanatix("sk_live_...") | API key | records, ingest, search, chat, connectors, webhooks, collections, keys, export |
| QanatixOpen() | None | search, collections |
| AsyncQanatix("sk_live_...") | API key | Same as Qanatix, async |
| AsyncQanatixOpen() | None | Same as QanatixOpen, async |

Error handling

All errors inherit from QanatixError:

from qanatix import QanatixError, AuthenticationError, RateLimitError, NotFoundError

try:
    qx.records.get("nonexistent-id")
except NotFoundError:
    print("Record not found")
except RateLimitError as e:
    print(f"Rate limited — retry after {e.retry_after}s")
except QanatixError as e:
    print(f"API error {e.status_code}: {e}")

| Exception | HTTP Status |
|---|---|
| AuthenticationError | 401 |
| PermissionError | 403 |
| NotFoundError | 404 |
| ValidationError | 422 |
| RateLimitError | 429 |
| ServerError | 5xx |

The SDK automatically retries on 429, 502, 503, and 504 with exponential backoff (up to 3 attempts).

Webhooks

# Create a webhook
webhook = qx.webhooks.create(
    url="https://your-server.com/webhooks/qanatix",
    events=["record.created", "upload.complete"],
    description="Production webhook",
)
print(webhook.secret)  # save this — used to verify signatures

# List webhooks
webhooks = qx.webhooks.list()

# Test a webhook
result = qx.webhooks.test(webhook.id)
print(result)  # delivery status

# View deliveries
deliveries = qx.webhooks.deliveries(webhook.id)

# Delete
qx.webhooks.delete(webhook.id)

Verify signatures

from qanatix import verify_signature

is_valid = verify_signature(
    payload=request.body,
    signature=request.headers["X-Qanatix-Signature"],
    secret="whsec_...",
)

Next steps

On this page