LLM工具调用与流式生成
ViperEkura Lv1

项目概览

属性
项目名称 NanoClaw
技术栈 Flask + SQLAlchemy + PyJWT
Python 版本 >= 3.10
核心能力 多 LLM 提供商适配 / 工具调用 / 流式 SSE / 工作空间隔离 / 用户认证支持的 LLM 提供商:

技术依赖:

1
2
3
4
5
6
7
8
flask>=3.0            # Web 框架
flask-sqlalchemy>=3.1 # ORM
flask-cors>=4.0 # 跨域支持
PyJWT>=2.8 # JWT 认证
requests>=2.31 # HTTP 客户端
httpx>=0.25 # 异步 HTTP
beautifulsoup4>=4.12 # HTML 解析
duckduckgo-search>=8.0 # 搜索引擎

整体架构

NanoClaw 后端采用经典的 分层架构 模式:

graph TB
    subgraph "Client Layer"
        FE[Vue.js Frontend]
    end

    subgraph "Route Layer - routes/"
        AUTH[auth.py]
        CONV[conversations.py]
        MSG[messages.py]
        MDL[models.py]
        TPL[tools.py]
        PRJ[projects.py]
        STA[stats.py]
    end

    subgraph "Service Layer - services/"
        CS["ChatService\nAgentic Loop + SSE"]
        LC["LLMClient\n多提供商适配"]
    end

    subgraph "Tool System - tools/"
        TR["ToolRegistry\n单例注册表"]
        TE["ToolExecutor\n缓存 + 去重 + 并行"]
        TF["tool() 装饰器\n工厂注册"]
        BT["内置工具 / 14 个"]
    end

    subgraph "Data Layer"
        ORM[SQLAlchemy ORM]
        DB[(SQLite / MySQL / PostgreSQL)]
    end

    FE --> AUTH
    FE --> CONV
    FE --> MSG
    FE --> MDL
    FE --> TPL
    FE --> PRJ
    FE --> STA

    MSG --> CS
    CS --> LC
    CS --> TE
    TE --> TR
    TR --> BT
    TF --> TR

    CS --> ORM
    ORM --> DB

    LC -.-> |HTTP Stream| LLM["LLM API\nDeepSeek / GLM / OpenAI"]

目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
backend/
├── __init__.py # Flask 应用工厂
├── run.py # 入口
├── config.py # 配置管理
├── models.py # ORM 模型
├── routes/ # 路由层
│ ├── auth.py # 认证 (JWT / Single)
│ ├── messages.py # 消息 + 触发流式响应
│ ├── conversations.py # 会话 CRUD
│ ├── models.py # 模型列表
│ ├── tools.py # 工具列表
│ ├── projects.py # 项目管理 + 文件操作
│ └── stats.py # Token 统计
├── services/ # 服务层
│ ├── chat.py # 核心聊天服务 (SSE + Agentic Loop)
│ └── llm_client.py # LLM API 客户端
├── tools/ # 工具系统
│ ├── core.py # ToolDefinition / ToolResult / ToolRegistry
│ ├── factory.py # @tool 装饰器
│ ├── executor.py # ToolExecutor (缓存/去重/并行)
│ ├── services.py # 搜索/抓取/计算 辅助服务
│ └── builtin/ # 内置工具
│ ├── crawler.py # 网页搜索 + 抓取
│ ├── code.py # Python 代码执行 (沙箱)
│ ├── data.py # 计算器 + 文本/JSON 处理
│ ├── file_ops.py # 文件读写 (工作空间隔离)
│ ├── weather.py # 天气查询
│ └── agent.py # 并行执行 / 子 Agent
└── utils/ # 工具函数
├── helpers.py # 响应封装 / Token 记录
└── workspace.py # 路径安全验证

工具系统设计

工具系统是 NanoClaw 实现 “Agentic” 能力的基石。它采用 注册表 + 工厂 + 执行器 三层架构,实现了声明式工具定义、自动注册、安全执行和智能缓存。

工具定义与注册

工具定义通过 Python @dataclass 和装饰器模式实现,极为优雅:

