feat: 实现增量渲染逻辑

This commit is contained in:
ViperEkura 2026-04-12 23:05:22 +08:00
parent e93ec6d94d
commit 576eb39a51
9 changed files with 1005 additions and 375 deletions

View File

@ -22,6 +22,7 @@ luxx/
│ ├── auth.py # 认证 │ ├── auth.py # 认证
│ ├── conversations.py # 会话管理 │ ├── conversations.py # 会话管理
│ ├── messages.py # 消息处理 │ ├── messages.py # 消息处理
│ ├── providers.py # LLM 提供商管理
│ └── tools.py # 工具管理 │ └── tools.py # 工具管理
├── services/ # 服务层 ├── services/ # 服务层
│ ├── chat.py # 聊天服务 │ ├── chat.py # 聊天服务
@ -101,15 +102,63 @@ erDiagram
string id PK string id PK
string conversation_id FK string conversation_id FK
string role string role
longtext content longtext content "JSON 格式"
int token_count int token_count
datetime created_at datetime created_at
} }
USER ||--o{ CONVERSATION : "has" USER ||--o{ CONVERSATION : "has"
CONVERSATION ||--o{ MESSAGE : "has" CONVERSATION ||--o{ MESSAGE : "has"
USER ||--o{ LLM_PROVIDER : "configures"
LLM_PROVIDER {
int id PK
int user_id FK
string name
string provider_type
string base_url
string api_key
string default_model
boolean is_default
boolean enabled
datetime created_at
datetime updated_at
}
``` ```
### Message Content JSON 结构
`content` 字段统一使用 JSON 格式存储:
**User 消息:**
```json
{
"text": "用户输入的文本内容",
"attachments": [
{"name": "utils.py", "extension": "py", "content": "..."}
]
}
```
**Assistant 消息:**
```json
{
"text": "AI 回复的文本内容",
"tool_calls": [...],
"steps": [
{"id": "step-0", "index": 0, "type": "thinking", "content": "..."},
{"id": "step-1", "index": 1, "type": "text", "content": "..."},
{"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."},
{"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."}
]
}
```
`steps` 字段是**渲染顺序的唯一数据源**,按 `index` 顺序排列。thinking、text、tool_call、tool_result 可以在多轮迭代中穿插出现。
### 5. 工具系统 ### 5. 工具系统
```mermaid ```mermaid
@ -191,6 +240,9 @@ LLM API 客户端:
| `/conversations` | GET/POST | 会话列表/创建 | | `/conversations` | GET/POST | 会话列表/创建 |
| `/conversations/{id}` | GET/DELETE | 会话详情/删除 | | `/conversations/{id}` | GET/DELETE | 会话详情/删除 |
| `/messages/stream` | POST | 流式消息发送 | | `/messages/stream` | POST | 流式消息发送 |
| `/providers` | GET/POST | LLM 提供商列表/创建 |
| `/providers/{id}` | GET/PUT/DELETE | 提供商详情/更新/删除 |
| `/providers/{id}/test` | POST | 测试提供商连接 |
| `/tools` | GET | 可用工具列表 | | `/tools` | GET | 可用工具列表 |
## 数据流 ## 数据流
@ -227,12 +279,29 @@ sequenceDiagram
| 事件 | 说明 | | 事件 | 说明 |
|------|------| |------|------|
| `text` | 文本内容增量 | | `process_step` | 结构化步骤(thinking/text/tool_call/tool_result),携带 `id`、`index`,确保渲染顺序 |
| `tool_call` | 工具调用请求 |
| `tool_result` | 工具执行结果 |
| `done` | 响应完成 | | `done` | 响应完成 |
| `error` | 错误信息 | | `error` | 错误信息 |
### process_step 事件格式
```json
{"type": "process_step", "step": {"id": "step-0", "index": 0, "type": "thinking", "content": "..."}}
{"type": "process_step", "step": {"id": "step-1", "index": 1, "type": "text", "content": "回复文本..."}}
{"type": "process_step", "step": {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"...\"}"}}
{"type": "process_step", "step": {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{\"success\": true, ...}"}}
```
| 字段 | 说明 |
|------|------|
| `id` | 步骤唯一标识(格式 `step-{index}`) |
| `index` | 步骤序号,确保按正确顺序显示 |
| `type` | 步骤类型:`thinking` / `text` / `tool_call` / `tool_result` |
| `id_ref` | 工具调用引用 ID仅 tool_call/tool_result |
| `name` | 工具名称(仅 tool_call/tool_result |
| `arguments` | 工具调用参数 JSON 字符串(仅 tool_call |
| `content` | 内容thinking 的思考内容、text 的文本、tool_result 的返回结果) |
## 配置示例 ## 配置示例
### config.yaml ### config.yaml

View File

@ -1,7 +1,7 @@
# 配置文件 # 配置文件
app: app:
secret_key: ${APP_SECRET_KEY} secret_key: ${APP_SECRET_KEY}
debug: true debug: false
host: 0.0.0.0 host: 0.0.0.0
port: 8000 port: 8000

View File

@ -0,0 +1,472 @@
<template>
<div ref="processRef" class="process-block" :class="{ 'is-streaming': streaming }">
<!-- Render all steps in order: thinking, text, tool_call, tool_result interleaved -->
<template v-if="processItems.length > 0">
<div v-for="item in processItems" :key="item.key">
<!-- Thinking block -->
<div v-if="item.type === 'thinking'" class="step-item thinking">
<div class="step-header" @click="toggleItem(item.key)">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M9.663 17h4.673M12 3v1m6.364 1.636l-.707.707M21 12h-1M4 12H3m3.343-5.657l-.707-.707m2.828 9.9a5 5 0 117.072 0l-.548.547A3.374 3.374 0 0014 18.469V19a2 2 0 11-4 0v-.531c0-.895-.356-1.754-.988-2.386l-.548-.547z"/>
</svg>
<span class="step-label">思考过程</span>
<span v-if="item.summary" class="step-brief">{{ item.summary }}</span>
<svg class="arrow" :class="{ open: expandedKeys[item.key] }" width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<polyline points="6 9 12 15 18 9"></polyline>
</svg>
</div>
<div v-if="expandedKeys[item.key]" class="step-content">
<div class="thinking-text">{{ item.content }}</div>
</div>
</div>
<!-- Tool call block -->
<div v-else-if="item.type === 'tool_call'" class="step-item tool_call" :class="{ loading: item.loading }">
<div class="step-header" @click="toggleItem(item.key)">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"/>
</svg>
<span class="step-label">{{ item.loading ? `执行工具: ${item.toolName}` : `调用工具: ${item.toolName}` }}</span>
<span v-if="item.summary && !item.loading" class="step-brief">{{ item.summary }}</span>
<span v-if="item.resultSummary" class="step-badge" :class="{ success: item.isSuccess, error: !item.isSuccess }">{{ item.resultSummary }}</span>
<span v-if="item.loading" class="loading-dots">...</span>
<svg v-if="!item.loading" class="arrow" :class="{ open: expandedKeys[item.key] }" width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<polyline points="6 9 12 15 18 9"></polyline>
</svg>
</div>
<div v-if="expandedKeys[item.key] && !item.loading" class="step-content">
<div class="tool-detail" style="margin-bottom: 8px;">
<span class="detail-label">调用参数:</span>
<pre>{{ item.arguments }}</pre>
</div>
<div v-if="item.result" class="tool-detail">
<span class="detail-label">返回结果:</span>
<pre>{{ expandedResultKeys[item.key] ? item.result : item.resultPreview }}</pre>
<button v-if="item.resultTruncated" class="btn-expand-result" @click.stop="toggleResultExpand(item.key)">
{{ expandedResultKeys[item.key] ? '收起' : `展开全部 (${item.resultLength} 字符)` }}
</button>
</div>
</div>
</div>
<!-- Text content -->
<div v-else-if="item.type === 'text'" class="step-item text-content" v-html="item.rendered"></div>
<!-- Tool result block -->
<div v-else-if="item.type === 'tool_result'" class="step-item tool_result">
<div class="step-header" @click="toggleItem(item.key)">
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"/>
</svg>
<span class="step-label">工具结果: {{ item.name }}</span>
<span v-if="item.resultSummary" class="step-badge" :class="{ success: item.isSuccess, error: !item.isSuccess }">{{ item.resultSummary }}</span>
<svg class="arrow" :class="{ open: expandedKeys[item.key] }" width="10" height="10" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<polyline points="6 9 12 15 18 9"></polyline>
</svg>
</div>
<div v-if="expandedKeys[item.key]" class="step-content">
<div class="tool-detail">
<span class="detail-label">返回结果:</span>
<pre>{{ expandedResultKeys[item.key] ? item.result : item.resultPreview }}</pre>
<button v-if="item.resultTruncated" class="btn-expand-result" @click.stop="toggleResultExpand(item.key)">
{{ expandedResultKeys[item.key] ? '收起' : `展开全部 (${item.resultLength} 字符)` }}
</button>
</div>
</div>
</div>
</div>
</template>
<!-- Active streaming indicator -->
<div v-if="streaming" class="streaming-indicator">
<svg class="spinner" width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M21 12a9 9 0 1 1-6.219-8.56"/>
</svg>
<span>正在生成...</span>
</div>
</div>
</template>
<script setup>
import { ref, computed, watch } from 'vue'
const RESULT_PREVIEW_LIMIT = 500
// Pretty-print a JSON value with 2-space indentation.
// Accepts either a JSON string (parsed first) or a plain object;
// any failure falls back to the stringified input.
function formatJson(str) {
  let value = str
  if (typeof value === 'string') {
    try {
      value = JSON.parse(value)
    } catch {
      return String(str)
    }
  }
  try {
    return JSON.stringify(value, null, 2)
  } catch {
    return String(str)
  }
}
// Clamp a value's string representation to at most maxLen characters,
// appending '...' when it was cut.
function truncate(str, maxLen = 80) {
  const text = String(str)
  if (text.length > maxLen) {
    return text.substring(0, maxLen) + '...'
  }
  return text
}
// Derive display fields for a tool result payload: the fully formatted text,
// a preview capped at RESULT_PREVIEW_LIMIT characters, and truncation metadata
// used by the "expand all" button.
function buildResultFields(rawContent) {
  const fullText = formatJson(rawContent)
  const isLong = fullText.length > RESULT_PREVIEW_LIMIT
  return {
    result: fullText,
    resultPreview: isLong ? fullText.slice(0, RESULT_PREVIEW_LIMIT) + '\n...' : fullText,
    resultTruncated: isLong,
    resultLength: fullText.length,
  }
}
const props = defineProps({
toolCalls: { type: Array, default: () => [] },
processSteps: { type: Array, default: () => [] },
streaming: { type: Boolean, default: false }
})
const expandedKeys = ref({})
const expandedResultKeys = ref({})
// Auto-collapse all items when a new stream starts,
// so stale expanded panels from the previous answer don't linger.
watch(() => props.streaming, (v) => {
  if (v) {
    expandedKeys.value = {}
    expandedResultKeys.value = {}
  }
})
const processRef = ref(null)
// Toggle the expanded/collapsed state of the step panel identified by key.
function toggleItem(key) {
  expandedKeys.value[key] = !expandedKeys.value[key]
}
// Toggle between the truncated preview and the full tool-result text for key.
function toggleResultExpand(key) {
  expandedResultKeys.value[key] = !expandedResultKeys.value[key]
}
// Summarize a tool result into a short badge label plus a success flag.
// Any parse/access failure yields the generic "完成" (done) label.
function getResultSummary(result) {
  try {
    const data = typeof result === 'string' ? JSON.parse(result) : result
    if (data.success === true) return { text: '成功', success: true }
    if (data.success === false || data.error) return { text: data.error || '失败', success: false }
    if (data.results) return { text: `${data.results.length} 条结果`, success: true }
  } catch {
    // fall through to the generic label
  }
  return { text: '完成', success: true }
}
// Minimal markdown-to-HTML renderer for streamed text (code fences, inline
// code, line breaks). The output is injected via v-html, so raw HTML in the
// model/tool output must be escaped first — otherwise streamed content could
// inject arbitrary markup (XSS).
function renderMarkdown(text) {
  if (!text) return ''
  // Escape HTML special characters before applying markdown substitutions.
  const escaped = String(text)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
  return escaped
    .replace(/```(\w*)\n([\s\S]*?)```/g, '<pre><code>$2</code></pre>')
    .replace(/`([^`]+)`/g, '<code>$1</code>')
    .replace(/\n/g, '<br>')
}
// Build ordered process items from all available data (thinking, tool calls, text).
// props.processSteps (the backend's ordered `steps` array) is the preferred
// source of rendering order; messages stored before steps existed fall back to
// the flat props.toolCalls list. Each item carries a stable `key` for v-for.
const processItems = computed(() => {
  const items = []
  if (props.processSteps && props.processSteps.length > 0) {
    for (const step of props.processSteps) {
      if (!step) continue
      if (step.type === 'thinking') {
        items.push({
          type: 'thinking',
          content: step.content,
          summary: truncate(step.content),
          key: step.id || `thinking-${step.index}`,
        })
      } else if (step.type === 'tool_call') {
        // id_ref links a tool_call to its matching tool_result step
        const toolId = step.id_ref || step.id
        items.push({
          type: 'tool_call',
          toolName: step.name || '未知工具',
          arguments: formatJson(step.arguments),
          summary: truncate(step.arguments),
          id: toolId,
          key: step.id || `tool_call-${toolId || step.index}`,
          loading: false,
          result: null,
        })
      } else if (step.type === 'tool_result') {
        // tool_result
        const summary = getResultSummary(step.content)
        items.push({
          type: 'tool_result',
          id: step.id_ref || step.id,
          name: step.name || 'unknown',
          content: step.content,
          resultSummary: summary.text,
          isSuccess: summary.success,
          key: step.id || `tool_result-${step.id_ref || step.index}`,
          ...buildResultFields(step.content)
        })
      } else if (step.type === 'text') {
        items.push({
          type: 'text',
          content: step.content,
          rendered: renderMarkdown(step.content) || '<span class="placeholder">...</span>',
          key: step.id || `text-${step.index}`,
        })
      }
    }
    // Mark the last tool_call as loading if it has no result yet (still executing)
    if (props.streaming && items.length > 0) {
      const last = items[items.length - 1]
      if (last.type === 'tool_call' && !last.result) {
        last.loading = true
      }
    }
  } else {
    // Fallback: legacy mode for old messages without processSteps stored in DB
    if (props.toolCalls && props.toolCalls.length > 0) {
      props.toolCalls.forEach((call, i) => {
        const toolName = call.function?.name || '未知工具'
        const resultSummary = call.result ? getResultSummary(call.result) : null
        const resultFields = call.result ? buildResultFields(call.result) : { result: null, resultPreview: null, resultTruncated: false, resultLength: 0 }
        items.push({
          type: 'tool_call',
          toolName,
          arguments: formatJson(call.function?.arguments),
          summary: truncate(call.function?.arguments),
          id: call.id,
          key: `tool_call-${call.id || i}`,
          loading: !call.result && props.streaming,
          ...resultFields,
          resultSummary: resultSummary ? resultSummary.text : null,
          isSuccess: resultSummary ? resultSummary.success : undefined,
        })
      })
    }
  }
  return items
})
</script>
<style scoped>
.process-block {
width: 100%;
}
/* Step items (shared) */
.step-item {
margin-bottom: 8px;
}
.step-item:last-child {
margin-bottom: 0;
}
@keyframes pulse {
0%, 100% { opacity: 0.4; }
50% { opacity: 1; }
}
/* Step header (shared by thinking and tool_call) */
.thinking .step-header,
.tool_call .step-header {
display: flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
background: var(--code-bg);
border: 1px solid var(--border);
border-radius: 8px;
cursor: pointer;
font-size: 13px;
transition: background 0.15s;
}
.thinking .step-header:hover,
.tool_call .step-header:hover {
background: var(--bg-hover);
}
.thinking .step-header svg:first-child {
color: #f59e0b;
}
.tool_call .step-header svg:first-child {
color: #10b981;
}
.step-label {
font-weight: 500;
color: var(--text);
flex-shrink: 0;
min-width: 130px;
max-width: 130px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.arrow {
margin-left: auto;
transition: transform 0.2s;
color: var(--text-secondary);
flex-shrink: 0;
}
.step-badge {
font-size: 11px;
padding: 2px 8px;
border-radius: 10px;
font-weight: 500;
}
.step-badge.success {
background: rgba(16, 185, 129, 0.1);
color: #10b981;
}
.step-badge.error {
background: rgba(239, 68, 68, 0.1);
color: #ef4444;
}
.step-brief {
font-size: 11px;
color: var(--text-secondary);
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
flex: 1;
min-width: 0;
}
.arrow.open {
transform: rotate(180deg);
}
.loading-dots {
font-size: 16px;
font-weight: 700;
color: #10b981;
animation: pulse 1s ease-in-out infinite;
}
.tool_call.loading .step-header {
background: var(--bg-hover);
}
/* Tool result styling */
.tool_result .step-header {
display: flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
background: var(--code-bg);
border: 1px solid var(--border);
border-radius: 8px;
cursor: pointer;
font-size: 13px;
transition: background 0.15s;
}
.tool_result .step-header:hover {
background: var(--bg-hover);
}
.tool_result .step-header svg:first-child {
color: #10b981;
}
/* Expandable step content panel */
.step-content {
padding: 12px;
margin-top: 4px;
background: var(--bg);
border: 1px solid var(--border);
border-radius: 8px;
overflow: hidden;
}
.thinking-text {
font-size: 13px;
color: var(--text-secondary);
line-height: 1.6;
white-space: pre-wrap;
}
.tool-detail {
font-size: 13px;
}
.detail-label {
color: var(--text-secondary);
font-size: 11px;
font-weight: 600;
display: block;
margin-bottom: 4px;
}
.tool-detail pre {
padding: 8px;
background: var(--code-bg);
border-radius: 4px;
border: 1px solid var(--border);
font-family: 'JetBrains Mono', 'Fira Code', monospace;
font-size: 12px;
line-height: 1.5;
color: var(--text-secondary);
overflow-x: auto;
white-space: pre-wrap;
word-break: break-word;
}
.btn-expand-result {
display: inline-block;
margin-top: 6px;
padding: 3px 10px;
font-size: 11px;
color: #10b981;
background: rgba(16, 185, 129, 0.1);
border: 1px solid rgba(16, 185, 129, 0.3);
border-radius: 4px;
cursor: pointer;
transition: background 0.15s;
}
.btn-expand-result:hover {
background: rgba(16, 185, 129, 0.2);
}
/* Text content */
.text-content {
padding: 0;
font-size: 15px;
line-height: 1.7;
color: var(--text);
word-break: break-word;
white-space: pre-wrap;
}
.text-content :deep(.placeholder) {
color: var(--text-secondary);
}
/* Streaming cursor indicator */
.streaming-indicator {
display: flex;
align-items: center;
gap: 8px;
font-size: 12px;
color: var(--text-secondary);
}
/* Add separator only when there are step items above the indicator */
.process-block:has(.step-item) .streaming-indicator {
margin-top: 8px;
padding: 8px 0 0;
border-top: 1px solid var(--border);
}
</style>

View File

@ -36,101 +36,133 @@ api.interceptors.response.use(
} }
) )
/**
 * SSE streaming request handler.
 * @param {string} url - API path (without the baseURL prefix)
 * @param {object} body - request payload
 * @param {object} callbacks - event callbacks: { onProcessStep, onDone, onError }
 * @returns {Promise & { abort: () => void }} the in-flight promise, augmented
 *   with an abort() method that cancels the underlying fetch
 */
export function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
  const token = localStorage.getItem('access_token')
  const controller = new AbortController()
  const promise = (async () => {
    try {
      const res = await fetch(`/api${url}`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`
        },
        body: JSON.stringify(body),
        signal: controller.signal
      })
      if (!res.ok) {
        const err = await res.json().catch(() => ({}))
        throw new Error(err.message || `HTTP ${res.status}`)
      }
      const reader = res.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''
      let completed = false
      // The current event name must survive across network reads: an "event:"
      // line and its "data:" line can arrive in different chunks, so declaring
      // this inside the read loop would drop the event type at the boundary.
      let currentEvent = ''
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        buffer += decoder.decode(value, { stream: true })
        const lines = buffer.split('\n')
        // Keep the trailing partial line in the buffer for the next read.
        buffer = lines.pop() || ''
        for (const line of lines) {
          if (line.startsWith('event: ')) {
            currentEvent = line.slice(7).trim()
          } else if (line.startsWith('data: ')) {
            let data
            try {
              data = JSON.parse(line.slice(6))
            } catch {
              // Skip a malformed payload instead of aborting the whole stream.
              continue
            }
            if (currentEvent === 'process_step' && onProcessStep) {
              onProcessStep(data)
            } else if (currentEvent === 'done' && onDone) {
              completed = true
              onDone(data)
            } else if (currentEvent === 'error' && onError) {
              onError(data.content)
            }
          }
        }
      }
      // The server always terminates with a 'done' event; anything else means
      // the connection dropped mid-response.
      if (!completed && onError) {
        onError('stream ended unexpectedly')
      }
    } catch (e) {
      // A deliberate abort() is not reported as an error.
      if (e.name !== 'AbortError' && onError) {
        onError(e.message)
      }
    }
  })()
  promise.abort = () => controller.abort()
  return promise
}
// ============ 认证接口 ============ // ============ 认证接口 ============
export const authAPI = { export const authAPI = {
// 用户登录
login: (data) => api.post('/auth/login', data), login: (data) => api.post('/auth/login', data),
// 用户注册
register: (data) => api.post('/auth/register', data), register: (data) => api.post('/auth/register', data),
// 用户登出
logout: () => api.post('/auth/logout'), logout: () => api.post('/auth/logout'),
// 获取当前用户信息
getMe: () => api.get('/auth/me') getMe: () => api.get('/auth/me')
} }
// ============ 会话接口 ============ // ============ 会话接口 ============
export const conversationsAPI = { export const conversationsAPI = {
// 获取会话列表
list: (params) => api.get('/conversations/', { params }), list: (params) => api.get('/conversations/', { params }),
// 创建会话
create: (data) => api.post('/conversations/', data), create: (data) => api.post('/conversations/', data),
// 获取会话详情
get: (id) => api.get(`/conversations/${id}`), get: (id) => api.get(`/conversations/${id}`),
// 更新会话
update: (id, data) => api.put(`/conversations/${id}`, data), update: (id, data) => api.put(`/conversations/${id}`, data),
// 删除会话
delete: (id) => api.delete(`/conversations/${id}`) delete: (id) => api.delete(`/conversations/${id}`)
} }
// ============ 消息接口 ============ // ============ 消息接口 ============
export const messagesAPI = { export const messagesAPI = {
// 获取消息列表
list: (conversationId, params) => api.get('/messages/', { params: { conversation_id: conversationId, ...params } }), list: (conversationId, params) => api.get('/messages/', { params: { conversation_id: conversationId, ...params } }),
// 发送消息(非流式)
send: (data) => api.post('/messages/', data), send: (data) => api.post('/messages/', data),
// 发送消息(流式)- 使用原生 fetch 避免 axios 拦截 // 发送消息(流式)
sendStream: (data) => { sendStream: (data, callbacks) => {
const token = localStorage.getItem('access_token') return createSSEStream('/messages/stream', {
return fetch('/api/messages/stream', { conversation_id: data.conversation_id,
method: 'POST', content: data.content,
headers: { tools_enabled: callbacks.toolsEnabled !== false
'Content-Type': 'application/json', }, callbacks)
'Authorization': `Bearer ${token}`
},
body: JSON.stringify(data)
})
}, },
// 删除消息
delete: (id) => api.delete(`/messages/${id}`) delete: (id) => api.delete(`/messages/${id}`)
} }
// ============ 工具接口 ============ // ============ 工具接口 ============
export const toolsAPI = { export const toolsAPI = {
// 获取工具列表
list: (params) => api.get('/tools/', { params }), list: (params) => api.get('/tools/', { params }),
// 获取工具详情
get: (name) => api.get(`/tools/${name}`), get: (name) => api.get(`/tools/${name}`),
// 执行工具
execute: (name, data) => api.post(`/tools/${name}/execute`, data) execute: (name, data) => api.post(`/tools/${name}/execute`, data)
} }
// ============ LLM Provider 接口 ============ // ============ LLM Provider 接口 ============
export const providersAPI = { export const providersAPI = {
// 获取提供商列表
list: () => api.get('/providers/'), list: () => api.get('/providers/'),
// 创建提供商
create: (data) => api.post('/providers/', data), create: (data) => api.post('/providers/', data),
// 获取提供商详情
get: (id) => api.get(`/providers/${id}`), get: (id) => api.get(`/providers/${id}`),
// 更新提供商
update: (id, data) => api.put(`/providers/${id}`, data), update: (id, data) => api.put(`/providers/${id}`, data),
// 删除提供商
delete: (id) => api.delete(`/providers/${id}`), delete: (id) => api.delete(`/providers/${id}`),
// 测试连接
test: (id) => api.post(`/providers/${id}/test`) test: (id) => api.post(`/providers/${id}/test`)
} }
// 默认导出 export default api
export default api

