Files
effekt-interface/app/api/utils/authMiddleware.py
2026-05-07 19:21:19 +08:00

153 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# encoding: UTF-8
import json
import uuid
from functools import wraps
import redis
from sqlalchemy.exc import OperationalError
from flask import request, g
from const import REDIS_URL
from common.apiResponse import ApiResponse
from ..service.userService import UserService
from ..service.rbacService import RbacService
from ..model.userModel import User
from common.sqlSession import SqlSession
# Redis key prefix under which a token maps to the id of the user it belongs to.
TOKEN_PREFIX = 'effekt:token:'
# Redis key prefix for the cached per-token context (user, role ids, permission codes).
# NOTE(review): this prefix begins with TOKEN_PREFIX, so TOKEN_PREFIX + 'ctx:<token>'
# names the same Redis key as TOKEN_CONTEXT_PREFIX + '<token>'.
TOKEN_CONTEXT_PREFIX = 'effekt:token:ctx:'
# Lifetime, in seconds, applied to a token key when it is created or refreshed.
TOKEN_EXPIRE_SECONDS = 7200
# A token whose remaining TTL is below this many seconds gets its lifetime renewed.
TOKEN_REFRESH_THRESHOLD_SECONDS = 1800
# Lifetime, in seconds, of the cached token context.
TOKEN_CONTEXT_EXPIRE_SECONDS = 300
# Request paths for which should_skip_auth() returns True.
WHITELIST_PATHS = ['/it/api/auth/login', '/it/api/auth/register']
# Shared Redis client; decode_responses=True asks redis-py to return str instead of bytes.
_redis_client = redis.from_url(REDIS_URL, decode_responses=True)
# Runs at import time: sends a PING to Redis (presumably to fail fast when Redis
# is unreachable -- confirm this import-time side effect is intended).
_redis_client.ping()
def create_token(user_id):
    """Create a login token for ``user_id`` and store it in Redis.

    The token is a 32-character hexadecimal string. It is stored under
    ``TOKEN_PREFIX + token`` with the stringified user id as its value and
    a lifetime of ``TOKEN_EXPIRE_SECONDS``.

    Args:
        user_id: Identifier of the user the token is issued to.

    Returns:
        tuple: ``(token, TOKEN_EXPIRE_SECONDS)``.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    # The token is a bearer credential, so draw it from the standard
    # library's API for security tokens; 16 random bytes render as 32 hex
    # characters, the same shape as the previous uuid4().hex value.
    token = secrets.token_hex(16)
    _redis_client.setex(TOKEN_PREFIX + token, TOKEN_EXPIRE_SECONDS, str(user_id))
    return token, TOKEN_EXPIRE_SECONDS
def get_token_ttl(token):
    """Return the TTL that Redis reports for the key of ``token``.

    Args:
        token: Token string; the key queried is ``TOKEN_PREFIX + token``.

    Returns:
        Whatever ``_redis_client.ttl`` returns for that key.
    """
    token_key = TOKEN_PREFIX + token
    remaining = _redis_client.ttl(token_key)
    return remaining
def refresh_token_if_needed(token):
    """Renew the lifetime of ``token`` when it is close to expiring.

    A TTL of ``-2`` (which Redis reports for a key that does not exist) or a
    TTL at or above ``TOKEN_REFRESH_THRESHOLD_SECONDS`` is returned as is.
    Any other TTL causes the key's expiry to be reset.

    Args:
        token: Token string whose key should be checked.

    Returns:
        ``TOKEN_EXPIRE_SECONDS`` when the expiry was reset, otherwise the
        TTL that was read.
    """
    remaining = get_token_ttl(token)
    key_missing = remaining == -2
    if key_missing or remaining >= TOKEN_REFRESH_THRESHOLD_SECONDS:
        return remaining
    _redis_client.expire(TOKEN_PREFIX + token, TOKEN_EXPIRE_SECONDS)
    return TOKEN_EXPIRE_SECONDS
def get_current_user_id(token):
    """Look up the user id stored in Redis for ``token``.

    Args:
        token: Token string; the key read is ``TOKEN_PREFIX + token``.

    Returns:
        int: The stored user id, or ``0`` when the key is absent, empty, or
        holds a value that is not an integer.
    """
    raw_user_id = _redis_client.get(TOKEN_PREFIX + token)
    if not raw_user_id:
        return 0
    try:
        return int(raw_user_id)
    except ValueError:
        # TOKEN_CONTEXT_PREFIX begins with TOKEN_PREFIX, so a client-supplied
        # token such as 'ctx:<token>' resolves to a cached JSON context
        # instead of a user id. Treat any non-numeric value as an invalid
        # token rather than letting the exception escape to the caller.
        return 0
def parse_token():
    """Extract the access token from the current request's headers.

    The ``accessToken`` / ``accesstoken`` headers take precedence. Otherwise
    the ``Authorization`` header is used: a leading ``Bearer`` scheme is
    removed, and a value without that scheme is returned unchanged apart
    from surrounding whitespace.

    Returns:
        str: The token, or an empty string when no header supplies one.
    """
    explicit_token = request.headers.get('accessToken') or request.headers.get('accesstoken')
    if explicit_token:
        return explicit_token

    authorization = request.headers.get('Authorization', '').strip()
    scheme, _, credentials = authorization.partition(' ')
    # Authentication scheme names are case-insensitive, and only a leading
    # scheme is a prefix. str.replace matched 'Bearer ' case-sensitively and
    # anywhere in the value, and left extra whitespace in the token.
    if scheme.lower() == 'bearer':
        return credentials.strip()
    return authorization
def get_token_context(token):
    """Load the cached context stored for ``token``.

    Args:
        token: Token string; the key read is ``TOKEN_CONTEXT_PREFIX + token``.

    Returns:
        The JSON-decoded value stored under that key, or ``None`` when the
        key is absent or its value is empty.
    """
    cached = _redis_client.get(TOKEN_CONTEXT_PREFIX + token)
    if not cached:
        return None
    return json.loads(cached)
def cache_token_context(token, user, role_ids, permission_codes):
    """Store the user, role ids and permission codes for ``token`` in Redis.

    The three values are serialised as one JSON object under
    ``TOKEN_CONTEXT_PREFIX + token`` with a lifetime of
    ``TOKEN_CONTEXT_EXPIRE_SECONDS``.

    Args:
        token: Token string the context belongs to.
        user: Object exposing ``to_dict()``; its result is stored under ``'user'``.
        role_ids: Value stored under ``'role_ids'``.
        permission_codes: Value stored under ``'permission_codes'``.
    """
    context_key = TOKEN_CONTEXT_PREFIX + token
    context = {
        'user': user.to_dict(),
        'role_ids': role_ids,
        'permission_codes': permission_codes
    }
    # default=str turns any value json cannot encode natively into its str() form.
    payload = json.dumps(context, default=str)
    _redis_client.setex(context_key, TOKEN_CONTEXT_EXPIRE_SECONDS, payload)
def login_required(func):
    """Decorator that authenticates the request by token before calling ``func``.

    The wrapper reads the token from the request, resolves it to a user id
    and then publishes the authentication context on ``flask.g``:
    ``current_user_id``, ``current_user``, ``current_role_ids``,
    ``current_permission_codes``, ``current_token`` and ``current_token_ttl``.
    The context comes from the Redis cache when present; otherwise it is
    loaded from the database and cached.

    ``g.current_user`` is always the JSON-compatible dict form of the user,
    whether the context came from the cache or from the database.

    Args:
        func: View function to protect.

    Returns:
        The wrapped function. It returns an ``ApiResponse`` failure when the
        token is missing (40004), unknown (40004), the user cannot be found
        (40011) or an ``OperationalError`` is raised (40008); otherwise it
        returns the result of ``func``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        token = parse_token()
        if not token:
            return ApiResponse.build_failure(40004, msg='未登录或缺少token')
        user_id = get_current_user_id(token)
        if not user_id:
            return ApiResponse.build_failure(40004, msg='token无效或已过期')

        session = None
        try:
            token_context = get_token_context(token)
            if token_context:
                _bind_auth_context(
                    token,
                    user_id,
                    token_context.get('user', {}),
                    token_context.get('role_ids', []),
                    token_context.get('permission_codes', []),
                )
                return func(*args, **kwargs)

            # Cache miss: load the context from the database and cache it.
            session = SqlSession()
            user = UserService.get_by_id(session, User, user_id)
            if not user:
                return ApiResponse.build_failure(40011, msg='未查询到对应用户!')
            role_ids = UserService.get_user_role_ids(session, user_id)
            permission_codes = RbacService.get_role_permission_codes(session, role_ids)
            cache_token_context(token, user, role_ids, permission_codes)

            # Publish the same representation a cache hit yields (the JSON
            # round trip of user.to_dict()), so code reading g.current_user
            # does not get an ORM object on one request and a dict on the next.
            user_snapshot = json.loads(json.dumps(user.to_dict(), default=str))
            _bind_auth_context(token, user_id, user_snapshot, role_ids, permission_codes)
            return func(*args, **kwargs)
        except OperationalError:
            return ApiResponse.build_failure(40008, msg='数据库连接超时,请稍后重试!')
        finally:
            # The session exists only on the cache-miss path.
            if session:
                session.close()
    return wrapper


