# Crawler Pipeline and Tool System Design
## Overview
This document describes how to build a crawler pipeline and tool system for NanoClaw, enabling the GLM model to:
1. Retrieve real-time web information through tool calls
2. Perform structured data-collection tasks
3. Interact with external systems
---
## 1. Overall Architecture
```mermaid
flowchart TB
    subgraph Frontend["Frontend (Vue 3)"]
        ChatView["ChatView: chat interface"]
        ToolCallUI["ToolCallUI: tool-call display"]
        CrawlerPanel["CrawlerStatusPanel: crawler status panel"]
    end
    subgraph Backend["Backend API (Flask)"]
        ChatAPI["/api/chat: chat completion"]
        ToolsAPI["/api/tools: tool management"]
        CrawlerAPI["/api/crawler: crawler tasks"]
        Orchestrator["Tool Orchestrator: tool-call orchestration engine"]
    end
    subgraph Services["Service Layer"]
        CrawlerService["Web Crawler: crawler service"]
        BuiltInTools["Built-in Tools"]
        ExternalAPIs["External APIs: third-party services"]
    end
    Frontend -->|"HTTP/SSE"| Backend
    ChatAPI --> Orchestrator
    ToolsAPI --> Orchestrator
    CrawlerAPI --> Orchestrator
    Orchestrator --> CrawlerService
    Orchestrator --> BuiltInTools
    Orchestrator --> ExternalAPIs
```
---
## 2. Tool System Design
### 2.1 Tool Definition Specification
Tools are defined with JSON Schema and are compatible with OpenAI Function Calling:
```python
# backend/tools/registry.py
from dataclasses import dataclass
from typing import Callable, Any


@dataclass
class ToolDefinition:
    """Tool definition."""
    name: str          # Tool name, e.g. "web_search"
    description: str   # Tool description, used by the model to understand its purpose
    parameters: dict   # Parameter definition in JSON Schema format
    handler: Callable  # The function that actually executes the tool

    def to_openai_format(self) -> dict:
        """Convert to the GLM/OpenAI-compatible format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters
            }
        }


class ToolRegistry:
    """Tool registry."""

    def __init__(self):
        self._tools: dict[str, ToolDefinition] = {}

    def register(self, tool: ToolDefinition):
        self._tools[tool.name] = tool

    def get(self, name: str) -> ToolDefinition | None:
        return self._tools.get(name)

    def list_all(self) -> list[dict]:
        return [t.to_openai_format() for t in self._tools.values()]

    def execute(self, name: str, arguments: dict) -> Any:
        tool = self.get(name)
        if not tool:
            raise ValueError(f"Tool not found: {name}")
        return tool.handler(**arguments)


# Global registry
registry = ToolRegistry()
```
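As a quick sanity check, a registered tool can be listed and executed directly through the registry. The sketch below is illustrative only; the `echo` tool is hypothetical and not part of the built-in tool set.
```python
from backend.tools.registry import registry, ToolDefinition

# Hypothetical tool, registered only to demonstrate the registry API
registry.register(ToolDefinition(
    name="echo",
    description="Return the input text unchanged.",
    parameters={
        "type": "object",
        "properties": {"text": {"type": "string"}},
        "required": ["text"]
    },
    handler=lambda text: {"echo": text}
))

print(registry.list_all())                       # tool schemas in OpenAI function format
print(registry.execute("echo", {"text": "hi"}))  # {'echo': 'hi'}
```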
### 2.2 Built-in Tool Definitions
#### 2.2.1 Web Search Tool
```python
# backend/tools/builtin/web_search.py
from ..registry import registry, ToolDefinition


def web_search(query: str, max_results: int = 5) -> dict:
    """
    Run a web search.

    Args:
        query: Search keywords
        max_results: Maximum number of results to return

    Returns:
        A list of search results
    """
    # Delegate the search to the crawler service
    from ...crawler import search_service
    results = search_service.search(query, max_results)
    return {
        "success": True,
        "results": results
    }


# Register the tool
registry.register(ToolDefinition(
    name="web_search",
    description="Search the internet for real-time information. Use when the user asks about current events, news, or needs up-to-date data.",
    parameters={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search keywords"
            },
            "max_results": {
                "type": "integer",
                "description": "Number of results to return, default 5",
                "default": 5
            }
        },
        "required": ["query"]
    },
    handler=web_search
))
```
#### 2.2.2 Page Fetching Tool
```python
# backend/tools/builtin/fetch_page.py
from ..registry import registry, ToolDefinition


def fetch_page(url: str, extract_type: str = "text") -> dict:
    """
    Fetch the content of a web page.

    Args:
        url: Target page URL
        extract_type: Extraction type (text/links/images/structured)

    Returns:
        The extracted content
    """
    from ...crawler import fetch_service
    return fetch_service.fetch(url, extract_type)


registry.register(ToolDefinition(
    name="fetch_page",
    description="Fetch the content of a given URL and extract text, links, or structured data.",
    parameters={
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "URL of the page to fetch"
            },
            "extract_type": {
                "type": "string",
                "enum": ["text", "links", "images", "structured"],
                "description": "Extraction type",
                "default": "text"
            }
        },
        "required": ["url"]
    },
    handler=fetch_page
))
```
#### 2.2.3 Batch Crawl Task Tool
```python
# backend/tools/builtin/crawl_batch.py
from ..registry import registry, ToolDefinition


def crawl_batch(
    urls: list[str],
    extract_type: str = "text",
    parallel: int = 3
) -> dict:
    """
    Crawl multiple web pages in a batch.

    Args:
        urls: List of URLs
        extract_type: Extraction type
        parallel: Concurrency level

    Returns:
        Task ID and status
    """
    from ...crawler import crawl_manager
    task_id = crawl_manager.create_task(
        urls=urls,
        extract_type=extract_type,
        parallel=parallel
    )
    return {
        "task_id": task_id,
        "status": "pending",
        "message": f"Crawl task created with {len(urls)} URLs"
    }


registry.register(ToolDefinition(
    name="crawl_batch",
    description="Crawl the content of multiple web pages in a batch. Suitable when several pages need to be collected.",
    parameters={
        "type": "object",
        "properties": {
            "urls": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of URLs to crawl"
            },
            "extract_type": {
                "type": "string",
                "enum": ["text", "links", "images", "structured"],
                "default": "text"
            },
            "parallel": {
                "type": "integer",
                "description": "Concurrency level, default 3",
                "default": 3
            }
        },
        "required": ["urls"]
    },
    handler=crawl_batch
))
```
#### 2.2.4 Crawl Task Query Tool
```python
# backend/tools/builtin/query_task.py
from ..registry import registry, ToolDefinition


def query_crawl_task(task_id: str) -> dict:
    """
    Query the status and results of a crawl task.

    Args:
        task_id: Task ID

    Returns:
        Task status and results
    """
    from ...crawler import crawl_manager
    return crawl_manager.get_task_status(task_id)


registry.register(ToolDefinition(
    name="query_crawl_task",
    description="Query the execution status and results of a crawl task.",
    parameters={
        "type": "object",
        "properties": {
            "task_id": {
                "type": "string",
                "description": "Task ID"
            }
        },
        "required": ["task_id"]
    },
    handler=query_crawl_task
))
```
### 2.3 Tool Call Flow
```mermaid
flowchart TD
    A[User message] --> B[Build message context + tool definitions]
    B --> C[Call GLM API with tool calling enabled]
    C --> D{Response type?}
    D -->|Plain text| E[Return to user]
    D -->|Tool call request| F[Execute tool call via registry.execute]
    F --> G[Append tool results to message history]
    G --> H[Call GLM API again with tool results]
    H --> I[Return final reply]
    style A fill:#e1f5fe
    style E fill:#c8e6c9
    style I fill:#c8e6c9
    style F fill:#fff3e0
    style D fill:#fce4ec
```
### 2.4 Backend Implementation: Tool Call Handling
```python
# backend/tools/executor.py
import json
from typing import Generator

import requests

from .registry import registry


class ToolExecutor:
    """Tool-call executor."""

    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key

    def build_messages_with_tools(
        self,
        messages: list[dict],
        tools: list[dict] | None = None
    ) -> dict:
        """Build a request body that includes tool definitions."""
        body = {
            "model": "glm-5",
            "messages": messages,
            "tools": tools or registry.list_all(),
            "tool_choice": "auto"
        }
        return body

    def process_tool_calls(
        self,
        tool_calls: list[dict],
        messages: list[dict]
    ) -> list[dict]:
        """Execute tool calls and return the tool result messages."""
        results = []
        for call in tool_calls:
            tool_name = call["function"]["name"]
            tool_args = json.loads(call["function"]["arguments"])
            call_id = call["id"]
            try:
                # Execute the tool
                result = registry.execute(tool_name, tool_args)
                content = json.dumps(result, ensure_ascii=False)
            except Exception as e:
                content = json.dumps({
                    "error": True,
                    "message": str(e)
                }, ensure_ascii=False)
            # Append the tool result message
            results.append({
                "role": "tool",
                "tool_call_id": call_id,
                "name": tool_name,
                "content": content
            })
        return results

    def chat_with_tools(
        self,
        messages: list[dict],
        model: str = "glm-5",
        max_iterations: int = 5,
        stream: bool = True
    ) -> Generator:
        """
        Chat completion with tool-calling support.

        Args:
            messages: Conversation history
            model: Model name
            max_iterations: Maximum number of tool-calling iterations
            stream: Whether to stream the output

        Yields:
            SSE-formatted events
        """
        tools = registry.list_all()
        for iteration in range(max_iterations):
            # Call the model
            body = self.build_messages_with_tools(messages, tools)
            body["model"] = model
            body["stream"] = stream
            resp = requests.post(
                self.api_url,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.api_key}"
                },
                json=body,
                stream=stream,
                timeout=120
            )
            if stream:
                # Streaming mode
                tool_calls_buffer = {}
                full_content = ""
                for line in resp.iter_lines():
                    if not line:
                        continue
                    line = line.decode("utf-8")
                    if not line.startswith("data: "):
                        continue
                    data_str = line[6:]
                    if data_str == "[DONE]":
                        break
                    chunk = json.loads(data_str)
                    delta = chunk["choices"][0].get("delta", {})
                    # Accumulate tool-call fragments
                    if "tool_calls" in delta:
                        for tc in delta["tool_calls"]:
                            idx = tc.get("index", 0)
                            if idx not in tool_calls_buffer:
                                tool_calls_buffer[idx] = {
                                    "id": tc.get("id", ""),
                                    "type": "function",
                                    "function": {"name": "", "arguments": ""}
                                }
                            if tc.get("id"):
                                tool_calls_buffer[idx]["id"] = tc["id"]
                            if "function" in tc:
                                if tc["function"].get("name"):
                                    tool_calls_buffer[idx]["function"]["name"] = tc["function"]["name"]
                                if tc["function"].get("arguments"):
                                    tool_calls_buffer[idx]["function"]["arguments"] += tc["function"]["arguments"]
                    # Forward text content
                    if "content" in delta and delta["content"]:
                        full_content += delta["content"]
                        yield f"event: message\ndata: {json.dumps({'content': delta['content']}, ensure_ascii=False)}\n\n"
                # Check whether the model requested tool calls
                if tool_calls_buffer:
                    tool_calls = list(tool_calls_buffer.values())
                    # Emit a tool_call event (for frontend display)
                    yield f"event: tool_call\ndata: {json.dumps({'calls': tool_calls}, ensure_ascii=False)}\n\n"
                    # Append the assistant message to the history
                    messages.append({
                        "role": "assistant",
                        "content": full_content or None,
                        "tool_calls": tool_calls
                    })
                    # Execute the tool calls
                    tool_results = self.process_tool_calls(tool_calls, messages)
                    # Emit a tool_result event
                    yield f"event: tool_result\ndata: {json.dumps({'results': tool_results}, ensure_ascii=False)}\n\n"
                    # Append the tool results to the message history
                    messages.extend(tool_results)
                    # Continue with the next iteration
                    continue
                # No tool calls: finish
                yield f"event: done\ndata: {json.dumps({})}\n\n"
                return
            else:
                # Non-streaming mode
                result = resp.json()
                choice = result["choices"][0]
                message = choice["message"]
                if "tool_calls" not in message:
                    # No tool calls, return directly
                    yield f"event: done\ndata: {json.dumps({'message': message}, ensure_ascii=False)}\n\n"
                    return
                # The model requested tool calls
                tool_calls = message["tool_calls"]
                # Append the assistant message to the history
                messages.append(message)
                # Execute the tools
                tool_results = self.process_tool_calls(tool_calls, messages)
                messages.extend(tool_results)
                # Continue with the next iteration
                continue
```
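The executor is meant to be wrapped by the chat endpoint. A minimal sketch of such a Flask route is shown below; the GLM endpoint URL, the hard-coded key, and the request shape are assumptions for illustration, and in practice both values would come from config.yml.
```python
# Sketch: wiring ToolExecutor into a Flask SSE endpoint (endpoint details are assumptions)
from flask import Flask, Response, request

from backend.tools.executor import ToolExecutor

app = Flask(__name__)
executor = ToolExecutor(
    api_url="https://open.bigmodel.cn/api/paas/v4/chat/completions",  # assumed GLM endpoint
    api_key="YOUR_API_KEY"
)


@app.post("/api/chat")
def chat():
    payload = request.get_json()
    messages = payload.get("messages", [])
    # chat_with_tools yields SSE frames; stream them straight to the client
    return Response(
        executor.chat_with_tools(messages, stream=True),
        mimetype="text/event-stream"
    )
```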
---
## 3. Crawler Pipeline Design
### 3.1 Crawler Service Architecture
```mermaid
flowchart TB
    subgraph CrawlerService["🕷️ Crawler Service"]
        subgraph Engines["Core Engines"]
            Search["Search Engine"]
            Fetcher["Fetcher Engine"]
            TaskMgr["Task Manager"]
        end
        subgraph Pipeline["Content Pipeline"]
            Parser["Parser"] --> Cleaner["Cleaner"]
            Cleaner --> Extractor["Extractor"]
            Extractor --> Structurer["Structurer"]
        end
        subgraph Storage["Storage Layer"]
            Cache["Cache (Redis)"]
            DB["DB (MySQL)"]
            FileStore["File Storage (local/OSS)"]
        end
        Search --> Pipeline
        Fetcher --> Pipeline
        TaskMgr --> Pipeline
        Pipeline --> Storage
    end
    style Parser fill:#e3f2fd
    style Cleaner fill:#e8f5e9
    style Extractor fill:#fff3e0
    style Structurer fill:#fce4ec
```
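The content pipeline is not implemented as a separate module in this document (the FetchService in 3.2.2 performs parsing, cleaning, and extraction inline), but conceptually each stage maps one representation to the next. A minimal sketch of that staged view, with hypothetical stage functions, could look like:
```python
# Sketch of the conceptual content pipeline; the stage functions are hypothetical
from bs4 import BeautifulSoup


def parse(html: str) -> BeautifulSoup:
    """Parser: raw HTML to DOM tree."""
    return BeautifulSoup(html, "html.parser")


def clean(soup: BeautifulSoup) -> BeautifulSoup:
    """Cleaner: drop script/style/navigation noise."""
    for tag in soup(["script", "style", "nav", "footer", "header"]):
        tag.decompose()
    return soup


def extract(soup: BeautifulSoup) -> dict:
    """Extractor: pull out the title and visible text."""
    return {
        "title": soup.title.string if soup.title else "",
        "text": soup.get_text(separator="\n", strip=True),
    }


def structure(extracted: dict) -> dict:
    """Structurer: shape the extracted fields into a storable record."""
    return {"title": extracted["title"], "text": extracted["text"][:10000]}


def run_pipeline(html: str) -> dict:
    return structure(extract(clean(parse(html))))
```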
### 3.2 Core Module Design
#### 3.2.1 Search Service
```python
# backend/crawler/search.py
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass
class SearchResult:
    title: str
    url: str
    snippet: str
    source: str


class SearchEngine(Protocol):
    """Search engine protocol."""

    async def search(self, query: str, max_results: int) -> list[SearchResult]:
        ...


class DuckDuckGoSearch:
    """DuckDuckGo search implementation."""

    async def search(self, query: str, max_results: int = 5) -> list[SearchResult]:
        from duckduckgo_search import DDGS
        results = []
        with DDGS() as ddgs:
            for r in ddgs.text(query, max_results=max_results):
                results.append(SearchResult(
                    title=r.get("title", ""),
                    url=r.get("href", ""),
                    snippet=r.get("body", ""),
                    source="duckduckgo"
                ))
        return results


class SearchService:
    """Search service."""

    def __init__(self, engine: SearchEngine | None = None):
        self.engine = engine or DuckDuckGoSearch()

    def search(self, query: str, max_results: int = 5) -> list[dict]:
        """Synchronous search interface."""
        return asyncio.run(self._search_async(query, max_results))

    async def _search_async(self, query: str, max_results: int) -> list[dict]:
        results = await self.engine.search(query, max_results)
        return [
            {
                "title": r.title,
                "url": r.url,
                "snippet": r.snippet,
                "source": r.source
            }
            for r in results
        ]
```
#### 3.2.2 Page Fetch Service
```python
# backend/crawler/fetcher.py
import asyncio
from dataclasses import dataclass
from typing import Literal
from urllib.parse import urljoin, urlparse

import httpx
from bs4 import BeautifulSoup


@dataclass
class FetchResult:
    url: str
    status: int
    content: dict
    metadata: dict


class FetchService:
    """Web page fetch service."""

    def __init__(
        self,
        timeout: float = 30.0,
        max_retries: int = 2,
        user_agent: str = "Mozilla/5.0 (compatible; NanoClawBot/1.0)"
    ):
        self.timeout = timeout
        self.max_retries = max_retries
        self.user_agent = user_agent

    async def fetch_async(
        self,
        url: str,
        extract_type: Literal["text", "links", "images", "structured"] = "text"
    ) -> FetchResult:
        """Fetch a page asynchronously."""
        headers = {"User-Agent": self.user_agent}
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for attempt in range(self.max_retries + 1):
                try:
                    resp = await client.get(url, headers=headers, follow_redirects=True)
                    resp.raise_for_status()
                    break
                except httpx.HTTPError as e:
                    if attempt == self.max_retries:
                        return FetchResult(
                            url=url,
                            status=500,
                            content={"error": str(e)},
                            metadata={}
                        )
                    await asyncio.sleep(1 * (attempt + 1))
        # Parse the response
        soup = BeautifulSoup(resp.text, "html.parser")
        content = self._extract(soup, url, extract_type)
        metadata = {
            "title": soup.title.string if soup.title else "",
            "status_code": resp.status_code,
            "content_type": resp.headers.get("content-type", ""),
            "final_url": str(resp.url)
        }
        return FetchResult(url=url, status=resp.status_code, content=content, metadata=metadata)

    def _extract(self, soup: BeautifulSoup, base_url: str, extract_type: str) -> dict:
        """Extract content according to extract_type."""
        if extract_type == "text":
            # Remove scripts, styles, and navigation chrome
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            text = soup.get_text(separator="\n", strip=True)
            return {"text": text[:10000]}  # Cap the length
        elif extract_type == "links":
            links = []
            for a in soup.find_all("a", href=True):
                href = urljoin(base_url, a["href"])
                if urlparse(href).scheme in ("http", "https"):
                    links.append({
                        "text": a.get_text(strip=True),
                        "url": href
                    })
            return {"links": links[:100]}
        elif extract_type == "images":
            images = []
            for img in soup.find_all("img", src=True):
                src = urljoin(base_url, img["src"])
                images.append({
                    "alt": img.get("alt", ""),
                    "src": src
                })
            return {"images": images[:50]}
        elif extract_type == "structured":
            # Extract structured data
            structured = {
                "title": soup.title.string if soup.title else "",
                "meta": {},
                "headings": [],
                "paragraphs": []
            }
            # Meta tags
            for meta in soup.find_all("meta"):
                name = meta.get("name") or meta.get("property", "")
                if name:
                    structured["meta"][name] = meta.get("content", "")
            # Headings
            for i in range(1, 7):
                for h in soup.find_all(f"h{i}"):
                    structured["headings"].append({
                        "level": i,
                        "text": h.get_text(strip=True)
                    })
            # Paragraphs
            for p in soup.find_all("p"):
                text = p.get_text(strip=True)
                if len(text) > 20:
                    structured["paragraphs"].append(text)
            return {"structured": structured}
        return {}

    def fetch(self, url: str, extract_type: str = "text") -> dict:
        """Synchronous fetch interface."""
        result = asyncio.run(self.fetch_async(url, extract_type))
        return {
            "success": result.status == 200,
            "url": result.url,
            "content": result.content,
            "metadata": result.metadata
        }
```
#### 3.2.3 Task Manager
```python
# backend/crawler/task_manager.py
import asyncio
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Literal


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class CrawlTask:
    id: str
    urls: list[str]
    extract_type: str
    parallel: int
    status: TaskStatus = TaskStatus.PENDING
    progress: int = 0
    total: int = 0
    results: list[dict] = field(default_factory=list)
    errors: list[dict] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: datetime | None = None


class CrawlTaskManager:
    """Crawl task manager."""

    def __init__(self, max_workers: int = 3):
        self.tasks: dict[str, CrawlTask] = {}
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self._fetch_service = None

    @property
    def fetch_service(self):
        if self._fetch_service is None:
            from .fetcher import FetchService
            self._fetch_service = FetchService()
        return self._fetch_service

    def create_task(
        self,
        urls: list[str],
        extract_type: Literal["text", "links", "images", "structured"] = "text",
        parallel: int = 3
    ) -> str:
        """Create a crawl task."""
        task_id = str(uuid.uuid4())[:8]
        task = CrawlTask(
            id=task_id,
            urls=urls,
            extract_type=extract_type,
            parallel=min(parallel, self.max_workers),
            total=len(urls)
        )
        self.tasks[task_id] = task
        # Run asynchronously in the worker pool
        self.executor.submit(self._execute_task, task_id)
        return task_id

    def _execute_task(self, task_id: str):
        """Execute a crawl task."""
        task = self.tasks.get(task_id)
        if not task:
            return
        task.status = TaskStatus.RUNNING

        async def run():
            semaphore = asyncio.Semaphore(task.parallel)

            async def fetch_one(url: str):
                async with semaphore:
                    try:
                        result = await self.fetch_service.fetch_async(url, task.extract_type)
                        # Convert the FetchResult dataclass into a JSON-serializable dict
                        return {
                            "url": url,
                            "data": {
                                "content": result.content,
                                "metadata": result.metadata
                            }
                        }
                    except Exception as e:
                        return {"url": url, "error": str(e)}

            coros = [fetch_one(url) for url in task.urls]
            results = await asyncio.gather(*coros)
            for r in results:
                task.progress += 1
                if "error" in r:
                    task.errors.append(r)
                else:
                    task.results.append(r)

        try:
            asyncio.run(run())
            task.status = TaskStatus.COMPLETED
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.errors.append({"error": str(e)})
        finally:
            task.completed_at = datetime.utcnow()

    def get_task_status(self, task_id: str) -> dict:
        """Get the status of a task."""
        task = self.tasks.get(task_id)
        if not task:
            return {"error": "Task not found"}
        return {
            "id": task.id,
            "status": task.status.value,
            "progress": task.progress,
            "total": task.total,
            "results": task.results if task.status == TaskStatus.COMPLETED else [],
            "errors": task.errors,
            "created_at": task.created_at.isoformat(),
            "completed_at": task.completed_at.isoformat() if task.completed_at else None
        }


# Global task manager
crawl_manager = CrawlTaskManager()
```
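The built-in tools import `search_service`, `fetch_service`, and `crawl_manager` from the crawler package, so the package needs to expose those singletons. A minimal sketch of `backend/crawler/__init__.py` (assumed wiring, not shown elsewhere in this document) could be:
```python
# backend/crawler/__init__.py (assumed wiring, shown for completeness)
from .search import SearchService
from .fetcher import FetchService
from .task_manager import crawl_manager  # re-export the global task manager

# Module-level singletons used by the built-in tools
search_service = SearchService()
fetch_service = FetchService()

__all__ = ["search_service", "fetch_service", "crawl_manager"]
```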
### 3.3 Data Model Extensions
```python
# backend/models.py (new models)
class CrawlTaskRecord(db.Model):
    """Crawl task record (persisted)."""
    __tablename__ = "crawl_tasks"

    id = db.Column(db.String(32), primary_key=True)
    user_id = db.Column(db.BigInteger, db.ForeignKey("users.id"))
    conversation_id = db.Column(db.String(64), db.ForeignKey("conversations.id"))
    urls = db.Column(db.JSON)  # List of URLs
    extract_type = db.Column(db.String(32))
    status = db.Column(db.String(16), default="pending")
    result_count = db.Column(db.Integer, default=0)
    error_count = db.Column(db.Integer, default=0)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime)


class CrawlResult(db.Model):
    """Crawl result."""
    __tablename__ = "crawl_results"

    id = db.Column(db.BigInteger, primary_key=True, autoincrement=True)
    task_id = db.Column(db.String(32), db.ForeignKey("crawl_tasks.id"))
    url = db.Column(db.String(1024))
    content = db.Column(db.JSON)  # Extracted content
    # "metadata" is reserved by SQLAlchemy's declarative API, so map the column name explicitly
    page_metadata = db.Column("metadata", db.JSON)
    status_code = db.Column(db.Integer)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
```
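How the in-memory task manager is synced into these tables is not specified above. A minimal sketch, assuming the Flask-SQLAlchemy `db` session from backend/models.py and a hypothetical `persist_task` helper, might persist a finished task like this:
```python
# Sketch: persist a completed in-memory task (helper name and call site are assumptions)
from backend.models import db, CrawlTaskRecord, CrawlResult
from backend.crawler import crawl_manager


def persist_task(task_id: str, user_id: int, conversation_id: str) -> None:
    status = crawl_manager.get_task_status(task_id)
    record = CrawlTaskRecord(
        id=status["id"],
        user_id=user_id,
        conversation_id=conversation_id,
        status=status["status"],
        result_count=len(status["results"]),
        error_count=len(status["errors"]),
    )
    db.session.add(record)
    for r in status["results"]:
        db.session.add(CrawlResult(
            task_id=status["id"],
            url=r["url"],
            content=r["data"]["content"],
            page_metadata=r["data"]["metadata"],
        ))
    db.session.commit()
```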
---
## 4. API Design
### 4.1 Tool APIs
#### List Available Tools
```
GET /api/tools
```
**Response:**
```json
{
  "code": 0,
  "data": {
    "tools": [
      {
        "name": "web_search",
        "description": "Search the internet for real-time information",
        "parameters": { ... }
      }
    ]
  }
}
```
### 4.2 Crawler APIs
#### Create a Crawl Task
```
POST /api/crawler/tasks
```
**Request body:**
```json
{
  "urls": ["https://example.com/page1", "https://example.com/page2"],
  "extract_type": "text",
  "parallel": 3
}
```
**Response:**
```json
{
  "code": 0,
  "data": {
    "task_id": "abc12345",
    "status": "pending",
    "total": 2
  }
}
```
#### Query Task Status
```
GET /api/crawler/tasks/:task_id
```
**Response:**
```json
{
  "code": 0,
  "data": {
    "id": "abc12345",
    "status": "completed",
    "progress": 2,
    "total": 2,
    "results": [
      {
        "url": "https://example.com/page1",
        "data": { "content": { "text": "..." }, "metadata": { ... } }
      }
    ]
  }
}
```
#### List Tasks
```
GET /api/crawler/tasks?status=completed&limit=20
```
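A minimal sketch of how these endpoints could be wired on the Flask side is shown below; the blueprint name and response envelope are assumptions, and only functions defined earlier in this document are called.
```python
# Sketch: Flask routes backing the tool and crawler APIs (names are assumptions)
from flask import Blueprint, request, jsonify

from backend.tools.registry import registry
from backend.crawler import crawl_manager

api = Blueprint("crawler_api", __name__)


@api.get("/api/tools")
def list_tools():
    # registry.list_all() returns the OpenAI "function" wrapper; unwrap to match the documented shape
    tools = [t["function"] for t in registry.list_all()]
    return jsonify({"code": 0, "data": {"tools": tools}})


@api.post("/api/crawler/tasks")
def create_crawl_task():
    body = request.get_json()
    task_id = crawl_manager.create_task(
        urls=body["urls"],
        extract_type=body.get("extract_type", "text"),
        parallel=body.get("parallel", 3),
    )
    return jsonify({"code": 0, "data": {"task_id": task_id,
                                        "status": "pending",
                                        "total": len(body["urls"])}})


@api.get("/api/crawler/tasks/<task_id>")
def get_crawl_task(task_id: str):
    return jsonify({"code": 0, "data": crawl_manager.get_task_status(task_id)})
```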
---
## 5. Frontend Integration
### 5.1 Tool Call UI Component
```vue
```
### 5.2 SSE Event Extensions
The message API's SSE events are extended with tool-call-related event types:
| Event | Description |
| ------------- | -------- |
| `tool_call` | The model requests a tool call |
| `tool_result` | Result of the tool execution |
| `thinking` | Chain-of-thought content |
| `message` | Reply content chunk |
| `done` | Completed |
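For reference, a single tool-call round on the wire, as emitted by `ToolExecutor.chat_with_tools` above, looks roughly like the following (payloads abbreviated):
```
event: tool_call
data: {"calls": [{"id": "call_1", "type": "function", "function": {"name": "web_search", "arguments": "{\"query\": \"...\"}"}}]}

event: tool_result
data: {"results": [{"role": "tool", "tool_call_id": "call_1", "name": "web_search", "content": "{...}"}]}

event: message
data: {"content": "..."}

event: done
data: {}
```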
---
## 6. Configuration and Deployment
### 6.1 Configuration File Extensions
```yaml
# config.yml
# ... existing configuration ...

# Crawler settings
crawler:
  max_workers: 5
  timeout: 30
  max_retries: 2
  user_agent: "Mozilla/5.0 (compatible; NanoClawBot/1.0)"

# Tool settings
tools:
  enabled:
    - web_search
    - fetch_page
    - crawl_batch
    - query_crawl_task
  max_iterations: 5  # Maximum number of tool-calling iterations
```
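How the `tools.enabled` list is applied is left open above. One possible sketch, assuming PyYAML is available and using hypothetical helper names, is to filter the registry's schemas before handing them to the model:
```python
# Sketch: apply config.yml's tools.enabled filter (helper names are assumptions)
import yaml

from backend.tools.registry import registry


def load_config(path: str = "config.yml") -> dict:
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)


def enabled_tool_schemas(config: dict) -> list[dict]:
    enabled = set(config.get("tools", {}).get("enabled", []))
    # Keep only the tools whitelisted in config.yml
    return [t for t in registry.list_all() if t["function"]["name"] in enabled]
```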
### 6.2 Dependencies
```toml
# pyproject.toml (new dependencies)
dependencies = [
    # ... existing dependencies ...
    "duckduckgo-search>=4.0.0",
    "beautifulsoup4>=4.12.0",
    "httpx>=0.25.0",
    "lxml>=4.9.0",
]
```
---
## 7. Usage Examples
### 7.1 Conversation Example
```
User: Search for recent important news in the AI field.

Assistant: [calls the web_search tool]
  query: "AI artificial intelligence latest news 2024"
  → returns search results

Assistant: Based on the search results, here is some recent important AI news:
1. **OpenAI releases GPT-5** - [link]
   OpenAI officially released its next-generation model GPT-5...
2. **Google launches Gemini 2.0** - [link]
   Google announced Gemini 2.0...

---

User: Fetch the content of those links for me.

Assistant: [calls the crawl_batch tool]
  urls: ["https://...", "https://..."]
  → returns a task ID

Assistant: Crawl task created; fetching 2 pages...

Assistant: [automatically calls query_crawl_task to check the results]

Assistant: Done! Here is a summary of the content:

**Article 1: OpenAI releases GPT-5**
> Key points: the new model improves reasoning by 50%...

**Article 2: Google launches Gemini 2.0**
> Key points: multimodal capabilities are greatly enhanced...
```
### 7.2 API Call Example
```python
# Create a conversation and enable tools
import requests

# Create a conversation
resp = requests.post("http://localhost:3000/api/conversations", json={
    "title": "AI news research",
    "model": "glm-5"
})
conv_id = resp.json()["data"]["id"]

# Send a message (tool calls are triggered automatically)
resp = requests.post(
    f"http://localhost:3000/api/conversations/{conv_id}/messages",
    json={"content": "Search for the latest AI news", "stream": True},
    stream=True
)

# Handle SSE events: tool_call / tool_result / message / done
event = None
for line in resp.iter_lines():
    if not line:
        continue
    line = line.decode("utf-8")
    if line.startswith("event: "):
        event = line[7:]           # current event type
    elif line.startswith("data: "):
        print(event, line[6:])     # event payload (JSON string)
```
---
## 8. Security and Limits
### 8.1 Security Measures
1. **URL allowlist/blocklist**: restrict which domains may be crawled (see the sketch after this list)
2. **Rate limiting**: control request frequency to avoid getting blocked
3. **Content filtering**: filter out sensitive content
4. **User isolation**: tasks are isolated per user
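A minimal sketch of the domain allowlist/blocklist check is shown below; the helper name and the default domain sets are assumptions, and the check would be applied before FetchService issues a request.
```python
# Sketch: domain allow/block check (hypothetical helper, not part of the code above)
from urllib.parse import urlparse

BLOCKED_DOMAINS = {"localhost", "127.0.0.1"}   # assumed defaults
ALLOWED_DOMAINS: set[str] | None = None        # None means "allow everything not blocked"


def is_url_allowed(url: str) -> bool:
    host = (urlparse(url).hostname or "").lower()
    if not host or host in BLOCKED_DOMAINS:
        return False
    if ALLOWED_DOMAINS is not None:
        return host in ALLOWED_DOMAINS
    return True
```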
### 8.2 Usage Limits
```python
# backend/tools/limits.py
TOOL_LIMITS = {
    "web_search": {
        "max_results": 10,
        "rate_limit": "10/minute"
    },
    "fetch_page": {
        "max_content_size": 1024 * 1024,  # 1 MB
        "timeout": 30
    },
    "crawl_batch": {
        "max_urls": 50,
        "parallel_max": 5
    }
}
```
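TOOL_LIMITS is declarative; enforcement is not shown above. One place to apply it is right before ToolRegistry.execute, for example by clamping arguments. The following is a sketch, and the specific clamping rules are assumptions:
```python
# Sketch: clamp tool arguments against TOOL_LIMITS before execution (assumed policy)
from backend.tools.limits import TOOL_LIMITS
from backend.tools.registry import registry


def execute_with_limits(name: str, arguments: dict):
    limits = TOOL_LIMITS.get(name, {})
    if name == "web_search" and "max_results" in limits:
        arguments["max_results"] = min(arguments.get("max_results", 5), limits["max_results"])
    if name == "crawl_batch":
        if "max_urls" in limits:
            arguments["urls"] = arguments.get("urls", [])[:limits["max_urls"]]
        if "parallel_max" in limits:
            arguments["parallel"] = min(arguments.get("parallel", 3), limits["parallel_max"])
    return registry.execute(name, arguments)
```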
---
## 9. Future Extensions
1. **More tool types**:
   - Data analysis tools (chart generation, statistics)
   - File processing tools (PDF parsing, Excel handling)
   - Code execution tools (running code in a secure sandbox)
2. **Crawler enhancements**:
   - JavaScript rendering (Playwright/Selenium)
   - Proxy pool support
   - Distributed crawling
3. **Smarter scheduling**:
   - Tool recommendation based on conversation context
   - Chained tool execution
   - Asynchronous task notifications
---
## 10. Summary
This document has described the crawler pipeline and tool system architecture for NanoClaw:
1. **Tool system**: OpenAI-compatible tool definitions managed by a tool registry, enabling automatic tool calling by the GLM model.
2. **Crawler pipeline**: a search service, fetch service, and task manager supporting single-page fetching and batch tasks, with multiple content extraction modes.
3. **API design**: the existing APIs are extended with SSE tool-call events, plus new crawler task management endpoints.
4. **Frontend integration**: a tool-call visualization component shows tool execution in real time.
With this architecture, NanoClaw can reach beyond the model's knowledge cutoff, retrieve real-time information from the web, and greatly expand its range of applications.