fix: 实现对话中断处理

This commit is contained in:
ViperEkura 2026-03-27 15:37:34 +08:00
parent c2aff3e4a6
commit f4cb991ed7
7 changed files with 194 additions and 92 deletions

View File

@ -1,7 +1,8 @@
"""Chat completion service""" """Chat completion service"""
import json import json
import uuid import uuid
from flask import current_app, g, Response from flask import current_app, g, Response, request as flask_request
from werkzeug.exceptions import ClientDisconnected
from backend import db from backend import db
from backend.models import Conversation, Message from backend.models import Conversation, Message
from backend.tools import registry, ToolExecutor from backend.tools import registry, ToolExecutor
@ -13,12 +14,23 @@ from backend.services.llm_client import LLMClient
from backend.config import MAX_ITERATIONS from backend.config import MAX_ITERATIONS
def _client_disconnected():
"""Check if the client has disconnected."""
try:
stream = flask_request.input_stream
# If input_stream is unavailable, assume still connected
if stream is None:
return False
return stream.closed
except Exception:
return False
class ChatService: class ChatService:
"""Chat completion service with tool support""" """Chat completion service with tool support"""
def __init__(self, llm: LLMClient): def __init__(self, llm: LLMClient):
self.llm = llm self.llm = llm
self.executor = ToolExecutor(registry=registry)
def stream_response(self, conv: Conversation, tools_enabled: bool = True, project_id: str = None): def stream_response(self, conv: Conversation, tools_enabled: bool = True, project_id: str = None):
@ -38,8 +50,10 @@ class ChatService:
tools = registry.list_all() if tools_enabled else None tools = registry.list_all() if tools_enabled else None
initial_messages = build_messages(conv, project_id) initial_messages = build_messages(conv, project_id)
# Clear tool call history for new request # Create per-request executor for thread-safe isolation.
self.executor.clear_history() # Each request gets its own _call_history and _cache, eliminating
# race conditions when multiple conversations stream concurrently.
executor = ToolExecutor(registry=registry)
# Build context for tool execution # Build context for tool execution
context = None context = None
@ -89,6 +103,11 @@ class ChatService:
# Stream LLM response chunk by chunk # Stream LLM response chunk by chunk
for line in resp.iter_lines(): for line in resp.iter_lines():
# Early exit if client has disconnected
if _client_disconnected():
resp.close()
return
if not line: if not line:
continue continue
line = line.decode("utf-8") line = line.decode("utf-8")
@ -177,7 +196,7 @@ class ChatService:
# Execute the tool # Execute the tool
with app.app_context(): with app.app_context():
single_result = self.executor.process_tool_calls([tc], context) single_result = executor.process_tool_calls([tc], context)
tool_results.extend(single_result) tool_results.extend(single_result)
# Emit tool_result step (after execution) # Emit tool_result step (after execution)
@ -269,8 +288,15 @@ class ChatService:
yield f"event: error\ndata: {json.dumps({'content': 'exceeded maximum tool call iterations'}, ensure_ascii=False)}\n\n" yield f"event: error\ndata: {json.dumps({'content': 'exceeded maximum tool call iterations'}, ensure_ascii=False)}\n\n"
def safe_generate():
"""Wrapper that catches client disconnection during yield."""
try:
yield from generate()
except (ClientDisconnected, BrokenPipeError, ConnectionResetError):
pass # Client aborted, silently stop
return Response( return Response(
generate(), safe_generate(),
mimetype="text/event-stream", mimetype="text/event-stream",
headers={ headers={
"Cache-Control": "no-cache, no-store, must-revalidate", "Cache-Control": "no-cache, no-store, must-revalidate",

View File

@ -46,6 +46,7 @@
:loading-more="loadingMessages" :loading-more="loadingMessages"
:tools-enabled="toolsEnabled" :tools-enabled="toolsEnabled"
@send-message="sendMessage" @send-message="sendMessage"
@stop-streaming="stopStreaming"
@delete-message="deleteMessage" @delete-message="deleteMessage"
@regenerate-message="regenerateMessage" @regenerate-message="regenerateMessage"
@toggle-settings="togglePanel('settings')" @toggle-settings="togglePanel('settings')"
@ -139,20 +140,52 @@ const hasMoreMessages = ref(false)
const loadingMessages = ref(false) const loadingMessages = ref(false)
const nextMsgCursor = ref(null) const nextMsgCursor = ref(null)
// -- Streaming state -- // -- Streaming state (per-conversation) --
// processSteps is the single source of truth for all streaming content. // processSteps is the single source of truth for all streaming content.
// thinking/text steps are sent incrementally via process_step events and // thinking/text steps are sent incrementally via process_step events and
// updated in-place by id. tool_call/tool_result steps are appended on arrival. // updated in-place by id. tool_call/tool_result steps are appended on arrival.
// On stream completion (onDone), the finalized steps are stored in the message object. // On stream completion (onDone), the finalized steps are stored in the message object.
const streaming = ref(false) const streaming = ref(false) // true when current conversation is actively streaming
const streamProcessSteps = shallowRef([]) // Ordered steps: thinking/text/tool_call/tool_result const streamProcessSteps = shallowRef([]) // Ordered steps: thinking/text/tool_call/tool_result
// // Track which conversations are currently streaming (supports multi-concurrent streams)
const streamingConvs = new Set()
// Per-conversation abort controllers (for stopping active streams)
const streamAborters = new Map()
//
const streamStates = new Map() const streamStates = new Map()
function setStreamState(isActive) { // Stop the active stream for a conversation
function stopStreaming(convId) {
const conv = convId || currentConvId.value
if (!conv) return
const abort = streamAborters.get(conv)
if (abort) {
abort()
streamAborters.delete(conv)
}
// AbortError is silently caught in createSSEStream, so clean up state here
streamStates.delete(conv)
streamingConvs.delete(conv)
if (currentConvId.value === conv) {
setStreamState(false, conv)
}
}
function setStreamState(isActive, convId) {
streaming.value = isActive streaming.value = isActive
if (!isActive) {
streamProcessSteps.value = [] streamProcessSteps.value = []
}
if (convId) {
if (isActive) {
streamingConvs.add(convId)
} else {
streamingConvs.delete(convId)
}
}
} }
function updateStreamField(convId, field, ref, valueOrUpdater) { function updateStreamField(convId, field, ref, valueOrUpdater) {
@ -248,14 +281,16 @@ async function selectConversation(id) {
currentProject.value = null currentProject.value = null
} }
// Save current streaming state // Save current streaming state before switching
if (currentConvId.value && streaming.value) { if (currentConvId.value) {
if (streamingConvs.has(currentConvId.value)) {
streamStates.set(currentConvId.value, { streamStates.set(currentConvId.value, {
streaming: true, streaming: true,
streamProcessSteps: [...streamProcessSteps.value], streamProcessSteps: [...streamProcessSteps.value],
messages: [...messages.value], messages: [...messages.value],
}) })
} }
}
currentConvId.value = id currentConvId.value = id
nextMsgCursor.value = null nextMsgCursor.value = null
@ -263,13 +298,18 @@ async function selectConversation(id) {
// Restore streaming state for new conversation // Restore streaming state for new conversation
const savedState = streamStates.get(id) const savedState = streamStates.get(id)
const isThisConvStreaming = streamingConvs.has(id)
if (savedState && savedState.streaming) { if (savedState && savedState.streaming) {
streaming.value = true streaming.value = true
streamProcessSteps.value = savedState.streamProcessSteps streamProcessSteps.value = savedState.streamProcessSteps
messages.value = savedState.messages || [] messages.value = savedState.messages || []
} else { } else if (!isThisConvStreaming) {
setStreamState(false) setStreamState(false, currentConvId.value)
messages.value = [] messages.value = []
} else {
// This conv is streaming but we don't have saved state (e.g. started from background)
streaming.value = true
streamProcessSteps.value = []
} }
if (!streaming.value) { if (!streaming.value) {
@ -317,6 +357,8 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) {
}, },
async onDone(data) { async onDone(data) {
streamStates.delete(convId) streamStates.delete(convId)
streamingConvs.delete(convId)
streamAborters.delete(convId)
if (currentConvId.value === convId) { if (currentConvId.value === convId) {
streaming.value = false streaming.value = false
@ -353,7 +395,7 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) {
token_count: data.token_count, token_count: data.token_count,
created_at: new Date().toISOString(), created_at: new Date().toISOString(),
}] }]
setStreamState(false) setStreamState(false, convId)
if (updateConvList) { if (updateConvList) {
const idx = conversations.value.findIndex(c => c.id === convId) const idx = conversations.value.findIndex(c => c.id === convId)
@ -386,8 +428,10 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) {
}, },
onError(msg) { onError(msg) {
streamStates.delete(convId) streamStates.delete(convId)
streamingConvs.delete(convId)
streamAborters.delete(convId)
if (currentConvId.value === convId) { if (currentConvId.value === convId) {
setStreamState(false) setStreamState(false, convId)
console.error('Stream error:', msg) console.error('Stream error:', msg)
} }
}, },
@ -396,7 +440,7 @@ function createStreamCallbacks(convId, { updateConvList = true } = {}) {
// -- Send message (streaming) -- // -- Send message (streaming) --
async function sendMessage(data) { async function sendMessage(data) {
if (!currentConvId.value || streaming.value) return if (!currentConvId.value || streamingConvs.has(currentConvId.value)) return
const convId = currentConvId.value const convId = currentConvId.value
const text = data.text || '' const text = data.text || ''
@ -413,12 +457,13 @@ async function sendMessage(data) {
} }
messages.value = [...messages.value, userMsg] messages.value = [...messages.value, userMsg]
setStreamState(true) setStreamState(true, convId)
messageApi.send(convId, { text, attachments, projectId: currentProject.value?.id }, { const stream = messageApi.send(convId, { text, attachments, projectId: currentProject.value?.id }, {
toolsEnabled: toolsEnabled.value, toolsEnabled: toolsEnabled.value,
...createStreamCallbacks(convId, { updateConvList: true }), ...createStreamCallbacks(convId, { updateConvList: true }),
}) })
streamAborters.set(convId, () => stream.abort())
} }
// -- Delete message -- // -- Delete message --
@ -434,7 +479,7 @@ async function deleteMessage(msgId) {
// -- Regenerate message -- // -- Regenerate message --
async function regenerateMessage(msgId) { async function regenerateMessage(msgId) {
if (!currentConvId.value || streaming.value) return if (!currentConvId.value || streamingConvs.has(currentConvId.value)) return
const convId = currentConvId.value const convId = currentConvId.value
const msgIndex = messages.value.findIndex(m => m.id === msgId) const msgIndex = messages.value.findIndex(m => m.id === msgId)
@ -442,13 +487,14 @@ async function regenerateMessage(msgId) {
messages.value = messages.value.slice(0, msgIndex) messages.value = messages.value.slice(0, msgIndex)
setStreamState(true) setStreamState(true, convId)
messageApi.regenerate(convId, msgId, { const stream = messageApi.regenerate(convId, msgId, {
toolsEnabled: toolsEnabled.value, toolsEnabled: toolsEnabled.value,
projectId: currentProject.value?.id, projectId: currentProject.value?.id,
...createStreamCallbacks(convId, { updateConvList: false }), ...createStreamCallbacks(convId, { updateConvList: false }),
}) })
streamAborters.set(convId, () => stream.abort())
} }
// -- Delete conversation -- // -- Delete conversation --

View File

@ -52,6 +52,7 @@ function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
const reader = res.body.getReader() const reader = res.body.getReader()
const decoder = new TextDecoder() const decoder = new TextDecoder()
let buffer = '' let buffer = ''
let completed = false
while (true) { while (true) {
const { done, value } = await reader.read() const { done, value } = await reader.read()
@ -70,6 +71,7 @@ function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
if (currentEvent === 'process_step' && onProcessStep) { if (currentEvent === 'process_step' && onProcessStep) {
onProcessStep(data) onProcessStep(data)
} else if (currentEvent === 'done' && onDone) { } else if (currentEvent === 'done' && onDone) {
completed = true
onDone(data) onDone(data)
} else if (currentEvent === 'error' && onError) { } else if (currentEvent === 'error' && onError) {
onError(data.content) onError(data.content)
@ -77,6 +79,11 @@ function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
} }
} }
} }
// Connection closed without receiving 'done' event — clean up
if (!completed && onError) {
onError('stream ended unexpectedly')
}
} catch (e) { } catch (e) {
if (e.name !== 'AbortError' && onError) { if (e.name !== 'AbortError' && onError) {
onError(e.message) onError(e.message)

View File

@ -11,7 +11,6 @@
<div class="chat-title-area"> <div class="chat-title-area">
<h2 class="chat-title">{{ conversation.title || '新对话' }}</h2> <h2 class="chat-title">{{ conversation.title || '新对话' }}</h2>
<span class="model-badge">{{ formatModelName(conversation.model) }}</span> <span class="model-badge">{{ formatModelName(conversation.model) }}</span>
<span v-if="conversation.thinking_enabled" class="thinking-badge">思考</span>
</div> </div>
</div> </div>
@ -60,6 +59,7 @@
:disabled="streaming" :disabled="streaming"
:tools-enabled="toolsEnabled" :tools-enabled="toolsEnabled"
@send="$emit('sendMessage', $event)" @send="$emit('sendMessage', $event)"
@stop-streaming="$emit('stopStreaming')"
@toggle-tools="$emit('toggleTools', $event)" @toggle-tools="$emit('toggleTools', $event)"
/> />
</template> </template>
@ -91,7 +91,7 @@ const props = defineProps({
toolsEnabled: { type: Boolean, default: true }, toolsEnabled: { type: Boolean, default: true },
}) })
const emit = defineEmits(['sendMessage', 'deleteMessage', 'regenerateMessage', 'loadMoreMessages', 'toggleTools']) const emit = defineEmits(['sendMessage', 'stopStreaming', 'deleteMessage', 'regenerateMessage', 'loadMoreMessages', 'toggleTools'])
const scrollContainer = ref(null) const scrollContainer = ref(null)
const inputRef = ref(null) const inputRef = ref(null)

View File

@ -29,6 +29,15 @@
style="display: none" style="display: none"
/> />
<div class="input-actions"> <div class="input-actions">
<button
class="btn-upload"
:class="{ active: uploadedFiles.length > 0 }"
:disabled="disabled"
@click="triggerFileUpload"
:title="uploadedFiles.length > 0 ? `已上传 ${uploadedFiles.length} 个文件` : '上传文件'"
>
<span v-html="icons.upload" />
</button>
<button <button
class="btn-tool" class="btn-tool"
:class="{ active: toolsEnabled }" :class="{ active: toolsEnabled }"
@ -38,14 +47,6 @@
> >
<span v-html="icons.wrench" /> <span v-html="icons.wrench" />
</button> </button>
<button
class="btn-upload"
:disabled="disabled"
@click="triggerFileUpload"
title="上传文件"
>
<span v-html="icons.upload" />
</button>
<button <button
class="btn-send" class="btn-send"
:class="{ active: canSend }" :class="{ active: canSend }"
@ -278,6 +279,77 @@ textarea::placeholder {
} }
.btn-tool, .btn-tool,
.btn-upload {
width: 36px;
height: 36px;
border-radius: 8px;
border: none;
background: transparent;
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
position: relative;
transition: all 0.15s ease;
}
.btn-tool::before,
.btn-upload::before {
content: '';
position: absolute;
inset: 0;
border-radius: inherit;
opacity: 0.20;
transition: opacity 0.15s ease;
}
.btn-upload::before {
background: var(--attachment-color);
}
.btn-tool::before {
background: var(--tool-color);
}
.btn-upload {
color: var(--attachment-color);
}
.btn-tool {
color: var(--tool-color);
}
.btn-tool:hover:not(:disabled)::before,
.btn-upload:hover:not(:disabled)::before {
opacity: 0.7;
}
.btn-upload.active::before {
opacity: 0.5;
}
.btn-upload.active:hover:not(:disabled)::before {
opacity: 0.7;
}
.btn-tool.active::before {
opacity: 0.5;
}
.btn-tool.active:hover:not(:disabled)::before {
opacity: 0.7;
}
.btn-tool:disabled,
.btn-upload:disabled {
cursor: not-allowed;
}
.btn-tool:disabled::before,
.btn-upload:disabled::before {
opacity: 0.20;
}
.btn-send { .btn-send {
width: 36px; width: 36px;
height: 36px; height: 36px;
@ -285,64 +357,13 @@ textarea::placeholder {
border: none; border: none;
background: var(--bg-code); background: var(--bg-code);
color: var(--text-tertiary); color: var(--text-tertiary);
cursor: pointer; cursor: not-allowed;
display: flex; display: flex;
align-items: center; align-items: center;
justify-content: center; justify-content: center;
transition: all 0.15s ease; transition: all 0.15s ease;
} }
.btn-upload {
width: 36px;
height: 36px;
border-radius: 8px;
border: none;
background: var(--attachment-bg);
color: var(--attachment-color);
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
transition: all 0.15s ease;
}
.btn-tool:hover:not(:disabled) {
background: var(--bg-hover);
color: var(--text-primary);
transform: translateY(-1px);
}
.btn-tool.active {
background: var(--tool-bg);
color: var(--tool-color);
}
.btn-tool.active:hover:not(:disabled) {
background: var(--tool-color);
color: white;
}
.btn-upload:hover:not(:disabled) {
background: var(--attachment-color);
color: white;
transform: translateY(-1px);
}
.btn-upload:active:not(:disabled) {
background: var(--attachment-color-hover);
transform: translateY(0);
}
.btn-tool:disabled,
.btn-upload:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.btn-send {
cursor: not-allowed;
}
.btn-send.active { .btn-send.active {
background: var(--accent-primary); background: var(--accent-primary);
color: white; color: white;

View File

@ -287,10 +287,11 @@ function onScroll(e) {
.conv-count { .conv-count {
font-size: 11px; font-size: 11px;
line-height: 1;
color: var(--text-tertiary); color: var(--text-tertiary);
flex-shrink: 0; flex-shrink: 0;
background: var(--bg-secondary); background: var(--bg-secondary);
padding: 1px 6px; padding: 3px 6px;
border-radius: 10px; border-radius: 10px;
margin-left: auto; margin-left: auto;
} }

View File

@ -38,7 +38,7 @@ export const icons = {
save: s(12, '<path d="M19 21H5a2 2 0 0 1-2-2V5a2 2 0 0 1 2-2h11l5 5v11a2 2 0 0 1-2 2z"/><polyline points="17 21 17 13 7 13 7 21"/><polyline points="7 3 7 8 15 8"/>'), save: s(12, '<path d="M19 21H5a2 2 0 0 1-2-2V5a2 2 0 0 1 2-2h11l5 5v11a2 2 0 0 1-2 2z"/><polyline points="17 21 17 13 7 13 7 21"/><polyline points="7 3 7 8 15 8"/>'),
// -- Tools -- // -- Tools --
wrench: s(S, '<path d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"/>'), wrench: s(18, '<path d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"/>'),
settings: s(M, '<circle cx="12" cy="12" r="3"/><path d="M19.4 15a1.65 1.65 0 0 0 .33 1.82l.06.06a2 2 0 0 1 0 2.83 2 2 0 0 1-2.83 0l-.06-.06a1.65 1.65 0 0 0-1.82-.33 1.65 1.65 0 0 0-1 1.51V21a2 2 0 0 1-2 2 2 2 0 0 1-2-2v-.09A1.65 1.65 0 0 0 9 19.4a1.65 1.65 0 0 0-1.82.33l-.06.06a2 2 0 0 1-2.83 0 2 2 0 0 1 0-2.83l.06-.06A1.65 1.65 0 0 0 4.68 15a1.65 1.65 0 0 0-1.51-1H3a2 2 0 0 1-2-2 2 2 0 0 1 2-2h.09A1.65 1.65 0 0 0 4.6 9a1.65 1.65 0 0 0-.33-1.82l-.06-.06a2 2 0 0 1 0-2.83 2 2 0 0 1 2.83 0l.06.06A1.65 1.65 0 0 0 9 4.68a1.65 1.65 0 0 0 1-1.51V3a2 2 0 0 1 2-2 2 2 0 0 1 2 2v.09a1.65 1.65 0 0 0 1 1.51 1.65 1.65 0 0 0 1.82-.33l.06-.06a2 2 0 0 1 2.83 0 2 2 0 0 1 0 2.83l-.06.06a1.65 1.65 0 0 0-.33 1.82V9a1.65 1.65 0 0 0 1.51 1H21a2 2 0 0 1 2 2 2 2 0 0 1-2 2h-.09a1.65 1.65 0 0 0-1.51 1z"/>'), settings: s(M, '<circle cx="12" cy="12" r="3"/><path d="M19.4 15a1.65 1.65 0 0 0 .33 1.82l.06.06a2 2 0 0 1 0 2.83 2 2 0 0 1-2.83 0l-.06-.06a1.65 1.65 0 0 0-1.82-.33 1.65 1.65 0 0 0-1 1.51V21a2 2 0 0 1-2 2 2 2 0 0 1-2-2v-.09A1.65 1.65 0 0 0 9 19.4a1.65 1.65 0 0 0-1.82.33l-.06.06a2 2 0 0 1-2.83 0 2 2 0 0 1 0-2.83l.06-.06A1.65 1.65 0 0 0 4.68 15a1.65 1.65 0 0 0-1.51-1H3a2 2 0 0 1-2-2 2 2 0 0 1 2-2h.09A1.65 1.65 0 0 0 4.6 9a1.65 1.65 0 0 0-.33-1.82l-.06-.06a2 2 0 0 1 0-2.83 2 2 0 0 1 2.83 0l.06.06A1.65 1.65 0 0 0 9 4.68a1.65 1.65 0 0 0 1-1.51V3a2 2 0 0 1 2-2 2 2 0 0 1 2 2v.09a1.65 1.65 0 0 0 1 1.51 1.65 1.65 0 0 0 1.82-.33l.06-.06a2 2 0 0 1 2.83 0 2 2 0 0 1 0 2.83l-.06.06a1.65 1.65 0 0 0-.33 1.82V9a1.65 1.65 0 0 0 1.51 1H21a2 2 0 0 1 2 2 2 2 0 0 1-2 2h-.09a1.65 1.65 0 0 0-1.51 1z"/>'),
// -- UI -- // -- UI --
@ -54,4 +54,5 @@ export const icons = {
// -- Status -- // -- Status --
error: s(S, '<circle cx="12" cy="12" r="10"/><line x1="15" y1="9" x2="9" y2="15"/><line x1="9" y1="9" x2="15" y2="15"/>'), error: s(S, '<circle cx="12" cy="12" r="10"/><line x1="15" y1="9" x2="9" y2="15"/><line x1="9" y1="9" x2="15" y2="15"/>'),
info: s(S, '<circle cx="12" cy="12" r="10"/><line x1="12" y1="16" x2="12" y2="12"/><line x1="12" y1="8" x2="12.01" y2="8"/>'), info: s(S, '<circle cx="12" cy="12" r="10"/><line x1="12" y1="16" x2="12" y2="12"/><line x1="12" y1="8" x2="12.01" y2="8"/>'),
stop: s(18, '<rect x="4" y="4" width="16" height="16" rx="4" fill="currentColor" stroke="none"/>'),
} }