Luxx/luxx/routes/auth.py

133 lines
3.7 KiB
Python

"""Authentication routes"""
from datetime import timedelta
from fastapi import APIRouter, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from pydantic import BaseModel
from luxx.database import get_db
from luxx.models import User
from luxx.utils.helpers import (
hash_password,
verify_password,
create_access_token,
decode_access_token,
success_response,
error_response
)
# Router holding every endpoint defined in this module. All paths declared on
# it are prefixed with "/auth" and tagged "Authentication" in the OpenAPI docs.
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Dependency that reads the bearer token from the request's Authorization
# header; used by get_current_user.
# NOTE(review): tokenUrl starts with "/api", which is not set anywhere in this
# module - presumably added where the router is mounted; confirm.
# NOTE(review): the login route in this module takes a JSON body (UserLogin),
# while OAuth2 password-flow clients submit form fields - confirm that
# pointing tokenUrl at it is intended.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
class UserRegister(BaseModel):
    """Request body accepted by the registration endpoint."""
    # Desired login name; the register route rejects it if already taken.
    username: str
    # Optional contact address; uniqueness is only checked when it is provided.
    email: str | None = None
    # Plain-text password as sent by the client; the register route hashes it
    # before storing.
    password: str
class UserLogin(BaseModel):
    """Request body accepted by the login endpoint."""
    # Login name used to look up the account.
    username: str
    # Plain-text password, checked against the stored hash by the login route.
    password: str
class UserResponse(BaseModel):
    """Schema describing a user as returned to clients.

    NOTE(review): none of the routes visible in this part of the file use this
    model as their response_model (they all declare ``dict``) - confirm whether
    it is used elsewhere.
    """
    id: int
    username: str
    # No default is given here, unlike UserRegister.email.
    email: str | None
    role: str
class TokenResponse(BaseModel):
    """Schema describing an issued access token.

    NOTE(review): not referenced by the routes visible in this part of the
    file; the login route builds an equivalent dict by hand - confirm whether
    this model is used elsewhere.
    """
    # The encoded token string.
    access_token: str
    # Token scheme; the login route emits "bearer" for this key.
    token_type: str
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the request's bearer token to the matching ``User`` row.

    Intended to be used as a FastAPI dependency by routes that require an
    authenticated caller.

    Args:
        token: Bearer token extracted from the request by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose primary key matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no usable
            ``sub`` claim, or refers to a user that does not exist; 403 when
            the user exists but is not active.
    """
    def _unauthorized(detail: str) -> HTTPException:
        # Bearer-protected resources should advertise the scheme on 401.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    payload = decode_access_token(token)
    if not payload:
        raise _unauthorized("Invalid token")

    subject = payload.get("sub")
    if not subject:
        raise _unauthorized("Invalid token")

    # A token whose "sub" is not an integer string must be rejected as
    # invalid rather than crash the request with an unhandled ValueError.
    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        raise _unauthorized("Invalid token") from None

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise _unauthorized("User not found")

    # Mirror the login route: a disabled account must not keep working just
    # because it still holds a token issued before it was disabled.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )
    return user
@router.post("/register", response_model=dict)
def register(user_data: UserRegister, db: Session = Depends(get_db)):
    """Create a new user account.

    Args:
        user_data: Registration payload (username, optional email, password).
        db: Database session supplied by ``get_db``.

    Returns:
        The result of ``success_response`` carrying the new user's id and
        username, or the result of ``error_response`` with code 400 when the
        username or email is already in use.
    """
    # Imported locally so this handler does not rely on a module-level import.
    from sqlalchemy.exc import IntegrityError

    username_taken = (
        db.query(User).filter(User.username == user_data.username).first()
    )
    if username_taken:
        return error_response("Username already exists", 400)

    # Email is optional, so uniqueness is only checked when one was supplied.
    if user_data.email:
        email_taken = db.query(User).filter(User.email == user_data.email).first()
        if email_taken:
            return error_response("Email already registered", 400)

    user = User(
        username=user_data.username,
        email=user_data.email,
        password_hash=hash_password(user_data.password),
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError:
        # The pre-checks above are not atomic: a concurrent request can insert
        # the same username/email between the check and this commit. Roll back
        # so the session stays usable and report a client error, not a 500.
        db.rollback()
        return error_response("Username or email already exists", 400)

    # Reload the row so database-generated values such as the id are populated.
    db.refresh(user)
    return success_response(
        data={"id": user.id, "username": user.username},
        message="Registration successful",
    )
@router.post("/login", response_model=dict)
def login(user_data: UserLogin, db: Session = Depends(get_db)):
    """Authenticate a user and issue an access token.

    Args:
        user_data: Login payload (username and password).
        db: Database session supplied by ``get_db``.

    Returns:
        The result of ``success_response`` containing the access token, the
        token type and the user's dict form; or the result of
        ``error_response`` with code 401 for bad credentials, or 403 when the
        account is not active.
    """
    account = db.query(User).filter(User.username == user_data.username).first()

    # Unknown user and wrong password deliberately share one error message.
    credentials_ok = False
    if account:
        # A missing stored hash is treated as an empty string.
        credentials_ok = verify_password(
            user_data.password, account.password_hash or ""
        )
    if not credentials_ok:
        return error_response("Invalid username or password", 401)

    if not account.is_active:
        return error_response("User account is disabled", 403)

    # The token's subject is the user's id as a string; it lasts seven days.
    claims = {"sub": str(account.id)}
    token_lifetime = timedelta(days=7)
    access_token = create_access_token(data=claims, expires_delta=token_lifetime)

    body = {
        "access_token": access_token,
        "token_type": "bearer",
        "user": account.to_dict(),
    }
    return success_response(data=body, message="Login successful")
@router.post("/logout")
def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated caller.

    This handler does nothing with the token itself; the client is expected
    to discard it. ``current_user`` is only declared so the route requires a
    valid token.
    """
    acknowledgement = success_response(message="Logout successful")
    return acknowledgement
@router.get("/me", response_model=dict)
def get_me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user's own details.

    Args:
        current_user: User resolved from the bearer token by
            ``get_current_user``.

    Returns:
        The result of ``success_response`` wrapping ``current_user.to_dict()``.
    """
    profile = current_user.to_dict()
    return success_response(data=profile)