"""Agent Registry - manages agent lifecycle and access""" from typing import Dict, List, Optional, Set import threading import uuid from luxx.agents.core import Agent, AgentConfig, AgentType, AgentStatus class AgentRegistry: """ Agent Registry (Singleton) Thread-safe registry for managing all agents in the system. Provides agent creation, retrieval, and lifecycle management. """ _instance: Optional["AgentRegistry"] = None _lock: threading.Lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._agents: Dict[str, Agent] = {} self._user_agents: Dict[int, Set[str]] = {} # user_id -> set of agent_ids self._conversation_agents: Dict[str, Set[str]] = {} # conversation_id -> set of agent_ids self._registry_lock = threading.Lock() self._initialized = True def _generate_agent_id(self) -> str: """Generate unique agent ID""" return f"agent_{uuid.uuid4().hex[:12]}" def create_agent( self, config: AgentConfig, user_id: Optional[int] = None, conversation_id: Optional[str] = None, workspace: Optional[str] = None, ) -> Agent: """ Create and register a new agent Args: config: Agent configuration user_id: Associated user ID conversation_id: Associated conversation ID workspace: Agent's workspace path Returns: Created Agent instance """ with self._registry_lock: agent_id = self._generate_agent_id() agent = Agent( id=agent_id, config=config, user_id=user_id, conversation_id=conversation_id, workspace=workspace, ) self._agents[agent_id] = agent # Track by user if user_id: if user_id not in self._user_agents: self._user_agents[user_id] = set() self._user_agents[user_id].add(agent_id) # Track by conversation if conversation_id: if conversation_id not in self._conversation_agents: self._conversation_agents[conversation_id] = set() self._conversation_agents[conversation_id].add(agent_id) return agent def get(self, agent_id: str) -> Optional[Agent]: """ Get agent by ID Args: agent_id: Agent ID Returns: Agent if found, None otherwise """ with self._registry_lock: return self._agents.get(agent_id) def list_user_agents(self, user_id: int) -> List[Agent]: """ List all agents for a user Args: user_id: User ID Returns: List of user's agents """ with self._registry_lock: agent_ids = self._user_agents.get(user_id, set()) return [self._agents[aid] for aid in agent_ids if aid in self._agents] def list_conversation_agents(self, conversation_id: str) -> List[Agent]: """ List all agents in a conversation Args: conversation_id: Conversation ID Returns: List of conversation's agents """ with self._registry_lock: agent_ids = self._conversation_agents.get(conversation_id, set()) return [self._agents[aid] for aid in agent_ids if aid in self._agents] def list_by_type(self, agent_type: AgentType) -> List[Agent]: """ List all agents of a specific type Args: agent_type: Type of agent to filter Returns: List of agents of the specified type """ with self._registry_lock: return [ a for a in self._agents.values() if a.config.agent_type == agent_type ] def list_by_status(self, status: AgentStatus) -> List[Agent]: """ List all agents with a specific status Args: status: Status to filter Returns: List of agents with the specified status """ with self._registry_lock: return [a for a in self._agents.values() if a.status == status] def update_status(self, agent_id: str, status: AgentStatus) -> bool: """ Update agent status Args: agent_id: Agent ID status: New status Returns: True if updated, False if agent not found """ with self._registry_lock: agent = self._agents.get(agent_id) if agent: agent.status = status return True return False def remove(self, agent_id: str) -> bool: """ Remove agent from registry Args: agent_id: Agent ID Returns: True if removed, False if not found """ with self._registry_lock: agent = self._agents.pop(agent_id, None) if agent is None: return False # Remove from user tracking if agent.user_id and agent.user_id in self._user_agents: self._user_agents[agent.user_id].discard(agent_id) # Remove from conversation tracking if agent.conversation_id and agent.conversation_id in self._conversation_agents: self._conversation_agents[agent.conversation_id].discard(agent_id) return True def remove_user_agents(self, user_id: int) -> int: """ Remove all agents for a user Args: user_id: User ID Returns: Number of agents removed """ with self._registry_lock: agent_ids = self._user_agents.pop(user_id, set()) count = 0 for agent_id in agent_ids: if agent_id in self._agents: del self._agents[agent_id] count += 1 return count def remove_conversation_agents(self, conversation_id: str) -> int: """ Remove all agents in a conversation Args: conversation_id: Conversation ID Returns: Number of agents removed """ with self._registry_lock: agent_ids = self._conversation_agents.pop(conversation_id, set()) count = 0 for agent_id in agent_ids: if agent_id in self._agents: del self._agents[agent_id] count += 1 return count def clear(self) -> None: """Clear all agents from registry""" with self._registry_lock: self._agents.clear() self._user_agents.clear() self._conversation_agents.clear() @property def agent_count(self) -> int: """Total number of agents""" with self._registry_lock: return len(self._agents) def get_stats(self) -> Dict: """Get registry statistics""" with self._registry_lock: status_counts = {} type_counts = {} for agent in self._agents.values(): status_counts[agent.status.value] = status_counts.get(agent.status.value, 0) + 1 type_counts[agent.config.agent_type.value] = type_counts.get(agent.config.agent_type.value, 0) + 1 return { "total": len(self._agents), "by_status": status_counts, "by_type": type_counts, } # Global registry instance registry = AgentRegistry()