Python案例怎么实现数据库分页?

wen python案例 10

本文目录导读:

Python案例怎么实现数据库分页?

  1. 基础分页实现(MySQL示例)
  2. 高级分页类封装
  3. Flask Web应用中的分页实现
  4. 前端分页模板(HTML)
  5. 最佳实践建议
  6. 核心要点

我来为你详细讲解Python实现数据库分页的几种常见方法。

基础分页实现(MySQL示例)

import pymysql
from typing import List, Dict, Tuple
class DatabasePagination:
    def __init__(self, host='localhost', user='root', password='password', database='test'):
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            cursorclass=pymysql.cursors.DictCursor
        )
    def paginate(self, table: str, page: int = 1, per_page: int = 10, 
                 conditions: str = None) -> Dict:
        """
        基础分页查询
        Args:
            table: 表名
            page: 当前页码(从1开始)
            per_page: 每页记录数
            conditions: 查询条件(如 "status = 1")
        Returns:
            分页结果字典
        """
        try:
            with self.connection.cursor() as cursor:
                # 计算总数
                count_sql = f"SELECT COUNT(*) as total FROM {table}"
                if conditions:
                    count_sql += f" WHERE {conditions}"
                cursor.execute(count_sql)
                total = cursor.fetchone()['total']
                # 计算偏移量
                offset = (page - 1) * per_page
                # 查询当前页数据
                data_sql = f"SELECT * FROM {table}"
                if conditions:
                    data_sql += f" WHERE {conditions}"
                data_sql += f" LIMIT {per_page} OFFSET {offset}"
                cursor.execute(data_sql)
                data = cursor.fetchall()
                # 计算总页数
                total_pages = (total + per_page - 1) // per_page
                return {
                    'data': data,
                    'pagination': {
                        'current_page': page,
                        'per_page': per_page,
                        'total': total,
                        'total_pages': total_pages,
                        'has_next': page < total_pages,
                        'has_prev': page > 1
                    }
                }
        except Exception as e:
            print(f"分页查询失败: {e}")
            return None
    def close(self):
        self.connection.close()
# 使用示例
db = DatabasePagination()
result = db.paginate('users', page=2, per_page=10, conditions='status = 1')
print(f"当前页数据: {result['data']}")
print(f"分页信息: {result['pagination']}")
db.close()

高级分页类封装

from typing import Optional, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PaginationResult:
    """分页结果数据类"""
    items: List[Dict[str, Any]]
    current_page: int
    per_page: int
    total: int
    total_pages: int
    has_next: bool
    has_prev: bool
    has_next_page: Optional[int] = None
    has_prev_page: Optional[int] = None
class AdvancedPagination:
    """高级分页类"""
    def __init__(self, connection):
        self.connection = connection
    def paginate_with_sort(self, table: str, page: int = 1, per_page: int = 10,
                          order_by: str = 'id', order: str = 'DESC',
                          search_field: str = None, search_value: str = None,
                          date_field: str = None, start_date: str = None, 
                          end_date: str = None) -> PaginationResult:
        """
        支持排序和搜索的分页查询
        """
        try:
            with self.connection.cursor() as cursor:
                # 构建查询条件
                conditions = []
                params = []
                # 搜索条件
                if search_field and search_value:
                    conditions.append(f"{search_field} LIKE %s")
                    params.append(f"%{search_value}%")
                # 日期范围条件
                if date_field and start_date and end_date:
                    conditions.append(f"{date_field} BETWEEN %s AND %s")
                    params.extend([start_date, end_date])
                elif date_field and start_date:
                    conditions.append(f"{date_field} >= %s")
                    params.append(start_date)
                elif date_field and end_date:
                    conditions.append(f"{date_field} <= %s")
                    params.append(end_date)
                # 构建WHERE子句
                where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
                # 查询总数
                count_sql = f"SELECT COUNT(*) as total FROM {table}{where_clause}"
                cursor.execute(count_sql, params)
                total = cursor.fetchone()['total']
                # 计算偏移量
                offset = (page - 1) * per_page
                # 参数化查询
                data_sql = f"SELECT * FROM {table}{where_clause} ORDER BY {order_by} {order} LIMIT %s OFFSET %s"
                query_params = params + [per_page, offset]
                cursor.execute(data_sql, query_params)
                data = cursor.fetchall()
                # 计算分页信息
                total_pages = (total + per_page - 1) // per_page
                return PaginationResult(
                    items=data,
                    current_page=page,
                    per_page=per_page,
                    total=total,
                    total_pages=total_pages,
                    has_next=page < total_pages,
                    has_prev=page > 1,
                    has_next_page=page + 1 if page < total_pages else None,
                    has_prev_page=page - 1 if page > 1 else None
                )
        except Exception as e:
            print(f"高级分页查询失败: {e}")
            return None
    def get_page_range(self, current_page: int, total_pages: int, 
                       max_visible: int = 5) -> List[int]:
        """
        获取分页按钮显示范围
         [1, 2, 3, 4, 5] 或 [8, 9, 10, 11, 12]
        """
        if total_pages <= max_visible:
            return list(range(1, total_pages + 1))
        half = max_visible // 2
        start = max(1, current_page - half)
        end = min(total_pages, start + max_visible - 1)
        if end - start + 1 < max_visible:
            start = max(1, end - max_visible + 1)
        return list(range(start, end + 1))

