Files
rbxlogger/api/app/main.py
2026-04-27 22:55:58 +02:00

141 lines
5.2 KiB
Python

from fastapi import FastAPI, HTTPException, Depends, Header, Query, Request
from fastapi.responses import JSONResponse, FileResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from .auth import init_db, verify_api_key, get_db
from .models import Event, BatchEvents
from .influx import write_event, client, INFLUX_ORG, INFLUX_BUCKET
import logging
import os
import secrets
import pathlib
logging.basicConfig(level=logging.INFO)

# Admin credentials used by the HTTP Basic check on the admin endpoints.
# They are read from the environment once, at import time.
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin")  # change in pod YAML!
if ADMIN_PASSWORD == "admin":
    # The built-in fallback is trivially guessable; make a forgotten override
    # visible in the logs instead of silently running with it.
    logging.warning(
        "ADMIN_PASSWORD is unset or left at its default value; "
        "set it in the environment before exposing the admin endpoints"
    )

# Run the auth module's database initialisation before the app is created.
init_db()
app = FastAPI(title="Signal - Roblox Telemetry")
security = HTTPBasic()
# ---------- HTTP Basic auth dependency ----------
def require_admin(credentials: HTTPBasicCredentials = Depends(security)):
    """Dependency that only lets the configured admin user through.

    Args:
        credentials: Username/password pair extracted from the request's
            HTTP Basic ``Authorization`` header.

    Returns:
        ``True`` when both username and password match the configured values.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` challenge when
            either value does not match.
    """
    # compare_digest runs in constant time, so response timing does not leak
    # how much of a guess was correct. It only accepts str arguments when they
    # are pure ASCII, hence the comparison on UTF-8 bytes.
    username_ok = secrets.compare_digest(
        credentials.username.encode("utf-8"), ADMIN_USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        credentials.password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
    )
    # Both comparisons are evaluated before being combined so that a wrong
    # username does not short-circuit the password check.
    if not (username_ok and password_ok):
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return True
# ---------- Original endpoints ----------
@app.get("/health")
async def health():
    """Return a fixed payload showing that the API is answering requests."""
    status_payload = {"status": "ok"}
    return status_payload
@app.post("/api/log")
async def ingest_event(payload: Event | BatchEvents, game: str = Depends(verify_api_key)):
    """Validate and store a single event or a batch of events.

    Args:
        payload: Either one ``Event`` or a ``BatchEvents`` wrapper whose
            ``events`` attribute holds several of them.
        game: Value produced by the ``verify_api_key`` dependency; it is
            passed through to ``write_event`` together with each event.

    Returns:
        ``{"success": True}`` once every event has been handed to
        ``write_event``.

    Raises:
        HTTPException: 400 when an event has a blank type, a missing
            ``serverId`` or a non-positive ``time``.
    """

    def check_event(e: Event):
        # NOTE(review): assumes `type`, `serverId` and `time` are always
        # populated by the Event model (and `time` is numeric) - confirm
        # against the model definition.
        if not e.type or not e.type.strip():
            raise HTTPException(400, "Event type is required")
        if not e.serverId:
            raise HTTPException(400, "serverId is required")
        if e.time <= 0:
            raise HTTPException(400, "Invalid timestamp")
        # Optionally refuse huge data (e.g., >1KB per event)

    if isinstance(payload, BatchEvents):
        events = list(payload.events)
    else:
        events = [payload]

    # Validate everything up front so that one bad event cannot leave a
    # batch half-written.
    for event in events:
        check_event(event)
    for event in events:
        write_event(event, game)
    return {"success": True}
# ---------- Serve the admin HTML ----------
# Directory named "static" located two levels above this module file.
STATIC_DIR = pathlib.Path(__file__).parent.parent.joinpath("static")


@app.get("/admin", include_in_schema=False)
async def admin_page(admin: bool = Depends(require_admin)):
    """Send ``index.html`` from ``STATIC_DIR`` to an authenticated admin."""
    index_file = STATIC_DIR / "index.html"
    return FileResponse(index_file)
