diff --git a/config.yaml b/config.yaml
index fc17190..8e636e1 100644
--- a/config.yaml
+++ b/config.yaml
@@ -9,6 +9,10 @@ database:
type: sqlite
url: sqlite:///./chat.db
+workspace:
+ root: ./workspaces
+ auto_create: true
+
llm:
provider: deepseek
api_key: ${DEEPSEEK_API_KEY}
diff --git a/dashboard/src/style.css b/dashboard/src/style.css
index f0df474..da40f43 100644
--- a/dashboard/src/style.css
+++ b/dashboard/src/style.css
@@ -336,14 +336,14 @@ body {
.message-bubble.user .message-container {
align-items: flex-end;
width: fit-content;
- max-width: 70%;
+ max-width: 80%;
}
.message-bubble.assistant .message-container {
align-items: flex-start;
flex: 1 1 auto;
- width: 90%;
- max-width: 90%;
+ width: 80%;
+ max-width: 80%;
min-width: 0;
}
@@ -354,7 +354,7 @@ body {
.avatar {
width: 32px;
height: 32px;
- border-radius: 8px;
+ border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
diff --git a/dashboard/src/utils/api.js b/dashboard/src/utils/api.js
index 12029cf..b703224 100644
--- a/dashboard/src/utils/api.js
+++ b/dashboard/src/utils/api.js
@@ -157,7 +157,9 @@ export const authAPI = {
login: (data) => api.post('/auth/login', data),
register: (data) => api.post('/auth/register', data),
logout: () => api.post('/auth/logout'),
- getMe: () => api.get('/auth/me')
+ getMe: () => api.get('/auth/me'),
+ listUsers: () => api.get('/auth/users'),
+ updateUserPermission: (userId, data) => api.put(`/auth/users/${userId}`, data)
}
// ============ 会话接口 ============
diff --git a/dashboard/src/views/ConversationView.vue b/dashboard/src/views/ConversationView.vue
index 7978734..4e5f41f 100644
--- a/dashboard/src/views/ConversationView.vue
+++ b/dashboard/src/views/ConversationView.vue
@@ -80,7 +80,7 @@
/>
@@ -258,7 +301,7 @@ import { authAPI } from '../utils/api.js'
import { useRouter } from 'vue-router'
const router = useRouter()
-const { logout } = useAuth()
+const { logout, user: currentUser } = useAuth()
// 夜间模式
const isDark = ref(localStorage.getItem('theme') === 'dark')
@@ -293,6 +336,10 @@ const showUserModal = ref(false)
const savingUser = ref(false)
const userFormError = ref('')
const loadingUser = ref(false)
+const isAdmin = ref(false)
+const users = ref([])
+const loadingUsers = ref(false)
+const usersError = ref('')
const modelSettings = ref({
default_provider: null,
@@ -493,9 +540,54 @@ const saveDefaultProvider = async () => {
} catch (e) { alert('设置默认 Provider 失败: ' + e.message) }
}
+const fetchUsers = async () => {
+ loadingUsers.value = true
+ usersError.value = ''
+ try {
+ const res = await authAPI.listUsers()
+ if (res.success) {
+ users.value = res.data.users || []
+ } else if (res.message === 'Admin permission required') {
+ isAdmin.value = false
+ } else {
+ usersError.value = res.message || '获取用户列表失败'
+ }
+ } catch (e) {
+ isAdmin.value = false
+ } finally {
+ loadingUsers.value = false
+ }
+}
+
+const updateUserPermission = async (user) => {
+ try {
+ const res = await authAPI.updateUserPermission(user.id, { permission_level: user.permission_level })
+ if (!res.success) {
+ alert(res.message || '更新权限失败')
+ }
+ } catch (e) {
+ alert('更新权限失败: ' + e.message)
+ }
+}
+
+const getPermissionName = (level) => {
+ const names = { 1: '只读', 2: '写入', 3: '执行', 4: '管理员' }
+ return names[level] || '未知'
+}
+
onMounted(() => {
fetchUserInfo()
fetchProviders()
+ fetchUsers()
+
+ // 检查是否是管理员
+ const userData = localStorage.getItem('user')
+ if (userData) {
+ try {
+ const user = JSON.parse(userData)
+ isAdmin.value = user.permission_level === 4
+ } catch (e) {}
+ }
const savedSettings = localStorage.getItem('modelSettings')
if (savedSettings) {
@@ -626,4 +718,8 @@ textarea { width: 100%; padding: 0.75rem; border: 1px solid var(--border-input);
.spinner-small { width: 24px; height: 24px; border: 3px solid var(--border-light); border-top-color: var(--accent-primary); border-radius: 50%; animation: spin 1s linear infinite; margin: 0 auto 0.5rem; }
.spinner { width: 40px; height: 40px; border: 3px solid var(--border-light); border-top-color: var(--accent-primary); border-radius: 50%; animation: spin 1s linear infinite; margin: 0 auto 1rem; }
+/* 权限选择器 */
+.permission-select { min-width: 140px; }
+.permission-label { font-size: 0.8rem; color: var(--text-secondary); }
+
diff --git a/luxx/__init__.py b/luxx/__init__.py
index 27ebe70..bd1e09d 100644
--- a/luxx/__init__.py
+++ b/luxx/__init__.py
@@ -29,12 +29,12 @@ async def lifespan(app: FastAPI):
if not default_user:
default_user = User(
username="admin",
- password_hash=hash_password("admin123"),
+ password_hash=hash_password("admin"),
role="admin"
)
db.add(default_user)
db.commit()
- logger.info("Default admin user created: admin / admin123")
+ logger.info("Default admin user created: admin / admin")
finally:
db.close()
diff --git a/luxx/config.py b/luxx/config.py
index 6e5d083..7d0e372 100644
--- a/luxx/config.py
+++ b/luxx/config.py
@@ -124,6 +124,15 @@ class Config:
@property
def log_date_format(self) -> str:
return self.get("logging.date_format", "%Y-%m-%d %H:%M:%S")
+
+ # Workspace configuration
+ @property
+ def workspace_root(self) -> str:
+ return self.get("workspace.root", "./workspaces")
+
+ @property
+ def workspace_auto_create(self) -> bool:
+ return self.get("workspace.auto_create", True)
# Global configuration instance
diff --git a/luxx/models.py b/luxx/models.py
index fdf4c19..e2198a8 100644
--- a/luxx/models.py
+++ b/luxx/models.py
@@ -75,6 +75,8 @@ class User(Base):
email: Mapped[Optional[str]] = mapped_column(String(120), unique=True, nullable=True)
password_hash: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
role: Mapped[str] = mapped_column(String(20), default="user")
+ permission_level: Mapped[int] = mapped_column(Integer, default=1) # 1=READ_ONLY, 2=WRITE, 3=EXECUTE, 4=ADMIN
+ workspace_path: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) # 用户工作空间路径
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=local_now)
@@ -89,6 +91,8 @@ class User(Base):
"username": self.username,
"email": self.email,
"role": self.role,
+ "permission_level": self.permission_level,
+ "workspace_path": self.workspace_path,
"is_active": self.is_active,
"created_at": self.created_at.isoformat() if self.created_at else None
}
diff --git a/luxx/routes/auth.py b/luxx/routes/auth.py
index 6eac2cc..00daaae 100644
--- a/luxx/routes/auth.py
+++ b/luxx/routes/auth.py
@@ -41,6 +41,12 @@ class UserResponse(BaseModel):
username: str
email: str | None
role: str
+ permission_level: int
+
+
+class UserPermissionUpdate(BaseModel):
+ """User permission update model"""
+ permission_level: int
class TokenResponse(BaseModel):
@@ -66,6 +72,13 @@ def get_current_user(
return user
+def require_admin(current_user: User = Depends(get_current_user)) -> User:
+ """Require admin role"""
+ if current_user.permission_level < 4:
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin permission required")
+ return current_user
+
+
@router.post("/register", response_model=dict)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
"""User registration"""
@@ -130,3 +143,27 @@ def logout(current_user: User = Depends(get_current_user)):
def get_me(current_user: User = Depends(get_current_user)):
"""Get current user info"""
return success_response(data=current_user.to_dict())
+
+
+@router.get("/users", response_model=dict)
+def get_users(admin_user: User = Depends(require_admin), db: Session = Depends(get_db)):
+ """Get all users (admin only)"""
+ users = db.query(User).all()
+ return success_response(data={"users": [u.to_dict() for u in users]})
+
+
+@router.put("/users/{user_id}", response_model=dict)
+def update_user(user_id: int, data: UserPermissionUpdate, admin_user: User = Depends(require_admin), db: Session = Depends(get_db)):
+ """Update user permission level (admin only)"""
+ user = db.query(User).filter(User.id == user_id).first()
+ if not user:
+ return error_response("User not found", 404)
+
+ # Validate permission level
+ if data.permission_level < 1 or data.permission_level > 4:
+ return error_response("Invalid permission level (1-4)", 400)
+
+ user.permission_level = data.permission_level
+ db.commit()
+
+ return success_response(data=user.to_dict(), message="User permission updated")
diff --git a/luxx/routes/messages.py b/luxx/routes/messages.py
index 9381c3e..fac223d 100644
--- a/luxx/routes/messages.py
+++ b/luxx/routes/messages.py
@@ -138,12 +138,18 @@ async def stream_message(
conversation.updated_at = datetime.now()
db.commit()
+ workspace = current_user.workspace_path if current_user.workspace_path else None
+
async def event_generator():
async for sse_str in chat_service.stream_response(
conversation=conversation,
user_message=data.content,
thinking_enabled=data.thinking_enabled,
- enabled_tools=data.enabled_tools
+ enabled_tools=data.enabled_tools,
+ user_id=current_user.id,
+ username=current_user.username,
+ workspace=workspace,
+ user_permission_level=current_user.permission_level
):
# Chat service returns raw SSE strings (including done event)
yield sse_str
diff --git a/luxx/services/chat.py b/luxx/services/chat.py
index 9c4f538..718bc05 100644
--- a/luxx/services/chat.py
+++ b/luxx/services/chat.py
@@ -99,7 +99,11 @@ class ChatService:
conversation: Conversation,
user_message: str,
thinking_enabled: bool = False,
- enabled_tools: list = None
+ enabled_tools: list = None,
+ user_id: int = None,
+ username: str = None,
+ workspace: str = None,
+ user_permission_level: int = 1
) -> AsyncGenerator[Dict[str, str], None]:
"""
Streaming response generator
@@ -326,8 +330,14 @@ class ChatService:
yield _sse_event("process_step", {"step": call_step})
# Execute tools
+ tool_context = {
+ "workspace": workspace,
+ "user_id": user_id,
+ "username": username,
+ "user_permission_level": user_permission_level
+ }
tool_results = self.tool_executor.process_tool_calls_parallel(
- tool_calls_list, {}
+ tool_calls_list, tool_context
)
# Yield tool_result steps - use unified step-{index} format
diff --git a/luxx/tools/builtin/__init__.py b/luxx/tools/builtin/__init__.py
index 916333c..03d886f 100644
--- a/luxx/tools/builtin/__init__.py
+++ b/luxx/tools/builtin/__init__.py
@@ -3,5 +3,6 @@
from luxx.tools.builtin import crawler
from luxx.tools.builtin import code
from luxx.tools.builtin import data
+from luxx.tools.builtin import file
-__all__ = ["crawler", "code", "data"]
+__all__ = ["crawler", "code", "data", "file"]
diff --git a/luxx/tools/builtin/file.py b/luxx/tools/builtin/file.py
new file mode 100644
index 0000000..d33880f
--- /dev/null
+++ b/luxx/tools/builtin/file.py
@@ -0,0 +1,413 @@
+"""File operation tools with workspace path injection"""
+import os
+import hashlib
+from pathlib import Path
+from typing import Dict, Any, Optional
+
+from luxx.tools.factory import tool
+from luxx.tools.core import ToolContext, CommandPermission
+from luxx.config import config
+
+
+def get_user_workspace(user_id: int) -> str:
+ """Get user's workspace path, create if not exists
+
+ Directory name is based on user_id hash for privacy:
+ {workspace_root}/{hash_of_user_id}
+ """
+ workspace_root = Path(config.workspace_root).resolve()
+
+ # Generate hash from user_id for privacy
+ user_hash = hashlib.sha256(str(user_id).encode()).hexdigest()[:16]
+
+ # User-specific workspace: {root}/{hash}
+ user_workspace = workspace_root / user_hash
+
+ if config.workspace_auto_create and not user_workspace.exists():
+ user_workspace.mkdir(parents=True, exist_ok=True)
+
+ return str(user_workspace)
+
+
+def safe_path(file_path: str, workspace: str) -> Optional[Path]:
+ """
+ Validate and resolve file path within workspace.
+
+ Returns resolved Path if valid, None if path escapes workspace (security check).
+ """
+ try:
+ workspace_path = Path(workspace).resolve()
+ requested_path = (workspace_path / file_path).resolve()
+
+ # Check if resolved path is within workspace
+ if requested_path.is_relative_to(workspace_path):
+ return requested_path
+ else:
+ return None # Path escapes workspace
+ except (ValueError, OSError):
+ return None
+
+
+def get_workspace_from_context(context: ToolContext) -> Optional[str]:
+ """Get workspace from context, auto-create if needed"""
+ if context.workspace:
+ return context.workspace
+ if context.user_id is not None:
+ return get_user_workspace(context.user_id)
+ return None
+
+
+@tool(
+ name="file_read",
+ description="Read content from a file within the user's workspace",
+ parameters={
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Relative path of the file to read (relative to workspace root)"
+ },
+ "max_lines": {
+ "type": "integer",
+ "description": "Maximum number of lines to read",
+ "default": 100
+ }
+ },
+ "required": ["path"]
+ },
+ required_params=["path"],
+ category="file",
+ required_permission=CommandPermission.READ_ONLY
+)
+def file_read(arguments: Dict[str, Any], context: ToolContext = None):
+ """
+ Read file content from user's workspace.
+
+ Args:
+ path: Relative path within workspace
+ max_lines: Maximum lines to read (default 100)
+
+ Returns:
+ {"content": str, "lines": int, "path": str}
+ """
+ workspace = get_workspace_from_context(context) if context else None
+ file_path = arguments.get("path", "")
+ max_lines = arguments.get("max_lines", 100)
+
+ if not workspace:
+ return {"error": "Workspace not configured"}
+
+ # Security check
+ safe_file_path = safe_path(file_path, workspace)
+ if not safe_file_path:
+ return {"error": f"Access denied: path '{file_path}' is outside workspace"}
+
+ # Check if file exists
+ if not safe_file_path.exists():
+ return {"error": f"File not found: {file_path}"}
+
+ if not safe_file_path.is_file():
+ return {"error": f"Path is not a file: {file_path}"}
+
+ try:
+ with open(safe_file_path, 'r', encoding='utf-8') as f:
+ lines = []
+ for i, line in enumerate(f):
+ if i >= max_lines:
+ break
+ lines.append(line.rstrip('\n'))
+
+ content = '\n'.join(lines)
+ total_lines = sum(1 for _ in open(safe_file_path, 'r', encoding='utf-8'))
+
+ return {
+ "content": content,
+ "lines": len(lines),
+ "total_lines": total_lines,
+ "path": file_path,
+ "truncated": total_lines > len(lines)
+ }
+ except UnicodeDecodeError:
+ return {"error": "File is not text-readable (binary file)"}
+ except Exception as e:
+ return {"error": f"Read error: {str(e)}"}
+
+
+@tool(
+ name="file_write",
+ description="Write content to a file within the user's workspace",
+ parameters={
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Relative path of the file to write (relative to workspace root)"
+ },
+ "content": {
+ "type": "string",
+ "description": "Content to write to the file"
+ },
+ "append": {
+ "type": "boolean",
+ "description": "Append to existing file instead of overwriting",
+ "default": False
+ }
+ },
+ "required": ["path", "content"]
+ },
+ required_params=["path", "content"],
+ category="file",
+ required_permission=CommandPermission.WRITE
+)
+def file_write(arguments: Dict[str, Any], context: ToolContext = None):
+ """
+ Write content to file in user's workspace.
+
+ Args:
+ path: Relative path within workspace
+ content: Content to write
+ append: Whether to append (default False = overwrite)
+
+ Returns:
+ {"bytes_written": int, "path": str}
+ """
+ workspace = get_workspace_from_context(context) if context else None
+ file_path = arguments.get("path", "")
+ content = arguments.get("content", "")
+ append = arguments.get("append", False)
+
+ if not workspace:
+ return {"error": "Workspace not configured"}
+
+ # Security check
+ safe_file_path = safe_path(file_path, workspace)
+ if not safe_file_path:
+ return {"error": f"Access denied: path '{file_path}' is outside workspace"}
+
+ try:
+ # Create parent directories if needed
+ safe_file_path.parent.mkdir(parents=True, exist_ok=True)
+
+ mode = 'a' if append else 'w'
+ with open(safe_file_path, mode, encoding='utf-8') as f:
+ bytes_written = f.write(content)
+
+ return {
+ "bytes_written": bytes_written,
+ "path": file_path,
+ "action": "appended" if append else "written"
+ }
+ except Exception as e:
+ return {"error": f"Write error: {str(e)}"}
+
+
+@tool(
+ name="file_list",
+ description="List files and directories within the user's workspace",
+ parameters={
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Relative path of directory to list (relative to workspace root)",
+ "default": "."
+ },
+ "recursive": {
+ "type": "boolean",
+ "description": "List recursively",
+ "default": False
+ }
+ },
+ "required": []
+ },
+ required_params=[],
+ category="file",
+ required_permission=CommandPermission.READ_ONLY
+)
+def file_list(arguments: Dict[str, Any], context: ToolContext = None):
+ """
+ List directory contents in user's workspace.
+
+ Args:
+ path: Relative directory path (default ".")
+ recursive: Whether to list recursively
+
+ Returns:
+ {"files": list, "dirs": list}
+ """
+ workspace = get_workspace_from_context(context) if context else None
+ dir_path = arguments.get("path", ".")
+ recursive = arguments.get("recursive", False)
+
+ if not workspace:
+ return {"error": "Workspace not configured"}
+
+ # Security check
+ safe_dir_path = safe_path(dir_path, workspace)
+ if not safe_dir_path:
+ return {"error": f"Access denied: path '{dir_path}' is outside workspace"}
+
+ if not safe_dir_path.exists():
+ return {"error": f"Directory not found: {dir_path}"}
+
+ if not safe_dir_path.is_dir():
+ return {"error": f"Path is not a directory: {dir_path}"}
+
+ try:
+ files = []
+ dirs = []
+
+ if recursive:
+ for root, dirnames, filenames in os.walk(safe_dir_path):
+ root_rel = os.path.relpath(root, safe_dir_path)
+ if root_rel == '.':
+ root_rel = ''
+ for d in dirnames:
+ dirs.append(os.path.join(root_rel, d) if root_rel else d)
+ for f in filenames:
+ files.append(os.path.join(root_rel, f) if root_rel else f)
+ else:
+ for item in safe_dir_path.iterdir():
+ if item.is_dir():
+ dirs.append(item.name)
+ else:
+ files.append(item.name)
+
+ return {
+ "files": sorted(files),
+ "dirs": sorted(dirs),
+ "path": dir_path
+ }
+ except Exception as e:
+ return {"error": f"List error: {str(e)}"}
+
+
+@tool(
+ name="file_exists",
+ description="Check if a file or directory exists within the user's workspace",
+ parameters={
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Relative path to check (relative to workspace root)"
+ }
+ },
+ "required": ["path"]
+ },
+ required_params=["path"],
+ category="file",
+ required_permission=CommandPermission.READ_ONLY
+)
+def file_exists(arguments: Dict[str, Any], context: ToolContext = None):
+ """
+ Check if path exists in user's workspace.
+
+ Returns:
+ {"exists": bool, "type": "file"|"dir"|"none"}
+ """
+ workspace = get_workspace_from_context(context) if context else None
+ file_path = arguments.get("path", "")
+
+ if not workspace:
+ return {"error": "Workspace not configured"}
+
+ # Security check
+ safe_file_path = safe_path(file_path, workspace)
+ if not safe_file_path:
+ return {"exists": False, "type": "none", "path": file_path}
+
+ if not safe_file_path.exists():
+ return {"exists": False, "type": "none", "path": file_path}
+
+ item_type = "dir" if safe_file_path.is_dir() else "file"
+ return {"exists": True, "type": item_type, "path": file_path}
+
+
+@tool(
+ name="file_grep",
+ description="Search for lines matching a pattern in a file (read-only grep)",
+ parameters={
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Relative path of the file to search (relative to workspace root)"
+ },
+ "pattern": {
+ "type": "string",
+ "description": "Regex pattern to search for"
+ },
+ "max_results": {
+ "type": "integer",
+ "description": "Maximum number of matching lines to return",
+ "default": 100
+ },
+ "case_sensitive": {
+ "type": "boolean",
+ "description": "Case sensitive search",
+ "default": False
+ }
+ },
+ "required": ["path", "pattern"]
+ },
+ required_params=["path", "pattern"],
+ category="file",
+ required_permission=CommandPermission.READ_ONLY
+)
+def file_grep(arguments: Dict[str, Any], context: ToolContext = None):
+ """
+ Search for lines matching a pattern in a file.
+
+ Returns:
+ {"matches": [{"line": int, "content": str}], "path": str, "count": int}
+ """
+ import re
+
+ workspace = get_workspace_from_context(context) if context else None
+ file_path = arguments.get("path", "")
+ pattern = arguments.get("pattern", "")
+ max_results = arguments.get("max_results", 100)
+ case_sensitive = arguments.get("case_sensitive", False)
+
+ if not workspace:
+ return {"error": "Workspace not configured"}
+
+ # Security check
+ safe_file_path = safe_path(file_path, workspace)
+ if not safe_file_path:
+ return {"error": f"Access denied: path '{file_path}' is outside workspace"}
+
+ if not safe_file_path.exists():
+ return {"error": f"File not found: {file_path}"}
+
+ if not safe_file_path.is_file():
+ return {"error": f"Path is not a file: {file_path}"}
+
+ try:
+ flags = 0 if case_sensitive else re.IGNORECASE
+ regex = re.compile(pattern, flags)
+
+ matches = []
+ with open(safe_file_path, 'r', encoding='utf-8') as f:
+ for line_num, line in enumerate(f, 1):
+ if regex.search(line):
+ matches.append({
+ "line": line_num,
+ "content": line.rstrip('\n')
+ })
+ if len(matches) >= max_results:
+ break
+
+ return {
+ "matches": matches,
+ "path": file_path,
+ "count": len(matches),
+ "pattern": pattern
+ }
+ except re.error as e:
+ return {"error": f"Invalid regex pattern: {str(e)}"}
+ except UnicodeDecodeError:
+ return {"error": "File is not text-readable (binary file)"}
+ except Exception as e:
+ return {"error": f"Search error: {str(e)}"}
diff --git a/luxx/tools/core.py b/luxx/tools/core.py
index ef959fa..0b5f24c 100644
--- a/luxx/tools/core.py
+++ b/luxx/tools/core.py
@@ -1,6 +1,24 @@
"""Tool system core module"""
from dataclasses import dataclass, field
from typing import Callable, Any, Dict, List, Optional
+from enum import IntEnum
+
+
+class CommandPermission(IntEnum):
+ """Command permission level - higher value means more permissions"""
+ READ_ONLY = 1
+ WRITE = 2
+ EXECUTE = 3
+ ADMIN = 4
+
+
+@dataclass
+class ToolContext:
+ """Tool execution context - explicitly passed to tool execution"""
+ workspace: Optional[str] = None
+ user_id: Optional[int] = None
+ username: Optional[str] = None
+ extra: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -11,6 +29,7 @@ class ToolDefinition:
parameters: Dict[str, Any]
handler: Callable
category: str = "general"
+ required_permission: CommandPermission = CommandPermission.READ_ONLY
def to_openai_format(self) -> Dict[str, Any]:
"""Convert to OpenAI format"""
@@ -77,14 +96,27 @@ class ToolRegistry:
if t.category == category
]
- def execute(self, name: str, arguments: dict) -> Dict[str, Any]:
- """Execute tool"""
+ def execute(self, name: str, arguments: dict, context: ToolContext = None) -> Dict[str, Any]:
+ """Execute tool with optional context and automatic permission check"""
tool = self.get(name)
if not tool:
return {"success": False, "error": f"Tool '{name}' not found"}
+ # Automatic permission check (transparent to tool function)
+ if context is not None and context.user_id is not None:
+ user_level = context.extra.get("user_permission_level", CommandPermission.READ_ONLY)
+ if user_level < tool.required_permission:
+ return {
+ "success": False,
+ "error": f"Permission denied: requires {tool.required_permission.name}, you have {user_level.name}"
+ }
+
try:
- result = tool.handler(arguments)
+ # Pass context to handler if it accepts it
+ if context is not None:
+ result = tool.handler(arguments, context=context)
+ else:
+ result = tool.handler(arguments)
if isinstance(result, ToolResult):
return result.to_dict()
return result
diff --git a/luxx/tools/executor.py b/luxx/tools/executor.py
index 0e7ac60..e8e697d 100644
--- a/luxx/tools/executor.py
+++ b/luxx/tools/executor.py
@@ -3,7 +3,7 @@ import json
import time
from typing import List, Dict, Any, Optional
-from luxx.tools.core import registry, ToolResult
+from luxx.tools.core import registry, ToolResult, ToolContext
class ToolExecutor:
@@ -63,6 +63,17 @@ class ToolExecutor:
context: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Process tool calls sequentially"""
+ # Build ToolContext from context dict (includes user_permission_level)
+ tool_ctx = ToolContext(
+ workspace=context.get("workspace"),
+ user_id=context.get("user_id"),
+ username=context.get("username"),
+ extra={
+ "user_permission_level": context.get("user_permission_level", 1),
+ **(context.get("extra", {}))
+ }
+ )
+
results = []
for call in tool_calls:
@@ -75,15 +86,17 @@ class ToolExecutor:
except json.JSONDecodeError:
args = {}
- # Check cache
+ # Check cache (include context in cache key for file operations)
cache_key = self._make_cache_key(name, args)
+ if tool_ctx.workspace:
+ cache_key = f"{cache_key}:{tool_ctx.workspace}"
cached = self._get_cached(cache_key)
if cached is not None:
result = cached
else:
- # Execute tool
- result = registry.execute(name, args)
+ # Execute tool with context
+ result = registry.execute(name, args, context=tool_ctx)
self._set_cached(cache_key, result)
# Record call
@@ -103,6 +116,17 @@ class ToolExecutor:
if len(tool_calls) <= 1:
return self.process_tool_calls(tool_calls, context)
+ # Build ToolContext from context dict (includes user_permission_level)
+ tool_ctx = ToolContext(
+ workspace=context.get("workspace"),
+ user_id=context.get("user_id"),
+ username=context.get("username"),
+ extra={
+ "user_permission_level": context.get("user_permission_level", 1),
+ **(context.get("extra", {}))
+ }
+ )
+
try:
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -121,28 +145,29 @@ class ToolExecutor:
# Check cache
cache_key = self._make_cache_key(name, args)
+ if tool_ctx.workspace:
+ cache_key = f"{cache_key}:{tool_ctx.workspace}"
cached = self._get_cached(cache_key)
if cached is not None:
futures[call_id] = (name, args, cached)
else:
- # Submit task
- future = executor.submit(registry.execute, name, args)
- futures[future] = (call_id, name, args)
+ # Submit task with context
+ future = executor.submit(registry.execute, name, args, context=tool_ctx)
+ futures[future] = (call_id, name, args, cache_key)
results = []
for future in as_completed(futures.keys()):
if future in futures:
- call_id, name, args = futures[future]
+ item = futures[future]
+ if len(item) == 3:
+ call_id, name, args = item
+ cache_key = self._make_cache_key(name, args)
+ else:
+ call_id, name, args, cache_key = item
result = future.result()
- self._set_cached(self._make_cache_key(name, args), result)
- self._record_call(name, args, result)
- results.append(self._create_tool_result(call_id, name, result))
- else:
- call_id, name, args = futures[future]
- result = future.result()
- self._set_cached(self._make_cache_key(name, args), result)
+ self._set_cached(cache_key, result)
self._record_call(name, args, result)
results.append(self._create_tool_result(call_id, name, result))
diff --git a/luxx/tools/factory.py b/luxx/tools/factory.py
index dd9bde0..69b5695 100644
--- a/luxx/tools/factory.py
+++ b/luxx/tools/factory.py
@@ -1,6 +1,6 @@
"""Tool decorator factory with unified result wrapping"""
from typing import Callable, Any, Dict, Optional, List
-from luxx.tools.core import ToolDefinition, ToolResult, registry
+from luxx.tools.core import ToolDefinition, ToolResult, registry, CommandPermission
import traceback
import logging
@@ -12,7 +12,8 @@ def tool(
description: str,
parameters: Dict[str, Any],
category: str = "general",
- required_params: Optional[List[str]] = None
+ required_params: Optional[List[str]] = None,
+ required_permission: CommandPermission = CommandPermission.READ_ONLY
):
"""
Tool registration decorator with UNIFED result wrapping
@@ -60,7 +61,7 @@ def tool(
- Exception → {"success": false, "data": null, "error": "..."}
"""
def decorator(func: Callable) -> Callable:
- def wrapped_handler(arguments: Dict[str, Any]) -> ToolResult:
+ def wrapped_handler(arguments: Dict[str, Any], context=None) -> ToolResult:
try:
# 1. Validate required params
if required_params:
@@ -68,8 +69,13 @@ def tool(
if param not in arguments or arguments[param] is None:
return ToolResult.fail(f"Missing required parameter: {param}")
- # 2. Execute handler
- result = func(arguments)
+ # 2. Execute handler - pass context only if function accepts it
+ import inspect
+ sig = inspect.signature(func)
+ if 'context' in sig.parameters:
+ result = func(arguments, context=context)
+ else:
+ result = func(arguments)
# 3. Auto-wrap result
return _auto_wrap(result)
@@ -84,7 +90,8 @@ def tool(
description=description,
parameters=parameters,
handler=wrapped_handler,
- category=category
+ category=category,
+ required_permission=required_permission
)
registry.register(tool_def)
return func