diff --git a/dashboard/src/components/AppHeader.vue b/dashboard/src/components/AppHeader.vue index 8e60ba0..4477dbb 100644 --- a/dashboard/src/components/AppHeader.vue +++ b/dashboard/src/components/AppHeader.vue @@ -32,6 +32,10 @@ const navItems = [ path: '/conversations', icon: `` }, + { + path: '/agents', + icon: `` + }, { path: '/tools', icon: `` diff --git a/dashboard/src/router/index.js b/dashboard/src/router/index.js index 9e41d99..147398c 100644 --- a/dashboard/src/router/index.js +++ b/dashboard/src/router/index.js @@ -32,6 +32,18 @@ const routes = [ component: () => import('../views/ToolsView.vue'), meta: { requiresAuth: true } }, + { + path: '/agents', + name: 'Agents', + component: () => import('../views/AgentView.vue'), + meta: { requiresAuth: true } + }, + { + path: '/agents/:taskId', + name: 'AgentTask', + component: () => import('../views/AgentView.vue'), + meta: { requiresAuth: true } + }, // 首页重定向 { path: '/home', diff --git a/dashboard/src/utils/agentWs.js b/dashboard/src/utils/agentWs.js new file mode 100644 index 0000000..9d2c129 --- /dev/null +++ b/dashboard/src/utils/agentWs.js @@ -0,0 +1,381 @@ +/** + * AgentWs - WebSocket client for agent real-time communication + * + * 功能: + * 1. 连接 WebSocket 订阅 DAG 进度 + * 2. 处理服务端推送的事件 + * 3. 支持心跳检测和自动重连 + */ +import { ref, reactive } from 'vue' + +class AgentWsClient { + constructor() { + // WebSocket 连接 + this.ws = null + // 当前订阅的任务 ID + this.taskId = null + // 连接状态 + this.connected = ref(false) + // 状态 + this.status = ref('disconnected') + // DAG 数据 + this.dagData = reactive({ + id: null, + name: '', + description: '', + nodes: [], + edges: [], + progress: 0, + completed_count: 0, + total_count: 0 + }) + // 节点状态映射 + this.nodeStatus = reactive({}) + // 事件回调 + this.callbacks = {} + // 重连配置 + this.reconnectAttempts = 0 + this.maxReconnectAttempts = 5 + this.reconnectDelay = 3000 + // 心跳 + this.pingInterval = null + this.pingIntervalTime = 25000 + // 订阅确认 + this.subscribed = ref(false) + } + + /** + * 连接到 WebSocket 并订阅任务 + * @param {string} taskId - 任务 ID + */ + connect(taskId) { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + if (this.taskId === taskId) { + return // 已连接到此任务 + } + this.disconnect() + } + + this.taskId = taskId + this.status.value = 'connecting' + + // 构建 WebSocket URL + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + const host = window.location.host + const wsUrl = `${protocol}//${host}/api/ws/dag/${taskId}` + + try { + this.ws = new WebSocket(wsUrl) + this._setupEventHandlers() + } catch (e) { + console.error('WebSocket connection error:', e) + this.status.value = 'error' + } + } + + /** + * 设置 WebSocket 事件处理器 + */ + _setupEventHandlers() { + this.ws.onopen = () => { + console.log('WebSocket connected') + this.connected.value = true + this.status.value = 'connected' + this.reconnectAttempts = 0 + + // 发送订阅消息 + this.send({ type: 'subscribe' }) + } + + this.ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data) + this._handleMessage(message) + } catch (e) { + console.error('Failed to parse WebSocket message:', e) + } + } + + this.ws.onerror = (error) => { + console.error('WebSocket error:', error) + this.status.value = 'error' + } + + this.ws.onclose = () => { + console.log('WebSocket closed') + this.connected.value = false + this.status.value = 'disconnected' + this.subscribed.value = false + this._clearPingInterval() + + // 尝试重连 + if (this.reconnectAttempts < this.maxReconnectAttempts && this.taskId) { + this.reconnectAttempts++ + console.log(`Reconnecting... attempt ${this.reconnectAttempts}`) + setTimeout(() => { + if (this.taskId) { + this.connect(this.taskId) + } + }, this.reconnectDelay) + } + } + } + + /** + * 处理收到的消息 + */ + _handleMessage(message) { + const type = message.type + const data = message.data || {} + + switch (type) { + case 'subscribed': + console.log('Subscribed to task:', message.task_id) + this.subscribed.value = true + this.status.value = 'subscribed' + this._triggerCallback('subscribed', message) + break + + case 'heartbeat': + // 心跳响应 + break + + case 'dag_start': + this._updateDagData(data.graph) + this._triggerCallback('dag_start', data) + break + + case 'dag_progress': + this.dagData.progress = data.progress + this._triggerCallback('dag_progress', data) + break + + case 'dag_status': + if (data) { + this._updateDagData(data) + } + this._triggerCallback('dag_status', data) + break + + case 'node_start': + this._updateNodeStatus(data.node_id, 'running', data) + this._triggerCallback('node_start', data) + break + + case 'node_progress': + this._updateNodeProgress(data.node_id, data.progress, data.message) + this._triggerCallback('node_progress', data) + break + + case 'node_complete': + this._updateNodeStatus(data.node_id, 'completed', data) + this._updateNodeResult(data.node_id, data) + this._triggerCallback('node_complete', data) + break + + case 'node_error': + this._updateNodeStatus(data.node_id, 'failed', { error: data.error }) + this._triggerCallback('node_error', data) + break + + case 'dag_complete': + this._triggerCallback('dag_complete', data) + break + + case 'task_cancelled': + this._triggerCallback('task_cancelled', data) + break + + case 'pong': + // 心跳响应 + break + + case 'error': + console.error('Server error:', message.message) + this._triggerCallback('error', message) + break + } + } + + /** + * 更新 DAG 数据 + */ + _updateDagData(graph) { + if (!graph) return + + this.dagData.id = graph.id + this.dagData.name = graph.name + this.dagData.description = graph.description + this.dagData.nodes = graph.nodes || [] + this.dagData.edges = graph.edges || [] + this.dagData.progress = graph.progress || 0 + this.dagData.completed_count = graph.completed_count || 0 + this.dagData.total_count = graph.total_count || 0 + + // 初始化节点状态 + for (const node of this.dagData.nodes) { + if (!this.nodeStatus[node.id]) { + this.nodeStatus[node.id] = reactive({ + status: node.status, + progress: node.progress || 0, + message: node.progress_message || '', + result: null, + error: null + }) + } + } + } + + /** + * 更新节点状态 + */ + _updateNodeStatus(nodeId, status, data) { + if (!this.nodeStatus[nodeId]) { + this.nodeStatus[nodeId] = reactive({}) + } + this.nodeStatus[nodeId].status = status + this.nodeStatus[nodeId].data = data + + // 更新 DAG 中的节点 + const node = this.dagData.nodes.find(n => n.id === nodeId) + if (node) { + node.status = status + } + } + + /** + * 更新节点进度 + */ + _updateNodeProgress(nodeId, progress, message) { + if (this.nodeStatus[nodeId]) { + this.nodeStatus[nodeId].progress = progress + if (message) { + this.nodeStatus[nodeId].message = message + } + } + } + + /** + * 更新节点结果 + */ + _updateNodeResult(nodeId, data) { + if (this.nodeStatus[nodeId]) { + this.nodeStatus[nodeId].result = data.result + this.nodeStatus[nodeId].output_data = data.output_data + } + } + + /** + * 发送消息到服务器 + */ + send(message) { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(message)) + } + } + + /** + * 获取状态 + */ + getStatus() { + this.send({ type: 'get_status' }) + } + + /** + * 发送 ping + */ + ping() { + this.send({ type: 'ping' }) + } + + /** + * 取消任务 + */ + cancelTask() { + this.send({ type: 'cancel_task' }) + } + + /** + * 断开连接 + */ + disconnect() { + this._clearPingInterval() + if (this.ws) { + this.ws.close() + this.ws = null + } + this.taskId = null + this.connected.value = false + this.subscribed.value = false + this.status.value = 'disconnected' + } + + /** + * 设置心跳间隔 + */ + _startPingInterval() { + this._clearPingInterval() + this.pingInterval = setInterval(() => { + this.ping() + }, this.pingIntervalTime) + } + + /** + * 清除心跳间隔 + */ + _clearPingInterval() { + if (this.pingInterval) { + clearInterval(this.pingInterval) + this.pingInterval = null + } + } + + /** + * 注册回调 + */ + on(event, callback) { + if (!this.callbacks[event]) { + this.callbacks[event] = [] + } + this.callbacks[event].push(callback) + } + + /** + * 移除回调 + */ + off(event, callback) { + if (this.callbacks[event]) { + const index = this.callbacks[event].indexOf(callback) + if (index > -1) { + this.callbacks[event].splice(index, 1) + } + } + } + + /** + * 触发回调 + */ + _triggerCallback(event, data) { + if (this.callbacks[event]) { + this.callbacks[event].forEach(cb => cb(data)) + } + } + + /** + * 获取节点状态 + */ + getNodeStatus(nodeId) { + return this.nodeStatus[nodeId] + } + + /** + * 获取所有节点状态 + */ + getAllNodeStatus() { + return this.nodeStatus + } +} + +// 导出单例 +export const agentWs = new AgentWsClient() +export default agentWs diff --git a/dashboard/src/utils/api.js b/dashboard/src/utils/api.js index b703224..6fc0b4f 100644 --- a/dashboard/src/utils/api.js +++ b/dashboard/src/utils/api.js @@ -210,4 +210,19 @@ export const providersAPI = { test: (id) => api.post(`/providers/${id}/test`) } +// ============ Agent 接口 ============ + +export const agentsAPI = { + // 创建 Agent 任务 + create: (data) => api.post('/agents/request', data), + // 获取任务状态 + getTask: (taskId) => api.get(`/agents/task/${taskId}`), + // 取消任务 + cancelTask: (taskId) => api.post(`/agents/task/${taskId}/cancel`), + // 列出用户的任务 + listTasks: (params) => api.get('/agents/tasks', { params }), + // 删除任务 + deleteTask: (taskId) => api.delete(`/agents/task/${taskId}`) +} + export default api diff --git a/dashboard/src/views/AgentView.vue b/dashboard/src/views/AgentView.vue new file mode 100644 index 0000000..8e7c725 --- /dev/null +++ b/dashboard/src/views/AgentView.vue @@ -0,0 +1,910 @@ + + + + + diff --git a/luxx/agents/supervisor.py b/luxx/agents/supervisor.py index 1f1cb7c..bb1015a 100644 --- a/luxx/agents/supervisor.py +++ b/luxx/agents/supervisor.py @@ -111,18 +111,21 @@ Guidelines: # Call LLM try: + logger.info(f"Calling LLM with model: {self.agent.config.model}, messages count: {len(messages)}") + logger.info(f"LLM client - api_url: {self.llm_client.api_url}, api_key set: {bool(self.llm_client.api_key)}") response = await self.llm_client.sync_call( model=self.agent.config.model, messages=messages, temperature=self.agent.config.temperature, max_tokens=self.agent.config.max_tokens ) + logger.info(f"LLM response received: {response}") if progress_callback: progress_callback(0.5, "Processing decomposition...") # Parse LLM response to extract DAG - dag = self._parse_dag_from_response(response.content, task) + dag = self._parse_dag_from_response(response.content if response else "", task) # Add assistant response to context self.agent.add_message("assistant", response.content) diff --git a/luxx/routes/__init__.py b/luxx/routes/__init__.py index 6464f6d..50f7b43 100644 --- a/luxx/routes/__init__.py +++ b/luxx/routes/__init__.py @@ -3,6 +3,7 @@ from fastapi import APIRouter from luxx.routes import auth, conversations, messages, tools, providers from luxx.routes.agents_ws import router as agents_ws_router +from luxx.routes.agents import router as agents_router api_router = APIRouter() @@ -14,3 +15,4 @@ api_router.include_router(messages.router) api_router.include_router(tools.router) api_router.include_router(providers.router) api_router.include_router(agents_ws_router) +api_router.include_router(agents_router) diff --git a/luxx/routes/agents.py b/luxx/routes/agents.py new file mode 100644 index 0000000..7fe6357 --- /dev/null +++ b/luxx/routes/agents.py @@ -0,0 +1,295 @@ +"""Agent routes - REST API for agent task management""" +import asyncio +import logging +from typing import Optional +from fastapi import APIRouter, Depends, WebSocket +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from luxx.database import get_db +from luxx.models import User +from luxx.routes.auth import get_current_user +from luxx.utils.helpers import success_response, error_response, generate_id +from luxx.agents.core import AgentConfig, AgentType, AgentStatus +from luxx.agents.registry import AgentRegistry +from luxx.agents.supervisor import SupervisorAgent +from luxx.agents.worker import WorkerAgent +from luxx.agents.dag_scheduler import SchedulerPool, DAGScheduler +from luxx.services.llm_client import LLMClient +from luxx.tools.executor import ToolExecutor + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/agents", tags=["Agents"]) + + +# ============ Request/Response Models ============ + +class AgentCreateRequest(BaseModel): + """Create agent task request""" + conversation_id: str + task: str + options: Optional[dict] = None + + +class AgentTaskResponse(BaseModel): + """Agent task response""" + task_id: str + status: str + conversation_id: str + + +# ============ 全局实例 ============ + +# Scheduler pool for managing concurrent DAG executions +scheduler_pool = SchedulerPool(max_concurrent=10) + +# Tool executor +tool_executor = ToolExecutor() + + +# ============ Routes ============ + +@router.post("/request", response_model=dict) +async def create_agent_task( + data: AgentCreateRequest, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) +): + """ + Create a new agent task + + This will: + 1. Create a Supervisor agent + 2. Decompose the task into a DAG + 3. Start DAG execution + 4. Return task_id for WebSocket subscription + """ + from luxx.config import config + from luxx.models import Conversation, LLMProvider + + # Get conversation to find provider + conversation = db.query(Conversation).filter( + Conversation.id == data.conversation_id, + Conversation.user_id == current_user.id + ).first() + + if not conversation: + return error_response("Conversation not found", 404) + + # Determine LLM client configuration + llm_api_key = None + llm_api_url = None + llm_model = None + + logger.info(f"Conversation provider_id: {conversation.provider_id}") + + if conversation.provider_id: + provider = db.query(LLMProvider).filter(LLMProvider.id == conversation.provider_id).first() + logger.info(f"Provider found: {provider}") + if provider: + llm_api_key = provider.api_key + llm_api_url = provider.base_url + llm_model = provider.default_model + logger.info(f"Provider config - api_key: {'set' if llm_api_key else 'None'}, url: {llm_api_url}, model: {llm_model}") + + # Fallback to config if no provider + if not llm_api_key: + llm_api_key = config.llm_api_key + llm_api_url = config.llm_api_url + llm_model = "deepseek-chat" + + # Check if LLM API key is configured + if not llm_api_key: + return error_response( + "LLM API key not configured. Please set up a provider in settings or set DEEPSEEK_API_KEY environment variable.", + 500 + ) + + task_id = generate_id("task") + + try: + # Create LLM client with proper configuration + llm_client = LLMClient( + api_key=llm_api_key, + api_url=llm_api_url, + model=llm_model + ) + + # Create supervisor agent + agent_registry = AgentRegistry() + + supervisor_config = AgentConfig( + name=f"supervisor_{task_id}", + agent_type=AgentType.SUPERVISOR, + description=f"Supervisor for task {task_id}", + model=llm_model, # Use the model's default model + max_turns=10 + ) + + supervisor_agent = agent_registry.create_agent( + config=supervisor_config, + user_id=current_user.id, + conversation_id=data.conversation_id + ) + + # Create supervisor instance + supervisor = SupervisorAgent( + agent=supervisor_agent, + llm_client=llm_client + ) + + # Decompose task into DAG + context = { + "user_id": current_user.id, + "username": current_user.username, + "conversation_id": data.conversation_id + } + + dag = await supervisor.decompose_task(data.task, context) + + # Create worker factory + def worker_factory(): + # Create new LLM client for each worker with proper config + worker_llm_client = LLMClient( + api_key=llm_api_key, + api_url=llm_api_url, + model=llm_model + ) + worker_config = AgentConfig( + name=f"worker_{task_id}", + agent_type=AgentType.WORKER, + description=f"Worker for task {task_id}", + model=llm_model, + max_turns=5 + ) + worker_agent = agent_registry.create_agent( + config=worker_config, + user_id=current_user.id, + conversation_id=data.conversation_id + ) + return WorkerAgent( + agent=worker_agent, + llm_client=worker_llm_client, + tool_executor=tool_executor + ) + + # Create scheduler + scheduler = scheduler_pool.create_scheduler( + task_id=task_id, + dag=dag, + supervisor=supervisor, + worker_factory=worker_factory, + max_workers=3 + ) + + # Start execution in background + asyncio.create_task(_execute_dag_background( + task_id=task_id, + scheduler=scheduler, + context=context, + task=data.task, + supervisor_agent=supervisor_agent + )) + + return success_response(data={ + "task_id": task_id, + "status": "planning", + "conversation_id": data.conversation_id + }, message="Agent task created successfully") + + except Exception as e: + logger.error(f"Failed to create agent task: {e}") + return error_response(f"Failed to create task: {str(e)}", 500) + + +async def _execute_dag_background( + task_id: str, + scheduler: DAGScheduler, + context: dict, + task: str, + supervisor_agent +): + """Execute DAG in background and handle completion""" + try: + result = await scheduler.execute(context, task) + + # Update supervisor status + supervisor_agent.status = AgentStatus.COMPLETED if result["success"] else AgentStatus.FAILED + + # Emit completion event via WebSocket + from luxx.routes.agents_ws import emit_dag_complete, emit_node_complete + + # Emit node complete events for each completed node + for node_id, node_result in result.get("results", {}).items(): + if node_result.get("success"): + # Find the node in the DAG + node = scheduler.dag.nodes.get(node_id) + if node: + await emit_node_complete(task_id, node) + + # Emit DAG complete event + await emit_dag_complete(task_id, result["success"], result) + + except Exception as e: + logger.error(f"DAG execution failed for task {task_id}: {e}") + + +@router.get("/task/{task_id}", response_model=dict) +async def get_task_status( + task_id: str, + current_user: User = Depends(get_current_user) +): + """Get task status and DAG info""" + scheduler = scheduler_pool.get(task_id) + + if not scheduler: + return error_response("Task not found", 404) + + return success_response(data={ + "task_id": task_id, + "status": "executing", + "dag": scheduler.dag.to_dict(), + "progress": scheduler.dag.progress + }) + + +@router.post("/task/{task_id}/cancel", response_model=dict) +async def cancel_task( + task_id: str, + current_user: User = Depends(get_current_user) +): + """Cancel a running task""" + if scheduler_pool.cancel(task_id): + return success_response(message="Task cancelled") + return error_response("Task not found or already completed", 404) + + +@router.delete("/task/{task_id}", response_model=dict) +async def delete_task( + task_id: str, + current_user: User = Depends(get_current_user) +): + """Delete a task""" + if scheduler_pool.remove(task_id): + return success_response(message="Task deleted") + return error_response("Task not found", 404) + + +@router.get("/tasks", response_model=dict) +async def list_tasks( + page: int = 1, + page_size: int = 20, + current_user: User = Depends(get_current_user) +): + """List user's agent tasks""" + # Get active schedulers for this user + # Note: In a real implementation, you'd store task metadata in database + tasks = [] + + return success_response(data={ + "items": tasks, + "total": len(tasks), + "page": page, + "page_size": page_size + }) diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index acec3eb..b4fd8d9 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -95,10 +95,15 @@ class LLMClient: tool_calls = None usage = None - if "choices" in data: + logger.info(f"Parsing LLM response: {data}") + + if "choices" in data and data["choices"]: choice = data["choices"][0] - content = choice.get("message", {}).get("content", "") - tool_calls = choice.get("message", {}).get("tool_calls") + message = choice.get("message") if choice else {} + content = message.get("content", "") if message else "" + tool_calls = message.get("tool_calls") if message else None + else: + logger.warning(f"No choices in LLM response: {data}") if "usage" in data: usage = data["usage"]