Files
webmanager-mvp/webui/backend/app/services/file_ops_service.py
T

562 lines
21 KiB
Python

from __future__ import annotations
from pathlib import Path
from backend.app.api.errors import AppError
from backend.app.api.schemas import DeleteResponse, FileInfoResponse, MkdirResponse, RenameResponse, SaveResponse, ViewResponse
from backend.app.db.history_repository import HistoryRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter
from backend.app.security.path_guard import PathGuard
TEXT_PREVIEW_MAX_BYTES = 256 * 1024
TEXT_EDIT_MAX_BYTES = 256 * 1024
TEXT_CONTENT_TYPES = {
".txt": "text/plain",
".log": "text/plain",
".md": "text/markdown",
".yml": "text/yaml",
".yaml": "text/yaml",
".json": "application/json",
".js": "text/javascript",
".py": "text/x-python",
".css": "text/css",
".html": "text/html",
}
SPECIAL_TEXT_FILENAMES = {
"dockerfile": "text/plain",
"containerfile": "text/plain",
}
THUMBNAIL_CONTENT_TYPES = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".webp": "image/webp",
}
VIDEO_CONTENT_TYPES = {
".mp4": "video/mp4",
".mkv": "video/x-matroska",
}
PDF_CONTENT_TYPES = {
".pdf": "application/pdf",
}
class FileOpsService:
def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None):
self._path_guard = path_guard
self._filesystem = filesystem
self._history_repository = history_repository
def mkdir(self, parent_path: str, name: str) -> MkdirResponse:
try:
resolved_parent = self._path_guard.resolve_directory_path(parent_path)
safe_name = self._path_guard.validate_name(name, field="name")
target_relative = self._join_relative(resolved_parent.relative, safe_name)
resolved_target = self._path_guard.resolve_path(target_relative)
if resolved_target.absolute.exists():
raise AppError(
code="already_exists",
message="Target path already exists",
status_code=409,
details={"path": resolved_target.relative},
)
self._filesystem.make_directory(resolved_target.absolute)
self._record_history(operation="mkdir", status="completed", path=resolved_target.relative, finished_at=self._now_iso())
return MkdirResponse(path=resolved_target.relative)
except FileExistsError:
error = AppError(
code="already_exists",
message="Target path already exists",
status_code=409,
details={"path": self._join_relative(parent_path, name)},
)
self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error)
raise error
except AppError as exc:
self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=exc)
raise
except OSError as exc:
error = AppError(
code="io_error",
message="Filesystem operation failed",
status_code=500,
details={"reason": str(exc)},
)
self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error)
raise error
def rename(self, path: str, new_name: str) -> RenameResponse:
try:
resolved_source = self._path_guard.resolve_existing_path(path)
safe_name = self._path_guard.validate_name(new_name, field="new_name")
parent_relative = self._path_guard.entry_relative_path(
resolved_source.alias,
resolved_source.absolute.parent,
display_style=resolved_source.display_style,
)
target_relative = self._join_relative(parent_relative, safe_name)
resolved_target = self._path_guard.resolve_path(target_relative)
if resolved_target.absolute.exists():
raise AppError(
code="already_exists",
message="Target path already exists",
status_code=409,
details={"path": resolved_target.relative},
)
self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute)
self._record_history(
operation="rename",
status="completed",
source=path,
destination=resolved_target.relative,
path=resolved_target.relative,
finished_at=self._now_iso(),
)
return RenameResponse(path=resolved_target.relative)
except FileNotFoundError:
error = AppError(
code="path_not_found",
message="Requested path was not found",
status_code=404,
details={"path": path},
)
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)
raise error
except FileExistsError:
error = AppError(
code="already_exists",
message="Target path already exists",
status_code=409,
details={"path": new_name},
)
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)
raise error
except AppError as exc:
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=exc)
raise
except OSError as exc:
error = AppError(
code="io_error",
message="Filesystem operation failed",
status_code=500,
details={"reason": str(exc)},
)
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)
raise error
def delete(self, path: str) -> DeleteResponse:
try:
resolved_target = self._path_guard.resolve_existing_path(path)
if resolved_target.absolute.is_file():
self._filesystem.delete_file(resolved_target.absolute)
elif resolved_target.absolute.is_dir():
if not self._filesystem.is_directory_empty(resolved_target.absolute):
raise AppError(
code="directory_not_empty",
message="Directory is not empty",
status_code=409,
details={"path": resolved_target.relative},
)
self._filesystem.delete_empty_directory(resolved_target.absolute)
else:
raise AppError(
code="type_conflict",
message="Unsupported path type for delete",
status_code=409,
details={"path": resolved_target.relative},
)
self._record_history(operation="delete", status="completed", path=resolved_target.relative, finished_at=self._now_iso())
return DeleteResponse(path=resolved_target.relative)
except AppError as exc:
self._record_history_error(operation="delete", path=path, error=exc)
raise
except FileNotFoundError:
error = AppError(
code="path_not_found",
message="Requested path was not found",
status_code=404,
details={"path": path},
)
self._record_history_error(operation="delete", path=path, error=error)
raise error
except OSError as exc:
error = AppError(
code="io_error",
message="Filesystem operation failed",
status_code=500,
details={"reason": str(exc)},
)
self._record_history_error(operation="delete", path=path, error=error)
raise error
def view(self, path: str, for_edit: bool = False) -> ViewResponse:
resolved_target = self._path_guard.resolve_existing_path(path)
if resolved_target.absolute.is_dir():
raise AppError(
code="type_conflict",
message="Source must be a file",
status_code=409,
details={"path": resolved_target.relative},
)
if not resolved_target.absolute.is_file():
raise AppError(
code="type_conflict",
message="Unsupported path type for view",
status_code=409,
details={"path": resolved_target.relative},
)
content_type = self._content_type_for(resolved_target.absolute)
if content_type is None:
raise AppError(
code="unsupported_type",
message="File type is not supported for preview",
status_code=409,
details={"path": resolved_target.relative},
)
if for_edit and resolved_target.absolute.stat().st_size > TEXT_EDIT_MAX_BYTES:
raise AppError(
code="file_too_large",
message="File is too large for edit",
status_code=409,
details={"path": resolved_target.relative},
)
try:
preview = self._filesystem.read_text_preview(
resolved_target.absolute,
max_bytes=TEXT_PREVIEW_MAX_BYTES,
encoding="utf-8",
)
except OSError as exc:
raise AppError(
code="io_error",
message="Filesystem operation failed",
status_code=500,
details={"reason": str(exc)},
)
return ViewResponse(
path=resolved_target.relative,
name=resolved_target.absolute.name,
content_type=content_type,
encoding="utf-8",
truncated=preview["truncated"],
size=preview["size"],
modified=preview["modified"],
content=preview["content"],
)
def info(self, path: str) -> FileInfoResponse:
resolved_target = self._path_guard.resolve_existing_path(path)
metadata = self._filesystem.stat_info(resolved_target.absolute)
return FileInfoResponse(
name=metadata["name"],
path=resolved_target.relative,
type="directory" if resolved_target.absolute.is_dir() else "file",
size=metadata["size"],
modified=metadata["modified"],
root=resolved_target.alias,
extension=metadata["extension"],
content_type=metadata["content_type"],
owner=metadata["owner"],
group=metadata["group"],
)
def save(self, path: str, content: str, expected_modified: str) -> SaveResponse:
resolved_target = self._path_guard.resolve_existing_path(path)
if resolved_target.absolute.is_dir():
raise AppError(
code="type_conflict",
message="Source must be a file",
status_code=409,
details={"path": resolved_target.relative},
)
if not resolved_target.absolute.is_file():
raise AppError(
code="type_conflict",
message="Unsupported path type for save",
status_code=409,
details={"path": resolved_target.relative},
)
if self._content_type_for(resolved_target.absolute) is None:
raise AppError(
code="unsupported_type",
message="File type is not supported for edit",
status_code=409,
details={"path": resolved_target.relative},
)
if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES:
raise AppError(
code="file_too_large",
message="File is too large for edit",
status_code=409,
details={"path": resolved_target.relative},
)
current_modified = self._filesystem.modified_iso(resolved_target.absolute)
if current_modified != expected_modified:
raise AppError(
code="conflict",
message="File changed since it was opened",
status_code=409,
details={"path": resolved_target.relative},
)
try:
saved = self._filesystem.write_text_file(
resolved_target.absolute,
content=content,
encoding="utf-8",
)
except OSError as exc:
raise AppError(
code="io_error",
message="Filesystem operation failed",
status_code=500,
details={"reason": str(exc)},
)
return SaveResponse(
path=resolved_target.relative,
size=saved["size"],
modified=saved["modified"],
)
def prepare_video_stream(self, path: str, range_header: str | None = None) -> dict:
resolved_target = self._path_guard.resolve_existing_path(path)
if resolved_target.absolute.is_dir():
raise AppError(
code="type_conflict",
message="Source must be a file",
status_code=409,
details={"path": resolved_target.relative},
)
if not resolved_target.absolute.is_file():
raise AppError(
code="type_conflict",
message="Unsupported path type for video",
status_code=409,
details={"path": resolved_target.relative},
)
content_type = self._video_content_type_for(resolved_target.absolute)
if content_type is None:
raise AppError(
code="unsupported_type",
message="File type is not supported for video playback",
status_code=409,
details={"path": resolved_target.relative},
)
file_size = int(resolved_target.absolute.stat().st_size)
start = 0
end = max(file_size - 1, 0)
status_code = 200
headers = {"Accept-Ranges": "bytes"}
if range_header:
start, end = self._parse_range_header(range_header, file_size)
status_code = 206
headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
headers["Content-Length"] = str(max((end - start) + 1, 0))
return {
"status_code": status_code,
"headers": headers,
"content_type": content_type,
"content": self._filesystem.stream_file_range(resolved_target.absolute, start, end),
}
def prepare_thumbnail_stream(self, path: str) -> dict:
resolved_target = self._path_guard.resolve_existing_path(path)
if resolved_target.absolute.is_dir():
raise AppError(
code="type_conflict",
message="Source must be a file",
status_code=409,
details={"path": resolved_target.relative},
)
if not resolved_target.absolute.is_file():
raise AppError(
code="type_conflict",
message="Unsupported path type for thumbnail",
status_code=409,
details={"path": resolved_target.relative},
)
content_type = self._thumbnail_content_type_for(resolved_target.absolute)
if content_type is None:
raise AppError(
code="unsupported_type",
message="File type is not supported for thumbnail",
status_code=409,
details={"path": resolved_target.relative},
)
return {
"headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))},
"content_type": content_type,
"content": self._filesystem.stream_file(resolved_target.absolute),
}
def prepare_pdf_stream(self, path: str) -> dict:
resolved_target = self._path_guard.resolve_existing_path(path)
if resolved_target.absolute.is_dir():
raise AppError(
code="type_conflict",
message="Source must be a file",
status_code=409,
details={"path": resolved_target.relative},
)
if not resolved_target.absolute.is_file():
raise AppError(
code="type_conflict",
message="Unsupported path type for pdf",
status_code=409,
details={"path": resolved_target.relative},
)
content_type = self._pdf_content_type_for(resolved_target.absolute)
if content_type is None:
raise AppError(
code="unsupported_type",
message="File type is not supported for pdf viewing",
status_code=409,
details={"path": resolved_target.relative},
)
return {
"headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))},
"content_type": content_type,
"content": self._filesystem.stream_file(resolved_target.absolute),
}
@staticmethod
def _join_relative(base: str, name: str) -> str:
return f"{base}/{name}" if base else name
@staticmethod
def _content_type_for(path: Path) -> str | None:
special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower())
if special_name:
return special_name
return TEXT_CONTENT_TYPES.get(path.suffix.lower())
@staticmethod
def _video_content_type_for(path: Path) -> str | None:
return VIDEO_CONTENT_TYPES.get(path.suffix.lower())
@staticmethod
def _thumbnail_content_type_for(path: Path) -> str | None:
return THUMBNAIL_CONTENT_TYPES.get(path.suffix.lower())
@staticmethod
def _pdf_content_type_for(path: Path) -> str | None:
return PDF_CONTENT_TYPES.get(path.suffix.lower())
def _record_history(
self,
*,
operation: str,
status: str,
source: str | None = None,
destination: str | None = None,
path: str | None = None,
error_code: str | None = None,
error_message: str | None = None,
finished_at: str | None = None,
) -> None:
if not self._history_repository:
return
self._history_repository.create_entry(
operation=operation,
status=status,
source=source,
destination=destination,
path=path,
error_code=error_code,
error_message=error_message,
finished_at=finished_at,
)
def _record_history_error(
self,
*,
operation: str,
error: AppError,
source: str | None = None,
destination: str | None = None,
path: str | None = None,
) -> None:
self._record_history(
operation=operation,
status="failed",
source=source,
destination=destination,
path=path,
error_code=error.code,
error_message=error.message,
finished_at=self._now_iso(),
)
@staticmethod
def _now_iso() -> str:
from datetime import datetime, timezone
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
@staticmethod
def _parse_range_header(range_header: str, file_size: int) -> tuple[int, int]:
def invalid_range() -> AppError:
return AppError(
code="invalid_request",
message="Invalid Range header",
status_code=400,
)
if not range_header.startswith("bytes="):
raise invalid_range()
value = range_header[len("bytes="):].strip()
if "," in value or "-" not in value:
raise invalid_range()
start_text, end_text = value.split("-", 1)
if start_text == "" and end_text == "":
raise invalid_range()
try:
if start_text == "":
suffix_length = int(end_text)
if suffix_length <= 0:
raise invalid_range()
if suffix_length >= file_size:
return 0, max(file_size - 1, 0)
return file_size - suffix_length, file_size - 1
start = int(start_text)
if start < 0 or start >= file_size:
raise invalid_range()
if end_text == "":
return start, file_size - 1
end = int(end_text)
if end < start:
raise invalid_range()
return start, min(end, file_size - 1)
except ValueError as exc:
raise invalid_range() from exc