import os
import base64
import mimetypes
from urllib.parse import quote
from flask import Flask, render_template, abort, request, redirect, url_for, session, flash, jsonify
from werkzeug.utils import secure_filename
from uuid import uuid4
import requests
import markdown as md
import bleach
from utils.metas import projects, github_projects
from utils.utils import header, fetch_github_contents, get_project_metadata, get_file_summary, load_activity_data
from utils.blog import (
ensure_blog_schema,
get_blog_db_connection,
slugify,
client_anon_hash,
utcnow,
)
# Application object. The session signing key comes from FLASK_SECRET_KEY;
# when that variable is unset or empty a random key is generated for this process.
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(32)

# Blog image uploads: target folder (relative path) and accepted file extensions.
UPLOAD_FOLDER = os.path.join("static", "uploads")
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "webp", "gif"}
app.config.update(UPLOAD_FOLDER=UPLOAD_FOLDER)
def _ensure_upload_dir():
    """Create the configured upload folder if it is not there yet."""
    upload_dir = app.config["UPLOAD_FOLDER"]
    os.makedirs(upload_dir, exist_ok=True)
def _allowed_file(filename: str) -> bool:
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def _save_upload(file_storage):
    """Store an uploaded file in the upload folder under a generated name.

    Returns the stored filename (``<uuid hex>.<ext>``), or None when nothing was
    uploaded, the sanitised name is empty, or the extension is not allowed.
    """
    original_name = getattr(file_storage, "filename", "") if file_storage else ""
    if not original_name:
        return None

    safe_name = secure_filename(original_name)
    if not (safe_name and _allowed_file(safe_name)):
        return None

    # Keep only the (lower-cased) extension; the rest of the name is replaced by
    # a random identifier.
    extension = safe_name.rsplit(".", 1)[1].lower()
    stored_name = "{}.{}".format(uuid4().hex, extension)

    _ensure_upload_dir()
    destination = os.path.join(app.config["UPLOAD_FOLDER"], stored_name)
    file_storage.save(destination)
    return stored_name
@app.before_request
def _ensure_blog_schema_once():
    """Run ``ensure_blog_schema()`` on the first request that reaches this hook."""
    # Lazy initialisation so the site works without a separate migration step.
    # The flag stored on the app object makes later requests return immediately.
    if getattr(app, "_blog_schema_ready", False):
        return
    ensure_blog_schema()
    app._blog_schema_ready = True
def _is_admin_logged_in() -> bool:
    """Tell whether the current session carries a truthy ``is_admin`` flag."""
    admin_flag = session.get("is_admin")
    return True if admin_flag else False
def _require_admin():
    """Guard for admin views.

    Returns None for an admin session; otherwise returns a redirect to the
    login page that carries the current path in the ``next`` query parameter.
    """
    if _is_admin_logged_in():
        return None
    login_url = url_for("admin_login", next=request.path)
    return redirect(login_url)
def _render_markdown_safe(markdown_text: str) -> str:
    """Render Markdown to HTML5 and sanitise the result with bleach.

    ``None`` or empty input is rendered as an empty document. Tags outside the
    allow-list are stripped (``strip=True``) rather than kept.
    """
    source = markdown_text or ""
    rendered = md.markdown(
        source,
        extensions=["fenced_code", "tables", "codehilite"],
        output_format="html5",
    )

    # Tags permitted in addition to bleach's defaults.
    extra_tags = {
        "p", "pre", "code",
        "h1", "h2", "h3", "h4", "h5", "h6",
        "img", "blockquote", "hr", "br", "span", "div",
        "table", "thead", "tbody", "tr", "th", "td",
    }
    permitted_tags = set(bleach.sanitizer.ALLOWED_TAGS) | extra_tags

    # Start from bleach's default attribute map, then override/add entries.
    permitted_attrs = dict(bleach.sanitizer.ALLOWED_ATTRIBUTES)
    permitted_attrs.update(
        {
            "a": ["href", "title", "rel", "target"],
            "img": ["src", "alt", "title"],
            "*": ["class"],
        }
    )

    return bleach.clean(
        rendered,
        tags=permitted_tags,
        attributes=permitted_attrs,
        strip=True,
    )
@app.route("/")
def home():
    """Serve the landing page template."""
    page = render_template("main.html")
    return page
