# (file viewer metadata removed: 1410 lines, 48 KiB, Python)
import os
|
|
import sys
|
|
import subprocess
|
|
from app_images import init_images_router
|
|
from fastapi import FastAPI, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
import requests_unixsocket
|
|
import uvicorn
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
from fastapi.responses import StreamingResponse
|
|
|
|
# --- APP & PODMAN SOCKET SETUP ---
# root_path="/api" because the app is served behind a reverse proxy prefix.
app = FastAPI(title="Podman MVP Control Plane", root_path="/api")

# Single shared session for all requests over the rootless Podman unix socket.
SESSION = requests_unixsocket.Session()

# URL-encoded socket path (/run/user/1000/podman/podman.sock), pinned to API v5.4.2.
PODMAN_API_BASE = "http+unix://%2Frun%2Fuser%2F1000%2Fpodman%2Fpodman.sock/v5.4.2"

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# systemd unit allowlist location; overridable via the ALLOWLIST_FILE env var.
ALLOWLIST_FILE = os.getenv("ALLOWLIST_FILE", os.path.join(BASE_DIR, "allowed_units.txt"))

# Root directory for workload YAML / quadlet definitions.
WORKLOADS_DIR = "/app/workloads"

# --- STATS CACHE (contract-neutral; in-memory) ---
# Poll Podman stats centrally and expose as optional dashboard fields.
_STATS_CACHE_BY_NAME = {}  # name -> {"cpu": float|None, "mem_usage": float|None, "mem_perc": float|None}
# Unix timestamp of the last successful poll (None until first success).
_STATS_CACHE_TS = None
# asyncio.Task running _stats_poller_loop; created at startup.
_STATS_POLLER_TASK = None
|
|
|
|
def _norm_container_name(name) -> str:
|
|
try:
|
|
return str(name or "").lstrip("/")
|
|
except Exception:
|
|
return ""
|
|
|
|
def _parse_stats_interval_seconds() -> float:
|
|
raw = os.getenv("STATS_INTERVAL_SECONDS", "1.0")
|
|
try:
|
|
v = float(raw)
|
|
except Exception:
|
|
v = 1.0
|
|
if v <= 0:
|
|
v = 1.0
|
|
if v < 0.5:
|
|
v = 0.5
|
|
if v > 30:
|
|
v = 30
|
|
return v
|
|
|
|
async def _stats_poller_loop():
    """Poll Podman's stats endpoint forever and refresh the in-memory cache.

    Runs as a background asyncio task. On any failure the last good cache is
    kept and the loop simply retries on the next tick.

    NOTE(review): SESSION.get is a blocking call inside an async loop; it
    stalls the event loop for up to the 5s timeout — consider run_in_executor.
    """
    import time  # local import; replaces the old __import__("time") hack

    global _STATS_CACHE_BY_NAME, _STATS_CACHE_TS

    interval = _parse_stats_interval_seconds()
    stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false"

    def _to_float(x):
        # Stats fields may be strings/None depending on the Podman version.
        try:
            return float(x)
        except Exception:
            return None

    while True:
        try:
            data = SESSION.get(stats_url, timeout=5).json()
            stats_list = data.get("Stats") if isinstance(data, dict) else None
            if not isinstance(stats_list, list):
                stats_list = []

            new_cache = {}
            for st in stats_list:
                if not isinstance(st, dict):
                    continue
                key = _norm_container_name(st.get("Name"))
                if not key:
                    continue
                # CPU field name varies across Podman versions: CPUPerc, CPU, AvgCPU.
                cpu_val = st.get("CPUPerc")
                if cpu_val is None:
                    cpu_val = st.get("CPU")
                if cpu_val is None:
                    cpu_val = st.get("AvgCPU")
                new_cache[key] = {
                    "cpu": _to_float(cpu_val),
                    "mem_usage": _to_float(st.get("MemUsage")),
                    "mem_perc": _to_float(st.get("MemPerc")),
                }

            # Swap atomically so readers never see a half-built cache.
            _STATS_CACHE_BY_NAME = new_cache
            _STATS_CACHE_TS = int(time.time())
        except Exception:
            # Best effort: keep last good cache; try again next tick.
            pass

        await asyncio.sleep(interval)
|
|
|
|
@app.on_event("startup")
async def _startup_stats_poller():
    """Spawn the background stats poller exactly once at application startup."""
    global _STATS_POLLER_TASK
    already_running = _STATS_POLLER_TASK is not None and not _STATS_POLLER_TASK.done()
    if not already_running:
        _STATS_POLLER_TASK = asyncio.create_task(_stats_poller_loop())
|
|
|
|
# --- ROUTERS ---
# Images API lives in a dedicated module to keep this file from growing further.
# The router shares this module's session and API base so all Podman traffic
# goes through the same unix socket.
app.include_router(init_images_router(SESSION, PODMAN_API_BASE))
|
|
|
|
# --- ADAPTERS (contract-neutral helpers) ---
|
|
# Centralize Podman socket and systemctl invocation.
|
|
# MUST NOT change endpoint outputs, status codes, or side-effects.
|
|
|
|
def _podman_get_json(url: str):
    """GET *url* over the Podman socket and decode the JSON body."""
    response = SESSION.get(url)
    return response.json()
|
|
|
|
def _podman_get_text(url: str) -> str:
    """GET *url* over the Podman socket and return the raw response text."""
    response = SESSION.get(url)
    return response.text
|
|
|
|
def _podman_post(url: str, **kwargs):
    """POST to the Podman socket; *kwargs* pass straight through to requests."""
    response = SESSION.post(url, **kwargs)
    return response
|
|
|
|
def _podman_action_post(kind: str, name: str, action: str):
    """POST a lifecycle *action* (start/stop/restart/...) to a pod or container.

    kind == "pods" targets the pod API; anything else targets containers.
    """
    resource = "pods" if kind == "pods" else "containers"
    return _podman_post(f"{PODMAN_API_BASE}/libpod/{resource}/{name}/{action}")
|
|
|
|
def _podman_delete(url: str):
    """DELETE *url* over the Podman socket and return the raw response."""
    response = SESSION.delete(url)
    return response
|
|
|
|
def _systemctl(cmd):
    """Execute a systemctl argv via run(); returns (exit_code, combined_output).

    Kept as a thin proxy so all subprocess handling stays in one place.
    """
    return run(cmd)
|
|
|
|
def _run_systemctl_action(action: str, unit: str):
    """Run `systemctl --user <action> <unit>`; returns (exit_code, output)."""
    return _systemctl(["systemctl", "--user", action, unit])
|
|
|
|
@app.get("/health")
def health():
    """Liveness probe: checks the Podman socket and the user systemd bus.

    Overall "ok" is true only when both backends respond within 2 seconds.
    """
    podman_ok = False
    try:
        resp = SESSION.get(f"{PODMAN_API_BASE}/libpod/info", timeout=2)
        if resp.status_code == 200:
            try:
                resp.json()  # must be valid JSON, not just HTTP 200
            except Exception:
                podman_ok = False
            else:
                podman_ok = True
    except Exception:
        podman_ok = False

    systemd_reachable = False
    try:
        proc = subprocess.run(
            ["systemctl", "--user", "list-units", "--no-pager", "--no-legend"],
            capture_output=True,
            text=True,
            check=False,
            timeout=2,
        )
        systemd_reachable = proc.returncode == 0
    except Exception:
        systemd_reachable = False

    return {
        "ok": podman_ok and systemd_reachable,
        "podman": {"ok": podman_ok},
        "systemd_user": {"reachable": systemd_reachable},
    }
