from __future__ import annotations

import uuid
from datetime import datetime, timezone
from pathlib import Path

from backend.app.api.errors import AppError
from backend.app.api.schemas import TaskCreateResponse
from backend.app.db.history_repository import HistoryRepository
from backend.app.db.task_repository import TaskRepository
from backend.app.security.path_guard import PathGuard
from backend.app.tasks_runner import TaskRunner


class CopyTaskService:
    """Validate single-file copy requests and queue them on the task runner."""

    def __init__(
        self,
        path_guard: PathGuard,
        repository: TaskRepository,
        runner: TaskRunner,
        history_repository: HistoryRepository | None = None,
    ) -> None:
        """Store the collaborators used by :meth:`create_copy_task`.

        Args:
            path_guard: Resolves and validates the user-supplied paths.
            repository: Persists the task row via ``create_task``.
            runner: Receives the copy job via ``enqueue_copy_file``.
            history_repository: Optional sink for history entries; when it is
                ``None`` no history is recorded.
        """
        self._path_guard = path_guard
        self._repository = repository
        self._runner = runner
        self._history_repository = history_repository

    def create_copy_task(self, source: str, destination: str) -> TaskCreateResponse:
        """Validate a copy request, persist it as a task and enqueue it.

        Args:
            source: User-supplied path of the file to copy.
            destination: User-supplied path the copy should be written to.

        Returns:
            A ``TaskCreateResponse`` built from the ``id`` and ``status`` of
            the task returned by the task repository.

        Raises:
            AppError: ``type_conflict`` (409) when the source is a symlink or
                not a file, or when the destination parent is not a
                directory; ``already_exists`` (409) when the destination path
                exists. Any other ``AppError`` raised by the collaborators
                propagates unchanged. Every ``AppError`` is recorded as a
                ``failed`` history entry before being re-raised.
        """
        try:
            resolved_source = self._resolve_source_file(source)
            resolved_destination = self._resolve_free_destination(destination)

            total_bytes = int(resolved_source.absolute.stat().st_size)
            task_id = str(uuid.uuid4())
            task = self._repository.create_task(
                operation="copy",
                source=resolved_source.relative,
                destination=resolved_destination.relative,
                task_id=task_id,
            )
            self._record_history(
                entry_id=task_id,
                operation="copy",
                status="queued",
                source=resolved_source.relative,
                destination=resolved_destination.relative,
            )
            # NOTE(review): if enqueueing raises AppError, the handler below
            # writes an additional "failed" entry (without entry_id) next to
            # the "queued" one recorded above -- confirm this is intended.
            self._runner.enqueue_copy_file(
                task_id=task["id"],
                source=str(resolved_source.absolute),
                destination=str(resolved_destination.absolute),
                total_bytes=total_bytes,
            )
            return TaskCreateResponse(task_id=task["id"], status=task["status"])
        except AppError as exc:
            # Failures are logged with the raw user input because resolution
            # may be the very step that failed.
            self._record_history(
                operation="copy",
                status="failed",
                source=source,
                destination=destination,
                error_code=exc.code,
                error_message=exc.message,
                finished_at=self._now_iso(),
            )
            raise

    def _resolve_source_file(self, source: str):
        """Resolve ``source`` and reject symlinks and non-file entries."""
        resolved_source = self._path_guard.resolve_existing_path(source)
        # Only the third item of the returned tuple is used; presumably it is
        # the path as written, before symlinks are followed -- confirm
        # against PathGuard.resolve_lexical_path.
        _, _, lexical_source, _ = self._path_guard.resolve_lexical_path(source)
        if lexical_source.is_symlink():
            raise AppError(
                code="type_conflict",
                message="Source must be a regular file",
                status_code=409,
                details={"path": source},
            )
        if not resolved_source.absolute.is_file():
            raise AppError(
                code="type_conflict",
                message="Source must be a file",
                status_code=409,
                details={"path": source},
            )
        return resolved_source

    def _resolve_free_destination(self, destination: str):
        """Resolve ``destination``; require a directory parent and a free path."""
        resolved_destination = self._path_guard.resolve_path(destination)
        destination_parent = resolved_destination.absolute.parent
        parent_relative = self._path_guard.entry_relative_path(
            resolved_destination.alias,
            destination_parent,
            display_style=resolved_destination.display_style,
        )
        self._map_directory_validation(parent_relative)
        if resolved_destination.absolute.exists():
            raise AppError(
                code="already_exists",
                message="Target path already exists",
                status_code=409,
                details={"path": resolved_destination.relative},
            )
        return resolved_destination

    def _map_directory_validation(self, relative_path: str) -> None:
        """Check that ``relative_path`` resolves as a directory.

        A ``path_type_conflict`` error from the path guard is translated into
        a ``type_conflict`` (409) error that keeps the original details; any
        other ``AppError`` is re-raised unchanged.
        """
        try:
            self._path_guard.resolve_directory_path(relative_path)
        except AppError as exc:
            if exc.code == "path_type_conflict":
                raise AppError(
                    code="type_conflict",
                    message="Destination parent is not a directory",
                    status_code=409,
                    details=exc.details,
                ) from exc
            raise

    def _record_history(self, **kwargs: object) -> None:
        """Forward ``kwargs`` to the history repository, if one is configured."""
        # Compare against None explicitly: a repository object that happens to
        # be falsy (e.g. defines __len__ and is empty) must still be written to.
        if self._history_repository is not None:
            self._history_repository.create_entry(**kwargs)

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time, truncated to seconds, as ISO 8601 with ``Z``."""
        now = datetime.now(timezone.utc).replace(microsecond=0)
        return now.isoformat().replace("+00:00", "Z")