from fastapi import APIRouter, HTTPException def init_networks_router(session, podman_api_base: str) -> APIRouter: router = APIRouter(tags=["networks"]) def _podman_get_json_checked(url: str): r = session.get(url) if r.status_code >= 400: raise HTTPException(status_code=502, detail=f"Podman API fout {r.status_code}: {r.text}") try: return r.json() except Exception: raise HTTPException(status_code=502, detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}") def _deep_get(d, path, default=None): cur = d for key in path: if not isinstance(cur, dict) or key not in cur: return default cur = cur[key] return cur @router.get("/networks") def list_networks(): # Libpod: /libpod/networks/json url = f"{podman_api_base}/libpod/networks/json" return {"networks": _podman_get_json_checked(url)} @router.get("/networks/meta") def networks_meta(): candidates = [ f"{podman_api_base}/libpod/info", f"{podman_api_base}/libpod/info/json", f"{podman_api_base}/info", f"{podman_api_base}/info/json", f"{podman_api_base}/libpod/system/info", f"{podman_api_base}/libpod/system/info/json", ] last_err = None info = None used = None for url in candidates: r = session.get(url) if r.status_code == 200: used = url try: info = r.json() except Exception: raise HTTPException(status_code=502, detail=f"Podman info endpoint gaf geen JSON terug: {url}") break last_err = f"{r.status_code}: {r.text}" if info is None: raise HTTPException(status_code=502, detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}") network_backend = ( _deep_get(info, ["host", "networkBackend"]) or _deep_get(info, ["Host", "NetworkBackend"]) or _deep_get(info, ["host", "network", "backend"]) or _deep_get(info, ["Host", "Network", "Backend"]) ) rootless = ( _deep_get(info, ["host", "rootless"]) or _deep_get(info, ["Host", "Rootless"]) or _deep_get(info, ["host", "security", "rootless"]) or _deep_get(info, ["Host", "Security", "Rootless"]) ) if not isinstance(rootless, bool): rootless = None return { "networkBackend": network_backend, "rootless": rootless, "infoEndpoint": used, } @router.get("/networks/usage") def networks_usage(): """ Bouwt mapping netwerk -> containers/pods, en container -> netwerken. Werkt betrouwbaar ook als network inspect geen containers toont. """ # 1) containers list (all=true) url = f"{podman_api_base}/libpod/containers/json?all=true" containers = _podman_get_json_checked(url) or [] by_network: dict[str, dict] = {} by_container: dict[str, list[str]] = {} by_container_meta: dict[str, dict] = {} def _norm_name(c: dict) -> str: # Podman kan Names (list) of Name (string) geven. n = c.get("Name") if isinstance(n, str) and n: return n names = c.get("Names") if isinstance(names, list) and names: # vaak begint dit met "/name" nm = str(names[0]).lstrip("/") return nm # fallback id cid = c.get("Id") or c.get("id") or "" return cid[:12] if cid else "(unknown)" def _norm_id(c: dict) -> str: return c.get("Id") or c.get("id") or "" def _pod_name(c: dict) -> str | None: # Verschilt per output; we proberen een paar logische keys for k in ("PodName", "pod", "Pod", "PodID", "PodId"): v = c.get(k) if isinstance(v, str) and v and v != "": # PodID is geen naam, maar beter dan niks return v return None def _extract_networks_from_summary(c: dict) -> list[str] | None: # Mogelijke structuren in list output nets = c.get("Networks") if isinstance(nets, dict): return list(nets.keys()) if isinstance(nets, list): return [str(x) for x in nets if x] ns = c.get("NetworkSettings") if isinstance(ns, dict): nets2 = ns.get("Networks") if isinstance(nets2, dict): return list(nets2.keys()) # Sommige builds hebben NetworkNames nn = c.get("NetworkNames") if isinstance(nn, list): return [str(x) for x in nn if x] return None def _extract_networks_from_inspect_obj(insp: dict) -> list[str]: """ Probeert netwerk-namen uit een container inspect te halen. Ondersteunt varianten/casing die per Podman/driver kunnen verschillen. """ if not isinstance(insp, dict): return [] candidates = [] # 1) meest voorkomend ns = insp.get("NetworkSettings") if isinstance(ns, dict): candidates.append(ns.get("Networks")) candidates.append(ns.get("networks")) # 2) sommige outputs hebben Networks top-level candidates.append(insp.get("Networks")) candidates.append(insp.get("networks")) # 3) extra varianten n2 = insp.get("networkSettings") if isinstance(n2, dict): candidates.append(n2.get("Networks")) candidates.append(n2.get("networks")) n3 = insp.get("Network") if isinstance(n3, dict): candidates.append(n3.get("Networks")) candidates.append(n3.get("networks")) cfg = insp.get("Config") if isinstance(cfg, dict): candidates.append(cfg.get("Networks")) candidates.append(cfg.get("networks")) # Normaliseer candidates naar lijst[str] out: list[str] = [] for val in candidates: if isinstance(val, dict): out.extend([str(k) for k in val.keys() if k]) elif isinstance(val, list): for x in val: if isinstance(x, str) and x: out.append(x) elif isinstance(x, dict): # Best-effort: soms bevat list entries met Name nm = x.get("Name") or x.get("name") if isinstance(nm, str) and nm: out.append(nm) # uniq + stable sort return sorted(set([n for n in out if isinstance(n, str) and n])) def _extract_networks_from_inspect(cid: str) -> tuple[list[str], dict]: """ Returns: (networks, extra_info) extra_info kan bv. networkMode/containerOwner bevatten. """ if not cid: return ([], {}) insp = _podman_get_json_checked(f"{podman_api_base}/libpod/containers/{cid}/json") extra: dict = {} # 1) normale inspect: probeer meerdere paden nets0 = _extract_networks_from_inspect_obj(insp) if nets0: return (nets0, extra) # 2) container network namespace mode: HostConfig.NetworkMode = "container:" hc = insp.get("HostConfig") if isinstance(insp, dict) else None if isinstance(hc, dict): nm = hc.get("NetworkMode") if isinstance(nm, str) and nm.startswith("container:"): owner_id = nm.split("container:", 1)[1] extra["networkMode"] = nm extra["networkOwnerId"] = owner_id # Inspect owner container en pak diens netwerken owner = _podman_get_json_checked(f"{podman_api_base}/libpod/containers/{owner_id}/json") # 1) netwerken van owner vinden (meerdere varianten) owner_nets_list = _extract_networks_from_inspect_obj(owner) # 2) owner naam vinden (meerdere varianten) owner_name = None # meest voorkomend bij inspect if isinstance(owner.get("Name"), str) and owner.get("Name"): owner_name = owner.get("Name") # fallback: soms staat het in Config.Name if not owner_name: cfg = owner.get("Config") or {} if isinstance(cfg.get("Name"), str) and cfg.get("Name"): owner_name = cfg.get("Name") # fallback: soms in ContainerConfig if not owner_name: ccfg = owner.get("ContainerConfig") or {} if isinstance(ccfg.get("Name"), str) and ccfg.get("Name"): owner_name = ccfg.get("Name") # fallback: als niets werkt, toon korte id if not owner_name: owner_name = owner_id[:12] extra["networkOwnerName"] = str(owner_name).lstrip("/") # 3) netwerken returnen (als we ze gevonden hebben) if owner_nets_list: return (owner_nets_list, extra) # Extra fallback: probeer inspect via ownerName (soms werkt naam beter dan id) try: owner_name_for_lookup = extra.get("networkOwnerName") if owner_name_for_lookup and owner_name_for_lookup != owner_id: owner2 = _podman_get_json_checked(f"{podman_api_base}/libpod/containers/{owner_name_for_lookup}/json") owner2_nets = _extract_networks_from_inspect_obj(owner2) if owner2_nets: return (owner2_nets, extra) except Exception: pass # Owner fallback: sommige infra containers gebruiken pasta/host/none try: ohc = owner.get("HostConfig") if isinstance(owner, dict) else None if isinstance(ohc, dict): onm = ohc.get("NetworkMode") if isinstance(onm, str) and onm in ("pasta", "host", "none"): # owner gebruikt geen Podman-netwerk; behandel als pseudo-netwerk return ([onm], extra) except Exception: pass return ([], extra) # 3) Special networking modes: pasta/host/none # In deze modes bestaat vaak geen NetworkSettings.Networks map. if isinstance(hc, dict): nm2 = hc.get("NetworkMode") if isinstance(nm2, str) and nm2 in ("pasta", "host", "none"): extra["networkMode"] = nm2 return ([nm2], extra) return ([], extra) # 2) Loop containers: verzamel netwerken for c in containers: if not isinstance(c, dict): continue cid = _norm_id(c) cname = _norm_name(c) pod = _pod_name(c) nets = _extract_networks_from_summary(c) extra = {} if not nets: nets, extra = _extract_networks_from_inspect(cid) by_container_meta[cname] = extra nets = [n for n in (nets or []) if isinstance(n, str) and n] # byContainer blijft lijst (contract simpel houden) by_container[cname] = sorted(set(nets)) for n in nets: slot = by_network.setdefault(n, {"containers": [], "pods": []}) slot["containers"].append({ "id": cid, "name": cname, "pod": pod, **extra, # voegt networkMode/owner info toe indien van toepassing }) # 3) Pods afleiden (lightweight) via containers for n, slot in by_network.items(): pods = sorted({c.get("pod") for c in slot["containers"] if isinstance(c.get("pod"), str) and c.get("pod")}) slot["pods"] = [{"name": p} for p in pods] # --- FALLBACK: derive owner networks via network-inspect (works for pod infra/shared netns) --- # We look at shared netns containers (networkMode=container:...) and map their owner-id to networks owner_ids: set[str] = set() owner_names: dict[str, str] = {} # ownerId -> ownerName for cname, meta in by_container_meta.items(): try: mode = str((meta or {}).get("networkMode") or "") except Exception: mode = "" if not mode.startswith("container:"): continue owner_id = (meta or {}).get("networkOwnerId") owner_name = (meta or {}).get("networkOwnerName") if isinstance(owner_id, str) and owner_id: owner_ids.add(owner_id) if isinstance(owner_name, str) and owner_name: owner_names[owner_id] = owner_name def _collect_container_ids_from_network_inspect(net_inspect) -> set[str]: """ Key-agnostic: scan alle strings in network inspect en verzamel hex IDs (12..64 chars). Dit is robuust tegen schema-verschillen tussen netavark/cni/podman versies. """ ids: set[str] = set() def looks_like_hex_id(s: str) -> bool: if not isinstance(s, str): return False s = s.strip() if len(s) < 12 or len(s) > 64: return False # alleen hex chars for ch in s: if ch not in "0123456789abcdef": return False return True def walk(obj): if obj is None: return if isinstance(obj, str): # soms staat id als "container:" if obj.startswith("container:"): cand = obj.split("container:", 1)[1] if looks_like_hex_id(cand): ids.add(cand) elif looks_like_hex_id(obj): ids.add(obj) return if isinstance(obj, dict): for k, v in obj.items(): # keys kunnen ook ids zijn if isinstance(k, str) and looks_like_hex_id(k): ids.add(k) walk(v) return if isinstance(obj, list): for it in obj: walk(it) return walk(net_inspect) return ids owner_networks_by_id: dict[str, set[str]] = {oid: set() for oid in owner_ids} # List networks try: nets_list = _podman_get_json_checked(f"{podman_api_base}/libpod/networks/json") except Exception: nets_list = [] net_names: list[str] = [] if isinstance(nets_list, list): for n in nets_list: if isinstance(n, dict): nm = n.get("name") or n.get("Name") if isinstance(nm, str) and nm: net_names.append(nm) # Inspect each network and see if it contains any owner_id for net_name in sorted(set(net_names)): try: net_inspect = _podman_get_json_checked(f"{podman_api_base}/libpod/networks/{net_name}/json") except Exception: continue attached_ids = _collect_container_ids_from_network_inspect(net_inspect) if not attached_ids: continue for oid in owner_ids: short = oid[:12] for aid in attached_ids: if not isinstance(aid, str) or not aid: continue # match exact / short / prefix if aid == oid or aid == short or oid.startswith(aid) or aid.startswith(short): owner_networks_by_id.setdefault(oid, set()).add(net_name) break # Apply: if shared container or owner container has empty by_container[], fill it with owner's networks for cname, meta in by_container_meta.items(): try: mode = str((meta or {}).get("networkMode") or "") except Exception: mode = "" if not mode.startswith("container:"): continue owner_id = (meta or {}).get("networkOwnerId") if not (isinstance(owner_id, str) and owner_id): continue owner_nets = sorted(owner_networks_by_id.get(owner_id, set())) if not owner_nets: continue # 1) fill owner-name entry (if known) owner_name = (meta or {}).get("networkOwnerName") or owner_names.get(owner_id) if isinstance(owner_name, str) and owner_name and not by_container.get(owner_name): by_container[owner_name] = owner_nets # 2) fill shared container entry if not by_container.get(cname): by_container[cname] = owner_nets # --- FINALIZE: derive by_container from by_network (robust for pods/shared netns) --- by_container_derived: dict[str, list[str]] = {} for net_name, info in (by_network or {}).items(): containers2 = (info or {}).get("containers") or [] for c2 in containers2: if not isinstance(c2, dict): continue cname2 = c2.get("name") or c2.get("Name") if not cname2: continue by_container_derived.setdefault(cname2, []).append(net_name) # dedupe + stable sort for k, v in by_container_derived.items(): by_container_derived[k] = sorted(set(v)) # merge: vul lege items in by_container, maar breek niks for k, v in by_container_derived.items(): if not by_container.get(k): by_container[k] = v # --- shared netns: shared containers erven owner-netwerken (als owner bekend is) --- for cname, meta in by_container_meta.items(): try: mode = str((meta or {}).get("networkMode") or "") except Exception: mode = "" if not mode.startswith("container:"): continue owner = (meta or {}).get("networkOwnerName") or (meta or {}).get("networkOwnerId") if owner and by_container.get(owner) and not by_container.get(cname): by_container[cname] = by_container[owner] return {"byNetwork": by_network, "byContainer": by_container, "byContainerMeta": by_container_meta} @router.get("/networks/{name}") def inspect_network(name: str): url1 = f"{podman_api_base}/libpod/networks/{name}/json" r = session.get(url1) if r.status_code == 200: return _podman_get_json_checked(url1) url2 = f"{podman_api_base}/libpod/network/{name}/json" return _podman_get_json_checked(url2) return router