本文目录导读:

我来给你介绍几种Python实现角色管理的常见方案,从简单到复杂逐步说明。
基础角色管理(使用字典)
最简单的角色管理实现:
class RoleManager:
def __init__(self):
# 定义角色和权限
self.roles = {
'admin': {'create', 'read', 'update', 'delete', 'manage_users'},
'editor': {'create', 'read', 'update'},
'viewer': {'read'}
}
# 用户-角色映射
self.user_roles = {}
def assign_role(self, username, role):
"""为用户分配角色"""
if role in self.roles:
self.user_roles[username] = role
return True
return False
def has_permission(self, username, permission):
"""检查用户是否有某个权限"""
if username in self.user_roles:
role = self.user_roles[username]
return permission in self.roles[role]
return False
def get_user_permissions(self, username):
"""获取用户的所有权限"""
if username in self.user_roles:
role = self.user_roles[username]
return self.roles[role]
return set()
# 使用示例
manager = RoleManager()
manager.assign_role('张三', 'admin')
print(manager.has_permission('张三', 'delete')) # True
print(manager.has_permission('张三', 'manage_users')) # True
面向对象版本(更完整)
from enum import Enum
from typing import Set, Dict, Optional
class Permission(Enum):
"""权限枚举"""
CREATE = "create"
READ = "read"
UPDATE = "update"
DELETE = "delete"
MANAGE_USERS = "manage_users"
EXPORT_DATA = "export_data"
class Role:
"""角色类"""
def __init__(self, name: str, permissions: Set[Permission]):
self.name = name
self.permissions = permissions
def has_permission(self, permission: Permission) -> bool:
return permission in self.permissions
def add_permission(self, permission: Permission):
self.permissions.add(permission)
def remove_permission(self, permission: Permission):
self.permissions.discard(permission)
def __str__(self):
perms = ', '.join(p.value for p in self.permissions)
return f"Role: {self.name}, Permissions: [{perms}]"
class User:
"""用户类"""
def __init__(self, username: str, role: Optional[Role] = None):
self.username = username
self.role = role
def assign_role(self, role: Role):
self.role = role
def has_permission(self, permission: Permission) -> bool:
if self.role:
return self.role.has_permission(permission)
return False
def __str__(self):
role_name = self.role.name if self.role else "No role"
return f"User: {self.username}, Role: {role_name}"
class RoleBasedAccessControl:
"""基于角色的访问控制系统"""
def __init__(self):
self.roles: Dict[str, Role] = {}
self.users: Dict[str, User] = {}
self._init_default_roles()
def _init_default_roles(self):
"""初始化默认角色"""
self.add_role('admin', {
Permission.CREATE, Permission.READ,
Permission.UPDATE, Permission.DELETE,
Permission.MANAGE_USERS, Permission.EXPORT_DATA
})
self.add_role('editor', {
Permission.CREATE, Permission.READ, Permission.UPDATE
})
self.add_role('viewer', {
Permission.READ
})
def add_role(self, name: str, permissions: Set[Permission]):
"""添加角色"""
self.roles[name] = Role(name, permissions)
def add_user(self, username: str, role_name: str) -> bool:
"""添加用户并分配角色"""
if role_name in self.roles:
user = User(username, self.roles[role_name])
self.users[username] = user
return True
return False
def check_permission(self, username: str, permission: Permission) -> bool:
"""检查用户权限"""
if username in self.users:
return self.users[username].has_permission(permission)
return False
def get_user_info(self, username: str) -> Optional[str]:
"""获取用户信息"""
if username in self.users:
return str(self.users[username])
return None
# 使用示例
rbac = RoleBasedAccessControl()
# 添加用户
rbac.add_user('张三', 'admin')
rbac.add_user('李四', 'editor')
rbac.add_user('王五', 'viewer')
# 检查权限
print(rbac.check_permission('张三', Permission.DELETE)) # True
print(rbac.check_permission('李四', Permission.DELETE)) # False
print(rbac.check_permission('王五', Permission.CREATE)) # False
带数据库的实战版本
import sqlite3
from datetime import datetime
class DatabaseRoleManager:
"""使用SQLite数据库的角色管理"""
def __init__(self, db_path='role_management.db'):
self.conn = sqlite3.connect(db_path)
self.cursor = self.conn.cursor()
self._create_tables()
def _create_tables(self):
"""创建数据库表"""
self.cursor.executescript('''
CREATE TABLE IF NOT EXISTS roles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS permissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT
);
CREATE TABLE IF NOT EXISTS role_permissions (
role_id INTEGER,
permission_id INTEGER,
FOREIGN KEY (role_id) REFERENCES roles(id),
FOREIGN KEY (permission_id) REFERENCES permissions(id),
PRIMARY KEY (role_id, permission_id)
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT,
role_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (role_id) REFERENCES roles(id)
);
''')
self.conn.commit()
def create_role(self, role_name, description=''):
"""创建角色"""
try:
self.cursor.execute(
"INSERT INTO roles (name, description) VALUES (?, ?)",
(role_name, description)
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def add_permission(self, perm_name, description=''):
"""添加权限"""
try:
self.cursor.execute(
"INSERT INTO permissions (name, description) VALUES (?, ?)",
(perm_name, description)
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def assign_permission_to_role(self, role_name, perm_name):
"""为角色分配权限"""
self.cursor.execute("SELECT id FROM roles WHERE name = ?", (role_name,))
role = self.cursor.fetchone()
self.cursor.execute("SELECT id FROM permissions WHERE name = ?", (perm_name,))
perm = self.cursor.fetchone()
if role and perm:
try:
self.cursor.execute(
"INSERT INTO role_permissions (role_id, permission_id) VALUES (?, ?)",
(role[0], perm[0])
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
return False
def add_user(self, username, email, role_name):
"""添加用户并分配角色"""
self.cursor.execute("SELECT id FROM roles WHERE name = ?", (role_name,))
role = self.cursor.fetchone()
if role:
try:
self.cursor.execute(
"INSERT INTO users (username, email, role_id) VALUES (?, ?, ?)",
(username, email, role[0])
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
return False
def check_user_permission(self, username, permission_name):
"""检查用户是否有特定权限"""
query = '''
SELECT COUNT(*)
FROM users u
JOIN roles r ON u.role_id = r.id
JOIN role_permissions rp ON r.id = rp.role_id
JOIN permissions p ON rp.permission_id = p.id
WHERE u.username = ? AND p.name = ?
'''
self.cursor.execute(query, (username, permission_name))
count = self.cursor.fetchone()[0]
return count > 0
def get_user_roles_and_permissions(self, username):
"""获取用户的所有角色和权限"""
query = '''
SELECT r.name as role_name, p.name as permission_name
FROM users u
JOIN roles r ON u.role_id = r.id
LEFT JOIN role_permissions rp ON r.id = rp.role_id
LEFT JOIN permissions p ON rp.permission_id = p.id
WHERE u.username = ?
'''
self.cursor.execute(query, (username,))
results = self.cursor.fetchall()
if not results:
return None
role_name = results[0][0]
permissions = [r[1] for r in results if r[1]]
return {
'username': username,
'role': role_name,
'permissions': permissions
}
def __del__(self):
"""清理资源"""
if hasattr(self, 'conn'):
self.conn.close()
# 使用示例
db_manager = DatabaseRoleManager()
# 初始化角色和权限
db_manager.create_role('admin', '系统管理员')
db_manager.create_role('editor', '编辑者')
db_manager.create_role('viewer', '查看者')
# 添加权限
db_manager.add_permission('create', '创建内容')
db_manager.add_permission('read', '查看内容')
db_manager.add_permission('update', '更新内容')
db_manager.add_permission('delete', '删除内容')
# 分配权限到角色
db_manager.assign_permission_to_role('admin', 'create')
db_manager.assign_permission_to_role('admin', 'read')
db_manager.assign_permission_to_role('admin', 'update')
db_manager.assign_permission_to_role('admin', 'delete')
db_manager.assign_permission_to_role('editor', 'create')
db_manager.assign_permission_to_role('editor', 'read')
db_manager.assign_permission_to_role('editor', 'update')
db_manager.assign_permission_to_role('viewer', 'read')
# 添加用户
db_manager.add_user('张三', 'zhangsan@example.com', 'admin')
db_manager.add_user('李四', 'lisi@example.com', 'editor')
# 检查权限
print(db_manager.check_user_permission('张三', 'delete')) # True
print(db_manager.check_user_permission('李四', 'delete')) # False
# 获取用户信息
info = db_manager.get_user_roles_and_permissions('张三')
print(f"用户信息: {info}")
Web应用中的角色管理(Flask示例)
from flask import Flask, request, jsonify, session
from functools import wraps
import sqlite3
app = Flask(__name__)
app.secret_key = 'your-secret-key'
# 权限装饰器
def require_permission(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 检查用户是否登录
if 'username' not in session:
return jsonify({'error': '未登录'}), 401
username = session['username']
# 检查用户权限
db_manager = DatabaseRoleManager()
if not db_manager.check_user_permission(username, permission):
return jsonify({'error': '权限不足'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# API路由
@app.route('/api/users', methods=['GET'])
@require_permission('read')
def get_users():
# 获取用户列表的逻辑
return jsonify({'users': ['张三', '李四', '王五']})
@app.route('/api/users', methods=['POST'])
@require_permission('create')
def create_user():
# 创建用户的逻辑
data = request.json
return jsonify({'message': f'用户 {data["username"]} 创建成功'})
@app.route('/api/users/<username>', methods=['DELETE'])
@require_permission('delete')
def delete_user(username):
# 删除用户的逻辑
return jsonify({'message': f'用户 {username} 已删除'})
# 前端角色管理界面
from flask import render_template_string
HTML_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head>角色管理系统</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.container { max-width: 800px; margin: 0 auto; }
table { width: 100%; border-collapse: collapse; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #4CAF50; color: white; }
form { margin: 20px 0; }
input, select { padding: 8px; margin: 5px; }
.success { color: green; }
.error { color: red; }
</style>
</head>
<body>
<div class="container">
<h1>角色管理</h1>
<h2>添加用户</h2>
<form action="/add_user" method="POST">
<input type="text" name="username" placeholder="用户名" required>
<input type="email" name="email" placeholder="邮箱" required>
<select name="role">
{% for role in roles %}
<option value="{{ role }}">{{ role }}</option>
{% endfor %}
</select>
<button type="submit">添加</button>
</form>
<h2>用户列表</h2>
<table>
<tr>
<th>用户名</th>
<th>角色</th>
<th>权限</th>
</tr>
{% for user_info in users %}
<tr>
<td>{{ user_info.username }}</td>
<td>{{ user_info.role }}</td>
<td>{{ user_info.permissions | join(', ') }}</td>
</tr>
{% endfor %}
</table>
<h2>角色权限</h2>
<table>
<tr>
<th>角色</th>
<th>权限</th>
</tr>
{% for role_info in all_roles %}
<tr>
<td>{{ role_info.name }}</td>
<td>{{ role_info.permissions | join(', ') }}</td>
</tr>
{% endfor %}
</table>
{% if message %}
<p class="{{ 'success' if success else 'error' }}">{{ message }}</p>
{% endif %}
</div>
</body>
</html>
'''
@app.route('/management')
def management_interface():
"""管理界面"""
db_manager = DatabaseRoleManager()
# 获取所有角色
db_manager.cursor.execute("SELECT name FROM roles")
roles = [row[0] for row in db_manager.cursor.fetchall()]
# 获取所有用户信息
db_manager.cursor.execute("SELECT username FROM users")
users_info = []
for row in db_manager.cursor.fetchall():
user_info = db_manager.get_user_roles_and_permissions(row[0])
if user_info:
users_info.append(user_info)
# 获取所有角色权限
all_roles = []
for role_name in roles:
db_manager.cursor.execute('''
SELECT p.name FROM roles r
JOIN role_permissions rp ON r.id = rp.role_id
JOIN permissions p ON rp.permission_id = p.id
WHERE r.name = ?
''', (role_name,))
permissions = [row[0] for row in db_manager.cursor.fetchall()]
all_roles.append({'name': role_name, 'permissions': permissions})
return render_template_string(
HTML_TEMPLATE,
roles=roles,
users=users_info,
all_roles=all_roles
)
if __name__ == '__main__':
app.run(debug=True)
实现角色管理的核心思路:
- 角色-权限模型:角色是权限的集合,用户通过角色获得权限
- 权限检查:在执行操作前检查用户是否具有所需权限
- 层次化设计:从简单的字典实现到数据库持久化
- 灵活的扩展性:支持动态添加角色和权限
选择哪种实现方式取决于你的具体需求:
- 小型项目:使用字典或类实现
- 大型项目:使用数据库持久化
- Web应用:结合框架的认证和授权系统