import asyncio
import json
import os
import time

from fastapi import APIRouter
from fastapi.responses import JSONResponse, StreamingResponse

from common import (
    _map_pod_to_unit,
    _podman_action_post,
    _podman_get_json,
    _podman_get_text,
    _systemd_then_podman,
)

# Populated by init_containers_router(); read by the module-level stats poller.
_SESSION = None
_PODMAN_API_BASE = None

# --- STATS CACHE (contract-neutral; in-memory) ---
# Poll Podman stats centrally and expose as optional dashboard fields.
_STATS_CACHE_BY_NAME = {}  # name -> {"cpu": float|None, "mem_usage": float|None, "mem_perc": float|None}
_STATS_CACHE_TS = None
_STATS_POLLER_TASK = None


def _norm_container_name(name) -> str:
    """Normalize a Podman container name (strip leading '/'); '' on any failure."""
    try:
        return str(name or "").lstrip("/")
    except Exception:
        return ""


def _parse_stats_interval_seconds() -> float:
    """Read STATS_INTERVAL_SECONDS from the environment, clamped to [0.5, 30] seconds."""
    raw = os.getenv("STATS_INTERVAL_SECONDS", "1.0")
    try:
        v = float(raw)
    except Exception:
        v = 1.0
    if v <= 0:
        v = 1.0
    if v < 0.5:
        v = 0.5
    if v > 30:
        v = 30
    return v


async def _stats_poller_loop():
    """Background task: periodically refresh _STATS_CACHE_BY_NAME from Podman.

    On any error the previous (last good) cache is kept and the loop simply
    retries on the next tick.
    """
    global _STATS_CACHE_BY_NAME, _STATS_CACHE_TS
    interval = _parse_stats_interval_seconds()
    stats_url = f"{_PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false"

    def _to_float(x):
        # Best-effort numeric conversion; non-numeric values become None.
        try:
            return float(x)
        except Exception:
            return None

    def _fetch_stats():
        # Blocking HTTP call; always executed in a worker thread (see below).
        return _SESSION.get(stats_url, timeout=5).json()

    while True:
        try:
            # BUGFIX: the requests call is blocking — run it off the event loop
            # so a slow/hanging Podman socket cannot freeze all async handlers.
            data = await asyncio.to_thread(_fetch_stats)
            stats_list = data.get("Stats") if isinstance(data, dict) else None
            if not isinstance(stats_list, list):
                stats_list = []
            new_cache = {}
            for st in stats_list:
                if not isinstance(st, dict):
                    continue
                key = _norm_container_name(st.get("Name"))
                if not key:
                    continue
                # CPUPerc returned by Podman is already a percentage (0.10 == 0.10%)
                cpu_val = st.get("CPUPerc")
                if cpu_val is None:
                    cpu_val = st.get("CPU")
                if cpu_val is None:
                    cpu_val = st.get("AvgCPU")
                new_cache[key] = {
                    "cpu": _to_float(cpu_val),
                    "mem_usage": _to_float(st.get("MemUsage")),
                    "mem_perc": _to_float(st.get("MemPerc")),
                }
            _STATS_CACHE_BY_NAME = new_cache
            _STATS_CACHE_TS = int(time.time())
        except Exception:
            # Keep last good cache; try again next tick.
            pass
        await asyncio.sleep(interval)


async def start_stats_poller():
    """Start the stats poller task once; no-op if a task is already running."""
    global _STATS_POLLER_TASK
    if _STATS_POLLER_TASK and not _STATS_POLLER_TASK.done():
        return
    loop = asyncio.get_running_loop()
    _STATS_POLLER_TASK = loop.create_task(_stats_poller_loop())


