Luxx/luxx/agent/agent.py

122 lines
3.5 KiB
Python

"""Agent module for autonomous task execution"""
from enum import Enum
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Any
from luxx.agent.task import Task
from luxx.utils.helpers import generate_id
logger = logging.getLogger(__name__)
class AgentStatus(Enum):
"""Agent status"""
READY = "ready"
RUNNING = "running"
BLOCK = "block"
TERMINATED = "terminated"
@dataclass
class Agent:
"""Agent"""
id: str
name: str
description: str = ""
instructions: str = ""
tools: List[str] = field(default_factory=list)
current_task: Optional[Task] = None
status: AgentStatus = AgentStatus.READY
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"instructions": self.instructions,
"tools": self.tools,
"current_task": self.current_task.to_dict() if self.current_task else None,
"status": self.status,
"result": self.result,
"error": self.error,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
class AgentService:
"""Agent service for managing agent instances"""
def __init__(self):
self._agents: Dict[str, Agent] = {}
def create_agent(
self,
name: str,
description: str = "",
instructions: str = "",
tools: List[str] = None
) -> Agent:
"""Create a new agent instance"""
agent_id = generate_id("agent")
agent = Agent(
id=agent_id,
name=name,
description=description,
instructions=instructions,
tools=tools or []
)
self._agents[agent_id] = agent
logger.info(f"Created agent: {agent_id} - {name}")
return agent
def get_agent(self, agent_id: str) -> Optional[Agent]:
"""Get agent by ID"""
return self._agents.get(agent_id)
def list_agents(self) -> List[Agent]:
"""List all agents"""
return list(self._agents.values())
def delete_agent(self, agent_id: str) -> bool:
"""Delete agent by ID"""
if agent_id in self._agents:
del self._agents[agent_id]
logger.info(f"Deleted agent: {agent_id}")
return True
return False
def update_agent(
self,
agent_id: str,
name: str = None,
description: str = None,
instructions: str = None,
tools: List[str] = None
) -> Optional[Agent]:
"""Update agent configuration"""
agent = self._agents.get(agent_id)
if not agent:
return None
if name is not None:
agent.name = name
if description is not None:
agent.description = description
if instructions is not None:
agent.instructions = instructions
if tools is not None:
agent.tools = tools
agent.updated_at = datetime.now()
return agent
# Global service instances
agent_service = AgentService()