Python案例如何实现角色管理?

wen python案例 17

本文目录导读:

Python案例如何实现角色管理?

  1. 基础角色管理(使用字典)
  2. 面向对象版本(更完整)
  3. 带数据库的实战版本
  4. Web应用中的角色管理(Flask示例)

我来给你介绍几种Python实现角色管理的常见方案,从简单到复杂逐步说明。

基础角色管理(使用字典)

最简单的角色管理实现:

class RoleManager:
    """Minimal in-memory role manager backed by plain dictionaries.

    Every role name maps to a set of permission strings, and every user is
    mapped to exactly one role name.
    """

    def __init__(self):
        # Role name -> set of permission strings granted by that role.
        self.roles = {
            'admin': {'create', 'read', 'update', 'delete', 'manage_users'},
            'editor': {'create', 'read', 'update'},
            'viewer': {'read'},
        }
        # Username -> name of the single role assigned to that user.
        self.user_roles = {}

    def assign_role(self, username, role):
        """Assign ``role`` to ``username``, replacing any previous role.

        Returns:
            True when ``role`` is a known role; False otherwise, in which
            case the user's current assignment is left untouched.
        """
        if role not in self.roles:
            return False
        self.user_roles[username] = role
        return True

    def has_permission(self, username, permission):
        """Return True if the role assigned to ``username`` grants ``permission``.

        Users without an assigned role have no permissions.
        """
        if username not in self.user_roles:
            return False
        return permission in self.roles[self.user_roles[username]]

    def get_user_permissions(self, username):
        """Return a new set holding every permission granted to ``username``.

        A copy is returned on purpose: handing out the role's own set would
        let a caller that mutates the result silently change the role for
        every user sharing it. Users without a role get an empty set.
        """
        if username not in self.user_roles:
            return set()
        return set(self.roles[self.user_roles[username]])
# Usage example: give one user the admin role, then probe two permissions
# that only the admin role contains.
manager = RoleManager()
manager.assign_role(username='张三', role='admin')
print(manager.has_permission(username='张三', permission='delete'))  # True
print(manager.has_permission(username='张三', permission='manage_users'))  # True

面向对象版本(更完整)

from enum import Enum
from typing import Set, Dict, Optional
class Permission(Enum):
    """Closed set of permissions that a role can grant.

    Each member's value is the lowercase string form of its name.
    """
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    MANAGE_USERS = "manage_users"
    EXPORT_DATA = "export_data"
class Role:
    """A named collection of permissions.

    The permission set is copied on construction, so a role never shares
    mutable state with its caller or with another role built from the same
    set object.
    """

    def __init__(self, name: str, permissions: Set[Permission]):
        """Create a role.

        Args:
            name: Name of the role.
            permissions: Permissions initially granted by the role.
        """
        self.name = name
        # Copy the input: add_permission/remove_permission mutate this set in
        # place and must not leak into the set object the caller passed.
        self.permissions = set(permissions)

    def has_permission(self, permission: Permission) -> bool:
        """Return True if this role grants ``permission``."""
        return permission in self.permissions

    def add_permission(self, permission: Permission):
        """Grant ``permission``; a no-op if it is already granted."""
        self.permissions.add(permission)

    def remove_permission(self, permission: Permission):
        """Revoke ``permission``; a no-op if it was not granted."""
        self.permissions.discard(permission)

    def __str__(self):
        # Sets iterate in arbitrary order; sort so the text is reproducible.
        perms = ', '.join(sorted(p.value for p in self.permissions))
        return f"Role: {self.name}, Permissions: [{perms}]"
class User:
    """An account identified by a username that holds at most one role."""

    def __init__(self, username: str, role: Optional[Role] = None):
        self.role = role
        self.username = username

    def assign_role(self, role: Role):
        """Replace the user's current role with ``role``."""
        self.role = role

    def has_permission(self, permission: Permission) -> bool:
        """Ask the assigned role about ``permission``; no role means False."""
        if not self.role:
            return False
        return self.role.has_permission(permission)

    def __str__(self):
        label = "No role" if not self.role else self.role.name
        return f"User: {self.username}, Role: {label}"
