changed main
This commit is contained in:
119
api/app/main.py
119
api/app/main.py
@@ -1,26 +1,101 @@
|
||||
from fastapi import FastAPI, Depends, HTTPException
|
||||
from .auth import init_db, verify_api_key
|
||||
from .models import Event, BatchEvents
|
||||
from .enrichment import enrich_event
|
||||
from .influx import write_event
|
||||
import logging
|
||||
# Add to api/app/main.py (after the existing imports and app definition)
|
||||
import os
|
||||
from fastapi import Request, Query
|
||||
from fastapi.responses import JSONResponse
|
||||
from .influx import client, INFLUX_ORG, INFLUX_BUCKET
|
||||
import sqlite3
|
||||
import datetime
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
init_db()
|
||||
ADMIN_KEY = os.getenv("ADMIN_API_KEY", "admin-secret-change-me")
|
||||
|
||||
app = FastAPI(title="Signal - Roblox Telemetry")
|
||||
# ---------- simple admin auth helper ----------
|
||||
def require_admin(x_admin_key: str = Header(None)):
|
||||
if not x_admin_key or x_admin_key != ADMIN_KEY:
|
||||
raise HTTPException(status_code=403, detail="Forbidden")
|
||||
return True
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"status": "ok"}
|
||||
# ---------- API Keys Management ----------
|
||||
@app.get("/admin/keys")
|
||||
async def list_keys(admin: bool = Depends(require_admin)):
|
||||
conn = get_db()
|
||||
rows = conn.execute("SELECT key, game, active, created_at FROM api_keys").fetchall()
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
@app.post("/api/log")
|
||||
async def ingest_event(payload: Event | BatchEvents, game: str = Depends(verify_api_key)):
|
||||
if isinstance(payload, BatchEvents):
|
||||
for event in payload.events:
|
||||
enriched = enrich_event(event, game)
|
||||
write_event(enriched)
|
||||
else:
|
||||
enriched = enrich_event(payload, game)
|
||||
write_event(enriched)
|
||||
return {"success": True}
|
||||
@app.post("/admin/keys")
|
||||
async def create_key(game: str = Query(...), key: str = Query(None), admin: bool = Depends(require_admin)):
|
||||
if not key:
|
||||
import secrets
|
||||
key = secrets.token_hex(16) # auto-generate if not provided
|
||||
conn = get_db()
|
||||
conn.execute("INSERT OR REPLACE INTO api_keys(key, game, active) VALUES(?, ?, 1)", (key, game))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return {"key": key, "game": game, "active": True}
|
||||
|
||||
@app.put("/admin/keys/{key}/toggle")
|
||||
async def toggle_key(key: str, admin: bool = Depends(require_admin)):
|
||||
conn = get_db()
|
||||
conn.execute("UPDATE api_keys SET active = NOT active WHERE key = ?", (key,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return {"ok": True}
|
||||
|
||||
@app.delete("/admin/keys/{key}")
|
||||
async def delete_key(key: str, admin: bool = Depends(require_admin)):
|
||||
conn = get_db()
|
||||
conn.execute("DELETE FROM api_keys WHERE key = ?", (key,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return {"ok": True}
|
||||
|
||||
# ---------- InfluxDB Bucket Info ----------
|
||||
@app.get("/admin/buckets")
|
||||
async def list_buckets(admin: bool = Depends(require_admin)):
|
||||
if not client:
|
||||
return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503)
|
||||
buckets_api = client.buckets_api()
|
||||
buckets = buckets_api.find_buckets().buckets
|
||||
return [{"name": b.name, "id": b.id, "org": b.org_id} for b in buckets]
|
||||
|
||||
# ---------- View recent events ----------
|
||||
@app.get("/admin/events")
|
||||
async def list_events(hours: float = 24, limit: int = 100, admin: bool = Depends(require_admin)):
|
||||
if not client:
|
||||
return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503)
|
||||
query = f'''
|
||||
from(bucket: "{INFLUX_BUCKET}")
|
||||
|> range(start: -{hours}h)
|
||||
|> limit(n: {limit})
|
||||
'''
|
||||
tables = client.query_api().query(query, org=INFLUX_ORG)
|
||||
events = []
|
||||
for table in tables:
|
||||
for record in table.records:
|
||||
events.append({
|
||||
"time": record.get_time().isoformat(),
|
||||
"measurement": record.get_measurement(),
|
||||
"fields": {k: v for k, v in record.values.items() if k not in ('_start','_stop','_time')},
|
||||
"tags": dict(record.values.get("_tags", {}))
|
||||
})
|
||||
return events[:limit]
|
||||
|
||||
# ---------- Delete events (by time range + optional measurement) ----------
|
||||
@app.delete("/admin/events")
|
||||
async def delete_events(
|
||||
start: str = Query(..., description="ISO timestamp, e.g. 2026-04-27T00:00:00Z"),
|
||||
stop: str = Query(..., description="ISO timestamp"),
|
||||
measurement: str = Query(None),
|
||||
admin: bool = Depends(require_admin)
|
||||
):
|
||||
if not client:
|
||||
return JSONResponse({"error": "InfluxDB client not initialised"}, status_code=503)
|
||||
try:
|
||||
predicate = f'_measurement="{measurement}"' if measurement else ""
|
||||
delete_api = client.delete_api()
|
||||
delete_api.delete(
|
||||
start=start, stop=stop, bucket=INFLUX_BUCKET, org=INFLUX_ORG, predicate=predicate
|
||||
)
|
||||
return {"ok": True}
|
||||
except Exception as e:
|
||||
return JSONResponse({"error": str(e)}, status_code=500)
|
||||
Reference in New Issue
Block a user