from __future__ import annotations import json import mimetypes import os import struct from dataclasses import dataclass from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from fastapi import FastAPI, HTTPException, Request from fastapi.responses import FileResponse, JSONResponse APP_NAME = "Finder Commander Remote Agent" DEFAULT_PORT = 8765 TEXT_PREVIEW_MAX_BYTES = 256 * 1024 TEXT_CONTENT_TYPES = { ".txt": "text/plain", ".log": "text/plain", ".conf": "text/plain", ".ini": "text/plain", ".cfg": "text/plain", ".md": "text/markdown", ".yml": "text/yaml", ".yaml": "text/yaml", ".json": "application/json", ".js": "text/javascript", ".py": "text/x-python", ".css": "text/css", ".html": "text/html", } SPECIAL_TEXT_FILENAMES = { "dockerfile": "text/plain", "containerfile": "text/plain", } @dataclass(frozen=True) class AgentRuntimeConfig: config_path: Path | None agent_access_token: str shares: dict[str, str] display_name: str endpoint: str client_id: str platform: str def _now_iso() -> str: return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z") def _candidate_config_paths() -> list[Path]: candidates: list[Path] = [] env_path = os.getenv("FINDER_COMMANDER_REMOTE_AGENT_CONFIG", "").strip() if env_path: candidates.append(Path(env_path).expanduser().resolve(strict=False)) base_dir = Path(__file__).resolve().parents[1] candidates.append(base_dir / "remote_client_agent.launchd.json") candidates.append(base_dir / "remote_client_agent.example.json") return candidates def _load_raw_config() -> tuple[Path | None, dict]: for candidate in _candidate_config_paths(): if candidate.is_file(): try: raw = json.loads(candidate.read_text(encoding="utf-8")) except ValueError as exc: raise RuntimeError(f"Invalid JSON in config file: {candidate}") from exc if not isinstance(raw, dict): raise RuntimeError(f"Config file must contain a JSON object: {candidate}") return candidate.resolve(strict=False), raw return None, {} 
def _clean_str(value: object) -> str:
    """Return ``value`` as stripped text, treating ``None`` as empty.

    Without the ``None`` check a JSON ``null`` would become the literal
    string ``"None"``.
    """
    if value is None:
        return ""
    return str(value).strip()


def _utc_iso(timestamp: float) -> str:
    """Format a POSIX timestamp as UTC ISO-8601 text ending in ``Z``."""
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return moment.isoformat().replace("+00:00", "Z")


def _display_path(raw_path: str) -> str:
    """Normalise a request path for echoing back in a response body."""
    return raw_path.strip().replace("\\", "/").strip("/")


@lru_cache(maxsize=1)
def get_runtime_config() -> AgentRuntimeConfig:
    """Build the runtime configuration once and cache it.

    The access token from ``FINDER_COMMANDER_AGENT_ACCESS_TOKEN`` takes
    precedence over the one in the config file.  Shares with a blank name or
    blank root are dropped.  ``platform`` falls back to ``"macos"``.
    """
    config_path, raw = _load_raw_config()

    shares: dict[str, str] = {}
    shares_raw = raw.get("shares", {})
    if isinstance(shares_raw, dict):
        for key, value in shares_raw.items():
            share_name = _clean_str(key)
            share_root = _clean_str(value)
            if share_name and share_root:
                shares[share_name] = share_root

    env_token = os.getenv("FINDER_COMMANDER_AGENT_ACCESS_TOKEN", "").strip()
    return AgentRuntimeConfig(
        config_path=config_path,
        agent_access_token=env_token or _clean_str(raw.get("agent_access_token")),
        shares=shares,
        display_name=_clean_str(raw.get("display_name")),
        # "public_endpoint" wins whenever the key is present, even if blank.
        endpoint=_clean_str(raw.get("public_endpoint", raw.get("endpoint"))),
        client_id=_clean_str(raw.get("client_id")),
        platform=_clean_str(raw.get("platform")) or "macos",
    )


