import asyncio
import json
import os
import subprocess
from pathlib import Path

import requests_unixsocket
import uvicorn
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="Podman MVP Control Plane", root_path="/api")

SESSION = requests_unixsocket.Session()
PODMAN_API_BASE = "http+unix://%2Frun%2Fuser%2F1000%2Fpodman%2Fpodman.sock/v5.4.2"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ALLOWLIST_FILE = os.getenv("ALLOWLIST_FILE", os.path.join(BASE_DIR, "allowed_units.txt"))
WORKLOADS_DIR = "/app/workloads"


# --- ADAPTERS (contract-neutral helpers) ---
# Centralize Podman socket and systemctl invocation.
# MUST NOT change endpoint outputs, status codes, or side-effects.

def _podman_get_json(url: str):
    """GET *url* over the shared Podman session and return the decoded JSON body."""
    return SESSION.get(url).json()


def _podman_get_text(url: str) -> str:
    """GET *url* over the shared Podman session and return the body as text."""
    return SESSION.get(url).text


def _podman_post(url: str, **kwargs):
    """POST to *url*; keyword arguments are forwarded to ``SESSION.post``."""
    return SESSION.post(url, **kwargs)


def _podman_action_post(kind: str, name: str, action: str):
    """POST ``<action>`` for a pod (``kind == "pods"``) or, for any other kind, a container."""
    if kind == "pods":
        url = f"{PODMAN_API_BASE}/libpod/pods/{name}/{action}"
    else:
        url = f"{PODMAN_API_BASE}/libpod/containers/{name}/{action}"
    return _podman_post(url)


def _podman_delete(url: str):
    """Send DELETE to *url* over the shared Podman session and return the response."""
    return SESSION.delete(url)


def _systemctl(cmd):
    """Run *cmd* (an argv list) and return ``(exit_code, output)``."""
    # Proxy to existing run() to avoid behavioral changes.
    return run(cmd)


def _run_systemctl_action(action: str, unit: str):
    """Run ``systemctl --user <action> <unit>`` and return ``(exit_code, output)``."""
    cmd = ["systemctl", "--user", action, unit]
    return _systemctl(cmd)


# --- MODELS ---

class FileContent(BaseModel):
    """Request body carrying the text to write to a file."""

    content: str


# --- WORKLOADS ---

@app.get("/workloads")
def list_workloads():
    """List YAML/JSON files below WORKLOADS_DIR as paths relative to it."""
    workloads = []
    for root, _, files in os.walk(WORKLOADS_DIR):
        for f in files:
            if f.endswith((".yaml", ".yml", ".json")):
                full = os.path.join(root, f)
                workloads.append(os.path.relpath(full, WORKLOADS_DIR))
    return {"workloads": workloads}


