312 lines
8.9 KiB
Python
312 lines
8.9 KiB
Python
"""Agent routes module"""
|
|
from typing import List, Optional, Dict, Any
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from luxx.agent.agent import agent_service, task_service
|
|
from luxx.agent.task import Agent, Task, TaskStatus, StepStatus
|
|
from luxx.utils.helpers import success_response, error_response
|
|
|
|
|
|
# Router collecting every endpoint defined in this module; all routes are
# registered under the "/agent" prefix and tagged "Agent".
router = APIRouter(prefix="/agent", tags=["Agent"])
|
|
|
|
|
|
# ==================== Request Models ====================
|
|
|
|
class CreateAgentRequest(BaseModel):
    """Request body for creating an agent.

    Only ``name`` is required; the remaining fields fall back to empty
    values when omitted.
    """

    # Required agent name.
    name: str
    # Optional description; defaults to an empty string.
    description: str = ""
    # Optional instructions text; defaults to an empty string.
    instructions: str = ""
    # Tool identifiers given as strings; defaults to an empty list.
    tools: List[str] = []
|
|
|
|
|
|
class UpdateAgentRequest(BaseModel):
    """Request body for updating an agent.

    Every field is optional and defaults to ``None``. The values are passed
    unchanged to ``agent_service.update_agent``.
    NOTE(review): presumably ``None`` means "leave this field unchanged" --
    confirm against the service implementation.
    """

    # New agent name, or None when not supplied.
    name: Optional[str] = None
    # New description, or None when not supplied.
    description: Optional[str] = None
    # New instructions text, or None when not supplied.
    instructions: Optional[str] = None
    # New list of tool identifiers, or None when not supplied.
    tools: Optional[List[str]] = None
|
|
|
|
|
|
class TaskStep(BaseModel):
    """A single step supplied inside a task-creation request."""

    # Required step name.
    name: str
    # Optional step description; defaults to an empty string.
    description: str = ""
|
|
|
|
|
|
class CreateTaskRequest(BaseModel):
    """Request body for creating a task for an agent.

    ``name`` and ``goal`` are required; ``description`` and ``steps`` are
    optional.
    """

    # Required task name.
    name: str
    # Required goal of the task.
    goal: str
    # Optional description; defaults to an empty string.
    description: str = ""
    # Optional initial steps; defaults to an empty list.
    steps: List[TaskStep] = []
|
|
|
|
|
|
class CreateSubtaskRequest(BaseModel):
    """Request body for adding a subtask to an existing task."""

    # Required subtask name.
    name: str
    # Required goal of the subtask.
    goal: str
    # Optional description; defaults to an empty string.
    description: str = ""
|
|
|
|
|
|
class UpdateTaskStatusRequest(BaseModel):
    """Request body for changing a task's status."""

    # Raw status string; the update_task_status handler converts it with
    # TaskStatus(...) and rejects values that are not valid members.
    status: str
|
|
|
|
|
|
# ==================== Agent Endpoints ====================
|
|
|
|
@router.post("/agents", response_model=dict)
def create_agent(request: CreateAgentRequest):
    """Create an agent from the request body and return it serialized.

    The request fields are forwarded to ``agent_service.create_agent`` and
    the resulting agent's ``to_dict()`` output is wrapped with
    ``success_response``.
    """
    agent_fields = {
        "name": request.name,
        "description": request.description,
        "instructions": request.instructions,
        "tools": request.tools,
    }
    created = agent_service.create_agent(**agent_fields)
    serialized = created.to_dict()
    return success_response(data=serialized, message="Agent created successfully")
|
|
|
|
|
|
@router.get("/agents", response_model=dict)
def list_agents():
    """Return every agent known to ``agent_service`` plus a total count.

    The response data holds the serialized agents under ``"agents"`` and
    their number under ``"total"``.
    """
    known_agents = agent_service.list_agents()
    serialized = [entry.to_dict() for entry in known_agents]
    payload = {"agents": serialized, "total": len(known_agents)}
    return success_response(data=payload)
|
|
|
|
|
|
@router.get("/agents/{agent_id}", response_model=dict)
def get_agent(agent_id: str):
    """Look up one agent by id.

    Returns the serialized agent on success, or
    ``error_response("Agent not found", 404)`` when the lookup yields a
    falsy result.
    """
    found = agent_service.get_agent(agent_id)
    if found:
        return success_response(data=found.to_dict())
    return error_response("Agent not found", 404)
|
|
|
|
|
|
@router.put("/agents/{agent_id}", response_model=dict)
def update_agent(agent_id: str, request: UpdateAgentRequest):
    """Apply the request's fields to an existing agent.

    All four request fields are handed to ``agent_service.update_agent``
    as given (including ``None`` values). A falsy result is reported as
    ``error_response("Agent not found", 404)``; otherwise the updated agent
    is returned serialized.
    """
    changes = {
        "name": request.name,
        "description": request.description,
        "instructions": request.instructions,
        "tools": request.tools,
    }
    updated = agent_service.update_agent(agent_id=agent_id, **changes)
    if updated:
        return success_response(
            data=updated.to_dict(),
            message="Agent updated successfully",
        )
    return error_response("Agent not found", 404)
