feat: 实现增量渲染逻辑

This commit is contained in:
ViperEkura 2026-04-12 23:03:04 +08:00
parent e93ec6d94d
commit 53b1de4485
8 changed files with 1017 additions and 336 deletions

View File

@ -22,6 +22,7 @@ luxx/
│ ├── auth.py # 认证
│ ├── conversations.py # 会话管理
│ ├── messages.py # 消息处理
│ ├── providers.py # LLM 提供商管理
│ └── tools.py # 工具管理
├── services/ # 服务层
│ ├── chat.py # 聊天服务
@ -101,15 +102,63 @@ erDiagram
string id PK
string conversation_id FK
string role
longtext content
longtext content "JSON 格式"
int token_count
datetime created_at
}
USER ||--o{ CONVERSATION : "has"
CONVERSATION ||--o{ MESSAGE : "has"
USER ||--o{ LLM_PROVIDER : "configures"
LLM_PROVIDER {
int id PK
int user_id FK
string name
string provider_type
string base_url
string api_key
string default_model
boolean is_default
boolean enabled
datetime created_at
datetime updated_at
}
```
### Message Content JSON 结构
`content` 字段统一使用 JSON 格式存储:
**User 消息:**
```json
{
"text": "用户输入的文本内容",
"attachments": [
{"name": "utils.py", "extension": "py", "content": "..."}
]
}
```
**Assistant 消息:**
```json
{
"text": "AI 回复的文本内容",
"tool_calls": [...],
"steps": [
{"id": "step-0", "index": 0, "type": "thinking", "content": "..."},
{"id": "step-1", "index": 1, "type": "text", "content": "..."},
{"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."},
{"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."}
]
}
```
`steps` 字段是**渲染顺序的唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。
### 5. 工具系统
```mermaid
@ -191,6 +240,9 @@ LLM API 客户端:
| `/conversations` | GET/POST | 会话列表/创建 |
| `/conversations/{id}` | GET/DELETE | 会话详情/删除 |
| `/messages/stream` | POST | 流式消息发送 |
| `/providers` | GET/POST | LLM 提供商列表/创建 |
| `/providers/{id}` | GET/PUT/DELETE | 提供商详情/更新/删除 |
| `/providers/{id}/test` | POST | 测试提供商连接 |
| `/tools` | GET | 可用工具列表 |
## 数据流
@ -227,12 +279,29 @@ sequenceDiagram
| 事件 | 说明 |
|------|------|
| `text` | 文本内容增量 |
| `tool_call` | 工具调用请求 |
| `tool_result` | 工具执行结果 |
| `process_step` | 结构化步骤（thinking/text/tool_call/tool_result），携带 `id`、`index` 确保渲染顺序 |
| `done` | 响应完成 |
| `error` | 错误信息 |
### process_step 事件格式
```json
{"type": "process_step", "step": {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}}
{"type": "process_step", "step": {"id": "step-1", "index": 1, "type": "text", "content": "回复文本..."}}
{"type": "process_step", "step": {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"...\"}"}}
{"type": "process_step", "step": {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{\"success\": true, ...}"}}
```
| 字段 | 说明 |
|------|------|
| `id` | 步骤唯一标识（格式 `step-{index}`） |
| `index` | 步骤序号,确保按正确顺序显示 |
| `type` | 步骤类型:`thinking` / `text` / `tool_call` / `tool_result` |
| `id_ref` | 工具调用引用 ID（仅 tool_call/tool_result） |
| `name` | 工具名称（仅 tool_call/tool_result） |
| `arguments` | 工具调用参数 JSON 字符串（仅 tool_call） |
| `content` | 内容（thinking 的思考内容、text 的文本、tool_result 的返回结果） |
## 配置示例
### config.yaml

View File

@ -0,0 +1,472 @@
<template>
<div ref="processRef" class="process-block" :class="{ 'is-streaming': streaming }">
<!-- Render all steps in order: thinking, text, tool_call, tool_result interleaved -->
<template v-if="processItems.length > 0">
<div v-for="item in processItems" :key="item.key">
<!-- Thinking block: collapsible header with one-line summary -->
<div v-if="item.type === 'thinking'" class="step-item thinking">
<div class="step-header" @click="toggleItem(item.key)">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M9.663 17h4.673M12 3v1m6.364 1.636l-.707.707M21 12h-1M4 12H3m3.343-5.657l-.707-.707m2.828 9.9a5 5 0 117.072 0l-.548.547A3.374 3.374 0 0014 18.469V19a2 2 0 11-4 0v-.531c0-.895-.356-1.754-.988-2.386l-.548-.547z"/>
</svg>
<span class="step-label">思考过程</span>
<span v-if="item.summary" class="step-brief">{{ item.summary }}</span>
<svg class="arrow" :class="{ open: expandedKeys[item.key] }" width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<polyline points="6 9 12 15 18 9"></polyline>
</svg>
</div>
<div v-if="expandedKeys[item.key]" class="step-content">
<div class="thinking-text">{{ item.content }}</div>
</div>
</div>
<!-- Tool call block -->
<div v-else-if="item.type === 'tool_call'" class="step-item tool_call" :class="{ loading: item.loading }">
<div class="step-header" @click="toggleItem(item.key)">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"/>
</svg>
<span class="step-label">{{ item.loading ? `执行工具: ${item.toolName}` : `调用工具: ${item.toolName}` }}</span>
<span v-if="item.summary && !item.loading" class="step-brief">{{ item.summary }}</span>
<span v-if="item.resultSummary" class="step-badge" :class="{ success: item.isSuccess, error: !item.isSuccess }">{{ item.resultSummary }}</span>
<span v-if="item.loading" class="loading-dots">...</span>
<svg v-if="!item.loading" class="arrow" :class="{ open: expandedKeys[item.key] }" width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<polyline points="6 9 12 15 18 9"></polyline>
</svg>
</div>
<div v-if="expandedKeys[item.key] && !item.loading" class="step-content">
<div class="tool-detail" style="margin-bottom: 8px;">
<span class="detail-label">调用参数:</span>
<pre>{{ item.arguments }}</pre>
</div>
<div v-if="item.result" class="tool-detail">
<span class="detail-label">返回结果:</span>
<pre>{{ expandedResultKeys[item.key] ? item.result : item.resultPreview }}</pre>
<button v-if="item.resultTruncated" class="btn-expand-result" @click.stop="toggleResultExpand(item.key)">
{{ expandedResultKeys[item.key] ? '收起' : `展开全部 (${item.resultLength} 字符)` }}
</button>
</div>
</div>
</div>
<!-- Text content -->
<!-- NOTE(review): item.rendered is injected as raw HTML; renderMarkdown must escape model-provided text — confirm XSS safety -->
<div v-else-if="item.type === 'text'" class="step-item text-content" v-html="item.rendered"></div>
<!-- Tool result block -->
<div v-else-if="item.type === 'tool_result'" class="step-item tool_result">
<div class="step-header" @click="toggleItem(item.key)">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"/>
</svg>
<span class="step-label">工具结果: {{ item.name }}</span>
<span v-if="item.resultSummary" class="step-badge" :class="{ success: item.isSuccess, error: !item.isSuccess }">{{ item.resultSummary }}</span>
<svg class="arrow" :class="{ open: expandedKeys[item.key] }" width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<polyline points="6 9 12 15 18 9"></polyline>
</svg>
</div>
<div v-if="expandedKeys[item.key]" class="step-content">
<div class="tool-detail">
<span class="detail-label">返回结果:</span>
<pre>{{ expandedResultKeys[item.key] ? item.result : item.resultPreview }}</pre>
<button v-if="item.resultTruncated" class="btn-expand-result" @click.stop="toggleResultExpand(item.key)">
{{ expandedResultKeys[item.key] ? '收起' : `展开全部 (${item.resultLength} 字符)` }}
</button>
</div>
</div>
</div>
</div>
</template>
<!-- Active streaming indicator -->
<div v-if="streaming" class="streaming-indicator">
<svg class="spinner" width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M21 12a9 9 0 1 1-6.219-8.56"/>
</svg>
<span>正在生成...</span>
</div>
</div>
</template>
<script setup>
import { ref, computed, watch } from 'vue'
const RESULT_PREVIEW_LIMIT = 500
// Pretty-print a JSON value with 2-space indentation.
// String inputs are parsed first; anything unparseable/unstringifiable is
// returned as plain text instead of throwing.
function formatJson(str) {
  let value = str
  if (typeof value === 'string') {
    try {
      value = JSON.parse(value)
    } catch {
      return value
    }
  }
  try {
    return JSON.stringify(value, null, 2)
  } catch {
    return String(value)
  }
}
// Clip a value's string form to maxLen characters, appending '...' when cut.
function truncate(str, maxLen = 80) {
  const text = String(str)
  return text.length > maxLen ? text.slice(0, maxLen) + '...' : text
}
// Derive the display fields for a tool result: full formatted text, a
// preview capped at RESULT_PREVIEW_LIMIT chars, a truncation flag, and the
// total length (shown on the "expand all" button).
function buildResultFields(rawContent) {
  const text = formatJson(rawContent)
  const isTruncated = text.length > RESULT_PREVIEW_LIMIT
  return {
    result: text,
    resultPreview: isTruncated ? text.slice(0, RESULT_PREVIEW_LIMIT) + '\n...' : text,
    resultTruncated: isTruncated,
    resultLength: text.length,
  }
}
// Component props:
// - toolCalls: legacy flat tool-call array (fallback when processSteps is absent)
// - processSteps: ordered step list (thinking/text/tool_call/tool_result), the primary render source
// - streaming: true while the assistant response is still being generated
const props = defineProps({
toolCalls: { type: Array, default: () => [] },
processSteps: { type: Array, default: () => [] },
streaming: { type: Boolean, default: false }
})
// Expansion state maps keyed by item.key: step open/closed, full result shown/hidden.
const expandedKeys = ref({})
const expandedResultKeys = ref({})
// Auto-collapse all items when a new stream starts
watch(() => props.streaming, (v) => {
if (v) {
expandedKeys.value = {}
expandedResultKeys.value = {}
}
})
// Root element ref (bound as ref="processRef" in the template); not read in this script.
const processRef = ref(null)
// Toggle a step's expanded/collapsed state.
function toggleItem(key) {
expandedKeys.value[key] = !expandedKeys.value[key]
}
// Toggle between the truncated preview and the full tool result.
function toggleResultExpand(key) {
expandedResultKeys.value[key] = !expandedResultKeys.value[key]
}
// Summarize a tool result payload into a short badge: { text, success }.
// Recognizes { success: bool }, { error: ... } and { results: [...] } shapes;
// anything unparseable or unrecognized is reported as completed ('完成').
function getResultSummary(result) {
  try {
    const parsed = typeof result === 'string' ? JSON.parse(result) : result
    if (parsed.success === true) return { text: '成功', success: true }
    if (parsed.success === false || parsed.error) return { text: parsed.error || '失败', success: false }
    // Fix: only read .length when results is actually an array; a non-array
    // truthy value previously produced "undefined 条结果".
    if (Array.isArray(parsed.results)) return { text: `${parsed.results.length} 条结果`, success: true }
    return { text: '完成', success: true }
  } catch {
    // Plain-text / malformed results are treated as completed.
    return { text: '完成', success: true }
  }
}
// Convert a small markdown subset (fenced code blocks, inline code, newlines)
// to HTML for injection via v-html.
// Fixes over the previous version:
// 1. HTML is escaped before markup is generated — the output goes into v-html,
//    so unescaped model/tool text was an XSS vector.
// 2. Fenced code blocks are extracted first and restored at the end, so the
//    global "\n -> <br>" pass no longer corrupts newlines inside <pre><code>.
function renderMarkdown(text) {
  if (!text) return ''
  const escapeHtml = (s) => s
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
  // Pull fenced code blocks out, replacing them with \u0000N\u0000 placeholders
  // that survive escaping and the <br> pass untouched.
  const codeBlocks = []
  let html = String(text).replace(/```(\w*)\n([\s\S]*?)```/g, (_m, _lang, code) => {
    codeBlocks.push(`<pre><code>${escapeHtml(code)}</code></pre>`)
    return `\u0000${codeBlocks.length - 1}\u0000`
  })
  html = escapeHtml(html)
    .replace(/`([^`]+)`/g, '<code>$1</code>')
    .replace(/\n/g, '<br>')
  // Restore the protected code blocks.
  return html.replace(/\u0000(\d+)\u0000/g, (_m, i) => codeBlocks[Number(i)])
}
// Build the ordered list of render items.
// Primary source: props.processSteps (server-ordered via `index`).
// Fallback: props.toolCalls for legacy messages stored without steps.
const processItems = computed(() => {
const items = []
if (props.processSteps && props.processSteps.length > 0) {
for (const step of props.processSteps) {
if (!step) continue
if (step.type === 'thinking') {
// Thinking step: collapsible block with a one-line summary in the header.
items.push({
type: 'thinking',
content: step.content,
summary: truncate(step.content),
key: step.id || `thinking-${step.index}`,
})
} else if (step.type === 'tool_call') {
// Prefer id_ref (the LLM tool-call id) so a later tool_result can match this call.
const toolId = step.id_ref || step.id
items.push({
type: 'tool_call',
toolName: step.name || '未知工具',
arguments: formatJson(step.arguments),
summary: truncate(step.arguments),
id: toolId,
key: step.id || `tool_call-${toolId || step.index}`,
loading: false,
result: null,
})
} else if (step.type === 'tool_result') {
// Tool result: derive a success/failure badge plus preview/expand fields.
const summary = getResultSummary(step.content)
items.push({
type: 'tool_result',
id: step.id_ref || step.id,
name: step.name || 'unknown',
content: step.content,
resultSummary: summary.text,
isSuccess: summary.success,
key: step.id || `tool_result-${step.id_ref || step.index}`,
...buildResultFields(step.content)
})
} else if (step.type === 'text') {
// Text step: pre-rendered markdown HTML (injected via v-html in the template).
items.push({
type: 'text',
content: step.content,
rendered: renderMarkdown(step.content) || '<span class="placeholder">...</span>',
key: step.id || `text-${step.index}`,
})
}
}
// Mark the last tool_call as loading if it has no result yet (still executing)
// NOTE(review): assumes steps stream in order, with each tool_result arriving
// after its tool_call — confirm the backend guarantees this ordering.
if (props.streaming && items.length > 0) {
const last = items[items.length - 1]
if (last.type === 'tool_call' && !last.result) {
last.loading = true
}
}
} else {
// Fallback: legacy mode for old messages without processSteps stored in DB
if (props.toolCalls && props.toolCalls.length > 0) {
props.toolCalls.forEach((call, i) => {
const toolName = call.function?.name || '未知工具'
const resultSummary = call.result ? getResultSummary(call.result) : null
const resultFields = call.result ? buildResultFields(call.result) : { result: null, resultPreview: null, resultTruncated: false, resultLength: 0 }
items.push({
type: 'tool_call',
toolName,
arguments: formatJson(call.function?.arguments),
summary: truncate(call.function?.arguments),
id: call.id,
key: `tool_call-${call.id || i}`,
loading: !call.result && props.streaming,
...resultFields,
resultSummary: resultSummary ? resultSummary.text : null,
isSuccess: resultSummary ? resultSummary.success : undefined,
})
})
}
}
return items
})
</script>
<style scoped>
/* NOTE: layout colors come from app-level CSS variables (--code-bg, --border,
   --text, ...); #10b981 (green) and #f59e0b (amber) are fixed accent colors. */
.process-block {
width: 100%;
}
/* Step items (shared) */
.step-item {
margin-bottom: 8px;
}
.step-item:last-child {
margin-bottom: 0;
}
@keyframes pulse {
0%, 100% { opacity: 0.4; }
50% { opacity: 1; }
}
/* Step header (shared by thinking and tool_call) */
.thinking .step-header,
.tool_call .step-header {
display: flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
background: var(--code-bg);
border: 1px solid var(--border);
border-radius: 8px;
cursor: pointer;
font-size: 13px;
transition: background 0.15s;
}
.thinking .step-header:hover,
.tool_call .step-header:hover {
background: var(--bg-hover);
}
.thinking .step-header svg:first-child {
color: #f59e0b;
}
.tool_call .step-header svg:first-child {
color: #10b981;
}
.step-label {
font-weight: 500;
color: var(--text);
flex-shrink: 0;
min-width: 130px;
max-width: 130px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.arrow {
margin-left: auto;
transition: transform 0.2s;
color: var(--text-secondary);
flex-shrink: 0;
}
.step-badge {
font-size: 11px;
padding: 2px 8px;
border-radius: 10px;
font-weight: 500;
}
.step-badge.success {
background: rgba(16, 185, 129, 0.1);
color: #10b981;
}
.step-badge.error {
background: rgba(239, 68, 68, 0.1);
color: #ef4444;
}
.step-brief {
font-size: 11px;
color: var(--text-secondary);
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
flex: 1;
min-width: 0;
}
.arrow.open {
transform: rotate(180deg);
}
.loading-dots {
font-size: 16px;
font-weight: 700;
color: #10b981;
animation: pulse 1s ease-in-out infinite;
}
.tool_call.loading .step-header {
background: var(--bg-hover);
}
/* Tool result styling */
.tool_result .step-header {
display: flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
background: var(--code-bg);
border: 1px solid var(--border);
border-radius: 8px;
cursor: pointer;
font-size: 13px;
transition: background 0.15s;
}
.tool_result .step-header:hover {
background: var(--bg-hover);
}
.tool_result .step-header svg:first-child {
color: #10b981;
}
/* Expandable step content panel */
.step-content {
padding: 12px;
margin-top: 4px;
background: var(--bg);
border: 1px solid var(--border);
border-radius: 8px;
overflow: hidden;
}
.thinking-text {
font-size: 13px;
color: var(--text-secondary);
line-height: 1.6;
white-space: pre-wrap;
}
.tool-detail {
font-size: 13px;
}
.detail-label {
color: var(--text-secondary);
font-size: 11px;
font-weight: 600;
display: block;
margin-bottom: 4px;
}
.tool-detail pre {
padding: 8px;
background: var(--code-bg);
border-radius: 4px;
border: 1px solid var(--border);
font-family: 'JetBrains Mono', 'Fira Code', monospace;
font-size: 12px;
line-height: 1.5;
color: var(--text-secondary);
overflow-x: auto;
white-space: pre-wrap;
word-break: break-word;
}
.btn-expand-result {
display: inline-block;
margin-top: 6px;
padding: 3px 10px;
font-size: 11px;
color: #10b981;
background: rgba(16, 185, 129, 0.1);
border: 1px solid rgba(16, 185, 129, 0.3);
border-radius: 4px;
cursor: pointer;
transition: background 0.15s;
}
.btn-expand-result:hover {
background: rgba(16, 185, 129, 0.2);
}
/* Text content */
.text-content {
padding: 0;
font-size: 15px;
line-height: 1.7;
color: var(--text);
word-break: break-word;
white-space: pre-wrap;
}
.text-content :deep(.placeholder) {
color: var(--text-secondary);
}
/* Streaming cursor indicator */
.streaming-indicator {
display: flex;
align-items: center;
gap: 8px;
font-size: 12px;
color: var(--text-secondary);
}
/* Add separator only when there are step items above the indicator */
/* NOTE(review): :has() needs a modern browser baseline — confirm against the
   project's supported-browser matrix. */
.process-block:has(.step-item) .streaming-indicator {
margin-top: 8px;
padding: 8px 0 0;
border-top: 1px solid var(--border);
}
</style>

View File

@ -36,101 +36,133 @@ api.interceptors.response.use(
}
)
/**
 * SSE streaming request handler.
 * POSTs to `/api${url}` and dispatches parsed SSE events to callbacks.
 * @param {string} url - API URL (without the baseURL prefix)
 * @param {object} body - request payload
 * @param {object} callbacks - event callbacks: { onProcessStep, onDone, onError }
 * @returns {Promise & { abort: () => void }} in-flight stream; call .abort() to cancel
 */
export function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
  const token = localStorage.getItem('access_token')
  const controller = new AbortController()
  const promise = (async () => {
    try {
      const res = await fetch(`/api${url}`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`
        },
        body: JSON.stringify(body),
        signal: controller.signal
      })
      if (!res.ok) {
        const err = await res.json().catch(() => ({}))
        throw new Error(err.message || `HTTP ${res.status}`)
      }
      const reader = res.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''
      let completed = false
      // Fix: the event type must persist across reads — an "event:" line and
      // its "data:" line can arrive in different network chunks. Previously
      // this was reset inside the read loop, silently dropping such events.
      let currentEvent = ''
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        buffer += decoder.decode(value, { stream: true })
        const lines = buffer.split('\n')
        buffer = lines.pop() || '' // keep the trailing partial line for the next read
        for (const line of lines) {
          if (line.startsWith('event: ')) {
            currentEvent = line.slice(7).trim()
          } else if (line.startsWith('data: ')) {
            let data
            try {
              data = JSON.parse(line.slice(6))
            } catch {
              // Fix: skip malformed / keep-alive payloads instead of letting a
              // single bad line abort the whole stream via the outer catch.
              continue
            }
            if (currentEvent === 'process_step' && onProcessStep) {
              onProcessStep(data)
            } else if (currentEvent === 'done') {
              // Fix: record completion even when no onDone callback is given,
              // so we don't report a false "ended unexpectedly" error below.
              completed = true
              if (onDone) onDone(data)
            } else if (currentEvent === 'error' && onError) {
              onError(data.content)
            }
          }
        }
      }
      if (!completed && onError) {
        onError('stream ended unexpectedly')
      }
    } catch (e) {
      if (e.name !== 'AbortError' && onError) {
        onError(e.message)
      }
    }
  })()
  // Expose cancellation on the returned promise (documented shape above).
  promise.abort = () => controller.abort()
  return promise
}
// ============ Auth API ============
export const authAPI = {
// User login
login: (data) => api.post('/auth/login', data),
// User registration
register: (data) => api.post('/auth/register', data),
// User logout
logout: () => api.post('/auth/logout'),
// Fetch the current user's profile
getMe: () => api.get('/auth/me')
}
// ============ Conversation API ============
export const conversationsAPI = {
// List conversations
list: (params) => api.get('/conversations/', { params }),
// Create a conversation
create: (data) => api.post('/conversations/', data),
// Get conversation details
get: (id) => api.get(`/conversations/${id}`),
// Update a conversation
update: (id, data) => api.put(`/conversations/${id}`, data),
// Delete a conversation
delete: (id) => api.delete(`/conversations/${id}`)
}
// ============ 消息接口 ============
export const messagesAPI = {
// 获取消息列表
list: (conversationId, params) => api.get('/messages/', { params: { conversation_id: conversationId, ...params } }),
// 发送消息(非流式)
send: (data) => api.post('/messages/', data),
// 发送消息(流式)- 使用原生 fetch 避免 axios 拦截
sendStream: (data) => {
const token = localStorage.getItem('access_token')
return fetch('/api/messages/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify(data)
})
// 发送消息(流式)
sendStream: (data, callbacks) => {
return createSSEStream('/messages/stream', {
conversation_id: data.conversation_id,
content: data.content,
tools_enabled: callbacks.toolsEnabled !== false
}, callbacks)
},
// 删除消息
delete: (id) => api.delete(`/messages/${id}`)
}
// ============ Tools API ============
export const toolsAPI = {
// List available tools
list: (params) => api.get('/tools/', { params }),
// Get a tool's details by name
get: (name) => api.get(`/tools/${name}`),
// Execute a tool directly
execute: (name, data) => api.post(`/tools/${name}/execute`, data)
}
// ============ LLM Provider API ============
export const providersAPI = {
// List providers
list: () => api.get('/providers/'),
// Create a provider
create: (data) => api.post('/providers/', data),
// Get provider details
get: (id) => api.get(`/providers/${id}`),
// Update a provider
update: (id, data) => api.put(`/providers/${id}`, data),
// Delete a provider
delete: (id) => api.delete(`/providers/${id}`),
// Test provider connectivity
test: (id) => api.post(`/providers/${id}/test`)
}
// 默认导出
export default api