|
|
|
|
|
|
# --- MODELS ---
|
|
class FileContent(BaseModel):
    """Request body carrying raw file text (used by /files/save)."""

    content: str
|
|
|
|
|
|
# --- WORKLOADS ---
|
|
@app.get("/workloads")
def list_workloads():
    """List workload definition files (yaml/yml/json), relative to WORKLOADS_DIR."""
    found = [
        os.path.relpath(os.path.join(root, fname), WORKLOADS_DIR)
        for root, _, files in os.walk(WORKLOADS_DIR)
        for fname in files
        if fname.endswith((".yaml", ".yml", ".json"))
    ]
    return {"workloads": found}
|
|
|
|
|
|
@app.get("/workloads/read/{filename:path}")
def read_workload(filename: str):
    """Return the raw contents of a workload file.

    Raises 404 when the file does not exist. The raw os.path.join of a
    `{filename:path}` parameter allowed ../ traversal out of WORKLOADS_DIR;
    safe_join now rejects that with a 403.
    """
    path = safe_join(WORKLOADS_DIR, filename)
    if not os.path.exists(path):
        raise HTTPException(404)
    with open(path, 'r') as f:
        content = f.read()
    return {"filename": filename, "content": content}
|
|
|
|
|
|
@app.post("/workloads/save-file")
def save_workload_file(data: dict):
    """Write a workload file. Body: {"path": "...", "content": "..."}.

    Previously a missing field crashed with a TypeError (HTTP 500) and the
    raw join allowed ../ traversal; both are now rejected explicitly.
    """
    path = data.get("path")
    content = data.get("content")
    if not path or content is None:
        raise HTTPException(status_code=400, detail="path and content are required")
    full_path = safe_join(WORKLOADS_DIR, path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "w") as f:
        f.write(content)
    return {"status": "success"}
|
|
|
|
|
|
@app.post("/workloads/deploy/{filename:path}")
def deploy_workload(filename: str):
    """Deploy a workload YAML through `podman kube play`.

    A missing file used to surface as an unhandled FileNotFoundError (500);
    it is now a clean 404. safe_join blocks ../ traversal (403).
    """
    path = safe_join(WORKLOADS_DIR, filename)
    if not os.path.exists(path):
        raise HTTPException(status_code=404, detail="Workload file not found")
    with open(path, 'r') as f:
        yaml_content = f.read()
    url = f"{PODMAN_API_BASE}/libpod/kube/play"
    return _podman_post(url, data=yaml_content).json()
|
|
|
|
|
|
# --- FILE RESTRICTIONS ---
|
|
def safe_join(base, path):
    """Join *path* under *base*, refusing any escape from the base directory.

    Raises HTTPException(403) when the resolved path lands outside *base*.
    The old check used final.startswith(base), which wrongly accepted sibling
    directories sharing the prefix (e.g. /app/workloads_evil for base
    /app/workloads); we now require the base itself or base + separator.
    """
    base = os.path.abspath(base)
    final = os.path.abspath(os.path.join(base, path))
    if final != base and not final.startswith(base + os.sep):
        raise HTTPException(status_code=403, detail="Forbidden path")
    return final
|
|
|
|
|
|
# STEP 4: Centralize WORKLOADS_DIR subtree enforcement via one helper.
|
|
# MUST be behavior-identical to previous safe_join(WORKLOADS_DIR, ...) calls.
|
|
def _files_safe_join(path: str) -> str:
    """Resolve *path* inside WORKLOADS_DIR; raises 403 on traversal attempts."""
    return safe_join(WORKLOADS_DIR, path)
|
|
|
|
|
|
# --- FILES API ---
|
|
@app.get("/files/tree")
def file_tree():
    """Walk WORKLOADS_DIR and return one entry per directory.

    Each entry: {"path": relative dir ("" for the root), "dirs": sorted
    subdirectory names, "files": sorted file names}.
    """
    tree = []
    for dirpath, dirnames, filenames in os.walk(WORKLOADS_DIR):
        rel = os.path.relpath(dirpath, WORKLOADS_DIR)
        tree.append({
            "path": "" if rel == "." else rel,
            "dirs": sorted(dirnames),
            "files": sorted(filenames),
        })
    return tree
|
|
|
|
|
|
@app.get("/files/read")
def file_read(path: str = Query(...)):
    """Return the text contents of a single file under WORKLOADS_DIR.

    404 when missing, 403 when the path is a directory (or escapes the tree).
    """
    full = _files_safe_join(path)
    if not os.path.exists(full):
        raise HTTPException(status_code=404, detail="Not found")
    if os.path.isdir(full):
        raise HTTPException(status_code=403, detail="Is a directory")
    with open(full, "r") as fh:
        return {"content": fh.read()}
|
|
|
|
|
|
@app.post("/files/save")
def file_save(path: str = Query(...), data: FileContent = None):
    """Write text to a file under WORKLOADS_DIR, creating parent directories.

    The body is required in practice; with the old code a missing body made
    `data.content` raise AttributeError (HTTP 500) — now a clean 400.
    """
    if data is None:
        raise HTTPException(status_code=400, detail="Missing request body")
    full = _files_safe_join(path)
    os.makedirs(os.path.dirname(full), exist_ok=True)
    with open(full, "w") as fh:
        fh.write(data.content)
    return {"status": "success", "path": path}
|
|
|
|
|
|
@app.delete("/files/delete")
def file_delete(path: str = Query(...)):
    """Delete a single file under WORKLOADS_DIR (directories are refused)."""
    full = _files_safe_join(path)
    if not os.path.exists(full):
        raise HTTPException(status_code=404, detail="Not found")
    if os.path.isdir(full):
        raise HTTPException(status_code=400, detail="Kan niet verwijderen: is directory")
    try:
        os.remove(full)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Kan niet verwijderen: {exc}")
    return {"status": "deleted", "type": "file"}
|
|
|
|
|
|
@app.post("/files/mkdir")
def file_mkdir(path: str = Query(...)):
    """Create a directory; paths are forced under the systemd/ prefix.

    The UI expects operations under systemd/, so the prefix is prepended
    when absent.
    """
    target = path if path.startswith("systemd") else os.path.join("systemd", path)
    os.makedirs(_files_safe_join(target), exist_ok=True)
    return {"status": "directory created", "path": target}
|
|
|
|
|
|
@app.delete("/files/rmdir")
def file_rmdir(path: str = Query(..., description="Directory path under systemd/")):
    """Remove an EMPTY directory under the systemd/ subtree.

    Responses: 400 for the systemd root / out-of-subtree / non-directory,
    404 when absent, 409 (with a content listing) when not empty.
    """
    if not path or path in ("systemd", "systemd/"):
        raise HTTPException(status_code=400, detail="Refusing to delete systemd root")
    # The old code also tested `path != "systemd"` here, but that case is
    # already rejected above — only the prefix check remains.
    if not path.startswith("systemd/"):
        raise HTTPException(status_code=400, detail="Only systemd subtree is allowed")

    full = _files_safe_join(path)
    if not os.path.exists(full):
        raise HTTPException(status_code=404, detail="Directory not found")
    if not os.path.isdir(full):
        raise HTTPException(status_code=400, detail="Path is not a directory")

    try:
        Path(full).rmdir()  # rmdir only removes empty directories
    except OSError:
        # Not empty: report the blocking entries so the client can show them.
        dirs, files = _split_dir_entries(full)
        raise HTTPException(status_code=409, detail={
            "error": "directory not empty",
            "dirs": sorted(dirs),
            "files": sorted(files),
        })

    return {"deleted": True, "path": path}


