From 119d566e89c66dc8167bba66548865349360136a Mon Sep 17 00:00:00 2001 From: ViperEkura <3081035982@qq.com> Date: Thu, 16 Apr 2026 20:27:34 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0task=20=E5=9B=BE?= =?UTF-8?q?=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- luxx/agent/task.py | 129 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/luxx/agent/task.py b/luxx/agent/task.py index 0f13b24..2ad4979 100644 --- a/luxx/agent/task.py +++ b/luxx/agent/task.py @@ -30,6 +30,7 @@ class Step: id: str name: str description: str = "" + depends_on: List[str] = field(default_factory=list) status: StepStatus = StepStatus.PENDING result: Optional[Dict[str, Any]] = None created_at: datetime = field(default_factory=datetime.now) @@ -41,6 +42,7 @@ class Step: "id": self.id, "name": self.name, "description": self.description, + "depends_on": self.depends_on, "status": self.status.value, "result": self.result, "created_at": self.created_at.isoformat() if self.created_at else None, @@ -71,12 +73,129 @@ class Task: "goal": self.goal, "status": self.status.value, "steps": [s.to_dict() for s in self.steps], + "subtasks": [t.to_dict() for t in self.subtasks], "result": self.result, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } +class TaskGraph: + """Task graph for managing step dependencies""" + + def __init__(self, task: Task): + self.task = task + self._adjacency: Dict[str, List[str]] = {} # step_id -> [dependent_step_ids] + self._reverse_adjacency: Dict[str, List[str]] = {} # step_id -> [dependency_step_ids] + self._in_degree: Dict[str, int] = {} # step_id -> in-degree count + self._build_graph() + + def _build_graph(self) -> None: + """Build graph from task steps""" + # Initialize adjacency lists + for step in self.task.steps: + self._adjacency[step.id] = [] + self._reverse_adjacency[step.id] = [] + self._in_degree[step.id] = 0 + + # Build edges based on dependencies + for step in self.task.steps: + for dep_id in step.depends_on: + if dep_id in self._adjacency: + self._adjacency[dep_id].append(step.id) + self._reverse_adjacency[step.id].append(dep_id) + self._in_degree[step.id] += 1 + + def topological_sort(self) -> List[Step]: + """Get steps in topological order (Kahn's algorithm)""" + # Create a copy of in-degrees + in_degree = self._in_degree.copy() + + # Queue for steps with no dependencies + queue = [step_id for step_id, degree in in_degree.items() if degree == 0] + result = [] + + # Get step map for easy lookup + step_map = {step.id: step for step in self.task.steps} + + while queue: + # Sort for deterministic order + queue.sort() + current = queue.pop(0) + result.append(step_map[current]) + + # Reduce in-degree for dependent steps + for dependent_id in self._adjacency[current]: + in_degree[dependent_id] -= 1 + if in_degree[dependent_id] == 0: + queue.append(dependent_id) + + return result + + def get_ready_steps(self, completed_step_ids: List[str]) -> List[Step]: + """Get steps that are ready to execute (all dependencies completed)""" + step_map = {step.id: step for step in self.task.steps} + ready = [] + + for step in self.task.steps: + if step.id in completed_step_ids: + continue + if step.status != StepStatus.PENDING: + continue + + # Check if all dependencies are completed + deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on) + if deps_completed: + ready.append(step) + + return ready + + def detect_cycles(self) -> List[List[str]]: + """Detect cycles in the graph using DFS""" + WHITE, GRAY, BLACK = 0, 1, 2 + color = {step.id: WHITE for step in self.task.steps} + cycles = [] + + def dfs(node: str, path: List[str]) -> bool: + color[node] = GRAY + path.append(node) + + for neighbor in self._adjacency.get(node, []): + if color[neighbor] == GRAY: + # Found cycle + cycle_start = path.index(neighbor) + cycles.append(path[cycle_start:] + [neighbor]) + return True + elif color[neighbor] == WHITE: + if dfs(neighbor, path): + return True + + path.pop() + color[node] = BLACK + return False + + for step in self.task.steps: + if color[step.id] == WHITE: + dfs(step.id, []) + + return cycles + + def validate(self) -> tuple[bool, Optional[str]]: + """Validate the graph structure""" + # Check for cycles + cycles = self.detect_cycles() + if cycles: + return False, f"Circular dependency detected: {cycles[0]}" + + # Check for missing dependencies + step_ids = {step.id for step in self.task.steps} + for step in self.task.steps: + for dep_id in step.depends_on: + if dep_id not in step_ids: + return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'" + + return True, None + class TaskService: """Task service for managing tasks""" @@ -165,7 +284,8 @@ class TaskService: step = Step( id=generate_id("step"), name=step_data.get("name", ""), - description=step_data.get("description", "") + description=step_data.get("description", ""), + depends_on=step_data.get("depends_on", []) ) task.steps.append(step) result.append(step) @@ -180,6 +300,13 @@ class TaskService: del self._tasks[task_id] return True + + def build_graph(self, task_id: str) -> Optional[TaskGraph]: + """Build task graph for a task""" + task = self._tasks.get(task_id) + if not task: + return None + return TaskGraph(task) task_service = TaskService()