From 576eb39a51dd5e53dfcd23e7b637802817c403d9 Mon Sep 17 00:00:00 2001
From: ViperEkura <3081035982@qq.com>
Date: Sun, 12 Apr 2026 23:05:22 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E5=A2=9E=E9=87=8F?=
=?UTF-8?q?=E6=B8=B2=E6=9F=93=E9=80=BB=E8=BE=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
asserts/ARCHITECTURE.md | 77 ++-
config.yaml | 2 +-
dashboard/src/components/ProcessBlock.vue | 472 ++++++++++++++++++
dashboard/src/services/api.js | 130 +++--
.../src/views/ConversationDetailView.vue | 164 +++---
luxx/models.py | 61 ++-
luxx/routes/messages.py | 37 +-
luxx/services/chat.py | 360 ++++++++-----
luxx/services/llm_client.py | 77 +--
9 files changed, 1005 insertions(+), 375 deletions(-)
create mode 100644 dashboard/src/components/ProcessBlock.vue
diff --git a/asserts/ARCHITECTURE.md b/asserts/ARCHITECTURE.md
index 55b9601..e9dfc60 100644
--- a/asserts/ARCHITECTURE.md
+++ b/asserts/ARCHITECTURE.md
@@ -22,6 +22,7 @@ luxx/
│ ├── auth.py # 认证
│ ├── conversations.py # 会话管理
│ ├── messages.py # 消息处理
+│ ├── providers.py # LLM 提供商管理
│ └── tools.py # 工具管理
├── services/ # 服务层
│ ├── chat.py # 聊天服务
@@ -101,15 +102,63 @@ erDiagram
string id PK
string conversation_id FK
string role
- longtext content
+ longtext content "JSON 格式"
int token_count
datetime created_at
}
USER ||--o{ CONVERSATION : "has"
CONVERSATION ||--o{ MESSAGE : "has"
+
+ USER ||--o{ LLM_PROVIDER : "configures"
+
+ LLM_PROVIDER {
+ int id PK
+ int user_id FK
+ string name
+ string provider_type
+ string base_url
+ string api_key
+ string default_model
+ boolean is_default
+ boolean enabled
+ datetime created_at
+ datetime updated_at
+ }
```
+### Message Content JSON 结构
+
+`content` 字段统一使用 JSON 格式存储:
+
+**User 消息:**
+
+```json
+{
+ "text": "用户输入的文本内容",
+ "attachments": [
+ {"name": "utils.py", "extension": "py", "content": "..."}
+ ]
+}
+```
+
+**Assistant 消息:**
+
+```json
+{
+ "text": "AI 回复的文本内容",
+ "tool_calls": [...],
+ "steps": [
+ {"id": "step-0", "index": 0, "type": "thinking", "content": "..."},
+ {"id": "step-1", "index": 1, "type": "text", "content": "..."},
+ {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."},
+ {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."}
+ ]
+}
+```
+
+`steps` 字段是**渲染顺序的唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。
+
### 5. 工具系统
```mermaid
@@ -191,6 +240,9 @@ LLM API 客户端:
| `/conversations` | GET/POST | 会话列表/创建 |
| `/conversations/{id}` | GET/DELETE | 会话详情/删除 |
| `/messages/stream` | POST | 流式消息发送 |
+| `/providers` | GET/POST | LLM 提供商列表/创建 |
+| `/providers/{id}` | GET/PUT/DELETE | 提供商详情/更新/删除 |
+| `/providers/{id}/test` | POST | 测试提供商连接 |
| `/tools` | GET | 可用工具列表 |
## 数据流
@@ -227,12 +279,29 @@ sequenceDiagram
| 事件 | 说明 |
|------|------|
-| `text` | 文本内容增量 |
-| `tool_call` | 工具调用请求 |
-| `tool_result` | 工具执行结果 |
+| `process_step` | 结构化步骤(thinking/text/tool_call/tool_result),携带 `id`、`index` 确保渲染顺序 |
| `done` | 响应完成 |
| `error` | 错误信息 |
+### process_step 事件格式
+
+```json
+{"id": "step-0", "index": 0, "type": "thinking", "content": "..."}
+{"id": "step-1", "index": 1, "type": "text", "content": "回复文本..."}
+{"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"...\"}"}
+{"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{\"success\": true, ...}"}
+```
+
+| 字段 | 说明 |
+|------|------|
+| `id` | 步骤唯一标识(格式 `step-{index}`) |
+| `index` | 步骤序号,确保按正确顺序显示 |
+| `type` | 步骤类型:`thinking` / `text` / `tool_call` / `tool_result` |
+| `id_ref` | 工具调用引用 ID(仅 tool_call/tool_result) |
+| `name` | 工具名称(仅 tool_call/tool_result) |
+| `arguments` | 工具调用参数 JSON 字符串(仅 tool_call) |
+| `content` | 内容(thinking 的思考内容、text 的文本、tool_result 的返回结果) |
+
## 配置示例
### config.yaml
diff --git a/config.yaml b/config.yaml
index a59ab84..d2f8222 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,7 +1,7 @@
# 配置文件
app:
secret_key: ${APP_SECRET_KEY}
- debug: true
+ debug: false
host: 0.0.0.0
port: 8000
diff --git a/dashboard/src/components/ProcessBlock.vue b/dashboard/src/components/ProcessBlock.vue
new file mode 100644
index 0000000..578ac01
--- /dev/null
+++ b/dashboard/src/components/ProcessBlock.vue
@@ -0,0 +1,472 @@
+
+
+
+
+
+
+
diff --git a/dashboard/src/services/api.js b/dashboard/src/services/api.js
index ccc9289..9d20737 100644
--- a/dashboard/src/services/api.js
+++ b/dashboard/src/services/api.js
@@ -36,101 +36,133 @@ api.interceptors.response.use(
}
)
+/**
+ * SSE 流式请求处理器
+ * @param {string} url - API URL (不含 baseURL 前缀)
+ * @param {object} body - 请求体
+ * @param {object} callbacks - 事件回调: { onProcessStep, onDone, onError }
+ * @returns {{ abort: () => void }}
+ */
+export function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
+ const token = localStorage.getItem('access_token')
+ const controller = new AbortController()
+
+ const promise = (async () => {
+ try {
+ const res = await fetch(`/api${url}`, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'Authorization': `Bearer ${token}`
+ },
+ body: JSON.stringify(body),
+ signal: controller.signal
+ })
+
+ if (!res.ok) {
+ const err = await res.json().catch(() => ({}))
+ throw new Error(err.message || `HTTP ${res.status}`)
+ }
+
+ const reader = res.body.getReader()
+ const decoder = new TextDecoder()
+ let buffer = ''
+ let completed = false
+
+ while (true) {
+ const { done, value } = await reader.read()
+ if (done) break
+
+ buffer += decoder.decode(value, { stream: true })
+ const lines = buffer.split('\n')
+ buffer = lines.pop() || ''
+
+ let currentEvent = ''
+ for (const line of lines) {
+ if (line.startsWith('event: ')) {
+ currentEvent = line.slice(7).trim()
+ } else if (line.startsWith('data: ')) {
+ const data = JSON.parse(line.slice(6))
+ if (currentEvent === 'process_step' && onProcessStep) {
+ onProcessStep(data)
+ } else if (currentEvent === 'done' && onDone) {
+ completed = true
+ onDone(data)
+ } else if (currentEvent === 'error' && onError) {
+ onError(data.content)
+ }
+ }
+ }
+ }
+
+ if (!completed && onError) {
+ onError('stream ended unexpectedly')
+ }
+ } catch (e) {
+ if (e.name !== 'AbortError' && onError) {
+ onError(e.message)
+ }
+ }
+ })()
+
+ promise.abort = () => controller.abort()
+ return promise
+}
+
// ============ 认证接口 ============
export const authAPI = {
- // 用户登录
login: (data) => api.post('/auth/login', data),
-
- // 用户注册
register: (data) => api.post('/auth/register', data),
-
- // 用户登出
logout: () => api.post('/auth/logout'),
-
- // 获取当前用户信息
getMe: () => api.get('/auth/me')
}
// ============ 会话接口 ============
export const conversationsAPI = {
- // 获取会话列表
list: (params) => api.get('/conversations/', { params }),
-
- // 创建会话
create: (data) => api.post('/conversations/', data),
-
- // 获取会话详情
get: (id) => api.get(`/conversations/${id}`),
-
- // 更新会话
update: (id, data) => api.put(`/conversations/${id}`, data),
-
- // 删除会话
delete: (id) => api.delete(`/conversations/${id}`)
}
// ============ 消息接口 ============
export const messagesAPI = {
- // 获取消息列表
list: (conversationId, params) => api.get('/messages/', { params: { conversation_id: conversationId, ...params } }),
-
- // 发送消息(非流式)
send: (data) => api.post('/messages/', data),
- // 发送消息(流式)- 使用原生 fetch 避免 axios 拦截
- sendStream: (data) => {
- const token = localStorage.getItem('access_token')
- return fetch('/api/messages/stream', {
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- 'Authorization': `Bearer ${token}`
- },
- body: JSON.stringify(data)
- })
+ // 发送消息(流式)
+ sendStream: (data, callbacks) => {
+ return createSSEStream('/messages/stream', {
+ conversation_id: data.conversation_id,
+ content: data.content,
+ tools_enabled: callbacks.toolsEnabled !== false
+ }, callbacks)
},
- // 删除消息
delete: (id) => api.delete(`/messages/${id}`)
}
// ============ 工具接口 ============
export const toolsAPI = {
- // 获取工具列表
list: (params) => api.get('/tools/', { params }),
-
- // 获取工具详情
get: (name) => api.get(`/tools/${name}`),
-
- // 执行工具
execute: (name, data) => api.post(`/tools/${name}/execute`, data)
}
// ============ LLM Provider 接口 ============
export const providersAPI = {
- // 获取提供商列表
list: () => api.get('/providers/'),
-
- // 创建提供商
create: (data) => api.post('/providers/', data),
-
- // 获取提供商详情
get: (id) => api.get(`/providers/${id}`),
-
- // 更新提供商
update: (id, data) => api.put(`/providers/${id}`, data),
-
- // 删除提供商
delete: (id) => api.delete(`/providers/${id}`),
-
- // 测试连接
test: (id) => api.post(`/providers/${id}/test`)
}
-// 默认导出
-export default api
\ No newline at end of file
+export default api
diff --git a/dashboard/src/views/ConversationDetailView.vue b/dashboard/src/views/ConversationDetailView.vue
index 37fc613..c3d173f 100644
--- a/dashboard/src/views/ConversationDetailView.vue
+++ b/dashboard/src/views/ConversationDetailView.vue
@@ -9,14 +9,23 @@
{{ msg.role === 'user' ? 'U' : 'A' }}
-
{{ msg.content }}
+
+
{{ msg.content || msg.text }}
{{ formatTime(msg.created_at) }}
-
@@ -40,14 +49,14 @@
import { ref, onMounted, nextTick } from 'vue'
import { useRoute } from 'vue-router'
import { conversationsAPI, messagesAPI } from '../services/api.js'
+import ProcessBlock from '../components/ProcessBlock.vue'
const route = useRoute()
const messages = ref([])
const inputMessage = ref('')
const loading = ref(true)
const sending = ref(false)
-const streaming = ref(false)
-const streamContent = ref('')
+const streamingMessage = ref(null)
const messagesContainer = ref(null)
const conversationId = ref(route.params.id)
@@ -78,104 +87,61 @@ const sendMessage = async () => {
id: Date.now(),
role: 'user',
content: content,
+ text: content,
+ attachments: [],
+ process_steps: [],
created_at: new Date().toISOString()
})
scrollToBottom()
- try {
- streaming.value = true
- streamContent.value = ''
-
- const response = await messagesAPI.sendStream({
- conversation_id: conversationId.value,
- content: content,
- tools_enabled: true
- })
-
- const reader = response.body.getReader()
- const decoder = new TextDecoder()
-
- while (true) {
- const { done, value } = await reader.read()
- if (done) break
-
- const chunk = decoder.decode(value)
- const lines = chunk.split('\n')
-
- for (const line of lines) {
- if (line.startsWith('data: ')) {
- const data = line.slice(6)
- if (data === '[DONE]') continue
-
- try {
- const parsed = JSON.parse(data)
- if (parsed.type === 'text') {
- streamContent.value += parsed.content
- } else if (parsed.type === 'tool_call') {
- // 工具调用(只在完整结果时显示一次)
- const data = parsed.data
- if (data && Array.isArray(data) && data.length > 0) {
- // 检查是否有完整的函数名
- const hasFunctionName = data.some(tc => tc.function && tc.function.name)
- if (hasFunctionName) {
- streamContent.value += '\n\n[调用工具] '
- data.forEach(tc => {
- if (tc.function && tc.function.name) {
- streamContent.value += `${tc.function.name} `
- }
- })
- }
- }
- } else if (parsed.type === 'tool_result') {
- // 工具结果
- streamContent.value += '\n\n[工具结果]\n'
- if (Array.isArray(parsed.data)) {
- parsed.data.forEach(tr => {
- if (tr.content) {
- try {
- const result = JSON.parse(tr.content)
- if (result.success && result.data && result.data.results) {
- result.data.results.forEach(r => {
- streamContent.value += `• ${r.title}\n${r.snippet}\n\n`
- })
- } else {
- streamContent.value += tr.content.substring(0, 500)
- }
- } catch {
- streamContent.value += tr.content.substring(0, 500)
- }
- } else {
- streamContent.value += '无结果'
- }
- })
- }
- } else if (parsed.type === 'error') {
- streamContent.value += '\n\n[错误] ' + (parsed.error || '未知错误')
- }
- } catch (e) {
- console.error('Parse error:', e, data)
- }
+ // 初始化流式消息
+ streamingMessage.value = {
+ id: Date.now() + 1,
+ role: 'assistant',
+ process_steps: [],
+ created_at: new Date().toISOString()
+ }
+
+ // SSE 流式请求
+ messagesAPI.sendStream(
+ { conversation_id: conversationId.value, content },
+ {
+ onProcessStep: (step) => {
+ if (!streamingMessage.value) return
+ // 按 id 更新或追加步骤
+ const idx = streamingMessage.value.process_steps.findIndex(s => s.id === step.id)
+ if (idx >= 0) {
+ streamingMessage.value.process_steps[idx] = step
+ } else {
+ streamingMessage.value.process_steps.push(step)
+ }
+ },
+ onDone: () => {
+ // 完成,添加到消息列表
+ if (streamingMessage.value) {
+ messages.value.push({
+ ...streamingMessage.value,
+ created_at: new Date().toISOString()
+ })
+ streamingMessage.value = null
+ }
+ },
+ onError: (error) => {
+ console.error('Stream error:', error)
+ if (streamingMessage.value) {
+ streamingMessage.value.process_steps.push({
+ id: 'error',
+ index: streamingMessage.value.process_steps.length,
+ type: 'text',
+ content: `[错误] ${error}`
+ })
}
}
}
-
- // 添加助手消息
- if (streamContent.value) {
- messages.value.push({
- id: Date.now() + 1,
- role: 'assistant',
- content: streamContent.value,
- created_at: new Date().toISOString()
- })
- }
- } catch (e) {
- console.error('发送失败:', e)
- alert('发送失败: ' + e.message)
- } finally {
- sending.value = false
- streaming.value = false
- scrollToBottom()
- }
+ )
+
+ sending.value = false
+ scrollToBottom()
}
const scrollToBottom = () => {
@@ -203,12 +169,10 @@ onMounted(loadMessages)
.message.user { flex-direction: row-reverse; }
.message-avatar { width: 40px; height: 40px; background: var(--code-bg); border-radius: 50%; display: flex; align-items: center; justify-content: center; font-size: 1.2rem; flex-shrink: 0; }
.message.user .message-avatar { background: var(--accent-bg); }
-.message-content { max-width: 70%; }
+.message-content { max-width: 80%; }
.message-text { padding: 1rem; background: var(--code-bg); border-radius: 12px; line-height: 1.6; white-space: pre-wrap; }
.message.user .message-text { background: var(--accent); color: white; }
-.message-time { font-size: 0.75rem; color: var(--text); margin-top: 0.25rem; }
-.cursor { animation: blink 1s infinite; }
-@keyframes blink { 0%, 50% { opacity: 1; } 51%, 100% { opacity: 0; } }
+.message-time { font-size: 0.75rem; color: var(--text-secondary); margin-top: 0.25rem; }
.input-area { display: flex; gap: 0.75rem; padding: 1rem; border-top: 1px solid var(--border); }
.input-area textarea { flex: 1; padding: 0.875rem 1rem; border: 1px solid var(--border); border-radius: 12px; resize: none; font-size: 1rem; background: var(--bg); color: var(--text); }
.input-area textarea:focus { outline: none; border-color: var(--accent); }
diff --git a/luxx/models.py b/luxx/models.py
index e8b171b..8e73bd7 100644
--- a/luxx/models.py
+++ b/luxx/models.py
@@ -130,13 +130,36 @@ class Conversation(Base):
class Message(Base):
- """Message model"""
+ """Message model
+
+ content 字段统一使用 JSON 格式存储:
+
+ **User 消息:**
+ {
+ "text": "用户输入的文本内容",
+ "attachments": [
+ {"name": "utils.py", "extension": "py", "content": "..."}
+ ]
+ }
+
+ **Assistant 消息:**
+ {
+ "text": "AI 回复的文本内容",
+ "tool_calls": [...], // 遗留的扁平结构
+ "steps": [ // 有序步骤,用于渲染(主要数据源)
+ {"id": "step-0", "index": 0, "type": "thinking", "content": "..."},
+ {"id": "step-1", "index": 1, "type": "text", "content": "..."},
+ {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."},
+ {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."}
+ ]
+ }
+ """
__tablename__ = "messages"
id: Mapped[str] = mapped_column(String(64), primary_key=True)
conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False)
- role: Mapped[str] = mapped_column(String(16), nullable=False)
- content: Mapped[str] = mapped_column(Text, nullable=False)
+ role: Mapped[str] = mapped_column(String(16), nullable=False) # user, assistant, system, tool
+ content: Mapped[str] = mapped_column(Text, nullable=False, default="")
token_count: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
@@ -144,11 +167,39 @@ class Message(Base):
conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages")
def to_dict(self):
- return {
+ """Convert to dictionary, extracting process_steps for frontend"""
+ import json
+
+ result = {
"id": self.id,
"conversation_id": self.conversation_id,
"role": self.role,
- "content": self.content,
"token_count": self.token_count,
"created_at": self.created_at.isoformat() if self.created_at else None
}
+
+ # Parse content JSON
+ try:
+ content_obj = json.loads(self.content) if self.content else {}
+ except json.JSONDecodeError:
+ # Legacy plain text content
+ result["content"] = self.content
+ result["text"] = self.content
+ result["attachments"] = []
+ result["tool_calls"] = []
+ result["process_steps"] = []
+ return result
+
+ # Extract common fields
+ result["text"] = content_obj.get("text", "")
+ result["attachments"] = content_obj.get("attachments", [])
+ result["tool_calls"] = content_obj.get("tool_calls", [])
+
+ # Extract steps as process_steps for frontend rendering
+ result["process_steps"] = content_obj.get("steps", [])
+
+ # For backward compatibility
+ if "content" not in result:
+ result["content"] = result["text"]
+
+ return result
diff --git a/luxx/routes/messages.py b/luxx/routes/messages.py
index 1882040..5522085 100644
--- a/luxx/routes/messages.py
+++ b/luxx/routes/messages.py
@@ -136,44 +136,13 @@ async def stream_message(
db.commit()
async def event_generator():
- full_response = ""
-
- async for event in chat_service.stream_response(
+ async for sse_str in chat_service.stream_response(
conversation=conversation,
user_message=data.content,
tools_enabled=tools_enabled
):
- event_type = event.get("type")
-
- if event_type == "text":
- content = event.get("content", "")
- full_response += content
- yield f"data: {json.dumps({'type': 'text', 'content': content})}\n\n"
-
- elif event_type == "tool_call":
- yield f"data: {json.dumps({'type': 'tool_call', 'data': event.get('data')})}\n\n"
-
- elif event_type == "tool_result":
- yield f"data: {json.dumps({'type': 'tool_result', 'data': event.get('data')})}\n\n"
-
- elif event_type == "done":
- try:
- ai_message = Message(
- id=generate_id("msg"),
- conversation_id=data.conversation_id,
- role="assistant",
- content=full_response,
- token_count=len(full_response) // 4
- )
- db.add(ai_message)
- db.commit()
- except Exception:
- pass
-
- yield f"data: {json.dumps({'type': 'done', 'message_id': ai_message.id if 'ai_message' in dir() else None})}\n\n"
-
- elif event_type == "error":
- yield f"data: {json.dumps({'type': 'error', 'error': event.get('error')})}\n\n"
+ # Chat service returns raw SSE strings
+ yield sse_str
yield "data: [DONE]\n\n"
diff --git a/luxx/services/chat.py b/luxx/services/chat.py
index 77e1ae0..2ad0653 100644
--- a/luxx/services/chat.py
+++ b/luxx/services/chat.py
@@ -1,5 +1,6 @@
"""Chat service module"""
import json
+import uuid
from typing import List, Dict, Any, AsyncGenerator
from luxx.models import Conversation, Message
@@ -13,6 +14,11 @@ from luxx.config import config
MAX_ITERATIONS = 10
+def _sse_event(event: str, data: dict) -> str:
+ """Format a Server-Sent Event string."""
+ return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+
+
def get_llm_client(conversation: Conversation = None):
"""Get LLM client, optionally using conversation's provider"""
if conversation and conversation.provider_id:
@@ -37,7 +43,7 @@ def get_llm_client(conversation: Conversation = None):
class ChatService:
- """Chat service"""
+ """Chat service with tool support"""
def __init__(self):
self.tool_executor = ToolExecutor()
@@ -66,9 +72,19 @@ class ChatService:
).order_by(Message.created_at).all()
for msg in db_messages:
+ # Parse JSON content if possible
+ try:
+ content_obj = json.loads(msg.content) if msg.content else {}
+ if isinstance(content_obj, dict):
+ content = content_obj.get("text", msg.content)
+ else:
+ content = msg.content
+ except (json.JSONDecodeError, TypeError):
+ content = msg.content
+
messages.append({
"role": msg.role,
- "content": msg.content
+ "content": content
})
finally:
db.close()
@@ -80,163 +96,263 @@ class ChatService:
conversation: Conversation,
user_message: str,
tools_enabled: bool = True
- ) -> AsyncGenerator[Dict[str, Any], None]:
+ ) -> AsyncGenerator[Dict[str, str], None]:
"""
Streaming response generator
- Event types:
- - process_step: thinking/text/tool_call/tool_result step
- - done: final response complete
- - error: on error
+ Yields raw SSE event strings for direct forwarding.
"""
try:
messages = self.build_messages(conversation)
messages.append({
"role": "user",
- "content": user_message
+ "content": json.dumps({"text": user_message, "attachments": []})
})
tools = registry.list_all() if tools_enabled else None
- iteration = 0
-
llm = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4"
- while iteration < MAX_ITERATIONS:
- iteration += 1
- print(f"[CHAT DEBUG] ====== Starting iteration {iteration} ======")
- print(f"[CHAT DEBUG] Messages count: {len(messages)}")
+ # State tracking
+ all_steps = []
+ all_tool_calls = []
+ all_tool_results = []
+ step_index = 0
+
+ for iteration in range(MAX_ITERATIONS):
+ print(f"[CHAT] Starting iteration {iteration + 1}, messages: {len(messages)}")
- tool_calls_this_round = None
+ # Stream from LLM
+ full_content = ""
+ full_thinking = ""
+ tool_calls_list = []
+ thinking_step_id = None
+ thinking_step_idx = None
+ text_step_id = None
+ text_step_idx = None
- async for event in llm.stream_call(
+ async for sse_line in llm.stream_call(
model=model,
messages=messages,
tools=tools,
temperature=conversation.temperature,
max_tokens=conversation.max_tokens
):
- event_type = event.get("type")
+ # Parse SSE line
+ # Format: "event: xxx\ndata: {...}\n\n"
+ event_type = None
+ data_str = None
- if event_type == "content_delta":
- content = event.get("content", "")
- if content:
- print(f"[CHAT DEBUG] Iteration {iteration} content: {content[:100]}...")
- yield {"type": "text", "content": content}
+ for line in sse_line.strip().split('\n'):
+ if line.startswith('event: '):
+ event_type = line[7:].strip()
+ elif line.startswith('data: '):
+ data_str = line[6:].strip()
- elif event_type == "tool_call_delta":
- tool_call = event.get("tool_call", {})
- yield {"type": "tool_call", "data": tool_call}
+ if data_str is None:
+ continue
- elif event_type == "done":
- tool_calls_this_round = event.get("tool_calls")
- print(f"[CHAT DEBUG] Done event, tool_calls: {tool_calls_this_round}")
-
- if tool_calls_this_round and tools_enabled:
- print(f"[CHAT DEBUG] Executing tools: {tool_calls_this_round}")
- yield {"type": "tool_call", "data": tool_calls_this_round}
-
- tool_results = self.tool_executor.process_tool_calls_parallel(
- tool_calls_this_round,
- {}
- )
-
- messages.append({
- "role": "assistant",
- "content": "",
- "tool_calls": tool_calls_this_round
+ # Handle error events from LLM
+ if event_type == 'error':
+ try:
+ error_data = json.loads(data_str)
+ yield _sse_event("error", {"content": error_data.get("content", "Unknown error")})
+ except json.JSONDecodeError:
+ yield _sse_event("error", {"content": data_str})
+ return
+
+ # Parse the data
+ try:
+ chunk = json.loads(data_str)
+ except json.JSONDecodeError:
+ continue
+
+ # Get delta
+ choices = chunk.get("choices", [])
+ if not choices:
+ continue
+
+ delta = choices[0].get("delta", {})
+
+ # Handle reasoning (thinking)
+ reasoning = delta.get("reasoning_content", "")
+ if reasoning:
+ full_thinking += reasoning
+ if thinking_step_id is None:
+ thinking_step_id = f"step-{step_index}"
+ thinking_step_idx = step_index
+ step_index += 1
+ yield _sse_event("process_step", {
+ "id": thinking_step_id,
+ "index": thinking_step_idx,
+ "type": "thinking",
+ "content": full_thinking
+ })
+
+ # Handle content
+ content = delta.get("content", "")
+ if content:
+ full_content += content
+ if text_step_id is None:
+ text_step_idx = step_index
+ text_step_id = f"step-{text_step_idx}"
+ step_index += 1
+ yield _sse_event("process_step", {
+ "id": text_step_id,
+ "index": text_step_idx,
+ "type": "text",
+ "content": full_content
+ })
+
+ # Accumulate tool calls
+ tool_calls_delta = delta.get("tool_calls", [])
+ for tc in tool_calls_delta:
+ idx = tc.get("index", 0)
+ if idx >= len(tool_calls_list):
+ tool_calls_list.append({
+ "id": tc.get("id", ""),
+ "type": "function",
+ "function": {"name": "", "arguments": ""}
})
-
- for tr in tool_results:
- messages.append({
- "role": "tool",
- "tool_call_id": tr.get("tool_call_id"),
- "content": str(tr.get("result", ""))
- })
-
- yield {"type": "tool_result", "data": tool_results}
- else:
- break
+ func = tc.get("function", {})
+ if func.get("name"):
+ tool_calls_list[idx]["function"]["name"] += func["name"]
+ if func.get("arguments"):
+ tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
- if not tool_calls_this_round or not tools_enabled:
- break
-
- yield {"type": "done"}
-
- except Exception as e:
- print(f"[CHAT ERROR] Exception in stream_response: {type(e).__name__}: {str(e)}")
- yield {"type": "error", "error": str(e)}
-
- except Exception as e:
- yield {"type": "error", "error": str(e)}
-
- def non_stream_response(
- self,
- conversation: Conversation,
- user_message: str,
- tools_enabled: bool = False
- ) -> Dict[str, Any]:
- """Non-streaming response"""
- try:
- messages = self.build_messages(conversation)
- messages.append({
- "role": "user",
- "content": user_message
- })
-
- tools = registry.list_all() if tools_enabled else None
-
- iteration = 0
-
- llm_client = get_llm_client(conversation)
- model = conversation.model or llm_client.default_model or "gpt-4"
-
- while iteration < MAX_ITERATIONS:
- iteration += 1
+ # Save thinking step
+ if thinking_step_id is not None:
+ all_steps.append({
+ "id": thinking_step_id,
+ "index": thinking_step_idx,
+ "type": "thinking",
+ "content": full_thinking
+ })
- response = llm_client.sync_call(
- model=model,
- messages=messages,
- tools=tools,
- temperature=conversation.temperature,
- max_tokens=conversation.max_tokens
- )
+ # Save text step
+ if text_step_id is not None:
+ all_steps.append({
+ "id": text_step_id,
+ "index": text_step_idx,
+ "type": "text",
+ "content": full_content
+ })
- tool_calls = response.tool_calls
-
- if tool_calls and tools_enabled:
+ # Handle tool calls
+ if tool_calls_list:
+ all_tool_calls.extend(tool_calls_list)
+
+ # Yield tool_call steps
+ for tc in tool_calls_list:
+ call_step = {
+ "id": f"step-{step_index}",
+ "index": step_index,
+ "type": "tool_call",
+ "id_ref": tc.get("id", ""),
+ "name": tc["function"]["name"],
+ "arguments": tc["function"]["arguments"]
+ }
+ all_steps.append(call_step)
+ yield _sse_event("process_step", call_step)
+ step_index += 1
+
+ # Execute tools
+ tool_results = self.tool_executor.process_tool_calls_parallel(
+ tool_calls_list, {}
+ )
+
+ # Yield tool_result steps
+ for tr in tool_results:
+ result_step = {
+ "id": f"step-{step_index}",
+ "index": step_index,
+ "type": "tool_result",
+ "id_ref": tr.get("tool_call_id", ""),
+ "name": tr.get("name", ""),
+ "content": tr.get("content", "")
+ }
+ all_steps.append(result_step)
+ yield _sse_event("process_step", result_step)
+ step_index += 1
+
+ all_tool_results.append({
+ "role": "tool",
+ "tool_call_id": tr.get("tool_call_id", ""),
+ "content": tr.get("content", "")
+ })
+
+ # Add assistant message with tool calls for next iteration
messages.append({
"role": "assistant",
- "content": response.content,
- "tool_calls": tool_calls
+ "content": full_content or "",
+ "tool_calls": tool_calls_list
})
-
- tool_results = self.tool_executor.process_tool_calls_parallel(tool_calls)
-
- for tr in tool_results:
- messages.append({
- "role": "tool",
- "tool_call_id": tr.get("tool_call_id"),
- "content": str(tr.get("result", ""))
- })
- else:
- return {
- "success": True,
- "content": response.content
- }
+ messages.extend(all_tool_results[-len(tool_results):])
+ all_tool_results = []
+ continue
+
+ # No tool calls - final iteration, save message
+ msg_id = str(uuid.uuid4())
+ self._save_message(
+ conversation.id,
+ msg_id,
+ full_content,
+ all_tool_calls,
+ all_tool_results,
+ all_steps
+ )
+
+ yield _sse_event("done", {
+ "message_id": msg_id,
+ "token_count": len(full_content) // 4
+ })
+ return
- return {
- "success": True,
- "content": "Max iterations reached"
- }
+ # Max iterations exceeded
+ yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
except Exception as e:
- return {
- "success": False,
- "error": str(e)
- }
+ print(f"[CHAT] Exception: {type(e).__name__}: {str(e)}")
+ yield _sse_event("error", {"content": str(e)})
+
+ def _save_message(
+ self,
+ conversation_id: str,
+ msg_id: str,
+ full_content: str,
+ all_tool_calls: list,
+ all_tool_results: list,
+ all_steps: list
+ ):
+ """Save the assistant message to database."""
+ from luxx.database import SessionLocal
+ from luxx.models import Message
+
+ content_json = {
+ "text": full_content,
+ "steps": all_steps
+ }
+ if all_tool_calls:
+ content_json["tool_calls"] = all_tool_calls
+
+ db = SessionLocal()
+ try:
+ msg = Message(
+ id=msg_id,
+ conversation_id=conversation_id,
+ role="assistant",
+ content=json.dumps(content_json, ensure_ascii=False),
+ token_count=len(full_content) // 4
+ )
+ db.add(msg)
+ db.commit()
+ except Exception as e:
+ print(f"[CHAT] Failed to save message: {e}")
+ db.rollback()
+ finally:
+ db.close()
# Global chat service
diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py
index 32cbded..257f0a9 100644
--- a/luxx/services/llm_client.py
+++ b/luxx/services/llm_client.py
@@ -136,82 +136,39 @@ class LLMClient:
messages: List[Dict],
tools: Optional[List[Dict]] = None,
**kwargs
- ) -> AsyncGenerator[Dict[str, Any], None]:
- """Stream call LLM API"""
+ ) -> AsyncGenerator[str, None]:
+ """Stream call LLM API - yields raw SSE event lines
+
+ Yields:
+ str: Raw SSE event lines for direct forwarding
+ """
body = self._build_body(model, messages, tools, stream=True, **kwargs)
- # Accumulators for tool calls (need to collect from delta chunks)
- accumulated_tool_calls = {}
+ print(f"[LLM] Starting stream_call for model: {model}")
+ print(f"[LLM] Messages count: {len(messages)}")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
+ print(f"[LLM] Sending request to {self.api_url}")
async with client.stream(
"POST",
self.api_url,
headers=self._build_headers(),
json=body
) as response:
+ print(f"[LLM] Response status: {response.status_code}")
response.raise_for_status()
async for line in response.aiter_lines():
- if not line.strip():
- continue
-
- if line.startswith("data: "):
- data_str = line[6:]
-
- if data_str == "[DONE]":
- yield {"type": "done"}
- continue
-
- try:
- chunk = json.loads(data_str)
- except json.JSONDecodeError:
- continue
-
- if "choices" not in chunk:
- continue
-
- delta = chunk.get("choices", [{}])[0].get("delta", {})
-
- # DeepSeek reasoner: use content if available, otherwise fall back to reasoning_content
- content = delta.get("content")
- reasoning = delta.get("reasoning_content", "")
- if content and isinstance(content, str) and content.strip():
- yield {"type": "content_delta", "content": content}
- elif reasoning:
- yield {"type": "content_delta", "content": reasoning}
-
- # Accumulate tool calls from delta chunks (DeepSeek sends them incrementally)
- tool_calls_delta = delta.get("tool_calls", [])
- for tc in tool_calls_delta:
- idx = tc.get("index", 0)
- if idx not in accumulated_tool_calls:
- accumulated_tool_calls[idx] = {"index": idx}
- if "function" in tc:
- if "function" not in accumulated_tool_calls[idx]:
- accumulated_tool_calls[idx]["function"] = {"name": "", "arguments": ""}
- if "name" in tc["function"]:
- accumulated_tool_calls[idx]["function"]["name"] += tc["function"]["name"]
- if "arguments" in tc["function"]:
- accumulated_tool_calls[idx]["function"]["arguments"] += tc["function"]["arguments"]
-
- if tool_calls_delta:
- yield {"type": "tool_call_delta", "tool_call": tool_calls_delta}
-
- # Check for finish_reason to signal end of stream
- choice = chunk.get("choices", [{}])[0]
- finish_reason = choice.get("finish_reason")
- if finish_reason:
- # Build final tool_calls list from accumulated chunks
- final_tool_calls = list(accumulated_tool_calls.values()) if accumulated_tool_calls else None
- yield {"type": "done", "tool_calls": final_tool_calls}
+ if line.strip():
+ yield line + "\n"
except httpx.HTTPStatusError as e:
- # Return error as an event instead of raising
- error_text = e.response.text if e.response else str(e)
- yield {"type": "error", "error": f"HTTP {e.response.status_code}: {error_text}"}
+ status_code = e.response.status_code if e.response else "?"
+ print(f"[LLM] HTTP error: {status_code}")
+ yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n"
except Exception as e:
- yield {"type": "error", "error": str(e)}
+ print(f"[LLM] Exception: {type(e).__name__}: {str(e)}")
+ yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n"
# Global LLM client