from __future__ import annotations import asyncio import os import sys import tempfile import unittest from datetime import datetime, timezone from pathlib import Path import httpx sys.path.insert(0, str(Path(__file__).resolve().parents[3])) from backend.app.api.errors import AppError from backend.app.dependencies import get_browse_service from backend.app.db.remote_client_repository import RemoteClientRepository from backend.app.fs.filesystem_adapter import FilesystemAdapter from backend.app.main import app from backend.app.security.path_guard import PathGuard from backend.app.services.browse_service import BrowseService from backend.app.services.remote_browse_service import RemoteBrowseService from backend.app.services.remote_client_service import RemoteClientService class _StubRemoteBrowseService(RemoteBrowseService): def __init__( self, remote_client_service: RemoteClientService, listings: dict[tuple[str, str, str], dict], failing_client_ids: set[str], ): super().__init__( remote_client_service=remote_client_service, agent_auth_header="Authorization", agent_auth_scheme="Bearer", agent_auth_token="agent-secret", agent_timeout_seconds=0.25, ) self._listings = listings self._failing_client_ids = failing_client_ids def _fetch_remote_listing(self, *, client, share_key: str, relative_path: str, show_hidden: bool) -> dict: if client.client_id in self._failing_client_ids: raise AppError( code="remote_client_unreachable", message=f"Remote client '{client.display_name}' is unreachable", status_code=502, details={"client_id": client.client_id, "endpoint": client.endpoint}, ) return self._listings[(client.client_id, share_key, relative_path)] class BrowseApiGoldenTest(unittest.TestCase): def setUp(self) -> None: self.temp_dir = tempfile.TemporaryDirectory() self.volumes_root = Path(self.temp_dir.name) / "Volumes" self.volumes_root.mkdir(parents=True, exist_ok=True) self.root = self.volumes_root / "8TB" self.root.mkdir(parents=True, exist_ok=True) self.second_root = self.volumes_root / "8TB_RAID1" self.second_root.mkdir(parents=True, exist_ok=True) self.unconfigured_root = self.volumes_root / "Other" self.unconfigured_root.mkdir(parents=True, exist_ok=True) folder = self.root / "folder" folder.mkdir() file_path = self.root / "video.mkv" file_path.write_bytes(b"abc") second_file = self.second_root / "archive.txt" second_file.write_text("z", encoding="utf-8") remote_root = Path(self.temp_dir.name) / "remote-downloads" remote_root.mkdir(parents=True, exist_ok=True) remote_dir = remote_root / "Series" remote_dir.mkdir() remote_file = remote_root / "episode.mkv" remote_file.write_bytes(b"remote") hidden_dir = self.root / ".hidden_dir" hidden_dir.mkdir() hidden_file = self.root / ".secret" hidden_file.write_bytes(b"x") mtime = 1710000000 for path in [folder, file_path, hidden_dir, hidden_file, second_file, remote_dir, remote_file]: Path(path).touch() Path(path).chmod(0o755) os.utime(path, (mtime, mtime)) repository = RemoteClientRepository(str(Path(self.temp_dir.name) / "remote-clients.db")) now_iso = "2026-03-26T12:00:00Z" repository.upsert_client( client_id="client-123", display_name="Jan MacBook", platform="macos", agent_version="1.1.0", endpoint="http://agent.test", shares=[{"key": "downloads", "label": "Downloads"}], now_iso=now_iso, ) repository.upsert_client( client_id="broken-client", display_name="Offline iMac", platform="macos", agent_version="1.1.0", endpoint="http://127.0.0.1:1", shares=[{"key": "downloads", "label": "Downloads"}], now_iso=now_iso, ) service = BrowseService( path_guard=PathGuard({"storage1": str(self.root), "storage2": str(self.second_root)}), filesystem=FilesystemAdapter(), remote_browse_service=_StubRemoteBrowseService( remote_client_service=RemoteClientService( repository=repository, registration_token="secret-token", offline_timeout_seconds=60, now=lambda: datetime(2026, 3, 26, 12, 0, 0, tzinfo=timezone.utc), ), listings={ ( "client-123", "downloads", "", ): { "entries": [ { "name": "Series", "kind": "directory", "size": remote_dir.stat().st_size, "modified": datetime.fromtimestamp(remote_dir.stat().st_mtime, tz=timezone.utc) .isoformat() .replace("+00:00", "Z"), }, { "name": "episode.mkv", "kind": "file", "size": remote_file.stat().st_size, "modified": datetime.fromtimestamp(remote_file.stat().st_mtime, tz=timezone.utc) .isoformat() .replace("+00:00", "Z"), }, ] } }, failing_client_ids={"broken-client"}, ), ) async def _override_browse_service() -> BrowseService: return service app.dependency_overrides[get_browse_service] = _override_browse_service def tearDown(self) -> None: app.dependency_overrides.clear() self.temp_dir.cleanup() def _get(self, path: str, show_hidden: str | None = None) -> httpx.Response: async def _run() -> httpx.Response: transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client: params = {"path": path} if show_hidden is not None: params["show_hidden"] = show_hidden return await client.get("/api/browse", params=params) return asyncio.run(_run()) def test_browse_success_default_hides_hidden_entries(self) -> None: response = self._get("storage1") self.assertEqual(response.status_code, 200) modified = datetime.fromtimestamp(1710000000, tz=timezone.utc).isoformat().replace("+00:00", "Z") expected = { "path": "storage1", "directories": [ { "name": "folder", "path": "storage1/folder", "modified": modified, } ], "files": [ { "name": "video.mkv", "path": "storage1/video.mkv", "size": 3, "modified": modified, } ], } self.assertEqual(response.json(), expected) def test_browse_success_show_hidden_true(self) -> None: response = self._get("storage1", show_hidden="true") self.assertEqual(response.status_code, 200) body = response.json() directory_names = [item["name"] for item in body["directories"]] file_names = [item["name"] for item in body["files"]] self.assertEqual(directory_names, [".hidden_dir", "folder"]) self.assertEqual(file_names, [".secret", "video.mkv"]) def test_browse_virtual_volumes_lists_only_configured_mounts(self) -> None: response = self._get("/Volumes") self.assertEqual(response.status_code, 200) self.assertEqual( response.json(), { "path": "/Volumes", "directories": [ {"name": "8TB", "path": "/Volumes/8TB", "modified": ""}, {"name": "8TB_RAID1", "path": "/Volumes/8TB_RAID1", "modified": ""}, ], "files": [], }, ) def test_browse_virtual_mount_maps_to_configured_root(self) -> None: response = self._get("/Volumes/8TB") self.assertEqual(response.status_code, 200) modified = datetime.fromtimestamp(1710000000, tz=timezone.utc).isoformat().replace("+00:00", "Z") self.assertEqual( response.json(), { "path": "/Volumes/8TB", "directories": [ { "name": "folder", "path": "/Volumes/8TB/folder", "modified": modified, } ], "files": [ { "name": "video.mkv", "path": "/Volumes/8TB/video.mkv", "size": 3, "modified": modified, } ], }, ) def test_browse_virtual_clients_and_remote_share(self) -> None: clients_response = self._get("/Clients") self.assertEqual(clients_response.status_code, 200) self.assertEqual( clients_response.json(), { "path": "/Clients", "directories": [ { "name": "Jan MacBook", "path": "/Clients/client-123", "modified": "2026-03-26T12:00:00Z", }, { "name": "Offline iMac", "path": "/Clients/broken-client", "modified": "2026-03-26T12:00:00Z", }, ], "files": [], }, ) shares_response = self._get("/Clients/client-123") self.assertEqual(shares_response.status_code, 200) self.assertEqual( shares_response.json(), { "path": "/Clients/client-123", "directories": [ { "name": "Downloads", "path": "/Clients/client-123/downloads", "modified": "2026-03-26T12:00:00Z", } ], "files": [], }, ) browse_response = self._get("/Clients/client-123/downloads") self.assertEqual(browse_response.status_code, 200) modified = datetime.fromtimestamp(1710000000, tz=timezone.utc).isoformat().replace("+00:00", "Z") self.assertEqual( browse_response.json(), { "path": "/Clients/client-123/downloads", "directories": [ { "name": "Series", "path": "/Clients/client-123/downloads/Series", "modified": modified, } ], "files": [ { "name": "episode.mkv", "path": "/Clients/client-123/downloads/episode.mkv", "size": 6, "modified": modified, } ], }, ) def test_remote_client_failure_stays_local_to_remote_subtree(self) -> None: broken_response = self._get("/Clients/broken-client/downloads") self.assertEqual(broken_response.status_code, 502) self.assertEqual(broken_response.json()["error"]["code"], "remote_client_unreachable") volumes_response = self._get("/Volumes") self.assertEqual(volumes_response.status_code, 200) self.assertEqual(volumes_response.json()["path"], "/Volumes") if __name__ == "__main__": unittest.main()