From d96fc19f415468f28c015f9cb59223b9de71f976 Mon Sep 17 00:00:00 2001 From: kodi Date: Wed, 4 Mar 2026 16:33:24 +0100 Subject: [PATCH] feat (backend): exec fase 1 --- control/app_containers.py | 387 +++++++++++++++++++++++++++++++++++++- 1 file changed, 386 insertions(+), 1 deletion(-) diff --git a/control/app_containers.py b/control/app_containers.py index 2224ab6..cf3715a 100644 --- a/control/app_containers.py +++ b/control/app_containers.py @@ -1,9 +1,17 @@ import os import asyncio import json +import time +import socket +import secrets +import threading +from collections import deque +from typing import Optional +from urllib.parse import unquote -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse +from pydantic import BaseModel, Field from common import ( _map_pod_to_unit, _podman_action_post, @@ -22,6 +30,199 @@ _STATS_CACHE_BY_NAME = {} # name -> {"cpu": float|None, "mem_usage": float|None _STATS_CACHE_TS = None _STATS_POLLER_TASK = None +# --- EXEC SESSION CACHE (in-memory) --- +_EXEC_SESSIONS = {} # session_id -> _ExecSessionState +_EXEC_SESSIONS_LOCK = threading.Lock() +_EXEC_SESSION_IDLE_TTL_SECONDS = 60 * 60 +_EXEC_SESSION_CLOSED_GC_SECONDS = 5 * 60 + + +class ExecStartRequest(BaseModel): + cmd: list[str] = Field(default_factory=lambda: ["/bin/sh"]) + tty: bool = True + + +class ExecInputRequest(BaseModel): + data: str = "" + + +class ExecResizeRequest(BaseModel): + rows: int = 24 + cols: int = 80 + + +class _ExecSessionState: + def __init__(self, session_id: str, exec_id: str, container: str, sock: socket.socket, tty: bool): + self.session_id = session_id + self.exec_id = exec_id + self.container = container + self.sock = sock + self.tty = tty + self.created_at = int(time.time()) + self.last_activity = self.created_at + self.closed = False + self.close_reason = "" + self.seq = 0 + self.events = deque(maxlen=2000) # {"seq","ts","type","data"} + self.lock = threading.Lock() + self.reader_thread = None + + def push_event(self, event_type: str, data: str): + with self.lock: + self.seq += 1 + self.events.append({ + "seq": self.seq, + "ts": int(time.time()), + "type": event_type, + "data": data, + }) + self.last_activity = int(time.time()) + + def mark_closed(self, reason: str): + if self.closed: + return + self.closed = True + self.close_reason = reason or "closed" + self.push_event("closed", self.close_reason) + + +def _parse_podman_unix_socket_and_base(api_base: str) -> tuple[str, str]: + if not isinstance(api_base, str) or not api_base.startswith("http+unix://"): + raise HTTPException(status_code=500, detail="Unsupported PODMAN_API_BASE for exec") + + tail = api_base[len("http+unix://"):] + slash = tail.find("/") + if slash < 0: + encoded_socket = tail + base_path = "" + else: + encoded_socket = tail[:slash] + base_path = tail[slash:] + + socket_path = unquote(encoded_socket) + if not socket_path: + raise HTTPException(status_code=500, detail="Podman socket path missing") + + if not base_path: + base_path = "" + if base_path and not base_path.startswith("/"): + base_path = "/" + base_path + + return socket_path, base_path + + +def _open_exec_hijacked_socket(exec_id: str, tty: bool) -> tuple[socket.socket, bytes]: + socket_path, base_path = _parse_podman_unix_socket_and_base(_PODMAN_API_BASE) + req_path = f"{base_path}/libpod/exec/{exec_id}/start" + body = json.dumps({"Detach": False, "Tty": bool(tty)}, separators=(",", ":")).encode("utf-8") + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(10.0) + sock.connect(socket_path) + + req = ( + f"POST {req_path} HTTP/1.1\r\n" + "Host: d\r\n" + "Content-Type: application/json\r\n" + f"Content-Length: {len(body)}\r\n" + "\r\n" + ).encode("utf-8") + body + + sock.sendall(req) + + raw = b"" + while b"\r\n\r\n" not in raw: + chunk = sock.recv(4096) + if not chunk: + break + raw += chunk + if len(raw) > 64 * 1024: + sock.close() + raise HTTPException(status_code=502, detail="Exec start response headers too large") + + if b"\r\n\r\n" not in raw: + sock.close() + raise HTTPException(status_code=502, detail="Exec start invalid HTTP response") + + head, _, rest = raw.partition(b"\r\n\r\n") + status_line = head.split(b"\r\n", 1)[0].decode("utf-8", errors="replace") + try: + parts = status_line.split(" ", 2) + status_code = int(parts[1]) + except Exception: + sock.close() + raise HTTPException(status_code=502, detail=f"Exec start parse error: {status_line}") + + if status_code != 200: + body_preview = rest.decode("utf-8", errors="replace") + sock.close() + raise HTTPException(status_code=502, detail=f"Exec start failed ({status_code}): {body_preview}") + + sock.settimeout(1.0) + return sock, rest + + +def _get_exec_session_or_404(session_id: str) -> _ExecSessionState: + with _EXEC_SESSIONS_LOCK: + sess = _EXEC_SESSIONS.get(session_id) + if not sess: + raise HTTPException(status_code=404, detail=f"Unknown exec session: {session_id}") + return sess + + +def _close_exec_session(sess: _ExecSessionState, reason: str): + try: + sess.sock.shutdown(socket.SHUT_RDWR) + except Exception: + pass + try: + sess.sock.close() + except Exception: + pass + sess.mark_closed(reason) + + +def _cleanup_exec_sessions(): + now = int(time.time()) + to_delete = [] + with _EXEC_SESSIONS_LOCK: + for sid, sess in _EXEC_SESSIONS.items(): + idle = now - int(sess.last_activity or now) + if sess.closed and idle > _EXEC_SESSION_CLOSED_GC_SECONDS: + to_delete.append(sid) + continue + if (not sess.closed) and idle > _EXEC_SESSION_IDLE_TTL_SECONDS: + _close_exec_session(sess, "idle-timeout") + for sid in to_delete: + _EXEC_SESSIONS.pop(sid, None) + + +def _reader_loop(session_id: str, sess: _ExecSessionState, initial_rest: bytes): + try: + if initial_rest: + txt = initial_rest.decode("utf-8", errors="replace") + if txt: + sess.push_event("stdout", txt) + + while not sess.closed: + try: + chunk = sess.sock.recv(4096) + except socket.timeout: + continue + except Exception as e: + _close_exec_session(sess, f"read-error: {str(e)}") + break + + if not chunk: + _close_exec_session(sess, "eof") + break + + txt = chunk.decode("utf-8", errors="replace") + if txt: + sess.push_event("stdout", txt) + finally: + sess.mark_closed(sess.close_reason or "reader-exit") + def _norm_container_name(name) -> str: try: @@ -384,4 +585,188 @@ def init_containers_router( return _systemd_then_podman(_systemd_call, _podman_call) + @router.post("/containers/{name}/exec/start") + def container_exec_start(name: str, req: Optional[ExecStartRequest] = None): + _cleanup_exec_sessions() + if req is None: + req = ExecStartRequest() + cmd = req.cmd or ["/bin/sh"] + + create_url = f"{podman_api_base}/libpod/containers/{name}/exec" + payload = { + "AttachStdin": True, + "AttachStdout": True, + "AttachStderr": True, + "Tty": bool(req.tty), + "Cmd": cmd, + } + try: + create_res = session.post(create_url, json=payload, timeout=10) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Exec create request failed: {str(e)}") + + if create_res.status_code >= 400: + raise HTTPException(status_code=502, detail=create_res.text) + + try: + exec_id = (create_res.json() or {}).get("Id") + except Exception: + exec_id = None + + if not exec_id: + raise HTTPException(status_code=502, detail=f"Exec create returned no Id: {create_res.text}") + + sock, initial_rest = _open_exec_hijacked_socket(exec_id, bool(req.tty)) + + session_id = secrets.token_hex(8) + sess = _ExecSessionState( + session_id=session_id, + exec_id=exec_id, + container=name, + sock=sock, + tty=bool(req.tty), + ) + t = threading.Thread(target=_reader_loop, args=(session_id, sess, initial_rest), daemon=True) + sess.reader_thread = t + + with _EXEC_SESSIONS_LOCK: + _EXEC_SESSIONS[session_id] = sess + t.start() + + return { + "session_id": session_id, + "exec_id": exec_id, + "container": name, + "tty": bool(req.tty), + "cmd": cmd, + "created_at": sess.created_at, + } + + @router.get("/containers/exec/{session_id}") + def container_exec_session_info(session_id: str): + _cleanup_exec_sessions() + sess = _get_exec_session_or_404(session_id) + with sess.lock: + events = len(sess.events) + seq = sess.seq + return { + "session_id": sess.session_id, + "exec_id": sess.exec_id, + "container": sess.container, + "tty": sess.tty, + "created_at": sess.created_at, + "last_activity": sess.last_activity, + "closed": sess.closed, + "close_reason": sess.close_reason, + "event_count": events, + "event_seq": seq, + } + + @router.get("/containers/exec/{session_id}/stream") + async def container_exec_stream(session_id: str, after: int = 0): + _cleanup_exec_sessions() + sess = _get_exec_session_or_404(session_id) + + async def event_gen(): + cursor = int(after or 0) + last_ping = time.time() + try: + while True: + pending = [] + closed = False + with sess.lock: + pending = [e for e in sess.events if e["seq"] > cursor] + closed = sess.closed + + if pending: + for ev in pending: + cursor = ev["seq"] + yield "event: exec\n" + yield f"data: {json.dumps(ev, separators=(',',':'))}\n\n" + else: + now = time.time() + if (now - last_ping) >= 10.0: + last_ping = now + yield "event: ping\n" + yield f"data: {int(now)}\n\n" + + if closed and not pending: + break + + await asyncio.sleep(0.2) + except asyncio.CancelledError: + return + + headers = { + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + return StreamingResponse(event_gen(), media_type="text/event-stream", headers=headers) + + @router.post("/containers/exec/{session_id}/input") + def container_exec_input(session_id: str, req: ExecInputRequest): + _cleanup_exec_sessions() + sess = _get_exec_session_or_404(session_id) + if sess.closed: + raise HTTPException(status_code=409, detail=f"Exec session is closed: {sess.close_reason or 'closed'}") + + data = (req.data or "").encode("utf-8") + if not data: + return {"ok": True, "session_id": session_id, "bytes": 0} + + try: + sess.sock.sendall(data) + sess.last_activity = int(time.time()) + return {"ok": True, "session_id": session_id, "bytes": len(data)} + except Exception as e: + _close_exec_session(sess, f"write-error: {str(e)}") + raise HTTPException(status_code=409, detail=f"Exec input failed: {str(e)}") + + @router.post("/containers/exec/{session_id}/resize") + def container_exec_resize(session_id: str, req: ExecResizeRequest): + _cleanup_exec_sessions() + sess = _get_exec_session_or_404(session_id) + + try: + insp = session.get(f"{podman_api_base}/libpod/exec/{sess.exec_id}/json", timeout=5) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Exec inspect failed: {str(e)}") + + if insp.status_code >= 400: + raise HTTPException(status_code=502, detail=insp.text) + + try: + running = bool((insp.json() or {}).get("Running")) + except Exception: + running = False + + if not running: + raise HTTPException(status_code=409, detail="Exec session is not running; resize requires running session") + + url = f"{podman_api_base}/libpod/exec/{sess.exec_id}/resize?h={int(req.rows)}&w={int(req.cols)}" + try: + res = session.post(url, timeout=5) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Exec resize request failed: {str(e)}") + + if res.status_code >= 400: + detail = (res.text or "").strip() + if res.status_code == 500 and "not running" in detail.lower(): + raise HTTPException(status_code=409, detail="Exec session is not running") + raise HTTPException(status_code=502, detail=detail) + + sess.last_activity = int(time.time()) + return {"ok": True, "session_id": session_id, "rows": int(req.rows), "cols": int(req.cols)} + + @router.post("/containers/exec/{session_id}/stop") + def container_exec_stop(session_id: str): + _cleanup_exec_sessions() + sess = _get_exec_session_or_404(session_id) + if sess.closed: + return {"ok": True, "session_id": session_id, "already_closed": True, "reason": sess.close_reason} + + _close_exec_session(sess, "stopped-by-user") + return {"ok": True, "session_id": session_id, "already_closed": False, "reason": "stopped-by-user"} + return router