diff --git a/project_docs/API_GOLDEN.md b/project_docs/API_GOLDEN.md index 8842f3f..600c567 100644 --- a/project_docs/API_GOLDEN.md +++ b/project_docs/API_GOLDEN.md @@ -129,6 +129,11 @@ Response shape: } ``` +Voor task-based file-actions `copy`, `move` en `duplicate` betekenen progressvelden: +- `done_items`: aantal volledig verwerkte bestanden +- `total_items`: exact aantal te verwerken bestanden in de hele task +- `current_item`: taakrelatief bestandspad als beschikbaar, anders bestandsnaam + ### `POST /api/tasks/{task_id}/cancel` Success for cancellable file-action task: ```json diff --git a/webui/backend/app/__pycache__/tasks_runner.cpython-313.pyc b/webui/backend/app/__pycache__/tasks_runner.cpython-313.pyc index 4efaa92..be7b724 100644 Binary files a/webui/backend/app/__pycache__/tasks_runner.cpython-313.pyc and b/webui/backend/app/__pycache__/tasks_runner.cpython-313.pyc differ diff --git a/webui/backend/app/services/__pycache__/copy_task_service.cpython-313.pyc b/webui/backend/app/services/__pycache__/copy_task_service.cpython-313.pyc index 6aa2d74..7c8afbc 100644 Binary files a/webui/backend/app/services/__pycache__/copy_task_service.cpython-313.pyc and b/webui/backend/app/services/__pycache__/copy_task_service.cpython-313.pyc differ diff --git a/webui/backend/app/services/__pycache__/duplicate_task_service.cpython-313.pyc b/webui/backend/app/services/__pycache__/duplicate_task_service.cpython-313.pyc index 33bfdba..449af6a 100644 Binary files a/webui/backend/app/services/__pycache__/duplicate_task_service.cpython-313.pyc and b/webui/backend/app/services/__pycache__/duplicate_task_service.cpython-313.pyc differ diff --git a/webui/backend/app/services/__pycache__/move_task_service.cpython-313.pyc b/webui/backend/app/services/__pycache__/move_task_service.cpython-313.pyc index 7319377..3cf8340 100644 Binary files a/webui/backend/app/services/__pycache__/move_task_service.cpython-313.pyc and b/webui/backend/app/services/__pycache__/move_task_service.cpython-313.pyc differ diff --git a/webui/backend/app/services/copy_task_service.py b/webui/backend/app/services/copy_task_service.py index fe24e70..b8e0d2c 100644 --- a/webui/backend/app/services/copy_task_service.py +++ b/webui/backend/app/services/copy_task_service.py @@ -45,17 +45,14 @@ class CopyTaskService: ) if item["kind"] == "directory": - self._runner.enqueue_copy_directory( - task_id=task["id"], - source=item["source_absolute"], - destination=item["destination_absolute"], - ) + self._runner.enqueue_copy_directory(task_id=task["id"], item=item) else: self._runner.enqueue_copy_file( task_id=task["id"], source=item["source_absolute"], destination=item["destination_absolute"], total_bytes=item["total_bytes"], + current_item=item["files"][0]["label"], ) return TaskCreateResponse(task_id=task["id"], status=task["status"]) @@ -94,6 +91,7 @@ class CopyTaskService: destination=destination, resolved_destination=resolved_destination_base, destination_base=destination_base, + include_root_prefix=True, ) items.append(item) @@ -118,6 +116,8 @@ class CopyTaskService: "source": item["source_absolute"], "destination": item["destination_absolute"], "kind": item["kind"], + "files": item["files"], + "directories": item["directories"], } for item in items ], @@ -130,6 +130,7 @@ class CopyTaskService: destination: str, resolved_destination: ResolvedPath | None = None, destination_base: str | None = None, + include_root_prefix: bool = False, ) -> dict: resolved_source = self._path_guard.resolve_existing_path(source) _, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source) @@ -151,9 +152,6 @@ class CopyTaskService: details={"path": source}, ) - if source_is_directory: - self._validate_directory_tree(resolved_source) - resolved_destination = resolved_destination or self._path_guard.resolve_path(destination) destination_absolute = ( resolved_destination.absolute / resolved_source.absolute.name @@ -189,6 +187,22 @@ class CopyTaskService: details={"path": source, "destination": destination_relative}, ) + if source_is_directory: + directories, files = self._build_directory_plan( + resolved_source=resolved_source, + destination_root=destination_absolute, + include_root_prefix=include_root_prefix, + ) + else: + files = [ + { + "source": str(resolved_source.absolute), + "destination": str(destination_absolute), + "label": resolved_source.absolute.name, + } + ] + directories = [] + return { "source_relative": resolved_source.relative, "destination_relative": destination_relative, @@ -196,6 +210,8 @@ class CopyTaskService: "destination_absolute": str(destination_absolute), "kind": "directory" if source_is_directory else "file", "total_bytes": int(resolved_source.absolute.stat().st_size) if source_is_file else None, + "files": files, + "directories": directories, } def _map_directory_validation(self, relative_path: str) -> None: @@ -211,10 +227,25 @@ class CopyTaskService: ) raise - def _validate_directory_tree(self, resolved_source: ResolvedPath) -> None: + def _build_directory_plan( + self, + *, + resolved_source: ResolvedPath, + destination_root: Path, + include_root_prefix: bool, + ) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + directories: list[dict[str, str]] = [ + { + "source": str(resolved_source.absolute), + "destination": str(destination_root), + } + ] + files: list[dict[str, str]] = [] for root, dirnames, filenames in os.walk(resolved_source.absolute, followlinks=False): root_path = Path(root) - for name in [*dirnames, *filenames]: + dirnames.sort(key=str.lower) + filenames.sort(key=str.lower) + for name in dirnames: entry = root_path / name if entry.is_symlink(): raise AppError( @@ -223,6 +254,42 @@ class CopyTaskService: status_code=409, details={"path": resolved_source.relative}, ) + relative = entry.relative_to(resolved_source.absolute) + directories.append( + { + "source": str(entry), + "destination": str(destination_root / relative), + } + ) + for name in filenames: + entry = root_path / name + if entry.is_symlink(): + raise AppError( + code="type_conflict", + message="Source directory must not contain symlinks", + status_code=409, + details={"path": resolved_source.relative}, + ) + relative = entry.relative_to(resolved_source.absolute) + files.append( + { + "source": str(entry), + "destination": str(destination_root / relative), + "label": self._progress_label( + top_level_name=resolved_source.absolute.name, + relative_path=relative, + include_root_prefix=include_root_prefix, + ), + } + ) + return directories, files + + @staticmethod + def _progress_label(*, top_level_name: str, relative_path: Path, include_root_prefix: bool) -> str: + relative_value = relative_path.as_posix() + if not relative_value: + return top_level_name + return f"{top_level_name}/{relative_value}" if include_root_prefix else relative_value @staticmethod def _join_destination_base(destination_base: str, name: str) -> str: diff --git a/webui/backend/app/services/duplicate_task_service.py b/webui/backend/app/services/duplicate_task_service.py index 2442e1e..52dec54 100644 --- a/webui/backend/app/services/duplicate_task_service.py +++ b/webui/backend/app/services/duplicate_task_service.py @@ -31,7 +31,11 @@ class DuplicateTaskService: items: list[dict[str, str]] = [] reserved_destinations: set[str] = set() for input_path in paths: - item = self._build_duplicate_item(input_path, reserved_destinations) + item = self._build_duplicate_item( + input_path, + reserved_destinations, + include_root_prefix=len(paths) > 1, + ) if item is None: continue reserved_destinations.add(item["destination_absolute"]) @@ -60,6 +64,8 @@ class DuplicateTaskService: "source": item["source_absolute"], "destination": item["destination_absolute"], "kind": item["kind"], + "files": item["files"], + "directories": item["directories"], } for item in items ], @@ -77,7 +83,13 @@ class DuplicateTaskService: ) raise - def _build_duplicate_item(self, source: str, reserved_destinations: set[str]) -> dict[str, str] | None: + def _build_duplicate_item( + self, + source: str, + reserved_destinations: set[str], + *, + include_root_prefix: bool, + ) -> dict[str, str] | None: resolved_source = self._path_guard.resolve_existing_path(source) _, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source) if self._should_skip_name(lexical_source.name): @@ -100,9 +112,6 @@ class DuplicateTaskService: details={"path": source}, ) - if source_is_directory: - self._validate_directory_tree(resolved_source) - destination_absolute = self._next_duplicate_destination(resolved_source.absolute, reserved_destinations) destination_relative = self._path_guard.entry_relative_path( resolved_source.alias, @@ -110,19 +119,68 @@ class DuplicateTaskService: display_style=resolved_source.display_style, ) + if source_is_directory: + directories, files = self._build_directory_plan( + resolved_source=resolved_source, + destination_root=destination_absolute, + include_root_prefix=include_root_prefix, + ) + else: + files = [ + { + "source": str(resolved_source.absolute), + "destination": str(destination_absolute), + "label": resolved_source.absolute.name, + } + ] + directories = [] + return { "source_relative": resolved_source.relative, "destination_relative": destination_relative, "source_absolute": str(resolved_source.absolute), "destination_absolute": str(destination_absolute), "kind": "directory" if source_is_directory else "file", + "files": files, + "directories": directories, } - def _validate_directory_tree(self, resolved_source: ResolvedPath) -> None: + def _build_directory_plan( + self, + *, + resolved_source: ResolvedPath, + destination_root: Path, + include_root_prefix: bool, + ) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + directories: list[dict[str, str]] = [ + { + "source": str(resolved_source.absolute), + "destination": str(destination_root), + } + ] + files: list[dict[str, str]] = [] for root, dirnames, filenames in os.walk(resolved_source.absolute, followlinks=False): dirnames[:] = [name for name in dirnames if not self._should_skip_name(name)] + dirnames.sort(key=str.lower) + filenames = sorted(filenames, key=str.lower) root_path = Path(root) - for name in [*dirnames, *filenames]: + for name in dirnames: + entry = root_path / name + if entry.is_symlink(): + raise AppError( + code="type_conflict", + message="Source directory must not contain symlinks", + status_code=409, + details={"path": resolved_source.relative}, + ) + relative = entry.relative_to(resolved_source.absolute) + directories.append( + { + "source": str(entry), + "destination": str(destination_root / relative), + } + ) + for name in filenames: if self._should_skip_name(name): continue entry = root_path / name @@ -133,6 +191,26 @@ class DuplicateTaskService: status_code=409, details={"path": resolved_source.relative}, ) + relative = entry.relative_to(resolved_source.absolute) + files.append( + { + "source": str(entry), + "destination": str(destination_root / relative), + "label": self._progress_label( + top_level_name=resolved_source.absolute.name, + relative_path=relative, + include_root_prefix=include_root_prefix, + ), + } + ) + return directories, files + + @staticmethod + def _progress_label(*, top_level_name: str, relative_path: Path, include_root_prefix: bool) -> str: + relative_value = relative_path.as_posix() + if not relative_value: + return top_level_name + return f"{top_level_name}/{relative_value}" if include_root_prefix else relative_value @classmethod def _next_duplicate_destination(cls, source: Path, reserved_destinations: set[str]) -> Path: diff --git a/webui/backend/app/services/move_task_service.py b/webui/backend/app/services/move_task_service.py index abfaa19..09bed2e 100644 --- a/webui/backend/app/services/move_task_service.py +++ b/webui/backend/app/services/move_task_service.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from pathlib import Path import uuid @@ -45,11 +46,7 @@ class MoveTaskService: ) if item["kind"] == "directory": - self._runner.enqueue_move_directory( - task_id=task["id"], - source=item["source_absolute"], - destination=item["destination_absolute"], - ) + self._runner.enqueue_move_directory(task_id=task["id"], item=item) else: self._runner.enqueue_move_file( task_id=task["id"], @@ -57,6 +54,7 @@ class MoveTaskService: destination=item["destination_absolute"], total_bytes=item["total_bytes"], same_root=item["same_root"], + current_item=item["files"][0]["label"], ) return TaskCreateResponse(task_id=task["id"], status=task["status"]) @@ -113,6 +111,7 @@ class MoveTaskService: destination=destination, resolved_destination=resolved_destination_base, destination_base=destination_base, + include_root_prefix=True, ) items.append(item) @@ -137,6 +136,8 @@ class MoveTaskService: "source": item["source_absolute"], "destination": item["destination_absolute"], "kind": item["kind"], + "files": item["files"], + "directories": item["directories"], } for item in items ], @@ -149,6 +150,7 @@ class MoveTaskService: destination: str, resolved_destination: ResolvedPath | None = None, destination_base: str | None = None, + include_root_prefix: bool = False, ) -> dict: resolved_source = self._path_guard.resolve_existing_path(source) _, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source) @@ -224,6 +226,22 @@ class MoveTaskService: details={"path": source, "destination": destination_relative}, ) + if source_is_directory: + directories, files = self._build_directory_plan( + resolved_source=resolved_source, + destination_root=destination_absolute, + include_root_prefix=include_root_prefix, + ) + else: + files = [ + { + "source": str(resolved_source.absolute), + "destination": str(destination_absolute), + "label": resolved_source.absolute.name, + } + ] + directories = [] + return { "source_relative": resolved_source.relative, "destination_relative": destination_relative, @@ -232,6 +250,8 @@ class MoveTaskService: "kind": "directory" if source_is_directory else "file", "same_root": same_root, "total_bytes": int(resolved_source.absolute.stat().st_size) if source_is_file else None, + "files": files, + "directories": directories, } def _map_directory_validation(self, relative_path: str) -> None: @@ -251,6 +271,70 @@ class MoveTaskService: def _join_destination_base(destination_base: str, name: str) -> str: return f"{destination_base.rstrip('/')}/{name}" if destination_base.rstrip("/") else f"/{name}" + def _build_directory_plan( + self, + *, + resolved_source: ResolvedPath, + destination_root: Path, + include_root_prefix: bool, + ) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + directories: list[dict[str, str]] = [ + { + "source": str(resolved_source.absolute), + "destination": str(destination_root), + } + ] + files: list[dict[str, str]] = [] + for root, dirnames, filenames in os.walk(resolved_source.absolute, followlinks=False): + root_path = Path(root) + dirnames.sort(key=str.lower) + filenames.sort(key=str.lower) + for name in dirnames: + entry = root_path / name + if entry.is_symlink(): + raise AppError( + code="type_conflict", + message="Source directory must not contain symlinks", + status_code=409, + details={"path": resolved_source.relative}, + ) + relative = entry.relative_to(resolved_source.absolute) + directories.append( + { + "source": str(entry), + "destination": str(destination_root / relative), + } + ) + for name in filenames: + entry = root_path / name + if entry.is_symlink(): + raise AppError( + code="type_conflict", + message="Source directory must not contain symlinks", + status_code=409, + details={"path": resolved_source.relative}, + ) + relative = entry.relative_to(resolved_source.absolute) + files.append( + { + "source": str(entry), + "destination": str(destination_root / relative), + "label": self._progress_label( + top_level_name=resolved_source.absolute.name, + relative_path=relative, + include_root_prefix=include_root_prefix, + ), + } + ) + return directories, files + + @staticmethod + def _progress_label(*, top_level_name: str, relative_path: Path, include_root_prefix: bool) -> str: + relative_value = relative_path.as_posix() + if not relative_value: + return top_level_name + return f"{top_level_name}/{relative_value}" if include_root_prefix else relative_value + @staticmethod def _is_nested_destination(source: Path, destination: Path) -> bool: try: diff --git a/webui/backend/app/tasks_runner.py b/webui/backend/app/tasks_runner.py index f07d1f4..e9ffe1d 100644 --- a/webui/backend/app/tasks_runner.py +++ b/webui/backend/app/tasks_runner.py @@ -16,18 +16,18 @@ class TaskRunner: self._filesystem = filesystem self._history_repository = history_repository - def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None: + def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int, current_item: str) -> None: thread = threading.Thread( target=self._run_copy_file, - args=(task_id, source, destination, total_bytes), + args=(task_id, source, destination, total_bytes, current_item), daemon=True, ) thread.start() - def enqueue_copy_directory(self, task_id: str, source: str, destination: str) -> None: + def enqueue_copy_directory(self, task_id: str, item: dict[str, object]) -> None: thread = threading.Thread( target=self._run_copy_directory, - args=(task_id, source, destination), + args=(task_id, item), daemon=True, ) thread.start() @@ -47,18 +47,19 @@ class TaskRunner: destination: str, total_bytes: int, same_root: bool, + current_item: str, ) -> None: thread = threading.Thread( target=self._run_move_file, - args=(task_id, source, destination, total_bytes, same_root), + args=(task_id, source, destination, total_bytes, same_root, current_item), daemon=True, ) thread.start() - def enqueue_move_directory(self, task_id: str, source: str, destination: str) -> None: + def enqueue_move_directory(self, task_id: str, item: dict[str, object]) -> None: thread = threading.Thread( target=self._run_move_directory, - args=(task_id, source, destination), + args=(task_id, item), daemon=True, ) thread.start() @@ -94,14 +95,16 @@ class TaskRunner: ) thread.start() - def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None: + def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int, current_item: str) -> None: if not self._repository.mark_running( task_id=task_id, done_bytes=0, total_bytes=total_bytes, - current_item=source, + done_items=0, + total_items=1, + current_item=current_item, ): - self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes) + self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes, done_items=0, total_items=1) return progress = {"done": 0} @@ -112,7 +115,9 @@ class TaskRunner: task_id=task_id, done_bytes=done_bytes, total_bytes=total_bytes, - current_item=source, + done_items=0, + total_items=1, + current_item=current_item, ) try: @@ -121,32 +126,6 @@ class TaskRunner: task_id=task_id, done_bytes=total_bytes, total_bytes=total_bytes, - ) - except OSError as exc: - self._repository.mark_failed( - task_id=task_id, - error_code="io_error", - error_message=str(exc), - failed_item=source, - done_bytes=progress["done"], - total_bytes=total_bytes, - ) - self._update_history_failed(task_id, str(exc)) - - def _run_copy_directory(self, task_id: str, source: str, destination: str) -> None: - if not self._repository.mark_running( - task_id=task_id, - done_items=0, - total_items=1, - current_item=source, - ): - self._finalize_if_already_cancelled(task_id, done_items=0, total_items=1) - return - - try: - self._filesystem.copy_directory(source=source, destination=destination) - self._complete_or_cancel_item_task( - task_id=task_id, done_items=1, total_items=1, ) @@ -156,16 +135,58 @@ class TaskRunner: error_code="io_error", error_message=str(exc), failed_item=source, - done_bytes=None, - total_bytes=None, + done_bytes=progress["done"], + total_bytes=total_bytes, done_items=0, total_items=1, ) self._update_history_failed(task_id, str(exc)) + def _run_copy_directory(self, task_id: str, item: dict[str, object]) -> None: + files = self._file_entries(item) + directories = self._directory_entries(item) + total_items = len(files) + if not self._repository.mark_running( + task_id=task_id, + done_items=0, + total_items=total_items, + current_item=files[0]["label"] if files else None, + ): + self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items) + return + + try: + completed_items = self._copy_directory_files( + directories, + files, + task_id=task_id, + completed_items=0, + total_items=total_items, + ) + if self._is_cancel_requested(task_id): + self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) + return + self._complete_or_cancel_item_task( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + ) + except OSError as exc: + self._repository.mark_failed( + task_id=task_id, + error_code="io_error", + error_message=str(exc), + failed_item=str(item["source"]), + done_bytes=None, + total_bytes=None, + done_items=self._completed_files(task_id), + total_items=total_items, + ) + self._update_history_failed(task_id, str(exc)) + def _run_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None: - total_items = len(items) - current_item = items[0]["source"] if items else None + total_items = self._total_file_count(items) + current_item = self._first_file_label(items) if not self._repository.mark_running( task_id=task_id, done_items=0, @@ -180,21 +201,12 @@ class TaskRunner: if self._is_cancel_requested(task_id): self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) return - source = item["source"] - destination = item["destination"] try: if item["kind"] == "directory": - self._filesystem.copy_directory(source=source, destination=destination) + completed_items = self._copy_directory_item(task_id, item, completed_items, total_items) else: - self._filesystem.copy_file(source=source, destination=destination) - completed_items = index + 1 - next_item = items[index + 1]["source"] if index + 1 < total_items else source - self._repository.update_progress( - task_id=task_id, - done_items=completed_items, - total_items=total_items, - current_item=next_item, - ) + file_entry = self._file_entries(item)[0] + completed_items = self._copy_single_planned_file(task_id, file_entry, completed_items, total_items) if self._is_cancel_requested(task_id): self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) return @@ -203,7 +215,7 @@ class TaskRunner: task_id=task_id, error_code="io_error", error_message=str(exc), - failed_item=source, + failed_item=str(item["source"]), done_bytes=None, total_bytes=None, done_items=completed_items, @@ -225,14 +237,17 @@ class TaskRunner: destination: str, total_bytes: int, same_root: bool, + current_item: str, ) -> None: if not self._repository.mark_running( task_id=task_id, done_bytes=0, total_bytes=total_bytes, - current_item=source, + done_items=0, + total_items=1, + current_item=current_item, ): - self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes) + self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes, done_items=0, total_items=1) return progress = {"done": 0} @@ -244,6 +259,8 @@ class TaskRunner: task_id=task_id, done_bytes=total_bytes, total_bytes=total_bytes, + done_items=1, + total_items=1, ) return @@ -253,7 +270,9 @@ class TaskRunner: task_id=task_id, done_bytes=done_bytes, total_bytes=total_bytes, - current_item=source, + done_items=0, + total_items=1, + current_item=current_item, ) self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress) @@ -262,32 +281,6 @@ class TaskRunner: task_id=task_id, done_bytes=total_bytes, total_bytes=total_bytes, - ) - except OSError as exc: - self._repository.mark_failed( - task_id=task_id, - error_code="io_error", - error_message=str(exc), - failed_item=source, - done_bytes=progress["done"], - total_bytes=total_bytes, - ) - self._update_history_failed(task_id, str(exc)) - - def _run_move_directory(self, task_id: str, source: str, destination: str) -> None: - if not self._repository.mark_running( - task_id=task_id, - done_items=0, - total_items=1, - current_item=source, - ): - self._finalize_if_already_cancelled(task_id, done_items=0, total_items=1) - return - - try: - self._filesystem.move_directory(source=source, destination=destination) - self._complete_or_cancel_item_task( - task_id=task_id, done_items=1, total_items=1, ) @@ -297,14 +290,56 @@ class TaskRunner: error_code="io_error", error_message=str(exc), failed_item=source, + done_bytes=progress["done"], + total_bytes=total_bytes, done_items=0, total_items=1, ) self._update_history_failed(task_id, str(exc)) + def _run_move_directory(self, task_id: str, item: dict[str, object]) -> None: + files = self._file_entries(item) + directories = self._directory_entries(item) + total_items = len(files) + if not self._repository.mark_running( + task_id=task_id, + done_items=0, + total_items=total_items, + current_item=files[0]["label"] if files else None, + ): + self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items) + return + + try: + completed_items = self._move_directory_files( + directories, + files, + task_id=task_id, + completed_items=0, + total_items=total_items, + ) + if self._is_cancel_requested(task_id): + self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) + return + self._complete_or_cancel_item_task( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + ) + except OSError as exc: + self._repository.mark_failed( + task_id=task_id, + error_code="io_error", + error_message=str(exc), + failed_item=str(item["source"]), + done_items=self._completed_files(task_id), + total_items=total_items, + ) + self._update_history_failed(task_id, str(exc)) + def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None: - total_items = len(items) - current_item = items[0]["source"] if items else None + total_items = self._total_file_count(items) + current_item = self._first_file_label(items) if not self._repository.mark_running( task_id=task_id, done_items=0, @@ -319,21 +354,12 @@ class TaskRunner: if self._is_cancel_requested(task_id): self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) return - source = item["source"] - destination = item["destination"] try: if item["kind"] == "directory": - self._filesystem.move_directory(source=source, destination=destination) + completed_items = self._move_directory_item(task_id, item, completed_items, total_items) else: - self._filesystem.move_file(source=source, destination=destination) - completed_items = index + 1 - next_item = items[index + 1]["source"] if index + 1 < total_items else source - self._repository.update_progress( - task_id=task_id, - done_items=completed_items, - total_items=total_items, - current_item=next_item, - ) + file_entry = self._file_entries(item)[0] + completed_items = self._move_single_planned_file(task_id, file_entry, completed_items, total_items) if self._is_cancel_requested(task_id): self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) return @@ -342,7 +368,7 @@ class TaskRunner: task_id=task_id, error_code="io_error", error_message=str(exc), - failed_item=source, + failed_item=str(item["source"]), done_bytes=None, total_bytes=None, done_items=completed_items, @@ -358,8 +384,8 @@ class TaskRunner: ) def _run_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None: - total_items = len(items) - current_item = items[0]["source"] if items else None + total_items = self._total_file_count(items) + current_item = self._first_file_label(items) if not self._repository.mark_running( task_id=task_id, done_items=0, @@ -374,31 +400,25 @@ class TaskRunner: if self._is_cancel_requested(task_id): self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) return - source = item["source"] - destination = item["destination"] try: if item["kind"] == "directory": - self._duplicate_directory(source=Path(source), destination=Path(destination)) + completed_items = self._copy_directory_item(task_id, item, completed_items, total_items, cleanup_on_failure=True) else: - self._filesystem.copy_file(source=source, destination=destination) - completed_items = index + 1 - next_item = items[index + 1]["source"] if index + 1 < total_items else source - self._repository.update_progress( - task_id=task_id, - done_items=completed_items, - total_items=total_items, - current_item=next_item, - ) + file_entry = self._file_entries(item)[0] + completed_items = self._copy_single_planned_file(task_id, file_entry, completed_items, total_items) if self._is_cancel_requested(task_id): self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items) return except OSError as exc: - self._cleanup_partial_duplicate(Path(destination)) + if item["kind"] == "directory": + self._cleanup_partial_duplicate(Path(str(item["destination"]))) + else: + self._cleanup_partial_duplicate(Path(self._file_entries(item)[0]["destination"])) self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), - failed_item=source, + failed_item=str(item["source"]), done_bytes=None, total_bytes=None, done_items=completed_items, @@ -449,40 +469,6 @@ class TaskRunner: ) self._update_history_failed(task_id, str(exc)) - def _duplicate_directory(self, source: Path, destination: Path) -> None: - destination.mkdir() - copied_directories: list[tuple[Path, Path]] = [(source, destination)] - try: - for root, dirnames, filenames in os.walk(source, topdown=True, followlinks=False): - root_path = Path(root) - target_root = destination / root_path.relative_to(source) - dirnames[:] = [name for name in dirnames if not name.startswith("._")] - - for name in dirnames: - source_dir = root_path / name - if source_dir.is_symlink(): - raise OSError("Source directory must not contain symlinks") - target_dir = target_root / name - target_dir.mkdir() - copied_directories.append((source_dir, target_dir)) - - for name in filenames: - if name.startswith("._"): - continue - source_file = root_path / name - if source_file.is_symlink(): - raise OSError("Source directory must not contain symlinks") - self._filesystem.copy_file( - source=str(source_file), - destination=str(target_root / name), - ) - - for source_dir, target_dir in reversed(copied_directories): - shutil.copystat(source_dir, target_dir, follow_symlinks=False) - except Exception: - self._cleanup_partial_duplicate(destination) - raise - def _cleanup_partial_duplicate(self, path: Path) -> None: if not path.exists(): return @@ -491,6 +477,180 @@ class TaskRunner: return path.unlink() + @staticmethod + def _file_entries(item: dict[str, object]) -> list[dict[str, str]]: + return list(item.get("files", [])) # type: ignore[arg-type] + + @staticmethod + def _directory_entries(item: dict[str, object]) -> list[dict[str, str]]: + return list(item.get("directories", [])) # type: ignore[arg-type] + + def _total_file_count(self, items: list[dict[str, object]]) -> int: + return sum(len(self._file_entries(item)) for item in items) + + def _first_file_label(self, items: list[dict[str, object]]) -> str | None: + for item in items: + files = self._file_entries(item) + if files: + return files[0]["label"] + return None + + def _completed_files(self, task_id: str) -> int: + task = self._repository.get_task(task_id) + if not task or task["done_items"] is None: + return 0 + return int(task["done_items"]) + + def _copy_single_planned_file( + self, + task_id: str, + file_entry: dict[str, str], + completed_items: int, + total_items: int, + ) -> int: + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=file_entry["label"], + ) + self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"]) + completed_items += 1 + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]), + ) + return completed_items + + def _copy_directory_item( + self, + task_id: str, + item: dict[str, object], + completed_items: int, + total_items: int, + cleanup_on_failure: bool = False, + ) -> int: + directories = self._directory_entries(item) + files = self._file_entries(item) + try: + return self._copy_directory_files(directories, files, task_id=task_id, completed_items=completed_items, total_items=total_items) + except Exception: + if cleanup_on_failure: + self._cleanup_partial_duplicate(Path(str(item["destination"]))) + raise + + def _copy_directory_files( + self, + directories: list[dict[str, str]], + files: list[dict[str, str]], + *, + task_id: str | None = None, + completed_items: int = 0, + total_items: int = 0, + ) -> int: + for directory in directories: + Path(directory["destination"]).mkdir(parents=True, exist_ok=True) + for file_entry in files: + if task_id is not None and self._is_cancel_requested(task_id): + return completed_items + if task_id is not None: + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=file_entry["label"], + ) + self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"]) + completed_items += 1 + if task_id is not None: + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]), + ) + if task_id is not None and self._is_cancel_requested(task_id): + return completed_items + for directory in reversed(directories): + shutil.copystat(Path(directory["source"]), Path(directory["destination"]), follow_symlinks=False) + return completed_items + + def _move_single_planned_file( + self, + task_id: str, + file_entry: dict[str, str], + completed_items: int, + total_items: int, + ) -> int: + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=file_entry["label"], + ) + self._filesystem.move_file(source=file_entry["source"], destination=file_entry["destination"]) + completed_items += 1 + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]), + ) + return completed_items + + def _move_directory_item( + self, + task_id: str, + item: dict[str, object], + completed_items: int, + total_items: int, + ) -> int: + return self._move_directory_files(self._directory_entries(item), self._file_entries(item), task_id=task_id, completed_items=completed_items, total_items=total_items) + + def _move_directory_files( + self, + directories: list[dict[str, str]], + files: list[dict[str, str]], + *, + task_id: str | None = None, + completed_items: int = 0, + total_items: int = 0, + ) -> int: + for directory in directories: + Path(directory["destination"]).mkdir(parents=True, exist_ok=True) + for file_entry in files: + if task_id is not None and self._is_cancel_requested(task_id): + return completed_items + if task_id is not None: + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=file_entry["label"], + ) + self._filesystem.move_file(source=file_entry["source"], destination=file_entry["destination"]) + completed_items += 1 + if task_id is not None: + self._repository.update_progress( + task_id=task_id, + done_items=completed_items, + total_items=total_items, + current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]), + ) + if task_id is not None and self._is_cancel_requested(task_id): + return completed_items + for directory in reversed(directories): + shutil.copystat(Path(directory["source"]), Path(directory["destination"]), follow_symlinks=False) + for directory in reversed(directories): + self._filesystem.delete_empty_directory(Path(directory["source"])) + return completed_items + + @staticmethod + def _next_item_label_after_completion(completed_items: int, total_items: int, current_label: str) -> str | None: + return None + def _is_cancel_requested(self, task_id: str) -> bool: task = self._repository.get_task(task_id) return bool(task) and task["status"] == "cancelling" @@ -523,18 +683,22 @@ class TaskRunner: task_id: str, done_bytes: int | None, total_bytes: int | None, + done_items: int | None = None, + total_items: int | None = None, ) -> None: if self._is_cancel_requested(task_id): - self._finalize_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes) + self._finalize_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes, done_items=done_items, total_items=total_items) return if self._repository.mark_completed( task_id=task_id, done_bytes=done_bytes, total_bytes=total_bytes, + done_items=done_items, + total_items=total_items, ): self._update_history_completed(task_id) return - self._finalize_if_already_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes) + self._finalize_if_already_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes, done_items=done_items, total_items=total_items) def _complete_or_cancel_item_task( self, diff --git a/webui/backend/tests/golden/__pycache__/test_api_copy_golden.cpython-313.pyc b/webui/backend/tests/golden/__pycache__/test_api_copy_golden.cpython-313.pyc index 0467993..12609fc 100644 Binary files a/webui/backend/tests/golden/__pycache__/test_api_copy_golden.cpython-313.pyc and b/webui/backend/tests/golden/__pycache__/test_api_copy_golden.cpython-313.pyc differ diff --git a/webui/backend/tests/golden/__pycache__/test_api_duplicate_golden.cpython-313.pyc b/webui/backend/tests/golden/__pycache__/test_api_duplicate_golden.cpython-313.pyc index a4ee20e..2fc7b7f 100644 Binary files a/webui/backend/tests/golden/__pycache__/test_api_duplicate_golden.cpython-313.pyc and b/webui/backend/tests/golden/__pycache__/test_api_duplicate_golden.cpython-313.pyc differ diff --git a/webui/backend/tests/golden/__pycache__/test_api_move_golden.cpython-313.pyc b/webui/backend/tests/golden/__pycache__/test_api_move_golden.cpython-313.pyc index ebda128..112cbe9 100644 Binary files a/webui/backend/tests/golden/__pycache__/test_api_move_golden.cpython-313.pyc and b/webui/backend/tests/golden/__pycache__/test_api_move_golden.cpython-313.pyc differ diff --git a/webui/backend/tests/golden/test_api_copy_golden.py b/webui/backend/tests/golden/test_api_copy_golden.py index f49ad47..d2afb50 100644 --- a/webui/backend/tests/golden/test_api_copy_golden.py +++ b/webui/backend/tests/golden/test_api_copy_golden.py @@ -117,6 +117,8 @@ class CopyApiGoldenTest(unittest.TestCase): detail = self._wait_task(body["task_id"]) self.assertEqual(detail["status"], "completed") + self.assertEqual(detail["done_items"], 1) + self.assertEqual(detail["total_items"], 1) self.assertEqual(detail["total_bytes"], 5) self.assertEqual(detail["done_bytes"], 5) self.assertTrue((self.root / "copy.txt").exists()) @@ -159,8 +161,8 @@ class CopyApiGoldenTest(unittest.TestCase): self.assertEqual(response.status_code, 202) detail = self._wait_task(response.json()["task_id"]) self.assertEqual(detail["status"], "completed") - self.assertEqual(detail["done_items"], 1) - self.assertEqual(detail["total_items"], 1) + self.assertEqual(detail["done_items"], 2) + self.assertEqual(detail["total_items"], 2) self.assertTrue((self.root / "photos-copy").is_dir()) self.assertEqual((self.root / "photos-copy" / "cover.jpg").read_text(encoding="utf-8"), "img") self.assertEqual((self.root / "photos-copy" / "nested" / "a.txt").read_text(encoding="utf-8"), "nested") @@ -232,7 +234,7 @@ class CopyApiGoldenTest(unittest.TestCase): task_id = response.json()["task_id"] self.assertTrue(blocking_fs.entered.wait(timeout=2.0)) running = self._wait_for_status(task_id, {"running"}) - self.assertEqual(running["current_item"], str(self.root / "a.txt")) + self.assertEqual(running["current_item"], "a.txt") cancel_response = self._request("POST", f"/api/tasks/{task_id}/cancel") self.assertEqual(cancel_response.status_code, 200) diff --git a/webui/backend/tests/golden/test_api_duplicate_golden.py b/webui/backend/tests/golden/test_api_duplicate_golden.py index 22a5a4d..8ed0b16 100644 --- a/webui/backend/tests/golden/test_api_duplicate_golden.py +++ b/webui/backend/tests/golden/test_api_duplicate_golden.py @@ -128,8 +128,8 @@ class DuplicateApiGoldenTest(unittest.TestCase): self.assertEqual(response.status_code, 202) detail = self._wait_task(response.json()["task_id"]) self.assertEqual(detail["status"], "completed") - self.assertEqual(detail["done_items"], 1) - self.assertEqual(detail["total_items"], 1) + self.assertEqual(detail["done_items"], 2) + self.assertEqual(detail["total_items"], 2) self.assertTrue((self.root / "Folder copy").is_dir()) self.assertEqual((self.root / "Folder copy" / "alpha.txt").read_text(encoding="utf-8"), "A") self.assertEqual((self.root / "Folder copy" / "nested" / "beta.txt").read_text(encoding="utf-8"), "B") @@ -171,7 +171,7 @@ class DuplicateApiGoldenTest(unittest.TestCase): task_id = response.json()["task_id"] self.assertTrue(blocking_fs.entered.wait(timeout=2.0)) running = self._wait_for_status(task_id, {"running"}) - self.assertEqual(running["current_item"], str(self.root / "a.txt")) + self.assertEqual(running["current_item"], "a.txt") cancel_response = self._request("POST", f"/api/tasks/{task_id}/cancel") self.assertEqual(cancel_response.status_code, 200) diff --git a/webui/backend/tests/golden/test_api_move_golden.py b/webui/backend/tests/golden/test_api_move_golden.py index fb9ffcd..33bd55a 100644 --- a/webui/backend/tests/golden/test_api_move_golden.py +++ b/webui/backend/tests/golden/test_api_move_golden.py @@ -29,7 +29,8 @@ class FailingDeleteFilesystemAdapter(FilesystemAdapter): class FailingBatchFilesystemAdapter(FilesystemAdapter): def move_file(self, source: str, destination: str) -> None: - if Path(source).name == "fail-file.txt": + source_path = Path(source) + if source_path.name == "fail-file.txt" or "fail-dir" in source_path.parts: raise OSError("forced batch move failure") super().move_file(source, destination) @@ -128,6 +129,8 @@ class MoveApiGoldenTest(unittest.TestCase): detail = self._wait_task(body["task_id"]) self.assertEqual(detail["status"], "completed") + self.assertEqual(detail["done_items"], 1) + self.assertEqual(detail["total_items"], 1) self.assertTrue((self.root1 / "moved.txt").exists()) self.assertFalse(src.exists()) @@ -269,7 +272,7 @@ class MoveApiGoldenTest(unittest.TestCase): task_id = response.json()["task_id"] self.assertTrue(blocking_fs.entered.wait(timeout=2.0)) running = self._wait_for_status(task_id, {"running"}) - self.assertEqual(running["current_item"], str(self.root1 / "a.txt")) + self.assertEqual(running["current_item"], "a.txt") cancel_response = self._request("POST", f"/api/tasks/{task_id}/cancel") self.assertEqual(cancel_response.status_code, 200) @@ -387,8 +390,10 @@ class MoveApiGoldenTest(unittest.TestCase): def test_move_batch_runtime_io_error_failed_task_shape(self) -> None: first = self.root1 / "ok-dir" first.mkdir() + (first / "a.txt").write_text("A", encoding="utf-8") second = self.root1 / "fail-dir" second.mkdir() + (second / "b.txt").write_text("B", encoding="utf-8") target = self.root1 / "target" target.mkdir()