Initial commit - podman-mvp net na toevoegen cpu en mem kolommen

This commit is contained in:
kodi
2026-02-18 08:17:27 +01:00
commit 62e195c59e
56 changed files with 22164 additions and 0 deletions
+168
View File
@@ -0,0 +1,168 @@
import os
import sys
import types
import importlib
import pytest
from fastapi.testclient import TestClient
class FakeResponse:
def __init__(self, status_code: int = 200, json_data=None, text_data: str | None = None):
self.status_code = status_code
self._json_data = json_data
self._text_data = text_data
def json(self):
return self._json_data
@property
def text(self):
if self._text_data is not None:
return self._text_data
# Best-effort string
return ""
class FakeSession:
"""
Test-only Podman session stub.
Routes by URL substring to return deterministic JSON.
"""
def __init__(self):
self.calls = []
def get(self, url: str):
self.calls.append(("GET", url))
# Pods list (DYNAMIC)
if "/libpod/pods/json" in url:
return FakeResponse(200, json_data=[])
# Containers list (DYNAMIC)
if "/libpod/containers/json" in url:
return FakeResponse(200, json_data=[])
# Container inspect (DYNAMIC)
if "/libpod/containers/" in url and url.endswith("/json"):
# Return some dict to prove JSON
return FakeResponse(200, json_data={"Id": "dummy"})
# Container logs (FIXED endpoint expects {"logs": "..."} from wrapper endpoint, not Podman directly,
# but app.py fetches Podman logs and then wraps into {"logs": text}.
if "/libpod/containers/" in url and "/logs?" in url:
return FakeResponse(200, json_data=None, text_data="line1\nline2\n")
# Fallback: return JSON dict
return FakeResponse(200, json_data={})
def post(self, url: str, **kwargs):
self.calls.append(("POST", url, kwargs))
# kube/play passthrough (workloads deploy)
if "/libpod/kube/play" in url:
# app.py gebruikt SESSION.post(url, data=yaml_content).json()
return FakeResponse(200, json_data={"kube": "played"})
# Container start/stop/restart: treat as success
if "/libpod/containers/" in url:
return FakeResponse(204, json_data={})
# Pod start/stop etc: treat as success
if "/libpod/pods/" in url:
return FakeResponse(200, json_data={"ok": True})
return FakeResponse(200, json_data={})
def delete(self, url: str):
self.calls.append(("DELETE", url))
return FakeResponse(200, json_data={"deleted": True})
@pytest.fixture(scope="session")
def app_module():
"""
Import app.py as module `app` from /app, while ensuring requests_unixsocket import is safe.
No runtime changes to app.py; only test-time monkeypatching later.
"""
# Ensure /app is importable
if "/app" not in sys.path:
sys.path.insert(0, "/app")
# If requests_unixsocket isn't installed or to avoid real socket usage, provide a minimal stub.
if "requests_unixsocket" not in sys.modules:
mod = types.ModuleType("requests_unixsocket")
mod.Session = lambda: FakeSession()
sys.modules["requests_unixsocket"] = mod
app = importlib.import_module("app")
return app
@pytest.fixture()
def client(app_module, monkeypatch, tmp_path):
"""
TestClient with all external side-effects stubbed:
- Podman socket calls via app.SESSION
- systemctl calls via app.run
- filesystem roots via WORKLOADS_DIR + ALLOWLIST_FILE
"""
# Prepare temp workload tree under tmp_path
workloads_dir = tmp_path / "workloads"
systemd_dir = workloads_dir / "systemd"
systemd_dir.mkdir(parents=True, exist_ok=True)
# A defined container (*.container) for /containers-dashboard defined entry
(systemd_dir / "sonarr.container").write_text("[Container]\nImage=dummy\n", encoding="utf-8")
# Files endpoints operate under WORKLOADS_DIR; UI focuses on systemd subtree
# Create a file for /files/read/save/delete
(systemd_dir / "test.txt").write_text("hello\n", encoding="utf-8")
# Create a non-empty dir for /files/rmdir 409 scenario
nonempty_dir = systemd_dir / "nonempty"
nonempty_dir.mkdir(parents=True, exist_ok=True)
(nonempty_dir / "keep.txt").write_text("x", encoding="utf-8")
# Create a yaml to create a defined pod entry for /pods-dashboard
(workloads_dir / "mediaserver.yaml").write_text("kind: Pod\nmetadata:\n name: mediaserver\n", encoding="utf-8")
# Allowlist file for /systemd/allowlist and allow-mode enforcement
allowlist_file = tmp_path / "allowed_units.txt"
allowlist_file.write_text("sonarr.service\nmediaserver.service\n", encoding="utf-8")
# Patch module globals to point at tmp filesystem (test-only)
monkeypatch.setattr(app_module, "WORKLOADS_DIR", str(workloads_dir))
monkeypatch.setattr(app_module, "ALLOWLIST_FILE", str(allowlist_file))
# Stub Podman session object
monkeypatch.setattr(app_module, "SESSION", FakeSession())
# Stub systemctl runner
def fake_run(cmd):
# cmd is a list, e.g. ["systemctl","--user","is-active","mediaserver.service"]
cmd_str = " ".join(cmd)
if "is-active mediaserver.service" in cmd_str:
return 0, "active"
if "is-active sonarr.service" in cmd_str:
return 0, "inactive"
# For POST /{action}/{unit}, return something stable
if cmd[:3] == ["systemctl", "--user", "status"]:
return 0, "Active: active (running)"
if cmd[:3] == ["systemctl", "--user", "daemon-reload"]:
return 0, "ok"
if cmd[:3] == ["systemctl", "--user", "start"]:
return 0, "started"
if cmd[:3] == ["systemctl", "--user", "stop"]:
return 0, "stopped"
if cmd[:3] == ["systemctl", "--user", "restart"]:
return 0, "restarted"
# fallback
return 0, "ok"
monkeypatch.setattr(app_module, "run", fake_run)
return TestClient(app_module.app)
+208
View File
@@ -0,0 +1,208 @@
import pytest
# ---- LOCKED BASELINE: paths + methods ----
BASELINE_ROUTES = {
("GET", "/workloads"),
("GET", "/workloads/read/{filename:path}"),
("POST", "/workloads/save-file"),
("POST", "/workloads/deploy/{filename:path}"),
("DELETE", "/files/rmdir"),
("GET", "/pods"),
("POST", "/actions/{action}/{name}"),
("GET", "/pods-dashboard"),
("POST", "/pods/actions/{action}/{podname}"),
("GET", "/containers-dashboard"),
("GET", "/containers"),
("POST", "/containers/{action}/{name}"),
("GET", "/debug/defined-containers"),
("GET", "/dashboard"),
("GET", "/test-hybrid"),
("GET", "/files/tree"),
("GET", "/files/read"),
("POST", "/files/save"),
("DELETE", "/files/delete"),
("POST", "/files/mkdir"),
("GET", "/containers/logs/{name}"),
("GET", "/containers/inspect/{name}"),
("GET", "/systemd/allowlist"),
("GET", "/"),
("POST", "/daemon-reload"),
("POST", "/{action}/{unit}"),
("POST", "/action"),
("POST", "/api/<action>/<unit>"),
}
def _route_tuples_from_app(app):
found = set()
for r in app.routes:
methods = getattr(r, "methods", None)
path = getattr(r, "path", None)
if not methods or not path:
continue
for m in methods:
# ignore implicit HEAD/OPTIONS etc; baseline is explicit
if m in {"HEAD", "OPTIONS"}:
continue
found.add((m, path))
return found
def _assert_keys_exact(obj: dict, expected_keys: list[str]):
assert isinstance(obj, dict), f"Expected dict, got {type(obj)}"
assert sorted(list(obj.keys())) == sorted(expected_keys), f"Keys mismatch: {sorted(obj.keys())} != {sorted(expected_keys)}"
# ---- 1) Route existence: all 28 paths/methods ----
def test_baseline_routes_exist(app_module):
found = _route_tuples_from_app(app_module.app)
missing = sorted(BASELINE_ROUTES - found)
assert not missing, f"Missing baseline routes: {missing}"
# ---- 2) FIXED/VARIANT keyset verification ----
def test_files_tree_fixed_shape(client):
r = client.get("/files/tree")
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
if data:
_assert_keys_exact(data[0], ["dirs", "files", "path"])
def test_files_read_fixed_shape(client):
r = client.get("/files/read", params={"path": "systemd/test.txt"})
assert r.status_code == 200
_assert_keys_exact(r.json(), ["content"])
def test_files_save_fixed_shape(client):
r = client.post("/files/save", params={"path": "systemd/test.txt"}, json={"content": "updated\n"})
assert r.status_code == 200
_assert_keys_exact(r.json(), ["path", "status"])
def test_files_delete_fixed_shape(client):
# create a file first (via save)
client.post("/files/save", params={"path": "systemd/delete-me.txt"}, json={"content": "x"})
r = client.delete("/files/delete", params={"path": "systemd/delete-me.txt"})
assert r.status_code == 200
_assert_keys_exact(r.json(), ["status", "type"])
def test_files_mkdir_fixed_shape(client):
r = client.post("/files/mkdir", params={"path": "systemd/newdir"})
assert r.status_code == 200
_assert_keys_exact(r.json(), ["path", "status"])
def test_files_rmdir_variant_shapes(client):
# 409: directory not empty -> FastAPI HTTPException detail object
r = client.delete("/files/rmdir", params={"path": "systemd/nonempty"})
assert r.status_code == 409
body = r.json()
_assert_keys_exact(body, ["detail"])
assert isinstance(body["detail"], dict)
_assert_keys_exact(body["detail"], ["dirs", "error", "files"])
# 200: empty directory -> success keys
client.post("/files/mkdir", params={"path": "systemd/emptydir"})
r2 = client.delete("/files/rmdir", params={"path": "systemd/emptydir"})
assert r2.status_code == 200
_assert_keys_exact(r2.json(), ["deleted", "path"])
def test_systemd_allowlist_fixed_shape(client):
r = client.get("/systemd/allowlist")
assert r.status_code == 200
_assert_keys_exact(r.json(), ["allow_mode", "units"])
def test_daemon_reload_fixed_shape(client):
r = client.post("/daemon-reload")
assert r.status_code == 200
_assert_keys_exact(r.json(), ["cmd", "exit", "output"])
def test_systemd_action_fixed_shape_and_allow_enforcement(client):
# allowed unit
r = client.post("/status/sonarr.service")
assert r.status_code == 200
_assert_keys_exact(r.json(), ["cmd", "exit", "output"])
# not allowed unit -> 403 with {"detail": ...}
r2 = client.post("/status/notallowed.service")
assert r2.status_code == 403
_assert_keys_exact(r2.json(), ["detail"])
def test_pods_dashboard_fixed_shape(client):
r = client.get("/pods-dashboard")
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
# Expect at least the defined pod from mediaserver.yaml -> "podmediaserver"
# Keys per item must be fixed
if data:
_assert_keys_exact(data[0], ["Containers", "Name", "Source", "Status", "Unit"])
def test_containers_dashboard_variant_shape(client):
r = client.get("/containers-dashboard")
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
# Because we created systemd/sonarr.container, we expect a defined entry
# in fixed defined-shape.
defined_shape = ["Image", "Names", "PodName", "Ports", "State", "Status",
"_dashboard_def_path", "_dashboard_source", "_dashboard_unit"]
found_defined = False
for item in data:
if isinstance(item, dict) and item.get("_dashboard_source") == "systemd":
_assert_keys_exact(item, defined_shape)
found_defined = True
break
assert found_defined, "Expected at least one defined container entry with fixed defined-shape"
def test_containers_logs_fixed_shape(client):
r = client.get("/containers/logs/testcontainer")
assert r.status_code == 200
_assert_keys_exact(r.json(), ["logs"])
# NOTE: tuple-return endpoints (invalid action) behave as HTTP 200 with JSON list [dict, status]
def test_tuple_return_invalid_action_shapes(client):
r = client.post("/containers/foo/bar")
assert r.status_code == 200
body = r.json()
assert isinstance(body, list) and len(body) == 2
assert isinstance(body[0], dict) and "error" in body[0]
r2 = client.post("/pods/actions/foo/podx")
assert r2.status_code == 200
body2 = r2.json()
assert isinstance(body2, list) and len(body2) == 2
assert isinstance(body2[0], dict) and "error" in body2[0]
# ---- 3) DYNAMIC endpoints: statuscode + is JSON only ----
@pytest.mark.parametrize(
"method,path,kwargs",
[
("GET", "/pods", {}),
("GET", "/containers", {}),
("GET", "/containers/inspect/testcontainer", {}),
("POST", "/workloads/deploy/mediaserver.yaml", {}),
("GET", "/debug/defined-containers", {}),
],
)
def test_dynamic_endpoints_only_json_and_200(client, method, path, kwargs):
req = getattr(client, method.lower())
r = req(path, **kwargs)
assert r.status_code == 200
data = r.json()
assert isinstance(data, (dict, list)), f"Expected JSON (dict/list), got {type(data)}"
+168
View File
@@ -0,0 +1,168 @@
import os
import sys
import types
import importlib
import pytest
from fastapi.testclient import TestClient
class FakeResponse:
def __init__(self, status_code: int = 200, json_data=None, text_data: str | None = None):
self.status_code = status_code
self._json_data = json_data
self._text_data = text_data
def json(self):
return self._json_data
@property
def text(self):
if self._text_data is not None:
return self._text_data
# Best-effort string
return ""
class FakeSession:
"""
Test-only Podman session stub.
Routes by URL substring to return deterministic JSON.
"""
def __init__(self):
self.calls = []
def get(self, url: str):
self.calls.append(("GET", url))
# Pods list (DYNAMIC)
if "/libpod/pods/json" in url:
return FakeResponse(200, json_data=[])
# Containers list (DYNAMIC)
if "/libpod/containers/json" in url:
return FakeResponse(200, json_data=[])
# Container inspect (DYNAMIC)
if "/libpod/containers/" in url and url.endswith("/json"):
# Return some dict to prove JSON
return FakeResponse(200, json_data={"Id": "dummy"})
# Container logs (FIXED endpoint expects {"logs": "..."} from wrapper endpoint, not Podman directly,
# but app.py fetches Podman logs and then wraps into {"logs": text}.
if "/libpod/containers/" in url and "/logs?" in url:
return FakeResponse(200, json_data=None, text_data="line1\nline2\n")
# Fallback: return JSON dict
return FakeResponse(200, json_data={})
def post(self, url: str, **kwargs):
self.calls.append(("POST", url, kwargs))
# kube/play passthrough (workloads deploy)
if "/libpod/kube/play" in url:
# app.py gebruikt SESSION.post(url, data=yaml_content).json()
return FakeResponse(200, json_data={"kube": "played"})
# Container start/stop/restart: treat as success
if "/libpod/containers/" in url:
return FakeResponse(204, json_data={})
# Pod start/stop etc: treat as success
if "/libpod/pods/" in url:
return FakeResponse(200, json_data={"ok": True})
return FakeResponse(200, json_data={})
def delete(self, url: str):
self.calls.append(("DELETE", url))
return FakeResponse(200, json_data={"deleted": True})
@pytest.fixture(scope="session")
def app_module():
"""
Import app.py as module `app` from /app, while ensuring requests_unixsocket import is safe.
No runtime changes to app.py; only test-time monkeypatching later.
"""
# Ensure /app is importable
if "/app" not in sys.path:
sys.path.insert(0, "/app")
# If requests_unixsocket isn't installed or to avoid real socket usage, provide a minimal stub.
if "requests_unixsocket" not in sys.modules:
mod = types.ModuleType("requests_unixsocket")
mod.Session = lambda: FakeSession()
sys.modules["requests_unixsocket"] = mod
app = importlib.import_module("app")
return app
@pytest.fixture()
def client(app_module, monkeypatch, tmp_path):
"""
TestClient with all external side-effects stubbed:
- Podman socket calls via app.SESSION
- systemctl calls via app.run
- filesystem roots via WORKLOADS_DIR + ALLOWLIST_FILE
"""
# Prepare temp workload tree under tmp_path
workloads_dir = tmp_path / "workloads"
systemd_dir = workloads_dir / "systemd"
systemd_dir.mkdir(parents=True, exist_ok=True)
# A defined container (*.container) for /containers-dashboard defined entry
(systemd_dir / "sonarr.container").write_text("[Container]\nImage=dummy\n", encoding="utf-8")
# Files endpoints operate under WORKLOADS_DIR; UI focuses on systemd subtree
# Create a file for /files/read/save/delete
(systemd_dir / "test.txt").write_text("hello\n", encoding="utf-8")
# Create a non-empty dir for /files/rmdir 409 scenario
nonempty_dir = systemd_dir / "nonempty"
nonempty_dir.mkdir(parents=True, exist_ok=True)
(nonempty_dir / "keep.txt").write_text("x", encoding="utf-8")
# Create a yaml to create a defined pod entry for /pods-dashboard
(workloads_dir / "mediaserver.yaml").write_text("kind: Pod\nmetadata:\n name: mediaserver\n", encoding="utf-8")
# Allowlist file for /systemd/allowlist and allow-mode enforcement
allowlist_file = tmp_path / "allowed_units.txt"
allowlist_file.write_text("sonarr.service\nmediaserver.service\n", encoding="utf-8")
# Patch module globals to point at tmp filesystem (test-only)
monkeypatch.setattr(app_module, "WORKLOADS_DIR", str(workloads_dir))
monkeypatch.setattr(app_module, "ALLOWLIST_FILE", str(allowlist_file))
# Stub Podman session object
monkeypatch.setattr(app_module, "SESSION", FakeSession())
# Stub systemctl runner
def fake_run(cmd):
# cmd is a list, e.g. ["systemctl","--user","is-active","mediaserver.service"]
cmd_str = " ".join(cmd)
if "is-active mediaserver.service" in cmd_str:
return 0, "active"
if "is-active sonarr.service" in cmd_str:
return 0, "inactive"
# For POST /{action}/{unit}, return something stable
if cmd[:3] == ["systemctl", "--user", "status"]:
return 0, "Active: active (running)"
if cmd[:3] == ["systemctl", "--user", "daemon-reload"]:
return 0, "ok"
if cmd[:3] == ["systemctl", "--user", "start"]:
return 0, "started"
if cmd[:3] == ["systemctl", "--user", "stop"]:
return 0, "stopped"
if cmd[:3] == ["systemctl", "--user", "restart"]:
return 0, "restarted"
# fallback
return 0, "ok"
monkeypatch.setattr(app_module, "run", fake_run)
return TestClient(app_module.app)
@@ -0,0 +1,165 @@
import importlib
import importlib.util
from pathlib import Path
from fastapi.testclient import TestClient
def load_app_module():
try:
return importlib.import_module("app")
except ModuleNotFoundError:
app_path = Path("/app/app.py")
spec = importlib.util.spec_from_file_location("app", app_path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ============================================================
# /containers/{action}/{name}
# ============================================================
def test_container_action_invalid_action_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/containers/invalid/dummy")
# Freeze whatever baseline currently is
body = r.json()
# Must be JSON and deterministic structure
assert isinstance(body, (dict, list))
# If dict variant
if isinstance(body, dict):
assert set(body.keys()) == {"error"}
assert body["error"] == "Invalid action"
def test_container_action_failure_shape_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/containers/start/nonexistent_container_12345")
body = r.json()
# Freeze exact variant structure
if isinstance(body, dict) and body.get("method") == "podman":
assert "name" in body
assert "cmd" in body
assert "status_code" in body
else:
# Any other baseline variant must still be JSON
assert isinstance(body, (dict, list))
# ============================================================
# /actions/{action}/{name}
# ============================================================
def test_legacy_actions_invalid_action_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/actions/invalid/dummy")
body = r.json()
assert isinstance(body, dict)
assert set(body.keys()) == {"status"}
assert body["status"] == "unknown"
# ============================================================
# /pods/actions/{action}/{podname}
# ============================================================
def test_pods_action_invalid_action_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/pods/actions/invalid/dummy")
body = r.json()
# Baseline may return tuple-style 400 or JSON
assert isinstance(body, (dict, list))
if isinstance(body, dict) and "error" in body:
assert body["error"] == "Invalid action"
def test_pods_action_variant_shape_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/pods/actions/start/nonexistent_pod_12345")
body = r.json()
# Freeze possible baseline variants
if isinstance(body, dict):
if body.get("method") == "systemd_then_podman":
assert set(body.keys()) == {"method", "note", "systemd", "podman"}
assert body["note"] == "systemd failed; falling back to podman"
elif body.get("method") == "podman":
assert set(body.keys()) == {"method", "result"}
else:
assert isinstance(body, list)
# ============================================================
# /{action}/{unit}
# ============================================================
def test_systemctl_wrapper_invalid_action_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/invalid/dummy.service")
body = r.json()
assert set(body.keys()) == {"detail"}
assert body["detail"] == "Invalid action"
def test_systemctl_wrapper_status_shape_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/status/nonexistent.service")
body = r.json()
# allowlist may cause 403
if r.status_code == 403:
assert set(body.keys()) == {"detail"}
else:
assert set(body.keys()) == {"cmd", "exit", "output"}
# ============================================================
# /api/<action>/<unit>
# ============================================================
def test_legacy_api_invalid_action_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/api/invalid/dummy.service")
body = r.json()
# Baseline: HTTPException 400 with {"detail": "..."}
assert r.status_code == 400
assert set(body.keys()) == {"detail"}
def test_legacy_api_status_shape_contract():
app_mod = load_app_module()
client = TestClient(app_mod.app)
r = client.post("/api/status/nonexistent.service")
body = r.json()
if r.status_code == 403:
assert set(body.keys()) == {"detail"}
else:
assert set(body.keys()) == {"cmd", "exit", "output"}
+69
View File
@@ -0,0 +1,69 @@
import importlib.util
import os
def load_app_module():
repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
app_path = os.path.join(repo_root, "app.py")
spec = importlib.util.spec_from_file_location("app", app_path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _iter_routes(app):
for r in getattr(app, "routes", []):
path = getattr(r, "path", None)
methods = getattr(r, "methods", None)
if not path or not methods:
continue
for m in methods:
if m in ("HEAD", "OPTIONS"):
continue
yield (m, path)
def test_contract_routes_exist_and_match_locked_baseline():
app_mod = load_app_module()
routes = set(_iter_routes(app_mod.app))
# Endpoint count exact (locked baseline)
assert len(routes) == 30
expected = {
# FastAPI built-ins (default)
("GET", "/openapi.json"),
("GET", "/docs"),
("GET", "/docs/oauth2-redirect"),
("GET", "/redoc"),
# App routes
("GET", "/workloads"),
("GET", "/workloads/read/{filename:path}"),
("POST", "/workloads/save-file"),
("POST", "/workloads/deploy/{filename:path}"),
("GET", "/files/tree"),
("GET", "/files/read"),
("POST", "/files/save"),
("DELETE", "/files/delete"),
("POST", "/files/mkdir"),
("DELETE", "/files/rmdir"),
("GET", "/pods"),
("POST", "/actions/{action}/{name}"),
("GET", "/pods-dashboard"),
("POST", "/pods/actions/{action}/{podname}"),
("GET", "/containers-dashboard"),
("GET", "/containers"),
("POST", "/containers/{action}/{name}"),
("GET", "/debug/defined-containers"),
("GET", "/dashboard"),
("GET", "/test-hybrid"),
("GET", "/containers/logs/{name}"),
("GET", "/containers/inspect/{name}"),
("GET", "/systemd/allowlist"),
("POST", "/daemon-reload"),
("POST", "/{action}/{unit}"),
("POST", "/api/<action>/<unit>"),
}
assert routes == expected
+124
View File
@@ -0,0 +1,124 @@
import importlib
import importlib.util
from pathlib import Path
from fastapi.testclient import TestClient
def load_app_module():
try:
return importlib.import_module("app")
except ModuleNotFoundError:
app_path = Path("/app/app.py")
spec = importlib.util.spec_from_file_location("app", app_path)
if spec is None or spec.loader is None:
raise
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def test_files_read_not_found_detail_string(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
workloads.mkdir()
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.get("/files/read", params={"path": "systemd/nope.txt"})
assert r.status_code == 404
assert r.json()["detail"] == "Not found"
def test_files_read_is_directory_detail_string(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
(workloads / "systemd").mkdir(parents=True)
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.get("/files/read", params={"path": "systemd"})
assert r.status_code == 403
assert r.json()["detail"] == "Is a directory"
def test_files_delete_directory_error_string(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
(workloads / "systemd").mkdir(parents=True)
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.delete("/files/delete", params={"path": "systemd"})
assert r.status_code == 400
assert r.json()["detail"] == "Kan niet verwijderen: is directory"
def test_files_mkdir_prefix_systemd_and_keyset(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
workloads.mkdir()
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.post("/files/mkdir", params={"path": "abc"})
assert r.status_code == 200
body = r.json()
assert set(body.keys()) == {"status", "path"}
assert body["path"] == "systemd/abc"
def test_files_rmdir_refuse_root_detail_string(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
workloads.mkdir()
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.delete("/files/rmdir", params={"path": "systemd"})
assert r.status_code == 400
assert r.json()["detail"] == "Refusing to delete systemd root"
def test_files_rmdir_only_systemd_subtree_detail_string(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
workloads.mkdir()
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.delete("/files/rmdir", params={"path": "not_systemd/x"})
assert r.status_code == 400
assert r.json()["detail"] == "Only systemd subtree is allowed"
def test_files_rmdir_nonempty_409_detail_shape_exact(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
d = workloads / "systemd" / "dir1"
d.mkdir(parents=True)
(d / "a.txt").write_text("x", encoding="utf-8")
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.delete("/files/rmdir", params={"path": "systemd/dir1"})
assert r.status_code == 409
body = r.json()
assert set(body.keys()) == {"detail"}
detail = body["detail"]
assert set(detail.keys()) == {"error", "dirs", "files"}
assert detail["error"] == "directory not empty"
assert "a.txt" in detail["files"]
def test_files_safe_join_forbidden_path_detail_string(tmp_path, monkeypatch):
app_mod = load_app_module()
workloads = tmp_path / "workloads"
workloads.mkdir()
monkeypatch.setattr(app_mod, "WORKLOADS_DIR", str(workloads))
client = TestClient(app_mod.app)
r = client.get("/files/read", params={"path": "../escape.txt"})
assert r.status_code == 403
assert r.json()["detail"] == "Forbidden path"