Files
podman-mvp/control/common.py
T
kodi 580c301718 refactor: verplaats run() duplicaat naar common.py
run() stond identiek in app.py en app_system.py. Verplaatst naar
common.py als single source of truth; beide modules importeren
nu de centrale versie.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 10:26:34 +01:00

79 lines
2.2 KiB
Python

import subprocess
from fastapi import HTTPException
def run(cmd):
    """Execute *cmd* and report its exit status together with its output.

    The command is passed to ``subprocess.run`` with output captured in text
    mode and ``check=False``, so a non-zero exit status is returned to the
    caller rather than raised.

    Returns:
        A ``(returncode, output)`` tuple. ``output`` is stdout followed by
        stderr with surrounding whitespace stripped. If running the command
        raises, ``(1, str(exc))`` is returned instead.
    """
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, check=False)
    except Exception as exc:
        # Best-effort contract: failures are reported as exit code 1, not raised.
        return 1, str(exc)
    combined = "".join((completed.stdout or "", completed.stderr or ""))
    return completed.returncode, combined.strip()
def _podman_get_json_checked(session, url: str):
    """GET *url* via *session* and return the parsed JSON body, with checks.

    Raises:
        HTTPException: with status 502 and the response text as detail when
            the response status code is 400 or higher, or when the body
            cannot be decoded by ``.json()``.
    """
    response = session.get(url)
    if response.status_code < 400:
        try:
            payload = response.json()
        except Exception:
            # Undecodable body is surfaced to the client as a bad gateway.
            raise HTTPException(status_code=502, detail=response.text)
        return payload
    # Upstream reported an error status; relay its body as the detail.
    raise HTTPException(status_code=502, detail=response.text)
def _podman_get_json(session, url: str):
    """GET *url* via *session* and return the result of the response's ``.json()``.

    No status check is performed here; anything raised by the request or by
    decoding propagates to the caller.
    """
    response = session.get(url)
    return response.json()
def _podman_get_text(session, url: str) -> str:
    """GET *url* via *session* and return the response's ``.text`` attribute."""
    response = session.get(url)
    return response.text
def _podman_post(session, url: str, **kwargs):
    """POST to *url* via *session* and return the response object.

    Any extra keyword arguments are forwarded unchanged to ``session.post``.
    """
    response = session.post(url, **kwargs)
    return response
def _podman_action_post(session, podman_api_base: str, kind: str, name: str, action: str):
    """POST a libpod action for a pod or a container and return the response.

    The target URL is ``{podman_api_base}/libpod/pods/{name}/{action}`` when
    *kind* equals ``"pods"``; any other *kind* targets
    ``{podman_api_base}/libpod/containers/{name}/{action}``.

    Note: *name* and *action* are interpolated into the path as given, with
    no URL quoting.
    """
    resource = "pods" if kind == "pods" else "containers"
    target = f"{podman_api_base}/libpod/{resource}/{name}/{action}"
    return _podman_post(session, target)
def _podman_delete(session, url: str):
    """Issue a DELETE for *url* via *session* and return the response object."""
    response = session.delete(url)
    return response
def _systemctl(cmd, run_func):
    """Run *cmd* by delegating to *run_func* and return its result unchanged."""
    # Thin pass-through to the supplied runner so behaviour stays as before.
    outcome = run_func(cmd)
    return outcome
def _build_pod_to_containers_map(containers: list):
    """Group container names by the pod they belong to.

    Each entry of *containers* is read with ``.get("PodName")`` and
    ``.get("Names")``. Entries with a missing or empty pod name are skipped.
    For the rest, the first element of ``Names`` is recorded, or ``"?"``
    when ``Names`` is missing or empty.

    Returns:
        A dict mapping pod name to a list of container names, in the order
        the containers were encountered (no sorting is applied).
    """
    grouped = {}
    for entry in containers:
        pod = entry.get("PodName")
        if not pod:
            continue
        names = entry.get("Names") or ["?"]
        grouped.setdefault(pod, []).append(names[0])
    return grouped
def _map_pod_to_unit(podname: str) -> str | None:
if not podname:
return None
if podname.startswith("pod"):
return f"{podname[3:]}.service"
return f"{podname}.service"
def _systemd_then_podman(systemd_callable, podman_callable):
    """Try the systemd path first and fall back to the podman path.

    *systemd_callable* is invoked with no arguments. If it returns a dict
    whose ``"exit"`` entry equals 0 (a missing key is treated as 1), that
    dict is returned as-is. In every other case *podman_callable* is invoked
    with the systemd result, which is ``None`` when systemd produced
    nothing, and its return value is returned.
    """
    outcome = systemd_callable()
    succeeded = isinstance(outcome, dict) and outcome.get("exit", 1) == 0
    if succeeded:
        return outcome
    # Covers the None case too: the fallback then receives None.
    return podman_callable(outcome)