import os import sys import subprocess from app_images import init_images_router from app_files import init_files_router from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel import requests_unixsocket import uvicorn import asyncio import json from pathlib import Path from fastapi.responses import StreamingResponse app = FastAPI(title="Podman MVP Control Plane", root_path="/api") SESSION = requests_unixsocket.Session() PODMAN_API_BASE = "http+unix://%2Frun%2Fuser%2F1000%2Fpodman%2Fpodman.sock/v5.4.2" BASE_DIR = os.path.dirname(os.path.abspath(__file__)) WORKLOADS_DIR = "/app/workloads" # --- STATS CACHE (contract-neutral; in-memory) --- # Poll Podman stats centrally and expose as optional dashboard fields. _STATS_CACHE_BY_NAME = {} # name -> {"cpu": float|None, "mem_usage": float|None, "mem_perc": float|None} _STATS_CACHE_TS = None _STATS_POLLER_TASK = None def _norm_container_name(name) -> str: try: return str(name or "").lstrip("/") except Exception: return "" def _parse_stats_interval_seconds() -> float: raw = os.getenv("STATS_INTERVAL_SECONDS", "1.0") try: v = float(raw) except Exception: v = 1.0 if v <= 0: v = 1.0 if v < 0.5: v = 0.5 if v > 30: v = 30 return v async def _stats_poller_loop(): global _STATS_CACHE_BY_NAME, _STATS_CACHE_TS interval = _parse_stats_interval_seconds() stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false" def _to_float(x): try: return float(x) except Exception: return None while True: try: data = SESSION.get(stats_url, timeout=5).json() stats_list = data.get("Stats") if isinstance(data, dict) else None if not isinstance(stats_list, list): stats_list = [] new_cache = {} for st in stats_list: if not isinstance(st, dict): continue key = _norm_container_name(st.get("Name")) if not key: continue # CPUPerc returned by Podman is already percentage (0.10 == 0.10%) cpu_val = st.get("CPUPerc") if cpu_val is None: cpu_val = st.get("CPU") if cpu_val is None: cpu_val = st.get("AvgCPU") new_cache[key] = { "cpu": _to_float(cpu_val), "mem_usage": _to_float(st.get("MemUsage")), "mem_perc": _to_float(st.get("MemPerc")), } _STATS_CACHE_BY_NAME = new_cache _STATS_CACHE_TS = int(__import__("time").time()) except Exception: # Keep last good cache; try again next tick. pass await asyncio.sleep(interval) @app.on_event("startup") async def _startup_stats_poller(): global _STATS_POLLER_TASK if _STATS_POLLER_TASK and not _STATS_POLLER_TASK.done(): return _STATS_POLLER_TASK = asyncio.create_task(_stats_poller_loop()) # --- ROUTERS --- # Images API lives in a dedicated module to keep this file from growing further. app.include_router(init_images_router(SESSION, PODMAN_API_BASE)) app.include_router(init_files_router(SESSION, PODMAN_API_BASE, WORKLOADS_DIR)) # --- ADAPTERS (contract-neutral helpers) --- # Centralize Podman socket and systemctl invocation. # MUST NOT change endpoint outputs, status codes, or side-effects. def _podman_get_json(url: str): return SESSION.get(url).json() def _podman_get_text(url: str) -> str: return SESSION.get(url).text def _podman_post(url: str, **kwargs): return SESSION.post(url, **kwargs) def _podman_action_post(kind: str, name: str, action: str): if kind == "pods": url = f"{PODMAN_API_BASE}/libpod/pods/{name}/{action}" else: url = f"{PODMAN_API_BASE}/libpod/containers/{name}/{action}" return _podman_post(url) def _podman_delete(url: str): return SESSION.delete(url) def _systemctl(cmd): # Proxy to existing run() to avoid behavioral changes. return run(cmd) def _run_systemctl_action(action: str, unit: str): cmd = ["systemctl", "--user", action, unit] return _systemctl(cmd) @app.get("/health") def health(): podman_ok = False try: r = SESSION.get(f"{PODMAN_API_BASE}/libpod/info", timeout=2) if r.status_code == 200: try: r.json() podman_ok = True except Exception: podman_ok = False except Exception: podman_ok = False systemd_reachable = False try: res = subprocess.run( ["systemctl", "--user", "list-units", "--no-pager", "--no-legend"], capture_output=True, text=True, check=False, timeout=2, ) systemd_reachable = (res.returncode == 0) except Exception: systemd_reachable = False ok = podman_ok and systemd_reachable return {"ok": ok, "podman": {"ok": podman_ok}, "systemd_user": {"reachable": systemd_reachable}} # --- PODS / CONTAINERS --- @app.get("/pods") def list_pods(): # Cruciaal: ?all=true zorgt dat EXIT_STATE pods ook getoond worden url = f"{PODMAN_API_BASE}/libpod/pods/json?all=true" return _podman_get_json(url) @app.post("/actions/{action}/{name}") def take_action(action: str, name: str): # Legacy endpoint (keep behavior) possible_names = [name, f"pod{name}", f"pod-{name}"] if action == "start": # STAP 1: Probeer direct de pod te starten (de 'Cockpit' methode) for target in possible_names: res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/start") if res.status_code in (200, 204): return {"status": "started", "target": target, "method": "direct"} # STAP 2: Als direct starten faalt, probeer dan YAML opnieuw te deployen target_path = None for ext in (".yaml", ".yml"): cand = os.path.join(WORKLOADS_DIR, f"{name}{ext}") if os.path.exists(cand): target_path = cand break if target_path: with open(target_path, 'r') as file: yaml_content = file.read() res = _podman_post(f"{PODMAN_API_BASE}/libpod/kube/play", data=yaml_content) # SPECIALE CASE: Pod bestaat al, forceer dan restart if res.status_code == 500 and "already exists" in res.text: print(f"DEBUG: Forceer herstart voor {name} wegens conflict") for target in possible_names: _podman_delete(f"{PODMAN_API_BASE}/libpod/pods/{target}?force=true") # Probeer het nu opnieuw retry_res = _podman_post(f"{PODMAN_API_BASE}/libpod/kube/play", data=yaml_content) return retry_res.json() return res.json() return {"status": "unknown", "method": "no_yaml_found"} if action == "stop": for target in possible_names: res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/stop") if res.status_code in (200, 204): return {"status": "stopped", "target": target} return {"status": "not found"} return {"status": "unknown"} # --- DASHBOARD HELPERS (contract-neutral, no ordering/sorting changes) --- def _build_pod_to_containers_map(containers: list): # preserves original order of containers processing; no sorting added pod_to_containers = {} for c in containers: pod_name = c.get("PodName") or "" if pod_name: pod_to_containers.setdefault(pod_name, []).append((c.get("Names") or ["?"])[0]) return pod_to_containers def _map_pod_to_unit(podname: str) -> str | None: """ HOTFIX 3.1 FIX 1: If podname starts with "pod", map to .service (e.g. podmediaserver -> mediaserver.service) Else: .service """ if not podname: return None if podname.startswith("pod"): return f"{podname[3:]}.service" return f"{podname}.service" def _append_podman_pods_dashboard_rows(dashboard: list, api_pods: list, pod_to_containers: dict): # preserves original api_pods iteration order for p in api_pods: name = p.get("Name") status = p.get("Status", "unknown") unit = _map_pod_to_unit(name) if name else "" dashboard.append({ "Name": name, "Status": status, "Containers": pod_to_containers.get(name, []), "Unit": unit, "Source": "podman", }) def _append_defined_pods_dashboard_rows(dashboard: list, by_name: dict, root_dir: str): # preserves original os.walk order and file iteration order for root, _, files in os.walk(root_dir): for f in files: if f.endswith((".yaml", ".yml")): base = os.path.splitext(os.path.basename(f))[0] pod_name = f"pod{base}" unit_name = _map_pod_to_unit(pod_name) if pod_name not in by_name: code, out = _systemctl(["systemctl", "--user", "is-active", unit_name]) status = (out or "").strip() or ("active" if code == 0 else "inactive") dashboard.append({ "Name": pod_name, "Status": status, "Containers": [], "Unit": unit_name, "Source": "systemd", }) def _ensure_container_status_field(container: dict): # keep exact existing defaulting behavior if "Status" not in container: container["Status"] = container.get("State", "") def _make_defined_container_dashboard_row(name: str, relpath: str): # keep exact key set and default values as before return { "Names": [name], "Image": "", "State": "", "Status": "", "Ports": [], "PodName": "", "_dashboard_source": "systemd", "_dashboard_unit": f"{name}.service", "_dashboard_def_path": relpath, "_dashboard_cpu": None, "_dashboard_mem_usage": None, "_dashboard_mem_perc": None, } def _legacy_dashboard_item_from_container(c: dict): # Keep exact keys & defaults as before return { "name": (c.get("Names") or ["?"])[0], "status": c.get("Status") or c.get("State") or "", "path": "", "ip": "", "containers": [], } @app.get("/pods-dashboard") def pods_dashboard(): dashboard = [] # 0) Bouw mapping: pod_name -> [container_names...] containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") pod_to_containers = _build_pod_to_containers_map(containers) # 1) A) echte pods api_pods = _podman_get_json(f"{PODMAN_API_BASE}/libpod/pods/json?all=true") by_name = {p.get("Name"): p for p in api_pods} _append_podman_pods_dashboard_rows(dashboard, api_pods, pod_to_containers) # 1) B) defined pods via workloads scan # Based on YAML files in WORKLOADS_DIR; show even if not running. _append_defined_pods_dashboard_rows(dashboard, by_name, WORKLOADS_DIR) return dashboard def _systemd_then_podman(systemd_callable, podman_callable): systemd_res = systemd_callable() if systemd_res is not None: if isinstance(systemd_res, dict) and systemd_res.get("exit", 1) == 0: return systemd_res return podman_callable(systemd_res) return podman_callable(None) def try_systemd_pod_action(action: str, podname: str): # If systemd unit exists/allowed, prefer it. unit = _map_pod_to_unit(podname) if not unit: return None code, out = _systemctl(["systemctl", "--user", action, unit]) return { "method": "systemd", "pod": podname, "unit": unit, "cmd": f"systemctl --user {action} {unit}", "exit": code, "output": out, } @app.post("/pods/actions/{action}/{podname}") def pod_action_prefer_systemd(action: str, podname: str): if action not in ("start", "stop", "restart"): return {"error": "Invalid action"}, 400 def _systemd_call(): return try_systemd_pod_action(action, podname) def _podman_call(systemd_res): if systemd_res: note = "systemd failed; falling back to podman" podman = _podman_action_post("pods", podname, action).json() return {"method": "systemd_then_podman", "note": note, "systemd": systemd_res, "podman": podman} return {"method": "podman", "result": _podman_action_post("pods", podname, action).json()} return _systemd_then_podman(_systemd_call, _podman_call) def find_defined_containers(): defined = {} for root, _, files in os.walk(os.path.join(WORKLOADS_DIR, "systemd")): for f in files: if f.endswith(".container"): name = os.path.splitext(f)[0] full = os.path.join(root, f) rel = os.path.relpath(full, WORKLOADS_DIR) defined[name] = rel return defined def _extract_published_ports(container: dict) -> list[str]: """ Normalize Podman API Ports into a stable display list: - "127.0.0.1:8080:8000/tcp" - "8080:8000/tcp" (if no host ip) """ out: list[str] = [] for p in (container.get("Ports") or []): host_ip = p.get("host_ip") or p.get("HostIp") or "" host_port = p.get("host_port") or p.get("HostPort") cont_port = p.get("container_port") or p.get("ContainerPort") proto = p.get("protocol") or p.get("Protocol") or "" if host_port is None or cont_port is None: continue s = "" if host_ip: s += f"{host_ip}:" s += f"{host_port}:{cont_port}" if proto: s += f"/{proto}" out.append(s) return out @app.get("/containers-dashboard") def containers_dashboard(): dashboard = [] defined = find_defined_containers() # Cache zodat we niet voor elke container opnieuw systemctl doen unit_active_cache = {} stats_by_name = _STATS_CACHE_BY_NAME def _unit_is_active(unit): if not unit: return False if unit in unit_active_cache: return unit_active_cache[unit] code, out = _systemctl(["systemctl", "--user", "is-active", unit]) ok = (code == 0) or ((out or "").strip() == "active") unit_active_cache[unit] = ok return ok # A) echte containers (runtime) real = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") for c in real: _ensure_container_status_field(c) # Published ports: behoud jouw hotfix c["_dashboard_published_ports"] = _extract_published_ports(c) # Normaliseer naam: Podman kan "/name" geven rname = ((c.get("Names") or ["?"])[0] or "").lstrip("/") # Optional live stats (always present; null on miss) c["_dashboard_cpu"] = None c["_dashboard_mem_usage"] = None c["_dashboard_mem_perc"] = None st = stats_by_name.get(rname) if isinstance(st, dict): c["_dashboard_cpu"] = st.get("cpu") c["_dashboard_mem_usage"] = st.get("mem_usage") c["_dashboard_mem_perc"] = st.get("mem_perc") # 1) Managed: systemd als er een .container definitie bestaat if rname in defined: c["_dashboard_source"] = "systemd" c["_dashboard_unit"] = f"{rname}.service" c["_dashboard_def_path"] = defined[rname] else: # 2) Extra: zit container in een pod die via systemd (kube/quadlet) draait? podname = (c.get("PodName") or "").strip() pod_unit = _map_pod_to_unit(podname) if podname else None if pod_unit and _unit_is_active(pod_unit): c["_dashboard_source"] = "systemd" c["_dashboard_unit"] = pod_unit # geen _dashboard_def_path, want dit is geen .container definitie else: c["_dashboard_source"] = "podman" dashboard.append(c) # B) Dedup set: ook genormaliseerd (voorkomt /name vs name doublures) runtime_names = set((((c.get("Names") or ["?"])[0] or "").lstrip("/")) for c in real) # C) defined containers from systemd/*.container (skip duplicates) for name, relpath in defined.items(): if name in runtime_names: continue row = _make_defined_container_dashboard_row(name, relpath) code, out = _systemctl(["systemctl", "--user", "is-active", f"{name}.service"]) row["Status"] = (out or "").strip() dashboard.append(row) return dashboard @app.get("/containers") def list_containers(): # Ook hier ?all=true voor gestopte containers url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true" return _podman_get_json(url) @app.post("/containers/{action}/{name}") def container_action(action: str, name: str): if action not in ("start", "stop", "restart"): return {"error": "Invalid action"}, 400 defined = find_defined_containers() _sys = {"code": None, "out": None} def _systemd_call(): if name in defined: code, out = _systemctl(["systemctl", "--user", action, name]) _sys["code"] = code _sys["out"] = out if code == 0: return { "method": "systemd", "name": name, "unit": f"{name}.service", "definition": defined[name], "cmd": f"systemctl --user {action} {name}", "exit": code, "output": out, } return {"exit": code, "output": out} return None def _podman_call(systemd_res): res = _podman_action_post("containers", name, action) if res.status_code in (200, 204): return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code} if res.status_code >= 400: return { "method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code, "error": getattr(res, "text", "") or "", }, res.status_code if name in defined: return { "method": "systemd", "name": name, "unit": f"{name}.service", "definition": defined[name], "cmd": f"systemctl --user {action} {name}", "exit": _sys["code"], "output": _sys["out"], } return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code} return _systemd_then_podman(_systemd_call, _podman_call) @app.get("/debug/defined-containers") def debug_defined_containers(): return find_defined_containers() @app.get("/dashboard") def get_dashboard(): # Legacy dashboard view (keep shape) try: api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") except: api_containers = [] items = [] for c in api_containers: items.append(_legacy_dashboard_item_from_container(c)) return items @app.get("/test-hybrid") def test_hybrid(): # 1. Check filesystem try: bestanden = [] for root, _, files in os.walk(WORKLOADS_DIR): for f in files: bestanden.append(os.path.join(root, f)) except Exception as e: bestanden = f"FS Fout: {str(e)}" # 2. Check Podman API try: api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true") except Exception as e: api_containers = f"API Fout: {str(e)}" return { "bestanden_gevonden": bestanden if isinstance(bestanden, list) else [], "api_containers_aantal": len(api_containers) if isinstance(api_containers, list) else -1, "api_raw_sample": api_containers[0] if isinstance(api_containers, list) and api_containers else api_containers, } @app.get("/containers/logs/{name}") def get_container_logs(name: str): # We vragen de laatste 100 regels op (tail=100) txt = _podman_get_text(f"{PODMAN_API_BASE}/libpod/containers/{name}/logs?stdout=true&stderr=true&tail=100") # Podman logs komen vaak met wat binaire metadata, we decoden dit als tekst return {"logs": txt} @app.get("/containers/inspect/{name}") def inspect_container(name: str): return _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/{name}/json") @app.post("/daemon-reload") def api_daemon_reload(): try: code, out = _systemctl(["systemctl", "--user", "daemon-reload"]) return { "cmd": "systemctl --user daemon-reload", "exit": code, "output": out, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/{action}/{unit}") def api_action(action: str, unit: str): if action not in ("status", "start", "stop", "restart"): raise HTTPException(status_code=400, detail="Invalid action") cmd = ["systemctl", "--user", action, unit] code, out = _run_systemctl_action(action, unit) return {"cmd": " ".join(cmd), "exit": code, "output": out} @app.post("/api//") def legacy_api_action(action: str, unit: str): # legacy flask-like path; keep behavior (even if not used by index.html) if action not in ("status", "start", "stop", "restart"): return {"error": "Invalid action"}, 400 cmd = ["systemctl", "--user", action, unit] code, out = _run_systemctl_action(action, unit) return {"cmd": " ".join(cmd), "exit": code, "output": out} def run(cmd): try: result = subprocess.run(cmd, capture_output=True, text=True, check=False) output = (result.stdout or "") + (result.stderr or "") return result.returncode, output.strip() except Exception as e: return 1, str(e) # ENDPOINT TOEGEVOEGD NA CHATGPT @app.get("/containers/stats/stream") async def containers_stats_stream(interval: float = 2.0): """ SSE stream met periodieke container stats. Contract-neutraal: nieuw endpoint, geen bestaande outputs aangepast. """ # Guardrails tegen misbruik if interval < 0.5: interval = 0.5 if interval > 30: interval = 30 stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false" async def event_gen(): try: while True: # timeout zodat een haperende podman socket niet je stream “bevriest” try: data = SESSION.get(stats_url, timeout=5).json() except Exception as e: data = {"Error": str(e), "Stats": []} payload = { "ts": int(__import__("time").time()), "data": data, } yield "event: stats\n" yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n" await asyncio.sleep(interval) except asyncio.CancelledError: return headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # helpt bij proxies } return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers) # --- PODMAN NETWORKS (nieuw) --- def _podman_get_json_checked(url: str): r = SESSION.get(url) if r.status_code >= 400: raise HTTPException(status_code=502, detail=f"Podman API fout {r.status_code}: {r.text}") try: return r.json() except Exception: raise HTTPException(status_code=502, detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}") def _deep_get(d, path, default=None): cur = d for key in path: if not isinstance(cur, dict) or key not in cur: return default cur = cur[key] return cur @app.get("/networks") def list_networks(): # Libpod: /libpod/networks/json url = f"{PODMAN_API_BASE}/libpod/networks/json" return {"networks": _podman_get_json_checked(url)} @app.get("/networks/meta") def networks_meta(): candidates = [ f"{PODMAN_API_BASE}/libpod/info", f"{PODMAN_API_BASE}/libpod/info/json", f"{PODMAN_API_BASE}/info", f"{PODMAN_API_BASE}/info/json", f"{PODMAN_API_BASE}/libpod/system/info", f"{PODMAN_API_BASE}/libpod/system/info/json", ] last_err = None info = None used = None for url in candidates: r = SESSION.get(url) if r.status_code == 200: used = url try: info = r.json() except Exception: raise HTTPException(status_code=502, detail=f"Podman info endpoint gaf geen JSON terug: {url}") break last_err = f"{r.status_code}: {r.text}" if info is None: raise HTTPException(status_code=502, detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}") network_backend = ( _deep_get(info, ["host", "networkBackend"]) or _deep_get(info, ["Host", "NetworkBackend"]) or _deep_get(info, ["host", "network", "backend"]) or _deep_get(info, ["Host", "Network", "Backend"]) ) rootless = ( _deep_get(info, ["host", "rootless"]) or _deep_get(info, ["Host", "Rootless"]) or _deep_get(info, ["host", "security", "rootless"]) or _deep_get(info, ["Host", "Security", "Rootless"]) ) if not isinstance(rootless, bool): rootless = None return { "networkBackend": network_backend, "rootless": rootless, "infoEndpoint": used, } @app.get("/networks/usage") def networks_usage(): """ Bouwt mapping netwerk -> containers/pods, en container -> netwerken. Werkt betrouwbaar ook als network inspect geen containers toont. """ # 1) containers list (all=true) url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true" containers = _podman_get_json_checked(url) or [] by_network: dict[str, dict] = {} by_container: dict[str, list[str]] = {} by_container_meta: dict[str, dict] = {} def _norm_name(c: dict) -> str: # Podman kan Names (list) of Name (string) geven. n = c.get("Name") if isinstance(n, str) and n: return n names = c.get("Names") if isinstance(names, list) and names: # vaak begint dit met "/name" nm = str(names[0]).lstrip("/") return nm # fallback id cid = c.get("Id") or c.get("id") or "" return cid[:12] if cid else "(unknown)" def _norm_id(c: dict) -> str: return c.get("Id") or c.get("id") or "" def _pod_name(c: dict) -> str | None: # Verschilt per output; we proberen een paar logische keys for k in ("PodName", "pod", "Pod", "PodID", "PodId"): v = c.get(k) if isinstance(v, str) and v and v != "": # PodID is geen naam, maar beter dan niks return v return None def _extract_networks_from_summary(c: dict) -> list[str] | None: # Mogelijke structuren in list output nets = c.get("Networks") if isinstance(nets, dict): return list(nets.keys()) if isinstance(nets, list): return [str(x) for x in nets if x] ns = c.get("NetworkSettings") if isinstance(ns, dict): nets2 = ns.get("Networks") if isinstance(nets2, dict): return list(nets2.keys()) # Sommige builds hebben NetworkNames nn = c.get("NetworkNames") if isinstance(nn, list): return [str(x) for x in nn if x] return None def _extract_networks_from_inspect_obj(insp: dict) -> list[str]: """ Probeert netwerk-namen uit een container inspect te halen. Ondersteunt varianten/casing die per Podman/driver kunnen verschillen. """ if not isinstance(insp, dict): return [] candidates = [] # 1) meest voorkomend ns = insp.get("NetworkSettings") if isinstance(ns, dict): candidates.append(ns.get("Networks")) candidates.append(ns.get("networks")) # 2) sommige outputs hebben Networks top-level candidates.append(insp.get("Networks")) candidates.append(insp.get("networks")) # 3) extra varianten n2 = insp.get("networkSettings") if isinstance(n2, dict): candidates.append(n2.get("Networks")) candidates.append(n2.get("networks")) n3 = insp.get("Network") if isinstance(n3, dict): candidates.append(n3.get("Networks")) candidates.append(n3.get("networks")) cfg = insp.get("Config") if isinstance(cfg, dict): candidates.append(cfg.get("Networks")) candidates.append(cfg.get("networks")) # Normaliseer candidates naar lijst[str] out: list[str] = [] for val in candidates: if isinstance(val, dict): out.extend([str(k) for k in val.keys() if k]) elif isinstance(val, list): for x in val: if isinstance(x, str) and x: out.append(x) elif isinstance(x, dict): # Best-effort: soms bevat list entries met Name nm = x.get("Name") or x.get("name") if isinstance(nm, str) and nm: out.append(nm) # uniq + stable sort return sorted(set([n for n in out if isinstance(n, str) and n])) def _extract_networks_from_inspect(cid: str) -> tuple[list[str], dict]: """ Returns: (networks, extra_info) extra_info kan bv. networkMode/containerOwner bevatten. """ if not cid: return ([], {}) insp = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{cid}/json") extra: dict = {} # 1) normale inspect: probeer meerdere paden nets0 = _extract_networks_from_inspect_obj(insp) if nets0: return (nets0, extra) # 2) container network namespace mode: HostConfig.NetworkMode = "container:" hc = insp.get("HostConfig") if isinstance(insp, dict) else None if isinstance(hc, dict): nm = hc.get("NetworkMode") if isinstance(nm, str) and nm.startswith("container:"): owner_id = nm.split("container:", 1)[1] extra["networkMode"] = nm extra["networkOwnerId"] = owner_id # Inspect owner container en pak diens netwerken owner = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_id}/json") # 1) netwerken van owner vinden (meerdere varianten) owner_nets_list = _extract_networks_from_inspect_obj(owner) # 2) owner naam vinden (meerdere varianten) owner_name = None # meest voorkomend bij inspect if isinstance(owner.get("Name"), str) and owner.get("Name"): owner_name = owner.get("Name") # fallback: soms staat het in Config.Name if not owner_name: cfg = owner.get("Config") or {} if isinstance(cfg.get("Name"), str) and cfg.get("Name"): owner_name = cfg.get("Name") # fallback: soms in ContainerConfig if not owner_name: ccfg = owner.get("ContainerConfig") or {} if isinstance(ccfg.get("Name"), str) and ccfg.get("Name"): owner_name = ccfg.get("Name") # fallback: als niets werkt, toon korte id if not owner_name: owner_name = owner_id[:12] extra["networkOwnerName"] = str(owner_name).lstrip("/") # 3) netwerken returnen (als we ze gevonden hebben) if owner_nets_list: return (owner_nets_list, extra) # Extra fallback: probeer inspect via ownerName (soms werkt naam beter dan id) try: owner_name_for_lookup = extra.get("networkOwnerName") if owner_name_for_lookup and owner_name_for_lookup != owner_id: owner2 = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_name_for_lookup}/json") owner2_nets = _extract_networks_from_inspect_obj(owner2) if owner2_nets: return (owner2_nets, extra) except Exception: pass # Owner fallback: sommige infra containers gebruiken pasta/host/none try: ohc = owner.get("HostConfig") if isinstance(owner, dict) else None if isinstance(ohc, dict): onm = ohc.get("NetworkMode") if isinstance(onm, str) and onm in ("pasta", "host", "none"): # owner gebruikt geen Podman-netwerk; behandel als pseudo-netwerk return ([onm], extra) except Exception: pass return ([], extra) # 3) Special networking modes: pasta/host/none # In deze modes bestaat vaak geen NetworkSettings.Networks map. if isinstance(hc, dict): nm2 = hc.get("NetworkMode") if isinstance(nm2, str) and nm2 in ("pasta", "host", "none"): extra["networkMode"] = nm2 return ([nm2], extra) return ([], extra) # 2) Loop containers: verzamel netwerken for c in containers: if not isinstance(c, dict): continue cid = _norm_id(c) cname = _norm_name(c) pod = _pod_name(c) nets = _extract_networks_from_summary(c) extra = {} if not nets: nets, extra = _extract_networks_from_inspect(cid) by_container_meta[cname] = extra nets = [n for n in (nets or []) if isinstance(n, str) and n] # byContainer blijft lijst (contract simpel houden) by_container[cname] = sorted(set(nets)) for n in nets: slot = by_network.setdefault(n, {"containers": [], "pods": []}) slot["containers"].append({ "id": cid, "name": cname, "pod": pod, **extra, # voegt networkMode/owner info toe indien van toepassing }) # 3) Pods afleiden (lightweight) via containers for n, slot in by_network.items(): pods = sorted({c.get("pod") for c in slot["containers"] if isinstance(c.get("pod"), str) and c.get("pod")}) slot["pods"] = [{"name": p} for p in pods] # --- FALLBACK: derive owner networks via network-inspect (works for pod infra/shared netns) --- # We look at shared netns containers (networkMode=container:...) and map their owner-id to networks owner_ids: set[str] = set() owner_names: dict[str, str] = {} # ownerId -> ownerName for cname, meta in by_container_meta.items(): try: mode = str((meta or {}).get("networkMode") or "") except Exception: mode = "" if not mode.startswith("container:"): continue owner_id = (meta or {}).get("networkOwnerId") owner_name = (meta or {}).get("networkOwnerName") if isinstance(owner_id, str) and owner_id: owner_ids.add(owner_id) if isinstance(owner_name, str) and owner_name: owner_names[owner_id] = owner_name def _collect_container_ids_from_network_inspect(net_inspect) -> set[str]: """ Key-agnostic: scan alle strings in network inspect en verzamel hex IDs (12..64 chars). Dit is robuust tegen schema-verschillen tussen netavark/cni/podman versies. """ ids: set[str] = set() def looks_like_hex_id(s: str) -> bool: if not isinstance(s, str): return False s = s.strip() if len(s) < 12 or len(s) > 64: return False # alleen hex chars for ch in s: if ch not in "0123456789abcdef": return False return True def walk(obj): if obj is None: return if isinstance(obj, str): # soms staat id als "container:" if obj.startswith("container:"): cand = obj.split("container:", 1)[1] if looks_like_hex_id(cand): ids.add(cand) elif looks_like_hex_id(obj): ids.add(obj) return if isinstance(obj, dict): for k, v in obj.items(): # keys kunnen ook ids zijn if isinstance(k, str) and looks_like_hex_id(k): ids.add(k) walk(v) return if isinstance(obj, list): for it in obj: walk(it) return walk(net_inspect) return ids owner_networks_by_id: dict[str, set[str]] = {oid: set() for oid in owner_ids} # List networks try: nets_list = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/json") except Exception: nets_list = [] net_names: list[str] = [] if isinstance(nets_list, list): for n in nets_list: if isinstance(n, dict): nm = n.get("name") or n.get("Name") if isinstance(nm, str) and nm: net_names.append(nm) # Inspect each network and see if it contains any owner_id for net_name in sorted(set(net_names)): try: net_inspect = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/{net_name}/json") except Exception: continue attached_ids = _collect_container_ids_from_network_inspect(net_inspect) if not attached_ids: continue for oid in owner_ids: short = oid[:12] for aid in attached_ids: if not isinstance(aid, str) or not aid: continue # match exact / short / prefix if aid == oid or aid == short or oid.startswith(aid) or aid.startswith(short): owner_networks_by_id.setdefault(oid, set()).add(net_name) break # Apply: if shared container or owner container has empty by_container[], fill it with owner's networks for cname, meta in by_container_meta.items(): try: mode = str((meta or {}).get("networkMode") or "") except Exception: mode = "" if not mode.startswith("container:"): continue owner_id = (meta or {}).get("networkOwnerId") if not (isinstance(owner_id, str) and owner_id): continue owner_nets = sorted(owner_networks_by_id.get(owner_id, set())) if not owner_nets: continue # 1) fill owner-name entry (if known) owner_name = (meta or {}).get("networkOwnerName") or owner_names.get(owner_id) if isinstance(owner_name, str) and owner_name and not by_container.get(owner_name): by_container[owner_name] = owner_nets # 2) fill shared container entry if not by_container.get(cname): by_container[cname] = owner_nets # --- FINALIZE: derive by_container from by_network (robust for pods/shared netns) --- by_container_derived: dict[str, list[str]] = {} for net_name, info in (by_network or {}).items(): containers2 = (info or {}).get("containers") or [] for c2 in containers2: if not isinstance(c2, dict): continue cname2 = c2.get("name") or c2.get("Name") if not cname2: continue by_container_derived.setdefault(cname2, []).append(net_name) # dedupe + stable sort for k, v in by_container_derived.items(): by_container_derived[k] = sorted(set(v)) # merge: vul lege items in by_container, maar breek niks for k, v in by_container_derived.items(): if not by_container.get(k): by_container[k] = v # --- shared netns: shared containers erven owner-netwerken (als owner bekend is) --- for cname, meta in by_container_meta.items(): try: mode = str((meta or {}).get("networkMode") or "") except Exception: mode = "" if not mode.startswith("container:"): continue owner = (meta or {}).get("networkOwnerName") or (meta or {}).get("networkOwnerId") if owner and by_container.get(owner) and not by_container.get(cname): by_container[cname] = by_container[owner] return {"byNetwork": by_network, "byContainer": by_container, "byContainerMeta": by_container_meta} @app.get("/networks/{name}") def inspect_network(name: str): url1 = f"{PODMAN_API_BASE}/libpod/networks/{name}/json" r = SESSION.get(url1) if r.status_code == 200: return _podman_get_json_checked(url1) url2 = f"{PODMAN_API_BASE}/libpod/network/{name}/json" return _podman_get_json_checked(url2) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)