"""Container routes: dashboard, central stats polling, and interactive exec sessions.

This module exposes a FastAPI router (built by ``init_containers_router``) plus a
module-level background stats poller.  Exec sessions talk to the Podman API over a
hijacked HTTP/1.1 connection on the Podman unix socket.
"""

import os
import asyncio
import json
import time
import socket
import secrets
import threading
from collections import deque
from typing import Optional
from urllib.parse import unquote

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from common import (
    _helper_call,
    _podman_action_post,
    _podman_get_json,
    _podman_get_text,
    _systemd_then_podman,
)

# Filled in by init_containers_router(); used by module-level helpers.
_SESSION = None
_PODMAN_API_BASE = None

# --- STATS CACHE (contract-neutral; in-memory) ---
# Poll Podman stats centrally and expose as optional dashboard fields.
_STATS_CACHE_BY_NAME = {}  # name -> {"cpu": float|None, "mem_usage": float|None, "mem_perc": float|None}
_STATS_CACHE_TS = None
_STATS_POLLER_TASK = None
_STATS_SHOWN_NAMES: set = set()  # names of all dashboard containers from the last dashboard call

# --- EXEC SESSION CACHE (in-memory) ---
_EXEC_SESSIONS = {}  # session_id -> _ExecSessionState
_EXEC_SESSIONS_LOCK = threading.Lock()
_EXEC_SESSION_IDLE_TTL_SECONDS = 60 * 60
_EXEC_SESSION_CLOSED_GC_SECONDS = 5 * 60
_EXEC_SESSION_MAX_ACTIVE = 12
_EXEC_INPUT_MAX_BYTES = 32 * 1024


class ExecStartRequest(BaseModel):
    """Payload for starting an exec session; defaults to an interactive shell."""

    cmd: list[str] = Field(default_factory=lambda: ["/bin/sh"])
    tty: bool = True


class ExecInputRequest(BaseModel):
    """UTF-8 text to forward to the exec session's stdin."""

    data: str = ""


class ExecResizeRequest(BaseModel):
    """Requested terminal geometry for a running TTY exec session."""

    rows: int = 24
    cols: int = 80


class _ExecSessionState:
    """In-memory state of one hijacked exec session.

    Holds the raw unix socket, a bounded event log consumed by the SSE stream,
    and bookkeeping (activity timestamps, closed flag) used by the GC.
    """

    def __init__(self, session_id: str, exec_id: str, container: str,
                 sock: socket.socket, tty: bool):
        self.session_id = session_id
        self.exec_id = exec_id
        self.container = container
        self.sock = sock
        self.tty = tty
        self.created_at = int(time.time())
        self.last_activity = self.created_at
        self.closed = False
        self.close_reason = ""
        self.seq = 0
        # Bounded so a chatty container cannot grow memory without limit.
        self.events = deque(maxlen=2000)  # {"seq","ts","type","data"}
        self.lock = threading.Lock()
        self.reader_thread = None

    def push_event(self, event_type: str, data: str):
        """Append an event with a monotonically increasing sequence number."""
        with self.lock:
            self.seq += 1
            self.events.append({
                "seq": self.seq,
                "ts": int(time.time()),
                "type": event_type,
                "data": data,
            })
            self.last_activity = int(time.time())

    def mark_closed(self, reason: str):
        """Idempotently mark the session closed and emit a final 'closed' event."""
        if self.closed:
            return
        self.closed = True
        self.close_reason = reason or "closed"
        self.push_event("closed", self.close_reason)


def _parse_podman_unix_socket_and_base(api_base: str) -> tuple[str, str]:
    """Split an ``http+unix://<quoted-socket-path>[/base]`` URL.

    Returns (socket_path, base_path); raises HTTPException(500) if the URL is
    not a unix-socket Podman base or the socket path is empty.
    """
    if not isinstance(api_base, str) or not api_base.startswith("http+unix://"):
        raise HTTPException(status_code=500, detail="Unsupported PODMAN_API_BASE for exec")
    tail = api_base[len("http+unix://"):]
    slash = tail.find("/")
    if slash < 0:
        encoded_socket = tail
        base_path = ""
    else:
        encoded_socket = tail[:slash]
        base_path = tail[slash:]
    socket_path = unquote(encoded_socket)
    if not socket_path:
        raise HTTPException(status_code=500, detail="Podman socket path missing")
    if base_path and not base_path.startswith("/"):
        base_path = "/" + base_path
    return socket_path, base_path


