本文目录导读:

我可以帮你提供几种Python批量新增数据的方法,下面我给出几个常见场景的示例:
批量新增到Excel文件
import pandas as pd
from openpyxl import load_workbook
# 方法1:使用pandas
data = {
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 28, 35],
'城市': ['北京', '上海', '广州', '深圳']
}
df = pd.DataFrame(data)
df.to_excel('新增数据.xlsx', index=False, sheet_name='Sheet1')
# 方法2:追加到已有Excel
book = load_workbook('已有文件.xlsx')
writer = pd.ExcelWriter('已有文件.xlsx', engine='openpyxl')
writer.book = book
df.to_excel(writer, sheet_name='Sheet1', startrow=writer.sheets['Sheet1'].max_row)
writer.save()
批量新增到数据库(MySQL示例)
import mysql.connector
from mysql.connector import Error
def batch_insert_to_mysql(data_list):
try:
connection = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_user',
password='your_password'
)
cursor = connection.cursor()
# 批量插入SQL
sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
# 准备数据
records = [
('张三', 25, 'zhangsan@email.com'),
('李四', 30, 'lisi@email.com'),
('王五', 28, 'wangwu@email.com')
]
# 批量执行
cursor.executemany(sql, records)
connection.commit()
print(f"成功插入 {cursor.rowcount} 条数据")
except Error as e:
print(f"错误: {e}")
connection.rollback()
finally:
if connection.is_connected():
cursor.close()
connection.close()
批量新增到JSON文件
import json
def batch_add_to_json(new_data, json_file='data.json'):
# 读取现有数据
try:
with open(json_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
existing_data = []
# 添加新数据
existing_data.extend(new_data)
# 写回文件
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, ensure_ascii=False, indent=2)
print(f"成功添加 {len(new_data)} 条数据,总计 {len(existing_data)} 条")
# 使用示例
new_students = [
{'id': 4, 'name': '小明', 'score': 95},
{'id': 5, 'name': '小红', 'score': 88},
{'id': 6, 'name': '小刚', 'score': 92}
]
batch_add_to_json(new_students)
批量生成测试数据
import random
import string
import pandas as pd
def generate_test_data(count=100):
"""生成测试数据"""
data = []
for i in range(count):
record = {
'ID': i + 1,
'姓名': ''.join(random.choices('张三丰李四王五赵六', k=2)),
'年龄': random.randint(18, 60),
'工资': random.randint(5000, 20000),
'部门': random.choice(['技术部', '市场部', '行政部', '财务部']),
'入职日期': f'2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}'
}
data.append(record)
return data
# 生成并保存
test_data = generate_test_data(1000)
df = pd.DataFrame(test_data)
df.to_excel('测试数据.xlsx', index=False)
print("测试数据生成完成!")
批量新增到CSV文件
import csv
def batch_append_to_csv(data_list, filename='data.csv'):
"""批量追加数据到CSV文件"""
# 检查文件是否存在
file_exists = False
try:
with open(filename, 'r') as f:
file_exists = True
except FileNotFoundError:
pass
# 写入数据
with open(filename, 'a', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=data_list[0].keys())
# 如果文件不存在,写入表头
if not file_exists:
writer.writeheader()
# 批量写入数据
writer.writerows(data_list)
print(f"成功追加 {len(data_list)} 条数据到 {filename}")
# 使用示例
new_records = [
{'name': 'Tom', 'age': 25, 'city': 'Beijing'},
{'name': 'Jerry', 'age': 28, 'city': 'Shanghai'},
{'name': 'Alice', 'age': 23, 'city': 'Guangzhou'}
]
batch_append_to_csv(new_records)
性能优化的批量插入
import time
import sqlite3
def batch_insert_optimized(data_list, batch_size=1000):
"""优化批量插入性能"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
age INTEGER
)
''')
start_time = time.time()
# 分批插入
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i+batch_size]
# 使用事务
conn.execute('BEGIN TRANSACTION')
for data in batch:
cursor.execute(
"INSERT INTO users (name, age) VALUES (?, ?)",
(data['name'], data['age'])
)
conn.commit()
end_time = time.time()
print(f"插入 {len(data_list)} 条数据耗时: {end_time - start_time:.2f} 秒")
conn.close()
# 生成大量测试数据
test_data = [{'name': f'User{i}', 'age': i % 50 + 18} for i in range(10000)]
batch_insert_optimized(test_data)
- 选择合适的工具:根据数据存储位置选择对应的方法
- 使用事务:数据库操作使用事务提高性能
- 分批处理:处理大量数据时分批进行
- 异常处理:添加适当的错误处理
- 性能优化:使用
executemany()等批量操作方法
你需要针对哪种具体场景的批量新增?我可以提供更针对性的代码示例。