View File

@ -9,14 +9,24 @@
<div v-for="msg in messages" :key="msg.id" :class="['message', msg.role]">
<div class="message-avatar">{{ msg.role === 'user' ? 'U' : 'A' }}</div>
<div class="message-content">
<div class="message-text">{{ msg.content }}</div>
<!-- 显示 process_steps 或普通文本 -->
<ProcessBlock
v-if="msg.process_steps && msg.process_steps.length > 0"
:process-steps="msg.process_steps"
/>
<div v-else class="message-text">{{ msg.content || msg.text }}</div>
<div class="message-time">{{ formatTime(msg.created_at) }}</div>
</div>
</div>
<div v-if="streaming" class="message assistant streaming">
<!-- 流式消息 -->
<div v-if="streamingMessage" class="message assistant streaming">
<div class="message-avatar">A</div>
<div class="message-content">
<div class="message-text">{{ streamContent }}<span class="cursor"></span></div>
<ProcessBlock
:process-steps="streamingMessage.process_steps"
:streaming="true"
/>
</div>
</div>
</div>
@ -40,6 +50,7 @@
import { ref, onMounted, nextTick } from 'vue'
import { useRoute } from 'vue-router'
import { conversationsAPI, messagesAPI } from '../services/api.js'
import ProcessBlock from '../components/ProcessBlock.vue'
const route = useRoute()
const messages = ref([])
@ -47,7 +58,7 @@ const inputMessage = ref('')
const loading = ref(true)
const sending = ref(false)
const streaming = ref(false)
const streamContent = ref('')
const streamingMessage = ref(null)
const messagesContainer = ref(null)
const conversationId = ref(route.params.id)
@ -78,13 +89,21 @@ const sendMessage = async () => {
id: Date.now(),
role: 'user',
content: content,
text: content,
attachments: [],
process_steps: [],
created_at: new Date().toISOString()
})
scrollToBottom()
try {
streaming.value = true
streamContent.value = ''
streamingMessage.value = {
id: Date.now() + 1,
role: 'assistant',
process_steps: [],
created_at: new Date().toISOString()
}
const response = await messagesAPI.sendStream({
conversation_id: conversationId.value,
@ -94,79 +113,77 @@ const sendMessage = async () => {
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value)
const lines = chunk.split('\n')
buffer += chunk
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6)
if (data === '[DONE]') continue
// Process complete SSE events (separated by double newline)
const events = buffer.split('\n\n')
buffer = events.pop() || '' // Keep incomplete event in buffer
for (const eventStr of events) {
if (eventStr.trim() === '') continue
// Parse SSE event
// Format: "event: xxx\ndata: {...}\n" or "data: {...}\n"
let eventType = null
let dataStr = null
for (const line of eventStr.split('\n')) {
if (line.startsWith('event: ')) {
eventType = line.slice(7).trim()
} else if (line.startsWith('data: ')) {
dataStr = line.slice(6).trim()
}
}
if (dataStr === '[DONE]') continue
if (dataStr) {
try {
const parsed = JSON.parse(data)
if (parsed.type === 'text') {
streamContent.value += parsed.content
} else if (parsed.type === 'tool_call') {
//
const data = parsed.data
if (data && Array.isArray(data) && data.length > 0) {
//
const hasFunctionName = data.some(tc => tc.function && tc.function.name)
if (hasFunctionName) {
streamContent.value += '\n\n[调用工具] '
data.forEach(tc => {
if (tc.function && tc.function.name) {
streamContent.value += `${tc.function.name} `
const parsed = JSON.parse(dataStr)
// Handle done event
if (eventType === 'done') {
// Message saved to DB
continue
}
})
}
}
} else if (parsed.type === 'tool_result') {
//
streamContent.value += '\n\n[工具结果]\n'
if (Array.isArray(parsed.data)) {
parsed.data.forEach(tr => {
if (tr.content) {
try {
const result = JSON.parse(tr.content)
if (result.success && result.data && result.data.results) {
result.data.results.forEach(r => {
streamContent.value += `${r.title}\n${r.snippet}\n\n`
})
// Forward to ProcessBlock via streamingMessage
if (streamingMessage.value && parsed) {
// Normalize step format based on event type
if (parsed.type === 'thinking' || parsed.type === 'text' || parsed.type === 'tool_call' || parsed.type === 'tool_result') {
const step = parsed
// Find existing step by id or create new
const existingIndex = streamingMessage.value.process_steps.findIndex(
s => s.id === step.id
)
if (existingIndex >= 0) {
streamingMessage.value.process_steps[existingIndex] = step
} else {
streamContent.value += tr.content.substring(0, 500)
streamingMessage.value.process_steps.push(step)
}
} catch {
streamContent.value += tr.content.substring(0, 500)
}
} else {
streamContent.value += '无结果'
}
})
}
} else if (parsed.type === 'error') {
streamContent.value += '\n\n[错误] ' + (parsed.error || '未知错误')
}
} catch (e) {
console.error('Parse error:', e, data)
console.error('Parse error:', e)
}
}
}
}
//
if (streamContent.value) {
//
if (streamingMessage.value) {
messages.value.push({
id: Date.now() + 1,
role: 'assistant',
content: streamContent.value,
...streamingMessage.value,
created_at: new Date().toISOString()
})
streamingMessage.value = null
}
} catch (e) {
console.error('发送失败:', e)
@ -203,12 +220,10 @@ onMounted(loadMessages)
.message.user { flex-direction: row-reverse; }
.message-avatar { width: 40px; height: 40px; background: var(--code-bg); border-radius: 50%; display: flex; align-items: center; justify-content: center; font-size: 1.2rem; flex-shrink: 0; }
.message.user .message-avatar { background: var(--accent-bg); }
.message-content { max-width: 70%; }
.message-content { max-width: 80%; }
.message-text { padding: 1rem; background: var(--code-bg); border-radius: 12px; line-height: 1.6; white-space: pre-wrap; }
.message.user .message-text { background: var(--accent); color: white; }
.message-time { font-size: 0.75rem; color: var(--text); margin-top: 0.25rem; }
.cursor { animation: blink 1s infinite; }
@keyframes blink { 0%, 50% { opacity: 1; } 51%, 100% { opacity: 0; } }
.message-time { font-size: 0.75rem; color: var(--text-secondary); margin-top: 0.25rem; }
.input-area { display: flex; gap: 0.75rem; padding: 1rem; border-top: 1px solid var(--border); }
.input-area textarea { flex: 1; padding: 0.875rem 1rem; border: 1px solid var(--border); border-radius: 12px; resize: none; font-size: 1rem; background: var(--bg); color: var(--text); }
.input-area textarea:focus { outline: none; border-color: var(--accent); }

View File

@ -130,13 +130,36 @@ class Conversation(Base):
class Message(Base):
"""Message model"""
"""Message model
content 字段统一使用 JSON 格式存储
**User 消息**
{
"text": "用户输入的文本内容",
"attachments": [
{"name": "utils.py", "extension": "py", "content": "..."}
]
}
**Assistant 消息**
{
"text": "AI 回复的文本内容",
"tool_calls": [...], // 遗留的扁平结构
"steps": [ // 有序步骤用于渲染主要数据源
{"id": "step-0", "index": 0, "type": "thinking", "content": "..."},
{"id": "step-1", "index": 1, "type": "text", "content": "..."},
{"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."},
{"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."}
]
}
"""
__tablename__ = "messages"
id: Mapped[str] = mapped_column(String(64), primary_key=True)
conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False)
role: Mapped[str] = mapped_column(String(16), nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)
role: Mapped[str] = mapped_column(String(16), nullable=False) # user, assistant, system, tool
content: Mapped[str] = mapped_column(Text, nullable=False, default="")
token_count: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
@ -144,11 +167,39 @@ class Message(Base):
conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages")
def to_dict(self):
return {
"""Convert to dictionary, extracting process_steps for frontend"""
import json
result = {
"id": self.id,
"conversation_id": self.conversation_id,
"role": self.role,
"content": self.content,
"token_count": self.token_count,
"created_at": self.created_at.isoformat() if self.created_at else None
}
# Parse content JSON
try:
content_obj = json.loads(self.content) if self.content else {}
except json.JSONDecodeError:
# Legacy plain text content
result["content"] = self.content
result["text"] = self.content
result["attachments"] = []
result["tool_calls"] = []
result["process_steps"] = []
return result
# Extract common fields
result["text"] = content_obj.get("text", "")
result["attachments"] = content_obj.get("attachments", [])
result["tool_calls"] = content_obj.get("tool_calls", [])
# Extract steps as process_steps for frontend rendering
result["process_steps"] = content_obj.get("steps", [])
# For backward compatibility
if "content" not in result:
result["content"] = result["text"]
return result

View File

@ -136,44 +136,13 @@ async def stream_message(
db.commit()
async def event_generator():
full_response = ""
async for event in chat_service.stream_response(
async for sse_str in chat_service.stream_response(
conversation=conversation,
user_message=data.content,
tools_enabled=tools_enabled
):
event_type = event.get("type")
if event_type == "text":
content = event.get("content", "")
full_response += content
yield f"data: {json.dumps({'type': 'text', 'content': content})}\n\n"
elif event_type == "tool_call":
yield f"data: {json.dumps({'type': 'tool_call', 'data': event.get('data')})}\n\n"
elif event_type == "tool_result":
yield f"data: {json.dumps({'type': 'tool_result', 'data': event.get('data')})}\n\n"
elif event_type == "done":
try:
ai_message = Message(
id=generate_id("msg"),
conversation_id=data.conversation_id,
role="assistant",
content=full_response,
token_count=len(full_response) // 4
)
db.add(ai_message)
db.commit()
except Exception:
pass
yield f"data: {json.dumps({'type': 'done', 'message_id': ai_message.id if 'ai_message' in dir() else None})}\n\n"
elif event_type == "error":
yield f"data: {json.dumps({'type': 'error', 'error': event.get('error')})}\n\n"
# Chat service returns raw SSE strings
yield sse_str
yield "data: [DONE]\n\n"

View File

@ -1,5 +1,6 @@
"""Chat service module"""
import json
import uuid
from typing import List, Dict, Any, AsyncGenerator
from luxx.models import Conversation, Message
@ -13,6 +14,11 @@ from luxx.config import config
MAX_ITERATIONS = 10
def _sse_event(event: str, data: dict) -> str:
"""Format a Server-Sent Event string."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
def get_llm_client(conversation: Conversation = None):
"""Get LLM client, optionally using conversation's provider"""
if conversation and conversation.provider_id:
@ -37,7 +43,7 @@ def get_llm_client(conversation: Conversation = None):
class ChatService:
"""Chat service"""
"""Chat service with tool support"""
def __init__(self):
self.tool_executor = ToolExecutor()
@ -66,9 +72,19 @@ class ChatService:
).order_by(Message.created_at).all()
for msg in db_messages:
# Parse JSON content if possible
try:
content_obj = json.loads(msg.content) if msg.content else {}
if isinstance(content_obj, dict):
content = content_obj.get("text", msg.content)
else:
content = msg.content
except (json.JSONDecodeError, TypeError):
content = msg.content
messages.append({
"role": msg.role,
"content": msg.content
"content": content
})
finally:
db.close()
@ -80,163 +96,263 @@ class ChatService:
conversation: Conversation,
user_message: str,
tools_enabled: bool = True
) -> AsyncGenerator[Dict[str, Any], None]:
) -> AsyncGenerator[Dict[str, str], None]:
"""
Streaming response generator
Event types:
- process_step: thinking/text/tool_call/tool_result step
- done: final response complete
- error: on error
Yields raw SSE event strings for direct forwarding.
"""
try:
messages = self.build_messages(conversation)
messages.append({
"role": "user",
"content": user_message
"content": json.dumps({"text": user_message, "attachments": []})
})
tools = registry.list_all() if tools_enabled else None
iteration = 0
llm = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4"
while iteration < MAX_ITERATIONS:
iteration += 1
print(f"[CHAT DEBUG] ====== Starting iteration {iteration} ======")
print(f"[CHAT DEBUG] Messages count: {len(messages)}")
# State tracking
all_steps = []
all_tool_calls = []
all_tool_results = []
step_index = 0
tool_calls_this_round = None
for iteration in range(MAX_ITERATIONS):
print(f"[CHAT] Starting iteration {iteration + 1}, messages: {len(messages)}")
async for event in llm.stream_call(
# Stream from LLM
full_content = ""
full_thinking = ""
tool_calls_list = []
thinking_step_id = None
thinking_step_idx = None
text_step_id = None
text_step_idx = None
async for sse_line in llm.stream_call(
model=model,
messages=messages,
tools=tools,
temperature=conversation.temperature,
max_tokens=conversation.max_tokens
):
event_type = event.get("type")
# Parse SSE line
# Format: "event: xxx\ndata: {...}\n\n"
event_type = None
data_str = None
if event_type == "content_delta":
content = event.get("content", "")
if content:
print(f"[CHAT DEBUG] Iteration {iteration} content: {content[:100]}...")
yield {"type": "text", "content": content}
for line in sse_line.strip().split('\n'):
if line.startswith('event: '):
event_type = line[7:].strip()
elif line.startswith('data: '):
data_str = line[6:].strip()
elif event_type == "tool_call_delta":
tool_call = event.get("tool_call", {})
yield {"type": "tool_call", "data": tool_call}
if data_str is None:
continue
elif event_type == "done":
tool_calls_this_round = event.get("tool_calls")
print(f"[CHAT DEBUG] Done event, tool_calls: {tool_calls_this_round}")
if tool_calls_this_round and tools_enabled:
print(f"[CHAT DEBUG] Executing tools: {tool_calls_this_round}")
yield {"type": "tool_call", "data": tool_calls_this_round}
tool_results = self.tool_executor.process_tool_calls_parallel(
tool_calls_this_round,
{}
)
messages.append({
"role": "assistant",
"content": "",
"tool_calls": tool_calls_this_round
})
for tr in tool_results:
messages.append({
"role": "tool",
"tool_call_id": tr.get("tool_call_id"),
"content": str(tr.get("result", ""))
})
yield {"type": "tool_result", "data": tool_results}
else:
break
if not tool_calls_this_round or not tools_enabled:
break
yield {"type": "done"}
except Exception as e:
print(f"[CHAT ERROR] Exception in stream_response: {type(e).__name__}: {str(e)}")
yield {"type": "error", "error": str(e)}
except Exception as e:
yield {"type": "error", "error": str(e)}
def non_stream_response(
self,
conversation: Conversation,
user_message: str,
tools_enabled: bool = False
) -> Dict[str, Any]:
"""Non-streaming response"""
# Handle error events from LLM
if event_type == 'error':
try:
messages = self.build_messages(conversation)
messages.append({
"role": "user",
"content": user_message
error_data = json.loads(data_str)
yield _sse_event("error", {"content": error_data.get("content", "Unknown error")})
except json.JSONDecodeError:
yield _sse_event("error", {"content": data_str})
return
# Parse the data
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
# Get delta
choices = chunk.get("choices", [])
if not choices:
continue
delta = choices[0].get("delta", {})
# Handle reasoning (thinking)
reasoning = delta.get("reasoning_content", "")
if reasoning:
full_thinking += reasoning
if thinking_step_id is None:
thinking_step_id = f"step-{step_index}"
thinking_step_idx = step_index
step_index += 1
yield _sse_event("process_step", {
"id": thinking_step_id,
"index": thinking_step_idx,
"type": "thinking",
"content": full_thinking
})
tools = registry.list_all() if tools_enabled else None
# Handle content
content = delta.get("content", "")
if content:
full_content += content
if text_step_id is None:
text_step_idx = step_index
text_step_id = f"step-{text_step_idx}"
step_index += 1
yield _sse_event("process_step", {
"id": text_step_id,
"index": text_step_idx,
"type": "text",
"content": full_content
})
iteration = 0
# Accumulate tool calls
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx >= len(tool_calls_list):
tool_calls_list.append({
"id": tc.get("id", ""),
"type": "function",
"function": {"name": "", "arguments": ""}
})
func = tc.get("function", {})
if func.get("name"):
tool_calls_list[idx]["function"]["name"] += func["name"]
if func.get("arguments"):
tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
llm_client = get_llm_client(conversation)
model = conversation.model or llm_client.default_model or "gpt-4"
# Save thinking step
if thinking_step_id is not None:
all_steps.append({
"id": thinking_step_id,
"index": thinking_step_idx,
"type": "thinking",
"content": full_thinking
})
while iteration < MAX_ITERATIONS:
iteration += 1
# Save text step
if text_step_id is not None:
all_steps.append({
"id": text_step_id,
"index": text_step_idx,
"type": "text",
"content": full_content
})
response = llm_client.sync_call(
model=model,
messages=messages,
tools=tools,
temperature=conversation.temperature,
max_tokens=conversation.max_tokens
# Handle tool calls
if tool_calls_list:
all_tool_calls.extend(tool_calls_list)
# Yield tool_call steps
for tc in tool_calls_list:
call_step = {
"id": f"step-{step_index}",
"index": step_index,
"type": "tool_call",
"id_ref": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
all_steps.append(call_step)
yield _sse_event("process_step", call_step)
step_index += 1
# Execute tools
tool_results = self.tool_executor.process_tool_calls_parallel(
tool_calls_list, {}
)
tool_calls = response.tool_calls
# Yield tool_result steps
for tr in tool_results:
result_step = {
"id": f"step-{step_index}",
"index": step_index,
"type": "tool_result",
"id_ref": tr.get("tool_call_id", ""),
"name": tr.get("name", ""),
"content": tr.get("content", "")
}
all_steps.append(result_step)
yield _sse_event("process_step", result_step)
step_index += 1
if tool_calls and tools_enabled:
all_tool_results.append({
"role": "tool",
"tool_call_id": tr.get("tool_call_id", ""),
"content": tr.get("content", "")
})
# Add assistant message with tool calls for next iteration
messages.append({
"role": "assistant",
"content": response.content,
"tool_calls": tool_calls
"content": full_content or "",
"tool_calls": tool_calls_list
})
messages.extend(all_tool_results[-len(tool_results):])
all_tool_results = []
continue
tool_results = self.tool_executor.process_tool_calls_parallel(tool_calls)
# No tool calls - final iteration, save message
msg_id = str(uuid.uuid4())
self._save_message(
conversation.id,
msg_id,
full_content,
all_tool_calls,
all_tool_results,
all_steps
)
for tr in tool_results:
messages.append({
"role": "tool",
"tool_call_id": tr.get("tool_call_id"),
"content": str(tr.get("result", ""))
yield _sse_event("done", {
"message_id": msg_id,
"token_count": len(full_content) // 4
})
else:
return {
"success": True,
"content": response.content
}
return
return {
"success": True,
"content": "Max iterations reached"
}
# Max iterations exceeded
yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
except Exception as e:
return {
"success": False,
"error": str(e)
print(f"[CHAT] Exception: {type(e).__name__}: {str(e)}")
yield _sse_event("error", {"content": str(e)})
def _save_message(
self,
conversation_id: str,
msg_id: str,
full_content: str,
all_tool_calls: list,
all_tool_results: list,
all_steps: list
):
"""Save the assistant message to database."""
from luxx.database import SessionLocal
from luxx.models import Message
content_json = {
"text": full_content,
"steps": all_steps
}
if all_tool_calls:
content_json["tool_calls"] = all_tool_calls
db = SessionLocal()
try:
msg = Message(
id=msg_id,
conversation_id=conversation_id,
role="assistant",
content=json.dumps(content_json, ensure_ascii=False),
token_count=len(full_content) // 4
)
db.add(msg)
db.commit()
except Exception as e:
print(f"[CHAT] Failed to save message: {e}")
db.rollback()
finally:
db.close()
# Global chat service

View File

@ -136,82 +136,39 @@ class LLMClient:
messages: List[Dict],
tools: Optional[List[Dict]] = None,
**kwargs
) -> AsyncGenerator[Dict[str, Any], None]:
"""Stream call LLM API"""
) -> AsyncGenerator[str, None]:
"""Stream call LLM API - yields raw SSE event lines
Yields:
str: Raw SSE event lines for direct forwarding
"""
body = self._build_body(model, messages, tools, stream=True, **kwargs)
# Accumulators for tool calls (need to collect from delta chunks)
accumulated_tool_calls = {}
print(f"[LLM] Starting stream_call for model: {model}")
print(f"[LLM] Messages count: {len(messages)}")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
print(f"[LLM] Sending request to {self.api_url}")
async with client.stream(
"POST",
self.api_url,
headers=self._build_headers(),
json=body
) as response:
print(f"[LLM] Response status: {response.status_code}")
response.raise_for_status()
async for line in response.aiter_lines():
if not line.strip():
continue
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
yield {"type": "done"}
continue
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
if "choices" not in chunk:
continue
delta = chunk.get("choices", [{}])[0].get("delta", {})
# DeepSeek reasoner: use content if available, otherwise fall back to reasoning_content
content = delta.get("content")
reasoning = delta.get("reasoning_content", "")
if content and isinstance(content, str) and content.strip():
yield {"type": "content_delta", "content": content}
elif reasoning:
yield {"type": "content_delta", "content": reasoning}
# Accumulate tool calls from delta chunks (DeepSeek sends them incrementally)
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx not in accumulated_tool_calls:
accumulated_tool_calls[idx] = {"index": idx}
if "function" in tc:
if "function" not in accumulated_tool_calls[idx]:
accumulated_tool_calls[idx]["function"] = {"name": "", "arguments": ""}
if "name" in tc["function"]:
accumulated_tool_calls[idx]["function"]["name"] += tc["function"]["name"]
if "arguments" in tc["function"]:
accumulated_tool_calls[idx]["function"]["arguments"] += tc["function"]["arguments"]
if tool_calls_delta:
yield {"type": "tool_call_delta", "tool_call": tool_calls_delta}
# Check for finish_reason to signal end of stream
choice = chunk.get("choices", [{}])[0]
finish_reason = choice.get("finish_reason")
if finish_reason:
# Build final tool_calls list from accumulated chunks
final_tool_calls = list(accumulated_tool_calls.values()) if accumulated_tool_calls else None
yield {"type": "done", "tool_calls": final_tool_calls}
if line.strip():
yield line + "\n"
except httpx.HTTPStatusError as e:
# Return error as an event instead of raising
error_text = e.response.text if e.response else str(e)
yield {"type": "error", "error": f"HTTP {e.response.status_code}: {error_text}"}
status_code = e.response.status_code if e.response else "?"
print(f"[LLM] HTTP error: {status_code}")
yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n"
except Exception as e:
yield {"type": "error", "error": str(e)}
print(f"[LLM] Exception: {type(e).__name__}: {str(e)}")
yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n"
# Global LLM client