import re
from urllib.parse import quote

from fastapi import APIRouter, HTTPException


def init_networks_router(session, podman_api_base: str) -> APIRouter:
    """Build the ``/networks`` router that proxies the Podman REST API.

    Args:
        session: HTTP client exposing ``get(url)``; the returned response must
            provide ``status_code``, ``text`` and ``json()``. Presumably a
            ``requests.Session`` -- confirm against the caller.
        podman_api_base: Base URL that every Podman API path is appended to.

    Returns:
        An ``APIRouter`` tagged ``networks`` with all routes registered.

    Raises:
        HTTPException: The route handlers raise status 502 when Podman answers
            with an error status or with a body that is not JSON.
    """
    router = APIRouter(tags=["networks"])

    # Network modes that are reported as if they were network names.
    pseudo_nets = ("pasta", "host", "none")
    # Containers named "<hex>-infra" are treated as pod infra containers.
    infra_name_re = re.compile(r"^[0-9a-f]+-infra$")

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------

    def _json_or_502(r):
        """Return the decoded JSON body of ``r`` or raise HTTP 502."""
        if r.status_code >= 400:
            raise HTTPException(
                status_code=502,
                detail=f"Podman API fout {r.status_code}: {r.text}",
            )
        try:
            return r.json()
        except Exception as exc:
            raise HTTPException(
                status_code=502,
                detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}",
            ) from exc

    def _podman_get_json_checked(url: str):
        """GET ``url`` and return its JSON body, mapping failures to HTTP 502."""
        return _json_or_502(session.get(url))

    # ------------------------------------------------------------------
    # Pure helpers (no I/O)
    # ------------------------------------------------------------------

    def _deep_get(d, path, default=None):
        """Follow ``path`` through nested dicts; return ``default`` on a miss."""
        cur = d
        for key in path:
            if not isinstance(cur, dict) or key not in cur:
                return default
            cur = cur[key]
        return cur

    def _first_bool(d, paths):
        """Return the first value found under ``paths`` that is a real bool.

        An ``or`` chain cannot be used here because a legitimate ``False``
        would be skipped in favour of the next (missing) candidate.
        """
        for path in paths:
            value = _deep_get(d, path)
            if isinstance(value, bool):
                return value
        return None

    def _norm_name(c: dict) -> str:
        """Pick a display name: ``Name``, else first of ``Names``, else short id."""
        n = c.get("Name")
        if isinstance(n, str) and n:
            return n
        names = c.get("Names")
        if isinstance(names, list) and names:
            return str(names[0]).lstrip("/")
        cid = c.get("Id") or c.get("id") or ""
        return cid[:12] if cid else "(unknown)"

    def _norm_id(c: dict) -> str:
        """Return the container id from ``Id``/``id``, or an empty string."""
        return c.get("Id") or c.get("id") or ""

    def _pod_name(c: dict) -> str | None:
        """Return the pod name from the first populated known key, if any."""
        for k in ("PodName", "pod", "Pod"):
            v = c.get(k)
            if isinstance(v, str) and v:
                return v
        return None

    def _extract_networks_from_summary(c: dict) -> list[str] | None:
        """Read network names from a container list entry.

        Checks ``Networks`` (dict or list), then ``NetworkSettings.Networks``,
        then ``NetworkNames``. Returns ``None`` when none of them is usable.
        """
        nets = c.get("Networks")
        if isinstance(nets, dict):
            return list(nets.keys())
        if isinstance(nets, list):
            return [str(x) for x in nets if x]
        ns = c.get("NetworkSettings")
        if isinstance(ns, dict):
            nets2 = ns.get("Networks")
            if isinstance(nets2, dict):
                return list(nets2.keys())
        nn = c.get("NetworkNames")
        if isinstance(nn, list):
            return [str(x) for x in nn if x]
        return None

    def _ns_networks(insp) -> dict:
        """Return ``NetworkSettings.Networks`` from an inspect payload, or ``{}``."""
        ns = insp.get("NetworkSettings") if isinstance(insp, dict) else None
        nets = ns.get("Networks") if isinstance(ns, dict) else None
        return nets if isinstance(nets, dict) else {}

    def _extract_from_inspect(cid: str) -> tuple[list[str], dict, dict]:
        """Resolve a container's networks through container inspect.

        Returns:
            ``(net_names, extra, net_details)`` where

            - ``net_names`` is the list of network names,
            - ``extra`` holds ``networkMode`` and, for ``container:`` mode,
              ``networkOwnerId`` / ``networkOwnerName``,
            - ``net_details`` maps network name to ``{"ip", "aliases"}`` and is
              only filled from the container's own ``NetworkSettings.Networks``.

        Raises:
            HTTPException: 502 when an inspect call fails.
        """
        if not cid:
            return [], {}, {}
        insp = _podman_get_json_checked(
            f"{podman_api_base}/libpod/containers/{cid}/json"
        )
        extra: dict = {}

        # 1) The container's own NetworkSettings.Networks wins when non-empty.
        nets_dict = _ns_networks(insp)
        if nets_dict:
            net_details = {}
            for net_name, net_info in nets_dict.items():
                if isinstance(net_info, dict):
                    ip = net_info.get("IPAddress") or ""
                    aliases = [
                        a
                        for a in (net_info.get("Aliases") or [])
                        if isinstance(a, str)
                    ]
                    net_details[net_name] = {"ip": ip, "aliases": aliases}
                else:
                    net_details[net_name] = {"ip": "", "aliases": []}
            return sorted(nets_dict.keys()), extra, net_details

        hc = insp.get("HostConfig") if isinstance(insp, dict) else None
        nm = hc.get("NetworkMode") if isinstance(hc, dict) else None

        # 2) Shared network namespace: NetworkMode = "container:<owner>".
        if isinstance(nm, str) and nm.startswith("container:"):
            owner_id = nm.split("container:", 1)[1]
            extra["networkMode"] = nm
            extra["networkOwnerId"] = owner_id
            owner = _podman_get_json_checked(
                f"{podman_api_base}/libpod/containers/{owner_id}/json"
            )
            # Guard against a non-object payload instead of crashing on .get().
            if not isinstance(owner, dict):
                owner = {}
            owner_name = str(owner.get("Name") or owner_id[:12]).lstrip("/")
            extra["networkOwnerName"] = owner_name
            owner_nets = _ns_networks(owner)
            if owner_nets:
                return sorted(owner_nets.keys()), extra, {}
            # The owner itself uses pasta/host/none.
            owner_hc = owner.get("HostConfig")
            owner_nm = (
                owner_hc.get("NetworkMode") if isinstance(owner_hc, dict) else None
            ) or ""
            if owner_nm in pseudo_nets:
                return [owner_nm], extra, {}
            return [], extra, {}

        # 3) Pseudo networks: pasta / host / none.
        if isinstance(nm, str) and nm in pseudo_nets:
            extra["networkMode"] = nm
            return [nm], extra, {}
        return [], {}, {}

    # ------------------------------------------------------------------
    # Routes. The fixed paths /networks/meta and /networks/usage must be
    # registered before /networks/{name} so they are not captured by it.
    # ------------------------------------------------------------------

    @router.get("/networks")
    def list_networks():
        """Return all networks from ``/libpod/networks/json``."""
        url = f"{podman_api_base}/libpod/networks/json"
        return {"networks": _podman_get_json_checked(url)}

    @router.get("/networks/meta")
    def networks_meta():
        """Report the network backend and rootless flag from Podman's info.

        Tries several candidate info URLs in order and uses the first one that
        answers with status 200.

        Raises:
            HTTPException: 502 when no candidate answers 200, or when the one
                that does returns a body that is not JSON.
        """
        candidates = [
            f"{podman_api_base}/libpod/info",
            f"{podman_api_base}/libpod/info/json",
            f"{podman_api_base}/info",
            f"{podman_api_base}/info/json",
            f"{podman_api_base}/libpod/system/info",
            f"{podman_api_base}/libpod/system/info/json",
        ]
        last_err = None
        info = None
        used = None
        for url in candidates:
            r = session.get(url)
            if r.status_code == 200:
                used = url
                try:
                    info = r.json()
                except Exception as exc:
                    raise HTTPException(
                        status_code=502,
                        detail=f"Podman info endpoint gaf geen JSON terug: {url}",
                    ) from exc
                break
            last_err = f"{r.status_code}: {r.text}"
        if info is None:
            raise HTTPException(
                status_code=502,
                detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}",
            )

        network_backend = (
            _deep_get(info, ["host", "networkBackend"])
            or _deep_get(info, ["Host", "NetworkBackend"])
            or _deep_get(info, ["host", "network", "backend"])
            or _deep_get(info, ["Host", "Network", "Backend"])
        )
        # First real bool wins, so rootful hosts report False rather than None.
        rootless = _first_bool(
            info,
            [
                ["host", "rootless"],
                ["Host", "Rootless"],
                ["host", "security", "rootless"],
                ["Host", "Security", "Rootless"],
            ],
        )
        return {
            "networkBackend": network_backend,
            "rootless": rootless,
            "infoEndpoint": used,
        }

    @router.get("/networks/usage")
    def networks_usage():
        """Build network -> containers/pods and container -> networks mappings.

        Network names come from the container list entry when it carries them;
        otherwise from container inspect. Containers attached to a non-pseudo
        network are additionally inspected for IP and aliases. Infra containers
        (``IsInfra`` set, or a name matching ``<hex>-infra``) are skipped.

        Returns:
            A dict with ``byNetwork``, ``byContainer`` and ``byContainerMeta``.

        Raises:
            HTTPException: 502 when a Podman list or inspect call fails.
        """
        # 1) Fetch all containers, including stopped ones.
        containers = _podman_get_json_checked(
            f"{podman_api_base}/libpod/containers/json?all=true"
        ) or []

        by_network: dict[str, dict] = {}
        by_container: dict[str, list[str]] = {}
        by_container_meta: dict[str, dict] = {}

        # 2) Walk every container and record its networks.
        for c in containers:
            if not isinstance(c, dict):
                continue
            cname = _norm_name(c)
            if c.get("IsInfra") or infra_name_re.match(cname):
                continue  # skip pod infra containers

            cid = _norm_id(c)
            pod = _pod_name(c)
            nets = _extract_networks_from_summary(c)
            extra: dict = {}
            net_details: dict = {}
            if not nets:
                nets, extra, net_details = _extract_from_inspect(cid)
            elif any(n not in pseudo_nets for n in nets):
                # Bridge container: inspect only for IP/aliases, keep the names.
                _, extra, net_details = _extract_from_inspect(cid)
            by_container_meta[cname] = extra

            # Drop junk and duplicates (first-seen order) so a container is
            # listed at most once per network.
            nets = list(
                dict.fromkeys(n for n in (nets or []) if isinstance(n, str) and n)
            )
            by_container[cname] = sorted(nets)

            for n in nets:
                slot = by_network.setdefault(n, {"containers": [], "pods": []})
                nd = net_details.get(n, {})
                slot["containers"].append({
                    "id": cid,
                    "name": cname,
                    "pod": pod,
                    "ip": nd.get("ip", ""),
                    "aliases": nd.get("aliases", []),
                    **extra,
                })

        # 3) Derive the pods of each network from its containers.
        for slot in by_network.values():
            pods = sorted({
                entry.get("pod")
                for entry in slot["containers"]
                if isinstance(entry.get("pod"), str) and entry.get("pod")
            })
            slot["pods"] = [{"name": p} for p in pods]

        return {
            "byNetwork": by_network,
            "byContainer": by_container,
            "byContainerMeta": by_container_meta,
        }

    @router.get("/networks/{name}")
    def inspect_network(name: str):
        """Inspect one network, falling back to the singular ``network`` path.

        Raises:
            HTTPException: 502 when the fallback call fails or a body is not
                JSON.
        """
        # Escape the caller-supplied name so it cannot alter the upstream
        # path or query; names made of letters, digits, "_", "." and "-"
        # pass through unchanged.
        safe_name = quote(name, safe="")
        r = session.get(f"{podman_api_base}/libpod/networks/{safe_name}/json")
        if r.status_code == 200:
            # Reuse the response in hand instead of requesting the URL again.
            return _json_or_502(r)
        return _podman_get_json_checked(
            f"{podman_api_base}/libpod/network/{safe_name}/json"
        )

    return router