def _split_dir_entries(full: str):
    """Best-effort split of a directory's entries into (subdirs, files).

    Returns ([], []) when the directory cannot be listed.
    """
    dirs, files = [], []
    try:
        for entry in os.listdir(full):
            if os.path.isdir(os.path.join(full, entry)):
                dirs.append(entry)
            else:
                files.append(entry)
    except Exception:
        return [], []
    return dirs, files
|
|
|
|
|
|
# --- PODS / CONTAINERS ---
|
|
@app.get("/pods")
|
|
def list_pods():
|
|
# Cruciaal: ?all=true zorgt dat EXIT_STATE pods ook getoond worden
|
|
url = f"{PODMAN_API_BASE}/libpod/pods/json?all=true"
|
|
return _podman_get_json(url)
|
|
|
|
|
|
@app.post("/actions/{action}/{name}")
def take_action(action: str, name: str):
    """Legacy start/stop endpoint with name guessing and YAML redeploy fallback.

    Pods created via kube play may be named "<name>", "pod<name>" or
    "pod-<name>", so each candidate is tried in turn. Unknown actions return
    {"status": "unknown"} with HTTP 200 (kept for backward compatibility).
    """
    # Legacy endpoint (keep behavior)
    possible_names = [name, f"pod{name}", f"pod-{name}"]

    if action == "start":
        # STEP 1: try to start the pod directly (the 'Cockpit' method)
        for target in possible_names:
            res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/start")
            if res.status_code in (200, 204):
                return {"status": "started", "target": target, "method": "direct"}

        # STEP 2: direct start failed — look for a YAML definition to redeploy
        target_path = None
        for ext in (".yaml", ".yml"):
            cand = os.path.join(WORKLOADS_DIR, f"{name}{ext}")
            if os.path.exists(cand):
                target_path = cand
                break

        if target_path:
            with open(target_path, 'r') as file:
                yaml_content = file.read()
            res = _podman_post(f"{PODMAN_API_BASE}/libpod/kube/play", data=yaml_content)

            # SPECIAL CASE: pod already exists — force-remove every candidate
            # name, then retry the deploy once.
            if res.status_code == 500 and "already exists" in res.text:
                print(f"DEBUG: Forceer herstart voor {name} wegens conflict")
                for target in possible_names:
                    _podman_delete(f"{PODMAN_API_BASE}/libpod/pods/{target}?force=true")
                # Retry now that the conflicting pod is gone
                retry_res = _podman_post(f"{PODMAN_API_BASE}/libpod/kube/play", data=yaml_content)
                return retry_res.json()

            return res.json()

        return {"status": "unknown", "method": "no_yaml_found"}

    if action == "stop":
        for target in possible_names:
            res = _podman_post(f"{PODMAN_API_BASE}/libpod/pods/{target}/stop")
            if res.status_code in (200, 204):
                return {"status": "stopped", "target": target}
        return {"status": "not found"}

    return {"status": "unknown"}
|
|
|
|
|
|
# --- DASHBOARD HELPERS (contract-neutral, no ordering/sorting changes) ---
|
|
|
|
def _build_pod_to_containers_map(containers: list):
|
|
# preserves original order of containers processing; no sorting added
|
|
pod_to_containers = {}
|
|
for c in containers:
|
|
pod_name = c.get("PodName") or ""
|
|
if pod_name:
|
|
pod_to_containers.setdefault(pod_name, []).append((c.get("Names") or ["?"])[0])
|
|
return pod_to_containers
|
|
|
|
|
|
def _map_pod_to_unit(podname: str) -> str | None:
|
|
"""
|
|
HOTFIX 3.1 FIX 1:
|
|
If podname starts with "pod", map to <rest>.service (e.g. podmediaserver -> mediaserver.service)
|
|
Else: <podname>.service
|
|
"""
|
|
if not podname:
|
|
return None
|
|
if podname.startswith("pod"):
|
|
return f"{podname[3:]}.service"
|
|
return f"{podname}.service"
|
|
|
|
|
|
def _append_podman_pods_dashboard_rows(dashboard: list, api_pods: list, pod_to_containers: dict):
    """Append one dashboard row per runtime pod (api_pods order preserved)."""
    for pod in api_pods:
        pod_name = pod.get("Name")
        dashboard.append({
            "Name": pod_name,
            "Status": pod.get("Status", "unknown"),
            "Containers": pod_to_containers.get(pod_name, []),
            "Unit": _map_pod_to_unit(pod_name) if pod_name else "",
            "Source": "podman",
        })
|
|
|
|
|
|
def _append_defined_pods_dashboard_rows(dashboard: list, by_name: dict, root_dir: str):
    """Append rows for pods defined by YAML files but absent from the runtime.

    os.walk / file iteration order is preserved. Unit state comes from
    `systemctl --user is-active`, falling back to active/inactive by exit code.
    """
    for root, _, files in os.walk(root_dir):
        for fname in files:
            if not fname.endswith((".yaml", ".yml")):
                continue
            base = os.path.splitext(os.path.basename(fname))[0]
            pod_name = f"pod{base}"
            unit_name = _map_pod_to_unit(pod_name)

            if pod_name in by_name:
                continue  # already represented by a runtime pod row
            code, out = _systemctl(["systemctl", "--user", "is-active", unit_name])
            status = (out or "").strip() or ("active" if code == 0 else "inactive")
            dashboard.append({
                "Name": pod_name,
                "Status": status,
                "Containers": [],
                "Unit": unit_name,
                "Source": "systemd",
            })
|
|
|
|
|
|
def _ensure_container_status_field(container: dict):
|
|
# keep exact existing defaulting behavior
|
|
if "Status" not in container:
|
|
container["Status"] = container.get("State", "")
|
|
|
|
|
|
def _make_defined_container_dashboard_row(name: str, relpath: str):
|
|
# keep exact key set and default values as before
|
|
return {
|
|
"Names": [name],
|
|
"Image": "",
|
|
"State": "",
|
|
"Status": "",
|
|
"Ports": [],
|
|
"PodName": "",
|
|
"_dashboard_source": "systemd",
|
|
"_dashboard_unit": f"{name}.service",
|
|
"_dashboard_def_path": relpath,
|
|
"_dashboard_cpu": None,
|
|
"_dashboard_mem_usage": None,
|
|
"_dashboard_mem_perc": None,
|
|
}
|
|
|
|
|
|
def _legacy_dashboard_item_from_container(c: dict):
|
|
# Keep exact keys & defaults as before
|
|
return {
|
|
"name": (c.get("Names") or ["?"])[0],
|
|
"status": c.get("Status") or c.get("State") or "",
|
|
"path": "",
|
|
"ip": "",
|
|
"containers": [],
|
|
}
|
|
|
|
|
|
@app.get("/pods-dashboard")
def pods_dashboard():
    """Combined pod view: runtime pods first, then YAML-defined stopped pods."""
    dashboard = []

    # 0) map pod name -> contained container names
    containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    pod_to_containers = _build_pod_to_containers_map(containers)

    # 1a) pods known to the Podman runtime
    api_pods = _podman_get_json(f"{PODMAN_API_BASE}/libpod/pods/json?all=true")
    by_name = {p.get("Name"): p for p in api_pods}
    _append_podman_pods_dashboard_rows(dashboard, api_pods, pod_to_containers)

    # 1b) pods defined by YAML files in WORKLOADS_DIR, shown even if not running
    _append_defined_pods_dashboard_rows(dashboard, by_name, WORKLOADS_DIR)

    return dashboard
