From 4de03866f49b6d09a91c49dd677a4e7c82865105 Mon Sep 17 00:00:00 2001 From: ViperEkura <3081035982@qq.com> Date: Thu, 16 Apr 2026 21:21:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0tas?= =?UTF-8?q?k?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dashboard/src/components/ProcessBlock.vue | 82 +++++ dashboard/src/views/ConversationView.vue | 31 +- luxx/agent/agent.py | 8 +- luxx/agent/task.py | 132 +++---- luxx/routes/agent.py | 9 +- luxx/services/chat.py | 1 - luxx/tools/builtin/__init__.py | 3 +- luxx/tools/builtin/task.py | 429 ++++++++++++++++++++++ 8 files changed, 603 insertions(+), 92 deletions(-) create mode 100644 luxx/tools/builtin/task.py diff --git a/dashboard/src/components/ProcessBlock.vue b/dashboard/src/components/ProcessBlock.vue index c516f5a..613a8a2 100644 --- a/dashboard/src/components/ProcessBlock.vue +++ b/dashboard/src/components/ProcessBlock.vue @@ -55,6 +55,49 @@ + +
+
+ + 开始任务 + {{ item.taskName || '新任务' }} + +
+
+
+
目标: {{ item.goal }}
+
+ 步骤: +
    +
  1. {{ step.name }}
  2. +
+
+
+
+
+ + +
+
+ + 任务完成 + {{ item.taskName || '任务' }} + + 成功 + 失败 + + +
+
+
+ 结果: {{ item.result }} +
+
+ 完成步骤: {{ item.summary.completed_steps || 0 }} / {{ item.summary.total_steps || 0 }} +
+
+
+
@@ -167,6 +210,25 @@ const allItems = computed(() => { index: step.index, content: step.content || '', }) + } else if (step.type === 'task_start') { + items.push({ + key: step.id || `task-start-${step.index}`, + type: 'task_start', + index: step.index, + taskName: step.taskName || step.name || '', + goal: step.goal || '', + steps: step.steps || [], + }) + } else if (step.type === 'task_complete') { + items.push({ + key: step.id || `task-complete-${step.index}`, + type: 'task_complete', + index: step.index, + taskName: step.taskName || step.name || '', + success: step.success, + result: step.result || '', + summary: step.summary || {}, + }) } } } else if (props.toolCalls && props.toolCalls.length > 0) { @@ -246,6 +308,10 @@ const chevronDown = `` const alertIcon = `` + +const targetIcon = `` + +const checkCircleIcon = `` diff --git a/dashboard/src/views/ConversationView.vue b/dashboard/src/views/ConversationView.vue index 4e5f41f..98bf30a 100644 --- a/dashboard/src/views/ConversationView.vue +++ b/dashboard/src/views/ConversationView.vue @@ -68,6 +68,9 @@ 加载中...
+ + + Luxx
@@ -207,6 +210,28 @@ const observedElements = new Set() const editConv = ref(null) +// 当前任务状态 +const currentTask = ref(null) + +// 从流式消息中提取任务信息 +watch(() => currentStreamState?.process_steps, (steps) => { + if (!steps || !steps.length) { + currentTask.value = null + return + } + + // 查找最新的 task_start 事件 + const taskStart = [...steps].reverse().find(s => s.type === 'task_start') + if (taskStart) { + currentTask.value = { + name: taskStart.taskName || taskStart.name, + goal: taskStart.goal, + status: 'running', + steps: taskStart.steps || [] + } + } +}, { deep: true }) + // 处理发送消息 const handleSend = async () => { if (!newMessage.value.trim()) return @@ -293,7 +318,7 @@ watch(convMessages, () => { scrollToBottom() }, { deep: true }) -watch(() => currentStreamState.value?.process_steps?.length, () => { +watch(() => currentStreamState?.process_steps?.length, () => { scrollToBottom() }) diff --git a/luxx/agent/agent.py b/luxx/agent/agent.py index 621e0c8..b745038 100644 --- a/luxx/agent/agent.py +++ b/luxx/agent/agent.py @@ -3,10 +3,12 @@ from enum import Enum import logging from dataclasses import dataclass, field from datetime import datetime -from typing import Dict, List, Optional, Any -from luxx.agent.task import Task +from typing import Dict, List, Optional, Any, TYPE_CHECKING from luxx.utils.helpers import generate_id +if TYPE_CHECKING: + from luxx.agent.task import Task + logger = logging.getLogger(__name__) @@ -26,7 +28,7 @@ class Agent: description: str = "" instructions: str = "" tools: List[str] = field(default_factory=list) - current_task: Optional[Task] = None + current_task: Optional["Task"] = None status: AgentStatus = AgentStatus.READY result: Optional[Dict[str, Any]] = None error: Optional[str] = None diff --git a/luxx/agent/task.py b/luxx/agent/task.py index 2ad4979..3043ffe 100644 --- a/luxx/agent/task.py +++ b/luxx/agent/task.py @@ -1,17 +1,20 @@ +"""Task module for autonomous task execution""" from dataclasses import dataclass, field from datetime import datetime from enum import Enum +import logging from typing import List, Optional, Dict, Any -from luxx.agent.agent import agent_service from luxx.utils.helpers import generate_id +logger = logging.getLogger(__name__) + class TaskStatus(Enum): """Task status enum""" - PENDING = "pending" - READY = "ready" - RUNNING = "running" - BLOCK = "block" + PENDING = "pending" + READY = "ready" + RUNNING = "running" + BLOCK = "block" TERMINATED = "terminated" @@ -63,7 +66,7 @@ class Task: result: Optional[Dict[str, Any]] = None created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) - + def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { @@ -82,139 +85,121 @@ class Task: class TaskGraph: """Task graph for managing step dependencies""" - + def __init__(self, task: Task): self.task = task - self._adjacency: Dict[str, List[str]] = {} # step_id -> [dependent_step_ids] - self._reverse_adjacency: Dict[str, List[str]] = {} # step_id -> [dependency_step_ids] - self._in_degree: Dict[str, int] = {} # step_id -> in-degree count + self._adjacency: Dict[str, List[str]] = {} + self._reverse_adjacency: Dict[str, List[str]] = {} + self._in_degree: Dict[str, int] = {} self._build_graph() - + def _build_graph(self) -> None: """Build graph from task steps""" - # Initialize adjacency lists for step in self.task.steps: self._adjacency[step.id] = [] self._reverse_adjacency[step.id] = [] self._in_degree[step.id] = 0 - - # Build edges based on dependencies + for step in self.task.steps: for dep_id in step.depends_on: if dep_id in self._adjacency: self._adjacency[dep_id].append(step.id) self._reverse_adjacency[step.id].append(dep_id) self._in_degree[step.id] += 1 - + def topological_sort(self) -> List[Step]: - """Get steps in topological order (Kahn's algorithm)""" - # Create a copy of in-degrees + """Get steps in topological order""" in_degree = self._in_degree.copy() - - # Queue for steps with no dependencies queue = [step_id for step_id, degree in in_degree.items() if degree == 0] result = [] - - # Get step map for easy lookup step_map = {step.id: step for step in self.task.steps} - + while queue: - # Sort for deterministic order queue.sort() current = queue.pop(0) result.append(step_map[current]) - - # Reduce in-degree for dependent steps + for dependent_id in self._adjacency[current]: in_degree[dependent_id] -= 1 if in_degree[dependent_id] == 0: queue.append(dependent_id) - + return result - + def get_ready_steps(self, completed_step_ids: List[str]) -> List[Step]: - """Get steps that are ready to execute (all dependencies completed)""" + """Get steps that are ready to execute""" step_map = {step.id: step for step in self.task.steps} ready = [] - + for step in self.task.steps: if step.id in completed_step_ids: continue if step.status != StepStatus.PENDING: continue - - # Check if all dependencies are completed deps_completed = all(dep_id in completed_step_ids for dep_id in step.depends_on) if deps_completed: ready.append(step) - + return ready - + def detect_cycles(self) -> List[List[str]]: - """Detect cycles in the graph using DFS""" + """Detect cycles in the graph""" WHITE, GRAY, BLACK = 0, 1, 2 color = {step.id: WHITE for step in self.task.steps} cycles = [] - + def dfs(node: str, path: List[str]) -> bool: color[node] = GRAY path.append(node) - + for neighbor in self._adjacency.get(node, []): if color[neighbor] == GRAY: - # Found cycle cycle_start = path.index(neighbor) cycles.append(path[cycle_start:] + [neighbor]) return True elif color[neighbor] == WHITE: if dfs(neighbor, path): return True - + path.pop() color[node] = BLACK return False - + for step in self.task.steps: if color[step.id] == WHITE: dfs(step.id, []) - + return cycles - + def validate(self) -> tuple[bool, Optional[str]]: """Validate the graph structure""" - # Check for cycles cycles = self.detect_cycles() if cycles: return False, f"Circular dependency detected: {cycles[0]}" - - # Check for missing dependencies + step_ids = {step.id for step in self.task.steps} for step in self.task.steps: for dep_id in step.depends_on: if dep_id not in step_ids: return False, f"Step '{step.name}' depends on non-existent step '{dep_id}'" - + return True, None + class TaskService: """Task service for managing tasks""" - + def __init__(self): self._tasks: Dict[str, Task] = {} - + def create_task( self, - agent_id: str, name: str, goal: str, description: str = "", steps: List[Dict[str, Any]] = None - ) -> Optional[Task]: - """Create task for agent, optionally as subtask""" - agent = agent_service.get_agent(agent_id) - if not agent: - return None - + ) -> Task: + """Create a new task""" task_id = generate_id("task") task = Task( id=task_id, @@ -222,8 +207,7 @@ class TaskService: description=description, goal=goal ) - - # Add steps + if steps: for step_data in steps: step = Step( @@ -232,27 +216,19 @@ class TaskService: description=step_data.get("description", "") ) task.steps.append(step) - - agent.current_task = task - + self._tasks[task_id] = task - - self._logger.info(f"Created task: {task_id} for agent: {agent_id}") + logger.info(f"Created task: {task_id}") return task - + def get_task(self, task_id: str) -> Optional[Task]: """Get task by ID""" return self._tasks.get(task_id) - - def list_tasks(self, agent_id: str = None) -> List[Task]: - """List tasks, optionally filtered by agent""" - if agent_id: - agent = self._agent_service.get_agent(agent_id) - if agent and agent.current_task: - return [agent.current_task] - return [] + + def list_tasks(self) -> List[Task]: + """List all tasks""" return list(self._tasks.values()) - + def update_task_status( self, task_id: str, @@ -263,12 +239,12 @@ class TaskService: task = self._tasks.get(task_id) if not task: return None - + task.status = status task.result = result task.updated_at = datetime.now() return task - + def add_steps( self, task_id: str, @@ -278,7 +254,7 @@ class TaskService: task = self._tasks.get(task_id) if not task: return None - + result = [] for step_data in steps: step = Step( @@ -289,18 +265,18 @@ class TaskService: ) task.steps.append(step) result.append(step) - + task.updated_at = datetime.now() return result - + def delete_task(self, task_id: str) -> bool: """Delete task""" if task_id not in self._tasks: return False - + del self._tasks[task_id] return True - + def build_graph(self, task_id: str) -> Optional[TaskGraph]: """Build task graph for a task""" task = self._tasks.get(task_id) diff --git a/luxx/routes/agent.py b/luxx/routes/agent.py index 6ccbbfc..8ae9134 100644 --- a/luxx/routes/agent.py +++ b/luxx/routes/agent.py @@ -121,18 +121,15 @@ def delete_agent(agent_id: str): # ==================== Task Endpoints ==================== -@router.post("/agents/{agent_id}/tasks", response_model=dict) -def create_task(agent_id: str, request: CreateTaskRequest): - """Create task for agent""" +@router.post("/tasks", response_model=dict) +def create_task(request: CreateTaskRequest): + """Create a new task""" task = task_service.create_task( - agent_id=agent_id, name=request.name, goal=request.goal, description=request.description, steps=[s.dict() for s in request.steps] if request.steps else None ) - if not task: - return error_response("Agent not found", 404) return success_response( data=task.to_dict(), message="Task created successfully" diff --git a/luxx/services/chat.py b/luxx/services/chat.py index c264443..b5c9819 100644 --- a/luxx/services/chat.py +++ b/luxx/services/chat.py @@ -336,7 +336,6 @@ class ChatService: result_step_id = f"step-{step_index}" step_index += 1 - # 解析 content 中的 success 状态 content = tr.get("content", "") success = True try: diff --git a/luxx/tools/builtin/__init__.py b/luxx/tools/builtin/__init__.py index 3271d54..2c856ee 100644 --- a/luxx/tools/builtin/__init__.py +++ b/luxx/tools/builtin/__init__.py @@ -5,5 +5,6 @@ from luxx.tools.builtin import code from luxx.tools.builtin import data from luxx.tools.builtin import file from luxx.tools.builtin import shell +from luxx.tools.builtin import task -__all__ = ["crawler", "code", "data", "file", "shell"] +__all__ = ["crawler", "code", "data", "file", "shell", "task"] diff --git a/luxx/tools/builtin/task.py b/luxx/tools/builtin/task.py new file mode 100644 index 0000000..2a26dbb --- /dev/null +++ b/luxx/tools/builtin/task.py @@ -0,0 +1,429 @@ +"""Task management tools for LLM agent""" +from typing import Dict, Any, Optional +from luxx.agent.task import task_service, TaskStatus, StepStatus +from luxx.tools.factory import tool +from luxx.tools.core import ToolContext + + +# Current active task ID (session level) +_current_task_id: Optional[str] = None + + +def get_current_task_id() -> Optional[str]: + """Get current active task ID""" + return _current_task_id + + +def set_current_task_id(task_id: Optional[str]): + """Set current active task ID""" + global _current_task_id + _current_task_id = task_id + + +@tool( + name="set_task", + description=( + "Set up or create a task. " + "Use this tool when user requests a goal to accomplish. " + "Parameters: " + "- name: Task name (short description) " + "- goal: Task goal (detailed description of what to accomplish) " + "- description: Optional detailed description " + "- steps: Optional list of task steps, each with name and description" + ), + parameters={ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Task name (short description)" + }, + "goal": { + "type": "string", + "description": "Task goal (detailed description)" + }, + "description": { + "type": "string", + "description": "Optional detailed description" + }, + "steps": { + "type": "array", + "description": "Optional list of task steps", + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Step name"}, + "description": {"type": "string", "description": "Step description"}, + "depends_on": { + "type": "array", + "description": "List of step IDs this step depends on", + "items": {"type": "string"} + } + }, + "required": ["name"] + } + } + }, + "required": ["name", "goal"] + }, + category="task", + required_params=["name", "goal"] +) +def set_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]: + """Set up or create a task""" + name = arguments.get("name", "") + goal = arguments.get("goal", "") + description = arguments.get("description", "") + steps_data = arguments.get("steps", []) + + # Convert steps data + steps = None + if steps_data: + steps = [] + for step_data in steps_data: + step = { + "name": step_data.get("name", ""), + "description": step_data.get("description", "") + } + if "depends_on" in step_data: + step["depends_on"] = step_data["depends_on"] + steps.append(step) + + # Create task + task = task_service.create_task( + name=name, + goal=goal, + description=description, + steps=steps + ) + + if not task: + return {"error": "Failed to create task."} + + # Add extra steps if needed + if steps_data and len(steps_data) > len(task.steps): + additional_steps = steps_data[len(task.steps):] + task_service.add_steps(task.id, additional_steps) + task = task_service.get_task(task.id) + + # Update task status to READY + task_service.update_task_status(task.id, TaskStatus.READY) + + # Set current active task + set_current_task_id(task.id) + + # Build task info + task_info = { + "task_id": task.id, + "name": task.name, + "goal": task.goal, + "status": "ready", + "steps": [ + { + "id": s.id, + "name": s.name, + "description": s.description, + "status": s.status.value + } + for s in task.steps + ] + } + + return { + "message": (f"Task '{name}' has been set up successfully."), + "task": task_info + } + + +@tool( + name="add_task_steps", + description=( + "Add steps to an existing task. " + "Use when you need to add more steps to an already created task. " + "Parameters: " + "- steps: List of steps, each with name and description" + ), + parameters={ + "type": "object", + "properties": { + "steps": { + "type": "array", + "description": "List of steps to add", + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Step name"}, + "description": {"type": "string", "description": "Step description"}, + "depends_on": { + "type": "array", + "description": "List of step IDs this step depends on", + "items": {"type": "string"} + } + }, + "required": ["name"] + } + } + }, + "required": ["steps"] + }, + category="task", + required_params=["steps"] +) +def add_task_steps(arguments: dict, context: ToolContext = None) -> Dict[str, Any]: + """Add steps to a task""" + task_id = get_current_task_id() + + if not task_id: + return {"error": "No active task found. Use set_task first."} + + task = task_service.get_task(task_id) + if not task: + return {"error": (f"Task not found: {task_id}")} + + steps_data = arguments.get("steps", []) + + # Convert steps data + steps = [] + for step_data in steps_data: + step = { + "name": step_data.get("name", ""), + "description": step_data.get("description", "") + } + if "depends_on" in step_data: + step["depends_on"] = step_data["depends_on"] + steps.append(step) + + # Add steps + added_steps = task_service.add_steps(task_id, steps) + + if not added_steps: + return {"error": "Failed to add steps."} + + return { + "message": (f"Added {len(added_steps)} step(s) to task '{task.name}'."), + "steps": [ + { + "id": s.id, + "name": s.name, + "description": s.description, + "status": s.status.value + } + for s in added_steps + ] + } + + +@tool( + name="update_step_status", + description=( + "Update task step status. " + "Use when completing a step or when a step fails. " + "Parameters: " + "- step_id: Step ID " + "- status: New status (pending/running/completed/failed/skipped)" + ), + parameters={ + "type": "object", + "properties": { + "step_id": { + "type": "string", + "description": "Step ID" + }, + "status": { + "type": "string", + "description": "New status: pending, running, completed, failed, skipped" + }, + "result": { + "type": "object", + "description": "Optional step execution result" + } + }, + "required": ["step_id", "status"] + }, + category="task", + required_params=["step_id", "status"] +) +def update_step_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]: + """Update step status""" + task_id = get_current_task_id() + + if not task_id: + return {"error": "No active task."} + + task = task_service.get_task(task_id) + if not task: + return {"error": "Task not found."} + + step_id = arguments.get("step_id") + status_str = arguments.get("status", "").lower() + result = arguments.get("result") + + # Find step + step = None + for s in task.steps: + if s.id == step_id: + step = s + break + + if not step: + return {"error": (f"Step '{step_id}' not found.")} + + # Update status + from datetime import datetime + try: + step.status = StepStatus(status_str) + step.updated_at = datetime.now() + if result: + step.result = result + except ValueError: + valid_statuses = [s.value for s in StepStatus] + return {"error": (f"Invalid status '{status_str}'. Valid values: {valid_statuses}")} + + task.updated_at = datetime.now() + + return { + "message": (f"Step '{step.name}' status updated to '{status_str}'."), + "step": { + "id": step.id, + "name": step.name, + "status": step.status.value, + "result": step.result + } + } + + +@tool( + name="complete_task", + description=( + "Complete a task. " + "Use this tool to mark a task as completed when all steps are done. " + "Parameters: " + "- result: Optional final result summary " + "- success: Whether task completed successfully (default true)" + ), + parameters={ + "type": "object", + "properties": { + "result": { + "type": "string", + "description": "Optional final result summary" + }, + "success": { + "type": "boolean", + "description": "Whether task completed successfully (default true)" + } + }, + "required": [] + }, + category="task", + required_params=[] +) +def complete_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]: + """Complete a task""" + task_id = get_current_task_id() + + if not task_id: + return {"error": "No active task."} + + task = task_service.get_task(task_id) + if not task: + return {"error": "Task not found."} + + success = arguments.get("success", True) + result_summary = arguments.get("result", "") + + # Check if all steps are completed + incomplete_steps = [s for s in task.steps if s.status not in [StepStatus.COMPLETED, StepStatus.SKIPPED]] + + if incomplete_steps and success: + return { + "warning": (f"Task has {len(incomplete_steps)} incomplete step(s)."), + "incomplete_steps": [{"id": s.id, "name": s.name, "status": s.status.value} for s in incomplete_steps] + } + + # Update task status + from datetime import datetime + final_status = TaskStatus.TERMINATED if success else TaskStatus.BLOCK + task_service.update_task_status( + task_id, + final_status, + result={"summary": result_summary, "success": success} + ) + + # Build completion summary + completed_steps = [s for s in task.steps if s.status == StepStatus.COMPLETED] + + summary = { + "task_id": task.id, + "name": task.name, + "status": "completed" if success else "failed", + "completed_steps": len(completed_steps), + "total_steps": len(task.steps), + "result": result_summary + } + + # Clear current active task + set_current_task_id(None) + + return { + "message": (f"Task '{task.name}' has been {'completed successfully' if success else 'marked as failed'}."), + "summary": summary + } + + +@tool( + name="get_task_status", + description=( + "Get current task status. " + "View details of the current task including all step statuses." + ), + parameters={ + "type": "object", + "properties": {}, + "required": [] + }, + category="task", + required_params=[] +) +def get_task_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]: + """Get task status""" + task_id = get_current_task_id() + + if not task_id: + return {"active": False, "message": "No active task."} + + task = task_service.get_task(task_id) + if not task: + return {"active": False, "message": "Task not found."} + + return { + "active": True, + "task_id": task.id, + "name": task.name, + "goal": task.goal, + "status": task.status.value, + "steps": [ + { + "id": s.id, + "name": s.name, + "description": s.description, + "status": s.status.value, + "result": s.result + } + for s in task.steps + ], + "pending_steps": len([s for s in task.steps if s.status == StepStatus.PENDING]), + "completed_steps": len([s for s in task.steps if s.status == StepStatus.COMPLETED]) + } + + +# Export all task tools +__all__ = [ + "set_task", + "add_task_steps", + "update_step_status", + "complete_task", + "get_task_status", + "get_current_task_id", + "set_current_task_id" +]