"""Task module for autonomous task execution""" from dataclasses import dataclass, field from datetime import datetime from enum import Enum import logging from typing import List, Optional, Dict, Any from luxx.utils.helpers import generate_id logger = logging.getLogger(__name__) class TaskStatus(Enum): """Task status enum""" PENDING = "pending" READY = "ready" RUNNING = "running" BLOCK = "block" TERMINATED = "terminated" class StepStatus(Enum): """Step status enum""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" @dataclass class Step: """Task step""" id: str name: str description: str = "" depends_on: List[str] = field(default_factory=list) status: StepStatus = StepStatus.PENDING result: Optional[Dict[str, Any]] = None created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { "id": self.id, "name": self.name, "description": self.description, "depends_on": self.depends_on, "status": self.status.value, "result": self.result, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } @dataclass class Task: """Task entity""" id: str name: str description: str = "" goal: str = "" status: TaskStatus = TaskStatus.PENDING steps: List[Step] = field(default_factory=list) subtasks: List["Task"] = field(default_factory=list) result: Optional[Dict[str, Any]] = None created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { "id": self.id, "name": self.name, "description": self.description, "goal": self.goal, "status": self.status.value, "steps": [s.to_dict() for s in self.steps], "subtasks": [t.to_dict() for t in self.subtasks], "result": self.result, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } class TaskGraph: """Task graph for managing step dependencies""" def __init__(self, task: Task): self.task = task self._adjacency: Dict[str, List[str]] = {} self._reverse_adjacency: Dict[str, List[str]] = {} self._in_degree: Dict[str, int] = {} self._build_graph() def _build_graph(self) -> None: """Build graph from task steps""" for step in self.task.steps: self._adjacency[step.id] = [] self._reverse_adjacency[step.id] = [] self._in_degree[step.id] = 0 for step in self.task.steps: for dep_id in step.depends_on: if dep_id in self._adjacency: self._adjacency[dep_id].append(step.id) self._reverse_adjacency[step.id].append(dep_id) self._in_degree[step.id] += 1 def topological_sort(self) -> List[Step]: """Get steps in topological order""" in_degree = self._in_degree.copy() queue = [step_id for step_id, degree in in_degree.items() if degree == 0] result = [] step_map = {step.id: step for step in self.task.steps} while queue: queue.sort() current = queue.pop(0) result.append(step_map[current]) for dependent_id in self._adjacency[current]: in_degree[dependent_id] -= 1 if in_degree[dependent_id] == 0: queue.append(dependent_id) return result def get_ready_steps(self, completed_step_ids: List[str]) -> List[Step]: """Get steps that are ready to execute""" step_map = {step.id: step for step in self.task.steps} ready = [] for step in self.task.steps: if step.id in completed_step_ids: continue if step.status != StepStatus.PENDING: continue deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on) if deps_completed: ready.append(step) return ready def detect_cycles(self) -> List[List[str]]: """Detect cycles in the graph""" WHITE, GRAY, BLACK = 0, 1, 2 color = {step.id: WHITE for step in self.task.steps} cycles = [] def dfs(node: str, path: List[str]) -> bool: color[node] = GRAY path.append(node) for neighbor in self._adjacency.get(node, []): if color[neighbor] == GRAY: cycle_start = path.index(neighbor) cycles.append(path[cycle_start:] + [neighbor]) return True elif color[neighbor] == WHITE: if dfs(neighbor, path): return True path.pop() color[node] = BLACK return False for step in self.task.steps: if color[step.id] == WHITE: dfs(step.id, []) return cycles def validate(self) -> tuple[bool, Optional[str]]: """Validate the graph structure""" cycles = self.detect_cycles() if cycles: return False, f"Circular dependency detected: {cycles[0]}" step_ids = {step.id for step in self.task.steps} for step in self.task.steps: for dep_id in step.depends_on: if dep_id not in step_ids: return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'" return True, None class TaskService: """Task service for managing tasks""" def __init__(self): self._tasks: Dict[str, Task] = {} def create_task( self, name: str, goal: str, description: str = "", steps: List[Dict[str, Any]] = None ) -> Task: """Create a new task""" task_id = generate_id("task") task = Task( id=task_id, name=name, description=description, goal=goal ) if steps: for step_data in steps: step = Step( id=generate_id("step"), name=step_data.get("name", ""), description=step_data.get("description", "") ) task.steps.append(step) self._tasks[task_id] = task logger.info(f"Created task: {task_id}") return task def get_task(self, task_id: str) -> Optional[Task]: """Get task by ID""" return self._tasks.get(task_id) def list_tasks(self) -> List[Task]: """List all tasks""" return list(self._tasks.values()) def update_task_status( self, task_id: str, status: TaskStatus, result: Any = None ) -> Optional[Task]: """Update task status""" task = self._tasks.get(task_id) if not task: return None task.status = status task.result = result task.updated_at = datetime.now() return task def add_steps( self, task_id: str, steps: List[Dict[str, Any]] ) -> Optional[List[Step]]: """Add steps to task""" task = self._tasks.get(task_id) if not task: return None result = [] for step_data in steps: step = Step( id=generate_id("step"), name=step_data.get("name", ""), description=step_data.get("description", ""), depends_on=step_data.get("depends_on", []) ) task.steps.append(step) result.append(step) task.updated_at = datetime.now() return result def delete_task(self, task_id: str) -> bool: """Delete task""" if task_id not in self._tasks: return False del self._tasks[task_id] return True def build_graph(self, task_id: str) -> Optional[TaskGraph]: """Build task graph for a task""" task = self._tasks.get(task_id) if not task: return None return TaskGraph(task) task_service = TaskService()