|
|
|
|
|
|
def _systemd_then_podman(systemd_callable, podman_callable):
|
|
systemd_res = systemd_callable()
|
|
if systemd_res is not None:
|
|
if isinstance(systemd_res, dict) and systemd_res.get("exit", 1) == 0:
|
|
return systemd_res
|
|
return podman_callable(systemd_res)
|
|
return podman_callable(None)
|
|
|
|
|
|
def try_systemd_pod_action(action: str, podname: str):
    """Attempt a pod action through its mapped systemd user unit.

    Returns None when no unit name can be derived; otherwise a dict with the
    executed command, exit code and combined output.
    """
    unit = _map_pod_to_unit(podname)
    if not unit:
        return None
    code, out = _systemctl(["systemctl", "--user", action, unit])
    return {
        "method": "systemd",
        "pod": podname,
        "unit": unit,
        "cmd": f"systemctl --user {action} {unit}",
        "exit": code,
        "output": out,
    }
|
|
|
|
|
|
@app.post("/pods/actions/{action}/{podname}")
def pod_action_prefer_systemd(action: str, podname: str):
    """Start/stop/restart a pod, preferring its systemd unit over raw Podman.

    Raises 400 for unknown actions. The old `return {...}, 400` was a Flask
    idiom: FastAPI serialized the tuple as a 200 JSON array instead of an
    error status.
    """
    if action not in ("start", "stop", "restart"):
        raise HTTPException(status_code=400, detail="Invalid action")

    def _systemd_call():
        return try_systemd_pod_action(action, podname)

    def _podman_call(systemd_res):
        # Fall back to the Podman API; include the failed systemd attempt when present.
        podman = _podman_action_post("pods", podname, action).json()
        if systemd_res:
            return {
                "method": "systemd_then_podman",
                "note": "systemd failed; falling back to podman",
                "systemd": systemd_res,
                "podman": podman,
            }
        return {"method": "podman", "result": podman}

    return _systemd_then_podman(_systemd_call, _podman_call)
|
|
|
|
|
|
def find_defined_containers():
    """Scan WORKLOADS_DIR/systemd for quadlet .container files.

    Returns {container_name: definition path relative to WORKLOADS_DIR}.
    """
    systemd_root = os.path.join(WORKLOADS_DIR, "systemd")
    defined = {}
    for root, _, files in os.walk(systemd_root):
        for fname in files:
            if fname.endswith(".container"):
                rel = os.path.relpath(os.path.join(root, fname), WORKLOADS_DIR)
                defined[os.path.splitext(fname)[0]] = rel
    return defined
|
|
|
|
def _extract_published_ports(container: dict) -> list[str]:
|
|
"""
|
|
Normalize Podman API Ports into a stable display list:
|
|
- "127.0.0.1:8080:8000/tcp"
|
|
- "8080:8000/tcp" (if no host ip)
|
|
"""
|
|
out: list[str] = []
|
|
for p in (container.get("Ports") or []):
|
|
host_ip = p.get("host_ip") or p.get("HostIp") or ""
|
|
host_port = p.get("host_port") or p.get("HostPort")
|
|
cont_port = p.get("container_port") or p.get("ContainerPort")
|
|
proto = p.get("protocol") or p.get("Protocol") or ""
|
|
|
|
if host_port is None or cont_port is None:
|
|
continue
|
|
|
|
s = ""
|
|
if host_ip:
|
|
s += f"{host_ip}:"
|
|
s += f"{host_port}:{cont_port}"
|
|
if proto:
|
|
s += f"/{proto}"
|
|
out.append(s)
|
|
|
|
return out
|
|
|
|
@app.get("/containers-dashboard")
def containers_dashboard():
    """Merged container view: runtime containers enriched with stats/source
    info, followed by defined-but-not-running quadlet containers.

    Source classification per container:
      1) "systemd" with a definition path when a matching .container exists;
      2) "systemd" via the pod's unit when its pod runs under systemd;
      3) otherwise plain "podman".
    """
    dashboard = []
    defined = find_defined_containers()

    # Cache so we do not shell out to systemctl again for every container
    unit_active_cache = {}

    # Snapshot of the background poller's stats cache (may be empty at startup)
    stats_by_name = _STATS_CACHE_BY_NAME

    def _unit_is_active(unit):
        # Memoized `systemctl --user is-active` check.
        if not unit:
            return False
        if unit in unit_active_cache:
            return unit_active_cache[unit]
        code, out = _systemctl(["systemctl", "--user", "is-active", unit])
        ok = (code == 0) or ((out or "").strip() == "active")
        unit_active_cache[unit] = ok
        return ok

    # A) real (runtime) containers
    real = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    for c in real:
        _ensure_container_status_field(c)

        # Published ports: keep the existing hotfix normalization
        c["_dashboard_published_ports"] = _extract_published_ports(c)

        # Normalize the name: Podman may report "/name"
        rname = ((c.get("Names") or ["?"])[0] or "").lstrip("/")

        # Optional live stats (keys always present; null on cache miss)
        c["_dashboard_cpu"] = None
        c["_dashboard_mem_usage"] = None
        c["_dashboard_mem_perc"] = None
        st = stats_by_name.get(rname)
        if isinstance(st, dict):
            c["_dashboard_cpu"] = st.get("cpu")
            c["_dashboard_mem_usage"] = st.get("mem_usage")
            c["_dashboard_mem_perc"] = st.get("mem_perc")

        # 1) Managed: systemd when a .container definition exists
        if rname in defined:
            c["_dashboard_source"] = "systemd"
            c["_dashboard_unit"] = f"{rname}.service"
            c["_dashboard_def_path"] = defined[rname]
        else:
            # 2) Extra: does the container live in a pod run via systemd (kube/quadlet)?
            podname = (c.get("PodName") or "").strip()
            pod_unit = _map_pod_to_unit(podname) if podname else None

            if pod_unit and _unit_is_active(pod_unit):
                c["_dashboard_source"] = "systemd"
                c["_dashboard_unit"] = pod_unit
                # no _dashboard_def_path, because this is not a .container definition
            else:
                c["_dashboard_source"] = "podman"

        dashboard.append(c)

    # B) Dedup set, also normalized (prevents /name vs name duplicates)
    runtime_names = set((((c.get("Names") or ["?"])[0] or "").lstrip("/")) for c in real)

    # C) defined containers from systemd/*.container (skip duplicates)
    for name, relpath in defined.items():
        if name in runtime_names:
            continue
        row = _make_defined_container_dashboard_row(name, relpath)
        code, out = _systemctl(["systemctl", "--user", "is-active", f"{name}.service"])
        row["Status"] = (out or "").strip()
        dashboard.append(row)

    return dashboard
