server.py

← Back to explorer
server.py
import os
import base64
import mimetypes
from urllib.parse import quote

from flask import Flask, render_template, abort, request, redirect, url_for, session, flash, jsonify
from werkzeug.utils import secure_filename
from uuid import uuid4
import requests
import markdown as md
import bleach
from utils.metas import projects, github_projects  # :contentReference[oaicite:3]{index=3}
from utils.utils import header, fetch_github_contents, get_project_metadata, get_file_summary, load_activity_data  # :contentReference[oaicite:4]{index=4}
from utils.blog import (
    ensure_blog_schema,
    get_blog_db_connection,
    slugify,
    client_anon_hash,
    utcnow,
)

app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(32)
# Uploads (blog images)
UPLOAD_FOLDER = os.path.join("static", "uploads")
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "webp", "gif"}

app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER

def _ensure_upload_dir():
    os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)

def _allowed_file(filename: str) -> bool:
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS

def _save_upload(file_storage):
    """Save an uploaded file into static/uploads and return the stored filename."""
    if not file_storage or not getattr(file_storage, "filename", ""):
        return None
    filename = secure_filename(file_storage.filename)
    if not filename or not _allowed_file(filename):
        return None
    ext = filename.rsplit(".", 1)[1].lower()
    stored = f"{uuid4().hex}.{ext}"
    _ensure_upload_dir()
    file_storage.save(os.path.join(app.config["UPLOAD_FOLDER"], stored))
    return stored



@app.before_request
def _ensure_blog_schema_once():
    # A tiny "lazy init" so the site works without a separate migration step.
    if not getattr(app, "_blog_schema_ready", False):
        ensure_blog_schema()
        app._blog_schema_ready = True


def _is_admin_logged_in() -> bool:
    return bool(session.get("is_admin"))


def _require_admin():
    if not _is_admin_logged_in():
        return redirect(url_for("admin_login", next=request.path))
    return None


def _render_markdown_safe(markdown_text: str) -> str:
    html = md.markdown(
        markdown_text or "",
        extensions=["fenced_code", "tables", "codehilite"],
        output_format="html5",
    )

    allowed_tags = set(bleach.sanitizer.ALLOWED_TAGS).union(
        {
            "p",
            "pre",
            "code",
            "h1",
            "h2",
            "h3",
            "h4",
            "h5",
            "h6",
            "img",
            "blockquote",
            "hr",
            "br",
            "span",
            "div",
            "table",
            "thead",
            "tbody",
            "tr",
            "th",
            "td",
        }
    )

    allowed_attrs = {
        **bleach.sanitizer.ALLOWED_ATTRIBUTES,
        "a": ["href", "title", "rel", "target"],
        "img": ["src", "alt", "title"],
        "*": ["class"],
    }

    return bleach.clean(html, tags=allowed_tags, attributes=allowed_attrs, strip=True)

@app.route("/")
def home():
    return render_template("main.html")

@app.route("/activity")
def activity():
    days = load_activity_data()
    latest_three = days[-3:]
    return render_template("activity.html", days=days, latest_three=latest_three)

@app.route("/activity/raw")
def activity_raw():
    days = load_activity_data()
    return render_template("activity_raw.html", days=days)

@app.route("/projects")
def projects_view():
    return render_template("projects.html", projects=projects)


