本文目录导读:

我来为你详细讲解Python实现数据库分页的几种常见方法。
基础分页实现(MySQL示例)
import re
from typing import Dict, List, Optional, Tuple

import pymysql
class DatabasePagination:
    """LIMIT/OFFSET pagination helper that owns a pymysql connection.

    Instances can be used as context managers so the connection is closed
    when the ``with`` block exits.
    """

    # Characters MySQL permits in unquoted identifiers, with an optional
    # "schema." qualifier. Identifiers cannot be sent as bound parameters,
    # so the table name is checked against this before being interpolated.
    _IDENTIFIER_RE = re.compile(r'[0-9A-Za-z_$]+(?:\.[0-9A-Za-z_$]+)?')

    def __init__(self, host='localhost', user='root', password='password', database='test'):
        """Open the connection; cursors are created with ``DictCursor``."""
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            cursorclass=pymysql.cursors.DictCursor,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def paginate(self, table: str, page: int = 1, per_page: int = 10,
                 conditions: str = None,
                 params: Optional[Tuple] = None) -> Optional[Dict]:
        """Run a basic paginated ``SELECT *`` against ``table``.

        Args:
            table: Table name (optionally ``schema.table``). Must consist of
                plain identifier characters; anything else is rejected.
            page: 1-based page number. Values below 1 are treated as 1.
            per_page: Rows per page. Values below 1 are treated as 1.
            conditions: Raw SQL placed after ``WHERE`` (e.g. ``"status = 1"``).
                It is inserted verbatim, so it must never contain untrusted
                input; put ``%s`` placeholders in it and supply the values
                through ``params`` instead.
            params: Values bound to the placeholders in ``conditions``.
                Leave as ``None`` when ``conditions`` has no placeholders.

        Returns:
            ``{'data': rows, 'pagination': {...}}`` on success, or ``None``
            if the query fails (the error is printed).

        Raises:
            ValueError: If ``table`` is not a plain identifier, or if
                ``page``/``per_page`` cannot be converted to ``int``.
        """
        if not isinstance(table, str) or not self._IDENTIFIER_RE.fullmatch(table):
            raise ValueError(f"invalid table name: {table!r}")

        # A page below 1 would yield a negative OFFSET, and per_page == 0
        # would divide by zero when computing total_pages.
        page = max(1, int(page))
        per_page = max(1, int(per_page))
        offset = (page - 1) * per_page

        where_clause = f" WHERE {conditions}" if conditions else ""
        count_sql = f"SELECT COUNT(*) as total FROM {table}{where_clause}"
        # LIMIT/OFFSET are interpolated as validated ints rather than bound:
        # binding them would switch on %-formatting of the whole statement
        # and break existing callers whose conditions contain a literal '%'.
        data_sql = f"SELECT * FROM {table}{where_clause} LIMIT {per_page} OFFSET {offset}"

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(count_sql, params)
                total = cursor.fetchone()['total']
                cursor.execute(data_sql, params)
                data = cursor.fetchall()
        except Exception as e:
            # Best-effort contract kept from the original: report and
            # return None instead of propagating database errors.
            print(f"分页查询失败: {e}")
            return None

        # Ceiling division without floats.
        total_pages = (total + per_page - 1) // per_page
        return {
            'data': data,
            'pagination': {
                'current_page': page,
                'per_page': per_page,
                'total': total,
                'total_pages': total_pages,
                'has_next': page < total_pages,
                'has_prev': page > 1,
            },
        }

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
# Usage example
db = DatabasePagination()
try:
    result = db.paginate('users', page=2, per_page=10, conditions='status = 1')
    # paginate() returns None when the query fails, so guard before
    # subscripting the result.
    if result is not None:
        print(f"当前页数据: {result['data']}")
        print(f"分页信息: {result['pagination']}")
finally:
    # Release the connection even if the query or printing raises.
    db.close()
