from __future__ import annotations from pathlib import Path from backend.app.api.errors import AppError from backend.app.api.schemas import DeleteResponse, FileInfoResponse, MkdirResponse, RenameResponse, SaveResponse, UploadResponse, ViewResponse from backend.app.db.history_repository import HistoryRepository from backend.app.fs.filesystem_adapter import FilesystemAdapter from backend.app.security.path_guard import PathGuard TEXT_PREVIEW_MAX_BYTES = 256 * 1024 TEXT_EDIT_MAX_BYTES = 256 * 1024 TEXT_CONTENT_TYPES = { ".txt": "text/plain", ".log": "text/plain", ".md": "text/markdown", ".yml": "text/yaml", ".yaml": "text/yaml", ".json": "application/json", ".js": "text/javascript", ".py": "text/x-python", ".css": "text/css", ".html": "text/html", } SPECIAL_TEXT_FILENAMES = { "dockerfile": "text/plain", "containerfile": "text/plain", } THUMBNAIL_CONTENT_TYPES = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp", } IMAGE_CONTENT_TYPES = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp", ".gif": "image/gif", ".bmp": "image/bmp", ".avif": "image/avif", } VIDEO_CONTENT_TYPES = { ".mp4": "video/mp4", ".mkv": "video/x-matroska", } PDF_CONTENT_TYPES = { ".pdf": "application/pdf", } class FileOpsService: def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None): self._path_guard = path_guard self._filesystem = filesystem self._history_repository = history_repository def mkdir(self, parent_path: str, name: str) -> MkdirResponse: try: resolved_parent = self._path_guard.resolve_directory_path(parent_path) safe_name = self._path_guard.validate_name(name, field="name") target_relative = self._join_relative(resolved_parent.relative, safe_name) resolved_target = self._path_guard.resolve_path(target_relative) if resolved_target.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.make_directory(resolved_target.absolute) self._record_history(operation="mkdir", status="completed", path=resolved_target.relative, finished_at=self._now_iso()) return MkdirResponse(path=resolved_target.relative) except FileExistsError: error = AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": self._join_relative(parent_path, name)}, ) self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error) raise error except AppError as exc: self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=exc) raise except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error) raise error def rename(self, path: str, new_name: str) -> RenameResponse: try: resolved_source = self._path_guard.resolve_existing_path(path) safe_name = self._path_guard.validate_name(new_name, field="new_name") parent_relative = self._path_guard.entry_relative_path( resolved_source.alias, resolved_source.absolute.parent, display_style=resolved_source.display_style, ) target_relative = self._join_relative(parent_relative, safe_name) resolved_target = self._path_guard.resolve_path(target_relative) if resolved_target.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute) self._record_history( operation="rename", status="completed", source=path, destination=resolved_target.relative, path=resolved_target.relative, finished_at=self._now_iso(), ) return RenameResponse(path=resolved_target.relative) except FileNotFoundError: error = AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": path}, ) self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error) raise error except FileExistsError: error = AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": new_name}, ) self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error) raise error except AppError as exc: self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=exc) raise except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error) raise error def delete(self, path: str) -> DeleteResponse: try: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_file(): self._filesystem.delete_file(resolved_target.absolute) elif resolved_target.absolute.is_dir(): if not self._filesystem.is_directory_empty(resolved_target.absolute): raise AppError( code="directory_not_empty", message="Directory is not empty", status_code=409, details={"path": resolved_target.relative}, ) self._filesystem.delete_empty_directory(resolved_target.absolute) else: raise AppError( code="type_conflict", message="Unsupported path type for delete", status_code=409, details={"path": resolved_target.relative}, ) self._record_history(operation="delete", status="completed", path=resolved_target.relative, finished_at=self._now_iso()) return DeleteResponse(path=resolved_target.relative) except AppError as exc: self._record_history_error(operation="delete", path=path, error=exc) raise except FileNotFoundError: error = AppError( code="path_not_found", message="Requested path was not found", status_code=404, details={"path": path}, ) self._record_history_error(operation="delete", path=path, error=error) raise error except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error(operation="delete", path=path, error=error) raise error def upload(self, target_path: str, upload_file) -> UploadResponse: destination_relative = None history_path = target_path try: resolved_target = self._path_guard.resolve_directory_path(target_path) filename = Path(upload_file.filename or "").name safe_name = self._path_guard.validate_name(filename, field="name") destination_relative = self._join_relative(resolved_target.relative, safe_name) history_path = destination_relative resolved_destination = self._path_guard.resolve_path(destination_relative) if resolved_destination.absolute.exists(): raise AppError( code="already_exists", message="Target path already exists", status_code=409, details={"path": resolved_destination.relative}, ) saved = self._filesystem.write_uploaded_file(resolved_destination.absolute, upload_file.file) self._record_history( operation="upload", status="completed", destination=resolved_destination.relative, path=resolved_destination.relative, finished_at=self._now_iso(), ) return UploadResponse( path=resolved_destination.relative, size=saved["size"], modified=saved["modified"], ) except AppError as exc: self._record_history_error( operation="upload", destination=destination_relative, path=history_path, error=exc, ) raise except OSError as exc: error = AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) self._record_history_error( operation="upload", destination=destination_relative, path=history_path, error=error, ) raise error def view(self, path: str, for_edit: bool = False) -> ViewResponse: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for view", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for preview", status_code=409, details={"path": resolved_target.relative}, ) if for_edit and resolved_target.absolute.stat().st_size > TEXT_EDIT_MAX_BYTES: raise AppError( code="file_too_large", message="File is too large for edit", status_code=409, details={"path": resolved_target.relative}, ) try: preview = self._filesystem.read_text_preview( resolved_target.absolute, max_bytes=TEXT_PREVIEW_MAX_BYTES, encoding="utf-8", ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return ViewResponse( path=resolved_target.relative, name=resolved_target.absolute.name, content_type=content_type, encoding="utf-8", truncated=preview["truncated"], size=preview["size"], modified=preview["modified"], content=preview["content"], ) def info(self, path: str) -> FileInfoResponse: resolved_target = self._path_guard.resolve_existing_path(path) metadata = self._filesystem.stat_info(resolved_target.absolute) return FileInfoResponse( name=metadata["name"], path=resolved_target.relative, type="directory" if resolved_target.absolute.is_dir() else "file", size=metadata["size"], modified=metadata["modified"], root=resolved_target.alias, extension=metadata["extension"], content_type=metadata["content_type"], owner=metadata["owner"], group=metadata["group"], width=metadata["width"], height=metadata["height"], ) def save(self, path: str, content: str, expected_modified: str) -> SaveResponse: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for save", status_code=409, details={"path": resolved_target.relative}, ) if self._content_type_for(resolved_target.absolute) is None: raise AppError( code="unsupported_type", message="File type is not supported for edit", status_code=409, details={"path": resolved_target.relative}, ) if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES: raise AppError( code="file_too_large", message="File is too large for edit", status_code=409, details={"path": resolved_target.relative}, ) current_modified = self._filesystem.modified_iso(resolved_target.absolute) if current_modified != expected_modified: raise AppError( code="conflict", message="File changed since it was opened", status_code=409, details={"path": resolved_target.relative}, ) try: saved = self._filesystem.write_text_file( resolved_target.absolute, content=content, encoding="utf-8", ) except OSError as exc: raise AppError( code="io_error", message="Filesystem operation failed", status_code=500, details={"reason": str(exc)}, ) return SaveResponse( path=resolved_target.relative, size=saved["size"], modified=saved["modified"], ) def prepare_video_stream(self, path: str, range_header: str | None = None) -> dict: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for video", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._video_content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for video playback", status_code=409, details={"path": resolved_target.relative}, ) file_size = int(resolved_target.absolute.stat().st_size) start = 0 end = max(file_size - 1, 0) status_code = 200 headers = {"Accept-Ranges": "bytes"} if range_header: start, end = self._parse_range_header(range_header, file_size) status_code = 206 headers["Content-Range"] = f"bytes {start}-{end}/{file_size}" headers["Content-Length"] = str(max((end - start) + 1, 0)) return { "status_code": status_code, "headers": headers, "content_type": content_type, "content": self._filesystem.stream_file_range(resolved_target.absolute, start, end), } def prepare_thumbnail_stream(self, path: str) -> dict: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for thumbnail", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._thumbnail_content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for thumbnail", status_code=409, details={"path": resolved_target.relative}, ) return { "headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))}, "content_type": content_type, "content": self._filesystem.stream_file(resolved_target.absolute), } def prepare_image_stream(self, path: str) -> dict: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for image", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._image_content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for image viewing", status_code=409, details={"path": resolved_target.relative}, ) return { "headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))}, "content_type": content_type, "content": self._filesystem.stream_file(resolved_target.absolute), } def prepare_pdf_stream(self, path: str) -> dict: resolved_target = self._path_guard.resolve_existing_path(path) if resolved_target.absolute.is_dir(): raise AppError( code="type_conflict", message="Source must be a file", status_code=409, details={"path": resolved_target.relative}, ) if not resolved_target.absolute.is_file(): raise AppError( code="type_conflict", message="Unsupported path type for pdf", status_code=409, details={"path": resolved_target.relative}, ) content_type = self._pdf_content_type_for(resolved_target.absolute) if content_type is None: raise AppError( code="unsupported_type", message="File type is not supported for pdf viewing", status_code=409, details={"path": resolved_target.relative}, ) return { "headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))}, "content_type": content_type, "content": self._filesystem.stream_file(resolved_target.absolute), } @staticmethod def _join_relative(base: str, name: str) -> str: return f"{base}/{name}" if base else name @staticmethod def _content_type_for(path: Path) -> str | None: special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower()) if special_name: return special_name return TEXT_CONTENT_TYPES.get(path.suffix.lower()) @staticmethod def _video_content_type_for(path: Path) -> str | None: return VIDEO_CONTENT_TYPES.get(path.suffix.lower()) @staticmethod def _thumbnail_content_type_for(path: Path) -> str | None: return THUMBNAIL_CONTENT_TYPES.get(path.suffix.lower()) @staticmethod def _image_content_type_for(path: Path) -> str | None: return IMAGE_CONTENT_TYPES.get(path.suffix.lower()) @staticmethod def _pdf_content_type_for(path: Path) -> str | None: return PDF_CONTENT_TYPES.get(path.suffix.lower()) def _record_history( self, *, operation: str, status: str, source: str | None = None, destination: str | None = None, path: str | None = None, error_code: str | None = None, error_message: str | None = None, finished_at: str | None = None, ) -> None: if not self._history_repository: return self._history_repository.create_entry( operation=operation, status=status, source=source, destination=destination, path=path, error_code=error_code, error_message=error_message, finished_at=finished_at, ) def _record_history_error( self, *, operation: str, error: AppError, source: str | None = None, destination: str | None = None, path: str | None = None, ) -> None: self._record_history( operation=operation, status="failed", source=source, destination=destination, path=path, error_code=error.code, error_message=error.message, finished_at=self._now_iso(), ) @staticmethod def _now_iso() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") @staticmethod def _parse_range_header(range_header: str, file_size: int) -> tuple[int, int]: def invalid_range() -> AppError: return AppError( code="invalid_request", message="Invalid Range header", status_code=400, ) if not range_header.startswith("bytes="): raise invalid_range() value = range_header[len("bytes="):].strip() if "," in value or "-" not in value: raise invalid_range() start_text, end_text = value.split("-", 1) if start_text == "" and end_text == "": raise invalid_range() try: if start_text == "": suffix_length = int(end_text) if suffix_length <= 0: raise invalid_range() if suffix_length >= file_size: return 0, max(file_size - 1, 0) return file_size - suffix_length, file_size - 1 start = int(start_text) if start < 0 or start >= file_size: raise invalid_range() if end_text == "": return start, file_size - 1 end = int(end_text) if end < start: raise invalid_range() return start, min(end, file_size - 1) except ValueError as exc: raise invalid_range() from exc