def require_agent_auth(request: Request) -> None:
    """Reject the request unless it carries the configured bearer token.

    Does nothing when no token is configured (authentication disabled).

    Raises:
        HTTPException: 403 ``invalid_agent_token`` when the ``Authorization``
            header is not exactly ``Bearer <token>``.
    """
    import hmac

    config = get_runtime_config()
    if not config.agent_access_token:
        return

    supplied = request.headers.get("authorization", "").strip()
    expected = f"Bearer {config.agent_access_token}"
    # Constant-time comparison; bytes are used because compare_digest only
    # accepts ASCII-only str.  surrogatepass keeps odd tokens from raising.
    if hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    ):
        return

    # NOTE(review): these diagnostics are sent to unauthenticated callers.
    raise_agent_error(
        status_code=403,
        code="invalid_agent_token",
        message="Invalid agent token",
        extra={
            "config_path": str(config.config_path) if config.config_path else None,
            "client_id": config.client_id or None,
            "display_name": config.display_name or None,
        },
    )


def raise_agent_error(status_code: int, code: str, message: str, *, extra: dict | None = None) -> None:
    """Raise an ``HTTPException`` whose detail is a structured dict.

    The detail always has ``code`` and ``message``; keys from ``extra`` are
    merged on top.  This function never returns normally.
    """
    detail = {"code": code, "message": message}
    if extra:
        detail.update(extra)
    raise HTTPException(status_code=status_code, detail=detail)


def get_share_root(share: str) -> Path:
    """Return the resolved root directory of a configured share.

    Raises:
        HTTPException: 404 ``path_not_found`` for an unknown share name.
    """
    config = get_runtime_config()
    normalized_share = (share or "").strip()
    if normalized_share not in config.shares:
        raise_agent_error(404, "path_not_found", "Share not found")
    return Path(config.shares[normalized_share]).expanduser().resolve(strict=False)


def ensure_within_root(root: Path, candidate: Path) -> Path:
    """Return ``candidate`` if it lies under ``root``.

    Raises:
        HTTPException: 403 ``path_traversal_detected`` otherwise.
    """
    try:
        candidate.relative_to(root)
    except ValueError:
        raise_agent_error(403, "path_traversal_detected", "Path escapes share root")
    return candidate


def resolve_share_path(share: str, raw_path: str, *, must_exist: bool = True) -> Path:
    """Map a share-relative request path to a resolved filesystem path.

    Backslashes are treated as separators.  Absolute paths and any ``..``
    component are rejected before resolution; the resolved result is then
    checked again against the share root.

    Raises:
        HTTPException: 400 ``invalid_request`` for absolute/``..`` paths,
            403 ``path_traversal_detected`` if the resolved path leaves the
            root, 404 ``path_not_found`` if ``must_exist`` and it is missing.
    """
    root = get_share_root(share)
    normalized = (raw_path or "").strip().replace("\\", "/")
    if normalized.startswith("/") or ".." in normalized.split("/"):
        raise_agent_error(400, "invalid_request", "Invalid share-relative path")

    candidate = ensure_within_root(root, (root / normalized).resolve(strict=False))
    if must_exist and not candidate.exists():
        raise_agent_error(404, "path_not_found", "Path not found")
    return candidate


def directory_entry_payload(path: Path) -> dict:
    """Describe one directory entry (name, kind, size, modified time).

    Size and mtime come from ``lstat`` (the link itself), while ``kind`` uses
    ``is_dir`` which follows symlinks.
    """
    stat_result = path.lstat()
    return {
        "name": path.name,
        "kind": "directory" if path.is_dir() else "file",
        "size": stat_result.st_size,
        "modified": _utc_iso(stat_result.st_mtime),
    }


def info_payload(path: Path, *, share: str, raw_path: str) -> dict:
    """Build the detailed metadata response for a single path.

    ``size`` is ``None`` for directories; ``width``/``height`` are only
    probed for regular files; ``owner`` and ``group`` are always ``None``.
    """
    stat_result = path.lstat()
    is_directory = path.is_dir()
    mime, _ = mimetypes.guess_type(path.name)
    width, height = image_dimensions(path) if path.is_file() else (None, None)
    config_path = get_runtime_config().config_path
    return {
        "share": share,
        "path": _display_path(raw_path),
        "name": path.name,
        "kind": "directory" if is_directory else "file",
        "size": None if is_directory else stat_result.st_size,
        "modified": _utc_iso(stat_result.st_mtime),
        "content_type": mime or "application/octet-stream",
        "extension": path.suffix.lower() or None,
        "width": width,
        "height": height,
        "owner": None,
        "group": None,
        "config_path": str(config_path) if config_path else None,
    }