@app.route("/activity")
def activity():
    """Render the activity page with all loaded days and the last three entries."""
    all_days = load_activity_data()
    context = {
        "days": all_days,
        # Tail slice: yields fewer than three items when the data is shorter.
        "latest_three": all_days[-3:],
    }
    return render_template("activity.html", **context)
@app.route("/activity/raw")
def activity_raw():
    """Render the raw activity template with the loaded activity data."""
    return render_template("activity_raw.html", days=load_activity_data())
@app.route("/projects")
def projects_view():
    """Render the projects page from the imported ``projects`` metadata."""
    context = {"projects": projects}
    return render_template("projects.html", **context)
@app.route("/blog")
def blog_index():
    """List published blog posts, optionally filtered by search text and tag.

    Query parameters:
        q:   free-text search, matched case-insensitively (ILIKE) against the
             title, excerpt and Markdown content.
        tag: exact tag that must be present in the post's ``tags`` array.

    At most 50 posts are returned, ordered by ``published_at`` (NULLs last)
    and then ``created_at``, both descending. Each post carries its like count.
    """
    q = (request.args.get("q") or "").strip()
    tag = (request.args.get("tag") or "").strip()

    # Build the WHERE clause once instead of keeping four copies of the query.
    # Only fixed SQL fragments are joined into the statement; every
    # user-supplied value is passed separately as a bound parameter.
    conditions = ["p.is_published = TRUE"]
    params = []
    if tag:
        conditions.append("(%s = ANY(p.tags))")
        params.append(tag)
    if q:
        like = f"%{q}%"
        conditions.append(
            "(p.title ILIKE %s OR p.excerpt ILIKE %s OR p.content_md ILIKE %s)"
        )
        params.extend([like, like, like])

    where_clause = " AND ".join(conditions)
    sql = f"""
        SELECT p.id, p.slug, p.title, p.excerpt, p.published_at, p.cover_image, p.tags,
               COUNT(l.id) as like_count
        FROM blog_posts p
        LEFT JOIN blog_likes l ON p.id = l.post_id
        WHERE {where_clause}
        GROUP BY p.id
        ORDER BY p.published_at DESC NULLS LAST, p.created_at DESC
        LIMIT 50
    """

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            # Pass None (not an empty tuple) when there is nothing to bind, as
            # the unfiltered query did before.
            cur.execute(sql, tuple(params) or None)
            rows = cur.fetchall()
    finally:
        conn.close()

    columns = (
        "id", "slug", "title", "excerpt",
        "published_at", "cover_image", "tags", "like_count",
    )
    posts = []
    for row in rows:
        post = dict(zip(columns, row))
        post["tags"] = post["tags"] or []  # NULL tags become an empty list
        posts.append(post)

    return render_template("blog_index.html", posts=posts, q=q, tag=tag)
