46 lines
1.5 KiB
Python
46 lines
1.5 KiB
Python
import os
|
||
import logging
|
||
from influxdb_client import InfluxDBClient, Point
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
logger = logging.getLogger("influx")
|
||
|
||
INFLUX_URL = os.getenv("INFLUX_URL", "")
|
||
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN", "")
|
||
INFLUX_ORG = os.getenv("INFLUX_ORG", "")
|
||
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET", "")
|
||
|
||
client = None
|
||
write_api = None
|
||
|
||
if INFLUX_URL and INFLUX_TOKEN:
|
||
try:
|
||
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
logger.info("InfluxDB client initialised")
|
||
except Exception as e:
|
||
logger.error(f"InfluxDB client creation failed: {e}")
|
||
client = None
|
||
else:
|
||
logger.warning("InfluxDB environment variables missing – writing disabled")
|
||
|
||
def write_event(event) -> bool:
|
||
if write_api is None:
|
||
logger.debug("InfluxDB not available, skipping write")
|
||
return False
|
||
try:
|
||
p = Point(event.type).time(event.time * 1_000_000_000)
|
||
p.tag("game", event.data.get("game", "unknown"))
|
||
p.tag("serverId", event.serverId)
|
||
# Optional tags
|
||
if "oreType" in event.data:
|
||
p.tag("oreType", event.data["oreType"])
|
||
# Write all numeric/string fields
|
||
for k, v in event.data.items():
|
||
if isinstance(v, (int, float, str, bool)):
|
||
p.field(k, v)
|
||
write_api.write(bucket=INFLUX_BUCKET, record=p)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Write failed: {e}")
|
||
return False |