# utils/blog.py
import os
import re
import hashlib
from datetime import datetime

import psycopg


def get_blog_db_connection():
    """Open a new psycopg connection for the blog tables.

    The DSN is read from the environment: ``BLOG_DB_URL`` is preferred, and
    ``KEYLOGGER_DB_URL`` is used when the former is unset or empty, so a
    single Postgres instance can serve both.

    Returns:
        The connection object returned by ``psycopg.connect``.

    Raises:
        RuntimeError: if neither environment variable holds a value.
    """
    # Checked in priority order; the first non-empty value wins.
    for env_name in ("BLOG_DB_URL", "KEYLOGGER_DB_URL"):
        url = os.getenv(env_name)
        if url:
            return psycopg.connect(url)
    raise RuntimeError("BLOG_DB_URL (or KEYLOGGER_DB_URL) is not set")


def ensure_blog_schema():
    """Bring the blog database schema up to date.

    Runs one SQL script that creates the ``blog_posts``, ``blog_likes``,
    ``blog_comments`` and ``blog_images`` tables, adds columns that older
    deployments may lack, and creates the supporting indexes. Every
    statement is guarded with ``IF NOT EXISTS``, so calling this again on an
    already-migrated database (for example at each application start) is
    safe.

    The script is sent in a single ``execute`` call and committed once; the
    connection is closed whether or not the script succeeds.
    """
    schema_sql = """
    -- Core posts
    CREATE TABLE IF NOT EXISTS blog_posts (
        id SERIAL PRIMARY KEY,
        slug TEXT UNIQUE NOT NULL,
        title TEXT NOT NULL,
        excerpt TEXT NOT NULL DEFAULT '',
        tags TEXT[] NOT NULL DEFAULT '{}',
        content_md TEXT NOT NULL,
        content_html TEXT NOT NULL,
        is_published BOOLEAN NOT NULL DEFAULT FALSE,
        created_at TIMESTAMP NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
        published_at TIMESTAMP
    );

    -- Optional cover image
    ALTER TABLE blog_posts
    ADD COLUMN IF NOT EXISTS cover_image TEXT;

    -- Tags (array)
    ALTER TABLE blog_posts
    ADD COLUMN IF NOT EXISTS tags TEXT[] NOT NULL DEFAULT '{}';

    -- Likes (anonymous)
    CREATE TABLE IF NOT EXISTS blog_likes (
        id SERIAL PRIMARY KEY,
        post_id INTEGER NOT NULL REFERENCES blog_posts(id) ON DELETE CASCADE,
        anon_hash TEXT NOT NULL,
        created_at TIMESTAMP NOT NULL DEFAULT NOW(),
        UNIQUE (post_id, anon_hash)
    );

    -- Comments (anonymous)
    CREATE TABLE IF NOT EXISTS blog_comments (
        id SERIAL PRIMARY KEY,
        post_id INTEGER NOT NULL REFERENCES blog_posts(id) ON DELETE CASCADE,
        anon_hash TEXT NOT NULL,
        name TEXT NOT NULL DEFAULT 'Anonymous',
        content TEXT NOT NULL,
        is_hidden BOOLEAN NOT NULL DEFAULT FALSE,
        created_at TIMESTAMP NOT NULL DEFAULT NOW()
    );

    -- Legacy compatibility (some earlier versions used display_name)
    ALTER TABLE blog_comments
    ADD COLUMN IF NOT EXISTS display_name TEXT;

    -- Extra images per post
    CREATE TABLE IF NOT EXISTS blog_images (
        id SERIAL PRIMARY KEY,
        post_id INTEGER NOT NULL REFERENCES blog_posts(id) ON DELETE CASCADE,
        filename TEXT NOT NULL,
        sort_order INTEGER NOT NULL DEFAULT 0,
        created_at TIMESTAMP NOT NULL DEFAULT NOW()
    );


        -- Migrations for older schemas
    ALTER TABLE blog_posts ADD COLUMN IF NOT EXISTS cover_image TEXT;
    ALTER TABLE blog_images ADD COLUMN IF NOT EXISTS filename TEXT;
    ALTER TABLE blog_images ADD COLUMN IF NOT EXISTS sort_order INTEGER NOT NULL DEFAULT 0;

CREATE INDEX IF NOT EXISTS idx_blog_posts_published
    ON blog_posts (is_published, published_at DESC NULLS LAST, created_at DESC);

    CREATE INDEX IF NOT EXISTS idx_blog_posts_slug
    ON blog_posts (slug);

    CREATE INDEX IF NOT EXISTS idx_blog_likes_post
    ON blog_likes (post_id);

    CREATE INDEX IF NOT EXISTS idx_blog_comments_post
    ON blog_comments (post_id, created_at DESC);

    CREATE INDEX IF NOT EXISTS idx_blog_images_post
    ON blog_images (post_id, sort_order ASC, id ASC);
    """

    connection = get_blog_db_connection()
    try:
        # One round trip for the whole script, then a single commit.
        with connection.cursor() as cursor:
            cursor.execute(schema_sql)
        connection.commit()
    finally:
        # Always release the connection, even if the script failed.
        connection.close()


