如何用Java案例实现数据聚合?

wen java案例 3

本文目录导读:

如何用Java案例实现数据聚合?

  1. 使用Stream API进行聚合
  2. 使用自定义聚合器
  3. 大数据量下的高效聚合
  4. 使用Apache Commons Collections进行聚合
  5. 实时数据聚合(模拟Kafka Streams风格)
  6. 性能优化建议

我来给你讲解Java中实现数据聚合的几种常见方式,并提供完整的代码案例。

使用Stream API进行聚合

这是Java 8+最常用的数据聚合方式。

import java.util.*;
import java.util.stream.*;
// 员工类
/**
 * Immutable employee data used by the stream aggregation examples.
 */
class Employee {
    private final String name;
    private final String department;
    private final double salary;
    private final int age;

    public Employee(String name, String department, double salary, int age) {
        this.name = name;
        this.department = department;
        this.salary = salary;
        this.age = age;
    }

    // getters
    public String getName() { return name; }
    public String getDepartment() { return department; }
    public double getSalary() { return salary; }
    public int getAge() { return age; }

    /**
     * Returns a human-readable form, e.g. {@code 张三(技术部, 15000.0, 28)}.
     * Without this override, printing a collection of employees shows only
     * {@code Employee@<hash>} identity strings.
     */
    @Override
    public String toString() {
        return name + "(" + department + ", " + salary + ", " + age + ")";
    }
}
/**
 * Demonstrates common {@link Collectors} aggregations over a small in-memory
 * employee list: average, count and sum per department, a two-level grouping
 * (department, then age band), and overall salary statistics.
 */
public class StreamAggregationDemo {

    /**
     * Classifies an employee into an age band: under 30, 30 to 39, or 40 and over.
     */
    private static String ageBand(Employee emp) {
        final int age = emp.getAge();
        if (age >= 40) {
            return "老年";
        }
        return (age >= 30) ? "中年" : "青年";
    }

    public static void main(String[] args) {
        // Sample data set
        final List<Employee> staff = Arrays.asList(
            new Employee("张三", "技术部", 15000, 28),
            new Employee("李四", "技术部", 18000, 32),
            new Employee("王五", "市场部", 12000, 25),
            new Employee("赵六", "市场部", 16000, 30),
            new Employee("钱七", "技术部", 20000, 35),
            new Employee("孙八", "人事部", 10000, 27)
        );

        // 1. Average salary per department
        final Map<String, Double> avgByDept = staff.stream().collect(
            Collectors.groupingBy(Employee::getDepartment,
                                  Collectors.averagingDouble(Employee::getSalary)));
        System.out.println("各部门平均薪资: " + avgByDept);

        // 2. Head count per department
        final Map<String, Long> headCount = staff.stream().collect(
            Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
        System.out.println("各部门人数: " + headCount);

        // 3. Salary total per department
        final Map<String, Double> payroll = staff.stream().collect(
            Collectors.groupingBy(Employee::getDepartment,
                                  Collectors.summingDouble(Employee::getSalary)));
        System.out.println("各部门薪资总额: " + payroll);

        // 4. Two-level grouping: department first, then age band
        final Map<String, Map<String, List<Employee>>> byDeptThenAge = staff.stream().collect(
            Collectors.groupingBy(Employee::getDepartment,
                                  Collectors.groupingBy(StreamAggregationDemo::ageBand)));
        System.out.println("多级分组结果: " + byDeptThenAge);

        // 5. Overall salary statistics (sum, average, max, min) in one pass
        final DoubleSummaryStatistics salaryStats = staff.stream()
            .mapToDouble(Employee::getSalary)
            .summaryStatistics();
        final String summaryLine = "薪资统计: 总计=" + salaryStats.getSum()
            + ", 平均=" + salaryStats.getAverage()
            + ", 最大=" + salaryStats.getMax()
            + ", 最小=" + salaryStats.getMin();
        System.out.println(summaryLine);
    }
}

使用自定义聚合器

当需要更复杂的聚合逻辑时,可以自定义聚合器。

import java.util.*;
import java.util.stream.*;
// 销售订单类
/**
 * One sales order line: the ordered product, number of units, unit price and
 * product category.
 */
class Order {
    private final String orderId;
    private final String product;
    private final String category;
    private final int quantity;
    private final double unitPrice;

    public Order(String orderId, String product, int quantity, double unitPrice, String category) {
        this.orderId = orderId;
        this.product = product;
        this.quantity = quantity;
        this.unitPrice = unitPrice;
        this.category = category;
    }

    public String getOrderId() {
        return orderId;
    }

    public String getProduct() {
        return product;
    }

    public int getQuantity() {
        return quantity;
    }

    public double getUnitPrice() {
        return unitPrice;
    }

    public String getCategory() {
        return category;
    }

    /** Returns the line total: quantity multiplied by unit price. */
    public double getTotalPrice() {
        final double units = quantity;
        return units * unitPrice;
    }
}
/**
 * Demonstrates a hand-written {@link Collector}. Orders are grouped by
 * category, and each group is folded in a single pass into a
 * {@link SalesSummary} holding total sales, total quantity and order count.
 */
public class CustomAggregationDemo {
    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("001", "笔记本电脑", 2, 5000, "电子产品"),
            new Order("002", "手机", 5, 3000, "电子产品"),
            new Order("003", "书籍", 10, 50, "学习用品"),
            new Order("004", "办公桌", 3, 800, "家具"),
            new Order("005", "耳机", 8, 200, "电子产品"),
            new Order("006", "笔记本", 20, 10, "学习用品")
        );
        // Custom aggregation per category: total sales amount, total quantity
        // and number of orders.
        Map<String, SalesSummary> summaryByCategory = orders.stream()
            .collect(Collectors.groupingBy(Order::getCategory, toSalesSummary()));
        System.out.println("按类别销售汇总:");
        summaryByCategory.forEach((category, summary) ->
            System.out.printf("%s: 总销售=%.2f, 总数量=%.0f, 订单数=%d%n",
                category, summary.getTotalSales(), summary.getTotalQuantity(), summary.getOrderCount()));
    }

    /** Mutable per-group accumulation state used by {@link #toSalesSummary()}. */
    private static final class SalesAccumulator {
        double totalSales;
        // long rather than int: summing many int quantities can overflow an int.
        long totalQuantity;
        int orderCount;

        /** Folds one order into this accumulator. */
        void add(Order order) {
            totalSales += order.getTotalPrice();
            totalQuantity += order.getQuantity();
            orderCount++;
        }

        /** Merges another partial result into this one (used by parallel streams). */
        SalesAccumulator merge(SalesAccumulator other) {
            totalSales += other.totalSales;
            totalQuantity += other.totalQuantity;
            orderCount += other.orderCount;
            return this;
        }

        /** Converts the accumulated figures into the immutable result type. */
        SalesSummary finish() {
            return new SalesSummary(totalSales, totalQuantity, orderCount);
        }
    }

    /**
     * Returns a collector that folds orders into a {@link SalesSummary} in a
     * single pass, without first materializing each group as a list.
     */
    private static Collector<Order, ?, SalesSummary> toSalesSummary() {
        return Collector.of(
            SalesAccumulator::new,
            SalesAccumulator::add,
            SalesAccumulator::merge,
            SalesAccumulator::finish);
    }
}
// 销售汇总类
/**
 * Immutable holder for aggregated sales figures: the total sales amount, the
 * total quantity and the number of orders that contributed to them.
 */
class SalesSummary {
    private final int orderCount;
    private final double totalQuantity;
    private final double totalSales;

    public SalesSummary(double totalSales, double totalQuantity, int orderCount) {
        this.orderCount = orderCount;
        this.totalQuantity = totalQuantity;
        this.totalSales = totalSales;
    }

    public double getTotalSales() {
        return totalSales;
    }

    public double getTotalQuantity() {
        return totalQuantity;
    }

    public int getOrderCount() {
        return orderCount;
    }
}

大数据量下的高效聚合

当处理大量数据时,可以考虑使用并行流等优化手段,但是否真正提速应通过基准测试确认。

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
/**
 * Demonstrates aggregation techniques suited to larger data sets. It runs a
 * concurrent grouping over one million integers on a parallel stream, and uses
 * {@code Collectors.teeing} (Java 12+) to compute two aggregates in one pass.
 */
public class EfficientAggregationDemo {
    /**
     * Generates {@code size} pseudo-random integers in the range [1, 1000).
     *
     * @param size number of values to generate
     * @return a list containing {@code size} boxed integers
     * @throws IllegalArgumentException if {@code size} is negative
     */
    static List<Integer> generateLargeData(int size) {
        Random random = new Random();
        return random.ints(size, 1, 1000)
                    .boxed()
                    .collect(Collectors.toList());
    }

    /**
     * Buckets a value by range: below 300 is "low", below 600 is "medium",
     * anything else is "high".
     */
    static String bucketOf(int num) {
        if (num < 300) {
            return "low";
        } else if (num < 600) {
            return "medium";
        }
        return "high";
    }

    public static void main(String[] args) {
        // Generate one million values
        List<Integer> largeData = generateLargeData(1_000_000);

        // 1. Parallel aggregation: count values per range.
        // groupingByConcurrent lets all worker threads update one shared
        // ConcurrentMap instead of building per-thread HashMaps that must be
        // merged afterwards.
        // System.nanoTime() is monotonic and meant for elapsed time, unlike the
        // wall-clock System.currentTimeMillis(). A single cold run includes JIT
        // warm-up, so the figure is only a rough indication, not a benchmark.
        long startNanos = System.nanoTime();
        Map<String, Long> parallelResult = largeData.parallelStream()
            .collect(Collectors.groupingByConcurrent(
                EfficientAggregationDemo::bucketOf,
                Collectors.counting()
            ));
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println("并行流聚合耗时: " + elapsedMillis + "ms");
        System.out.println("聚合结果: " + parallelResult);

        // 2. Collectors.teeing (Java 12+): sum and average in a single pass
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        var result = numbers.stream()
            .collect(Collectors.teeing(
                Collectors.summingInt(i -> i),
                Collectors.averagingInt(i -> i),
                (sum, avg) -> "总和: " + sum + ", 平均数: " + avg
            ));
        System.out.println("\nTeeing聚合结果: " + result);
    }
}

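使用Apache Commons Collections进行聚合

注意:下面的示例虽然导入了 Commons Collections 的 CollectionUtils 和 Transformer,但实际只用到了 JDK 自带的 Map 操作(如 computeIfAbsent),导入的 Commons 类并未被使用。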

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Transformer;
import java.util.*;
/**
 * Groups products by sales-volume range and prints, for each range, the number
 * of products and their total sales.
 *
 * <p>Note: despite the class name, this example uses only JDK map operations.
 * No Apache Commons Collections API is called here.
 */
public class CommonsAggregationDemo {
    public static void main(String[] args) {
        Map<String, Integer> salesData = new HashMap<>();
        salesData.put("产品A", 100);
        salesData.put("产品B", 200);
        salesData.put("产品C", 150);
        salesData.put("产品D", 300);
        salesData.put("产品E", 80);
        // Group products by sales-volume range. A TreeMap keeps the ranges in a
        // stable ascending order; the labels "0-99" < "100-199" < "200-299" < "300+"
        // also sort correctly as strings. A HashMap would print them in arbitrary order.
        Map<String, List<Map.Entry<String, Integer>>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : salesData.entrySet()) {
            grouped.computeIfAbsent(getRange(entry.getValue()), k -> new ArrayList<>())
                   .add(entry);
        }
        System.out.println("销售量分组统计:");
        grouped.forEach((range, items) -> {
            int total = items.stream()
                .mapToInt(Map.Entry::getValue)
                .sum();
            System.out.printf("%s: 产品数=%d, 总销量=%d%n",
                range, items.size(), total);
        });
    }

    /**
     * Maps a sales volume to its range label. Values below 100, including
     * negative values, map to "0-99".
     */
    private static String getRange(int value) {
        if (value < 100) return "0-99";
        else if (value < 200) return "100-199";
        else if (value < 300) return "200-299";
        else return "300+";
    }
}

实时数据聚合(模拟Kafka Streams风格)

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
// 实时数据聚合(模拟流式处理)
public class RealTimeAggregationDemo {
    /** Number of events aggregated per tumbling (non-overlapping) window. */
    private static final int WINDOW_SIZE = 20;
    /** Total number of simulated events. */
    private static final int TOTAL_EVENTS = 100;
    /** Simulated processing delay per event, in milliseconds. */
    private static final long EVENT_DELAY_MS = 50;

    /** A single timestamped event with a type and a numeric value. */
    static class Event {
        private final long timestamp;
        private final String type;
        private final double value;
        public Event(long timestamp, String type, double value) {
            this.timestamp = timestamp;
            this.type = type;
            this.value = value;
        }
        public long getTimestamp() { return timestamp; }
        public String getType() { return type; }
        public double getValue() { return value; }
    }

    /**
     * Simulated event source. Each event has a random type of "click",
     * "purchase" or "view", and a random value in [0, 1000).
     */
    static class DataStream {
        private final Random random = new Random();
        private final String[] types = {"click", "purchase", "view"};
        public Event nextEvent() {
            return new Event(
                System.currentTimeMillis(),
                types[random.nextInt(types.length)],
                random.nextDouble() * 1000
            );
        }
    }

    /**
     * Folds one event into the per-type statistics of the current window.
     * Each value array is laid out as [count, sum, max]. Max starts at negative
     * infinity, so negative values are handled correctly.
     */
    static void accumulate(Map<String, double[]> window, Event event) {
        double[] stats = window.computeIfAbsent(event.getType(),
            k -> new double[]{0, 0, Double.NEGATIVE_INFINITY});
        stats[0]++;                                       // count
        stats[1] += event.getValue();                     // sum
        stats[2] = Math.max(stats[2], event.getValue());  // max
    }

    /** Prints the statistics of one window. */
    private static void report(int windowNumber, Map<String, double[]> window) {
        System.out.println("\n第" + windowNumber + "次聚合结果:");
        window.forEach((type, stats) -> {
            double avg = stats[0] > 0 ? stats[1] / stats[0] : 0;
            System.out.printf("%s: 事件数=%.0f, 总和=%.2f, 平均=%.2f, 最大=%.2f%n",
                type, stats[0], stats[1], avg, stats[2]);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        DataStream stream = new DataStream();
        // Tumbling count window. Statistics accumulate for WINDOW_SIZE events,
        // are reported, and are then cleared, so each report covers only its own
        // window (roughly WINDOW_SIZE * EVENT_DELAY_MS = 1 s of events).
        // The TreeMap gives a stable event-type order in the output.
        Map<String, double[]> window = new TreeMap<>();
        System.out.println("开始实时数据聚合(模拟5秒数据):");
        for (int i = 0; i < TOTAL_EVENTS; i++) {
            accumulate(window, stream.nextEvent());
            if ((i + 1) % WINDOW_SIZE == 0) {
                report((i + 1) / WINDOW_SIZE, window);
                window.clear();
            }
            Thread.sleep(EVENT_DELAY_MS); // simulate per-event processing time
        }
    }
}

性能优化建议

  1. 选择合适的聚合方式

    // 小数据量使用串行流
    // 数据量大且每个元素的计算开销较高时,可考虑并行流;
    // 并不存在固定阈值(如 10000 条),应先做基准测试再决定
  2. 避免不必要的装箱拆箱

    // 推荐使用原始类型流
    IntStream.range(0, 1000).sum();
    // 而不是
    Stream.iterate(0, i -> i + 1).limit(1000).mapToInt(i -> i).sum();
  3. 使用合适的数据结构

    // 按需选择
    Map<String, List<Item>> grouped = data.stream()
        .collect(Collectors.groupingBy(
            Item::getCategory,
            () -> new LinkedHashMap<>(), // 保持插入顺序
            Collectors.toList()
        ));

这些案例覆盖了Java中数据聚合的常见场景,根据你的具体需求,可以选择合适的方式来实现。

抱歉,评论功能暂时关闭!