def _open_exec_hijacked_socket(exec_id: str, tty: bool) -> tuple[socket.socket, bytes]:
    """POST ``exec/{id}/start`` over a raw unix socket and keep it hijacked.

    Returns (socket, leftover_bytes) where leftover_bytes is any stream data
    that arrived in the same read as the HTTP response headers.  Raises
    HTTPException(502) on any protocol or status failure.
    """
    socket_path, base_path = _parse_podman_unix_socket_and_base(_PODMAN_API_BASE)
    req_path = f"{base_path}/libpod/exec/{exec_id}/start"
    body = json.dumps({"Detach": False, "Tty": bool(tty)}, separators=(",", ":")).encode("utf-8")
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.settimeout(10.0)
    sock.connect(socket_path)
    req = (
        f"POST {req_path} HTTP/1.1\r\n"
        "Host: d\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "\r\n"
    ).encode("utf-8") + body
    sock.sendall(req)
    # Read until the end of the HTTP headers (or a hard size cap).
    raw = b""
    while b"\r\n\r\n" not in raw:
        chunk = sock.recv(4096)
        if not chunk:
            break
        raw += chunk
        if len(raw) > 64 * 1024:
            sock.close()
            raise HTTPException(status_code=502, detail="Exec start response headers too large")
    if b"\r\n\r\n" not in raw:
        sock.close()
        raise HTTPException(status_code=502, detail="Exec start invalid HTTP response")
    head, _, rest = raw.partition(b"\r\n\r\n")
    status_line = head.split(b"\r\n", 1)[0].decode("utf-8", errors="replace")
    try:
        parts = status_line.split(" ", 2)
        status_code = int(parts[1])
    except Exception:
        sock.close()
        raise HTTPException(status_code=502, detail=f"Exec start parse error: {status_line}")
    if status_code != 200:
        body_preview = rest.decode("utf-8", errors="replace")
        sock.close()
        raise HTTPException(status_code=502, detail=f"Exec start failed ({status_code}): {body_preview}")
    # Short timeout so the reader thread wakes up regularly to check `closed`.
    sock.settimeout(1.0)
    return sock, rest


def _get_exec_session_or_404(session_id: str) -> _ExecSessionState:
    """Look up a session by id or raise HTTPException(404)."""
    with _EXEC_SESSIONS_LOCK:
        sess = _EXEC_SESSIONS.get(session_id)
    if not sess:
        raise HTTPException(status_code=404, detail=f"Unknown exec session: {session_id}")
    return sess


def _close_exec_session(sess: _ExecSessionState, reason: str):
    """Best-effort shutdown/close of the socket, then mark the session closed."""
    try:
        sess.sock.shutdown(socket.SHUT_RDWR)
    except Exception:
        pass
    try:
        sess.sock.close()
    except Exception:
        pass
    sess.mark_closed(reason)


def _cleanup_exec_sessions():
    """GC pass: close idle sessions and drop long-closed ones from the registry."""
    now = int(time.time())
    to_delete = []
    with _EXEC_SESSIONS_LOCK:
        for sid, sess in _EXEC_SESSIONS.items():
            idle = now - int(sess.last_activity or now)
            if sess.closed and idle > _EXEC_SESSION_CLOSED_GC_SECONDS:
                to_delete.append(sid)
                continue
            if (not sess.closed) and idle > _EXEC_SESSION_IDLE_TTL_SECONDS:
                _close_exec_session(sess, "idle-timeout")
        for sid in to_delete:
            _EXEC_SESSIONS.pop(sid, None)


