本文目录导读:

我来帮您写一个Python脚本,自动将Excel数据导入到数据库,这个脚本支持多种数据库(MySQL、PostgreSQL、SQLite等)。
完整的Excel导入数据库脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel数据导入数据库脚本
支持多种数据库:MySQL, PostgreSQL, SQLite
支持多种Excel格式:.xlsx, .xls
"""
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, inspect
import argparse
import os
import sys
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ExcelToDatabase:
"""Excel导入数据库工具类"""
def __init__(self, db_type='sqlite', **kwargs):
"""
初始化数据库连接
参数:
db_type: 数据库类型 ('mysql', 'postgresql', 'sqlite')
kwargs: 数据库连接参数
"""
self.db_type = db_type
self.engine = self._create_engine(**kwargs)
def _create_engine(self, **kwargs):
"""创建数据库引擎"""
try:
if self.db_type == 'mysql':
# MySQL连接
connection_str = f"mysql+pymysql://{kwargs['user']}:{kwargs['password']}@{kwargs['host']}:{kwargs.get('port', 3306)}/{kwargs['database']}"
elif self.db_type == 'postgresql':
# PostgreSQL连接
connection_str = f"postgresql://{kwargs['user']}:{kwargs['password']}@{kwargs['host']}:{kwargs.get('port', 5432)}/{kwargs['database']}"
elif self.db_type == 'sqlite':
# SQLite连接
connection_str = f"sqlite:///{kwargs.get('path', 'database.db')}"
else:
raise ValueError(f"不支持的数据库类型: {self.db_type}")
engine = create_engine(connection_str, echo=False)
logger.info(f"成功连接到 {self.db_type} 数据库")
return engine
except Exception as e:
logger.error(f"数据库连接失败: {str(e)}")
sys.exit(1)
def read_excel(self, file_path, sheet_name=0, header=0):
"""
读取Excel文件
参数:
file_path: Excel文件路径
sheet_name: 工作表名称或索引
header: 表头行索引
返回:
DataFrame
"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
# 获取文件扩展名
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in ['.xlsx', '.xls']:
df = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=header,
dtype=str # 统一转换为字符串避免类型问题
)
logger.info(f"成功读取Excel文件: {file_path}")
logger.info(f"数据形状: {df.shape}")
return df
else:
raise ValueError(f"不支持的文件格式: {file_ext}")
except Exception as e:
logger.error(f"读取Excel文件失败: {str(e)}")
sys.exit(1)
def clean_dataframe(self, df):
"""
清洗DataFrame数据
参数:
df: 原始DataFrame
返回:
清洗后的DataFrame
"""
# 去除列名两端的空格
df.columns = df.columns.str.strip()
# 去除字符串数据两端的空格
for col in df.select_dtypes(include=['object']).columns:
df[col] = df[col].str.strip()
# 处理空值
df = df.fillna('') # 将空值替换为空字符串
# 删除空列
df = df.dropna(axis=1, how='all')
logger.info(f"数据清洗完成,处理后形状: {df.shape}")
return df
def table_exists(self, table_name):
"""检查表是否存在"""
inspector = inspect(self.engine)
return table_name in inspector.get_table_names()
def import_to_database(self, df, table_name, if_exists='append',
chunksize=1000, create_table=True):
"""
将DataFrame导入数据库
参数:
df: 要导入的DataFrame
table_name: 目标表名
if_exists: 表存在时的处理方式
('fail': 失败, 'replace': 替换, 'append': 追加)
chunksize: 每批次导入的行数
create_table: 是否自动创建表
"""
try:
# 数据清洗
df = self.clean_dataframe(df)
# 检查表是否存在
table_exists = self.table_exists(table_name)
if not table_exists and not create_table:
raise ValueError(f"表 {table_name} 不存在且不允许自动创建")
# 导入数据
logger.info(f"开始导入数据到表 {table_name}...")
df.to_sql(
name=table_name,
con=self.engine,
if_exists=if_exists,
index=False,
chunksize=chunksize,
method='multi' # 使用批量插入提高性能
)
# 统计导入结果
row_count = len(df)
logger.info(f"成功导入 {row_count} 条数据到表 {table_name}")
return {
'status': 'success',
'table': table_name,
'rows_inserted': row_count,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
logger.error(f"数据库导入失败: {str(e)}")
return {
'status': 'error',
'error': str(e),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
def close(self):
"""关闭数据库连接"""
if self.engine:
self.engine.dispose()
logger.info("数据库连接已关闭")
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='将Excel数据导入到数据库中',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
1. SQLite数据库:
python excel_to_db.py --db sqlite --path data.db --excel data.xlsx --table users
2. MySQL数据库:
python excel_to_db.py --db mysql --host localhost --user root --password 123456 --database test --excel data.xlsx --table users
3. PostgreSQL数据库:
python excel_to_db.py --db postgresql --host localhost --user postgres --password 123456 --database test --excel data.xlsx --table users
4. 指定工作表:
python excel_to_db.py --db sqlite --excel data.xlsx --table users --sheet "Sheet2"
5. 替换已有表:
python excel_to_db.py --db sqlite --excel data.xlsx --table users --mode replace
"""
)
# 数据库参数
parser.add_argument('--db', choices=['sqlite', 'mysql', 'postgresql'],
default='sqlite', help='数据库类型 (默认: sqlite)')
# SQLite参数
parser.add_argument('--path', default='database.db', help='SQLite数据库文件路径')
# MySQL/PostgreSQL参数
parser.add_argument('--host', default='localhost', help='数据库主机地址')
parser.add_argument('--port', type=int, help='数据库端口')
parser.add_argument('--user', help='数据库用户名')
parser.add_argument('--password', help='数据库密码')
parser.add_argument('--database', help='数据库名称')
# Excel参数
parser.add_argument('--excel', required=True, help='Excel文件路径')
parser.add_argument('--sheet', default=0, help='工作表名称或索引 (默认: 第一个工作表)')
parser.add_argument('--header', type=int, default=0, help='表头行索引 (默认: 0)')
# 导入参数
parser.add_argument('--table', required=True, help='目标表名')
parser.add_argument('--mode', choices=['append', 'replace', 'fail'],
default='append', help='表存在时的处理方式 (默认: append)')
parser.add_argument('--chunksize', type=int, default=1000,
help='每批次导入行数 (默认: 1000)')
return parser.parse_args()
def main():
"""主函数"""
# 解析参数
args = parse_arguments()
# 构建数据库连接参数
db_kwargs = {}
if args.db == 'sqlite':
db_kwargs['path'] = args.path
else:
db_kwargs['host'] = args.host
db_kwargs['user'] = args.user
db_kwargs['password'] = args.password
db_kwargs['database'] = args.database
if args.port:
db_kwargs['port'] = args.port
try:
# 创建导入工具实例
importer = ExcelToDatabase(db_type=args.db, **db_kwargs)
# 读取Excel数据
df = importer.read_excel(args.excel, args.sheet, args.header)
# 导入数据库
result = importer.import_to_database(
df,
table_name=args.table,
if_exists=args.mode,
chunksize=args.chunksize
)
# 输出结果
if result['status'] == 'success':
print(f"\n✅ 导入成功!")
print(f" 表名: {result['table']}")
print(f" 导入行数: {result['rows_inserted']}")
print(f" 时间: {result['timestamp']}")
else:
print(f"\n❌ 导入失败!")
print(f" 错误信息: {result['error']}")
except Exception as e:
logger.error(f"程序执行失败: {str(e)}")
sys.exit(1)
finally:
# 关闭数据库连接
if 'importer' in locals():
importer.close()
if __name__ == "__main__":
main()
安装依赖
# 安装必要的Python包 pip install pandas openpyxl sqlalchemy pymysql psycopg2-binary
使用示例
导入到SQLite数据库
# 基本用法 python excel_to_db.py --db sqlite --excel data.xlsx --table users # 指定数据库文件名 python excel_to_db.py --db sqlite --path mydatabase.db --excel data.xlsx --table users
导入到MySQL数据库
python excel_to_db.py --db mysql --host localhost --user root --password 123456 --database testdb --excel data.xlsx --table users
导入到PostgreSQL数据库
python excel_to_db.py --db postgresql --host localhost --user postgres --password 123456 --database testdb --excel data.xlsx --table users
高级用法
# 替换已有表 python excel_to_db.py --db sqlite --excel data.xlsx --table users --mode replace # 指定工作表 python excel_to_db.py --db sqlite --excel data.xlsx --table users --sheet "Sheet2" # 设置批次大小 python excel_to_db.py --db sqlite --excel data.xlsx --table users --chunksize 500
扩展功能:批量导入多个Excel文件
# batch_import.py
import os
import glob
from excel_to_db import ExcelToDatabase
def batch_import_excel_files(directory, pattern="*.xlsx", db_config=None):
"""
批量导入Excel文件到数据库
参数:
directory: Excel文件目录
pattern: 文件匹配模式
db_config: 数据库配置
"""
# 连接数据库
importer = ExcelToDatabase(**db_config)
# 查找所有Excel文件
excel_files = glob.glob(os.path.join(directory, pattern))
print(f"找到 {len(excel_files)} 个Excel文件")
for file_path in excel_files:
try:
# 使用文件名作为表名(不含扩展名)
table_name = os.path.splitext(os.path.basename(file_path))[0]
print(f"\n正在处理: {file_path}")
print(f"目标表: {table_name}")
# 读取并导入
df = importer.read_excel(file_path)
result = importer.import_to_database(df, table_name, if_exists='replace')
if result['status'] == 'success':
print(f"✓ 导入成功: {result['rows_inserted']} 条记录")
else:
print(f"✗ 导入失败: {result['error']}")
except Exception as e:
print(f"✗ 处理失败 {file_path}: {str(e)}")
importer.close()
# 使用示例
if __name__ == "__main__":
# 配置数据库连接
db_config = {
'db_type': 'sqlite',
'path': 'merged_database.db'
}
# 批量导入
batch_import_excel_files('./excel_files/', '*.xlsx', db_config)
这个脚本具有以下特点:
- 多数据库支持:MySQL、PostgreSQL、SQLite
- 自动清洗数据:处理空值、空格等
- 批量导入:支持大批量数据的高效导入
- 错误处理:完善的异常处理机制
- 日志记录:详细的导入过程日志
- 灵活配置:支持命令行参数配置
使用前请确保已安装所需的Python包,并根据您的数据库类型配置相应的连接信息。