diff --git a/luxx/services/chat.py b/luxx/services/chat.py index 718bc05..f3ddf97 100644 --- a/luxx/services/chat.py +++ b/luxx/services/chat.py @@ -213,81 +213,74 @@ class ChatService: # Get delta choices = chunk.get("choices", []) - if not choices: + delta = None + + if choices: + delta = choices[0].get("delta", {}) + # If no delta but has message (non-streaming response) + if not delta: + message = choices[0].get("message", {}) + if message.get("content"): + delta = {"content": message["content"]} + + if not delta: # Check if there's any content in the response (for non-standard LLM responses) - if chunk.get("content") or chunk.get("message"): - content = chunk.get("content") or chunk.get("message", {}).get("content", "") - if content: - # BUG FIX: Update full_content so it gets saved to database - prev_content_len = len(full_content) - full_content += content - if prev_content_len == 0: # New text stream started - text_step_idx = step_index - text_step_id = f"step-{step_index}" - step_index += 1 - yield _sse_event("process_step", { - "step": { - "id": text_step_id if prev_content_len == 0 else f"step-{step_index - 1}", - "index": text_step_idx if prev_content_len == 0 else step_index - 1, - "type": "text", - "content": full_content # Always send accumulated content - } - }) - continue + content = chunk.get("content") or chunk.get("message", {}).get("content", "") + if content: + delta = {"content": content} - delta = choices[0].get("delta", {}) - - # Handle reasoning (thinking) - reasoning = delta.get("reasoning_content", "") - if reasoning: - prev_thinking_len = len(full_thinking) - full_thinking += reasoning - if prev_thinking_len == 0: # New thinking stream started - thinking_step_idx = step_index - thinking_step_id = f"step-{step_index}" - step_index += 1 - yield _sse_event("process_step", { - "step": { - "id": thinking_step_id, - "index": thinking_step_idx, - "type": "thinking", - "content": full_thinking - } - }) - - # Handle content - content = 
delta.get("content", "") - if content: - prev_content_len = len(full_content) - full_content += content - if prev_content_len == 0: # New text stream started - text_step_idx = step_index - text_step_id = f"step-{step_index}" - step_index += 1 - yield _sse_event("process_step", { - "step": { - "id": text_step_id, - "index": text_step_idx, - "type": "text", - "content": full_content - } - }) - - # Accumulate tool calls - tool_calls_delta = delta.get("tool_calls", []) - for tc in tool_calls_delta: - idx = tc.get("index", 0) - if idx >= len(tool_calls_list): - tool_calls_list.append({ - "id": tc.get("id", ""), - "type": "function", - "function": {"name": "", "arguments": ""} + if delta: + # Handle reasoning (thinking) + reasoning = delta.get("reasoning_content", "") + if reasoning: + prev_thinking_len = len(full_thinking) + full_thinking += reasoning + if prev_thinking_len == 0: # New thinking stream started + thinking_step_idx = step_index + thinking_step_id = f"step-{step_index}" + step_index += 1 + yield _sse_event("process_step", { + "step": { + "id": thinking_step_id, + "index": thinking_step_idx, + "type": "thinking", + "content": full_thinking + } }) - func = tc.get("function", {}) - if func.get("name"): - tool_calls_list[idx]["function"]["name"] += func["name"] - if func.get("arguments"): - tool_calls_list[idx]["function"]["arguments"] += func["arguments"] + + # Handle content + content = delta.get("content", "") + if content: + prev_content_len = len(full_content) + full_content += content + if prev_content_len == 0: # New text stream started + text_step_idx = step_index + text_step_id = f"step-{step_index}" + step_index += 1 + yield _sse_event("process_step", { + "step": { + "id": text_step_id, + "index": text_step_idx, + "type": "text", + "content": full_content + } + }) + + # Accumulate tool calls + tool_calls_delta = delta.get("tool_calls", []) + for tc in tool_calls_delta: + idx = tc.get("index", 0) + if idx >= len(tool_calls_list): + 
tool_calls_list.append({
+                        "id": tc.get("id", ""),
+                        "type": "function",
+                        "function": {"name": "", "arguments": ""}
+                    })
+                func = tc.get("function", {})
+                if func.get("name"):
+                    tool_calls_list[idx]["function"]["name"] += func["name"]
+                if func.get("arguments"):
+                    tool_calls_list[idx]["function"]["arguments"] += func["arguments"]
 
             # Save thinking step
             if thinking_step_id is not None:
diff --git a/luxx/services/llm_client.py b/luxx/services/llm_client.py
index acec3eb..22b8142 100644
--- a/luxx/services/llm_client.py
+++ b/luxx/services/llm_client.py
@@ -143,30 +143,23 @@ class LLMClient:
         tools: Optional[List[Dict]] = None,
         **kwargs
     ) -> AsyncGenerator[str, None]:
-        """Stream call LLM API - yields raw SSE event lines
-
-        Yields:
-            str: Raw SSE event lines for direct forwarding
-        """
+        """Stream call LLM API - yields raw SSE event lines"""
         body = self._build_body(model, messages, tools, stream=True, **kwargs)
 
-        logger.info(f"Starting stream_call for model: {model}, messages count: {len(messages)}")
-
         try:
             async with httpx.AsyncClient(timeout=120.0) as client:
-                logger.info(f"Sending request to {self.api_url}")
                 async with client.stream(
                     "POST",
                     self.api_url,
                     headers=self._build_headers(),
                     json=body
                 ) as response:
-                    logger.info(f"Response status: {response.status_code}")
                     response.raise_for_status()
-
                     async for line in response.aiter_lines():
                         if line.strip():
                             yield line + "\n"
+
+                    logger.info(f"Stream response finished with status: {response.status_code}")
         except httpx.HTTPStatusError as e:
             status_code = e.response.status_code if e.response else "?"
             logger.error(f"HTTP error: {status_code}")