feat (backend): exec fase 1
This commit is contained in:
+386
-1
@@ -1,9 +1,17 @@
|
||||
import os
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
import socket
|
||||
import secrets
|
||||
import threading
|
||||
from collections import deque
|
||||
from typing import Optional
|
||||
from urllib.parse import unquote
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel, Field
|
||||
from common import (
|
||||
_map_pod_to_unit,
|
||||
_podman_action_post,
|
||||
@@ -22,6 +30,199 @@ _STATS_CACHE_BY_NAME = {} # name -> {"cpu": float|None, "mem_usage": float|None
|
||||
_STATS_CACHE_TS = None
|
||||
_STATS_POLLER_TASK = None
|
||||
|
||||
# --- EXEC SESSION CACHE (in-memory) ---
# Registry of exec sessions: session_id -> _ExecSessionState.
_EXEC_SESSIONS = {}
# Held while the registry above is read or modified.
_EXEC_SESSIONS_LOCK = threading.Lock()
# Open sessions idle for longer than this many seconds are closed (1 hour).
_EXEC_SESSION_IDLE_TTL_SECONDS = 3600
# Closed sessions are dropped from the registry after this many seconds (5 minutes).
_EXEC_SESSION_CLOSED_GC_SECONDS = 300
|
||||
|
||||
|
||||
class ExecStartRequest(BaseModel):
    """Request body for starting an exec session in a container."""

    # Command and arguments to execute; each instance gets its own ["/bin/sh"] list.
    cmd: list[str] = Field(default_factory=lambda: ["/bin/sh"])
    # Forwarded as the "Tty" flag when the exec is created and started.
    tty: bool = Field(default=True)
|
||||
|
||||
|
||||
class ExecInputRequest(BaseModel):
    """Request body carrying text to write to an exec session."""

    # Text to send; empty by default.
    data: str = Field(default="")
|
||||
|
||||
|
||||
class ExecResizeRequest(BaseModel):
    """Request body with the terminal dimensions for an exec session."""

    # Terminal height in rows.
    rows: int = Field(default=24)
    # Terminal width in columns.
    cols: int = Field(default=80)
|
||||
|
||||
|
||||
class _ExecSessionState:
    """In-memory state of one exec session bound to an open socket.

    Output and lifecycle notifications are stored as a bounded sequence of
    event dicts with the keys ``seq``, ``ts``, ``type`` and ``data``. ``seq``
    increases by one per event, so a consumer can resume after the last
    sequence number it has seen.
    """

    def __init__(self, session_id: str, exec_id: str, container: str, sock: socket.socket, tty: bool):
        """Record the session identity and initialise an empty event log."""
        self.session_id = session_id
        self.exec_id = exec_id
        self.container = container
        self.sock = sock
        self.tty = tty
        self.created_at = int(time.time())
        self.last_activity = self.created_at
        self.closed = False
        self.close_reason = ""
        self.seq = 0
        # Bounded log: once 2000 events are held, the oldest are discarded.
        self.events = deque(maxlen=2000)  # {"seq","ts","type","data"}
        # Protects seq, events, closed and close_reason.
        self.lock = threading.Lock()
        self.reader_thread = None

    def _append_event_locked(self, event_type: str, data: str) -> None:
        """Append one event and refresh ``last_activity``; caller holds ``self.lock``."""
        now = int(time.time())
        self.seq += 1
        self.events.append({
            "seq": self.seq,
            "ts": now,
            "type": event_type,
            "data": data,
        })
        self.last_activity = now

    def push_event(self, event_type: str, data: str):
        """Append an event of ``event_type`` carrying ``data`` to the log."""
        with self.lock:
            self._append_event_locked(event_type, data)

    def mark_closed(self, reason: str):
        """Flag the session as closed and emit exactly one ``closed`` event.

        Only the first call has any effect. The check, the flag update and
        the event append happen under one lock acquisition, so concurrent
        callers cannot both record a close. An empty ``reason`` is stored as
        ``"closed"``.
        """
        with self.lock:
            if self.closed:
                return
            self.closed = True
            self.close_reason = reason or "closed"
            self._append_event_locked("closed", self.close_reason)
|
||||
|
||||
|
||||
def _parse_podman_unix_socket_and_base(api_base: str) -> tuple[str, str]:
    """Split an ``http+unix://`` API base into socket path and URL base path.

    The part between the scheme and the first ``/`` is the percent-encoded
    filesystem path of the unix socket; everything from that ``/`` onwards is
    the URL path prefix.

    Args:
        api_base: A value such as ``http+unix://%2Frun%2Fpodman.sock/v4.0.0``.

    Returns:
        ``(socket_path, base_path)``. ``base_path`` is either empty or starts
        with ``/``, and never ends with ``/``, so callers can append
        ``/libpod/...`` without producing a double slash.

    Raises:
        HTTPException: 500 if ``api_base`` is not an ``http+unix://`` string
            or the socket path is empty.
    """
    scheme = "http+unix://"
    if not isinstance(api_base, str) or not api_base.startswith(scheme):
        raise HTTPException(status_code=500, detail="Unsupported PODMAN_API_BASE for exec")

    encoded_socket, slash, remainder = api_base[len(scheme):].partition("/")

    socket_path = unquote(encoded_socket)
    if not socket_path:
        raise HTTPException(status_code=500, detail="Podman socket path missing")

    # `slash` is "" when there is no path component, so this is "" in that
    # case and "/..." otherwise; trailing slashes are dropped.
    base_path = (slash + remainder).rstrip("/")
    return socket_path, base_path
|
||||
|
||||
|
||||
def _open_exec_hijacked_socket(exec_id: str, tty: bool) -> tuple[socket.socket, bytes]:
    """Start an exec over a raw unix socket and return the still-open connection.

    Sends ``POST {base}/libpod/exec/{exec_id}/start`` with ``Detach: false``
    and the requested ``Tty`` flag, then reads the response up to the end of
    the HTTP headers. The socket is left open for the caller to use as the
    exec's byte stream.

    Args:
        exec_id: Identifier of the exec instance to start.
        tty: Value sent as the ``Tty`` field of the start request.

    Returns:
        ``(sock, rest)`` where ``sock`` is the connected socket with a 1 second
        timeout, and ``rest`` holds any bytes received after the header
        terminator.

    Raises:
        HTTPException: 502 if the connection fails, the response headers are
            missing, malformed or larger than 64 KiB, or the status is not 200.
            The socket is closed before any exception leaves this function.
    """
    socket_path, base_path = _parse_podman_unix_socket_and_base(_PODMAN_API_BASE)
    req_path = f"{base_path}/libpod/exec/{exec_id}/start"
    body = json.dumps({"Detach": False, "Tty": bool(tty)}, separators=(",", ":")).encode("utf-8")
    request = (
        f"POST {req_path} HTTP/1.1\r\n"
        "Host: d\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "\r\n"
    ).encode("utf-8") + body

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.settimeout(10.0)
        sock.connect(socket_path)
        sock.sendall(request)

        # Read until the blank line that ends the HTTP headers.
        raw = b""
        while b"\r\n\r\n" not in raw:
            chunk = sock.recv(4096)
            if not chunk:
                break
            raw += chunk
            if len(raw) > 64 * 1024:
                raise HTTPException(status_code=502, detail="Exec start response headers too large")

        if b"\r\n\r\n" not in raw:
            raise HTTPException(status_code=502, detail="Exec start invalid HTTP response")

        head, _, rest = raw.partition(b"\r\n\r\n")
        status_line = head.split(b"\r\n", 1)[0].decode("utf-8", errors="replace")
        try:
            status_code = int(status_line.split(" ", 2)[1])
        except (IndexError, ValueError):
            raise HTTPException(status_code=502, detail=f"Exec start parse error: {status_line}")

        if status_code != 200:
            body_preview = rest.decode("utf-8", errors="replace")
            raise HTTPException(status_code=502, detail=f"Exec start failed ({status_code}): {body_preview}")

        # Short timeout from here on so a blocked recv() returns periodically.
        sock.settimeout(1.0)
    except OSError as e:
        # Connect/send/recv failures (including timeouts): release the socket
        # and report them like every other upstream failure.
        sock.close()
        raise HTTPException(status_code=502, detail=f"Exec start connection failed: {str(e)}") from e
    except BaseException:
        # Never leak the descriptor, whatever is propagating.
        sock.close()
        raise

    return sock, rest
|
||||
|
||||
|
||||
def _get_exec_session_or_404(session_id: str) -> _ExecSessionState:
    """Return the registered exec session for ``session_id``.

    Raises:
        HTTPException: 404 when no session is registered under that id.
    """
    with _EXEC_SESSIONS_LOCK:
        found = _EXEC_SESSIONS.get(session_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Unknown exec session: {session_id}")
|
||||
|
||||
|
||||
def _close_exec_session(sess: _ExecSessionState, reason: str):
    """Tear down the session socket on a best-effort basis and mark it closed.

    Errors from shutting down or closing the socket are ignored; the session
    is marked closed with ``reason`` regardless.
    """
    teardown_steps = (
        lambda: sess.sock.shutdown(socket.SHUT_RDWR),
        lambda: sess.sock.close(),
    )
    for step in teardown_steps:
        try:
            step()
        except Exception:
            # Deliberate: the socket may already be shut down or closed.
            pass
    sess.mark_closed(reason)
|
||||
|
||||
|
||||
def _cleanup_exec_sessions():
    """Close idle sessions and forget sessions that have been closed for a while.

    Runs under the registry lock. A closed session whose last activity is
    older than ``_EXEC_SESSION_CLOSED_GC_SECONDS`` is removed from the
    registry. An open session idle for longer than
    ``_EXEC_SESSION_IDLE_TTL_SECONDS`` is closed with reason ``idle-timeout``.
    """
    current = int(time.time())
    with _EXEC_SESSIONS_LOCK:
        expired_ids = []
        for session_id, state in _EXEC_SESSIONS.items():
            idle_for = current - int(state.last_activity or current)
            if state.closed and idle_for > _EXEC_SESSION_CLOSED_GC_SECONDS:
                expired_ids.append(session_id)
            elif (not state.closed) and idle_for > _EXEC_SESSION_IDLE_TTL_SECONDS:
                _close_exec_session(state, "idle-timeout")
        # Removal happens after the scan so the dict is not mutated mid-iteration.
        for session_id in expired_ids:
            _EXEC_SESSIONS.pop(session_id, None)
|
||||
|
||||
|
||||
def _reader_loop(session_id: str, sess: _ExecSessionState, initial_rest: bytes):
    """Pump bytes from the session socket into the session's event log.

    Runs until the session is closed, the peer closes the connection, or a
    read fails. Every non-empty piece of decoded text is pushed as a
    ``stdout`` event. Receive timeouts are not errors; they only give the
    loop a chance to re-check ``sess.closed``.

    Args:
        session_id: Identifier of the session (not used by the loop itself).
        sess: Session whose socket is read and whose events are appended.
        initial_rest: Bytes already received after the HTTP response headers;
            they are emitted before anything is read from the socket.

    NOTE(review): bytes are forwarded exactly as received. If podman frames
    non-TTY output with stream headers, those would end up in the text; confirm.
    """
    # Local import: codecs is not among the module-level imports.
    import codecs

    # One decoder for the whole stream, so a multi-byte UTF-8 character split
    # across two recv() calls is decoded correctly instead of becoming U+FFFD.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    try:
        if initial_rest:
            txt = decoder.decode(initial_rest)
            if txt:
                sess.push_event("stdout", txt)

        while not sess.closed:
            try:
                chunk = sess.sock.recv(4096)
            except socket.timeout:
                continue
            except Exception as e:
                # Thread boundary: report any read failure as the close reason.
                _close_exec_session(sess, f"read-error: {str(e)}")
                break

            if not chunk:
                # Flush a dangling partial character before recording EOF.
                tail = decoder.decode(b"", final=True)
                if tail:
                    sess.push_event("stdout", tail)
                _close_exec_session(sess, "eof")
                break

            txt = decoder.decode(chunk)
            if txt:
                sess.push_event("stdout", txt)
    finally:
        # No-op if the session was already marked closed above.
        sess.mark_closed(sess.close_reason or "reader-exit")
|
||||
|
||||
|
||||
def _norm_container_name(name) -> str:
|
||||
try:
|
||||
@@ -384,4 +585,188 @@ def init_containers_router(
|
||||
|
||||
return _systemd_then_podman(_systemd_call, _podman_call)
|
||||
|
||||
@router.post("/containers/{name}/exec/start")
def container_exec_start(name: str, req: Optional[ExecStartRequest] = None):
    """Create and start an exec in container ``name`` and register a session.

    Creates the exec through the podman API, opens the attached socket,
    starts a daemon reader thread for it and stores the session in the
    in-memory registry.

    Args:
        name: Container name taken from the URL path.
        req: Optional request body; defaults are used when it is omitted.

    Returns:
        A dict with ``session_id``, ``exec_id``, ``container``, ``tty``,
        ``cmd`` and ``created_at``.

    Raises:
        HTTPException: 502 when the exec cannot be created or started.
    """
    # Local import: only `unquote` is imported from urllib.parse at module level.
    from urllib.parse import quote

    _cleanup_exec_sessions()
    if req is None:
        req = ExecStartRequest()
    cmd = req.cmd or ["/bin/sh"]

    # Percent-encode the caller-supplied name so characters such as "?", "#"
    # or "%" cannot change the structure of the podman API URL.
    safe_name = quote(name, safe="")
    create_url = f"{podman_api_base}/libpod/containers/{safe_name}/exec"
    payload = {
        "AttachStdin": True,
        "AttachStdout": True,
        "AttachStderr": True,
        "Tty": bool(req.tty),
        "Cmd": cmd,
    }
    try:
        create_res = session.post(create_url, json=payload, timeout=10)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Exec create request failed: {str(e)}")

    if create_res.status_code >= 400:
        raise HTTPException(status_code=502, detail=create_res.text)

    try:
        exec_id = (create_res.json() or {}).get("Id")
    except Exception:
        # An unparsable body is reported below as a missing Id.
        exec_id = None

    if not exec_id:
        raise HTTPException(status_code=502, detail=f"Exec create returned no Id: {create_res.text}")

    sock, initial_rest = _open_exec_hijacked_socket(exec_id, bool(req.tty))

    session_id = secrets.token_hex(8)
    sess = _ExecSessionState(
        session_id=session_id,
        exec_id=exec_id,
        container=name,
        sock=sock,
        tty=bool(req.tty),
    )
    t = threading.Thread(target=_reader_loop, args=(session_id, sess, initial_rest), daemon=True)
    sess.reader_thread = t

    # Register before starting the reader so the session can be looked up as
    # soon as output is produced.
    with _EXEC_SESSIONS_LOCK:
        _EXEC_SESSIONS[session_id] = sess
        t.start()

    return {
        "session_id": session_id,
        "exec_id": exec_id,
        "container": name,
        "tty": bool(req.tty),
        "cmd": cmd,
        "created_at": sess.created_at,
    }
|
||||
|
||||
@router.get("/containers/exec/{session_id}")
def container_exec_session_info(session_id: str):
    """Return a metadata snapshot for an exec session.

    Raises:
        HTTPException: 404 when the session is unknown.
    """
    _cleanup_exec_sessions()
    state = _get_exec_session_or_404(session_id)

    # The event counters are read together under the session lock.
    with state.lock:
        event_count, event_seq = len(state.events), state.seq

    plain_fields = (
        "session_id",
        "exec_id",
        "container",
        "tty",
        "created_at",
        "last_activity",
        "closed",
        "close_reason",
    )
    info = {field: getattr(state, field) for field in plain_fields}
    info["event_count"] = event_count
    info["event_seq"] = event_seq
    return info
|
||||
|
||||
@router.get("/containers/exec/{session_id}/stream")
async def container_exec_stream(session_id: str, after: int = 0):
    """Stream the events of an exec session as server-sent events.

    Events with a sequence number greater than ``after`` are sent as
    ``exec`` events with a compact JSON payload. While nothing is pending, a
    ``ping`` event is sent at most every 10 seconds. The stream ends once the
    session is closed and all its events have been delivered.

    Raises:
        HTTPException: 404 when the session is unknown.
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)

    async def event_gen():
        last_seen = int(after or 0)
        pinged_at = time.time()
        try:
            while True:
                # Copy what is needed under the lock; yield outside it.
                with sess.lock:
                    batch = [item for item in sess.events if item["seq"] > last_seen]
                    is_closed = sess.closed

                for item in batch:
                    last_seen = item["seq"]
                    payload = json.dumps(item, separators=(",", ":"))
                    yield "event: exec\n"
                    yield f"data: {payload}\n\n"

                if not batch:
                    tick = time.time()
                    if (tick - pinged_at) >= 10.0:
                        pinged_at = tick
                        yield "event: ping\n"
                        yield f"data: {int(tick)}\n\n"
                    if is_closed:
                        break

                await asyncio.sleep(0.2)
        except asyncio.CancelledError:
            return

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
|
||||
|
||||
@router.post("/containers/exec/{session_id}/input")
def container_exec_input(session_id: str, req: ExecInputRequest):
    """Write the request's text, UTF-8 encoded, to the session socket.

    Returns:
        A dict with ``ok``, ``session_id`` and the number of ``bytes`` sent
        (0 when the text is empty, in which case nothing is written).

    Raises:
        HTTPException: 404 when the session is unknown; 409 when it is
            already closed or the write fails (the session is then closed).
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)
    if sess.closed:
        raise HTTPException(status_code=409, detail=f"Exec session is closed: {sess.close_reason or 'closed'}")

    payload = (req.data or "").encode("utf-8")
    sent = len(payload)
    if sent:
        try:
            sess.sock.sendall(payload)
            sess.last_activity = int(time.time())
        except Exception as e:
            _close_exec_session(sess, f"write-error: {str(e)}")
            raise HTTPException(status_code=409, detail=f"Exec input failed: {str(e)}")

    return {"ok": True, "session_id": session_id, "bytes": sent}
|
||||
|
||||
@router.post("/containers/exec/{session_id}/resize")
def container_exec_resize(session_id: str, req: ExecResizeRequest):
    """Resize the terminal of a running exec session.

    The exec is inspected first; the resize request is only sent when the
    inspect result reports it as running.

    Returns:
        A dict with ``ok``, ``session_id``, ``rows`` and ``cols``.

    Raises:
        HTTPException: 404 when the session is unknown; 409 when the exec is
            not running; 502 when an upstream request fails.
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)
    exec_url = f"{podman_api_base}/libpod/exec/{sess.exec_id}"

    try:
        inspect_res = session.get(f"{exec_url}/json", timeout=5)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Exec inspect failed: {str(e)}")
    if inspect_res.status_code >= 400:
        raise HTTPException(status_code=502, detail=inspect_res.text)

    try:
        is_running = bool((inspect_res.json() or {}).get("Running"))
    except Exception:
        # An unparsable inspect body is treated as "not running".
        is_running = False
    if not is_running:
        raise HTTPException(status_code=409, detail="Exec session is not running; resize requires running session")

    rows, cols = int(req.rows), int(req.cols)
    try:
        resize_res = session.post(f"{exec_url}/resize?h={rows}&w={cols}", timeout=5)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Exec resize request failed: {str(e)}")

    if resize_res.status_code >= 400:
        detail = (resize_res.text or "").strip()
        # The exec may have stopped between the inspect and the resize.
        stopped_meanwhile = resize_res.status_code == 500 and "not running" in detail.lower()
        if stopped_meanwhile:
            raise HTTPException(status_code=409, detail="Exec session is not running")
        raise HTTPException(status_code=502, detail=detail)

    sess.last_activity = int(time.time())
    return {"ok": True, "session_id": session_id, "rows": rows, "cols": cols}
|
||||
|
||||
@router.post("/containers/exec/{session_id}/stop")
def container_exec_stop(session_id: str):
    """Close an exec session at the caller's request.

    Returns:
        A dict with ``ok``, ``session_id``, ``already_closed`` and ``reason``.
        For a session that was already closed, ``reason`` is its stored close
        reason; otherwise it is ``stopped-by-user``.

    Raises:
        HTTPException: 404 when the session is unknown.
    """
    _cleanup_exec_sessions()
    sess = _get_exec_session_or_404(session_id)

    if not sess.closed:
        _close_exec_session(sess, "stopped-by-user")
        return {"ok": True, "session_id": session_id, "already_closed": False, "reason": "stopped-by-user"}

    return {"ok": True, "session_id": session_id, "already_closed": True, "reason": sess.close_reason}
|
||||
|
||||
return router
|
||||
|
||||
Reference in New Issue
Block a user