
Custom Sources

Build your own data connector for QANATIX.


QANATIX supports 10+ built-in data sources. If yours isn't covered, you can push data via the REST API or build a custom connector.

Option 1: REST API push (simplest)

Write a script that extracts data from your source and pushes it to the batch endpoint:

import httpx

QANATIX_URL = "https://api.qanatix.com/api/v1"
API_KEY = "sk_live_abc123..."

def sync_from_custom_source():
    # Extract from your source
    records = extract_from_your_system()

    # Transform to QANATIX format
    batch = []
    for record in records:
        batch.append({
            "name": record["title"],
            "source_id": f"custom-{record['id']}",  # upsert key
            "vertical_data": {
                "field1": record["value1"],
                "field2": record["value2"],
            },
        })

    # Load into QANATIX
    resp = httpx.post(
        f"{QANATIX_URL}/ingest/my_vertical/my_type/batch",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"records": batch},
    )
    resp.raise_for_status()  # fail loudly on auth/validation errors
    print(f"Ingested {resp.json()['record_count']} records")

Use source_id for idempotent upserts — re-running the script updates existing records instead of creating duplicates.
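The batch endpoint's maximum request size isn't documented here, so for large extracts a cautious approach is to push in fixed-size chunks. A minimal helper (this is a client-side sketch, not part of QANATIX):

```python
from itertools import islice

def chunked(records, size=500):
    """Yield successive lists of at most `size` records."""
    it = iter(records)
    while chunk := list(islice(it, size)):
        yield chunk

# Usage with the batch endpoint:
# for chunk in chunked(batch):
#     resp = httpx.post(
#         f"{QANATIX_URL}/ingest/my_vertical/my_type/batch",
#         headers={"Authorization": f"Bearer {API_KEY}"},
#         json={"records": chunk},
#     )
#     resp.raise_for_status()
```

Because `source_id` makes uploads idempotent, a chunked run that fails partway can simply be re-run from the start.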

Option 2: Webhook push (real-time)

Configure your system to POST to QANATIX when data changes:

curl -X POST https://api.qanatix.com/api/v1/webhooks/ingest/my_vertical/my_type \
  -H "Authorization: Bearer sk_live_abc123..." \
  -H "X-Webhook-Signature: sha256=..." \
  -H "Content-Type: application/json" \
  -d '{"records": [{"name": "...", "source_id": "...", "vertical_data": {...}}]}'

Set WEBHOOK_SECRET on your QANATIX instance to enable HMAC-SHA256 signature verification.
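On the sending side, the signature is an HMAC-SHA256 of the raw request body keyed by the shared secret. A sketch of computing the `X-Webhook-Signature` header value, assuming the `sha256=<hex digest>` format shown in the curl example above:

```python
import hashlib
import hmac
import json

WEBHOOK_SECRET = "your-shared-secret"

def sign(body: bytes, secret: str) -> str:
    """Return the X-Webhook-Signature value for a raw request body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"

# Sign the exact bytes you will send -- re-serializing the JSON on the
# receiving side can change whitespace and break verification.
payload = json.dumps({"records": [{"name": "Widget", "source_id": "custom-1"}]}).encode()
signature = sign(payload, WEBHOOK_SECRET)
```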

Option 3: NDJSON streaming (high throughput)

For continuous data feeds (IoT, event streams, logs):

import httpx
import json

records = [
    {"name": "Event 1", "value": 42.5},
    {"name": "Event 2", "value": 38.1},
]

lines = "\n".join(json.dumps(r) for r in records)

resp = httpx.post(
    f"{QANATIX_URL}/ingest/my_vertical/my_type/stream",
    headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/x-ndjson",
    },
    content=lines,
)

See Streaming NDJSON for backpressure details.
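Joining every line up front buffers the whole feed in memory, which defeats the purpose for long-running streams. httpx also accepts an iterator of bytes as the request body, so you can encode records lazily as they arrive (`read_events()` below is a stand-in for your event source):

```python
import json

def ndjson_stream(record_iter):
    """Lazily encode records as NDJSON-framed bytes, one line per record."""
    for record in record_iter:
        yield json.dumps(record).encode() + b"\n"

# resp = httpx.post(
#     f"{QANATIX_URL}/ingest/my_vertical/my_type/stream",
#     headers={
#         "Authorization": f"Bearer {API_KEY}",
#         "Content-Type": "application/x-ndjson",
#     },
#     content=ndjson_stream(read_events()),  # read_events() is your source
# )
```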

Option 4: Database connector (scheduled pull)

If your data lives in a supported database (PostgreSQL, MySQL, MongoDB, Neo4j), use the database connector instead. It handles connection management, pagination, and scheduling.

Data format requirements

Every record needs at minimum:

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | yes | Display name for the entity |
| `vertical_data` | object | no | Your structured data fields |
| `source_id` | string | no | Unique ID from source system (enables upsert) |

All other fields in the record body go into vertical_data automatically.
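To illustrate that rule, here is a sketch of the normalization (how the server treats extra top-level fields; not the exact server implementation):

```python
def normalize(record: dict) -> dict:
    """Sketch: reserved fields stay top-level; everything else moves into vertical_data."""
    reserved = {"name", "source_id", "vertical_data"}
    out = {k: v for k, v in record.items() if k in reserved and k != "vertical_data"}
    extra = {k: v for k, v in record.items() if k not in reserved}
    out["vertical_data"] = {**record.get("vertical_data", {}), **extra}
    return out

normalize({"name": "Sensor A", "source_id": "custom-7", "temp_c": 21.4})
# -> {"name": "Sensor A", "source_id": "custom-7", "vertical_data": {"temp_c": 21.4}}
```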

Schema validation

If you've registered a schema for your vertical, incoming data is validated against it. Invalid records are sent to the dead letter queue (DLQ) — check them via:

curl https://api.qanatix.com/api/v1/ingestions/{ingestion_id}/errors \
  -H "Authorization: Bearer sk_live_abc123..."

Scheduling tips

For recurring syncs, use a cron job or task scheduler:

# Sync every hour
0 * * * * python /path/to/sync_script.py

Or trigger via your CI/CD pipeline after data exports.