@app.route("/blog/<slug>")
def blog_post(slug):
    """Render one published post together with its likes, comments and images.

    Responds with 404 when no published post has the given slug. Loads the
    like count, up to 200 non-hidden comments (newest first), the gallery
    images in sort order, and whether the current client (identified by
    ``client_anon_hash``) has already liked the post.
    """
    post_columns = (
        "id", "slug", "title", "excerpt",
        "content_html", "published_at", "cover_image", "tags",
    )

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, slug, title, excerpt, content_html, published_at, cover_image, tags
                FROM blog_posts
                WHERE slug = %s AND is_published = TRUE
                """,
                (slug,),
            )
            post_row = cur.fetchone()
            if not post_row:
                abort(404)
            post_id = post_row[0]

            # Total number of likes.
            cur.execute("SELECT COUNT(*) FROM blog_likes WHERE post_id = %s", (post_id,))
            like_count = int(cur.fetchone()[0])

            # Visible comments, newest first.
            cur.execute(
                """
                SELECT COALESCE(name, display_name, 'Anonymous') AS display_name, content, created_at
                FROM blog_comments
                WHERE post_id = %s AND is_hidden = FALSE
                ORDER BY created_at DESC
                LIMIT 200
                """,
                (post_id,),
            )
            comments = []
            for author, body, created in cur.fetchall():
                comments.append(
                    {"display_name": author, "content": body, "created_at": created}
                )

            # Gallery images in display order.
            cur.execute(
                """
                SELECT filename
                FROM blog_images
                WHERE post_id = %s
                ORDER BY sort_order ASC, id ASC
                """,
                (post_id,),
            )
            images = [{"filename": image_row[0]} for image_row in cur.fetchall()]

            # Has this client already liked the post?
            anon = client_anon_hash(request)
            cur.execute(
                "SELECT 1 FROM blog_likes WHERE post_id = %s AND anon_hash = %s",
                (post_id, anon),
            )
            has_liked = cur.fetchone() is not None and True or bool(None)
    finally:
        conn.close()

    post = dict(zip(post_columns, post_row))
    post["tags"] = post["tags"] or []  # NULL tags become an empty list

    return render_template(
        "blog_post.html",
        post=post,
        like_count=like_count,
        has_liked=has_liked,
        comments=comments,
        images=images,
    )
@app.route("/api/blog/<int:post_id>/like", methods=["POST"])
def blog_like(post_id):
    """Toggle the calling client's like on a post.

    The client is identified by ``client_anon_hash(request)``. If a like from
    that client exists it is removed, otherwise one is inserted.

    Returns JSON ``{"ok": True, "liked": <bool>, "count": <int>}`` on success,
    or ``{"ok": False, "error": ...}`` with status 404 when the post does not
    exist or is not published.
    """
    anon = client_anon_hash(request)
    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            # Reject unknown/unpublished posts up front instead of inserting a
            # like row for an id that has no visible post.
            cur.execute(
                "SELECT 1 FROM blog_posts WHERE id = %s AND is_published = TRUE",
                (post_id,),
            )
            if cur.fetchone() is None:
                return jsonify({"ok": False, "error": "Post not found."}), 404

            # Toggle like
            cur.execute(
                "SELECT id FROM blog_likes WHERE post_id = %s AND anon_hash = %s",
                (post_id, anon),
            )
            existing = cur.fetchone()
            if existing:
                cur.execute("DELETE FROM blog_likes WHERE id = %s", (existing[0],))
                liked = False
            else:
                # ON CONFLICT DO NOTHING keeps a concurrent duplicate insert
                # from raising.
                cur.execute(
                    "INSERT INTO blog_likes (post_id, anon_hash) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                    (post_id, anon),
                )
                liked = True

            cur.execute("SELECT COUNT(*) FROM blog_likes WHERE post_id = %s", (post_id,))
            count = int(cur.fetchone()[0])
        conn.commit()
    finally:
        conn.close()
    return jsonify({"ok": True, "liked": liked, "count": count})
@app.route("/api/blog/<int:post_id>/comment", methods=["POST"])
def blog_comment(post_id):
    """Store a comment on a published post.

    Expects a JSON object with ``content`` (required, 1-2000 characters after
    stripping) and an optional ``display_name`` / ``name`` (truncated to 40
    characters, defaulting to "Anonymous").

    Returns ``{"ok": True}`` on success, a 400 JSON error for empty/oversized
    content, and a 404 JSON error when the post does not exist or is not
    published.
    """
    data = request.get_json(silent=True)
    # A valid JSON body may still be a list, string or number; treat anything
    # that is not an object as an empty payload instead of crashing on .get().
    if not isinstance(data, dict):
        data = {}

    raw_content = data.get("content")
    raw_name = data.get("display_name") or data.get("name")
    # Non-string values are ignored rather than raising on .strip().
    content = raw_content.strip() if isinstance(raw_content, str) else ""
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    name = name[:40] or "Anonymous"

    if not content or len(content) > 2000:
        return jsonify({"ok": False, "error": "Comment is empty or too long."}), 400

    anon = client_anon_hash(request)
    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            # Only accept comments for posts that are publicly visible.
            cur.execute(
                "SELECT 1 FROM blog_posts WHERE id = %s AND is_published = TRUE",
                (post_id,),
            )
            if cur.fetchone() is None:
                return jsonify({"ok": False, "error": "Post not found."}), 404

            cur.execute(
                "INSERT INTO blog_comments (post_id, anon_hash, name, content) VALUES (%s, %s, %s, %s)",
                (post_id, anon, name, content),
            )
        conn.commit()
    finally:
        conn.close()
    return jsonify({"ok": True})
def _is_safe_next_path(target: str) -> bool:
    """Return True when *target* is a site-local absolute path.

    Only values that start with a single "/" are accepted. "//host" and
    "/\\host" are rejected because browsers treat them as protocol-relative
    URLs, and control characters are rejected because browsers drop them
    before resolving the URL.
    """
    if not target.startswith("/"):
        return False
    if target.startswith(("//", "/\\")):
        return False
    return not any(ch in target for ch in "\r\n\t")


@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form and handle credential submission.

    On a successful POST the session is marked as admin and the client is
    redirected to the ``next`` query parameter when it is a site-local path,
    otherwise to the admin post list. On failure a flash message is set and
    the form is rendered again.
    """
    import hmac  # local import: only needed here for constant-time comparison

    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = (request.form.get("password") or "").strip()

        # NOTE(review): falling back to admin/admin when the environment
        # variables are unset is insecure; kept for backward compatibility but
        # every deployment should set ADMIN_USERNAME and ADMIN_PASSWORD.
        expected_user = os.getenv("ADMIN_USERNAME") or "admin"
        expected_pass = os.getenv("ADMIN_PASSWORD") or "admin"
        if not os.getenv("ADMIN_PASSWORD"):
            app.logger.warning("ADMIN_PASSWORD is not set; using the insecure default.")

        # Compare as bytes (compare_digest rejects non-ASCII str) and evaluate
        # both checks so timing does not reveal which field was wrong.
        user_ok = hmac.compare_digest(username.encode("utf-8"), expected_user.encode("utf-8"))
        pass_ok = hmac.compare_digest(password.encode("utf-8"), expected_pass.encode("utf-8"))

        if user_ok and pass_ok:
            session["is_admin"] = True
            # Never redirect to an attacker-supplied absolute URL.
            nxt = request.args.get("next") or ""
            if not _is_safe_next_path(nxt):
                nxt = url_for("admin_posts")
            return redirect(nxt)

        flash("Invalid credentials.")
    return render_template("admin_login.html")