def list_directory(path: Path, *, show_hidden: bool) -> list[dict]:
    """Return entry payloads for ``path``, directories first, then by name.

    Names starting with ``.`` are omitted unless ``show_hidden`` is true.

    Raises:
        HTTPException: 403 ``forbidden`` when the directory cannot be read.
    """
    try:
        children = list(path.iterdir())
    except PermissionError:
        raise_agent_error(403, "forbidden", "Permission denied by operating system")

    visible = [
        child
        for child in children
        if show_hidden or not child.name.startswith(".")
    ]
    visible.sort(key=lambda item: (not item.is_dir(), item.name.lower()))
    return [directory_entry_payload(child) for child in visible]


def text_content_type_for_name(name: str) -> str | None:
    """Return the preview content type for a file name, or ``None``.

    Special whole-file names are checked first, then the suffix table.
    """
    safe_name = name or ""
    special = SPECIAL_TEXT_FILENAMES.get(safe_name.lower())
    if special:
        return special
    return TEXT_CONTENT_TYPES.get(Path(safe_name).suffix.lower())


def read_text_preview(path: Path, *, max_bytes: int) -> dict:
    """Read up to ``max_bytes`` of a file as UTF-8 text.

    The limit is clamped to ``1..TEXT_PREVIEW_MAX_BYTES``.

    Returns:
        Dict with ``size``, ``modified``, ``encoding``, ``truncated`` and
        ``content``.

    Raises:
        HTTPException: 409 ``unsupported_type`` if the data contains a NUL
            byte or is not valid UTF-8.
    """
    import codecs

    stat_result = path.stat()
    size = int(stat_result.st_size)
    preview_limit = min(max(1, int(max_bytes)), TEXT_PREVIEW_MAX_BYTES)

    # Read one extra byte so truncation is detected even if the file grew
    # after the stat call.
    with path.open("rb") as handle:
        raw = handle.read(preview_limit + 1)
    truncated = size > preview_limit or len(raw) > preview_limit
    if truncated:
        raw = raw[:preview_limit]

    if b"\x00" in raw:
        raise_agent_error(409, "unsupported_type", "Binary content is not supported for text preview")

    # The byte cut may land inside a multi-byte character.  With final=False
    # the decoder silently holds back an incomplete trailing sequence instead
    # of failing, while invalid bytes elsewhere still raise.
    decoder = codecs.getincrementaldecoder("utf-8")()
    try:
        content = decoder.decode(raw, final=not truncated)
    except UnicodeDecodeError:
        raise_agent_error(409, "unsupported_type", "Binary content is not supported for text preview")

    return {
        "size": size,
        "modified": _utc_iso(stat_result.st_mtime),
        "encoding": "utf-8",
        "truncated": truncated,
        "content": content,
    }


def image_dimensions(path: Path) -> tuple[int | None, int | None]:
    """Return ``(width, height)`` for PNG and GIF files, else ``(None, None)``.

    Only the fixed-size file header is read; the format is chosen by suffix
    and confirmed by the signature bytes.
    """
    suffix = path.suffix.lower()
    try:
        if suffix == ".png":
            with path.open("rb") as handle:
                header = handle.read(24)
            if len(header) < 24 or header[:8] != b"\x89PNG\r\n\x1a\n":
                return None, None
            # Big-endian width/height at bytes 16..24.
            width, height = struct.unpack(">II", header[16:24])
            return width, height
        if suffix == ".gif":
            with path.open("rb") as handle:
                header = handle.read(10)
            if len(header) < 10 or header[:6] not in {b"GIF87a", b"GIF89a"}:
                return None, None
            # NOTE(review): the source text was truncated starting inside this
            # format string.  "<HH" over bytes 6..10 (little-endian width and
            # height following the 6-byte signature) is a reconstruction, as
            # are the except clause and fall-through below — confirm against
            # the original file.
            width, height = struct.unpack("<HH", header[6:10])
            return width, height
    except OSError:
        return None, None
    return None, None


# NOTE(review): reconstructed — the application object's creation was lost in
# the truncated span; the constructor arguments are an assumption.
app = FastAPI(title=APP_NAME)


