fix: 修复流式聊天部分问题
This commit is contained in:
parent
22a4b8a4bb
commit
9edf4dac9c
|
|
@ -213,81 +213,74 @@ class ChatService:
|
||||||
|
|
||||||
# Get delta
|
# Get delta
|
||||||
choices = chunk.get("choices", [])
|
choices = chunk.get("choices", [])
|
||||||
if not choices:
|
delta = None
|
||||||
|
|
||||||
|
if choices:
|
||||||
|
delta = choices[0].get("delta", {})
|
||||||
|
# If no delta but has message (non-streaming response)
|
||||||
|
if not delta:
|
||||||
|
message = choices[0].get("message", {})
|
||||||
|
if message.get("content"):
|
||||||
|
delta = {"content": message["content"]}
|
||||||
|
|
||||||
|
if not delta:
|
||||||
# Check if there's any content in the response (for non-standard LLM responses)
|
# Check if there's any content in the response (for non-standard LLM responses)
|
||||||
if chunk.get("content") or chunk.get("message"):
|
content = chunk.get("content") or chunk.get("message", {}).get("content", "")
|
||||||
content = chunk.get("content") or chunk.get("message", {}).get("content", "")
|
if content:
|
||||||
if content:
|
delta = {"content": content}
|
||||||
# BUG FIX: Update full_content so it gets saved to database
|
|
||||||
prev_content_len = len(full_content)
|
|
||||||
full_content += content
|
|
||||||
if prev_content_len == 0: # New text stream started
|
|
||||||
text_step_idx = step_index
|
|
||||||
text_step_id = f"step-{step_index}"
|
|
||||||
step_index += 1
|
|
||||||
yield _sse_event("process_step", {
|
|
||||||
"step": {
|
|
||||||
"id": text_step_id if prev_content_len == 0 else f"step-{step_index - 1}",
|
|
||||||
"index": text_step_idx if prev_content_len == 0 else step_index - 1,
|
|
||||||
"type": "text",
|
|
||||||
"content": full_content # Always send accumulated content
|
|
||||||
}
|
|
||||||
})
|
|
||||||
continue
|
|
||||||
|
|
||||||
delta = choices[0].get("delta", {})
|
if delta:
|
||||||
|
# Handle reasoning (thinking)
|
||||||
# Handle reasoning (thinking)
|
reasoning = delta.get("reasoning_content", "")
|
||||||
reasoning = delta.get("reasoning_content", "")
|
if reasoning:
|
||||||
if reasoning:
|
prev_thinking_len = len(full_thinking)
|
||||||
prev_thinking_len = len(full_thinking)
|
full_thinking += reasoning
|
||||||
full_thinking += reasoning
|
if prev_thinking_len == 0: # New thinking stream started
|
||||||
if prev_thinking_len == 0: # New thinking stream started
|
thinking_step_idx = step_index
|
||||||
thinking_step_idx = step_index
|
thinking_step_id = f"step-{step_index}"
|
||||||
thinking_step_id = f"step-{step_index}"
|
step_index += 1
|
||||||
step_index += 1
|
yield _sse_event("process_step", {
|
||||||
yield _sse_event("process_step", {
|
"step": {
|
||||||
"step": {
|
"id": thinking_step_id,
|
||||||
"id": thinking_step_id,
|
"index": thinking_step_idx,
|
||||||
"index": thinking_step_idx,
|
"type": "thinking",
|
||||||
"type": "thinking",
|
"content": full_thinking
|
||||||
"content": full_thinking
|
}
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
# Handle content
|
|
||||||
content = delta.get("content", "")
|
|
||||||
if content:
|
|
||||||
prev_content_len = len(full_content)
|
|
||||||
full_content += content
|
|
||||||
if prev_content_len == 0: # New text stream started
|
|
||||||
text_step_idx = step_index
|
|
||||||
text_step_id = f"step-{step_index}"
|
|
||||||
step_index += 1
|
|
||||||
yield _sse_event("process_step", {
|
|
||||||
"step": {
|
|
||||||
"id": text_step_id,
|
|
||||||
"index": text_step_idx,
|
|
||||||
"type": "text",
|
|
||||||
"content": full_content
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
# Accumulate tool calls
|
|
||||||
tool_calls_delta = delta.get("tool_calls", [])
|
|
||||||
for tc in tool_calls_delta:
|
|
||||||
idx = tc.get("index", 0)
|
|
||||||
if idx >= len(tool_calls_list):
|
|
||||||
tool_calls_list.append({
|
|
||||||
"id": tc.get("id", ""),
|
|
||||||
"type": "function",
|
|
||||||
"function": {"name": "", "arguments": ""}
|
|
||||||
})
|
})
|
||||||
func = tc.get("function", {})
|
|
||||||
if func.get("name"):
|
# Handle content
|
||||||
tool_calls_list[idx]["function"]["name"] += func["name"]
|
content = delta.get("content", "")
|
||||||
if func.get("arguments"):
|
if content:
|
||||||
tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
|
prev_content_len = len(full_content)
|
||||||
|
full_content += content
|
||||||
|
if prev_content_len == 0: # New text stream started
|
||||||
|
text_step_idx = step_index
|
||||||
|
text_step_id = f"step-{step_index}"
|
||||||
|
step_index += 1
|
||||||
|
yield _sse_event("process_step", {
|
||||||
|
"step": {
|
||||||
|
"id": text_step_id,
|
||||||
|
"index": text_step_idx,
|
||||||
|
"type": "text",
|
||||||
|
"content": full_content
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
# Accumulate tool calls
|
||||||
|
tool_calls_delta = delta.get("tool_calls", [])
|
||||||
|
for tc in tool_calls_delta:
|
||||||
|
idx = tc.get("index", 0)
|
||||||
|
if idx >= len(tool_calls_list):
|
||||||
|
tool_calls_list.append({
|
||||||
|
"id": tc.get("id", ""),
|
||||||
|
"type": "function",
|
||||||
|
"function": {"name": "", "arguments": ""}
|
||||||
|
})
|
||||||
|
func = tc.get("function", {})
|
||||||
|
if func.get("name"):
|
||||||
|
tool_calls_list[idx]["function"]["name"] += func["name"]
|
||||||
|
if func.get("arguments"):
|
||||||
|
tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
|
||||||
|
|
||||||
# Save thinking step
|
# Save thinking step
|
||||||
if thinking_step_id is not None:
|
if thinking_step_id is not None:
|
||||||
|
|
|
||||||
|
|
@ -143,30 +143,23 @@ class LLMClient:
|
||||||
tools: Optional[List[Dict]] = None,
|
tools: Optional[List[Dict]] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> AsyncGenerator[str, None]:
|
) -> AsyncGenerator[str, None]:
|
||||||
"""Stream call LLM API - yields raw SSE event lines
|
"""Stream call LLM API - yields raw SSE event lines"""
|
||||||
|
|
||||||
Yields:
|
|
||||||
str: Raw SSE event lines for direct forwarding
|
|
||||||
"""
|
|
||||||
body = self._build_body(model, messages, tools, stream=True, **kwargs)
|
body = self._build_body(model, messages, tools, stream=True, **kwargs)
|
||||||
|
|
||||||
logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||||
logger.info(f"Sending request to {self.api_url}")
|
|
||||||
async with client.stream(
|
async with client.stream(
|
||||||
"POST",
|
"POST",
|
||||||
self.api_url,
|
self.api_url,
|
||||||
headers=self._build_headers(),
|
headers=self._build_headers(),
|
||||||
json=body
|
json=body
|
||||||
) as response:
|
) as response:
|
||||||
logger.info(f"Response status: {response.status_code}")
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
async for line in response.aiter_lines():
|
async for line in response.aiter_lines():
|
||||||
if line.strip():
|
if line.strip():
|
||||||
yield line + "\n"
|
yield line + "\n"
|
||||||
|
|
||||||
|
logger.info(f"response finish with : {response.status_code}")
|
||||||
except httpx.HTTPStatusError as e:
|
except httpx.HTTPStatusError as e:
|
||||||
status_code = e.response.status_code if e.response else "?"
|
status_code = e.response.status_code if e.response else "?"
|
||||||
logger.error(f"HTTP error: {status_code}")
|
logger.error(f"HTTP error: {status_code}")
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue