本文目录导读:

我来介绍几种Java实现批量数据处理的常用方案,包含具体案例。
使用JDBC批量操作
批量插入示例
/**
 * Example of inserting many rows with JDBC statement batching.
 */
public class BatchInsertExample {

    /**
     * Inserts the given users into the {@code users} table, sending the rows to the
     * database in batches and committing after each batch.
     *
     * <p>Because every full batch is committed on its own, a failure only rolls back
     * the rows added since the last successful commit; batches committed earlier stay
     * in the table.
     *
     * <p>SQL errors are not propagated to the caller: the exception (with any rollback
     * failure attached as a suppressed exception) is printed to standard error.
     *
     * @param users the users to insert; an empty list results in no statements being executed
     */
    public void batchInsert(List<User> users) {
        String sql = "INSERT INTO users (name, email, age) VALUES (?, ?, ?)";
        // NOTE(review): getConnection() is not defined in this class; it is assumed to be
        // provided elsewhere and to return an open JDBC connection.
        try (Connection conn = getConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {
            // Disable auto-commit so each batch is committed explicitly.
            conn.setAutoCommit(false);
            try {
                int batchSize = 1000; // rows per executeBatch() call
                int count = 0;
                for (User user : users) {
                    pstmt.setString(1, user.getName());
                    pstmt.setString(2, user.getEmail());
                    pstmt.setInt(3, user.getAge());
                    pstmt.addBatch();
                    count++;
                    // Flush and commit once a full batch has accumulated.
                    if (count % batchSize == 0) {
                        pstmt.executeBatch();
                        conn.commit();
                        pstmt.clearBatch();
                    }
                }
                // Flush the trailing partial batch, if any.
                if (count % batchSize != 0) {
                    pstmt.executeBatch();
                    conn.commit();
                }
            } catch (SQLException e) {
                // Roll back here, while the connection is still open and in scope.
                // Resources declared in the try-with-resources header are neither
                // visible nor open inside that statement's own catch clause.
                try {
                    conn.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
使用Stream API处理大数据集合
分批处理示例
import java.util.*;
import java.util.stream.*;
/**
 * Splits a list into consecutive fixed-size batches with the Stream API and hands
 * each batch to a callback.
 */
public class StreamBatchProcessor {

    /**
     * Invokes {@code processor} once per consecutive batch of {@code items}, in order.
     * Every batch holds {@code batchSize} elements except possibly the last one.
     * Batches are {@link List#subList(int, int) subList} views of {@code items}, not copies.
     *
     * @param items     the elements to process; an empty list results in no callbacks
     * @param batchSize maximum number of elements per batch; must be positive
     * @param processor callback receiving each batch
     * @param <T>       element type
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws NullPointerException     if {@code items} or {@code processor} is null
     */
    // The functional interface is spelled out in full because the import lines shown
    // with this snippet (java.util.*, java.util.stream.*) do not cover java.util.function.
    public <T> void processInBatches(List<T> items, int batchSize,
            java.util.function.Consumer<? super List<T>> processor) {
        Objects.requireNonNull(items, "items");
        Objects.requireNonNull(processor, "processor");
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        int totalSize = items.size();
        // Ceiling division without computing totalSize + batchSize - 1, which overflows
        // for large batch sizes and would silently yield zero batches.
        int batchCount = totalSize / batchSize + (totalSize % batchSize == 0 ? 0 : 1);
        IntStream.range(0, batchCount)
                .mapToObj(i -> {
                    int from = i * batchSize; // always < totalSize, so no overflow
                    int to = from + Math.min(batchSize, totalSize - from);
                    return items.subList(from, to);
                })
                .forEach(processor);
    }

    // Usage example
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 10001)
                .boxed()
                .collect(Collectors.toList());
        StreamBatchProcessor processor = new StreamBatchProcessor();
        // Process 500 elements per batch
        processor.processInBatches(numbers, 500, batch -> {
            System.out.println("Processing batch of " + batch.size() + " items");
            // Handle every element of this batch
            batch.forEach(num -> {
                // Per-element processing logic goes here
                System.out.println("Processing: " + num);
            });
        });
    }
}
使用ExecutorService并行处理
import java.util.concurrent.*;
import java.util.*;
/**
 * Splits a dataset into fixed-size batches and processes the batches concurrently
 * on a fixed-size thread pool.
 *
 * <p>Call {@link #shutdown()} when the processor is no longer needed; the pool's
 * threads otherwise keep the JVM alive.
 */
public class ParallelBatchProcessor {
    private final ExecutorService executor;
    private final int batchSize;

    /**
     * Creates a processor backed by a fixed thread pool.
     *
     * @param threadCount number of worker threads in the pool
     * @param batchSize   maximum number of elements per batch; must be positive
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public ParallelBatchProcessor(int threadCount, int batchSize) {
        // Validate before creating the pool so a bad argument cannot leak threads.
        // A batch size of zero would otherwise make processDataset loop forever.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.batchSize = batchSize;
        this.executor = Executors.newFixedThreadPool(threadCount);
    }

    /**
     * Submits one task per batch of {@code dataset} and blocks until all tasks finish.
     * Batches are {@code subList} views of {@code dataset}, so the dataset must not be
     * structurally modified while this method runs.
     *
     * <p>A batch whose task throws, or whose processor does not return {@code true},
     * is reported on standard error; processing of the other batches continues.
     *
     * @param dataset   the elements to process
     * @param processor callback applied to each batch; returns {@code true} on success
     * @param <T>       element type
     * @throws InterruptedException if the calling thread is interrupted while waiting;
     *                              tasks that have not completed are cancelled first
     */
    // The functional interface is spelled out in full because the import lines shown
    // with this snippet (java.util.concurrent.*, java.util.*) do not cover
    // java.util.function.
    public <T> void processDataset(List<T> dataset,
            java.util.function.Function<? super List<T>, Boolean> processor)
            throws InterruptedException {
        List<Future<Boolean>> futures = new ArrayList<>();
        int total = dataset.size();
        // Split the dataset and submit each slice to the pool.
        int start = 0;
        while (start < total) {
            int end = start + Math.min(batchSize, total - start);
            List<T> batch = dataset.subList(start, end);
            Callable<Boolean> task = () -> processor.apply(batch);
            futures.add(executor.submit(task));
            start = end;
        }
        // Wait for every task and inspect its outcome.
        try {
            for (Future<Boolean> future : futures) {
                try {
                    Boolean succeeded = future.get();
                    if (!Boolean.TRUE.equals(succeeded)) {
                        System.err.println("Batch processing failed: processor reported failure");
                    }
                } catch (ExecutionException e) {
                    System.err.println("Batch processing failed: " + e.getMessage());
                }
            }
        } catch (InterruptedException e) {
            // Do not leave orphaned work running once the caller has given up.
            for (Future<Boolean> future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    /**
     * Stops accepting new tasks and waits up to 60 seconds for running tasks to finish,
     * forcing termination after the timeout or if the calling thread is interrupted.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    // Usage example
    public static void main(String[] args) throws InterruptedException {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            data.add(i);
        }
        ParallelBatchProcessor processor = new ParallelBatchProcessor(4, 500);
        try {
            processor.processDataset(data, batch -> {
                System.out.println(Thread.currentThread().getName() +
                        " processing " + batch.size() + " items");
                for (Integer item : batch) {
                    // Simulated processing logic
                    System.out.println("Processing: " + item);
                }
                return true;
            });
        } finally {
            // Always release the pool, even if processing was interrupted.
            processor.shutdown();
        }
    }
}
使用Spring Batch批量处理框架
配置类(Java Config)
/**
 * Spring Batch configuration that declares a reader, a processor, a writer, a single
 * step ("step1") and a job ("importUserJob") as beans.
 *
 * <p>NOTE(review): {@code dataSource}, {@code userRepository} and {@code calculateAge}
 * are referenced below but are not declared anywhere in this class; they must be
 * supplied (for example as injected fields and a helper method) before this compiles.
 *
 * <p>NOTE(review): {@code JobBuilderFactory} / {@code StepBuilderFactory} are believed
 * to be deprecated in Spring Batch 5 - confirm against the version in use.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
 * Builds a JDBC cursor reader named "userReader" that runs the SELECT below and maps
 * each row with {@code UserRowMapper}.
 *
 * <p>NOTE(review): the query selects only id, name and email, while the processor
 * calls {@code user.getBirthDate()}; whether the row mapper populates a birth date
 * cannot be determined from this class - confirm.
 */
@Bean
public ItemReader<User> reader() {
return new JdbcCursorItemReaderBuilder<User>()
.dataSource(dataSource)
.name("userReader")
.sql("SELECT id, name, email FROM users")
.rowMapper(new UserRowMapper())
.build();
}
/**
 * Converts a {@code User} into a {@code ProcessedUser} built from the upper-cased
 * name, the email, and the result of {@code calculateAge} on the birth date.
 */
@Bean
public ItemProcessor<User, ProcessedUser> processor() {
return user -> {
// Processing logic
return new ProcessedUser(user.getName().toUpperCase(),
user.getEmail(),
calculateAge(user.getBirthDate()));
};
}
/**
 * Builds a repository-backed writer configured to call the method named "save" on
 * {@code userRepository}.
 */
@Bean
public ItemWriter<ProcessedUser> writer() {
return new RepositoryItemWriterBuilder<ProcessedUser>()
.repository(userRepository)
.methodName("save")
.build();
}
/**
 * Defines the step "step1", wiring the reader, processor and writer above with a
 * chunk size of 1000.
 */
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<User, ProcessedUser>chunk(1000) // 1000 items per chunk
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
/**
 * Defines the job "importUserJob", which uses a {@code RunIdIncrementer} and a flow
 * consisting of {@link #step1()}.
 */
@Bean
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.flow(step1())
.end()
.build();
}
}
文件批量处理示例
/**
 * Reads a text file line by line and processes the lines in fixed-size batches, so
 * that at most one batch of lines is held in memory at a time.
 */
public class FileBatchProcessor {

    /**
     * Reads {@code filePath} line by line and passes the lines on in batches of
     * {@code batchSize}; the final batch may be smaller.
     *
     * <p>The file is decoded by {@link FileReader}, i.e. with the JVM's default charset.
     *
     * @param filePath  path of the file to read
     * @param batchSize number of lines per batch; must be positive
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws IOException              if the file cannot be opened or read
     */
    public void processLargeFile(String filePath, int batchSize)
            throws IOException {
        // Reject bad sizes before touching the file: zero would otherwise fail with an
        // ArithmeticException only after reading had already started.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        // try-with-resources closes the reader on every exit path, including failures
        // that occur before the first line is read.
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            List<String> batch = new ArrayList<>(batchSize);
            String line;
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() == batchSize) {
                    processBatch(batch);
                    batch.clear();
                }
            }
            // Process the trailing partial batch.
            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        }
    }

    /**
     * Processes one batch of lines; this example prints each line to standard output.
     * A parallel stream is used, so the order of lines within a batch is not guaranteed.
     */
    private void processBatch(List<String> batch) {
        batch.parallelStream().forEach(line -> {
            // Per-line processing logic goes here
            System.out.println("Processing: " + line);
        });
    }
}
完整的分批工具类
/**
 * Helpers for splitting a list into batches and processing the batches one by one
 * while collecting success and failure statistics.
 */
public class BatchUtils {

    /**
     * Splits {@code list} into consecutive batches of at most {@code batchSize} elements.
     * Each batch is an independent copy, so later changes to {@code list} do not affect it.
     *
     * @param list      the elements to split
     * @param batchSize maximum number of elements per batch; must be positive
     * @param <T>       element type
     * @return the batches in order; empty if {@code list} is empty
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static <T> List<List<T>> partition(List<T> list, int batchSize) {
        // A batch size of zero would never advance the loop below and would keep
        // appending empty batches until memory runs out.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<List<T>> batches = new ArrayList<>();
        int totalSize = list.size();
        int start = 0;
        while (start < totalSize) {
            int end = start + Math.min(batchSize, totalSize - start);
            batches.add(new ArrayList<>(list.subList(start, end)));
            start = end;
        }
        return batches;
    }

    /**
     * Applies {@code processor} to each batch of {@code items} in order and gathers
     * the outcome. A batch whose processor call throws is recorded as failed and
     * reported on standard error; the remaining batches are still processed.
     *
     * @param items     the elements to process
     * @param batchSize maximum number of elements per batch; must be positive
     * @param processor maps one batch of inputs to its list of results
     * @param <T>       input element type
     * @param <R>       result element type
     * @return counters, collected results, and the batches that failed
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static <T, R> BatchResult<R> processBatch(
            List<T> items,
            int batchSize,
            Function<List<T>, List<R>> processor) {
        BatchResult<R> result = new BatchResult<>();
        List<List<T>> batches = partition(items, batchSize);
        for (List<T> batch : batches) {
            try {
                List<R> processedItems = processor.apply(batch);
                result.addSuccessItems(processedItems);
                result.incrementProcessed(batch.size());
            } catch (Exception e) {
                result.addFailedBatch(batch);
                result.incrementFailed(batch.size());
                System.err.println("Batch processing failed: " + e.getMessage());
            }
        }
        return result;
    }

    /**
     * Accumulated outcome of a {@link #processBatch} run.
     *
     * @param <R> result element type
     */
    public static class BatchResult<R> {
        private int totalProcessed = 0;
        private int totalFailed = 0;
        private List<R> successItems = new ArrayList<>();
        private List<List<?>> failedBatches = new ArrayList<>();

        /** Appends the results produced by one successful batch. */
        public void addSuccessItems(List<R> items) {
            successItems.addAll(items);
        }

        /** Records a batch whose processing threw an exception. */
        public void addFailedBatch(List<?> batch) {
            failedBatches.add(batch);
        }

        /** Adds {@code count} to the number of successfully processed input elements. */
        public void incrementProcessed(int count) {
            totalProcessed += count;
        }

        /** Adds {@code count} to the number of input elements in failed batches. */
        public void incrementFailed(int count) {
            totalFailed += count;
        }

        /** Returns the number of input elements in batches that succeeded. */
        public int getTotalProcessed() {
            return totalProcessed;
        }

        /** Returns the number of input elements in batches that failed. */
        public int getTotalFailed() {
            return totalFailed;
        }

        /** Returns a copy of all results from successful batches, in processing order. */
        public List<R> getSuccessItems() {
            return new ArrayList<>(successItems);
        }

        /** Returns a copy of the list of batches that failed, in processing order. */
        public List<List<?>> getFailedBatches() {
            return new ArrayList<>(failedBatches);
        }
    }
}
// 使用示例
/**
 * Demonstrates {@code BatchUtils.processBatch} by converting the numbers 1..1000 to
 * strings in batches of 100 and printing the resulting counters.
 */
public class BatchProcessingDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(1, 1001)
                .boxed()
                .collect(Collectors.toList());
        BatchUtils.BatchResult<String> result =
                BatchUtils.processBatch(numbers, 100, batch -> {
                    List<String> processed = new ArrayList<>();
                    for (Integer num : batch) {
                        processed.add("Number: " + num);
                        // Simulate processing time. Thread.sleep throws the checked
                        // InterruptedException, which a Function lambda may not
                        // propagate, so it is handled here.
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt flag and abort this batch.
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException("Interrupted while processing batch", e);
                        }
                    }
                    return processed;
                });
        System.out.println("Processed: " + result.getTotalProcessed());
        System.out.println("Failed: " + result.getTotalFailed());
    }
}
性能优化建议
- 选择合适的批次大小:通常1000-5000条之间性能较好
- 使用预编译SQL:提高数据库操作性能
- 关闭自动提交:批量操作时手动控制事务
- 使用索引:确保查询和更新操作使用合适的索引
- 内存控制:注意JVM堆内存,避免OOM
- 异常处理:实现完善的错误处理和回滚机制
这些方案可以根据实际需求选择使用,对于特别大的数据集,建议使用Spring Batch等专业框架。