def _bind_auth_context(token, user_id, user, role_ids, permission_codes):
    """Publish the authenticated context on ``flask.g`` and refresh the token TTL."""
    g.current_user_id = user_id
    g.current_user = user
    g.current_role_ids = role_ids
    g.current_permission_codes = permission_codes
    g.current_token = token
    g.current_token_ttl = refresh_token_if_needed(token)
def has_permission(permission_code, permission_codes):
    """Decide whether ``permission_codes`` grants ``permission_code``.

    An empty ``permission_code`` is always granted; an empty
    ``permission_codes`` grants nothing else. Otherwise the permission is
    granted when ``permission_codes`` contains any of:

    * the code itself,
    * the global wildcard ``'*:*'``,
    * ``'<module>:*'``, where ``<module>`` is the part before the first ``':'``,
    * ``'<parent>:*'``, where ``<parent>`` is the part of ``<module>`` before
      its first ``'_'``.

    Args:
        permission_code: Required permission, e.g. ``'module:action'``.
        permission_codes: Container of permission codes held by the caller.

    Returns:
        bool: ``True`` when the permission is granted.
    """
    if not permission_code:
        return True
    if not permission_codes:
        return False

    def _accepted_grants():
        # Yielded lazily, in the order the checks are meant to be tried.
        yield permission_code
        yield '*:*'
        if ':' not in permission_code:
            return
        module_code = permission_code.partition(':')[0]
        yield f'{module_code}:*'
        if '_' in module_code:
            parent_module_code = module_code.partition('_')[0]
            yield f'{parent_module_code}:*'

    return any(grant in permission_codes for grant in _accepted_grants())