|
|
|
|
|
|
@app.get("/containers")
def list_containers():
    """List all containers; ?all=true includes stopped ones."""
    return _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
|
|
|
|
|
|
@app.post("/containers/{action}/{name}")
def container_action(action: str, name: str):
    """Start/stop/restart a container, preferring systemd when a .container
    definition exists; otherwise (or on systemd failure) fall back to Podman.

    NOTE(review): the `return {...}, 400` and `..., res.status_code` tuple
    returns are a Flask idiom — FastAPI serializes them as 200 JSON arrays,
    not error statuses. Confirm whether real status codes were intended.
    """
    if action not in ("start", "stop", "restart"):
        return {"error": "Invalid action"}, 400

    defined = find_defined_containers()
    # Remember the systemd attempt so the fallback path can report it.
    _sys = {"code": None, "out": None}

    def _systemd_call():
        # Only containers with a quadlet definition are managed via systemd.
        if name in defined:
            code, out = _systemctl(["systemctl", "--user", action, name])
            _sys["code"] = code
            _sys["out"] = out
            if code == 0:
                return {
                    "method": "systemd",
                    "name": name,
                    "unit": f"{name}.service",
                    "definition": defined[name],
                    "cmd": f"systemctl --user {action} {name}",
                    "exit": code,
                    "output": out,
                }
            # Non-zero exit: hand the failure to the podman fallback.
            return {"exit": code, "output": out}
        return None

    def _podman_call(systemd_res):
        res = _podman_action_post("containers", name, action)
        if res.status_code in (200, 204):
            return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}

        if res.status_code >= 400:
            return {
                "method": "podman",
                "name": name,
                "cmd": f"podman {action} {name}",
                "status_code": res.status_code,
                "error": getattr(res, "text", "") or "",
            }, res.status_code

        # Odd non-success, non-error status: report the earlier systemd attempt
        # when a definition exists, otherwise the raw podman status.
        if name in defined:
            return {
                "method": "systemd",
                "name": name,
                "unit": f"{name}.service",
                "definition": defined[name],
                "cmd": f"systemctl --user {action} {name}",
                "exit": _sys["code"],
                "output": _sys["out"],
            }

        return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}

    return _systemd_then_podman(_systemd_call, _podman_call)
|
|
|
|
|
|
@app.get("/debug/defined-containers")
def debug_defined_containers():
    """Expose the quadlet .container definition map for debugging."""
    return find_defined_containers()
|
|
|
|
|
|
@app.get("/dashboard")
def get_dashboard():
    """Legacy dashboard view (shape kept for old clients).

    On any Podman API failure an empty list is returned. The old bare
    `except:` also swallowed SystemExit/KeyboardInterrupt; narrowed to
    Exception.
    """
    try:
        api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    except Exception:
        api_containers = []
    return [_legacy_dashboard_item_from_container(c) for c in api_containers]
|
|
|
|
|
|
@app.get("/test-hybrid")
def test_hybrid():
    """Diagnostic endpoint: filesystem scan + Podman API reachability in one shot."""
    # 1) filesystem check
    try:
        bestanden = [
            os.path.join(root, fname)
            for root, _, files in os.walk(WORKLOADS_DIR)
            for fname in files
        ]
    except Exception as e:
        bestanden = f"FS Fout: {str(e)}"

    # 2) Podman API check
    try:
        api_containers = _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/json?all=true")
    except Exception as e:
        api_containers = f"API Fout: {str(e)}"

    # Error strings degrade to []/-1 so the response shape stays constant.
    return {
        "bestanden_gevonden": bestanden if isinstance(bestanden, list) else [],
        "api_containers_aantal": len(api_containers) if isinstance(api_containers, list) else -1,
        "api_raw_sample": api_containers[0] if isinstance(api_containers, list) and api_containers else api_containers,
    }
|
|
|
|
|
|
@app.get("/containers/logs/{name}")
def get_container_logs(name: str):
    """Return the last 100 log lines (stdout+stderr) of a container.

    NOTE(review): Podman log streams often carry binary multiplexing
    metadata; this returns the decoded text as-is — confirm downstream
    tolerates it.
    """
    url = f"{PODMAN_API_BASE}/libpod/containers/{name}/logs?stdout=true&stderr=true&tail=100"
    return {"logs": _podman_get_text(url)}
|
|
|
|
|
|
@app.get("/containers/inspect/{name}")
def inspect_container(name: str):
    """Return the full libpod inspect JSON for a container."""
    return _podman_get_json(f"{PODMAN_API_BASE}/libpod/containers/{name}/json")
|
|
|
|
|
|
# --- SYSTEMD allowlist ---
|
|
def read_allowlist():
|
|
units = []
|
|
if os.path.exists(ALLOWLIST_FILE):
|
|
with open(ALLOWLIST_FILE, "r") as f:
|
|
for line in f:
|
|
u = line.strip()
|
|
if u and u.endswith(".service"):
|
|
units.append(u)
|
|
return sorted(set(units))
|
|
|
|
|
|
def list_unit_files():
    """Fallback unit discovery via `systemctl list-unit-files` (used when the
    allowlist is empty). Returns sorted unique .service names; [] on failure."""
    code, out = _systemctl(["systemctl", "--user", "list-unit-files", "--type=service", "--no-pager"])
    if code != 0:
        return []
    units = set()
    for line in out.splitlines():
        fields = line.split()
        if fields and fields[0].endswith(".service"):
            units.add(fields[0])
    return sorted(units)
|
|
|
|
|
|
def unit_state(unit):
    """Return (active_state, enabled_state) for a user unit.

    Either value degrades to "unknown" when systemctl yields no usable
    output (is-enabled in particular can fail in container contexts).
    """
    _, active_out = _systemctl(["systemctl", "--user", "is-active", unit])
    active = active_out.splitlines()[0].strip() if active_out else "unknown"
    code, enabled_out = _systemctl(["systemctl", "--user", "is-enabled", unit])
    enabled = enabled_out.splitlines()[0].strip() if (enabled_out and code == 0) else "unknown"
    return active, enabled
|
|
|
|
|
|
@app.get("/systemd/allowlist")
def systemd_allowlist():
    """Return allowlisted units; fall back to all unit files when the
    allowlist is empty (allow_mode then reports False)."""
    units = read_allowlist()
    allow_mode = bool(units)
    if not allow_mode:
        units = list_unit_files()
    return {"allow_mode": allow_mode, "units": units}
|
|
|
|
|
|
@app.post("/daemon-reload")
def api_daemon_reload():
    """Run `systemctl --user daemon-reload` and report the command result."""
    try:
        code, out = _systemctl(["systemctl", "--user", "daemon-reload"])
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {
        "cmd": "systemctl --user daemon-reload",
        "exit": code,
        "output": out,
    }
