"""Finder Commander: a small FastAPI file manager confined to the home directory.

Every user-supplied path is resolved against ``HOME_ROOT`` and rejected with
HTTP 403 if the resolved location falls outside it.  Mutating endpoints
additionally require a matching ``Origin`` header (when one is sent) and the
per-process CSRF token in the ``X-CSRF-Token`` header.
"""

from __future__ import annotations

import fnmatch
import html
import mimetypes
import os
import secrets
import shutil
import stat
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal, Optional

from fastapi import Body, FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field

APP_NAME = "Finder Commander"
HOME_ROOT = Path.home().resolve()
TRASH_DIR = HOME_ROOT / ".Trash"
MAX_TEXT_PREVIEW_BYTES = 2 * 1024 * 1024
# Generated once per process; handed to the page by ``index`` and checked by
# ``check_csrf`` on every mutating request.
CSRF_TOKEN = secrets.token_urlsafe(32)

app = FastAPI(title=APP_NAME)
app.mount("/static", StaticFiles(directory=str(Path(__file__).parent / "static")), name="static")
templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))


class PathsPayload(BaseModel):
    """Request body for copy/move: the selected paths and the target directory."""

    paths: list[str] = Field(default_factory=list)
    destination_dir: Optional[str] = None


class RenamePayload(BaseModel):
    """Request body for rename: the existing path and its new base name."""

    path: str
    new_name: str


class DeletePayload(BaseModel):
    """Request body for delete: the selected paths and the deletion mode."""

    paths: list[str] = Field(default_factory=list)
    mode: Literal["trash", "permanent"] = "trash"


class CommandPayload(BaseModel):
    """Request body for the command endpoint: a command line and its working directory."""

    command: str
    cwd: str = ""


PathsPayload.model_rebuild()
RenamePayload.model_rebuild()
DeletePayload.model_rebuild()
CommandPayload.model_rebuild()

# File suffixes treated as text when ``mimetypes`` cannot classify the file.
TEXT_SUFFIXES = {
    ".md", ".txt", ".py", ".js", ".ts", ".tsx", ".jsx", ".css", ".html",
    ".json", ".yaml", ".yml", ".toml", ".ini", ".env", ".log", ".xml",
    ".sh", ".zsh", ".bash", ".c", ".cpp", ".h", ".java", ".go", ".rs",
    ".sql", ".conf", ".service", ".container", ".network", ".pod", ".kube",
}


def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # ``datetime.utcnow()`` is deprecated; take an aware UTC time and drop the
    # tzinfo so ``isoformat`` emits no "+00:00" and the trailing "Z" is kept.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now.isoformat(timespec="seconds") + "Z"


def rel_from_home(path: Path) -> str:
    """Return *path* relative to ``HOME_ROOT`` ("" for the home root itself)."""
    return "" if path == HOME_ROOT else str(path.relative_to(HOME_ROOT))


def ensure_within_home(candidate: Path) -> Path:
    """Return *candidate* unchanged, or raise HTTP 403 if it is outside ``HOME_ROOT``."""
    try:
        candidate.relative_to(HOME_ROOT)
    except ValueError as exc:
        raise HTTPException(status_code=403, detail="Path escapes home directory") from exc
    return candidate


def sanitize_name(name: str) -> str:
    """Strip *name* and return it; raise HTTP 400 if it is empty, ".", ".." or contains "/"."""
    name = (name or "").strip()
    if not name or name in {".", ".."} or "/" in name:
        raise HTTPException(status_code=400, detail="Invalid name")
    return name


def resolve_user_path(raw_path: Optional[str], *, must_exist: bool = True) -> Path:
    """Resolve *raw_path* against ``HOME_ROOT`` and confine it to the home directory.

    Raises HTTP 403 if the resolved path escapes home, and HTTP 404 if
    *must_exist* is true and nothing exists at the resolved path.
    """
    raw_path = (raw_path or "").strip()
    candidate = (HOME_ROOT / raw_path).resolve(strict=False)
    candidate = ensure_within_home(candidate)
    if must_exist and not candidate.exists():
        raise HTTPException(status_code=404, detail="Path not found")
    return candidate