@app.route("/blog")
def blog_index():
    q = (request.args.get("q") or "").strip()
    tag = (request.args.get("tag") or "").strip()

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            if q and tag:
                like = f"%{q}%"
                cur.execute(
                    """
                    SELECT p.id, p.slug, p.title, p.excerpt, p.published_at, p.cover_image, p.tags,
                           COUNT(l.id) as like_count
                    FROM blog_posts p
                    LEFT JOIN blog_likes l ON p.id = l.post_id
                    WHERE p.is_published = TRUE
                      AND (%s = ANY(p.tags))
                      AND (p.title ILIKE %s OR p.excerpt ILIKE %s OR p.content_md ILIKE %s)
                    GROUP BY p.id
                    ORDER BY p.published_at DESC NULLS LAST, p.created_at DESC
                    LIMIT 50
                    """,
                    (tag, like, like, like),
                )
            elif q:
                like = f"%{q}%"
                cur.execute(
                    """
                    SELECT p.id, p.slug, p.title, p.excerpt, p.published_at, p.cover_image, p.tags,
                           COUNT(l.id) as like_count
                    FROM blog_posts p
                    LEFT JOIN blog_likes l ON p.id = l.post_id
                    WHERE p.is_published = TRUE
                      AND (p.title ILIKE %s OR p.excerpt ILIKE %s OR p.content_md ILIKE %s)
                    GROUP BY p.id
                    ORDER BY p.published_at DESC NULLS LAST, p.created_at DESC
                    LIMIT 50
                    """,
                    (like, like, like),
                )
            elif tag:
                cur.execute(
                    """
                    SELECT p.id, p.slug, p.title, p.excerpt, p.published_at, p.cover_image, p.tags,
                           COUNT(l.id) as like_count
                    FROM blog_posts p
                    LEFT JOIN blog_likes l ON p.id = l.post_id
                    WHERE p.is_published = TRUE
                      AND (%s = ANY(p.tags))
                    GROUP BY p.id
                    ORDER BY p.published_at DESC NULLS LAST, p.created_at DESC
                    LIMIT 50
                    """,
                    (tag,),
                )
            else:
                cur.execute(
                    """
                    SELECT p.id, p.slug, p.title, p.excerpt, p.published_at, p.cover_image, p.tags,
                           COUNT(l.id) as like_count
                    FROM blog_posts p
                    LEFT JOIN blog_likes l ON p.id = l.post_id
                    WHERE p.is_published = TRUE
                    GROUP BY p.id
                    ORDER BY p.published_at DESC NULLS LAST, p.created_at DESC
                    LIMIT 50
                    """
                )
            rows = cur.fetchall()
    finally:
        conn.close()

    posts = [
        {
            "id": r[0],
            "slug": r[1],
            "title": r[2],
            "excerpt": r[3],
            "published_at": r[4],
            "cover_image": r[5],
            "tags": r[6] or [],
            "like_count": r[7],
        }
        for r in rows
    ]

    return render_template("blog_index.html", posts=posts, q=q, tag=tag)


@app.route("/blog/<slug>")
def blog_post(slug):
    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, slug, title, excerpt, content_html, published_at, cover_image, tags
                FROM blog_posts
                WHERE slug = %s AND is_published = TRUE
                """,
                (slug,),
            )
            row = cur.fetchone()
            if not row:
                abort(404)

            post_id = row[0]

            cur.execute("SELECT COUNT(*) FROM blog_likes WHERE post_id = %s", (post_id,))
            like_count = int(cur.fetchone()[0])

            cur.execute(
                """
                SELECT COALESCE(name, display_name, 'Anonymous') AS display_name, content, created_at
                FROM blog_comments
                WHERE post_id = %s AND is_hidden = FALSE
                ORDER BY created_at DESC
                LIMIT 200
                """,
                (post_id,),
            )
            comments = [
                {"display_name": c[0], "content": c[1], "created_at": c[2]}
                for c in cur.fetchall()
            ]

            cur.execute(
                """
                SELECT filename
                FROM blog_images
                WHERE post_id = %s
                ORDER BY sort_order ASC, id ASC
                """,
                (post_id,),
            )
            images = [{"filename": r[0]} for r in cur.fetchall()]

            anon = client_anon_hash(request)
            cur.execute(
                "SELECT 1 FROM blog_likes WHERE post_id = %s AND anon_hash = %s",
                (post_id, anon),
            )
            has_liked = bool(cur.fetchone())

    finally:
        conn.close()

    post = {
        "id": row[0],
        "slug": row[1],
        "title": row[2],
        "excerpt": row[3],
        "content_html": row[4],
        "published_at": row[5],
        "cover_image": row[6],
        "tags": row[7] or [],
    }

    return render_template(
        "blog_post.html",
        post=post,
        like_count=like_count,
        has_liked=has_liked,
        comments=comments,
        images=images,
    )


@app.route("/api/blog/<int:post_id>/like", methods=["POST"])
def blog_like(post_id):
    anon = client_anon_hash(request)
    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            # Toggle like
            cur.execute(
                "SELECT id FROM blog_likes WHERE post_id = %s AND anon_hash = %s",
                (post_id, anon),
            )
            existing = cur.fetchone()
            if existing:
                cur.execute("DELETE FROM blog_likes WHERE id = %s", (existing[0],))
                liked = False
            else:
                cur.execute(
                    "INSERT INTO blog_likes (post_id, anon_hash) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                    (post_id, anon),
                )
                liked = True

            cur.execute("SELECT COUNT(*) FROM blog_likes WHERE post_id = %s", (post_id,))
            count = int(cur.fetchone()[0])
        conn.commit()
    finally:
        conn.close()

    return jsonify({"ok": True, "liked": liked, "count": count})


@app.route("/api/blog/<int:post_id>/comment", methods=["POST"])
def blog_comment(post_id):
    data = request.get_json(silent=True) or {}
    content = (data.get("content") or "").strip()
    name = (data.get("display_name") or data.get("name") or "Anonymous").strip() or "Anonymous"

    if not content or len(content) > 2000:
        return jsonify({"ok": False, "error": "Comment is empty or too long."}), 400
    if len(name) > 40:
        name = name[:40]

    anon = client_anon_hash(request)
    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO blog_comments (post_id, anon_hash, name, content) VALUES (%s, %s, %s, %s)",
                (post_id, anon, name, content),
            )
        conn.commit()
    finally:
        conn.close()

    return jsonify({"ok": True})


@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = (request.form.get("password") or "").strip()
        expected_user = os.getenv("ADMIN_USERNAME") or "admin"
        expected_pass = os.getenv("ADMIN_PASSWORD") or "admin"

        if username == expected_user and password == expected_pass:
            session["is_admin"] = True
            nxt = request.args.get("next") or url_for("admin_posts")
            return redirect(nxt)
        flash("Invalid credentials.")

    return render_template("admin_login.html")


@app.route("/admin/logout")
def admin_logout():
    session.clear()
    return redirect(url_for("home"))


@app.route("/admin/blog")
def admin_posts():
    guard = _require_admin()
    if guard:
        return guard

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, slug, title, is_published, created_at, published_at
                FROM blog_posts
                ORDER BY created_at DESC
                LIMIT 200
                """
            )
            rows = cur.fetchall()
    finally:
        conn.close()

    posts = [
        {
            "id": r[0],
            "slug": r[1],
            "title": r[2],
            "is_published": r[3],
            "created_at": r[4],
            "published_at": r[5],
        }
        for r in rows
    ]

    return render_template("admin_posts.html", posts=posts)


