Luxx/luxx/agent/task.py

289 lines
8.5 KiB
Python

"""Task module for autonomous task execution"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import heapq
import logging
from typing import List, Optional, Dict, Any

from luxx.utils.helpers import generate_id
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
    """Enumeration of the states a Task's ``status`` field can hold.

    Each member's value is the lowercase string that ``Task.to_dict``
    emits for it.
    """

    PENDING = "pending"  # default status of a freshly constructed Task
    READY = "ready"
    RUNNING = "running"
    BLOCK = "block"
    TERMINATED = "terminated"
class StepStatus(Enum):
    """Enumeration of the states a Step's ``status`` field can hold.

    Each member's value is the lowercase string that ``Step.to_dict``
    emits for it.
    """

    PENDING = "pending"  # default status of a freshly constructed Step
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class Step:
    """One step of a task.

    ``depends_on`` lists the ids of other steps; ``TaskGraph`` reads it to
    build the dependency edges between steps.
    """

    id: str
    name: str
    description: str = ""
    depends_on: List[str] = field(default_factory=list)
    status: StepStatus = StepStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    # Both timestamps default to the local, naive time of construction.
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of this step.

        The status is emitted as its string value and the timestamps as
        ISO-8601 strings (``None`` when a timestamp is unset).
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "depends_on": self.depends_on,
            "status": self.status.value,
            "result": self.result,
        }
        # Timestamps are appended last so the key order stays stable.
        payload["created_at"] = (
            None if not self.created_at else self.created_at.isoformat()
        )
        payload["updated_at"] = (
            None if not self.updated_at else self.updated_at.isoformat()
        )
        return payload
@dataclass
class Task:
    """A task: a named goal with its steps and nested subtasks."""

    id: str
    name: str
    description: str = ""
    goal: str = ""
    status: TaskStatus = TaskStatus.PENDING
    steps: List[Step] = field(default_factory=list)
    subtasks: List["Task"] = field(default_factory=list)
    result: Optional[Dict[str, Any]] = None
    # Both timestamps default to the local, naive time of construction.
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of this task.

        Steps and subtasks are serialised through their own ``to_dict``
        (subtasks recursively). The status is emitted as its string value
        and the timestamps as ISO-8601 strings (``None`` when unset).
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "goal": self.goal,
            "status": self.status.value,
        }
        payload["steps"] = [step.to_dict() for step in self.steps]
        payload["subtasks"] = [child.to_dict() for child in self.subtasks]
        payload["result"] = self.result
        payload["created_at"] = (
            None if not self.created_at else self.created_at.isoformat()
        )
        payload["updated_at"] = (
            None if not self.updated_at else self.updated_at.isoformat()
        )
        return payload
class TaskGraph:
    """Dependency graph over the steps of a single task.

    Edges run from a prerequisite step to every step that lists it in
    ``depends_on``. A dependency naming an id that is not one of the task's
    steps is left out of the graph; ``validate`` reports such references.
    """

    def __init__(self, task: Task):
        """Index ``task.steps`` into adjacency maps and in-degree counts.

        Args:
            task: The task whose steps form the graph. The graph is built
                once here; steps added to the task later are not picked up
                by the adjacency maps.
        """
        self.task = task
        # prerequisite id -> ids of the steps that depend on it
        self._adjacency: Dict[str, List[str]] = {}
        # step id -> ids of its known prerequisites
        self._reverse_adjacency: Dict[str, List[str]] = {}
        # step id -> number of known prerequisites
        self._in_degree: Dict[str, int] = {}
        self._build_graph()

    def _build_graph(self) -> None:
        """Populate the adjacency maps and in-degree counts from the steps."""
        # First pass: register every step so edges can be checked against it.
        for step in self.task.steps:
            self._adjacency[step.id] = []
            self._reverse_adjacency[step.id] = []
            self._in_degree[step.id] = 0

        # Second pass: add an edge for each dependency on a known step.
        for step in self.task.steps:
            for dep_id in step.depends_on:
                if dep_id not in self._adjacency:
                    # Unknown prerequisite: no edge; validate() reports it.
                    continue
                self._adjacency[dep_id].append(step.id)
                self._reverse_adjacency[step.id].append(dep_id)
                self._in_degree[step.id] += 1

    def topological_sort(self) -> List[Step]:
        """Return the steps ordered so prerequisites come before dependents.

        Among the steps that are available at the same time, the one with
        the smallest id is emitted first, which makes the order
        deterministic.

        Returns:
            The ordered steps. Steps that are part of (or only reachable
            through) a dependency cycle never reach in-degree zero and are
            therefore missing from the result.
        """
        in_degree = dict(self._in_degree)
        step_map = {step.id: step for step in self.task.steps}

        # Min-heap of step ids whose prerequisites have all been emitted.
        available = [sid for sid, degree in in_degree.items() if degree == 0]
        heapq.heapify(available)

        ordered: List[Step] = []
        while available:
            current = heapq.heappop(available)
            ordered.append(step_map[current])
            for dependent_id in self._adjacency[current]:
                in_degree[dependent_id] -= 1
                if in_degree[dependent_id] == 0:
                    heapq.heappush(available, dependent_id)
        return ordered

    def get_ready_steps(self, completed_step_ids: List[str]) -> List[Step]:
        """Return the steps that can be executed now.

        A step is ready when it is not in ``completed_step_ids``, its status
        is still ``StepStatus.PENDING`` and every id in its ``depends_on``
        is in ``completed_step_ids``.

        Args:
            completed_step_ids: Ids of the steps considered completed.

        Returns:
            The ready steps, in the order they appear in ``task.steps``.
        """
        # Set lookup keeps each membership test O(1).
        completed = set(completed_step_ids)
        return [
            step
            for step in self.task.steps
            if step.id not in completed
            and step.status == StepStatus.PENDING
            and all(dep_id in completed for dep_id in step.depends_on)
        ]

    def detect_cycles(self) -> List[List[str]]:
        """Find dependency cycles with a depth-first search.

        Returns:
            A list of cycles. Each cycle is a list of step ids that starts
            and ends with the same id. The search started from a given root
            stops at the first cycle it meets, so not every cycle in the
            graph is necessarily reported. Empty when no cycle was found.
        """
        # WHITE: unvisited, GRAY: on the current DFS path, BLACK: finished.
        WHITE, GRAY, BLACK = 0, 1, 2
        color = {step.id: WHITE for step in self.task.steps}
        cycles: List[List[str]] = []

        def dfs(node: str, path: List[str]) -> bool:
            color[node] = GRAY
            path.append(node)
            found = False
            for neighbor in self._adjacency.get(node, []):
                if color[neighbor] == GRAY:
                    # Back edge: the cycle is the path from neighbor onward.
                    cycle_start = path.index(neighbor)
                    cycles.append(path[cycle_start:] + [neighbor])
                    found = True
                    break
                if color[neighbor] == WHITE and dfs(neighbor, path):
                    found = True
                    break
            # Always unwind, even after a cycle was found. Leaving nodes
            # GRAY would make a search from a later root treat them as part
            # of its own path and fail in path.index().
            path.pop()
            color[node] = BLACK
            return found

        for step in self.task.steps:
            if color[step.id] == WHITE:
                dfs(step.id, [])
        return cycles

    def validate(self) -> tuple[bool, Optional[str]]:
        """Check the graph for cycles and for unknown dependencies.

        Cycles are checked first, then references to step ids that are not
        part of the task.

        Returns:
            ``(True, None)`` when the graph is valid, otherwise
            ``(False, message)`` describing the first problem found.
        """
        cycles = self.detect_cycles()
        if cycles:
            return False, f"Circular dependency detected: {cycles[0]}"

        step_ids = {step.id for step in self.task.steps}
        for step in self.task.steps:
            for dep_id in step.depends_on:
                if dep_id not in step_ids:
                    return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'"
        return True, None
class TaskService:
    """Service that creates, stores and updates tasks.

    Tasks are kept in a dict on the instance, keyed by task id.
    """

    def __init__(self):
        """Start with an empty task store."""
        self._tasks: Dict[str, Task] = {}

    def create_task(
        self,
        name: str,
        goal: str,
        description: str = "",
        steps: Optional[List[Dict[str, Any]]] = None
    ) -> Task:
        """Create a task, store it and return it.

        Args:
            name: Task name.
            goal: Task goal.
            description: Optional task description.
            steps: Optional step descriptions. Only the ``"name"`` and
                ``"description"`` keys are read (both default to ``""``).

        Returns:
            The newly created task, with one Step per entry in ``steps``.
        """
        # NOTE(review): unlike add_steps, a "depends_on" key in the step
        # dicts is not applied here -- confirm whether that is intentional.
        task_id = generate_id("task")
        task = Task(
            id=task_id,
            name=name,
            description=description,
            goal=goal
        )
        for step_data in steps or []:
            task.steps.append(
                Step(
                    id=generate_id("step"),
                    name=step_data.get("name", ""),
                    description=step_data.get("description", "")
                )
            )
        self._tasks[task_id] = task
        logger.info("Created task: %s", task_id)
        return task

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task stored under ``task_id``, or ``None`` if absent."""
        return self._tasks.get(task_id)

    def list_tasks(self) -> List[Task]:
        """Return a new list containing every stored task."""
        return list(self._tasks.values())

    def update_task_status(
        self,
        task_id: str,
        status: TaskStatus,
        result: Any = None
    ) -> Optional[Task]:
        """Set a task's status and result and refresh its ``updated_at``.

        Args:
            task_id: Id of the task to update.
            status: New status.
            result: New result. The stored result is always overwritten,
                so omitting this argument resets it to ``None``.

        Returns:
            The updated task, or ``None`` when ``task_id`` is unknown.
        """
        task = self._tasks.get(task_id)
        if task is None:
            return None
        task.status = status
        task.result = result
        task.updated_at = datetime.now()
        return task

    def add_steps(
        self,
        task_id: str,
        steps: List[Dict[str, Any]]
    ) -> Optional[List[Step]]:
        """Append new steps to an existing task.

        Args:
            task_id: Id of the task to extend.
            steps: Step descriptions. The ``"name"``, ``"description"`` and
                ``"depends_on"`` keys are read; each is optional.

        Returns:
            The newly created steps in input order, or ``None`` when
            ``task_id`` is unknown.
        """
        task = self._tasks.get(task_id)
        if task is None:
            return None
        created: List[Step] = []
        for step_data in steps:
            step = Step(
                id=generate_id("step"),
                name=step_data.get("name", ""),
                description=step_data.get("description", ""),
                # Copy so the step does not alias the caller's list; a
                # missing or None value becomes an empty list.
                depends_on=list(step_data.get("depends_on") or [])
            )
            task.steps.append(step)
            created.append(step)
        task.updated_at = datetime.now()
        return created

    def delete_task(self, task_id: str) -> bool:
        """Remove a task; return ``True`` if it existed, else ``False``."""
        if task_id not in self._tasks:
            return False
        del self._tasks[task_id]
        return True

    def build_graph(self, task_id: str) -> Optional[TaskGraph]:
        """Return a new TaskGraph for the task, or ``None`` if it is unknown."""
        task = self._tasks.get(task_id)
        if task is None:
            return None
        return TaskGraph(task)
# Shared TaskService instance, created once when this module is imported.
task_service = TaskService()