Python案例如何实现权限校验?

wen python案例 15

本文目录导读:

Python案例如何实现权限校验?

  1. 装饰器方式实现权限校验
  2. 基于角色的权限管理(RBAC)
  3. Flask Web应用权限校验
  4. 细粒度权限控制(ABAC - 基于属性的访问控制)
  5. 完整的权限校验系统示例
  6. 最佳实践建议

我来介绍几种Python实现权限校验的常用方法:

装饰器方式实现权限校验

基础装饰器实现

from functools import wraps
from enum import Enum
class Role(Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"
# 简单的权限检查装饰器
def require_permission(required_role):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 模拟获取当前用户角色
            current_user_role = get_current_user_role()
            if current_user_role == required_role.value or current_user_role == Role.ADMIN.value:
                return func(*args, **kwargs)
            else:
                raise PermissionError(f"需要 {required_role.value} 权限才能执行此操作")
        return wrapper
    return decorator
def get_current_user_role():
    # 模拟从会话或token中获取用户角色
    return "admin"  # 这里简化为固定返回值
@require_permission(Role.ADMIN)
def delete_user(user_id):
    print(f"删除用户 {user_id}")
@require_permission(Role.USER)
def view_profile(user_id):
    print(f"查看用户 {user_id} 资料")
# 测试
try:
    delete_user(123)  # admin权限,执行成功
    view_profile(123)  # user权限,执行成功
except PermissionError as e:
    print(f"权限错误: {e}")

基于角色的权限管理(RBAC)

from functools import wraps
from typing import List, Dict, Set
class RBAC:
    def __init__(self):
        # 定义角色和权限映射
        self.role_permissions: Dict[str, Set[str]] = {
            "admin": {"create", "read", "update", "delete", "manage_users"},
            "editor": {"create", "read", "update"},
            "viewer": {"read"},
            "guest": {"read"}
        }
        # 定义用户和角色映射
        self.user_roles: Dict[str, str] = {}
    def assign_role(self, username: str, role: str):
        """为用户分配角色"""
        if role in self.role_permissions:
            self.user_roles[username] = role
        else:
            raise ValueError(f"无效的角色: {role}")
    def get_user_permissions(self, username: str) -> Set[str]:
        """获取用户的所有权限"""
        role = self.user_roles.get(username)
        return self.role_permissions.get(role, set())
    def has_permission(self, username: str, permission: str) -> bool:
        """检查用户是否有特定权限"""
        return permission in self.get_user_permissions(username)
# 创建权限检查实例
rbac = RBAC()
# 分配角色
rbac.assign_role("alice", "admin")
rbac.assign_role("bob", "editor")
rbac.assign_role("charlie", "viewer")
# 权限检查装饰器
def check_permission(permission: str):
    def decorator(func):
        @wraps(func)
        def wrapper(user, *args, **kwargs):
            if rbac.has_permission(user, permission):
                return func(user, *args, **kwargs)
            else:
                raise PermissionError(f"用户 {user} 没有 {permission} 权限")
        return wrapper
    return decorator
# 使用示例
@check_permission("delete")
def delete_article(user, article_id):
    print(f"{user} 删除了文章 {article_id}")
@check_permission("read")
def read_article(user, article_id):
    print(f"{user} 读取了文章 {article_id}")
# 测试
try:
    read_article("charlie", 1)  # view可以读取
    delete_article("alice", 2)  # admin可以删除
    delete_article("bob", 3)    # editor不可以删除,会报错
except PermissionError as e:
    print(f"权限错误: {e}")

Flask Web应用权限校验

from flask import Flask, request, jsonify, session, g
from functools import wraps
import jwt
import datetime
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
# 定义权限常量
PERMISSIONS = {
    'view_products': ['user', 'admin'],
    'edit_products': ['admin'],
    'delete_products': ['admin'],
    'manage_users': ['admin']
}
class PermissionDenied(Exception):
    pass
@app.errorhandler(PermissionDenied)
def handle_permission_error(error):
    return jsonify({'error': str(error)}), 403
def login_required(f):
    """登录认证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            raise PermissionDenied('需要登录')
        try:
            # 验证JWT token
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            g.current_user = payload
        except jwt.ExpiredSignatureError:
            raise PermissionDenied('Token已过期')
        except jwt.InvalidTokenError:
            raise PermissionDenied('无效的Token')
        return f(*args, **kwargs)
    return decorated_function
def permission_required(permission):
    """权限校验装饰器"""
    def decorator(f):
        @wraps(f)
        @login_required
        def decorated_function(*args, **kwargs):
            user_role = g.current_user.get('role')
            allowed_roles = PERMISSIONS.get(permission, [])
            if user_role not in allowed_roles:
                raise PermissionDenied(f'需要 {permission} 权限')
            return f(*args, **kwargs)
        return decorated_function
    return decorator
@app.route('/login', methods=['POST'])
def login():
    """用户登录"""
    data = request.get_json()
    username = data.get('username')
    password = data.get('password')
    # 这里应该有真实的用户验证逻辑
    if username == 'admin' and password == 'admin123':
        # 生成JWT token
        token = jwt.encode({
            'user': username,
            'role': 'admin',
            'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=1)
        }, app.config['SECRET_KEY'])
        return jsonify({'token': token})
    return jsonify({'error': '用户名或密码错误'}), 401
@app.route('/products')
@permission_required('view_products')
def get_products():
    """获取产品列表(需要查看权限)"""
    return jsonify({
        'products': [
            {'id': 1, 'name': '产品A'},
            {'id': 2, 'name': '产品B'}
        ]
    })
@app.route('/products/<int:product_id>', methods=['DELETE'])
@permission_required('delete_products')
def delete_product(product_id):
    """删除产品(需要删除权限)"""
    return jsonify({'message': f'产品 {product_id} 已删除'})
# Flask应用运行
if __name__ == '__main__':
    app.run(debug=True)

细粒度权限控制(ABAC - 基于属性的访问控制)

from functools import wraps
from typing import Dict, Any, Callable
import re
class ABAC:
    """基于属性的访问控制"""
    def __init__(self):
        self.policies: list = []
    def add_policy(self, name: str, condition: Callable, effect: str = 'allow'):
        """
        添加访问策略
        :param name: 策略名称
        :param condition: 条件函数,接收subject, resource, action, environment
        :param effect: 允许(allow)或拒绝(deny)
        """
        self.policies.append({
            'name': name,
            'condition': condition,
            'effect': effect
        })
    def check_access(self, subject: Dict, resource: Dict, action: str, environment: Dict = None) -> bool:
        """
        检查访问权限
        :param subject: 访问主体(用户)
        :param resource: 被访问的资源
        :param action: 要执行的操作
        :param environment: 环境信息
        """
        if environment is None:
            environment = {}
        # 默认拒绝
        result = False
        for policy in self.policies:
            if policy['condition'](subject, resource, action, environment):
                if policy['effect'] == 'allow':
                    result = True
                elif policy['effect'] == 'deny':
                    return False
        return result
# 创建ABAC实例
abac = ABAC()
# 添加策略:管理员可以执行所有操作
abac.add_policy(
    'admin_full_access',
    lambda subject, resource, action, env: subject.get('role') == 'admin'
)
# 添加策略:用户只能操作自己的资源
abac.add_policy(
    'user_own_resources',
    lambda subject, resource, action, env: (
        subject.get('role') == 'user' and 
        resource.get('owner') == subject.get('username')
    )
)
# 添加策略:工作时间可以访问非敏感数据
abac.add_policy(
    'working_hours_access',
    lambda subject, resource, action, env: (
        subject.get('role') == 'user' and
        resource.get('sensitivity') == 'low' and
        env.get('time', '').startswith('09:')
    )
)
def require_abac(action: str):
    """ABAC装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 获取当前用户信息(实际应用从上下文获取)
            current_user = {
                'username': 'alice',
                'role': 'user',
                'department': 'IT'
            }
            # 资源信息
            resource = {
                'id': kwargs.get('resource_id', 1),
                'type': 'document',
                'owner': 'alice',  # 模拟资源所有者
                'sensitivity': 'high'
            }
            # 环境信息
            environment = {
                'time': '14:30',
                'ip_address': '192.168.1.100'
            }
            if abac.check_access(current_user, resource, action, environment):
                return func(*args, **kwargs)
            else:
                raise PermissionError("访问被拒绝")
        return wrapper
    return decorator
@require_abac('read')
def read_document(resource_id):
    print(f"读取文档 {resource_id}")
@require_abac('write')
def write_document(resource_id):
    print(f"编辑文档 {resource_id}")
# 测试
try:
    read_document(1)  # 用户alice读取自己的文档(所有者)
    write_document(2) # 写操作权限检查
except PermissionError as e:
    print(f"权限错误: {e}")

完整的权限校验系统示例

import json
from datetime import datetime
from functools import wraps
class PermissionSystem:
    """完整的权限管理系统"""
    def __init__(self):
        # 权限数据库(实际应用中存储在数据库中)
        self.users = {}
        self.roles = {}
        self.permissions = {}
        # 初始化默认角色和权限
        self._init_defaults()
    def _init_defaults(self):
        """初始化默认配置"""
        # 定义权限
        self.permissions = {
            'user:create': '创建用户',
            'user:read': '读取用户信息',
            'user:update': '更新用户信息',
            'user:delete': '删除用户',
            'report:generate': '生成报告',
            'report:export': '导出报告',
            'system:config': '系统配置'
        }
        # 定义角色和权限映射
        self.roles = {
            'super_admin': {
                'permissions': list(self.permissions.keys()),
                'description': '超级管理员'
            },
            'admin': {
                'permissions': ['user:read', 'user:create', 'user:update', 'report:generate', 'report:export'],
                'description': '管理员'
            },
            'operator': {
                'permissions': ['user:read', 'report:generate'],
                'description': '操作员'
            },
            'viewer': {
                'permissions': ['user:read'],
                'description': '查看者'
            }
        }
    def create_user(self, username: str, password: str, role: str = 'viewer'):
        """创建用户"""
        if username in self.users:
            raise ValueError(f"用户 {username} 已存在")
        if role not in self.roles:
            raise ValueError(f"无效的角色: {role}")
        self.users[username] = {
            'username': username,
            'password': password,  # 实际应用应该加密存储
            'role': role,
            'created_at': datetime.now(),
            'is_active': True,
            'login_attempts': 0,
            'locked_until': None
        }
        return True
    def authenticate(self, username: str, password: str) -> bool:
        """用户认证"""
        user = self.users.get(username)
        if not user:
            return False
        if not user['is_active']:
            return False
        # 检查账户是否被锁定
        if user['locked_until'] and user['locked_until'] > datetime.now():
            return False
        if user['password'] == password:
            # 重置登录尝试次数
            user['login_attempts'] = 0
            return True
        else:
            # 增加登录尝试次数
            user['login_attempts'] += 1
            # 5次失败后锁定账户
            if user['login_attempts'] >= 5:
                user['locked_until'] = datetime.now().replace(hour=datetime.now().hour + 1)
            return False
    def check_permission(self, username: str, permission: str) -> bool:
        """检查用户权限"""
        user = self.users.get(username)
        if not user or not user['is_active']:
            return False
        role = user['role']
        role_permissions = self.roles.get(role, {}).get('permissions', [])
        return permission in role_permissions
    def require_permission(self, *permissions):
        """权限检查装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # 获取当前用户(从session或token中)
                current_user = get_current_user()
                if not current_user:
                    raise PermissionError('用户未登录')
                # 检查所有需要的权限
                for permission in permissions:
                    if not self.check_permission(current_user, permission):
                        raise PermissionError(f'用户 {current_user} 没有 {permission} 权限')
                return func(*args, **kwargs)
            return wrapper
        return decorator
    def get_user_permissions(self, username: str) -> list:
        """获取用户所有权限"""
        user = self.users.get(username)
        if not user:
            return []
        role = user['role']
        return self.roles.get(role, {}).get('permissions', [])
# 创建权限系统实例
ps = PermissionSystem()
# 创建测试用户
ps.create_user('admin', 'admin123', 'admin')
ps.create_user('operator', 'op123', 'operator')
ps.create_user('viewer', 'view123', 'viewer')
def get_current_user():
    """模拟获取当前登录用户"""
    return 'operator'  # 模拟当前用户
# 使用权限系统
@ps.require_permission('user:read', 'report:generate')
def view_reports():
    print("查看报告中...")
@ps.require_permission('user:delete')
def delete_user(username):
    print(f"删除用户 {username}")
# 测试
try:
    view_reports()  # operator可以查看报告
    delete_user('test')  # operator没有删除权限,会报错
except PermissionError as e:
    print(f"权限校验失败: {e}")
# 列出用户权限
print(f"\n操作员权限列表: {ps.get_user_permissions('operator')}")
print(f"管理员权限列表: {ps.get_user_permissions('admin')}")

最佳实践建议

  1. 使用JWT或Session管理用户认证状态
  2. 权限校验应该在多个层面实现:前端、后端API、数据库层
  3. 避免硬编码权限:使用配置文件或数据库存储
  4. 实现日志审计:记录所有权限校验操作
  5. 考虑性能:使用缓存减少数据库查询
  6. 支持动态权限:允许运行时修改权限配置
  7. 安全考虑:防止权限提升攻击

这些示例展示了不同场景下的权限校验实现方式,你可以根据具体需求选择合适的方案。

抱歉,评论功能暂时关闭!