본문 바로가기
Book

[모던 자바 인 액션] 15장. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

by Jordy-torvalds 2021. 9. 29.
반응형

소프트웨어 개발 방법을 획기 적으로 뒤집는 2가지 추세

  • 멀티코어 프로세서의 발전으로 애플리케이션의 속도는 이를 얼마나 잘 활용할 수 있도록 개발하느냐가 중요해짐
  • 인터넷 서비스에서 사용하는 애플리케이션이 증가하는 현상
    • 서비스가 작아진 대신 네트워크 통신이 증가함.
    • 다양한 소스의 콘텐츠를 가져와서 사용자의 삶이 풍요롭게 만들도록 합치는 메시업mashup 형태가 될 가능성이 큼
    • 다른 서비스의 응답을 기다리는 동안 연산이 블록되거나 귀중한 CPU 클록 사이클 자원을 낭비하고 싶지는 않으 것이다.
    • 반면에 동시성을 필요로 하는 상황 즉 조금씩 연관된 작업을 같은 CPU에서 동작하는 것 또는 애플리케이션을 생상선을 극대화할 수 있도록 코어를 바쁘게 유지하는 것이 목표라면, 원격 서비스나 데이터베이스 결과를 기다리는 스레드를 블록함으로써 연산 자워을 낭비하는 일을 피해야 한다.

동시성을 구현하는 자바의 진화

  • 자바 5:
    • 스레드의 실행과 테스트 제출을 분리한는 ExecutorService
    • Runnable, Thread의 변형을 반환하는 Callable, Future, 제네릭
    • ExecutorService는 Runnable과 Callable 둘 다 실행할 수 있으며 멀티코어 CPU에서 병렬 프로그램을 구현하는데 용이함.
  • 자바 7:
    • 분할 그리고 정보 알고리즘의 포크/조인을 구현을 지원하는 RecursiveTask 추가
  • 자바 8:
    • 스트림과 새로 추가된 람다 지원에 기반한 병렬 프로세싱 추가
  • 자바 9:
    • 분산 비동기 프로그래밍을 명시적으로 지원
    • 발행 ㅡ구독 프로토콜(Flow 인터페이스) 지원

스레드의 높은 수준의 추상화

운영체제는 주기적으로 번갈아가며 각 프로세스에 CPU를 할당함으로써 태스트를 동시에 또는 협력적으로 실행할 수 있음.

스트림은 이러한 병렬 처리를 쉽게 함.

Executor와 스레드 풀

스레드의 문제

  • 자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제 스레드를 만들고 종료하려면 비싼 비용(페이지 테이블과 관련한 상호작용)을 치러야 하며 더욱이 운영체제 스레드의 숫자는 제한되어 있는 것이 문제다.
  • 운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 예상치 못한 방식으로 크래시 될 수 있으므로 기존 스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나지 않도록 주의해야 한다.
  • 보통 운영체제와 자바의 스레드 개수가 하드웨어 스레드(논리 CPU) 개수보다 많으므로 일부 운영체제 스레드가 블록되거나 자고있는 상황에서 모든 하드웨어 스레드가 코드를 실행하도록 할당된 상황에 놓을 수 있다.
  • 최적의 자바 스레드 개수는 사용할 수 있는 하드웨어 코어의 개수에 따라 달라진다.

스레드 풀 그리고 스레드 풀이 더 좋은 경우

자바 ExecutorService는 태스트를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다. 프로그램은 newFixedThreadPool 같은 팩토리 메소드 중 하나를 이용해 스레드 풀을 만들어 사용하 수 있다.

ExecutorService newFixedThreadPool(int nThreads)

이 메서드는 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다. 스레드 풀에서 사용하지 않는 스레드로 제출된 태스트를 먼저온 순서대로 실핸한다.

