from __future__ import annotations from dataclasses import dataclass from pathlib import Path from backend.app.api.errors import AppError @dataclass(frozen=True) class ResolvedPath: alias: str relative: str absolute: Path class PathGuard: def __init__(self, root_aliases: dict[str, str]): normalized: dict[str, Path] = {} for alias, root in root_aliases.items(): normalized[alias] = Path(root).resolve() self._roots = normalized def resolve_directory_path(self, input_path: str) -> ResolvedPath: resolved = self.resolve_path(input_path) if not resolved.absolute.exists(): raise AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": input_path}, ) if not resolved.absolute.is_dir(): raise AppError( code="path_type_conflict", message="Requested path is not a directory", status_code=409, details={"path": input_path}, ) return resolved def resolve_existing_path(self, input_path: str) -> ResolvedPath: resolved = self.resolve_path(input_path) if not resolved.absolute.exists(): raise AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": input_path}, ) return resolved def resolve_path(self, input_path: str) -> ResolvedPath: alias, rel_segments, candidate = self.resolve_lexical_path(input_path) root = self._roots[alias] # Resolve symlinks for existing prefixes; for not-yet-existing tails strict=False keeps # path normalization while still enabling containment check. resolved_candidate = candidate.resolve(strict=False) if not self._is_under_root(resolved_candidate, root): raise AppError( code="path_outside_whitelist", message="Requested path is outside allowed roots", status_code=403, details={"path": input_path}, ) return ResolvedPath( alias=alias, relative=self._format_relative(alias, rel_segments), absolute=resolved_candidate, ) def resolve_lexical_path(self, input_path: str) -> tuple[str, list[str], Path]: normalized_input = (input_path or "").strip().strip("/") if not normalized_input: raise AppError( code="invalid_request", message="Query parameter 'path' is required", status_code=400, ) segments = [seg for seg in normalized_input.split("/") if seg] alias = segments[0] if segments else "" if alias not in self._roots: raise AppError( code="invalid_root_alias", message="Unknown root alias", status_code=403, details={"path": input_path}, ) rel_segments = segments[1:] if any(seg == ".." for seg in rel_segments): raise AppError( code="path_traversal_detected", message="Path traversal is not allowed", status_code=403, details={"path": input_path}, ) root = self._roots[alias] candidate = root.joinpath(*rel_segments) return alias, rel_segments, candidate def validate_name(self, name: str, field: str) -> str: normalized = (name or "").strip() if not normalized or normalized in {".", ".."} or "/" in normalized or "\\" in normalized: raise AppError( code="invalid_request", message="Invalid name", status_code=400, details={field: name}, ) return normalized def entry_relative_path(self, alias: str, absolute: Path) -> str: root = self._roots[alias] resolved_absolute = absolute.resolve(strict=False) if not self._is_under_root(resolved_absolute, root): raise AppError( code="symlink_escape_detected", message="Entry resolves outside allowed root", status_code=403, details={"path": f"{alias}"}, ) rel = resolved_absolute.relative_to(root).as_posix() return self._format_relative(alias, [p for p in rel.split("/") if p]) @staticmethod def _is_under_root(path: Path, root: Path) -> bool: try: path.relative_to(root) return True except ValueError: return False @staticmethod def _format_relative(alias: str, rel_segments: list[str]) -> str: return alias if not rel_segments else f"{alias}/{'/'.join(rel_segments)}"