# ---------- Admin: API Keys ----------
@app.get("/admin/keys")
async def list_keys(admin: bool = Depends(require_admin)):
    """Return every row of the ``api_keys`` table as a list of dicts.

    Each dict carries the ``key``, ``game``, ``active`` and ``created_at``
    columns.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT key, game, active, created_at FROM api_keys"
        ).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    # NOTE(review): dict(r) relies on get_db() returning mapping-like rows
    # (e.g. sqlite3.Row) - confirm in the auth module.
    return [dict(r) for r in rows]
@app.post("/admin/keys")
async def create_key(game: str = Query(...), key: str = Query(None), admin: bool = Depends(require_admin)):
    """Create (or overwrite) an API key for a game and mark it active.

    Args:
        game: Name stored alongside the key.
        key: Key value to store; when omitted or empty, a random
            32-character hex token is generated.

    Returns:
        A dict with the stored ``key``, its ``game`` and ``active: True``.
    """
    if not key:
        key = secrets.token_hex(16)
    conn = get_db()
    try:
        # INSERT OR REPLACE: an existing row with the same key is overwritten.
        conn.execute(
            "INSERT OR REPLACE INTO api_keys(key, game, active) VALUES(?, ?, 1)",
            (key, game),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert or commit raises.
        conn.close()
    return {"key": key, "game": game, "active": True}
@app.put("/admin/keys/{key}/toggle")
async def toggle_key(key: str, admin: bool = Depends(require_admin)):
    """Flip the ``active`` flag of the given API key.

    Args:
        key: The API key taken from the URL path.

    Returns:
        ``{"ok": True}``; the response is the same whether or not a row
        matched the key.
    """
    conn = get_db()
    try:
        conn.execute("UPDATE api_keys SET active = NOT active WHERE key = ?", (key,))
        conn.commit()
    finally:
        # Release the connection even when the update or commit raises.
        conn.close()
    return {"ok": True}
@app.delete("/admin/keys/{key}")
async def delete_key(key: str, admin: bool = Depends(require_admin)):
    """Remove the given API key from the ``api_keys`` table.

    Args:
        key: The API key taken from the URL path.

    Returns:
        ``{"ok": True}``; the response is the same whether or not a row
        matched the key.
    """
    conn = get_db()
    try:
        conn.execute("DELETE FROM api_keys WHERE key = ?", (key,))
        conn.commit()
    finally:
        # Release the connection even when the delete or commit raises.
        conn.close()
    return {"ok": True}
# ---------- Admin: InfluxDB Buckets ----------
@app.get("/admin/buckets")
async def list_buckets(admin: bool = Depends(require_admin)):
    """List the name and id of each bucket reported by the InfluxDB client.

    Responds with a 503 JSON error when the shared client is not available.
    """
    if not client:
        return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503)

    found = client.buckets_api().find_buckets()
    summary = []
    for bucket in found.buckets:
        summary.append({"name": bucket.name, "id": bucket.id})
    return summary
# ---------- Admin: Events ----------
@app.get("/admin/events")
async def list_events(hours: float = 24, limit: int = 100, admin: bool = Depends(require_admin)):
    """Return recent records from the configured InfluxDB bucket.

    Args:
        hours: Size of the look-back window in hours; fractions are allowed.
        limit: Maximum number of events in the response.

    Returns:
        A list of dicts with ``time``, ``measurement``, ``fields`` and
        ``tags`` keys, or a 503 JSON error when the client is unavailable.

    Raises:
        HTTPException: 400 when ``hours`` is not a positive finite number or
            ``limit`` is smaller than 1.
    """
    if not client:
        return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503)

    # Every comparison with NaN is False, so this rejects NaN and infinities
    # as well as zero and negative windows.
    if not 0 < hours < float("inf"):
        raise HTTPException(status_code=400, detail="hours must be a positive, finite number")
    if limit < 1:
        raise HTTPException(status_code=400, detail="limit must be at least 1")

    # Flux duration literals take whole numbers only, so a float such as
    # 12.0 rendered as "-12.0h" does not parse. Express the window in whole
    # seconds instead, with a one-second floor to avoid an empty range.
    window_seconds = max(1, round(hours * 3600))
    query = f'''
    from(bucket: "{INFLUX_BUCKET}")
      |> range(start: -{window_seconds}s)
      |> limit(n: {limit})
    '''
    tables = client.query_api().query(query, org=INFLUX_ORG)

    events = []
    for table in tables:
        for record in table.records:
            events.append({
                "time": record.get_time().isoformat(),
                "measurement": record.get_measurement(),
                "fields": {
                    k: v
                    for k, v in record.values.items()
                    if k not in ('_start', '_stop', '_time')
                },
                # NOTE(review): nothing in this module sets a "_tags" column,
                # so this may always be empty - confirm.
                "tags": dict(record.values.get("_tags", {})),
            })
    # The records of all tables are concatenated, so the combined list is
    # truncated to `limit` here.
    return events[:limit]
@app.delete("/admin/events")
async def delete_events(
    start: str = Query(..., description="ISO timestamp"),
    stop: str = Query(...),
    measurement: str = Query(None),
    admin: bool = Depends(require_admin)
):
    """Delete records in a time range from the configured InfluxDB bucket.

    Args:
        start: Beginning of the range to delete (ISO timestamp).
        stop: End of the range to delete.
        measurement: When given, only records of this measurement are
            deleted; otherwise the predicate is empty.

    Returns:
        ``{"ok": True}`` on success, a 503 JSON error when the client is
        unavailable, or a 500 JSON error carrying the exception text when
        the delete call fails.

    Raises:
        HTTPException: 400 when ``measurement`` contains a double quote or a
            backslash.
    """
    if not client:
        return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503)

    # The measurement is embedded in a quoted predicate string; refusing
    # quotes and backslashes keeps a caller from closing the literal and
    # appending extra conditions. This is checked outside the try block so
    # the 400 is not turned into a 500 by the broad handler below.
    if measurement and ('"' in measurement or "\\" in measurement):
        raise HTTPException(
            status_code=400,
            detail="measurement must not contain double quotes or backslashes",
        )

    try:
        predicate = f'_measurement="{measurement}"' if measurement else ""
        delete_api = client.delete_api()
        delete_api.delete(
            start=start, stop=stop, bucket=INFLUX_BUCKET, org=INFLUX_ORG, predicate=predicate
        )
        return {"ok": True}
    except Exception as e:
        # Endpoint boundary: keep the traceback in the server log while still
        # returning the error text to the caller as before.
        logging.exception("Failed to delete events from InfluxDB")
        return JSONResponse({"error": str(e)}, status_code=500)