高级分页类封装
from typing import Optional, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PaginationResult:
    """One page of rows together with its navigation metadata."""

    # Rows belonging to the current page.
    items: List[Dict[str, Any]]
    # Page number this result represents.
    current_page: int
    # Page size used for the query.
    per_page: int
    # Total number of rows matching the query.
    total: int
    # Number of pages needed to hold ``total`` rows.
    total_pages: int
    # Whether a following / preceding page exists.
    has_next: bool
    has_prev: bool
    # Despite the "has_" prefix these two hold page NUMBERS: the producer in
    # this file fills them with page + 1 / page - 1, or None when there is no
    # such page.
    has_next_page: Optional[int] = None
    has_prev_page: Optional[int] = None
class AdvancedPagination:
    """Paginated queries with optional sorting, LIKE search and date range.

    ``connection`` is expected to hand out cursors that work as context
    managers, accept ``%s`` placeholders and return dict rows (for example a
    pymysql connection created with ``DictCursor``) -- NOTE(review): confirm
    against the caller's driver.
    """

    # Characters MySQL permits in unquoted identifiers, with an optional
    # qualifier ("schema.table" / "table.column"). Identifiers cannot be
    # bound as parameters, so they are validated before interpolation.
    _IDENTIFIER_RE = re.compile(r'[0-9A-Za-z_$]+(?:\.[0-9A-Za-z_$]+)?')
    _SORT_DIRECTIONS = ('ASC', 'DESC')

    def __init__(self, connection):
        self.connection = connection

    @classmethod
    def _checked_identifier(cls, value, label: str) -> str:
        """Return ``value`` if it is a plain SQL identifier, else raise ValueError."""
        if not isinstance(value, str) or not cls._IDENTIFIER_RE.fullmatch(value):
            raise ValueError(f"invalid {label}: {value!r}")
        return value

    def paginate_with_sort(self, table: str, page: int = 1, per_page: int = 10,
                           order_by: str = 'id', order: str = 'DESC',
                           search_field: str = None, search_value: str = None,
                           date_field: str = None, start_date: str = None,
                           end_date: str = None) -> "Optional[PaginationResult]":
        """Fetch one page of ``table`` with sorting, search and date filters.

        Args:
            table: Table name; must be a plain identifier.
            page: 1-based page number. Values below 1 are treated as 1.
            per_page: Rows per page. Values below 1 are treated as 1.
            order_by: Column to sort by; must be a plain identifier.
            order: ``'ASC'`` or ``'DESC'`` (case-insensitive).
            search_field: Column matched with ``LIKE %value%``. Only used
                when ``search_value`` is also given.
            search_value: Substring to search for (sent as a bound parameter).
            date_field: Column the date range applies to.
            start_date: Inclusive lower bound for ``date_field``.
            end_date: Inclusive upper bound for ``date_field``.

        Returns:
            A ``PaginationResult``, or ``None`` if the query fails (the error
            is printed).

        Raises:
            ValueError: If an identifier or the sort direction is invalid, or
                if ``page``/``per_page`` cannot be converted to ``int``.
        """
        # Everything interpolated into the SQL text is validated first;
        # values always travel as bound parameters.
        table = self._checked_identifier(table, 'table name')
        order_by = self._checked_identifier(order_by, 'order_by column')
        direction = str(order).strip().upper()
        if direction not in self._SORT_DIRECTIONS:
            raise ValueError(f"invalid sort order: {order!r}")

        # Avoid a negative OFFSET and a division by zero further down.
        page = max(1, int(page))
        per_page = max(1, int(per_page))

        conditions = []
        params = []

        if search_field and search_value:
            column = self._checked_identifier(search_field, 'search_field')
            conditions.append(f"{column} LIKE %s")
            params.append(f"%{search_value}%")

        if date_field and (start_date or end_date):
            column = self._checked_identifier(date_field, 'date_field')
            if start_date and end_date:
                conditions.append(f"{column} BETWEEN %s AND %s")
                params.extend([start_date, end_date])
            elif start_date:
                conditions.append(f"{column} >= %s")
                params.append(start_date)
            else:
                conditions.append(f"{column} <= %s")
                params.append(end_date)

        where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
        offset = (page - 1) * per_page
        count_sql = f"SELECT COUNT(*) as total FROM {table}{where_clause}"
        data_sql = (
            f"SELECT * FROM {table}{where_clause} "
            f"ORDER BY {order_by} {direction} LIMIT %s OFFSET %s"
        )

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(count_sql, params)
                total = cursor.fetchone()['total']
                cursor.execute(data_sql, params + [per_page, offset])
                data = cursor.fetchall()
        except Exception as e:
            # Best-effort contract kept from the original: report and
            # return None instead of propagating database errors.
            print(f"高级分页查询失败: {e}")
            return None

        # Ceiling division without floats.
        total_pages = (total + per_page - 1) // per_page
        return PaginationResult(
            items=data,
            current_page=page,
            per_page=per_page,
            total=total,
            total_pages=total_pages,
            has_next=page < total_pages,
            has_prev=page > 1,
            has_next_page=page + 1 if page < total_pages else None,
            has_prev_page=page - 1 if page > 1 else None,
        )

    def get_page_range(self, current_page: int, total_pages: int,
                       max_visible: int = 5) -> List[int]:
        """Return the page numbers to show as navigation buttons.

        The window holds at most ``max_visible`` pages, roughly centred on
        ``current_page`` and shifted so it stays inside ``1..total_pages``,
        e.g. ``[1, 2, 3, 4, 5]`` or ``[8, 9, 10, 11, 12]``.
        """
        if total_pages <= max_visible:
            return list(range(1, total_pages + 1))
        half = max_visible // 2
        start = max(1, current_page - half)
        end = min(total_pages, start + max_visible - 1)
        # Near the last page the window is cut short on the right; slide the
        # start back so max_visible buttons are still shown.
        if end - start + 1 < max_visible:
            start = max(1, end - max_visible + 1)
        return list(range(start, end + 1))