|
|
|
|
|
|
@router.delete("/agents/{agent_id}", response_model=dict)
def delete_agent(agent_id: str):
    """Delete an agent by id.

    A truthy result from ``agent_service.delete_agent`` yields a success
    response; anything else yields ``error_response("Agent not found", 404)``.
    """
    was_deleted = agent_service.delete_agent(agent_id)
    if not was_deleted:
        return error_response("Agent not found", 404)
    return success_response(message="Agent deleted successfully")
|
|
|
|
|
|
# ==================== Task Endpoints ====================
|
|
|
|
@router.post("/agents/{agent_id}/tasks", response_model=dict)
def create_task(agent_id: str, request: CreateTaskRequest):
    """Create a task for the given agent.

    Steps from the request are converted to plain dicts; an empty step list
    is passed to the service as ``None``. A falsy result from
    ``task_service.create_task`` is reported as
    ``error_response("Agent not found", 404)``.
    """
    if request.steps:
        step_payload = [step.dict() for step in request.steps]
    else:
        step_payload = None

    created = task_service.create_task(
        agent_id=agent_id,
        name=request.name,
        goal=request.goal,
        description=request.description,
        steps=step_payload,
    )
    if created:
        return success_response(
            data=created.to_dict(),
            message="Task created successfully",
        )
    return error_response("Agent not found", 404)
|
|
|
|
|
|
@router.get("/agents/{agent_id}/tasks", response_model=dict)
def get_current_task(agent_id: str):
    """Return the first task listed for the agent.

    The tasks come from ``task_service.list_tasks(agent_id=...)``; the first
    entry is returned under ``"task"``. When the list is empty the response
    carries ``{"task": None}`` and the message "No task found".
    """
    agent_tasks = task_service.list_tasks(agent_id=agent_id)
    if agent_tasks:
        first_task = agent_tasks[0]
        return success_response(data={"task": first_task.to_dict()})
    return success_response(data={"task": None}, message="No task found")
|
|
|
|
|
|
@router.get("/tasks", response_model=dict)
def list_tasks(agent_id: Optional[str] = None):
    """List all tasks, optionally filtered by agent.

    Args:
        agent_id: Optional agent id forwarded to ``task_service.list_tasks``;
            ``None`` (the default) is forwarded as-is.

    Returns:
        A success response whose data holds the serialized tasks under
        ``"tasks"`` and their count under ``"total"``.
    """
    tasks = task_service.list_tasks(agent_id=agent_id)
    return success_response(
        data={
            "tasks": [t.to_dict() for t in tasks],
            "total": len(tasks),
        }
    )
|
|
|
|
|
|
@router.get("/tasks/{task_id}", response_model=dict)
def get_task(task_id: str):
    """Look up one task by id.

    Returns the serialized task, or ``error_response("Task not found", 404)``
    when ``task_service.get_task`` yields a falsy result.
    """
    found = task_service.get_task(task_id)
    if found:
        return success_response(data=found.to_dict())
    return error_response("Task not found", 404)
|
|
|
|
|
|
@router.put("/tasks/{task_id}/status", response_model=dict)
def update_task_status(task_id: str, request: UpdateTaskStatusRequest):
    """Change a task's status.

    The requested status string is converted with ``TaskStatus(...)``; a
    ``ValueError`` from that conversion produces an error response (code 400)
    listing the valid values. A falsy result from the service is reported as
    ``error_response("Task not found", 404)``.
    """
    try:
        task_status = TaskStatus(request.status)
    except ValueError:
        valid_statuses = [member.value for member in TaskStatus]
        return error_response(f"Invalid status: {request.status}. Valid: {valid_statuses}", 400)

    updated = task_service.update_task_status(task_id, task_status)
    if updated:
        return success_response(data=updated.to_dict(), message="Task status updated")
    return error_response("Task not found", 404)
|
|
|
|
|
|
@router.delete("/tasks/{task_id}", response_model=dict)
def delete_task(task_id: str):
    """Delete a task by id.

    A truthy result from ``task_service.delete_task`` yields a success
    response; anything else yields ``error_response("Task not found", 404)``.
    """
    was_deleted = task_service.delete_task(task_id)
    if not was_deleted:
        return error_response("Task not found", 404)
    return success_response(message="Task deleted successfully")
