diff --git a/asserts/ARCHITECTURE.md b/asserts/ARCHITECTURE.md index 55b9601..e9dfc60 100644 --- a/asserts/ARCHITECTURE.md +++ b/asserts/ARCHITECTURE.md @@ -22,6 +22,7 @@ luxx/ │ ├── auth.py # 认证 │ ├── conversations.py # 会话管理 │ ├── messages.py # 消息处理 +│ ├── providers.py # LLM 提供商管理 │ └── tools.py # 工具管理 ├── services/ # 服务层 │ ├── chat.py # 聊天服务 @@ -101,15 +102,63 @@ erDiagram string id PK string conversation_id FK string role - longtext content + longtext content "JSON 格式" int token_count datetime created_at } USER ||--o{ CONVERSATION : "has" CONVERSATION ||--o{ MESSAGE : "has" + + USER ||--o{ LLM_PROVIDER : "configures" + + LLM_PROVIDER { + int id PK + int user_id FK + string name + string provider_type + string base_url + string api_key + string default_model + boolean is_default + boolean enabled + datetime created_at + datetime updated_at + } ``` +### Message Content JSON 结构 + +`content` 字段统一使用 JSON 格式存储: + +**User 消息:** + +```json +{ + "text": "用户输入的文本内容", + "attachments": [ + {"name": "utils.py", "extension": "py", "content": "..."} + ] +} +``` + +**Assistant 消息:** + +```json +{ + "text": "AI 回复的文本内容", + "tool_calls": [...], + "steps": [ + {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, + {"id": "step-1", "index": 1, "type": "text", "content": "..."}, + {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."}, + {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."} + ] +} +``` + +`steps` 字段是**渲染顺序的唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。 + ### 5. 
工具系统 ```mermaid @@ -191,6 +240,9 @@ LLM API 客户端: | `/conversations` | GET/POST | 会话列表/创建 | | `/conversations/{id}` | GET/DELETE | 会话详情/删除 | | `/messages/stream` | POST | 流式消息发送 | +| `/providers` | GET/POST | LLM 提供商列表/创建 | +| `/providers/{id}` | GET/PUT/DELETE | 提供商详情/更新/删除 | +| `/providers/{id}/test` | POST | 测试提供商连接 | | `/tools` | GET | 可用工具列表 | ## 数据流 @@ -227,12 +279,29 @@ sequenceDiagram | 事件 | 说明 | |------|------| -| `text` | 文本内容增量 | -| `tool_call` | 工具调用请求 | -| `tool_result` | 工具执行结果 | +| `process_step` | 结构化步骤(thinking/text/tool_call/tool_result),携带 `id`、`index` 确保渲染顺序 | | `done` | 响应完成 | | `error` | 错误信息 | +### process_step 事件格式 + +```json +{"type": "process_step", "step": {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}} +{"type": "process_step", "step": {"id": "step-1", "index": 1, "type": "text", "content": "回复文本..."}} +{"type": "process_step", "step": {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"...\"}"}} +{"type": "process_step", "step": {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{\"success\": true, ...}"}} +``` + +| 字段 | 说明 | +|------|------| +| `id` | 步骤唯一标识(格式 `step-{index}`) | +| `index` | 步骤序号,确保按正确顺序显示 | +| `type` | 步骤类型:`thinking` / `text` / `tool_call` / `tool_result` | +| `id_ref` | 工具调用引用 ID(仅 tool_call/tool_result) | +| `name` | 工具名称(仅 tool_call/tool_result) | +| `arguments` | 工具调用参数 JSON 字符串(仅 tool_call) | +| `content` | 内容(thinking 的思考内容、text 的文本、tool_result 的返回结果) | + ## 配置示例 ### config.yaml diff --git a/config.yaml b/config.yaml index a59ab84..d2f8222 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,7 @@ # 配置文件 app: secret_key: ${APP_SECRET_KEY} - debug: true + debug: false host: 0.0.0.0 port: 8000 diff --git a/dashboard/src/components/ProcessBlock.vue b/dashboard/src/components/ProcessBlock.vue new file mode 100644 index 0000000..578ac01 --- /dev/null +++ 
b/dashboard/src/components/ProcessBlock.vue @@ -0,0 +1,472 @@ + + + + + diff --git a/dashboard/src/services/api.js b/dashboard/src/services/api.js index ccc9289..9d20737 100644 --- a/dashboard/src/services/api.js +++ b/dashboard/src/services/api.js @@ -36,101 +36,133 @@ api.interceptors.response.use( } ) +/** + * SSE 流式请求处理器 + * @param {string} url - API URL (不含 baseURL 前缀) + * @param {object} body - 请求体 + * @param {object} callbacks - 事件回调: { onProcessStep, onDone, onError } + * @returns {{ abort: () => void }} + */ +export function createSSEStream(url, body, { onProcessStep, onDone, onError }) { + const token = localStorage.getItem('access_token') + const controller = new AbortController() + + const promise = (async () => { + try { + const res = await fetch(`/api${url}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${token}` + }, + body: JSON.stringify(body), + signal: controller.signal + }) + + if (!res.ok) { + const err = await res.json().catch(() => ({})) + throw new Error(err.message || `HTTP ${res.status}`) + } + + const reader = res.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + let completed = false + + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + let currentEvent = '' + for (const line of lines) { + if (line.startsWith('event: ')) { + currentEvent = line.slice(7).trim() + } else if (line.startsWith('data: ')) { + const data = JSON.parse(line.slice(6)) + if (currentEvent === 'process_step' && onProcessStep) { + onProcessStep(data) + } else if (currentEvent === 'done' && onDone) { + completed = true + onDone(data) + } else if (currentEvent === 'error' && onError) { + onError(data.content) + } + } + } + } + + if (!completed && onError) { + onError('stream ended unexpectedly') + } + } catch (e) { + if (e.name !== 'AbortError' && 
onError) { + onError(e.message) + } + } + })() + + promise.abort = () => controller.abort() + return promise +} + // ============ 认证接口 ============ export const authAPI = { - // 用户登录 login: (data) => api.post('/auth/login', data), - - // 用户注册 register: (data) => api.post('/auth/register', data), - - // 用户登出 logout: () => api.post('/auth/logout'), - - // 获取当前用户信息 getMe: () => api.get('/auth/me') } // ============ 会话接口 ============ export const conversationsAPI = { - // 获取会话列表 list: (params) => api.get('/conversations/', { params }), - - // 创建会话 create: (data) => api.post('/conversations/', data), - - // 获取会话详情 get: (id) => api.get(`/conversations/${id}`), - - // 更新会话 update: (id, data) => api.put(`/conversations/${id}`, data), - - // 删除会话 delete: (id) => api.delete(`/conversations/${id}`) } // ============ 消息接口 ============ export const messagesAPI = { - // 获取消息列表 list: (conversationId, params) => api.get('/messages/', { params: { conversation_id: conversationId, ...params } }), - - // 发送消息(非流式) send: (data) => api.post('/messages/', data), - // 发送消息(流式)- 使用原生 fetch 避免 axios 拦截 - sendStream: (data) => { - const token = localStorage.getItem('access_token') - return fetch('/api/messages/stream', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${token}` - }, - body: JSON.stringify(data) - }) + // 发送消息(流式) + sendStream: (data, callbacks) => { + return createSSEStream('/messages/stream', { + conversation_id: data.conversation_id, + content: data.content, + tools_enabled: callbacks.toolsEnabled !== false + }, callbacks) }, - // 删除消息 delete: (id) => api.delete(`/messages/${id}`) } // ============ 工具接口 ============ export const toolsAPI = { - // 获取工具列表 list: (params) => api.get('/tools/', { params }), - - // 获取工具详情 get: (name) => api.get(`/tools/${name}`), - - // 执行工具 execute: (name, data) => api.post(`/tools/${name}/execute`, data) } // ============ LLM Provider 接口 ============ export const providersAPI = { - // 获取提供商列表 
list: () => api.get('/providers/'), - - // 创建提供商 create: (data) => api.post('/providers/', data), - - // 获取提供商详情 get: (id) => api.get(`/providers/${id}`), - - // 更新提供商 update: (id, data) => api.put(`/providers/${id}`, data), - - // 删除提供商 delete: (id) => api.delete(`/providers/${id}`), - - // 测试连接 test: (id) => api.post(`/providers/${id}/test`) } -// 默认导出 -export default api \ No newline at end of file +export default api diff --git a/dashboard/src/views/ConversationDetailView.vue b/dashboard/src/views/ConversationDetailView.vue index 37fc613..c3d173f 100644 --- a/dashboard/src/views/ConversationDetailView.vue +++ b/dashboard/src/views/ConversationDetailView.vue @@ -9,14 +9,23 @@
{{ msg.role === 'user' ? 'U' : 'A' }}
-
{{ msg.content }}
+ +
{{ msg.content || msg.text }}
{{ formatTime(msg.created_at) }}
-
+ + +
A
-
{{ streamContent }}
+
@@ -40,14 +49,14 @@ import { ref, onMounted, nextTick } from 'vue' import { useRoute } from 'vue-router' import { conversationsAPI, messagesAPI } from '../services/api.js' +import ProcessBlock from '../components/ProcessBlock.vue' const route = useRoute() const messages = ref([]) const inputMessage = ref('') const loading = ref(true) const sending = ref(false) -const streaming = ref(false) -const streamContent = ref('') +const streamingMessage = ref(null) const messagesContainer = ref(null) const conversationId = ref(route.params.id) @@ -78,104 +87,61 @@ const sendMessage = async () => { id: Date.now(), role: 'user', content: content, + text: content, + attachments: [], + process_steps: [], created_at: new Date().toISOString() }) scrollToBottom() - try { - streaming.value = true - streamContent.value = '' - - const response = await messagesAPI.sendStream({ - conversation_id: conversationId.value, - content: content, - tools_enabled: true - }) - - const reader = response.body.getReader() - const decoder = new TextDecoder() - - while (true) { - const { done, value } = await reader.read() - if (done) break - - const chunk = decoder.decode(value) - const lines = chunk.split('\n') - - for (const line of lines) { - if (line.startsWith('data: ')) { - const data = line.slice(6) - if (data === '[DONE]') continue - - try { - const parsed = JSON.parse(data) - if (parsed.type === 'text') { - streamContent.value += parsed.content - } else if (parsed.type === 'tool_call') { - // 工具调用(只在完整结果时显示一次) - const data = parsed.data - if (data && Array.isArray(data) && data.length > 0) { - // 检查是否有完整的函数名 - const hasFunctionName = data.some(tc => tc.function && tc.function.name) - if (hasFunctionName) { - streamContent.value += '\n\n[调用工具] ' - data.forEach(tc => { - if (tc.function && tc.function.name) { - streamContent.value += `${tc.function.name} ` - } - }) - } - } - } else if (parsed.type === 'tool_result') { - // 工具结果 - streamContent.value += '\n\n[工具结果]\n' - if 
(Array.isArray(parsed.data)) { - parsed.data.forEach(tr => { - if (tr.content) { - try { - const result = JSON.parse(tr.content) - if (result.success && result.data && result.data.results) { - result.data.results.forEach(r => { - streamContent.value += `• ${r.title}\n${r.snippet}\n\n` - }) - } else { - streamContent.value += tr.content.substring(0, 500) - } - } catch { - streamContent.value += tr.content.substring(0, 500) - } - } else { - streamContent.value += '无结果' - } - }) - } - } else if (parsed.type === 'error') { - streamContent.value += '\n\n[错误] ' + (parsed.error || '未知错误') - } - } catch (e) { - console.error('Parse error:', e, data) - } + // 初始化流式消息 + streamingMessage.value = { + id: Date.now() + 1, + role: 'assistant', + process_steps: [], + created_at: new Date().toISOString() + } + + // SSE 流式请求 + messagesAPI.sendStream( + { conversation_id: conversationId.value, content }, + { + onProcessStep: (step) => { + if (!streamingMessage.value) return + // 按 id 更新或追加步骤 + const idx = streamingMessage.value.process_steps.findIndex(s => s.id === step.id) + if (idx >= 0) { + streamingMessage.value.process_steps[idx] = step + } else { + streamingMessage.value.process_steps.push(step) + } + }, + onDone: () => { + // 完成,添加到消息列表 + if (streamingMessage.value) { + messages.value.push({ + ...streamingMessage.value, + created_at: new Date().toISOString() + }) + streamingMessage.value = null + } + }, + onError: (error) => { + console.error('Stream error:', error) + if (streamingMessage.value) { + streamingMessage.value.process_steps.push({ + id: 'error', + index: streamingMessage.value.process_steps.length, + type: 'text', + content: `[错误] ${error}` + }) } } } - - // 添加助手消息 - if (streamContent.value) { - messages.value.push({ - id: Date.now() + 1, - role: 'assistant', - content: streamContent.value, - created_at: new Date().toISOString() - }) - } - } catch (e) { - console.error('发送失败:', e) - alert('发送失败: ' + e.message) - } finally { - sending.value = false - streaming.value 
= false - scrollToBottom() - } + ) + + sending.value = false + scrollToBottom() } const scrollToBottom = () => { @@ -203,12 +169,10 @@ onMounted(loadMessages) .message.user { flex-direction: row-reverse; } .message-avatar { width: 40px; height: 40px; background: var(--code-bg); border-radius: 50%; display: flex; align-items: center; justify-content: center; font-size: 1.2rem; flex-shrink: 0; } .message.user .message-avatar { background: var(--accent-bg); } -.message-content { max-width: 70%; } +.message-content { max-width: 80%; } .message-text { padding: 1rem; background: var(--code-bg); border-radius: 12px; line-height: 1.6; white-space: pre-wrap; } .message.user .message-text { background: var(--accent); color: white; } -.message-time { font-size: 0.75rem; color: var(--text); margin-top: 0.25rem; } -.cursor { animation: blink 1s infinite; } -@keyframes blink { 0%, 50% { opacity: 1; } 51%, 100% { opacity: 0; } } +.message-time { font-size: 0.75rem; color: var(--text-secondary); margin-top: 0.25rem; } .input-area { display: flex; gap: 0.75rem; padding: 1rem; border-top: 1px solid var(--border); } .input-area textarea { flex: 1; padding: 0.875rem 1rem; border: 1px solid var(--border); border-radius: 12px; resize: none; font-size: 1rem; background: var(--bg); color: var(--text); } .input-area textarea:focus { outline: none; border-color: var(--accent); } diff --git a/luxx/models.py b/luxx/models.py index e8b171b..8e73bd7 100644 --- a/luxx/models.py +++ b/luxx/models.py @@ -130,13 +130,36 @@ class Conversation(Base): class Message(Base): - """Message model""" + """Message model + + content 字段统一使用 JSON 格式存储: + + **User 消息:** + { + "text": "用户输入的文本内容", + "attachments": [ + {"name": "utils.py", "extension": "py", "content": "..."} + ] + } + + **Assistant 消息:** + { + "text": "AI 回复的文本内容", + "tool_calls": [...], // 遗留的扁平结构 + "steps": [ // 有序步骤,用于渲染(主要数据源) + {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}, + {"id": "step-1", "index": 1, "type": "text", 
"content": "..."}, + {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."}, + {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."} + ] + } + """ __tablename__ = "messages" id: Mapped[str] = mapped_column(String(64), primary_key=True) conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False) - role: Mapped[str] = mapped_column(String(16), nullable=False) - content: Mapped[str] = mapped_column(Text, nullable=False) + role: Mapped[str] = mapped_column(String(16), nullable=False) # user, assistant, system, tool + content: Mapped[str] = mapped_column(Text, nullable=False, default="") token_count: Mapped[int] = mapped_column(Integer, default=0) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) @@ -144,11 +167,39 @@ class Message(Base): conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages") def to_dict(self): - return { + """Convert to dictionary, extracting process_steps for frontend""" + import json + + result = { "id": self.id, "conversation_id": self.conversation_id, "role": self.role, - "content": self.content, "token_count": self.token_count, "created_at": self.created_at.isoformat() if self.created_at else None } + + # Parse content JSON + try: + content_obj = json.loads(self.content) if self.content else {} + except json.JSONDecodeError: + # Legacy plain text content + result["content"] = self.content + result["text"] = self.content + result["attachments"] = [] + result["tool_calls"] = [] + result["process_steps"] = [] + return result + + # Extract common fields + result["text"] = content_obj.get("text", "") + result["attachments"] = content_obj.get("attachments", []) + result["tool_calls"] = content_obj.get("tool_calls", []) + + # Extract steps as process_steps for frontend rendering + result["process_steps"] = content_obj.get("steps", []) + + 
# For backward compatibility + if "content" not in result: + result["content"] = result["text"] + + return result diff --git a/luxx/routes/messages.py b/luxx/routes/messages.py index 1882040..5522085 100644 --- a/luxx/routes/messages.py +++ b/luxx/routes/messages.py @@ -136,44 +136,13 @@ async def stream_message( db.commit() async def event_generator(): - full_response = "" - - async for event in chat_service.stream_response( + async for sse_str in chat_service.stream_response( conversation=conversation, user_message=data.content, tools_enabled=tools_enabled ): - event_type = event.get("type") - - if event_type == "text": - content = event.get("content", "") - full_response += content - yield f"data: {json.dumps({'type': 'text', 'content': content})}\n\n" - - elif event_type == "tool_call": - yield f"data: {json.dumps({'type': 'tool_call', 'data': event.get('data')})}\n\n" - - elif event_type == "tool_result": - yield f"data: {json.dumps({'type': 'tool_result', 'data': event.get('data')})}\n\n" - - elif event_type == "done": - try: - ai_message = Message( - id=generate_id("msg"), - conversation_id=data.conversation_id, - role="assistant", - content=full_response, - token_count=len(full_response) // 4 - ) - db.add(ai_message) - db.commit() - except Exception: - pass - - yield f"data: {json.dumps({'type': 'done', 'message_id': ai_message.id if 'ai_message' in dir() else None})}\n\n" - - elif event_type == "error": - yield f"data: {json.dumps({'type': 'error', 'error': event.get('error')})}\n\n" + # Chat service returns raw SSE strings + yield sse_str yield "data: [DONE]\n\n" diff --git a/luxx/services/chat.py b/luxx/services/chat.py index 77e1ae0..2ad0653 100644 --- a/luxx/services/chat.py +++ b/luxx/services/chat.py @@ -1,5 +1,6 @@ """Chat service module""" import json +import uuid from typing import List, Dict, Any, AsyncGenerator from luxx.models import Conversation, Message @@ -13,6 +14,11 @@ from luxx.config import config MAX_ITERATIONS = 10 +def 
_sse_event(event: str, data: dict) -> str: + """Format a Server-Sent Event string.""" + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + def get_llm_client(conversation: Conversation = None): """Get LLM client, optionally using conversation's provider""" if conversation and conversation.provider_id: @@ -37,7 +43,7 @@ def get_llm_client(conversation: Conversation = None): class ChatService: - """Chat service""" + """Chat service with tool support""" def __init__(self): self.tool_executor = ToolExecutor() @@ -66,9 +72,19 @@ class ChatService: ).order_by(Message.created_at).all() for msg in db_messages: + # Parse JSON content if possible + try: + content_obj = json.loads(msg.content) if msg.content else {} + if isinstance(content_obj, dict): + content = content_obj.get("text", msg.content) + else: + content = msg.content + except (json.JSONDecodeError, TypeError): + content = msg.content + messages.append({ "role": msg.role, - "content": msg.content + "content": content }) finally: db.close() @@ -80,163 +96,263 @@ class ChatService: conversation: Conversation, user_message: str, tools_enabled: bool = True - ) -> AsyncGenerator[Dict[str, Any], None]: + ) -> AsyncGenerator[Dict[str, str], None]: """ Streaming response generator - Event types: - - process_step: thinking/text/tool_call/tool_result step - - done: final response complete - - error: on error + Yields raw SSE event strings for direct forwarding. 
""" try: messages = self.build_messages(conversation) messages.append({ "role": "user", - "content": user_message + "content": json.dumps({"text": user_message, "attachments": []}) }) tools = registry.list_all() if tools_enabled else None - iteration = 0 - llm = get_llm_client(conversation) model = conversation.model or llm.default_model or "gpt-4" - while iteration < MAX_ITERATIONS: - iteration += 1 - print(f"[CHAT DEBUG] ====== Starting iteration {iteration} ======") - print(f"[CHAT DEBUG] Messages count: {len(messages)}") + # State tracking + all_steps = [] + all_tool_calls = [] + all_tool_results = [] + step_index = 0 + + for iteration in range(MAX_ITERATIONS): + print(f"[CHAT] Starting iteration {iteration + 1}, messages: {len(messages)}") - tool_calls_this_round = None + # Stream from LLM + full_content = "" + full_thinking = "" + tool_calls_list = [] + thinking_step_id = None + thinking_step_idx = None + text_step_id = None + text_step_idx = None - async for event in llm.stream_call( + async for sse_line in llm.stream_call( model=model, messages=messages, tools=tools, temperature=conversation.temperature, max_tokens=conversation.max_tokens ): - event_type = event.get("type") + # Parse SSE line + # Format: "event: xxx\ndata: {...}\n\n" + event_type = None + data_str = None - if event_type == "content_delta": - content = event.get("content", "") - if content: - print(f"[CHAT DEBUG] Iteration {iteration} content: {content[:100]}...") - yield {"type": "text", "content": content} + for line in sse_line.strip().split('\n'): + if line.startswith('event: '): + event_type = line[7:].strip() + elif line.startswith('data: '): + data_str = line[6:].strip() - elif event_type == "tool_call_delta": - tool_call = event.get("tool_call", {}) - yield {"type": "tool_call", "data": tool_call} + if data_str is None: + continue - elif event_type == "done": - tool_calls_this_round = event.get("tool_calls") - print(f"[CHAT DEBUG] Done event, tool_calls: {tool_calls_this_round}") - - 
if tool_calls_this_round and tools_enabled: - print(f"[CHAT DEBUG] Executing tools: {tool_calls_this_round}") - yield {"type": "tool_call", "data": tool_calls_this_round} - - tool_results = self.tool_executor.process_tool_calls_parallel( - tool_calls_this_round, - {} - ) - - messages.append({ - "role": "assistant", - "content": "", - "tool_calls": tool_calls_this_round + # Handle error events from LLM + if event_type == 'error': + try: + error_data = json.loads(data_str) + yield _sse_event("error", {"content": error_data.get("content", "Unknown error")}) + except json.JSONDecodeError: + yield _sse_event("error", {"content": data_str}) + return + + # Parse the data + try: + chunk = json.loads(data_str) + except json.JSONDecodeError: + continue + + # Get delta + choices = chunk.get("choices", []) + if not choices: + continue + + delta = choices[0].get("delta", {}) + + # Handle reasoning (thinking) + reasoning = delta.get("reasoning_content", "") + if reasoning: + full_thinking += reasoning + if thinking_step_id is None: + thinking_step_id = f"step-{step_index}" + thinking_step_idx = step_index + step_index += 1 + yield _sse_event("process_step", { + "id": thinking_step_id, + "index": thinking_step_idx, + "type": "thinking", + "content": full_thinking + }) + + # Handle content + content = delta.get("content", "") + if content: + full_content += content + if text_step_id is None: + text_step_idx = step_index + text_step_id = f"step-{text_step_idx}" + step_index += 1 + yield _sse_event("process_step", { + "id": text_step_id, + "index": text_step_idx, + "type": "text", + "content": full_content + }) + + # Accumulate tool calls + tool_calls_delta = delta.get("tool_calls", []) + for tc in tool_calls_delta: + idx = tc.get("index", 0) + if idx >= len(tool_calls_list): + tool_calls_list.append({ + "id": tc.get("id", ""), + "type": "function", + "function": {"name": "", "arguments": ""} }) - - for tr in tool_results: - messages.append({ - "role": "tool", - "tool_call_id": 
tr.get("tool_call_id"), - "content": str(tr.get("result", "")) - }) - - yield {"type": "tool_result", "data": tool_results} - else: - break + func = tc.get("function", {}) + if func.get("name"): + tool_calls_list[idx]["function"]["name"] += func["name"] + if func.get("arguments"): + tool_calls_list[idx]["function"]["arguments"] += func["arguments"] - if not tool_calls_this_round or not tools_enabled: - break - - yield {"type": "done"} - - except Exception as e: - print(f"[CHAT ERROR] Exception in stream_response: {type(e).__name__}: {str(e)}") - yield {"type": "error", "error": str(e)} - - except Exception as e: - yield {"type": "error", "error": str(e)} - - def non_stream_response( - self, - conversation: Conversation, - user_message: str, - tools_enabled: bool = False - ) -> Dict[str, Any]: - """Non-streaming response""" - try: - messages = self.build_messages(conversation) - messages.append({ - "role": "user", - "content": user_message - }) - - tools = registry.list_all() if tools_enabled else None - - iteration = 0 - - llm_client = get_llm_client(conversation) - model = conversation.model or llm_client.default_model or "gpt-4" - - while iteration < MAX_ITERATIONS: - iteration += 1 + # Save thinking step + if thinking_step_id is not None: + all_steps.append({ + "id": thinking_step_id, + "index": thinking_step_idx, + "type": "thinking", + "content": full_thinking + }) - response = llm_client.sync_call( - model=model, - messages=messages, - tools=tools, - temperature=conversation.temperature, - max_tokens=conversation.max_tokens - ) + # Save text step + if text_step_id is not None: + all_steps.append({ + "id": text_step_id, + "index": text_step_idx, + "type": "text", + "content": full_content + }) - tool_calls = response.tool_calls - - if tool_calls and tools_enabled: + # Handle tool calls + if tool_calls_list: + all_tool_calls.extend(tool_calls_list) + + # Yield tool_call steps + for tc in tool_calls_list: + call_step = { + "id": f"step-{step_index}", + "index": 
step_index, + "type": "tool_call", + "id_ref": tc.get("id", ""), + "name": tc["function"]["name"], + "arguments": tc["function"]["arguments"] + } + all_steps.append(call_step) + yield _sse_event("process_step", call_step) + step_index += 1 + + # Execute tools + tool_results = self.tool_executor.process_tool_calls_parallel( + tool_calls_list, {} + ) + + # Yield tool_result steps + for tr in tool_results: + result_step = { + "id": f"step-{step_index}", + "index": step_index, + "type": "tool_result", + "id_ref": tr.get("tool_call_id", ""), + "name": tr.get("name", ""), + "content": tr.get("content", "") + } + all_steps.append(result_step) + yield _sse_event("process_step", result_step) + step_index += 1 + + all_tool_results.append({ + "role": "tool", + "tool_call_id": tr.get("tool_call_id", ""), + "content": tr.get("content", "") + }) + + # Add assistant message with tool calls for next iteration messages.append({ "role": "assistant", - "content": response.content, - "tool_calls": tool_calls + "content": full_content or "", + "tool_calls": tool_calls_list }) - - tool_results = self.tool_executor.process_tool_calls_parallel(tool_calls) - - for tr in tool_results: - messages.append({ - "role": "tool", - "tool_call_id": tr.get("tool_call_id"), - "content": str(tr.get("result", "")) - }) - else: - return { - "success": True, - "content": response.content - } + messages.extend(all_tool_results[-len(tool_results):]) + all_tool_results = [] + continue + + # No tool calls - final iteration, save message + msg_id = str(uuid.uuid4()) + self._save_message( + conversation.id, + msg_id, + full_content, + all_tool_calls, + all_tool_results, + all_steps + ) + + yield _sse_event("done", { + "message_id": msg_id, + "token_count": len(full_content) // 4 + }) + return - return { - "success": True, - "content": "Max iterations reached" - } + # Max iterations exceeded + yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"}) except Exception as e: - return { - 
"success": False, - "error": str(e) - } + print(f"[CHAT] Exception: {type(e).__name__}: {str(e)}") + yield _sse_event("error", {"content": str(e)}) + + def _save_message( + self, + conversation_id: str, + msg_id: str, + full_content: str, + all_tool_calls: list, + all_tool_results: list, + all_steps: list + ): + """Save the assistant message to database.""" + from luxx.database import SessionLocal + from luxx.models import Message + + content_json = { + "text": full_content, + "steps": all_steps + } + if all_tool_calls: + content_json["tool_calls"] = all_tool_calls + + db = SessionLocal() + try: + msg = Message( + id=msg_id, + conversation_id=conversation_id, + role="assistant", + content=json.dumps(content_json, ensure_ascii=False), + token_count=len(full_content) // 4 + ) + db.add(msg) + db.commit() + except Exception as e: + print(f"[CHAT] Failed to save message: {e}") + db.rollback() + finally: + db.close() # Global chat service diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py index 32cbded..257f0a9 100644 --- a/luxx/services/llm_client.py +++ b/luxx/services/llm_client.py @@ -136,82 +136,39 @@ class LLMClient: messages: List[Dict], tools: Optional[List[Dict]] = None, **kwargs - ) -> AsyncGenerator[Dict[str, Any], None]: - """Stream call LLM API""" + ) -> AsyncGenerator[str, None]: + """Stream call LLM API - yields raw SSE event lines + + Yields: + str: Raw SSE event lines for direct forwarding + """ body = self._build_body(model, messages, tools, stream=True, **kwargs) - # Accumulators for tool calls (need to collect from delta chunks) - accumulated_tool_calls = {} + print(f"[LLM] Starting stream_call for model: {model}") + print(f"[LLM] Messages count: {len(messages)}") try: async with httpx.AsyncClient(timeout=120.0) as client: + print(f"[LLM] Sending request to {self.api_url}") async with client.stream( "POST", self.api_url, headers=self._build_headers(), json=body ) as response: + print(f"[LLM] Response status: 
{response.status_code}") response.raise_for_status() async for line in response.aiter_lines(): - if not line.strip(): - continue - - if line.startswith("data: "): - data_str = line[6:] - - if data_str == "[DONE]": - yield {"type": "done"} - continue - - try: - chunk = json.loads(data_str) - except json.JSONDecodeError: - continue - - if "choices" not in chunk: - continue - - delta = chunk.get("choices", [{}])[0].get("delta", {}) - - # DeepSeek reasoner: use content if available, otherwise fall back to reasoning_content - content = delta.get("content") - reasoning = delta.get("reasoning_content", "") - if content and isinstance(content, str) and content.strip(): - yield {"type": "content_delta", "content": content} - elif reasoning: - yield {"type": "content_delta", "content": reasoning} - - # Accumulate tool calls from delta chunks (DeepSeek sends them incrementally) - tool_calls_delta = delta.get("tool_calls", []) - for tc in tool_calls_delta: - idx = tc.get("index", 0) - if idx not in accumulated_tool_calls: - accumulated_tool_calls[idx] = {"index": idx} - if "function" in tc: - if "function" not in accumulated_tool_calls[idx]: - accumulated_tool_calls[idx]["function"] = {"name": "", "arguments": ""} - if "name" in tc["function"]: - accumulated_tool_calls[idx]["function"]["name"] += tc["function"]["name"] - if "arguments" in tc["function"]: - accumulated_tool_calls[idx]["function"]["arguments"] += tc["function"]["arguments"] - - if tool_calls_delta: - yield {"type": "tool_call_delta", "tool_call": tool_calls_delta} - - # Check for finish_reason to signal end of stream - choice = chunk.get("choices", [{}])[0] - finish_reason = choice.get("finish_reason") - if finish_reason: - # Build final tool_calls list from accumulated chunks - final_tool_calls = list(accumulated_tool_calls.values()) if accumulated_tool_calls else None - yield {"type": "done", "tool_calls": final_tool_calls} + if line.strip(): + yield line + "\n" except httpx.HTTPStatusError as e: - # Return 
error as an event instead of raising - error_text = e.response.text if e.response else str(e) - yield {"type": "error", "error": f"HTTP {e.response.status_code}: {error_text}"} + status_code = e.response.status_code if e.response else "?" + print(f"[LLM] HTTP error: {status_code}") + yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n" except Exception as e: - yield {"type": "error", "error": str(e)} + print(f"[LLM] Exception: {type(e).__name__}: {str(e)}") + yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n" # Global LLM client