"""Task and step data model plus an in-memory task service."""

import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from luxx.agent.agent import agent_service
from luxx.utils.helpers import generate_id

logger = logging.getLogger(__name__)


class TaskStatus(Enum):
    """Task status enum"""

    PENDING = "pending"
    READY = "ready"
    RUNNING = "running"
    BLOCK = "block"
    TERMINATED = "terminated"


class StepStatus(Enum):
    """Step status enum"""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class Step:
    """Task step"""

    id: str
    name: str
    description: str = ""
    status: StepStatus = StepStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        Returns:
            A dict with the step's fields; ``status`` is emitted as the enum
            value and timestamps as ISO-8601 strings (``None`` when unset).
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "status": self.status.value,
            "result": self.result,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }


@dataclass
class Task:
    """Task entity"""

    id: str
    name: str
    description: str = ""
    goal: str = ""
    status: TaskStatus = TaskStatus.PENDING
    steps: List[Step] = field(default_factory=list)
    subtasks: List["Task"] = field(default_factory=list)
    result: Optional[Dict[str, Any]] = None
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        Returns:
            A dict with the task's fields; ``status`` is emitted as the enum
            value, ``steps`` as a list of step dicts, and timestamps as
            ISO-8601 strings (``None`` when unset).
        """
        # NOTE(review): ``subtasks`` is not serialized here -- confirm whether
        # that omission is intentional before adding it to the payload.
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "goal": self.goal,
            "status": self.status.value,
            "steps": [s.to_dict() for s in self.steps],
            "result": self.result,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }


def _build_step(step_data: Dict[str, Any]) -> Step:
    """Create a new pending ``Step`` from a ``{"name", "description"}`` dict."""
    return Step(
        id=generate_id("step"),
        name=step_data.get("name", ""),
        description=step_data.get("description", ""),
    )


class TaskService:
    """Task service for managing tasks"""

    def __init__(self):
        # Tasks are held in memory only, keyed by task id.
        self._tasks: Dict[str, Task] = {}

    def create_task(
        self,
        agent_id: str,
        name: str,
        goal: str,
        description: str = "",
        steps: Optional[List[Dict[str, Any]]] = None,
    ) -> Optional[Task]:
        """Create a task and make it the agent's current task.

        Args:
            agent_id: ID passed to ``agent_service.get_agent``.
            name: Task name.
            goal: Task goal.
            description: Optional task description.
            steps: Optional step dicts; the ``"name"`` and ``"description"``
                keys are read, each defaulting to ``""`` when absent.

        Returns:
            The new task, or ``None`` when the agent lookup yields a falsy
            result.
        """
        agent = agent_service.get_agent(agent_id)
        if not agent:
            return None

        task_id = generate_id("task")
        task = Task(id=task_id, name=name, description=description, goal=goal)
        if steps:
            task.steps.extend(_build_step(step_data) for step_data in steps)

        # Replaces whatever task the agent previously had as current.
        agent.current_task = task
        self._tasks[task_id] = task
        logger.info("Created task: %s for agent: %s", task_id, agent_id)
        return task

    def get_task(self, task_id: str) -> Optional[Task]:
        """Get task by ID, or ``None`` when it is not registered."""
        return self._tasks.get(task_id)

    def list_tasks(self, agent_id: Optional[str] = None) -> List[Task]:
        """List tasks, optionally filtered by agent.

        Args:
            agent_id: When truthy, restrict the result to that agent.

        Returns:
            Without ``agent_id``: every registered task. With ``agent_id``:
            a one-element list holding the agent's ``current_task``, or an
            empty list when the agent or its current task is missing. Only
            the current task is available because the service keeps no
            agent-to-tasks index.
        """
        if not agent_id:
            return list(self._tasks.values())

        agent = agent_service.get_agent(agent_id)
        if agent and agent.current_task:
            return [agent.current_task]
        return []

    def update_task_status(
        self,
        task_id: str,
        status: TaskStatus,
        result: Any = None,
    ) -> Optional[Task]:
        """Update task status.

        Args:
            task_id: ID of the task to update.
            status: New status.
            result: New result; it always overwrites the stored result, so
                omitting it resets the result to ``None``.

        Returns:
            The updated task, or ``None`` when the ID is unknown.
        """
        task = self._tasks.get(task_id)
        if not task:
            return None

        task.status = status
        task.result = result
        task.updated_at = datetime.now()
        return task

    def add_steps(
        self,
        task_id: str,
        steps: List[Dict[str, Any]],
    ) -> Optional[List[Step]]:
        """Add steps to task.

        Args:
            task_id: ID of the task to extend.
            steps: Step dicts; the ``"name"`` and ``"description"`` keys are
                read, each defaulting to ``""`` when absent.

        Returns:
            The newly created steps in input order, or ``None`` when the
            task ID is unknown.
        """
        task = self._tasks.get(task_id)
        if not task:
            return None

        new_steps = [_build_step(step_data) for step_data in steps]
        task.steps.extend(new_steps)
        task.updated_at = datetime.now()
        return new_steps

    def delete_task(self, task_id: str) -> bool:
        """Delete task.

        Returns:
            ``True`` when the task existed and was removed, else ``False``.
        """
        if task_id not in self._tasks:
            return False

        del self._tasks[task_id]
        return True


task_service = TaskService()