Luxx/luxx/routes/auth.py

170 lines
5.1 KiB
Python

"""Authentication routes"""
from datetime import timedelta
from fastapi import APIRouter, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from pydantic import BaseModel
from luxx.database import get_db
from luxx.models import User
from luxx.utils.helpers import (
hash_password,
verify_password,
create_access_token,
decode_access_token,
success_response,
error_response
)
router = APIRouter(prefix="/auth", tags=["Authentication"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
class UserRegister(BaseModel):
"""User registration model"""
username: str
email: str | None = None
password: str
class UserLogin(BaseModel):
"""User login model"""
username: str
password: str
class UserResponse(BaseModel):
"""User response model"""
id: int
username: str
email: str | None
role: str
permission_level: int
class UserPermissionUpdate(BaseModel):
"""User permission update model"""
permission_level: int
class TokenResponse(BaseModel):
"""Token response model"""
access_token: str
token_type: str
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""Get current user"""
payload = decode_access_token(token)
if not payload:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
user = db.query(User).filter(User.id == int(user_id)).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user
def require_admin(current_user: User = Depends(get_current_user)) -> User:
"""Require admin role"""
if current_user.permission_level < 4:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin permission required")
return current_user
@router.post("/register", response_model=dict)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
"""User registration"""
existing_user = db.query(User).filter(User.username == user_data.username).first()
if existing_user:
return error_response("Username already exists", 400)
if user_data.email:
existing_email = db.query(User).filter(User.email == user_data.email).first()
if existing_email:
return error_response("Email already registered", 400)
password_hash = hash_password(user_data.password)
user = User(
username=user_data.username,
email=user_data.email,
password_hash=password_hash
)
db.add(user)
db.commit()
db.refresh(user)
return success_response(
data={"id": user.id, "username": user.username},
message="Registration successful"
)
@router.post("/login", response_model=dict)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
"""User login"""
user = db.query(User).filter(User.username == user_data.username).first()
if not user or not verify_password(user_data.password, user.password_hash or ""):
return error_response("Invalid username or password", 401)
if not user.is_active:
return error_response("User account is disabled", 403)
access_token = create_access_token(
data={"sub": str(user.id)},
expires_delta=timedelta(days=7)
)
return success_response(
data={
"access_token": access_token,
"token_type": "bearer",
"user": user.to_dict()
},
message="Login successful"
)
@router.post("/logout")
def logout(current_user: User = Depends(get_current_user)):
"""User logout (client should delete token)"""
return success_response(message="Logout successful")
@router.get("/me", response_model=dict)
def get_me(current_user: User = Depends(get_current_user)):
"""Get current user info"""
return success_response(data=current_user.to_dict())
@router.get("/users", response_model=dict)
def get_users(admin_user: User = Depends(require_admin), db: Session = Depends(get_db)):
"""Get all users (admin only)"""
users = db.query(User).all()
return success_response(data={"users": [u.to_dict() for u in users]})
@router.put("/users/{user_id}", response_model=dict)
def update_user(user_id: int, data: UserPermissionUpdate, admin_user: User = Depends(require_admin), db: Session = Depends(get_db)):
"""Update user permission level (admin only)"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
return error_response("User not found", 404)
# Validate permission level
if data.permission_level < 1 or data.permission_level > 4:
return error_response("Invalid permission level (1-4)", 400)
user.permission_level = data.permission_level
db.commit()
return success_response(data=user.to_dict(), message="User permission updated")