import os import sys import subprocess from app_images import init_images_router from app_files import init_files_router from app_networks import init_networks_router from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel import requests_unixsocket import uvicorn import asyncio import json from pathlib import Path from fastapi.responses import StreamingResponse app = FastAPI(title="Podman MVP Control Plane", root_path="/api") SESSION = requests_unixsocket.Session() PODMAN_API_BASE = "http+unix://%2Frun%2Fuser%2F1000%2Fpodman%2Fpodman.sock/v5.4.2" BASE_DIR = os.path.dirname(os.path.abspath(__file__)) WORKLOADS_DIR = "/app/workloads" # --- STATS CACHE (contract-neutral; in-memory) --- # Poll Podman stats centrally and expose as optional dashboard fields. _STATS_CACHE_BY_NAME = {} # name -> {"cpu": float|None, "mem_usage": float|None, "mem_perc": float|None} _STATS_CACHE_TS = None _STATS_POLLER_TASK = None def _norm_container_name(name) -> str: try: return str(name or "").lstrip("/") except Exception: return "" def _parse_stats_interval_seconds() -> float: raw = os.getenv("STATS_INTERVAL_SECONDS", "1.0") try: v = float(raw) except Exception: v = 1.0 if v <= 0: v = 1.0 if v < 0.5: v = 0.5 if v > 30: v = 30 return v async def _stats_poller_loop(): global _STATS_CACHE_BY_NAME, _STATS_CACHE_TS interval = _parse_stats_interval_seconds() stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false" def _to_float(x): try: return float(x) except Exception: return None while True: try: data = SESSION.get(stats_url, timeout=5).json() stats_list = data.get("Stats") if isinstance(data, dict) else None if not isinstance(stats_list, list): stats_list = [] new_cache = {} for st in stats_list: if not isinstance(st, dict): continue key = _norm_container_name(st.get("Name")) if not key: continue # CPUPerc returned by Podman is already percentage (0.10 == 0.10%) cpu_val = st.get("CPUPerc") if cpu_val is None: cpu_val = st.get("CPU") if cpu_val is None: cpu_val = st.get("AvgCPU") new_cache[key] = { "cpu": _to_float(cpu_val), "mem_usage": _to_float(st.get("MemUsage")), "mem_perc": _to_float(st.get("MemPerc")), } _STATS_CACHE_BY_NAME = new_cache _STATS_CACHE_TS = int(__import__("time").time()) except Exception: # Keep last good cache; try again next tick. pass await asyncio.sleep(interval) @app.on_event("startup") async def _startup_stats_poller(): global _STATS_POLLER_TASK if _STATS_POLLER_TASK and not _STATS_POLLER_TASK.done(): return _STATS_POLLER_TASK = asyncio.create_task(_stats_poller_loop()) # --- ROUTERS --- # Images API lives in a dedicated module to keep this file from growing further. app.include_router(init_images_router(SESSION, PODMAN_API_BASE)) app.include_router(init_files_router(SESSION, PODMAN_API_BASE, WORKLOADS_DIR)) app.include_router(init_networks_router(SESSION, PODMAN_API_BASE)) # --- ADAPTERS (contract-neutral helpers) --- # Centralize Podman socket and systemctl invocation. # MUST NOT change endpoint outputs, status codes, or side-effects. def _podman_get_json(url: str): return SESSION.get(url).json() def _podman_get_text(url: str) -> str: return SESSION.get(url).text def _podman_post(url: str, **kwargs): return SESSION.post(url, **kwargs) def _podman_action_post(kind: str, name: str, action: str): if kind == "pods": url = f"{PODMAN_API_BASE}/libpod/pods/{name}/{action}" else: url = f"{PODMAN_API_BASE}/libpod/containers/{name}/{action}" return _podman_post(url) def _podman_delete(url: str): return SESSION.delete(url) def _systemctl(cmd): # Proxy to existing run() to avoid behavioral changes. return run(cmd) def _run_systemctl_action(action: str, unit: str): cmd = ["systemctl", "--user", action, unit] return _systemctl(cmd) @app.get("/health") def health(): podman_ok = False try: r = SESSION.get(f"{PODMAN_API_BASE}/libpod/info", timeout=2) if r.status_code == 200: try: r.json() podman_ok = True except Exception: podman_ok = False except Exception: podman_ok = False systemd_reachable = False try: res = subprocess.run( ["systemctl", "--user", "list-units", "--no-pager", "--no-legend"], capture_output=True, text=True, check=False, timeout=2, ) systemd_reachable = (res.returncode == 0) except Exception: systemd_reachable = False ok = podman_ok and systemd_reachable return {"ok": ok, "podman": {"ok": podman_ok}, "systemd_user": {"reachable": systemd_reachable}} # --- PODS / CONTAINERS --- @app.get("/pods") def list_pods(): # Cruciaal: ?all=true zorgt dat EXIT_STATE pods ook getoond worden url = f"{PODMAN_API_BASE}/libpod/pods/json?all=true" return _podman_get_json(url) @app.post("/actions/{action}/{name}") def take_action(action: str, name: str): # Legacy endpoint (keep behavior) possible_names = [name, f"pod{name}", f"pod-{name}"] if action == "start": # STAP 1: Probeer direct de pod te starten (de 'Cockpit' methode) for target in possible_names: res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/start") if res.status_code in (200, 204): return {"status": "started", "target": target, "method": "direct"} # STAP 2: Als direct starten faalt, probeer dan YAML opnieuw te deployen target_path = None for ext in (".yaml", ".yml"): cand = os.path.join(WORKLOADS_DIR, f"{name}{ext}") if os.path.exists(cand): target_path = cand break if target_path: with open(target_path, 'r') as file: yaml_content = file.read() res = _podman_post(f"{PODMAN_API_BASE}/libpod/kube/play", data=yaml_content) # SPECIALE CASE: Pod bestaat al, forceer dan restart if res.status_code == 500 and "already exists" in res.text: print(f"DEBUG: Forceer herstart voor {name} wegens conflict") for target in possible_names: _podman_delete(f"{PODMAN_API_BASE}/libpod/pods/{target}?force=true") # Probeer het nu opnieuw retry_res = _podman_post(f"{PODMAN_API_BASE}/libpod/kube/play", data=yaml_content) return retry_res.json() return res.json() return {"status": "unknown", "method": "no_yaml_found"} if action == "stop": for target in possible_names: res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/stop") if res.status_code in (200, 204): return {"status": "stopped", "target": target} return {"status": "not found"} return {"status": "unknown"} # --- DASHBOARD HELPERS (contract-neutral, no ordering/sorting changes) --- def _build_pod_to_containers_map(containers: list): # preserves original order of containers processing; no sorting added pod_to_containers = {} for c in containers: pod_name = c.get("PodName") or "" if pod_name: pod_to_containers.setdefault(pod_name, []).append((c.get("Names") or ["?"])[0]) return pod_to_containers def _map_pod_to_unit(podname: str) -> str | None: """ HOTFIX 3.1 FIX 1: If podname starts with "pod", map to .service (e.g. podmediaserver -> mediaserver.service) Else: .service """ if not podname: return None if podname.startswith("pod"): return f"{podname[3:]}.service" return f"{podname}.service" def _append_podman_pods_dashboard_rows(dashboard: list, api_pods: list, pod_to_containers: dict): # preserves original api_pods iteration order for p in api_pods: name = p.get("Name") status = p.get("Status", "unknown") unit = _map_pod_to_unit(name) if name else "" dashboard.append({ "Name": name, "Status": status, "Containers": pod_to_containers.get(name, []), "Unit": unit, "Source": "podman", }) def _append_defined_pods_dashboard_rows(dashboard: list, by_name: dict, root_dir: str): # preserves original os.walk order and file iteration order for root, _, files in os.walk(root_dir): for f in files: if f.endswith((".yaml", ".yml")): base = os.path.splitext(os.path.basename(f))[0] pod_name = f"pod{base}" unit_name = _map_pod_to_unit(pod_name) if pod_name not in by_name: code, out = _systemctl(["systemctl", "--user", "is-active", unit_name]) status = (out or "").strip() or ("active" if code == 0 else "inactive") dashboard.append({ "Name": pod_name, "Status": status, "Containers": [], "Unit": unit_name, "Source": "systemd", }) def _ensure_container_status_field(container: dict): # keep exact existing defaulting behavior if "Status" not in container: container["Status"] = container.get("State", "") def _make_defined_container_dashboard_row(name: str, relpath: str): # keep exact key set and default values as before return { "Names": [name], "Image": "", "State": "", "Status": "", "Ports": [], "PodName": "", "_dashboard_source": "systemd", "_dashboard_unit": f"{name}.service", "_dashboard_def_path": relpath, "_dashboard_cpu": None, "_dashboard_mem_usage": None, "_dashboard_mem_perc": None, } def _legacy_dashboard_item_from_container(c: dict): # Keep exact keys & defaults as before return { "name": (c.get("Names") or ["?"])[0], "status": c.get("Status") or c.get("State") or "", "path": "", "ip": "", "containers": [], } @app.get("/pods-dashboard") def pods_dashboard(): dashboard = [] # 0) Bouw mapping: pod_name -> [container_names...] containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") pod_to_containers = _build_pod_to_containers_map(containers) # 1) A) echte pods api_pods = _podman_get_json(f"{PODMAN_API_BASE}/libpod/pods/json?all=true") by_name = {p.get("Name"): p for p in api_pods} _append_podman_pods_dashboard_rows(dashboard, api_pods, pod_to_containers) # 1) B) defined pods via workloads scan # Based on YAML files in WORKLOADS_DIR; show even if not running. _append_defined_pods_dashboard_rows(dashboard, by_name, WORKLOADS_DIR) return dashboard def _systemd_then_podman(systemd_callable, podman_callable): systemd_res = systemd_callable() if systemd_res is not None: if isinstance(systemd_res, dict) and systemd_res.get("exit", 1) == 0: return systemd_res return podman_callable(systemd_res) return podman_callable(None) def try_systemd_pod_action(action: str, podname: str): # If systemd unit exists/allowed, prefer it. unit = _map_pod_to_unit(podname) if not unit: return None code, out = _systemctl(["systemctl", "--user", action, unit]) return { "method": "systemd", "pod": podname, "unit": unit, "cmd": f"systemctl --user {action} {unit}", "exit": code, "output": out, } @app.post("/pods/actions/{action}/{podname}") def pod_action_prefer_systemd(action: str, podname: str): if action not in ("start", "stop", "restart"): return {"error": "Invalid action"}, 400 def _systemd_call(): return try_systemd_pod_action(action, podname) def _podman_call(systemd_res): if systemd_res: note = "systemd failed; falling back to podman" podman = _podman_action_post("pods", podname, action).json() return {"method": "systemd_then_podman", "note": note, "systemd": systemd_res, "podman": podman} return {"method": "podman", "result": _podman_action_post("pods", podname, action).json()} return _systemd_then_podman(_systemd_call, _podman_call) def find_defined_containers(): defined = {} for root, _, files in os.walk(os.path.join(WORKLOADS_DIR, "systemd")): for f in files: if f.endswith(".container"): name = os.path.splitext(f)[0] full = os.path.join(root, f) rel = os.path.relpath(full, WORKLOADS_DIR) defined[name] = rel return defined def _extract_published_ports(container: dict) -> list[str]: """ Normalize Podman API Ports into a stable display list: - "127.0.0.1:8080:8000/tcp" - "8080:8000/tcp" (if no host ip) """ out: list[str] = [] for p in (container.get("Ports") or []): host_ip = p.get("host_ip") or p.get("HostIp") or "" host_port = p.get("host_port") or p.get("HostPort") cont_port = p.get("container_port") or p.get("ContainerPort") proto = p.get("protocol") or p.get("Protocol") or "" if host_port is None or cont_port is None: continue s = "" if host_ip: s += f"{host_ip}:" s += f"{host_port}:{cont_port}" if proto: s += f"/{proto}" out.append(s) return out @app.get("/containers-dashboard") def containers_dashboard(): dashboard = [] defined = find_defined_containers() # Cache zodat we niet voor elke container opnieuw systemctl doen unit_active_cache = {} stats_by_name = _STATS_CACHE_BY_NAME def _unit_is_active(unit): if not unit: return False if unit in unit_active_cache: return unit_active_cache[unit] code, out = _systemctl(["systemctl", "--user", "is-active", unit]) ok = (code == 0) or ((out or "").strip() == "active") unit_active_cache[unit] = ok return ok # A) echte containers (runtime) real = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") for c in real: _ensure_container_status_field(c) # Published ports: behoud jouw hotfix c["_dashboard_published_ports"] = _extract_published_ports(c) # Normaliseer naam: Podman kan "/name" geven rname = ((c.get("Names") or ["?"])[0] or "").lstrip("/") # Optional live stats (always present; null on miss) c["_dashboard_cpu"] = None c["_dashboard_mem_usage"] = None c["_dashboard_mem_perc"] = None st = stats_by_name.get(rname) if isinstance(st, dict): c["_dashboard_cpu"] = st.get("cpu") c["_dashboard_mem_usage"] = st.get("mem_usage") c["_dashboard_mem_perc"] = st.get("mem_perc") # 1) Managed: systemd als er een .container definitie bestaat if rname in defined: c["_dashboard_source"] = "systemd" c["_dashboard_unit"] = f"{rname}.service" c["_dashboard_def_path"] = defined[rname] else: # 2) Extra: zit container in een pod die via systemd (kube/quadlet) draait? podname = (c.get("PodName") or "").strip() pod_unit = _map_pod_to_unit(podname) if podname else None if pod_unit and _unit_is_active(pod_unit): c["_dashboard_source"] = "systemd" c["_dashboard_unit"] = pod_unit # geen _dashboard_def_path, want dit is geen .container definitie else: c["_dashboard_source"] = "podman" dashboard.append(c) # B) Dedup set: ook genormaliseerd (voorkomt /name vs name doublures) runtime_names = set((((c.get("Names") or ["?"])[0] or "").lstrip("/")) for c in real) # C) defined containers from systemd/*.container (skip duplicates) for name, relpath in defined.items(): if name in runtime_names: continue row = _make_defined_container_dashboard_row(name, relpath) code, out = _systemctl(["systemctl", "--user", "is-active", f"{name}.service"]) row["Status"] = (out or "").strip() dashboard.append(row) return dashboard @app.get("/containers") def list_containers(): # Ook hier ?all=true voor gestopte containers url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true" return _podman_get_json(url) @app.post("/containers/{action}/{name}") def container_action(action: str, name: str): if action not in ("start", "stop", "restart"): return {"error": "Invalid action"}, 400 defined = find_defined_containers() _sys = {"code": None, "out": None} def _systemd_call(): if name in defined: code, out = _systemctl(["systemctl", "--user", action, name]) _sys["code"] = code _sys["out"] = out if code == 0: return { "method": "systemd", "name": name, "unit": f"{name}.service", "definition": defined[name], "cmd": f"systemctl --user {action} {name}", "exit": code, "output": out, } return {"exit": code, "output": out} return None def _podman_call(systemd_res): res = _podman_action_post("containers", name, action) if res.status_code in (200, 204): return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code} if res.status_code >= 400: return { "method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code, "error": getattr(res, "text", "") or "", }, res.status_code if name in defined: return { "method": "systemd", "name": name, "unit": f"{name}.service", "definition": defined[name], "cmd": f"systemctl --user {action} {name}", "exit": _sys["code"], "output": _sys["out"], } return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code} return _systemd_then_podman(_systemd_call, _podman_call) @app.get("/debug/defined-containers") def debug_defined_containers(): return find_defined_containers() @app.get("/dashboard") def get_dashboard(): # Legacy dashboard view (keep shape) try: api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") except: api_containers = [] items = [] for c in api_containers: items.append(_legacy_dashboard_item_from_container(c)) return items @app.get("/test-hybrid") def test_hybrid(): # 1. Check filesystem try: bestanden = [] for root, _, files in os.walk(WORKLOADS_DIR): for f in files: bestanden.append(os.path.join(root, f)) except Exception as e: bestanden = f"FS Fout: {str(e)}" # 2. Check Podman API try: api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") except Exception as e: api_containers = f"API Fout: {str(e)}" return { "bestanden_gevonden": bestanden if isinstance(bestanden, list) else [], "api_containers_aantal": len(api_containers) if isinstance(api_containers, list) else -1, "api_raw_sample": api_containers[0] if isinstance(api_containers, list) and api_containers else api_containers, } @app.get("/containers/logs/{name}") def get_container_logs(name: str): # We vragen de laatste 100 regels op (tail=100) txt = _podman_get_text(f"{PODMAN_API_BASE}/libpod/containers/{name}/logs?stdout=true&stderr=true&tail=100") # Podman logs komen vaak met wat binaire metadata, we decoden dit als tekst return {"logs": txt} @app.get("/containers/inspect/{name}") def inspect_container(name: str): return _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/{name}/json") @app.post("/daemon-reload") def api_daemon_reload(): try: code, out = _systemctl(["systemctl", "--user", "daemon-reload"]) return { "cmd": "systemctl --user daemon-reload", "exit": code, "output": out, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/{action}/{unit}") def api_action(action: str, unit: str): if action not in ("status", "start", "stop", "restart"): raise HTTPException(status_code=400, detail="Invalid action") cmd = ["systemctl", "--user", action, unit] code, out = _run_systemctl_action(action, unit) return {"cmd": " ".join(cmd), "exit": code, "output": out} @app.post("/api//") def legacy_api_action(action: str, unit: str): # legacy flask-like path; keep behavior (even if not used by index.html) if action not in ("status", "start", "stop", "restart"): return {"error": "Invalid action"}, 400 cmd = ["systemctl", "--user", action, unit] code, out = _run_systemctl_action(action, unit) return {"cmd": " ".join(cmd), "exit": code, "output": out} def run(cmd): try: result = subprocess.run(cmd, capture_output=True, text=True, check=False) output = (result.stdout or "") + (result.stderr or "") return result.returncode, output.strip() except Exception as e: return 1, str(e) # ENDPOINT TOEGEVOEGD NA CHATGPT @app.get("/containers/stats/stream") async def containers_stats_stream(interval: float = 2.0): """ SSE stream met periodieke container stats. Contract-neutraal: nieuw endpoint, geen bestaande outputs aangepast. """ # Guardrails tegen misbruik if interval < 0.5: interval = 0.5 if interval > 30: interval = 30 stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false" async def event_gen(): try: while True: # timeout zodat een haperende podman socket niet je stream “bevriest” try: data = SESSION.get(stats_url, timeout=5).json() except Exception as e: data = {"Error": str(e), "Stats": []} payload = { "ts": int(__import__("time").time()), "data": data, } yield "event: stats\n" yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n" await asyncio.sleep(interval) except asyncio.CancelledError: return headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # helpt bij proxies } return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)