From cab706deb246dbda15f950c8b6c0b8cebf81cb9d Mon Sep 17 00:00:00 2001 From: kodi Date: Fri, 27 Feb 2026 14:23:43 +0100 Subject: [PATCH] refactor(api): move networks endpoints into app_networks router --- control/app.py | 498 +-------------------------------------- control/app_networks.py | 499 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 501 insertions(+), 496 deletions(-) create mode 100644 control/app_networks.py diff --git a/control/app.py b/control/app.py index 298e689..fb5d726 100644 --- a/control/app.py +++ b/control/app.py @@ -3,6 +3,7 @@ import sys import subprocess from app_images import init_images_router from app_files import init_files_router +from app_networks import init_networks_router from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel import requests_unixsocket @@ -101,6 +102,7 @@ async def _startup_stats_poller(): # Images API lives in a dedicated module to keep this file from growing further. app.include_router(init_images_router(SESSION, PODMAN_API_BASE)) app.include_router(init_files_router(SESSION, PODMAN_API_BASE, WORKLOADS_DIR)) +app.include_router(init_networks_router(SESSION, PODMAN_API_BASE)) # --- ADAPTERS (contract-neutral helpers) --- # Centralize Podman socket and systemctl invocation. @@ -694,502 +696,6 @@ async def containers_stats_stream(interval: float = 2.0): return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers) -# --- PODMAN NETWORKS (nieuw) --- -def _podman_get_json_checked(url: str): - r = SESSION.get(url) - if r.status_code >= 400: - raise HTTPException(status_code=502, detail=f"Podman API fout {r.status_code}: {r.text}") - try: - return r.json() - except Exception: - raise HTTPException(status_code=502, detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}") - - -def _deep_get(d, path, default=None): - cur = d - for key in path: - if not isinstance(cur, dict) or key not in cur: - return default - cur = cur[key] - return cur - - -@app.get("/networks") -def list_networks(): - # Libpod: /libpod/networks/json - url = f"{PODMAN_API_BASE}/libpod/networks/json" - return {"networks": _podman_get_json_checked(url)} - - -@app.get("/networks/meta") -def networks_meta(): - candidates = [ - f"{PODMAN_API_BASE}/libpod/info", - f"{PODMAN_API_BASE}/libpod/info/json", - f"{PODMAN_API_BASE}/info", - f"{PODMAN_API_BASE}/info/json", - f"{PODMAN_API_BASE}/libpod/system/info", - f"{PODMAN_API_BASE}/libpod/system/info/json", - ] - - last_err = None - info = None - used = None - - for url in candidates: - r = SESSION.get(url) - if r.status_code == 200: - used = url - try: - info = r.json() - except Exception: - raise HTTPException(status_code=502, detail=f"Podman info endpoint gaf geen JSON terug: {url}") - break - last_err = f"{r.status_code}: {r.text}" - - if info is None: - raise HTTPException(status_code=502, detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}") - - network_backend = ( - _deep_get(info, ["host", "networkBackend"]) or - _deep_get(info, ["Host", "NetworkBackend"]) or - _deep_get(info, ["host", "network", "backend"]) or - _deep_get(info, ["Host", "Network", "Backend"]) - ) - - rootless = ( - _deep_get(info, ["host", "rootless"]) or - _deep_get(info, ["Host", "Rootless"]) or - _deep_get(info, ["host", "security", "rootless"]) or - _deep_get(info, ["Host", "Security", "Rootless"]) - ) - if not isinstance(rootless, bool): - rootless = None - - return { - "networkBackend": network_backend, - "rootless": rootless, - "infoEndpoint": used, - } - -@app.get("/networks/usage") -def networks_usage(): - """ - Bouwt mapping netwerk -> containers/pods, en container -> netwerken. - Werkt betrouwbaar ook als network inspect geen containers toont. - """ - # 1) containers list (all=true) - url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true" - containers = _podman_get_json_checked(url) or [] - - by_network: dict[str, dict] = {} - by_container: dict[str, list[str]] = {} - by_container_meta: dict[str, dict] = {} - - def _norm_name(c: dict) -> str: - # Podman kan Names (list) of Name (string) geven. - n = c.get("Name") - if isinstance(n, str) and n: - return n - names = c.get("Names") - if isinstance(names, list) and names: - # vaak begint dit met "/name" - nm = str(names[0]).lstrip("/") - return nm - # fallback id - cid = c.get("Id") or c.get("id") or "" - return cid[:12] if cid else "(unknown)" - - def _norm_id(c: dict) -> str: - return c.get("Id") or c.get("id") or "" - - def _pod_name(c: dict) -> str | None: - # Verschilt per output; we proberen een paar logische keys - for k in ("PodName", "pod", "Pod", "PodID", "PodId"): - v = c.get(k) - if isinstance(v, str) and v and v != "": # PodID is geen naam, maar beter dan niks - return v - return None - - def _extract_networks_from_summary(c: dict) -> list[str] | None: - # Mogelijke structuren in list output - nets = c.get("Networks") - if isinstance(nets, dict): - return list(nets.keys()) - if isinstance(nets, list): - return [str(x) for x in nets if x] - - ns = c.get("NetworkSettings") - if isinstance(ns, dict): - nets2 = ns.get("Networks") - if isinstance(nets2, dict): - return list(nets2.keys()) - - # Sommige builds hebben NetworkNames - nn = c.get("NetworkNames") - if isinstance(nn, list): - return [str(x) for x in nn if x] - - return None - - def _extract_networks_from_inspect_obj(insp: dict) -> list[str]: - """ - Probeert netwerk-namen uit een container inspect te halen. - Ondersteunt varianten/casing die per Podman/driver kunnen verschillen. - """ - if not isinstance(insp, dict): - return [] - - candidates = [] - - # 1) meest voorkomend - ns = insp.get("NetworkSettings") - if isinstance(ns, dict): - candidates.append(ns.get("Networks")) - candidates.append(ns.get("networks")) - - # 2) sommige outputs hebben Networks top-level - candidates.append(insp.get("Networks")) - candidates.append(insp.get("networks")) - - # 3) extra varianten - n2 = insp.get("networkSettings") - if isinstance(n2, dict): - candidates.append(n2.get("Networks")) - candidates.append(n2.get("networks")) - - n3 = insp.get("Network") - if isinstance(n3, dict): - candidates.append(n3.get("Networks")) - candidates.append(n3.get("networks")) - - cfg = insp.get("Config") - if isinstance(cfg, dict): - candidates.append(cfg.get("Networks")) - candidates.append(cfg.get("networks")) - - # Normaliseer candidates naar lijst[str] - out: list[str] = [] - for val in candidates: - if isinstance(val, dict): - out.extend([str(k) for k in val.keys() if k]) - elif isinstance(val, list): - for x in val: - if isinstance(x, str) and x: - out.append(x) - elif isinstance(x, dict): - # Best-effort: soms bevat list entries met Name - nm = x.get("Name") or x.get("name") - if isinstance(nm, str) and nm: - out.append(nm) - - # uniq + stable sort - return sorted(set([n for n in out if isinstance(n, str) and n])) - - def _extract_networks_from_inspect(cid: str) -> tuple[list[str], dict]: - """ - Returns: (networks, extra_info) - extra_info kan bv. networkMode/containerOwner bevatten. - """ - if not cid: - return ([], {}) - - insp = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{cid}/json") - extra: dict = {} - - # 1) normale inspect: probeer meerdere paden - nets0 = _extract_networks_from_inspect_obj(insp) - if nets0: - return (nets0, extra) - - # 2) container network namespace mode: HostConfig.NetworkMode = "container:" - hc = insp.get("HostConfig") if isinstance(insp, dict) else None - if isinstance(hc, dict): - nm = hc.get("NetworkMode") - if isinstance(nm, str) and nm.startswith("container:"): - owner_id = nm.split("container:", 1)[1] - extra["networkMode"] = nm - extra["networkOwnerId"] = owner_id - # Inspect owner container en pak diens netwerken - owner = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_id}/json") - - # 1) netwerken van owner vinden (meerdere varianten) - owner_nets_list = _extract_networks_from_inspect_obj(owner) - - # 2) owner naam vinden (meerdere varianten) - owner_name = None - - # meest voorkomend bij inspect - if isinstance(owner.get("Name"), str) and owner.get("Name"): - owner_name = owner.get("Name") - - # fallback: soms staat het in Config.Name - if not owner_name: - cfg = owner.get("Config") or {} - if isinstance(cfg.get("Name"), str) and cfg.get("Name"): - owner_name = cfg.get("Name") - - # fallback: soms in ContainerConfig - if not owner_name: - ccfg = owner.get("ContainerConfig") or {} - if isinstance(ccfg.get("Name"), str) and ccfg.get("Name"): - owner_name = ccfg.get("Name") - - # fallback: als niets werkt, toon korte id - if not owner_name: - owner_name = owner_id[:12] - - extra["networkOwnerName"] = str(owner_name).lstrip("/") - - # 3) netwerken returnen (als we ze gevonden hebben) - if owner_nets_list: - return (owner_nets_list, extra) - - # Extra fallback: probeer inspect via ownerName (soms werkt naam beter dan id) - try: - owner_name_for_lookup = extra.get("networkOwnerName") - if owner_name_for_lookup and owner_name_for_lookup != owner_id: - owner2 = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_name_for_lookup}/json") - owner2_nets = _extract_networks_from_inspect_obj(owner2) - if owner2_nets: - return (owner2_nets, extra) - except Exception: - pass - - # Owner fallback: sommige infra containers gebruiken pasta/host/none - try: - ohc = owner.get("HostConfig") if isinstance(owner, dict) else None - if isinstance(ohc, dict): - onm = ohc.get("NetworkMode") - if isinstance(onm, str) and onm in ("pasta", "host", "none"): - # owner gebruikt geen Podman-netwerk; behandel als pseudo-netwerk - return ([onm], extra) - except Exception: - pass - - return ([], extra) - - # 3) Special networking modes: pasta/host/none - # In deze modes bestaat vaak geen NetworkSettings.Networks map. - if isinstance(hc, dict): - nm2 = hc.get("NetworkMode") - if isinstance(nm2, str) and nm2 in ("pasta", "host", "none"): - extra["networkMode"] = nm2 - return ([nm2], extra) - - return ([], extra) - - # 2) Loop containers: verzamel netwerken - for c in containers: - if not isinstance(c, dict): - continue - - cid = _norm_id(c) - cname = _norm_name(c) - pod = _pod_name(c) - - nets = _extract_networks_from_summary(c) - extra = {} - - if not nets: - nets, extra = _extract_networks_from_inspect(cid) - - by_container_meta[cname] = extra - - nets = [n for n in (nets or []) if isinstance(n, str) and n] - - # byContainer blijft lijst (contract simpel houden) - by_container[cname] = sorted(set(nets)) - - for n in nets: - slot = by_network.setdefault(n, {"containers": [], "pods": []}) - slot["containers"].append({ - "id": cid, - "name": cname, - "pod": pod, - **extra, # voegt networkMode/owner info toe indien van toepassing - }) - - # 3) Pods afleiden (lightweight) via containers - for n, slot in by_network.items(): - pods = sorted({c.get("pod") for c in slot["containers"] if isinstance(c.get("pod"), str) and c.get("pod")}) - slot["pods"] = [{"name": p} for p in pods] - - # --- FALLBACK: derive owner networks via network-inspect (works for pod infra/shared netns) --- - # We look at shared netns containers (networkMode=container:...) and map their owner-id to networks - owner_ids: set[str] = set() - owner_names: dict[str, str] = {} # ownerId -> ownerName - - for cname, meta in by_container_meta.items(): - try: - mode = str((meta or {}).get("networkMode") or "") - except Exception: - mode = "" - if not mode.startswith("container:"): - continue - - owner_id = (meta or {}).get("networkOwnerId") - owner_name = (meta or {}).get("networkOwnerName") - if isinstance(owner_id, str) and owner_id: - owner_ids.add(owner_id) - if isinstance(owner_name, str) and owner_name: - owner_names[owner_id] = owner_name - - def _collect_container_ids_from_network_inspect(net_inspect) -> set[str]: - """ - Key-agnostic: scan alle strings in network inspect en verzamel hex IDs (12..64 chars). - Dit is robuust tegen schema-verschillen tussen netavark/cni/podman versies. - """ - ids: set[str] = set() - - def looks_like_hex_id(s: str) -> bool: - if not isinstance(s, str): - return False - s = s.strip() - if len(s) < 12 or len(s) > 64: - return False - # alleen hex chars - for ch in s: - if ch not in "0123456789abcdef": - return False - return True - - def walk(obj): - if obj is None: - return - if isinstance(obj, str): - # soms staat id als "container:" - if obj.startswith("container:"): - cand = obj.split("container:", 1)[1] - if looks_like_hex_id(cand): - ids.add(cand) - elif looks_like_hex_id(obj): - ids.add(obj) - return - if isinstance(obj, dict): - for k, v in obj.items(): - # keys kunnen ook ids zijn - if isinstance(k, str) and looks_like_hex_id(k): - ids.add(k) - walk(v) - return - if isinstance(obj, list): - for it in obj: - walk(it) - return - - walk(net_inspect) - return ids - - owner_networks_by_id: dict[str, set[str]] = {oid: set() for oid in owner_ids} - - # List networks - try: - nets_list = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/json") - except Exception: - nets_list = [] - - net_names: list[str] = [] - if isinstance(nets_list, list): - for n in nets_list: - if isinstance(n, dict): - nm = n.get("name") or n.get("Name") - if isinstance(nm, str) and nm: - net_names.append(nm) - - # Inspect each network and see if it contains any owner_id - for net_name in sorted(set(net_names)): - try: - net_inspect = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/{net_name}/json") - except Exception: - continue - - attached_ids = _collect_container_ids_from_network_inspect(net_inspect) - if not attached_ids: - continue - - for oid in owner_ids: - short = oid[:12] - for aid in attached_ids: - if not isinstance(aid, str) or not aid: - continue - # match exact / short / prefix - if aid == oid or aid == short or oid.startswith(aid) or aid.startswith(short): - owner_networks_by_id.setdefault(oid, set()).add(net_name) - break - - # Apply: if shared container or owner container has empty by_container[], fill it with owner's networks - for cname, meta in by_container_meta.items(): - try: - mode = str((meta or {}).get("networkMode") or "") - except Exception: - mode = "" - if not mode.startswith("container:"): - continue - - owner_id = (meta or {}).get("networkOwnerId") - if not (isinstance(owner_id, str) and owner_id): - continue - - owner_nets = sorted(owner_networks_by_id.get(owner_id, set())) - if not owner_nets: - continue - - # 1) fill owner-name entry (if known) - owner_name = (meta or {}).get("networkOwnerName") or owner_names.get(owner_id) - if isinstance(owner_name, str) and owner_name and not by_container.get(owner_name): - by_container[owner_name] = owner_nets - - # 2) fill shared container entry - if not by_container.get(cname): - by_container[cname] = owner_nets - - # --- FINALIZE: derive by_container from by_network (robust for pods/shared netns) --- - by_container_derived: dict[str, list[str]] = {} - - for net_name, info in (by_network or {}).items(): - containers2 = (info or {}).get("containers") or [] - for c2 in containers2: - if not isinstance(c2, dict): - continue - cname2 = c2.get("name") or c2.get("Name") - if not cname2: - continue - by_container_derived.setdefault(cname2, []).append(net_name) - - # dedupe + stable sort - for k, v in by_container_derived.items(): - by_container_derived[k] = sorted(set(v)) - - # merge: vul lege items in by_container, maar breek niks - for k, v in by_container_derived.items(): - if not by_container.get(k): - by_container[k] = v - - # --- shared netns: shared containers erven owner-netwerken (als owner bekend is) --- - for cname, meta in by_container_meta.items(): - try: - mode = str((meta or {}).get("networkMode") or "") - except Exception: - mode = "" - if not mode.startswith("container:"): - continue - owner = (meta or {}).get("networkOwnerName") or (meta or {}).get("networkOwnerId") - if owner and by_container.get(owner) and not by_container.get(cname): - by_container[cname] = by_container[owner] - - return {"byNetwork": by_network, "byContainer": by_container, "byContainerMeta": by_container_meta} - - -@app.get("/networks/{name}") -def inspect_network(name: str): - url1 = f"{PODMAN_API_BASE}/libpod/networks/{name}/json" - r = SESSION.get(url1) - if r.status_code == 200: - return _podman_get_json_checked(url1) - - url2 = f"{PODMAN_API_BASE}/libpod/network/{name}/json" - return _podman_get_json_checked(url2) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/control/app_networks.py b/control/app_networks.py new file mode 100644 index 0000000..31d5c8f --- /dev/null +++ b/control/app_networks.py @@ -0,0 +1,499 @@ +from fastapi import APIRouter, HTTPException + + +def init_networks_router(session, podman_api_base: str) -> APIRouter: + router = APIRouter(tags=["networks"]) + + def _podman_get_json_checked(url: str): + r = session.get(url) + if r.status_code >= 400: + raise HTTPException(status_code=502, detail=f"Podman API fout {r.status_code}: {r.text}") + try: + return r.json() + except Exception: + raise HTTPException(status_code=502, detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}") + + def _deep_get(d, path, default=None): + cur = d + for key in path: + if not isinstance(cur, dict) or key not in cur: + return default + cur = cur[key] + return cur + + @router.get("/networks") + def list_networks(): + # Libpod: /libpod/networks/json + url = f"{podman_api_base}/libpod/networks/json" + return {"networks": _podman_get_json_checked(url)} + + @router.get("/networks/meta") + def networks_meta(): + candidates = [ + f"{podman_api_base}/libpod/info", + f"{podman_api_base}/libpod/info/json", + f"{podman_api_base}/info", + f"{podman_api_base}/info/json", + f"{podman_api_base}/libpod/system/info", + f"{podman_api_base}/libpod/system/info/json", + ] + + last_err = None + info = None + used = None + + for url in candidates: + r = session.get(url) + if r.status_code == 200: + used = url + try: + info = r.json() + except Exception: + raise HTTPException(status_code=502, detail=f"Podman info endpoint gaf geen JSON terug: {url}") + break + last_err = f"{r.status_code}: {r.text}" + + if info is None: + raise HTTPException(status_code=502, detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}") + + network_backend = ( + _deep_get(info, ["host", "networkBackend"]) or + _deep_get(info, ["Host", "NetworkBackend"]) or + _deep_get(info, ["host", "network", "backend"]) or + _deep_get(info, ["Host", "Network", "Backend"]) + ) + + rootless = ( + _deep_get(info, ["host", "rootless"]) or + _deep_get(info, ["Host", "Rootless"]) or + _deep_get(info, ["host", "security", "rootless"]) or + _deep_get(info, ["Host", "Security", "Rootless"]) + ) + if not isinstance(rootless, bool): + rootless = None + + return { + "networkBackend": network_backend, + "rootless": rootless, + "infoEndpoint": used, + } + + @router.get("/networks/usage") + def networks_usage(): + """ + Bouwt mapping netwerk -> containers/pods, en container -> netwerken. + Werkt betrouwbaar ook als network inspect geen containers toont. + """ + # 1) containers list (all=true) + url = f"{podman_api_base}/libpod/containers/json?all=true" + containers = _podman_get_json_checked(url) or [] + + by_network: dict[str, dict] = {} + by_container: dict[str, list[str]] = {} + by_container_meta: dict[str, dict] = {} + + def _norm_name(c: dict) -> str: + # Podman kan Names (list) of Name (string) geven. + n = c.get("Name") + if isinstance(n, str) and n: + return n + names = c.get("Names") + if isinstance(names, list) and names: + # vaak begint dit met "/name" + nm = str(names[0]).lstrip("/") + return nm + # fallback id + cid = c.get("Id") or c.get("id") or "" + return cid[:12] if cid else "(unknown)" + + def _norm_id(c: dict) -> str: + return c.get("Id") or c.get("id") or "" + + def _pod_name(c: dict) -> str | None: + # Verschilt per output; we proberen een paar logische keys + for k in ("PodName", "pod", "Pod", "PodID", "PodId"): + v = c.get(k) + if isinstance(v, str) and v and v != "": # PodID is geen naam, maar beter dan niks + return v + return None + + def _extract_networks_from_summary(c: dict) -> list[str] | None: + # Mogelijke structuren in list output + nets = c.get("Networks") + if isinstance(nets, dict): + return list(nets.keys()) + if isinstance(nets, list): + return [str(x) for x in nets if x] + + ns = c.get("NetworkSettings") + if isinstance(ns, dict): + nets2 = ns.get("Networks") + if isinstance(nets2, dict): + return list(nets2.keys()) + + # Sommige builds hebben NetworkNames + nn = c.get("NetworkNames") + if isinstance(nn, list): + return [str(x) for x in nn if x] + + return None + + def _extract_networks_from_inspect_obj(insp: dict) -> list[str]: + """ + Probeert netwerk-namen uit een container inspect te halen. + Ondersteunt varianten/casing die per Podman/driver kunnen verschillen. + """ + if not isinstance(insp, dict): + return [] + + candidates = [] + + # 1) meest voorkomend + ns = insp.get("NetworkSettings") + if isinstance(ns, dict): + candidates.append(ns.get("Networks")) + candidates.append(ns.get("networks")) + + # 2) sommige outputs hebben Networks top-level + candidates.append(insp.get("Networks")) + candidates.append(insp.get("networks")) + + # 3) extra varianten + n2 = insp.get("networkSettings") + if isinstance(n2, dict): + candidates.append(n2.get("Networks")) + candidates.append(n2.get("networks")) + + n3 = insp.get("Network") + if isinstance(n3, dict): + candidates.append(n3.get("Networks")) + candidates.append(n3.get("networks")) + + cfg = insp.get("Config") + if isinstance(cfg, dict): + candidates.append(cfg.get("Networks")) + candidates.append(cfg.get("networks")) + + # Normaliseer candidates naar lijst[str] + out: list[str] = [] + for val in candidates: + if isinstance(val, dict): + out.extend([str(k) for k in val.keys() if k]) + elif isinstance(val, list): + for x in val: + if isinstance(x, str) and x: + out.append(x) + elif isinstance(x, dict): + # Best-effort: soms bevat list entries met Name + nm = x.get("Name") or x.get("name") + if isinstance(nm, str) and nm: + out.append(nm) + + # uniq + stable sort + return sorted(set([n for n in out if isinstance(n, str) and n])) + + def _extract_networks_from_inspect(cid: str) -> tuple[list[str], dict]: + """ + Returns: (networks, extra_info) + extra_info kan bv. networkMode/containerOwner bevatten. + """ + if not cid: + return ([], {}) + + insp = _podman_get_json_checked(f"{podman_api_base}/libpod/containers/{cid}/json") + extra: dict = {} + + # 1) normale inspect: probeer meerdere paden + nets0 = _extract_networks_from_inspect_obj(insp) + if nets0: + return (nets0, extra) + + # 2) container network namespace mode: HostConfig.NetworkMode = "container:" + hc = insp.get("HostConfig") if isinstance(insp, dict) else None + if isinstance(hc, dict): + nm = hc.get("NetworkMode") + if isinstance(nm, str) and nm.startswith("container:"): + owner_id = nm.split("container:", 1)[1] + extra["networkMode"] = nm + extra["networkOwnerId"] = owner_id + # Inspect owner container en pak diens netwerken + owner = _podman_get_json_checked(f"{podman_api_base}/libpod/containers/{owner_id}/json") + + # 1) netwerken van owner vinden (meerdere varianten) + owner_nets_list = _extract_networks_from_inspect_obj(owner) + + # 2) owner naam vinden (meerdere varianten) + owner_name = None + + # meest voorkomend bij inspect + if isinstance(owner.get("Name"), str) and owner.get("Name"): + owner_name = owner.get("Name") + + # fallback: soms staat het in Config.Name + if not owner_name: + cfg = owner.get("Config") or {} + if isinstance(cfg.get("Name"), str) and cfg.get("Name"): + owner_name = cfg.get("Name") + + # fallback: soms in ContainerConfig + if not owner_name: + ccfg = owner.get("ContainerConfig") or {} + if isinstance(ccfg.get("Name"), str) and ccfg.get("Name"): + owner_name = ccfg.get("Name") + + # fallback: als niets werkt, toon korte id + if not owner_name: + owner_name = owner_id[:12] + + extra["networkOwnerName"] = str(owner_name).lstrip("/") + + # 3) netwerken returnen (als we ze gevonden hebben) + if owner_nets_list: + return (owner_nets_list, extra) + + # Extra fallback: probeer inspect via ownerName (soms werkt naam beter dan id) + try: + owner_name_for_lookup = extra.get("networkOwnerName") + if owner_name_for_lookup and owner_name_for_lookup != owner_id: + owner2 = _podman_get_json_checked(f"{podman_api_base}/libpod/containers/{owner_name_for_lookup}/json") + owner2_nets = _extract_networks_from_inspect_obj(owner2) + if owner2_nets: + return (owner2_nets, extra) + except Exception: + pass + + # Owner fallback: sommige infra containers gebruiken pasta/host/none + try: + ohc = owner.get("HostConfig") if isinstance(owner, dict) else None + if isinstance(ohc, dict): + onm = ohc.get("NetworkMode") + if isinstance(onm, str) and onm in ("pasta", "host", "none"): + # owner gebruikt geen Podman-netwerk; behandel als pseudo-netwerk + return ([onm], extra) + except Exception: + pass + + return ([], extra) + + # 3) Special networking modes: pasta/host/none + # In deze modes bestaat vaak geen NetworkSettings.Networks map. + if isinstance(hc, dict): + nm2 = hc.get("NetworkMode") + if isinstance(nm2, str) and nm2 in ("pasta", "host", "none"): + extra["networkMode"] = nm2 + return ([nm2], extra) + + return ([], extra) + + # 2) Loop containers: verzamel netwerken + for c in containers: + if not isinstance(c, dict): + continue + + cid = _norm_id(c) + cname = _norm_name(c) + pod = _pod_name(c) + + nets = _extract_networks_from_summary(c) + extra = {} + + if not nets: + nets, extra = _extract_networks_from_inspect(cid) + + by_container_meta[cname] = extra + + nets = [n for n in (nets or []) if isinstance(n, str) and n] + + # byContainer blijft lijst (contract simpel houden) + by_container[cname] = sorted(set(nets)) + + for n in nets: + slot = by_network.setdefault(n, {"containers": [], "pods": []}) + slot["containers"].append({ + "id": cid, + "name": cname, + "pod": pod, + **extra, # voegt networkMode/owner info toe indien van toepassing + }) + + # 3) Pods afleiden (lightweight) via containers + for n, slot in by_network.items(): + pods = sorted({c.get("pod") for c in slot["containers"] if isinstance(c.get("pod"), str) and c.get("pod")}) + slot["pods"] = [{"name": p} for p in pods] + + # --- FALLBACK: derive owner networks via network-inspect (works for pod infra/shared netns) --- + # We look at shared netns containers (networkMode=container:...) and map their owner-id to networks + owner_ids: set[str] = set() + owner_names: dict[str, str] = {} # ownerId -> ownerName + + for cname, meta in by_container_meta.items(): + try: + mode = str((meta or {}).get("networkMode") or "") + except Exception: + mode = "" + if not mode.startswith("container:"): + continue + + owner_id = (meta or {}).get("networkOwnerId") + owner_name = (meta or {}).get("networkOwnerName") + if isinstance(owner_id, str) and owner_id: + owner_ids.add(owner_id) + if isinstance(owner_name, str) and owner_name: + owner_names[owner_id] = owner_name + + def _collect_container_ids_from_network_inspect(net_inspect) -> set[str]: + """ + Key-agnostic: scan alle strings in network inspect en verzamel hex IDs (12..64 chars). + Dit is robuust tegen schema-verschillen tussen netavark/cni/podman versies. + """ + ids: set[str] = set() + + def looks_like_hex_id(s: str) -> bool: + if not isinstance(s, str): + return False + s = s.strip() + if len(s) < 12 or len(s) > 64: + return False + # alleen hex chars + for ch in s: + if ch not in "0123456789abcdef": + return False + return True + + def walk(obj): + if obj is None: + return + if isinstance(obj, str): + # soms staat id als "container:" + if obj.startswith("container:"): + cand = obj.split("container:", 1)[1] + if looks_like_hex_id(cand): + ids.add(cand) + elif looks_like_hex_id(obj): + ids.add(obj) + return + if isinstance(obj, dict): + for k, v in obj.items(): + # keys kunnen ook ids zijn + if isinstance(k, str) and looks_like_hex_id(k): + ids.add(k) + walk(v) + return + if isinstance(obj, list): + for it in obj: + walk(it) + return + + walk(net_inspect) + return ids + + owner_networks_by_id: dict[str, set[str]] = {oid: set() for oid in owner_ids} + + # List networks + try: + nets_list = _podman_get_json_checked(f"{podman_api_base}/libpod/networks/json") + except Exception: + nets_list = [] + + net_names: list[str] = [] + if isinstance(nets_list, list): + for n in nets_list: + if isinstance(n, dict): + nm = n.get("name") or n.get("Name") + if isinstance(nm, str) and nm: + net_names.append(nm) + + # Inspect each network and see if it contains any owner_id + for net_name in sorted(set(net_names)): + try: + net_inspect = _podman_get_json_checked(f"{podman_api_base}/libpod/networks/{net_name}/json") + except Exception: + continue + + attached_ids = _collect_container_ids_from_network_inspect(net_inspect) + if not attached_ids: + continue + + for oid in owner_ids: + short = oid[:12] + for aid in attached_ids: + if not isinstance(aid, str) or not aid: + continue + # match exact / short / prefix + if aid == oid or aid == short or oid.startswith(aid) or aid.startswith(short): + owner_networks_by_id.setdefault(oid, set()).add(net_name) + break + + # Apply: if shared container or owner container has empty by_container[], fill it with owner's networks + for cname, meta in by_container_meta.items(): + try: + mode = str((meta or {}).get("networkMode") or "") + except Exception: + mode = "" + if not mode.startswith("container:"): + continue + + owner_id = (meta or {}).get("networkOwnerId") + if not (isinstance(owner_id, str) and owner_id): + continue + + owner_nets = sorted(owner_networks_by_id.get(owner_id, set())) + if not owner_nets: + continue + + # 1) fill owner-name entry (if known) + owner_name = (meta or {}).get("networkOwnerName") or owner_names.get(owner_id) + if isinstance(owner_name, str) and owner_name and not by_container.get(owner_name): + by_container[owner_name] = owner_nets + + # 2) fill shared container entry + if not by_container.get(cname): + by_container[cname] = owner_nets + + # --- FINALIZE: derive by_container from by_network (robust for pods/shared netns) --- + by_container_derived: dict[str, list[str]] = {} + + for net_name, info in (by_network or {}).items(): + containers2 = (info or {}).get("containers") or [] + for c2 in containers2: + if not isinstance(c2, dict): + continue + cname2 = c2.get("name") or c2.get("Name") + if not cname2: + continue + by_container_derived.setdefault(cname2, []).append(net_name) + + # dedupe + stable sort + for k, v in by_container_derived.items(): + by_container_derived[k] = sorted(set(v)) + + # merge: vul lege items in by_container, maar breek niks + for k, v in by_container_derived.items(): + if not by_container.get(k): + by_container[k] = v + + # --- shared netns: shared containers erven owner-netwerken (als owner bekend is) --- + for cname, meta in by_container_meta.items(): + try: + mode = str((meta or {}).get("networkMode") or "") + except Exception: + mode = "" + if not mode.startswith("container:"): + continue + owner = (meta or {}).get("networkOwnerName") or (meta or {}).get("networkOwnerId") + if owner and by_container.get(owner) and not by_container.get(cname): + by_container[cname] = by_container[owner] + + return {"byNetwork": by_network, "byContainer": by_container, "byContainerMeta": by_container_meta} + + @router.get("/networks/{name}") + def inspect_network(name: str): + url1 = f"{podman_api_base}/libpod/networks/{name}/json" + r = session.get(url1) + if r.status_code == 200: + return _podman_get_json_checked(url1) + + url2 = f"{podman_api_base}/libpod/network/{name}/json" + return _podman_get_json_checked(url2) + + return router