818 lines
28 KiB
Python
818 lines
28 KiB
Python
import os
|
|
import asyncio
|
|
import json
|
|
import time
|
|
import socket
|
|
import secrets
|
|
import threading
|
|
from collections import deque
|
|
from typing import Optional
|
|
from urllib.parse import unquote
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, Field
|
|
from common import (
|
|
_helper_call,
|
|
_podman_action_post,
|
|
_podman_get_json,
|
|
_podman_get_text,
|
|
_systemd_then_podman,
|
|
)
|
|
|
|
|
|
_SESSION = None
|
|
_PODMAN_API_BASE = None
|
|
|
|
# --- STATS CACHE (contract-neutral; in-memory) ---
|
|
# Poll Podman stats centrally and expose as optional dashboard fields.
|
|
_STATS_CACHE_BY_NAME = {} # name -> {"cpu": float|None, "mem_usage": float|None, "mem_perc": float|None}
|
|
_STATS_CACHE_TS = None
|
|
_STATS_POLLER_TASK = None
|
|
_STATS_SHOWN_NAMES: set = set() # namen van alle dashboard-containers uit laatste dashboard call
|
|
|
|
# --- EXEC SESSION CACHE (in-memory) ---
|
|
_EXEC_SESSIONS = {} # session_id -> _ExecSessionState
|
|
_EXEC_SESSIONS_LOCK = threading.Lock()
|
|
_EXEC_SESSION_IDLE_TTL_SECONDS = 60 * 60
|
|
_EXEC_SESSION_CLOSED_GC_SECONDS = 5 * 60
|
|
_EXEC_SESSION_MAX_ACTIVE = 12
|
|
_EXEC_INPUT_MAX_BYTES = 32 * 1024
|
|
|
|
|
|
class ExecStartRequest(BaseModel):
    """Request body for starting an exec session in a container."""

    # Command to execute; every instance gets its own fresh ["/bin/sh"] list.
    cmd: list[str] = Field(default_factory=lambda: ["/bin/sh"])
    # Whether the exec is created and started with a TTY.
    tty: bool = Field(default=True)
|
|
|
|
|
|
class ExecInputRequest(BaseModel):
    """Request body carrying text to write to an exec session's socket."""

    # Text to send; it is UTF-8 encoded by the input endpoint before writing.
    data: str = Field(default="")
|
|
|
|
|
|
class ExecResizeRequest(BaseModel):
    """Request body for resizing an exec session (terminal rows and columns)."""

    # Forwarded to Podman as the `h` query parameter.
    rows: int = Field(default=24)
    # Forwarded to Podman as the `w` query parameter.
    cols: int = Field(default=80)
