utils.py

← Back to explorer
utils/utils.py
import os
import secrets
import string

import requests
import psycopg
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from collections import defaultdict


env_path = Path(__file__).resolve().parents[1] / '.env'
load_dotenv(env_path)


header = {"Accept": "application/vnd.github.v3+json",
          "Authorization": f"Bearer {os.getenv('TOKEN')}"}


def get_db_connection():
    return psycopg.connect(os.getenv("KEYLOGGER_DB_URL"))

def generate_uid(length=16):
    return ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(length))

def load_activity_data():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute("SELECT hour_key, count FROM hourly_summary ORDER BY hour_key")
    hourly_data = cur.fetchall()
    cur.close()
    conn.close()

    daily_summary = defaultdict(
        lambda: {"keys_pressed": 0, "hours": 0, "tools": set()})

    for row in hourly_data:
        timestamp_str, count = row
        dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M")
        day = dt.strftime("%Y-%m-%d")
        daily_summary[day]["keys_pressed"] += count
        daily_summary[day]["hours"] += 1 if count > 0 else 0
        if 9 <= dt.hour < 12:
            daily_summary[day]["tools"].add("VS Code")
        elif 12 <= dt.hour < 15:
            daily_summary[day]["tools"].add("Terminal")
        else:
            daily_summary[day]["tools"].add("Browser")

    days = []
    for date in sorted(daily_summary):
        summary = daily_summary[date]
        days.append({
            "date": date,
            "hours": summary["hours"],
            "keys_pressed": summary["keys_pressed"],
            "lines": summary["keys_pressed"] // 5,
            "tools": sorted(summary["tools"])
        })

    return days


def get_project_metadata(repository_name, github_token):
    headers = {
        "Authorization": f"Bearer {github_token}",
        "Accept": "application/vnd.github.v3+json"
    }

    contents_url = f"https://api.github.com/repos/{repository_name}/git/trees/main?recursive=1"
    contents_resp = requests.get(contents_url, headers=headers)
    ext_counts = defaultdict(int)
    total = 0

    if contents_resp.status_code == 200:
        tree = contents_resp.json().get("tree", [])
        for item in tree:
            if item["type"] == "blob":
                path = item["path"]
                if '.' in path:
                    ext = path.rsplit('.', 1)[-1].lower()
                    ext_counts[ext] += 1
                    total += 1

    ext_to_lang = {
        "py": "Python", "js": "JavaScript", "ts": "TypeScript",
        "html": "HTML", "css": "CSS", "json": "JSON", "yml": "YAML",
        "md": "Markdown", "sh": "Shell", "java": "Java", "yaml": "YAML",
        "c": "C", "csharp": "C#", "cpp": "C++"
    }

    language_counts = defaultdict(int)
    for ext, count in ext_counts.items():
        lang = ext_to_lang.get(ext, ext.upper())
        language_counts[lang] += count

    language_percentages = {
        lang: round((count / total) * 100, 1)
        for lang, count in language_counts.items()
    }

    commits_url = f"https://api.github.com/repos/{repository_name}/commits"
    commits_resp = requests.get(commits_url, headers=headers)
    commits = commits_resp.json() if commits_resp.status_code == 200 else []

    last_updated = None
    if commits:
        last_updated = commits[0]['commit']['committer']['date']
        last_updated = datetime.strptime(
            last_updated, "%Y-%m-%dT%H:%M:%SZ").strftime("%d %b %Y")

    return {
        "languages": language_percentages,
        "last_updated": last_updated,
        "recent_commit_count": len(commits)
    }


def get_file_summary(repository_name, github_token):
    headers = {
        "Authorization": f"Bearer {github_token}",
        "Accept": "application/vnd.github.v3+json"
    }

    url = f"https://api.github.com/repos/{repository_name}/git/trees/main?recursive=1"
    response = requests.get(url, headers=headers)

    file_types = defaultdict(lambda: {"count": 0, "size": 0})
    total_files = 0
    total_folders = 0
    total_size = 0

    if response.status_code == 200:
        for item in response.json().get("tree", []):
            if item["type"] == "blob":
                total_files += 1
                total_size += item.get("size", 0)
                path = item["path"]
                ext = path.split('.')[-1].lower() if '.' in path else 'other'
                file_types[ext]["count"] += 1
                file_types[ext]["size"] += item.get("size", 0)
            elif item["type"] == "tree":
                total_folders += 1

    type_list = [
        {"label": f"{ext.upper()} file" if ext != "other" else "Other files",
         "count": val["count"],
         "size_kb": round(val["size"] / 1024)}
        for ext, val in file_types.items()
    ]

    return {
        "total_files": total_files,
        "total_folders": total_folders,
        "total_size_kb": round(total_size / 1024),
        "types": sorted(type_list, key=lambda x: -x["count"])
    }


def fetch_github_contents(repository_name, path=""):
    url = f"https://api.github.com/repos/{repository_name}/contents/{path}"
    headers = header
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        return None

    items = []
    for item in response.json():
        if item["type"] == "dir":
            items.append({
                "type": "folder",
                "name": item["name"],
                "path": item["path"],
            })
        elif item["type"] == "file":
            items.append({
                "type": "file",
                "name": item["name"],
                "url": item["html_url"],
                "path": item["path"]
            })

    items.sort(key=lambda x: (x["type"] != "folder", x["name"].lower()))
    return items