classDiagram
    class ToolDefinition {
        +str name
        +str description
        +dict parameters
        +Callable handler
        +str category
        +to_openai_format() dict
    }

    class ToolResult {
        +bool success
        +Any data
        +str error
        +to_dict() dict
        +ok(data) ToolResult$
        +fail(error) ToolResult$
    }

    class ToolRegistry {
        -Dict~str, ToolDefinition~ _tools
        -ToolRegistry _instance$
        +register(tool) void
        +get(name) ToolDefinition?
        +list_all() List~dict~
        +execute(name, arguments) dict
    }

    class tool_decorator {
        <>
        +tool(name, description, parameters, category) Callable
    }

    class register_tool_fn {
        <>
        +register_tool(name, handler, ...) void
    }

    tool_decorator --> ToolDefinition : creates
    register_tool_fn --> ToolDefinition : creates
    ToolDefinition --> ToolRegistry : register
    ToolRegistry --> ToolDefinition : stores
    ToolRegistry --> ToolResult : returns

使用示例 — 定义一个工具只需一个装饰器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from backend.tools.factory import tool

@tool(
name="web_search",
description="Search the internet for information",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search keywords"},
"max_results": {"type": "integer", "description": "Max results", "default": 5}
},
"required": ["query"]
},
category="crawler"
)
def web_search(arguments: dict) -> dict:
service = SearchService()
results = service.search(arguments["query"], arguments.get("max_results", 5))
return {"results": results}

@tool 装饰器在 模块导入时 自动触发,将 ToolDefinition 注册到全局单例 ToolRegistry。整个注册过程对开发者完全透明。

ToolRegistry 的单例模式实现:

1
2
3
4
5
6
7
8
class ToolRegistry:
_instance = None

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._tools: Dict[str, ToolDefinition] = {}
return cls._instance

to_openai_format() 方法将工具定义转换为 OpenAI 兼容格式,直接传递给 LLM API:

1
2
3
4
5
6
7
8
9
def to_openai_format(self) -> dict:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters # JSON Schema
}
}

工具执行器

ToolExecutor 是工具系统的“大脑”,负责管理工具调用的完整生命周期:

flowchart TD
    A[接收 LLM tool_calls] --> B[解析 JSON 参数]
    B --> C{参数合法?}
    C -->|No| D[返回错误结果]
    C -->|Yes| E_inject[_inject_context\n注入 project_id 等]
    E_inject --> F{批次内去重\nseen_calls}
    F -->|重复| G[返回去重结果]
    F -->|新调用| H{历史去重\n_call_history}
    H -->|命中| I[返回缓存结果]
    H -->|未命中| J{缓存查找\nMD5 key, TTL 5min}
    J -->|命中| K[返回缓存结果]
    J -->|未命中| L[_execute_tool\n实际执行]
    L --> M{执行成功?}
    M -->|Yes| N[写入缓存 + 历史]
    M -->|No| O[返回错误]
    N --> P[返回结果]

三级去重机制ToolExecutor 最重要的优化:

级别 作用域 实现 场景
批次内去重 单次 LLM 响应 seen_calls: set LLM 同一轮返回重复的工具调用
历史去重 整个请求会话 _call_history: List 多轮迭代中重复调用同一工具
TTL 缓存 跨请求 _cache: Dict, MD5 key, 5 分钟 相同参数的缓存复用

Per-request 实例隔离 — 每个流式请求创建独立的 ToolExecutor

1
2
# ChatService.stream_response() 中
executor = ToolExecutor(registry=registry)

这确保了并发请求之间 _call_history_cache 完全隔离,避免了线程安全问题。

内置工具一览

NanoClaw 内置了 14 个工具,覆盖 6 大类能力:

graph LR
    AGENT(["AGENT
parallel_execute"]) subgraph TOOLS CRAWLER["CRAWLER
web_search · fetch_page · crawl_batch"] CODE["CODE
execute_python"] FILE["FILE
read · write · delete · list · exists · mkdir"] DATA["DATA
calculator · text_process · json_process"] WEATHER["WEATHER
get_weather"] end AGENT --> CRAWLER AGENT --> CODE AGENT --> FILE AGENT --> DATA AGENT --> WEATHER style AGENT fill:#2c3e50,stroke:#1a2632,stroke-width:2px,color:#fff style TOOLS fill:#ecf0f1,stroke:#bdc3c7,stroke-width:1px style CRAWLER fill:#fff,stroke:#3498db style CODE fill:#fff,stroke:#2ecc71 style FILE fill:#fff,stroke:#9b59b6 style DATA fill:#fff,stroke:#f39c12 style WEATHER fill:#fff,stroke:#1abc9c

Agentic Loop 详解

Agentic Loop 是 NanoClaw 的核心智能循环,赋予了 LLM 自主决策和行动 的能力。它不是一个简单的请求-响应模型,而是一个 “思考 → 行动 → 观察 → 再思考” 的迭代过程。

从用户消息到工具执行

下面是完整的请求处理流程:

sequenceDiagram
    participant U as User
    participant API as POST /messages
    participant CS as ChatService
    participant LLM as LLM API
    participant TE as ToolExecutor
    participant TR as ToolRegistry
    participant DB as Database

    U->>API: POST {text, tools_enabled, project_id}
    API->>DB: 保存用户消息 (role=user)
    API->>CS: stream_response(conv, tools_enabled, project_id)

    loop 最多 MAX_ITERATIONS (15) 轮
        CS->>LLM: call(model, messages, tools, stream=true)

        LLM-->>CS: SSE Stream (chunks)

        alt 有 tool_calls
            CS->>TE: process_tool_calls(tool_calls, context)

            TE->>TE: 解析参数 → 注入上下文 → 去重 → 缓存检查

            alt 多个工具调用
                TE->>TR: ThreadPoolExecutor 并行执行
            else 单个工具
                TE->>TR: 串行执行
            end

            TR-->>TE: ToolResult
            TE-->>CS: tool_results

            CS->>CS: 追加 assistant msg + tool results 到 messages
            CS-->>U: SSE: tool_call + tool_result steps
        else 无 tool_calls (最终轮)
            CS->>DB: 保存助手消息 (role=assistant, steps)
            CS->>DB: 记录 token 用量
            CS->>DB: 自动生成会话标题
            CS-->>U: SSE: done {message_id, token_count}
        end
    end

关键代码路径 (services/chat.py:76-294):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for iteration in range(MAX_ITERATIONS):
# 1. 调用 LLM (流式)
resp = self.llm.call(model=..., messages=..., tools=tools, stream=True)

# 2. 流式解析 SSE chunks
for line in resp.iter_lines():
# 提取 reasoning_content (thinking) → 增量推送
# 提取 content (text) → 增量推送
# 累积 tool_calls delta

# 3. 分支判断
if tool_calls_list:
# Phase 1: 推送 tool_call steps
# Phase 2: 执行工具 (并行/串行)
# Phase 3: 推送 tool_result steps
# 追加到 messages → continue 下一轮
else:
# 保存消息到 DB → 发送 done → return

并行工具调用

当 LLM 在单轮中返回多个独立的工具调用时,ToolExecutor 会自动切换到并行模式:

flowchart LR
    subgraph "Phase 1: 准备 (串行)"
        A[解析参数] --> B[注入上下文]
        B --> C[去重检查]
        C --> D[缓存查找]
    end

    subgraph "Phase 2: 执行 (并行)"
        E["ThreadPoolExecutor\nmax_workers=4"]
        F[Task 1]
        G[Task 2]
        H[Task 3]
        E --> F
        E --> G
        E --> H
    end

    subgraph "Phase 3: 汇总"
        I["按原始顺序\n合并结果"]
    end

    D --> E
    F --> I
    G --> I
    H --> I

设计哲学:Phase 1 必须串行执行,避免 _call_history / _cache 的竞态条件。只有实际的工具执行才并行化。

子 Agent 系统

agent.py 中实现了更高级的 多 Agent 并行执行 能力(当前 @tool 装饰器已注释,但核心逻辑完整):

flowchart TB
    subgraph "Main Agent"
        MA[主 Agent Loop]
    end

    subgraph "Sub Agents (ThreadPool)"
        SA1["Sub Agent 1\n研究任务\nweb_search + fetch_page"]
        SA2["Sub Agent 2\n代码审查\nfile_read + file_list"]
        SA3["Sub Agent 3\n数据分析\ncalculator + json_process"]
    end

    MA -->|"agent_task"| SA1
    MA --> SA2
    MA --> SA3

    SA1 --> |"独立 LLM Loop\nmax 3 iterations\nmax 4096 tokens"| R1[研究结果]
    SA2 --> |"独立 LLM Loop"| R2[审查报告]
    SA3 --> |"独立 LLM Loop"| R3[分析结果]

    R1 --> MA
    R2 --> MA
    R3 --> MA

每个子 Agent 拥有:

  • 独立的 messages 列表
  • 独立的 ToolExecutor 实例
  • 最多 3 轮迭代
  • 最多 4096 tokens 的输出限制

流式响应机制

NanoClaw 的流式响应基于 Server-Sent Events (SSE) 协议,这是实现“打字机效果”和实时进度展示的核心。

SSE 协议设计

stateDiagram-v2
    [*] --> Thinking: LLM 返回 reasoning_content
    Thinking --> Text: LLM 返回 content
    Thinking --> ToolCall: LLM 返回 tool_calls
    Text --> ToolCall: LLM 返回 tool_calls
    Text --> Done: 无 tool_calls
    ToolCall --> ToolResult: 工具执行完成
    ToolResult --> Thinking: 继续下一轮迭代
    ToolResult --> Text: 继续下一轮迭代
    Done --> [*]: 保存消息到 DB

四种 SSE 事件类型:

事件类型 数据格式 触发时机
process_step {id, index, type: "thinking", content} LLM 推理过程 (GLM thinking)
process_step {id, index, type: "text", content} LLM 文本输出
process_step {id, index, type: "tool_call", id_ref, name, arguments} 工具调用声明
process_step {id, index, type: "tool_result", id_ref, name, content} 工具执行结果
done {message_id, token_count, suggested_title} 最终响应完成
error {content: "error message"} 出错时

SSE 数据流示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
event: process_step
data: {"id": "step-0", "index": 0, "type": "thinking", "content": "让我先搜索一下..."}

event: process_step
data: {"id": "step-0", "index": 0, "type": "thinking", "content": "让我先搜索一下用户的提问...用户问的是 Python 异步编程"}

event: process_step
data: {"id": "step-1", "index": 1, "type": "text", "content": "我来帮你搜索关于"}

event: process_step
data: {"id": "step-1", "index": 1, "type": "text", "content": "我来帮你搜索关于 Python asyncio 的资料"}

event: process_step
data: {"id": "step-2", "index": 2, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{\"query\": \"Python asyncio tutorial\"}"}

event: process_step
data: {"id": "step-3", "index": 3, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{\"results\": [...]}"}}

event: done
data: {"message_id": "uuid-xxx", "token_count": 1523, "suggested_title": "Python asyncio 教程"}

增量推送策略

NanoClaw 采用 累积式推送 而非增量式推送:

1
2
3
# thinking 和 text 每次推送完整的累积内容
full_thinking += reasoning # 累积
yield f"event: process_step\ndata: {json.dumps({'type': 'thinking', 'content': full_thinking})}\n\n"

为什么选择累积而非增量? 前端可以直接用收到的内容 替换 当前显示,无需维护增量拼接逻辑。这简化了前端实现,也避免了增量丢失时的渲染问题。

tool_call 和 tool_result 则是一次性推送,不需要增量。

客户端断连检测

流式生成过程中,服务端需要实时检测客户端是否已断开,以避免浪费 LLM Token:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _client_disconnected():
"""Check if the client has disconnected."""
try:
stream = flask_request.input_stream
if stream is None:
return False
return stream.closed
except Exception:
return False

# 在流式循环中检查
for line in resp.iter_lines():
if _client_disconnected():
resp.close()
return # 立即终止生成

外层还有 safe_generate() 包裹,捕获 ClientDisconnected / BrokenPipeError / ConnectionResetError

1
2
3
4
5
def safe_generate():
try:
yield from generate()
except (ClientDisconnected, BrokenPipeError, ConnectionResetError):
pass # 客户端断连,静默停止

防 nginx 缓冲的响应头:

1
2
3
4
5
6
7
8
9
10
Response(
safe_generate(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"X-Accel-Buffering": "no", # 禁止 nginx 缓冲
"Connection": "keep-alive",
"Transfer-Encoding": "chunked",
}
)

类图与关系

以下是 NanoClaw 后端核心类的关系总览:

classDiagram
    class FlaskApp {
        +create_app() Flask
        +load_config() dict
        -_get_database_uri(cfg) str
    }

    class User {
        +int id
        +str username
        +str password_hash
        +str email
        +str role
        +conversations
        +projects
    }

    class Conversation {
        +str id
        +int user_id
        +str project_id
        +str title
        +str model
        +str system_prompt
        +float temperature
        +int max_tokens
        +bool thinking_enabled
        +messages
    }

    class Message {
        +str id
        +str conversation_id
        +str role
        +LongText content
        +int token_count
    }

    class TokenUsage {
        +int id
        +int user_id
        +date date
        +str model
        +int prompt_tokens
        +int completion_tokens
    }

    class Project {
        +str id
        +int user_id
        +str name
        +str path
        +conversations
    }

    class ChatService {
        -LLMClient llm
        +stream_response(conv, tools_enabled, project_id) Response
        -_build_tool_calls_json(tool_calls, tool_results) list
        -_process_tool_calls_delta(delta, tool_calls_list) list
    }

    class LLMClient {
        -dict model_config
        +call(model, messages, ...) Response
        -_get_credentials(model) tuple
        -_build_body(model, messages, ...) dict
    }

    class ToolExecutor {
        -ToolRegistry registry
        -bool enable_cache
        -int cache_ttl
        -Dict _cache
        -List _call_history
        +process_tool_calls(tool_calls, context) list
        +process_tool_calls_parallel(tool_calls, context, max_workers) list
        -_execute_tool(name, arguments) dict
        -_inject_context(name, args, context) void
        -_make_cache_key(name, args) str
    }

    class ToolRegistry {
        -Dict~str, ToolDefinition~ _tools
        +register(tool) void
        +get(name) ToolDefinition?
        +list_all() list
        +execute(name, arguments) dict
    }

    class ToolDefinition {
        +str name
        +str description
        +dict parameters
        +Callable handler
        +str category
        +to_openai_format() dict
    }

    class ToolResult {
        +bool success
        +Any data
        +str error
    }

    User "1" --> "*" Conversation
    User "1" --> "*" Project
    Project "1" --> "*" Conversation
    Conversation "1" --> "*" Message

    ChatService --> LLMClient
    ChatService --> ToolExecutor
    ToolExecutor --> ToolRegistry
    ToolRegistry --> ToolDefinition
    ToolRegistry ..> ToolResult
    FlaskApp ..> ChatService
    FlaskApp ..> ToolRegistry

数据库 ER 关系:

erDiagram
    users ||--o{ conversations : "has"
    users ||--o{ projects : "has"
    projects ||--o{ conversations : "belongs to (optional)"
    conversations ||--o{ messages : "has"
    users ||--o{ token_usage : "tracks"

    users {
        int id PK
        string username UK
        string password_hash
        string email UK
        string role
    }

    conversations {
        string id PK
        int user_id FK
        string project_id FK
        string title
        string model
        text system_prompt
        float temperature
        int max_tokens
        boolean thinking_enabled
    }

    messages {
        string id PK
        string conversation_id FK
        string role
        longtext content
        int token_count
    }

    projects {
        string id PK
        int user_id FK
        string name
        string path
    }

    token_usage {
        int id PK
        int user_id FK
        date date
        string model
        int prompt_tokens
        int completion_tokens
    }

关键技术亮点

1. 装饰器驱动的工具注册

1
2
3
4
# 一行装饰器 = 完整的工具定义 + 自动注册
@tool(name="web_search", description="...", parameters={...}, category="crawler")
def web_search(arguments: dict) -> dict:
...

无需手动注册、无需 XML/JSON 配置文件、无需继承特定基类。Pythonic 且优雅。

2. 多提供商 LLM 适配

1
2
3
4
def _detect_provider(api_url: str) -> str:
if "deepseek" in api_url: return "deepseek"
elif "bigmodel" in api_url: return "glm"
else: return "openai"

LLMClient._build_body() 根据 provider 自动调整参数:

参数 DeepSeek GLM OpenAI
max_tokens 上限 8192 上限 65536 用户指定
thinking 不支持 (R1 内置) {"type": "enabled"} 不支持
stream_options 不支持 {"include_usage": true} 可选
速率限制重试 指数退避 (1s→2s→4s) 同左 同左

3. 环境变量展开

1
2
3
# config.yml 中支持 ${VAR} 语法
api_key: ${DEEPSEEK_API_KEY}
api_url: ${LLM_BASE_URL}/chat/completions

避免将敏感密钥硬编码在配置文件中。

4. 沙箱代码执行

execute_python 工具实现了 三层安全防护

  1. AST 静态分析 — 检测危险 import 和函数调用
  2. 运行时限制 — 子进程隔离 + 10 秒超时 + 环境变量清理
  3. 黑名单机制 — 40+ 危险模块(os, subprocess, socket…)+ 12 个危险内置函数(eval, exec, open…)
1
2
3
# 黑名单检查
BLOCKED_MODULES = {"os", "sys", "subprocess", "socket", "http", "urllib", ...}
BLOCKED_BUILTINS = {"eval", "exec", "compile", "open", "__import__", ...}

5. 工作空间路径安全

1
2
3
4
def validate_path_in_project(path: str, project_dir: Path) -> Path:
p = Path(path).resolve()
p.relative_to(project_dir.resolve()) # ValueError = 路径逃逸!
return p

通过 Path.resolve() + relative_to() 双重验证,防止 ../../etc/passwd 等目录逃逸攻击。

6. 步骤有序存储

助手消息的 content 字段中包含 steps 数组,是前端渲染的 唯一真相来源 (Single Source of Truth)

1
2
3
4
5
6
7
8
9
{
"text": "根据搜索结果,Python asyncio 是...",
"steps": [
{"id": "step-0", "index": 0, "type": "thinking", "content": "需要搜索..."},
{"id": "step-1", "index": 1, "type": "tool_call", "id_ref": "call_abc", "name": "web_search", "arguments": "{...}"},
{"id": "step-2", "index": 2, "type": "tool_result", "id_ref": "call_abc", "name": "web_search", "content": "{...}"},
{"id": "step-3", "index": 3, "type": "text", "content": "根据搜索结果..."}
]
}

这确保了页面刷新后,历史消息依然能按照正确的顺序渲染所有步骤(thinking → text → tool_call → tool_result 交替出现)。

7. 认证双模式

特性 Single 模式 Multi 模式
用户管理 自动创建 default 用户 注册 + 登录
认证方式 免认证 JWT Bearer Token
适用场景 个人开发 / 内网部署 公网服务 / 多用户
Token 有效期 7 天

通过 config.ymlauth_mode 字段一键切换。


总结

NanoClaw 后端的工具调用和流式生成系统体现了以下设计理念:

设计原则 具体体现
声明式 @tool 装饰器定义工具,零配置注册
渐进式 Agentic Loop 迭代深化,多轮工具调用逐步逼近答案
实时性 SSE 流式推送,thinking/text 增量渲染
安全性 路径隔离 + 代码沙箱 + JWT 认证 + 敏感字段过滤
可扩展 工厂模式 + 注册表,新增工具只需一个 Python 函数
高性能 并行工具调用 + 三级去重 + TTL 缓存
容错性 客户端断连检测 + 速率限制重试 + 异常安全

这套架构可以支撑从简单的对话机器人到复杂的 AI Agent 系统的演进,是构建下一代智能应用的良好参考。

项目地址Github

 REWARD AUTHOR