|
|
|
|
|
|
class _ExecSessionState:
|
|
def __init__(self, session_id: str, exec_id: str, container: str, sock: socket.socket, tty: bool):
|
|
self.session_id = session_id
|
|
self.exec_id = exec_id
|
|
self.container = container
|
|
self.sock = sock
|
|
self.tty = tty
|
|
self.created_at = int(time.time())
|
|
self.last_activity = self.created_at
|
|
self.closed = False
|
|
self.close_reason = ""
|
|
self.seq = 0
|
|
self.events = deque(maxlen=2000) # {"seq","ts","type","data"}
|
|
self.lock = threading.Lock()
|
|
self.reader_thread = None
|
|
|
|
def push_event(self, event_type: str, data: str):
|
|
with self.lock:
|
|
self.seq += 1
|
|
self.events.append({
|
|
"seq": self.seq,
|
|
"ts": int(time.time()),
|
|
"type": event_type,
|
|
"data": data,
|
|
})
|
|
self.last_activity = int(time.time())
|
|
|
|
def mark_closed(self, reason: str):
|
|
if self.closed:
|
|
return
|
|
self.closed = True
|
|
self.close_reason = reason or "closed"
|
|
self.push_event("closed", self.close_reason)
|
|
|
|
|
|
def _parse_podman_unix_socket_and_base(api_base: str) -> tuple[str, str]:
|
|
if not isinstance(api_base, str) or not api_base.startswith("http+unix://"):
|
|
raise HTTPException(status_code=500, detail="Unsupported PODMAN_API_BASE for exec")
|
|
|
|
tail = api_base[len("http+unix://"):]
|
|
slash = tail.find("/")
|
|
if slash < 0:
|
|
encoded_socket = tail
|
|
base_path = ""
|
|
else:
|
|
encoded_socket = tail[:slash]
|
|
base_path = tail[slash:]
|
|
|
|
socket_path = unquote(encoded_socket)
|
|
if not socket_path:
|
|
raise HTTPException(status_code=500, detail="Podman socket path missing")
|
|
|
|
if not base_path:
|
|
base_path = ""
|
|
if base_path and not base_path.startswith("/"):
|
|
base_path = "/" + base_path
|
|
|
|
return socket_path, base_path
|
|
|
|
|
|
def _open_exec_hijacked_socket(exec_id: str, tty: bool) -> tuple[socket.socket, bytes]:
    """Start an exec over a raw Unix socket and hand back the live connection.

    Writes a hand-built ``POST {base}/libpod/exec/{exec_id}/start`` request to
    the Podman socket derived from ``_PODMAN_API_BASE``, reads until the end
    of the HTTP response headers and requires status 200.

    Returns:
        ``(sock, rest)`` where ``sock`` is the still-open socket (timeout
        lowered to 1 second) and ``rest`` is whatever bytes arrived after the
        header terminator in the same reads.

    Raises:
        HTTPException: 502 for connection/IO failures, oversized or malformed
            response headers, or a non-200 status. The socket is closed on
            every failure path.
    """
    socket_path, base_path = _parse_podman_unix_socket_and_base(_PODMAN_API_BASE)
    req_path = f"{base_path}/libpod/exec/{exec_id}/start"
    body = json.dumps({"Detach": False, "Tty": bool(tty)}, separators=(",", ":")).encode("utf-8")
    request = (
        f"POST {req_path} HTTP/1.1\r\n"
        "Host: d\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "\r\n"
    ).encode("utf-8") + body
    header_end = b"\r\n\r\n"

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.settimeout(10.0)
        sock.connect(socket_path)
        sock.sendall(request)

        raw = b""
        while header_end not in raw:
            chunk = sock.recv(4096)
            if not chunk:
                break
            raw += chunk
            if len(raw) > 64 * 1024:
                raise HTTPException(status_code=502, detail="Exec start response headers too large")

        if header_end not in raw:
            raise HTTPException(status_code=502, detail="Exec start invalid HTTP response")

        head, _, rest = raw.partition(header_end)
        status_line = head.split(b"\r\n", 1)[0].decode("utf-8", errors="replace")
        try:
            status_code = int(status_line.split(" ", 2)[1])
        except (IndexError, ValueError):
            raise HTTPException(status_code=502, detail=f"Exec start parse error: {status_line}") from None

        if status_code != 200:
            body_preview = rest.decode("utf-8", errors="replace")
            raise HTTPException(status_code=502, detail=f"Exec start failed ({status_code}): {body_preview}")

        # Short timeout from here on so a reader can poll its stop condition.
        sock.settimeout(1.0)
        return sock, rest
    except OSError as e:
        # connect/sendall/recv failures (including timeouts) previously leaked the socket.
        sock.close()
        raise HTTPException(status_code=502, detail=f"Exec start connection failed: {str(e)}") from e
    except BaseException:
        sock.close()
        raise