class RoleBasedAccessControl:
    """In-memory role-based access control: a registry of roles and users."""

    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}
        self._init_default_roles()

    def _init_default_roles(self):
        """Register the built-in admin, editor and viewer roles, in that order."""
        defaults = (
            ('admin', {
                Permission.CREATE, Permission.READ,
                Permission.UPDATE, Permission.DELETE,
                Permission.MANAGE_USERS, Permission.EXPORT_DATA,
            }),
            ('editor', {Permission.CREATE, Permission.READ, Permission.UPDATE}),
            ('viewer', {Permission.READ}),
        )
        for role_name, granted in defaults:
            self.add_role(role_name, granted)

    def add_role(self, name: str, permissions: Set[Permission]):
        """Register a role under ``name``, replacing any role with that name."""
        self.roles[name] = Role(name, permissions)

    def add_user(self, username: str, role_name: str) -> bool:
        """Create a user holding the role called ``role_name``.

        Returns False, without creating anything, when the role is unknown.
        """
        if role_name not in self.roles:
            return False
        self.users[username] = User(username, self.roles[role_name])
        return True

    def check_permission(self, username: str, permission: Permission) -> bool:
        """Return whether ``username`` holds ``permission``; unknown users do not."""
        try:
            account = self.users[username]
        except KeyError:
            return False
        return account.has_permission(permission)

    def get_user_info(self, username: str) -> Optional[str]:
        """Return the string form of the user, or None for an unknown username."""
        return str(self.users[username]) if username in self.users else None
# Usage example
rbac = RoleBasedAccessControl()
# Register one user for each of the three default roles.
rbac.add_user(username='张三', role_name='admin')
rbac.add_user(username='李四', role_name='editor')
rbac.add_user(username='王五', role_name='viewer')
# Of the default roles only admin includes DELETE, and viewer includes only READ.
print(rbac.check_permission(username='张三', permission=Permission.DELETE))  # True
print(rbac.check_permission(username='李四', permission=Permission.DELETE))  # False
print(rbac.check_permission(username='王五', permission=Permission.CREATE))  # False

带数据库的实战版本

