feat: 添加工具执行严格程度分级与白名单安全机制

This commit is contained in:
ViperEkura 2026-03-28 16:18:23 +08:00
parent 24e8497230
commit 3970c0b9a0
3 changed files with 256 additions and 80 deletions

View File

@ -46,3 +46,8 @@ SUB_AGENT_MAX_ITERATIONS = _sa.get("max_iterations", 3)
SUB_AGENT_MAX_TOKENS = _sa.get("max_tokens", 4096)
SUB_AGENT_MAX_AGENTS = _sa.get("max_agents", 5)
SUB_AGENT_MAX_CONCURRENCY = _sa.get("max_concurrency", 3)
# Code execution settings
_ce = _cfg.get("code_execution", {})
CODE_EXECUTION_DEFAULT_STRICTNESS = _ce.get("default_strictness", "standard")
CODE_EXECUTION_EXTRA_MODULES = _ce.get("extra_allowed_modules", {})

View File

@ -1,55 +1,131 @@
"""Safe code execution tool with sandboxing"""
"""Safe code execution tool with sandboxing and strictness levels"""
import ast
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path
from typing import Dict, List, Set
from backend.tools.factory import tool
from backend.config import CODE_EXECUTION_DEFAULT_STRICTNESS as DEFAULT_STRICTNESS
from backend.config import CODE_EXECUTION_EXTRA_MODULES as _CFG_EXTRA_MODULES
# Blacklist of dangerous modules - all other modules are allowed
BLOCKED_MODULES = {
# System-level access
"os", "sys", "subprocess", "shutil", "signal", "ctypes",
"multiprocessing", "threading", "_thread",
# Network access
"socket", "http", "urllib", "requests", "ftplib", "smtplib",
"telnetlib", "xmlrpc", "asyncio",
# File system / I/O
"pathlib", "io", "glob", "tempfile", "shutil", "fnmatch",
# Code execution / introspection
"importlib", "pkgutil", "code", "codeop", "compileall",
"runpy", "pdb", "profile", "cProfile",
# Dangerous stdlib
"webbrowser", "antigravity", "turtle",
# IPC / persistence
"pickle", "shelve", "marshal", "sqlite3", "dbm",
# Process / shell
"commands", "pipes", "pty", "posix", "posixpath",
# Strictness profiles configuration
# - lenient: no restrictions at all
# - standard: allowlist based, only safe modules permitted
# - strict: minimal allowlist, only pure computation modules
STRICTNESS_PROFILES: Dict[str, dict] = {
"lenient": {
"timeout": 30,
"description": "No restrictions, all modules and builtins allowed",
"allowlist_modules": None, # None means all allowed
"blocked_builtins": set(),
},
"standard": {
"timeout": 10,
"description": "Allowlist based, only safe modules and builtins permitted",
"allowlist_modules": {
# Data types & serialization
"json", "csv", "re", "typing",
# Data structures
"collections", "itertools", "functools", "operator", "heapq", "bisect",
"array", "copy", "pprint", "enum",
# Math & numbers
"math", "cmath", "statistics", "random", "fractions", "decimal", "numbers",
# Date & time
"datetime", "time", "calendar",
# Text processing
"string", "textwrap", "unicodedata", "difflib",
# Data formats
"base64", "binascii", "quopri", "uu", "html", "xml.etree.ElementTree",
# Functional & concurrency helpers
"dataclasses", "hashlib", "hmac",
# Common utilities
"abc", "contextlib", "warnings", "logging",
},
"blocked_builtins": {
"eval", "exec", "compile", "__import__",
"open", "input", "globals", "locals", "vars",
"breakpoint", "exit", "quit",
"memoryview", "bytearray",
"getattr", "setattr", "delattr",
},
},
"strict": {
"timeout": 5,
"description": "Minimal allowlist, only pure computation modules",
"allowlist_modules": {
# Pure data structures
"collections", "itertools", "functools", "operator",
"array", "copy", "enum",
# Pure math
"math", "cmath", "numbers", "fractions", "decimal",
"random", "statistics",
# Pure text
"string", "textwrap", "unicodedata",
# Type hints
"typing",
# Utilities (no I/O)
"dataclasses", "abc", "contextlib",
},
"blocked_builtins": {
"eval", "exec", "compile", "__import__",
"open", "input", "globals", "locals", "vars",
"breakpoint", "exit", "quit",
"memoryview", "bytearray",
"dir", "hasattr", "getattr", "setattr", "delattr",
"type", "isinstance", "issubclass",
},
},
}
# Blacklist of dangerous builtins
BLOCKED_BUILTINS = {
"eval", "exec", "compile", "open", "input",
"__import__", "globals", "locals", "vars",
"breakpoint", "exit", "quit",
"memoryview", "bytearray",
}
def register_extra_modules(strictness: str, modules: Set[str] | List[str]) -> None:
"""Register additional modules to a strictness level's allowlist.
Args:
strictness: One of "lenient", "standard", "strict".
modules: Module names to add to the allowlist.
"""
if strictness not in STRICTNESS_PROFILES:
raise ValueError(f"Invalid strictness level: {strictness}. Must be one of: {', '.join(STRICTNESS_PROFILES.keys())}")
profile = STRICTNESS_PROFILES[strictness]
if profile.get("allowlist_modules") is None:
return # lenient mode allows everything, nothing to add
profile["allowlist_modules"].update(modules)
# Apply extra modules from config.yml on module load
for _level, _mods in _CFG_EXTRA_MODULES.items():
if isinstance(_mods, list) and _mods:
register_extra_modules(_level, _mods)
@tool(
name="execute_python",
description="Execute Python code in a sandboxed environment. Most standard library modules are allowed, with dangerous modules (os, subprocess, socket, etc.) blocked. Max execution time: 10 seconds.",
description="Execute Python code in a sandboxed environment with configurable strictness levels (lenient/standard/strict). "
"Default: 'standard' mode - balances security and flexibility with 10s timeout. "
"Use 'lenient' for data processing tasks (30s timeout, more modules allowed). "
"Use 'strict' for basic calculations only (5s timeout, minimal module access).",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Python code to execute. Dangerous modules (os, subprocess, socket, etc.) are blocked."
"description": "Python code to execute. Available modules depend on strictness level."
},
"strictness": {
"type": "string",
"enum": ["lenient", "standard", "strict"],
"description": "Optional. Security strictness level (default: standard). "
"lenient: 30s timeout, most modules allowed; "
"standard: 10s timeout, balanced security; "
"strict: 5s timeout, minimal permissions."
}
},
"required": ["code"]
@ -61,36 +137,57 @@ def execute_python(arguments: dict) -> dict:
Execute Python code safely with sandboxing.
Security measures:
1. Blocked dangerous imports (blacklist)
2. Blocked dangerous builtins
3. Timeout limit (10s)
4. No file system access
5. No network access
1. Lenient mode: no restrictions
2. Standard/strict mode: allowlist based module restrictions
3. Configurable blocked builtins based on strictness level
4. Timeout limit (5s/10s/30s based on strictness)
5. Subprocess isolation
"""
code = arguments["code"]
# Security check: detect dangerous imports
dangerous_imports = _check_dangerous_imports(code)
if dangerous_imports:
strictness = arguments.get("strictness", DEFAULT_STRICTNESS)
# Validate strictness level
if strictness not in STRICTNESS_PROFILES:
return {
"success": False,
"error": f"Blocked imports: {', '.join(dangerous_imports)}. These modules are not allowed for security reasons."
"error": f"Invalid strictness level: {strictness}. Must be one of: {', '.join(STRICTNESS_PROFILES.keys())}"
}
# Get profile configuration
profile = STRICTNESS_PROFILES[strictness]
allowlist_modules = profile.get("allowlist_modules")
blocked_builtins = profile["blocked_builtins"]
timeout = profile["timeout"]
# Security check: detect dangerous function calls
dangerous_calls = _check_dangerous_calls(code)
if dangerous_calls:
# Parse and validate code syntax first
try:
tree = ast.parse(code)
except SyntaxError as e:
return {"success": False, "error": f"Syntax error in code: {e}"}
# Security check: detect disallowed imports
disallowed_imports = _check_disallowed_imports(tree, allowlist_modules)
if disallowed_imports:
return {
"success": False,
"error": f"Blocked functions: {', '.join(dangerous_calls)}"
"error": f"Blocked imports: {', '.join(disallowed_imports)}. These modules are not allowed in '{strictness}' mode."
}
# Security check: detect dangerous function calls (skip if no restrictions)
if blocked_builtins:
dangerous_calls = _check_dangerous_calls(tree, blocked_builtins)
if dangerous_calls:
return {
"success": False,
"error": f"Blocked functions: {', '.join(dangerous_calls)}. These functions are not allowed in '{strictness}' mode."
}
# Execute in isolated subprocess
try:
result = subprocess.run(
[sys.executable, "-c", _build_safe_code(code)],
[sys.executable, "-c", _build_safe_code(code, blocked_builtins, allowlist_modules)],
capture_output=True,
timeout=10,
timeout=timeout,
cwd=tempfile.gettempdir(),
encoding="utf-8",
env={ # Clear environment variables
@ -99,18 +196,25 @@ def execute_python(arguments: dict) -> dict:
)
if result.returncode == 0:
return {"success": True, "output": result.stdout}
return {
"success": True,
"output": result.stdout,
"strictness": strictness,
"timeout": timeout
}
else:
return {"success": False, "error": result.stderr or "Execution failed"}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Execution timeout (10s limit)"}
return {"success": False, "error": f"Execution timeout ({timeout}s limit in '{strictness}' mode)"}
except Exception as e:
return {"success": False, "error": f"Execution error: {str(e)}"}
def _build_safe_code(code: str) -> str:
"""Build sandboxed code with restricted globals"""
def _build_safe_code(code: str, blocked_builtins: Set[str],
allowlist_modules: Set[str] | None = None) -> str:
"""Build sandboxed code with restricted globals and runtime import hook."""
allowlist_repr = "None" if allowlist_modules is None else repr(allowlist_modules)
template = textwrap.dedent('''
import builtins
@ -118,6 +222,20 @@ def _build_safe_code(code: str) -> str:
_BLOCKED = %r
_safe_builtins = {k: getattr(builtins, k) for k in dir(builtins) if k not in _BLOCKED}
# Runtime import hook for allowlist enforcement
_ALLOWLIST = %s
if _ALLOWLIST is not None:
_original_import = builtins.__import__
def _restricted_import(name, *args, **kwargs):
top_level = name.split(".")[0]
if top_level not in _ALLOWLIST:
raise ImportError(
f"'{top_level}' is not allowed in the current strictness mode"
)
return _original_import(name, *args, **kwargs)
builtins.__import__ = _restricted_import
_safe_builtins["__import__"] = _restricted_import
# Create safe namespace
_safe_globals = {
"__builtins__": _safe_builtins,
@ -127,45 +245,44 @@ def _build_safe_code(code: str) -> str:
# Execute code
exec(%r, _safe_globals)
''').strip()
return template % (BLOCKED_BUILTINS, code)
return template % (blocked_builtins, allowlist_repr, code)
def _check_dangerous_imports(code: str) -> list:
"""Check for blocked (blacklisted) imports"""
try:
tree = ast.parse(code)
except SyntaxError:
def _check_disallowed_imports(tree: ast.AST, allowlist_modules: Set[str] | None) -> List[str]:
"""Check for imports not in allowlist. None allowlist means everything is allowed."""
if allowlist_modules is None:
return []
dangerous = []
disallowed = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
module = alias.name.split(".")[0]
if module in BLOCKED_MODULES:
dangerous.append(module)
if module not in allowlist_modules:
disallowed.append(module)
elif isinstance(node, ast.ImportFrom):
if node.module:
module = node.module.split(".")[0]
if module in BLOCKED_MODULES:
dangerous.append(module)
if module not in allowlist_modules:
disallowed.append(module)
return dangerous
return list(dict.fromkeys(disallowed)) # deduplicate while preserving order
def _check_dangerous_calls(code: str) -> list:
"""Check for blocked function calls"""
try:
tree = ast.parse(code)
except SyntaxError:
return []
def _check_dangerous_calls(tree: ast.AST, blocked_builtins: Set[str]) -> List[str]:
"""Check for blocked function calls including attribute access patterns."""
dangerous = []
for node in ast.walk(tree):
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in BLOCKED_BUILTINS:
# Direct call: eval("...")
if node.func.id in blocked_builtins:
dangerous.append(node.func.id)
elif isinstance(node.func, ast.Attribute):
# Attribute call: builtins.open(...) or os.system(...)
attr_name = node.func.attr
if attr_name in blocked_builtins:
dangerous.append(attr_name)
return dangerous
return list(dict.fromkeys(dangerous))

View File

@ -217,14 +217,68 @@ file_read({"path": "src/main.py", "project_id": "xxx"})
| 工具名称 | 描述 | 参数 |
|---------|------|------|
| `execute_python` | 在沙箱环境中执行 Python 代码 | `code`: Python 代码 |
| `execute_python` | 在沙箱环境中执行 Python 代码 | `code`: Python 代码<br>`strictness`: 可选严格等级lenient/standard/strict |
安全措施:
- 白名单模块限制
- 危险内置函数禁止
- 10 秒超时限制
- 无文件系统访问
- 无网络访问
**严格等级配置:**
| 等级 | 超时 | 策略 | 适用场景 |
|------|------|------|---------|
| `lenient` | 30s | 无限制,所有模块和内置函数均可使用 | 数据处理、需要完整标准库 |
| `standard` | 10s | 白名单机制,仅允许安全模块(默认) | 通用场景 |
| `strict` | 5s | 精简白名单,仅允许纯计算模块 | 基础计算 |
**standard 白名单模块:** json, csv, re, typing, collections, itertools, functools, operator, heapq, bisect, array, copy, pprint, enum, math, cmath, statistics, random, fractions, decimal, numbers, datetime, time, calendar, string, textwrap, unicodedata, difflib, base64, binascii, quopri, uu, html, xml.etree.ElementTree, dataclasses, hashlib, hmac, abc, contextlib, warnings, logging
**strict 白名单模块:** collections, itertools, functools, operator, array, copy, enum, math, cmath, numbers, fractions, decimal, random, statistics, string, textwrap, unicodedata, typing, dataclasses, abc, contextlib
**内置函数限制:**
- standard 禁止eval, exec, compile, \_\_import\_\_, open, input, globals, locals, vars, breakpoint, exit, quit, memoryview, bytearray
- strict 额外禁止dir, hasattr, getattr, setattr, delattr, type, isinstance, issubclass
**白名单扩展方式:**
1. **config.yml 配置(持久化):**
```yaml
code_execution:
default_strictness: standard
extra_allowed_modules:
standard: [numpy, pandas]
strict: [numpy]
```
2. **代码 API插件/运行时):**
```python
from backend.tools.builtin.code import register_extra_modules
register_extra_modules("standard", {"numpy", "pandas"})
register_extra_modules("strict", {"numpy"})
```
**使用示例:**
```python
# 默认 standard 模式(白名单限制)
execute_python({"code": "import json; print(json.dumps({'key': 'value'}))"})
# lenient 模式 - 无限制
execute_python({
"code": "import os; print(os.getcwd())",
"strictness": "lenient"
})
# strict 模式 - 仅纯计算
execute_python({
"code": "result = sum([1, 2, 3, 4, 5]); print(result)",
"strictness": "strict"
})
```
**安全措施:**
- standard/strict: 白名单模块限制(默认拒绝,仅显式允许)
- lenient: 无限制
- 危险内置函数按等级禁止
- 可配置超时限制5s/10s/30s
- subprocess 隔离执行
### 5.4 文件操作工具 (file)