68 lines
1.9 KiB
Python
68 lines
1.9 KiB
Python
from fastapi import HTTPException
|
|
|
|
|
|
def _podman_get_json_checked(session, url: str):
    """GET ``url`` through ``session`` and return the decoded JSON body.

    Upstream failures are reported to the caller as a 502 whose detail is
    the raw response text.

    Args:
        session: Object exposing ``get(url)``; the returned response must
            provide ``status_code``, ``text`` and ``json()`` (presumably a
            ``requests``-style session -- confirm against the caller).
        url: Address to request.

    Returns:
        Whatever ``json()`` decodes from the response body.

    Raises:
        HTTPException: Status 502 when the response status is 400 or
            above, or when the body cannot be decoded as JSON.
    """
    r = session.get(url)
    if r.status_code >= 400:
        raise HTTPException(status_code=502, detail=r.text)
    try:
        return r.json()
    except ValueError as exc:
        # JSON decode errors derive from ValueError. Anything else is a
        # genuine bug and must not be disguised as a bad-gateway response.
        raise HTTPException(status_code=502, detail=r.text) from exc
|
|
|
|
|
|
def _podman_get_json(session, url: str):
    """Fetch ``url`` via ``session`` and return its JSON-decoded body.

    No status check or error translation happens here; whatever
    ``session.get`` or ``.json()`` raises propagates to the caller.
    """
    response = session.get(url)
    return response.json()
|
|
|
|
|
|
def _podman_get_text(session, url: str) -> str:
    """Fetch ``url`` via ``session`` and return the response's ``text``.

    The status code is not inspected; the body is returned as-is.
    """
    response = session.get(url)
    return response.text
|
|
|
|
|
|
def _podman_post(session, url: str, **kwargs):
    """POST to ``url`` via ``session``, forwarding any extra keyword arguments.

    Returns whatever ``session.post`` returns, unmodified.
    """
    response = session.post(url, **kwargs)
    return response
|
|
|
|
|
|
def _podman_action_post(session, podman_api_base: str, kind: str, name: str, action: str):
    """POST a libpod action for a pod or a container.

    ``kind == "pods"`` targets ``/libpod/pods/...``; every other value
    targets ``/libpod/containers/...``. The request itself is sent through
    ``_podman_post`` and its result is returned unchanged.
    """
    resource = "pods" if kind == "pods" else "containers"
    target = f"{podman_api_base}/libpod/{resource}/{name}/{action}"
    return _podman_post(session, target)
|
|
|
|
|
|
def _podman_delete(session, url: str):
    """Issue a DELETE for ``url`` via ``session`` and return its result unchanged."""
    response = session.delete(url)
    return response
|
|
|
|
|
|
def _systemctl(cmd, run_func):
    """Run ``cmd`` by delegating to ``run_func`` and return its result.

    Deliberately a thin pass-through so the existing runner's behaviour
    stays exactly as it is.
    """
    outcome = run_func(cmd)
    return outcome
|
|
|
|
|
|
def _build_pod_to_containers_map(containers: list):
    """Group container names by the pod they belong to.

    Each entry is read through ``.get``: ``"PodName"`` selects the group and
    the first element of ``"Names"`` is the recorded name (``"?"`` when
    ``"Names"`` is missing or empty). Entries without a pod name are
    skipped. Input order is kept within each group; nothing is sorted.

    Returns:
        A dict mapping pod name to the list of container names.
    """
    grouped = {}
    for entry in containers:
        owner = entry.get("PodName")
        if not owner:
            # Not attached to any pod.
            continue
        names = entry.get("Names")
        label = names[0] if names else "?"
        if owner in grouped:
            grouped[owner].append(label)
        else:
            grouped[owner] = [label]
    return grouped
|
|
|
|
|
|
def _map_pod_to_unit(podname: str) -> str | None:
|
|
if not podname:
|
|
return None
|
|
if podname.startswith("pod"):
|
|
return f"{podname[3:]}.service"
|
|
return f"{podname}.service"
|
|
|
|
|
|
def _systemd_then_podman(systemd_callable, podman_callable):
    """Try the systemd path first and fall back to the podman path.

    ``systemd_callable`` is invoked with no arguments. Its result is
    returned directly only when it is a dict whose ``"exit"`` entry equals
    0 (a missing entry counts as 1). In every other case
    ``podman_callable`` is called with that result, which may be ``None``,
    and its return value is handed back.
    """
    outcome = systemd_callable()
    succeeded = isinstance(outcome, dict) and outcome.get("exit", 1) == 0
    if succeeded:
        return outcome
    return podman_callable(outcome)
|