本文目录导读:

我来介绍几种Python实现权限校验的常用方法:
装饰器方式实现权限校验
基础装饰器实现
from functools import wraps
from enum import Enum
class Role(Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
# 简单的权限检查装饰器
def require_permission(required_role):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 模拟获取当前用户角色
current_user_role = get_current_user_role()
if current_user_role == required_role.value or current_user_role == Role.ADMIN.value:
return func(*args, **kwargs)
else:
raise PermissionError(f"需要 {required_role.value} 权限才能执行此操作")
return wrapper
return decorator
def get_current_user_role():
# 模拟从会话或token中获取用户角色
return "admin" # 这里简化为固定返回值
@require_permission(Role.ADMIN)
def delete_user(user_id):
print(f"删除用户 {user_id}")
@require_permission(Role.USER)
def view_profile(user_id):
print(f"查看用户 {user_id} 资料")
# 测试
try:
delete_user(123) # admin权限,执行成功
view_profile(123) # user权限,执行成功
except PermissionError as e:
print(f"权限错误: {e}")
基于角色的权限管理(RBAC)
from functools import wraps
from typing import List, Dict, Set
class RBAC:
def __init__(self):
# 定义角色和权限映射
self.role_permissions: Dict[str, Set[str]] = {
"admin": {"create", "read", "update", "delete", "manage_users"},
"editor": {"create", "read", "update"},
"viewer": {"read"},
"guest": {"read"}
}
# 定义用户和角色映射
self.user_roles: Dict[str, str] = {}
def assign_role(self, username: str, role: str):
"""为用户分配角色"""
if role in self.role_permissions:
self.user_roles[username] = role
else:
raise ValueError(f"无效的角色: {role}")
def get_user_permissions(self, username: str) -> Set[str]:
"""获取用户的所有权限"""
role = self.user_roles.get(username)
return self.role_permissions.get(role, set())
def has_permission(self, username: str, permission: str) -> bool:
"""检查用户是否有特定权限"""
return permission in self.get_user_permissions(username)
# 创建权限检查实例
rbac = RBAC()
# 分配角色
rbac.assign_role("alice", "admin")
rbac.assign_role("bob", "editor")
rbac.assign_role("charlie", "viewer")
# 权限检查装饰器
def check_permission(permission: str):
def decorator(func):
@wraps(func)
def wrapper(user, *args, **kwargs):
if rbac.has_permission(user, permission):
return func(user, *args, **kwargs)
else:
raise PermissionError(f"用户 {user} 没有 {permission} 权限")
return wrapper
return decorator
# 使用示例
@check_permission("delete")
def delete_article(user, article_id):
print(f"{user} 删除了文章 {article_id}")
@check_permission("read")
def read_article(user, article_id):
print(f"{user} 读取了文章 {article_id}")
# 测试
try:
read_article("charlie", 1) # view可以读取
delete_article("alice", 2) # admin可以删除
delete_article("bob", 3) # editor不可以删除,会报错
except PermissionError as e:
print(f"权限错误: {e}")
Flask Web应用权限校验
from flask import Flask, request, jsonify, session, g
from functools import wraps
import jwt
import datetime
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
# 定义权限常量
PERMISSIONS = {
'view_products': ['user', 'admin'],
'edit_products': ['admin'],
'delete_products': ['admin'],
'manage_users': ['admin']
}
class PermissionDenied(Exception):
pass
@app.errorhandler(PermissionDenied)
def handle_permission_error(error):
return jsonify({'error': str(error)}), 403
def login_required(f):
"""登录认证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
raise PermissionDenied('需要登录')
try:
# 验证JWT token
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
g.current_user = payload
except jwt.ExpiredSignatureError:
raise PermissionDenied('Token已过期')
except jwt.InvalidTokenError:
raise PermissionDenied('无效的Token')
return f(*args, **kwargs)
return decorated_function
def permission_required(permission):
"""权限校验装饰器"""
def decorator(f):
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
user_role = g.current_user.get('role')
allowed_roles = PERMISSIONS.get(permission, [])
if user_role not in allowed_roles:
raise PermissionDenied(f'需要 {permission} 权限')
return f(*args, **kwargs)
return decorated_function
return decorator
@app.route('/login', methods=['POST'])
def login():
"""用户登录"""
data = request.get_json()
username = data.get('username')
password = data.get('password')
# 这里应该有真实的用户验证逻辑
if username == 'admin' and password == 'admin123':
# 生成JWT token
token = jwt.encode({
'user': username,
'role': 'admin',
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}, app.config['SECRET_KEY'])
return jsonify({'token': token})
return jsonify({'error': '用户名或密码错误'}), 401
@app.route('/products')
@permission_required('view_products')
def get_products():
"""获取产品列表(需要查看权限)"""
return jsonify({
'products': [
{'id': 1, 'name': '产品A'},
{'id': 2, 'name': '产品B'}
]
})
@app.route('/products/<int:product_id>', methods=['DELETE'])
@permission_required('delete_products')
def delete_product(product_id):
"""删除产品(需要删除权限)"""
return jsonify({'message': f'产品 {product_id} 已删除'})
# Flask应用运行
if __name__ == '__main__':
app.run(debug=True)
细粒度权限控制(ABAC - 基于属性的访问控制)
from functools import wraps
from typing import Dict, Any, Callable
import re
class ABAC:
"""基于属性的访问控制"""
def __init__(self):
self.policies: list = []
def add_policy(self, name: str, condition: Callable, effect: str = 'allow'):
"""
添加访问策略
:param name: 策略名称
:param condition: 条件函数,接收subject, resource, action, environment
:param effect: 允许(allow)或拒绝(deny)
"""
self.policies.append({
'name': name,
'condition': condition,
'effect': effect
})
def check_access(self, subject: Dict, resource: Dict, action: str, environment: Dict = None) -> bool:
"""
检查访问权限
:param subject: 访问主体(用户)
:param resource: 被访问的资源
:param action: 要执行的操作
:param environment: 环境信息
"""
if environment is None:
environment = {}
# 默认拒绝
result = False
for policy in self.policies:
if policy['condition'](subject, resource, action, environment):
if policy['effect'] == 'allow':
result = True
elif policy['effect'] == 'deny':
return False
return result
# 创建ABAC实例
abac = ABAC()
# 添加策略:管理员可以执行所有操作
abac.add_policy(
'admin_full_access',
lambda subject, resource, action, env: subject.get('role') == 'admin'
)
# 添加策略:用户只能操作自己的资源
abac.add_policy(
'user_own_resources',
lambda subject, resource, action, env: (
subject.get('role') == 'user' and
resource.get('owner') == subject.get('username')
)
)
# 添加策略:工作时间可以访问非敏感数据
abac.add_policy(
'working_hours_access',
lambda subject, resource, action, env: (
subject.get('role') == 'user' and
resource.get('sensitivity') == 'low' and
env.get('time', '').startswith('09:')
)
)
def require_abac(action: str):
"""ABAC装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 获取当前用户信息(实际应用从上下文获取)
current_user = {
'username': 'alice',
'role': 'user',
'department': 'IT'
}
# 资源信息
resource = {
'id': kwargs.get('resource_id', 1),
'type': 'document',
'owner': 'alice', # 模拟资源所有者
'sensitivity': 'high'
}
# 环境信息
environment = {
'time': '14:30',
'ip_address': '192.168.1.100'
}
if abac.check_access(current_user, resource, action, environment):
return func(*args, **kwargs)
else:
raise PermissionError("访问被拒绝")
return wrapper
return decorator
@require_abac('read')
def read_document(resource_id):
print(f"读取文档 {resource_id}")
@require_abac('write')
def write_document(resource_id):
print(f"编辑文档 {resource_id}")
# 测试
try:
read_document(1) # 用户alice读取自己的文档(所有者)
write_document(2) # 写操作权限检查
except PermissionError as e:
print(f"权限错误: {e}")
完整的权限校验系统示例
import json
from datetime import datetime
from functools import wraps
class PermissionSystem:
"""完整的权限管理系统"""
def __init__(self):
# 权限数据库(实际应用中存储在数据库中)
self.users = {}
self.roles = {}
self.permissions = {}
# 初始化默认角色和权限
self._init_defaults()
def _init_defaults(self):
"""初始化默认配置"""
# 定义权限
self.permissions = {
'user:create': '创建用户',
'user:read': '读取用户信息',
'user:update': '更新用户信息',
'user:delete': '删除用户',
'report:generate': '生成报告',
'report:export': '导出报告',
'system:config': '系统配置'
}
# 定义角色和权限映射
self.roles = {
'super_admin': {
'permissions': list(self.permissions.keys()),
'description': '超级管理员'
},
'admin': {
'permissions': ['user:read', 'user:create', 'user:update', 'report:generate', 'report:export'],
'description': '管理员'
},
'operator': {
'permissions': ['user:read', 'report:generate'],
'description': '操作员'
},
'viewer': {
'permissions': ['user:read'],
'description': '查看者'
}
}
def create_user(self, username: str, password: str, role: str = 'viewer'):
"""创建用户"""
if username in self.users:
raise ValueError(f"用户 {username} 已存在")
if role not in self.roles:
raise ValueError(f"无效的角色: {role}")
self.users[username] = {
'username': username,
'password': password, # 实际应用应该加密存储
'role': role,
'created_at': datetime.now(),
'is_active': True,
'login_attempts': 0,
'locked_until': None
}
return True
def authenticate(self, username: str, password: str) -> bool:
"""用户认证"""
user = self.users.get(username)
if not user:
return False
if not user['is_active']:
return False
# 检查账户是否被锁定
if user['locked_until'] and user['locked_until'] > datetime.now():
return False
if user['password'] == password:
# 重置登录尝试次数
user['login_attempts'] = 0
return True
else:
# 增加登录尝试次数
user['login_attempts'] += 1
# 5次失败后锁定账户
if user['login_attempts'] >= 5:
user['locked_until'] = datetime.now().replace(hour=datetime.now().hour + 1)
return False
def check_permission(self, username: str, permission: str) -> bool:
"""检查用户权限"""
user = self.users.get(username)
if not user or not user['is_active']:
return False
role = user['role']
role_permissions = self.roles.get(role, {}).get('permissions', [])
return permission in role_permissions
def require_permission(self, *permissions):
"""权限检查装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 获取当前用户(从session或token中)
current_user = get_current_user()
if not current_user:
raise PermissionError('用户未登录')
# 检查所有需要的权限
for permission in permissions:
if not self.check_permission(current_user, permission):
raise PermissionError(f'用户 {current_user} 没有 {permission} 权限')
return func(*args, **kwargs)
return wrapper
return decorator
def get_user_permissions(self, username: str) -> list:
"""获取用户所有权限"""
user = self.users.get(username)
if not user:
return []
role = user['role']
return self.roles.get(role, {}).get('permissions', [])
# 创建权限系统实例
ps = PermissionSystem()
# 创建测试用户
ps.create_user('admin', 'admin123', 'admin')
ps.create_user('operator', 'op123', 'operator')
ps.create_user('viewer', 'view123', 'viewer')
def get_current_user():
"""模拟获取当前登录用户"""
return 'operator' # 模拟当前用户
# 使用权限系统
@ps.require_permission('user:read', 'report:generate')
def view_reports():
print("查看报告中...")
@ps.require_permission('user:delete')
def delete_user(username):
print(f"删除用户 {username}")
# 测试
try:
view_reports() # operator可以查看报告
delete_user('test') # operator没有删除权限,会报错
except PermissionError as e:
print(f"权限校验失败: {e}")
# 列出用户权限
print(f"\n操作员权限列表: {ps.get_user_permissions('operator')}")
print(f"管理员权限列表: {ps.get_user_permissions('admin')}")
最佳实践建议
- 使用JWT或Session管理用户认证状态
- 权限校验应该在多个层面实现:前端、后端API、数据库层
- 避免硬编码权限:使用配置文件或数据库存储
- 实现日志审计:记录所有权限校验操作
- 考虑性能:使用缓存减少数据库查询
- 支持动态权限:允许运行时修改权限配置
- 安全考虑:防止权限提升攻击
这些示例展示了不同场景下的权限校验实现方式,你可以根据具体需求选择合适的方案。