Java Stream API 实战:从入门到性能调优

Java Stream API 实战:从入门到性能调优

一、前言

Java 8 引入的 Stream API 极大地简化了集合操作,让代码更简洁、可读性更强,同时也为并行计算打开了方便之门。这篇文章将带你:

  • 理解 Stream 的核心机制
  • 掌握常用操作符
  • 分析性能陷阱
  • 并行流优化实战

二、什么是 Stream?

Stream 是一种声明性的数据处理方式,专注于数据的流式处理,支持链式调用和函数式编程风格。

其设计灵感类似 Unix shell 中的管道(pipe):

cat data.txt | grep keyword | sort | uniq

Java 中的写法也类似:

list.stream()
    .filter(x -> x.contains("keyword"))
    .sorted()
    .distinct()
    .collect(Collectors.toList());

三、Stream 的工作模型

Stream 分为三类操作:

类型说明是否终止操作
创建stream(), of(), generate()
中间操作map(), filter(), sorted()
终端操作collect(), forEach(), count()

流处理管道图示:

数据源 ──→ 中间操作链 ──→ 终端操作(执行)
           filter
           map
           sorted

四、常用操作实战

1. filter:过滤元素

List<String> result = list.stream()
    .filter(s -> s.startsWith("A"))
    .collect(Collectors.toList());

2. map:元素变换

List<Integer> lengths = list.stream()
    .map(String::length)
    .collect(Collectors.toList());

3. flatMap:扁平化嵌套结构

List<String> words = sentences.stream()
    .flatMap(line -> Arrays.stream(line.split(" ")))
    .collect(Collectors.toList());

4. sorted:排序

list.stream()
    .sorted(Comparator.reverseOrder())
    .forEach(System.out::println);

5. collect:终结流并生成集合

Map<String, Long> group = list.stream()
    .collect(Collectors.groupingBy(
        Function.identity(),
        Collectors.counting()));

五、Stream 是惰性求值的

示例:

Stream<String> stream = list.stream()
    .filter(s -> {
        System.out.println("Filtering: " + s);
        return s.startsWith("A");
    });

没有终止操作时,不会打印任何内容!

只有调用:

stream.collect(Collectors.toList());

才会真正触发执行。


六、串行流 vs 并行流

Stream 默认是串行执行:

list.stream().forEach(...);

使用 parallelStream() 则并行执行:

list.parallelStream().forEach(...);

性能对比图示(建议配图):

元素个数stream() 执行时间parallelStream() 执行时间
10 万120 ms70 ms
100 万1100 ms300 ms

⚠️ 小数据量不推荐并行流,反而可能更慢!


七、Stream 常见陷阱

1. 修改外部变量(副作用)

List<String> result = new ArrayList<>();
list.stream().forEach(result::add); // 不推荐!

应使用:

List<String> result = list.stream().collect(Collectors.toList());

2. 并行流中共享变量导致线程安全问题

AtomicInteger counter = new AtomicInteger();
list.parallelStream().forEach(s -> counter.incrementAndGet());

八、实战示例:统计词频 + 并行优化

Map<String, Long> wordCount = lines.parallelStream()
    .flatMap(line -> Arrays.stream(line.split("\\s+")))
    .map(String::toLowerCase)
    .collect(Collectors.groupingByConcurrent(
        Function.identity(),
        Collectors.counting()));
  • groupingByConcurrent:并发安全分组统计
  • 并行流:提升大文本处理性能

九、调优建议

调优点建议说明
是否并行小数据不并行,大数据才划算
CPU 核心数parallelStream 会默认使用 ForkJoinPool.commonPool()
使用并发收集器groupingByConcurrent, toConcurrentMap

十、总结

  • Stream 是函数式处理集合的强大工具
  • 注意惰性求值、不可变操作
  • 并行流并非越多越好,需结合场景调优

Comments

No comments yet. Why don’t you start the discussion?

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注