View File

@ -9,14 +9,23 @@
<div v-for="msg in messages" :key="msg.id" :class="['message', msg.role]"> <div v-for="msg in messages" :key="msg.id" :class="['message', msg.role]">
<div class="message-avatar">{{ msg.role === 'user' ? 'U' : 'A' }}</div> <div class="message-avatar">{{ msg.role === 'user' ? 'U' : 'A' }}</div>
<div class="message-content"> <div class="message-content">
<div class="message-text">{{ msg.content }}</div> <ProcessBlock
v-if="msg.process_steps && msg.process_steps.length > 0"
:process-steps="msg.process_steps"
/>
<div v-else class="message-text">{{ msg.content || msg.text }}</div>
<div class="message-time">{{ formatTime(msg.created_at) }}</div> <div class="message-time">{{ formatTime(msg.created_at) }}</div>
</div> </div>
</div> </div>
<div v-if="streaming" class="message assistant streaming">
<!-- 流式消息 -->
<div v-if="streamingMessage" class="message assistant streaming">
<div class="message-avatar">A</div> <div class="message-avatar">A</div>
<div class="message-content"> <div class="message-content">
<div class="message-text">{{ streamContent }}<span class="cursor"></span></div> <ProcessBlock
:process-steps="streamingMessage.process_steps"
:streaming="true"
/>
</div> </div>
</div> </div>
</div> </div>
@ -40,14 +49,14 @@
import { ref, onMounted, nextTick } from 'vue' import { ref, onMounted, nextTick } from 'vue'
import { useRoute } from 'vue-router' import { useRoute } from 'vue-router'
import { conversationsAPI, messagesAPI } from '../services/api.js' import { conversationsAPI, messagesAPI } from '../services/api.js'
import ProcessBlock from '../components/ProcessBlock.vue'
const route = useRoute() const route = useRoute()
const messages = ref([]) const messages = ref([])
const inputMessage = ref('') const inputMessage = ref('')
const loading = ref(true) const loading = ref(true)
const sending = ref(false) const sending = ref(false)
const streaming = ref(false) const streamingMessage = ref(null)
const streamContent = ref('')
const messagesContainer = ref(null) const messagesContainer = ref(null)
const conversationId = ref(route.params.id) const conversationId = ref(route.params.id)
@ -78,104 +87,61 @@ const sendMessage = async () => {
id: Date.now(), id: Date.now(),
role: 'user', role: 'user',
content: content, content: content,
text: content,
attachments: [],
process_steps: [],
created_at: new Date().toISOString() created_at: new Date().toISOString()
}) })
scrollToBottom() scrollToBottom()
try { //
streaming.value = true streamingMessage.value = {
streamContent.value = '' id: Date.now() + 1,
role: 'assistant',
const response = await messagesAPI.sendStream({ process_steps: [],
conversation_id: conversationId.value, created_at: new Date().toISOString()
content: content, }
tools_enabled: true
}) // SSE
messagesAPI.sendStream(
const reader = response.body.getReader() { conversation_id: conversationId.value, content },
const decoder = new TextDecoder() {
onProcessStep: (step) => {
while (true) { if (!streamingMessage.value) return
const { done, value } = await reader.read() // id
if (done) break const idx = streamingMessage.value.process_steps.findIndex(s => s.id === step.id)
if (idx >= 0) {
const chunk = decoder.decode(value) streamingMessage.value.process_steps[idx] = step
const lines = chunk.split('\n') } else {
streamingMessage.value.process_steps.push(step)
for (const line of lines) { }
if (line.startsWith('data: ')) { },
const data = line.slice(6) onDone: () => {
if (data === '[DONE]') continue //
if (streamingMessage.value) {
try { messages.value.push({
const parsed = JSON.parse(data) ...streamingMessage.value,
if (parsed.type === 'text') { created_at: new Date().toISOString()
streamContent.value += parsed.content })
} else if (parsed.type === 'tool_call') { streamingMessage.value = null
// }
const data = parsed.data },
if (data && Array.isArray(data) && data.length > 0) { onError: (error) => {
// console.error('Stream error:', error)
const hasFunctionName = data.some(tc => tc.function && tc.function.name) if (streamingMessage.value) {
if (hasFunctionName) { streamingMessage.value.process_steps.push({
streamContent.value += '\n\n[调用工具] ' id: 'error',
data.forEach(tc => { index: streamingMessage.value.process_steps.length,
if (tc.function && tc.function.name) { type: 'text',
streamContent.value += `${tc.function.name} ` content: `[错误] ${error}`
} })
})
}
}
} else if (parsed.type === 'tool_result') {
//
streamContent.value += '\n\n[工具结果]\n'
if (Array.isArray(parsed.data)) {
parsed.data.forEach(tr => {
if (tr.content) {
try {
const result = JSON.parse(tr.content)
if (result.success && result.data && result.data.results) {
result.data.results.forEach(r => {
streamContent.value += `${r.title}\n${r.snippet}\n\n`
})
} else {
streamContent.value += tr.content.substring(0, 500)
}
} catch {
streamContent.value += tr.content.substring(0, 500)
}
} else {
streamContent.value += '无结果'
}
})
}
} else if (parsed.type === 'error') {
streamContent.value += '\n\n[错误] ' + (parsed.error || '未知错误')
}
} catch (e) {
console.error('Parse error:', e, data)
}
} }
} }
} }
)
//
if (streamContent.value) { sending.value = false
messages.value.push({ scrollToBottom()
id: Date.now() + 1,
role: 'assistant',
content: streamContent.value,
created_at: new Date().toISOString()
})
}
} catch (e) {
console.error('发送失败:', e)
alert('发送失败: ' + e.message)
} finally {
sending.value = false
streaming.value = false
scrollToBottom()
}
} }
const scrollToBottom = () => { const scrollToBottom = () => {
@ -203,12 +169,10 @@ onMounted(loadMessages)
.message.user { flex-direction: row-reverse; } .message.user { flex-direction: row-reverse; }
.message-avatar { width: 40px; height: 40px; background: var(--code-bg); border-radius: 50%; display: flex; align-items: center; justify-content: center; font-size: 1.2rem; flex-shrink: 0; } .message-avatar { width: 40px; height: 40px; background: var(--code-bg); border-radius: 50%; display: flex; align-items: center; justify-content: center; font-size: 1.2rem; flex-shrink: 0; }
.message.user .message-avatar { background: var(--accent-bg); } .message.user .message-avatar { background: var(--accent-bg); }
.message-content { max-width: 70%; } .message-content { max-width: 80%; }
.message-text { padding: 1rem; background: var(--code-bg); border-radius: 12px; line-height: 1.6; white-space: pre-wrap; } .message-text { padding: 1rem; background: var(--code-bg); border-radius: 12px; line-height: 1.6; white-space: pre-wrap; }
.message.user .message-text { background: var(--accent); color: white; } .message.user .message-text { background: var(--accent); color: white; }
.message-time { font-size: 0.75rem; color: var(--text); margin-top: 0.25rem; } .message-time { font-size: 0.75rem; color: var(--text-secondary); margin-top: 0.25rem; }
.cursor { animation: blink 1s infinite; }
@keyframes blink { 0%, 50% { opacity: 1; } 51%, 100% { opacity: 0; } }
.input-area { display: flex; gap: 0.75rem; padding: 1rem; border-top: 1px solid var(--border); } .input-area { display: flex; gap: 0.75rem; padding: 1rem; border-top: 1px solid var(--border); }
.input-area textarea { flex: 1; padding: 0.875rem 1rem; border: 1px solid var(--border); border-radius: 12px; resize: none; font-size: 1rem; background: var(--bg); color: var(--text); } .input-area textarea { flex: 1; padding: 0.875rem 1rem; border: 1px solid var(--border); border-radius: 12px; resize: none; font-size: 1rem; background: var(--bg); color: var(--text); }
.input-area textarea:focus { outline: none; border-color: var(--accent); } .input-area textarea:focus { outline: none; border-color: var(--accent); }

View File

@ -130,13 +130,36 @@ class Conversation(Base):
class Message(Base): class Message(Base):
"""Message model""" """Message model
content 字段统一使用 JSON 格式存储
**User 消息**
{
"text": "用户输入的文本内容",
"attachments": [
{"name": "utils.py", "extension": "py", "content": "..."}
]
}
**Assistant 消息**
{
"text": "AI 回复的文本内容",
"tool_calls": [...], // 遗留的扁平结构
"steps": [ // 有序步骤用于渲染主要数据源
{"id": "step-0", "index": 0, "type": "thinking", "content": "..."},
{"id": "step-1", "index": 1, "type": "text", "content": "..."},
{"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_xxx", "name": "...", "arguments": "..."},
{"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_xxx", "name": "...", "content": "..."}
]
}
"""
__tablename__ = "messages" __tablename__ = "messages"
id: Mapped[str] = mapped_column(String(64), primary_key=True) id: Mapped[str] = mapped_column(String(64), primary_key=True)
conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False) conversation_id: Mapped[str] = mapped_column(String(64), ForeignKey("conversations.id"), nullable=False)
role: Mapped[str] = mapped_column(String(16), nullable=False) role: Mapped[str] = mapped_column(String(16), nullable=False) # user, assistant, system, tool
content: Mapped[str] = mapped_column(Text, nullable=False) content: Mapped[str] = mapped_column(Text, nullable=False, default="")
token_count: Mapped[int] = mapped_column(Integer, default=0) token_count: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
@ -144,11 +167,39 @@ class Message(Base):
conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages") conversation: Mapped["Conversation"] = relationship("Conversation", back_populates="messages")
def to_dict(self): def to_dict(self):
return { """Convert to dictionary, extracting process_steps for frontend"""
import json
result = {
"id": self.id, "id": self.id,
"conversation_id": self.conversation_id, "conversation_id": self.conversation_id,
"role": self.role, "role": self.role,
"content": self.content,
"token_count": self.token_count, "token_count": self.token_count,
"created_at": self.created_at.isoformat() if self.created_at else None "created_at": self.created_at.isoformat() if self.created_at else None
} }
# Parse content JSON
try:
content_obj = json.loads(self.content) if self.content else {}
except json.JSONDecodeError:
# Legacy plain text content
result["content"] = self.content
result["text"] = self.content
result["attachments"] = []
result["tool_calls"] = []
result["process_steps"] = []
return result
# Extract common fields
result["text"] = content_obj.get("text", "")
result["attachments"] = content_obj.get("attachments", [])
result["tool_calls"] = content_obj.get("tool_calls", [])
# Extract steps as process_steps for frontend rendering
result["process_steps"] = content_obj.get("steps", [])
# For backward compatibility
if "content" not in result:
result["content"] = result["text"]
return result

View File

@ -136,44 +136,13 @@ async def stream_message(
db.commit() db.commit()
async def event_generator(): async def event_generator():
full_response = "" async for sse_str in chat_service.stream_response(
async for event in chat_service.stream_response(
conversation=conversation, conversation=conversation,
user_message=data.content, user_message=data.content,
tools_enabled=tools_enabled tools_enabled=tools_enabled
): ):
event_type = event.get("type") # Chat service returns raw SSE strings
yield sse_str
if event_type == "text":
content = event.get("content", "")
full_response += content
yield f"data: {json.dumps({'type': 'text', 'content': content})}\n\n"
elif event_type == "tool_call":
yield f"data: {json.dumps({'type': 'tool_call', 'data': event.get('data')})}\n\n"
elif event_type == "tool_result":
yield f"data: {json.dumps({'type': 'tool_result', 'data': event.get('data')})}\n\n"
elif event_type == "done":
try:
ai_message = Message(
id=generate_id("msg"),
conversation_id=data.conversation_id,
role="assistant",
content=full_response,
token_count=len(full_response) // 4
)
db.add(ai_message)
db.commit()
except Exception:
pass
yield f"data: {json.dumps({'type': 'done', 'message_id': ai_message.id if 'ai_message' in dir() else None})}\n\n"
elif event_type == "error":
yield f"data: {json.dumps({'type': 'error', 'error': event.get('error')})}\n\n"
yield "data: [DONE]\n\n" yield "data: [DONE]\n\n"

View File

@ -1,5 +1,6 @@
"""Chat service module""" """Chat service module"""
import json import json
import uuid
from typing import List, Dict, Any, AsyncGenerator from typing import List, Dict, Any, AsyncGenerator
from luxx.models import Conversation, Message from luxx.models import Conversation, Message
@ -13,6 +14,11 @@ from luxx.config import config
MAX_ITERATIONS = 10 MAX_ITERATIONS = 10
def _sse_event(event: str, data: dict) -> str:
"""Format a Server-Sent Event string."""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
def get_llm_client(conversation: Conversation = None): def get_llm_client(conversation: Conversation = None):
"""Get LLM client, optionally using conversation's provider""" """Get LLM client, optionally using conversation's provider"""
if conversation and conversation.provider_id: if conversation and conversation.provider_id:
@ -37,7 +43,7 @@ def get_llm_client(conversation: Conversation = None):
class ChatService: class ChatService:
"""Chat service""" """Chat service with tool support"""
def __init__(self): def __init__(self):
self.tool_executor = ToolExecutor() self.tool_executor = ToolExecutor()
@ -66,9 +72,19 @@ class ChatService:
).order_by(Message.created_at).all() ).order_by(Message.created_at).all()
for msg in db_messages: for msg in db_messages:
# Parse JSON content if possible
try:
content_obj = json.loads(msg.content) if msg.content else {}
if isinstance(content_obj, dict):
content = content_obj.get("text", msg.content)
else:
content = msg.content
except (json.JSONDecodeError, TypeError):
content = msg.content
messages.append({ messages.append({
"role": msg.role, "role": msg.role,
"content": msg.content "content": content
}) })
finally: finally:
db.close() db.close()
@ -80,163 +96,263 @@ class ChatService:
conversation: Conversation, conversation: Conversation,
user_message: str, user_message: str,
tools_enabled: bool = True tools_enabled: bool = True
) -> AsyncGenerator[Dict[str, Any], None]: ) -> AsyncGenerator[Dict[str, str], None]:
""" """
Streaming response generator Streaming response generator
Event types: Yields raw SSE event strings for direct forwarding.
- process_step: thinking/text/tool_call/tool_result step
- done: final response complete
- error: on error
""" """
try: try:
messages = self.build_messages(conversation) messages = self.build_messages(conversation)
messages.append({ messages.append({
"role": "user", "role": "user",
"content": user_message "content": json.dumps({"text": user_message, "attachments": []})
}) })
tools = registry.list_all() if tools_enabled else None tools = registry.list_all() if tools_enabled else None
iteration = 0
llm = get_llm_client(conversation) llm = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4" model = conversation.model or llm.default_model or "gpt-4"
while iteration < MAX_ITERATIONS: # State tracking
iteration += 1 all_steps = []
print(f"[CHAT DEBUG] ====== Starting iteration {iteration} ======") all_tool_calls = []
print(f"[CHAT DEBUG] Messages count: {len(messages)}") all_tool_results = []
step_index = 0
for iteration in range(MAX_ITERATIONS):
print(f"[CHAT] Starting iteration {iteration + 1}, messages: {len(messages)}")
tool_calls_this_round = None # Stream from LLM
full_content = ""
full_thinking = ""
tool_calls_list = []
thinking_step_id = None
thinking_step_idx = None
text_step_id = None
text_step_idx = None
async for event in llm.stream_call( async for sse_line in llm.stream_call(
model=model, model=model,
messages=messages, messages=messages,
tools=tools, tools=tools,
temperature=conversation.temperature, temperature=conversation.temperature,
max_tokens=conversation.max_tokens max_tokens=conversation.max_tokens
): ):
event_type = event.get("type") # Parse SSE line
# Format: "event: xxx\ndata: {...}\n\n"
event_type = None
data_str = None
if event_type == "content_delta": for line in sse_line.strip().split('\n'):
content = event.get("content", "") if line.startswith('event: '):
if content: event_type = line[7:].strip()
print(f"[CHAT DEBUG] Iteration {iteration} content: {content[:100]}...") elif line.startswith('data: '):
yield {"type": "text", "content": content} data_str = line[6:].strip()
elif event_type == "tool_call_delta": if data_str is None:
tool_call = event.get("tool_call", {}) continue
yield {"type": "tool_call", "data": tool_call}
elif event_type == "done": # Handle error events from LLM
tool_calls_this_round = event.get("tool_calls") if event_type == 'error':
print(f"[CHAT DEBUG] Done event, tool_calls: {tool_calls_this_round}") try:
error_data = json.loads(data_str)
if tool_calls_this_round and tools_enabled: yield _sse_event("error", {"content": error_data.get("content", "Unknown error")})
print(f"[CHAT DEBUG] Executing tools: {tool_calls_this_round}") except json.JSONDecodeError:
yield {"type": "tool_call", "data": tool_calls_this_round} yield _sse_event("error", {"content": data_str})
return
tool_results = self.tool_executor.process_tool_calls_parallel(
tool_calls_this_round, # Parse the data
{} try:
) chunk = json.loads(data_str)
except json.JSONDecodeError:
messages.append({ continue
"role": "assistant",
"content": "", # Get delta
"tool_calls": tool_calls_this_round choices = chunk.get("choices", [])
if not choices:
continue
delta = choices[0].get("delta", {})
# Handle reasoning (thinking)
reasoning = delta.get("reasoning_content", "")
if reasoning:
full_thinking += reasoning
if thinking_step_id is None:
thinking_step_id = f"step-{step_index}"
thinking_step_idx = step_index
step_index += 1
yield _sse_event("process_step", {
"id": thinking_step_id,
"index": thinking_step_idx,
"type": "thinking",
"content": full_thinking
})
# Handle content
content = delta.get("content", "")
if content:
full_content += content
if text_step_id is None:
text_step_idx = step_index
text_step_id = f"step-{text_step_idx}"
step_index += 1
yield _sse_event("process_step", {
"id": text_step_id,
"index": text_step_idx,
"type": "text",
"content": full_content
})
# Accumulate tool calls
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx >= len(tool_calls_list):
tool_calls_list.append({
"id": tc.get("id", ""),
"type": "function",
"function": {"name": "", "arguments": ""}
}) })
func = tc.get("function", {})
for tr in tool_results: if func.get("name"):
messages.append({ tool_calls_list[idx]["function"]["name"] += func["name"]
"role": "tool", if func.get("arguments"):
"tool_call_id": tr.get("tool_call_id"), tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
"content": str(tr.get("result", ""))
})
yield {"type": "tool_result", "data": tool_results}
else:
break
if not tool_calls_this_round or not tools_enabled: # Save thinking step
break if thinking_step_id is not None:
all_steps.append({
yield {"type": "done"} "id": thinking_step_id,
"index": thinking_step_idx,
except Exception as e: "type": "thinking",
print(f"[CHAT ERROR] Exception in stream_response: {type(e).__name__}: {str(e)}") "content": full_thinking
yield {"type": "error", "error": str(e)} })
except Exception as e:
yield {"type": "error", "error": str(e)}
def non_stream_response(
self,
conversation: Conversation,
user_message: str,
tools_enabled: bool = False
) -> Dict[str, Any]:
"""Non-streaming response"""
try:
messages = self.build_messages(conversation)
messages.append({
"role": "user",
"content": user_message
})
tools = registry.list_all() if tools_enabled else None
iteration = 0
llm_client = get_llm_client(conversation)
model = conversation.model or llm_client.default_model or "gpt-4"
while iteration < MAX_ITERATIONS:
iteration += 1
response = llm_client.sync_call( # Save text step
model=model, if text_step_id is not None:
messages=messages, all_steps.append({
tools=tools, "id": text_step_id,
temperature=conversation.temperature, "index": text_step_idx,
max_tokens=conversation.max_tokens "type": "text",
) "content": full_content
})
tool_calls = response.tool_calls # Handle tool calls
if tool_calls_list:
if tool_calls and tools_enabled: all_tool_calls.extend(tool_calls_list)
# Yield tool_call steps
for tc in tool_calls_list:
call_step = {
"id": f"step-{step_index}",
"index": step_index,
"type": "tool_call",
"id_ref": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"]
}
all_steps.append(call_step)
yield _sse_event("process_step", call_step)
step_index += 1
# Execute tools
tool_results = self.tool_executor.process_tool_calls_parallel(
tool_calls_list, {}
)
# Yield tool_result steps
for tr in tool_results:
result_step = {
"id": f"step-{step_index}",
"index": step_index,
"type": "tool_result",
"id_ref": tr.get("tool_call_id", ""),
"name": tr.get("name", ""),
"content": tr.get("content", "")
}
all_steps.append(result_step)
yield _sse_event("process_step", result_step)
step_index += 1
all_tool_results.append({
"role": "tool",
"tool_call_id": tr.get("tool_call_id", ""),
"content": tr.get("content", "")
})
# Add assistant message with tool calls for next iteration
messages.append({ messages.append({
"role": "assistant", "role": "assistant",
"content": response.content, "content": full_content or "",
"tool_calls": tool_calls "tool_calls": tool_calls_list
}) })
messages.extend(all_tool_results[-len(tool_results):])
tool_results = self.tool_executor.process_tool_calls_parallel(tool_calls) all_tool_results = []
continue
for tr in tool_results:
messages.append({ # No tool calls - final iteration, save message
"role": "tool", msg_id = str(uuid.uuid4())
"tool_call_id": tr.get("tool_call_id"), self._save_message(
"content": str(tr.get("result", "")) conversation.id,
}) msg_id,
else: full_content,
return { all_tool_calls,
"success": True, all_tool_results,
"content": response.content all_steps
} )
yield _sse_event("done", {
"message_id": msg_id,
"token_count": len(full_content) // 4
})
return
return { # Max iterations exceeded
"success": True, yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
"content": "Max iterations reached"
}
except Exception as e: except Exception as e:
return { print(f"[CHAT] Exception: {type(e).__name__}: {str(e)}")
"success": False, yield _sse_event("error", {"content": str(e)})
"error": str(e)
} def _save_message(
self,
conversation_id: str,
msg_id: str,
full_content: str,
all_tool_calls: list,
all_tool_results: list,
all_steps: list
):
"""Save the assistant message to database."""
from luxx.database import SessionLocal
from luxx.models import Message
content_json = {
"text": full_content,
"steps": all_steps
}
if all_tool_calls:
content_json["tool_calls"] = all_tool_calls
db = SessionLocal()
try:
msg = Message(
id=msg_id,
conversation_id=conversation_id,
role="assistant",
content=json.dumps(content_json, ensure_ascii=False),
token_count=len(full_content) // 4
)
db.add(msg)
db.commit()
except Exception as e:
print(f"[CHAT] Failed to save message: {e}")
db.rollback()
finally:
db.close()
# Global chat service # Global chat service