def check_origin(request: Request) -> None:
    """Raise HTTP 403 if an ``Origin`` header is present and differs from the base URL.

    Requests without an ``Origin`` header are accepted.
    """
    origin = request.headers.get("origin")
    if not origin:
        return
    expected = str(request.base_url).rstrip("/")
    if origin.rstrip("/") != expected:
        raise HTTPException(status_code=403, detail="Origin not allowed")


def check_csrf(request: Request) -> None:
    """Raise HTTP 403 unless the ``X-CSRF-Token`` header equals ``CSRF_TOKEN``."""
    token = request.headers.get("x-csrf-token") or ""
    # Compare as bytes in constant time: a plain ``!=`` leaks timing
    # information, and ``compare_digest`` rejects non-ASCII ``str`` arguments.
    if not secrets.compare_digest(token.encode("utf-8"), CSRF_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid CSRF token")


def perms_string(mode: int) -> str:
    """Return the ``ls -l`` style permission string for a ``st_mode`` value."""
    return stat.filemode(mode)


def can_preview_text(path: Path) -> bool:
    """Return True if *path* is a non-directory small enough and text-like enough to preview.

    A file qualifies when it is at most ``MAX_TEXT_PREVIEW_BYTES`` and either
    its guessed MIME type is textual or its suffix is in ``TEXT_SUFFIXES``.
    """
    if path.is_dir():
        return False
    if path.stat().st_size > MAX_TEXT_PREVIEW_BYTES:
        return False
    mime, _ = mimetypes.guess_type(path.name)
    if mime and (
        mime.startswith("text/")
        or mime in {"application/json", "application/xml", "application/javascript"}
    ):
        return True
    return path.suffix.lower() in TEXT_SUFFIXES


def entry_payload(path: Path) -> dict:
    """Build the JSON-serialisable description of one filesystem entry.

    Size, mtime and permissions come from ``lstat`` (the entry itself), while
    ``kind`` uses ``is_dir`` (which follows symlinks).
    """
    st = path.lstat()
    kind = "directory" if path.is_dir() else "file"
    mime, _ = mimetypes.guess_type(path.name)
    return {
        "name": path.name,
        "rel_path": rel_from_home(path),
        "parent_rel_path": rel_from_home(path.parent),
        "kind": kind,
        "is_symlink": path.is_symlink(),
        "size": st.st_size,
        "modified": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
        "mime": mime or "application/octet-stream",
        "perms": perms_string(st.st_mode),
        "can_preview_text": can_preview_text(path) if path.is_file() else False,
    }


def sorted_entries(path: Path, show_hidden: bool = False) -> list[dict]:
    """List *path* as entry payloads, directories first, then by case-insensitive name.

    Dot-prefixed names are skipped unless *show_hidden* is true.  Raises
    HTTP 403 if the directory cannot be listed.
    """
    try:
        children = list(path.iterdir())
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail="Permission denied by operating system") from exc
    visible = [child for child in children if show_hidden or not child.name.startswith(".")]
    visible.sort(key=lambda p: (not p.is_dir(), p.name.lower()))
    return [entry_payload(child) for child in visible]


def move_to_trash(path: Path) -> Path:
    """Move *path* into ``TRASH_DIR`` and return its new location.

    If the name is already taken in the trash, a timestamp (and, if needed, a
    counter) is inserted before the suffix so nothing is merged or overwritten.
    """
    TRASH_DIR.mkdir(parents=True, exist_ok=True)
    target = TRASH_DIR / path.name
    if target.exists():
        stamp = int(time.time())
        stem, suffix = target.stem, target.suffix
        target = TRASH_DIR / f"{stem}-{stamp}{suffix}"
        # ``shutil.move`` onto an existing directory would move *into* it, so
        # keep searching until the name is genuinely free.
        counter = 1
        while target.exists():
            target = TRASH_DIR / f"{stem}-{stamp}-{counter}{suffix}"
            counter += 1
    shutil.move(str(path), str(target))
    return target