import sqlite3
from datetime import datetime
class DatabaseRoleManager:
    """Role management persisted in a SQLite database.

    The schema consists of ``roles``, ``permissions``, the
    ``role_permissions`` link table and ``users`` (each user row references
    at most one role).

    The manager owns one connection (``conn``) and one shared cursor
    (``cursor``). Release them with :meth:`close`, or use the manager as a
    context manager::

        with DatabaseRoleManager('roles.db') as manager:
            manager.create_role('admin')
    """

    def __init__(self, db_path='role_management.db'):
        """Open (or create) the database at ``db_path`` and ensure the schema exists."""
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self._create_tables()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Close the database connection; calling it again is harmless."""
        self.conn.close()

    def _create_tables(self):
        """Create the four tables if they do not exist yet."""
        self.cursor.executescript('''
            CREATE TABLE IF NOT EXISTS roles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                description TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS permissions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                description TEXT
            );
            CREATE TABLE IF NOT EXISTS role_permissions (
                role_id INTEGER,
                permission_id INTEGER,
                FOREIGN KEY (role_id) REFERENCES roles(id),
                FOREIGN KEY (permission_id) REFERENCES permissions(id),
                PRIMARY KEY (role_id, permission_id)
            );
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                email TEXT,
                role_id INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (role_id) REFERENCES roles(id)
            );
        ''')
        self.conn.commit()

    def _insert(self, sql, params):
        """Run one INSERT in its own transaction.

        Returns True on success and False when a UNIQUE / PRIMARY KEY
        constraint rejects the row.
        """
        try:
            # The connection context manager commits on success and rolls
            # back on error, so a rejected INSERT never leaves a transaction
            # open on this connection.
            with self.conn:
                self.cursor.execute(sql, params)
        except sqlite3.IntegrityError:
            return False
        return True

    def _fetch_id(self, sql, name):
        """Return the first column of the row matched by ``sql``, or None."""
        self.cursor.execute(sql, (name,))
        row = self.cursor.fetchone()
        return None if row is None else row[0]

    def _role_id(self, role_name):
        """Return the id of the role called ``role_name``, or None."""
        return self._fetch_id("SELECT id FROM roles WHERE name = ?", role_name)

    def _permission_id(self, perm_name):
        """Return the id of the permission called ``perm_name``, or None."""
        return self._fetch_id(
            "SELECT id FROM permissions WHERE name = ?", perm_name
        )

    def create_role(self, role_name, description=''):
        """Insert a role; return False if a role with that name already exists."""
        return self._insert(
            "INSERT INTO roles (name, description) VALUES (?, ?)",
            (role_name, description),
        )

    def add_permission(self, perm_name, description=''):
        """Insert a permission; return False if that name already exists."""
        return self._insert(
            "INSERT INTO permissions (name, description) VALUES (?, ?)",
            (perm_name, description),
        )

    def assign_permission_to_role(self, role_name, perm_name):
        """Link an existing permission to an existing role.

        Returns False when either name is unknown or the link already exists.
        """
        role_id = self._role_id(role_name)
        perm_id = self._permission_id(perm_name)
        if role_id is None or perm_id is None:
            return False
        return self._insert(
            "INSERT INTO role_permissions (role_id, permission_id) VALUES (?, ?)",
            (role_id, perm_id),
        )

    def add_user(self, username, email, role_name):
        """Insert a user holding the role called ``role_name``.

        Returns False when the role is unknown or the username is taken.
        """
        role_id = self._role_id(role_name)
        if role_id is None:
            return False
        return self._insert(
            "INSERT INTO users (username, email, role_id) VALUES (?, ?, ?)",
            (username, email, role_id),
        )

    def check_user_permission(self, username, permission_name):
        """Return True if the user's role is linked to ``permission_name``."""
        query = '''
            SELECT COUNT(*)
            FROM users u
            JOIN roles r ON u.role_id = r.id
            JOIN role_permissions rp ON r.id = rp.role_id
            JOIN permissions p ON rp.permission_id = p.id
            WHERE u.username = ? AND p.name = ?
        '''
        self.cursor.execute(query, (username, permission_name))
        count = self.cursor.fetchone()[0]
        return count > 0

    def get_user_roles_and_permissions(self, username):
        """Return the user's role and permission names.

        Returns:
            A dict with keys ``'username'``, ``'role'`` and ``'permissions'``
            (a list of permission names), or None when no row matches
            ``username``.
        """
        query = '''
            SELECT r.name as role_name, p.name as permission_name
            FROM users u
            JOIN roles r ON u.role_id = r.id
            LEFT JOIN role_permissions rp ON r.id = rp.role_id
            LEFT JOIN permissions p ON rp.permission_id = p.id
            WHERE u.username = ?
        '''
        self.cursor.execute(query, (username,))
        results = self.cursor.fetchall()
        if not results:
            return None
        # The LEFT JOINs produce one row with a NULL permission for a role
        # that has no permissions; those NULLs are filtered out here.
        return {
            'username': username,
            'role': results[0][0],
            'permissions': [row[1] for row in results if row[1]],
        }

    def __del__(self):
        """Last-resort cleanup for callers that never call close()."""
        # ``conn`` is missing when sqlite3.connect() itself failed in __init__.
        if hasattr(self, 'conn'):
            self.conn.close()