하드웨어에 맞는 수의 태스크를 유지함과 동시에 수천개의 태스크를 스레드 풀에 아무 오버헤드없이 제출할 수 있다는 점이다. 큐의 크기 조정, 거부 정책, 태스크 종류에 따른 우선순위 등 다양한 우선순위 등 다양한 설정을 할 수 있다.

태스크를 제공하면 스레드가 이를 실행한다.

스레드 풀 그리고 스레드 풀이 나쁜 이유

스레드 풀을 사용함에 있어 두 가지를 주의해야한다.

  • n개의 스레드를 가진 스레드 풀은 오직 n만큼의 스레드를 동시에 실행할 수 있다. 초과로 제출된 태스크는 큐에 저장되며 이전에 태스크 중 하나가 종료되기 전까지는 스레드에 할당하지 않는다.
    • I/O를 기다리는 블럭 상황에서 이들 태스크가 워커 스레드에 할당된 상태를 유지하지만 아무 작업도 하지 않게 된다.
    • 예를 들어 5개의 스레드를 갖는 스레드 풀에 20개의 태스크가 제출된 상황을 가정해보겠다. 이 떄 세 개의 스레드가 I/O를 대기하면 나머지 2개의 스레드로 남은 태스크를 처리하게 된다. 결론적으로 효율성이 떨어지게 된다.
  • 자바 메인 스레드를 종료하기 전에 관련된 스레드 풀을 모두 종료하는 습관을 갖는 것이 중요하다.
    • 데몬 스레드일 경우 자동으로 함꼐 종료되지만 진행 중인 작업을 일어버릴 수 있다. 데이터 일관성이 중요한 처리를 하는 경우 데몬으로 설정해서는 안된다.
    • 비데몬 스레드일 경우 메인 메소드는 비데몬스레드가 종료될 때까지 프로그램을 종료허지 않고 기다린다.

스레드에 무엇을 바라는가?

스레드를 조작하는 복잡한 코드 없이 메서드를 호출하는 방법을 알아보자!

명시적인 스레드 구현

public class ConcurrencyWIthThread {
    private static String result = "";

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            appendResult("hello");
        });
        Thread t2 = new Thread(() -> {
            appendResult("world");
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(result);
    }

    private synchronized static void  appendResult(String message) {
        result += message;
    }
}

Future를 사용한 스레드 사용

public class ExecutorServiceExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<String> x = executorService.submit(() -> "hello");
        Future<String> y = executorService.submit(() -> "world");

        System.out.println(x.get() + y.get());
        executorService.shutdown();
    }
}

병렬 처리를 위해 스레드를 다루는 두 가지 방법을 보았다. 하지만 두 코드 모두 비즈니스 로직 외에 부연적인 코드가 너무 많이 필요하다.

이러한 로직을 다룸에 있어 가장 중요한 것은 이것이다.

  • 병렬 하드웨어로 프로그램 실행 속도를 극대화하려면 여러 작은 하지만 합리적인 크기의 태스크로 나누는 것이 좋다.

CompletableFutrue를 사용하면 좀 더 간결하고 비동기로 처리하도록 개발할 수 있다.(물론 동기도 가능하다)

public class CompletableFutureExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletableFuture<String> helloFuture 
                        = CompletableFuture.supplyAsync(() -> "hello", executorService);
        CompletableFuture<String> worldFuture 
                        = CompletableFuture.supplyAsync(() -> "world", executorService);

        System.out.println(helloFuture.get() +  worldFuture.get());
                executorService.shutdown();
    }
}

CompletableFuture는 기본적인 비동기 Supplier 외에 다양한 비동기 API를 제공한다.

자바 9에서는 발행ㅡ구독 프로토콜 기반으로 동작하는 Flow 인터페이스도 제공한다.

잠자기(기타 블로킹 동작)는 해로운 것으로 간주

사용자와 상호작용하거나 어떤 일이 일정 속도로 제한되어 일어나는 상황의 애플리케이션을 만들 때 자연스럽게 sleep() 메소드를 사용할 수 있다. 하지만 스레드는 잠들어도 여전히 시스템 자원을 점유한다.(ex. 메모리, 스레드 풀의 가용한 스레드 등) 그 숫자가 많아 질 경우 시스템에 치명적일 수 있다.

