diff --git a/backend/services/chat.py b/backend/services/chat.py index 719faf5..78d1ff1 100644 --- a/backend/services/chat.py +++ b/backend/services/chat.py @@ -1,7 +1,8 @@ """Chat completion service""" import json import uuid -from flask import current_app, g, Response +from flask import current_app, g, Response, request as flask_request +from werkzeug.exceptions import ClientDisconnected from backend import db from backend.models import Conversation, Message from backend.tools import registry, ToolExecutor @@ -13,12 +14,23 @@ from backend.services.llm_client import LLMClient from backend.config import MAX_ITERATIONS +def _client_disconnected(): + """Check if the client has disconnected.""" + try: + stream = flask_request.input_stream + # If input_stream is unavailable, assume still connected + if stream is None: + return False + return stream.closed + except Exception: + return False + + class ChatService: """Chat completion service with tool support""" def __init__(self, llm: LLMClient): self.llm = llm - self.executor = ToolExecutor(registry=registry) def stream_response(self, conv: Conversation, tools_enabled: bool = True, project_id: str = None): @@ -38,8 +50,10 @@ class ChatService: tools = registry.list_all() if tools_enabled else None initial_messages = build_messages(conv, project_id) - # Clear tool call history for new request - self.executor.clear_history() + # Create per-request executor for thread-safe isolation. + # Each request gets its own _call_history and _cache, eliminating + # race conditions when multiple conversations stream concurrently. + executor = ToolExecutor(registry=registry) # Build context for tool execution context = None @@ -89,6 +103,11 @@ class ChatService: # Stream LLM response chunk by chunk for line in resp.iter_lines(): + # Early exit if client has disconnected + if _client_disconnected(): + resp.close() + return + if not line: continue line = line.decode("utf-8") @@ -177,7 +196,7 @@ class ChatService: # Execute the tool with app.app_context(): - single_result = self.executor.process_tool_calls([tc], context) + single_result = executor.process_tool_calls([tc], context) tool_results.extend(single_result) # Emit tool_result step (after execution) @@ -269,8 +288,15 @@ class ChatService: yield f"event: error\ndata: {json.dumps({'content': 'exceeded maximum tool call iterations'}, ensure_ascii=False)}\n\n" + def safe_generate(): + """Wrapper that catches client disconnection during yield.""" + try: + yield from generate() + except (ClientDisconnected, BrokenPipeError, ConnectionResetError): + pass # Client aborted, silently stop + return Response( - generate(), + safe_generate(), mimetype="text/event-stream", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 0ae2ba9..65d0380 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -46,6 +46,7 @@ :loading-more="loadingMessages" :tools-enabled="toolsEnabled" @send-message="sendMessage" + @stop-streaming="stopStreaming" @delete-message="deleteMessage" @regenerate-message="regenerateMessage" @toggle-settings="togglePanel('settings')" @@ -139,20 +140,52 @@ const hasMoreMessages = ref(false) const loadingMessages = ref(false) const nextMsgCursor = ref(null) -// -- Streaming state -- +// -- Streaming state (per-conversation) -- // processSteps is the single source of truth for all streaming content. // thinking/text steps are sent incrementally via process_step events and // updated in-place by id. tool_call/tool_result steps are appended on arrival. // On stream completion (onDone), the finalized steps are stored in the message object. -const streaming = ref(false) +const streaming = ref(false) // true when current conversation is actively streaming const streamProcessSteps = shallowRef([]) // Ordered steps: thinking/text/tool_call/tool_result -// 保存每个对话的流式状态 +// Track which conversations are currently streaming (supports multi-concurrent streams) +const streamingConvs = new Set() + +// Per-conversation abort controllers (for stopping active streams) +const streamAborters = new Map() + +// 保存每个对话的流式状态(切换对话时暂存) const streamStates = new Map() -function setStreamState(isActive) { +// Stop the active stream for a conversation +function stopStreaming(convId) { + const conv = convId || currentConvId.value + if (!conv) return + const abort = streamAborters.get(conv) + if (abort) { + abort() + streamAborters.delete(conv) + } + // AbortError is silently caught in createSSEStream, so clean up state here + streamStates.delete(conv) + streamingConvs.delete(conv) + if (currentConvId.value === conv) { + setStreamState(false, conv) + } +} + +function setStreamState(isActive, convId) { streaming.value = isActive - streamProcessSteps.value = [] + if (!isActive) { + streamProcessSteps.value = [] + } + if (convId) { + if (isActive) { + streamingConvs.add(convId) + } else { + streamingConvs.delete(convId) + } + } } function updateStreamField(convId, field, ref, valueOrUpdater) { @@ -248,13 +281,15 @@ async function selectConversation(id) { currentProject.value = null } - // Save current streaming state - if (currentConvId.value && streaming.value) { - streamStates.set(currentConvId.value, { - streaming: true, - streamProcessSteps: [...streamProcessSteps.value], - messages: [...messages.value], - }) + // Save current streaming state before switching + if (currentConvId.value) { + if (streamingConvs.has(currentConvId.value)) { + streamStates.set(currentConvId.value, { + streaming: true, + streamProcessSteps: [...streamProcessSteps.value], + messages: [...messages.value], + }) + } } currentConvId.value = id @@ -263,13 +298,18 @@ async function selectConversation(id) { // Restore streaming state for new conversation const savedState = streamStates.get(id) + const isThisConvStreaming = streamingConvs.has(id) if (savedState && savedState.streaming) { streaming.value = true streamProcessSteps.value = savedState.streamProcessSteps messages.value = savedState.messages || [] - } else { - setStreamState(false) + } else if (!isThisConvStreaming) { + setStreamState(false, currentConvId.value) messages.value = [] + } else { + // This conv is streaming but we don't have saved state (e.g. started from background) + streaming.value = true + streamProcessSteps.value = [] } if (!streaming.value) { @@ -317,6 +357,8 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) { }, async onDone(data) { streamStates.delete(convId) + streamingConvs.delete(convId) + streamAborters.delete(convId) if (currentConvId.value === convId) { streaming.value = false @@ -353,7 +395,7 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) { token_count: data.token_count, created_at: new Date().toISOString(), }] - setStreamState(false) + setStreamState(false, convId) if (updateConvList) { const idx = conversations.value.findIndex(c => c.id === convId) @@ -386,8 +428,10 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) { }, onError(msg) { streamStates.delete(convId) + streamingConvs.delete(convId) + streamAborters.delete(convId) if (currentConvId.value === convId) { - setStreamState(false) + setStreamState(false, convId) console.error('Stream error:', msg) } }, @@ -396,7 +440,7 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) { // -- Send message (streaming) -- async function sendMessage(data) { - if (!currentConvId.value || streaming.value) return + if (!currentConvId.value || streamingConvs.has(currentConvId.value)) return const convId = currentConvId.value const text = data.text || '' @@ -413,12 +457,13 @@ async function sendMessage(data) { } messages.value = [...messages.value, userMsg] - setStreamState(true) + setStreamState(true, convId) - messageApi.send(convId, { text, attachments, projectId: currentProject.value?.id }, { + const stream = messageApi.send(convId, { text, attachments, projectId: currentProject.value?.id }, { toolsEnabled: toolsEnabled.value, ...createStreamCallbacks(convId, { updateConvList: true }), }) + streamAborters.set(convId, () => stream.abort()) } // -- Delete message -- @@ -434,7 +479,7 @@ async function deleteMessage(msgId) { // -- Regenerate message -- async function regenerateMessage(msgId) { - if (!currentConvId.value || streaming.value) return + if (!currentConvId.value || streamingConvs.has(currentConvId.value)) return const convId = currentConvId.value const msgIndex = messages.value.findIndex(m => m.id === msgId) @@ -442,13 +487,14 @@ async function regenerateMessage(msgId) { messages.value = messages.value.slice(0, msgIndex) - setStreamState(true) + setStreamState(true, convId) - messageApi.regenerate(convId, msgId, { + const stream = messageApi.regenerate(convId, msgId, { toolsEnabled: toolsEnabled.value, projectId: currentProject.value?.id, ...createStreamCallbacks(convId, { updateConvList: false }), }) + streamAborters.set(convId, () => stream.abort()) } // -- Delete conversation -- diff --git a/frontend/src/api/index.js b/frontend/src/api/index.js index ea4f9d4..fa32b1c 100644 --- a/frontend/src/api/index.js +++ b/frontend/src/api/index.js @@ -52,6 +52,7 @@ function createSSEStream(url, body, { onProcessStep, onDone, onError }) { const reader = res.body.getReader() const decoder = new TextDecoder() let buffer = '' + let completed = false while (true) { const { done, value } = await reader.read() @@ -70,6 +71,7 @@ function createSSEStream(url, body, { onProcessStep, onDone, onError }) { if (currentEvent === 'process_step' && onProcessStep) { onProcessStep(data) } else if (currentEvent === 'done' && onDone) { + completed = true onDone(data) } else if (currentEvent === 'error' && onError) { onError(data.content) @@ -77,6 +79,11 @@ function createSSEStream(url, body, { onProcessStep, onDone, onError }) { } } } + + // Connection closed without receiving 'done' event — clean up + if (!completed && onError) { + onError('stream ended unexpectedly') + } } catch (e) { if (e.name !== 'AbortError' && onError) { onError(e.message) diff --git a/frontend/src/components/ChatView.vue b/frontend/src/components/ChatView.vue index 8b3c977..8ae9dd2 100644 --- a/frontend/src/components/ChatView.vue +++ b/frontend/src/components/ChatView.vue @@ -11,7 +11,6 @@

{{ conversation.title || '新对话' }}

{{ formatModelName(conversation.model) }} - 思考
@@ -60,6 +59,7 @@ :disabled="streaming" :tools-enabled="toolsEnabled" @send="$emit('sendMessage', $event)" + @stop-streaming="$emit('stopStreaming')" @toggle-tools="$emit('toggleTools', $event)" /> @@ -91,7 +91,7 @@ const props = defineProps({ toolsEnabled: { type: Boolean, default: true }, }) -const emit = defineEmits(['sendMessage', 'deleteMessage', 'regenerateMessage', 'loadMoreMessages', 'toggleTools']) +const emit = defineEmits(['sendMessage', 'stopStreaming', 'deleteMessage', 'regenerateMessage', 'loadMoreMessages', 'toggleTools']) const scrollContainer = ref(null) const inputRef = ref(null) diff --git a/frontend/src/components/MessageInput.vue b/frontend/src/components/MessageInput.vue index f826c3f..b2a3bee 100644 --- a/frontend/src/components/MessageInput.vue +++ b/frontend/src/components/MessageInput.vue @@ -29,6 +29,15 @@ style="display: none" />
+ -