"""Agent module for autonomous task execution""" from enum import Enum import logging from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional, Any, TYPE_CHECKING from luxx.utils.helpers import generate_id if TYPE_CHECKING: from luxx.agent.task import Task logger = logging.getLogger(__name__) class AgentStatus(Enum): """Agent status""" READY = "ready" RUNNING = "running" BLOCK = "block" TERMINATED = "terminated" @dataclass class Agent: """Agent""" id: str name: str description: str = "" instructions: str = "" tools: List[str] = field(default_factory=list) current_task: Optional["Task"] = None status: AgentStatus = AgentStatus.READY result: Optional[Dict[str, Any]] = None error: Optional[str] = None created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { "id": self.id, "name": self.name, "description": self.description, "instructions": self.instructions, "tools": self.tools, "current_task": self.current_task.to_dict() if self.current_task else None, "status": self.status, "result": self.result, "error": self.error, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None } class AgentService: """Agent service for managing agent instances""" def __init__(self): self._agents: Dict[str, Agent] = {} def create_agent( self, name: str, description: str = "", instructions: str = "", tools: List[str] = None ) -> Agent: """Create a new agent instance""" agent_id = generate_id("agent") agent = Agent( id=agent_id, name=name, description=description, instructions=instructions, tools=tools or [] ) self._agents[agent_id] = agent logger.info(f"Created agent: {agent_id} - {name}") return agent def get_agent(self, agent_id: str) -> Optional[Agent]: """Get agent by ID""" return self._agents.get(agent_id) def list_agents(self) -> List[Agent]: """List all agents""" return list(self._agents.values()) def delete_agent(self, agent_id: str) -> bool: """Delete agent by ID""" if agent_id in self._agents: del self._agents[agent_id] logger.info(f"Deleted agent: {agent_id}") return True return False def update_agent( self, agent_id: str, name: str = None, description: str = None, instructions: str = None, tools: List[str] = None ) -> Optional[Agent]: """Update agent configuration""" agent = self._agents.get(agent_id) if not agent: return None if name is not None: agent.name = name if description is not None: agent.description = description if instructions is not None: agent.instructions = instructions if tools is not None: agent.tools = tools agent.updated_at = datetime.now() return agent # Global service instances agent_service = AgentService()