627 lines
20 KiB
Python
627 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
import html
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import secrets
|
|
import shutil
|
|
import stat
|
|
import time
|
|
from datetime import datetime
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from typing import Literal, Optional
|
|
|
|
from fastapi import Body, FastAPI, File, Form, HTTPException, Request, UploadFile
|
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Application title, used for the FastAPI app and echoed by /health and the index page.
APP_NAME = "Finder Commander"
# Every home-relative path is resolved against, and confined to, this directory.
HOME_ROOT = Path.home().resolve()
# Destination for "trash"-mode deletions.
TRASH_DIR = HOME_ROOT / ".Trash"
# Files larger than this (2 MiB) are never offered as a text preview.
MAX_TEXT_PREVIEW_BYTES = 2 * 1024 * 1024
# Generated once per process; passed to the index template and expected back
# in the X-CSRF-Token header of mutating requests.
CSRF_TOKEN = secrets.token_urlsafe(32)

# Directory containing this module; static assets and templates live beside it.
_PACKAGE_DIR = Path(__file__).parent

app = FastAPI(title=APP_NAME)
app.mount("/static", StaticFiles(directory=str(_PACKAGE_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(_PACKAGE_DIR / "templates"))
|
|
|
|
|
|
class PathsPayload(BaseModel):
    """JSON body accepted by the copy and move endpoints."""

    # Home-relative paths of the entries to act on.
    paths: list[str] = Field(default_factory=list)
    # Home-relative directory that receives the entries; the endpoints
    # reject a missing value with HTTP 400.
    destination_dir: Optional[str] = None
|
|
|
|
|
|
class RenamePayload(BaseModel):
    """JSON body accepted by the rename endpoint."""

    # Home-relative path of the entry being renamed.
    path: str
    # New final path component; validated with sanitize_name() by the endpoint.
    new_name: str
|
|
|
|
|
|
class DeletePayload(BaseModel):
    """JSON body accepted by the delete endpoint."""

    # Home-relative paths of the entries to remove.
    paths: list[str] = Field(default_factory=list)
    # "trash" moves entries into TRASH_DIR; "permanent" removes them outright.
    mode: Literal["trash", "permanent"] = "trash"
|
|
|
|
|
|
class CommandPayload(BaseModel):
    """JSON body accepted by the command endpoint."""

    # Command line typed by the user, e.g. "cd docs" or "select *.py".
    command: str
    # Home-relative working directory; the empty string means the home root.
    cwd: str = ""
|
|
|
|
# Rebuild each request model once at import time.
# NOTE(review): presumably needed so pydantic resolves the string annotations
# produced by `from __future__ import annotations` — confirm.
for _payload_model in (PathsPayload, RenamePayload, DeletePayload, CommandPayload):
    _payload_model.model_rebuild()
|
|
|
|
# Lower-case file suffixes treated as previewable text even when
# mimetypes cannot classify the file (see can_preview_text).
TEXT_SUFFIXES = {
    # Prose, logs and markup
    ".md", ".txt", ".log", ".html", ".xml", ".css",
    # Scripting and web languages
    ".py", ".js", ".ts", ".tsx", ".jsx", ".sh", ".zsh", ".bash",
    # Compiled languages and SQL
    ".c", ".cpp", ".h", ".java", ".go", ".rs", ".sql",
    # Structured data and configuration
    ".json", ".yaml", ".yml", ".toml", ".ini", ".env", ".conf",
    # Unit-style definition files
    ".service", ".container", ".network", ".pod", ".kube",
}
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
|
|
|
|
|
|
@lru_cache(maxsize=1)
def remote_agent_config() -> dict:
    """Load the optional remote-agent configuration file.

    The file path comes from the ``FINDER_COMMANDER_REMOTE_AGENT_CONFIG``
    environment variable. The result is cached, so the file is read at most
    once per process (until ``cache_clear()`` is called).

    Returns:
        The parsed JSON object, or ``{}`` when the variable is unset, the
        file cannot be read or parsed, or the document is not a JSON object.
    """
    config_path = os.getenv("FINDER_COMMANDER_REMOTE_AGENT_CONFIG", "").strip()
    if not config_path:
        return {}
    try:
        parsed = json.loads(Path(config_path).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and undecodable bytes.
        return {}
    # Callers use ``.get``; a list or scalar document would crash them.
    return parsed if isinstance(parsed, dict) else {}
|
|
|
|
|
|
def remote_agent_access_token() -> str:
    """Return the bearer token remote agents must present, or ``""`` if none.

    The ``FINDER_COMMANDER_AGENT_ACCESS_TOKEN`` environment variable takes
    precedence; otherwise the ``agent_access_token`` key of the remote-agent
    configuration is used. Surrounding whitespace is stripped in both cases.
    """
    env_token = os.getenv("FINDER_COMMANDER_AGENT_ACCESS_TOKEN", "").strip()
    if env_token:
        return env_token
    configured = remote_agent_config().get("agent_access_token")
    # A JSON ``null`` means "not configured"; str(None) would otherwise turn
    # it into the guessable literal token "None".
    if configured is None:
        return ""
    return str(configured).strip()
|
|
|
|
|
|
def remote_agent_shares() -> dict[str, str]:
    """Return the configured shares as a ``{share name: root path}`` mapping.

    Reads the ``shares`` key of the remote-agent configuration. Names and
    roots are stringified and stripped; entries whose name or root is empty
    (or ``null``) are dropped. A ``shares`` value that is not a JSON object
    yields an empty mapping.
    """
    raw = remote_agent_config().get("shares", {})
    if not isinstance(raw, dict):
        return {}
    normalized: dict[str, str] = {}
    for key, value in raw.items():
        # str(None) would produce the bogus relative root "None".
        if value is None:
            continue
        share_key = str(key).strip()
        share_path = str(value).strip()
        if share_key and share_path:
            normalized[share_key] = share_path
    return normalized
|
|
|
|
|
|
def require_remote_agent_auth(request: Request) -> None:
    """Enforce the remote-agent bearer token on *request*.

    When no token is configured the check is skipped entirely. Otherwise the
    ``Authorization`` header must equal ``Bearer <token>`` exactly.

    Raises:
        HTTPException: 403 when the header is missing or does not match.
    """
    expected_token = remote_agent_access_token()
    if not expected_token:
        return
    supplied = request.headers.get("authorization", "").strip()
    expected = f"Bearer {expected_token}"
    # Constant-time comparison avoids leaking the token through timing.
    # Compare bytes because compare_digest rejects non-ASCII str arguments.
    if not secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid agent token")
|
|
|
|
|
|
def share_root(share: str) -> Path:
    """Return the resolved root directory configured for *share*.

    The share name is stripped before lookup; ``~`` in the configured root
    is expanded and the result is resolved without requiring it to exist.

    Raises:
        HTTPException: 404 when no share of that name is configured.
    """
    wanted = (share or "").strip()
    configured = remote_agent_shares()
    if wanted not in configured:
        raise HTTPException(status_code=404, detail="Share not found")
    root = Path(configured[wanted]).expanduser()
    return root.resolve(strict=False)
|
|
|
|
|
|
def ensure_within_share(root: Path, candidate: Path) -> Path:
    """Return *candidate* unchanged if it is *root* or lies beneath it.

    The check is purely lexical; callers pass already-resolved paths.

    Raises:
        HTTPException: 403 when *candidate* is outside *root*.
    """
    if candidate.is_relative_to(root):
        return candidate
    raise HTTPException(status_code=403, detail="Path escapes share root")
|
|
|
|
|
|
def resolve_share_path(share: str, raw_path: str, *, must_exist: bool = True) -> Path:
    """Resolve a share-relative path to an absolute path inside the share.

    Backslashes in *raw_path* are treated as ``/`` separators.

    Raises:
        HTTPException: 404 for an unknown share or (when *must_exist*) a
            missing path; 400 when the path is absolute or contains a
            ``..`` component; 403 when the resolved path leaves the share.
    """
    # Look up the share first so an unknown share is reported before a bad path.
    root = share_root(share)
    relative = (raw_path or "").strip().replace("\\", "/")
    is_absolute = relative.startswith("/")
    if is_absolute or ".." in relative.split("/"):
        raise HTTPException(status_code=400, detail="Invalid share-relative path")
    # Resolving follows symlinks, so the containment check sees the real target.
    resolved = ensure_within_share(root, (root / relative).resolve(strict=False))
    if must_exist and not resolved.exists():
        raise HTTPException(status_code=404, detail="Path not found")
    return resolved
|
|
|
|
|
|
def remote_entry_payload(path: Path) -> dict:
    """Describe one share entry as a JSON-serialisable dict.

    Size and modification time come from ``lstat`` (the link itself for
    symlinks), while ``kind`` uses ``is_dir()``, which follows symlinks.
    ``modified`` is rendered in local time with second resolution.
    """
    info = path.lstat()
    modified = datetime.fromtimestamp(info.st_mtime)
    if path.is_dir():
        kind = "directory"
    else:
        kind = "file"
    return {
        "name": path.name,
        "kind": kind,
        "size": info.st_size,
        "modified": modified.isoformat(timespec="seconds"),
    }
|
|
|
|
|
|
def sorted_share_entries(path: Path, show_hidden: bool = False) -> list[dict]:
    """List the children of a share directory as entry payloads.

    Directories come first, then files, each group ordered by
    case-insensitive name. Dot-files are omitted unless *show_hidden*.

    Raises:
        HTTPException: 403 when the OS denies reading the directory.
    """
    try:
        children = list(path.iterdir())
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail="Permission denied by operating system") from exc
    visible = [
        entry for entry in children
        if show_hidden or not entry.name.startswith(".")
    ]
    ordered = sorted(visible, key=lambda entry: (not entry.is_dir(), entry.name.lower()))
    return [remote_entry_payload(entry) for entry in ordered]
|
|
|
|
|
|
|
|
def rel_from_home(path: Path) -> str:
    """Return *path* relative to HOME_ROOT; the home root itself maps to ``""``.

    Propagates ``ValueError`` from ``Path.relative_to`` when *path* is not
    inside HOME_ROOT.
    """
    if path == HOME_ROOT:
        return ""
    return str(path.relative_to(HOME_ROOT))
|
|
|
|
|
|
|
|
def ensure_within_home(candidate: Path) -> Path:
    """Return *candidate* unchanged if it is HOME_ROOT or lies beneath it.

    The check is purely lexical; callers pass already-resolved paths.

    Raises:
        HTTPException: 403 when *candidate* is outside the home directory.
    """
    if candidate.is_relative_to(HOME_ROOT):
        return candidate
    raise HTTPException(status_code=403, detail="Path escapes home directory")
|
|
|
|
|
|
|
|
def sanitize_name(name: str) -> str:
    """Validate a single client-supplied path component.

    Returns:
        The name with surrounding whitespace stripped.

    Raises:
        HTTPException: 400 when the name is empty, is ``.`` or ``..``, or
            contains a ``/`` or a NUL character.
    """
    cleaned = (name or "").strip()
    # A NUL can never be part of a file name; the OS-level calls reject it
    # with ValueError, which would otherwise surface as a 500.
    if not cleaned or cleaned in {".", ".."} or "/" in cleaned or "\x00" in cleaned:
        raise HTTPException(status_code=400, detail="Invalid name")
    return cleaned
|
|
|
|
|
|
|
|
def resolve_user_path(raw_path: Optional[str], *, must_exist: bool = True) -> Path:
    """Resolve a client-supplied path against HOME_ROOT.

    ``None`` and blank strings resolve to HOME_ROOT itself. Symlinks and
    ``..`` components are resolved before the containment check.

    Raises:
        HTTPException: 403 when the resolved path leaves the home directory;
            404 when *must_exist* is true and the path does not exist.
    """
    relative = (raw_path or "").strip()
    resolved = ensure_within_home((HOME_ROOT / relative).resolve(strict=False))
    if not must_exist or resolved.exists():
        return resolved
    raise HTTPException(status_code=404, detail="Path not found")
|
|
|
|
|
|
|
|
def check_origin(request: Request) -> None:
    """Reject requests whose ``Origin`` header names a different origin.

    Requests without an ``Origin`` header are allowed. When present, the
    header must equal the request's own base URL (trailing slashes ignored).

    Raises:
        HTTPException: 403 when the origin does not match.
    """
    origin = request.headers.get("origin")
    if origin:
        own_origin = str(request.base_url).rstrip("/")
        if origin.rstrip("/") != own_origin:
            raise HTTPException(status_code=403, detail="Origin not allowed")
|
|
|
|
|
|
|
|
def check_csrf(request: Request) -> None:
    """Require the ``X-CSRF-Token`` header to match the process-wide token.

    Raises:
        HTTPException: 403 when the header is missing or does not match.
    """
    supplied = request.headers.get("x-csrf-token") or ""
    # Constant-time comparison avoids leaking the token through timing.
    # Compare bytes because compare_digest rejects non-ASCII str arguments.
    if not secrets.compare_digest(supplied.encode("utf-8"), CSRF_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid CSRF token")
|
|
|
|
|
|
|
|
def perms_string(mode: int) -> str:
    """Render an ``st_mode`` value in ``ls -l`` style, e.g. ``-rw-r--r--``."""
    rendered = stat.filemode(mode)
    return rendered
|
|
|
|
|
|
|
|
def can_preview_text(path: Path) -> bool:
    """Decide whether *path* may be shown as a text preview.

    Directories and files larger than MAX_TEXT_PREVIEW_BYTES never qualify.
    Otherwise a file qualifies when its guessed MIME type is textual, or
    when its lower-cased suffix is listed in TEXT_SUFFIXES.
    """
    if path.is_dir():
        return False
    too_large = path.stat().st_size > MAX_TEXT_PREVIEW_BYTES
    if too_large:
        return False
    guessed, _encoding = mimetypes.guess_type(path.name)
    if guessed:
        # Structured formats that are text despite an application/* type.
        structured_text = {"application/json", "application/xml", "application/javascript"}
        if guessed.startswith("text/") or guessed in structured_text:
            return True
    return path.suffix.lower() in TEXT_SUFFIXES
|
|
|
|
|
|
|
|
def entry_payload(path: Path) -> dict:
    """Describe one entry under the home directory as a JSON-serialisable dict.

    Size, mtime and permissions come from ``lstat`` (the link itself for
    symlinks); ``kind`` and ``can_preview_text`` follow symlinks.
    ``modified`` is rendered in local time with second resolution.

    For HOME_ROOT itself ``parent_rel_path`` is ``""``, matching the
    ``parent`` value the listing endpoint reports for the home root.
    """
    st = path.lstat()
    kind = "directory" if path.is_dir() else "file"
    mime, _ = mimetypes.guess_type(path.name)
    # HOME_ROOT's parent lies outside the home tree, so rel_from_home() would
    # raise ValueError (and the request would fail with a 500).
    parent_rel_path = "" if path == HOME_ROOT else rel_from_home(path.parent)
    return {
        "name": path.name,
        "rel_path": rel_from_home(path),
        "parent_rel_path": parent_rel_path,
        "kind": kind,
        "is_symlink": path.is_symlink(),
        "size": st.st_size,
        "modified": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
        "mime": mime or "application/octet-stream",
        "perms": perms_string(st.st_mode),
        "can_preview_text": can_preview_text(path) if path.is_file() else False,
    }
|
|
|
|
|
|
|
|
def sorted_entries(path: Path, show_hidden: bool = False) -> list[dict]:
    """List the children of a home-tree directory as entry payloads.

    Directories come first, then files, each group ordered by
    case-insensitive name. Dot-files are omitted unless *show_hidden*.

    Raises:
        HTTPException: 403 when the OS denies reading the directory.
    """
    try:
        children = list(path.iterdir())
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail="Permission denied by operating system") from exc
    visible = [
        entry for entry in children
        if show_hidden or not entry.name.startswith('.')
    ]
    ordered = sorted(visible, key=lambda entry: (not entry.is_dir(), entry.name.lower()))
    return [entry_payload(entry) for entry in ordered]
|
|
|
|
|
|
|
|
def move_to_trash(path: Path) -> Path:
    """Move *path* into TRASH_DIR and return its new location.

    TRASH_DIR is created on demand. When the name is already taken in the
    trash, a ``-<unix time>`` suffix is inserted before the extension, and a
    further ``-<n>`` counter is added if that name is taken as well, so an
    existing trash item is never overwritten or merged into.
    """
    TRASH_DIR.mkdir(parents=True, exist_ok=True)
    target = TRASH_DIR / path.name
    # lexists() also reports broken symlinks, which exists() would miss.
    if os.path.lexists(target):
        stem, suffix = target.stem, target.suffix
        stamp = int(time.time())
        target = TRASH_DIR / f"{stem}-{stamp}{suffix}"
        counter = 1
        # Two same-named items trashed within one second share a timestamp.
        while os.path.lexists(target):
            target = TRASH_DIR / f"{stem}-{stamp}-{counter}{suffix}"
            counter += 1
    shutil.move(str(path), str(target))
    return target
|
|
|
|
|
|
|
|
def copy_entry(source: Path, destination_dir: Path) -> Path:
    """Copy *source* into *destination_dir*, keeping its name.

    Directories are copied recursively with symlinks preserved as links;
    files are copied with metadata and without following a symlink source.

    Returns:
        The path of the newly created copy.

    Raises:
        HTTPException: 409 when an entry of that name already exists in the
            destination; 400 when a directory would be copied into itself
            or one of its own descendants.
    """
    destination = destination_dir / source.name
    if destination.exists():
        raise HTTPException(status_code=409, detail=f"Destination already exists: {destination.name}")
    if source.is_dir():
        # The copy would be created inside the tree that is being walked.
        if destination_dir == source or source in destination_dir.parents:
            raise HTTPException(status_code=400, detail="Cannot copy a directory into itself")
        shutil.copytree(source, destination, symlinks=True)
    else:
        shutil.copy2(source, destination, follow_symlinks=False)
    return destination
|
|
|
|
|
|
|
|
def move_entry(source: Path, destination_dir: Path) -> Path:
    """Move *source* into *destination_dir*, keeping its name.

    Returns:
        The new path of the moved entry.

    Raises:
        HTTPException: 409 when an entry of that name already exists in the
            destination; 400 when a directory would be moved into itself
            or one of its own descendants.
    """
    destination = destination_dir / source.name
    if destination.exists():
        raise HTTPException(status_code=409, detail=f"Destination already exists: {destination.name}")
    # shutil.move refuses this with shutil.Error, which would surface as a
    # 500; report it as a client error instead.
    if source.is_dir() and (destination_dir == source or source in destination_dir.parents):
        raise HTTPException(status_code=400, detail="Cannot move a directory into itself")
    shutil.move(str(source), str(destination))
    return destination
|
|
|
|
|
|
|
|
def select_paths_or_current(paths: list[str], cwd: str) -> list[Path]:
    """Resolve every entry of *paths* to an existing path under the home root.

    *cwd* is accepted for interface compatibility but is not used.

    Raises:
        HTTPException: 400 when *paths* is empty; plus whatever
            resolve_user_path raises for an individual path.
    """
    if not paths:
        raise HTTPException(status_code=400, detail="No paths selected")
    resolved: list[Path] = []
    for raw in paths:
        resolved.append(resolve_user_path(raw))
    return resolved
|
|
|
|
|
|
|
|
def resolve_from_cwd(cwd_path: Path, raw: str, *, must_exist: bool = True) -> Path:
    """Resolve *raw* relative to *cwd_path*, confined to the home directory.

    Raises:
        HTTPException: 403 when the resolved path leaves the home directory;
            404 when *must_exist* is true and the path does not exist.
    """
    relative = (raw or "").strip()
    resolved = ensure_within_home((cwd_path / relative).resolve(strict=False))
    if not must_exist or resolved.exists():
        return resolved
    raise HTTPException(status_code=404, detail="Path not found")
|
|
|
|
|
|
def run_command(command: str, cwd: str) -> dict:
    """Execute one command-bar command and return its result payload.

    Supported verbs (case-insensitive): ``cd <path>``, ``mkdir <name>``,
    ``touch <name>``, ``select <glob>`` and ``help``. Arguments are split on
    whitespace and re-joined with single spaces.

    Args:
        command: The raw command line.
        cwd: Home-relative working directory the command runs in.

    Returns:
        A dict with ``ok``, ``action``, ``cwd`` and ``message`` keys;
        ``select`` additionally returns ``matches`` (home-relative paths).

    Raises:
        HTTPException: 400 for an empty or unsupported command, a working
            directory or ``cd`` target that is not a directory, or an
            invalid name; 409 when ``mkdir``/``touch`` targets an existing
            entry; plus the 403/404 errors of path resolution.
    """
    command = (command or "").strip()
    if not command:
        raise HTTPException(status_code=400, detail="Empty command")
    cwd_path = resolve_user_path(cwd)
    if not cwd_path.is_dir():
        raise HTTPException(status_code=400, detail="CWD is not a directory")

    verb, *args = command.split()
    verb = verb.lower()
    argument = " ".join(args)
    cwd_rel = rel_from_home(cwd_path)

    if verb == "cd":
        # A leading slash is resolved against the home root rather than cwd.
        if argument.startswith("/"):
            target = resolve_user_path(argument)
        else:
            target = resolve_from_cwd(cwd_path, argument or ".")
        if not target.is_dir():
            raise HTTPException(status_code=400, detail="Target is not a directory")
        return {"ok": True, "action": "cd", "cwd": rel_from_home(target), "message": str(target)}

    if verb in {"mkdir", "touch"}:
        name = sanitize_name(argument)
        target = resolve_from_cwd(cwd_path, name, must_exist=False)
        try:
            if verb == "mkdir":
                target.mkdir(exist_ok=False)
            else:
                target.touch(exist_ok=False)
        except FileExistsError as exc:
            # Without this the name clash would surface as a generic 500.
            raise HTTPException(status_code=409, detail=f"Destination already exists: {name}") from exc
        return {"ok": True, "action": verb, "cwd": cwd_rel, "message": f"Created {name}"}

    if verb == "select":
        pattern = argument.strip() or "*"
        entries = sorted_entries(cwd_path, show_hidden=True)
        matches = [e["rel_path"] for e in entries if fnmatch.fnmatch(e["name"], pattern)]
        return {
            "ok": True,
            "action": "select",
            "cwd": cwd_rel,
            "message": f"Matched {len(matches)} item(s)",
            "matches": matches,
        }

    if verb == "help":
        return {
            "ok": True,
            "action": "help",
            "cwd": cwd_rel,
            "message": "Commands: cd <path>, mkdir <name>, touch <name>, select <glob>, help",
        }

    raise HTTPException(status_code=400, detail="Unsupported command")
|
|
|
|
|
|
@app.middleware("http")
async def harden_headers(request: Request, call_next):
    """Attach defensive security headers to every outgoing response.

    Sets framing, content-security, referrer and MIME-sniffing policies,
    overwriting any value the downstream handler may have set.
    """
    content_security_policy = (
        "default-src 'self'; img-src 'self' data:; style-src 'self'; script-src 'self'; "
        "connect-src 'self'; frame-ancestors 'none'; base-uri 'self'; form-action 'self'"
    )
    hardening = {
        "X-Frame-Options": "DENY",
        "Content-Security-Policy": content_security_policy,
        "Referrer-Policy": "no-referrer",
        "X-Content-Type-Options": "nosniff",
    }
    response = await call_next(request)
    for header_name, header_value in hardening.items():
        response.headers[header_name] = header_value
    return response
|
|
|
|
|
|
@app.get("/health")
def health(request: Request) -> dict:
    """Report liveness, the app name, the current UTC time and the home root.

    Subject to the remote-agent bearer-token check when a token is configured.
    """
    require_remote_agent_auth(request)
    status = {"ok": True, "app": APP_NAME}
    status["time"] = _now_iso()
    status["home"] = str(HOME_ROOT)
    return status
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
    """Render the single-page UI from ``index.html``.

    The template receives the app name, the absolute home root and the
    CSRF token.
    """
    context = {
        "app_name": APP_NAME,
        "home_root": str(HOME_ROOT),
        "csrf_token": CSRF_TOKEN,
    }
    return templates.TemplateResponse(request, "index.html", context)
|
|
|
|
|
|
@app.get("/api/list")
def api_list(request: Request, path: str = "", share: str = "", show_hidden: bool = False) -> dict:
    """List a directory, either inside a named share or under the home root.

    With a non-blank *share*, the remote-agent token is enforced and *path*
    is interpreted relative to that share. Otherwise *path* is home-relative.

    Raises:
        HTTPException: 400 when the target is not a directory, plus the
            errors raised by authentication and path resolution.
    """
    share_name = share.strip()
    if share_name:
        require_remote_agent_auth(request)
        share_dir = resolve_share_path(share, path)
        if not share_dir.is_dir():
            raise HTTPException(status_code=400, detail="Path is not a directory")
        # Echo the path with forward slashes and no leading/trailing separator.
        echoed_path = path.strip().replace("\\", "/").strip("/")
        return {
            "share": share_name,
            "path": echoed_path,
            "entries": sorted_share_entries(share_dir, show_hidden=show_hidden),
        }

    home_dir = resolve_user_path(path)
    if not home_dir.is_dir():
        raise HTTPException(status_code=400, detail="Path is not a directory")
    if home_dir == HOME_ROOT:
        parent = ""
    else:
        parent = rel_from_home(home_dir.parent)
    return {
        "cwd": rel_from_home(home_dir),
        "absolute": str(home_dir),
        "parent": parent,
        "entries": sorted_entries(home_dir, show_hidden=show_hidden),
    }
|
|
|
|
|
|
@app.get("/api/read")
def api_read(path: str) -> dict:
    """Return the text content of a previewable file under the home root.

    The file is decoded as UTF-8; if that fails it is re-read with
    undecodable bytes replaced and the encoding reported as
    ``"utf-8 (lossy)"``.

    Raises:
        HTTPException: 400 for a directory, 415 for a file that is not
            previewable as text, plus path-resolution errors.
    """
    target = resolve_user_path(path)
    if target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot read a directory as text")
    if not can_preview_text(target):
        raise HTTPException(status_code=415, detail="File is not previewable as text")

    encoding = "utf-8"
    try:
        content = target.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        encoding = "utf-8 (lossy)"
        content = target.read_text(encoding="utf-8", errors="replace")

    return {
        "path": rel_from_home(target),
        "name": target.name,
        "encoding": encoding,
        "content": content,
        "size": target.stat().st_size,
    }
|
|
|
|
|
|
@app.get("/api/meta")
def api_meta(path: str) -> dict:
    """Return the entry payload for *path*, extended with its absolute path."""
    target = resolve_user_path(path)
    return {**entry_payload(target), "absolute": str(target)}
|
|
|
|
|
|
@app.get("/api/download")
def api_download(path: str):
    """Send a file under the home root as a download named after the file.

    Raises:
        HTTPException: 400 when *path* is a directory, plus
            path-resolution errors.
    """
    target = resolve_user_path(path)
    if not target.is_dir():
        return FileResponse(path=target, filename=target.name)
    raise HTTPException(status_code=400, detail="Cannot download a directory")
|
|
|
|
|
|
@app.get("/api/preview")
def api_preview(path: str):
    """Serve an image under the home root with its guessed ``image/*`` type.

    Raises:
        HTTPException: 400 for a directory, 415 when the file name does not
            map to an image MIME type, plus path-resolution errors.
    """
    target = resolve_user_path(path)
    if target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot preview a directory")
    guessed, _encoding = mimetypes.guess_type(target.name)
    is_image = bool(guessed) and guessed.startswith("image/")
    if not is_image:
        raise HTTPException(status_code=415, detail="Preview only supports images")
    return FileResponse(path=target, media_type=guessed)
|
|
|
|
|
|
@app.put("/api/write")
async def api_write(request: Request, path: str = Form(...), content: str = Form(...)) -> dict:
    """Write *content* (UTF-8) to a file under the home root.

    Missing parent directories are created. The content is written to a
    sibling ``<name>.tmp-write`` file and then moved over the target with
    ``os.replace``; if either step fails the temporary file is removed
    before the error propagates.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check or a path outside
            the home directory; 400 when the target is a directory.
    """
    check_origin(request)
    check_csrf(request)
    target = resolve_user_path(path, must_exist=False)
    ensure_within_home(target.parent.resolve(strict=False))
    if target.exists() and target.is_dir():
        raise HTTPException(status_code=400, detail="Cannot overwrite a directory")
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp = target.with_name(target.name + ".tmp-write")
    try:
        tmp.write_text(content, encoding="utf-8")
        os.replace(tmp, target)
    except Exception:
        # Do not leave a stray temp file next to the target; the original
        # error is what the caller needs to see, so cleanup failures are ignored.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass
        raise
    return {"ok": True, "path": rel_from_home(target)}
|
|
|
|
|
|
@app.post("/api/mkdir")
async def api_mkdir(request: Request, path: str = Form(...), name: str = Form(...)) -> dict:
    """Create directory *name* inside the home-relative directory *path*.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check; 400 when the base
            is not a directory or the name is invalid; 409 when an entry of
            that name already exists; plus path-resolution errors.
    """
    check_origin(request)
    check_csrf(request)
    base = resolve_user_path(path)
    if not base.is_dir():
        raise HTTPException(status_code=400, detail="Base path is not a directory")
    child = resolve_user_path(str(Path(rel_from_home(base)) / sanitize_name(name)), must_exist=False)
    try:
        child.mkdir(parents=False, exist_ok=False)
    except FileExistsError as exc:
        # Without this the name clash would surface as a generic 500.
        raise HTTPException(status_code=409, detail=f"Destination already exists: {child.name}") from exc
    return {"ok": True, "path": rel_from_home(child)}
|
|
|
|
|
|
@app.post("/api/upload")
async def api_upload(request: Request, path: str = Form(...), files: list[UploadFile] = File(...)) -> dict:
    """Save uploaded files into the home-relative directory *path*.

    Only the final component of each client-supplied file name is used;
    uploads without a name are skipped. An existing file of the same name is
    overwritten. Files are streamed to disk in 1 MiB chunks.

    Returns:
        ``{"ok": True, "saved": [...]}`` with the home-relative saved paths.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check; 400 when the
            target is not a directory; 409 when an upload's name matches an
            existing directory; plus path-resolution errors.
    """
    check_origin(request)
    check_csrf(request)
    base = resolve_user_path(path)
    if not base.is_dir():
        raise HTTPException(status_code=400, detail="Upload target is not a directory")
    base_rel = Path(rel_from_home(base))
    saved: list[str] = []
    for upload in files:
        # Drop any directory part the client sent along with the name.
        filename = Path(upload.filename or "").name
        if not filename:
            continue
        destination = resolve_user_path(str(base_rel / filename), must_exist=False)
        # Opening a directory for writing fails with IsADirectoryError,
        # which would surface as a generic 500.
        if destination.is_dir():
            raise HTTPException(status_code=409, detail=f"Destination already exists: {filename}")
        with destination.open("wb") as f:
            while chunk := await upload.read(1024 * 1024):
                f.write(chunk)
        saved.append(rel_from_home(destination))
    return {"ok": True, "saved": saved}
|
|
|
|
|
|
@app.post("/api/rename")
async def api_rename(request: Request, payload: RenamePayload) -> dict:
    """Rename an entry in place, keeping it in the same parent directory.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check; 400 when the path
            is the home root or the new name is invalid; 409 when the new
            name is already taken; plus path-resolution errors.
    """
    check_origin(request)
    check_csrf(request)
    source = resolve_user_path(payload.path)
    # The home root's parent lies outside the home tree, so computing the
    # destination would fail with ValueError (a 500) instead of a clear error.
    if source == HOME_ROOT:
        raise HTTPException(status_code=400, detail="Refusing to rename home root")
    destination = resolve_user_path(
        str(Path(rel_from_home(source.parent)) / sanitize_name(payload.new_name)),
        must_exist=False,
    )
    if destination.exists():
        raise HTTPException(status_code=409, detail="Destination already exists")
    os.replace(source, destination)
    return {"ok": True, "old_path": rel_from_home(source), "new_path": rel_from_home(destination)}
|
|
|
|
|
|
@app.post("/api/copy")
async def api_copy(request: Request, payload: PathsPayload) -> dict:
    """Copy the selected entries into ``payload.destination_dir``.

    Returns:
        ``{"ok": True, "copied": [...]}`` with the home-relative new paths.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check; 400 when the
            destination is missing or not a directory, or no paths were
            given; plus the errors of path resolution and copy_entry.
    """
    check_origin(request)
    check_csrf(request)
    raw_destination = payload.destination_dir
    if raw_destination is None:
        raise HTTPException(status_code=400, detail="Missing destination_dir")
    destination_dir = resolve_user_path(raw_destination)
    if not destination_dir.is_dir():
        raise HTTPException(status_code=400, detail="Destination is not a directory")
    sources = select_paths_or_current(payload.paths, raw_destination)
    copied = [rel_from_home(copy_entry(source, destination_dir)) for source in sources]
    return {"ok": True, "copied": copied}
|
|
|
|
|
|
@app.post("/api/move")
async def api_move(request: Request, payload: PathsPayload) -> dict:
    """Move the selected entries into ``payload.destination_dir``.

    Returns:
        ``{"ok": True, "moved": [...]}`` with the home-relative new paths.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check; 400 when the
            destination is missing or not a directory, or no paths were
            given; plus the errors of path resolution and move_entry.
    """
    check_origin(request)
    check_csrf(request)
    raw_destination = payload.destination_dir
    if raw_destination is None:
        raise HTTPException(status_code=400, detail="Missing destination_dir")
    destination_dir = resolve_user_path(raw_destination)
    if not destination_dir.is_dir():
        raise HTTPException(status_code=400, detail="Destination is not a directory")
    sources = select_paths_or_current(payload.paths, raw_destination)
    moved = [rel_from_home(move_entry(source, destination_dir)) for source in sources]
    return {"ok": True, "moved": moved}
|
|
|
|
|
|
@app.post("/api/delete")
async def api_delete(request: Request, payload: DeletePayload) -> dict:
    """Delete the selected entries, either to the trash or permanently.

    All paths are resolved and validated before anything is removed, so a
    request that includes the home root is rejected without side effects.
    Paths that resolve to the same entry are processed once.

    Returns:
        ``{"ok": True, "mode": ..., "deleted": [...]}``. In ``trash`` mode
        the list holds the absolute locations inside the trash; in
        ``permanent`` mode it holds the home-relative paths removed.

    Raises:
        HTTPException: 403 on a failed origin/CSRF check; 400 when no paths
            were given or the home root is among them; plus path-resolution
            errors.
    """
    check_origin(request)
    check_csrf(request)
    # dict.fromkeys drops duplicates while keeping the request order.
    targets = list(dict.fromkeys(select_paths_or_current(payload.paths, "")))
    if HOME_ROOT in targets:
        raise HTTPException(status_code=400, detail="Refusing to delete home root")
    deleted = []
    for target in targets:
        if payload.mode == "trash":
            deleted.append(str(move_to_trash(target)))
            continue
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink()
        deleted.append(rel_from_home(target))
    return {"ok": True, "mode": payload.mode, "deleted": deleted}
|
|
|
|
|
|
@app.post("/api/command")
async def api_command(request: Request, payload: CommandPayload) -> dict:
    """Run a command-bar command after the origin and CSRF checks pass."""
    check_origin(request)
    check_csrf(request)
    result = run_command(payload.command, payload.cwd)
    return result
|
|
|
|
|
|
@app.exception_handler(HTTPException)
async def http_exception_handler(_: Request, exc: HTTPException):
    """Render an HTTPException as ``{"ok": false, "detail": ...}`` JSON.

    Headers attached to the exception (for example ``WWW-Authenticate``)
    are forwarded on the response instead of being dropped.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"ok": False, "detail": exc.detail},
        headers=getattr(exc, "headers", None),
    )
|
|
|
|
|
|
@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception):
    """Convert any unhandled exception into a 500 JSON error response.

    The exception message is HTML-escaped before being placed in ``detail``.
    """
    body = {"ok": False, "detail": html.escape(str(exc))}
    return JSONResponse(status_code=500, content=body)
|