def permission_required(permission_code):
    """Build a decorator that requires ``permission_code`` to call a view.

    The wrapper reads ``current_user_id`` and ``current_permission_codes``
    from ``flask.g``. It returns an ``ApiResponse`` failure with code 40004
    when no user id is set or when ``has_permission`` rejects the request;
    otherwise it returns the result of the wrapped function.

    Args:
        permission_code: Permission code passed to ``has_permission``.

    Returns:
        A decorator for view functions.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            authenticated_user_id = getattr(g, 'current_user_id', None)
            if not authenticated_user_id:
                return ApiResponse.build_failure(40004, msg='未登录或缺少token')
            granted_codes = getattr(g, 'current_permission_codes', [])
            if has_permission(permission_code, granted_codes):
                return func(*args, **kwargs)
            return ApiResponse.build_failure(40004, msg='无权限访问该接口!')
        return wrapper
    return decorator
def should_skip_auth(path):
    """Tell whether ``path`` is listed in ``WHITELIST_PATHS``.

    Args:
        path: Request path to check.

    Returns:
        bool: ``True`` when ``path`` is one of the whitelisted paths.
    """
    is_whitelisted = path in WHITELIST_PATHS
    return is_whitelisted
def logout_token(token):
    """Delete the Redis entries kept for ``token``.

    Removes both the token key and the cached context key. Does nothing
    when ``token`` is empty.

    Args:
        token: Token string to invalidate.
    """
    if not token:
        return
    # One delete call per key, token key first.
    for key_prefix in (TOKEN_PREFIX, TOKEN_CONTEXT_PREFIX):
        _redis_client.delete(key_prefix + token)