430 lines
12 KiB
Python
430 lines
12 KiB
Python
"""Task management tools for LLM agent"""
|
|
from typing import Dict, Any, Optional
|
|
from luxx.agent.task import task_service, TaskStatus, StepStatus
|
|
from luxx.tools.factory import tool
|
|
from luxx.tools.core import ToolContext
|
|
|
|
|
|
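# ID of the task that the tools in this module currently operate on.
# NOTE(review): described as "session level", but it is stored in a module-level
# global, so every caller in this process shares the same value — confirm that
# one process serves exactly one session.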
|
|
_current_task_id: Optional[str] = None
|
|
|
|
|
|
def get_current_task_id() -> Optional[str]:
    """Return the module-level active task ID (``None`` when none is set)."""
    return _current_task_id
|
|
|
|
|
|
def set_current_task_id(task_id: Optional[str]) -> None:
    """Store ``task_id`` as the module-level active task ID.

    Passing ``None`` clears the active task.
    """
    # Rebind the module global rather than creating a function-local name.
    global _current_task_id
    _current_task_id = task_id
|
|
|
|
|
|
@tool(
    name="set_task",
    description=(
        "Set up or create a task. "
        "Use this tool when user requests a goal to accomplish. "
        "Parameters: "
        "- name: Task name (short description) "
        "- goal: Task goal (detailed description of what to accomplish) "
        "- description: Optional detailed description "
        "- steps: Optional list of task steps, each with name and description"
    ),
    parameters={
        "type": "object",
        "properties": {
            "name": {
                "type": "string",
                "description": "Task name (short description)"
            },
            "goal": {
                "type": "string",
                "description": "Task goal (detailed description)"
            },
            "description": {
                "type": "string",
                "description": "Optional detailed description"
            },
            "steps": {
                "type": "array",
                "description": "Optional list of task steps",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Step name"},
                        "description": {"type": "string", "description": "Step description"},
                        "depends_on": {
                            "type": "array",
                            "description": "List of step IDs this step depends on",
                            "items": {"type": "string"}
                        }
                    },
                    "required": ["name"]
                }
            }
        },
        "required": ["name", "goal"]
    },
    category="task",
    required_params=["name", "goal"]
)
def set_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
    """Create a task, mark it READY and make it the current active task.

    Args:
        arguments: Tool arguments. Reads ``name`` and ``goal`` (required),
            ``description`` (optional) and ``steps`` (optional list of dicts
            with ``name``, ``description`` and optionally ``depends_on``).
        context: Tool execution context; not used by this tool.

    Returns:
        On success, a dict with ``message`` and ``task`` (task id, name, goal,
        status and the list of steps). On failure, a dict with a single
        ``error`` key.
    """
    # `or` also covers an explicit JSON null, which `.get(key, default)` lets through.
    name = arguments.get("name") or ""
    goal = arguments.get("goal") or ""
    description = arguments.get("description") or ""
    steps_data = arguments.get("steps") or []

    if not name or not goal:
        return {"error": "Both 'name' and 'goal' are required to set up a task."}
    if not isinstance(steps_data, list) or not all(
        isinstance(item, dict) for item in steps_data
    ):
        return {"error": "'steps' must be a list of step objects."}

    # Normalise steps to the keys the task service is given; `None` (not an
    # empty list) is passed when the caller supplied no steps.
    steps = None
    if steps_data:
        steps = []
        for item in steps_data:
            step = {
                "name": item.get("name", ""),
                "description": item.get("description", ""),
            }
            if "depends_on" in item:
                step["depends_on"] = item["depends_on"]
            steps.append(step)

    task = task_service.create_task(
        name=name,
        goal=goal,
        description=description,
        steps=steps,
    )
    if not task:
        return {"error": "Failed to create task."}

    # If the created task holds fewer steps than requested, append the rest.
    # The normalised dicts are sent (same shape add_task_steps sends).
    if steps and len(steps) > len(task.steps):
        task_service.add_steps(task.id, steps[len(task.steps):])
        created_id = task.id
        task = task_service.get_task(created_id)
        if not task:
            return {"error": f"Task not found: {created_id}"}

    task_service.update_task_status(task.id, TaskStatus.READY)
    set_current_task_id(task.id)

    task_info = {
        "task_id": task.id,
        "name": task.name,
        "goal": task.goal,
        "status": "ready",
        "steps": [
            {
                "id": s.id,
                "name": s.name,
                "description": s.description,
                "status": s.status.value,
            }
            for s in task.steps
        ],
    }

    return {
        "message": f"Task '{name}' has been set up successfully.",
        "task": task_info,
    }
|
|
|
|
|
|
@tool(
    name="add_task_steps",
    description=(
        "Add steps to an existing task. "
        "Use when you need to add more steps to an already created task. "
        "Parameters: "
        "- steps: List of steps, each with name and description"
    ),
    parameters={
        "type": "object",
        "properties": {
            "steps": {
                "type": "array",
                "description": "List of steps to add",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Step name"},
                        "description": {"type": "string", "description": "Step description"},
                        "depends_on": {
                            "type": "array",
                            "description": "List of step IDs this step depends on",
                            "items": {"type": "string"}
                        }
                    },
                    "required": ["name"]
                }
            }
        },
        "required": ["steps"]
    },
    category="task",
    required_params=["steps"]
)
def add_task_steps(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
    """Append steps to the current active task.

    Args:
        arguments: Tool arguments. Reads ``steps``: a non-empty list of dicts
            with ``name``, ``description`` and optionally ``depends_on``.
        context: Tool execution context; not used by this tool.

    Returns:
        On success, a dict with ``message`` and the added ``steps``. On
        failure (no active task, task missing, invalid or rejected steps), a
        dict with a single ``error`` key.
    """
    task_id = get_current_task_id()
    if not task_id:
        return {"error": "No active task found. Use set_task first."}

    task = task_service.get_task(task_id)
    if not task:
        return {"error": f"Task not found: {task_id}"}

    # Reject null / empty / malformed input up front instead of crashing on
    # iteration or reporting a misleading service failure.
    steps_data = arguments.get("steps")
    if (
        not isinstance(steps_data, list)
        or not steps_data
        or not all(isinstance(item, dict) for item in steps_data)
    ):
        return {"error": "'steps' must be a non-empty list of step objects."}

    # Keep only the keys forwarded to the task service.
    steps = []
    for item in steps_data:
        step = {
            "name": item.get("name", ""),
            "description": item.get("description", ""),
        }
        if "depends_on" in item:
            step["depends_on"] = item["depends_on"]
        steps.append(step)

    added_steps = task_service.add_steps(task_id, steps)
    if not added_steps:
        return {"error": "Failed to add steps."}

    return {
        "message": f"Added {len(added_steps)} step(s) to task '{task.name}'.",
        "steps": [
            {
                "id": s.id,
                "name": s.name,
                "description": s.description,
                "status": s.status.value,
            }
            for s in added_steps
        ],
    }
|
|
|
|
|
|
@tool(
    name="update_step_status",
    description=(
        "Update task step status. "
        "Use when completing a step or when a step fails. "
        "Parameters: "
        "- step_id: Step ID "
        "- status: New status (pending/running/completed/failed/skipped)"
    ),
    parameters={
        "type": "object",
        "properties": {
            "step_id": {
                "type": "string",
                "description": "Step ID"
            },
            "status": {
                "type": "string",
                "description": "New status: pending, running, completed, failed, skipped"
            },
            "result": {
                "type": "object",
                "description": "Optional step execution result"
            }
        },
        "required": ["step_id", "status"]
    },
    category="task",
    required_params=["step_id", "status"]
)
def update_step_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
    """Set the status (and optionally the result) of a step of the active task.

    Args:
        arguments: Tool arguments. Reads ``step_id``, ``status`` (matched
            case-insensitively against ``StepStatus`` values) and the optional
            ``result``.
        context: Tool execution context; not used by this tool.

    Returns:
        On success, a dict with ``message`` and the updated ``step``. On
        failure (no active task, unknown step, invalid status), a dict with a
        single ``error`` key; nothing is modified in that case.
    """
    from datetime import datetime

    task_id = get_current_task_id()
    if not task_id:
        return {"error": "No active task."}

    task = task_service.get_task(task_id)
    if not task:
        return {"error": "Task not found."}

    step_id = arguments.get("step_id")
    raw_status = arguments.get("status")
    result = arguments.get("result")

    step = next((s for s in task.steps if s.id == step_id), None)
    if step is None:
        return {"error": f"Step '{step_id}' not found."}

    # A null or non-string status must yield an error dict, not an
    # AttributeError from `.lower()`.
    status_str = raw_status.strip().lower() if isinstance(raw_status, str) else ""
    try:
        # Only the enum lookup can raise ValueError; resolve it before
        # touching the step so a bad status leaves the step unchanged.
        new_status = StepStatus(status_str)
    except ValueError:
        valid_statuses = [s.value for s in StepStatus]
        shown = status_str if isinstance(raw_status, str) else raw_status
        return {"error": f"Invalid status '{shown}'. Valid values: {valid_statuses}"}

    # NOTE(review): the step and task are mutated in place and no task_service
    # call follows — confirm get_task returns the live object so this persists.
    now = datetime.now()  # one timestamp so step and task agree
    step.status = new_status
    step.updated_at = now
    if result:
        # Falsy results (None, {}) leave any previously stored result intact.
        step.result = result
    task.updated_at = now

    return {
        "message": f"Step '{step.name}' status updated to '{status_str}'.",
        "step": {
            "id": step.id,
            "name": step.name,
            "status": step.status.value,
            "result": step.result,
        },
    }
|
|
|
|
|
|
@tool(
    name="complete_task",
    description=(
        "Complete a task. "
        "Use this tool to mark a task as completed when all steps are done. "
        "Parameters: "
        "- result: Optional final result summary "
        "- success: Whether task completed successfully (default true)"
    ),
    parameters={
        "type": "object",
        "properties": {
            "result": {
                "type": "string",
                "description": "Optional final result summary"
            },
            "success": {
                "type": "boolean",
                "description": "Whether task completed successfully (default true)"
            }
        },
        "required": []
    },
    category="task",
    required_params=[]
)
def complete_task(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
    """Finish the current active task and clear it as the active task.

    Args:
        arguments: Tool arguments. Reads the optional ``result`` summary and
            the optional ``success`` flag (missing or null means ``True``).
        context: Tool execution context; not used by this tool.

    Returns:
        - ``{"error": ...}`` when there is no active task or it cannot be found.
        - ``{"warning": ..., "incomplete_steps": [...]}`` when ``success`` is
          truthy but some steps are neither completed nor skipped; the task
          is left untouched and stays active.
        - Otherwise ``{"message": ..., "summary": {...}}`` after the task
          status has been updated (``TaskStatus.TERMINATED`` on success,
          ``TaskStatus.BLOCK`` on failure).
    """
    task_id = get_current_task_id()
    if not task_id:
        return {"error": "No active task."}

    task = task_service.get_task(task_id)
    if not task:
        return {"error": "Task not found."}

    # An explicit JSON null must fall back to the documented defaults rather
    # than being read as "failed" / a None summary.
    success = arguments.get("success")
    if success is None:
        success = True
    result_summary = arguments.get("result") or ""

    # A successful completion requires every step to be completed or skipped.
    finished = (StepStatus.COMPLETED, StepStatus.SKIPPED)
    incomplete_steps = [s for s in task.steps if s.status not in finished]
    if incomplete_steps and success:
        return {
            "warning": f"Task has {len(incomplete_steps)} incomplete step(s).",
            "incomplete_steps": [
                {"id": s.id, "name": s.name, "status": s.status.value}
                for s in incomplete_steps
            ],
        }

    final_status = TaskStatus.TERMINATED if success else TaskStatus.BLOCK
    task_service.update_task_status(
        task_id,
        final_status,
        result={"summary": result_summary, "success": success},
    )

    completed_steps = [s for s in task.steps if s.status == StepStatus.COMPLETED]
    summary = {
        "task_id": task.id,
        "name": task.name,
        "status": "completed" if success else "failed",
        "completed_steps": len(completed_steps),
        "total_steps": len(task.steps),
        "result": result_summary,
    }

    # The task is finished either way, so it is no longer the active task.
    set_current_task_id(None)

    return {
        "message": f"Task '{task.name}' has been {'completed successfully' if success else 'marked as failed'}.",
        "summary": summary,
    }
|
|
|
|
|
|
@tool(
    name="get_task_status",
    description=(
        "Get current task status. "
        "View details of the current task including all step statuses."
    ),
    parameters={
        "type": "object",
        "properties": {},
        "required": []
    },
    category="task",
    required_params=[]
)
def get_task_status(arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
    """Report the current active task and the state of each of its steps.

    Args:
        arguments: Tool arguments; none are read.
        context: Tool execution context; not used by this tool.

    Returns:
        ``{"active": False, "message": ...}`` when no task is active or the
        active task cannot be found; otherwise a dict with ``active`` set to
        ``True``, the task's id/name/goal/status, a ``steps`` list and the
        counts of pending and completed steps.
    """
    active_id = get_current_task_id()
    if not active_id:
        return {"active": False, "message": "No active task."}

    current = task_service.get_task(active_id)
    if not current:
        return {"active": False, "message": "Task not found."}

    # Header fields first, then per-step detail and counters, so the key
    # order of the returned dict is the same as before.
    report: Dict[str, Any] = {
        "active": True,
        "task_id": current.id,
        "name": current.name,
        "goal": current.goal,
        "status": current.status.value,
    }

    step_rows = []
    for item in current.steps:
        step_rows.append(
            {
                "id": item.id,
                "name": item.name,
                "description": item.description,
                "status": item.status.value,
                "result": item.result,
            }
        )
    report["steps"] = step_rows

    report["pending_steps"] = sum(
        1 for item in current.steps if item.status == StepStatus.PENDING
    )
    report["completed_steps"] = sum(
        1 for item in current.steps if item.status == StepStatus.COMPLETED
    )
    return report
|
|
|
|
|
|
# Export all task tools
|
|
__all__ = [
    # Tool entry points
    "set_task",
    "add_task_steps",
    "update_step_status",
    "complete_task",
    "get_task_status",
    # Active-task accessors
    "get_current_task_id",
    "set_current_task_id",
]
|