Streaming NDJSON
Stream data into QANATIX with backpressure control.
Stream newline-delimited JSON (NDJSON) records into QANATIX in real time. This is useful for IoT sensors, event streams, and other continuous data feeds.
Endpoint
curl -X POST https://api.qanatix.com/api/v1/ingest/iot/sensor_reading/stream \
  -H "Authorization: Bearer sk_live_abc123..." \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @- << 'EOF'
{"name": "Sensor A - Reading 1", "temperature": 22.5, "humidity": 45}
{"name": "Sensor A - Reading 2", "temperature": 22.7, "humidity": 44}
{"name": "Sensor B - Reading 1", "temperature": 19.8, "humidity": 62}
EOF
How it works
QANATIX buffers incoming records and flushes them in batches:
| Setting | Default |
|---|---|
| Batch size | 100 records |
| Flush interval | 5 seconds |
| Max pending records | 10,000 |
Records are flushed to the ingestion pipeline when either the batch size or flush interval is reached, whichever comes first.
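The flush rule above can be illustrated with a toy model. This is a minimal sketch and not QANATIX's server code: the `BatchBuffer` class, its method names, and the injectable `clock` are all hypothetical. It also only checks the interval when a record arrives, whereas a real service would presumably flush on a timer as well.

```python
import time


class BatchBuffer:
    """Toy model of a size-or-interval flush policy (hypothetical, for illustration)."""

    def __init__(self, batch_size=100, flush_interval=5.0, clock=time.monotonic):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.clock = clock
        self.pending = []
        self.last_flush = clock()

    def add(self, record):
        """Buffer one record; return the flushed batch, or None if nothing flushed."""
        self.pending.append(record)
        size_reached = len(self.pending) >= self.batch_size
        interval_reached = self.clock() - self.last_flush >= self.flush_interval
        # Flush on whichever condition is met first.
        if size_reached or interval_reached:
            batch, self.pending = self.pending, []
            self.last_flush = self.clock()
            return batch
        return None
```

With the defaults from the table, a steady feed of one record per second would flush roughly every 5 seconds, while a burst of 100 records would flush immediately.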
Backpressure
If the buffer exceeds 10,000 pending records, QANATIX returns 429 Too Many Requests. Your client should back off and retry.
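One way to back off is exponential delay with jitter. The sketch below is an assumption about a reasonable client policy, not a schedule QANATIX prescribes: the function name, the attempt limit, and the delay values are all illustrative. It takes any zero-argument callable that performs the request and returns the HTTP status code, so it is independent of the HTTP library in use.

```python
import random
import time


def send_with_backoff(send, max_attempts=5, base_delay=0.5, max_delay=30.0,
                      sleep=time.sleep):
    """Call send() until it returns a status other than 429 or attempts run out.

    `send` is any zero-argument callable returning an HTTP status code.
    All delay parameters are illustrative defaults, not documented values.
    """
    for attempt in range(max_attempts):
        status = send()
        if status != 429:
            return status
        if attempt < max_attempts - 1:
            # Exponential backoff with full jitter: wait a random time
            # between 0 and base_delay * 2^attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))
    # Every attempt was rejected with 429.
    return 429
```

The jitter spreads retries from many clients over time, so they do not all hit the buffer again at the same moment.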
Python client example
import httpx
import json


async def stream_data(records):
    async with httpx.AsyncClient() as client:
        # Serialize each record as one JSON object per line (NDJSON).
        lines = "\n".join(json.dumps(r) for r in records)
        resp = await client.post(
            "https://api.qanatix.com/api/v1/ingest/iot/sensor_reading/stream",
            headers={
                "Authorization": "Bearer sk_live_abc123...",
                "Content-Type": "application/x-ndjson",
            },
            content=lines,
        )
        return resp.json()
Response
{
"accepted": 3,
"parse_errors": 0,
"buffer_size": 3,
"flushed": false
}
| Field | Description |
|---|---|
| accepted | Records successfully parsed and buffered |
| parse_errors | Lines that failed JSON parsing (skipped) |
| buffer_size | Current buffer depth |
| flushed | Whether a batch was flushed during this request |
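A client may want to check these fields after each request. The helper below is a hypothetical sketch: the function name and the `sent` parameter (the number of lines the client submitted) are not part of the API, and only the field names come from the response shown above.

```python
def check_stream_response(body, sent):
    """Return a list of warnings for a stream response body.

    `body` is the parsed JSON response; `sent` is how many lines were submitted.
    """
    warnings = []
    if body["parse_errors"] > 0:
        warnings.append(
            f"{body['parse_errors']} line(s) failed JSON parsing and were skipped"
        )
    if body["accepted"] < sent:
        warnings.append(f"only {body['accepted']} of {sent} record(s) were accepted")
    return warnings
```

An empty list means every submitted line was parsed and buffered. Because skipped lines are reported only as a count, a client that needs to know which lines failed would have to validate them before sending.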