demo_fastapi.py

← Back to explorer
api_demos/demo_fastapi.py
# Created by AG on 22-12-2025

import re
import json
from pathlib import Path
from typing import Any, Tuple
from fastavro import parse_schema
from fastavro.validation import validate, ValidationError
from fastapi import FastAPI, HTTPException, Request

demo = FastAPI()

email_regex = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
AvroContract = parse_schema(json.loads(Path("contracts/sample.avsc").read_text(encoding="utf-8")))
valid_fields = {
    field["name"] for field in AvroContract["fields"]
}

def parse_error(message: str) -> Tuple[str, int]:
    message = message.lower()

    if "required" in message and "field" in message:
        return "missing fields", 422

    if "symbols" in message and ("not in" in message or "valid" in message):
        return "invalid enum", 422

    if "null" in message and ("expected" in message or "valid" in message or "allowed" in  message):
        return "value cannot be null", 422

    if "expected" in message or "is not a" in message or "not a valid" in message:
        return "data type mismatch", 422

    return "schema could not be validated", 422


def validate_request(
    payload: Any
) -> None:
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=400,
            detail={
                "error": "invalid_request_body",
                "message": "Request must be JSON"
            },
        )

    invalid_fields = sorted(set(payload.keys()) - valid_fields)

    if invalid_fields:
        raise HTTPException(
            status_code=400,
            detail={
                "error": "unknown_fields",
                "message": "Unknown fields in request body",
                "fields": invalid_fields
            },
        )

    if "email" in payload and payload["email"] is not None:
        if not isinstance(payload["email"], str) or not email_regex.match(payload["email"]):
            raise HTTPException(
                status_code=422,
                detail={
                    "error": "invalid_email",
                    "message": "Email is not valid",
                    "field": "email"
                },
            )

    try:
        validate(payload, AvroContract, raise_errors=True, strict=True)
    except ValidationError as err:
        error_message, status_code = parse_error(str(err))
        raise HTTPException(
            status_code=status_code,
            detail={
                "error": error_message,
                "message": "Schema validation failed",
                "details": str(err)
            },
        )

@demo.get("/status")
def status():
    return {
        "OK": True
    }

@demo.post("/events/usercreated")
async def usercreated(request: Request):
    try:
        payload = await request.json()
    except Exception:
        raise HTTPException(
            status_code=400,
            detail={
                "error": "invalid_json_body",
                "message": "Request body is not valid JSON"
            }
        )

    validate_request(payload)

    return {
        "OK": True
    }