본문 바로가기
JAVA/[JAVA] Stream

Stream (3)

by oncerun 2022. 2. 16.
반응형

 

1. groupingBy

 

groupingBy는 Stream 안의 데이터에 classifier를 적용했을 때 동일한 값끼리 List로 모아서 Map의 형태로 변환해주는 Collector의 메서드이다. 

public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier) {}

Collectors Class에서 groupingBy로 검색하면 수많은 오버 로드된 메서드들이 나온다.  

groupingBy의 반환 타입인 Map의 key는 classifier의 결괏값이며, value는 그 결괏값을 갖는 데이터들이다.

오버 로드된 몇몇 메서드를 보면 Map 뿐만 아니라 concurrentMap을 반환하는 것도 있다.

Map<Integer, List<Integer>> collect = Stream.of(1, 2, 3, 4, 5, 67, 87, 9, 13, 2, 426, 32)
        .collect(Collectors
                .groupingBy(x -> x % 3));

classifier로 3으로 나눈 나머지를 키로 지정했다. 

Map의 key는 0, 1, 2로 지정되고 같은 값이 나오는 Integer들을 모아 List로 Map의 value로 지정한다.

두 번째 인자로 Collectors를 넘길 수 있습니다. downStream을 통해 두 번째 가공된 컬렉터를 반환하게 할 수 있습니다.

public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                      Collector<? super T, A, D> downstream) {
    return groupingBy(classifier, HashMap::new, downstream);
}

downStream을 넘기는 경우에는 List 대신 collector를 적용시킨 값으로 map의 value가 만들어집니다.

Map<Integer, Set<Integer>> unitDigitSet = 
		Stream.of(1, 2, 3, 4, 5, 67, 87, 9, 13, 2, 426, 32, -12, -353, 3215215)
        	.collect(Collectors
              	     	 .groupingBy(number -> number % 10, Collectors.toSet()));

 

또한 reduce, mapping을 활용해 더욱 복잡한 것을 처리할 수 있습니다.

주문 객체의 상태에 따라 그룹을 지어 주문 상태를 key로 가지고 주문 상태로 그룹이 지어진 주문 객체들의 ID만 담은 리스트 객체를 value로 갖는 Map을 만들어 보자.

Map<Order.OrderStatus, List<Long>> collect = createOrder()
        .stream()
        .collect(Collectors
                    .groupingBy(Order::getStatus, Collectors
                                                    .mapping(Order::getId, Collectors.toList())));

 

아 근데 스트림 쓰는 경우 개행을 어떻게 해야 가독성이 좋은지 전혀 감이 안 온다.. 일렬로 하기에는 뭔가 길고 체이닝 하는 것마다 개행하기엔 또 애매하고..

다음처럼 CREATED, ERROR, IN_PROGRESS, PROCESSED 상태로 나어지고 값으로 주문 ID가 담겨있다.

두 번째 Collectors에 따라 여러 가지 반환을 조작할 수 있다.!

 

 

2. partitoningBy

public static <T>
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {}

GroupingBy와 유사하지만 Function 대신 Predicate를 넘기기 때문에 true/false로 나누어진 key만 존재하는 map을 반환합니다.

groupingBy와 동일하게 downStream을 넘겨 Map의 value를 지정할 수 있습니다.

 

Map<Boolean, List<Integer>> collect =
	Stream.of(1, 2, 3, 4, 5, 67, 87, 9, 13, 2, 426, 32, -12, 353, 3215215)
        .collect(Collectors.partitioningBy(number -> number % 2 == 0));

조건을 만족하는 두 그룹으로 분리해야 할 때 매우 유용하게 사용할 수 있을 것 같다. 

 

 

3. forEach

 

Stream에 정의된 메서드로 제공된 action을 Stream의 각 데이터에 적용해주는 종결 처리 메서드입니다.

void forEach(Consumer<? super T> action);

인자 타입은 Consumer로 인자를 받아 아무것도 리턴하지 않는 메서드입니다. 

Java의 iterable 인터페이스에도 forEach가 있기 때문에 Stream의 중간 처리가 필요 없다면 iterable collection에서 바로 쓰는 것도 가능합니다.

Arrays.asList(1,45,2,3,6)
        .stream()
        .forEach( number -> log.info("number is {}", number));

Arrays.asList(1,45,2,3,6)
        .forEach( number -> log.info("number is {}", number));

 

만약 Index를 사용해야다면 IntStream을 사용해보자.

IntStream.range(0, users.size())
        .forEach(System.out::println);

0부터 배열의 사이즈전까지 돌면서 기존 for문을 대체할 수 있다.

 

 

4. Parallel Stream

 

순차적으로 처리하는 기존의 Sequential 스트림과 달리 Parallel Stream은 여러 개의 스레드를 이용하여 stream의 처리 과정을 병렬화(parallelize)합니다. 

 

스트림은 초기셋팅, 중간처리, 종결처리로 이루어져 있는데, 중간 처리과정에는 병렬 처리되지만 순서가 있는 Stream의 경우 종결 처리했을 경우 기존의 순차적 처리와 일치하도록 종결 처리과정에서 조정됩니다. 

순서가 있는 경우는 List를 collect 한다면 순서가 항상 올바르게 나옵니다.

 

바로 병렬 스트림을 생성할 수 도 있고

Arrays.asList(1, 2, 3, 5, 6)
        .parallelStream();

 

중간에 병렬스트림으로 변경할 수 있습니다.

Arrays.asList(1, 2, 3, 5, 6)
        .stream()
        .parallel();

 

Parallel Stream은 매우 간단하게 병렬 처리를 사용할 수 있고, 비약적으로 처리 속도를 향상할 수 있습니다.

다만 항상 속도가 빨라지는 것은 아니며, deadlock과 동기화문제를 고려해야 합니다.

이를 막기위해 mutex, 세마포어 등 병렬 처리 기술을 이용하게 된다면 순차처리 보다 성능이 저하될 수 있습니다.


long startTime = System.currentTimeMillis();
createUser()
        .stream()
        .filter( user -> !user.isVerified())
        .forEach(emailService::sendVerifyYourEmail);
long endTime = System.currentTimeMillis();
log.info("Sequential Time {}ms", endTime - startTime);



long startTime2 = System.currentTimeMillis();
createUser()
        .stream().parallel()
        .filter( user -> !user.isVerified())
        .forEach(emailService::sendVerifyYourEmail);
long endTime2 = System.currentTimeMillis();
log.info("Parallel Time {}ms", endTime2- startTime2);

 

기존의 이메일을 보내는 과정을 병렬, 순차처리의 시간을 비교하였습니다. 

이 경우는 필터링을 하는 과정과 이메일을 보내는 과정을 상관이 없기 때문에 병렬 처리를 할 수 있습니다.

 

 

 

반응형

'JAVA > [JAVA] Stream' 카테고리의 다른 글

Optional  (0) 2022.10.12
[Java] Functional extends  (0) 2022.02.17
Stream (2)  (0) 2022.02.15
Stream  (0) 2022.02.12
Method Reference  (0) 2022.02.09

댓글