|
|
|
|
|
|
def _get_exec_session_or_404(session_id: str) -> _ExecSessionState:
    """Look up a registered exec session by id.

    Raises:
        HTTPException: 404 when no session is stored under ``session_id``.
    """
    with _EXEC_SESSIONS_LOCK:
        found = _EXEC_SESSIONS.get(session_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Unknown exec session: {session_id}")
|
|
|
|
|
|
def _close_exec_session(sess: _ExecSessionState, reason: str):
|
|
try:
|
|
sess.sock.shutdown(socket.SHUT_RDWR)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
sess.sock.close()
|
|
except Exception:
|
|
pass
|
|
sess.mark_closed(reason)
|
|
|
|
|
|
def _cleanup_exec_sessions():
    """Expire exec sessions based on how long ago they were last active.

    Closed sessions idle for more than ``_EXEC_SESSION_CLOSED_GC_SECONDS`` are
    removed from the registry. Open sessions idle for more than
    ``_EXEC_SESSION_IDLE_TTL_SECONDS`` are closed with reason "idle-timeout"
    and stay registered until a later pass removes them.
    """
    now = int(time.time())
    with _EXEC_SESSIONS_LOCK:
        expired_ids = []
        for sid, sess in _EXEC_SESSIONS.items():
            idle = now - int(sess.last_activity or now)
            if sess.closed:
                if idle > _EXEC_SESSION_CLOSED_GC_SECONDS:
                    expired_ids.append(sid)
            elif idle > _EXEC_SESSION_IDLE_TTL_SECONDS:
                _close_exec_session(sess, "idle-timeout")
        # Removal happens after the scan so the dict is not mutated mid-iteration.
        for sid in expired_ids:
            _EXEC_SESSIONS.pop(sid, None)
|
|
|
|
|
|
def _reader_loop(session_id: str, sess: _ExecSessionState, initial_rest: bytes):
|
|
try:
|
|
if initial_rest:
|
|
txt = initial_rest.decode("utf-8", errors="replace")
|
|
if txt:
|
|
sess.push_event("stdout", txt)
|
|
|
|
while not sess.closed:
|
|
try:
|
|
chunk = sess.sock.recv(4096)
|
|
except socket.timeout:
|
|
continue
|
|
except Exception as e:
|
|
_close_exec_session(sess, f"read-error: {str(e)}")
|
|
break
|
|
|
|
if not chunk:
|
|
_close_exec_session(sess, "eof")
|
|
break
|
|
|
|
txt = chunk.decode("utf-8", errors="replace")
|
|
if txt:
|
|
sess.push_event("stdout", txt)
|
|
finally:
|
|
sess.mark_closed(sess.close_reason or "reader-exit")
|
|
|
|
|
|
def _norm_container_name(name) -> str:
|
|
try:
|
|
return str(name or "").lstrip("/")
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _parse_stats_interval_seconds() -> float:
|
|
raw = os.getenv("STATS_INTERVAL_SECONDS", "1.0")
|
|
try:
|
|
v = float(raw)
|
|
except Exception:
|
|
v = 1.0
|
|
if v <= 0:
|
|
v = 1.0
|
|
if v < 0.5:
|
|
v = 0.5
|
|
if v > 30:
|
|
v = 30
|
|
return v
|
|
|
|
|
|
def _parse_positive_int_env(name: str, default: int, minimum: int, maximum: int) -> int:
|
|
raw = os.getenv(name, str(default))
|
|
try:
|
|
v = int(raw)
|
|
except Exception:
|
|
v = int(default)
|
|
if v < minimum:
|
|
v = minimum
|
|
if v > maximum:
|
|
v = maximum
|
|
return v
|
|
|
|
|
|
def _exec_max_active_sessions() -> int:
    """Return the cap on open exec sessions from ``EXEC_SESSION_MAX_ACTIVE`` (1..500)."""
    return _parse_positive_int_env(
        name="EXEC_SESSION_MAX_ACTIVE",
        default=_EXEC_SESSION_MAX_ACTIVE,
        minimum=1,
        maximum=500,
    )
|
|
|
|
|
|
def _exec_max_input_bytes() -> int:
    """Return the per-request input size cap from ``EXEC_INPUT_MAX_BYTES`` (64 B..1 MiB)."""
    one_mebibyte = 1024 * 1024
    return _parse_positive_int_env(
        name="EXEC_INPUT_MAX_BYTES",
        default=_EXEC_INPUT_MAX_BYTES,
        minimum=64,
        maximum=one_mebibyte,
    )
|
|
|
|
|
|
async def _stats_poller_loop():
    """Refresh the module-level stats cache forever, once per interval.

    Each tick fetches the Podman stats endpoint, builds a fresh
    ``name -> {"cpu", "mem_usage", "mem_perc"}`` mapping and swaps it into
    ``_STATS_CACHE_BY_NAME`` together with a new ``_STATS_CACHE_TS``. Any
    failure during a tick keeps the previous cache and retries next tick.

    The HTTP call is synchronous, so it runs in a worker thread; calling it
    directly here would stall the event loop for up to the 5 second timeout.
    """
    global _STATS_CACHE_BY_NAME, _STATS_CACHE_TS

    interval = _parse_stats_interval_seconds()
    stats_url = f"{_PODMAN_API_BASE}/libpod/containers/stats?all=true&stream=false"

    def _to_float(x):
        try:
            return float(x)
        except Exception:
            return None

    def _fetch_stats():
        # Blocking request + JSON decode; executed off the event loop.
        return _SESSION.get(stats_url, timeout=5).json()

    while True:
        try:
            data = await asyncio.to_thread(_fetch_stats)
            stats_list = data.get("Stats") if isinstance(data, dict) else None
            if not isinstance(stats_list, list):
                stats_list = []

            new_cache = {}
            for st in stats_list:
                if not isinstance(st, dict):
                    continue
                key = _norm_container_name(st.get("Name"))
                if not key:
                    continue
                # CPUPerc returned by Podman is already percentage (0.10 == 0.10%)
                cpu_val = st.get("CPUPerc")
                if cpu_val is None:
                    cpu_val = st.get("CPU")
                if cpu_val is None:
                    cpu_val = st.get("AvgCPU")
                new_cache[key] = {
                    "cpu": _to_float(cpu_val),
                    "mem_usage": _to_float(st.get("MemUsage")),
                    "mem_perc": _to_float(st.get("MemPerc")),
                }

            _STATS_CACHE_BY_NAME = new_cache
            _STATS_CACHE_TS = int(time.time())
        except Exception:
            # Keep last good cache; try again next tick.
            pass

        await asyncio.sleep(interval)
|
|
|
|
|
|
async def start_stats_poller():
    """Start the stats poller task on the running loop unless one is still active."""
    global _STATS_POLLER_TASK

    current = _STATS_POLLER_TASK
    still_running = bool(current) and not current.done()
    if not still_running:
        running_loop = asyncio.get_running_loop()
        _STATS_POLLER_TASK = running_loop.create_task(_stats_poller_loop())
|
|
|
|
|
|
def init_containers_router(
|
|
session,
|
|
podman_api_base: str,
|
|
workloads_dir: str,
|
|
systemctl_func,
|
|
) -> APIRouter:
|
|
router = APIRouter(tags=["containers"])
|
|
|
|
global _SESSION, _PODMAN_API_BASE
|
|
_SESSION = session
|
|
_PODMAN_API_BASE = podman_api_base
|
|
|
|
def find_defined_containers():
    """Map container names to their ``.container`` definition files.

    Walks ``<workloads_dir>/systemd`` recursively. The key is the file name
    without extension, the value is the file path relative to
    ``workloads_dir``. A name found more than once keeps the last one visited.
    """
    suffix = ".container"
    found = {}
    for dirpath, _subdirs, filenames in os.walk(os.path.join(workloads_dir, "systemd")):
        for filename in filenames:
            if not filename.endswith(suffix):
                continue
            unit_name = os.path.splitext(filename)[0]
            absolute = os.path.join(dirpath, filename)
            found[unit_name] = os.path.relpath(absolute, workloads_dir)
    return found
|
|
|
|
def _ensure_container_status_field(container: dict):
|
|
# keep exact existing defaulting behavior
|
|
if "Status" not in container:
|
|
container["Status"] = container.get("State", "")
|
|
|
|
def _make_defined_container_dashboard_row(name: str, relpath: str):
|
|
# keep exact key set and default values as before
|
|
return {
|
|
"Names": [name],
|
|
"Image": "",
|
|
"State": "",
|
|
"Status": "",
|
|
"Ports": [],
|
|
"PodName": "",
|
|
"_dashboard_source": "systemd",
|
|
"_dashboard_unit": f"{name}.service",
|
|
"_dashboard_def_path": relpath,
|
|
"_dashboard_cpu": None,
|
|
"_dashboard_mem_usage": None,
|
|
"_dashboard_mem_perc": None,
|
|
}
|
|
|
|
def _legacy_dashboard_item_from_container(c: dict):
|
|
# Keep exact keys & defaults as before
|
|
return {
|
|
"name": (c.get("Names") or ["?"])[0],
|
|
"status": c.get("Status") or c.get("State") or "",
|
|
"path": "",
|
|
"ip": "",
|
|
"containers": [],
|
|
}
|
|
|
|
def _extract_published_ports(container: dict) -> list[str]:
|
|
"""
|
|
Normalize Podman API Ports into a stable display list:
|
|
- "127.0.0.1:8080:8000/tcp"
|
|
- "8080:8000/tcp" (if no host ip)
|
|
"""
|
|
out: list[str] = []
|
|
for p in (container.get("Ports") or []):
|
|
host_ip = p.get("host_ip") or p.get("HostIp") or ""
|
|
host_port = p.get("host_port") or p.get("HostPort")
|
|
cont_port = p.get("container_port") or p.get("ContainerPort")
|
|
proto = p.get("protocol") or p.get("Protocol") or ""
|
|
|
|
if host_port is None or cont_port is None:
|
|
continue
|
|
|
|
s = ""
|
|
if host_ip:
|
|
s += f"{host_ip}:"
|
|
s += f"{host_port}:{cont_port}"
|
|
if proto:
|
|
s += f"/{proto}"
|
|
out.append(s)
|
|
|
|
return out
|
|
|
|
@router.get("/containers-dashboard")
def containers_dashboard():
    """Return dashboard rows: runtime containers first, then defined-only ones.

    Runtime containers are annotated in place with published ports, cached
    live stats (None when absent), a source classification and, when a
    matching definition exists, its path. Definitions without a runtime
    container get a placeholder row whose Status is the stripped output of
    ``systemctl --user is-active``. The set of shown names is stored for the
    /stats filter.
    """
    global _STATS_SHOWN_NAMES

    def _runtime_name(entry):
        # Normalize the name: Podman may report "/name".
        return ((entry.get("Names") or ["?"])[0] or "").lstrip("/")

    rows = []
    defined = find_defined_containers()
    stats_by_name = _STATS_CACHE_BY_NAME

    # A) real containers (runtime)
    real = _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
    # B) dedup set, also normalized (prevents "/name" vs "name" duplicates)
    runtime_names = set()
    for c in real:
        _ensure_container_status_field(c)

        # Published ports (existing hotfix, kept)
        c["_dashboard_published_ports"] = _extract_published_ports(c)

        rname = _runtime_name(c)
        runtime_names.add(rname)

        # Optional live stats (always present; null on miss)
        live = stats_by_name.get(rname)
        if not isinstance(live, dict):
            live = {}
        c["_dashboard_cpu"] = live.get("cpu")
        c["_dashboard_mem_usage"] = live.get("mem_usage")
        c["_dashboard_mem_perc"] = live.get("mem_perc")

        # Classification: the PODMAN_SYSTEMD_UNIT label is ground truth
        podman_unit = (c.get("Labels") or {}).get("PODMAN_SYSTEMD_UNIT") or ""
        if podman_unit:
            c["_dashboard_source"] = "systemd"
            c["_dashboard_unit"] = podman_unit
        else:
            c["_dashboard_source"] = "podman"

        # Definition path: independent of the classification
        if rname in defined:
            c["_dashboard_def_path"] = defined[rname]

        rows.append(c)

    # C) defined containers from systemd/*.container (skip duplicates)
    for name, relpath in defined.items():
        if name in runtime_names:
            continue
        row = _make_defined_container_dashboard_row(name, relpath)
        _exit_code, out = systemctl_func(["systemctl", "--user", "is-active", f"{name}.service"])
        row["Status"] = (out or "").strip()
        rows.append(row)

    # Record which container names the dashboard shows (used by the /stats filter)
    shown = {_norm_container_name((c.get("Names") or ["?"])[0]) for c in rows}
    _STATS_SHOWN_NAMES = shown - {"?", ""}

    return rows
|
|
|
|
@router.get("/stats")
def stats_snapshot():
    """Return the cached stats, limited to dashboard-shown names when any are recorded."""
    cache = _STATS_CACHE_BY_NAME
    shown = _STATS_SHOWN_NAMES
    if not shown:
        return cache
    return {name: entry for name, entry in cache.items() if name in shown}
|
|
|
|
@router.get("/containers")
def list_containers():
    """Return Podman's container list as JSON."""
    # all=true here as well, so stopped containers are included
    return _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
|
|
|
|
@router.get("/containers/inspect/{name}")
def inspect_container(name: str):
    """Return Podman's inspect JSON for container ``name``.

    ``name`` comes straight from the request path, so it is percent-encoded
    before being placed in the Podman URL. Otherwise characters such as "?"
    or "#" would change which Podman resource is requested.
    """
    from urllib.parse import quote  # local import: the module only imports unquote

    safe_name = quote(name, safe="")
    return _podman_get_json(session, f"{podman_api_base}/libpod/containers/{safe_name}/json")
|
|
|
|
@router.get("/containers/logs/{name}")
def get_container_logs(name: str):
    """Return the last 100 stdout/stderr log lines of ``name`` as ``{"logs": text}``.

    ``name`` comes straight from the request path, so it is percent-encoded
    before being placed in the Podman URL.
    """
    from urllib.parse import quote  # local import: the module only imports unquote

    safe_name = quote(name, safe="")
    # Ask for the last 100 lines (tail=100)
    txt = _podman_get_text(
        session,
        f"{podman_api_base}/libpod/containers/{safe_name}/logs?stdout=true&stderr=true&tail=100",
    )
    # NOTE(review): Podman logs often carry some binary metadata; the text is
    # returned exactly as the helper delivers it — confirm decoding in _podman_get_text.
    return {"logs": txt}
|
|
|
|
@router.get("/containers/stats/stream")
async def containers_stats_stream(interval: float = 2.0):
    """
    SSE stream with periodic container stats.
    Contract-neutral: new endpoint, no existing outputs changed.

    Each tick emits one ``stats`` event whose data is ``{"ts", "data"}``.
    When the fetch fails, ``data`` is ``{"Error": <message>, "Stats": []}``.
    ``interval`` is clamped to [0.5, 30] seconds; NaN falls back to 2.0.
    """
    # Guardrails against abuse. NaN fails every ordered comparison, so it
    # needs its own check or it would reach asyncio.sleep() unchanged.
    if interval != interval:
        interval = 2.0
    if interval < 0.5:
        interval = 0.5
    if interval > 30:
        interval = 30

    stats_url = f"{podman_api_base}/libpod/containers/stats?all=true&stream=false"

    def _fetch_stats():
        # Timeout so a stalling Podman socket cannot freeze the stream.
        return session.get(stats_url, timeout=5).json()

    async def event_gen():
        try:
            while True:
                try:
                    # The request is synchronous; run it in a worker thread so
                    # the event loop keeps serving other requests meanwhile.
                    data = await asyncio.to_thread(_fetch_stats)
                except Exception as e:
                    data = {"Error": str(e), "Stats": []}

                payload = {
                    "ts": int(time.time()),
                    "data": data,
                }

                yield "event: stats\n"
                yield f"data: {json.dumps(payload, separators=(',',':'))}\n\n"
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            return

    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # helps with proxies
    }
    return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)
