from __future__ import annotations from backend.app.api.errors import AppError from backend.app.api.schemas import SearchResponse, SearchResultItem from backend.app.fs.filesystem_adapter import FilesystemAdapter from backend.app.security.path_guard import PathGuard SEARCH_RESULT_LIMIT = 100 SEARCH_MIN_QUERY_LENGTH = 3 class SearchService: def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter): self._path_guard = path_guard self._filesystem = filesystem def search(self, path: str, query: str) -> SearchResponse: normalized_query = (query or "").strip() if len(normalized_query) < SEARCH_MIN_QUERY_LENGTH: raise AppError( code="invalid_request", message=f"Query must be at least {SEARCH_MIN_QUERY_LENGTH} characters", status_code=400, ) items: list[SearchResultItem] = [] truncated = False if self._path_guard.is_virtual_volumes_path(path): for entry in self._path_guard.virtual_volumes_entries(): resolved = self._path_guard.resolve_directory_path(entry["path"]) remaining = SEARCH_RESULT_LIMIT - len(items) if remaining <= 0: truncated = True break matches, chunk_truncated = self._filesystem.search_names( resolved.absolute, normalized_query, remaining, ) items.extend(self._map_results(matches, resolved.display_style)) if chunk_truncated or len(items) >= SEARCH_RESULT_LIMIT: truncated = True break return SearchResponse(items=items, truncated=truncated) resolved = self._path_guard.resolve_directory_path(path) matches, truncated = self._filesystem.search_names( resolved.absolute, normalized_query, SEARCH_RESULT_LIMIT, ) return SearchResponse(items=self._map_results(matches, resolved.display_style), truncated=truncated) def _map_results(self, matches: list[dict], display_style: str) -> list[SearchResultItem]: items: list[SearchResultItem] = [] for match in matches: absolute = match["absolute"] alias = self._match_alias_for_path(absolute) path = self._path_guard.entry_relative_path(alias, absolute, display_style=display_style) parent_path = self._path_guard.entry_relative_path(alias, absolute.parent, display_style=display_style) items.append( SearchResultItem( name=match["name"], path=path, type=match["kind"], parent_path=parent_path, root=alias, ) ) return items def _match_alias_for_path(self, absolute) -> str: resolved = absolute.resolve(strict=False) for alias, root in self._path_guard._roots.items(): # internal mapping, shared security source if self._path_guard._is_under_root(resolved, root): return alias raise AppError( code="path_outside_whitelist", message="Requested path is outside allowed roots", status_code=403, )