Files
webmanager-mvp/webui/backend/app/fs/filesystem_adapter.py
T
2026-03-12 11:45:56 +01:00

154 lines
5.6 KiB
Python

from __future__ import annotations

import codecs
import grp
import mimetypes
import os
import pwd
import shutil
import stat
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path
class FilesystemAdapter:
    """Thin wrapper around the local filesystem operations used by the backend.

    Every method works on paths handed in by the caller; no path validation
    or sandboxing is performed in this class.
    """

    # Buffer size (1 MiB) used when copying file contents chunk by chunk.
    _COPY_CHUNK_SIZE = 1024 * 1024

    @staticmethod
    def _iso_utc(timestamp: float) -> str:
        """Format a POSIX timestamp as an ISO-8601 UTC string ending in ``Z``."""
        return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat().replace("+00:00", "Z")

    def stat_info(self, path: Path) -> dict:
        """Return display metadata for ``path``.

        The result has the keys ``name``, ``size`` (``None`` unless ``path``
        is a regular file), ``modified`` (ISO-8601 UTC), ``owner``, ``group``
        (``None`` when the id cannot be resolved to a name), ``content_type``
        (guessed from the file name) and ``extension`` (lower-cased suffix or
        ``None``).

        Raises:
            OSError: if ``path`` cannot be stat'ed.
        """
        info = path.stat()

        # Ids without a passwd/group entry are reported as None, not an error.
        try:
            owner = pwd.getpwuid(info.st_uid).pw_name
        except (KeyError, ImportError, AttributeError):
            owner = None
        try:
            group = grp.getgrgid(info.st_gid).gr_name
        except (KeyError, ImportError, AttributeError):
            group = None

        content_type, _ = mimetypes.guess_type(path.name)
        return {
            "name": path.name,
            # Reuse the stat result instead of issuing a second stat call.
            "size": int(info.st_size) if stat.S_ISREG(info.st_mode) else None,
            "modified": self._iso_utc(info.st_mtime),
            "owner": owner,
            "group": group,
            "content_type": content_type,
            "extension": path.suffix.lower() or None,
        }

    def list_directory(self, directory: Path, show_hidden: bool) -> tuple[list[dict], list[dict]]:
        """List the immediate children of ``directory``.

        Entries are visited in case-insensitive name order. Names starting
        with ``.`` are skipped unless ``show_hidden`` is true. Entries that
        are neither a directory nor a regular file are omitted, as are
        entries that vanish (or are dangling symlinks) while listing.

        Returns:
            ``(directories, files)``; each item carries ``name``,
            ``modified`` and ``absolute`` (the entry's ``Path``), and files
            additionally carry ``size``.
        """
        directories: list[dict] = []
        files: list[dict] = []
        for entry in sorted(directory.iterdir(), key=lambda item: item.name.lower()):
            if not show_hidden and entry.name.startswith("."):
                continue
            try:
                info = entry.stat()
            except FileNotFoundError:
                # Dangling symlink, or the entry was removed after iterdir();
                # one bad entry must not abort the whole listing.
                continue
            modified = self._iso_utc(info.st_mtime)
            # Classify from the stat result already in hand (one syscall per entry).
            if stat.S_ISDIR(info.st_mode):
                directories.append({"name": entry.name, "modified": modified, "absolute": entry})
            elif stat.S_ISREG(info.st_mode):
                files.append(
                    {
                        "name": entry.name,
                        "size": int(info.st_size),
                        "modified": modified,
                        "absolute": entry,
                    }
                )
        return directories, files

    def search_names(self, directory: Path, query: str, limit: int) -> tuple[list[dict], bool]:
        """Recursively search below ``directory`` for names containing ``query``.

        Matching is a case-insensitive substring test. Hidden files are
        ignored and hidden directories are neither reported nor descended
        into. Within each directory, matching sub-directories are reported
        before matching files.

        Returns:
            ``(results, limit_reached)``. The flag is true as soon as
            ``limit`` results were collected, even if nothing else would
            have matched.
        """
        needle = query.lower()
        results: list[dict] = []
        for root, dirnames, filenames in os.walk(directory):
            # Assign in place so os.walk prunes hidden directories and
            # descends in a deterministic, case-insensitive order.
            dirnames[:] = sorted((name for name in dirnames if not name.startswith(".")), key=str.lower)
            visible_files = sorted((name for name in filenames if not name.startswith(".")), key=str.lower)
            root_path = Path(root)
            for kind, names in (("directory", dirnames), ("file", visible_files)):
                for name in names:
                    if needle not in name.lower():
                        continue
                    results.append({"name": name, "kind": kind, "absolute": root_path / name})
                    if len(results) >= limit:
                        return results, True
        return results, False

    def make_directory(self, path: Path) -> None:
        """Create ``path``; the parent must exist and ``path`` must not."""
        path.mkdir(parents=False, exist_ok=False)

    def rename_path(self, source: Path, destination: Path) -> None:
        """Rename ``source`` to ``destination`` via ``Path.rename``."""
        source.rename(destination)

    def move_file(self, source: str | Path, destination: str | Path) -> None:
        """Move a file by renaming ``source`` to ``destination``."""
        Path(source).rename(Path(destination))

    def move_directory(self, source: str | Path, destination: str | Path) -> None:
        """Move a directory by renaming ``source`` to ``destination``."""
        Path(source).rename(Path(destination))

    def is_directory_empty(self, path: Path) -> bool:
        """Return True when ``path`` contains no entries at all."""
        return not any(path.iterdir())

    def delete_file(self, path: Path) -> None:
        """Remove the file at ``path``."""
        path.unlink()

    def delete_empty_directory(self, path: Path) -> None:
        """Remove the directory at ``path``; it must be empty."""
        path.rmdir()

    def copy_file(
        self,
        source: str | Path,
        destination: str | Path,
        on_progress: Callable[[int], None] | None = None,
    ) -> None:
        """Copy ``source`` to a new file ``destination`` in 1 MiB chunks.

        ``destination`` is opened in exclusive-create mode, so an existing
        file is never overwritten. After each chunk, ``on_progress`` (when
        given) is called with the number of bytes written so far. File
        metadata is copied with ``shutil.copystat`` once the data is written.

        If the copy fails part-way, the partially written destination is
        left in place for the caller to handle.

        Raises:
            FileExistsError: if ``destination`` already exists.
            OSError: on any other I/O failure.
        """
        src = Path(source)
        dst = Path(destination)
        with src.open("rb") as in_f, dst.open("xb") as out_f:
            while True:
                chunk = in_f.read(self._COPY_CHUNK_SIZE)
                if not chunk:
                    break
                out_f.write(chunk)
                if on_progress is not None:
                    on_progress(out_f.tell())
        # Copy metadata only after the handles are closed so that the final
        # flush cannot overwrite the copied modification time.
        shutil.copystat(src, dst, follow_symlinks=False)

    def read_text_preview(self, path: Path, max_bytes: int, encoding: str = "utf-8") -> dict:
        """Read at most ``max_bytes`` bytes of ``path`` and decode them as text.

        Undecodable bytes are replaced rather than raising. When the preview
        is truncated, a multi-byte character cut in half at the boundary is
        dropped instead of being rendered as a replacement character.

        Returns:
            A dict with ``size`` (file size in bytes), ``modified``
            (ISO-8601 UTC), ``truncated`` and ``content``.

        Raises:
            ValueError: if ``max_bytes`` is negative.
            OSError: if the file cannot be stat'ed or read.
        """
        if max_bytes < 0:
            # A negative limit would make read() return the whole file.
            raise ValueError("max_bytes must be non-negative")

        size = int(path.stat().st_size)
        with path.open("rb") as in_f:
            # Read one byte more than allowed to detect truncation even if
            # the file grew after the stat() call above.
            raw = in_f.read(max_bytes + 1)
        modified = self.modified_iso(path)

        truncated = size > max_bytes or len(raw) > max_bytes
        if truncated:
            raw = raw[:max_bytes]
            # final=False keeps an incomplete trailing byte sequence pending
            # instead of emitting U+FFFD for a character we merely cut off.
            decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
            content = decoder.decode(raw, final=False)
        else:
            content = raw.decode(encoding, errors="replace")

        return {
            "size": size,
            "modified": modified,
            "truncated": truncated,
            "content": content,
        }

    def write_text_file(self, path: Path, content: str, encoding: str = "utf-8") -> dict:
        """Write ``content`` to ``path`` and return its new ``size`` and ``modified``."""
        path.write_text(content, encoding=encoding)
        return {
            "size": int(path.stat().st_size),
            "modified": self.modified_iso(path),
        }

    async def stream_file_range(self, path: Path, start: int, end: int, chunk_size: int = 1024 * 1024):
        """Yield the bytes of ``path`` from offset ``start`` to ``end`` inclusive.

        Data is yielded in pieces of at most ``chunk_size`` bytes; iteration
        stops early if the file ends before ``end``.

        NOTE(review): the reads below are ordinary synchronous file calls
        made from inside an async generator.
        """
        with path.open("rb") as handle:
            handle.seek(start)
            remaining = (end - start) + 1  # ``end`` is inclusive
            while remaining > 0:
                chunk = handle.read(min(chunk_size, remaining))
                if not chunk:
                    break
                remaining -= len(chunk)
                yield chunk

    @staticmethod
    def modified_iso(path: Path) -> str:
        """Return the modification time of ``path`` as ISO-8601 UTC with a ``Z`` suffix."""
        return FilesystemAdapter._iso_utc(path.stat().st_mtime)