"""File operation tools with workspace path injection""" import os import hashlib from pathlib import Path from typing import Dict, Any, Optional from luxx.tools.factory import tool from luxx.tools.core import ToolContext, CommandPermission from luxx.config import config def get_user_workspace(user_id: int) -> str: """Get user's workspace path, create if not exists Directory name is based on user_id hash for privacy: {workspace_root}/{hash_of_user_id} """ workspace_root = Path(config.workspace_root).resolve() # Generate hash from user_id for privacy user_hash = hashlib.sha256(str(user_id).encode()).hexdigest()[:16] # User-specific workspace: {root}/{hash} user_workspace = workspace_root / user_hash if config.workspace_auto_create and not user_workspace.exists(): user_workspace.mkdir(parents=True, exist_ok=True) return str(user_workspace) def safe_path(file_path: str, workspace: str) -> Optional[Path]: """ Validate and resolve file path within workspace. Returns resolved Path if valid, None if path escapes workspace (security check). """ try: workspace_path = Path(workspace).resolve() requested_path = (workspace_path / file_path).resolve() # Check if resolved path is within workspace if requested_path.is_relative_to(workspace_path): return requested_path else: return None # Path escapes workspace except (ValueError, OSError): return None def get_workspace_from_context(context: ToolContext) -> Optional[str]: """Get workspace from context, auto-create if needed""" if context.workspace: return context.workspace if context.user_id is not None: return get_user_workspace(context.user_id) return None @tool( name="file_read", description="Read content from a file within the user's workspace", parameters={ "type": "object", "properties": { "path": { "type": "string", "description": "Relative path of the file to read (relative to workspace root)" }, "max_lines": { "type": "integer", "description": "Maximum number of lines to read", "default": 100 } }, "required": ["path"] }, required_params=["path"], category="file", required_permission=CommandPermission.READ_ONLY ) def file_read(arguments: Dict[str, Any], context: ToolContext = None): """ Read file content from user's workspace. Args: path: Relative path within workspace max_lines: Maximum lines to read (default 100) Returns: {"content": str, "lines": int, "path": str} """ workspace = get_workspace_from_context(context) if context else None file_path = arguments.get("path", "") max_lines = arguments.get("max_lines", 100) if not workspace: return {"error": "Workspace not configured"} # Security check safe_file_path = safe_path(file_path, workspace) if not safe_file_path: return {"error": f"Access denied: path '{file_path}' is outside workspace"} # Check if file exists if not safe_file_path.exists(): return {"error": f"File not found: {file_path}"} if not safe_file_path.is_file(): return {"error": f"Path is not a file: {file_path}"} try: with open(safe_file_path, 'r', encoding='utf-8') as f: lines = [] for i, line in enumerate(f): if i >= max_lines: break lines.append(line.rstrip('\n')) content = '\n'.join(lines) total_lines = sum(1 for _ in open(safe_file_path, 'r', encoding='utf-8')) return { "content": content, "lines": len(lines), "total_lines": total_lines, "path": file_path, "truncated": total_lines > len(lines) } except UnicodeDecodeError: return {"error": "File is not text-readable (binary file)"} except Exception as e: return {"error": f"Read error: {str(e)}"} @tool( name="file_write", description="Write content to a file within the user's workspace", parameters={ "type": "object", "properties": { "path": { "type": "string", "description": "Relative path of the file to write (relative to workspace root)" }, "content": { "type": "string", "description": "Content to write to the file" }, "append": { "type": "boolean", "description": "Append to existing file instead of overwriting", "default": False } }, "required": ["path", "content"] }, required_params=["path", "content"], category="file", required_permission=CommandPermission.WRITE ) def file_write(arguments: Dict[str, Any], context: ToolContext = None): """ Write content to file in user's workspace. Args: path: Relative path within workspace content: Content to write append: Whether to append (default False = overwrite) Returns: {"bytes_written": int, "path": str} """ workspace = get_workspace_from_context(context) if context else None file_path = arguments.get("path", "") content = arguments.get("content", "") append = arguments.get("append", False) if not workspace: return {"error": "Workspace not configured"} # Security check safe_file_path = safe_path(file_path, workspace) if not safe_file_path: return {"error": f"Access denied: path '{file_path}' is outside workspace"} try: # Create parent directories if needed safe_file_path.parent.mkdir(parents=True, exist_ok=True) mode = 'a' if append else 'w' with open(safe_file_path, mode, encoding='utf-8') as f: bytes_written = f.write(content) return { "bytes_written": bytes_written, "path": file_path, "action": "appended" if append else "written" } except Exception as e: return {"error": f"Write error: {str(e)}"} @tool( name="file_list", description="List files and directories within the user's workspace", parameters={ "type": "object", "properties": { "path": { "type": "string", "description": "Relative path of directory to list (relative to workspace root)", "default": "." }, "recursive": { "type": "boolean", "description": "List recursively", "default": False } }, "required": [] }, required_params=[], category="file", required_permission=CommandPermission.READ_ONLY ) def file_list(arguments: Dict[str, Any], context: ToolContext = None): """ List directory contents in user's workspace. Args: path: Relative directory path (default ".") recursive: Whether to list recursively Returns: {"files": list, "dirs": list} """ workspace = get_workspace_from_context(context) if context else None dir_path = arguments.get("path", ".") recursive = arguments.get("recursive", False) if not workspace: return {"error": "Workspace not configured"} # Security check safe_dir_path = safe_path(dir_path, workspace) if not safe_dir_path: return {"error": f"Access denied: path '{dir_path}' is outside workspace"} if not safe_dir_path.exists(): return {"error": f"Directory not found: {dir_path}"} if not safe_dir_path.is_dir(): return {"error": f"Path is not a directory: {dir_path}"} try: files = [] dirs = [] if recursive: for root, dirnames, filenames in os.walk(safe_dir_path): root_rel = os.path.relpath(root, safe_dir_path) if root_rel == '.': root_rel = '' for d in dirnames: dirs.append(os.path.join(root_rel, d) if root_rel else d) for f in filenames: files.append(os.path.join(root_rel, f) if root_rel else f) else: for item in safe_dir_path.iterdir(): if item.is_dir(): dirs.append(item.name) else: files.append(item.name) return { "files": sorted(files), "dirs": sorted(dirs), "path": dir_path } except Exception as e: return {"error": f"List error: {str(e)}"} @tool( name="file_exists", description="Check if a file or directory exists within the user's workspace", parameters={ "type": "object", "properties": { "path": { "type": "string", "description": "Relative path to check (relative to workspace root)" } }, "required": ["path"] }, required_params=["path"], category="file", required_permission=CommandPermission.READ_ONLY ) def file_exists(arguments: Dict[str, Any], context: ToolContext = None): """ Check if path exists in user's workspace. Returns: {"exists": bool, "type": "file"|"dir"|"none"} """ workspace = get_workspace_from_context(context) if context else None file_path = arguments.get("path", "") if not workspace: return {"error": "Workspace not configured"} # Security check safe_file_path = safe_path(file_path, workspace) if not safe_file_path: return {"exists": False, "type": "none", "path": file_path} if not safe_file_path.exists(): return {"exists": False, "type": "none", "path": file_path} item_type = "dir" if safe_file_path.is_dir() else "file" return {"exists": True, "type": item_type, "path": file_path} @tool( name="file_grep", description="Search for lines matching a pattern in a file (read-only grep)", parameters={ "type": "object", "properties": { "path": { "type": "string", "description": "Relative path of the file to search (relative to workspace root)" }, "pattern": { "type": "string", "description": "Regex pattern to search for" }, "max_results": { "type": "integer", "description": "Maximum number of matching lines to return", "default": 100 }, "case_sensitive": { "type": "boolean", "description": "Case sensitive search", "default": False } }, "required": ["path", "pattern"] }, required_params=["path", "pattern"], category="file", required_permission=CommandPermission.READ_ONLY ) def file_grep(arguments: Dict[str, Any], context: ToolContext = None): """ Search for lines matching a pattern in a file. Returns: {"matches": [{"line": int, "content": str}], "path": str, "count": int} """ import re workspace = get_workspace_from_context(context) if context else None file_path = arguments.get("path", "") pattern = arguments.get("pattern", "") max_results = arguments.get("max_results", 100) case_sensitive = arguments.get("case_sensitive", False) if not workspace: return {"error": "Workspace not configured"} # Security check safe_file_path = safe_path(file_path, workspace) if not safe_file_path: return {"error": f"Access denied: path '{file_path}' is outside workspace"} if not safe_file_path.exists(): return {"error": f"File not found: {file_path}"} if not safe_file_path.is_file(): return {"error": f"Path is not a file: {file_path}"} try: flags = 0 if case_sensitive else re.IGNORECASE regex = re.compile(pattern, flags) matches = [] with open(safe_file_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): if regex.search(line): matches.append({ "line": line_num, "content": line.rstrip('\n') }) if len(matches) >= max_results: break return { "matches": matches, "path": file_path, "count": len(matches), "pattern": pattern } except re.error as e: return {"error": f"Invalid regex pattern: {str(e)}"} except UnicodeDecodeError: return {"error": "File is not text-readable (binary file)"} except Exception as e: return {"error": f"Search error: {str(e)}"}