# Usage example
db_manager = DatabaseRoleManager()
# Seed the roles.
db_manager.create_role(role_name='admin', description='系统管理员')
db_manager.create_role(role_name='editor', description='编辑者')
db_manager.create_role(role_name='viewer', description='查看者')
# Seed the permissions.
db_manager.add_permission(perm_name='create', description='创建内容')
db_manager.add_permission(perm_name='read', description='查看内容')
db_manager.add_permission(perm_name='update', description='更新内容')
db_manager.add_permission(perm_name='delete', description='删除内容')
# Link permissions to roles: admin gets all four, editor three, viewer one.
db_manager.assign_permission_to_role(role_name='admin', perm_name='create')
db_manager.assign_permission_to_role(role_name='admin', perm_name='read')
db_manager.assign_permission_to_role(role_name='admin', perm_name='update')
db_manager.assign_permission_to_role(role_name='admin', perm_name='delete')
db_manager.assign_permission_to_role(role_name='editor', perm_name='create')
db_manager.assign_permission_to_role(role_name='editor', perm_name='read')
db_manager.assign_permission_to_role(role_name='editor', perm_name='update')
db_manager.assign_permission_to_role(role_name='viewer', perm_name='read')
# Add two users.
db_manager.add_user(username='张三', email='zhangsan@example.com', role_name='admin')
db_manager.add_user(username='李四', email='lisi@example.com', role_name='editor')
# Check permissions.
print(db_manager.check_user_permission(username='张三', permission_name='delete'))  # True
print(db_manager.check_user_permission(username='李四', permission_name='delete'))  # False
# Fetch the role and permissions of one user.
info = db_manager.get_user_roles_and_permissions(username='张三')
print(f"用户信息: {info}")

Web应用中的角色管理(Flask示例)

