from __future__ import annotations from pathlib import Path from backend.app.api.errors import AppError from backend.app.api.schemas import DeleteResponse, MkdirResponse, RenameResponse, SaveResponse, ViewResponse from backend.app.db.history_repository import HistoryRepository from backend.app.fs.filesystem_adapter import FilesystemAdapter from backend.app.security.path_guard import PathGuard TEXT_PREVIEW_MAX_BYTES = 256 * 1024 TEXT_EDIT_MAX_BYTES = 256 * 1024 TEXT_CONTENT_TYPES = { ".txt": "text/plain", ".log": "text/plain", ".md": "text/markdown", ".yml": "text/yaml", ".yaml": "text/yaml", ".json": "application/json", ".js": "text/javascript", ".css": "text/css", ".html": "text/html", } SPECIAL_TEXT_FILENAMES = { "dockerfile": "text/plain", "containerfile": "text/plain", } class FileOpsService: def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None): self._path_guard = path_guard self._filesystem = filesystem self._history_repository = history_repository def mkdir(self, parent_path: str, name: str) -> MkdirResponse: try: resolved_parent = self._path_guard.resolve_directory_path(parent_path) safe_name = self._path_guard.validate_name(name, field="name") target_relative = self._join_relative(resolved_parent.relative, safe_name) resolved_target = self._path_guard.resolve_path(target_relative) if resolved_target.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.make_directory(resolved_target.absolute) self._record_history(operation="mkdir", status="completed", path=resolved_target.relative, finished_at=self._now_iso()) return MkdirResponse(path=resolved_target.relative) except FileExistsError: error = AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": self._join_relative(parent_path, name)}, ) self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error) raise error except AppError as exc: self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=exc) raise except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error) raise error def rename(self, path: str, new_name: str) -> RenameResponse: try: resolved_source = self._path_guard.resolve_existing_path(path) safe_name = self._path_guard.validate_name(new_name, field="new_name") parent_relative = self._path_guard.entry_relative_path( resolved_source.alias, resolved_source.absolute.parent, display_style=resolved_source.display_style, ) target_relative = self._join_relative(parent_relative, safe_name) resolved_target = self._path_guard.resolve_path(target_relative) if resolved_target.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute) self._record_history( operation="rename", status="completed", source=path, destination=resolved_target.relative, path=resolved_target.relative, finished_at=self._now_iso(), ) return RenameResponse(path=resolved_target.relative) except FileNotFoundError: error = AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": path}, ) self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error) raise error except FileExistsError: error = AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": new_name}, ) self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error) raise error except AppError as exc: self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=exc) raise except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error) raise error def delete(self, path: str) -> DeleteResponse: try: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_file(): self._filesystem.delete_file(resolved_target.absolute) elif resolved_target.absolute.is_dir(): if not self._filesystem.is_directory_empty(resolved_target.absolute): raise AppError( code="directory_not_empty", message="Directory is not empty", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.delete_empty_directory(resolved_target.absolute) else: raise AppError( code="type_conflict", message="Unsupported path type for delete", status_code=409, details={"path": resolved_target.relative}, ) self._record_history(operation="delete", status="completed", path=resolved_target.relative, finished_at=self._now_iso()) return DeleteResponse(path=resolved_target.relative) except AppError as exc: self._record_history_error(operation="delete", path=path, error=exc) raise except FileNotFoundError: error = AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": path}, ) self._record_history_error(operation="delete", path=path, error=error) raise error except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error(operation="delete", path=path, error=error) raise error def view(self, path: str, for_edit: bool = False) -> ViewResponse: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for view", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for preview", status_code=409, details={"path": resolved_target.relative}, ) if for_edit and resolved_target.absolute.stat().st_size > TEXT_EDIT_MAX_BYTES: raise AppError( code="file_too_large", message="File is too large for edit", status_code=409, details={"path": resolved_target.relative}, ) try: preview = self._filesystem.read_text_preview( resolved_target.absolute, max_bytes=TEXT_PREVIEW_MAX_BYTES, encoding="utf-8", ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return ViewResponse( path=resolved_target.relative, name=resolved_target.absolute.name, content_type=content_type, encoding="utf-8", truncated=preview["truncated"], size=preview["size"], modified=preview["modified"], content=preview["content"], ) def save(self, path: str, content: str, expected_modified: str) -> SaveResponse: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for save", status_code=409, details={"path": resolved_target.relative}, ) if self._content_type_for(resolved_target.absolute) is None: raise AppError( code="unsupported_type", message="File type is not supported for edit", status_code=409, details={"path": resolved_target.relative}, ) if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES: raise AppError( code="file_too_large", message="File is too large for edit", status_code=409, details={"path": resolved_target.relative}, ) current_modified = self._filesystem.modified_iso(resolved_target.absolute) if current_modified != expected_modified: raise AppError( code="conflict", message="File changed since it was opened", status_code=409, details={"path": resolved_target.relative}, ) try: saved = self._filesystem.write_text_file( resolved_target.absolute, content=content, encoding="utf-8", ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return SaveResponse( path=resolved_target.relative, size=saved["size"], modified=saved["modified"], ) @staticmethod def _join_relative(base: str, name: str) -> str: return f"{base}/{name}" if base else name @staticmethod def _content_type_for(path: Path) -> str | None: special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower()) if special_name: return special_name return TEXT_CONTENT_TYPES.get(path.suffix.lower()) def _record_history( self, *, operation: str, status: str, source: str | None = None, destination: str | None = None, path: str | None = None, error_code: str | None = None, error_message: str | None = None, finished_at: str | None = None, ) -> None: if not self._history_repository: return self._history_repository.create_entry( operation=operation, status=status, source=source, destination=destination, path=path, error_code=error_code, error_message=error_message, finished_at=finished_at, ) def _record_history_error( self, *, operation: str, error: AppError, source: str | None = None, destination: str | None = None, path: str | None = None, ) -> None: self._record_history( operation=operation, status="failed", source=source, destination=destination, path=path, error_code=error.code, error_message=error.message, finished_at=self._now_iso(), ) @staticmethod def _now_iso() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")