@app.route("/admin/logout")
def admin_logout():
    """Drop all session data and send the visitor to the home page."""
    session.clear()
    home_url = url_for("home")
    return redirect(home_url)
@app.route("/admin/blog")
def admin_posts():
    """Render the admin post list: up to 200 posts, newest ``created_at`` first.

    Non-admin sessions get the redirect produced by ``_require_admin``.
    """
    denied = _require_admin()
    if denied:
        return denied

    columns = ("id", "slug", "title", "is_published", "created_at", "published_at")

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, slug, title, is_published, created_at, published_at
                FROM blog_posts
                ORDER BY created_at DESC
                LIMIT 200
                """
            )
            fetched = cur.fetchall()
    finally:
        conn.close()

    # One dict per row, keyed by the selected column names.
    posts = [dict(zip(columns, record)) for record in fetched]
    return render_template("admin_posts.html", posts=posts)
@app.route("/admin/blog/new", methods=["GET", "POST"])
def admin_new_post():
    """Show the empty post form (GET) or create a new post (POST).

    A POST requires a title and Markdown content; otherwise a flash message is
    set and the empty form is rendered again. On success the post and its
    gallery images are inserted in one transaction and the client is
    redirected to the edit page of the new post.
    """
    guard = _require_admin()
    if guard:
        return guard

    if request.method != "POST":
        return render_template("admin_edit.html", post=None)

    form = request.form
    title = (form.get("title") or "").strip()
    excerpt = (form.get("excerpt") or "").strip()
    tags_raw = (form.get("tags") or "").strip()
    content_md = (form.get("content_md") or "").strip()
    is_published = bool(form.get("is_published"))

    if not title or not content_md:
        flash("Title and content are required.")
        return render_template("admin_edit.html", post=None)

    # Everything below runs only for a valid form. In particular, uploads are
    # written to disk after validation so a rejected submission does not leave
    # orphaned files in the upload folder.
    slug = slugify((form.get("slug") or "").strip() or title)
    cover_image = _save_upload(request.files.get("cover_image"))
    image_files = [
        stored
        for stored in (_save_upload(f) for f in request.files.getlist("post_images"))
        if stored
    ]

    # Tags: comma/semicolon separated -> TEXT[]; duplicates are dropped
    # case-insensitively, keeping the first spelling and the original order.
    tags = []
    seen = set()
    for part in tags_raw.replace(";", ",").split(","):
        tag = part.strip()
        key = tag.lower()
        if tag and key not in seen:
            seen.add(key)
            tags.append(tag)

    content_html = _render_markdown_safe(content_md)
    published_at = utcnow() if is_published else None

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO blog_posts (
                    slug, title, excerpt, tags, cover_image,
                    content_md, content_html, is_published,
                    created_at, updated_at, published_at
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), %s)
                RETURNING id
                """,
                (slug, title, excerpt, tags, cover_image, content_md, content_html, is_published, published_at),
            )
            new_id = cur.fetchone()[0]

            # Gallery images keep their upload order via sort_order.
            for position, filename in enumerate(image_files):
                cur.execute(
                    "INSERT INTO blog_images (post_id, filename, sort_order) VALUES (%s, %s, %s)",
                    (new_id, filename, position),
                )
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for("admin_edit_post", post_id=new_id))
@app.route("/admin/blog/<int:post_id>/edit", methods=["GET", "POST"])
def admin_edit_post(post_id):
    """Show the edit form for a post and apply submitted changes.

    Responds with 404 when the post does not exist. A POST requires a title
    and Markdown content; when either is missing a flash message is set and
    nothing is changed. Otherwise the post row is updated, the gallery is
    optionally cleared, newly uploaded images are appended after the existing
    ones, and the transaction is committed. In every case the form is rendered
    with the post's current database state.
    """
    guard = _require_admin()
    if guard:
        return guard

    conn = get_blog_db_connection()
    try:
        with conn.cursor() as cur:
            # Existence check; also fetches the two values the update logic
            # needs to preserve (current cover and first publication time).
            cur.execute(
                "SELECT cover_image, published_at FROM blog_posts WHERE id = %s",
                (post_id,),
            )
            current = cur.fetchone()
            if not current:
                abort(404)
            current_cover, existing_pub = current[0], current[1]

            if request.method == "POST":
                form = request.form
                title = (form.get("title") or "").strip()
                excerpt = (form.get("excerpt") or "").strip()
                tags_raw = (form.get("tags") or "").strip()
                content_md = (form.get("content_md") or "").strip()
                remove_cover = bool(form.get("remove_cover"))
                remove_gallery = bool(form.get("remove_gallery"))
                is_published = bool(form.get("is_published"))

                if not title or not content_md:
                    flash("Title and content are required.")
                else:
                    # Uploads are stored only for a valid form so a rejected
                    # submission does not leave orphaned files on disk.
                    new_cover = _save_upload(request.files.get("cover_image"))
                    new_images = [
                        stored
                        for stored in (
                            _save_upload(f) for f in request.files.getlist("post_images")
                        )
                        if stored
                    ]

                    # Same slug handling as the create route: a blank slug
                    # field falls back to the title.
                    slug = slugify((form.get("slug") or "").strip() or title)

                    # Comma/semicolon separated tags, de-duplicated
                    # case-insensitively while keeping the first spelling.
                    tags = []
                    seen = set()
                    for part in tags_raw.replace(";", ",").split(","):
                        tag = part.strip()
                        key = tag.lower()
                        if tag and key not in seen:
                            seen.add(key)
                            tags.append(tag)

                    # "Remove cover" wins over a new upload; otherwise a new
                    # upload replaces the current cover.
                    cover_value = None if remove_cover else (new_cover or current_cover)
                    content_html = _render_markdown_safe(content_md)

                    # Preserve existing published_at if already published
                    published_at = (existing_pub or utcnow()) if is_published else None

                    cur.execute(
                        """
                        UPDATE blog_posts
                        SET slug = %s,
                            title = %s,
                            excerpt = %s,
                            tags = %s,
                            cover_image = %s,
                            content_md = %s,
                            content_html = %s,
                            is_published = %s,
                            updated_at = NOW(),
                            published_at = %s
                        WHERE id = %s
                        """,
                        (slug, title, excerpt, tags, cover_value, content_md, content_html, is_published, published_at, post_id),
                    )

                    if remove_gallery:
                        cur.execute("DELETE FROM blog_images WHERE post_id = %s", (post_id,))

                    if new_images:
                        # Append after the existing sort_order. COALESCE already
                        # yields -1 for an empty gallery, so no `or -1` fallback:
                        # that fallback mapped a legitimate maximum of 0 to -1
                        # and made new images collide with the first one.
                        cur.execute(
                            "SELECT COALESCE(MAX(sort_order), -1) FROM blog_images WHERE post_id = %s",
                            (post_id,),
                        )
                        start_order = cur.fetchone()[0] + 1
                        for offset, filename in enumerate(new_images):
                            cur.execute(
                                "INSERT INTO blog_images (post_id, filename, sort_order) VALUES (%s, %s, %s)",
                                (post_id, filename, start_order + offset),
                            )

                    conn.commit()
                    flash("Saved.")

            # Load the post as it is now stored (reflects a successful POST).
            cur.execute(
                """
                SELECT id, slug, title, excerpt, tags, cover_image, content_md, is_published
                FROM blog_posts
                WHERE id = %s
                """,
                (post_id,),
            )
            row = cur.fetchone()
            post = {
                "id": row[0],
                "slug": row[1],
                "title": row[2],
                "excerpt": row[3],
                "tags": row[4] or [],
                "cover_image": row[5],
                "content_md": row[6],
                "is_published": row[7],
            }

            cur.execute(
                "SELECT filename FROM blog_images WHERE post_id = %s ORDER BY sort_order ASC, id ASC",
                (post_id,),
            )
            post["images"] = [image_row[0] for image_row in cur.fetchall()]
    finally:
        conn.close()

    return render_template("admin_edit.html", post=post)