# NOTE(review): reconstructed — the route path and function name were lost in
# the truncated span; only the body below is original.  Confirm both.
@app.get("/")
def root() -> dict:
    """Return basic agent identity; no auth check is performed here."""
    config = get_runtime_config()
    return {
        "ok": True,
        "app": APP_NAME,
        "time": _now_iso(),
        "client_id": config.client_id or None,
        "display_name": config.display_name or None,
        "config_path": str(config.config_path) if config.config_path else None,
        "shares": sorted(config.shares.keys()),
        "auth_enabled": bool(config.agent_access_token),
    }


@app.get("/health")
def health(request: Request) -> dict:
    """Return agent status and configuration summary (auth required)."""
    require_agent_auth(request)
    config = get_runtime_config()
    return {
        "ok": True,
        "app": APP_NAME,
        "time": _now_iso(),
        "client_id": config.client_id or None,
        "display_name": config.display_name or None,
        "platform": config.platform,
        "endpoint": config.endpoint or None,
        "shares": sorted(config.shares.keys()),
        "config_path": str(config.config_path) if config.config_path else None,
        "port_hint": DEFAULT_PORT,
        "auth_enabled": bool(config.agent_access_token),
    }


@app.get("/api/list")
def api_list(request: Request, share: str, path: str = "", show_hidden: bool = False) -> dict:
    """List the entries of a directory inside a share."""
    require_agent_auth(request)
    target = resolve_share_path(share, path)
    if not target.is_dir():
        # Structured detail, matching every other error this agent returns.
        raise_agent_error(400, "invalid_request", "Path is not a directory")
    return {
        "share": share.strip(),
        "path": _display_path(path),
        "entries": list_directory(target, show_hidden=show_hidden),
    }


@app.get("/api/info")
def api_info(request: Request, share: str, path: str = "") -> dict:
    """Return metadata for a single path inside a share."""
    require_agent_auth(request)
    target = resolve_share_path(share, path)
    return info_payload(target, share=share.strip(), raw_path=path)


@app.get("/api/read")
def api_read(request: Request, share: str, path: str = "", max_bytes: int = TEXT_PREVIEW_MAX_BYTES) -> dict:
    """Return a UTF-8 text preview of a supported file type.

    Raises:
        HTTPException: 409 ``type_conflict`` when the target is not a regular
            file, 409 ``unsupported_type`` when its name has no known text
            content type or its content is not text.
    """
    require_agent_auth(request)
    target = resolve_share_path(share, path)
    if target.is_dir():
        raise_agent_error(409, "type_conflict", "Source must be a file")
    if not target.is_file():
        raise_agent_error(409, "type_conflict", "Unsupported path type for read")

    content_type = text_content_type_for_name(target.name)
    if content_type is None:
        raise_agent_error(409, "unsupported_type", "File type is not supported for text preview")

    return {
        "name": target.name,
        "path": _display_path(path),
        "content_type": content_type,
        **read_text_preview(target, max_bytes=max_bytes),
    }


@app.get("/api/download")
def api_download(request: Request, share: str, path: str = "") -> FileResponse:
    """Send a regular file from a share as a download.

    Raises:
        HTTPException: 409 ``type_conflict`` when the target is a directory
            or some other non-regular path.
    """
    require_agent_auth(request)
    target = resolve_share_path(share, path)
    if target.is_dir():
        raise_agent_error(409, "type_conflict", "Source must be a file")
    if not target.is_file():
        raise_agent_error(409, "type_conflict", "Unsupported path type for download")
    return FileResponse(
        path=target,
        media_type=mimetypes.guess_type(target.name)[0] or "application/octet-stream",
        filename=target.name,
    )


@app.exception_handler(HTTPException)
async def http_exception_handler(_: Request, exc: HTTPException) -> JSONResponse:
    """Wrap ``HTTPException`` details in the ``{"ok": false, ...}`` envelope."""
    return JSONResponse(status_code=exc.status_code, content={"ok": False, "detail": exc.detail})


@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception) -> JSONResponse:
    """Turn any other exception into a 500 response with the same envelope."""
    # NOTE(review): str(exc) is returned to the client and may expose
    # internal details such as filesystem paths.
    return JSONResponse(status_code=500, content={"ok": False, "detail": str(exc)})