Voor remote client agent

This commit is contained in:
kodi
2026-03-25 18:21:54 +01:00
parent 9537a29de3
commit fc4ec39646
14 changed files with 1892 additions and 14 deletions
+522
View File
@@ -0,0 +1,522 @@
from __future__ import annotations
import fnmatch
import html
import mimetypes
import os
import secrets
import shutil
import stat
import time
from datetime import datetime
from pathlib import Path
from typing import Literal, Optional
from fastapi import Body, FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
APP_NAME = "Finder Commander"
HOME_ROOT = Path.home().resolve()
TRASH_DIR = HOME_ROOT / ".Trash"
MAX_TEXT_PREVIEW_BYTES = 2 * 1024 * 1024
CSRF_TOKEN = secrets.token_urlsafe(32)
app = FastAPI(title=APP_NAME)
app.mount("/static", StaticFiles(directory=str(Path(__file__).parent / "static")), name="static")
templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))
class PathsPayload(BaseModel):
paths: list[str] = Field(default_factory=list)
destination_dir: Optional[str] = None
class RenamePayload(BaseModel):
path: str
new_name: str
class DeletePayload(BaseModel):
paths: list[str] = Field(default_factory=list)
mode: Literal["trash", "permanent"] = "trash"
class CommandPayload(BaseModel):
command: str
cwd: str = ""
PathsPayload.model_rebuild()
RenamePayload.model_rebuild()
DeletePayload.model_rebuild()
CommandPayload.model_rebuild()
TEXT_SUFFIXES = {
".md",
".txt",
".py",
".js",
".ts",
".tsx",
".jsx",
".css",
".html",
".json",
".yaml",
".yml",
".toml",
".ini",
".env",
".log",
".xml",
".sh",
".zsh",
".bash",
".c",
".cpp",
".h",
".java",
".go",
".rs",
".sql",
".conf",
".service",
".container",
".network",
".pod",
".kube",
}
def _now_iso() -> str:
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
def rel_from_home(path: Path) -> str:
return "" if path == HOME_ROOT else str(path.relative_to(HOME_ROOT))
def ensure_within_home(candidate: Path) -> Path:
try:
candidate.relative_to(HOME_ROOT)
except ValueError as exc:
raise HTTPException(status_code=403, detail="Path escapes home directory") from exc
return candidate
def sanitize_name(name: str) -> str:
name = (name or "").strip()
if not name or name in {".", ".."} or "/" in name:
raise HTTPException(status_code=400, detail="Invalid name")
return name
def resolve_user_path(raw_path: Optional[str], *, must_exist: bool = True) -> Path:
raw_path = (raw_path or "").strip()
candidate = (HOME_ROOT / raw_path).resolve(strict=False)
candidate = ensure_within_home(candidate)
if must_exist and not candidate.exists():
raise HTTPException(status_code=404, detail="Path not found")
return candidate
def check_origin(request: Request) -> None:
origin = request.headers.get("origin")
if not origin:
return
expected = str(request.base_url).rstrip("/")
if origin.rstrip("/") != expected:
raise HTTPException(status_code=403, detail="Origin not allowed")
def check_csrf(request: Request) -> None:
token = request.headers.get("x-csrf-token")
if token != CSRF_TOKEN:
raise HTTPException(status_code=403, detail="Invalid CSRF token")
def perms_string(mode: int) -> str:
return stat.filemode(mode)
def can_preview_text(path: Path) -> bool:
if path.is_dir():
return False
if path.stat().st_size > MAX_TEXT_PREVIEW_BYTES:
return False
mime, _ = mimetypes.guess_type(path.name)
if mime and (
mime.startswith("text/")
or mime in {"application/json", "application/xml", "application/javascript"}
):
return True
return path.suffix.lower() in TEXT_SUFFIXES
def entry_payload(path: Path) -> dict:
st = path.lstat()
kind = "directory" if path.is_dir() else "file"
mime, _ = mimetypes.guess_type(path.name)
return {
"name": path.name,
"rel_path": rel_from_home(path),
"parent_rel_path": rel_from_home(path.parent),
"kind": kind,
"is_symlink": path.is_symlink(),
"size": st.st_size,
"modified": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
"mime": mime or "application/octet-stream",
"perms": perms_string(st.st_mode),
"can_preview_text": can_preview_text(path) if path.is_file() else False,
}
def sorted_entries(path: Path, show_hidden: bool = False) -> list[dict]:
try:
children = list(path.iterdir())
except PermissionError as exc:
raise HTTPException(status_code=403, detail="Permission denied by operating system") from exc
filtered = []
for child in children:
if not show_hidden and child.name.startswith('.'):
continue
filtered.append(child)
filtered.sort(key=lambda p: (not p.is_dir(), p.name.lower()))
return [entry_payload(child) for child in filtered]
def move_to_trash(path: Path) -> Path:
TRASH_DIR.mkdir(parents=True, exist_ok=True)
target = TRASH_DIR / path.name
if target.exists():
target = TRASH_DIR / f"{target.stem}-{int(time.time())}{target.suffix}"
shutil.move(str(path), str(target))
return target
def copy_entry(source: Path, destination_dir: Path) -> Path:
destination = destination_dir / source.name
if destination.exists():
raise HTTPException(status_code=409, detail=f"Destination already exists: {destination.name}")
if source.is_dir():
shutil.copytree(source, destination, symlinks=True)
else:
shutil.copy2(source, destination, follow_symlinks=False)
return destination
def move_entry(source: Path, destination_dir: Path) -> Path:
destination = destination_dir / source.name
if destination.exists():
raise HTTPException(status_code=409, detail=f"Destination already exists: {destination.name}")
shutil.move(str(source), str(destination))
return destination
def select_paths_or_current(paths: list[str], cwd: str) -> list[Path]:
result = [resolve_user_path(p) for p in paths]
if not result:
raise HTTPException(status_code=400, detail="No paths selected")
return result
def resolve_from_cwd(cwd_path: Path, raw: str, *, must_exist: bool = True) -> Path:
raw = (raw or "").strip()
candidate = (cwd_path / raw).resolve(strict=False)
candidate = ensure_within_home(candidate)
if must_exist and not candidate.exists():
raise HTTPException(status_code=404, detail="Path not found")
return candidate
def run_command(command: str, cwd: str) -> dict:
command = (command or "").strip()
if not command:
raise HTTPException(status_code=400, detail="Empty command")
cwd_path = resolve_user_path(cwd)
if not cwd_path.is_dir():
raise HTTPException(status_code=400, detail="CWD is not a directory")
parts = command.split()
verb = parts[0].lower()
args = parts[1:]
if verb == "cd":
raw_target = " ".join(args) if args else ""
target = resolve_user_path(raw_target) if raw_target.startswith("/") else resolve_from_cwd(cwd_path, raw_target or ".")
if not target.is_dir():
raise HTTPException(status_code=400, detail="Target is not a directory")
return {"ok": True, "action": "cd", "cwd": rel_from_home(target), "message": str(target)}
if verb == "mkdir":
name = sanitize_name(" ".join(args))
target = resolve_from_cwd(cwd_path, name, must_exist=False)
target.mkdir(exist_ok=False)
return {"ok": True, "action": "mkdir", "cwd": rel_from_home(cwd_path), "message": f"Created {name}"}
if verb == "touch":
name = sanitize_name(" ".join(args))
target = resolve_from_cwd(cwd_path, name, must_exist=False)
target.touch(exist_ok=False)
return {"ok": True, "action": "touch", "cwd": rel_from_home(cwd_path), "message": f"Created {name}"}
if verb == "select":
pattern = " ".join(args).strip() or "*"
entries = sorted_entries(cwd_path, show_hidden=True)
matches = [e["rel_path"] for e in entries if fnmatch.fnmatch(e["name"], pattern)]
return {
"ok": True,
"action": "select",
"cwd": rel_from_home(cwd_path),
"message": f"Matched {len(matches)} item(s)",
"matches": matches,
}
if verb == "help":
return {
"ok": True,
"action": "help",
"cwd": rel_from_home(cwd_path),
"message": "Commands: cd <path>, mkdir <name>, touch <name>, select <glob>, help",
}
raise HTTPException(status_code=400, detail="Unsupported command")
@app.middleware("http")
async def harden_headers(request: Request, call_next):
response = await call_next(request)
response.headers["X-Frame-Options"] = "DENY"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; img-src 'self' data:; style-src 'self'; script-src 'self'; "
"connect-src 'self'; frame-ancestors 'none'; base-uri 'self'; form-action 'self'"
)
response.headers["Referrer-Policy"] = "no-referrer"
response.headers["X-Content-Type-Options"] = "nosniff"
return response
@app.get("/health")
def health() -> dict:
return {"ok": True, "app": APP_NAME, "time": _now_iso(), "home": str(HOME_ROOT)}
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
return templates.TemplateResponse(
request,
"index.html",
{
"app_name": APP_NAME,
"home_root": str(HOME_ROOT),
"csrf_token": CSRF_TOKEN,
},
)
@app.get("/api/list")
def api_list(path: str = "", show_hidden: bool = False) -> dict:
target = resolve_user_path(path)
if not target.is_dir():
raise HTTPException(status_code=400, detail="Path is not a directory")
return {
"cwd": rel_from_home(target),
"absolute": str(target),
"parent": "" if target == HOME_ROOT else rel_from_home(target.parent),
"entries": sorted_entries(target, show_hidden=show_hidden),
}
@app.get("/api/read")
def api_read(path: str) -> dict:
target = resolve_user_path(path)
if target.is_dir():
raise HTTPException(status_code=400, detail="Cannot read a directory as text")
if not can_preview_text(target):
raise HTTPException(status_code=415, detail="File is not previewable as text")
try:
content = target.read_text(encoding="utf-8")
encoding = "utf-8"
except UnicodeDecodeError:
content = target.read_text(encoding="utf-8", errors="replace")
encoding = "utf-8 (lossy)"
return {
"path": rel_from_home(target),
"name": target.name,
"encoding": encoding,
"content": content,
"size": target.stat().st_size,
}
@app.get("/api/meta")
def api_meta(path: str) -> dict:
target = resolve_user_path(path)
payload = entry_payload(target)
payload["absolute"] = str(target)
return payload
@app.get("/api/download")
def api_download(path: str):
target = resolve_user_path(path)
if target.is_dir():
raise HTTPException(status_code=400, detail="Cannot download a directory")
return FileResponse(path=target, filename=target.name)
@app.get("/api/preview")
def api_preview(path: str):
target = resolve_user_path(path)
if target.is_dir():
raise HTTPException(status_code=400, detail="Cannot preview a directory")
mime, _ = mimetypes.guess_type(target.name)
if not mime or not mime.startswith("image/"):
raise HTTPException(status_code=415, detail="Preview only supports images")
return FileResponse(path=target, media_type=mime)
@app.put("/api/write")
async def api_write(request: Request, path: str = Form(...), content: str = Form(...)) -> dict:
check_origin(request)
check_csrf(request)
target = resolve_user_path(path, must_exist=False)
ensure_within_home(target.parent.resolve(strict=False))
if target.exists() and target.is_dir():
raise HTTPException(status_code=400, detail="Cannot overwrite a directory")
target.parent.mkdir(parents=True, exist_ok=True)
tmp = target.with_name(target.name + ".tmp-write")
tmp.write_text(content, encoding="utf-8")
os.replace(tmp, target)
return {"ok": True, "path": rel_from_home(target)}
@app.post("/api/mkdir")
async def api_mkdir(request: Request, path: str = Form(...), name: str = Form(...)) -> dict:
check_origin(request)
check_csrf(request)
base = resolve_user_path(path)
if not base.is_dir():
raise HTTPException(status_code=400, detail="Base path is not a directory")
child = resolve_user_path(str(Path(rel_from_home(base)) / sanitize_name(name)), must_exist=False)
child.mkdir(parents=False, exist_ok=False)
return {"ok": True, "path": rel_from_home(child)}
@app.post("/api/upload")
async def api_upload(request: Request, path: str = Form(...), files: list[UploadFile] = File(...)) -> dict:
check_origin(request)
check_csrf(request)
base = resolve_user_path(path)
if not base.is_dir():
raise HTTPException(status_code=400, detail="Upload target is not a directory")
saved: list[str] = []
for upload in files:
filename = Path(upload.filename or "").name
if not filename:
continue
destination = resolve_user_path(str(Path(rel_from_home(base)) / filename), must_exist=False)
with destination.open("wb") as f:
while chunk := await upload.read(1024 * 1024):
f.write(chunk)
saved.append(rel_from_home(destination))
return {"ok": True, "saved": saved}
@app.post("/api/rename")
async def api_rename(request: Request, payload: RenamePayload) -> dict:
check_origin(request)
check_csrf(request)
source = resolve_user_path(payload.path)
destination = resolve_user_path(str(Path(rel_from_home(source.parent)) / sanitize_name(payload.new_name)), must_exist=False)
if destination.exists():
raise HTTPException(status_code=409, detail="Destination already exists")
os.replace(source, destination)
return {"ok": True, "old_path": rel_from_home(source), "new_path": rel_from_home(destination)}
@app.post("/api/copy")
async def api_copy(request: Request, payload: PathsPayload) -> dict:
check_origin(request)
check_csrf(request)
if payload.destination_dir is None:
raise HTTPException(status_code=400, detail="Missing destination_dir")
destination_dir = resolve_user_path(payload.destination_dir)
if not destination_dir.is_dir():
raise HTTPException(status_code=400, detail="Destination is not a directory")
results = []
for source in select_paths_or_current(payload.paths, payload.destination_dir):
copied = copy_entry(source, destination_dir)
results.append(rel_from_home(copied))
return {"ok": True, "copied": results}
@app.post("/api/move")
async def api_move(request: Request, payload: PathsPayload) -> dict:
check_origin(request)
check_csrf(request)
if payload.destination_dir is None:
raise HTTPException(status_code=400, detail="Missing destination_dir")
destination_dir = resolve_user_path(payload.destination_dir)
if not destination_dir.is_dir():
raise HTTPException(status_code=400, detail="Destination is not a directory")
results = []
for source in select_paths_or_current(payload.paths, payload.destination_dir):
moved = move_entry(source, destination_dir)
results.append(rel_from_home(moved))
return {"ok": True, "moved": results}
@app.post("/api/delete")
async def api_delete(request: Request, payload: DeletePayload) -> dict:
check_origin(request)
check_csrf(request)
paths = select_paths_or_current(payload.paths, "")
deleted = []
for target in paths:
if target == HOME_ROOT:
raise HTTPException(status_code=400, detail="Refusing to delete home root")
if payload.mode == "trash":
moved = move_to_trash(target)
deleted.append(str(moved))
else:
if target.is_dir():
shutil.rmtree(target)
else:
target.unlink()
deleted.append(rel_from_home(target))
return {"ok": True, "mode": payload.mode, "deleted": deleted}
@app.post("/api/command")
async def api_command(request: Request, payload: CommandPayload) -> dict:
check_origin(request)
check_csrf(request)
return run_command(payload.command, payload.cwd)
@app.exception_handler(HTTPException)
async def http_exception_handler(_: Request, exc: HTTPException):
return JSONResponse(status_code=exc.status_code, content={"ok": False, "detail": exc.detail})
@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception):
return JSONResponse(status_code=500, content={"ok": False, "detail": html.escape(str(exc))})