176 lines
6.5 KiB
Python
176 lines
6.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from backend.app.api.errors import AppError
|
|
|
|
|
|
@dataclass(frozen=True)
class ResolvedPath:
    """Immutable description of a client path after it has been resolved.

    Instances are value objects: the dataclass is frozen, so fields cannot be
    reassigned after construction.
    """

    # Key of the whitelisted root the path belongs to.
    alias: str
    # Client-facing path string (alias-style or "/Volumes/..."-style).
    relative: str
    # Filesystem location of the path.
    absolute: Path
    # Which client-facing notation `relative` uses, e.g. "alias" or
    # "virtual_volumes".
    display_style: str
|
|
|
|
|
|
class PathGuard:
    """Translate client-supplied paths into filesystem paths inside whitelisted roots.

    Two notations are accepted:

    * alias style: ``<alias>/<sub>/<path>`` where ``<alias>`` is a key of the
      mapping given to the constructor;
    * virtual-volumes style: ``/Volumes/<name>/<sub>/<path>`` where ``<name>``
      is the final component of exactly one resolved root.

    Every resolved path is checked to be located under its root, so ``..``
    segments and symlinks pointing outside the root are rejected.
    """

    def __init__(self, root_aliases: dict[str, str]):
        """Resolve every whitelisted root and build the virtual-volume index.

        Args:
            root_aliases: Mapping of alias -> root directory path.
        """
        resolved_roots: dict[str, Path] = {}
        roots_by_name: dict[str, list[tuple[str, Path]]] = {}
        for alias, root in root_aliases.items():
            resolved_root = Path(root).resolve()
            resolved_roots[alias] = resolved_root
            # A filesystem root ("/") has an empty name; it cannot be addressed
            # as "/Volumes/<name>", so it must not be listed as a volume.
            if resolved_root.name:
                roots_by_name.setdefault(resolved_root.name, []).append(
                    (alias, resolved_root)
                )
        self._roots: dict[str, Path] = resolved_roots
        # Only unambiguous names become virtual volumes: when two roots share
        # the same final component neither of them is exposed under /Volumes.
        self._volume_roots: dict[str, tuple[str, Path]] = {
            name: entries[0]
            for name, entries in roots_by_name.items()
            if len(entries) == 1
        }

    def is_virtual_volumes_path(self, input_path: str) -> bool:
        """Return True when *input_path* is exactly ``/Volumes`` (whitespace-trimmed)."""
        return (input_path or "").strip() == "/Volumes"

    def virtual_volumes_entries(self) -> list[dict[str, str]]:
        """List the virtual volumes as ``{"name", "path"}`` dicts, sorted case-insensitively."""
        return [
            {"name": name, "path": f"/Volumes/{name}"}
            for name in sorted(self._volume_roots, key=str.lower)
        ]

    def resolve_directory_path(self, input_path: str) -> ResolvedPath:
        """Resolve *input_path* and require that it is an existing directory.

        Raises:
            AppError: ``path_not_found`` (404) when the path does not exist,
                ``path_type_conflict`` (409) when it is not a directory, plus
                anything raised by :meth:`resolve_path`.
        """
        resolved = self.resolve_existing_path(input_path)
        if not resolved.absolute.is_dir():
            raise AppError(
                code="path_type_conflict",
                message="Requested path is not a directory",
                status_code=409,
                details={"path": input_path},
            )
        return resolved

    def resolve_existing_path(self, input_path: str) -> ResolvedPath:
        """Resolve *input_path* and require that it exists.

        Raises:
            AppError: ``path_not_found`` (404) when the path does not exist,
                plus anything raised by :meth:`resolve_path`.
        """
        resolved = self.resolve_path(input_path)
        if not resolved.absolute.exists():
            raise AppError(
                code="path_not_found",
                message="Requested path was not found",
                status_code=404,
                details={"path": input_path},
            )
        return resolved

    def resolve_path(self, input_path: str) -> ResolvedPath:
        """Resolve *input_path* (which need not exist) and enforce root containment.

        Raises:
            AppError: ``path_outside_whitelist`` (403) when the symlink-resolved
                path is not under its root, plus anything raised by
                :meth:`resolve_lexical_path`.
        """
        alias, rel_segments, candidate, display_style = self.resolve_lexical_path(input_path)
        root = self._roots[alias]

        # Resolve symlinks for existing prefixes; with strict=False a
        # not-yet-existing tail is still normalized, so the containment check
        # below works for paths that are about to be created.
        resolved_candidate = candidate.resolve(strict=False)
        if not self._is_under_root(resolved_candidate, root):
            raise AppError(
                code="path_outside_whitelist",
                message="Requested path is outside allowed roots",
                status_code=403,
                details={"path": input_path},
            )

        return ResolvedPath(
            alias=alias,
            relative=self._format_relative(alias, rel_segments, display_style),
            absolute=resolved_candidate,
            display_style=display_style,
        )

    def resolve_lexical_path(self, input_path: str) -> tuple[str, list[str], Path, str]:
        """Parse *input_path* without touching the filesystem.

        Returns:
            ``(alias, rel_segments, candidate, display_style)`` where
            ``candidate`` is the root joined with ``rel_segments`` and
            ``display_style`` is ``"alias"`` or ``"virtual_volumes"``.

        Raises:
            AppError: ``invalid_request`` (400) for an empty path or one
                containing a NUL byte, ``invalid_root_alias`` (403) for an
                unknown first segment, ``path_traversal_detected`` (403) when a
                ``..`` segment is present.
        """
        raw_input = (input_path or "").strip()
        normalized_input = raw_input.strip("/")
        if not normalized_input:
            raise AppError(
                code="invalid_request",
                message="Query parameter 'path' is required",
                status_code=400,
            )
        # A NUL byte can never be part of a real path and makes the later
        # Path.resolve() call fail with a bare ValueError; reject it up front
        # as a client error instead.
        if "\x00" in normalized_input:
            raise AppError(
                code="invalid_request",
                message="Path contains invalid characters",
                status_code=400,
                details={"path": input_path},
            )

        # normalized_input is non-empty and has no leading/trailing slash, so
        # at least one segment always survives the filter.
        segments = [seg for seg in normalized_input.split("/") if seg]

        if len(segments) >= 2 and segments[0] == "Volumes" and segments[1] in self._volume_roots:
            display_style = "virtual_volumes"
            alias, root = self._volume_roots[segments[1]]
            tail = segments[2:]
        else:
            display_style = "alias"
            alias = segments[0]
            if alias not in self._roots:
                raise AppError(
                    code="invalid_root_alias",
                    message="Unknown root alias",
                    status_code=403,
                    details={"path": input_path},
                )
            root = self._roots[alias]
            tail = segments[1:]

        if ".." in tail:
            raise AppError(
                code="path_traversal_detected",
                message="Path traversal is not allowed",
                status_code=403,
                details={"path": input_path},
            )

        # pathlib silently drops "." components when joining, so drop them here
        # too; otherwise the displayed relative path ("alias/./x") would
        # disagree with the filesystem path it stands for.
        rel_segments = [seg for seg in tail if seg != "."]

        candidate = root.joinpath(*rel_segments)
        return alias, rel_segments, candidate, display_style

    def validate_name(self, name: str, field: str) -> str:
        """Return *name* stripped of surrounding whitespace if it is a single path component.

        Args:
            name: Candidate file or directory name.
            field: Key under which the rejected value is reported in the error details.

        Raises:
            AppError: ``invalid_request`` (400) when the name is empty, is
                ``.`` or ``..``, or contains ``/``, ``\\`` or a NUL byte.
        """
        normalized = (name or "").strip()
        is_invalid = (
            not normalized
            or normalized in {".", ".."}
            or "/" in normalized
            or "\\" in normalized
            or "\x00" in normalized
        )
        if is_invalid:
            raise AppError(
                code="invalid_request",
                message="Invalid name",
                status_code=400,
                details={field: name},
            )
        return normalized

    def entry_relative_path(self, alias: str, absolute: Path, display_style: str = "alias") -> str:
        """Return the client-facing path of *absolute* within the root named *alias*.

        Raises:
            KeyError: when *alias* is not a configured root.
            AppError: ``symlink_escape_detected`` (403) when the
                symlink-resolved path is not under the root.
        """
        root = self._roots[alias]
        resolved_absolute = absolute.resolve(strict=False)
        try:
            relative = resolved_absolute.relative_to(root)
        except ValueError:
            raise AppError(
                code="symlink_escape_detected",
                message="Entry resolves outside allowed root",
                status_code=403,
                details={"path": alias},
            ) from None
        # `.parts` is empty when the entry is the root itself, whereas the
        # string form would be "." and leak into the result as "alias/.".
        return self._format_relative(alias, list(relative.parts), display_style)

    @staticmethod
    def _is_under_root(path: Path, root: Path) -> bool:
        """Return True when *path* equals *root* or is located beneath it (lexical check)."""
        try:
            path.relative_to(root)
        except ValueError:
            return False
        return True

    def _format_relative(self, alias: str, rel_segments: list[str], display_style: str = "alias") -> str:
        """Build the client-facing path string for *rel_segments* under *alias*.

        With ``display_style == "virtual_volumes"`` the prefix is
        ``/Volumes/<root directory name>``; otherwise it is the alias itself.
        """
        if display_style == "virtual_volumes":
            prefix = f"/Volumes/{self._roots[alias].name}"
        else:
            prefix = alias
        if not rel_segments:
            return prefix
        return f"{prefix}/{'/'.join(rel_segments)}"
|