from __future__ import annotations import os import uuid from datetime import datetime, timezone from pathlib import Path from backend.app.api.errors import AppError from backend.app.api.schemas import TaskCreateResponse from backend.app.db.history_repository import HistoryRepository from backend.app.db.task_repository import TaskRepository from backend.app.security.path_guard import PathGuard, ResolvedPath from backend.app.tasks_runner import TaskRunner class DuplicateTaskService: def __init__(self, path_guard: PathGuard, repository: TaskRepository, runner: TaskRunner, history_repository: HistoryRepository | None = None): self._path_guard = path_guard self._repository = repository self._runner = runner self._history_repository = history_repository def create_duplicate_task(self, paths: list[str] | None) -> TaskCreateResponse: if not paths: raise AppError( code="invalid_request", message="At least 1 path is required", status_code=400, ) try: items: list[dict[str, str]] = [] reserved_destinations: set[str] = set() for input_path in paths: item = self._build_duplicate_item( input_path, reserved_destinations, include_root_prefix=len(paths) > 1, ) if item is None: continue reserved_destinations.add(item["destination_absolute"]) items.append(item) source_summary = self._source_summary(paths, items) destination_summary = self._destination_summary(items) task_id = str(uuid.uuid4()) task = self._repository.create_task( operation="duplicate", source=source_summary, destination=destination_summary, task_id=task_id, ) self._record_history( entry_id=task_id, operation="duplicate", status="queued", source=source_summary, destination=destination_summary, ) self._runner.enqueue_duplicate_batch( task_id=task["id"], items=[ { "source": item["source_absolute"], "destination": item["destination_absolute"], "kind": item["kind"], "files": item["files"], "directories": item["directories"], } for item in items ], ) return TaskCreateResponse(task_id=task["id"], status=task["status"]) except AppError as exc: self._record_history( operation="duplicate", status="failed", source=paths[0] if len(paths) == 1 else f"{len(paths)} items", destination="same directory", error_code=exc.code, error_message=exc.message, finished_at=self._now_iso(), ) raise def _build_duplicate_item( self, source: str, reserved_destinations: set[str], *, include_root_prefix: bool, ) -> dict[str, str] | None: resolved_source = self._path_guard.resolve_existing_path(source) _, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source) if self._should_skip_name(lexical_source.name): return None if lexical_source.is_symlink(): raise AppError( code="type_conflict", message="Source must not be a symlink", status_code=409, details={"path": source}, ) source_is_file = resolved_source.absolute.is_file() source_is_directory = resolved_source.absolute.is_dir() if not source_is_file and not source_is_directory: raise AppError( code="type_conflict", message="Unsupported source path type", status_code=409, details={"path": source}, ) destination_absolute = self._next_duplicate_destination(resolved_source.absolute, reserved_destinations) destination_relative = self._path_guard.entry_relative_path( resolved_source.alias, destination_absolute, display_style=resolved_source.display_style, ) if source_is_directory: directories, files = self._build_directory_plan( resolved_source=resolved_source, destination_root=destination_absolute, include_root_prefix=include_root_prefix, ) else: files = [ { "source": str(resolved_source.absolute), "destination": str(destination_absolute), "label": resolved_source.absolute.name, } ] directories = [] return { "source_relative": resolved_source.relative, "destination_relative": destination_relative, "source_absolute": str(resolved_source.absolute), "destination_absolute": str(destination_absolute), "kind": "directory" if source_is_directory else "file", "files": files, "directories": directories, } def _build_directory_plan( self, *, resolved_source: ResolvedPath, destination_root: Path, include_root_prefix: bool, ) -> tuple[list[dict[str, str]], list[dict[str, str]]]: directories: list[dict[str, str]] = [ { "source": str(resolved_source.absolute), "destination": str(destination_root), } ] files: list[dict[str, str]] = [] for root, dirnames, filenames in os.walk(resolved_source.absolute, followlinks=False): dirnames[:] = [name for name in dirnames if not self._should_skip_name(name)] dirnames.sort(key=str.lower) filenames = sorted(filenames, key=str.lower) root_path = Path(root) for name in dirnames: entry = root_path / name if entry.is_symlink(): raise AppError( code="type_conflict", message="Source directory must not contain symlinks", status_code=409, details={"path": resolved_source.relative}, ) relative = entry.relative_to(resolved_source.absolute) directories.append( { "source": str(entry), "destination": str(destination_root / relative), } ) for name in filenames: if self._should_skip_name(name): continue entry = root_path / name if entry.is_symlink(): raise AppError( code="type_conflict", message="Source directory must not contain symlinks", status_code=409, details={"path": resolved_source.relative}, ) relative = entry.relative_to(resolved_source.absolute) files.append( { "source": str(entry), "destination": str(destination_root / relative), "label": self._progress_label( top_level_name=resolved_source.absolute.name, relative_path=relative, include_root_prefix=include_root_prefix, ), } ) return directories, files @staticmethod def _progress_label(*, top_level_name: str, relative_path: Path, include_root_prefix: bool) -> str: relative_value = relative_path.as_posix() if not relative_value: return top_level_name return f"{top_level_name}/{relative_value}" if include_root_prefix else relative_value @classmethod def _next_duplicate_destination(cls, source: Path, reserved_destinations: set[str]) -> Path: parent = source.parent candidate_index = 1 while True: candidate_name = cls._duplicate_name(source.name, source.is_file(), candidate_index) candidate = parent / candidate_name if not candidate.exists() and str(candidate) not in reserved_destinations: return candidate candidate_index += 1 @classmethod def _duplicate_name(cls, original_name: str, is_file: bool, index: int) -> str: if not is_file: suffix = "" base_name = original_name else: suffixes = Path(original_name).suffixes suffix = "".join(suffixes) base_name = original_name[: -len(suffix)] if suffix else original_name copy_suffix = " copy" if index == 1 else f" copy {index}" return f"{base_name}{copy_suffix}{suffix}" @staticmethod def _should_skip_name(name: str) -> bool: return name.startswith("._") @staticmethod def _source_summary(paths: list[str], items: list[dict[str, str]]) -> str: if len(paths) == 1: return paths[0] if not items: return "0 items" return DuplicateTaskService._items_label(len(items)) @staticmethod def _destination_summary(items: list[dict[str, str]]) -> str: if len(items) == 1: return items[0]["destination_relative"] return "same directory" @staticmethod def _items_label(count: int) -> str: return "1 item" if count == 1 else f"{count} items" def _record_history(self, **kwargs) -> None: if self._history_repository: self._history_repository.create_entry(**kwargs) @staticmethod def _now_iso() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")