Stream 是什么?
Stream 是Java 8的新特性之一,是对容器对象功能的增强,借助Lambda表达式,以函数式的方式处理数据,以提高广大程序员的生产力。
-
stream 将要处理的元素当做流;
-
借助steam api对流元素进行中间操作,比如筛选、排序、聚合等;
-
提高了开发效率和程序的可读性
-
提供串行和并行两种模式
什么是流?
Stream不是集合元素,它不是数据结构,并不保存数据,它是有关算法和计算的(可以理解为对Iterator的增强)。Stream并行遍历依赖于Fork/Join框架来拆分任务和加速任务处理。
Stream的结构
整个Stream的操作可以分为三大阶段
-
数据源创建 只有一次
-
中间操作 (各种筛选、排序、聚合等),lazy,多次
-
终端操作(获取想要的结果) 终止只有一次
我们看下java.util.stream包:
BaseStream一共有四个继承接口
-
Stream 通用的
-
LongStream 可以理解为Stream
,减少了装箱拆箱的损耗,还有一些别的额外操作 -
IntStream 可以理解为Stream
,减少了装箱拆箱的损耗,还有一些别的额外操作 -
DoubleStream 可以理解为Stream
,减少了装箱拆箱的损耗,还有一些别的额外操作
每个具体接口里有哪些方法,可以具体看代码 我们重点看下Stream接口:
我们看下Stream里面的方法
数据源创建
-
empty() 创建一个空的Stream
-
of() 创建有限元素的Stream
-
iterate() 创建无限元素的Stream
-
generate() 创建无限元素的Stream
中间操作:
-
filter 过滤,按lambda表达式
-
map (mapTo*, flatMap* ) 映射,按lambda表达式
-
distinct 去重
-
sorted 排序 (可以自定义Comparator)
-
peek 可以理解为克隆一份,优先级高,两边引用的对象都一样
-
skip 跳过Stream中前n个元素
-
limit 只取前n个,这个比较特殊,可以理解为短路,因为返回Stream,就归到了中间操作
-
concat() 将两个Stream拼接到一起合成一个
-
parallel(父)
-
sequential(父)
-
unordered(父)
终止操作
-
reduce 将Stream元素按一个规则组合起来,如sum,avg
-
collect 将流转化为其他形式
-
foreach 遍历Stream的元素
-
foreachOrdered 按Stream的顺序执行
-
toArray 将Stream转为数组
-
min 根据指定的Comparator返回一个最小的Optional对象
-
max 根据指定的Comparator返回一个最大的Optional对象
-
count 返回Stream中元素的个数
short-circuiting (可以理解为特殊的终止操作)
短路操作,有时候在遍历的过程中,达到某个条件就终止
-
anyMatch 判断Stream中的元素是否有满足指定条件的元素,有满足的返回true
-
allMatch 判断Stream中元素是否全部满足指定条件,全部满足返回true
-
noneMatch 都不满足指定条件,返回true
-
findFirst 找到第一个元素的Optional对象
-
findAny 返回任意一个元素,并行中随机,串行中一直返回第一个
数据源的创建方式
- 从Collection和数组获得
public interface Collection<E> extends Iterable<E> {
//串行
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
// 并行
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
}
Arrays.stream(T array) or Stream.of()
- 从数组获取
//通过Arrays的静态方法获取
public class Arrays {
//有很多重载方法
public static IntStream stream(int[] array) {
return stream(array, 0, array.length);
}
//这里有很多指定类型的Stream重载
public static *Stream stream(....) {
return stream(...);
}
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
}
// 通过Stream的静态方法获取
public interface Stream<T> extends BaseStream<T, Stream<T>> {
public static<T> Stream<T> of(T t) {
return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
}
@SafeVarargs
@SuppressWarnings("varargs") // Creating a stream from an array is safe
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
//这里还有一些其他的方式获取,就不列举了
}
- 从BufferedReader获得
public class BufferedReader extends Reader {
public Stream<String> lines() {
.....
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}
}
- 静态工厂
// LongStream 和DoubleStream类似
public interface IntStream extends BaseStream<Integer, IntStream> {
public static IntStream range(int startInclusive, int endExclusive) {
if (startInclusive >= endExclusive) {
return empty();
} else {
return StreamSupport.intStream(
new Streams.RangeIntSpliterator(startInclusive, endExclusive, false), false);
}
}
}
public final class Files {
public static Stream<Path> walk(Path start, FileVisitOption... options) throws IOException {
return walk(start, Integer.MAX_VALUE, options);
}
}
- 自己构建
通过实现java.util.Spliterator 自己构建,具体可以参考其中的任意一个示例
- 其他方式
public class Random implements java.io.Serializable {
ints(...)
longs(...)
doubles(...)
}
public final class Pattern implements java.io.Serializable{
public Stream<String> splitAsStream(final CharSequence input) {}
}
public class JarFile extends ZipFile {
public Stream<JarEntry> stream() {}
}
我们通过以上方式构建,跟踪源代码,我们发现最终指向的是:
public final class StreamSupport {
}
中间操作
**一个流可以后面跟随零个或多个中间操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
中间操作都是lazy的,多个中间操作只会在终止操作的时候融合(融合的不仅仅是中间操作,还有数据源的创建)起来,一次循环完成。我们可以这样简单的理解,Stream里有个中间操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在终止 操作的时候循环Stream对应的集合,然后对每个元素执行所有的函数。**
比如:下面的这两个创建的都是无限流,如果不是最终遍历的时候才执行,那么我们为了取几个数,创建的数据源是多大呢?
我们把jvm的参数设置为3mb,如下图:
@Test
public void generate(){
Stream.iterate(0, (x) -> x + 2).limit(5).forEach(System.out::println);
Stream.generate(()-> Math.random()).limit(5L).forEach(System.out::println);
}
输出结果:
0
2
4
6
8
0.9473776969196388
0.6723046943435518
0.1704873740373829
0.169983516956142
0.6080414664225631
我们看下基础依赖,用于构建测试所需要的对象
public class Java8StreamTest {
List<LoginInfo> list = null;
Map<String,LoginInfo> map = null;
private int getAge(){
int age =new Random().nextInt(100);
return age>18 ? age : 18;
}
private long getDate(){
long det = new Random().nextLong();
return System.currentTimeMillis() - det;
}
private String getLoginSource(){
String[] sources = {"ios","android","h5"};
return sources[new Random().nextInt(sources.length)];
}
private String getNickName(int i){
String[] nicks = {"yxk","yxkong","tao","java"};
return nicks[new Random().nextInt(nicks.length)] +i;
}
private String getMobile(int i){
long[] nicks = {15600000000L,13600000000L,15100000000L,13300000000L};
return nicks[new Random().nextInt(nicks.length)] + Long.valueOf(i) +"";
}
@Before
public void init(){
list = new ArrayList<>();
map = new HashMap<>();
LoginInfo info = null;
for (int i = 0; i < 100; i++) {
info = new LoginInfo(Long.valueOf(i),getMobile(i),getNickName(i),getDate(),getLoginSource(),getAge(),getDate());
list.add(info);
map.put(info.getMobile(),info);
}
}
@Test
public void test(){
list.stream().forEach(System.out::println);
}
}
class LoginInfo implements Serializable {
private Long userId;
private String mobile;
private String nickName;
private long loginTime;
private String loginSource;
private int age;
private long registerTime;
public LoginInfo(Long userId, String mobile, String nickName, long loginTime, String loginSource, int age, long registerTime) {
this.userId = userId;
this.mobile = mobile;
this.nickName = nickName;
this.loginTime = loginTime;
this.loginSource = loginSource;
this.age = age;
this.registerTime = registerTime;
}
public Long getUserId() {
return userId;
}
public String getMobile() {
return mobile;
}
public String getNickName() {
return nickName;
}
public long getLoginTime() {
return loginTime;
}
public String getLoginSource() {
return loginSource;
}
public int getAge() {
return age;
}
public long getRegisterTime() {
return registerTime;
}
@Override
public String toString() {
return "LoginInfo{" +
"userId=" + userId +
", mobile='" + mobile + '\'' +
", nickName='" + nickName + '\'' +
", loginTime=" + loginTime +
", loginSource='" + loginSource + '\'' +
", age=" + age +
", registerTime=" + registerTime +
'}';
}
}
- filter 操作,过滤满足条件的数据
@Test
public void filter(){
/**
* 我们过滤得到年龄>18 且小于30的登录渠道为h5的用户
*/
list.stream().filter(s->s.getAge()>18 && s.getAge()<30)
.filter(s->"h5".equals(s.getLoginSource())).forEach(System.out::println);
boolean exist = list.stream().filter(s->s.getAge()>18 && s.getAge()<30)
.filter(s->"h5".equals(s.getLoginSource())).anyMatch(s->s.getNickName().contains("yxk"));
System.out.println(exist);
}
- map/flatmap map 的意思是将Stream里的对象进行映射,至于映射成什么,看你写的lambda
flatmap 和map的意思差不多,只不过flatmap映射的是一个Stream对象,这点不同
@Test
public void map(){
list.stream().map(s -> s.getNickName()).limit(3).forEach(System.out::println);
list.stream().flatMap(s->Stream.of(s.getNickName())).limit(3).forEachOrdered(System.out::println);
}
- distinct/sorted 去重
@Test
public void distinctAndSorted(){
int[] nums = {5,1,3,3,1};
Arrays.stream(nums).distinct().sorted().forEach(System.out::println);
list.stream().distinct().sorted((a,b)-> a.getMobile().compareTo(b.getMobile())).limit(3).forEach(System.out::println);
list.stream().distinct().sorted((a,b)-> b.getMobile().compareTo(a.getMobile())).limit(3).forEach(System.out::println);
//可以推断出中间操作是按顺序执行的
list.stream().distinct().limit(3).sorted((a,b)-> a.getMobile().compareTo(b.getMobile())).forEach(System.out::println);
list.stream().distinct().limit(3).sorted((a,b)-> b.getMobile().compareTo(a.getMobile())).forEach(System.out::println);
}
- peek和skip
@Test
public void skipAndPeek(){
list.stream().skip(98).peek(s-> s.setAge(s.getAge()+100)).forEach(System.out::println);
}
- reduce 归约,也称缩减,顾名思义,是把一个流缩减成一个值,能实现对集合求和、求乘积和求最值操作。
@Test
public void reduce(){
int[] nums = {5,1,3,3,1};
final OptionalInt reduceSum = Arrays.stream(nums).reduce(Integer::sum);
final int sum = Arrays.stream(nums).sum();
Assert.assertEquals(reduceSum.getAsInt(),sum);
final OptionalInt reduceMax = Arrays.stream(nums).reduce(Integer::max);
final OptionalInt max = Arrays.stream(nums).max();
Assert.assertEquals(reduceMax,max);
}
- collect
@Test
public void collect(){
//转成list
final List<LoginInfo> list1 = list.stream().filter(l -> l.getAge() > 18 && l.getAge() < 30).collect(Collectors.toList());
list1.forEach(l-> System.out.println("list:"+l.getUserId()));
//转成set
final Set<LoginInfo> set = list.stream().filter(l -> l.getAge() > 18 && l.getAge() < 30).collect(Collectors.toSet());
set.forEach(l-> System.out.println("set:"+l.getUserId()));
//转成map key是userId,value 是 LoginInfo
final Map<Long, LoginInfo> map = list.stream().filter(l -> l.getAge() > 18 && l.getAge() < 30).collect(Collectors.toMap(LoginInfo::getUserId, l -> l));
map.forEach((k,v)-> System.out.println("map:"+k));
//转成map key是userId,value 是age
final Map<Long, Integer> map1 = list.stream().filter(l -> l.getAge() > 18 && l.getAge() < 30).collect(Collectors.toMap(LoginInfo::getUserId, LoginInfo::getAge));
// 按loginSource 进行分组
final Map<String, List<LoginInfo>> map2 = list.stream().collect(Collectors.groupingBy(LoginInfo::getLoginSource));
map2.forEach((k,v)-> System.out.println(k+" size:"+ v.size()));
// 年龄大于60进行分组
final ConcurrentMap<Boolean, List<Integer>> concurrentMap = list.stream().flatMap(l -> Stream.of(l.getAge())).collect(Collectors.groupingByConcurrent(l -> l.intValue() > 60));
concurrentMap.forEach((k,v)-> System.out.println(k+" size:"+ v.size()));
// 将nickName 拼接起来 [yxkong0,yxk1,java2...]
final String collect = list.stream().map(LoginInfo::getNickName).collect(Collectors.joining(",","[","]"));
System.out.println(collect);
}
- foreach
@Test
public void foreach(){
//foreach无序
list.stream().parallel().limit(5).forEach(System.out::println);
//forEachOrdered 按.sorted的顺序
list.stream().sorted((a,b)->a.getUserId().compareTo(b.getUserId())).parallel().limit(5).forEachOrdered(System.out::println);
}
- toArray
@Test
public void toArray(){
final Object[] objects = list.stream().limit(5).toArray();
Arrays.stream(objects).forEach(System.out::println);
//明确指定类型LoginInfo
final LoginInfo[] infos = list.stream().limit(5).toArray(LoginInfo[]::new);
Arrays.stream(infos).forEach(System.out::println);
}
- min、max 、count
@Test
public void maxAndMinAndCount(){
final Optional<LoginInfo> max = list.stream().filter(l -> l.getAge() < 18).max((m, n) -> m.getUserId().compareTo(n.getUserId()));
Assert.assertFalse("没有符合条件的数据",max.isPresent());
final Optional<LoginInfo> max1 = list.stream().max((m, n) -> m.getUserId().compareTo(n.getUserId()));
System.out.println(max1.get());
final Optional<LoginInfo> min = list.stream().min((m, n) -> m.getUserId().compareTo(n.getUserId()));
System.out.println(min.get());
final long count = list.stream().count();
Assert.assertEquals("总数等于100",100,count);
}
- short-circuiting
@Test
public void shortCircuiting(){
final boolean noneMatch = list.stream().noneMatch(l -> l.getAge() > 100);
Assert.assertTrue("没有符合年龄大于100的数据",noneMatch);
final boolean anyMatch1 = list.stream().anyMatch(l -> l.getAge() > 90);
Assert.assertTrue("有符合条件的数据",anyMatch1);
final boolean allMatch = list.stream().allMatch(l -> l.getAge() >= 18);
Assert.assertTrue("所有登录用户都满足大于等于18",allMatch);
final Optional<LoginInfo> first = list.stream().findFirst();
System.out.println(first.get());
final Optional<LoginInfo> any = list.stream().findAny();
final Optional<LoginInfo> any1 = list.stream().findAny();
final Optional<LoginInfo> any2 = list.stream().findAny();
Assert.assertTrue("都相等",any.equals(any1) && any.equals(any2));
final Optional<LoginInfo> pany = list.stream().parallel().findAny();
final Optional<LoginInfo> pany1 = list.stream().parallel().findAny();
final Optional<LoginInfo> pany2 = list.stream().parallel().findAny();
Assert.assertFalse("在parallel情况下不相等",pany.equals(pany1) && pany.equals(pany2));
}
总结
-
中间操作多次执行
-
中间操作惰性执行
-
中间操作按顺序执行
-
终止操作一次后无法再获取流
-
Stream支持并行操作
文章评论