Luxx/luxx/tools/builtin/file.py

414 lines
13 KiB
Python

"""File operation tools with workspace path injection"""
import os
import hashlib
from pathlib import Path
from typing import Dict, Any, Optional
from luxx.tools.factory import tool
from luxx.tools.core import ToolContext, CommandPermission
from luxx.config import config
def get_user_workspace(user_id: int) -> str:
"""Get user's workspace path, create if not exists
Directory name is based on user_id hash for privacy:
{workspace_root}/{hash_of_user_id}
"""
workspace_root = Path(config.workspace_root).resolve()
# Generate hash from user_id for privacy
user_hash = hashlib.sha256(str(user_id).encode()).hexdigest()[:16]
# User-specific workspace: {root}/{hash}
user_workspace = workspace_root / user_hash
if config.workspace_auto_create and not user_workspace.exists():
user_workspace.mkdir(parents=True, exist_ok=True)
return str(user_workspace)
def safe_path(file_path: str, workspace: str) -> Optional[Path]:
"""
Validate and resolve file path within workspace.
Returns resolved Path if valid, None if path escapes workspace (security check).
"""
try:
workspace_path = Path(workspace).resolve()
requested_path = (workspace_path / file_path).resolve()
# Check if resolved path is within workspace
if requested_path.is_relative_to(workspace_path):
return requested_path
else:
return None # Path escapes workspace
except (ValueError, OSError):
return None
def get_workspace_from_context(context: ToolContext) -> Optional[str]:
"""Get workspace from context, auto-create if needed"""
if context.workspace:
return context.workspace
if context.user_id is not None:
return get_user_workspace(context.user_id)
return None
@tool(
name="file_read",
description="Read content from a file within the user's workspace",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path of the file to read (relative to workspace root)"
},
"max_lines": {
"type": "integer",
"description": "Maximum number of lines to read",
"default": 100
}
},
"required": ["path"]
},
required_params=["path"],
category="file",
required_permission=CommandPermission.READ_ONLY
)
def file_read(arguments: Dict[str, Any], context: ToolContext = None):
"""
Read file content from user's workspace.
Args:
path: Relative path within workspace
max_lines: Maximum lines to read (default 100)
Returns:
{"content": str, "lines": int, "path": str}
"""
workspace = get_workspace_from_context(context) if context else None
file_path = arguments.get("path", "")
max_lines = arguments.get("max_lines", 100)
if not workspace:
return {"error": "Workspace not configured"}
# Security check
safe_file_path = safe_path(file_path, workspace)
if not safe_file_path:
return {"error": f"Access denied: path '{file_path}' is outside workspace"}
# Check if file exists
if not safe_file_path.exists():
return {"error": f"File not found: {file_path}"}
if not safe_file_path.is_file():
return {"error": f"Path is not a file: {file_path}"}
try:
with open(safe_file_path, 'r', encoding='utf-8') as f:
lines = []
for i, line in enumerate(f):
if i >= max_lines:
break
lines.append(line.rstrip('\n'))
content = '\n'.join(lines)
total_lines = sum(1 for _ in open(safe_file_path, 'r', encoding='utf-8'))
return {
"content": content,
"lines": len(lines),
"total_lines": total_lines,
"path": file_path,
"truncated": total_lines > len(lines)
}
except UnicodeDecodeError:
return {"error": "File is not text-readable (binary file)"}
except Exception as e:
return {"error": f"Read error: {str(e)}"}
@tool(
name="file_write",
description="Write content to a file within the user's workspace",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path of the file to write (relative to workspace root)"
},
"content": {
"type": "string",
"description": "Content to write to the file"
},
"append": {
"type": "boolean",
"description": "Append to existing file instead of overwriting",
"default": False
}
},
"required": ["path", "content"]
},
required_params=["path", "content"],
category="file",
required_permission=CommandPermission.WRITE
)
def file_write(arguments: Dict[str, Any], context: ToolContext = None):
"""
Write content to file in user's workspace.
Args:
path: Relative path within workspace
content: Content to write
append: Whether to append (default False = overwrite)
Returns:
{"bytes_written": int, "path": str}
"""
workspace = get_workspace_from_context(context) if context else None
file_path = arguments.get("path", "")
content = arguments.get("content", "")
append = arguments.get("append", False)
if not workspace:
return {"error": "Workspace not configured"}
# Security check
safe_file_path = safe_path(file_path, workspace)
if not safe_file_path:
return {"error": f"Access denied: path '{file_path}' is outside workspace"}
try:
# Create parent directories if needed
safe_file_path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if append else 'w'
with open(safe_file_path, mode, encoding='utf-8') as f:
bytes_written = f.write(content)
return {
"bytes_written": bytes_written,
"path": file_path,
"action": "appended" if append else "written"
}
except Exception as e:
return {"error": f"Write error: {str(e)}"}
@tool(
name="file_list",
description="List files and directories within the user's workspace",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path of directory to list (relative to workspace root)",
"default": "."
},
"recursive": {
"type": "boolean",
"description": "List recursively",
"default": False
}
},
"required": []
},
required_params=[],
category="file",
required_permission=CommandPermission.READ_ONLY
)
def file_list(arguments: Dict[str, Any], context: ToolContext = None):
"""
List directory contents in user's workspace.
Args:
path: Relative directory path (default ".")
recursive: Whether to list recursively
Returns:
{"files": list, "dirs": list}
"""
workspace = get_workspace_from_context(context) if context else None
dir_path = arguments.get("path", ".")
recursive = arguments.get("recursive", False)
if not workspace:
return {"error": "Workspace not configured"}
# Security check
safe_dir_path = safe_path(dir_path, workspace)
if not safe_dir_path:
return {"error": f"Access denied: path '{dir_path}' is outside workspace"}
if not safe_dir_path.exists():
return {"error": f"Directory not found: {dir_path}"}
if not safe_dir_path.is_dir():
return {"error": f"Path is not a directory: {dir_path}"}
try:
files = []
dirs = []
if recursive:
for root, dirnames, filenames in os.walk(safe_dir_path):
root_rel = os.path.relpath(root, safe_dir_path)
if root_rel == '.':
root_rel = ''
for d in dirnames:
dirs.append(os.path.join(root_rel, d) if root_rel else d)
for f in filenames:
files.append(os.path.join(root_rel, f) if root_rel else f)
else:
for item in safe_dir_path.iterdir():
if item.is_dir():
dirs.append(item.name)
else:
files.append(item.name)
return {
"files": sorted(files),
"dirs": sorted(dirs),
"path": dir_path
}
except Exception as e:
return {"error": f"List error: {str(e)}"}
@tool(
name="file_exists",
description="Check if a file or directory exists within the user's workspace",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path to check (relative to workspace root)"
}
},
"required": ["path"]
},
required_params=["path"],
category="file",
required_permission=CommandPermission.READ_ONLY
)
def file_exists(arguments: Dict[str, Any], context: ToolContext = None):
"""
Check if path exists in user's workspace.
Returns:
{"exists": bool, "type": "file"|"dir"|"none"}
"""
workspace = get_workspace_from_context(context) if context else None
file_path = arguments.get("path", "")
if not workspace:
return {"error": "Workspace not configured"}
# Security check
safe_file_path = safe_path(file_path, workspace)
if not safe_file_path:
return {"exists": False, "type": "none", "path": file_path}
if not safe_file_path.exists():
return {"exists": False, "type": "none", "path": file_path}
item_type = "dir" if safe_file_path.is_dir() else "file"
return {"exists": True, "type": item_type, "path": file_path}
@tool(
name="file_grep",
description="Search for lines matching a pattern in a file (read-only grep)",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path of the file to search (relative to workspace root)"
},
"pattern": {
"type": "string",
"description": "Regex pattern to search for"
},
"max_results": {
"type": "integer",
"description": "Maximum number of matching lines to return",
"default": 100
},
"case_sensitive": {
"type": "boolean",
"description": "Case sensitive search",
"default": False
}
},
"required": ["path", "pattern"]
},
required_params=["path", "pattern"],
category="file",
required_permission=CommandPermission.READ_ONLY
)
def file_grep(arguments: Dict[str, Any], context: ToolContext = None):
"""
Search for lines matching a pattern in a file.
Returns:
{"matches": [{"line": int, "content": str}], "path": str, "count": int}
"""
import re
workspace = get_workspace_from_context(context) if context else None
file_path = arguments.get("path", "")
pattern = arguments.get("pattern", "")
max_results = arguments.get("max_results", 100)
case_sensitive = arguments.get("case_sensitive", False)
if not workspace:
return {"error": "Workspace not configured"}
# Security check
safe_file_path = safe_path(file_path, workspace)
if not safe_file_path:
return {"error": f"Access denied: path '{file_path}' is outside workspace"}
if not safe_file_path.exists():
return {"error": f"File not found: {file_path}"}
if not safe_file_path.is_file():
return {"error": f"Path is not a file: {file_path}"}
try:
flags = 0 if case_sensitive else re.IGNORECASE
regex = re.compile(pattern, flags)
matches = []
with open(safe_file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
if regex.search(line):
matches.append({
"line": line_num,
"content": line.rstrip('\n')
})
if len(matches) >= max_results:
break
return {
"matches": matches,
"path": file_path,
"count": len(matches),
"pattern": pattern
}
except re.error as e:
return {"error": f"Invalid regex pattern: {str(e)}"}
except UnicodeDecodeError:
return {"error": "File is not text-readable (binary file)"}
except Exception as e:
return {"error": f"Search error: {str(e)}"}