|
|
|
|
|
|
@app.post("/{action}/{unit}")
def api_action(action: str, unit: str):
    """Run an allowlisted systemctl action against a user unit.

    NOTE(review): this catch-all route matches any two-segment POST path, so
    it must remain registered after the more specific routes above.
    """
    if action not in ("status", "start", "stop", "restart"):
        raise HTTPException(status_code=400, detail="Invalid action")

    allowed = read_allowlist()
    # Allowlist mode is active only when the file yields at least one unit.
    if allowed and unit not in allowed:
        raise HTTPException(status_code=403, detail="Unit not allowed by allowlist")

    code, out = _run_systemctl_action(action, unit)
    return {"cmd": f"systemctl --user {action} {unit}", "exit": code, "output": out}
|
|
|
|
|
|
@app.post("/api/<action>/<unit>")
def legacy_api_action(action: str, unit: str):
    # legacy flask-like path; keep behavior (even if not used by index.html)
    # NOTE(review): "<action>/<unit>" is a LITERAL path segment in FastAPI,
    # not path parameters — `action`/`unit` are filled from query strings here.
    # Also `return {...}, 400` serializes as a 200 JSON array in FastAPI.
    # Confirm this endpoint is truly unused before relying on it.
    if action not in ("status", "start", "stop", "restart"):
        return {"error": "Invalid action"}, 400
    cmd = ["systemctl", "--user", action, unit]
    code, out = _run_systemctl_action(action, unit)
    return {"cmd": " ".join(cmd), "exit": code, "output": out}
|
|
|
|
|
|
def run(cmd):
    """Execute *cmd* (an argv list, no shell) and return (exit_code, output).

    Output is stdout+stderr combined and stripped. Any spawn failure is
    reported as exit code 1 with the exception text as output.
    """
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
    except Exception as exc:
        return 1, str(exc)
    combined = (proc.stdout or "") + (proc.stderr or "")
    return proc.returncode, combined.strip()
|
|
|
|
# Endpoint added after a ChatGPT suggestion
|
|
|
|
@app.get("/containers/stats/stream")
async def containers_stats_stream(interval: float = 2.0):
    """SSE stream with periodic container stats.

    Contract-neutral: new endpoint; no existing outputs were changed.
    The interval is clamped to [0.5, 30] seconds as a guardrail.
    """
    import time  # local import; replaces the old __import__("time") hack

    # Guardrails against abuse
    if interval < 0.5:
        interval = 0.5
    elif interval > 30:
        interval = 30

    stats_url = f"{PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false"

    async def event_gen():
        try:
            while True:
                # Timeout so a stalled Podman socket cannot freeze the stream.
                try:
                    data = SESSION.get(stats_url, timeout=5).json()
                except Exception as e:
                    data = {"Error": str(e), "Stats": []}

                payload = {
                    "ts": int(time.time()),
                    "data": data,
                }

                yield "event: stats\n"
                yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n"
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            # Client disconnected; end the generator quietly.
            return

    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # helps with buffering proxies
    }
    return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)