@app.route("/admin/blog/new", methods=["GET", "POST"])
def admin_new_post():
    guard = _require_admin()
    if guard:
        return guard

    if request.method == "POST":
        title = (request.form.get("title") or "").strip()
        excerpt = (request.form.get("excerpt") or "").strip()
        tags_raw = (request.form.get("tags") or "").strip()
        slug = slugify((request.form.get("slug") or "").strip() or title)

        cover_image = _save_upload(request.files.get("cover_image"))
        uploaded_images = request.files.getlist("post_images")
        image_files = [fn for fn in (_save_upload(f) for f in uploaded_images) if fn]

        content_md = (request.form.get("content_md") or "").strip()
        is_published = bool(request.form.get("is_published"))

        if not title or not content_md:
            flash("Title and content are required.")
            return render_template("admin_edit.html", post=None)

        # Tags: comma/semicolon separated -> TEXT[]
        tags = [t.strip() for t in tags_raw.replace(";", ",").split(",") if t.strip()]
        seen = set()
        tags = [t for t in tags if not (t.lower() in seen or seen.add(t.lower()))]

        content_html = _render_markdown_safe(content_md)
        published_at = utcnow() if is_published else None

        conn = get_blog_db_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO blog_posts (
                        slug, title, excerpt, tags, cover_image,
                        content_md, content_html, is_published,
                        created_at, updated_at, published_at
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), %s)
                    RETURNING id
                    """,
                    (slug, title, excerpt, tags, cover_image, content_md, content_html, is_published, published_at),
                )
                new_id = cur.fetchone()[0]

                if image_files:
                    for i, fn in enumerate(image_files):
                        cur.execute(
                            "INSERT INTO blog_images (post_id, filename, sort_order) VALUES (%s, %s, %s)",
                            (new_id, fn, i),
                        )
            conn.commit()
        finally:
            conn.close()

        return redirect(url_for("admin_edit_post", post_id=new_id))

    return render_template("admin_edit.html", post=None)


@app.route("/admin/blog/<int:post_id>/edit", methods=["GET", "POST"])
def admin_edit_post(post_id):
    guard = _require_admin()
    if guard:
        return guard

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, slug, title, excerpt, tags, cover_image, content_md, is_published
                FROM blog_posts
                WHERE id = %s
                """,
                (post_id,),
            )
            row = cur.fetchone()
            if not row:
                abort(404)

            cur.execute(
                """
                SELECT filename
                FROM blog_images
                WHERE post_id = %s
                ORDER BY sort_order ASC, id ASC
                """,
                (post_id,),
            )
            existing_images = [r[0] for r in cur.fetchall()]

            if request.method == "POST":
                title = (request.form.get("title") or "").strip()
                excerpt = (request.form.get("excerpt") or "").strip()
                tags_raw = (request.form.get("tags") or "").strip()
                remove_cover = bool(request.form.get("remove_cover"))
                remove_gallery = bool(request.form.get("remove_gallery"))
                new_cover = _save_upload(request.files.get("cover_image"))
                content_md = (request.form.get("content_md") or "").strip()
                is_published = bool(request.form.get("is_published"))
                slug = slugify(request.form.get("slug") or title)
                uploaded_images = request.files.getlist("post_images")
                new_images = [fn for fn in (_save_upload(f) for f in uploaded_images) if fn]

                if not title or not content_md:
                    flash("Title and content are required.")
                else:
                    tags = [t.strip() for t in tags_raw.replace(";", ",").split(",") if t.strip()]
                    seen = set()
                    tags = [t for t in tags if not (t.lower() in seen or seen.add(t.lower()))]

                    cur.execute("SELECT cover_image FROM blog_posts WHERE id = %s", (post_id,))
                    current_cover = cur.fetchone()[0]
                    cover_value = None if remove_cover else (new_cover or current_cover)

                    content_html = _render_markdown_safe(content_md)
                    now = utcnow()
                    # Preserve existing published_at if already published
                    if is_published:
                        cur.execute("SELECT published_at FROM blog_posts WHERE id = %s", (post_id,))
                        existing_pub = cur.fetchone()[0]
                        published_at = existing_pub or now
                    else:
                        published_at = None

                    cur.execute(
                        """
                        UPDATE blog_posts
                        SET slug = %s,
                            title = %s,
                            excerpt = %s,
                            tags = %s,
                            cover_image = %s,
                            content_md = %s,
                            content_html = %s,
                            is_published = %s,
                            updated_at = NOW(),
                            published_at = %s
                        WHERE id = %s
                        """,
                        (slug, title, excerpt, tags, cover_value, content_md, content_html, is_published, published_at, post_id),
                    )

                    if remove_gallery:
                        cur.execute("DELETE FROM blog_images WHERE post_id = %s", (post_id,))
                    if new_images:
                        # append after existing sort_order
                        cur.execute("SELECT COALESCE(MAX(sort_order), -1) FROM blog_images WHERE post_id = %s", (post_id,))
                        start_order = (cur.fetchone()[0] or -1) + 1
                        for i, fn in enumerate(new_images):
                            cur.execute(
                                "INSERT INTO blog_images (post_id, filename, sort_order) VALUES (%s, %s, %s)",
                                (post_id, fn, start_order + i),
                            )
                    conn.commit()
                    flash("Saved.")

            post = {
                "id": row[0],
                "slug": row[1],
                "title": row[2],
                "excerpt": row[3],
                "tags": row[4] or [],
                "cover_image": row[5],
                "content_md": row[6],
                "is_published": row[7],
                "images": existing_images,
            }
            # Refresh if POST
            cur.execute(
                "SELECT slug, title, excerpt, tags, cover_image, content_md, is_published FROM blog_posts WHERE id = %s",
                (post_id,),
            )
            r2 = cur.fetchone()
            post.update(
                {
                    "slug": r2[0],
                    "title": r2[1],
                    "excerpt": r2[2],
                    "tags": r2[3] or [],
                    "cover_image": r2[4],
                    "content_md": r2[5],
                    "is_published": r2[6],
                }
            )

            cur.execute(
                "SELECT filename FROM blog_images WHERE post_id = %s ORDER BY sort_order ASC, id ASC",
                (post_id,),
            )
            post["images"] = [r[0] for r in cur.fetchall()]
    finally:
        conn.close()

    return render_template("admin_edit.html", post=post)


