feat(backend): netwerk endpoints toegevoegd

This commit is contained in:
kodi
2026-02-21 07:36:32 +01:00
parent 881382602b
commit 5d5fdab122
+263
View File
@@ -1,4 +1,5 @@
import os
import sys
import subprocess
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
@@ -779,5 +780,267 @@ async def containers_stats_stream(interval: float = 2.0):
}
return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)
# --- PODMAN NETWORKS (nieuw) ---
def _podman_get_json_checked(url: str):
r = SESSION.get(url)
if r.status_code >= 400:
raise HTTPException(status_code=502, detail=f"Podman API fout {r.status_code}: {r.text}")
try:
return r.json()
except Exception:
raise HTTPException(status_code=502, detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}")
def _deep_get(d, path, default=None):
cur = d
for key in path:
if not isinstance(cur, dict) or key not in cur:
return default
cur = cur[key]
return cur
@app.get("/networks")
def list_networks():
# Libpod: /libpod/networks/json
url = f"{PODMAN_API_BASE}/libpod/networks/json"
return {"networks": _podman_get_json_checked(url)}
@app.get("/networks/meta")
def networks_meta():
candidates = [
f"{PODMAN_API_BASE}/libpod/info",
f"{PODMAN_API_BASE}/libpod/info/json",
f"{PODMAN_API_BASE}/info",
f"{PODMAN_API_BASE}/info/json",
f"{PODMAN_API_BASE}/libpod/system/info",
f"{PODMAN_API_BASE}/libpod/system/info/json",
]
last_err = None
info = None
used = None
for url in candidates:
r = SESSION.get(url)
if r.status_code == 200:
used = url
try:
info = r.json()
except Exception:
raise HTTPException(status_code=502, detail=f"Podman info endpoint gaf geen JSON terug: {url}")
break
last_err = f"{r.status_code}: {r.text}"
if info is None:
raise HTTPException(status_code=502, detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}")
network_backend = (
_deep_get(info, ["host", "networkBackend"]) or
_deep_get(info, ["Host", "NetworkBackend"]) or
_deep_get(info, ["host", "network", "backend"]) or
_deep_get(info, ["Host", "Network", "Backend"])
)
rootless = (
_deep_get(info, ["host", "rootless"]) or
_deep_get(info, ["Host", "Rootless"]) or
_deep_get(info, ["host", "security", "rootless"]) or
_deep_get(info, ["Host", "Security", "Rootless"])
)
if not isinstance(rootless, bool):
rootless = None
return {
"networkBackend": network_backend,
"rootless": rootless,
"infoEndpoint": used,
}
@app.get("/networks/usage")
def networks_usage():
"""
Bouwt mapping netwerk -> containers/pods, en container -> netwerken.
Werkt betrouwbaar ook als network inspect geen containers toont.
"""
# 1) containers list (all=true)
url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true"
containers = _podman_get_json_checked(url) or []
by_network: dict[str, dict] = {}
by_container: dict[str, list[str]] = {}
by_container_meta: dict[str, dict] = {}
def _norm_name(c: dict) -> str:
# Podman kan Names (list) of Name (string) geven.
n = c.get("Name")
if isinstance(n, str) and n:
return n
names = c.get("Names")
if isinstance(names, list) and names:
# vaak begint dit met "/name"
nm = str(names[0]).lstrip("/")
return nm
# fallback id
cid = c.get("Id") or c.get("id") or ""
return cid[:12] if cid else "(unknown)"
def _norm_id(c: dict) -> str:
return c.get("Id") or c.get("id") or ""
def _pod_name(c: dict) -> str | None:
# Verschilt per output; we proberen een paar logische keys
for k in ("PodName", "pod", "Pod", "PodID", "PodId"):
v = c.get(k)
if isinstance(v, str) and v and v != "": # PodID is geen naam, maar beter dan niks
return v
return None
def _extract_networks_from_summary(c: dict) -> list[str] | None:
# Mogelijke structuren in list output
nets = c.get("Networks")
if isinstance(nets, dict):
return list(nets.keys())
if isinstance(nets, list):
return [str(x) for x in nets if x]
ns = c.get("NetworkSettings")
if isinstance(ns, dict):
nets2 = ns.get("Networks")
if isinstance(nets2, dict):
return list(nets2.keys())
# Sommige builds hebben NetworkNames
nn = c.get("NetworkNames")
if isinstance(nn, list):
return [str(x) for x in nn if x]
return None
def _extract_networks_from_inspect(cid: str) -> tuple[list[str], dict]:
"""
Returns: (networks, extra_info)
extra_info kan bv. networkMode/containerOwner bevatten.
"""
if not cid:
return ([], {})
insp = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{cid}/json")
extra: dict = {}
# 1) normale inspect: NetworkSettings.Networks
ns = insp.get("NetworkSettings") if isinstance(insp, dict) else None
if isinstance(ns, dict):
nets = ns.get("Networks")
if isinstance(nets, dict):
return (list(nets.keys()), extra)
# 2) container network namespace mode: HostConfig.NetworkMode = "container:<id>"
hc = insp.get("HostConfig") if isinstance(insp, dict) else None
if isinstance(hc, dict):
nm = hc.get("NetworkMode")
if isinstance(nm, str) and nm.startswith("container:"):
owner_id = nm.split("container:", 1)[1]
extra["networkMode"] = nm
extra["networkOwnerId"] = owner_id
# Inspect owner container en pak diens netwerken
owner = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_id}/json")
# 1) netwerken van owner vinden (meerdere varianten)
owner_ns = owner.get("NetworkSettings") or {}
owner_nets = owner_ns.get("Networks")
# fallback: sommige outputs hebben dit anders of leeg
if not isinstance(owner_nets, dict) or not owner_nets:
owner_nets = (
(owner.get("networkSettings") or {}).get("Networks") or
(owner.get("Network") or {}).get("Networks")
)
# 2) owner naam vinden (meerdere varianten)
owner_name = None
# meest voorkomend bij inspect
if isinstance(owner.get("Name"), str) and owner.get("Name"):
owner_name = owner.get("Name")
# fallback: soms staat het in Config.Name
if not owner_name:
cfg = owner.get("Config") or {}
if isinstance(cfg.get("Name"), str) and cfg.get("Name"):
owner_name = cfg.get("Name")
# fallback: soms in ContainerConfig
if not owner_name:
ccfg = owner.get("ContainerConfig") or {}
if isinstance(ccfg.get("Name"), str) and ccfg.get("Name"):
owner_name = ccfg.get("Name")
# fallback: als niets werkt, toon korte id
if not owner_name:
owner_name = owner_id[:12]
extra["networkOwnerName"] = str(owner_name).lstrip("/")
# 3) netwerken returnen (als we ze gevonden hebben)
if isinstance(owner_nets, dict) and owner_nets:
return (list(owner_nets.keys()), extra)
# als owner_nets niet bruikbaar is: return leeg maar mét ownerName
return ([], extra)
return ([], extra)
# 2) Loop containers: verzamel netwerken
for c in containers:
if not isinstance(c, dict):
continue
cid = _norm_id(c)
cname = _norm_name(c)
pod = _pod_name(c)
nets = _extract_networks_from_summary(c)
extra = {}
if not nets:
nets, extra = _extract_networks_from_inspect(cid)
by_container_meta[cname] = extra
nets = [n for n in (nets or []) if isinstance(n, str) and n]
# byContainer blijft lijst (contract simpel houden)
by_container[cname] = sorted(set(nets))
for n in nets:
slot = by_network.setdefault(n, {"containers": [], "pods": []})
slot["containers"].append({
"id": cid,
"name": cname,
"pod": pod,
**extra, # voegt networkMode/owner info toe indien van toepassing
})
# 3) Pods afleiden (lightweight) via containers
for n, slot in by_network.items():
pods = sorted({c.get("pod") for c in slot["containers"] if isinstance(c.get("pod"), str) and c.get("pod")})
slot["pods"] = [{"name": p} for p in pods]
return {"byNetwork": by_network, "byContainer": by_container, "byContainerMeta": by_container_meta}
@app.get("/networks/{name}")
def inspect_network(name: str):
url1 = f"{PODMAN_API_BASE}/libpod/networks/{name}/json"
r = SESSION.get(url1)
if r.status_code == 200:
return _podman_get_json_checked(url1)
url2 = f"{PODMAN_API_BASE}/libpod/network/{name}/json"
return _podman_get_json_checked(url2)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)