from fastapi import FastAPI, HTTPException, Depends, Header, Query, Request from fastapi.responses import JSONResponse, FileResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from .auth import init_db, verify_api_key, get_db from .models import Event, BatchEvents from .enrichment import enrich_event from .influx import write_event, client, INFLUX_ORG, INFLUX_BUCKET import logging import os import secrets import pathlib logging.basicConfig(level=logging.INFO) # Environment variables ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin") ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin") # change in pod YAML! init_db() app = FastAPI(title="Signal - Roblox Telemetry") security = HTTPBasic() # ---------- HTTP Basic auth dependency ---------- def require_admin(credentials: HTTPBasicCredentials = Depends(security)): if credentials.username != ADMIN_USERNAME or credentials.password != ADMIN_PASSWORD: raise HTTPException(status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"}) return True # ---------- Original endpoints ---------- @app.get("/health") async def health(): return {"status": "ok"} @app.post("/api/log") async def ingest_event(payload: Event | BatchEvents, game: str = Depends(verify_api_key)): def check_event(e: Event): if not e.type or not e.type.strip(): raise HTTPException(400, "Event type is required") if not e.serverId: raise HTTPException(400, "serverId is required") if e.time <= 0: raise HTTPException(400, "Invalid timestamp") # Optionally refuse huge data (e.g., >1KB per event) if isinstance(payload, BatchEvents): for event in payload.events: write_event(event, game) else: write_event(payload, game) return {"success": True} # ---------- Serve the admin HTML ---------- STATIC_DIR = pathlib.Path(__file__).parent.parent / "static" @app.get("/admin", include_in_schema=False) async def admin_page(admin: bool = Depends(require_admin)): return FileResponse(STATIC_DIR / "index.html") # ---------- Admin: API Keys ---------- @app.get("/admin/keys") async def list_keys(admin: bool = Depends(require_admin)): conn = get_db() rows = conn.execute("SELECT key, game, active, created_at FROM api_keys").fetchall() conn.close() return [dict(r) for r in rows] @app.post("/admin/keys") async def create_key(game: str = Query(...), key: str = Query(None), admin: bool = Depends(require_admin)): if not key: key = secrets.token_hex(16) conn = get_db() conn.execute("INSERT OR REPLACE INTO api_keys(key, game, active) VALUES(?, ?, 1)", (key, game)) conn.commit() conn.close() return {"key": key, "game": game, "active": True} @app.put("/admin/keys/{key}/toggle") async def toggle_key(key: str, admin: bool = Depends(require_admin)): conn = get_db() conn.execute("UPDATE api_keys SET active = NOT active WHERE key = ?", (key,)) conn.commit() conn.close() return {"ok": True} @app.delete("/admin/keys/{key}") async def delete_key(key: str, admin: bool = Depends(require_admin)): conn = get_db() conn.execute("DELETE FROM api_keys WHERE key = ?", (key,)) conn.commit() conn.close() return {"ok": True} # ---------- Admin: InfluxDB Buckets ---------- @app.get("/admin/buckets") async def list_buckets(admin: bool = Depends(require_admin)): if not client: return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503) buckets_api = client.buckets_api() buckets = buckets_api.find_buckets().buckets return [{"name": b.name, "id": b.id} for b in buckets] # ---------- Admin: Events ---------- @app.get("/admin/events") async def list_events(hours: float = 24, limit: int = 100, admin: bool = Depends(require_admin)): if not client: return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503) query = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -{hours}h) |> limit(n: {limit}) ''' tables = client.query_api().query(query, org=INFLUX_ORG) events = [] for table in tables: for record in table.records: events.append({ "time": record.get_time().isoformat(), "measurement": record.get_measurement(), "fields": {k: v for k, v in record.values.items() if k not in ('_start','_stop','_time')}, "tags": dict(record.values.get("_tags", {})) }) return events[:limit] @app.delete("/admin/events") async def delete_events( start: str = Query(..., description="ISO timestamp"), stop: str = Query(...), measurement: str = Query(None), admin: bool = Depends(require_admin) ): if not client: return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503) try: predicate = f'_measurement="{measurement}"' if measurement else "" delete_api = client.delete_api() delete_api.delete( start=start, stop=stop, bucket=INFLUX_BUCKET, org=INFLUX_ORG, predicate=predicate ) return {"ok": True} except Exception as e: return JSONResponse({"error": str(e)}, status_code=500)