@app.route("/admin/blog/<int:post_id>/delete", methods=["POST"])
def admin_delete_post(post_id):
    guard = _require_admin()
    if guard:
        return guard

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM blog_posts WHERE id = %s", (post_id,))
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for("admin_posts"))

@app.route("/projects/<project_name>/", defaults={"folder_path": ""})
@app.route("/projects/<project_name>/<path:folder_path>")
def project_detail(project_name, folder_path):
    repository_name = github_projects.get(project_name)
    if not repository_name:
        abort(404)

    structure = fetch_github_contents(repository_name, path=folder_path)
    if structure is None:
        return render_template(
            "project_detail.html",
            project_name=project_name,
            project_display_name=project_name.replace("-", " ").title(),
            structure=[],
            folder_path=folder_path,
            metadata=None,
            file_summary=None,
            error="GitHub API error or rate limit exceeded."
        )

    metadata = get_project_metadata(repository_name, os.getenv("TOKEN"))
    file_summary = get_file_summary(repository_name, os.getenv("TOKEN"))

    return render_template(
        "project_detail.html",
        project_name=project_name,
        project_display_name=project_name.replace("-", " ").title(),
        structure=structure,
        folder_path=folder_path,
        metadata=metadata,
        file_summary=file_summary,
        error=None
    )

