Luxx/luxx/services/llm_client.py

189 lines
5.9 KiB
Python

"""LLM API client"""
import json
import httpx
from typing import Dict, Any, Optional, List, AsyncGenerator
from luxx.config import config
class LLMResponse:
    """Parsed result of a non-streaming LLM call.

    Attributes:
        content: Text of the reply; empty string when none was supplied.
        tool_calls: Tool-call entries taken from the reply, or ``None``.
        usage: The ``usage`` mapping taken from the reply, or ``None``.
    """

    content: str
    tool_calls: Optional[List[Dict]] = None
    usage: Optional[Dict] = None

    def __init__(
        self,
        content: str = "",
        tool_calls: Optional[List[Dict]] = None,
        usage: Optional[Dict] = None
    ):
        self.content = content
        self.tool_calls = tool_calls
        self.usage = usage

    def __repr__(self) -> str:
        # Spell out every field so logged/debugged responses are readable.
        return (
            f"{type(self).__name__}(content={self.content!r}, "
            f"tool_calls={self.tool_calls!r}, usage={self.usage!r})"
        )
class LLMClient:
    """LLM API client with multi-provider support.

    Posts JSON chat requests to ``api_url`` with a Bearer token and reads
    ``choices[0]`` from the reply.

    Note: ``sync_call`` and ``stream_call`` each open a short-lived
    ``httpx.AsyncClient`` per request; ``client()`` / ``close()`` manage a
    separate, lazily created instance.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        model: Optional[str] = None,
        timeout: float = 120.0
    ):
        """Create a client.

        Args:
            api_key: Bearer token; falls back to ``config.llm_api_key``.
            api_url: Endpoint URL; falls back to ``config.llm_api_url``.
            model: Default model, used when a request is built without one.
            timeout: Timeout in seconds handed to every ``httpx.AsyncClient``.
        """
        self.api_key = api_key or config.llm_api_key
        self.api_url = api_url or config.llm_api_url
        self.default_model = model
        self.timeout = timeout
        self.provider = self._detect_provider()
        self._client: Optional["httpx.AsyncClient"] = None

    def _detect_provider(self) -> str:
        """Guess the provider name from substrings of ``api_url``.

        Returns:
            ``"deepseek"``, ``"glm"`` or ``"openai"``. Anything unrecognised,
            including a missing URL, is reported as ``"openai"``.
        """
        # api_url may be None when neither the argument nor config provides it.
        url = (self.api_url or "").lower()
        if "deepseek" in url:
            return "deepseek"
        if "glm" in url or "zhipu" in url:
            return "glm"
        return "openai"

    async def close(self):
        """Close the client created by ``client()``, if there is one."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    def _build_headers(self) -> Dict[str, str]:
        """Build the JSON content-type and Bearer authorization headers."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

    def _build_body(
        self,
        model: str,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        stream: bool = False,
        **kwargs
    ) -> Dict[str, Any]:
        """Build the request body.

        Args:
            model: Model name; a falsy value falls back to ``default_model``.
            messages: Chat messages, passed through unchanged.
            tools: Tool definitions; omitted from the body when empty.
            stream: Value of the ``stream`` flag.
            **kwargs: Only ``temperature`` and ``max_tokens`` are forwarded,
                and only when not ``None``; other keys are ignored.

        Returns:
            The dict to send as the JSON payload.
        """
        body: Dict[str, Any] = {
            "model": model or self.default_model,
            "messages": messages,
            "stream": stream
        }
        for key in ("temperature", "max_tokens"):
            # An explicit None means "use the server default": leave it out
            # rather than sending a JSON null.
            if kwargs.get(key) is not None:
                body[key] = kwargs[key]
        if tools:
            body["tools"] = tools
        return body

    def _parse_response(self, data: Dict) -> "LLMResponse":
        """Convert a decoded non-streaming reply into an ``LLMResponse``.

        Only the first entry of ``choices`` is read. A missing or empty
        ``choices`` list, or a null ``content``, yields an empty string.
        """
        content = ""
        tool_calls = None
        choices = data.get("choices") or []
        if choices:
            message = choices[0].get("message") or {}
            # content can be null (e.g. alongside tool calls); keep it a str.
            content = message.get("content") or ""
            tool_calls = message.get("tool_calls")
        return LLMResponse(
            content=content,
            tool_calls=tool_calls,
            usage=data.get("usage")
        )

    async def client(self) -> "httpx.AsyncClient":
        """Return the lazily created ``httpx.AsyncClient`` held by this object."""
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=self.timeout)
        return self._client

    async def sync_call(
        self,
        model: str,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> "LLMResponse":
        """Call the LLM API without streaming.

        Despite the name this is a coroutine and must be awaited.

        Args:
            model: Model name for the request.
            messages: Chat messages.
            tools: Optional tool definitions.
            **kwargs: Forwarded to ``_build_body``.

        Returns:
            The parsed ``LLMResponse``.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` on an
                error response.
        """
        body = self._build_body(model, messages, tools, stream=False, **kwargs)
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                self.api_url,
                headers=self._build_headers(),
                json=body
            )
            response.raise_for_status()
            data = response.json()
        return self._parse_response(data)

    @staticmethod
    def _parse_sse_line(line: str) -> List[Dict[str, Any]]:
        """Translate one line of the event stream into stream events.

        Returns an empty list for blank lines, lines that are not ``data:``
        fields, undecodable JSON, and chunks without any choice. Otherwise
        returns, in order, any of: a ``content_delta`` event, a
        ``tool_call_delta`` event, and a ``done`` event.
        """
        line = line.strip()
        # Accept both "data: {...}" and "data:{...}".
        if not line.startswith("data:"):
            return []
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return [{"type": "done"}]
        try:
            chunk = json.loads(payload)
        except json.JSONDecodeError:
            return []
        choices = chunk.get("choices") if isinstance(chunk, dict) else None
        if not choices:
            # Covers a missing key as well as an empty "choices" list.
            return []
        choice = choices[0]
        delta = choice.get("delta") or {}
        events: List[Dict[str, Any]] = []
        content_delta = delta.get("content")
        if content_delta:
            events.append({"type": "content_delta", "content": content_delta})
        tool_calls = delta.get("tool_calls")
        if tool_calls:
            events.append({"type": "tool_call_delta", "tool_call": tool_calls})
        if choice.get("finish_reason"):
            message = choice.get("message") or {}
            events.append({"type": "done", "tool_calls": message.get("tool_calls")})
        return events

    async def stream_call(
        self,
        model: str,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Call the LLM API with streaming and yield events as they arrive.

        Yields dicts whose ``type`` is ``"content_delta"`` (with ``content``),
        ``"tool_call_delta"`` (with ``tool_call``) or ``"done"``. A ``done``
        event is produced both for a chunk carrying a finish reason and for
        the ``[DONE]`` marker, so consumers may see more than one.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` on an
                error response.
        """
        body = self._build_body(model, messages, tools, stream=True, **kwargs)
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            async with client.stream(
                "POST",
                self.api_url,
                headers=self._build_headers(),
                json=body
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    for event in self._parse_sse_line(line):
                        yield event
# Global LLM client: module-level instance created at import time with no
# arguments, so its API key and URL come from config.llm_api_key and
# config.llm_api_url, and it has no default model.
llm_client = LLMClient()