Flask Web应用中的分页实现

from flask import Flask, request, jsonify, render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:password@localhost/test'
db = SQLAlchemy(app)
# 用户模型
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    email = db.Column(db.String(100))
    created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
@app.route('/users')
def get_users():
    # 获取分页参数
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    # Flask-SQLAlchemy 自带分页
    pagination = User.query.paginate(
        page=page, 
        per_page=per_page, 
        error_out=False
    )
    users = pagination.items
    return jsonify({
        'users': [{
            'id': user.id,
            'name': user.name,
            'email': user.email,
            'created_at': user.created_at.isoformat()
        } for user in users],
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages,
            'has_prev': pagination.has_prev,
            'has_next': pagination.has_next,
            'prev_num': pagination.prev_num,
            'next_num': pagination.next_num
        }
    })
# 前端模板
@app.route('/users/page')
def users_page():
    page = request.args.get('page', 1, type=int)
    per_page = 10
    pagination = User.query.paginate(page=page, per_page=per_page, error_out=False)
    # 计算显示的页码范围
    pages = get_page_range(pagination.page, pagination.pages)
    return render_template('users.html', 
                          users=pagination.items,
                          pagination=pagination,
                          pages=pages)
def get_page_range(current_page, total_pages, max_visible=5):
    """获取显示的页码范围"""
    if total_pages <= max_visible:
        return list(range(1, total_pages + 1))
    half = max_visible // 2
    start = max(1, current_page - half)
    end = min(total_pages, start + max_visible - 1)
    if end - start + 1 < max_visible:
        start = max(1, end - max_visible + 1)
    return list(range(start, end + 1))
if __name__ == '__main__':
    app.run(debug=True)

前端分页模板(HTML)

<!-- templates/users.html -->
<!DOCTYPE html>
<html>
<head>用户列表 - 分页演示</title>
    <style>
        .pagination {
            display: flex;
            list-style: none;
            padding: 0;
            justify-content: center;
            margin-top: 20px;
        }
        .pagination li {
            margin: 0 3px;
        }
        .pagination a, .pagination span {
            display: block;
            padding: 8px 12px;
            text-decoration: none;
            border: 1px solid #ddd;
            color: #333;
            border-radius: 4px;
        }
        .pagination .active a {
            background-color: #007bff;
            color: white;
            border-color: #007bff;
        }
        .pagination .disabled span {
            color: #ccc;
            cursor: not-allowed;
        }
        .pagination a:hover {
            background-color: #f0f0f0;
        }
        table {
            width: 100%;
            border-collapse: collapse;
            margin-top: 20px;
        }
        th, td {
            padding: 10px;
            text-align: left;
            border-bottom: 1px solid #ddd;
        }
        th {
            background-color: #f5f5f5;
        }
    </style>