|
|
|
|
|
|
# --- PODMAN NETWORKS (nieuw) ---
|
|
def _podman_get_json_checked(url: str):
|
|
r = SESSION.get(url)
|
|
if r.status_code >= 400:
|
|
raise HTTPException(status_code=502, detail=f"Podman API fout {r.status_code}: {r.text}")
|
|
try:
|
|
return r.json()
|
|
except Exception:
|
|
raise HTTPException(status_code=502, detail=f"Podman API gaf geen JSON terug: {r.text[:2000]}")
|
|
|
|
|
|
def _deep_get(d, path, default=None):
|
|
cur = d
|
|
for key in path:
|
|
if not isinstance(cur, dict) or key not in cur:
|
|
return default
|
|
cur = cur[key]
|
|
return cur
|
|
|
|
|
|
@app.get("/networks")
def list_networks():
    """List Podman networks (libpod /networks/json), wrapped for the UI."""
    return {"networks": _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/json")}
|
|
|
|
|
|
@app.get("/networks/meta")
def networks_meta():
    """Probe known Podman info endpoints and report backend + rootless flag.

    Endpoint paths and JSON key casing differ across Podman versions, hence
    the candidate URL list and the multi-path lookups below. Raises 502 when
    no candidate answers with valid JSON.
    """
    candidates = [
        f"{PODMAN_API_BASE}/libpod/info",
        f"{PODMAN_API_BASE}/libpod/info/json",
        f"{PODMAN_API_BASE}/info",
        f"{PODMAN_API_BASE}/info/json",
        f"{PODMAN_API_BASE}/libpod/system/info",
        f"{PODMAN_API_BASE}/libpod/system/info/json",
    ]

    last_err = None
    info = None
    used = None

    # First candidate answering 200 with valid JSON wins.
    for url in candidates:
        r = SESSION.get(url)
        if r.status_code == 200:
            used = url
            try:
                info = r.json()
            except Exception:
                raise HTTPException(status_code=502, detail=f"Podman info endpoint gaf geen JSON terug: {url}")
            break
        last_err = f"{r.status_code}: {r.text}"

    if info is None:
        raise HTTPException(status_code=502, detail=f"Podman info endpoint niet gevonden. Laatste fout: {last_err}")

    # Key casing varies: host.networkBackend vs Host.NetworkBackend, etc.
    network_backend = (
        _deep_get(info, ["host", "networkBackend"]) or
        _deep_get(info, ["Host", "NetworkBackend"]) or
        _deep_get(info, ["host", "network", "backend"]) or
        _deep_get(info, ["Host", "Network", "Backend"])
    )

    rootless = (
        _deep_get(info, ["host", "rootless"]) or
        _deep_get(info, ["Host", "Rootless"]) or
        _deep_get(info, ["host", "security", "rootless"]) or
        _deep_get(info, ["Host", "Security", "Rootless"])
    )
    # The `or` chain folds False into the next lookup; only accept a real bool.
    if not isinstance(rootless, bool):
        rootless = None

    return {
        "networkBackend": network_backend,
        "rootless": rootless,
        "infoEndpoint": used,
    }
|
|
|
|
@app.get("/networks/usage")
def networks_usage():
    """
    Bouwt mapping netwerk -> containers/pods, en container -> netwerken.
    Werkt betrouwbaar ook als network inspect geen containers toont.

    (Builds a mapping network -> containers/pods, and container -> networks.
    Works reliably even when network inspect does not show any containers.)

    Returns a dict with three keys:
      - "byNetwork":       network name -> {"containers": [...], "pods": [...]}
      - "byContainer":     container name -> sorted list of network names
      - "byContainerMeta": container name -> extra info (networkMode/owner)

    NOTE(review): relies on the module-level helper ``_podman_get_json_checked``
    (defined elsewhere in this file); it is used inside try/except blocks below,
    so it is assumed to raise on HTTP/JSON errors — confirm against its definition.
    """
    # 1) containers list (all=true)
    url = f"{PODMAN_API_BASE}/libpod/containers/json?all=true"
    containers = _podman_get_json_checked(url) or []

    by_network: dict[str, dict] = {}
    by_container: dict[str, list[str]] = {}
    by_container_meta: dict[str, dict] = {}

    def _norm_name(c: dict) -> str:
        """Best-effort container name from a list-endpoint summary dict."""
        # Podman may return Names (list) or Name (string).
        n = c.get("Name")
        if isinstance(n, str) and n:
            return n
        names = c.get("Names")
        if isinstance(names, list) and names:
            # often starts with "/name"
            nm = str(names[0]).lstrip("/")
            return nm
        # fallback: short id
        cid = c.get("Id") or c.get("id") or ""
        return cid[:12] if cid else "(unknown)"

    def _norm_id(c: dict) -> str:
        """Container id from a summary dict ("" when absent)."""
        return c.get("Id") or c.get("id") or ""

    def _pod_name(c: dict) -> str | None:
        """Best-effort pod name (or id) from a summary dict."""
        # Varies per output; try a few plausible keys
        for k in ("PodName", "pod", "Pod", "PodID", "PodId"):
            v = c.get(k)
            if isinstance(v, str) and v and v != "":  # PodID is not a name, but better than nothing
                return v
        return None

    def _extract_networks_from_summary(c: dict) -> list[str] | None:
        """Network names from the list-endpoint summary, or None when absent."""
        # Possible structures in list output
        nets = c.get("Networks")
        if isinstance(nets, dict):
            return list(nets.keys())
        if isinstance(nets, list):
            return [str(x) for x in nets if x]

        ns = c.get("NetworkSettings")
        if isinstance(ns, dict):
            nets2 = ns.get("Networks")
            if isinstance(nets2, dict):
                return list(nets2.keys())

        # Some builds have NetworkNames
        nn = c.get("NetworkNames")
        if isinstance(nn, list):
            return [str(x) for x in nn if x]

        return None

    def _extract_networks_from_inspect_obj(insp: dict) -> list[str]:
        """
        Tries to extract network names from a container inspect document.
        Supports variants/casing that can differ per Podman version/driver.
        """
        if not isinstance(insp, dict):
            return []

        candidates = []

        # 1) most common location
        ns = insp.get("NetworkSettings")
        if isinstance(ns, dict):
            candidates.append(ns.get("Networks"))
            candidates.append(ns.get("networks"))

        # 2) some outputs have Networks at the top level
        candidates.append(insp.get("Networks"))
        candidates.append(insp.get("networks"))

        # 3) extra variants
        n2 = insp.get("networkSettings")
        if isinstance(n2, dict):
            candidates.append(n2.get("Networks"))
            candidates.append(n2.get("networks"))

        n3 = insp.get("Network")
        if isinstance(n3, dict):
            candidates.append(n3.get("Networks"))
            candidates.append(n3.get("networks"))

        cfg = insp.get("Config")
        if isinstance(cfg, dict):
            candidates.append(cfg.get("Networks"))
            candidates.append(cfg.get("networks"))

        # Normalize candidates to list[str]
        out: list[str] = []
        for val in candidates:
            if isinstance(val, dict):
                out.extend([str(k) for k in val.keys() if k])
            elif isinstance(val, list):
                for x in val:
                    if isinstance(x, str) and x:
                        out.append(x)
                    elif isinstance(x, dict):
                        # Best-effort: sometimes the list contains entries with a Name
                        nm = x.get("Name") or x.get("name")
                        if isinstance(nm, str) and nm:
                            out.append(nm)

        # uniq + stable sort
        return sorted(set([n for n in out if isinstance(n, str) and n]))

    def _extract_networks_from_inspect(cid: str) -> tuple[list[str], dict]:
        """
        Returns: (networks, extra_info)
        extra_info may contain e.g. networkMode / owner-container details.
        """
        if not cid:
            return ([], {})

        insp = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{cid}/json")
        extra: dict = {}

        # 1) normal inspect: try multiple paths
        nets0 = _extract_networks_from_inspect_obj(insp)
        if nets0:
            return (nets0, extra)

        # 2) container network namespace mode: HostConfig.NetworkMode = "container:<id>"
        hc = insp.get("HostConfig") if isinstance(insp, dict) else None
        if isinstance(hc, dict):
            nm = hc.get("NetworkMode")
            if isinstance(nm, str) and nm.startswith("container:"):
                owner_id = nm.split("container:", 1)[1]
                extra["networkMode"] = nm
                extra["networkOwnerId"] = owner_id
                # Inspect the owner container and take over its networks
                owner = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_id}/json")

                # 1) find the owner's networks (multiple variants)
                owner_nets_list = _extract_networks_from_inspect_obj(owner)

                # 2) find the owner's name (multiple variants)
                owner_name = None

                # most common in inspect output
                if isinstance(owner.get("Name"), str) and owner.get("Name"):
                    owner_name = owner.get("Name")

                # fallback: sometimes it lives in Config.Name
                if not owner_name:
                    cfg = owner.get("Config") or {}
                    if isinstance(cfg.get("Name"), str) and cfg.get("Name"):
                        owner_name = cfg.get("Name")

                # fallback: sometimes in ContainerConfig
                if not owner_name:
                    ccfg = owner.get("ContainerConfig") or {}
                    if isinstance(ccfg.get("Name"), str) and ccfg.get("Name"):
                        owner_name = ccfg.get("Name")

                # fallback: if nothing works, use the short id
                if not owner_name:
                    owner_name = owner_id[:12]

                extra["networkOwnerName"] = str(owner_name).lstrip("/")

                # 3) return the networks (if we found any)
                if owner_nets_list:
                    return (owner_nets_list, extra)

                # Extra fallback: try inspect via ownerName (sometimes the name works better than the id)
                try:
                    owner_name_for_lookup = extra.get("networkOwnerName")
                    if owner_name_for_lookup and owner_name_for_lookup != owner_id:
                        owner2 = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/containers/{owner_name_for_lookup}/json")
                        owner2_nets = _extract_networks_from_inspect_obj(owner2)
                        if owner2_nets:
                            return (owner2_nets, extra)
                except Exception:
                    pass

                # Owner fallback: some infra containers use pasta/host/none
                try:
                    ohc = owner.get("HostConfig") if isinstance(owner, dict) else None
                    if isinstance(ohc, dict):
                        onm = ohc.get("NetworkMode")
                        if isinstance(onm, str) and onm in ("pasta", "host", "none"):
                            # owner uses no Podman network; treat mode as a pseudo-network
                            return ([onm], extra)
                except Exception:
                    pass

                return ([], extra)

        # 3) Special networking modes: pasta/host/none
        # In these modes there is often no NetworkSettings.Networks map.
        if isinstance(hc, dict):
            nm2 = hc.get("NetworkMode")
            if isinstance(nm2, str) and nm2 in ("pasta", "host", "none"):
                extra["networkMode"] = nm2
                return ([nm2], extra)

        return ([], extra)

    # 2) Loop over containers: collect networks
    for c in containers:
        if not isinstance(c, dict):
            continue

        cid = _norm_id(c)
        cname = _norm_name(c)
        pod = _pod_name(c)

        nets = _extract_networks_from_summary(c)
        extra = {}

        if not nets:
            nets, extra = _extract_networks_from_inspect(cid)

        by_container_meta[cname] = extra

        nets = [n for n in (nets or []) if isinstance(n, str) and n]

        # byContainer stays a plain list (keep the contract simple)
        by_container[cname] = sorted(set(nets))

        for n in nets:
            slot = by_network.setdefault(n, {"containers": [], "pods": []})
            slot["containers"].append({
                "id": cid,
                "name": cname,
                "pod": pod,
                **extra,  # adds networkMode/owner info where applicable
            })

    # 3) Derive pods (lightweight) from the containers
    for n, slot in by_network.items():
        pods = sorted({c.get("pod") for c in slot["containers"] if isinstance(c.get("pod"), str) and c.get("pod")})
        slot["pods"] = [{"name": p} for p in pods]

    # --- FALLBACK: derive owner networks via network-inspect (works for pod infra/shared netns) ---
    # We look at shared netns containers (networkMode=container:...) and map their owner-id to networks
    owner_ids: set[str] = set()
    owner_names: dict[str, str] = {}  # ownerId -> ownerName

    for cname, meta in by_container_meta.items():
        try:
            mode = str((meta or {}).get("networkMode") or "")
        except Exception:
            mode = ""
        if not mode.startswith("container:"):
            continue

        owner_id = (meta or {}).get("networkOwnerId")
        owner_name = (meta or {}).get("networkOwnerName")
        if isinstance(owner_id, str) and owner_id:
            owner_ids.add(owner_id)
            if isinstance(owner_name, str) and owner_name:
                owner_names[owner_id] = owner_name

    def _collect_container_ids_from_network_inspect(net_inspect) -> set[str]:
        """
        Key-agnostic: scan all strings in a network inspect and collect hex IDs (12..64 chars).
        This is robust against schema differences between netavark/cni/podman versions.
        """
        ids: set[str] = set()

        def looks_like_hex_id(s: str) -> bool:
            if not isinstance(s, str):
                return False
            s = s.strip()
            if len(s) < 12 or len(s) > 64:
                return False
            # hex chars only (lowercase — Podman ids are lowercase hex)
            for ch in s:
                if ch not in "0123456789abcdef":
                    return False
            return True

        def walk(obj):
            # Recursive scan over arbitrary JSON (dict/list/str) structure.
            if obj is None:
                return
            if isinstance(obj, str):
                # sometimes the id appears as "container:<id>"
                if obj.startswith("container:"):
                    cand = obj.split("container:", 1)[1]
                    if looks_like_hex_id(cand):
                        ids.add(cand)
                elif looks_like_hex_id(obj):
                    ids.add(obj)
                return
            if isinstance(obj, dict):
                for k, v in obj.items():
                    # keys can also be ids
                    if isinstance(k, str) and looks_like_hex_id(k):
                        ids.add(k)
                    walk(v)
                return
            if isinstance(obj, list):
                for it in obj:
                    walk(it)
                return

        walk(net_inspect)
        return ids

    owner_networks_by_id: dict[str, set[str]] = {oid: set() for oid in owner_ids}

    # List networks
    try:
        nets_list = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/json")
    except Exception:
        nets_list = []

    net_names: list[str] = []
    if isinstance(nets_list, list):
        for n in nets_list:
            if isinstance(n, dict):
                nm = n.get("name") or n.get("Name")
                if isinstance(nm, str) and nm:
                    net_names.append(nm)

    # Inspect each network and see if it contains any owner_id
    for net_name in sorted(set(net_names)):
        try:
            net_inspect = _podman_get_json_checked(f"{PODMAN_API_BASE}/libpod/networks/{net_name}/json")
        except Exception:
            continue

        attached_ids = _collect_container_ids_from_network_inspect(net_inspect)
        if not attached_ids:
            continue

        for oid in owner_ids:
            short = oid[:12]
            for aid in attached_ids:
                if not isinstance(aid, str) or not aid:
                    continue
                # match exact / short / prefix
                if aid == oid or aid == short or oid.startswith(aid) or aid.startswith(short):
                    owner_networks_by_id.setdefault(oid, set()).add(net_name)
                    break

    # Apply: if shared container or owner container has empty by_container[], fill it with owner's networks
    for cname, meta in by_container_meta.items():
        try:
            mode = str((meta or {}).get("networkMode") or "")
        except Exception:
            mode = ""
        if not mode.startswith("container:"):
            continue

        owner_id = (meta or {}).get("networkOwnerId")
        if not (isinstance(owner_id, str) and owner_id):
            continue

        owner_nets = sorted(owner_networks_by_id.get(owner_id, set()))
        if not owner_nets:
            continue

        # 1) fill owner-name entry (if known)
        owner_name = (meta or {}).get("networkOwnerName") or owner_names.get(owner_id)
        if isinstance(owner_name, str) and owner_name and not by_container.get(owner_name):
            by_container[owner_name] = owner_nets

        # 2) fill shared container entry
        if not by_container.get(cname):
            by_container[cname] = owner_nets

    # --- FINALIZE: derive by_container from by_network (robust for pods/shared netns) ---
    by_container_derived: dict[str, list[str]] = {}

    for net_name, info in (by_network or {}).items():
        containers2 = (info or {}).get("containers") or []
        for c2 in containers2:
            if not isinstance(c2, dict):
                continue
            cname2 = c2.get("name") or c2.get("Name")
            if not cname2:
                continue
            by_container_derived.setdefault(cname2, []).append(net_name)

    # dedupe + stable sort
    for k, v in by_container_derived.items():
        by_container_derived[k] = sorted(set(v))

    # merge: fill empty entries in by_container, but break nothing
    for k, v in by_container_derived.items():
        if not by_container.get(k):
            by_container[k] = v

    # --- shared netns: shared containers inherit owner networks (when the owner is known) ---
    for cname, meta in by_container_meta.items():
        try:
            mode = str((meta or {}).get("networkMode") or "")
        except Exception:
            mode = ""
        if not mode.startswith("container:"):
            continue
        owner = (meta or {}).get("networkOwnerName") or (meta or {}).get("networkOwnerId")
        if owner and by_container.get(owner) and not by_container.get(cname):
            by_container[cname] = by_container[owner]

    return {"byNetwork": by_network, "byContainer": by_container, "byContainerMeta": by_container_meta}