def _reader_loop(session_id: str, sess: _ExecSessionState, initial_rest: bytes):
    """Background thread: drain the hijacked socket into the session event log.

    The 1s socket timeout (set in _open_exec_hijacked_socket) lets the loop
    re-check ``sess.closed`` between reads.
    """
    try:
        if initial_rest:
            txt = initial_rest.decode("utf-8", errors="replace")
            if txt:
                sess.push_event("stdout", txt)
        while not sess.closed:
            try:
                chunk = sess.sock.recv(4096)
            except socket.timeout:
                continue
            except Exception as e:
                _close_exec_session(sess, f"read-error: {str(e)}")
                break
            if not chunk:
                _close_exec_session(sess, "eof")
                break
            txt = chunk.decode("utf-8", errors="replace")
            if txt:
                sess.push_event("stdout", txt)
    finally:
        sess.mark_closed(sess.close_reason or "reader-exit")


def _norm_container_name(name) -> str:
    """Normalize a Podman container name: string, stripped of the leading '/'."""
    try:
        return str(name or "").lstrip("/")
    except Exception:
        return ""


def _parse_stats_interval_seconds() -> float:
    """Read STATS_INTERVAL_SECONDS from the environment, clamped to [0.5, 30]."""
    raw = os.getenv("STATS_INTERVAL_SECONDS", "1.0")
    try:
        v = float(raw)
    except Exception:
        v = 1.0
    if v <= 0:
        v = 1.0
    if v < 0.5:
        v = 0.5
    if v > 30:
        v = 30
    return v


def _parse_positive_int_env(name: str, default: int, minimum: int, maximum: int) -> int:
    """Read an int env var with a default, clamped to [minimum, maximum]."""
    raw = os.getenv(name, str(default))
    try:
        v = int(raw)
    except Exception:
        v = int(default)
    if v < minimum:
        v = minimum
    if v > maximum:
        v = maximum
    return v


def _exec_max_active_sessions() -> int:
    """Configured cap on concurrently open exec sessions."""
    return _parse_positive_int_env("EXEC_SESSION_MAX_ACTIVE", _EXEC_SESSION_MAX_ACTIVE, 1, 500)


def _exec_max_input_bytes() -> int:
    """Configured cap on a single exec stdin payload."""
    return _parse_positive_int_env("EXEC_INPUT_MAX_BYTES", _EXEC_INPUT_MAX_BYTES, 64, 1024 * 1024)


async def _stats_poller_loop():
    """Background task: periodically refresh the in-memory stats cache.

    On any error the last good cache is kept and polling retries next tick.
    """
    global _STATS_CACHE_BY_NAME, _STATS_CACHE_TS
    interval = _parse_stats_interval_seconds()
    stats_url = f"{_PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false"

    def _to_float(x):
        try:
            return float(x)
        except Exception:
            return None

    def _fetch():
        # Blocking requests-style call; executed off the event loop.
        return _SESSION.get(stats_url, timeout=5).json()

    while True:
        try:
            # Run the blocking HTTP call in a worker thread so the event loop
            # is not stalled for up to the 5s timeout.
            data = await asyncio.to_thread(_fetch)
            stats_list = data.get("Stats") if isinstance(data, dict) else None
            if not isinstance(stats_list, list):
                stats_list = []
            new_cache = {}
            for st in stats_list:
                if not isinstance(st, dict):
                    continue
                key = _norm_container_name(st.get("Name"))
                if not key:
                    continue
                # CPUPerc returned by Podman is already percentage (0.10 == 0.10%)
                cpu_val = st.get("CPUPerc")
                if cpu_val is None:
                    cpu_val = st.get("CPU")
                if cpu_val is None:
                    cpu_val = st.get("AvgCPU")
                new_cache[key] = {
                    "cpu": _to_float(cpu_val),
                    "mem_usage": _to_float(st.get("MemUsage")),
                    "mem_perc": _to_float(st.get("MemPerc")),
                }
            _STATS_CACHE_BY_NAME = new_cache
            _STATS_CACHE_TS = int(time.time())
        except Exception:
            # Keep last good cache; try again next tick.
            pass
        await asyncio.sleep(interval)


async def start_stats_poller():
    """Start the stats poller task once; no-op if it is already running."""
    global _STATS_POLLER_TASK
    if _STATS_POLLER_TASK and not _STATS_POLLER_TASK.done():
        return
    loop = asyncio.get_running_loop()
    _STATS_POLLER_TASK = loop.create_task(_stats_poller_loop())