@app.get("/workloads/read/{filename:path}")
def read_workload(filename: str):
    """Return the content of one workload file.

    Raises:
        HTTPException: 403 when *filename* resolves outside WORKLOADS_DIR,
            404 when it does not name an existing regular file.
    """
    # The path converter accepts slashes, so the name must be confined
    # explicitly; a plain os.path.join would honour "../" and absolute paths.
    path = safe_join(WORKLOADS_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(404)
    with open(path, 'r') as f:
        content = f.read()
    return {"filename": filename, "content": content}


@app.post("/workloads/save-file")
def save_workload_file(data: dict):
    """Write ``data["content"]`` to ``data["path"]`` below WORKLOADS_DIR.

    Raises:
        HTTPException: 400 when ``path`` or ``content`` is missing or not a
            string, 403 when the path resolves outside WORKLOADS_DIR.
    """
    path = data.get("path")
    content = data.get("content")
    # Validate before opening: opening with "w" truncates the file, so a
    # missing content value must be rejected first.
    if not isinstance(path, str) or not path or not isinstance(content, str):
        raise HTTPException(status_code=400, detail="'path' and 'content' must be strings")
    full_path = safe_join(WORKLOADS_DIR, path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "w") as f:
        f.write(content)
    return {"status": "success"}


@app.post("/workloads/deploy/{filename:path}")
def deploy_workload(filename: str):
    """Send one workload file to Podman's ``kube/play`` endpoint and return its JSON reply.

    Raises:
        HTTPException: 403 when *filename* resolves outside WORKLOADS_DIR,
            404 when it does not name an existing regular file.
    """
    path = safe_join(WORKLOADS_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(404)
    with open(path, 'r') as f:
        yaml_content = f.read()
    url = f"{PODMAN_API_BASE}/libpod/kube/play"
    return _podman_post(url, data=yaml_content).json()


# --- FILE RESTRICTIONS ---

def safe_join(base, path):
    """Join *path* onto *base* and refuse results that leave *base*.

    Args:
        base: Directory the result must stay inside.
        path: User-supplied relative path.

    Returns:
        The absolute, normalized joined path (*base* itself is allowed).

    Raises:
        HTTPException: 403 when the joined path is not *base* or below it.
    """
    # prevent traversal
    base = os.path.abspath(base)
    final = os.path.abspath(os.path.join(base, path))
    # Compare whole path components: a plain string-prefix test would accept
    # a sibling such as "<base>_other".
    try:
        inside = os.path.commonpath((base, final)) == base
    except ValueError:
        # Raised when the two paths cannot share a prefix (e.g. different drives).
        inside = False
    if not inside:
        raise HTTPException(status_code=403, detail="Forbidden path")
    return final


# STEP 4: Centralize WORKLOADS_DIR subtree enforcement via one helper.
# MUST be behavior-identical to previous safe_join(WORKLOADS_DIR, ...) calls.
def _files_safe_join(path: str) -> str:
    """Resolve *path* inside WORKLOADS_DIR via ``safe_join``."""
    return safe_join(WORKLOADS_DIR, path)


def _systemd_safe_join(path: str) -> str:
    """Resolve *path* inside WORKLOADS_DIR and require it to be ``systemd/`` or below.

    Raises:
        HTTPException: 403 when the resolved path leaves the systemd subtree.
    """
    full = _files_safe_join(path)
    systemd_root = os.path.join(os.path.abspath(WORKLOADS_DIR), "systemd")
    # Checked on the resolved path so "systemd/../x" cannot slip through a
    # prefix test on the raw string.
    if full != systemd_root and not full.startswith(systemd_root + os.sep):
        raise HTTPException(status_code=403, detail="Forbidden path")
    return full


# --- FILES API ---

@app.get("/files/tree")
def file_tree():
    """Return one entry per directory below WORKLOADS_DIR with sorted dir and file names."""
    root = WORKLOADS_DIR
    result = []
    for dirpath, dirnames, filenames in os.walk(root):
        rel = os.path.relpath(dirpath, root)
        if rel == ".":
            rel = ""
        result.append({
            "path": rel,
            "dirs": sorted(dirnames),
            "files": sorted(filenames),
        })
    return result


@app.get("/files/read")
def file_read(path: str = Query(...)):
    """Return the text content of a file below WORKLOADS_DIR.

    Raises:
        HTTPException: 404 when the path does not exist, 403 when it is a
            directory or resolves outside WORKLOADS_DIR.
    """
    full = _files_safe_join(path)
    if not os.path.exists(full):
        raise HTTPException(status_code=404, detail="Not found")
    if os.path.isdir(full):
        raise HTTPException(status_code=403, detail="Is a directory")
    with open(full, "r") as f:
        content = f.read()
    return {"content": content}


@app.post("/files/save")
def file_save(path: str = Query(...), data: FileContent = None):
    """Write ``data.content`` to *path* below WORKLOADS_DIR, creating parent directories.

    Raises:
        HTTPException: 400 when no body was sent, 403 when the path resolves
            outside WORKLOADS_DIR.
    """
    # The body parameter defaults to None; reject that explicitly instead of
    # failing on ``data.content`` after the file has already been truncated.
    if data is None:
        raise HTTPException(status_code=400, detail="Missing request body")
    full = _files_safe_join(path)
    os.makedirs(os.path.dirname(full), exist_ok=True)
    with open(full, "w") as f:
        f.write(data.content)
    return {"status": "success", "path": path}


@app.delete("/files/delete")
def file_delete(path: str = Query(...)):
    """Delete a single file below WORKLOADS_DIR.

    Raises:
        HTTPException: 404 when the path does not exist, 400 when it is a
            directory or removal fails.
    """
    full = _files_safe_join(path)
    if not os.path.exists(full):
        raise HTTPException(status_code=404, detail="Not found")
    if os.path.isdir(full):
        raise HTTPException(status_code=400, detail="Kan niet verwijderen: is directory")
    try:
        os.remove(full)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Kan niet verwijderen: {e}")
    return {"status": "deleted", "type": "file"}


@app.post("/files/mkdir")
def file_mkdir(path: str = Query(...)):
    """Create a directory under ``systemd/`` and return the effective relative path.

    Raises:
        HTTPException: 403 when the resolved path leaves the systemd subtree.
    """
    # UI expects operations under systemd/; enforce prefix if absent.
    # Match the whole first component so "systemdfoo" is not taken as prefixed.
    if path != "systemd" and not path.startswith("systemd/"):
        path = os.path.join("systemd", path)
    full = _systemd_safe_join(path)
    os.makedirs(full, exist_ok=True)
    return {"status": "directory created", "path": path}


@app.delete("/files/rmdir")
def file_rmdir(path: str = Query(..., description="Directory path under systemd/")):
    """Remove an empty directory strictly below ``systemd/``.

    Raises:
        HTTPException: 400 for the systemd root, a path outside the subtree
            prefix, or a non-directory; 403 when the resolved path leaves the
            subtree; 404 when it does not exist; 409 (with a listing of the
            remaining entries) when removal fails.
    """
    # Only allow deletion under systemd subtree
    if path in ("", "systemd", "systemd/"):
        raise HTTPException(status_code=400, detail="Refusing to delete systemd root")
    if not path.startswith("systemd/"):
        raise HTTPException(status_code=400, detail="Only systemd subtree is allowed")

    full = _systemd_safe_join(path)
    # A path such as "systemd/x/.." passes the string checks above but
    # resolves to the root itself.
    if full == _systemd_safe_join("systemd"):
        raise HTTPException(status_code=400, detail="Refusing to delete systemd root")
    if not os.path.exists(full):
        raise HTTPException(status_code=404, detail="Directory not found")
    if not os.path.isdir(full):
        raise HTTPException(status_code=400, detail="Path is not a directory")

    # directory must be empty
    try:
        Path(full).rmdir()
    except OSError:
        # Treated as "not empty"; build a stable detail payload.
        # NOTE(review): other OSErrors (e.g. permissions) also land here.
        dirs, files = [], []
        try:
            for entry in os.listdir(full):
                if os.path.isdir(os.path.join(full, entry)):
                    dirs.append(entry)
                else:
                    files.append(entry)
        except Exception:
            dirs, files = [], []
        raise HTTPException(status_code=409, detail={
            "error": "directory not empty",
            "dirs": sorted(dirs),
            "files": sorted(files),
        })
    return {"deleted": True, "path": path}


# --- PODS / CONTAINERS ---

@app.get("/pods")
def list_pods():
    """Return Podman's pod list."""
    # Crucial: ?all=true makes sure exited pods are listed too.
    url = f"{PODMAN_API_BASE}/libpod/pods/json?all=true"
    return _podman_get_json(url)


@app.post("/actions/{action}/{name}")
def take_action(action: str, name: str):
    """Legacy start/stop endpoint that tries several pod-name spellings.

    ``start`` first tries a direct pod start; when that fails it replays
    ``<name>.yaml`` / ``<name>.yml`` from WORKLOADS_DIR through ``kube/play``.
    ``stop`` stops the first spelling Podman accepts. Any other action
    returns ``{"status": "unknown"}``.
    """
    # Legacy endpoint (keep behavior)
    possible_names = [name, f"pod{name}", f"pod-{name}"]

    if action == "start":
        # STEP 1: try to start the pod directly (the 'Cockpit' way).
        for target in possible_names:
            res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/start")
            if res.status_code in (200, 204):
                return {"status": "started", "target": target, "method": "direct"}

        # STEP 2: direct start failed, so try to deploy the YAML again.
        target_path = None
        for ext in (".yaml", ".yml"):
            cand = os.path.join(WORKLOADS_DIR, f"{name}{ext}")
            if os.path.exists(cand):
                target_path = cand
                break

        if target_path:
            with open(target_path, 'r') as file:
                yaml_content = file.read()
            play_url = f"{PODMAN_API_BASE}/libpod/kube/play"
            res = _podman_post(play_url, data=yaml_content)

            # SPECIAL CASE: the pod already exists, so force-remove and replay.
            if res.status_code == 500 and "already exists" in res.text:
                print(f"DEBUG: Forceer herstart voor {name} wegens conflict")
                for target in possible_names:
                    _podman_delete(f"{PODMAN_API_BASE}/libpod/pods/{target}?force=true")
                # Try again now that the conflicting pod is gone.
                retry_res = _podman_post(play_url, data=yaml_content)
                return retry_res.json()
            return res.json()

        return {"status": "unknown", "method": "no_yaml_found"}

    if action == "stop":
        for target in possible_names:
            res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/stop")
            if res.status_code in (200, 204):
                return {"status": "stopped", "target": target}
        return {"status": "not found"}

    return {"status": "unknown"}


# --- DASHBOARD HELPERS (contract-neutral, no ordering/sorting changes) ---

def _build_pod_to_containers_map(containers: list):
    """Group first container names by ``PodName``; containers without a pod are skipped."""
    # preserves original order of containers processing; no sorting added
    pod_to_containers = {}
    for c in containers:
        pod_name = c.get("PodName") or ""
        if pod_name:
            first_name = (c.get("Names") or ["?"])[0]
            pod_to_containers.setdefault(pod_name, []).append(first_name)
    return pod_to_containers


def _map_pod_to_unit(podname: str) -> str | None:
    """Map a pod name to a systemd unit name.

    HOTFIX 3.1 FIX 1: a leading "pod" is stripped before appending
    ".service" (e.g. podmediaserver -> mediaserver.service); any other name
    just gets ".service" appended. An empty name yields None.
    """
    if not podname:
        return None
    if podname.startswith("pod"):
        return f"{podname[3:]}.service"
    return f"{podname}.service"


def _append_podman_pods_dashboard_rows(dashboard: list, api_pods: list, pod_to_containers: dict):
    """Append one "podman"-sourced row per entry of *api_pods* to *dashboard*."""
    # preserves original api_pods iteration order
    for p in api_pods:
        name = p.get("Name")
        dashboard.append({
            "Name": name,
            "Status": p.get("Status", "unknown"),
            "Containers": pod_to_containers.get(name, []),
            "Unit": _map_pod_to_unit(name) if name else "",
            "Source": "podman",
        })


def _append_defined_pods_dashboard_rows(dashboard: list, by_name: dict, root_dir: str):
    """Append a "systemd"-sourced row for each YAML file whose pod is not in *by_name*.

    The pod name is "pod" + the file's base name; the status comes from
    ``systemctl --user is-active`` on the mapped unit.
    """
    # preserves original os.walk order and file iteration order
    for _root, _, files in os.walk(root_dir):
        for f in files:
            if not f.endswith((".yaml", ".yml")):
                continue
            base = os.path.splitext(os.path.basename(f))[0]
            pod_name = f"pod{base}"
            if pod_name in by_name:
                continue
            unit_name = _map_pod_to_unit(pod_name)
            code, out = _systemctl(["systemctl", "--user", "is-active", unit_name])
            # Fall back to the exit code when systemctl printed nothing.
            status = (out or "").strip() or ("active" if code == 0 else "inactive")
            dashboard.append({
                "Name": pod_name,
                "Status": status,
                "Containers": [],
                "Unit": unit_name,
                "Source": "systemd",
            })


def _ensure_container_status_field(container: dict):
    """Add a ``Status`` key copied from ``State`` (or "") when it is absent."""
    # keep exact existing defaulting behavior
    if "Status" not in container:
        container["Status"] = container.get("State", "")


def _make_defined_container_dashboard_row(name: str, relpath: str):
    """Build a placeholder dashboard row for a container known only from its definition file."""
    # keep exact key set and default values as before
    return {
        "Names": [name],
        "Image": "",
        "State": "",
        "Status": "",
        "Ports": [],
        "PodName": "",
        "_dashboard_source": "systemd",
        "_dashboard_unit": f"{name}.service",
        "_dashboard_def_path": relpath,
    }


def _legacy_dashboard_item_from_container(c: dict):
    """Convert a Podman container entry into the legacy dashboard item shape."""
    # Keep exact keys & defaults as before
    return {
        "name": (c.get("Names") or ["?"])[0],
        "status": c.get("Status") or c.get("State") or "",
        "path": "",
        "ip": "",
        "containers": [],
    }


@app.get("/pods-dashboard")
def pods_dashboard():
    """Return pods reported by Podman followed by pods defined only as YAML workloads."""
    dashboard = []

    # 0) Build mapping: pod_name -> [container_names...]
    containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    pod_to_containers = _build_pod_to_containers_map(containers)

    # 1) A) real pods
    api_pods = _podman_get_json(f"{PODMAN_API_BASE}/libpod/pods/json?all=true")
    by_name = {p.get("Name"): p for p in api_pods}
    _append_podman_pods_dashboard_rows(dashboard, api_pods, pod_to_containers)

    # 1) B) defined pods via workloads scan
    # Based on YAML files in WORKLOADS_DIR; show even if not running.
    _append_defined_pods_dashboard_rows(dashboard, by_name, WORKLOADS_DIR)

    return dashboard


def _systemd_then_podman(systemd_callable, podman_callable):
    """Run the systemd attempt and fall back to Podman unless it reported exit code 0.

    *podman_callable* receives the systemd result (possibly None) so it can
    include it in its own reply.
    """
    systemd_res = systemd_callable()
    if isinstance(systemd_res, dict) and systemd_res.get("exit", 1) == 0:
        return systemd_res
    return podman_callable(systemd_res)


def try_systemd_pod_action(action: str, podname: str):
    """Run ``systemctl --user <action>`` on the unit mapped from *podname*.

    Returns:
        A dict describing the command, its exit code and output, or None
        when *podname* maps to no unit.
    """
    # If systemd unit exists/allowed, prefer it.
    unit = _map_pod_to_unit(podname)
    if not unit:
        return None
    code, out = _systemctl(["systemctl", "--user", action, unit])
    return {
        "method": "systemd",
        "pod": podname,
        "unit": unit,
        "cmd": f"systemctl --user {action} {unit}",
        "exit": code,
        "output": out,
    }


@app.post("/pods/actions/{action}/{podname}")
def pod_action_prefer_systemd(action: str, podname: str):
    """Start, stop or restart a pod, preferring systemd and falling back to Podman.

    Raises:
        HTTPException: 400 for any other action.
    """
    if action not in ("start", "stop", "restart"):
        # FastAPI serializes a Flask-style ``(body, status)`` tuple as a JSON
        # array with status 200, so the error must be raised.
        raise HTTPException(status_code=400, detail="Invalid action")

    def _systemd_call():
        return try_systemd_pod_action(action, podname)

    def _podman_call(systemd_res):
        podman = _podman_action_post("pods", podname, action).json()
        if systemd_res:
            return {
                "method": "systemd_then_podman",
                "note": "systemd failed; falling back to podman",
                "systemd": systemd_res,
                "podman": podman,
            }
        return {"method": "podman", "result": podman}

    return _systemd_then_podman(_systemd_call, _podman_call)


def find_defined_containers():
    """Map container name -> definition path (relative to WORKLOADS_DIR) for ``systemd/**/*.container``."""
    defined = {}
    for root, _, files in os.walk(os.path.join(WORKLOADS_DIR, "systemd")):
        for f in files:
            if f.endswith(".container"):
                name = os.path.splitext(f)[0]
                full = os.path.join(root, f)
                defined[name] = os.path.relpath(full, WORKLOADS_DIR)
    return defined


@app.get("/containers-dashboard")
def containers_dashboard():
    """Return Podman's containers followed by defined-but-absent ``.container`` units."""
    dashboard = []

    # A) real containers (UNCHANGED)
    real = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    for c in real:
        _ensure_container_status_field(c)
        c["_dashboard_source"] = "podman"
        dashboard.append(c)

    # B) Dedup set (HOTFIX 3.3) — exact extraction, no sorting
    runtime_names = {(c.get("Names") or ["?"])[0] for c in real}

    # C) defined containers from systemd/*.container (skip duplicates)
    for name, relpath in find_defined_containers().items():
        if name in runtime_names:
            continue
        row = _make_defined_container_dashboard_row(name, relpath)
        # fill Status from systemd is-active (existing hotfix 3.1 behavior)
        _, out = _systemctl(["systemctl", "--user", "is-active", f"{name}.service"])
        row["Status"] = (out or "").strip()
        dashboard.append(row)

    return dashboard


@app.get("/containers")
def list_containers():
    """Return Podman's container list."""
    # ?all=true here as well, so stopped containers are included.
    url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true"
    return _podman_get_json(url)


@app.post("/containers/{action}/{name}")
def container_action(action: str, name: str):
    """Start, stop or restart a container, preferring its systemd unit when one is defined.

    Raises:
        HTTPException: 400 for any other action; Podman's own status code
            (>= 400) with a detail dict when the Podman fallback fails.
    """
    if action not in ("start", "stop", "restart"):
        raise HTTPException(status_code=400, detail="Invalid action")

    defined = find_defined_containers()

    def _systemd_reply(code, out):
        return {
            "method": "systemd",
            "name": name,
            "unit": f"{name}.service",
            "definition": defined[name],
            "cmd": f"systemctl --user {action} {name}",
            "exit": code,
            "output": out,
        }

    def _systemd_call():
        if name not in defined:
            return None
        code, out = _systemctl(["systemctl", "--user", action, name])
        if code == 0:
            return _systemd_reply(code, out)
        # A failed attempt is reported in reduced form so the Podman
        # fallback can still see the exit code and output.
        return {"exit": code, "output": out}

    def _podman_call(systemd_res):
        res = _podman_action_post("containers", name, action)
        podman_reply = {
            "method": "podman",
            "name": name,
            "cmd": f"podman {action} {name}",
            "status_code": res.status_code,
        }
        if res.status_code in (200, 204):
            return podman_reply
        if res.status_code >= 400:
            podman_reply["error"] = getattr(res, "text", "") or ""
            # Raise so the client really receives Podman's status code.
            raise HTTPException(status_code=res.status_code, detail=podman_reply)
        # Any other status code: report the failed systemd attempt when
        # there was one.
        if systemd_res is not None:
            return _systemd_reply(systemd_res["exit"], systemd_res["output"])
        return podman_reply

    return _systemd_then_podman(_systemd_call, _podman_call)


@app.get("/debug/defined-containers")
def debug_defined_containers():
    """Expose the result of ``find_defined_containers`` for debugging."""
    return find_defined_containers()


@app.get("/dashboard")
def get_dashboard():
    """Legacy dashboard: one item per Podman container, or an empty list if the query fails."""
    # Legacy dashboard view (keep shape)
    try:
        api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    except Exception:
        # Best effort by design; a bare ``except`` would also swallow
        # KeyboardInterrupt and SystemExit.
        api_containers = []
    return [_legacy_dashboard_item_from_container(c) for c in api_containers]


@app.get("/test-hybrid")
def test_hybrid():
    """Diagnostic endpoint: report files found in WORKLOADS_DIR and a sample of the Podman API."""
    # 1. Check filesystem
    try:
        bestanden = []
        for root, _, files in os.walk(WORKLOADS_DIR):
            for f in files:
                bestanden.append(os.path.join(root, f))
    except Exception as e:
        bestanden = f"FS Fout: {str(e)}"

    # 2. Check Podman API
    try:
        api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    except Exception as e:
        api_containers = f"API Fout: {str(e)}"

    have_list = isinstance(api_containers, list)
    return {
        "bestanden_gevonden": bestanden if isinstance(bestanden, list) else [],
        "api_containers_aantal": len(api_containers) if have_list else -1,
        "api_raw_sample": api_containers[0] if have_list and api_containers else api_containers,
    }


@app.get("/containers/logs/{name}")
def get_container_logs(name: str):
    """Return the last 100 stdout/stderr log lines of a container as text."""
    # We request the last 100 lines (tail=100).
    txt = _podman_get_text(
        f"{PODMAN_API_BASE}/libpod/containers/{name}/logs?stdout=true&stderr=true&tail=100"
    )
    # Podman logs often carry some binary metadata; it is returned decoded as text.
    return {"logs": txt}


@app.get("/containers/inspect/{name}")
def inspect_container(name: str):
    """Return Podman's inspect JSON for one container."""
    return _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/{name}/json")


# --- SYSTEMD allowlist ---

def read_allowlist():
    """Return the sorted, de-duplicated ``*.service`` entries of ALLOWLIST_FILE ([] if absent)."""
    units = []
    if os.path.exists(ALLOWLIST_FILE):
        with open(ALLOWLIST_FILE, "r") as f:
            for line in f:
                u = line.strip()
                if u and u.endswith(".service"):
                    units.append(u)
    return sorted(set(units))


def list_unit_files():
    """Return sorted service unit names from ``systemctl --user list-unit-files`` ([] on failure)."""
    # fallback (when the allowlist is empty): try systemctl list-unit-files
    code, out = _systemctl(
        ["systemctl", "--user", "list-unit-files", "--type=service", "--no-pager"]
    )
    if code != 0:
        return []
    units = []
    for line in out.splitlines():
        parts = line.split()
        if parts and parts[0].endswith(".service"):
            units.append(parts[0])
    return sorted(set(units))


def unit_state(unit):
    """Return ``(active, enabled)`` for *unit*, each "unknown" when it cannot be determined."""
    # active state
    _, active = _systemctl(["systemctl", "--user", "is-active", unit])
    active = active.splitlines()[0].strip() if active else "unknown"
    # enabled state (may fail in a container context)
    code, enabled_out = _systemctl(["systemctl", "--user", "is-enabled", unit])
    enabled = enabled_out.splitlines()[0].strip() if (enabled_out and code == 0) else "unknown"
    return active, enabled


@app.get("/systemd/allowlist")
def systemd_allowlist():
    """Return the allowlisted units, or all service unit files when the allowlist is empty."""
    units = read_allowlist()
    allow_mode = len(units) > 0
    if not units:
        units = list_unit_files()
    return {"allow_mode": allow_mode, "units": units}


@app.post("/daemon-reload")
def api_daemon_reload():
    """Run ``systemctl --user daemon-reload`` and report exit code and output."""
    try:
        code, out = _systemctl(["systemctl", "--user", "daemon-reload"])
        return {
            "cmd": "systemctl --user daemon-reload",
            "exit": code,
            "output": out,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/{action}/{unit}")
def api_action(action: str, unit: str):
    """Run ``systemctl --user <action> <unit>`` subject to the allowlist.

    Raises:
        HTTPException: 400 for an unsupported action, 403 when an allowlist
            is configured and *unit* is not on it.
    """
    if action not in ("status", "start", "stop", "restart"):
        raise HTTPException(status_code=400, detail="Invalid action")

    units = read_allowlist()
    allow_mode = len(units) > 0
    if allow_mode and unit not in units:
        raise HTTPException(status_code=403, detail="Unit not allowed by allowlist")

    cmd = ["systemctl", "--user", action, unit]
    code, out = _run_systemctl_action(action, unit)
    return {"cmd": " ".join(cmd), "exit": code, "output": out}


# NOTE(review): the route below has no placeholders, so FastAPI reads
# ``action`` and ``unit`` from the query string. It looks like a
# "/api/<action>/<unit>" path whose placeholders were lost — confirm.
@app.post("/api//")
def legacy_api_action(action: str, unit: str):
    """Legacy alias of ``api_action`` with the same validation and allowlist check."""
    # legacy flask-like path (even if not used by index.html). Delegating
    # keeps it from bypassing the allowlist and from returning a Flask-style
    # tuple that FastAPI would send as a 200.
    return api_action(action, unit)


def run(cmd):
    """Run *cmd* (an argv list) without a shell.

    Returns:
        ``(returncode, output)`` where output is stdout followed by stderr,
        stripped; ``(1, str(error))`` when the process could not be run.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
        output = (result.stdout or "") + (result.stderr or "")
        return result.returncode, output.strip()
    except Exception as e:
        return 1, str(e)


# ENDPOINT ADDED AFTER CHATGPT
@app.get("/containers/stats/stream")
async def containers_stats_stream(interval: float = 2.0):
    """
    SSE stream with periodic container stats.
    Contract-neutral: new endpoint, no existing outputs changed.

    *interval* is the pause between samples in seconds, clamped to 0.5..30.
    """
    import time  # local import: only this endpoint needs it

    # Guardrails against abuse
    if interval < 0.5:
        interval = 0.5
    if interval > 30:
        interval = 30

    stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false"

    def _fetch_stats():
        # timeout so a stalling podman socket does not freeze the stream
        return SESSION.get(stats_url, timeout=5).json()

    async def event_gen():
        try:
            while True:
                try:
                    # The HTTP client is blocking; run it in a worker thread
                    # so the event loop keeps serving other requests.
                    data = await asyncio.to_thread(_fetch_stats)
                except Exception as e:
                    data = {"Error": str(e), "Stats": []}

                payload = {
                    "ts": int(time.time()),
                    "data": data,
                }
                yield "event: stats\n"
                yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n"
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            # Client disconnected; end the stream quietly.
            return

    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # helps with proxies
    }
    return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)