"""Agent module for autonomous task execution""" import logging from datetime import datetime from typing import Dict, List, Optional, Any from luxx.agent.task import Agent, Task, TaskStatus, Step, StepStatus from luxx.utils.helpers import generate_id logger = logging.getLogger(__name__) class AgentService: """Agent service for managing agent instances""" def __init__(self): self._agents: Dict[str, Agent] = {} def create_agent( self, name: str, description: str = "", instructions: str = "", tools: List[str] = None ) -> Agent: """Create a new agent instance""" agent_id = generate_id("agent") agent = Agent( id=agent_id, name=name, description=description, instructions=instructions, tools=tools or [] ) self._agents[agent_id] = agent logger.info(f"Created agent: {agent_id} - {name}") return agent def get_agent(self, agent_id: str) -> Optional[Agent]: """Get agent by ID""" return self._agents.get(agent_id) def list_agents(self) -> List[Agent]: """List all agents""" return list(self._agents.values()) def delete_agent(self, agent_id: str) -> bool: """Delete agent by ID""" if agent_id in self._agents: del self._agents[agent_id] logger.info(f"Deleted agent: {agent_id}") return True return False def update_agent( self, agent_id: str, name: str = None, description: str = None, instructions: str = None, tools: List[str] = None ) -> Optional[Agent]: """Update agent configuration""" agent = self._agents.get(agent_id) if not agent: return None if name is not None: agent.name = name if description is not None: agent.description = description if instructions is not None: agent.instructions = instructions if tools is not None: agent.tools = tools agent.updated_at = datetime.now() return agent class TaskService: """Task service for managing tasks""" def __init__(self, agent_service: AgentService): self._tasks: Dict[str, Task] = {} self._agent_service = agent_service def create_task( self, agent_id: str, name: str, goal: str, description: str = "", parent_task_id: str = None, steps: List[Dict[str, Any]] = None ) -> Optional[Task]: """Create task for agent, optionally as subtask""" agent = self._agent_service.get_agent(agent_id) if not agent: return None task_id = generate_id("task") task = Task( id=task_id, name=name, description=description, goal=goal ) # Add steps if steps: for step_data in steps: step = Step( id=generate_id("step"), name=step_data.get("name", ""), description=step_data.get("description", "") ) task.steps.append(step) # Add to parent task or agent if parent_task_id: parent_task = self._tasks.get(parent_task_id) if parent_task: task.parent_id = parent_task_id parent_task.subtasks.append(task) else: agent.current_task = task self._tasks[task_id] = task logger.info(f"Created task: {task_id} for agent: {agent_id}") return task def get_task(self, task_id: str) -> Optional[Task]: """Get task by ID""" return self._tasks.get(task_id) def list_tasks(self, agent_id: str = None) -> List[Task]: """List tasks, optionally filtered by agent""" if agent_id: agent = self._agent_service.get_agent(agent_id) if agent and agent.current_task: return [agent.current_task] return [] return list(self._tasks.values()) def update_task_status( self, task_id: str, status: TaskStatus, result: Any = None ) -> Optional[Task]: """Update task status""" task = self._tasks.get(task_id) if not task: return None task.status = status task.result = result task.updated_at = datetime.now() return task def add_step( self, task_id: str, step_name: str, step_description: str = "" ) -> Optional[Step]: """Add step to task""" task = self._tasks.get(task_id) if not task: return None step = Step( id=generate_id("step"), name=step_name, description=step_description ) task.steps.append(step) task.updated_at = datetime.now() return step def add_subtask( self, parent_task_id: str, name: str, goal: str, description: str = "" ) -> Optional[Task]: """Add subtask to parent task""" parent_task = self._tasks.get(parent_task_id) if not parent_task: return None subtask_id = generate_id("task") subtask = Task( id=subtask_id, name=name, description=description, goal=goal ) subtask.parent_id = parent_task_id parent_task.subtasks.append(subtask) self._tasks[subtask_id] = subtask parent_task.updated_at = datetime.now() return subtask def delete_task(self, task_id: str) -> bool: """Delete task""" task = self._tasks.get(task_id) if not task: return False # Remove from parent's subtasks if has parent if hasattr(task, 'parent_id') and task.parent_id: parent = self._tasks.get(task.parent_id) if parent: parent.subtasks = [t for t in parent.subtasks if t.id != task_id] del self._tasks[task_id] return True # Global service instances agent_service = AgentService() task_service = TaskService(agent_service)