from __future__ import annotations from pathlib import Path from backend.app.api.errors import AppError from backend.app.api.schemas import DeleteResponse, MkdirResponse, RenameResponse, ViewResponse from backend.app.fs.filesystem_adapter import FilesystemAdapter from backend.app.security.path_guard import PathGuard TEXT_PREVIEW_MAX_BYTES = 256 * 1024 TEXT_CONTENT_TYPES = { ".txt": "text/plain", ".log": "text/plain", ".md": "text/markdown", ".yml": "text/yaml", ".yaml": "text/yaml", ".json": "application/json", ".js": "text/javascript", ".css": "text/css", ".html": "text/html", } SPECIAL_TEXT_FILENAMES = { "dockerfile": "text/plain", "containerfile": "text/plain", } class FileOpsService: def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter): self._path_guard = path_guard self._filesystem = filesystem def mkdir(self, parent_path: str, name: str) -> MkdirResponse: resolved_parent = self._path_guard.resolve_directory_path(parent_path) safe_name = self._path_guard.validate_name(name, field="name") target_relative = self._join_relative(resolved_parent.relative, safe_name) resolved_target = self._path_guard.resolve_path(target_relative) if resolved_target.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) try: self._filesystem.make_directory(resolved_target.absolute) except FileExistsError: raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return MkdirResponse(path=resolved_target.relative) def rename(self, path: str, new_name: str) -> RenameResponse: resolved_source = self._path_guard.resolve_existing_path(path) safe_name = self._path_guard.validate_name(new_name, field="new_name") parent_relative = self._path_guard.entry_relative_path(resolved_source.alias, resolved_source.absolute.parent) target_relative = self._join_relative(parent_relative, safe_name) resolved_target = self._path_guard.resolve_path(target_relative) if resolved_target.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) try: self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute) except FileNotFoundError: raise AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": path}, ) except FileExistsError: raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return RenameResponse(path=resolved_target.relative) def delete(self, path: str) -> DeleteResponse: resolved_target = self._path_guard.resolve_existing_path(path) try: if resolved_target.absolute.is_file(): self._filesystem.delete_file(resolved_target.absolute) elif resolved_target.absolute.is_dir(): if not self._filesystem.is_directory_empty(resolved_target.absolute): raise AppError( code="directory_not_empty", message="Directory is not empty", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.delete_empty_directory(resolved_target.absolute) else: raise AppError( code="type_conflict", message="Unsupported path type for delete", status_code=409, details={"path": resolved_target.relative}, ) except AppError: raise except FileNotFoundError: raise AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": path}, ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return DeleteResponse(path=resolved_target.relative) def view(self, path: str) -> ViewResponse: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for view", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for preview", status_code=409, details={"path": resolved_target.relative}, ) try: preview = self._filesystem.read_text_preview( resolved_target.absolute, max_bytes=TEXT_PREVIEW_MAX_BYTES, encoding="utf-8", ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return ViewResponse( path=resolved_target.relative, name=resolved_target.absolute.name, content_type=content_type, encoding="utf-8", truncated=preview["truncated"], size=preview["size"], content=preview["content"], ) @staticmethod def _join_relative(base: str, name: str) -> str: return f"{base}/{name}" if base else name @staticmethod def _content_type_for(path: Path) -> str | None: special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower()) if special_name: return special_name return TEXT_CONTENT_TYPES.get(path.suffix.lower())