281 lines
10 KiB
Python
281 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from backend.app.api.errors import AppError
|
|
from backend.app.api.schemas import DeleteResponse, MkdirResponse, RenameResponse, SaveResponse, ViewResponse
|
|
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
|
from backend.app.security.path_guard import PathGuard
|
|
|
|
TEXT_PREVIEW_MAX_BYTES = 256 * 1024
|
|
TEXT_EDIT_MAX_BYTES = 256 * 1024
|
|
TEXT_CONTENT_TYPES = {
|
|
".txt": "text/plain",
|
|
".log": "text/plain",
|
|
".md": "text/markdown",
|
|
".yml": "text/yaml",
|
|
".yaml": "text/yaml",
|
|
".json": "application/json",
|
|
".js": "text/javascript",
|
|
".css": "text/css",
|
|
".html": "text/html",
|
|
}
|
|
SPECIAL_TEXT_FILENAMES = {
|
|
"dockerfile": "text/plain",
|
|
"containerfile": "text/plain",
|
|
}
|
|
|
|
|
|
class FileOpsService:
|
|
def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter):
|
|
self._path_guard = path_guard
|
|
self._filesystem = filesystem
|
|
|
|
def mkdir(self, parent_path: str, name: str) -> MkdirResponse:
|
|
resolved_parent = self._path_guard.resolve_directory_path(parent_path)
|
|
safe_name = self._path_guard.validate_name(name, field="name")
|
|
target_relative = self._join_relative(resolved_parent.relative, safe_name)
|
|
resolved_target = self._path_guard.resolve_path(target_relative)
|
|
|
|
if resolved_target.absolute.exists():
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
try:
|
|
self._filesystem.make_directory(resolved_target.absolute)
|
|
except FileExistsError:
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return MkdirResponse(path=resolved_target.relative)
|
|
|
|
def rename(self, path: str, new_name: str) -> RenameResponse:
|
|
resolved_source = self._path_guard.resolve_existing_path(path)
|
|
safe_name = self._path_guard.validate_name(new_name, field="new_name")
|
|
|
|
parent_relative = self._path_guard.entry_relative_path(resolved_source.alias, resolved_source.absolute.parent)
|
|
target_relative = self._join_relative(parent_relative, safe_name)
|
|
resolved_target = self._path_guard.resolve_path(target_relative)
|
|
|
|
if resolved_target.absolute.exists():
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
try:
|
|
self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute)
|
|
except FileNotFoundError:
|
|
raise AppError(
|
|
code="path_not_found",
|
|
message="Requested path was not found",
|
|
status_code=404,
|
|
details={"path": path},
|
|
)
|
|
except FileExistsError:
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return RenameResponse(path=resolved_target.relative)
|
|
|
|
def delete(self, path: str) -> DeleteResponse:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
try:
|
|
if resolved_target.absolute.is_file():
|
|
self._filesystem.delete_file(resolved_target.absolute)
|
|
elif resolved_target.absolute.is_dir():
|
|
if not self._filesystem.is_directory_empty(resolved_target.absolute):
|
|
raise AppError(
|
|
code="directory_not_empty",
|
|
message="Directory is not empty",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
self._filesystem.delete_empty_directory(resolved_target.absolute)
|
|
else:
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for delete",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
except AppError:
|
|
raise
|
|
except FileNotFoundError:
|
|
raise AppError(
|
|
code="path_not_found",
|
|
message="Requested path was not found",
|
|
status_code=404,
|
|
details={"path": path},
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return DeleteResponse(path=resolved_target.relative)
|
|
|
|
def view(self, path: str, for_edit: bool = False) -> ViewResponse:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for view",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
content_type = self._content_type_for(resolved_target.absolute)
|
|
if content_type is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for preview",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
if for_edit and resolved_target.absolute.stat().st_size > TEXT_EDIT_MAX_BYTES:
|
|
raise AppError(
|
|
code="file_too_large",
|
|
message="File is too large for edit",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
try:
|
|
preview = self._filesystem.read_text_preview(
|
|
resolved_target.absolute,
|
|
max_bytes=TEXT_PREVIEW_MAX_BYTES,
|
|
encoding="utf-8",
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return ViewResponse(
|
|
path=resolved_target.relative,
|
|
name=resolved_target.absolute.name,
|
|
content_type=content_type,
|
|
encoding="utf-8",
|
|
truncated=preview["truncated"],
|
|
size=preview["size"],
|
|
modified=preview["modified"],
|
|
content=preview["content"],
|
|
)
|
|
|
|
def save(self, path: str, content: str, expected_modified: str) -> SaveResponse:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for save",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if self._content_type_for(resolved_target.absolute) is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for edit",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES:
|
|
raise AppError(
|
|
code="file_too_large",
|
|
message="File is too large for edit",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
current_modified = self._filesystem.modified_iso(resolved_target.absolute)
|
|
if current_modified != expected_modified:
|
|
raise AppError(
|
|
code="conflict",
|
|
message="File changed since it was opened",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
try:
|
|
saved = self._filesystem.write_text_file(
|
|
resolved_target.absolute,
|
|
content=content,
|
|
encoding="utf-8",
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return SaveResponse(
|
|
path=resolved_target.relative,
|
|
size=saved["size"],
|
|
modified=saved["modified"],
|
|
)
|
|
|
|
@staticmethod
|
|
def _join_relative(base: str, name: str) -> str:
|
|
return f"{base}/{name}" if base else name
|
|
|
|
@staticmethod
|
|
def _content_type_for(path: Path) -> str | None:
|
|
special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower())
|
|
if special_name:
|
|
return special_name
|
|
return TEXT_CONTENT_TYPES.get(path.suffix.lower())
|