View File

@ -136,82 +136,39 @@ class LLMClient:
messages: List[Dict], messages: List[Dict],
tools: Optional[List[Dict]] = None, tools: Optional[List[Dict]] = None,
**kwargs **kwargs
) -> AsyncGenerator[Dict[str, Any], None]: ) -> AsyncGenerator[str, None]:
"""Stream call LLM API""" """Stream call LLM API - yields raw SSE event lines
Yields:
str: Raw SSE event lines for direct forwarding
"""
body = self._build_body(model, messages, tools, stream=True, **kwargs) body = self._build_body(model, messages, tools, stream=True, **kwargs)
# Accumulators for tool calls (need to collect from delta chunks) print(f"[LLM] Starting stream_call for model: {model}")
accumulated_tool_calls = {} print(f"[LLM] Messages count: {len(messages)}")
try: try:
async with httpx.AsyncClient(timeout=120.0) as client: async with httpx.AsyncClient(timeout=120.0) as client:
print(f"[LLM] Sending request to {self.api_url}")
async with client.stream( async with client.stream(
"POST", "POST",
self.api_url, self.api_url,
headers=self._build_headers(), headers=self._build_headers(),
json=body json=body
) as response: ) as response:
print(f"[LLM] Response status: {response.status_code}")
response.raise_for_status() response.raise_for_status()
async for line in response.aiter_lines(): async for line in response.aiter_lines():
if not line.strip(): if line.strip():
continue yield line + "\n"
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
yield {"type": "done"}
continue
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
if "choices" not in chunk:
continue
delta = chunk.get("choices", [{}])[0].get("delta", {})
# DeepSeek reasoner: use content if available, otherwise fall back to reasoning_content
content = delta.get("content")
reasoning = delta.get("reasoning_content", "")
if content and isinstance(content, str) and content.strip():
yield {"type": "content_delta", "content": content}
elif reasoning:
yield {"type": "content_delta", "content": reasoning}
# Accumulate tool calls from delta chunks (DeepSeek sends them incrementally)
tool_calls_delta = delta.get("tool_calls", [])
for tc in tool_calls_delta:
idx = tc.get("index", 0)
if idx not in accumulated_tool_calls:
accumulated_tool_calls[idx] = {"index": idx}
if "function" in tc:
if "function" not in accumulated_tool_calls[idx]:
accumulated_tool_calls[idx]["function"] = {"name": "", "arguments": ""}
if "name" in tc["function"]:
accumulated_tool_calls[idx]["function"]["name"] += tc["function"]["name"]
if "arguments" in tc["function"]:
accumulated_tool_calls[idx]["function"]["arguments"] += tc["function"]["arguments"]
if tool_calls_delta:
yield {"type": "tool_call_delta", "tool_call": tool_calls_delta}
# Check for finish_reason to signal end of stream
choice = chunk.get("choices", [{}])[0]
finish_reason = choice.get("finish_reason")
if finish_reason:
# Build final tool_calls list from accumulated chunks
final_tool_calls = list(accumulated_tool_calls.values()) if accumulated_tool_calls else None
yield {"type": "done", "tool_calls": final_tool_calls}
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
# Return error as an event instead of raising status_code = e.response.status_code if e.response else "?"
error_text = e.response.text if e.response else str(e) print(f"[LLM] HTTP error: {status_code}")
yield {"type": "error", "error": f"HTTP {e.response.status_code}: {error_text}"} yield f"event: error\ndata: {json.dumps({'content': f'HTTP {status_code}: Request failed'})}\n\n"
except Exception as e: except Exception as e:
yield {"type": "error", "error": str(e)} print(f"[LLM] Exception: {type(e).__name__}: {str(e)}")
yield f"event: error\ndata: {json.dumps({'content': str(e)})}\n\n"
# Global LLM client # Global LLM client