@app.route("/admin/blog/<int:post_id>/delete", methods=["POST"])
def admin_delete_post(post_id):
    """Delete the ``blog_posts`` row with the given id and return to the post list.

    Non-admin sessions get the redirect produced by ``_require_admin``.
    """
    denied = _require_admin()
    if denied:
        return denied

    connection = get_blog_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute("DELETE FROM blog_posts WHERE id = %s", (post_id,))
        connection.commit()
    finally:
        connection.close()

    listing_url = url_for("admin_posts")
    return redirect(listing_url)
@app.route("/projects/<project_name>/", defaults={"folder_path": ""})
@app.route("/projects/<project_name>/<path:folder_path>")
def project_detail(project_name, folder_path):
    """Render the detail page for a project folder fetched from GitHub.

    Responds with 404 when *project_name* is not a key of ``github_projects``.
    When ``fetch_github_contents`` returns None the page is rendered with an
    empty structure and an error message, and no metadata is requested.
    """
    repository_name = github_projects.get(project_name)
    if not repository_name:
        abort(404)

    structure = fetch_github_contents(repository_name, path=folder_path)

    if structure is None:
        # Listing failed: show the error and skip the metadata calls.
        structure = []
        metadata = None
        file_summary = None
        error = "GitHub API error or rate limit exceeded."
    else:
        token = os.getenv("TOKEN")
        metadata = get_project_metadata(repository_name, token)
        file_summary = get_file_summary(repository_name, token)
        error = None

    display_name = project_name.replace("-", " ").title()
    return render_template(
        "project_detail.html",
        project_name=project_name,
        project_display_name=display_name,
        structure=structure,
        folder_path=folder_path,
        metadata=metadata,
        file_summary=file_summary,
        error=error,
    )
