자바 ForkJoin Framework(포크조인)

이번 글은 자바 7에 도입된 Fork/Join Framework에 대한 내용입니다.

Fork/Join Framework

자바 7에는 Fork/Join Framework가 도입되었는데 이는 ExecutorService의 구현체로서 이를 활용하면 작업들을 멀티코어를 사용하도록 작업할 수 있습니다. 기본적으로 Fork/Join은 하나의 병렬화할 수 있는 작업을 재귀적으로 여러개의 작은 작업들로 분할하고 각 subtask들의 결과를 합쳐서 전체 결과를 반환합니다.
Fork/Join은 divide-and-conquer 알고리즘과 굉장히 비슷하다. 다만 Fork/Join Framework는 한가지 중요한 개념이 있는데 이상적으로는 worker thread가 노는경우가 없다. 왜냐하면 Fork/Join Framework에서는 work stealing이라는 기법을 사용해서 바쁜 worker thread로 부터 작업을 steal, 즉 작업을 훔쳐온다.
먼저 ForkJoin의 thread pool에 있는 모든 thread를 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 task를 포함하는 double linked list를 참조하면서 작업이 끝날때마다 queue의 헤드에서 다른 task를 가져와서 처리한다. 다만 아무리 공정하게 태스크들을 분할한다고 해도 특정 한 스레드는 다른 스레드보다 자신에게 할당된 태스크들을 더 빠르게 처리 할 수 있는데, 이렇게 자신에게 주어진 태스크들을 다 처리해서 할일이 없어진 스레드는 다른 스레드의 queue의 tail에서 작업을 훔쳐(steal)온다. 모든 태스크가 다 끝날때까지 이 과정을 반복하여 스레드간의 작업부하를 균등하게 맞출 수 있다.

ForkJoinPool

java.util.concurrent.ForkJoinPool은 위에서 설명한 work stealing 방식으로 동작하는 ExecutorService의 구현체이다. 우리는 ForkJoinPool의 생성자로 작업에 사용할 processor number를 넘겨줌으로서 병렬화 레벨을 정할 수 있다. 기본값은 Runtime.getRunTime().availableProcessors() 결과로 결정된다. 또 다른 특징으로는 ExecutorService들의 구현체와는 다르게 ForkJoinPool은 모든 워커 스레드가 데몬스레드로 명시적으로 program을 exit할 때 shutdown을 호출할 필요가 없다. ForkJoinPool의 내부에서 worker thread를 등록하는 과정에서 daemon 스레드로 설정한다.

ForkJoinTask

java.util.concurrent.ForkJoinTask는 ForkJoinPool에서 실행되는 task의 abstract class이다. ForkJoinTask<V>Future<V>를 구현한다. ForkJoinTask는 일종의 light 한 스레드라고 생각하면 쉽다. 여러개의 task 들이 생성되면 이들은 ForkJoinPool의 설정된 스레드들에 의해 실행되게 된다.
RecursiveActionRecursiveTask<R>가 ForkJoinTask의 서브클래스들인데 이들 또한 abstract class들이다. 그래서 이들을 구현한 서브클래스를 만들어서 사용한다. RecursiveActionRecursiveTask<R>의 차이점은 RecursiveAction은 태스크가 생성하는 결과가 없을때 사용하고 결과가 있을때에는 RecursiveTask<R>을 사용한다. 두 클래스 모두 abstract method인 compute() 를 구현해야한다.

ForkJoinTask는 현재 실행상태를 확인하기 위한 몇가지 메서드를 제공한다.
isDone()은 태스크가 완료되었는지의 여부를 반환한다. isCompletedNormally()는 태스크가 cancellation이나 exception 없이 완료되었는지의 여부를 반환하고 이외에도 isCancelled(), isCompletedAbnormally() 등의 메서드를 제공한다.

RecursiveTask 활용

스레드 풀을 이용하기위해 RecursiveTask<R>의 서브클래스를 만들어보자. parameter type R은 결과 형식을 의미한다. 우리는 RecursiveTask의 compute 메서드를 구현해야 한다.
protected abstract R compute();
compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더이상 분할할 수 없을때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다. 따라서 대부분의 compute 메서드의 구현은 다음과 같다.

1
2
3
4
5
6
7
8
if (태스크가 충분히 작거나 분할할 수 없으면) {
태스크 계산
} else {
태스크를 두 서브태스크로 분할한다.
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출한다.
모든 서브태스크의 연산이 완료될때까지 기다린다.
각 서브태스크의 결과를 합친다.
}

그렇다면 1부터 N까지의 합을 구하는 프로그램을 Fork/Join Framework를 사용하여 작성해보자. 코드는 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
private static final int THRESHOLD = 10_000;

public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}

private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int size = end - start;
if (size <= THRESHOLD) {
return computeSequentially();
}

ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(
numbers, start, start + size / 2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(
numbers, start + size / 2, end);
long rightResult = rightTask.compute();
long leftResult = leftTask.join();
return leftResult + rightResult;
}

private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}

return sum;
}
}

그리고 다음과 같이 ForkJoinPool의 invoke 메서드를 사용해 실행시켜보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Main {
private static final long N = 30_000_000L;

public static void main(String args[]) {
long[] numbers = LongStream.rangeClosed(1, N).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
long sum = new ForkJoinPool().invoke(task);
System.out.println(sum);
}
}

Fork/Join을 사용할 때 왼쪽 작업과 오른쪽 작업에 모두 fork를 호출하는게 자연스러운것 처럼 보이지만 한쪽에는 fork를 호출하는 것 보다 compute를 호출하는게 더 효율적이다. 한 태스크에는 이 Fork/Join 스레드를 실행시킨 스레드를 재사용할 수 있으므로 불필요한 태스크를 다른 스레드에 할당하는 오버헤드를 피할 수 있다.
또 멀티코어에 Fork/Join을 사용하는게 무조건 순차처리보다 빠르지 않다. 각 서브태스크의 실행시간이 새로운 태스크를 forking하는데 드는 시간보다 충분히 길수록 좋다.

위의 예제에서는 덧셈을 수행할 숫자가 만개 이하면 분할을 더이상 하지 않고 계산했다. 그러면 현재는 태스크가 3천개가 생성되는데 어차피 코어의 수는 정해져있으므로 코어가 3개라면 각 코어마다 1천만개씩 덧셈을 수행하면 딱 알맞게 효율적으로 동작하지 않을까?
그렇지는 않다. 실제로는 코어 개수와 관계없이 적절하게 작은 크기로 분할된 많은 태스크를 forking 하는것이 바람직하다. 1천만개씩 덧셈을 수행하도록 한다고 해도 각 3개의 코어에서 이루어지는 작업이 동시에 끝나지는 않는다. 각 태스크에서 예상치못하게 지연이 생길 수 있어 작업완료시간이 크게 달라질 수 있다. 다만 Fork/Join Framework는 work-stealing 기법으로 idel한 스레드는 다른 스레드의 workQueue로 부터 작업을 훔쳐오기 때문에 모든 스레들에게 작업을 거의 공정하게 분할할 수 있다. 그러므로 태스크의 크기를 작게 나누어야 스레드 간의 작업부하 수준을 비슷하게 맞출 수 있다.

References