Flask Web应用中的分页实现
from flask import Flask, request, jsonify, render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func
# Application object and its SQLAlchemy binding. The connection URI below
# embeds literal credentials as written in this example.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='mysql://root:password@localhost/test',
)
db = SQLAlchemy(app)
# User model
class User(db.Model):
    """ORM model describing one user row."""

    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Name and e-mail, each limited to 100 characters.
    name = db.Column(db.String(100))
    email = db.Column(db.String(100))
    # Defaults to the SQL CURRENT_TIMESTAMP function when no value is given.
    created_at = db.Column(db.DateTime, default=func.current_timestamp())
@app.route('/users')
def get_users():
    """Return one page of users as JSON.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: Rows per page (default 10, capped at 100).

    Returns:
        A JSON object with a ``users`` list and a ``pagination`` object.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    # ORDER BY makes page contents deterministic; without it the database
    # may return rows in a different order from one request to the next.
    # max_per_page stops a client from requesting the whole table at once.
    pagination = User.query.order_by(User.id).paginate(
        page=page,
        per_page=per_page,
        max_per_page=100,
        error_out=False,
    )

    users = [
        {
            'id': user.id,
            'name': user.name,
            'email': user.email,
            # created_at is a nullable column; guard before formatting.
            'created_at': user.created_at.isoformat() if user.created_at else None,
        }
        for user in pagination.items
    ]
    return jsonify({
        'users': users,
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages,
            'has_prev': pagination.has_prev,
            'has_next': pagination.has_next,
            'prev_num': pagination.prev_num,
            'next_num': pagination.next_num,
        },
    })
# Front-end template view
@app.route('/users/page')
def users_page():
    """Render ``users.html`` for the requested page of users."""
    requested = request.args.get('page', 1, type=int)
    pagination = User.query.paginate(page=requested, per_page=10, error_out=False)
    context = {
        'users': pagination.items,
        'pagination': pagination,
        # Window of page numbers shown as navigation links.
        'pages': get_page_range(pagination.page, pagination.pages),
    }
    return render_template('users.html', **context)
def get_page_range(current_page, total_pages, max_visible=5):
    """Return the list of page numbers to display around ``current_page``.

    At most ``max_visible`` consecutive pages are returned, kept inside
    ``1..total_pages``.
    """
    # Few enough pages: show all of them.
    if total_pages <= max_visible:
        return [*range(1, total_pages + 1)]

    first = max(1, current_page - max_visible // 2)
    last = min(total_pages, first + max_visible - 1)
    # Window clipped at the right edge: slide it left to keep its full width.
    if last - first + 1 < max_visible:
        first = max(1, last - max_visible + 1)
    return [*range(first, last + 1)]
if __name__ == '__main__':
    import os

    # Debug mode turns on the interactive debugger, which must not be
    # reachable outside development. It stays on by default, as before,
    # but can now be switched off with FLASK_DEBUG=0 (or false/no/off).
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in (
        '', '0', 'false', 'no', 'off',
    )
    app.run(debug=debug_enabled)
前端分页模板(HTML)
<!-- templates/users.html -->
<!DOCTYPE html>
<html>
<head>
<title>用户列表 - 分页演示</title>
<style>
.pagination {
display: flex;
list-style: none;
padding: 0;
justify-content: center;
margin-top: 20px;
}
.pagination li {
margin: 0 3px;
}
.pagination a, .pagination span {
display: block;
padding: 8px 12px;
text-decoration: none;
border: 1px solid #ddd;
color: #333;
border-radius: 4px;
}
.pagination .active a {
background-color: #007bff;
color: white;
border-color: #007bff;
}
.pagination .disabled span {
color: #ccc;
cursor: not-allowed;
}
.pagination a:hover {
background-color: #f0f0f0;
}
table {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
th, td {
padding: 10px;
text-align: left;
border-bottom: 1px solid #ddd;
}
th {
background-color: #f5f5f5;
}
</style>
</head>
<body>
<h1>用户列表</h1>
<table>
<thead>
<tr>
<th>ID</th>
<th>姓名</th>
<th>邮箱</th>
<th>创建时间</th>
</tr>
</thead>
<tbody>
{% for user in users %}
<tr>
<td>{{ user.id }}</td>
<td>{{ user.name }}</td>
<td>{{ user.email }}</td>
<td>{{ user.created_at }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<!-- 分页导航 -->
<ul class="pagination">
{% if pagination.has_prev %}
<li>
<a href="{{ url_for('users_page', page=pagination.prev_num) }}">
上一页
</a>
</li>
{% else %}
<li class="disabled">
<span>上一页</span>
</li>
{% endif %}
{% for page_num in pages %}
<li class="{% if page_num == pagination.page %}active{% endif %}">
<a href="{{ url_for('users_page', page=page_num) }}">
{{ page_num }}
</a>
</li>
{% endfor %}
{% if pagination.has_next %}
<li>
<a href="{{ url_for('users_page', page=pagination.next_num) }}">
下一页
</a>
</li>
{% else %}
<li class="disabled">
<span>下一页</span>
</li>
{% endif %}
</ul>
<!-- 分页信息 -->
<p style="text-align: center; color: #666;">
共 {{ pagination.total }} 条记录,第 {{ pagination.page }}/{{ pagination.pages }} 页
</p>
<!-- 跳转功能 -->
<div style="text-align: center; margin-top: 10px;">
<form method="get" action="{{ url_for('users_page') }}" style="display: inline;">
<label>跳转到第</label>
<input type="number" name="page" min="1" max="{{ pagination.pages }}"
style="width: 50px;" value="{{ pagination.page }}">
<label>页</label>
<button type="submit">跳转</button>
</form>
</div>
</body>
</html>
最佳实践建议
class PaginationUtils:
    """Stateless helpers for validating and computing pagination values."""

    # Characters MySQL permits in unquoted identifiers, with an optional
    # qualifier ("schema.table" / "table.column"). Identifiers cannot be
    # bound as parameters, so they are validated before interpolation.
    _IDENTIFIER_RE = re.compile(r'[0-9A-Za-z_$]+(?:\.[0-9A-Za-z_$]+)?')

    @staticmethod
    def validate_page_params(page: int, per_page: int,
                             max_per_page: int = 100) -> Tuple[int, int]:
        """Clamp pagination parameters into a usable range.

        Args:
            page: Requested page number.
            per_page: Requested rows per page.
            max_per_page: Upper bound for ``per_page``.

        Returns:
            ``(page, per_page)`` with ``page >= 1`` and
            ``1 <= per_page <= max_per_page``.
        """
        page = max(1, page)
        per_page = max(1, min(per_page, max_per_page))
        return page, per_page

    @staticmethod
    def keyset_pagination(cursor, table: str, last_id: int = 0,
                          per_page: int = 10, order_by: str = 'id') -> List[Dict]:
        """Fetch the next page using keyset pagination.

        Rows are selected with ``order_by > last_id`` instead of OFFSET, so
        the query does not have to skip over earlier rows.

        Args:
            cursor: Database cursor accepting ``%s`` placeholders.
            table: Table name; must be a plain identifier.
            last_id: ``order_by`` value of the last row of the previous page.
            per_page: Maximum rows to return. Values below 1 are treated as 1.
            order_by: Column used for both filtering and ordering; must be a
                plain identifier.

        Returns:
            The rows of the current page, as returned by ``cursor.fetchall()``.

        Raises:
            ValueError: If ``table`` or ``order_by`` is not a plain identifier.
        """
        for label, identifier in (('table', table), ('order_by', order_by)):
            if (not isinstance(identifier, str)
                    or not PaginationUtils._IDENTIFIER_RE.fullmatch(identifier)):
                raise ValueError(f"invalid {label}: {identifier!r}")

        sql = f"""
            SELECT * FROM {table}
            WHERE {order_by} > %s
            ORDER BY {order_by} ASC
            LIMIT %s
        """
        cursor.execute(sql, (last_id, max(1, per_page)))
        return cursor.fetchall()

    @staticmethod
    def calculate_page_info(total: int, page: int, per_page: int) -> Dict:
        """Compute navigation metadata for one page.

        ``page`` and ``per_page`` below 1 are treated as 1 and a negative
        ``total`` as 0, so the calculation never divides by zero.

        Returns:
            A dict with ``total``, ``page``, ``per_page``, ``total_pages``,
            ``has_prev``, ``has_next``, ``prev_page``, ``next_page`` and the
            1-based ``start``/``end`` row positions of the page. ``start``
            and ``end`` are both 0 when the page holds no rows (empty result
            set or a page past the end).
        """
        total = max(0, total)
        page = max(1, page)
        per_page = max(1, per_page)

        # Ceiling division without floats.
        total_pages = (total + per_page - 1) // per_page
        offset = (page - 1) * per_page
        # False for an empty result set and for pages beyond the last one;
        # in that case "start > end" would be reported without this guard.
        has_rows = offset < total

        return {
            'total': total,
            'page': page,
            'per_page': per_page,
            'total_pages': total_pages,
            'has_prev': page > 1,
            'has_next': page < total_pages,
            'prev_page': page - 1 if page > 1 else None,
            'next_page': page + 1 if page < total_pages else None,
            'start': offset + 1 if has_rows else 0,
            'end': min(page * per_page, total) if has_rows else 0,
        }
# Usage example
def example_usage():
    """Demonstrate the PaginationUtils helpers."""
    # Validate the parameters, then use the validated values below.
    page, per_page = PaginationUtils.validate_page_params(1, 10)

    # Compute the page metadata.
    info = PaginationUtils.calculate_page_info(total=100, page=page, per_page=per_page)
    print(f"分页信息: {info}")

    # Keyset pagination (suited to infinite-scroll views).
    # NOTE(review): `db` must expose a `.connection` with `.cursor()`, as
    # DatabasePagination does; the name is rebound to a SQLAlchemy object
    # elsewhere in this file -- confirm which one is intended here.
    # The `with` block closes the cursor, which was previously left open.
    with db.connection.cursor() as cursor:
        data = PaginationUtils.keyset_pagination(cursor, 'users', last_id=100, per_page=20)
    return data
核心要点
- LIMIT/OFFSET: 最常用的分页方式,适合中小数据量
- 游标分页: 对于大数据量更高效,避免OFFSET性能问题
- 参数验证: 始终验证分页参数,防止恶意请求
- 总数缓存: 频繁分页时缓存总数,减少数据库查询
- 前端交互: 提供完整的页面导航UI,包括跳转功能
选择哪种分页方式取决于你的数据量和性能需求,对于大多数应用,基础的LIMIT/OFFSET分页就足够了。