|
|
|
|
@router.get("/debug/defined-containers")
def debug_defined_containers():
    """Return the name -> definition-path mapping found under the workloads dir."""
    defined = find_defined_containers()
    return defined
|
|
|
|
@router.get("/dashboard")
def get_dashboard():
    """Legacy dashboard view (shape kept): one item per Podman container.

    If the container list cannot be fetched, an empty list is returned.
    """
    try:
        api_containers = _podman_get_json(session, f"{podman_api_base}/libpod/containers/json?all=true")
    except Exception:
        # `except Exception` rather than a bare `except`, so KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        api_containers = []
    return [_legacy_dashboard_item_from_container(c) for c in api_containers]
|
|
|
|
@router.post("/containers/{action}/{name}")
def container_action(action: str, name: str):
    """
    Perform an action on a container.

    - **start** — Start the container (or its systemd unit).
    - **stop** — ⚠️ Destructive: stops the container immediately.
    - **restart** — ⚠️ Destructive: restarts the container immediately.

    Uses systemd when the container has a managed unit; otherwise the Podman API directly.
    """
    if action not in ("start", "stop", "restart"):
        # FastAPI serialises a Flask-style `(body, status)` tuple as a JSON
        # array with HTTP 200, so the 400 has to be raised explicitly.
        raise HTTPException(status_code=400, detail="Invalid action")

    defined = find_defined_containers()
    # Remembers the systemd attempt so the Podman fallback can report it.
    _sys = {"code": None, "out": None}

    def _systemd_call():
        if name in defined:
            code, out = _helper_call(action, f"{name}.service")
            _sys["code"] = code
            _sys["out"] = out
            if code == 0:
                return {
                    "method": "systemd",
                    "name": name,
                    "unit": f"{name}.service",
                    "definition": defined[name],
                    "cmd": f"systemctl --user {action} {name}",
                    "exit": code,
                    "output": out,
                }
            return {"exit": code, "output": out}
        return None

    def _podman_call(systemd_res):
        res = _podman_action_post(session, podman_api_base, "containers", name, action)
        if res.status_code in (200, 204):
            return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}

        if res.status_code >= 400:
            # NOTE(review): this (body, status) pair is handed to
            # _systemd_then_podman; presumably that helper turns it into a
            # proper response — confirm in common.
            return {
                "method": "podman",
                "name": name,
                "cmd": f"podman {action} {name}",
                "status_code": res.status_code,
                "error": getattr(res, "text", "") or "",
            }, res.status_code

        if name in defined:
            return {
                "method": "systemd",
                "name": name,
                "unit": f"{name}.service",
                "definition": defined[name],
                "cmd": f"systemctl --user {action} {name}",
                "exit": _sys["code"],
                "output": _sys["out"],
            }

        return {"method": "podman", "name": name, "cmd": f"podman {action} {name}", "status_code": res.status_code}

    return _systemd_then_podman(_systemd_call, _podman_call)
