This commit is contained in:
kodi
2026-03-11 15:25:32 +01:00
parent 6a1a575383
commit d1f018a130
22 changed files with 909 additions and 49 deletions
+56 -20
View File
@@ -11,14 +11,34 @@ class ResolvedPath:
alias: str
relative: str
absolute: Path
display_style: str
class PathGuard:
def __init__(self, root_aliases: dict[str, str]):
normalized: dict[str, Path] = {}
volume_roots_candidates: dict[str, list[tuple[str, Path]]] = {}
for alias, root in root_aliases.items():
normalized[alias] = Path(root).resolve()
resolved_root = Path(root).resolve()
normalized[alias] = resolved_root
volume_name = resolved_root.name
volume_roots_candidates.setdefault(volume_name, []).append((alias, resolved_root))
self._roots = normalized
self._volume_roots = {
name: entries[0]
for name, entries in volume_roots_candidates.items()
if len(entries) == 1
}
def is_virtual_volumes_path(self, input_path: str) -> bool:
normalized_input = (input_path or "").strip()
return normalized_input == "/Volumes"
def virtual_volumes_entries(self) -> list[dict[str, str]]:
return [
{"name": name, "path": f"/Volumes/{name}"}
for name in sorted(self._volume_roots.keys(), key=str.lower)
]
def resolve_directory_path(self, input_path: str) -> ResolvedPath:
resolved = self.resolve_path(input_path)
@@ -50,7 +70,7 @@ class PathGuard:
return resolved
def resolve_path(self, input_path: str) -> ResolvedPath:
alias, rel_segments, candidate = self.resolve_lexical_path(input_path)
alias, rel_segments, candidate, display_style = self.resolve_lexical_path(input_path)
root = self._roots[alias]
# Resolve symlinks for existing prefixes; for not-yet-existing tails strict=False keeps
@@ -66,12 +86,14 @@ class PathGuard:
return ResolvedPath(
alias=alias,
relative=self._format_relative(alias, rel_segments),
relative=self._format_relative(alias, rel_segments, display_style),
absolute=resolved_candidate,
display_style=display_style,
)
def resolve_lexical_path(self, input_path: str) -> tuple[str, list[str], Path]:
normalized_input = (input_path or "").strip().strip("/")
def resolve_lexical_path(self, input_path: str) -> tuple[str, list[str], Path, str]:
raw_input = (input_path or "").strip()
normalized_input = raw_input.strip("/")
if not normalized_input:
raise AppError(
code="invalid_request",
@@ -80,16 +102,28 @@ class PathGuard:
)
segments = [seg for seg in normalized_input.split("/") if seg]
alias = segments[0] if segments else ""
if alias not in self._roots:
raise AppError(
code="invalid_root_alias",
message="Unknown root alias",
status_code=403,
details={"path": input_path},
)
display_style = "alias"
alias = ""
rel_segments: list[str] = []
root: Path
if len(segments) >= 2 and segments[0] == "Volumes" and segments[1] in self._volume_roots:
display_style = "virtual_volumes"
volume_name = segments[1]
alias, root = self._volume_roots[volume_name]
rel_segments = segments[2:]
else:
alias = segments[0] if segments else ""
if alias not in self._roots:
raise AppError(
code="invalid_root_alias",
message="Unknown root alias",
status_code=403,
details={"path": input_path},
)
root = self._roots[alias]
rel_segments = segments[1:]
rel_segments = segments[1:]
if any(seg == ".." for seg in rel_segments):
raise AppError(
code="path_traversal_detected",
@@ -98,9 +132,8 @@ class PathGuard:
details={"path": input_path},
)
root = self._roots[alias]
candidate = root.joinpath(*rel_segments)
return alias, rel_segments, candidate
return alias, rel_segments, candidate, display_style
def validate_name(self, name: str, field: str) -> str:
normalized = (name or "").strip()
@@ -113,7 +146,7 @@ class PathGuard:
)
return normalized
def entry_relative_path(self, alias: str, absolute: Path) -> str:
def entry_relative_path(self, alias: str, absolute: Path, display_style: str = "alias") -> str:
root = self._roots[alias]
resolved_absolute = absolute.resolve(strict=False)
if not self._is_under_root(resolved_absolute, root):
@@ -124,7 +157,7 @@ class PathGuard:
details={"path": f"{alias}"},
)
rel = resolved_absolute.relative_to(root).as_posix()
return self._format_relative(alias, [p for p in rel.split("/") if p])
return self._format_relative(alias, [p for p in rel.split("/") if p], display_style)
@staticmethod
def _is_under_root(path: Path, root: Path) -> bool:
@@ -134,6 +167,9 @@ class PathGuard:
except ValueError:
return False
@staticmethod
def _format_relative(alias: str, rel_segments: list[str]) -> str:
def _format_relative(self, alias: str, rel_segments: list[str], display_style: str = "alias") -> str:
if display_style == "virtual_volumes":
root = self._roots[alias]
prefix = f"/Volumes/{root.name}"
return prefix if not rel_segments else f"{prefix}/{'/'.join(rel_segments)}"
return alias if not rel_segments else f"{alias}/{'/'.join(rel_segments)}"
+13 -2
View File
@@ -11,13 +11,22 @@ class BrowseService:
self._filesystem = filesystem
def browse(self, path: str, show_hidden: bool) -> BrowseResponse:
if self._path_guard.is_virtual_volumes_path(path):
directories = [
DirectoryEntry(name=item["name"], path=item["path"], modified="")
for item in self._path_guard.virtual_volumes_entries()
]
return BrowseResponse(path="/Volumes", directories=directories, files=[])
resolved = self._path_guard.resolve_directory_path(path)
directories_raw, files_raw = self._filesystem.list_directory(resolved.absolute, show_hidden=show_hidden)
directories = [
DirectoryEntry(
name=item["name"],
path=self._path_guard.entry_relative_path(resolved.alias, item["absolute"]),
path=self._path_guard.entry_relative_path(
resolved.alias, item["absolute"], display_style=resolved.display_style
),
modified=item["modified"],
)
for item in directories_raw
@@ -26,7 +35,9 @@ class BrowseService:
files = [
FileEntry(
name=item["name"],
path=self._path_guard.entry_relative_path(resolved.alias, item["absolute"]),
path=self._path_guard.entry_relative_path(
resolved.alias, item["absolute"], display_style=resolved.display_style
),
size=item["size"],
modified=item["modified"],
)
@@ -17,7 +17,7 @@ class CopyTaskService:
def create_copy_task(self, source: str, destination: str) -> TaskCreateResponse:
resolved_source = self._path_guard.resolve_existing_path(source)
_, _, lexical_source = self._path_guard.resolve_lexical_path(source)
_, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source)
if lexical_source.is_symlink():
raise AppError(
code="type_conflict",
@@ -36,7 +36,11 @@ class CopyTaskService:
resolved_destination = self._path_guard.resolve_path(destination)
destination_parent = resolved_destination.absolute.parent
parent_relative = self._path_guard.entry_relative_path(resolved_destination.alias, destination_parent)
parent_relative = self._path_guard.entry_relative_path(
resolved_destination.alias,
destination_parent,
display_style=resolved_destination.display_style,
)
self._map_directory_validation(parent_relative)
if resolved_destination.absolute.exists():
@@ -68,7 +68,11 @@ class FileOpsService:
resolved_source = self._path_guard.resolve_existing_path(path)
safe_name = self._path_guard.validate_name(new_name, field="new_name")
parent_relative = self._path_guard.entry_relative_path(resolved_source.alias, resolved_source.absolute.parent)
parent_relative = self._path_guard.entry_relative_path(
resolved_source.alias,
resolved_source.absolute.parent,
display_style=resolved_source.display_style,
)
target_relative = self._join_relative(parent_relative, safe_name)
resolved_target = self._path_guard.resolve_path(target_relative)
@@ -15,7 +15,7 @@ class MoveTaskService:
def create_move_task(self, source: str, destination: str) -> TaskCreateResponse:
resolved_source = self._path_guard.resolve_existing_path(source)
_, _, lexical_source = self._path_guard.resolve_lexical_path(source)
_, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source)
if lexical_source.is_symlink():
raise AppError(
@@ -34,7 +34,11 @@ class MoveTaskService:
resolved_destination = self._path_guard.resolve_path(destination)
destination_parent = resolved_destination.absolute.parent
parent_relative = self._path_guard.entry_relative_path(resolved_destination.alias, destination_parent)
parent_relative = self._path_guard.entry_relative_path(
resolved_destination.alias,
destination_parent,
display_style=resolved_destination.display_style,
)
self._map_directory_validation(parent_relative)
if resolved_destination.absolute.exists():