83 lines
3.3 KiB
Python
83 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
from backend.app.api.errors import AppError
|
|
from backend.app.api.schemas import SearchResponse, SearchResultItem
|
|
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
|
from backend.app.security.path_guard import PathGuard
|
|
|
|
SEARCH_RESULT_LIMIT = 100
|
|
SEARCH_MIN_QUERY_LENGTH = 3
|
|
|
|
|
|
class SearchService:
|
|
def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter):
|
|
self._path_guard = path_guard
|
|
self._filesystem = filesystem
|
|
|
|
def search(self, path: str, query: str) -> SearchResponse:
|
|
normalized_query = (query or "").strip()
|
|
if len(normalized_query) < SEARCH_MIN_QUERY_LENGTH:
|
|
raise AppError(
|
|
code="invalid_request",
|
|
message=f"Query must be at least {SEARCH_MIN_QUERY_LENGTH} characters",
|
|
status_code=400,
|
|
)
|
|
|
|
items: list[SearchResultItem] = []
|
|
truncated = False
|
|
|
|
if self._path_guard.is_virtual_volumes_path(path):
|
|
for entry in self._path_guard.virtual_volumes_entries():
|
|
resolved = self._path_guard.resolve_directory_path(entry["path"])
|
|
remaining = SEARCH_RESULT_LIMIT - len(items)
|
|
if remaining <= 0:
|
|
truncated = True
|
|
break
|
|
matches, chunk_truncated = self._filesystem.search_names(
|
|
resolved.absolute,
|
|
normalized_query,
|
|
remaining,
|
|
)
|
|
items.extend(self._map_results(matches, resolved.display_style))
|
|
if chunk_truncated or len(items) >= SEARCH_RESULT_LIMIT:
|
|
truncated = True
|
|
break
|
|
return SearchResponse(items=items, truncated=truncated)
|
|
|
|
resolved = self._path_guard.resolve_directory_path(path)
|
|
matches, truncated = self._filesystem.search_names(
|
|
resolved.absolute,
|
|
normalized_query,
|
|
SEARCH_RESULT_LIMIT,
|
|
)
|
|
return SearchResponse(items=self._map_results(matches, resolved.display_style), truncated=truncated)
|
|
|
|
def _map_results(self, matches: list[dict], display_style: str) -> list[SearchResultItem]:
|
|
items: list[SearchResultItem] = []
|
|
for match in matches:
|
|
absolute = match["absolute"]
|
|
alias = self._match_alias_for_path(absolute)
|
|
path = self._path_guard.entry_relative_path(alias, absolute, display_style=display_style)
|
|
parent_path = self._path_guard.entry_relative_path(alias, absolute.parent, display_style=display_style)
|
|
items.append(
|
|
SearchResultItem(
|
|
name=match["name"],
|
|
path=path,
|
|
type=match["kind"],
|
|
parent_path=parent_path,
|
|
root=alias,
|
|
)
|
|
)
|
|
return items
|
|
|
|
def _match_alias_for_path(self, absolute) -> str:
|
|
resolved = absolute.resolve(strict=False)
|
|
for alias, root in self._path_guard._roots.items(): # internal mapping, shared security source
|
|
if self._path_guard._is_under_root(resolved, root):
|
|
return alias
|
|
raise AppError(
|
|
code="path_outside_whitelist",
|
|
message="Requested path is outside allowed roots",
|
|
status_code=403,
|
|
)
|