|
|
|
|
@router.post("/containers/{name}/exec/start")
def container_exec_start(name: str, req: Optional[ExecStartRequest] = None):
    """Create and start an exec in container ``name`` and register a session for it.

    Steps: expire stale sessions, enforce the active-session cap, create the
    exec through the Podman API, open the hijacked socket, then register the
    session and start its reader thread.

    Raises:
        HTTPException: 429 when the active-session cap is reached; 502 when
            exec creation fails or returns no Id. Errors raised while opening
            the socket propagate unchanged.
    """
    from urllib.parse import quote  # local import: the module only imports unquote

    _cleanup_exec_sessions()
    if req is None:
        req = ExecStartRequest()
    cmd = req.cmd or ["/bin/sh"]
    tty = bool(req.tty)
    max_active = _exec_max_active_sessions()

    def _active_count_locked():
        # Caller must hold _EXEC_SESSIONS_LOCK.
        return sum(1 for s in _EXEC_SESSIONS.values() if not s.closed)

    def _limit_error(active_now):
        return HTTPException(
            status_code=429,
            detail=f"Too many active exec sessions ({active_now}/{max_active})",
        )

    # Cheap early rejection before any Podman call is made.
    with _EXEC_SESSIONS_LOCK:
        active = _active_count_locked()
    if active >= max_active:
        raise _limit_error(active)

    # `name` is an untrusted path parameter; encode it for the Podman URL.
    safe_name = quote(name, safe="")
    create_url = f"{podman_api_base}/libpod/containers/{safe_name}/exec"
    payload = {
        "AttachStdin": True,
        "AttachStdout": True,
        "AttachStderr": True,
        "Tty": tty,
        "Cmd": cmd,
    }
    try:
        create_res = session.post(create_url, json=payload, timeout=10)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Exec create request failed: {str(e)}")

    if create_res.status_code >= 400:
        raise HTTPException(status_code=502, detail=create_res.text)

    try:
        exec_id = (create_res.json() or {}).get("Id")
    except Exception:
        exec_id = None

    if not exec_id:
        raise HTTPException(status_code=502, detail=f"Exec create returned no Id: {create_res.text}")

    sock, initial_rest = _open_exec_hijacked_socket(exec_id, tty)

    session_id = secrets.token_hex(8)
    sess = _ExecSessionState(
        session_id=session_id,
        exec_id=exec_id,
        container=name,
        sock=sock,
        tty=tty,
    )
    t = threading.Thread(target=_reader_loop, args=(session_id, sess, initial_rest), daemon=True)
    sess.reader_thread = t

    # Count again and insert under one lock acquisition: concurrent starts
    # that all passed the early check could otherwise exceed the cap together.
    with _EXEC_SESSIONS_LOCK:
        active = _active_count_locked()
        over_limit = active >= max_active
        if not over_limit:
            _EXEC_SESSIONS[session_id] = sess
    if over_limit:
        _close_exec_session(sess, "session-limit")
        raise _limit_error(active)
    t.start()

    return {
        "session_id": session_id,
        "exec_id": exec_id,
        "container": name,
        "tty": tty,
        "cmd": cmd,
        "created_at": sess.created_at,
    }