def _reject_into_self(source: Path, destination_dir: Path) -> None:
    """Raise HTTP 400 if *destination_dir* is the directory *source* or lies inside it."""
    if source.is_dir() and (destination_dir == source or source in destination_dir.parents):
        raise HTTPException(status_code=400, detail="Cannot place a directory inside itself")


def copy_entry(source: Path, destination_dir: Path) -> Path:
    """Copy *source* into *destination_dir* under the same name and return the copy.

    Raises HTTP 409 if the destination name already exists and HTTP 400 if a
    directory would be copied into itself.  Symlinks are copied as symlinks.
    """
    destination = destination_dir / source.name
    if destination.exists():
        raise HTTPException(status_code=409, detail=f"Destination already exists: {destination.name}")
    _reject_into_self(source, destination_dir)
    if source.is_dir():
        shutil.copytree(source, destination, symlinks=True)
    else:
        shutil.copy2(source, destination, follow_symlinks=False)
    return destination


def move_entry(source: Path, destination_dir: Path) -> Path:
    """Move *source* into *destination_dir* under the same name and return the new path.

    Raises HTTP 409 if the destination name already exists and HTTP 400 if a
    directory would be moved into itself.
    """
    destination = destination_dir / source.name
    if destination.exists():
        raise HTTPException(status_code=409, detail=f"Destination already exists: {destination.name}")
    _reject_into_self(source, destination_dir)
    shutil.move(str(source), str(destination))
    return destination


def select_paths_or_current(paths: list[str], cwd: str) -> list[Path]:
    """Resolve every entry of *paths*; raise HTTP 400 if the list is empty.

    *cwd* is accepted for interface compatibility but is not used.
    """
    result = [resolve_user_path(p) for p in paths]
    if not result:
        raise HTTPException(status_code=400, detail="No paths selected")
    return result


def resolve_from_cwd(cwd_path: Path, raw: str, *, must_exist: bool = True) -> Path:
    """Resolve *raw* relative to *cwd_path*, confined to the home directory.

    Raises HTTP 403 if the result escapes home, and HTTP 404 if *must_exist*
    is true and nothing exists there.
    """
    raw = (raw or "").strip()
    candidate = (cwd_path / raw).resolve(strict=False)
    candidate = ensure_within_home(candidate)
    if must_exist and not candidate.exists():
        raise HTTPException(status_code=404, detail="Path not found")
    return candidate


def run_command(command: str, cwd: str) -> dict:
    """Execute one built-in command (cd, mkdir, touch, select, help) in *cwd*.

    Returns a dict with ``ok``, ``action``, ``cwd`` and ``message`` (plus
    ``matches`` for ``select``).  Raises HTTP 400 for an empty or unsupported
    command or a non-directory target, and HTTP 409 when ``mkdir``/``touch``
    would clobber an existing entry.
    """
    command = (command or "").strip()
    if not command:
        raise HTTPException(status_code=400, detail="Empty command")
    cwd_path = resolve_user_path(cwd)
    if not cwd_path.is_dir():
        raise HTTPException(status_code=400, detail="CWD is not a directory")

    parts = command.split()
    verb = parts[0].lower()
    args = parts[1:]

    if verb == "cd":
        raw_target = " ".join(args) if args else ""
        # A leading "/" is resolved via HOME_ROOT joining rules; anything else
        # is relative to the current directory ("." when no argument is given).
        if raw_target.startswith("/"):
            target = resolve_user_path(raw_target)
        else:
            target = resolve_from_cwd(cwd_path, raw_target or ".")
        if not target.is_dir():
            raise HTTPException(status_code=400, detail="Target is not a directory")
        return {"ok": True, "action": "cd", "cwd": rel_from_home(target), "message": str(target)}

    if verb == "mkdir":
        name = sanitize_name(" ".join(args))
        target = resolve_from_cwd(cwd_path, name, must_exist=False)
        try:
            target.mkdir(exist_ok=False)
        except FileExistsError as exc:
            # Report a conflict instead of falling through to the generic 500 handler.
            raise HTTPException(status_code=409, detail=f"Already exists: {name}") from exc
        return {"ok": True, "action": "mkdir", "cwd": rel_from_home(cwd_path), "message": f"Created {name}"}

    if verb == "touch":
        name = sanitize_name(" ".join(args))
        target = resolve_from_cwd(cwd_path, name, must_exist=False)
        try:
            target.touch(exist_ok=False)
        except FileExistsError as exc:
            raise HTTPException(status_code=409, detail=f"Already exists: {name}") from exc
        return {"ok": True, "action": "touch", "cwd": rel_from_home(cwd_path), "message": f"Created {name}"}

    if verb == "select":
        pattern = " ".join(args).strip() or "*"
        entries = sorted_entries(cwd_path, show_hidden=True)
        matches = [e["rel_path"] for e in entries if fnmatch.fnmatch(e["name"], pattern)]
        return {
            "ok": True,
            "action": "select",
            "cwd": rel_from_home(cwd_path),
            "message": f"Matched {len(matches)} item(s)",
            "matches": matches,
        }

    if verb == "help":
        return {
            "ok": True,
            "action": "help",
            "cwd": rel_from_home(cwd_path),
            "message": "Commands: cd , mkdir , touch , select , help",
        }

    raise HTTPException(status_code=400, detail="Unsupported command")