아래 코드를 실행 해보면, 스레드 풀에서 사용 가능한 스레드의 수는 3개지만 한 스레드가 sleep 된 관계로 실제로는 2개만 처리를 하고 있음을 알 수 있다.

public class CompletableFutureExample {
    private static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws Exception {
        request(() -> {
            sleep(10_000);
            return "hello";
        });

        IntStream.rangeClosed(1, 10).forEach(num -> {
            request(() -> {
                sleep(1_000);
                return "hello";
            });
        });

        executorService.shutdown();
    }

    public static void request(Supplier<String> requestBody) {
        CompletableFuture<String> requestCompletableFuture 
                        = CompletableFuture.supplyAsync(
                () -> requestBody.get(), executorService);
    }

    public static void sleep(long time) {
        try {
            Thread.sleep(time);
            System.out.println("OK");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

sleep을 사용하지 않으면서도 원하는 시간 이후에 Task를 처리하도록 하는 방법이 있을까? ScheduledExecutorServiceschedule 를 사용하면 원하는 시간 이후에 Task를 처리하도록 할 수 있다! 구현할 로직의 특성에 맞춰 ExecutorService를 선택해서 사용하면 된다.

효율적으로 원하는 로직을 원하는 시점에 처리하는 여러가지 방법을 알아봤다. 이 떄 명심해야할 것은 절대로 은총알은 없으며 구현할 비즈니스에 맞춰 현명히 판단하는 것이 중요하다는 점이다.

박스와 채널 모델

동시성 모델을 가장 잘 설계하고 개념화하려면 그림이 필요하다. 우리는 이 기법을 박스와 채널 모델이라고 한다.

CompletableFuture이 CompletableFuture인 이유는?

일반적으로 Future는 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어진다. 하지만 CompletableFuture는 실행할 코드 없이 Future를 만들 수 있도록 허용하며 complete() 메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 값을 얻을 수 있도록 허용한다.

발행-구독 그리고 리액티브 프로그래밍

구성요소

  • 이벤트(메시지)를 발행하는 발행자
  • 발행자의 이벤트를 구독하는 구독자
  • 발행자와 구독자 간의 연결을 구독이라 한다.

압력

발행자가 발행한 메시지가 구독자에게 푸쉬 방식으로 전달되는 것

역압력

구독자가 발행된 이벤트를 본인의 페이스에 맞게 가져와서 처리하는 것

리액티브 시스템 vs 리액티브 프로그래밍

리액티브 시스템은 런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램을 가리킨다. 리액티브 시스템이 가져야 할 공식적인 속성은 Reactive Manifesto 에서 확인할 수 있다. 반응성, 회복성, 탄력성으로 세가지 속성을 요약할 수 있다.

반응성은 리액티브 시스템이 큰 작업을 처리하느라 간단한 질의의 응답을 지연하지 않고 실시간으로 입력에 반응하는 것을 의미한다.

회복성은 한 컴포넌트의 실패로 전체 시스템이 실패하지 않음을 의미한다. 네트워크가 고장났어도 이와 관계가 없는 질의에는 아무 영향이 없어야 하며 반응이 없는 컴포넌트를 향한 질의가 있다면 다른 대안 컴포넌트를 찾아야 한다.

탄력성은 시스템이 자신의 작업 부하에 맞게 적응하며 작업을 효율적으로 처리함을 의미한다.

여러 가지 방법으로 이런 속성을 구현할 수 있지만 java.util.concurrent.Flow 관련된 자바 인터페이스에서 제공하는 리액티브 프로그래밍 형식을 이용하는 것도 주요 방법 중 하나다. 이들 인터페이스 설계는 Reactive Manifesto의 네 번째이자 마지막 속성 즉 메시지 주도을 반영 한다.

반응형