Streams(流)

java.util.Stream 表示能应用在一组元素上一次执行的操作序列。Stream 操作分为中间操作或者最终操作两种,最终操作返回一特定类型的计算结果,而中间操作返回Stream本身,这样你就可以将多个操作依次串起来。Stream 的创建需要指定一个数据源,比如 java.util.Collection 的子类,List 或者 Set, Map 不支持。Stream 的操作可以串行执行或者并行执行。

首先看看Stream是怎么用,首先创建实例代码需要用到的数据List:

1
2
3
4
5
6
7
8
9
List<String> stringList = new ArrayList<>();
stringList.add("ddd2");
stringList.add("aaa2");
stringList.add("bbb1");
stringList.add("aaa1");
stringList.add("bbb3");
stringList.add("ccc");
stringList.add("bbb2");
stringList.add("ddd1");

Java 8扩展了集合类,可以通过 Collection.stream() 或者 Collection.parallelStream() 来创建一个Stream。下面解释常用的Stream操作:

Filter(过滤)

过滤通过一个predicate接口来过滤并只保留符合条件的元素,该操作属于中间操作,所以我们可以在过滤后的结果来应用其他Stream操作(比如forEach)。forEach需要一个函数来对过滤后的元素依次执行。forEach是一个最终操作,所以我们不能在forEach之后来执行其他Stream操作。

1
2
3
4
5
// 测试 Filter(过滤)
stringList
.stream()
.filter((s) -> s.startsWith("a"))
.forEach(System.out::println);//aaa2 aaa1

forEach 是为 Lambda 而设计的,保持了最紧凑的风格。而且 Lambda 表达式本身是可以重用的,非常方便。

Sorted(排序)

排序是一个 中间操作,返回的是排序好后的 Stream。如果你不指定一个自定义的 Comparator 则会使用默认排序。

1
2
3
4
5
6
// 测试 Sort (排序)
stringList
.stream()
.sorted()
.filter((s) -> s.startsWith("a"))
.forEach(System.out::println);// aaa1 aaa2

**自定义Comparator **

1
2
3
4
list.stream()
.sorted(Comparator.comparing(Student::getAge)) //正序
list.stream()
.sorted(Comparator.comparing(Student::getAge).reversed()) //逆序

补充:当然还可以不用借助steam方式直接排序

1
2
list.sort(Comparator.comparing(Integer::intValue)); //正序
list.sort(Comparator.comparing(Integer::intValue).reversed());//逆序

需要注意的是,排序只创建了一个排列好后的Stream,而不会影响原有的数据源,排序之后原数据stringList是不会被修改的:

1
System.out.println(stringList);// ddd2, aaa2, bbb1, aaa1, bbb3, ccc, bbb2, ddd1

Map(映射)

中间操作 map 会将元素根据指定的 Function 接口来依次将元素转成另外的对象。

下面的示例展示了将字符串转换为大写字符串。你也可以通过map来将对象转换成其他类型,map返回的Stream类型是根据你map传递进去的函数的返回值决定的。

1
2
3
4
5
6
// 测试 Map 操作
stringList
.stream()
.map(String::toUpperCase)
.sorted((a, b) -> b.compareTo(a))
.forEach(System.out::println);// "DDD2", "DDD1", "CCC", "BBB3", "BBB2", "BBB1", "AAA2", "AAA1"

Match(匹配)

Stream提供了多种匹配操作,允许检测指定的Predicate是否匹配整个Stream。所有的匹配操作都是 最终操作 ,并返回一个 boolean 类型的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 测试 Match (匹配)操作
boolean anyStartsWithA =
stringList
.stream()
.anyMatch((s) -> s.startsWith("a"));
System.out.println(anyStartsWithA); // true

boolean allStartsWithA =
stringList
.stream()
.allMatch((s) -> s.startsWith("a"));

System.out.println(allStartsWithA); // false

boolean noneStartsWithZ =
stringList
.stream()
.noneMatch((s) -> s.startsWith("z"));

System.out.println(noneStartsWithZ); // true

Count(计数)

计数是一个 最终操作,返回Stream中元素的个数,返回值类型是 long

1
2
3
4
5
6
7
//测试 Count (计数)操作
long startsWithB =
stringList
.stream()
.filter((s) -> s.startsWith("b"))
.count();
System.out.println(startsWithB); // 3

Reduce(规约)

这是一个 最终操作 ,允许通过指定的函数来将stream中的多个元素规约为一个元素,规约后的结果是通过Optional 接口表示的:

1
2
3
4
5
//测试 Reduce (规约)操作
Optional<String> reduced = stringList.stream()
.sorted()
.reduce((s1, s2) -> s1 + "#" + s2);
reduced.ifPresent(System.out::println);

译者注: 这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相当于Integer sum = integers.reduce(0, (a, b) -> a+b);也有没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional。

1
2
3
4
5
6
7
8
9
10
11
12
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
// 过滤,字符串连接,concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F").
filter(x -> x.compareTo("Z") > 0).
reduce("", String::concat);

上面代码例如第一个示例的 reduce(),第一个参数(空白字符)即为起始值,第二个参数(String::concat)为 BinaryOperator。这类有起始值的 reduce() 都返回具体的对象。而对于第四个示例没有起始值的 reduce(),由于可能没有足够的元素,返回的是 Optional,请留意这个区别。更多内容查看: IBM:Java 8 中的 Streams API 详解

Parallel Streams(并行流)

前面提到过Stream有串行和并行两种,串行Stream上的操作是在一个线程中依次完成,而并行Stream则是在多个线程上同时执行。

下面的例子展示了是如何通过并行Stream来提升性能:

首先我们创建一个没有重复元素的大表:

1
2
3
4
5
6
int max = 1000000;
List<String> values = new ArrayList<>(max);
for (int i = 0; i < max; i++) {
UUID uuid = UUID.randomUUID();
values.add(uuid.toString());
}

我们分别用串行和并行两种方式对其进行排序,最后看看所用时间的对比。

Sequential Sort(串行排序)

1
2
3
4
5
6
7
8
9
10
11
//串行排序
long t0 = System.nanoTime();
long count = values.stream().sorted().count();
System.out.println(count);

long t1 = System.nanoTime();

long millis = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
System.out.println(String.format("sequential sort took: %d ms", millis));
1000000
sequential sort took: 709 ms//串行排序所用的时间

Parallel Sort(并行排序)

1
2
3
4
5
6
7
8
9
10
11
12
//并行排序
long t0 = System.nanoTime();

long count = values.parallelStream().sorted().count();
System.out.println(count);

long t1 = System.nanoTime();

long millis = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
System.out.println(String.format("parallel sort took: %d ms", millis));
1000000
parallel sort took: 475 ms//串行排序所用的时间

上面两个代码几乎是一样的,但是并行版的快了 50% 左右,唯一需要做的改动就是将 stream() 改为parallelStream()