from __future__ import annotations

import asyncio
import sys
import tempfile
import unittest
from pathlib import Path

import httpx

sys.path.insert(0, str(Path(__file__).resolve().parents[3]))

from backend.app.dependencies import get_file_ops_service, get_history_service
from backend.app.db.history_repository import HistoryRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter
from backend.app.main import app
from backend.app.security.path_guard import PathGuard
from backend.app.services.file_ops_service import FileOpsService
from backend.app.services.history_service import HistoryService


class UploadApiGoldenTest(unittest.TestCase):
    """Golden tests for ``POST /api/files/upload`` run against the ASGI app in-process.

    Each test gets a fresh temporary directory holding a ``root/uploads``
    tree (exposed under the ``storage1`` alias) and a ``history.db`` path
    handed to the history repository. The app's file-ops and history
    dependencies are overridden so requests hit these per-test instances.
    """

    def setUp(self) -> None:
        """Create the sandbox directories and install the dependency overrides."""
        self.temp_dir = tempfile.TemporaryDirectory()
        sandbox = Path(self.temp_dir.name)

        self.root = sandbox / "root"
        self.root.mkdir(parents=True, exist_ok=True)
        self.uploads_dir = self.root / "uploads"
        self.uploads_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = str(sandbox / "history.db")

        # One repository instance is shared so that uploads recorded by the
        # file-ops service are visible through the history service.
        repository = HistoryRepository(self.db_path)
        guard = PathGuard({"storage1": str(self.root)})
        file_ops = FileOpsService(
            path_guard=guard,
            filesystem=FilesystemAdapter(),
            history_repository=repository,
        )
        history = HistoryService(repository=repository)

        async def _provide_file_ops() -> FileOpsService:
            return file_ops

        async def _provide_history() -> HistoryService:
            return history

        app.dependency_overrides[get_file_ops_service] = _provide_file_ops
        app.dependency_overrides[get_history_service] = _provide_history

    def tearDown(self) -> None:
        """Drop every dependency override and delete the temporary directory."""
        app.dependency_overrides.clear()
        self.temp_dir.cleanup()

    def _with_client(self, action):
        """Run ``action(client)`` against the app and return its awaited result.

        ``action`` is a callable taking an ``httpx.AsyncClient`` and returning
        an awaitable. The client is bound to ``app`` through an ASGI transport
        and stays open for the whole duration of ``action``.
        """

        async def _session():
            asgi = httpx.ASGITransport(app=app)
            async with httpx.AsyncClient(transport=asgi, base_url="http://testserver") as client:
                return await action(client)

        return asyncio.run(_session())

    def _upload(
        self,
        *,
        target_path: str,
        filename: str,
        content: bytes,
        overwrite: bool = False,
    ) -> httpx.Response:
        """POST a single multipart file upload and return the raw response."""
        form = {
            "target_path": target_path,
            "overwrite": "true" if overwrite else "false",
        }
        attachment = {"file": (filename, content, "application/octet-stream")}

        async def _post(client: httpx.AsyncClient) -> httpx.Response:
            return await client.post("/api/files/upload", data=form, files=attachment)

        return self._with_client(_post)

    def _get_history(self) -> list[dict]:
        """Return the ``items`` list from ``GET /api/history``."""

        async def _fetch(client: httpx.AsyncClient) -> list[dict]:
            reply = await client.get("/api/history")
            return reply.json()["items"]

        return self._with_client(_fetch)

    def _assert_error(
        self,
        response: httpx.Response,
        *,
        status: int,
        code: str,
        message: str,
        details: dict,
    ) -> None:
        """Assert the status code, then the exact error envelope of ``response``."""
        self.assertEqual(response.status_code, status)
        expected = {"error": {"code": code, "message": message, "details": details}}
        self.assertEqual(response.json(), expected)

    def test_upload_single_file_success(self) -> None:
        response = self._upload(
            target_path="storage1/uploads",
            filename="hello.txt",
            content=b"hello",
        )

        self.assertEqual(response.status_code, 200)
        payload = response.json()
        self.assertEqual(payload["path"], "storage1/uploads/hello.txt")
        self.assertEqual(payload["size"], 5)
        self.assertTrue((self.uploads_dir / "hello.txt").exists())

        # The tests read the first history item as the entry for this upload.
        latest = self._get_history()[0]
        self.assertEqual(latest["operation"], "upload")
        self.assertEqual(latest["status"], "completed")
        self.assertEqual(latest["destination"], "storage1/uploads/hello.txt")

    def test_upload_target_path_not_found(self) -> None:
        response = self._upload(
            target_path="storage1/missing",
            filename="hello.txt",
            content=b"hello",
        )

        self._assert_error(
            response,
            status=404,
            code="path_not_found",
            message="Requested path was not found",
            details={"path": "storage1/missing"},
        )

    def test_upload_target_path_is_file(self) -> None:
        blocker = self.root / "not_a_directory.txt"
        blocker.write_text("x", encoding="utf-8")

        response = self._upload(
            target_path="storage1/not_a_directory.txt",
            filename="hello.txt",
            content=b"hello",
        )

        self._assert_error(
            response,
            status=409,
            code="path_type_conflict",
            message="Requested path is not a directory",
            details={"path": "storage1/not_a_directory.txt"},
        )

    def test_upload_traversal_blocked(self) -> None:
        response = self._upload(
            target_path="storage1/../etc",
            filename="hello.txt",
            content=b"hello",
        )

        self._assert_error(
            response,
            status=403,
            code="path_traversal_detected",
            message="Path traversal is not allowed",
            details={"path": "storage1/../etc"},
        )

    def test_upload_invalid_root_alias(self) -> None:
        response = self._upload(
            target_path="unknown/uploads",
            filename="hello.txt",
            content=b"hello",
        )

        self._assert_error(
            response,
            status=403,
            code="invalid_root_alias",
            message="Unknown root alias",
            details={"path": "unknown/uploads"},
        )

    def test_upload_invalid_filename_blocked(self) -> None:
        response = self._upload(
            target_path="storage1/uploads",
            filename="..",
            content=b"hello",
        )

        self._assert_error(
            response,
            status=400,
            code="invalid_request",
            message="Invalid name",
            details={"name": ".."},
        )

    def test_upload_conflict_on_existing_file(self) -> None:
        occupied = self.uploads_dir / "hello.txt"
        occupied.write_text("existing", encoding="utf-8")

        response = self._upload(
            target_path="storage1/uploads",
            filename="hello.txt",
            content=b"hello",
        )

        self._assert_error(
            response,
            status=409,
            code="already_exists",
            message="Target path already exists",
            details={"path": "storage1/uploads/hello.txt"},
        )

        # A rejected upload is still expected to appear in history, as failed.
        latest = self._get_history()[0]
        self.assertEqual(latest["operation"], "upload")
        self.assertEqual(latest["status"], "failed")
        self.assertEqual(latest["error_code"], "already_exists")

    def test_upload_overwrite_existing_file_success(self) -> None:
        occupied = self.uploads_dir / "hello.txt"
        occupied.write_text("existing", encoding="utf-8")

        response = self._upload(
            target_path="storage1/uploads",
            filename="hello.txt",
            content=b"replacement",
            overwrite=True,
        )

        self.assertEqual(response.status_code, 200)
        self.assertEqual((self.uploads_dir / "hello.txt").read_bytes(), b"replacement")

        latest = self._get_history()[0]
        self.assertEqual(latest["operation"], "upload")
        self.assertEqual(latest["status"], "completed")