diff --git a/dashboard/src/utils/store.js b/dashboard/src/utils/store.js index 8a05980..1187e6d 100644 --- a/dashboard/src/utils/store.js +++ b/dashboard/src/utils/store.js @@ -3,3 +3,6 @@ import { createPinia } from 'pinia' const pinia = createPinia() export default pinia + +// 导出 store 供其他地方使用 +export { useStreamStore } from './streamStore.js' diff --git a/dashboard/src/utils/streamManager.js b/dashboard/src/utils/streamManager.js new file mode 100644 index 0000000..6ea968b --- /dev/null +++ b/dashboard/src/utils/streamManager.js @@ -0,0 +1,217 @@ +/** + * StreamManager - 管理多个并发 SSE 流 + * + * 功能: + * 1. 同时管理多个会话的流式请求 + * 2. 支持取消、重试等操作 + * 3. 与 Pinia store 集成 + */ +import { useStreamStore } from './streamStore.js' + +class StreamManager { + constructor() { + // 存储所有活跃的流:{ conversationId: { abort, promise } } + this.activeStreams = {} + // SSE 解码器 + this.decoder = new TextDecoder() + } + + /** + * 启动一个新的流 + * @param {string} conversationId - 会话 ID + * @param {object} data - 请求数据 + * @param {string} userMessageId - 用户消息 ID + */ + async startStream(conversationId, data, userMessageId) { + const streamStore = useStreamStore() + + // 如果该会话已有活跃流,先取消 + if (this.activeStreams[conversationId]) { + this.cancelStream(conversationId) + } + + const controller = new AbortController() + this.activeStreams[conversationId] = { controller } + + // 初始化 store 中的流状态 + streamStore.initStream(conversationId, userMessageId) + + const promise = this._executeStream(conversationId, data, controller.signal) + this.activeStreams[conversationId].promise = promise + + return promise + } + + /** + * 执行 SSE 流 + */ + async _executeStream(conversationId, data, signal) { + const streamStore = useStreamStore() + const token = localStorage.getItem('access_token') + + try { + const res = await fetch('/api/messages/stream', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${token}` + }, + body: JSON.stringify({ + conversation_id: data.conversation_id, + content: data.content, + thinking_enabled: data.thinking_enabled || false, + enabled_tools: data.enabled_tools || [] + }), + signal + }) + + if (!res.ok) { + const err = await res.json().catch(() => ({})) + throw new Error(err.message || `HTTP ${res.status}`) + } + + const reader = res.body.getReader() + let buffer = '' + let completed = false + + while (true) { + const { done, value } = await reader.read() + + if (value) { + buffer += this.decoder.decode(value, { stream: true }) + } + + if (done) { + // 处理 buffer 中剩余的数据 + this._processBuffer(conversationId, buffer, streamStore, () => { + completed = true + }) + + if (!completed) { + streamStore.errorStream(conversationId, 'stream ended without done event') + } + break + } + + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + this._processLines(conversationId, lines, streamStore) + } + + // 流结束但没有收到 done 事件 + if (!completed) { + streamStore.errorStream(conversationId, 'stream ended unexpectedly') + } + } catch (e) { + if (e.name !== 'AbortError') { + console.error('Stream error:', e) + streamStore.errorStream(conversationId, e.message) + } + } finally { + // 清理活跃流记录 + delete this.activeStreams[conversationId] + } + } + + /** + * 处理缓冲区 + */ + _processBuffer(conversationId, buffer, streamStore, onComplete) { + const lines = buffer.split('\n') + let currentEvent = '' + + for (const line of lines) { + if (line.startsWith('event: ')) { + currentEvent = line.slice(7).trim() + } else if (line.startsWith('data: ')) { + try { + const data = JSON.parse(line.slice(6)) + this._handleEvent(conversationId, currentEvent, data, streamStore, onComplete) + } catch (e) { + console.error('SSE parse error:', e, 'line:', line) + } + } + } + } + + /** + * 处理行 + */ + _processLines(conversationId, lines, streamStore) { + let currentEvent = '' + + for (const line of lines) { + if (line.startsWith('event: ')) { + currentEvent = line.slice(7).trim() + } else if (line.startsWith('data: ')) { + try { + const data = JSON.parse(line.slice(6)) + this._handleEvent(conversationId, currentEvent, data, streamStore, null) + } catch (e) { + // 忽略解析错误 + } + } + } + } + + /** + * 处理事件 + */ + _handleEvent(conversationId, eventType, data, streamStore, onComplete) { + switch (eventType) { + case 'process_step': + streamStore.updateStep(conversationId, data.step) + break + case 'done': + streamStore.completeStream(conversationId, data) + if (onComplete) onComplete() + break + case 'error': + streamStore.errorStream(conversationId, data.content) + if (onComplete) onComplete() + break + } + } + + /** + * 取消指定会话的流 + */ + cancelStream(conversationId) { + const stream = this.activeStreams[conversationId] + if (stream) { + stream.controller.abort() + delete this.activeStreams[conversationId] + } + // 清除 store 中的流状态 + const streamStore = useStreamStore() + streamStore.clearStream(conversationId) + } + + /** + * 取消所有流 + */ + cancelAll() { + Object.keys(this.activeStreams).forEach(conversationId => { + this.cancelStream(conversationId) + }) + } + + /** + * 检查指定会话是否有活跃流 + */ + hasActiveStream(conversationId) { + return !!this.activeStreams[conversationId] + } + + /** + * 获取活跃流数量 + */ + getActiveCount() { + return Object.keys(this.activeStreams).length + } +} + +// 导出单例 +export const streamManager = new StreamManager() +export default streamManager diff --git a/dashboard/src/utils/streamStore.js b/dashboard/src/utils/streamStore.js new file mode 100644 index 0000000..639897c --- /dev/null +++ b/dashboard/src/utils/streamStore.js @@ -0,0 +1,102 @@ +/** + * 流状态管理 Store + * 管理多个会话的并发流式消息状态 + */ +import { defineStore } from 'pinia' +import { ref, computed } from 'vue' + +export const useStreamStore = defineStore('stream', () => { + // 存储所有活跃的流状态:{ conversationId: StreamState } + const streams = ref({}) + + // 获取指定会话的流状态 + const getStreamState = (conversationId) => { + return streams.value[conversationId] || null + } + + // 检查指定会话是否有活跃的流 + const hasActiveStream = (conversationId) => { + const state = streams.value[conversationId] + return state && state.status === 'streaming' + } + + // 获取所有有活跃流的会话 ID + const activeConversationIds = computed(() => { + return Object.keys(streams.value).filter(id => + streams.value[id]?.status === 'streaming' + ) + }) + + // 初始化流状态 + const initStream = (conversationId, userMessageId) => { + streams.value[conversationId] = { + id: userMessageId, + status: 'streaming', // streaming, done, error + process_steps: [], + token_count: 0, + usage: null, + error: null, + started_at: new Date().toISOString(), + completed_at: null + } + } + + // 更新步骤(追加或更新) + const updateStep = (conversationId, step) => { + const state = streams.value[conversationId] + if (!state) return + + const idx = state.process_steps.findIndex(s => s.id === step.id) + if (idx >= 0) { + state.process_steps[idx] = step + } else { + state.process_steps.push(step) + } + } + + // 完成流 + const completeStream = (conversationId, data) => { + const state = streams.value[conversationId] + if (!state) return + + state.status = 'done' + state.token_count = data.token_count || 0 + state.usage = data.usage || null + state.completed_at = new Date().toISOString() + } + + // 流错误 + const errorStream = (conversationId, error) => { + const state = streams.value[conversationId] + if (!state) return + + state.status = 'error' + state.error = error + state.completed_at = new Date().toISOString() + } + + // 清除流状态 + const clearStream = (conversationId) => { + delete streams.value[conversationId] + } + + // 批量设置流状态(用于恢复) + const setStreamState = (conversationId, state) => { + streams.value[conversationId] = state + } + + return { + streams, + getStreamState, + hasActiveStream, + activeConversationIds, + initStream, + updateStep, + completeStream, + errorStream, + clearStream, + setStreamState + } +}) + +export default useStreamStore diff --git a/dashboard/src/views/ConversationDetailView.vue b/dashboard/src/views/ConversationDetailView.vue index cfdedfb..696c399 100644 --- a/dashboard/src/views/ConversationDetailView.vue +++ b/dashboard/src/views/ConversationDetailView.vue @@ -49,13 +49,13 @@ /> - -
+ +
Luxx
@@ -92,28 +92,41 @@