@app.route("/projects/<project_name>/file/<path:file_path>")
def view_file(project_name, file_path):
    """Fetch one file of a project from the GitHub contents API and render it.

    Image files (by extension) are passed to the template as a ``data:`` URL;
    other base64-encoded files are decoded as UTF-8 text with replacement of
    undecodable bytes. Content that is not base64-encoded is replaced by a
    placeholder message.

    Responds with 404 for an unknown project, a non-200 API response, or a
    path that is not a single file, and with 502 when GitHub cannot be reached
    or returns a body that is not JSON.
    """
    repository_name = github_projects.get(project_name)
    if not repository_name:
        abort(404)

    # Percent-encode the path ("/" stays as-is) so spaces, "#", "?" and similar
    # characters in file names do not corrupt the request URL.
    url = f"https://api.github.com/repos/{repository_name}/contents/{quote(file_path)}"
    try:
        # Without a timeout a stalled connection would block this worker forever.
        response = requests.get(url, headers=header, timeout=10)
    except requests.RequestException:
        abort(502)
    if response.status_code != 200:
        abort(404)

    try:
        content_json = response.json()
    except ValueError:
        abort(502)
    # For a directory path the API answers with a JSON list, not a file object.
    if not isinstance(content_json, dict):
        abort(404)

    file_name = file_path.split("/")[-1]
    file_ext = file_name.split(".")[-1] if "." in file_name else ""

    file_content = None
    image_data_url = None
    if content_json.get("encoding") == "base64":
        decoded_bytes = base64.b64decode(content_json["content"])
        if file_ext.lower() in ("png", "jpg", "jpeg", "gif", "webp", "svg"):
            mime_type = mimetypes.guess_type(file_path)[0] or "image/png"
            # Re-encode to get a single-line base64 payload for the data URL.
            base64_data = base64.b64encode(decoded_bytes).decode("utf-8")
            image_data_url = f"data:{mime_type};base64,{base64_data}"
        else:
            file_content = decoded_bytes.decode("utf-8", errors="replace")
    else:
        file_content = "Cannot display content."

    return render_template(
        "file_viewer.html",
        project_name=project_name,
        file_path=file_path,
        file_name=file_name,
        file_content=file_content,
        file_ext=file_ext,
        image_data_url=image_data_url,
    )
