Skip to main content

Installation

pip install streamlogia

Initialise the client

import os
from logingestor import LogIngestorClient

client = LogIngestorClient(
    api_key=os.environ["LOGINGESTOR_API_KEY"],
    project_id=os.environ["LOGINGESTOR_PROJECT_ID"],
    source="my-service",
)
Parameter | Default | Description
api_key | required | Your Log Ingestor API key
project_id | required | UUID of the project to ingest into
source | "unknown" | Default source tag applied to every entry
batch_size | 1 | Flush the queue when it reaches this many entries
console | True | Mirror logs to stdout/stderr
on_error | prints to stderr | Called with the exception when an ingest request fails

Log methods

client.debug("Cache miss", meta={"key": "user:abc"})
client.info("Order created", meta={"order_id": "ord_99"}, tags=["orders"])
client.warn("Rate limit approaching", tags=["alerts"])
client.error("Payment failed", meta={"order_id": "o_456", "reason": "card_declined"})
All methods accept:
  • meta — arbitrary dict of structured fields attached to the entry
  • tags — list of string tags for filtering in the UI
Console output: DEBUG/INFO go to stdout, WARN/ERROR go to stderr. Set console=False to suppress.

Flush and close

client.close()  # flush remaining logs before exit

Standard library logging integration

import logging
import os
from logingestor import LogIngestorClient

client = LogIngestorClient(
    api_key=os.environ["LOGINGESTOR_API_KEY"],
    project_id=os.environ["LOGINGESTOR_PROJECT_ID"],
    source="my-service",
)

logger = logging.getLogger("my-service")
logger.setLevel(logging.DEBUG)
logger.addHandler(client.logging_handler())

logger.info("Server started on port 8080")
logger.error("Database connection failed", exc_info=True)
# ↑ When called inside an `except` block, the exception traceback is automatically captured in meta
Structured fields can be passed via extra={"meta": {...}}:
logger.info("payment record persisted", extra={"meta": {"payment_id": "pay_abc"}})

Flask integration

import os
import signal
import sys
from flask import Flask, jsonify, request
from logingestor import LogIngestorClient

client = LogIngestorClient(
    api_key=os.environ["LOGINGESTOR_API_KEY"],
    project_id=os.environ["LOGINGESTOR_PROJECT_ID"],
    source="payment-service",
)

signal.signal(signal.SIGTERM, lambda *_: (client.close(), sys.exit(0)))

app = Flask(__name__)
client.flask_middleware(app)  # logs every request automatically

@app.post("/payments")
def create_payment():
    """Handle ``POST /payments``: mint a payment id, log it, and return it.

    The JSON request body is read for ``customerId`` and ``amount``, which
    are attached to an INFO log entry tagged ``payments`` together with the
    freshly generated payment id.

    Returns:
        A ``(response, 201)`` tuple whose JSON body is
        ``{"paymentId": <id>}``.
    """
    payload = request.get_json()

    # 4 random bytes -> 8 hex characters after the "pay_" prefix.
    payment_id = "pay_" + os.urandom(4).hex()

    log_fields = {
        "payment_id": payment_id,
        "customer_id": payload.get("customerId"),
        "amount": payload.get("amount"),
    }
    client.info("payment processed", meta=log_fields, tags=["payments"])

    response = jsonify({"paymentId": payment_id})
    return response, 201

FastAPI / ASGI integration

import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
from logingestor import LogIngestorClient

client = LogIngestorClient(
    api_key=os.environ["LOGINGESTOR_API_KEY"],
    project_id=os.environ["LOGINGESTOR_PROJECT_ID"],
    source="order-service",
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook that closes the log client on shutdown.

    Nothing is done at startup; control is handed straight to the
    application. When the lifespan context exits, ``client.close()`` is
    called so that any remaining queued log entries are flushed.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    try:
        yield
    finally:
        # Runs even when the context is left via an exception or task
        # cancellation, so buffered entries are not silently dropped.
        client.close()

app = FastAPI(lifespan=lifespan)
app.add_middleware(client.asgi_middleware())  # logs every request automatically

@app.post("/orders", status_code=201)
async def create_order(body: dict):
    """Handle ``POST /orders``: mint an order id, log it, and return it.

    Args:
        body: Parsed request body; ``customerId`` is read from it if present.

    Returns:
        ``{"orderId": <id>}`` for the newly generated order id.
    """
    # 4 random bytes -> 8 hex characters after the "ord_" prefix.
    suffix = os.urandom(4).hex()
    order_id = f"ord_{suffix}"

    client.info(
        "order created",
        meta={"order_id": order_id, "customer_id": body.get("customerId")},
        tags=["orders"],
    )
    return {"orderId": order_id}
The middleware captures method, path, status, duration_ms, ip, user_agent, and request_id for every request. Status >=500 is logged at ERROR, >=400 at WARN, others at INFO.

Raw HTTP (no SDK)

import os
import urllib.request
import json

def ingest(entries: list) -> None:
    """POST a batch of log entries to the ingest endpoint as JSON.

    The API key is read from the ``LOGINGESTOR_API_KEY`` environment
    variable and sent as a bearer token. The request uses a 5 second
    timeout.

    Args:
        entries: JSON-serialisable list of log entry objects.

    Raises:
        RuntimeError: If the response status is 400 or above.

    Note:
        With the default opener, ``urllib.request.urlopen`` itself raises
        ``urllib.error.HTTPError`` for error responses, so callers should
        expect that exception as well; the explicit status check below is
        a fallback.
    """
    # Serialise first so an unserialisable payload fails before any
    # environment lookup or network activity.
    payload = json.dumps(entries).encode()
    token = os.environ['LOGINGESTOR_API_KEY']

    http_request = urllib.request.Request(
        "https://api.streamlogia.com/v1/ingest",
        data=payload,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

    with urllib.request.urlopen(http_request, timeout=5) as response:
        status = response.status
        if status < 400:
            return
        raise RuntimeError(f"ingest failed with status {status}")

ingest([{
    "projectId": os.environ["LOGINGESTOR_PROJECT_ID"],
    "level":     "INFO",
    "message":   "Hello, Streamlogia!",
    "source":    "my-service",
    "timestamp": "2024-01-01T00:00:00Z",
    "tags":      [],
    "meta":      {},
}])