bacab3b20a
Drie endpoints gebruikten os.path.join zonder validatie, waardoor een aanvaller buiten WORKLOADS_DIR kon lezen/schrijven. Vervangen door de bestaande _files_safe_join() helper die al door alle /files/ endpoints werd gebruikt. Endpoints: /workloads/read/, /workloads/save-file, /workloads/deploy/ Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
6.0 KiB
Python
169 lines
6.0 KiB
Python
import os
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
|
|
|
|
class FileContent(BaseModel):
|
|
content: str
|
|
|
|
|
|
def safe_join(base, path):
|
|
# prevent traversal
|
|
base = os.path.abspath(base)
|
|
final = os.path.abspath(os.path.join(base, path))
|
|
if not final.startswith(base):
|
|
raise HTTPException(status_code=403, detail="Forbidden path")
|
|
return final
|
|
|
|
|
|
def init_files_router(session, podman_api_base: str, workloads_dir: str) -> APIRouter:
|
|
router = APIRouter(tags=["files"])
|
|
|
|
def _podman_post(url: str, **kwargs):
|
|
# Keep behavior identical to app.py wrapper used by old /workloads/deploy.
|
|
return session.post(url, **kwargs)
|
|
|
|
# STEP 4: Centralize WORKLOADS_DIR subtree enforcement via one helper.
|
|
# MUST be behavior-identical to previous safe_join(WORKLOADS_DIR, ...) calls.
|
|
def _files_safe_join(path: str) -> str:
|
|
return safe_join(workloads_dir, path)
|
|
|
|
# --- WORKLOADS ---
|
|
@router.get("/workloads")
|
|
def list_workloads():
|
|
workloads = []
|
|
for root, _, files in os.walk(workloads_dir):
|
|
for f in files:
|
|
if f.endswith((".yaml", ".yml", ".json")):
|
|
full = os.path.join(root, f)
|
|
rel = os.path.relpath(full, workloads_dir)
|
|
workloads.append(rel)
|
|
return {"workloads": workloads}
|
|
|
|
@router.get("/workloads/read/{filename:path}")
|
|
def read_workload(filename: str):
|
|
path = _files_safe_join(filename)
|
|
if not os.path.exists(path):
|
|
raise HTTPException(404)
|
|
with open(path, 'r') as f:
|
|
content = f.read()
|
|
return {"filename": filename, "content": content}
|
|
|
|
@router.post("/workloads/save-file")
|
|
def save_workload_file(data: dict):
|
|
path = data.get("path")
|
|
content = data.get("content")
|
|
full_path = _files_safe_join(path)
|
|
os.makedirs(os.path.dirname(full_path), exist_ok=True)
|
|
with open(full_path, "w") as f:
|
|
f.write(content)
|
|
return {"status": "success"}
|
|
|
|
@router.post("/workloads/deploy/{filename:path}")
|
|
def deploy_workload(filename: str):
|
|
path = _files_safe_join(filename)
|
|
with open(path, 'r') as f:
|
|
yaml_content = f.read()
|
|
url = f"{podman_api_base}/libpod/kube/play"
|
|
return _podman_post(url, data=yaml_content).json()
|
|
|
|
# --- FILES API ---
|
|
@router.get("/files/tree")
|
|
def file_tree():
|
|
root = workloads_dir
|
|
result = []
|
|
for dirpath, dirnames, filenames in os.walk(root):
|
|
rel = os.path.relpath(dirpath, root)
|
|
if rel == ".":
|
|
rel = ""
|
|
result.append({
|
|
"path": rel,
|
|
"dirs": sorted(dirnames),
|
|
"files": sorted(filenames),
|
|
})
|
|
return result
|
|
|
|
@router.get("/files/read")
|
|
def file_read(path: str = Query(...)):
|
|
full = _files_safe_join(path)
|
|
if not os.path.exists(full):
|
|
raise HTTPException(status_code=404, detail="Not found")
|
|
if os.path.isdir(full):
|
|
raise HTTPException(status_code=403, detail="Is a directory")
|
|
with open(full, "r") as f:
|
|
content = f.read()
|
|
return {"content": content}
|
|
|
|
@router.post("/files/save")
|
|
def file_save(path: str = Query(...), data: FileContent = None):
|
|
full = _files_safe_join(path)
|
|
os.makedirs(os.path.dirname(full), exist_ok=True)
|
|
with open(full, "w") as f:
|
|
f.write(data.content)
|
|
return {"status": "success", "path": path}
|
|
|
|
@router.delete("/files/delete")
|
|
def file_delete(path: str = Query(...)):
|
|
full = _files_safe_join(path)
|
|
if not os.path.exists(full):
|
|
raise HTTPException(status_code=404, detail="Not found")
|
|
if os.path.isdir(full):
|
|
raise HTTPException(status_code=400, detail="Kan niet verwijderen: is directory")
|
|
try:
|
|
os.remove(full)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=f"Kan niet verwijderen: {e}")
|
|
return {"status": "deleted", "type": "file"}
|
|
|
|
@router.post("/files/mkdir")
|
|
def file_mkdir(path: str = Query(...)):
|
|
# UI expects operations under systemd/; enforce prefix if absent.
|
|
if not path.startswith("systemd"):
|
|
path = os.path.join("systemd", path)
|
|
full = _files_safe_join(path)
|
|
os.makedirs(full, exist_ok=True)
|
|
return {"status": "directory created", "path": path}
|
|
|
|
@router.delete("/files/rmdir")
|
|
def file_rmdir(path: str = Query(..., description="Directory path under systemd/")):
|
|
# Only allow deletion under systemd subtree
|
|
if not path or path == "systemd" or path == "systemd/":
|
|
raise HTTPException(status_code=400, detail="Refusing to delete systemd root")
|
|
if not path.startswith("systemd/") and path != "systemd":
|
|
raise HTTPException(status_code=400, detail="Only systemd subtree is allowed")
|
|
|
|
full = _files_safe_join(path)
|
|
if not os.path.exists(full):
|
|
raise HTTPException(status_code=404, detail="Directory not found")
|
|
if not os.path.isdir(full):
|
|
raise HTTPException(status_code=400, detail="Path is not a directory")
|
|
|
|
# directory must be empty
|
|
try:
|
|
Path(full).rmdir()
|
|
except OSError:
|
|
# not empty
|
|
# build a stable detail payload
|
|
try:
|
|
dirs = []
|
|
files = []
|
|
for entry in os.listdir(full):
|
|
p = os.path.join(full, entry)
|
|
if os.path.isdir(p):
|
|
dirs.append(entry)
|
|
else:
|
|
files.append(entry)
|
|
except Exception:
|
|
dirs, files = [], []
|
|
raise HTTPException(status_code=409, detail={
|
|
"error": "directory not empty",
|
|
"dirs": sorted(dirs),
|
|
"files": sorted(files),
|
|
})
|
|
|
|
return {"deleted": True, "path": path}
|
|
|
|
return router
|