Files
webmanager-mvp/finder_commander/app/main.py
T

231 lines
8.1 KiB
Python

from __future__ import annotations
import json
import mimetypes
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
APP_NAME = "Finder Commander Remote Agent"
DEFAULT_PORT = 8765
@dataclass(frozen=True)
class AgentRuntimeConfig:
config_path: Path | None
agent_access_token: str
shares: dict[str, str]
display_name: str
endpoint: str
client_id: str
platform: str
def _now_iso() -> str:
return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z")
def _candidate_config_paths() -> list[Path]:
candidates: list[Path] = []
env_path = os.getenv("FINDER_COMMANDER_REMOTE_AGENT_CONFIG", "").strip()
if env_path:
candidates.append(Path(env_path).expanduser().resolve(strict=False))
base_dir = Path(__file__).resolve().parents[1]
candidates.append(base_dir / "remote_client_agent.launchd.json")
candidates.append(base_dir / "remote_client_agent.example.json")
return candidates
def _load_raw_config() -> tuple[Path | None, dict]:
for candidate in _candidate_config_paths():
if candidate.is_file():
try:
raw = json.loads(candidate.read_text(encoding="utf-8"))
except ValueError as exc:
raise RuntimeError(f"Invalid JSON in config file: {candidate}") from exc
if not isinstance(raw, dict):
raise RuntimeError(f"Config file must contain a JSON object: {candidate}")
return candidate.resolve(strict=False), raw
return None, {}
@lru_cache(maxsize=1)
def get_runtime_config() -> AgentRuntimeConfig:
config_path, raw = _load_raw_config()
shares_raw = raw.get("shares", {})
shares: dict[str, str] = {}
if isinstance(shares_raw, dict):
for key, value in shares_raw.items():
normalized_key = str(key).strip()
normalized_value = str(value).strip()
if normalized_key and normalized_value:
shares[normalized_key] = normalized_value
return AgentRuntimeConfig(
config_path=config_path,
agent_access_token=os.getenv("FINDER_COMMANDER_AGENT_ACCESS_TOKEN", "").strip()
or str(raw.get("agent_access_token", "")).strip(),
shares=shares,
display_name=str(raw.get("display_name", "")).strip(),
endpoint=str(raw.get("public_endpoint", raw.get("endpoint", ""))).strip(),
client_id=str(raw.get("client_id", "")).strip(),
platform=str(raw.get("platform", "macos")).strip() or "macos",
)
def require_agent_auth(request: Request) -> None:
config = get_runtime_config()
if not config.agent_access_token:
return
authorization = request.headers.get("authorization", "").strip()
if authorization != f"Bearer {config.agent_access_token}":
raise HTTPException(
status_code=403,
detail={
"message": "Invalid agent token",
"config_path": str(config.config_path) if config.config_path else None,
"client_id": config.client_id or None,
"display_name": config.display_name or None,
},
)
def get_share_root(share: str) -> Path:
config = get_runtime_config()
normalized_share = (share or "").strip()
if normalized_share not in config.shares:
raise HTTPException(status_code=404, detail="Share not found")
return Path(config.shares[normalized_share]).expanduser().resolve(strict=False)
def ensure_within_root(root: Path, candidate: Path) -> Path:
try:
candidate.relative_to(root)
except ValueError as exc:
raise HTTPException(status_code=403, detail="Path escapes share root") from exc
return candidate
def resolve_share_path(share: str, raw_path: str, *, must_exist: bool = True) -> Path:
root = get_share_root(share)
normalized = (raw_path or "").strip().replace("\\", "/")
if normalized.startswith("/") or any(part == ".." for part in normalized.split("/")):
raise HTTPException(status_code=400, detail="Invalid share-relative path")
candidate = (root / normalized).resolve(strict=False)
candidate = ensure_within_root(root, candidate)
if must_exist and not candidate.exists():
raise HTTPException(status_code=404, detail="Path not found")
return candidate
def directory_entry_payload(path: Path) -> dict:
stat_result = path.lstat()
return {
"name": path.name,
"kind": "directory" if path.is_dir() else "file",
"size": stat_result.st_size,
"modified": datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat().replace("+00:00", "Z"),
}
def info_payload(path: Path, *, share: str, raw_path: str) -> dict:
stat_result = path.lstat()
kind = "directory" if path.is_dir() else "file"
mime, _ = mimetypes.guess_type(path.name)
return {
"share": share,
"path": raw_path.strip().replace("\\", "/").strip("/"),
"name": path.name,
"kind": kind,
"size": None if path.is_dir() else stat_result.st_size,
"modified": datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat().replace("+00:00", "Z"),
"content_type": mime or "application/octet-stream",
"config_path": str(get_runtime_config().config_path) if get_runtime_config().config_path else None,
}
def list_directory(path: Path, *, show_hidden: bool) -> list[dict]:
try:
children = list(path.iterdir())
except PermissionError as exc:
raise HTTPException(status_code=403, detail="Permission denied by operating system") from exc
filtered = []
for child in children:
if not show_hidden and child.name.startswith("."):
continue
filtered.append(child)
filtered.sort(key=lambda item: (not item.is_dir(), item.name.lower()))
return [directory_entry_payload(child) for child in filtered]
app = FastAPI(title=APP_NAME)
@app.get("/")
def root() -> dict:
config = get_runtime_config()
return {
"ok": True,
"app": APP_NAME,
"time": _now_iso(),
"client_id": config.client_id or None,
"display_name": config.display_name or None,
"config_path": str(config.config_path) if config.config_path else None,
"shares": sorted(config.shares.keys()),
"auth_enabled": bool(config.agent_access_token),
}
@app.get("/health")
def health(request: Request) -> dict:
require_agent_auth(request)
config = get_runtime_config()
return {
"ok": True,
"app": APP_NAME,
"time": _now_iso(),
"client_id": config.client_id or None,
"display_name": config.display_name or None,
"platform": config.platform,
"endpoint": config.endpoint or None,
"shares": sorted(config.shares.keys()),
"config_path": str(config.config_path) if config.config_path else None,
"port_hint": DEFAULT_PORT,
"auth_enabled": bool(config.agent_access_token),
}
@app.get("/api/list")
def api_list(request: Request, share: str, path: str = "", show_hidden: bool = False) -> dict:
require_agent_auth(request)
target = resolve_share_path(share, path)
if not target.is_dir():
raise HTTPException(status_code=400, detail="Path is not a directory")
return {
"share": share.strip(),
"path": path.strip().replace("\\", "/").strip("/"),
"entries": list_directory(target, show_hidden=show_hidden),
}
@app.get("/api/info")
def api_info(request: Request, share: str, path: str = "") -> dict:
require_agent_auth(request)
target = resolve_share_path(share, path)
return info_payload(target, share=share.strip(), raw_path=path)
@app.exception_handler(HTTPException)
async def http_exception_handler(_: Request, exc: HTTPException) -> JSONResponse:
return JSONResponse(status_code=exc.status_code, content={"ok": False, "detail": exc.detail})
@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception) -> JSONResponse:
return JSONResponse(status_code=500, content={"ok": False, "detail": str(exc)})