def slugify(text: str) -> str:
    """Turn arbitrary text into a lowercase, hyphen-separated URL slug.

    Steps:
      1. Trim surrounding whitespace and lowercase the text.
      2. Drop every character that is not ``a-z``, ``0-9``, whitespace,
         an underscore or a hyphen.
      3. Collapse each run of whitespace, underscores and hyphens into a
         single hyphen.
      4. Strip leading and trailing hyphens.

    Underscores are kept through step 2 so that step 3 can treat them as
    word separators: ``"hello_world"`` becomes ``"hello-world"``. Removing
    them in step 2 would glue the words together as ``"helloworld"``.

    Args:
        text: The source text, typically a post title.

    Returns:
        The slug, or ``"post"`` when nothing usable remains (for example an
        empty string or a title made only of punctuation).
    """
    text = text.strip().lower()
    # Keep "_" here; it is converted to "-" by the separator pass below.
    text = re.sub(r"[^a-z0-9\s_-]", "", text)
    text = re.sub(r"[\s_-]+", "-", text)
    text = text.strip("-")
    return text or "post"


def client_anon_hash(request) -> str:
    """Derive a stable anonymous identifier for the requesting client.

    The identifier is the SHA-256 hex digest of ``"<ip>|<user-agent>|<salt>"``
    where:

    * ``ip`` is the first comma-separated entry of the ``X-Forwarded-For``
      header, or ``request.remote_addr`` when that entry is empty, or an
      empty string when neither is available;
    * ``user-agent`` is the ``User-Agent`` header (empty string if absent);
    * ``salt`` is the ``ANON_SALT`` environment variable, or the literal
      ``"change-me"`` when it is unset or empty.

    Args:
        request: An object exposing ``headers.get(name, default)`` and a
            ``remote_addr`` attribute.

    Returns:
        A 64-character lowercase hexadecimal string.
    """
    secret = os.getenv("ANON_SALT") or "change-me"

    forwarded_for = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded_for.split(",")[0].strip()
    client_ip = first_hop or request.remote_addr or ""

    user_agent = request.headers.get("User-Agent", "")

    fingerprint = f"{client_ip}|{user_agent}|{secret}"
    # errors="ignore" drops anything that cannot be UTF-8 encoded instead of raising.
    digest = hashlib.sha256(fingerprint.encode("utf-8", errors="ignore"))
    return digest.hexdigest()


def utcnow():
    """Return the current UTC time as a naive ``datetime``.

    Intended for timestamps that are set manually rather than by the
    database's ``NOW()`` default, so they stay consistent regardless of the
    Postgres server timezone.

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the value is
    obtained from an aware ``datetime.now(timezone.utc)`` and the tzinfo is
    then stripped. The result is still naive, matching what callers received
    before.

    Returns:
        A ``datetime`` with ``tzinfo`` set to ``None`` holding the UTC
        wall-clock time.
    """
    # Imported locally because the module only imports ``datetime`` itself.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)