@app.middleware("http")
async def harden_headers(request: Request, call_next):
    """Attach framing, CSP, referrer and MIME-sniffing headers to every response."""
    response = await call_next(request)
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Content-Security-Policy"] = (
        "default-src 'self'; img-src 'self' data:; style-src 'self'; script-src 'self'; "
        "connect-src 'self'; frame-ancestors 'none'; base-uri 'self'; form-action 'self'"
    )
    response.headers["Referrer-Policy"] = "no-referrer"
    response.headers["X-Content-Type-Options"] = "nosniff"
    return response


@app.get("/health")
def health() -> dict:
    """Return a liveness payload with the app name, current UTC time and home root."""
    return {"ok": True, "app": APP_NAME, "time": _now_iso(), "home": str(HOME_ROOT)}


@app.get("/", response_class=HTMLResponse)
def index(request: Request):
    """Render ``index.html`` with the app name, home root and CSRF token."""
    return templates.TemplateResponse(
        request,
        "index.html",
        {
            "app_name": APP_NAME,
            "home_root": str(HOME_ROOT),
            "csrf_token": CSRF_TOKEN,
        },
    )


@app.get("/api/list")
def api_list(path: str = "", show_hidden: bool = False) -> dict:
    """List the directory at *path*; HTTP 400 if it is not a directory."""
    target = resolve_user_path(path)
    if not target.is_dir():
        raise HTTPException(status_code=400, detail="Path is not a directory")
    return {
        "cwd": rel_from_home(target),
        "absolute": str(target),
        "parent": "" if target == HOME_ROOT else rel_from_home(target.parent),
        "entries": sorted_entries(target, show_hidden=show_hidden),
    }


@app.get("/api/read")
def api_read(path: str) -> dict:
    """Return the text content of the file at *path*.

    Raises HTTP 400 for directories and HTTP 415 for files that
    ``can_preview_text`` rejects.  Undecodable bytes are replaced and the
    encoding is then reported as ``"utf-8 (lossy)"``.
    """
    target = resolve_user_path(path)
    if target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot read a directory as text")
    if not can_preview_text(target):
        raise HTTPException(status_code=415, detail="File is not previewable as text")
    try:
        content = target.read_text(encoding="utf-8")
        encoding = "utf-8"
    except UnicodeDecodeError:
        content = target.read_text(encoding="utf-8", errors="replace")
        encoding = "utf-8 (lossy)"
    return {
        "path": rel_from_home(target),
        "name": target.name,
        "encoding": encoding,
        "content": content,
        "size": target.stat().st_size,
    }