|
|
|
|
@router.get("/containers/exec/{session_id}")
def container_exec_session_info(session_id: str):
    """Return metadata and event counters for one exec session (404 if unknown)."""
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)

    # Only the counters are read under the session lock, as before.
    with sess.lock:
        event_count, event_seq = len(sess.events), sess.seq

    attribute_fields = (
        "session_id",
        "exec_id",
        "container",
        "tty",
        "created_at",
        "last_activity",
        "closed",
        "close_reason",
    )
    info = {field: getattr(sess, field) for field in attribute_fields}
    info["event_count"] = event_count
    info["event_seq"] = event_seq
    return info
|
|
|
|
@router.get("/containers/exec/{session_id}/stream")
async def container_exec_stream(session_id: str, after: int = 0):
    """Stream a session's events as SSE, starting after sequence number ``after``.

    Every 0.2 seconds the buffered events with a higher ``seq`` than the
    cursor are sent as ``exec`` events. When nothing is pending, a ``ping``
    event is sent if at least 10 seconds passed since the previous ping. The
    stream ends once the session is closed and nothing is pending.
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)

    def _snapshot(cursor):
        # Copy under the lock; sending happens outside it.
        with sess.lock:
            return [e for e in sess.events if e["seq"] > cursor], sess.closed

    async def event_gen():
        cursor = int(after or 0)
        last_ping = time.time()
        try:
            while True:
                pending, closed = _snapshot(cursor)

                for ev in pending:
                    cursor = ev["seq"]
                    yield "event: exec\n"
                    yield f"data: {json.dumps(ev, separators=(',',':'))}\n\n"

                if not pending:
                    now = time.time()
                    if (now - last_ping) >= 10.0:
                        last_ping = now
                        yield "event: ping\n"
                        yield f"data: {int(now)}\n\n"
                    if closed:
                        break

                await asyncio.sleep(0.2)
        except asyncio.CancelledError:
            return

    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers)
|
|
|
|
@router.post("/containers/exec/{session_id}/input")
def container_exec_input(session_id: str, req: ExecInputRequest):
    """Write ``req.data`` (UTF-8 encoded) to the session's socket.

    Raises:
        HTTPException: 404 for an unknown session; 409 when the session is
            closed or the write fails (a failed write also closes the
            session); 413 when the encoded input exceeds the configured cap.
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)
    if sess.closed:
        raise HTTPException(status_code=409, detail=f"Exec session is closed: {sess.close_reason or 'closed'}")

    payload = (req.data or "").encode("utf-8")
    size = len(payload)
    if size == 0:
        return {"ok": True, "session_id": session_id, "bytes": 0}

    limit = _exec_max_input_bytes()
    if size > limit:
        raise HTTPException(
            status_code=413,
            detail=f"Input too large ({size} bytes > {limit} bytes)",
        )

    try:
        sess.sock.sendall(payload)
        sess.last_activity = int(time.time())
    except Exception as e:
        _close_exec_session(sess, f"write-error: {str(e)}")
        raise HTTPException(status_code=409, detail=f"Exec input failed: {str(e)}")
    return {"ok": True, "session_id": session_id, "bytes": size}