from flask import Flask, request, jsonify, session
from functools import wraps
import sqlite3
# Flask application object that the routes in this snippet are registered on.
app = Flask(__name__)
# NOTE(review): hard-coded placeholder secret. The views read `session`, which
# presumably relies on this key — confirm it is replaced by a random value
# loaded from configuration before any deployment.
app.secret_key = 'your-secret-key'
# Permission decorator
def require_permission(permission):
    """Build a decorator that guards a Flask view with a permission check.

    The wrapped view responds with ``{'error': '未登录'}`` and status 401
    when the session holds no ``'username'``, and with
    ``{'error': '权限不足'}`` and status 403 when
    ``DatabaseRoleManager.check_user_permission`` reports that the user lacks
    ``permission``. Otherwise the original view is called unchanged.

    Args:
        permission: Name of the permission the logged-in user must hold.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Reject anonymous requests before touching the database.
            if 'username' not in session:
                return jsonify({'error': '未登录'}), 401
            db_manager = DatabaseRoleManager()
            try:
                allowed = db_manager.check_user_permission(
                    session['username'], permission
                )
            finally:
                # Close the per-request connection explicitly instead of
                # waiting for the manager to be garbage-collected, and do it
                # before the view runs so it is not held open meanwhile.
                db_manager.conn.close()
            if not allowed:
                return jsonify({'error': '权限不足'}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator
# API routes
@app.route('/api/users', methods=['GET'])
@require_permission('read')
def get_users():
    """Return the user list as JSON (placeholder: the list is hard-coded)."""
    usernames = ['张三', '李四', '王五']
    return jsonify({'users': usernames})
@app.route('/api/users', methods=['POST'])
@require_permission('create')
def create_user():
    """Acknowledge creation of the user named in the JSON request body.

    The body must be a JSON object with a ``'username'`` key. A missing,
    unparsable or differently shaped body is answered with status 400
    instead of failing inside the view with a TypeError/KeyError.
    """
    # silent=True yields None for a missing or unparsable body, so every
    # malformed request ends up in the single 400 branch below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'username' not in data:
        return jsonify({'error': '缺少用户名'}), 400
    # Placeholder: the actual user-creation logic belongs here.
    return jsonify({'message': f'用户 {data["username"]} 创建成功'})
@app.route('/api/users/<username>', methods=['DELETE'])
@require_permission('delete')
def delete_user(username):
    """Acknowledge deletion of ``username`` (placeholder: nothing is removed)."""
    confirmation = f'用户 {username} 已删除'
    return jsonify({'message': confirmation})
# 前端角色管理界面
from flask import render_template_string
# Jinja template for the management page. It reads the variables `roles`,
# `users` and `all_roles`, plus the optional `message` / `success` pair.
HTML_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head>
    <title>角色管理系统</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .container { max-width: 800px; margin: 0 auto; }
        table { width: 100%; border-collapse: collapse; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #4CAF50; color: white; }
        form { margin: 20px 0; }
        input, select { padding: 8px; margin: 5px; }
        .success { color: green; }
        .error { color: red; }
    </style>
</head>
<body>
    <div class="container">
        <h1>角色管理</h1>
        <h2>添加用户</h2>
        <form action="/add_user" method="POST">
            <input type="text" name="username" placeholder="用户名" required>
            <input type="email" name="email" placeholder="邮箱" required>
            <select name="role">
                {% for role in roles %}
                <option value="{{ role }}">{{ role }}</option>
                {% endfor %}
            </select>
            <button type="submit">添加</button>
        </form>
        <h2>用户列表</h2>
        <table>
            <tr>
                <th>用户名</th>
                <th>角色</th>
                <th>权限</th>
            </tr>
            {% for user_info in users %}
            <tr>
                <td>{{ user_info.username }}</td>
                <td>{{ user_info.role }}</td>
                <td>{{ user_info.permissions | join(', ') }}</td>
            </tr>
            {% endfor %}
        </table>
        <h2>角色权限</h2>
        <table>
            <tr>
                <th>角色</th>
                <th>权限</th>
            </tr>
            {% for role_info in all_roles %}
            <tr>
                <td>{{ role_info.name }}</td>
                <td>{{ role_info.permissions | join(', ') }}</td>
            </tr>
            {% endfor %}
        </table>
        {% if message %}
        <p class="{{ 'success' if success else 'error' }}">{{ message }}</p>
        {% endif %}
    </div>
</body>
</html>
'''
@app.route('/management')
def management_interface():
    """Render the management page listing roles, users and role permissions.

    NOTE(review): unlike the /api/users routes, this view is not wrapped in
    ``require_permission`` — confirm that serving it without a login or
    permission check is intended.
    """
    db_manager = DatabaseRoleManager()
    try:
        cursor = db_manager.cursor

        # Names of all roles (also used to fill the form's role selector).
        cursor.execute("SELECT name FROM roles")
        roles = [row[0] for row in cursor.fetchall()]

        # get_user_roles_and_permissions() reuses the shared cursor, so the
        # usernames are materialised into a list before the per-user lookups.
        cursor.execute("SELECT username FROM users")
        usernames = [row[0] for row in cursor.fetchall()]
        users_info = []
        for username in usernames:
            user_info = db_manager.get_user_roles_and_permissions(username)
            if user_info:
                users_info.append(user_info)

        # Permission names of every role.
        all_roles = []
        for role_name in roles:
            cursor.execute('''
            SELECT p.name FROM roles r
            JOIN role_permissions rp ON r.id = rp.role_id
            JOIN permissions p ON rp.permission_id = p.id
            WHERE r.name = ?
        ''', (role_name,))
            permissions = [row[0] for row in cursor.fetchall()]
            all_roles.append({'name': role_name, 'permissions': permissions})
    finally:
        # Close the per-request connection explicitly rather than relying on
        # garbage collection of the manager.
        db_manager.conn.close()
    return render_template_string(
        HTML_TEMPLATE,
        roles=roles,
        users=users_info,
        all_roles=all_roles
    )
# Start the application only when this file is executed directly.
# NOTE(review): debug=True is presumably meant for local development only —
# confirm it is switched off before any real deployment.
if __name__ == '__main__':
    app.run(debug=True)

实现角色管理的核心思路:

  1. 角色-权限模型:角色是权限的集合,用户通过角色获得权限
  2. 权限检查:在执行操作前检查用户是否具有所需权限
  3. 层次化设计:从简单的字典实现到数据库持久化,再到Web框架集成
  4. 灵活的扩展性:支持动态添加角色和权限

选择哪种实现方式取决于你的具体需求:

  • 小型项目:使用字典或类实现
  • 大型项目:使用数据库持久化
  • Web应用:结合框架的认证和授权系统

抱歉,评论功能暂时关闭!