|
|
|
|
|
|
# ==================== Step Endpoints ====================
|
|
|
|
@router.post("/tasks/{task_id}/steps", response_model=dict)
def add_step(task_id: str, name: str, description: str = ""):
    """Append a step to an existing task.

    ``name`` and ``description`` are plain function parameters (not a request
    model) and are forwarded positionally to ``task_service.add_step``. A
    falsy result is reported as ``error_response("Task not found", 404)``.
    """
    new_step = task_service.add_step(task_id, name, description)
    if new_step:
        return success_response(
            data=new_step.to_dict(),
            message="Step added successfully",
        )
    return error_response("Task not found", 404)
|
|
|
|
|
|
@router.put("/tasks/{task_id}/steps/{step_id}/status", response_model=dict)
def update_step_status(task_id: str, step_id: str, status: str):
    """Update the status of one step of a task.

    Args:
        task_id: Id of the task that owns the step.
        step_id: Id of the step to update.
        status: Raw status string; must be a valid ``StepStatus`` value.

    Returns:
        The serialized step on success. An error response with code 400 is
        returned for an invalid status, and with code 404 when either the
        task or the step cannot be found.
    """
    # Function-scope import replaces the former __import__("datetime") call.
    from datetime import datetime

    # Validate the status first so a bad value is rejected before any lookup.
    try:
        step_status = StepStatus(status)
    except ValueError:
        valid_statuses = [s.value for s in StepStatus]
        return error_response(f"Invalid status: {status}. Valid: {valid_statuses}", 400)

    task = task_service.get_task(task_id)
    if not task:
        return error_response("Task not found", 404)

    for step in task.steps:
        if step.id == step_id:
            # The step object is modified directly; no task_service call is
            # made here to record the change.
            step.status = step_status
            step.updated_at = datetime.now()
            return success_response(
                data=step.to_dict(),
                message="Step status updated",
            )

    return error_response("Step not found", 404)
|
|
|
|
|
|
# ==================== Subtask Endpoints ====================
|
|
|
|
@router.post("/tasks/{task_id}/subtasks", response_model=dict)
def create_subtask(task_id: str, request: CreateSubtaskRequest):
    """Attach a new subtask to the task identified by ``task_id``.

    The request fields are forwarded to ``task_service.add_subtask``. A falsy
    result is reported as ``error_response("Parent task not found", 404)``.
    """
    subtask_fields = {
        "name": request.name,
        "goal": request.goal,
        "description": request.description,
    }
    created = task_service.add_subtask(parent_task_id=task_id, **subtask_fields)
    if created:
        return success_response(
            data=created.to_dict(),
            message="Subtask created successfully",
        )
    return error_response("Parent task not found", 404)
|
|
|
|
|
|
@router.get("/tasks/{task_id}/subtasks", response_model=dict)
def list_subtasks(task_id: str):
    """Return the subtasks of a task together with their count.

    The parent is fetched with ``task_service.get_task``; a falsy result is
    reported as ``error_response("Task not found", 404)``.
    """
    parent = task_service.get_task(task_id)
    if not parent:
        return error_response("Task not found", 404)

    serialized = [child.to_dict() for child in parent.subtasks]
    payload = {"subtasks": serialized, "total": len(parent.subtasks)}
    return success_response(data=payload)
|
|
|
|
|
|
# ==================== Execution Endpoints ====================
|
|
|
|
@router.post("/agents/{agent_id}/execute", response_model=dict)
def execute_agent(agent_id: str, goal: str):
    """Trigger agent to execute a goal.

    Flags the agent as "executing" and creates an auto-generated task for
    ``goal`` through ``task_service.create_task``. This handler only updates
    the status and creates the task; it does not run anything itself.

    Args:
        agent_id: Id of the agent to execute.
        goal: Goal text; used as the task goal and, shortened to 50
            characters, as part of the task name.

    Returns:
        A success response carrying the agent id, the new task id and the
        "executing" status. Error responses are returned when the agent is
        not found (404), is already executing (400), or the task could not
        be created (500).
    """
    # Function-scope import replaces the former __import__("datetime") call.
    from datetime import datetime

    agent = agent_service.get_agent(agent_id)
    if not agent:
        return error_response("Agent not found", 404)

    if agent.status == "executing":
        return error_response("Agent is already executing", 400)

    # Remember the prior status so a failed task creation does not leave the
    # agent stuck in "executing" (which would block every later request).
    previous_status = agent.status
    agent.status = "executing"
    agent.updated_at = datetime.now()

    # Only add the ellipsis when the goal was actually truncated.
    goal_preview = f"{goal[:50]}..." if len(goal) > 50 else goal

    try:
        task = task_service.create_task(
            agent_id=agent_id,
            name=f"Task: {goal_preview}",
            goal=goal,
            description=f"Auto-generated task for goal: {goal}",
        )
    except Exception:
        # Roll back the status flag, then let the original error propagate.
        agent.status = previous_status
        raise

    if not task:
        agent.status = previous_status
        return error_response("Failed to create task", 500)

    return success_response(
        data={
            "agent_id": agent_id,
            "task_id": task.id,
            "status": "executing",
            "message": "Agent execution started",
        },
        message="Execution started",
    )
|