fix: copy and move
This commit is contained in:
BIN
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,133 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app import dependencies
|
||||
from backend.app.main import app
|
||||
|
||||
|
||||
class TaskSchemaMigrationApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
base = Path(self.temp_dir.name)
|
||||
self.root1 = base / "root1"
|
||||
self.root2 = base / "root2"
|
||||
(self.root1 / "Shared_Folders" / "Downloads").mkdir(parents=True, exist_ok=True)
|
||||
(self.root2 / "Shared_Folders" / "Downloads").mkdir(parents=True, exist_ok=True)
|
||||
self.db_path = base / "tasks-legacy.db"
|
||||
self._create_legacy_schema_db(self.db_path)
|
||||
|
||||
self._orig_aliases = os.environ.get("WEBMANAGER_ROOT_ALIASES")
|
||||
self._orig_db_path = os.environ.get("WEBMANAGER_TASK_DB_PATH")
|
||||
os.environ["WEBMANAGER_ROOT_ALIASES"] = f"storage1={self.root1},storage2={self.root2}"
|
||||
os.environ["WEBMANAGER_TASK_DB_PATH"] = str(self.db_path)
|
||||
self._clear_dependency_caches()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
app.dependency_overrides.clear()
|
||||
if self._orig_aliases is None:
|
||||
os.environ.pop("WEBMANAGER_ROOT_ALIASES", None)
|
||||
else:
|
||||
os.environ["WEBMANAGER_ROOT_ALIASES"] = self._orig_aliases
|
||||
if self._orig_db_path is None:
|
||||
os.environ.pop("WEBMANAGER_TASK_DB_PATH", None)
|
||||
else:
|
||||
os.environ["WEBMANAGER_TASK_DB_PATH"] = self._orig_db_path
|
||||
self._clear_dependency_caches()
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
@staticmethod
|
||||
def _create_legacy_schema_db(db_path: Path) -> None:
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
operation TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@staticmethod
|
||||
def _clear_dependency_caches() -> None:
|
||||
dependencies.get_path_guard.cache_clear()
|
||||
dependencies.get_task_repository.cache_clear()
|
||||
dependencies.get_task_runner.cache_clear()
|
||||
dependencies.get_bookmark_repository.cache_clear()
|
||||
|
||||
def _request(self, method: str, url: str, payload: dict | None = None) -> httpx.Response:
|
||||
async def _run() -> httpx.Response:
|
||||
transport = httpx.ASGITransport(app=app)
|
||||
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
|
||||
if method == "POST":
|
||||
return await client.post(url, json=payload)
|
||||
return await client.get(url)
|
||||
|
||||
return asyncio.run(_run())
|
||||
|
||||
def _wait_task(self, task_id: str, timeout_s: float = 2.0) -> dict:
|
||||
deadline = time.time() + timeout_s
|
||||
while time.time() < deadline:
|
||||
response = self._request("GET", f"/api/tasks/{task_id}")
|
||||
body = response.json()
|
||||
if body["status"] in {"completed", "failed"}:
|
||||
return body
|
||||
time.sleep(0.02)
|
||||
self.fail("task did not reach terminal state in time")
|
||||
|
||||
def test_move_task_creation_works_with_legacy_tasks_schema(self) -> None:
|
||||
source = self.root1 / "Shared_Folders" / "Downloads" / "PLAN.md"
|
||||
source.write_text("plan", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/move",
|
||||
{
|
||||
"source": "storage1/Shared_Folders/Downloads/PLAN.md",
|
||||
"destination": "storage2/Shared_Folders/Downloads/PLAN.md",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
task = self._wait_task(response.json()["task_id"])
|
||||
self.assertEqual(task["status"], "completed")
|
||||
self.assertFalse(source.exists())
|
||||
self.assertTrue((self.root2 / "Shared_Folders" / "Downloads" / "PLAN.md").exists())
|
||||
|
||||
def test_copy_task_creation_works_with_legacy_tasks_schema(self) -> None:
|
||||
source = self.root1 / "Shared_Folders" / "Downloads" / "COPY.md"
|
||||
source.write_text("copy", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{
|
||||
"source": "storage1/Shared_Folders/Downloads/COPY.md",
|
||||
"destination": "storage2/Shared_Folders/Downloads/COPY.md",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
task = self._wait_task(response.json()["task_id"])
|
||||
self.assertEqual(task["status"], "completed")
|
||||
self.assertTrue(source.exists())
|
||||
self.assertTrue((self.root2 / "Shared_Folders" / "Downloads" / "COPY.md").exists())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -33,6 +33,8 @@ class UiSmokeGoldenTest(unittest.TestCase):
|
||||
self.assertIn('id="left-items"', body)
|
||||
self.assertIn('id="right-items"', body)
|
||||
self.assertIn('id="mkdir-btn"', body)
|
||||
self.assertIn('id="copy-btn"', body)
|
||||
self.assertIn('id="move-btn"', body)
|
||||
self.assertIn('id="left-breadcrumbs"', body)
|
||||
self.assertIn('id="right-breadcrumbs"', body)
|
||||
self.assertNotIn('id="bookmarks-panel"', body)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,31 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from backend.app.config import get_settings
|
||||
|
||||
|
||||
class ConfigTest(unittest.TestCase):
|
||||
def test_default_task_db_path_is_backend_data_absolute(self) -> None:
|
||||
original = os.environ.get("WEBMANAGER_TASK_DB_PATH")
|
||||
try:
|
||||
os.environ.pop("WEBMANAGER_TASK_DB_PATH", None)
|
||||
settings = get_settings()
|
||||
finally:
|
||||
if original is None:
|
||||
os.environ.pop("WEBMANAGER_TASK_DB_PATH", None)
|
||||
else:
|
||||
os.environ["WEBMANAGER_TASK_DB_PATH"] = original
|
||||
|
||||
resolved = Path(settings.task_db_path).resolve()
|
||||
expected = Path(__file__).resolve().parents[3] / "backend" / "data" / "tasks.db"
|
||||
self.assertEqual(resolved, expected.resolve())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
@@ -58,6 +59,33 @@ class TaskRepositoryTest(unittest.TestCase):
|
||||
}
|
||||
)
|
||||
|
||||
def test_migrates_legacy_tasks_schema_missing_source_destination(self) -> None:
|
||||
legacy_db_path = Path(self.temp_dir.name) / "legacy.db"
|
||||
conn = sqlite3.connect(legacy_db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
operation TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
repo = TaskRepository(str(legacy_db_path))
|
||||
created = repo.create_task(
|
||||
operation="move",
|
||||
source="storage1/a.txt",
|
||||
destination="storage2/a.txt",
|
||||
)
|
||||
|
||||
self.assertEqual(created["operation"], "move")
|
||||
self.assertEqual(created["source"], "storage1/a.txt")
|
||||
self.assertEqual(created["destination"], "storage2/a.txt")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user