def init_containers_router(
    session,
    podman_api_base: str,
    workloads_dir: str,
    systemctl_func,
) -> APIRouter:
    """Build the containers APIRouter.

    Parameters:
        session: requests-like session used for the Podman REST API.
        podman_api_base: base URL of the Podman API (e.g. unix-socket proxy).
        workloads_dir: directory containing systemd/*.container definitions.
        systemctl_func: callable taking an argv list, returning (exit_code, output).

    Also stashes session/base URL in module globals for the stats poller.
    """
    router = APIRouter(tags=["containers"])

    global _SESSION, _PODMAN_API_BASE
    _SESSION = session
    _PODMAN_API_BASE = podman_api_base

    def find_defined_containers():
        """Map container name -> relative path of its systemd .container file."""
        defined = {}
        for root, _, files in os.walk(os.path.join(workloads_dir, "systemd")):
            for f in files:
                if f.endswith(".container"):
                    name = os.path.splitext(f)[0]
                    full = os.path.join(root, f)
                    rel = os.path.relpath(full, workloads_dir)
                    defined[name] = rel
        return defined

    def _ensure_container_status_field(container: dict):
        # Keep exact existing defaulting behavior.
        if "Status" not in container:
            container["Status"] = container.get("State", "")

    def _make_defined_container_dashboard_row(name: str, relpath: str):
        # Keep exact key set and default values as before.
        return {
            "Names": [name],
            "Image": "",
            "State": "",
            "Status": "",
            "Ports": [],
            "PodName": "",
            "_dashboard_source": "systemd",
            "_dashboard_unit": f"{name}.service",
            "_dashboard_def_path": relpath,
            "_dashboard_cpu": None,
            "_dashboard_mem_usage": None,
            "_dashboard_mem_perc": None,
        }

    def _legacy_dashboard_item_from_container(c: dict):
        # Keep exact keys & defaults as before.
        return {
            "name": (c.get("Names") or ["?"])[0],
            "status": c.get("Status") or c.get("State") or "",
            "path": "",
            "ip": "",
            "containers": [],
        }

    def _extract_published_ports(container: dict) -> list[str]:
        """
        Normalize Podman API Ports into a stable display list:
        - "127.0.0.1:8080:8000/tcp"
        - "8080:8000/tcp"  (if no host ip)
        """
        out: list[str] = []
        for p in (container.get("Ports") or []):
            host_ip = p.get("host_ip") or p.get("HostIp") or ""
            host_port = p.get("host_port") or p.get("HostPort")
            cont_port = p.get("container_port") or p.get("ContainerPort")
            proto = p.get("protocol") or p.get("Protocol") or ""
            if host_port is None or cont_port is None:
                continue
            s = ""
            if host_ip:
                s += f"{host_ip}:"
            s += f"{host_port}:{cont_port}"
            if proto:
                s += f"/{proto}"
            out.append(s)
        return out

    @router.get("/containers-dashboard")
    def containers_dashboard():
        """Merged dashboard: runtime containers plus defined-but-absent ones."""
        dashboard = []
        defined = find_defined_containers()
        # Cache so we do not re-run systemctl for every container.
        unit_active_cache = {}
        stats_by_name = _STATS_CACHE_BY_NAME

        def _unit_is_active(unit):
            if not unit:
                return False
            if unit in unit_active_cache:
                return unit_active_cache[unit]
            code, out = systemctl_func(["systemctl", "--user", "is-active", unit])
            ok = (code == 0) or ((out or "").strip() == "active")
            unit_active_cache[unit] = ok
            return ok

        # A) real containers (runtime)
        real = _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
        for c in real:
            _ensure_container_status_field(c)
            # Published ports: keep the existing hotfix behavior.
            c["_dashboard_published_ports"] = _extract_published_ports(c)
            # Normalize name: Podman may return "/name".
            rname = ((c.get("Names") or ["?"])[0] or "").lstrip("/")
            # Optional live stats (always present; null on miss).
            c["_dashboard_cpu"] = None
            c["_dashboard_mem_usage"] = None
            c["_dashboard_mem_perc"] = None
            st = stats_by_name.get(rname)
            if isinstance(st, dict):
                c["_dashboard_cpu"] = st.get("cpu")
                c["_dashboard_mem_usage"] = st.get("mem_usage")
                c["_dashboard_mem_perc"] = st.get("mem_perc")
            # 1) Managed: systemd if a .container definition exists.
            if rname in defined:
                c["_dashboard_source"] = "systemd"
                c["_dashboard_unit"] = f"{rname}.service"
                c["_dashboard_def_path"] = defined[rname]
            else:
                # 2) Extra: is the container in a pod run via systemd (kube/quadlet)?
                podname = (c.get("PodName") or "").strip()
                pod_unit = _map_pod_to_unit(podname) if podname else None
                if pod_unit and _unit_is_active(pod_unit):
                    c["_dashboard_source"] = "systemd"
                    c["_dashboard_unit"] = pod_unit
                    # No _dashboard_def_path: this is not a .container definition.
                else:
                    c["_dashboard_source"] = "podman"
            dashboard.append(c)

        # B) Dedup set: normalized too (prevents /name vs name duplicates).
        runtime_names = set((((c.get("Names") or ["?"])[0] or "").lstrip("/")) for c in real)

        # C) defined containers from systemd/*.container (skip duplicates)
        for name, relpath in defined.items():
            if name in runtime_names:
                continue
            row = _make_defined_container_dashboard_row(name, relpath)
            code, out = systemctl_func(["systemctl", "--user", "is-active", f"{name}.service"])
            row["Status"] = (out or "").strip()
            dashboard.append(row)

        return dashboard

    @router.get("/containers")
    def list_containers():
        # ?all=true here as well, to include stopped containers.
        url = f"{podman_api_base}/libpod/containers/json?all=true"
        return _podman_get_json(session, url)

    @router.get("/containers/inspect/{name}")
    def inspect_container(name: str):
        return _podman_get_json(session, f"{podman_api_base}/libpod/containers/{name}/json")

    @router.get("/containers/logs/{name}")
    def get_container_logs(name: str):
        # Fetch the last 100 lines (tail=100).
        txt = _podman_get_text(session, f"{podman_api_base}/libpod/containers/{name}/logs?stdout=true&stderr=true&tail=100")
        # Podman logs often carry some binary metadata; we decode this as text.
        return {"logs": txt}

    @router.get("/containers/stats/stream")
    async def containers_stats_stream(interval: float = 2.0):
        """
        SSE stream of periodic container stats.
        Contract-neutral: new endpoint, no existing outputs changed.
        """
        # Guardrails against abuse.
        if interval < 0.5:
            interval = 0.5
        if interval > 30:
            interval = 30

        stats_url = f"{podman_api_base}/libpod/containers/stats?all=true&stream=false"

        def _fetch_stats():
            # Blocking HTTP call; executed via asyncio.to_thread below.
            return session.get(stats_url, timeout=5).json()

        async def event_gen():
            try:
                while True:
                    # BUGFIX: run the blocking request in a worker thread so a
                    # stalling Podman socket cannot freeze the whole event loop.
                    try:
                        data = await asyncio.to_thread(_fetch_stats)
                    except Exception as e:
                        data = {"Error": str(e), "Stats": []}
                    payload = {
                        "ts": int(time.time()),
                        "data": data,
                    }
                    yield "event: stats\n"
                    yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n"
                    await asyncio.sleep(interval)
            except asyncio.CancelledError:
                return

        headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # helps with proxies
        }
        return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)

    @router.get("/debug/defined-containers")
    def debug_defined_containers():
        return find_defined_containers()

    @router.get("/dashboard")
    def get_dashboard():
        # Legacy dashboard view (keep shape).
        try:
            api_containers = _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
        except Exception:
            # BUGFIX: was a bare `except:` which also swallowed SystemExit /
            # KeyboardInterrupt; narrowed while keeping the fallback behavior.
            api_containers = []
        items = []
        for c in api_containers:
            items.append(_legacy_dashboard_item_from_container(c))
        return items

    @router.post("/containers/{action}/{name}")
    def container_action(action: str, name: str):
        """Start/stop/restart a container, preferring systemd for managed ones."""
        if action not in ("start", "stop", "restart"):
            # BUGFIX: `return {...}, 400` is a Flask idiom — FastAPI serialized
            # the tuple as a JSON array with HTTP 200. Return a real 400 instead,
            # keeping the original body shape.
            return JSONResponse(status_code=400, content={"error": "Invalid action"})
        defined = find_defined_containers()
        _sys = {"code": None, "out": None}

        def _systemd_call():
            if name in defined:
                code, out = systemctl_func(["systemctl", "--user", action, name])
                _sys["code"] = code
                _sys["out"] = out
                if code == 0:
                    return {
                        "method": "systemd",
                        "name": name,
                        "unit": f"{name}.service",
                        "definition": defined[name],
                        "cmd": f"systemctl --user {action} {name}",
                        "exit": code,
                        "output": out,
                    }
                return {"exit": code, "output": out}
            return None

        def _podman_call(systemd_res):
            res = _podman_action_post(session, podman_api_base, "containers", name, action)
            if res.status_code in (200, 204):
                return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}
            if res.status_code >= 400:
                # NOTE(review): this returns a (dict, status) tuple; presumably
                # _systemd_then_podman interprets it — confirm against common.py.
                return {
                    "method": "podman",
                    "name": name,
                    "cmd": f"podman {action} {name}",
                    "status_code": res.status_code,
                    "error": getattr(res, "text", "") or "",
                }, res.status_code
            if name in defined:
                return {
                    "method": "systemd",
                    "name": name,
                    "unit": f"{name}.service",
                    "definition": defined[name],
                    "cmd": f"systemctl --user {action} {name}",
                    "exit": _sys["code"],
                    "output": _sys["out"],
                }
            return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}

        return _systemd_then_podman(_systemd_call, _podman_call)

    return router