</head>
<body>
    <h1>用户列表</h1>
    <table>
        <thead>
            <tr>
                <th>ID</th>
                <th>姓名</th>
                <th>邮箱</th>
                <th>创建时间</th>
            </tr>
        </thead>
        <tbody>
            {% for user in users %}
            <tr>
                <td>{{ user.id }}</td>
                <td>{{ user.name }}</td>
                <td>{{ user.email }}</td>
                <td>{{ user.created_at }}</td>
            </tr>
            {% endfor %}
        </tbody>
    </table>
    <!-- 分页导航 -->
    <ul class="pagination">
        {% if pagination.has_prev %}
        <li>
            <a href="{{ url_for('users_page', page=pagination.prev_num) }}">
                上一页
            </a>
        </li>
        {% else %}
        <li class="disabled">
            <span>上一页</span>
        </li>
        {% endif %}
        {% for page_num in pages %}
        <li class="{% if page_num == pagination.page %}active{% endif %}">
            <a href="{{ url_for('users_page', page=page_num) }}">
                {{ page_num }}
            </a>
        </li>
        {% endfor %}
        {% if pagination.has_next %}
        <li>
            <a href="{{ url_for('users_page', page=pagination.next_num) }}">
                下一页
            </a>
        </li>
        {% else %}
        <li class="disabled">
            <span>下一页</span>
        </li>
        {% endif %}
    </ul>
    <!-- 分页信息 -->
    <p style="text-align: center; color: #666;">
        共 {{ pagination.total }} 条记录,第 {{ pagination.page }}/{{ pagination.pages }} 页
    </p>
    <!-- 跳转功能 -->
    <div style="text-align: center; margin-top: 10px;">
        <form method="get" action="{{ url_for('users_page') }}" style="display: inline;">
            <label>跳转到第</label>
            <input type="number" name="page" min="1" max="{{ pagination.pages }}" 
                   style="width: 50px;" value="{{ pagination.page }}">
            <label>页</label>
            <button type="submit">跳转</button>
        </form>
    </div>
</body>
</html>

最佳实践建议

class PaginationUtils:
    """分页工具类"""
    @staticmethod
    def validate_page_params(page: int, per_page: int, 
                            max_per_page: int = 100) -> Tuple[int, int]:
        """
        验证和修正分页参数
        Args:
            page: 页码
            per_page: 每页记录数
            max_per_page: 最大每页记录数
        Returns:
            修正后的 (page, per_page)
        """
        # 确保页码为正数
        page = max(1, page)
        # 限制每页记录数范围
        per_page = max(1, min(per_page, max_per_page))
        return page, per_page
    @staticmethod
    def keyset_pagination(cursor, table: str, last_id: int = 0, 
                          per_page: int = 10, order_by: str = 'id') -> List[Dict]:
        """
        基于游标的分页(Keyset Pagination)
        适用于大数据量,避免OFFSET的性能问题
        Args:
            cursor: 数据库游标
            table: 表名
            last_id: 上一页最后一条记录的ID
            per_page: 每页记录数
            order_by: 排序列
        Returns:
            当前页数据
        """
        sql = f"""
            SELECT * FROM {table}
            WHERE {order_by} > %s
            ORDER BY {order_by} ASC
            LIMIT %s
        """
        cursor.execute(sql, (last_id, per_page))
        return cursor.fetchall()
    @staticmethod
    def calculate_page_info(total: int, page: int, per_page: int) -> Dict:
        """
        计算分页信息
        Returns:
            包含分页信息的字典
        """
        total_pages = (total + per_page - 1) // per_page
        return {
            'total': total,
            'page': page,
            'per_page': per_page,
            'total_pages': total_pages,
            'has_prev': page > 1,
            'has_next': page < total_pages,
            'prev_page': page - 1 if page > 1 else None,
            'next_page': page + 1 if page < total_pages else None,
            'start': (page - 1) * per_page + 1,
            'end': min(page * per_page, total)
        }
# 使用示例
def example_usage():
    # 验证参数
    page, per_page = PaginationUtils.validate_page_params(1, 10)
    # 计算分页信息
    info = PaginationUtils.calculate_page_info(total=100, page=1, per_page=10)
    print(f"分页信息: {info}")
    # 游标分页(适用于无限滚动场景)
    cursor = db.connection.cursor()
    data = PaginationUtils.keyset_pagination(cursor, 'users', last_id=100, per_page=20)

核心要点

  1. LIMIT/OFFSET: 最常用的分页方式,适合中小数据量
  2. 游标分页: 对于大数据量更高效,避免OFFSET性能问题
  3. 参数验证: 始终验证分页参数,防止恶意请求
  4. 总数缓存: 频繁分页时缓存总数,减少数据库查询
  5. 前端交互: 提供完整的页面导航UI,包括跳转功能

选择哪种分页方式取决于你的数据量和性能需求,对于大多数应用,基础的LIMIT/OFFSET分页就足够了。

抱歉,评论功能暂时关闭!