|
|
|
|
|
|
@app.get("/networks/{name}")
def inspect_network(name: str):
    """
    Inspect a single Podman network by name.

    Tries the modern plural endpoint (/libpod/networks/{name}/json) first and
    falls back to the legacy singular form (/libpod/network/{name}/json) that
    some Podman builds expose.

    Raises:
        HTTPException(502): when the plural endpoint answers 200 but returns
            a body that is not valid JSON (matches the error style used by
            the other Podman proxy endpoints in this file).
    """
    url1 = f"{PODMAN_API_BASE}/libpod/networks/{name}/json"
    r = SESSION.get(url1)
    if r.status_code == 200:
        # Parse the response we already received instead of issuing the same
        # GET a second time (the previous version fetched url1 twice, wasting
        # a round-trip and opening a race window between the two requests).
        try:
            return r.json()
        except Exception:
            raise HTTPException(status_code=502, detail=f"Podman network endpoint gaf geen JSON terug: {url1}")

    # Non-200 on the plural form: try the legacy singular endpoint via the
    # shared checked helper (which handles status/JSON errors).
    url2 = f"{PODMAN_API_BASE}/libpod/network/{name}/json"
    return _podman_get_json_checked(url2)
|
|
|
|
if __name__ == "__main__":
    # Allow overriding the bind address/port via environment variables;
    # the defaults preserve the original behavior (all interfaces, port 8000).
    uvicorn.run(
        app,
        host=os.getenv("UVICORN_HOST", "0.0.0.0"),
        port=int(os.getenv("UVICORN_PORT", "8000")),
    )
|