upload volledige repo
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,157 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_bookmark_service
|
||||
from backend.app.db.bookmark_repository import BookmarkRepository
|
||||
from backend.app.main import app
|
||||
from backend.app.security.path_guard import PathGuard
|
||||
from backend.app.services.bookmark_service import BookmarkService
|
||||
|
||||
|
||||
class BookmarksApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.root = Path(self.temp_dir.name) / "root"
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
self.repo = BookmarkRepository(str(Path(self.temp_dir.name) / "bookmarks.db"))
|
||||
|
||||
path_guard = PathGuard({"storage1": str(self.root)})
|
||||
service = BookmarkService(path_guard=path_guard, repository=self.repo)
|
||||
|
||||
async def _override_bookmark_service() -> BookmarkService:
|
||||
return service
|
||||
|
||||
app.dependency_overrides[get_bookmark_service] = _override_bookmark_service
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _request(self, method: str, url: str, payload: dict | None = None) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
if method == "POST":
|
||||
return await client.post(url, json=payload)
|
||||
if method == "DELETE":
|
||||
return await client.delete(url)
|
||||
return await client.get(url)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def test_create_success(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/my/path", "label": "My Path"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = response.json()
|
||||
self.assertEqual(body["path"], "storage1/my/path")
|
||||
self.assertEqual(body["label"], "My Path")
|
||||
self.assertIn("id", body)
|
||||
self.assertIn("created_at", body)
|
||||
|
||||
def test_list_shape(self) -> None:
|
||||
self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/a", "label": "A"},
|
||||
)
|
||||
|
||||
response = self._request("GET", "/api/bookmarks")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(response.json()["items"]), 1)
|
||||
item = response.json()["items"][0]
|
||||
self.assertEqual(set(item.keys()), {"id", "path", "label", "created_at"})
|
||||
|
||||
def test_delete_success(self) -> None:
|
||||
created = self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/a", "label": "A"},
|
||||
).json()
|
||||
|
||||
response = self._request("DELETE", f"/api/bookmarks/{created['id']}")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), {"id": created["id"]})
|
||||
|
||||
def test_invalid_path(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "unknown/path", "label": "A"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "invalid_root_alias")
|
||||
|
||||
def test_invalid_label(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/a", "label": " "},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "invalid_request",
|
||||
"message": "Label is required",
|
||||
"details": {"label": " "},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_duplicate_conflict(self) -> None:
|
||||
self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/a", "label": "A"},
|
||||
)
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/a", "label": "Again"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "already_exists",
|
||||
"message": "Bookmark already exists for path",
|
||||
"details": {"path": "storage1/a"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_traversal_attempt(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/bookmarks",
|
||||
{"path": "storage1/../etc", "label": "Bad"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_traversal_detected")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,105 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_browse_service
|
||||
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
||||
from backend.app.main import app
|
||||
from backend.app.security.path_guard import PathGuard
|
||||
from backend.app.services.browse_service import BrowseService
|
||||
|
||||
|
||||
class BrowseApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.root = Path(self.temp_dir.name) / "root"
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
folder = self.root / "folder"
|
||||
folder.mkdir()
|
||||
file_path = self.root / "video.mkv"
|
||||
file_path.write_bytes(b"abc")
|
||||
|
||||
hidden_dir = self.root / ".hidden_dir"
|
||||
hidden_dir.mkdir()
|
||||
hidden_file = self.root / ".secret"
|
||||
hidden_file.write_bytes(b"x")
|
||||
|
||||
mtime = 1710000000
|
||||
for path in [folder, file_path, hidden_dir, hidden_file]:
|
||||
Path(path).touch()
|
||||
Path(path).chmod(0o755)
|
||||
import os
|
||||
os.utime(path, (mtime, mtime))
|
||||
|
||||
service = BrowseService(
|
||||
path_guard=PathGuard({"storage1": str(self.root)}),
|
||||
filesystem=FilesystemAdapter(),
|
||||
)
|
||||
async def _override_browse_service() -> BrowseService:
|
||||
return service
|
||||
|
||||
app.dependency_overrides[get_browse_service] = _override_browse_service
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _get(self, path: str, show_hidden: str | None = None) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
params = {"path": path}
|
||||
if show_hidden is not None:
|
||||
params["show_hidden"] = show_hidden
|
||||
return await client.get("/api/browse", params=params)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def test_browse_success_default_hides_hidden_entries(self) -> None:
|
||||
response = self._get("storage1")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
modified = datetime.fromtimestamp(1710000000, tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
expected = {
|
||||
"path": "storage1",
|
||||
"directories": [
|
||||
{
|
||||
"name": "folder",
|
||||
"path": "storage1/folder",
|
||||
"modified": modified,
|
||||
}
|
||||
],
|
||||
"files": [
|
||||
{
|
||||
"name": "video.mkv",
|
||||
"path": "storage1/video.mkv",
|
||||
"size": 3,
|
||||
"modified": modified,
|
||||
}
|
||||
],
|
||||
}
|
||||
self.assertEqual(response.json(), expected)
|
||||
|
||||
def test_browse_success_show_hidden_true(self) -> None:
|
||||
response = self._get("storage1", show_hidden="true")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = response.json()
|
||||
directory_names = [item["name"] for item in body["directories"]]
|
||||
file_names = [item["name"] for item in body["files"]]
|
||||
self.assertEqual(directory_names, [".hidden_dir", "folder"])
|
||||
self.assertEqual(file_names, [".secret", "video.mkv"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,211 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_copy_task_service, get_task_service
|
||||
from backend.app.db.task_repository import TaskRepository
|
||||
from backend.app.main import app
|
||||
from backend.app.security.path_guard import PathGuard
|
||||
from backend.app.services.copy_task_service import CopyTaskService
|
||||
from backend.app.services.task_service import TaskService
|
||||
from backend.app.tasks_runner import TaskRunner
|
||||
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
||||
|
||||
|
||||
class FailingFilesystemAdapter(FilesystemAdapter):
|
||||
def copy_file(self, source: str, destination: str, on_progress: callable | None = None) -> None:
|
||||
raise OSError("forced copy failure")
|
||||
|
||||
|
||||
class CopyApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.root = Path(self.temp_dir.name) / "root"
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
self.repo = TaskRepository(str(Path(self.temp_dir.name) / "tasks.db"))
|
||||
|
||||
path_guard = PathGuard({"storage1": str(self.root), "storage2": str(self.root)})
|
||||
self._set_services(path_guard=path_guard, filesystem=FilesystemAdapter())
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _set_services(self, path_guard: PathGuard, filesystem: FilesystemAdapter) -> None:
|
||||
runner = TaskRunner(repository=self.repo, filesystem=filesystem)
|
||||
copy_service = CopyTaskService(path_guard=path_guard, repository=self.repo, runner=runner)
|
||||
task_service = TaskService(repository=self.repo)
|
||||
|
||||
async def _override_copy_service() -> CopyTaskService:
|
||||
return copy_service
|
||||
|
||||
async def _override_task_service() -> TaskService:
|
||||
return task_service
|
||||
|
||||
app.dependency_overrides[get_copy_task_service] = _override_copy_service
|
||||
app.dependency_overrides[get_task_service] = _override_task_service
|
||||
|
||||
def _request(self, method: str, url: str, payload: dict | None = None) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
if method == "POST":
|
||||
return await client.post(url, json=payload)
|
||||
return await client.get(url)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def _wait_task(self, task_id: str, timeout_s: float = 2.0) -> dict:
|
||||
deadline = time.time() + timeout_s
|
||||
while time.time() < deadline:
|
||||
response = self._request("GET", f"/api/tasks/{task_id}")
|
||||
body = response.json()
|
||||
if body["status"] in {"completed", "failed"}:
|
||||
return body
|
||||
time.sleep(0.02)
|
||||
self.fail("task did not reach terminal state in time")
|
||||
|
||||
def test_copy_success_create_task_shape(self) -> None:
|
||||
src = self.root / "source.txt"
|
||||
src.write_text("hello", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/copy.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
body = response.json()
|
||||
self.assertIn("task_id", body)
|
||||
self.assertEqual(body["status"], "queued")
|
||||
|
||||
detail = self._wait_task(body["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertEqual(detail["total_bytes"], 5)
|
||||
self.assertEqual(detail["done_bytes"], 5)
|
||||
self.assertTrue((self.root / "copy.txt").exists())
|
||||
self.assertEqual((self.root / "copy.txt").read_text(encoding="utf-8"), "hello")
|
||||
|
||||
def test_copy_source_not_found(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/missing.txt", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_not_found",
|
||||
"message": "Requested path was not found",
|
||||
"details": {"path": "storage1/missing.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_copy_source_is_directory_type_conflict(self) -> None:
|
||||
(self.root / "dir").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/dir", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "type_conflict")
|
||||
|
||||
def test_copy_destination_exists_already_exists(self) -> None:
|
||||
(self.root / "source.txt").write_text("x", encoding="utf-8")
|
||||
(self.root / "exists.txt").write_text("y", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/exists.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "already_exists",
|
||||
"message": "Target path already exists",
|
||||
"details": {"path": "storage1/exists.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_copy_traversal_source(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/../etc/passwd", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_traversal_detected")
|
||||
|
||||
def test_copy_traversal_destination(self) -> None:
|
||||
(self.root / "source.txt").write_text("x", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/../etc/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_traversal_detected")
|
||||
|
||||
def test_copy_source_symlink_rejected(self) -> None:
|
||||
target = self.root / "real.txt"
|
||||
target.write_text("x", encoding="utf-8")
|
||||
link = self.root / "link.txt"
|
||||
link.symlink_to(target)
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/link.txt", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "type_conflict")
|
||||
|
||||
def test_copy_runtime_io_error_failed_task_shape(self) -> None:
|
||||
src = self.root / "source.txt"
|
||||
src.write_text("hello", encoding="utf-8")
|
||||
|
||||
path_guard = PathGuard({"storage1": str(self.root), "storage2": str(self.root)})
|
||||
self._set_services(path_guard=path_guard, filesystem=FailingFilesystemAdapter())
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/copy.txt"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 202)
|
||||
|
||||
task_id = response.json()["task_id"]
|
||||
detail = self._wait_task(task_id)
|
||||
self.assertEqual(detail["status"], "failed")
|
||||
self.assertEqual(detail["error_code"], "io_error")
|
||||
self.assertEqual(detail["failed_item"], str(src))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,110 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_browse_service
|
||||
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
||||
from backend.app.main import app
|
||||
from backend.app.security.path_guard import PathGuard
|
||||
from backend.app.services.browse_service import BrowseService
|
||||
|
||||
|
||||
class BrowseApiErrorsGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.root = Path(self.temp_dir.name) / "root"
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
(self.root / "a.txt").write_text("a", encoding="utf-8")
|
||||
|
||||
service = BrowseService(
|
||||
path_guard=PathGuard({"storage1": str(self.root)}),
|
||||
filesystem=FilesystemAdapter(),
|
||||
)
|
||||
async def _override_browse_service() -> BrowseService:
|
||||
return service
|
||||
|
||||
app.dependency_overrides[get_browse_service] = _override_browse_service
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _get(self, path: str) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
return await client.get("/api/browse", params={"path": path})
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def test_invalid_root_alias_error_shape(self) -> None:
|
||||
response = self._get("unknown/path")
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "invalid_root_alias",
|
||||
"message": "Unknown root alias",
|
||||
"details": {"path": "unknown/path"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_traversal_error_shape(self) -> None:
|
||||
response = self._get("storage1/../etc")
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_traversal_detected",
|
||||
"message": "Path traversal is not allowed",
|
||||
"details": {"path": "storage1/../etc"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_not_found_error_shape(self) -> None:
|
||||
response = self._get("storage1/missing")
|
||||
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_not_found",
|
||||
"message": "Requested path was not found",
|
||||
"details": {"path": "storage1/missing"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_type_conflict_error_shape(self) -> None:
|
||||
response = self._get("storage1/a.txt")
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_type_conflict",
|
||||
"message": "Requested path is not a directory",
|
||||
"details": {"path": "storage1/a.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,323 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_file_ops_service
|
||||
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
||||
from backend.app.main import app
|
||||
from backend.app.security.path_guard import PathGuard
|
||||
from backend.app.services.file_ops_service import FileOpsService
|
||||
|
||||
|
||||
class FileOpsApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.root = Path(self.temp_dir.name) / "root"
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.scope = self.root / "scope"
|
||||
self.scope.mkdir(parents=True, exist_ok=True)
|
||||
(self.scope / "old.txt").write_text("x", encoding="utf-8")
|
||||
(self.scope / "existing.txt").write_text("y", encoding="utf-8")
|
||||
|
||||
service = FileOpsService(
|
||||
path_guard=PathGuard({"storage1": str(self.root)}),
|
||||
filesystem=FilesystemAdapter(),
|
||||
)
|
||||
|
||||
async def _override_file_ops_service() -> FileOpsService:
|
||||
return service
|
||||
|
||||
app.dependency_overrides[get_file_ops_service] = _override_file_ops_service
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _post(self, url: str, payload: dict[str, str]) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
return await client.post(url, json=payload)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def test_mkdir_success(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/mkdir",
|
||||
{"parent_path": "storage1/scope", "name": "new_folder"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), {"path": "storage1/scope/new_folder"})
|
||||
self.assertTrue((self.scope / "new_folder").is_dir())
|
||||
|
||||
def test_mkdir_conflict_directory_exists(self) -> None:
|
||||
(self.scope / "existing_dir").mkdir()
|
||||
response = self._post(
|
||||
"/api/files/mkdir",
|
||||
{"parent_path": "storage1/scope", "name": "existing_dir"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "already_exists",
|
||||
"message": "Target path already exists",
|
||||
"details": {"path": "storage1/scope/existing_dir"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_mkdir_conflict_file_exists(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/mkdir",
|
||||
{"parent_path": "storage1/scope", "name": "existing.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "already_exists",
|
||||
"message": "Target path already exists",
|
||||
"details": {"path": "storage1/scope/existing.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_rename_success(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/rename",
|
||||
{"path": "storage1/scope/old.txt", "new_name": "renamed.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), {"path": "storage1/scope/renamed.txt"})
|
||||
self.assertFalse((self.scope / "old.txt").exists())
|
||||
self.assertTrue((self.scope / "renamed.txt").exists())
|
||||
|
||||
def test_rename_conflict(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/rename",
|
||||
{"path": "storage1/scope/old.txt", "new_name": "existing.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "already_exists",
|
||||
"message": "Target path already exists",
|
||||
"details": {"path": "storage1/scope/existing.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_rename_not_found(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/rename",
|
||||
{"path": "storage1/scope/missing.txt", "new_name": "renamed.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_not_found",
|
||||
"message": "Requested path was not found",
|
||||
"details": {"path": "storage1/scope/missing.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_rename_invalid_new_name_dotdot(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/rename",
|
||||
{"path": "storage1/scope/old.txt", "new_name": ".."},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "invalid_request",
|
||||
"message": "Invalid name",
|
||||
"details": {"new_name": ".."},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_rename_invalid_new_name_with_slash(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/rename",
|
||||
{"path": "storage1/scope/old.txt", "new_name": "a/b"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "invalid_request",
|
||||
"message": "Invalid name",
|
||||
"details": {"new_name": "a/b"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_mkdir_invalid_path(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/mkdir",
|
||||
{"parent_path": "storage1/scope", "name": "bad/name"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "invalid_request",
|
||||
"message": "Invalid name",
|
||||
"details": {"name": "bad/name"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_mkdir_traversal_attempt(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/mkdir",
|
||||
{"parent_path": "storage1/../etc", "name": "x"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_traversal_detected",
|
||||
"message": "Path traversal is not allowed",
|
||||
"details": {"path": "storage1/../etc"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_delete_file_success(self) -> None:
|
||||
target = self.scope / "delete_me.txt"
|
||||
target.write_text("z", encoding="utf-8")
|
||||
|
||||
response = self._post(
|
||||
"/api/files/delete",
|
||||
{"path": "storage1/scope/delete_me.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), {"path": "storage1/scope/delete_me.txt"})
|
||||
self.assertFalse(target.exists())
|
||||
|
||||
def test_delete_empty_directory_success(self) -> None:
|
||||
target = self.scope / "empty_dir"
|
||||
target.mkdir()
|
||||
|
||||
response = self._post(
|
||||
"/api/files/delete",
|
||||
{"path": "storage1/scope/empty_dir"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), {"path": "storage1/scope/empty_dir"})
|
||||
self.assertFalse(target.exists())
|
||||
|
||||
def test_delete_not_found(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/delete",
|
||||
{"path": "storage1/scope/missing.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_not_found",
|
||||
"message": "Requested path was not found",
|
||||
"details": {"path": "storage1/scope/missing.txt"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_delete_traversal_attempt(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/delete",
|
||||
{"path": "storage1/../etc/passwd"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "path_traversal_detected",
|
||||
"message": "Path traversal is not allowed",
|
||||
"details": {"path": "storage1/../etc/passwd"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_delete_non_empty_directory_conflict(self) -> None:
|
||||
target = self.scope / "non_empty"
|
||||
target.mkdir()
|
||||
(target / "a.txt").write_text("a", encoding="utf-8")
|
||||
|
||||
response = self._post(
|
||||
"/api/files/delete",
|
||||
{"path": "storage1/scope/non_empty"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "directory_not_empty",
|
||||
"message": "Directory is not empty",
|
||||
"details": {"path": "storage1/scope/non_empty"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
def test_delete_invalid_path(self) -> None:
|
||||
response = self._post(
|
||||
"/api/files/delete",
|
||||
{"path": ""},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "invalid_request",
|
||||
"message": "Query parameter 'path' is required",
|
||||
"details": None,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,215 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_move_task_service, get_task_service
|
||||
from backend.app.db.task_repository import TaskRepository
|
||||
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
||||
from backend.app.main import app
|
||||
from backend.app.security.path_guard import PathGuard
|
||||
from backend.app.services.move_task_service import MoveTaskService
|
||||
from backend.app.services.task_service import TaskService
|
||||
from backend.app.tasks_runner import TaskRunner
|
||||
|
||||
|
||||
class FailingDeleteFilesystemAdapter(FilesystemAdapter):
|
||||
def delete_file(self, path: Path) -> None:
|
||||
raise OSError("forced delete failure")
|
||||
|
||||
|
||||
class MoveApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.root1 = Path(self.temp_dir.name) / "root1"
|
||||
self.root2 = Path(self.temp_dir.name) / "root2"
|
||||
self.root1.mkdir(parents=True, exist_ok=True)
|
||||
self.root2.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.repo = TaskRepository(str(Path(self.temp_dir.name) / "tasks.db"))
|
||||
path_guard = PathGuard({"storage1": str(self.root1), "storage2": str(self.root2)})
|
||||
self._set_services(path_guard=path_guard, filesystem=FilesystemAdapter())
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _set_services(self, path_guard: PathGuard, filesystem: FilesystemAdapter) -> None:
|
||||
runner = TaskRunner(repository=self.repo, filesystem=filesystem)
|
||||
move_service = MoveTaskService(path_guard=path_guard, repository=self.repo, runner=runner)
|
||||
task_service = TaskService(repository=self.repo)
|
||||
|
||||
async def _override_move_service() -> MoveTaskService:
|
||||
return move_service
|
||||
|
||||
async def _override_task_service() -> TaskService:
|
||||
return task_service
|
||||
|
||||
app.dependency_overrides[get_move_task_service] = _override_move_service
|
||||
app.dependency_overrides[get_task_service] = _override_task_service
|
||||
|
||||
def _request(self, method: str, url: str, payload: dict | None = None) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
if method == "POST":
|
||||
return await client.post(url, json=payload)
|
||||
return await client.get(url)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def _wait_task(self, task_id: str, timeout_s: float = 2.0) -> dict:
|
||||
deadline = time.time() + timeout_s
|
||||
while time.time() < deadline:
|
||||
response = self._request("GET", f"/api/tasks/{task_id}")
|
||||
body = response.json()
|
||||
if body["status"] in {"completed", "failed"}:
|
||||
return body
|
||||
time.sleep(0.02)
|
||||
self.fail("task did not reach terminal state in time")
|
||||
|
||||
def test_move_success_same_root_create_task_shape_and_completed(self) -> None:
|
||||
src = self.root1 / "source.txt"
|
||||
src.write_text("hello", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/moved.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
body = response.json()
|
||||
self.assertIn("task_id", body)
|
||||
self.assertEqual(body["status"], "queued")
|
||||
|
||||
detail = self._wait_task(body["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertTrue((self.root1 / "moved.txt").exists())
|
||||
self.assertFalse(src.exists())
|
||||
|
||||
def test_move_success_cross_root_create_task_shape_and_completed(self) -> None:
|
||||
src = self.root1 / "source.txt"
|
||||
src.write_text("hello", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/source.txt", "destination": "storage2/moved.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
body = response.json()
|
||||
self.assertIn("task_id", body)
|
||||
self.assertEqual(body["status"], "queued")
|
||||
|
||||
detail = self._wait_task(body["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertTrue((self.root2 / "moved.txt").exists())
|
||||
self.assertFalse(src.exists())
|
||||
|
||||
def test_move_source_not_found(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/missing.txt", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_not_found")
|
||||
|
||||
def test_move_source_is_directory_type_conflict(self) -> None:
|
||||
(self.root1 / "dir").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/dir", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "type_conflict")
|
||||
|
||||
def test_move_destination_exists_already_exists(self) -> None:
|
||||
(self.root1 / "source.txt").write_text("x", encoding="utf-8")
|
||||
(self.root1 / "exists.txt").write_text("y", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/exists.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "already_exists")
|
||||
|
||||
def test_move_traversal_source(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/../etc/passwd", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_traversal_detected")
|
||||
|
||||
def test_move_traversal_destination(self) -> None:
|
||||
(self.root1 / "source.txt").write_text("x", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/source.txt", "destination": "storage1/../etc/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_traversal_detected")
|
||||
|
||||
def test_move_source_symlink_rejected(self) -> None:
|
||||
target = self.root1 / "real.txt"
|
||||
target.write_text("x", encoding="utf-8")
|
||||
link = self.root1 / "link.txt"
|
||||
link.symlink_to(target)
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/link.txt", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "type_conflict")
|
||||
|
||||
def test_move_runtime_io_error_failed_task_shape(self) -> None:
|
||||
src = self.root1 / "source.txt"
|
||||
src.write_text("hello", encoding="utf-8")
|
||||
|
||||
path_guard = PathGuard({"storage1": str(self.root1), "storage2": str(self.root2)})
|
||||
self._set_services(path_guard=path_guard, filesystem=FailingDeleteFilesystemAdapter())
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{"source": "storage1/source.txt", "destination": "storage2/moved.txt"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 202)
|
||||
|
||||
task_id = response.json()["task_id"]
|
||||
detail = self._wait_task(task_id)
|
||||
|
||||
self.assertEqual(detail["status"], "failed")
|
||||
self.assertEqual(detail["error_code"], "io_error")
|
||||
self.assertTrue((self.root2 / "moved.txt").exists())
|
||||
self.assertTrue(src.exists())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,261 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.dependencies import get_task_service
|
||||
from backend.app.db.task_repository import TaskRepository
|
||||
from backend.app.main import app
|
||||
from backend.app.services.task_service import TaskService
|
||||
|
||||
|
||||
class TasksApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.db_path = str(Path(self.temp_dir.name) / "tasks.db")
|
||||
self.repo = TaskRepository(self.db_path)
|
||||
self.service = TaskService(self.repo)
|
||||
|
||||
async def _override_task_service() -> TaskService:
|
||||
return self.service
|
||||
|
||||
app.dependency_overrides[get_task_service] = _override_task_service
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
def _get(self, url: str) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
return await client.get(url)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def _insert_task(
|
||||
self,
|
||||
*,
|
||||
task_id: str,
|
||||
operation: str,
|
||||
status: str,
|
||||
source: str,
|
||||
destination: str,
|
||||
created_at: str,
|
||||
started_at: str | None = None,
|
||||
finished_at: str | None = None,
|
||||
done_bytes: int | None = None,
|
||||
total_bytes: int | None = None,
|
||||
done_items: int | None = None,
|
||||
total_items: int | None = None,
|
||||
current_item: str | None = None,
|
||||
failed_item: str | None = None,
|
||||
error_code: str | None = None,
|
||||
error_message: str | None = None,
|
||||
) -> None:
|
||||
self.repo.insert_task_for_testing(
|
||||
{
|
||||
"id": task_id,
|
||||
"operation": operation,
|
||||
"status": status,
|
||||
"source": source,
|
||||
"destination": destination,
|
||||
"done_bytes": done_bytes,
|
||||
"total_bytes": total_bytes,
|
||||
"done_items": done_items,
|
||||
"total_items": total_items,
|
||||
"current_item": current_item,
|
||||
"failed_item": failed_item,
|
||||
"error_code": error_code,
|
||||
"error_message": error_message,
|
||||
"created_at": created_at,
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at,
|
||||
}
|
||||
)
|
||||
|
||||
def test_get_tasks_empty_list(self) -> None:
|
||||
response = self._get("/api/tasks")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.json(), {"items": []})
|
||||
|
||||
def test_get_tasks_list_shape(self) -> None:
|
||||
self._insert_task(
|
||||
task_id="task-old",
|
||||
operation="copy",
|
||||
status="completed",
|
||||
source="storage1/a.txt",
|
||||
destination="storage2/a.txt",
|
||||
created_at="2026-03-10T10:00:00Z",
|
||||
finished_at="2026-03-10T10:00:05Z",
|
||||
)
|
||||
self._insert_task(
|
||||
task_id="task-new",
|
||||
operation="move",
|
||||
status="running",
|
||||
source="storage1/b.txt",
|
||||
destination="storage2/b.txt",
|
||||
created_at="2026-03-10T10:01:00Z",
|
||||
)
|
||||
|
||||
response = self._get("/api/tasks")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"items": [
|
||||
{
|
||||
"id": "task-new",
|
||||
"operation": "move",
|
||||
"status": "running",
|
||||
"source": "storage1/b.txt",
|
||||
"destination": "storage2/b.txt",
|
||||
"created_at": "2026-03-10T10:01:00Z",
|
||||
"finished_at": None,
|
||||
},
|
||||
{
|
||||
"id": "task-old",
|
||||
"operation": "copy",
|
||||
"status": "completed",
|
||||
"source": "storage1/a.txt",
|
||||
"destination": "storage2/a.txt",
|
||||
"created_at": "2026-03-10T10:00:00Z",
|
||||
"finished_at": "2026-03-10T10:00:05Z",
|
||||
},
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
def test_get_task_detail_queued(self) -> None:
|
||||
self._insert_task(
|
||||
task_id="task-queued",
|
||||
operation="copy",
|
||||
status="queued",
|
||||
source="storage1/a.txt",
|
||||
destination="storage2/a.txt",
|
||||
created_at="2026-03-10T10:00:00Z",
|
||||
)
|
||||
|
||||
response = self._get("/api/tasks/task-queued")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"id": "task-queued",
|
||||
"operation": "copy",
|
||||
"status": "queued",
|
||||
"source": "storage1/a.txt",
|
||||
"destination": "storage2/a.txt",
|
||||
"done_bytes": None,
|
||||
"total_bytes": None,
|
||||
"done_items": None,
|
||||
"total_items": None,
|
||||
"current_item": None,
|
||||
"failed_item": None,
|
||||
"error_code": None,
|
||||
"error_message": None,
|
||||
"created_at": "2026-03-10T10:00:00Z",
|
||||
"started_at": None,
|
||||
"finished_at": None,
|
||||
},
|
||||
)
|
||||
|
||||
def test_get_task_detail_running(self) -> None:
|
||||
self._insert_task(
|
||||
task_id="task-running",
|
||||
operation="move",
|
||||
status="running",
|
||||
source="storage1/a.txt",
|
||||
destination="storage2/a.txt",
|
||||
created_at="2026-03-10T10:00:00Z",
|
||||
started_at="2026-03-10T10:00:01Z",
|
||||
done_bytes=1024,
|
||||
total_bytes=2048,
|
||||
done_items=1,
|
||||
total_items=2,
|
||||
current_item="storage1/a.txt",
|
||||
)
|
||||
|
||||
response = self._get("/api/tasks/task-running")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = response.json()
|
||||
self.assertEqual(body["status"], "running")
|
||||
self.assertEqual(body["done_bytes"], 1024)
|
||||
self.assertEqual(body["total_bytes"], 2048)
|
||||
self.assertEqual(body["current_item"], "storage1/a.txt")
|
||||
|
||||
def test_get_task_detail_completed(self) -> None:
|
||||
self._insert_task(
|
||||
task_id="task-completed",
|
||||
operation="copy",
|
||||
status="completed",
|
||||
source="storage1/a.txt",
|
||||
destination="storage2/a.txt",
|
||||
created_at="2026-03-10T10:00:00Z",
|
||||
started_at="2026-03-10T10:00:01Z",
|
||||
finished_at="2026-03-10T10:00:03Z",
|
||||
done_bytes=2048,
|
||||
total_bytes=2048,
|
||||
)
|
||||
|
||||
response = self._get("/api/tasks/task-completed")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = response.json()
|
||||
self.assertEqual(body["status"], "completed")
|
||||
self.assertEqual(body["finished_at"], "2026-03-10T10:00:03Z")
|
||||
self.assertEqual(body["error_code"], None)
|
||||
|
||||
def test_get_task_detail_failed(self) -> None:
|
||||
self._insert_task(
|
||||
task_id="task-failed",
|
||||
operation="move",
|
||||
status="failed",
|
||||
source="storage1/a.txt",
|
||||
destination="storage2/a.txt",
|
||||
created_at="2026-03-10T10:00:00Z",
|
||||
started_at="2026-03-10T10:00:01Z",
|
||||
finished_at="2026-03-10T10:00:02Z",
|
||||
failed_item="storage1/a.txt",
|
||||
error_code="io_error",
|
||||
error_message="write failed",
|
||||
)
|
||||
|
||||
response = self._get("/api/tasks/task-failed")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
body = response.json()
|
||||
self.assertEqual(body["status"], "failed")
|
||||
self.assertEqual(body["failed_item"], "storage1/a.txt")
|
||||
self.assertEqual(body["error_code"], "io_error")
|
||||
self.assertEqual(body["error_message"], "write failed")
|
||||
|
||||
def test_get_task_not_found(self) -> None:
|
||||
response = self._get("/api/tasks/task-missing")
|
||||
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertEqual(
|
||||
response.json(),
|
||||
{
|
||||
"error": {
|
||||
"code": "task_not_found",
|
||||
"message": "Task was not found",
|
||||
"details": {"task_id": "task-missing"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,49 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from starlette.routing import Mount
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.main import app
|
||||
|
||||
|
||||
class UiSmokeGoldenTest(unittest.TestCase):
|
||||
def _ui_mount(self) -> Mount:
|
||||
for route in app.routes:
|
||||
if isinstance(route, Mount) and route.path == "/ui":
|
||||
return route
|
||||
self.fail("Expected /ui mount to be registered")
|
||||
|
||||
def test_ui_mount_and_index_contains_expected_panels(self) -> None:
|
||||
mount = self._ui_mount()
|
||||
self.assertIsInstance(mount.app, StaticFiles)
|
||||
index_path = Path(mount.app.directory) / "index.html"
|
||||
self.assertTrue(index_path.exists())
|
||||
|
||||
body = index_path.read_text(encoding="utf-8")
|
||||
self.assertIn('id="workspace"', body)
|
||||
self.assertIn('id="footer-bar"', body)
|
||||
self.assertIn('id="left-pane"', body)
|
||||
self.assertIn('id="right-pane"', body)
|
||||
self.assertNotIn('id="bookmarks-panel"', body)
|
||||
self.assertNotIn('id="tasks-panel"', body)
|
||||
|
||||
def test_ui_static_assets_are_present_and_mapped(self) -> None:
|
||||
mount = self._ui_mount()
|
||||
static_root = Path(mount.app.directory)
|
||||
self.assertTrue((static_root / "app.js").exists())
|
||||
self.assertTrue((static_root / "style.css").exists())
|
||||
|
||||
app_js_url = app.url_path_for("ui", path="/app.js")
|
||||
style_css_url = app.url_path_for("ui", path="/style.css")
|
||||
self.assertEqual(app_js_url, "/ui/app.js")
|
||||
self.assertEqual(style_css_url, "/ui/style.css")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user