|
|
|
|
@router.post("/containers/exec/{session_id}/resize")
def container_exec_resize(session_id: str, req: ExecResizeRequest):
    """Resize the session's exec to ``req.rows`` x ``req.cols``.

    The exec is inspected first and must report ``Running``; only then is the
    resize request sent.

    Raises:
        HTTPException: 404 for an unknown session; 409 when the exec is not
            running; 502 for any other inspect or resize failure.
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)
    exec_base = f"{podman_api_base}/libpod/exec/{sess.exec_id}"

    try:
        insp = session.get(f"{exec_base}/json", timeout=5)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Exec inspect failed: {str(e)}")
    if insp.status_code >= 400:
        raise HTTPException(status_code=502, detail=insp.text)

    try:
        running = bool((insp.json() or {}).get("Running"))
    except Exception:
        # An unreadable inspect body is treated the same as "not running".
        running = False
    if not running:
        raise HTTPException(status_code=409, detail="Exec session is not running; resize requires running session")

    rows, cols = int(req.rows), int(req.cols)
    try:
        res = session.post(f"{exec_base}/resize?h={rows}&w={cols}", timeout=5)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Exec resize request failed: {str(e)}")

    if res.status_code >= 400:
        detail = (res.text or "").strip()
        not_running = res.status_code == 500 and "not running" in detail.lower()
        if not_running:
            raise HTTPException(status_code=409, detail="Exec session is not running")
        raise HTTPException(status_code=502, detail=detail)

    sess.last_activity = int(time.time())
    return {"ok": True, "session_id": session_id, "rows": rows, "cols": cols}
|
|
|
|
@router.post("/containers/exec/{session_id}/stop")
def container_exec_stop(session_id: str):
    """Close an exec session on request; report whether it was already closed."""
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)

    if not sess.closed:
        _close_exec_session(sess, "stopped-by-user")
        return {"ok": True, "session_id": session_id, "already_closed": False, "reason": "stopped-by-user"}

    return {"ok": True, "session_id": session_id, "already_closed": True, "reason": sess.close_reason}
|
|
|
|
return router
|