nanoClaw/backend/tools/builtin/code.py

308 lines
12 KiB
Python

"""Safe code execution tool with sandboxing and strictness levels"""
import ast
import subprocess
import sys
import tempfile
import textwrap
from typing import Dict, List, Set
from backend.tools.factory import tool
from backend.config import config
from backend.tools.docker_executor import DockerExecutor
# Strictness profiles configuration
# - lenient: no restrictions at all
# - standard: allowlist based, only safe modules permitted
# - strict: minimal allowlist, only pure computation modules
STRICTNESS_PROFILES: Dict[str, dict] = {
"lenient": {
"timeout": 30,
"description": "No restrictions, all modules and builtins allowed",
"allowlist_modules": None, # None means all allowed
"blocked_builtins": set(),
},
"standard": {
"timeout": 10,
"description": "Allowlist based, only safe modules and builtins permitted",
"allowlist_modules": {
# Data types & serialization
"json", "csv", "re", "typing",
# Data structures
"collections", "itertools", "functools", "operator", "heapq", "bisect",
"array", "copy", "pprint", "enum",
# Math & numbers
"math", "cmath", "statistics", "random", "fractions", "decimal", "numbers",
# Date & time
"datetime", "time", "calendar",
# Text processing
"string", "textwrap", "unicodedata", "difflib",
# Data formats
"base64", "binascii", "quopri", "uu", "html", "xml.etree.ElementTree",
# Functional & concurrency helpers
"dataclasses", "hashlib", "hmac",
# Common utilities
"abc", "contextlib", "warnings", "logging",
},
"blocked_builtins": {
"eval", "exec", "compile", "__import__",
"open", "input", "globals", "locals", "vars",
"breakpoint", "exit", "quit",
"memoryview", "bytearray",
"getattr", "setattr", "delattr",
},
},
"strict": {
"timeout": 5,
"description": "Minimal allowlist, only pure computation modules",
"allowlist_modules": {
# Pure data structures
"collections", "itertools", "functools", "operator",
"array", "copy", "enum",
# Pure math
"math", "cmath", "numbers", "fractions", "decimal",
"random", "statistics",
# Pure text
"string", "textwrap", "unicodedata",
# Type hints
"typing",
# Utilities (no I/O)
"dataclasses", "abc", "contextlib",
},
"blocked_builtins": {
"eval", "exec", "compile", "__import__",
"open", "input", "globals", "locals", "vars",
"breakpoint", "exit", "quit",
"memoryview", "bytearray",
"dir", "hasattr", "getattr", "setattr", "delattr",
"type", "isinstance", "issubclass",
},
},
}
def register_extra_modules(strictness: str, modules: Set[str] | List[str]) -> None:
"""Register additional modules to a strictness level's allowlist.
Args:
strictness: One of "lenient", "standard", "strict".
modules: Module names to add to the allowlist.
"""
if strictness not in STRICTNESS_PROFILES:
raise ValueError(f"Invalid strictness level: {strictness}. Must be one of: {', '.join(STRICTNESS_PROFILES.keys())}")
profile = STRICTNESS_PROFILES[strictness]
if profile.get("allowlist_modules") is None:
return # lenient mode allows everything, nothing to add
profile["allowlist_modules"].update(modules)
# Apply extra modules from config.yml on module load
for _level, _mods in config.code_execution.extra_allowed_modules.items():
if isinstance(_mods, list) and _mods:
register_extra_modules(_level, _mods)
@tool(
name="execute_python",
description="Execute Python code in a sandboxed environment with configurable strictness levels (lenient/standard/strict). "
"Default: 'standard' mode - balances security and flexibility with 10s timeout. "
"Use 'lenient' for data processing tasks (30s timeout, more modules allowed). "
"Use 'strict' for basic calculations only (5s timeout, minimal module access).",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Python code to execute. Available modules depend on strictness level."
},
"strictness": {
"type": "string",
"enum": ["lenient", "standard", "strict"],
"description": "Optional. Security strictness level (default: standard). "
"lenient: 30s timeout, most modules allowed; "
"standard: 10s timeout, balanced security; "
"strict: 5s timeout, minimal permissions."
}
},
"required": ["code"]
},
category="code"
)
def execute_python(arguments: dict) -> dict:
"""
Execute Python code safely with sandboxing.
Security measures:
1. Lenient mode: no restrictions
2. Standard/strict mode: allowlist based module restrictions
3. Configurable blocked builtins based on strictness level
4. Timeout limit (5s/10s/30s based on strictness)
5. Subprocess isolation
"""
code = arguments["code"]
strictness = arguments.get("strictness", config.code_execution.default_strictness)
# Validate strictness level
if strictness not in STRICTNESS_PROFILES:
return {
"success": False,
"error": f"Invalid strictness level: {strictness}. Must be one of: {', '.join(STRICTNESS_PROFILES.keys())}"
}
# Get profile configuration
profile = STRICTNESS_PROFILES[strictness]
allowlist_modules = profile.get("allowlist_modules")
blocked_builtins = profile["blocked_builtins"]
timeout = profile["timeout"]
# Parse and validate code syntax first
try:
tree = ast.parse(code)
except SyntaxError as e:
return {"success": False, "error": f"Syntax error in code: {e}"}
# Security check: detect disallowed imports
disallowed_imports = _check_disallowed_imports(tree, allowlist_modules)
if disallowed_imports:
return {
"success": False,
"error": f"Blocked imports: {', '.join(disallowed_imports)}. These modules are not allowed in '{strictness}' mode."
}
# Security check: detect dangerous function calls (skip if no restrictions)
if blocked_builtins:
dangerous_calls = _check_dangerous_calls(tree, blocked_builtins)
if dangerous_calls:
return {
"success": False,
"error": f"Blocked functions: {', '.join(dangerous_calls)}. These functions are not allowed in '{strictness}' mode."
}
# Choose execution backend
backend = config.code_execution.backend
if backend == "docker":
# Use Docker executor
executor = DockerExecutor(
image=config.code_execution.docker_image,
network=config.code_execution.docker_network,
user=config.code_execution.docker_user,
memory_limit=config.code_execution.docker_memory_limit,
cpu_shares=config.code_execution.docker_cpu_shares,
)
result = executor.execute(
code=code,
timeout=timeout,
strictness=strictness,
)
# Docker executor already returns the same dict structure
return result
else:
# Default subprocess backend
try:
result = subprocess.run(
[sys.executable, "-c", _build_safe_code(code, blocked_builtins, allowlist_modules)],
capture_output=True,
timeout=timeout,
cwd=tempfile.gettempdir(),
encoding="utf-8",
env={ # Clear environment variables
"PYTHONIOENCODING": "utf-8",
}
)
if result.returncode == 0:
return {
"success": True,
"output": result.stdout,
"strictness": strictness,
"timeout": timeout
}
else:
return {"success": False, "error": result.stderr or "Execution failed"}
except subprocess.TimeoutExpired:
return {"success": False, "error": f"Execution timeout ({timeout}s limit in '{strictness}' mode)"}
except Exception as e:
return {"success": False, "error": f"Execution error: {str(e)}"}
def _build_safe_code(code: str, blocked_builtins: Set[str],
allowlist_modules: Set[str] | None = None) -> str:
"""Build sandboxed code with restricted globals and runtime import hook."""
allowlist_repr = "None" if allowlist_modules is None else repr(allowlist_modules)
template = textwrap.dedent('''
import builtins
# Block dangerous builtins
_BLOCKED = %r
_safe_builtins = {k: getattr(builtins, k) for k in dir(builtins) if k not in _BLOCKED}
# Runtime import hook for allowlist enforcement
_ALLOWLIST = %s
if _ALLOWLIST is not None:
_original_import = builtins.__import__
def _restricted_import(name, *args, **kwargs):
top_level = name.split(".")[0]
if top_level not in _ALLOWLIST:
raise ImportError(
f"'{top_level}' is not allowed in the current strictness mode"
)
return _original_import(name, *args, **kwargs)
builtins.__import__ = _restricted_import
_safe_builtins["__import__"] = _restricted_import
# Create safe namespace
_safe_globals = {
"__builtins__": _safe_builtins,
"__name__": "__main__",
}
# Execute code
exec(%r, _safe_globals)
''').strip()
return template % (blocked_builtins, allowlist_repr, code)
def _check_disallowed_imports(tree: ast.AST, allowlist_modules: Set[str] | None) -> List[str]:
"""Check for imports not in allowlist. None allowlist means everything is allowed."""
if allowlist_modules is None:
return []
disallowed = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
module = alias.name.split(".")[0]
if module not in allowlist_modules:
disallowed.append(module)
elif isinstance(node, ast.ImportFrom):
if node.module:
module = node.module.split(".")[0]
if module not in allowlist_modules:
disallowed.append(module)
return list(dict.fromkeys(disallowed)) # deduplicate while preserving order
def _check_dangerous_calls(tree: ast.AST, blocked_builtins: Set[str]) -> List[str]:
"""Check for blocked function calls including attribute access patterns."""
dangerous = []
for node in ast.walk(tree):
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
# Direct call: eval("...")
if node.func.id in blocked_builtins:
dangerous.append(node.func.id)
elif isinstance(node.func, ast.Attribute):
# Attribute call: builtins.open(...) or os.system(...)
attr_name = node.func.attr
if attr_name in blocked_builtins:
dangerous.append(attr_name)
return list(dict.fromkeys(dangerous))