@app.route("/projects/<project_name>/file/<path:file_path>")
def view_file(project_name, file_path):
    repository_name = github_projects.get(project_name)
    if not repository_name:
        abort(404)

    url = f"https://api.github.com/repos/{repository_name}/contents/{file_path}"
    response = requests.get(url, headers=header)
    if response.status_code != 200:
        abort(404)

    content_json = response.json()

    file_content = None
    image_data_url = None

    if content_json.get("encoding") == "base64":
        decoded_bytes = base64.b64decode(content_json["content"])
        file_ext = file_path.split(".")[-1].lower() if "." in file_path else ""

        if file_ext in ["png", "jpg", "jpeg", "gif", "webp", "svg"]:
            mime_type = mimetypes.guess_type(file_path)[0] or "image/png"
            base64_data = base64.b64encode(decoded_bytes).decode("utf-8")
            image_data_url = f"data:{mime_type};base64,{base64_data}"
        else:
            file_content = decoded_bytes.decode("utf-8", errors="replace")
    else:
        file_content = "Cannot display content."

    file_name = file_path.split("/")[-1]
    file_ext = file_name.split(".")[-1] if "." in file_name else ""

    return render_template(
        "file_viewer.html",
        project_name=project_name,
        file_path=file_path,
        file_name=file_name,
        file_content=file_content,
        file_ext=file_ext,
        image_data_url=image_data_url
    )

# app.py (add this route)
@app.route("/photos")
def photos():
    photos_dir = os.path.join(app.static_folder, "images", "photos")
    allowed = {".jpg", ".jpeg", ".png", ".webp", ".gif"}

    items = []
    if os.path.isdir(photos_dir):
        for name in sorted(os.listdir(photos_dir)):
            ext = os.path.splitext(name)[1].lower()
            if ext in allowed:
                items.append(f"images/photos/{name}")

    return render_template("photos.html", photos=items)


@app.route("/contacts")
def contacts():
    links = [
        {
            "name": "GitHub",
            "href": "https://github.com/AalbatrossGuy",
            "icon": "GitHub.svg",
            "subtitle": "github.com/AalbatrossGuy"
        },
        {
            "name": "LinkedIn",
            "href": "https://linkedin.com/in/kishaloy-roy",
            "icon": "LinkedIn.svg",
            "subtitle": "linkedin.com/KishaloyRoy"
        },
        {
            "name": "Reddit",
            "href": "https://reddit.com/user/AalbatrossGuy",
            "icon": "RedditIcon.svg",
            "subtitle": "reddit.com/AalbatrossGuy"
        },
        {
            "name": "Email",
            "href": "mailto:[email protected]",
            "icon": "email.png",
            "subtitle": "[email protected]"
        },
        {
            "name": "Instagram",
            "href": "https://instagram.com/aalbatrossguy",
            "icon": "InstagramIcon.svg",
            "subtitle": "instagram.com/aalbatrossguy"
        },
        {
            "name": "Monkeytype",
            "href": "https://monkeytype.com/profile/AalbatrossGuy",
            "icon": "MonkeyTypeIcon.svg",
            "subtitle": "monkeytype.com/AalbatrossGuy"
        },
        {
            "name": "Stack Overflow",
            "href": "https://stackoverflow.com/users/13810518/aag",
            "icon": "Stack_Overflow.svg",
            "subtitle": "stackoverflow.com/AalbatrossGuy"
        },
        {
            "name": "Matrix",
            "href": "https://matrix.to/#/@aalbatrossguyy:matrix.org",
            "icon": "MatrixIcon.svg",
            "subtitle": "@AalbatrossGuy"
        },
        {
            "name": "Discord",
            "href": "https://discord.com/users/676414187131371520",
            "icon": "DiscordIcon.svg",
            "subtitle": "@aalbatrossguy"
        },
    ]

    return render_template("contacts.html", links=links)





if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True)