Files
webmanager-mvp/webui/backend/app/services/task_service.py
T
2026-03-11 09:39:41 +01:00

43 lines
1.5 KiB
Python

from __future__ import annotations
from backend.app.api.errors import AppError
from backend.app.api.schemas import TaskDetailResponse, TaskListItem, TaskListResponse
from backend.app.db.task_repository import TaskRepository
class TaskService:
def __init__(self, repository: TaskRepository):
self._repository = repository
def create_task(self, operation: str, source: str, destination: str) -> TaskDetailResponse:
task = self._repository.create_task(operation=operation, source=source, destination=destination)
return TaskDetailResponse(**task)
def get_task(self, task_id: str) -> TaskDetailResponse:
task = self._repository.get_task(task_id)
if not task:
raise AppError(
code="task_not_found",
message="Task was not found",
status_code=404,
details={"task_id": task_id},
)
return TaskDetailResponse(**task)
def list_tasks(self) -> TaskListResponse:
tasks = self._repository.list_tasks()
return TaskListResponse(
items=[
TaskListItem(
id=task["id"],
operation=task["operation"],
status=task["status"],
source=task["source"],
destination=task["destination"],
created_at=task["created_at"],
finished_at=task["finished_at"],
)
for task in tasks
]
)