fix: 修复工具调用接口的bug

This commit is contained in:
ViperEkura 2026-04-13 20:46:26 +08:00
parent c88b5686f3
commit 85619c0d97
15 changed files with 539 additions and 270 deletions

View File

@ -216,6 +216,82 @@ classDiagram
| `get_weather` | 天气查询 | 支持城市名查询 |
| `process_data` | 数据处理 | JSON 转换、格式化等 |
#### 工具开发规范
所有工具必须遵循统一的开发规范,确保错误处理和返回格式一致。
**核心原则:装饰器自动处理一切,工具函数只写业务逻辑**
```python
from luxx.tools.factory import tool
@tool(
name="my_tool",
description="工具描述",
parameters={...},
required_params=["arg1"], # 自动验证
category="my_category"
)
def my_tool(arguments: dict):
# 业务逻辑 - 只管返回数据
data = fetch_data(arguments["arg1"])
return {"items": data, "count": len(data)}
# 或者直接抛出异常(装饰器自动捕获并转换)
if invalid:
raise ValueError("Invalid input")
```
**装饰器自动处理:**
1. 必需参数验证(`required_params`
2. 所有异常捕获和转换
3. 结果格式统一包装
**返回格式转换**
| 工具函数返回/抛出 | 装饰器转换为 |
|-------------------|-------------|
| `return {"result": "ok"}` | `{"success": true, "data": {...}, "error": null}` |
| `raise ValueError("msg")` | `{"success": false, "data": null, "error": "ValueError: msg"}` |
| `raise Exception()` | `{"success": false, "data": null, "error": "..."}` |
**工具调用流程**
```
LLM 请求
ToolRegistry.execute(name, args)
@tool 装饰器
├─ 验证 required_params
├─ 执行工具函数 (try-except 包裹)
├─ 捕获异常 → 转换为 error
├─ 包装返回格式
└─ 返回 ToolResult
前端 ProcessBlock 显示
```
| 工具函数返回 | 装饰器转换为 |
|-------------|-------------|
| `{"result": "ok"}` | `{"success": true, "data": {...}, "error": null}` |
| `{"error": "msg"}` | `{"success": false, "data": null, "error": "msg"}` |
| `raise Exception()` | `{"success": false, "data": null, "error": "..."}` |
**工具调用流程**
```
LLM 请求 → ToolRegistry.execute() → @tool 装饰器包装
工具函数执行
返回 ToolResult 或 dict
自动转换为标准格式 {"success": bool, "data": any, "error": str}
前端 ProcessBlock 显示结果
```
### 6. 服务层
#### ChatService (`services/chat.py`)

View File

@ -1,7 +1,7 @@
# 配置文件
app:
secret_key: ${APP_SECRET_KEY}
debug: false
debug: true
host: 0.0.0.0
port: 8000

View File

@ -113,13 +113,20 @@ const allItems = computed(() => {
if (match) {
let resultContent = step.content || ''
let displayContent = resultContent
let hasError = false
// JSON
try {
const parsed = JSON.parse(resultContent)
// ToolResult
if (parsed.error) {
displayContent = `错误: ${parsed.error}`
hasError = true
} else if (parsed.success !== undefined && parsed.data !== undefined) {
// ToolResult : data
displayContent = JSON.stringify(parsed.data, null, 2)
} else {
// JSON
displayContent = JSON.stringify(parsed, null, 2)
}
} catch (e) {
@ -129,7 +136,7 @@ const allItems = computed(() => {
match.resultSummary = displayContent.slice(0, 200)
match.fullResult = displayContent
match.displayResult = displayContent.length > 2048 ? displayContent.slice(0, 2048) + '...' : displayContent
match.isSuccess = step.success !== false
match.isSuccess = !hasError && step.success !== false
match.loading = false
} else {
// tool_call

View File

@ -71,9 +71,41 @@ export function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
while (true) {
const { done, value } = await reader.read()
if (done) break
// 处理数据
if (value) {
buffer += decoder.decode(value, { stream: true })
}
// 流结束时,先处理 buffer 中的剩余数据,再 break
if (done) {
// 处理 buffer 中剩余的数据
const lines = buffer.split('\n')
buffer = ''
let currentEvent = ''
for (const line of lines) {
if (line.startsWith('event: ')) {
currentEvent = line.slice(7).trim()
} else if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6))
if (currentEvent === 'process_step' && onProcessStep) {
onProcessStep(data.step)
} else if (currentEvent === 'done' && onDone) {
completed = true
onDone(data)
} else if (currentEvent === 'error' && onError) {
onError(data.content)
}
} catch (e) {
// 忽略解析错误
}
}
}
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
@ -82,19 +114,24 @@ export function createSSEStream(url, body, { onProcessStep, onDone, onError }) {
if (line.startsWith('event: ')) {
currentEvent = line.slice(7).trim()
} else if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6))
if (currentEvent === 'process_step' && onProcessStep) {
onProcessStep(data.step)
} else if (currentEvent === 'done' && onDone) {
completed = true
onDone(data)
} else if (currentEvent === 'error' && onError) {
onError(data.content)
try {
const data = JSON.parse(line.slice(6))
if (currentEvent === 'process_step' && onProcessStep) {
onProcessStep(data.step)
} else if (currentEvent === 'done' && onDone) {
completed = true
onDone(data)
} else if (currentEvent === 'error' && onError) {
onError(data.content)
}
} catch (e) {
// 忽略解析错误
}
}
}
}
// 流结束但没有收到 done 事件,才报错
if (!completed && onError) {
onError('stream ended unexpectedly')
}
@ -139,7 +176,8 @@ export const messagesAPI = {
return createSSEStream('/messages/stream', {
conversation_id: data.conversation_id,
content: data.content,
thinking_enabled: data.thinking_enabled || false
thinking_enabled: data.thinking_enabled || false,
enabled_tools: data.enabled_tools || []
}, callbacks)
},

View File

@ -165,18 +165,8 @@ const loadEnabledTools = async () => {
try {
const res = await toolsAPI.list()
if (res.success) {
const data = res.data?.categorized || {}
const enabled = []
Object.values(data).forEach(arr => {
if (Array.isArray(arr)) {
arr.forEach(t => {
if (t.enabled !== false) {
enabled.push(t.function?.name || t.name)
}
})
}
})
enabledTools.value = enabled
const tools = res.data?.tools || []
enabledTools.value = tools.map(t => t.function?.name || t.name)
}
} catch (e) {
console.error('Failed to load tools:', e)
@ -317,6 +307,15 @@ const formatTime = (time) => {
return new Date(time).toLocaleTimeString('zh-CN', { hour: '2-digit', minute: '2-digit' })
}
// ID
watch(() => route.params.id, (newId) => {
if (newId) {
conversationId.value = newId
loadMessages()
loadEnabledTools()
}
})
onMounted(() => {
loadMessages()
loadEnabledTools()

View File

@ -139,7 +139,7 @@
<script setup>
import { ref, computed, onMounted, watch, nextTick } from 'vue'
import { useRouter } from 'vue-router'
import { conversationsAPI, providersAPI, messagesAPI } from '../utils/api.js'
import { conversationsAPI, providersAPI, messagesAPI, toolsAPI } from '../utils/api.js'
import { renderMarkdown } from '../utils/markdown.js'
import ProcessBlock from '../components/ProcessBlock.vue'
@ -158,9 +158,23 @@ const selectedId = ref(null)
const selectedConv = ref(null)
const convMessages = ref([])
const loadingMessages = ref(false)
const enabledTools = ref([]) //
const totalPages = computed(() => Math.ceil(total.value / pageSize))
//
const loadEnabledTools = async () => {
try {
const res = await toolsAPI.list()
if (res.success) {
const tools = res.data?.tools || []
enabledTools.value = tools.map(t => t.function?.name || t.name)
}
} catch (e) {
console.error('Failed to load tools:', e)
}
}
const onProviderChange = () => {
const p = providers.value.find(p => p.id === form.value.provider_id)
if (p) form.value.model = p.default_model || ''
@ -267,7 +281,8 @@ const sendMessage = async () => {
await new Promise((resolve, reject) => {
messagesAPI.sendStream({
conversation_id: selectedConv.value.id,
content: content
content: content,
enabled_tools: enabledTools.value //
}, {
onProcessStep: (step) => {
if (!streamingMessage.value) return
@ -356,7 +371,10 @@ const formatDate = (dateStr) => {
return new Date(dateStr).toLocaleDateString('zh-CN', { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' })
}
onMounted(fetchData)
onMounted(() => {
fetchData()
loadEnabledTools()
})
</script>
<style scoped>

View File

@ -35,6 +35,7 @@ async def lifespan(app: FastAPI):
finally:
db.close()
# Import and register tools
from luxx.tools.builtin import crawler, code, data
yield

View File

@ -22,6 +22,7 @@ class MessageCreate(BaseModel):
conversation_id: str
content: str
thinking_enabled: bool = False
enabled_tools: List[str] = [] # 启用的工具名称列表
class MessageResponse(BaseModel):
@ -142,7 +143,8 @@ async def stream_message(
async for sse_str in chat_service.stream_response(
conversation=conversation,
user_message=data.content,
thinking_enabled=data.thinking_enabled
thinking_enabled=data.thinking_enabled,
enabled_tools=data.enabled_tools
):
# Chat service returns raw SSE strings (including done event)
yield sse_str

View File

@ -19,17 +19,24 @@ def list_tools(
current_user: User = Depends(get_current_user)
):
"""Get available tools list"""
# Get tool definitions directly from registry to access category
from luxx.tools.core import ToolDefinition
if category:
tools = registry.list_by_category(category)
all_tools = [t for t in registry._tools.values() if t.category == category]
tools = [t.to_openai_format() for t in all_tools]
categorized_tools = [t for t in registry._tools.values() if t.category == category]
else:
all_tools = list(registry._tools.values())
tools = registry.list_all()
categorized_tools = all_tools
categorized = {}
for tool in tools:
cat = tool.get("category", "other")
for tool in categorized_tools:
cat = tool.category
if cat not in categorized:
categorized[cat] = []
categorized[cat].append(tool)
categorized[cat].append(tool.to_openai_format())
return success_response(data={
"tools": tools,

View File

@ -97,7 +97,8 @@ class ChatService:
self,
conversation: Conversation,
user_message: str,
thinking_enabled: bool = False
thinking_enabled: bool = False,
enabled_tools: list = None
) -> AsyncGenerator[Dict[str, str], None]:
"""
Streaming response generator
@ -112,8 +113,11 @@ class ChatService:
"content": json.dumps({"text": user_message, "attachments": []})
})
# Get all available tools
tools = registry.list_all()
# Get tools based on enabled_tools filter
if enabled_tools:
tools = [t for t in registry.list_all() if t.get("function", {}).get("name") in enabled_tools]
else:
tools = []
llm, provider_max_tokens = get_llm_client(conversation)
model = conversation.model or llm.default_model or "gpt-4"
@ -133,8 +137,6 @@ class ChatService:
text_step_idx = None
for iteration in range(MAX_ITERATIONS):
print(f"[CHAT] Starting iteration {iteration + 1}, messages: {len(messages)}")
# Stream from LLM
full_content = ""
full_thinking = ""
@ -193,19 +195,25 @@ class ChatService:
# Get delta
choices = chunk.get("choices", [])
if not choices:
# Check if there's any content in the response
# Check if there's any content in the response (for non-standard LLM responses)
if chunk.get("content") or chunk.get("message"):
content = chunk.get("content") or chunk.get("message", {}).get("content", "")
if content:
# BUG FIX: Update full_content so it gets saved to database
prev_content_len = len(full_content)
full_content += content
if prev_content_len == 0: # New text stream started
text_step_idx = step_index
text_step_id = f"step-{step_index}"
step_index += 1
yield _sse_event("process_step", {
"step": {
"id": f"step-{step_index}",
"index": step_index,
"id": text_step_id if prev_content_len == 0 else f"step-{step_index - 1}",
"index": text_step_idx if prev_content_len == 0 else step_index - 1,
"type": "text",
"content": content
"content": full_content # Always send accumulated content
}
})
step_index += 1
continue
delta = choices[0].get("delta", {})
@ -313,13 +321,25 @@ class ChatService:
result_step_idx = step_index
result_step_id = f"step-{step_index}"
step_index += 1
# 解析 content 中的 success 状态
content = tr.get("content", "")
success = True
try:
content_obj = json.loads(content)
if isinstance(content_obj, dict):
success = content_obj.get("success", True)
except:
pass
result_step = {
"id": result_step_id,
"index": result_step_idx,
"type": "tool_result",
"id_ref": tool_call_step_id, # Reference to the tool_call step
"name": tr.get("name", ""),
"content": tr.get("content", "")
"content": content,
"success": success
}
all_steps.append(result_step)
yield _sse_event("process_step", {"step": result_step})
@ -357,11 +377,20 @@ class ChatService:
})
return
# Max iterations exceeded
# Max iterations exceeded - save message before error
if full_content or all_tool_calls:
msg_id = str(uuid.uuid4())
self._save_message(
conversation.id,
msg_id,
full_content,
all_tool_calls,
all_tool_results,
all_steps
)
yield _sse_event("error", {"content": "Exceeded maximum tool call iterations"})
except Exception as e:
print(f"[CHAT] Exception: {type(e).__name__}: {str(e)}")
yield _sse_event("error", {"content": str(e)})
def _save_message(
@ -396,8 +425,8 @@ class ChatService:
db.add(msg)
db.commit()
except Exception as e:
print(f"[CHAT] Failed to save message: {e}")
db.rollback()
raise
finally:
db.close()

View File

@ -1,6 +1,6 @@
"""Code execution tools"""
import json
import traceback
"""Code execution tools - exception handling in decorator"""
import io
from contextlib import redirect_stdout
from typing import Dict, Any
from luxx.tools.factory import tool
@ -24,53 +24,45 @@ from luxx.tools.factory import tool
},
"required": ["code"]
},
required_params=["code"],
category="code"
)
def python_execute(arguments: Dict[str, Any]) -> Dict[str, Any]:
def python_execute(arguments: Dict[str, Any]):
"""
Execute Python code
Note: This is a simplified executor, production environments should use safer isolated environments
such as: Docker containers, Pyodide, etc.
Returns:
{"output": str, "variables": dict}
"""
code = arguments.get("code", "")
timeout = arguments.get("timeout", 30)
if not code:
return {"error": "Code is required"}
# Create execution environment
namespace = {
"__builtins__": __builtins__
}
try:
# Compile and execute code
compiled = compile(code, "<string>", "exec")
# Capture output
import io
from contextlib import redirect_stdout
output = io.StringIO()
with redirect_stdout(output):
exec(compiled, namespace)
result = output.getvalue()
# Try to extract variables
result_vars = {k: v for k, v in namespace.items()
if not k.startswith("_") and k != "__builtins__"}
return {
"output": result,
"variables": {k: repr(v) for k, v in result_vars.items()}
}
except SyntaxError as e:
return {"error": f"Syntax error: {e}"}
except Exception as e:
return {"error": f"Runtime error: {type(e).__name__}: {str(e)}"}
# Compile and execute code
compiled = compile(code, "<string>", "exec")
# Capture output
output = io.StringIO()
with redirect_stdout(output):
exec(compiled, namespace)
result = output.getvalue()
# Extract variables
result_vars = {k: v for k, v in namespace.items()
if not k.startswith("_") and k != "__builtins__"}
return {
"output": result,
"variables": {k: repr(v) for k, v in result_vars.items()}
}
@tool(
@ -86,20 +78,20 @@ def python_execute(arguments: Dict[str, Any]) -> Dict[str, Any]:
},
"required": ["expression"]
},
required_params=["expression"],
category="code"
)
def python_eval(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate Python expression"""
def python_eval(arguments: Dict[str, Any]):
"""
Evaluate Python expression
Returns:
{"result": str, "type": str}
"""
expression = arguments.get("expression", "")
if not expression:
return {"error": "Expression is required"}
try:
result = eval(expression)
return {
"result": repr(result),
"type": type(result).__name__
}
except Exception as e:
return {"error": f"Evaluation error: {str(e)}"}
result = eval(expression)
return {
"result": repr(result),
"type": type(result).__name__
}

View File

@ -1,7 +1,10 @@
"""Crawler tools"""
"""Crawler tools - all exception handling in decorator"""
from luxx.tools.factory import tool
from luxx.tools.services import SearchService, FetchService
# 服务实例SearchService.search() 是静态方法风格,不需要实例化)
_fetch_service = FetchService()
@tool(name="web_search", description="Search the internet. Use when you need to find latest news or answer questions.", parameters={
"type": "object",
@ -10,42 +13,63 @@ from luxx.tools.services import SearchService, FetchService
"max_results": {"type": "integer", "description": "Number of results, default 5", "default": 5}
},
"required": ["query"]
}, category="crawler")
def web_search(arguments: dict) -> dict:
results = SearchService().search(arguments["query"], arguments.get("max_results", 5))
return {"results": results or []}
}, required_params=["query"], category="crawler")
def web_search(arguments: dict):
"""
Search the web using DuckDuckGo
Returns:
{"query": str, "count": int, "results": list}
"""
query = arguments["query"]
max_results = arguments.get("max_results", 5)
results = SearchService().search(query, max_results)
return {
"query": query,
"count": len(results),
"results": results
}
@tool(name="web_fetch", description="Fetch content from a webpage.", parameters={
"type": "object",
"properties": {
"url": {"type": "string", "description": "URL to fetch"},
"extract_type": {"type": "string", "enum": ["text", "links", "structured"], "default": "text"}
"url": {"type": "string", "description": "URL to fetch"}
},
"required": ["url"]
}, category="crawler")
def web_fetch(arguments: dict) -> dict:
if not arguments.get("url"):
return {"error": "URL is required"}
result = FetchService().fetch(arguments["url"], arguments.get("extract_type", "text"))
if "error" in result:
return {"error": result["error"]}
return result
}, required_params=["url"], category="crawler")
def web_fetch(arguments: dict):
"""
Fetch webpage content
Returns:
{"url": str, "title": str, "text": str}
"""
url = arguments["url"]
return _fetch_service.fetch(url)
@tool(name="batch_fetch", description="Batch fetch multiple webpages.", parameters={
"type": "object",
"properties": {
"urls": {"type": "array", "items": {"type": "string"}, "description": "URLs to fetch"},
"extract_type": {"type": "string", "enum": ["text", "links", "structured"], "default": "text"}
"urls": {"type": "array", "items": {"type": "string"}, "description": "URLs to fetch"}
},
"required": ["urls"]
}, category="crawler")
def batch_fetch(arguments: dict) -> dict:
}, required_params=["urls"], category="crawler")
def batch_fetch(arguments: dict):
"""
Batch fetch multiple webpages
Returns:
{"count": int, "results": list}
"""
urls = arguments.get("urls", [])
if not urls:
return {"error": "URLs list is required"}
if len(urls) > 10:
return {"error": "Maximum 10 pages allowed"}
results = FetchService().fetch_batch(urls, arguments.get("extract_type", "text"))
return {"results": results, "total": len(results)}
results = _fetch_service.fetch_batch(urls)
return {
"count": len(results),
"results": results
}

View File

@ -1,7 +1,8 @@
"""Data processing tools"""
"""Data processing tools - exception handling in decorator"""
import re
import json
import base64
import hashlib
from typing import Dict, Any
from urllib.parse import quote, unquote
@ -21,40 +22,39 @@ from luxx.tools.factory import tool
},
"required": ["expression"]
},
required_params=["expression"],
category="data"
)
def calculate(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute mathematical calculation"""
def calculate(arguments: Dict[str, Any]):
"""
Execute mathematical calculation
Returns:
{"result": float, "expression": str}
"""
expression = arguments.get("expression", "")
if not expression:
return {"error": "Expression is required"}
# Safe replacement for math functions
safe_dict = {
"abs": abs,
"round": round,
"min": min,
"max": max,
"pow": pow,
"sqrt": lambda x: x ** 0.5,
"sin": lambda x: __import__('math').sin(x),
"cos": lambda x: __import__('math').cos(x),
"tan": lambda x: __import__('math').tan(x),
"log": lambda x: __import__('math').log(x),
"pi": __import__('math').pi,
"e": __import__('math').e
}
try:
# Safe replacement for math functions
safe_dict = {
"abs": abs,
"round": round,
"min": min,
"max": max,
"pow": pow,
"sqrt": lambda x: x ** 0.5,
"sin": lambda x: __import__('math').sin(x),
"cos": lambda x: __import__('math').cos(x),
"tan": lambda x: __import__('math').tan(x),
"log": lambda x: __import__('math').log(x),
"pi": __import__('math').pi,
"e": __import__('math').e
}
# Remove dangerous characters, only keep numbers and operators
safe_expr = re.sub(r"[^0-9+\-*/().%sqrtinsclogmaxminpowabsroundte, ]", "", expression)
result = eval(safe_expr, {"__builtins__": {}, **safe_dict})
return {"result": result}
except Exception as e:
return {"error": f"Calculation error: {str(e)}"}
# Remove dangerous characters, only keep numbers and operators
safe_expr = re.sub(r"[^0-9+\-*/().%sqrtinsclogmaxminpowabsroundte, ]", "", expression)
result = eval(safe_expr, {"__builtins__": {}, **safe_dict})
return {"result": result, "expression": expression}
@tool(
@ -69,22 +69,25 @@ def calculate(arguments: Dict[str, Any]) -> Dict[str, Any]:
},
"operation": {
"type": "string",
"description": "Operation to perform: uppercase, lowercase, title, strip, reverse, word_count, char_count",
"description": "Operation to perform",
"enum": ["uppercase", "lowercase", "title", "strip", "reverse", "word_count", "char_count"]
}
},
"required": ["text", "operation"]
},
required_params=["text", "operation"],
category="data"
)
def text_process(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Text processing"""
def text_process(arguments: Dict[str, Any]):
"""
Text processing operations
Returns:
{"operation": str, "result": str|int, "original_length": int}
"""
text = arguments.get("text", "")
operation = arguments.get("operation", "")
if not text:
return {"error": "Text is required"}
operations = {
"uppercase": text.upper(),
"lowercase": text.lower(),
@ -96,11 +99,14 @@ def text_process(arguments: Dict[str, Any]) -> Dict[str, Any]:
}
result = operations.get(operation)
if result is None:
return {"error": f"Unknown operation: {operation}"}
raise ValueError(f"Unknown operation: {operation}")
return {"result": result}
return {
"operation": operation,
"result": result,
"original_length": len(text)
}
@tool(
@ -121,31 +127,31 @@ def text_process(arguments: Dict[str, Any]) -> Dict[str, Any]:
},
"required": ["data", "operation"]
},
required_params=["data", "operation"],
category="data"
)
def json_process(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""JSON data processing"""
def json_process(arguments: Dict[str, Any]):
"""
JSON data processing
Returns:
{"operation": str, "result": str}
"""
data = arguments.get("data", "")
operation = arguments.get("operation", "")
if not data:
return {"error": "Data is required"}
parsed = json.loads(data)
try:
parsed = json.loads(data)
if operation == "format":
result = json.dumps(parsed, indent=2, ensure_ascii=False)
elif operation == "minify":
result = json.dumps(parsed, ensure_ascii=False)
elif operation == "validate":
result = "Valid JSON"
else:
return {"error": f"Unknown operation: {operation}"}
return {"result": result}
except json.JSONDecodeError as e:
return {"error": f"Invalid JSON: {str(e)}"}
if operation == "format":
result = json.dumps(parsed, indent=2, ensure_ascii=False)
elif operation == "minify":
result = json.dumps(parsed, ensure_ascii=False)
elif operation == "validate":
result = "Valid JSON"
else:
raise ValueError(f"Unknown operation: {operation}")
return {"operation": operation, "result": result}
@tool(
@ -160,32 +166,35 @@ def json_process(arguments: Dict[str, Any]) -> Dict[str, Any]:
},
"algorithm": {
"type": "string",
"description": "Hash algorithm: md5, sha1, sha256, sha512",
"description": "Hash algorithm",
"enum": ["md5", "sha1", "sha256", "sha512"],
"default": "sha256"
}
},
"required": ["text"]
},
required_params=["text"],
category="data"
)
def hash_text(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Generate text hash"""
import hashlib
def hash_text(arguments: Dict[str, Any]):
"""
Generate text hash
Returns:
{"algorithm": str, "hash": str, "length": int}
"""
text = arguments.get("text", "")
algorithm = arguments.get("algorithm", "sha256")
if not text:
return {"error": "Text is required"}
hash_obj = hashlib.new(algorithm)
hash_obj.update(text.encode('utf-8'))
hash_value = hash_obj.hexdigest()
try:
hash_obj = hashlib.new(algorithm)
hash_obj.update(text.encode('utf-8'))
hash_value = hash_obj.hexdigest()
return {"hash": hash_value}
except Exception as e:
return {"error": f"Hash error: {str(e)}"}
return {
"algorithm": algorithm,
"hash": hash_value,
"length": len(hash_value)
}
@tool(
@ -200,33 +209,33 @@ def hash_text(arguments: Dict[str, Any]) -> Dict[str, Any]:
},
"operation": {
"type": "string",
"description": "Operation: encode, decode",
"description": "Operation",
"enum": ["encode", "decode"]
}
},
"required": ["text", "operation"]
},
required_params=["text", "operation"],
category="data"
)
def url_encode_decode(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""URL encoding/decoding"""
def url_encode_decode(arguments: Dict[str, Any]):
"""
URL encoding/decoding
Returns:
{"operation": str, "result": str}
"""
text = arguments.get("text", "")
operation = arguments.get("operation", "")
if not text:
return {"error": "Text is required"}
if operation == "encode":
result = quote(text)
elif operation == "decode":
result = unquote(text)
else:
raise ValueError(f"Unknown operation: {operation}")
try:
if operation == "encode":
result = quote(text)
elif operation == "decode":
result = unquote(text)
else:
return {"error": f"Unknown operation: {operation}"}
return {"result": result}
except Exception as e:
return {"error": f"URL error: {str(e)}"}
return {"operation": operation, "result": result}
@tool(
@ -241,30 +250,30 @@ def url_encode_decode(arguments: Dict[str, Any]) -> Dict[str, Any]:
},
"operation": {
"type": "string",
"description": "Operation: encode, decode",
"description": "Operation",
"enum": ["encode", "decode"]
}
},
"required": ["text", "operation"]
},
required_params=["text", "operation"],
category="data"
)
def base64_encode_decode(arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Base64 encoding/decoding"""
def base64_encode_decode(arguments: Dict[str, Any]):
"""
Base64 encoding/decoding
Returns:
{"operation": str, "result": str}
"""
text = arguments.get("text", "")
operation = arguments.get("operation", "")
if not text:
return {"error": "Text is required"}
if operation == "encode":
result = base64.b64encode(text.encode()).decode()
elif operation == "decode":
result = base64.b64decode(text.encode()).decode()
else:
raise ValueError(f"Unknown operation: {operation}")
try:
if operation == "encode":
result = base64.b64encode(text.encode()).decode()
elif operation == "decode":
result = base64.b64decode(text.encode()).decode()
else:
return {"error": f"Unknown operation: {operation}"}
return {"result": result}
except Exception as e:
return {"error": f"Base64 error: {str(e)}"}
return {"operation": operation, "result": result}

View File

@ -1,16 +1,31 @@
"""Tool decorator factory"""
from typing import Callable, Any, Dict
from luxx.tools.core import ToolDefinition, registry
"""Tool decorator factory with unified result wrapping"""
from typing import Callable, Any, Dict, Optional, List
from luxx.tools.core import ToolDefinition, ToolResult, registry
import traceback
import logging
logger = logging.getLogger(__name__)
def tool(
name: str,
description: str,
parameters: Dict[str, Any],
category: str = "general"
category: str = "general",
required_params: Optional[List[str]] = None
):
"""
Tool registration decorator
Tool registration decorator with UNIFED result wrapping
The decorator automatically wraps all return values into ToolResult format.
Tool functions only need to return plain data - no need to use ToolResult manually.
Args:
name: Tool name
description: Tool description
parameters: OpenAI format parameters schema
category: Tool category
required_params: List of required parameter names (auto-validated)
Usage:
```python
@ -23,21 +38,52 @@ def tool(
"arg1": {"type": "string"}
},
"required": ["arg1"]
}
},
required_params=["arg1"] # Auto-validated
)
def my_tool(arguments: dict) -> dict:
# Implementation...
return {"result": "success"}
def my_tool(arguments: dict):
# Just return plain data - decorator handles wrapping!
return {"result": "success", "count": 5}
# The tool will be automatically registered
# For errors, return a dict with "error" key:
def my_tool2(arguments: dict):
return {"error": "Something went wrong"}
# Or raise an exception:
def my_tool3(arguments: dict):
raise ValueError("Invalid input")
```
The decorator will convert:
- Plain dict {"success": true, "data": dict, "error": null}
- {"error": "msg"} {"success": false, "data": null, "error": "msg"}
- Exception {"success": false, "data": null, "error": "..."}
"""
def decorator(func: Callable) -> Callable:
def wrapped_handler(arguments: Dict[str, Any]) -> ToolResult:
try:
# 1. Validate required params
if required_params:
for param in required_params:
if param not in arguments or arguments[param] is None:
return ToolResult.fail(f"Missing required parameter: {param}")
# 2. Execute handler
result = func(arguments)
# 3. Auto-wrap result
return _auto_wrap(result)
except Exception as e:
logger.error(f"[{name}] Unexpected error: {type(e).__name__}: {str(e)}")
logger.debug(traceback.format_exc())
return ToolResult.fail(f"Execution failed: {type(e).__name__}: {str(e)}")
tool_def = ToolDefinition(
name=name,
description=description,
parameters=parameters,
handler=func,
handler=wrapped_handler,
category=category
)
registry.register(tool_def)
@ -45,13 +91,44 @@ def tool(
return decorator
def _auto_wrap(result: Any) -> ToolResult:
"""
Auto-wrap any return value into ToolResult format
Rules:
- ToolResult use directly
- dict with "error" key ToolResult.fail()
- dict without "error" ToolResult.ok()
- other values ToolResult.ok()
"""
# Already a ToolResult
if isinstance(result, ToolResult):
return result
# Dict with error
if isinstance(result, dict) and "error" in result:
return ToolResult.fail(str(result["error"]))
# Plain dict or other value
return ToolResult.ok(result)
def tool_function(
name: str = None,
description: str = None,
parameters: Dict[str, Any] = None,
category: str = "general"
category: str = "general",
required_params: Optional[List[str]] = None
):
"""
Alias for tool decorator, providing a more semantic naming
All parameters are the same as tool()
"""
return tool(name=name, description=description, parameters=parameters, category=category)
return tool(
name=name,
description=description,
parameters=parameters,
category=category,
required_params=required_params
)

View File

@ -13,15 +13,13 @@ class SearchService:
def search(self, query: str, max_results: int = 5) -> List[dict]:
results = []
try:
for result in DDGS().text(query, max_results=max_results):
with DDGS() as client:
for result in client.text(query, max_results=max_results):
results.append({
"title": result.get("title", ""),
"url": result.get("href", ""),
"snippet": result.get("body", "")
})
except Exception:
pass
return results
@ -35,32 +33,27 @@ class FetchService:
if not url.startswith(("http://", "https://")):
url = "https://" + url
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
with httpx.Client(timeout=self.timeout, follow_redirects=True) as client:
response = client.get(url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
title = soup.title.string if soup.title else ""
text = soup.get_text(separator="\n", strip=True)
return {
"url": url,
"title": title[:500] if title else "",
"text": text[:15000]
}
with httpx.Client(timeout=self.timeout, follow_redirects=True) as client:
response = client.get(url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
title = soup.title.string if soup.title else ""
text = soup.get_text(separator="\n", strip=True)
return {
"url": url,
"title": title[:500] if title else "",
"text": text[:15000]
}
except Exception as e:
return {"error": str(e)}
return {"url": url, "title": "", "text": ""}
def fetch_batch(self, urls: List[str], extract_type: str = "text", max_concurrent: int = 5) -> List[dict]:
if len(urls) <= 1:
@ -72,9 +65,6 @@ class FetchService:
with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
futures = {pool.submit(self.fetch, url, extract_type): i for i, url in enumerate(urls)}
for future in as_completed(futures):
try:
results[futures[future]] = future.result()
except Exception as e:
results[futures[future]] = {"error": str(e)}
results[futures[future]] = future.result()
return results