@app.route("/photos")
def photos():
    """Render the photo gallery from image files in ``static/images/photos``.

    File names are listed in sorted order and kept only when their extension
    (case-insensitive) is one of the accepted image types. The template gets
    paths relative to the static folder; a missing directory yields an empty
    list.
    """
    photos_dir = os.path.join(app.static_folder, "images", "photos")
    accepted = (".jpg", ".jpeg", ".png", ".webp", ".gif")

    if os.path.isdir(photos_dir):
        items = [
            f"images/photos/{entry}"
            for entry in sorted(os.listdir(photos_dir))
            if os.path.splitext(entry)[1].lower() in accepted
        ]
    else:
        items = []

    return render_template("photos.html", photos=items)
@app.route("/contacts")
def contacts():
    """Render the contacts page from a fixed list of profile links.

    Each link passed to the template is a dict with the keys ``name``,
    ``href``, ``icon`` and ``subtitle``.
    """
    # (name, href, icon, subtitle) in display order.
    # NOTE(review): the Email entry contains the literal text "[email protected]",
    # which looks like an address-obfuscation placeholder rather than a real
    # address -- confirm and replace with the intended value.
    entries = (
        ("GitHub", "https://github.com/AalbatrossGuy", "GitHub.svg", "github.com/AalbatrossGuy"),
        ("LinkedIn", "https://linkedin.com/in/kishaloy-roy", "LinkedIn.svg", "linkedin.com/KishaloyRoy"),
        ("Reddit", "https://reddit.com/user/AalbatrossGuy", "RedditIcon.svg", "reddit.com/AalbatrossGuy"),
        ("Email", "mailto:[email protected]", "email.png", "[email protected]"),
        ("Instagram", "https://instagram.com/aalbatrossguy", "InstagramIcon.svg", "instagram.com/aalbatrossguy"),
        ("Monkeytype", "https://monkeytype.com/profile/AalbatrossGuy", "MonkeyTypeIcon.svg", "monkeytype.com/AalbatrossGuy"),
        ("Stack Overflow", "https://stackoverflow.com/users/13810518/aag", "Stack_Overflow.svg", "stackoverflow.com/AalbatrossGuy"),
        ("Matrix", "https://matrix.to/#/@aalbatrossguyy:matrix.org", "MatrixIcon.svg", "@AalbatrossGuy"),
        ("Discord", "https://discord.com/users/676414187131371520", "DiscordIcon.svg", "@aalbatrossguy"),
    )
    links = [
        {"name": name, "href": href, "icon": icon, "subtitle": subtitle}
        for name, href, icon, subtitle in entries
    ]
    return render_template("contacts.html", links=links)
if __name__ == "__main__":
    # The server listens on all interfaces, so the interactive debugger (which
    # lets a client execute arbitrary Python code) must not be on by default.
    # Opt in explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = (os.getenv("FLASK_DEBUG") or "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", debug=debug_enabled)