def init_containers_router(
    session,
    podman_api_base: str,
    workloads_dir: str,
    systemctl_func,
) -> APIRouter:
    """Build the containers router.

    Args:
        session: requests-style HTTP session speaking to the Podman API.
        podman_api_base: base URL of the Podman API (http+unix://... for exec).
        workloads_dir: directory containing systemd/*.container definitions.
        systemctl_func: callable running ``systemctl`` argv, returns (code, out).
    """
    router = APIRouter(tags=["containers"])
    global _SESSION, _PODMAN_API_BASE
    _SESSION = session
    _PODMAN_API_BASE = podman_api_base

    def find_defined_containers():
        """Map container name -> relative path of its *.container unit file."""
        defined = {}
        for root, _, files in os.walk(os.path.join(workloads_dir, "systemd")):
            for f in files:
                if f.endswith(".container"):
                    name = os.path.splitext(f)[0]
                    full = os.path.join(root, f)
                    rel = os.path.relpath(full, workloads_dir)
                    defined[name] = rel
        return defined

    def _ensure_container_status_field(container: dict):
        # keep exact existing defaulting behavior
        if "Status" not in container:
            container["Status"] = container.get("State", "")

    def _make_defined_container_dashboard_row(name: str, relpath: str):
        # keep exact key set and default values as before
        return {
            "Names": [name],
            "Image": "",
            "State": "",
            "Status": "",
            "Ports": [],
            "PodName": "",
            "_dashboard_source": "systemd",
            "_dashboard_unit": f"{name}.service",
            "_dashboard_def_path": relpath,
            "_dashboard_cpu": None,
            "_dashboard_mem_usage": None,
            "_dashboard_mem_perc": None,
        }

    def _legacy_dashboard_item_from_container(c: dict):
        # Keep exact keys & defaults as before
        return {
            "name": (c.get("Names") or ["?"])[0],
            "status": c.get("Status") or c.get("State") or "",
            "path": "",
            "ip": "",
            "containers": [],
        }

    def _extract_published_ports(container: dict) -> list[str]:
        """
        Normalize Podman API Ports into a stable display list:
        - "127.0.0.1:8080:8000/tcp"
        - "8080:8000/tcp" (if no host ip)
        """
        out: list[str] = []
        for p in (container.get("Ports") or []):
            host_ip = p.get("host_ip") or p.get("HostIp") or ""
            host_port = p.get("host_port") or p.get("HostPort")
            cont_port = p.get("container_port") or p.get("ContainerPort")
            proto = p.get("protocol") or p.get("Protocol") or ""
            if host_port is None or cont_port is None:
                continue
            s = ""
            if host_ip:
                s += f"{host_ip}:"
            s += f"{host_port}:{cont_port}"
            if proto:
                s += f"/{proto}"
            out.append(s)
        return out

    @router.get("/containers-dashboard")
    def containers_dashboard():
        """Combined dashboard: runtime containers plus systemd-defined ones."""
        dashboard = []
        defined = find_defined_containers()
        stats_by_name = _STATS_CACHE_BY_NAME

        # A) real containers (runtime)
        real = _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
        for c in real:
            _ensure_container_status_field(c)
            # Published ports: keep the existing hotfix
            c["_dashboard_published_ports"] = _extract_published_ports(c)
            # Normalize name: Podman may return "/name"
            rname = ((c.get("Names") or ["?"])[0] or "").lstrip("/")
            # Optional live stats (always present; null on miss)
            c["_dashboard_cpu"] = None
            c["_dashboard_mem_usage"] = None
            c["_dashboard_mem_perc"] = None
            st = stats_by_name.get(rname)
            if isinstance(st, dict):
                c["_dashboard_cpu"] = st.get("cpu")
                c["_dashboard_mem_usage"] = st.get("mem_usage")
                c["_dashboard_mem_perc"] = st.get("mem_perc")
            # Classification: PODMAN_SYSTEMD_UNIT label is ground truth
            labels = c.get("Labels") or {}
            podman_unit = labels.get("PODMAN_SYSTEMD_UNIT") or ""
            if podman_unit:
                c["_dashboard_source"] = "systemd"
                c["_dashboard_unit"] = podman_unit
            else:
                c["_dashboard_source"] = "podman"
            # Definition path: independent of classification
            if rname in defined:
                c["_dashboard_def_path"] = defined[rname]
            dashboard.append(c)

        # B) Dedup set: also normalized (prevents /name vs name duplicates)
        runtime_names = set((((c.get("Names") or ["?"])[0] or "").lstrip("/")) for c in real)

        # C) defined containers from systemd/*.container (skip duplicates)
        for name, relpath in defined.items():
            if name in runtime_names:
                continue
            row = _make_defined_container_dashboard_row(name, relpath)
            code, out = systemctl_func(["systemctl", "--user", "is-active", f"{name}.service"])
            row["Status"] = (out or "").strip()
            dashboard.append(row)

        # Record which container names are on the dashboard (for /stats filter)
        global _STATS_SHOWN_NAMES
        _STATS_SHOWN_NAMES = {
            _norm_container_name((c.get("Names") or ["?"])[0]) for c in dashboard
        } - {"?", ""}
        return dashboard

    @router.get("/stats")
    def stats_snapshot():
        """Snapshot of the stats cache, filtered to dashboard-visible names."""
        cache = _STATS_CACHE_BY_NAME
        if _STATS_SHOWN_NAMES:
            return {k: v for k, v in cache.items() if k in _STATS_SHOWN_NAMES}
        return cache

    @router.get("/containers")
    def list_containers():
        # ?all=true here as well so stopped containers are included
        url = f"{podman_api_base}/libpod/containers/json?all=true"
        return _podman_get_json(session, url)

    @router.get("/containers/inspect/{name}")
    def inspect_container(name: str):
        return _podman_get_json(session, f"{podman_api_base}/libpod/containers/{name}/json")

    @router.get("/containers/logs/{name}")
    def get_container_logs(name: str):
        # Request the last 100 lines (tail=100)
        txt = _podman_get_text(session, f"{podman_api_base}/libpod/containers/{name}/logs?stdout=true&stderr=true&tail=100")
        # Podman logs often carry some binary metadata; we decode this as text
        return {"logs": txt}

    @router.get("/containers/stats/stream")
    async def containers_stats_stream(interval: float = 2.0):
        """
        SSE stream of periodic container stats.
        Contract-neutral: new endpoint, no existing outputs changed.
        """
        # Guardrails against abuse
        if interval < 0.5:
            interval = 0.5
        if interval > 30:
            interval = 30
        stats_url = f"{podman_api_base}/libpod/containers/stats?all=true&stream=false"

        def _fetch():
            # timeout so a stalling podman socket does not "freeze" the stream
            return session.get(stats_url, timeout=5).json()

        async def event_gen():
            try:
                while True:
                    try:
                        # Blocking HTTP call moved off the event loop.
                        data = await asyncio.to_thread(_fetch)
                    except Exception as e:
                        data = {"Error": str(e), "Stats": []}
                    payload = {
                        "ts": int(time.time()),
                        "data": data,
                    }
                    yield "event: stats\n"
                    yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n"
                    await asyncio.sleep(interval)
            except asyncio.CancelledError:
                return

        headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # helps with proxies
        }
        return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)

    @router.get("/debug/defined-containers")
    def debug_defined_containers():
        return find_defined_containers()

    @router.get("/dashboard")
    def get_dashboard():
        # Legacy dashboard view (keep shape)
        try:
            api_containers = _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
        except Exception:
            api_containers = []
        items = []
        for c in api_containers:
            items.append(_legacy_dashboard_item_from_container(c))
        return items

    @router.post("/containers/{action}/{name}")
    def container_action(action: str, name: str):
        """
        Perform an action on a container.

        - **start** — Start the container (or its systemd unit).
        - **stop** — ⚠️ Destructive: stops the container immediately.
        - **restart** — ⚠️ Destructive: restarts the container immediately.

        Uses systemd if the container has a managed unit; otherwise the Podman API directly.
        """
        if action not in ("start", "stop", "restart"):
            # Was `return {"error": ...}, 400` — FastAPI serializes such a tuple
            # as a JSON array with HTTP 200, so the 400 never reached callers.
            raise HTTPException(status_code=400, detail="Invalid action")
        defined = find_defined_containers()
        _sys = {"code": None, "out": None}

        def _systemd_call():
            if name in defined:
                code, out = _helper_call(action, f"{name}.service")
                _sys["code"] = code
                _sys["out"] = out
                if code == 0:
                    return {
                        "method": "systemd",
                        "name": name,
                        "unit": f"{name}.service",
                        "definition": defined[name],
                        "cmd": f"systemctl --user {action} {name}",
                        "exit": code,
                        "output": out,
                    }
                return {"exit": code, "output": out}
            return None

        def _podman_call(systemd_res):
            res = _podman_action_post(session, podman_api_base, "containers", name, action)
            if res.status_code in (200, 204):
                return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}
            if res.status_code >= 400:
                # NOTE(review): this tuple return is consumed by
                # _systemd_then_podman; if it is returned to FastAPI as-is, the
                # status code is dropped — verify against the helper.
                return {
                    "method": "podman",
                    "name": name,
                    "cmd": f"podman {action} {name}",
                    "status_code": res.status_code,
                    "error": getattr(res, "text", "") or "",
                }, res.status_code
            if name in defined:
                return {
                    "method": "systemd",
                    "name": name,
                    "unit": f"{name}.service",
                    "definition": defined[name],
                    "cmd": f"systemctl --user {action} {name}",
                    "exit": _sys["code"],
                    "output": _sys["out"],
                }
            return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}

        return _systemd_then_podman(_systemd_call, _podman_call)

    @router.post("/containers/{name}/exec/start")
    def container_exec_start(name: str, req: Optional[ExecStartRequest] = None):
        """Create an exec in the container and open a hijacked I/O session."""
        _cleanup_exec_sessions()
        if req is None:
            req = ExecStartRequest()
        cmd = req.cmd or ["/bin/sh"]
        with _EXEC_SESSIONS_LOCK:
            active = sum(1 for s in _EXEC_SESSIONS.values() if not s.closed)
        max_active = _exec_max_active_sessions()
        if active >= max_active:
            raise HTTPException(
                status_code=429,
                detail=f"Too many active exec sessions ({active}/{max_active})",
            )
        create_url = f"{podman_api_base}/libpod/containers/{name}/exec"
        payload = {
            "AttachStdin": True,
            "AttachStdout": True,
            "AttachStderr": True,
            "Tty": bool(req.tty),
            "Cmd": cmd,
        }
        try:
            create_res = session.post(create_url, json=payload, timeout=10)
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Exec create request failed: {str(e)}")
        if create_res.status_code >= 400:
            raise HTTPException(status_code=502, detail=create_res.text)
        try:
            exec_id = (create_res.json() or {}).get("Id")
        except Exception:
            exec_id = None
        if not exec_id:
            raise HTTPException(status_code=502, detail=f"Exec create returned no Id: {create_res.text}")
        sock, initial_rest = _open_exec_hijacked_socket(exec_id, bool(req.tty))
        session_id = secrets.token_hex(8)
        sess = _ExecSessionState(
            session_id=session_id,
            exec_id=exec_id,
            container=name,
            sock=sock,
            tty=bool(req.tty),
        )
        t = threading.Thread(target=_reader_loop, args=(session_id, sess, initial_rest), daemon=True)
        sess.reader_thread = t
        with _EXEC_SESSIONS_LOCK:
            _EXEC_SESSIONS[session_id] = sess
        t.start()
        return {
            "session_id": session_id,
            "exec_id": exec_id,
            "container": name,
            "tty": bool(req.tty),
            "cmd": cmd,
            "created_at": sess.created_at,
        }

    @router.get("/containers/exec/{session_id}")
    def container_exec_session_info(session_id: str):
        """Metadata snapshot for an exec session."""
        _cleanup_exec_sessions()
        sess = _get_exec_session_or_404(session_id)
        with sess.lock:
            events = len(sess.events)
            seq = sess.seq
        return {
            "session_id": sess.session_id,
            "exec_id": sess.exec_id,
            "container": sess.container,
            "tty": sess.tty,
            "created_at": sess.created_at,
            "last_activity": sess.last_activity,
            "closed": sess.closed,
            "close_reason": sess.close_reason,
            "event_count": events,
            "event_seq": seq,
        }

    @router.get("/containers/exec/{session_id}/stream")
    async def container_exec_stream(session_id: str, after: int = 0):
        """SSE stream of exec session events, starting after sequence `after`."""
        _cleanup_exec_sessions()
        sess = _get_exec_session_or_404(session_id)

        async def event_gen():
            cursor = int(after or 0)
            last_ping = time.time()
            try:
                while True:
                    pending = []
                    closed = False
                    with sess.lock:
                        pending = [e for e in sess.events if e["seq"] > cursor]
                        closed = sess.closed
                    if pending:
                        for ev in pending:
                            cursor = ev["seq"]
                            yield "event: exec\n"
                            yield f"data: {json.dumps(ev, separators=(',',':'))}\n\n"
                    else:
                        # Keep-alive ping so proxies don't drop the idle stream.
                        now = time.time()
                        if (now - last_ping) >= 10.0:
                            last_ping = now
                            yield "event: ping\n"
                            yield f"data: {int(now)}\n\n"
                    if closed and not pending:
                        break
                    await asyncio.sleep(0.2)
            except asyncio.CancelledError:
                return

        headers = {
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
        return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)

    @router.post("/containers/exec/{session_id}/input")
    def container_exec_input(session_id: str, req: ExecInputRequest):
        """Forward UTF-8 input to the exec session's stdin."""
        _cleanup_exec_sessions()
        sess = _get_exec_session_or_404(session_id)
        if sess.closed:
            raise HTTPException(status_code=409, detail=f"Exec session is closed: {sess.close_reason or 'closed'}")
        data = (req.data or "").encode("utf-8")
        if not data:
            return {"ok": True, "session_id": session_id, "bytes": 0}
        max_bytes = _exec_max_input_bytes()
        if len(data) > max_bytes:
            raise HTTPException(
                status_code=413,
                detail=f"Input too large ({len(data)} bytes > {max_bytes} bytes)",
            )
        try:
            sess.sock.sendall(data)
            sess.last_activity = int(time.time())
            return {"ok": True, "session_id": session_id, "bytes": len(data)}
        except Exception as e:
            _close_exec_session(sess, f"write-error: {str(e)}")
            raise HTTPException(status_code=409, detail=f"Exec input failed: {str(e)}")

    @router.post("/containers/exec/{session_id}/resize")
    def container_exec_resize(session_id: str, req: ExecResizeRequest):
        """Resize the TTY of a running exec session."""
        _cleanup_exec_sessions()
        sess = _get_exec_session_or_404(session_id)
        try:
            insp = session.get(f"{podman_api_base}/libpod/exec/{sess.exec_id}/json", timeout=5)
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Exec inspect failed: {str(e)}")
        if insp.status_code >= 400:
            raise HTTPException(status_code=502, detail=insp.text)
        try:
            running = bool((insp.json() or {}).get("Running"))
        except Exception:
            running = False
        if not running:
            raise HTTPException(status_code=409, detail="Exec session is not running; resize requires running session")
        url = f"{podman_api_base}/libpod/exec/{sess.exec_id}/resize?h={int(req.rows)}&w={int(req.cols)}"
        try:
            res = session.post(url, timeout=5)
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Exec resize request failed: {str(e)}")
        if res.status_code >= 400:
            detail = (res.text or "").strip()
            if res.status_code == 500 and "not running" in detail.lower():
                raise HTTPException(status_code=409, detail="Exec session is not running")
            raise HTTPException(status_code=502, detail=detail)
        sess.last_activity = int(time.time())
        return {"ok": True, "session_id": session_id, "rows": int(req.rows), "cols": int(req.cols)}

    @router.post("/containers/exec/{session_id}/stop")
    def container_exec_stop(session_id: str):
        """Close an exec session on user request (idempotent)."""
        _cleanup_exec_sessions()
        sess = _get_exec_session_or_404(session_id)
        if sess.closed:
            return {"ok": True, "session_id": session_id, "already_closed": True, "reason": sess.close_reason}
        _close_exec_session(sess, "stopped-by-user")
        return {"ok": True, "session_id": session_id, "already_closed": False, "reason": "stopped-by-user"}

    return router