From dc08267c15b10a8ce74b45cb074c2a5271633a72 Mon Sep 17 00:00:00 2001 From: ViperEkura <3081035982@qq.com> Date: Fri, 17 Apr 2026 23:05:54 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0task=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- luxx/services/task.py | 288 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 luxx/services/task.py diff --git a/luxx/services/task.py b/luxx/services/task.py new file mode 100644 index 0000000..3043ffe --- /dev/null +++ b/luxx/services/task.py @@ -0,0 +1,288 @@ +"""Task module for autonomous task execution""" +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +import logging +from typing import List, Optional, Dict, Any +from luxx.utils.helpers import generate_id + +logger = logging.getLogger(__name__) + + +class TaskStatus(Enum): + """Task status enum""" + PENDING = "pending" + READY = "ready" + RUNNING = "running" + BLOCK = "block" + TERMINATED = "terminated" + + +class StepStatus(Enum): + """Step status enum""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + SKIPPED = "skipped" + + +@dataclass +class Step: + """Task step""" + id: str + name: str + description: str = "" + depends_on: List[str] = field(default_factory=list) + status: StepStatus = StepStatus.PENDING + result: Optional[Dict[str, Any]] = None + created_at: datetime = field(default_factory=datetime.now) + updated_at: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary""" + return { + "id": self.id, + "name": self.name, + "description": self.description, + "depends_on": self.depends_on, + "status": self.status.value, + "result": self.result, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } + + +@dataclass +class Task: + """Task entity""" + id: str + name: str + description: str = "" + goal: str = "" + status: TaskStatus = TaskStatus.PENDING + steps: List[Step] = field(default_factory=list) + subtasks: List["Task"] = field(default_factory=list) + result: Optional[Dict[str, Any]] = None + created_at: datetime = field(default_factory=datetime.now) + updated_at: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary""" + return { + "id": self.id, + "name": self.name, + "description": self.description, + "goal": self.goal, + "status": self.status.value, + "steps": [s.to_dict() for s in self.steps], + "subtasks": [t.to_dict() for t in self.subtasks], + "result": self.result, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None + } + + +class TaskGraph: + """Task graph for managing step dependencies""" + + def __init__(self, task: Task): + self.task = task + self._adjacency: Dict[str, List[str]] = {} + self._reverse_adjacency: Dict[str, List[str]] = {} + self._in_degree: Dict[str, int] = {} + self._build_graph() + + def _build_graph(self) -> None: + """Build graph from task steps""" + for step in self.task.steps: + self._adjacency[step.id] = [] + self._reverse_adjacency[step.id] = [] + self._in_degree[step.id] = 0 + + for step in self.task.steps: + for dep_id in step.depends_on: + if dep_id in self._adjacency: + self._adjacency[dep_id].append(step.id) + self._reverse_adjacency[step.id].append(dep_id) + self._in_degree[step.id] += 1 + + def topological_sort(self) -> List[Step]: + """Get steps in topological order""" + in_degree = self._in_degree.copy() + queue = [step_id for step_id, degree in in_degree.items() if degree == 0] + result = [] + step_map = {step.id: step for step in self.task.steps} + + while queue: + queue.sort() + current = queue.pop(0) + result.append(step_map[current]) + + for dependent_id in self._adjacency[current]: + in_degree[dependent_id] -= 1 + if in_degree[dependent_id] == 0: + queue.append(dependent_id) + + return result + + def get_ready_steps(self, completed_step_ids: List[str]) -> List[Step]: + """Get steps that are ready to execute""" + step_map = {step.id: step for step in self.task.steps} + ready = [] + + for step in self.task.steps: + if step.id in completed_step_ids: + continue + if step.status != StepStatus.PENDING: + continue + deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on) + if deps_completed: + ready.append(step) + + return ready + + def detect_cycles(self) -> List[List[str]]: + """Detect cycles in the graph""" + WHITE, GRAY, BLACK = 0, 1, 2 + color = {step.id: WHITE for step in self.task.steps} + cycles = [] + + def dfs(node: str, path: List[str]) -> bool: + color[node] = GRAY + path.append(node) + + for neighbor in self._adjacency.get(node, []): + if color[neighbor] == GRAY: + cycle_start = path.index(neighbor) + cycles.append(path[cycle_start:] + [neighbor]) + return True + elif color[neighbor] == WHITE: + if dfs(neighbor, path): + return True + + path.pop() + color[node] = BLACK + return False + + for step in self.task.steps: + if color[step.id] == WHITE: + dfs(step.id, []) + + return cycles + + def validate(self) -> tuple[bool, Optional[str]]: + """Validate the graph structure""" + cycles = self.detect_cycles() + if cycles: + return False, f"Circular dependency detected: {cycles[0]}" + + step_ids = {step.id for step in self.task.steps} + for step in self.task.steps: + for dep_id in step.depends_on: + if dep_id not in step_ids: + return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'" + + return True, None + + +class TaskService: + """Task service for managing tasks""" + + def __init__(self): + self._tasks: Dict[str, Task] = {} + + def create_task( + self, + name: str, + goal: str, + description: str = "", + steps: List[Dict[str, Any]] = None + ) -> Task: + """Create a new task""" + task_id = generate_id("task") + task = Task( + id=task_id, + name=name, + description=description, + goal=goal + ) + + if steps: + for step_data in steps: + step = Step( + id=generate_id("step"), + name=step_data.get("name", ""), + description=step_data.get("description", "") + ) + task.steps.append(step) + + self._tasks[task_id] = task + logger.info(f"Created task: {task_id}") + return task + + def get_task(self, task_id: str) -> Optional[Task]: + """Get task by ID""" + return self._tasks.get(task_id) + + def list_tasks(self) -> List[Task]: + """List all tasks""" + return list(self._tasks.values()) + + def update_task_status( + self, + task_id: str, + status: TaskStatus, + result: Any = None + ) -> Optional[Task]: + """Update task status""" + task = self._tasks.get(task_id) + if not task: + return None + + task.status = status + task.result = result + task.updated_at = datetime.now() + return task + + def add_steps( + self, + task_id: str, + steps: List[Dict[str, Any]] + ) -> Optional[List[Step]]: + """Add steps to task""" + task = self._tasks.get(task_id) + if not task: + return None + + result = [] + for step_data in steps: + step = Step( + id=generate_id("step"), + name=step_data.get("name", ""), + description=step_data.get("description", ""), + depends_on=step_data.get("depends_on", []) + ) + task.steps.append(step) + result.append(step) + + task.updated_at = datetime.now() + return result + + def delete_task(self, task_id: str) -> bool: + """Delete task""" + if task_id not in self._tasks: + return False + + del self._tasks[task_id] + return True + + def build_graph(self, task_id: str) -> Optional[TaskGraph]: + """Build task graph for a task""" + task = self._tasks.get(task_id) + if not task: + return None + return TaskGraph(task) + + +task_service = TaskService()