diff --git a/backend/__init__.py b/backend/__init__.py index 61bd9ab..87bf0a5 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -4,6 +4,7 @@ from flask import Flask from flask_sqlalchemy import SQLAlchemy from pathlib import Path +# Initialize db BEFORE importing models/routes that depend on it db = SQLAlchemy() CONFIG_PATH = Path(__file__).parent.parent / "config.yml" @@ -26,9 +27,13 @@ def create_app(): db.init_app(app) + # Import after db is initialized from .models import User, Conversation, Message, TokenUsage from .routes import register_routes + from .tools import init_tools + register_routes(app) + init_tools() with app.app_context(): db.create_all() diff --git a/backend/routes.py b/backend/routes.py index c28fd53..596ef2d 100644 --- a/backend/routes.py +++ b/backend/routes.py @@ -7,6 +7,7 @@ from flask import request, jsonify, Response, Blueprint, current_app from . import db from .models import Conversation, Message, User, TokenUsage from . import load_config +from .tools import registry, ToolExecutor bp = Blueprint("api", __name__) @@ -51,7 +52,7 @@ def to_dict(inst, **extra): def record_token_usage(user_id, model, prompt_tokens, completion_tokens): - """记录 token 使用量""" + """Record token usage""" from datetime import date today = date.today() usage = TokenUsage.query.filter_by( @@ -75,10 +76,13 @@ def record_token_usage(user_id, model, prompt_tokens, completion_tokens): def build_glm_messages(conv): + """Build messages list for GLM API from conversation""" msgs = [] if conv.system_prompt: msgs.append({"role": "system", "content": conv.system_prompt}) - for m in conv.messages: + # Query messages directly to avoid detached instance warning + messages = Message.query.filter_by(conversation_id=conv.id).order_by(Message.created_at.asc()).all() + for m in messages: msgs.append({"role": m.role, "content": m.content}) return msgs @@ -87,15 +91,27 @@ def build_glm_messages(conv): @bp.route("/api/models", methods=["GET"]) def list_models(): - 
"""获取可用模型列表""" + """Get available model list""" return ok(MODELS) +# -- Tools API -------------------------------------------- + +@bp.route("/api/tools", methods=["GET"]) +def list_tools(): + """Get available tool list""" + tools = registry.list_all() + return ok({ + "tools": tools, + "total": len(tools) + }) + + # -- Token Usage Statistics -------------------------------- @bp.route("/api/stats/tokens", methods=["GET"]) def token_stats(): - """获取 token 使用统计""" + """Get token usage statistics""" from sqlalchemy import func from datetime import date, timedelta @@ -105,7 +121,7 @@ def token_stats(): today = date.today() if period == "daily": - # 今日统计 + # Today's statistics stats = TokenUsage.query.filter_by(user_id=user.id, date=today).all() result = { "period": "daily", @@ -116,7 +132,7 @@ def token_stats(): "by_model": {s.model: {"prompt": s.prompt_tokens, "completion": s.completion_tokens, "total": s.total_tokens} for s in stats} } elif period == "weekly": - # 本周统计 (最近7天) + # Weekly statistics (last 7 days) start_date = today - timedelta(days=6) stats = TokenUsage.query.filter( TokenUsage.user_id == user.id, @@ -133,7 +149,7 @@ def token_stats(): daily_data[d]["completion"] += s.completion_tokens daily_data[d]["total"] += s.total_tokens - # 填充没有数据的日期 + # Fill missing dates for i in range(7): d = (today - timedelta(days=6-i)).isoformat() if d not in daily_data: @@ -149,7 +165,7 @@ def token_stats(): "daily": daily_data } elif period == "monthly": - # 本月统计 (最近30天) + # Monthly statistics (last 30 days) start_date = today - timedelta(days=29) stats = TokenUsage.query.filter( TokenUsage.user_id == user.id, @@ -166,7 +182,7 @@ def token_stats(): daily_data[d]["completion"] += s.completion_tokens daily_data[d]["total"] += s.total_tokens - # 填充没有数据的日期 + # Fill missing dates for i in range(30): d = (today - timedelta(days=29-i)).isoformat() if d not in daily_data: @@ -301,15 +317,19 @@ def delete_message(conv_id, msg_id): # -- Chat Completion 
---------------------------------- -def _call_glm(conv, stream=False): +def _call_glm(conv, stream=False, tools=None, messages=None): + """Call GLM API""" body = { "model": conv.model, - "messages": build_glm_messages(conv), + "messages": messages if messages is not None else build_glm_messages(conv), "max_tokens": conv.max_tokens, "temperature": conv.temperature, } if conv.thinking_enabled: body["thinking"] = {"type": "enabled"} + if tools: + body["tools"] = tools + body["tool_choice"] = "auto" if stream: body["stream"] = True return requests.post( @@ -320,101 +340,192 @@ def _call_glm(conv, stream=False): def _sync_response(conv): - try: - resp = _call_glm(conv) - resp.raise_for_status() - result = resp.json() - except Exception as e: - return err(500, f"upstream error: {e}") - - choice = result["choices"][0] - usage = result.get("usage", {}) - prompt_tokens = usage.get("prompt_tokens", 0) - completion_tokens = usage.get("completion_tokens", 0) - - msg = Message( - id=str(uuid.uuid4()), conversation_id=conv.id, role="assistant", - content=choice["message"]["content"], - token_count=completion_tokens, - thinking_content=choice["message"].get("reasoning_content", ""), - ) - db.session.add(msg) - db.session.commit() - - # 记录 token 使用 - user = get_or_create_default_user() - record_token_usage(user.id, conv.model, prompt_tokens, completion_tokens) - - return ok({ - "message": to_dict(msg, thinking_content=msg.thinking_content or None), - "usage": {"prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - "total_tokens": usage.get("total_tokens", 0)}, - }) - - -def _stream_response(conv): - conv_id = conv.id - conv_model = conv.model - app = current_app._get_current_object() - - def generate(): - full_content = "" - full_thinking = "" - token_count = 0 - prompt_tokens = 0 - msg_id = str(uuid.uuid4()) + """Sync response with tool call support""" + executor = ToolExecutor(registry=registry) + tools = registry.list_all() + messages = 
build_glm_messages(conv) + max_iterations = 5 # Max tool call iterations + for _ in range(max_iterations): try: - with app.app_context(): - active_conv = db.session.get(Conversation, conv_id) - resp = _call_glm(active_conv, stream=True) - resp.raise_for_status() - - for line in resp.iter_lines(): - if not line: - continue - line = line.decode("utf-8") - if not line.startswith("data: "): - continue - data_str = line[6:] - if data_str == "[DONE]": - break - try: - chunk = json.loads(data_str) - except json.JSONDecodeError: - continue - delta = chunk["choices"][0].get("delta", {}) - reasoning = delta.get("reasoning_content", "") - text = delta.get("content", "") - if reasoning: - full_thinking += reasoning - yield f"event: thinking\ndata: {json.dumps({'content': reasoning}, ensure_ascii=False)}\n\n" - if text: - full_content += text - yield f"event: message\ndata: {json.dumps({'content': text}, ensure_ascii=False)}\n\n" - usage = chunk.get("usage", {}) - if usage: - token_count = usage.get("completion_tokens", 0) - prompt_tokens = usage.get("prompt_tokens", 0) + resp = _call_glm(conv, tools=tools if tools else None, messages=messages) + resp.raise_for_status() + result = resp.json() except Exception as e: - yield f"event: error\ndata: {json.dumps({'content': str(e)}, ensure_ascii=False)}\n\n" - return + return err(500, f"upstream error: {e}") + + choice = result["choices"][0] + message = choice["message"] + + # If no tool calls, return final result + if not message.get("tool_calls"): + usage = result.get("usage", {}) + prompt_tokens = usage.get("prompt_tokens", 0) + completion_tokens = usage.get("completion_tokens", 0) - # 流式结束后最后写入数据库 - with app.app_context(): msg = Message( - id=msg_id, conversation_id=conv_id, role="assistant", - content=full_content, token_count=token_count, thinking_content=full_thinking, + id=str(uuid.uuid4()), conversation_id=conv.id, role="assistant", + content=message.get("content", ""), + token_count=completion_tokens, + 
thinking_content=message.get("reasoning_content", ""), ) db.session.add(msg) db.session.commit() - # 记录 token 使用 user = get_or_create_default_user() - record_token_usage(user.id, conv_model, prompt_tokens, token_count) + record_token_usage(user.id, conv.model, prompt_tokens, completion_tokens) - yield f"event: done\ndata: {json.dumps({'message_id': msg_id, 'token_count': token_count})}\n\n" + return ok({ + "message": to_dict(msg, thinking_content=msg.thinking_content or None), + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": usage.get("total_tokens", 0) + }, + }) + + # Process tool calls + tool_calls = message["tool_calls"] + messages.append(message) + + # Execute tools and add results + tool_results = executor.process_tool_calls(tool_calls) + messages.extend(tool_results) + + # Save tool call records to database + for i, call in enumerate(tool_calls): + tool_msg = Message( + id=str(uuid.uuid4()), + conversation_id=conv.id, + role="tool", + content=tool_results[i]["content"] + ) + db.session.add(tool_msg) + db.session.commit() + + return err(500, "exceeded maximum tool call iterations") + + +def _stream_response(conv): + """Stream response with tool call support""" + conv_id = conv.id + conv_model = conv.model + app = current_app._get_current_object() + executor = ToolExecutor(registry=registry) + tools = registry.list_all() + # Build messages BEFORE entering generator (in request context) + initial_messages = build_glm_messages(conv) + + def generate(): + messages = list(initial_messages) # Copy to avoid mutation + max_iterations = 5 + + for iteration in range(max_iterations): + full_content = "" + full_thinking = "" + token_count = 0 + prompt_tokens = 0 + msg_id = str(uuid.uuid4()) + tool_calls_list = [] + current_tool_call = None + + try: + with app.app_context(): + active_conv = db.session.get(Conversation, conv_id) + resp = _call_glm(active_conv, stream=True, tools=tools if tools else None, 
messages=messages) + resp.raise_for_status() + + for line in resp.iter_lines(): + if not line: + continue + line = line.decode("utf-8") + if not line.startswith("data: "): + continue + data_str = line[6:] + if data_str == "[DONE]": + break + try: + chunk = json.loads(data_str) + except json.JSONDecodeError: + continue + + delta = chunk["choices"][0].get("delta", {}) + + # Process thinking chain + reasoning = delta.get("reasoning_content", "") + if reasoning: + full_thinking += reasoning + yield f"event: thinking\ndata: {json.dumps({'content': reasoning}, ensure_ascii=False)}\n\n" + + # Process text content + text = delta.get("content", "") + if text: + full_content += text + yield f"event: message\ndata: {json.dumps({'content': text}, ensure_ascii=False)}\n\n" + + # Process tool calls + tool_calls_delta = delta.get("tool_calls", []) + for tc in tool_calls_delta: + idx = tc.get("index", 0) + if idx >= len(tool_calls_list): + tool_calls_list.append({ + "id": tc.get("id", ""), + "type": tc.get("type", "function"), + "function": {"name": "", "arguments": ""} + }) + if tc.get("id"): + tool_calls_list[idx]["id"] = tc["id"] + if tc.get("function"): + if tc["function"].get("name"): + tool_calls_list[idx]["function"]["name"] = tc["function"]["name"] + if tc["function"].get("arguments"): + tool_calls_list[idx]["function"]["arguments"] += tc["function"]["arguments"] + + usage = chunk.get("usage", {}) + if usage: + token_count = usage.get("completion_tokens", 0) + prompt_tokens = usage.get("prompt_tokens", 0) + + except Exception as e: + yield f"event: error\ndata: {json.dumps({'content': str(e)}, ensure_ascii=False)}\n\n" + return + + # If tool calls exist, execute and continue loop + if tool_calls_list: + # Send tool call info + yield f"event: tool_calls\ndata: {json.dumps({'calls': tool_calls_list}, ensure_ascii=False)}\n\n" + + # Execute tools + tool_results = executor.process_tool_calls(tool_calls_list) + messages.append({ + "role": "assistant", + "content": full_content 
or None, + "tool_calls": tool_calls_list + }) + messages.extend(tool_results) + + # Send tool results + for tr in tool_results: + yield f"event: tool_result\ndata: {json.dumps({'name': tr['name'], 'content': tr['content']}, ensure_ascii=False)}\n\n" + + continue + + # No tool calls, finish + with app.app_context(): + msg = Message( + id=msg_id, conversation_id=conv_id, role="assistant", + content=full_content, token_count=token_count, thinking_content=full_thinking, + ) + db.session.add(msg) + db.session.commit() + + user = get_or_create_default_user() + record_token_usage(user.id, conv_model, prompt_tokens, token_count) + + yield f"event: done\ndata: {json.dumps({'message_id': msg_id, 'token_count': token_count})}\n\n" + return + + yield f"event: error\ndata: {json.dumps({'content': 'exceeded maximum tool call iterations'}, ensure_ascii=False)}\n\n" return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) diff --git a/backend/tools/__init__.py b/backend/tools/__init__.py new file mode 100644 index 0000000..a49b612 --- /dev/null +++ b/backend/tools/__init__.py @@ -0,0 +1,46 @@ +""" +NanoClaw Tool System + +Usage: + from backend.tools import registry, ToolExecutor, tool + from backend.tools import init_tools + + # Initialize built-in tools + init_tools() + + # List all tools + tools = registry.list_all() + + # Execute a tool + result = registry.execute("web_search", {"query": "Python"}) +""" + +from .core import ToolDefinition, ToolResult, ToolRegistry, registry +from .factory import tool, register_tool +from .executor import ToolExecutor + + +def init_tools() -> None: + """ + Initialize all built-in tools + + Importing builtin module automatically registers all decorator-defined tools + """ + from .builtin import crawler, data # noqa: F401 + + +# Public API exports +__all__ = [ + # Core classes + "ToolDefinition", + "ToolResult", + "ToolRegistry", + "ToolExecutor", + # Instances + "registry", + # 
Factory functions + "tool", + "register_tool", + # Initialization + "init_tools", +] diff --git a/backend/tools/builtin/__init__.py b/backend/tools/builtin/__init__.py new file mode 100644 index 0000000..4baa390 --- /dev/null +++ b/backend/tools/builtin/__init__.py @@ -0,0 +1,3 @@ +"""Built-in tools""" +from .crawler import * +from .data import * diff --git a/backend/tools/builtin/crawler.py b/backend/tools/builtin/crawler.py new file mode 100644 index 0000000..04bcbae --- /dev/null +++ b/backend/tools/builtin/crawler.py @@ -0,0 +1,134 @@ +"""Crawler related tools""" +from ..factory import tool +from ..services import SearchService, FetchService + + +@tool( + name="web_search", + description="Search the internet for information. Use when you need to find latest news or answer questions that require web search.", + parameters={ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search keywords" + }, + "max_results": { + "type": "integer", + "description": "Number of results to return, default 5", + "default": 5 + } + }, + "required": ["query"] + }, + category="crawler" +) +def web_search(arguments: dict) -> dict: + """ + Web search tool + + Args: + arguments: { + "query": "search keywords", + "max_results": 5 + } + + Returns: + {"results": [...]} + """ + query = arguments["query"] + max_results = arguments.get("max_results", 5) + + service = SearchService() + results = service.search(query, max_results) + + return {"results": results} + + +@tool( + name="fetch_page", + description="Fetch content from a specific webpage. 
Use when user needs detailed information from a webpage.", + parameters={ + "type": "object", + "properties": { + "url": { + "type": "string", + "description": "URL of the webpage to fetch" + }, + "extract_type": { + "type": "string", + "description": "Extraction type", + "enum": ["text", "links", "structured"], + "default": "text" + } + }, + "required": ["url"] + }, + category="crawler" +) +def fetch_page(arguments: dict) -> dict: + """ + Page fetch tool + + Args: + arguments: { + "url": "https://example.com", + "extract_type": "text" | "links" | "structured" + } + + Returns: + Page content + """ + url = arguments["url"] + extract_type = arguments.get("extract_type", "text") + + service = FetchService() + result = service.fetch(url, extract_type) + + return result + + +@tool( + name="crawl_batch", + description="Batch fetch multiple webpages. Use when you need to get content from multiple pages at once.", + parameters={ + "type": "object", + "properties": { + "urls": { + "type": "array", + "items": {"type": "string"}, + "description": "List of URLs to fetch" + }, + "extract_type": { + "type": "string", + "enum": ["text", "links", "structured"], + "default": "text" + } + }, + "required": ["urls"] + }, + category="crawler" +) +def crawl_batch(arguments: dict) -> dict: + """ + Batch fetch tool + + Args: + arguments: { + "urls": ["url1", "url2", ...], + "extract_type": "text" + } + + Returns: + {"results": [...]} + """ + urls = arguments["urls"] + extract_type = arguments.get("extract_type", "text") + + if len(urls) > 10: + return {"error": "Maximum 10 pages can be fetched at once"} + + service = FetchService() + results = service.fetch_batch(urls, extract_type) + + return {"results": results, "total": len(results)} diff --git a/backend/tools/builtin/data.py b/backend/tools/builtin/data.py new file mode 100644 index 0000000..caaa7ca --- /dev/null +++ b/backend/tools/builtin/data.py @@ -0,0 +1,146 @@ +"""Data processing related tools""" +from ..factory import tool 
+from ..services import CalculatorService + + +@tool( + name="calculator", + description="Perform mathematical calculations. Supports basic arithmetic: addition, subtraction, multiplication, division, power, modulo, etc.", + parameters={ + "type": "object", + "properties": { + "expression": { + "type": "string", + "description": "Mathematical expression, e.g.: (2 + 3) * 4, 2 ** 10, 100 / 7" + } + }, + "required": ["expression"] + }, + category="data" +) +def calculator(arguments: dict) -> dict: + """ + Calculator tool + + Args: + arguments: { + "expression": "2 + 3 * 4" + } + + Returns: + {"result": 14} + """ + expression = arguments["expression"] + service = CalculatorService() + return service.evaluate(expression) + + +@tool( + name="text_process", + description="Process text content, supports counting, format conversion and other operations.", + parameters={ + "type": "object", + "properties": { + "text": { + "type": "string", + "description": "Text to process" + }, + "operation": { + "type": "string", + "description": "Operation type", + "enum": ["count", "lines", "words", "upper", "lower", "reverse"] + } + }, + "required": ["text", "operation"] + }, + category="data" +) +def text_process(arguments: dict) -> dict: + """ + Text processing tool + + Args: + arguments: { + "text": "text content", + "operation": "count" | "lines" | "words" | ... 
+ } + + Returns: + Processing result + """ + text = arguments["text"] + operation = arguments["operation"] + + operations = { + "count": lambda t: {"count": len(t)}, + "lines": lambda t: {"lines": len(t.splitlines())}, + "words": lambda t: {"words": len(t.split())}, + "upper": lambda t: {"result": t.upper()}, + "lower": lambda t: {"result": t.lower()}, + "reverse": lambda t: {"result": t[::-1]} + } + + if operation not in operations: + return {"error": f"Unknown operation: {operation}"} + + return operations[operation](text) + + +@tool( + name="json_process", + description="Process JSON data, supports parsing, formatting, extraction and other operations.", + parameters={ + "type": "object", + "properties": { + "json_string": { + "type": "string", + "description": "JSON string" + }, + "operation": { + "type": "string", + "description": "Operation type", + "enum": ["parse", "format", "keys", "validate"] + } + }, + "required": ["json_string", "operation"] + }, + category="data" +) +def json_process(arguments: dict) -> dict: + """ + JSON processing tool + + Args: + arguments: { + "json_string": '{"key": "value"}', + "operation": "parse" | "format" | "keys" | "validate" + } + + Returns: + Processing result + """ + import json + + json_string = arguments["json_string"] + operation = arguments["operation"] + + try: + if operation == "validate": + json.loads(json_string) + return {"valid": True} + + data = json.loads(json_string) + + if operation == "parse": + return {"data": data} + elif operation == "format": + return {"result": json.dumps(data, indent=2, ensure_ascii=False)} + elif operation == "keys": + if isinstance(data, dict): + return {"keys": list(data.keys())} + return {"error": "JSON root element is not an object"} + else: + return {"error": f"Unknown operation: {operation}"} + + except json.JSONDecodeError as e: + return {"error": f"JSON parse error: {str(e)}"} diff --git a/backend/tools/core.py b/backend/tools/core.py new file mode 100644 index 
0000000..ad78380 --- /dev/null +++ b/backend/tools/core.py @@ -0,0 +1,107 @@ +"""Tool system core classes""" +from dataclasses import dataclass, field +from typing import Callable, Any, Dict, List, Optional + + +@dataclass +class ToolDefinition: + """Tool definition""" + name: str + description: str + parameters: dict # JSON Schema + handler: Callable[[dict], Any] + category: str = "general" + + def to_openai_format(self) -> dict: + """Convert to OpenAI/GLM compatible format""" + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters + } + } + + +@dataclass +class ToolResult: + """Tool execution result""" + success: bool + data: Any = None + error: Optional[str] = None + + def to_dict(self) -> dict: + return { + "success": self.success, + "data": self.data, + "error": self.error + } + + @classmethod + def ok(cls, data: Any) -> "ToolResult": + return cls(success=True, data=data) + + @classmethod + def fail(cls, error: str) -> "ToolResult": + return cls(success=False, error=error) + + +class ToolRegistry: + """Tool registry (singleton)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._tools: Dict[str, ToolDefinition] = {} + return cls._instance + + def register(self, tool: ToolDefinition) -> None: + """Register a tool""" + self._tools[tool.name] = tool + + def get(self, name: str) -> Optional[ToolDefinition]: + """Get tool definition by name""" + return self._tools.get(name) + + def list_all(self) -> List[dict]: + """List all tools in OpenAI format""" + return [t.to_openai_format() for t in self._tools.values()] + + def list_by_category(self, category: str) -> List[dict]: + """List tools by category""" + return [ + t.to_openai_format() + for t in self._tools.values() + if t.category == category + ] + + def execute(self, name: str, arguments: dict) -> dict: + """Execute a tool""" + tool = self.get(name) + if not 
tool: + return ToolResult.fail(f"Tool not found: {name}").to_dict() + + try: + result = tool.handler(arguments) + if isinstance(result, ToolResult): + return result.to_dict() + return ToolResult.ok(result).to_dict() + except Exception as e: + return ToolResult.fail(str(e)).to_dict() + + def remove(self, name: str) -> bool: + """Remove a tool""" + if name in self._tools: + del self._tools[name] + return True + return False + + def has(self, name: str) -> bool: + """Check if tool exists""" + return name in self._tools + + +# Global registry instance +registry = ToolRegistry() diff --git a/backend/tools/executor.py b/backend/tools/executor.py new file mode 100644 index 0000000..a20dbdc --- /dev/null +++ b/backend/tools/executor.py @@ -0,0 +1,148 @@ +"""Tool executor""" +import json +import time +from typing import List, Dict, Optional, Generator, Any +from .core import ToolRegistry, registry + + +class ToolExecutor: + """Tool call executor""" + + def __init__( + self, + registry: Optional[ToolRegistry] = None, + api_url: Optional[str] = None, + api_key: Optional[str] = None + ): + self.registry = registry or ToolRegistry() + self.api_url = api_url + self.api_key = api_key + + def process_tool_calls( + self, + tool_calls: List[dict], + context: Optional[dict] = None + ) -> List[dict]: + """ + Process tool calls and return message list + + Args: + tool_calls: Tool call list returned by LLM + context: Optional context info (user_id, etc.) 
+ + Returns: + Tool response message list, can be appended to messages + """ + results = [] + + for call in tool_calls: + name = call["function"]["name"] + args_str = call["function"]["arguments"] + call_id = call["id"] + + try: + args = json.loads(args_str) if isinstance(args_str, str) else args_str + except json.JSONDecodeError: + results.append(self._create_error_result( + call_id, name, "Invalid JSON arguments" + )) + continue + + result = self.registry.execute(name, args) + results.append(self._create_tool_result(call_id, name, result)) + + return results + + def _create_tool_result( + self, + call_id: str, + name: str, + result: dict, + execution_time: float = 0 + ) -> dict: + """Create tool result message""" + result["execution_time"] = execution_time + return { + "role": "tool", + "tool_call_id": call_id, + "name": name, + "content": json.dumps(result, ensure_ascii=False, default=str) + } + + def _create_error_result( + self, + call_id: str, + name: str, + error: str + ) -> dict: + """Create error result message""" + return { + "role": "tool", + "tool_call_id": call_id, + "name": name, + "content": json.dumps({ + "success": False, + "error": error + }, ensure_ascii=False) + } + + def build_request( + self, + messages: List[dict], + model: str = "glm-5", + tools: Optional[List[dict]] = None, + **kwargs + ) -> dict: + """ + Build API request body + + Args: + messages: Message list + model: Model name + tools: Tool list (default: all tools in registry) + **kwargs: Other parameters (temperature, max_tokens, etc.) 
+ + Returns: + Request body dict + """ + return { + "model": model, + "messages": messages, + "tools": tools or self.registry.list_all(), + "tool_choice": kwargs.get("tool_choice", "auto"), + **{k: v for k, v in kwargs.items() if k not in ["tool_choice"]} + } + + def execute_with_retry( + self, + name: str, + arguments: dict, + max_retries: int = 3, + retry_delay: float = 1.0 + ) -> dict: + """ + Execute tool with retry + + Args: + name: Tool name + arguments: Tool arguments + max_retries: Max retry count + retry_delay: Retry delay in seconds + + Returns: + Execution result + """ + last_error = None + + for attempt in range(max_retries): + try: + return self.registry.execute(name, arguments) + except Exception as e: + last_error = e + if attempt < max_retries - 1: + time.sleep(retry_delay) + + return { + "success": False, + "error": f"Failed after {max_retries} retries: {last_error}" + } diff --git a/backend/tools/factory.py b/backend/tools/factory.py new file mode 100644 index 0000000..a48fb8c --- /dev/null +++ b/backend/tools/factory.py @@ -0,0 +1,63 @@ +"""Tool factory - decorator registration""" +from typing import Callable +from .core import ToolDefinition, registry + + +def tool( + name: str, + description: str, + parameters: dict, + category: str = "general" +) -> Callable: + """ + Tool registration decorator + + Usage: + @tool( + name="web_search", + description="Search the web", + parameters={"type": "object", "properties": {...}}, + category="crawler" + ) + def web_search(arguments: dict) -> dict: + ... 
+ """ + def decorator(func: Callable) -> Callable: + tool_def = ToolDefinition( + name=name, + description=description, + parameters=parameters, + handler=func, + category=category + ) + registry.register(tool_def) + return func + return decorator + + +def register_tool( + name: str, + handler: Callable, + description: str, + parameters: dict, + category: str = "general" +) -> None: + """ + Register a tool directly (without decorator) + + Usage: + register_tool( + name="my_tool", + handler=my_function, + description="Description", + parameters={...} + ) + """ + tool_def = ToolDefinition( + name=name, + description=description, + parameters=parameters, + handler=handler, + category=category + ) + registry.register(tool_def) diff --git a/backend/tools/services.py b/backend/tools/services.py new file mode 100644 index 0000000..2122061 --- /dev/null +++ b/backend/tools/services.py @@ -0,0 +1,257 @@ +"""Tool helper services""" +from typing import List, Dict, Optional, Any +import re + + +class SearchService: + """Search service""" + + def __init__(self, engine: str = "duckduckgo"): + self.engine = engine + + def search( + self, + query: str, + max_results: int = 5, + region: str = "cn-zh" + ) -> List[dict]: + """ + Execute search + + Args: + query: Search keywords + max_results: Max result count + region: Region setting + + Returns: + Search result list + """ + if self.engine == "duckduckgo": + return self._search_duckduckgo(query, max_results, region) + else: + raise ValueError(f"Unsupported search engine: {self.engine}") + + def _search_duckduckgo( + self, + query: str, + max_results: int, + region: str + ) -> List[dict]: + """DuckDuckGo search""" + try: + from duckduckgo_search import DDGS + except ImportError: + return [{"error": "Please install duckduckgo-search: pip install duckduckgo-search"}] + + with DDGS() as ddgs: + results = list(ddgs.text( + query, + max_results=max_results, + region=region + )) + + return [ + { + "title": r.get("title", ""), + "url": 
r.get("href", ""), + "snippet": r.get("body", "") + } + for r in results + ] + + +class FetchService: + """Page fetch service""" + + def __init__(self, timeout: float = 30.0, user_agent: str = None): + self.timeout = timeout + self.user_agent = user_agent or ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36" + ) + + def fetch( + self, + url: str, + extract_type: str = "text" + ) -> dict: + """ + Fetch a single page + + Args: + url: Page URL + extract_type: Extract type (text, links, structured) + + Returns: + Fetch result + """ + import httpx + + try: + resp = httpx.get( + url, + timeout=self.timeout, + follow_redirects=True, + headers={"User-Agent": self.user_agent} + ) + resp.raise_for_status() + except Exception as e: + return {"error": str(e), "url": url} + + html = resp.text + extractor = ContentExtractor(html) + + if extract_type == "text": + return { + "url": url, + "text": extractor.extract_text() + } + elif extract_type == "links": + return { + "url": url, + "links": extractor.extract_links() + } + else: + return extractor.extract_structured(url) + + def fetch_batch( + self, + urls: List[str], + extract_type: str = "text", + max_concurrent: int = 5 + ) -> List[dict]: + """ + Batch fetch pages + + Args: + urls: URL list + extract_type: Extract type + max_concurrent: Max concurrent requests + + Returns: + Result list + """ + results = [] + for url in urls: + results.append(self.fetch(url, extract_type)) + return results + + +class ContentExtractor: + """Content extractor""" + + def __init__(self, html: str): + self.html = html + self._soup = None + + @property + def soup(self): + if self._soup is None: + try: + from bs4 import BeautifulSoup + self._soup = BeautifulSoup(self.html, "html.parser") + except ImportError: + raise ImportError("Please install beautifulsoup4: pip install beautifulsoup4") + return self._soup + + def extract_text(self) -> str: + """Extract plain text""" + # 
Remove script and style + for tag in self.soup(["script", "style", "nav", "footer", "header"]): + tag.decompose() + + text = self.soup.get_text(separator="\n", strip=True) + # Clean extra whitespace + text = re.sub(r"\n{3,}", "\n\n", text) + return text + + def extract_links(self) -> List[dict]: + """Extract links""" + links = [] + for a in self.soup.find_all("a", href=True): + text = a.get_text(strip=True) + href = a["href"] + if text and href and not href.startswith(("#", "javascript:")): + links.append({"text": text, "href": href}) + return links[:50] # Limit count + + def extract_structured(self, url: str = "") -> dict: + """Extract structured content""" + soup = self.soup + + # Extract title + title = "" + if soup.title: + title = soup.title.string or "" + + # Extract meta description + description = "" + meta_desc = soup.find("meta", attrs={"name": "description"}) + if meta_desc: + description = meta_desc.get("content", "") + + return { + "url": url, + "title": title.strip(), + "description": description.strip(), + "text": self.extract_text()[:5000], # Limit length + "links": self.extract_links()[:20] + } + + +class CalculatorService: + """Safe calculation service""" + + ALLOWED_OPS = { + "add", "sub", "mul", "truediv", "floordiv", + "mod", "pow", "neg", "abs" + } + + def evaluate(self, expression: str) -> dict: + """ + Safely evaluate mathematical expression + + Args: + expression: Mathematical expression + + Returns: + Calculation result + """ + import ast + import operator + + ops = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.FloorDiv: operator.floordiv, + ast.Mod: operator.mod, + ast.Pow: operator.pow, + ast.USub: operator.neg, + ast.UAdd: operator.pos, + } + + try: + # Parse expression + node = ast.parse(expression, mode="eval") + + # Validate node types + for child in ast.walk(node): + if isinstance(child, ast.Call): + return {"error": "Function calls not allowed"} + if 
isinstance(child, ast.Name): + return {"error": "Variable names not allowed"} + + # Safe execution + result = eval( + compile(node, "", "eval"), + {"__builtins__": {}}, + {} + ) + + return {"result": result} + + except Exception as e: + return {"error": f"Calculation error: {str(e)}"} diff --git a/docs/CrawlerAndTools.md b/docs/CrawlerAndTools.md deleted file mode 100644 index d8dceb2..0000000 --- a/docs/CrawlerAndTools.md +++ /dev/null @@ -1,1268 +0,0 @@ -# 爬虫流水线与工具系统设计 - -## 概述 - -本文档描述如何为 NanoClaw 构建爬虫流水线和工具系统,使 GLM 模型能够: - -1. 通过工具调用获取实时网络信息 -2. 执行结构化的数据采集任务 -3. 与外部系统进行交互 - ---- - -## 一、整体架构 - -```mermaid -flowchart TB - subgraph Frontend["前端 (Vue 3)"] - ChatView["ChatView 对话界面"] - ToolCallUI["ToolCallUI 工具调用展示"] - CrawlerPanel["CrawlerStatusPanel 爬虫状态面板"] - end - - subgraph Backend["后端 API (Flask)"] - ChatAPI["/api/chat 对话补全"] - ToolsAPI["/api/tools 工具管理"] - CrawlerAPI["/api/crawler 爬虫任务"] - Orchestrator["Tool Orchestrator
工具调用编排引擎"] - end - - subgraph Services["服务层"] - CrawlerService["Web Crawler 爬虫服务"] - BuiltInTools["Built-in Tools 内置工具"] - ExternalAPIs["External APIs 第三方服务"] - end - - Frontend -->|"HTTP/SSE"| Backend - ChatAPI --> Orchestrator - ToolsAPI --> Orchestrator - CrawlerAPI --> Orchestrator - Orchestrator --> CrawlerService - Orchestrator --> BuiltInTools - Orchestrator --> ExternalAPIs -``` - ---- - -## 二、工具系统设计 - -### 2.1 工具定义规范 - -工具采用 JSON Schema 定义,与 OpenAI Function Calling 兼容: - -```python -# backend/tools/registry.py - -from dataclasses import dataclass -from typing import Callable, Any -import json - -@dataclass -class ToolDefinition: - """工具定义""" - name: str # 工具名称,如 "web_search" - description: str # 工具描述,供模型理解用途 - parameters: dict # JSON Schema 格式的参数定义 - handler: Callable # 实际执行函数 - - def to_openai_format(self) -> dict: - """转换为 GLM/OpenAI 兼容格式""" - return { - "type": "function", - "function": { - "name": self.name, - "description": self.description, - "parameters": self.parameters - } - } - - -# 工具注册表 -class ToolRegistry: - def __init__(self): - self._tools: dict[str, ToolDefinition] = {} - - def register(self, tool: ToolDefinition): - self._tools[tool.name] = tool - - def get(self, name: str) -> ToolDefinition | None: - return self._tools.get(name) - - def list_all(self) -> list[dict]: - return [t.to_openai_format() for t in self._tools.values()] - - def execute(self, name: str, arguments: dict) -> Any: - tool = self.get(name) - if not tool: - raise ValueError(f"Tool not found: {name}") - return tool.handler(**arguments) - - -# 全局注册表 -registry = ToolRegistry() -``` - -### 2.2 内置工具定义 - -#### 2.2.1 网页搜索工具 - -```python -# backend/tools/builtin/web_search.py - -from ..registry import registry, ToolDefinition - -def web_search(query: str, max_results: int = 5) -> dict: - """ - 执行网页搜索 - - Args: - query: 搜索关键词 - max_results: 最大返回结果数 - - Returns: - 搜索结果列表 - """ - # 调用爬虫服务执行搜索 - from ..crawler import search_service - results = search_service.search(query, 
max_results) - return { - "success": True, - "results": results - } - -# 注册工具 -registry.register(ToolDefinition( - name="web_search", - description="搜索互联网获取实时信息。当用户询问时事、新闻、或需要最新数据时使用。", - parameters={ - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "搜索关键词" - }, - "max_results": { - "type": "integer", - "description": "返回结果数量,默认5", - "default": 5 - } - }, - "required": ["query"] - }, - handler=web_search -)) -``` - -#### 2.2.2 网页内容抓取工具 - -```python -# backend/tools/builtin/fetch_page.py - -def fetch_page(url: str, extract_type: str = "text") -> dict: - """ - 抓取网页内容 - - Args: - url: 目标网页URL - extract_type: 提取类型 (text/links/images/structured) - - Returns: - 提取的内容 - """ - from ..crawler import fetch_service - return fetch_service.fetch(url, extract_type) - -registry.register(ToolDefinition( - name="fetch_page", - description="抓取指定URL的网页内容,提取文本、链接或结构化数据。", - parameters={ - "type": "object", - "properties": { - "url": { - "type": "string", - "description": "要抓取的网页URL" - }, - "extract_type": { - "type": "string", - "enum": ["text", "links", "images", "structured"], - "description": "提取类型", - "default": "text" - } - }, - "required": ["url"] - }, - handler=fetch_page -)) -``` - -#### 2.2.3 批量爬虫任务工具 - -```python -# backend/tools/builtin/crawl_batch.py - -def crawl_batch( - urls: list[str], - extract_type: str = "text", - parallel: int = 3 -) -> dict: - """ - 批量爬取多个网页 - - Args: - urls: URL列表 - extract_type: 提取类型 - parallel: 并发数 - - Returns: - 任务ID和状态 - """ - from ..crawler import crawl_manager - task_id = crawl_manager.create_task( - urls=urls, - extract_type=extract_type, - parallel=parallel - ) - return { - "task_id": task_id, - "status": "pending", - "message": f"已创建爬虫任务,共 {len(urls)} 个URL" - } - -registry.register(ToolDefinition( - name="crawl_batch", - description="批量爬取多个网页内容。适用于需要采集多个页面的场景。", - parameters={ - "type": "object", - "properties": { - "urls": { - "type": "array", - "items": {"type": "string"}, - "description": 
"要爬取的URL列表" - }, - "extract_type": { - "type": "string", - "enum": ["text", "links", "images", "structured"], - "default": "text" - }, - "parallel": { - "type": "integer", - "description": "并发数,默认3", - "default": 3 - } - }, - "required": ["urls"] - }, - handler=crawl_batch -)) -``` - -#### 2.2.4 爬虫任务查询工具 - -```python -# backend/tools/builtin/query_task.py - -def query_crawl_task(task_id: str) -> dict: - """ - 查询爬虫任务状态和结果 - - Args: - task_id: 任务ID - - Returns: - 任务状态和结果 - """ - from ..crawler import crawl_manager - return crawl_manager.get_task_status(task_id) - -registry.register(ToolDefinition( - name="query_crawl_task", - description="查询爬虫任务的执行状态和结果。", - parameters={ - "type": "object", - "properties": { - "task_id": { - "type": "string", - "description": "任务ID" - } - }, - "required": ["task_id"] - }, - handler=query_crawl_task -)) -``` - -### 2.3 工具调用流程 - -```mermaid -flowchart TD - A[用户消息] --> B[构建消息上下文 + 工具定义] - B --> C[调用 GLM API
启用工具调用] - C --> D{判断响应类型} - - D -->|普通文本| E[返回用户] - D -->|工具调用请求| F[执行工具调用
registry.execute] - - F --> G[将工具结果追加到消息历史] - G --> H[再次调用 GLM API
带工具结果] - H --> I[返回最终回复] - - style A fill:#e1f5fe - style E fill:#c8e6c9 - style I fill:#c8e6c9 - style F fill:#fff3e0 - style D fill:#fce4ec -``` - -### 2.4 后端实现:工具调用处理 - -```python -# backend/tools/executor.py - -import json -from typing import Generator -from .registry import registry - -class ToolExecutor: - """工具调用执行器""" - - def __init__(self, api_url: str, api_key: str): - self.api_url = api_url - self.api_key = api_key - - def build_messages_with_tools( - self, - messages: list[dict], - tools: list[dict] | None = None - ) -> dict: - """构建带工具定义的请求体""" - body = { - "model": "glm-5", - "messages": messages, - "tools": tools or registry.list_all(), - "tool_choice": "auto" - } - return body - - def process_tool_calls( - self, - tool_calls: list[dict], - messages: list[dict] - ) -> list[dict]: - """处理工具调用,返回工具结果消息""" - results = [] - - for call in tool_calls: - tool_name = call["function"]["name"] - tool_args = json.loads(call["function"]["arguments"]) - call_id = call["id"] - - try: - # 执行工具 - result = registry.execute(tool_name, tool_args) - content = json.dumps(result, ensure_ascii=False) - except Exception as e: - content = json.dumps({ - "error": True, - "message": str(e) - }, ensure_ascii=False) - - # 添加工具结果消息 - results.append({ - "role": "tool", - "tool_call_id": call_id, - "name": tool_name, - "content": content - }) - - return results - - def chat_with_tools( - self, - messages: list[dict], - model: str = "glm-5", - max_iterations: int = 5, - stream: bool = True - ) -> Generator: - """ - 支持工具调用的对话补全 - - Args: - messages: 对话历史 - model: 模型名称 - max_iterations: 最大工具调用迭代次数 - stream: 是否流式输出 - - Yields: - SSE 格式的事件 - """ - import requests - - tools = registry.list_all() - - for iteration in range(max_iterations): - # 调用模型 - body = self.build_messages_with_tools(messages, tools) - body["model"] = model - body["stream"] = stream - - resp = requests.post( - self.api_url, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {self.api_key}" - 
}, - json=body, - stream=stream, - timeout=120 - ) - - if stream: - # 流式处理 - tool_calls_buffer = {} - full_content = "" - - for line in resp.iter_lines(): - if not line: - continue - line = line.decode("utf-8") - if not line.startswith("data: "): - continue - data_str = line[6:] - if data_str == "[DONE]": - break - - chunk = json.loads(data_str) - delta = chunk["choices"][0].get("delta", {}) - - # 处理工具调用 - if "tool_calls" in delta: - for tc in delta["tool_calls"]: - idx = tc.get("index", 0) - if idx not in tool_calls_buffer: - tool_calls_buffer[idx] = { - "id": tc.get("id", ""), - "type": "function", - "function": {"name": "", "arguments": ""} - } - if tc.get("id"): - tool_calls_buffer[idx]["id"] = tc["id"] - if "function" in tc: - if tc["function"].get("name"): - tool_calls_buffer[idx]["function"]["name"] = tc["function"]["name"] - if tc["function"].get("arguments"): - tool_calls_buffer[idx]["function"]["arguments"] += tc["function"]["arguments"] - - # 处理文本内容 - if "content" in delta and delta["content"]: - full_content += delta["content"] - yield f"event: message\ndata: {json.dumps({'content': delta['content']}, ensure_ascii=False)}\n\n" - - # 检查是否有工具调用 - if tool_calls_buffer: - tool_calls = list(tool_calls_buffer.values()) - - # 发送工具调用事件(供前端展示) - yield f"event: tool_call\ndata: {json.dumps({'calls': tool_calls}, ensure_ascii=False)}\n\n" - - # 将助手消息添加到历史 - messages.append({ - "role": "assistant", - "content": full_content or None, - "tool_calls": tool_calls - }) - - # 执行工具调用 - tool_results = self.process_tool_calls(tool_calls, messages) - - # 发送工具结果事件 - yield f"event: tool_result\ndata: {json.dumps({'results': tool_results}, ensure_ascii=False)}\n\n" - - # 将工具结果添加到消息历史 - messages.extend(tool_results) - - # 继续下一轮对话 - continue - - # 无工具调用,结束 - yield f"event: done\ndata: {json.dumps({})}\n\n" - return - - else: - # 非流式处理 - result = resp.json() - choice = result["choices"][0] - message = choice["message"] - - if "tool_calls" not in message: - # 无工具调用,直接返回 - yield 
f"event: done\ndata: {json.dumps({'message': message}, ensure_ascii=False)}\n\n" - return - - # 有工具调用 - tool_calls = message["tool_calls"] - - # 将助手消息添加到历史 - messages.append(message) - - # 执行工具 - tool_results = self.process_tool_calls(tool_calls, messages) - messages.extend(tool_results) - - # 继续下一轮 - continue -``` - ---- - -## 三、爬虫流水线设计 - -### 3.1 爬虫服务架构 - -```mermaid -flowchart TB - subgraph CrawlerService["🕷️ Crawler Service"] - subgraph Engines["核心引擎"] - Search["Search Engine
搜索引擎"] - Fetcher["Fetcher Engine
抓取引擎"] - TaskMgr["Task Manager
任务管理器"] - end - - subgraph Pipeline["Content Pipeline (内容处理流水线)"] - Parser["Parser
解析器"] --> Cleaner["Cleaner
清洗器"] - Cleaner --> Extractor["Extractor
提取器"] - Extractor --> Structurer["Structurer
结构化"] - end - - subgraph Storage["Storage Layer (存储层)"] - Cache["Cache
(Redis)"] - DB["DB
(MySQL)"] - FileStore["File Storage
(本地/OSS)"] - end - - Search --> Pipeline - Fetcher --> Pipeline - TaskMgr --> Pipeline - Pipeline --> Storage - end - - style Parser fill:#e3f2fd - style Cleaner fill:#e8f5e9 - style Extractor fill:#fff3e0 - style Structurer fill:#fce4ec -``` - -### 3.2 核心模块设计 - -#### 3.2.1 搜索服务 - -```python -# backend/crawler/search.py - -from dataclasses import dataclass -from typing import Protocol -import asyncio - -@dataclass -class SearchResult: - title: str - url: str - snippet: str - source: str - -class SearchEngine(Protocol): - """搜索引擎协议""" - async def search(self, query: str, max_results: int) -> list[SearchResult]: - ... - -class DuckDuckGoSearch: - """DuckDuckGo 搜索实现""" - - async def search(self, query: str, max_results: int = 5) -> list[SearchResult]: - from duckduckgo_search import DDGS - - results = [] - with DDGS() as ddgs: - for r in ddgs.text(query, max_results=max_results): - results.append(SearchResult( - title=r.get("title", ""), - url=r.get("href", ""), - snippet=r.get("body", ""), - source="duckduckgo" - )) - return results - -class SearchService: - """搜索服务""" - - def __init__(self, engine: SearchEngine | None = None): - self.engine = engine or DuckDuckGoSearch() - - def search(self, query: str, max_results: int = 5) -> list[dict]: - """同步搜索接口""" - return asyncio.run(self._search_async(query, max_results)) - - async def _search_async(self, query: str, max_results: int) -> list[dict]: - results = await self.engine.search(query, max_results) - return [ - { - "title": r.title, - "url": r.url, - "snippet": r.snippet, - "source": r.source - } - for r in results - ] -``` - -#### 3.2.2 网页抓取服务 - -```python -# backend/crawler/fetcher.py - -import asyncio -from dataclasses import dataclass -from typing import Literal -from bs4 import BeautifulSoup -import httpx -from urllib.parse import urljoin, urlparse - -@dataclass -class FetchResult: - url: str - status: int - content: dict - metadata: dict - -class FetchService: - """网页抓取服务""" - - def __init__( - self, - timeout: 
float = 30.0, - max_retries: int = 2, - user_agent: str = "Mozilla/5.0 (compatible; NanoClawBot/1.0)" - ): - self.timeout = timeout - self.max_retries = max_retries - self.user_agent = user_agent - - async def fetch_async( - self, - url: str, - extract_type: Literal["text", "links", "images", "structured"] = "text" - ) -> FetchResult: - """异步抓取网页""" - headers = {"User-Agent": self.user_agent} - - async with httpx.AsyncClient(timeout=self.timeout) as client: - for attempt in range(self.max_retries + 1): - try: - resp = await client.get(url, headers=headers, follow_redirects=True) - resp.raise_for_status() - break - except httpx.HTTPError as e: - if attempt == self.max_retries: - return FetchResult( - url=url, - status=500, - content={"error": str(e)}, - metadata={} - ) - await asyncio.sleep(1 * (attempt + 1)) - - # 解析内容 - soup = BeautifulSoup(resp.text, "html.parser") - content = self._extract(soup, url, extract_type) - - metadata = { - "title": soup.title.string if soup.title else "", - "status_code": resp.status_code, - "content_type": resp.headers.get("content-type", ""), - "final_url": str(resp.url) - } - - return FetchResult(url=url, status=resp.status_code, content=content, metadata=metadata) - - def _extract(self, soup: BeautifulSoup, base_url: str, extract_type: str) -> dict: - """提取内容""" - if extract_type == "text": - # 移除脚本和样式 - for tag in soup(["script", "style", "nav", "footer", "header"]): - tag.decompose() - text = soup.get_text(separator="\n", strip=True) - return {"text": text[:10000]} # 限制长度 - - elif extract_type == "links": - links = [] - for a in soup.find_all("a", href=True): - href = urljoin(base_url, a["href"]) - if urlparse(href).scheme in ("http", "https"): - links.append({ - "text": a.get_text(strip=True), - "url": href - }) - return {"links": links[:100]} - - elif extract_type == "images": - images = [] - for img in soup.find_all("img", src=True): - src = urljoin(base_url, img["src"]) - images.append({ - "alt": img.get("alt", ""), - "src": 
src - }) - return {"images": images[:50]} - - elif extract_type == "structured": - # 提取结构化数据 - structured = { - "title": soup.title.string if soup.title else "", - "meta": {}, - "headings": [], - "paragraphs": [] - } - - # Meta 信息 - for meta in soup.find_all("meta"): - name = meta.get("name") or meta.get("property", "") - if name: - structured["meta"][name] = meta.get("content", "") - - # 标题 - for i in range(1, 7): - for h in soup.find_all(f"h{i}"): - structured["headings"].append({ - "level": i, - "text": h.get_text(strip=True) - }) - - # 段落 - for p in soup.find_all("p"): - text = p.get_text(strip=True) - if len(text) > 20: - structured["paragraphs"].append(text) - - return {"structured": structured} - - return {} - - def fetch(self, url: str, extract_type: str = "text") -> dict: - """同步抓取接口""" - result = asyncio.run(self.fetch_async(url, extract_type)) - return { - "success": result.status == 200, - "url": result.url, - "content": result.content, - "metadata": result.metadata - } -``` - -#### 3.2.3 任务管理器 - -```python -# backend/crawler/task_manager.py - -import asyncio -import uuid -from datetime import datetime -from typing import Literal -from dataclasses import dataclass, field -from enum import Enum -from concurrent.futures import ThreadPoolExecutor - -class TaskStatus(Enum): - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" - FAILED = "failed" - -@dataclass -class CrawlTask: - id: str - urls: list[str] - extract_type: str - parallel: int - status: TaskStatus = TaskStatus.PENDING - progress: int = 0 - total: int = 0 - results: list[dict] = field(default_factory=list) - errors: list[dict] = field(default_factory=list) - created_at: datetime = field(default_factory=datetime.utcnow) - completed_at: datetime | None = None - -class CrawlTaskManager: - """爬虫任务管理器""" - - def __init__(self, max_workers: int = 3): - self.tasks: dict[str, CrawlTask] = {} - self.max_workers = max_workers - self.executor = ThreadPoolExecutor(max_workers=max_workers) - 
self._fetch_service = None - - @property - def fetch_service(self): - if self._fetch_service is None: - from .fetcher import FetchService - self._fetch_service = FetchService() - return self._fetch_service - - def create_task( - self, - urls: list[str], - extract_type: Literal["text", "links", "images", "structured"] = "text", - parallel: int = 3 - ) -> str: - """创建爬虫任务""" - task_id = str(uuid.uuid4())[:8] - task = CrawlTask( - id=task_id, - urls=urls, - extract_type=extract_type, - parallel=min(parallel, self.max_workers), - total=len(urls) - ) - self.tasks[task_id] = task - - # 异步执行 - self.executor.submit(self._execute_task, task_id) - - return task_id - - def _execute_task(self, task_id: str): - """执行爬虫任务""" - task = self.tasks.get(task_id) - if not task: - return - - task.status = TaskStatus.RUNNING - - async def run(): - semaphore = asyncio.Semaphore(task.parallel) - - async def fetch_one(url: str): - async with semaphore: - try: - result = await self.fetch_service.fetch_async(url, task.extract_type) - return {"url": url, "data": result} - except Exception as e: - return {"url": url, "error": str(e)} - - tasks = [fetch_one(url) for url in task.urls] - results = await asyncio.gather(*tasks) - - for r in results: - task.progress += 1 - if "error" in r: - task.errors.append(r) - else: - task.results.append(r) - - try: - asyncio.run(run()) - task.status = TaskStatus.COMPLETED - except Exception as e: - task.status = TaskStatus.FAILED - task.errors.append({"error": str(e)}) - finally: - task.completed_at = datetime.utcnow() - - def get_task_status(self, task_id: str) -> dict: - """获取任务状态""" - task = self.tasks.get(task_id) - if not task: - return {"error": "Task not found"} - - return { - "id": task.id, - "status": task.status.value, - "progress": task.progress, - "total": task.total, - "results": task.results if task.status == TaskStatus.COMPLETED else [], - "errors": task.errors, - "created_at": task.created_at.isoformat(), - "completed_at": 
task.completed_at.isoformat() if task.completed_at else None - } - -# 全局任务管理器 -crawl_manager = CrawlTaskManager() -``` - -### 3.3 数据模型扩展 - -```python -# backend/models.py 新增模型 - -class CrawlTaskRecord(db.Model): - """爬虫任务记录(持久化)""" - __tablename__ = "crawl_tasks" - - id = db.Column(db.String(32), primary_key=True) - user_id = db.Column(db.BigInteger, db.ForeignKey("users.id")) - conversation_id = db.Column(db.String(64), db.ForeignKey("conversations.id")) - urls = db.Column(db.JSON) # URL 列表 - extract_type = db.Column(db.String(32)) - status = db.Column(db.String(16), default="pending") - result_count = db.Column(db.Integer, default=0) - error_count = db.Column(db.Integer, default=0) - created_at = db.Column(db.DateTime, default=datetime.utcnow) - completed_at = db.Column(db.DateTime) - - -class CrawlResult(db.Model): - """爬虫结果""" - __tablename__ = "crawl_results" - - id = db.Column(db.BigInteger, primary_key=True, autoincrement=True) - task_id = db.Column(db.String(32), db.ForeignKey("crawl_tasks.id")) - url = db.Column(db.String(1024)) - content = db.Column(db.JSON) # 提取的内容 - metadata = db.Column(db.JSON) - status_code = db.Column(db.Integer) - created_at = db.Column(db.DateTime, default=datetime.utcnow) -``` - ---- - -## 四、API 接口设计 - -### 4.1 工具相关 API - -#### 获取可用工具列表 - -``` -GET /api/tools -``` - -**响应:** - -```json -{ - "code": 0, - "data": { - "tools": [ - { - "name": "web_search", - "description": "搜索互联网获取实时信息", - "parameters": { ... 
} - } - ] - } -} -``` - -### 4.2 爬虫相关 API - -#### 创建爬虫任务 - -``` -POST /api/crawler/tasks -``` - -**请求体:** - -```json -{ - "urls": ["https://example.com/page1", "https://example.com/page2"], - "extract_type": "text", - "parallel": 3 -} -``` - -**响应:** - -```json -{ - "code": 0, - "data": { - "task_id": "abc12345", - "status": "pending", - "total": 2 - } -} -``` - -#### 查询任务状态 - -``` -GET /api/crawler/tasks/:task_id -``` - -**响应:** - -```json -{ - "code": 0, - "data": { - "id": "abc12345", - "status": "completed", - "progress": 2, - "total": 2, - "results": [ - { - "url": "https://example.com/page1", - "data": { "content": { "text": "..." }, "metadata": { ... } } - } - ] - } -} -``` - -#### 获取任务列表 - -``` -GET /api/crawler/tasks?status=completed&limit=20 -``` - ---- - -## 五、前端集成 - -### 5.1 工具调用 UI 组件 - -```vue - - - - -``` - -### 5.2 SSE 事件扩展 - -扩展消息 API 的 SSE 事件,新增工具调用相关事件: - -| 事件 | 说明 | -| ------------- | -------- | -| `tool_call` | 模型发起工具调用 | -| `tool_result` | 工具执行结果 | -| `thinking` | 思维链内容 | -| `message` | 回复内容片段 | -| `done` | 完成 | - ---- - -## 六、配置与部署 - -### 6.1 配置文件扩展 - -```yaml -# config.yml - -# ... 现有配置 ... - -# 爬虫配置 -crawler: - max_workers: 5 - timeout: 30 - max_retries: 2 - user_agent: "Mozilla/5.0 (compatible; NanoClawBot/1.0)" - -# 工具配置 -tools: - enabled: - - web_search - - fetch_page - - crawl_batch - - query_crawl_task - max_iterations: 5 # 最大工具调用迭代次数 -``` - -### 6.2 依赖安装 - -```toml -# pyproject.toml 新增依赖 - -dependencies = [ - # ... 现有依赖 ... - "duckduckgo-search>=4.0.0", - "beautifulsoup4>=4.12.0", - "httpx>=0.25.0", - "lxml>=4.9.0", -] -``` - ---- - -## 七、使用示例 - -### 7.1 用户对话示例 - -``` -用户: 帮我搜索一下最近 AI 领域有什么重要新闻 - -助手: [调用 web_search 工具] - query: "AI 人工智能 最新新闻 2024" - → 返回搜索结果 - -助手: 根据搜索结果,最近 AI 领域有以下重要新闻: - -1. **OpenAI 发布 GPT-5** - [链接] - OpenAI 正式发布了新一代模型 GPT-5... - -2. **Google Gemini 2.0 发布** - [链接] - Google 宣布推出 Gemini 2.0... 
- ---- - -用户: 帮我把这几个链接的内容都抓取下来 - -助手: [调用 crawl_batch 工具] - urls: ["https://...", "https://..."] - → 返回任务ID - -助手: 已创建爬虫任务,正在抓取 2 个网页... - -助手: [自动调用 query_crawl_task 查询结果] - -助手: 抓取完成!以下是内容摘要: - -**文章1: OpenAI 发布 GPT-5** -> 核心内容:新模型在推理能力上提升了 50%... - -**文章2: Google Gemini 2.0 发布** -> 核心内容:多模态能力大幅增强... -``` - -### 7.2 API 调用示例 - -```python -# 创建会话并启用工具 -import requests - -# 创建会话 -resp = requests.post("http://localhost:3000/api/conversations", json={ - "title": "AI 新闻调研", - "model": "glm-5" -}) -conv_id = resp.json()["data"]["id"] - -# 发送消息(自动触发工具调用) -resp = requests.post( - f"http://localhost:3000/api/conversations/{conv_id}/messages", - json={"content": "帮我搜索最新的 AI 新闻", "stream": true}, - stream=True -) - -# 处理 SSE 事件 -for line in resp.iter_lines(): - # event: tool_call - # event: tool_result - # event: message - # event: done - pass -``` - ---- - -## 八、安全与限制 - -### 8.1 安全措施 - -1. **URL 白名单/黑名单**:限制可爬取的域名 -2. **速率限制**:控制请求频率,避免被封禁 -3. **内容过滤**:过滤敏感内容 -4. **用户隔离**:任务按用户隔离 - -### 8.2 使用限制 - -```python -# backend/tools/limits.py - -TOOL_LIMITS = { - "web_search": { - "max_results": 10, - "rate_limit": "10/minute" - }, - "fetch_page": { - "max_content_size": 1024 * 1024, # 1MB - "timeout": 30 - }, - "crawl_batch": { - "max_urls": 50, - "parallel_max": 5 - } -} -``` - ---- - -## 九、后续扩展 - -1. **更多工具类型**: - - - 数据分析工具(图表生成、数据统计) - - 文件处理工具(PDF 解析、Excel 处理) - - 代码执行工具(安全沙箱中运行代码) - -2. **爬虫增强**: - - - JavaScript 渲染(Playwright/Selenium) - - 代理池支持 - - 分布式爬虫 - -3. **智能调度**: - - - 基于对话上下文的工具推荐 - - 工具链组合执行 - - 异步任务通知 - ---- - -## 十、总结 - -本设计文档描述了 NanoClaw 的爬虫流水线和工具系统架构: - -1. **工具系统**:采用 OpenAI 兼容的工具定义格式,通过工具注册表管理,支持 GLM 模型的自动工具调用。 - -2. **爬虫流水线**:包含搜索服务、抓取服务、任务管理器,支持单页抓取和批量任务,提供多种内容提取模式。 - -3. **API 设计**:扩展现有 API 支持 SSE 工具调用事件,新增爬虫任务管理接口。 - -4. 
**前端集成**:提供工具调用可视化组件,支持工具执行过程的实时展示。 - -这套架构使 NanoClaw 能够突破模型知识截止日期的限制,获取实时网络信息,大幅扩展应用场景。 diff --git a/docs/Design.md b/docs/Design.md index 236f706..86c1c70 100644 --- a/docs/Design.md +++ b/docs/Design.md @@ -4,21 +4,21 @@ ### 会话管理 -| 方法 | 路径 | 说明 | -|------|------|------| -| `POST` | `/api/conversations` | 创建会话 | -| `GET` | `/api/conversations` | 获取会话列表 | -| `GET` | `/api/conversations/:id` | 获取会话详情 | -| `PATCH` | `/api/conversations/:id` | 更新会话 | -| `DELETE` | `/api/conversations/:id` | 删除会话 | +| 方法 | 路径 | 说明 | +| -------- | ------------------------ | ------ | +| `POST` | `/api/conversations` | 创建会话 | +| `GET` | `/api/conversations` | 获取会话列表 | +| `GET` | `/api/conversations/:id` | 获取会话详情 | +| `PATCH` | `/api/conversations/:id` | 更新会话 | +| `DELETE` | `/api/conversations/:id` | 删除会话 | ### 消息管理 -| 方法 | 路径 | 说明 | -|------|------|------| -| `GET` | `/api/conversations/:id/messages` | 获取消息列表 | -| `POST` | `/api/conversations/:id/messages` | 发送消息(对话补全,支持 `stream` 流式) | -| `DELETE` | `/api/conversations/:id/messages/:message_id` | 删除消息 | +| 方法 | 路径 | 说明 | +| -------- | --------------------------------------------- | ------------------------- | +| `GET` | `/api/conversations/:id/messages` | 获取消息列表 | +| `POST` | `/api/conversations/:id/messages` | 发送消息(对话补全,支持 `stream` 流式) | +| `DELETE` | `/api/conversations/:id/messages/:message_id` | 删除消息 | --- diff --git a/docs/ToolSystemDesign.md b/docs/ToolSystemDesign.md new file mode 100644 index 0000000..f6d66f9 --- /dev/null +++ b/docs/ToolSystemDesign.md @@ -0,0 +1,475 @@ +# 工具调用系统设计 + +## 概述 + +本文档描述 NanoClaw 工具调用系统的设计,采用简化的工厂模式,减少不必要的类层次。 + +--- + +## 一、核心类图 + +```mermaid +classDiagram + direction TB + + class ToolDefinition { + <> + +str name + +str description + +dict parameters + +Callable handler + +str category + +dict to_openai_format() + } + + class ToolRegistry { + -dict _tools + +register(ToolDefinition tool) void + +get(str name) ToolDefinition? 
+ +list_all() list~dict~ + +execute(str name, dict args) Any + } + + class ToolExecutor { + -ToolRegistry registry + +process_tool_calls(list tool_calls) list~dict~ + +build_request(list messages) dict + } + + class ToolResult { + <> + +bool success + +Any data + +str? error + +dict to_dict() + } + + ToolRegistry "1" --> "*" ToolDefinition : manages + ToolExecutor "1" --> "1" ToolRegistry : uses + ToolDefinition ..> ToolResult : returns +``` + +--- + +## 二、工具定义工厂 + +使用工厂函数创建工具,避免复杂的类继承: + +```mermaid +classDiagram + direction LR + + class ToolFactory { + <> + +tool(name, description, parameters)$ decorator + +register(name, handler, description, parameters)$ void + +create_crawler_tools()$ list~ToolDefinition~ + +create_data_tools()$ list~ToolDefinition~ + +create_file_tools()$ list~ToolDefinition~ + } + + class ToolDefinition { + +str name + +str description + +dict parameters + +Callable handler + } + + ToolFactory ..> ToolDefinition : creates +``` + +--- + +## 三、核心类实现 + +### 3.1 ToolDefinition + +```python +from dataclasses import dataclass, field +from typing import Callable, Any + +@dataclass +class ToolDefinition: + """工具定义""" + name: str + description: str + parameters: dict # JSON Schema + handler: Callable[[dict], Any] + category: str = "general" + + def to_openai_format(self) -> dict: + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters + } + } +``` + +### 3.2 ToolResult + +```python +from dataclasses import dataclass +from typing import Any, Optional + +@dataclass +class ToolResult: + """工具执行结果""" + success: bool + data: Any = None + error: Optional[str] = None + + def to_dict(self) -> dict: + return { + "success": self.success, + "data": self.data, + "error": self.error + } + + @classmethod + def ok(cls, data: Any) -> "ToolResult": + return cls(success=True, data=data) + + @classmethod + def fail(cls, error: str) -> "ToolResult": + return cls(success=False, error=error) 
+``` + +### 3.3 ToolRegistry + +```python +from typing import Dict, List, Optional + +class ToolRegistry: + """工具注册表(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._tools: Dict[str, ToolDefinition] = {} + return cls._instance + + def register(self, tool: ToolDefinition) -> None: + self._tools[tool.name] = tool + + def get(self, name: str) -> Optional[ToolDefinition]: + return self._tools.get(name) + + def list_all(self) -> List[dict]: + return [t.to_openai_format() for t in self._tools.values()] + + def execute(self, name: str, arguments: dict) -> dict: + tool = self.get(name) + if not tool: + return ToolResult.fail(f"Tool not found: {name}").to_dict() + try: + result = tool.handler(arguments) + if isinstance(result, ToolResult): + return result.to_dict() + return ToolResult.ok(result).to_dict() + except Exception as e: + return ToolResult.fail(str(e)).to_dict() + + +# 全局注册表 +registry = ToolRegistry() +``` + +### 3.4 ToolExecutor + +```python +import json +from typing import List, Dict + +class ToolExecutor: + """工具执行器""" + + def __init__(self, registry: ToolRegistry = None): + self.registry = registry or ToolRegistry() + + def process_tool_calls(self, tool_calls: List[dict]) -> List[dict]: + """处理工具调用,返回消息列表""" + results = [] + for call in tool_calls: + name = call["function"]["name"] + args = json.loads(call["function"]["arguments"]) + call_id = call["id"] + + result = self.registry.execute(name, args) + + results.append({ + "role": "tool", + "tool_call_id": call_id, + "name": name, + "content": json.dumps(result, ensure_ascii=False) + }) + return results + + def build_request(self, messages: List[dict], **kwargs) -> dict: + """构建 API 请求""" + return { + "model": kwargs.get("model", "glm-5"), + "messages": messages, + "tools": self.registry.list_all(), + "tool_choice": "auto" + } +``` + +--- + +## 四、工具工厂模式 + +### 4.1 装饰器注册 + +```python +# backend/tools/factory.py + +from .core 
import ToolDefinition, registry + +def tool(name: str, description: str, parameters: dict, category: str = "general"): + """工具注册装饰器""" + def decorator(func): + tool_def = ToolDefinition( + name=name, + description=description, + parameters=parameters, + handler=func, + category=category + ) + registry.register(tool_def) + return func + return decorator +``` + +### 4.2 使用示例 + +```python +# backend/tools/builtin/crawler.py + +from ..factory import tool + +# 网页搜索工具 +@tool( + name="web_search", + description="搜索互联网获取信息", + parameters={ + "type": "object", + "properties": { + "query": {"type": "string", "description": "搜索关键词"}, + "max_results": {"type": "integer", "default": 5} + }, + "required": ["query"] + }, + category="crawler" +) +def web_search(arguments: dict) -> dict: + from ..services import SearchService + query = arguments["query"] + max_results = arguments.get("max_results", 5) + service = SearchService() + results = service.search(query, max_results) + return {"results": results} + + +# 页面抓取工具 +@tool( + name="fetch_page", + description="抓取指定网页内容", + parameters={ + "type": "object", + "properties": { + "url": {"type": "string", "description": "网页URL"}, + "extract_type": {"type": "string", "enum": ["text", "links", "structured"]} + }, + "required": ["url"] + }, + category="crawler" +) +def fetch_page(arguments: dict) -> dict: + from ..services import FetchService + url = arguments["url"] + extract_type = arguments.get("extract_type", "text") + service = FetchService() + result = service.fetch(url, extract_type) + return result + + +# 计算器工具 +@tool( + name="calculator", + description="执行数学计算", + parameters={ + "type": "object", + "properties": { + "expression": {"type": "string", "description": "数学表达式"} + }, + "required": ["expression"] + }, + category="data" +) +def calculator(arguments: dict) -> dict: + import ast + import operator + expr = arguments["expression"] + # 安全计算 + ops = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + 
ast.Div: operator.truediv
+    }
+    def _ev(n): return n.value if isinstance(n, ast.Constant) else ops[type(n.op)](_ev(n.left), _ev(n.right))
+    result = _ev(ast.parse(expr, mode='eval').body)
+    return {"result": result}
+```
+
+---
+
+## 五、辅助服务类
+
+工具依赖的服务保持独立,不与工具类耦合:
+
+```mermaid
+classDiagram
+    direction LR
+
+    class SearchService {
+        -SearchEngine engine
+        +search(str query, int limit) list~dict~
+    }
+
+    class FetchService {
+        +fetch(str url, str type) dict
+        +fetch_batch(list urls) dict
+    }
+
+    class ContentExtractor {
+        +extract_text(html) str
+        +extract_links(html) list
+        +extract_structured(html) dict
+    }
+
+    FetchService --> ContentExtractor : uses
+```
+
+```python
+# backend/tools/services.py
+
+class SearchService:
+    """搜索服务"""
+    def __init__(self, engine=None):
+        from duckduckgo_search import DDGS
+        self.engine = engine or DDGS()
+
+    def search(self, query: str, max_results: int = 5) -> list:
+        results = list(self.engine.text(query, max_results=max_results))
+        return [
+            {"title": r["title"], "url": r["href"], "snippet": r["body"]}
+            for r in results
+        ]
+
+
+class FetchService:
+    """页面抓取服务"""
+    def __init__(self, timeout: float = 30.0):
+        self.timeout = timeout
+
+    def fetch(self, url: str, extract_type: str = "text") -> dict:
+        import httpx
+        from bs4 import BeautifulSoup
+
+        resp = httpx.get(url, timeout=self.timeout, follow_redirects=True)
+        soup = BeautifulSoup(resp.text, "html.parser")
+
+        extractor = ContentExtractor(soup)
+        if extract_type == "text":
+            return {"text": extractor.extract_text()}
+        elif extract_type == "links":
+            return {"links": extractor.extract_links()}
+        else:
+            return extractor.extract_structured()
+
+
+class ContentExtractor:
+    """内容提取器"""
+    def __init__(self, soup):
+        self.soup = soup
+
+    def extract_text(self) -> str:
+        # 移除脚本和样式
+        for tag in self.soup(["script", "style"]):
+            tag.decompose()
+        return self.soup.get_text(separator="\n", strip=True)
+
+    def extract_links(self) -> list:
+        return [
+            {"text": a.get_text(strip=True), "href": a.get("href")}
+            for a in 
self.soup.find_all("a", href=True) + ] + + def extract_structured(self) -> dict: + return { + "title": self.soup.title.string if self.soup.title else "", + "text": self.extract_text(), + "links": self.extract_links()[:20] + } +``` + +--- + +## 六、工具初始化 + +```python +# backend/tools/__init__.py + +from .core import ToolDefinition, ToolResult, ToolRegistry, registry, ToolExecutor +from .factory import tool + +def init_tools(): + """初始化所有内置工具""" + # 导入即自动注册 + from .builtin import crawler, data, file_ops + +# 使用时 +init_tools() +``` + +--- + +## 七、工具清单 + +| 类别 | 工具名称 | 描述 | 依赖服务 | +| ------- | --------------- | ---- | ------------- | +| crawler | `web_search` | 网页搜索 | SearchService | +| crawler | `fetch_page` | 单页抓取 | FetchService | +| crawler | `crawl_batch` | 批量爬取 | FetchService | +| data | `calculator` | 数学计算 | - | +| data | `data_analysis` | 数据分析 | - | +| file | `file_reader` | 文件读取 | - | +| file | `file_writer` | 文件写入 | - | + +--- + +## 八、与旧设计对比 + +| 方面 | 旧设计 | 新设计 | +| ----- | ----------------- | --------- | +| 类数量 | 30+ | ~10 | +| 工具定义 | 继承 BaseTool | 装饰器 + 函数 | +| 中间抽象层 | 5个(CrawlerTool 等) | 无 | +| 扩展方式 | 创建子类 | 写函数 + 装饰器 | +| 代码量 | 多 | 少 | + +--- + +## 九、总结 + +简化后的设计: + +1. **核心类**:`ToolDefinition`、`ToolRegistry`、`ToolExecutor`、`ToolResult` +2. **工厂模式**:使用 `@tool` 装饰器注册工具 +3. **服务分离**:工具依赖的服务独立,不与工具类耦合 +4. **易于扩展**:新增工具只需写一个函数并加装饰器