feat: contextmenu copy folders toegevoegd
This commit is contained in:
@@ -25,6 +25,9 @@ class FailingFilesystemAdapter(FilesystemAdapter):
|
||||
def copy_file(self, source: str, destination: str, on_progress: callable | None = None) -> None:
|
||||
raise OSError("forced copy failure")
|
||||
|
||||
def copy_directory(self, source: str, destination: str) -> None:
|
||||
raise OSError("forced copy failure")
|
||||
|
||||
|
||||
class CopyApiGoldenTest(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
@@ -96,6 +99,96 @@ class CopyApiGoldenTest(unittest.TestCase):
|
||||
self.assertTrue((self.root / "copy.txt").exists())
|
||||
self.assertEqual((self.root / "copy.txt").read_text(encoding="utf-8"), "hello")
|
||||
|
||||
def test_copy_batch_multi_file_success(self) -> None:
|
||||
(self.root / "a.txt").write_text("A", encoding="utf-8")
|
||||
(self.root / "b.txt").write_text("B", encoding="utf-8")
|
||||
(self.root / "dest").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{
|
||||
"sources": ["storage1/a.txt", "storage1/b.txt"],
|
||||
"destination_base": "storage1/dest",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
detail = self._wait_task(response.json()["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertEqual(detail["done_items"], 2)
|
||||
self.assertEqual(detail["total_items"], 2)
|
||||
self.assertEqual((self.root / "dest" / "a.txt").read_text(encoding="utf-8"), "A")
|
||||
self.assertEqual((self.root / "dest" / "b.txt").read_text(encoding="utf-8"), "B")
|
||||
|
||||
def test_copy_single_directory_success(self) -> None:
|
||||
src = self.root / "photos"
|
||||
(src / "nested").mkdir(parents=True)
|
||||
(src / "cover.jpg").write_text("img", encoding="utf-8")
|
||||
(src / "nested" / "a.txt").write_text("nested", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/photos", "destination": "storage1/photos-copy"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
detail = self._wait_task(response.json()["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertEqual(detail["done_items"], 1)
|
||||
self.assertEqual(detail["total_items"], 1)
|
||||
self.assertTrue((self.root / "photos-copy").is_dir())
|
||||
self.assertEqual((self.root / "photos-copy" / "cover.jpg").read_text(encoding="utf-8"), "img")
|
||||
self.assertEqual((self.root / "photos-copy" / "nested" / "a.txt").read_text(encoding="utf-8"), "nested")
|
||||
|
||||
def test_copy_batch_multi_directory_success(self) -> None:
|
||||
(self.root / "dir1" / "sub").mkdir(parents=True)
|
||||
(self.root / "dir2").mkdir()
|
||||
(self.root / "dir1" / "sub" / "a.txt").write_text("A", encoding="utf-8")
|
||||
(self.root / "dir2" / "b.txt").write_text("B", encoding="utf-8")
|
||||
(self.root / "dest").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{
|
||||
"sources": ["storage1/dir1", "storage1/dir2"],
|
||||
"destination_base": "storage1/dest",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
detail = self._wait_task(response.json()["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertEqual(detail["done_items"], 2)
|
||||
self.assertEqual(detail["total_items"], 2)
|
||||
self.assertEqual((self.root / "dest" / "dir1" / "sub" / "a.txt").read_text(encoding="utf-8"), "A")
|
||||
self.assertEqual((self.root / "dest" / "dir2" / "b.txt").read_text(encoding="utf-8"), "B")
|
||||
|
||||
def test_copy_batch_mixed_file_and_directory_success(self) -> None:
|
||||
(self.root / "file.txt").write_text("F", encoding="utf-8")
|
||||
(self.root / "docs" / "nested").mkdir(parents=True)
|
||||
(self.root / "docs" / "nested" / "note.txt").write_text("N", encoding="utf-8")
|
||||
(self.root / "dest").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{
|
||||
"sources": ["storage1/file.txt", "storage1/docs"],
|
||||
"destination_base": "storage1/dest",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
detail = self._wait_task(response.json()["task_id"])
|
||||
self.assertEqual(detail["status"], "completed")
|
||||
self.assertEqual(detail["done_items"], 2)
|
||||
self.assertEqual(detail["total_items"], 2)
|
||||
self.assertEqual((self.root / "dest" / "file.txt").read_text(encoding="utf-8"), "F")
|
||||
self.assertEqual((self.root / "dest" / "docs" / "nested" / "note.txt").read_text(encoding="utf-8"), "N")
|
||||
|
||||
def test_copy_source_not_found(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
@@ -115,18 +208,6 @@ class CopyApiGoldenTest(unittest.TestCase):
|
||||
},
|
||||
)
|
||||
|
||||
def test_copy_source_is_directory_type_conflict(self) -> None:
|
||||
(self.root / "dir").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/dir", "destination": "storage1/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "type_conflict")
|
||||
|
||||
def test_copy_destination_exists_already_exists(self) -> None:
|
||||
(self.root / "source.txt").write_text("x", encoding="utf-8")
|
||||
(self.root / "exists.txt").write_text("y", encoding="utf-8")
|
||||
@@ -149,6 +230,38 @@ class CopyApiGoldenTest(unittest.TestCase):
|
||||
},
|
||||
)
|
||||
|
||||
def test_copy_directory_destination_exists_already_exists(self) -> None:
|
||||
(self.root / "src").mkdir()
|
||||
(self.root / "src" / "a.txt").write_text("x", encoding="utf-8")
|
||||
(self.root / "exists").mkdir()
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/src", "destination": "storage1/exists"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "already_exists")
|
||||
|
||||
def test_copy_batch_destination_exists_already_exists(self) -> None:
|
||||
(self.root / "a.txt").write_text("A", encoding="utf-8")
|
||||
(self.root / "dest").mkdir()
|
||||
(self.root / "dest" / "a.txt").write_text("exists", encoding="utf-8")
|
||||
(self.root / "b.txt").write_text("B", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{
|
||||
"sources": ["storage1/a.txt", "storage1/b.txt"],
|
||||
"destination_base": "storage1/dest",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 409)
|
||||
self.assertEqual(response.json()["error"]["code"], "already_exists")
|
||||
|
||||
def test_copy_traversal_source(self) -> None:
|
||||
response = self._request(
|
||||
"POST",
|
||||
@@ -171,6 +284,31 @@ class CopyApiGoldenTest(unittest.TestCase):
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "path_traversal_detected")
|
||||
|
||||
def test_copy_invalid_root_alias(self) -> None:
|
||||
(self.root / "source.txt").write_text("x", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/source.txt", "destination": "unknown/out.txt"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertEqual(response.json()["error"]["code"], "invalid_root_alias")
|
||||
|
||||
def test_copy_destination_inside_directory_source_blocked(self) -> None:
|
||||
(self.root / "src").mkdir()
|
||||
(self.root / "src" / "a.txt").write_text("x", encoding="utf-8")
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/files/copy",
|
||||
{"source": "storage1/src", "destination": "storage1/src/child"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(response.json()["error"]["code"], "invalid_request")
|
||||
|
||||
def test_copy_source_symlink_rejected(self) -> None:
|
||||
target = self.root / "real.txt"
|
||||
target.write_text("x", encoding="utf-8")
|
||||
|
||||
Reference in New Issue
Block a user