"""LLM API client"""
import json
from typing import Any, AsyncGenerator, Dict, List, Optional

import httpx

from luxx.config import config


class LLMResponse:
    """Parsed result of a non-streaming chat completion.

    Attributes:
        content: Assistant message text ("" when the message carried none).
        tool_calls: Tool calls taken from the response message, or None.
        usage: The response's ``usage`` object, or None when absent.
    """

    content: str
    tool_calls: Optional[List[Dict]] = None
    usage: Optional[Dict] = None

    def __init__(
        self,
        content: str = "",
        tool_calls: Optional[List[Dict]] = None,
        usage: Optional[Dict] = None
    ):
        self.content = content
        self.tool_calls = tool_calls
        self.usage = usage


class LLMClient:
    """LLM API client with multi-provider support"""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        model: Optional[str] = None
    ):
        """Create a client.

        Args:
            api_key: Bearer token; falls back to ``config.llm_api_key``.
            api_url: Endpoint URL that requests are POSTed to; falls back to
                ``config.llm_api_url``.
            model: Default model name, used when a call passes a falsy
                ``model``.
        """
        self.api_key = api_key or config.llm_api_key
        self.api_url = api_url or config.llm_api_url
        self.default_model = model
        self.provider = self._detect_provider()
        self._client: Optional[httpx.AsyncClient] = None

    def _detect_provider(self) -> str:
        """Detect provider from URL.

        Returns:
            "deepseek", "glm", or "openai" (also the fallback when the URL
            matches nothing or is not configured).
        """
        # Tolerate a missing URL so constructing the module-level client does
        # not crash at import time on an unconfigured environment.
        url = (self.api_url or "").lower()
        if "deepseek" in url:
            return "deepseek"
        if "glm" in url or "zhipu" in url:
            return "glm"
        return "openai"

    async def close(self):
        """Close the shared HTTP client created by ``client()``, if any."""
        if self._client:
            await self._client.aclose()
            self._client = None

    def _build_headers(self) -> Dict[str, str]:
        """Build request headers (JSON content type plus bearer auth)."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

    def _build_body(
        self,
        model: str,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        stream: bool = False,
        **kwargs
    ) -> Dict[str, Any]:
        """Build the JSON request body.

        Args:
            model: Model name; when falsy, ``self.default_model`` is used.
            messages: Chat messages, passed through unchanged.
            tools: Tool definitions; omitted from the body when empty/None.
            stream: Whether to request a streamed response.
            **kwargs: Only ``temperature`` and ``max_tokens`` are forwarded;
                any other keyword is ignored.

        Returns:
            The request body as a dict.
        """
        body: Dict[str, Any] = {
            "model": model or self.default_model,
            "messages": messages,
            "stream": stream
        }

        for key in ("temperature", "max_tokens"):
            if key in kwargs:
                body[key] = kwargs[key]

        if tools:
            body["tools"] = tools

        return body

    def _parse_response(self, data: Dict) -> LLMResponse:
        """Parse a non-streaming response payload.

        Reads the first entry of ``choices`` (if any) and the top-level
        ``usage``. A missing or empty ``choices`` list yields empty content
        and no tool calls instead of raising.
        """
        content = ""
        tool_calls = None

        choices = data.get("choices")
        if choices:
            message = choices[0].get("message") or {}
            # The API may send "content": null (e.g. alongside tool calls);
            # normalise to "" so LLMResponse.content is always a str.
            content = message.get("content") or ""
            tool_calls = message.get("tool_calls")

        return LLMResponse(
            content=content,
            tool_calls=tool_calls,
            usage=data.get("usage")
        )

    async def client(self) -> httpx.AsyncClient:
        """Get the shared HTTP client, creating it on first use.

        NOTE: ``sync_call`` and ``stream_call`` open their own short-lived
        clients and do not use this one.
        """
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=120.0)
        return self._client

    async def sync_call(
        self,
        model: str,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> LLMResponse:
        """Call LLM API (non-streaming).

        Args:
            model: Model name (falls back to the client's default if falsy).
            messages: Chat messages.
            tools: Optional tool definitions.
            **kwargs: ``temperature`` / ``max_tokens`` are forwarded.

        Returns:
            The parsed response.

        Raises:
            httpx.HTTPStatusError: If ``raise_for_status()`` rejects the
                response.
        """
        body = self._build_body(model, messages, tools, stream=False, **kwargs)

        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                self.api_url,
                headers=self._build_headers(),
                json=body
            )
            response.raise_for_status()
            data = response.json()

        return self._parse_response(data)

    @staticmethod
    def _merge_tool_call_deltas(
        accumulated: Dict[Any, Dict[str, Any]],
        deltas: List[Dict]
    ) -> None:
        """Fold one chunk's tool-call deltas into ``accumulated`` (in place)."""
        for tc in deltas:
            idx = tc.get("index", 0)
            entry = accumulated.setdefault(idx, {"index": idx})

            # Keep id/type: the id is required to answer the call later with
            # a matching tool_call_id.
            for key in ("id", "type"):
                if tc.get(key):
                    entry[key] = tc[key]

            fn_delta = tc.get("function")
            if fn_delta is None:
                continue
            fn = entry.setdefault("function", {"name": "", "arguments": ""})
            # Either field may be absent or null in a given delta.
            fn["name"] += fn_delta.get("name") or ""
            fn["arguments"] += fn_delta.get("arguments") or ""

    async def stream_call(
        self,
        model: str,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream call LLM API.

        Errors are reported as events rather than raised.

        Yields:
            Event dicts, one of:
            ``{"type": "content_delta", "content": str}``
            ``{"type": "tool_call_delta", "tool_call": [raw delta dicts]}``
            ``{"type": "done", "tool_calls": list | None}`` (at most once)
            ``{"type": "error", "error": str}``
        """
        body = self._build_body(model, messages, tools, stream=True, **kwargs)

        # Accumulators for tool calls (need to collect from delta chunks)
        accumulated_tool_calls: Dict[Any, Dict[str, Any]] = {}
        done_sent = False

        try:
            async with httpx.AsyncClient(timeout=120.0) as client:
                async with client.stream(
                    "POST",
                    self.api_url,
                    headers=self._build_headers(),
                    json=body
                ) as response:
                    try:
                        response.raise_for_status()
                    except httpx.HTTPStatusError:
                        # A streamed body is not loaded automatically; read it
                        # while the stream is still open so the handler below
                        # can report response.text.
                        await response.aread()
                        raise

                    async for line in response.aiter_lines():
                        # SSE permits both "data: x" and "data:x".
                        if not line.startswith("data:"):
                            continue
                        data_str = line[5:].strip()
                        if not data_str:
                            continue

                        if data_str == "[DONE]":
                            # Only emit here if no finish_reason chunk already
                            # produced the terminal event.
                            if not done_sent:
                                done_sent = True
                                yield {
                                    "type": "done",
                                    "tool_calls": list(accumulated_tool_calls.values()) or None
                                }
                            break

                        try:
                            chunk = json.loads(data_str)
                        except json.JSONDecodeError:
                            continue

                        # Skip chunks without a choice, including
                        # "choices": [] (e.g. a usage-only chunk).
                        choices = chunk.get("choices") if isinstance(chunk, dict) else None
                        if not choices:
                            continue
                        choice = choices[0]
                        delta = choice.get("delta") or {}

                        # DeepSeek reasoner: use content if available, otherwise
                        # fall back to reasoning_content. Whitespace-only content
                        # is forwarded too -- dropping it would lose newlines
                        # and spaces from the streamed text.
                        content = delta.get("content")
                        reasoning = delta.get("reasoning_content")
                        if isinstance(content, str) and content:
                            yield {"type": "content_delta", "content": content}
                        elif reasoning:
                            yield {"type": "content_delta", "content": reasoning}

                        # Accumulate tool calls from delta chunks (DeepSeek sends them incrementally)
                        tool_calls_delta = delta.get("tool_calls") or []
                        if tool_calls_delta:
                            self._merge_tool_call_deltas(accumulated_tool_calls, tool_calls_delta)
                            yield {"type": "tool_call_delta", "tool_call": tool_calls_delta}

                        # finish_reason marks the end of the choice: emit the
                        # assembled tool calls once.
                        if choice.get("finish_reason") and not done_sent:
                            done_sent = True
                            yield {
                                "type": "done",
                                "tool_calls": list(accumulated_tool_calls.values()) or None
                            }
        except httpx.HTTPStatusError as e:
            # Return error as an event instead of raising
            yield {"type": "error", "error": f"HTTP {e.response.status_code}: {e.response.text}"}
        except Exception as e:
            # Generator boundary: surface any other failure as an event.
            yield {"type": "error", "error": str(e)}


# Global LLM client
llm_client = LLMClient()