@app.get("/api/meta")
def api_meta(path: str) -> dict:
    """Return the entry payload for *path* plus its absolute path."""
    target = resolve_user_path(path)
    payload = entry_payload(target)
    payload["absolute"] = str(target)
    return payload


@app.get("/api/download")
def api_download(path: str):
    """Send the file at *path* as a download; HTTP 400 for directories."""
    target = resolve_user_path(path)
    if target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot download a directory")
    return FileResponse(path=target, filename=target.name)


@app.get("/api/preview")
def api_preview(path: str):
    """Serve the file at *path* inline if its guessed MIME type is an image.

    Raises HTTP 400 for directories and HTTP 415 for non-image files.
    """
    target = resolve_user_path(path)
    if target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot preview a directory")
    mime, _ = mimetypes.guess_type(target.name)
    if not mime or not mime.startswith("image/"):
        raise HTTPException(status_code=415, detail="Preview only supports images")
    return FileResponse(path=target, media_type=mime)


@app.put("/api/write")
async def api_write(request: Request, path: str = Form(...), content: str = Form(...)) -> dict:
    """Write *content* to *path* as UTF-8, creating parent directories as needed.

    The content is written to a temporary sibling file and then moved into
    place with ``os.replace``.  Raises HTTP 400 if *path* is a directory.
    """
    check_origin(request)
    check_csrf(request)
    target = resolve_user_path(path, must_exist=False)
    ensure_within_home(target.parent.resolve(strict=False))
    if target.exists() and target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot overwrite a directory")
    target.parent.mkdir(parents=True, exist_ok=True)
    # A random component keeps concurrent writes to the same file from sharing
    # one temporary file.
    tmp = target.with_name(f"{target.name}.{secrets.token_hex(4)}.tmp-write")
    try:
        tmp.write_text(content, encoding="utf-8")
        os.replace(tmp, target)
    except Exception:
        # Do not leave a stray temporary file behind when the write fails.
        tmp.unlink(missing_ok=True)
        raise
    return {"ok": True, "path": rel_from_home(target)}


@app.post("/api/mkdir")
async def api_mkdir(request: Request, path: str = Form(...), name: str = Form(...)) -> dict:
    """Create directory *name* inside the directory at *path*.

    Raises HTTP 400 if *path* is not a directory or *name* is invalid, and
    HTTP 409 if the entry already exists.
    """
    check_origin(request)
    check_csrf(request)
    base = resolve_user_path(path)
    if not base.is_dir():
        raise HTTPException(status_code=400, detail="Base path is not a directory")
    child = resolve_user_path(str(Path(rel_from_home(base)) / sanitize_name(name)), must_exist=False)
    try:
        child.mkdir(parents=False, exist_ok=False)
    except FileExistsError as exc:
        # Report a conflict instead of falling through to the generic 500 handler.
        raise HTTPException(status_code=409, detail=f"Already exists: {child.name}") from exc
    return {"ok": True, "path": rel_from_home(child)}


@app.post("/api/upload")
async def api_upload(request: Request, path: str = Form(...), files: list[UploadFile] = File(...)) -> dict:
    """Save each uploaded file into the directory at *path*.

    Only the final component of each client-supplied filename is used; uploads
    with an empty name are skipped.  Raises HTTP 400 if *path* is not a
    directory or an upload would overwrite a directory.
    """
    check_origin(request)
    check_csrf(request)
    base = resolve_user_path(path)
    if not base.is_dir():
        raise HTTPException(status_code=400, detail="Upload target is not a directory")
    saved: list[str] = []
    for upload in files:
        filename = Path(upload.filename or "").name
        if not filename:
            continue
        destination = resolve_user_path(str(Path(rel_from_home(base)) / filename), must_exist=False)
        # Covers both a same-named folder and names such as ".." that resolve
        # to a directory; opening those for writing would otherwise be a 500.
        if destination.is_dir():
            raise HTTPException(status_code=400, detail="Cannot overwrite a directory")
        with destination.open("wb") as f:
            while chunk := await upload.read(1024 * 1024):
                f.write(chunk)
        saved.append(rel_from_home(destination))
    return {"ok": True, "saved": saved}


@app.post("/api/rename")
async def api_rename(request: Request, payload: RenamePayload) -> dict:
    """Rename ``payload.path`` to ``payload.new_name`` within the same directory.

    Raises HTTP 400 for an invalid name and HTTP 409 if the new name exists.
    """
    check_origin(request)
    check_csrf(request)
    source = resolve_user_path(payload.path)
    destination = resolve_user_path(
        str(Path(rel_from_home(source.parent)) / sanitize_name(payload.new_name)),
        must_exist=False,
    )
    if destination.exists():
        raise HTTPException(status_code=409, detail="Destination already exists")
    os.replace(source, destination)
    return {"ok": True, "old_path": rel_from_home(source), "new_path": rel_from_home(destination)}


@app.post("/api/copy")
async def api_copy(request: Request, payload: PathsPayload) -> dict:
    """Copy every path in ``payload.paths`` into ``payload.destination_dir``."""
    check_origin(request)
    check_csrf(request)
    if payload.destination_dir is None:
        raise HTTPException(status_code=400, detail="Missing destination_dir")
    destination_dir = resolve_user_path(payload.destination_dir)
    if not destination_dir.is_dir():
        raise HTTPException(status_code=400, detail="Destination is not a directory")
    results = []
    for source in select_paths_or_current(payload.paths, payload.destination_dir):
        copied = copy_entry(source, destination_dir)
        results.append(rel_from_home(copied))
    return {"ok": True, "copied": results}


@app.post("/api/move")
async def api_move(request: Request, payload: PathsPayload) -> dict:
    """Move every path in ``payload.paths`` into ``payload.destination_dir``."""
    check_origin(request)
    check_csrf(request)
    if payload.destination_dir is None:
        raise HTTPException(status_code=400, detail="Missing destination_dir")
    destination_dir = resolve_user_path(payload.destination_dir)
    if not destination_dir.is_dir():
        raise HTTPException(status_code=400, detail="Destination is not a directory")
    results = []
    for source in select_paths_or_current(payload.paths, payload.destination_dir):
        moved = move_entry(source, destination_dir)
        results.append(rel_from_home(moved))
    return {"ok": True, "moved": results}


@app.post("/api/delete")
async def api_delete(request: Request, payload: DeletePayload) -> dict:
    """Delete every path in ``payload.paths``, to the trash or permanently.

    Raises HTTP 400 if no paths are given or one of them is the home root.
    In ``"trash"`` mode the response lists absolute trash locations; in
    ``"permanent"`` mode it lists the removed home-relative paths.
    """
    check_origin(request)
    check_csrf(request)
    paths = select_paths_or_current(payload.paths, "")
    deleted = []
    for target in paths:
        if target == HOME_ROOT:
            raise HTTPException(status_code=400, detail="Refusing to delete home root")
        if payload.mode == "trash":
            moved = move_to_trash(target)
            deleted.append(str(moved))
        else:
            if target.is_dir():
                shutil.rmtree(target)
            else:
                target.unlink()
            deleted.append(rel_from_home(target))
    return {"ok": True, "mode": payload.mode, "deleted": deleted}


@app.post("/api/command")
async def api_command(request: Request, payload: CommandPayload) -> dict:
    """Run ``payload.command`` in ``payload.cwd`` via ``run_command``."""
    check_origin(request)
    check_csrf(request)
    return run_command(payload.command, payload.cwd)


@app.exception_handler(HTTPException)
async def http_exception_handler(_: Request, exc: HTTPException):
    """Render an ``HTTPException`` as ``{"ok": false, "detail": ...}`` with its status code."""
    return JSONResponse(status_code=exc.status_code, content={"ok": False, "detail": exc.detail})


@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception):
    """Render any other exception as a 500 whose detail is the HTML-escaped message."""
    return JSONResponse(status_code=500, content={"ok": False, "detail": html.escape(str(exc))})