"""Helper utilities: IDs, password hashing, JWT tokens, responses, paging."""

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt
import shortuuid

from alcor.config import config

# Signing algorithm shared by token creation and verification.
_JWT_ALGORITHM = "HS256"
# Lifetime applied when the caller does not supply ``expires_delta``.
_DEFAULT_TOKEN_LIFETIME = timedelta(hours=24)


def generate_id(prefix: str = "") -> str:
    """Generate a unique ID, optionally prefixed.

    Args:
        prefix: When non-empty, the result is ``"<prefix>_<uuid>"``.

    Returns:
        A short UUID string from ``shortuuid.uuid()``, with the prefix and
        an underscore prepended if a prefix was given.
    """
    unique_id = shortuuid.uuid()
    return f"{prefix}_{unique_id}" if prefix else unique_id


def hash_password(password: str) -> str:
    """Hash a plaintext password with bcrypt using a freshly generated salt.

    Args:
        password: The plaintext password.

    Returns:
        The bcrypt hash decoded to ``str``.
    """
    # Imported at call time, as in the original module, so importing this
    # module does not itself require bcrypt.
    import bcrypt

    salt = bcrypt.gensalt()
    return bcrypt.hashpw(password.encode(), salt).decode()


def verify_password(password: str, hashed: str) -> bool:
    """Check a plaintext password against a bcrypt hash.

    Args:
        password: The plaintext password to check.
        hashed: The stored hash, as produced by ``hash_password``.

    Returns:
        The result of ``bcrypt.checkpw`` for the encoded inputs.
    """
    import bcrypt

    return bcrypt.checkpw(password.encode(), hashed.encode())


def create_access_token(
    data: Dict[str, Any],
    expires_delta: Optional[timedelta] = None,
) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is shallow-copied, not mutated.
        expires_delta: Token lifetime. ``None`` means the 24-hour default.
            Any explicit value, including ``timedelta(0)``, is honoured.

    Returns:
        The token produced by ``jwt.encode`` with ``config.secret_key``.
    """
    # Read the clock once so "iat" and "exp" derive from the same instant,
    # and use an aware UTC datetime (``datetime.utcnow()`` is deprecated).
    now = datetime.now(timezone.utc)

    # Compare against None rather than truthiness: ``timedelta(0)`` is falsy
    # and would otherwise silently fall back to the 24-hour default.
    lifetime = (
        _DEFAULT_TOKEN_LIFETIME if expires_delta is None else expires_delta
    )

    to_encode = data.copy()
    to_encode.update({"exp": now + lifetime, "iat": now})
    return jwt.encode(to_encode, config.secret_key, algorithm=_JWT_ALGORITHM)


def decode_access_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode and verify a JWT access token.

    Args:
        token: The encoded token.

    Returns:
        The decoded payload, or ``None`` if ``jwt.decode`` raises
        ``jwt.InvalidTokenError`` (which also covers expired tokens).
    """
    try:
        return jwt.decode(
            token,
            config.secret_key,
            algorithms=[_JWT_ALGORITHM],
        )
    except jwt.InvalidTokenError:
        # jwt.ExpiredSignatureError is a subclass of InvalidTokenError, so
        # this single handler covers both cases.
        return None


def success_response(
    data: Any = None,
    message: str = "Success",
) -> Dict[str, Any]:
    """Build the success response envelope.

    Args:
        data: Payload placed under the ``"data"`` key.
        message: Text placed under the ``"message"`` key.

    Returns:
        A dict with ``"success": True`` plus the message and data.
    """
    return {"success": True, "message": message, "data": data}


def error_response(
    message: str,
    code: int = 400,
    errors: Any = None,
) -> Dict[str, Any]:
    """Build the error response envelope.

    Args:
        message: Text placed under the ``"message"`` key.
        code: Value placed under the ``"code"`` key.
        errors: Optional detail placed under ``"errors"``. The key is
            omitted when this value is falsy (``None``, ``[]``, ``{}``...).

    Returns:
        A dict with ``"success": False``, the message, the code, and
        ``"errors"`` when provided.
    """
    response: Dict[str, Any] = {
        "success": False,
        "message": message,
        "code": code,
    }
    if errors:
        response["errors"] = errors
    return response


def paginate(query, page: int = 1, page_size: int = 20):
    """Fetch one page of results from a query object.

    Args:
        query: Object exposing ``count()`` and a chainable
            ``offset(n).limit(n).all()``.
        page: 1-based page number. Values below 1 are treated as 1.
        page_size: Items per page. Values below 1 are treated as 1.

    Returns:
        A dict with ``"items"``, ``"total"``, ``"page"``, ``"page_size"``
        and ``"total_pages"``. The page and page size are the clamped,
        effective values.
    """
    # Clamp instead of failing: page_size == 0 would divide by zero below,
    # and page < 1 would produce a negative offset.
    page = max(page, 1)
    page_size = max(page_size, 1)

    total = query.count()
    items = query.offset((page - 1) * page_size).limit(page_size).all()

    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total + page_size - 1) // page_size

    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }