Fork/Join フレームワークを正しく効率的に利用したい

Fork/Join Parallelism in the Wild の論文を読んだメモです。

はじめに

Java 7 で導入された Fork/Join フレームワークは、Java のその他の並列処理の機構 (Executors クラスが提供する類のもの) と比較して普段使いするような機能ではないがゆえに、Java の経験が長い方であっても直接的に1利用する機会は滅多にないかと思われます。しかし、この Fork/Join フレームワークはいわゆる「分割統治法」に基づくアルゴリズムとの相性が極めてよく、メソッドの再帰呼び出しに基づく実装から Fork/Join フレームワークを利用した実装に比較的容易に書き換えて並列化を実現できます。

今回読んだ論文 Fork/Join Parallelism in the Wild では、この Fork/Join フレームワークの利用における原則とベストプラクティス、デザインパターンやアンチパターンについて書かれています。このブログエントリでは、それぞれについて概要をメモしています。

Fork/Join の計算効率を最大化するための原則

論文の 2.2 節では、Fork/Join フレームワークによる計算効率を最大化するために大事な 4 つの原則が挙げられています。

  1. 並列性を最大化する
  2. オーバーヘッドを最小化する
  3. リソースなどの競合を最小化する
  4. 局所性を最大化する

この 4 つの原則を守るために論文で述べられている具体的なベストプラクティスについて見ていきます。

適切なタスクの粒度を選択する

Fork/Join フレームワークに限った話ではないかと思いますが、並列処理ではよく、並列性オーバーヘッド がトレードオフの関係になりがちです。これは、タスクの同時実行数を高く保つ、すなわち並列性を高めようとすると並列処理に関わるオーバーヘッドが増加し、逆にオーバーヘッドを小さくしようとすると並列性が犠牲になる、という関係になります。

Fork/Join フレームワークにおいても、この並列性とオーバーヘッドのどちらかだけに偏って最適化したところで全体の計算効率が最適化 (処理時間が最小化) されるわけではありません。ゆえに、並列性とオーバーヘッドのバランスをとった最適化をする必要があります。

先に述べたとおり、Fork/Join フレームワークは分割統治法のようにタスクを再帰的にサブタスクに分割して処理する問題での利用に適しています。このとき、サブタスクへの再帰的な分割を終えて実際の計算が行われるときのタスクの 粒度 が、Fork/Join フレームワークにおける並列性とオーバーヘッドのトレードオフを左右する要素となり得ます。

ここで「タスクの粒度」をより深く理解するために、具体例として マージソート を取り上げます。リンク先の Wikipedia ページに下記に引用した記述があるように、マージソートでは再帰処理によるデータの分割を最小単位までせず、要素数がしきい値を下回ったところで分割を止め、挿入ソートなど別の単純なソートアルゴリズムに切り替える実装をすることがあります2。このしきい値設定が、Fork/Join モデルでいうところの「適切なタスクの粒度を選択する」ことに相当します。

クイックソート等と同様、完全に細分化せずにスレッショルドとして適度に大きい個数を設定し、それ以下になった時点でなんらかの「ごく少数の対象専用の高速なコードによるソート」を併用するという高速化手法がある。

Fork/Join フレームワークにおいてはこのように、タスクの粒度が十分に小さくなったところでサブタスクへの分割を止め、以降の処理を (並列処理を伴わずに) 直列的に処理する実装が勧められます。これにより、並列性は高めつつも fork や join 操作によるオーバーヘッドも許容できる程度に小さい、それなりによい計算効率が実現できるようになります3

タスクオブジェクトを小さく保つ

Fork/Join フレームワークでは一般的に、処理対象かつ分割対象である「タスク」は ForkJoinTaskRecursiveTask, RecursiveAction といった抽象クラスを継承したクラスで表現します。Fork/Join フレームワークはこのタスクを再帰的に分割していくわけですが、このフレームワークによって生成されるタスクオブジェクトを数えると、サブタスクの分割個数 を 、分割回数 (再帰の深さに相当) を としたときに 個となります。

これはつまり、サブタスクの分割個数や分割回数が多かったりすると、それだけオブジェクトが大量生産されることを意味します。そのため、ヒープや GC 由来のオーバーヘッドを小さくするためにも、一つ一つのタスクを表すオブジェクトはバイト数的な意味でなるべく小さくなるようにタスククラスを実装し、また無駄なオブジェクト生成を避けるような処理を実装するよう心がける必要があります。

不要な fork 操作を避ける

Fork/Join フレームワークにおいて、あるタスクから分割されたサブタスクの並列処理を開始するには、それぞれのサブタスクのオブジェクトに対して ForkJoinTask#fork() メソッドを呼び出します。一方で、サブタスクでの処理結果を得るには、サブタスクのオブジェクトに対して ForkJoinTask#join() メソッドを呼び出します。また ForkJoinTask クラスには、これらの fork と join の操作を複数のサブタスクオブジェクトに対して一括して呼び出せるようにする invokeAll(ForkJoinTask...) メソッドなども提供されています。

ForkJoinTask#fork() メソッドと ForkJoinTask#join() メソッドによるタスクの非同期処理では、その処理が完了するまでの処理時間に非同期処理に関わる諸々のオーバーヘッドが加わることになります。そのため、オーバーヘッドを最小化するためには不要な ForkJoinTask#fork() メソッドの呼び出し、すなわち fork を避ける必要があります。

例えばマージソートのように、サブタスクの分割個数が事前に明らかになっている問題では不要な fork 操作を避ける手段が存在します。具体的には、あるタスクを 個のサブタスクに分割するとして、 個のサブタスクは ForkJoinTask#fork() メソッドで非同期で処理させます。そして残された 1 つのサブタスクについては、RecursiveTask#compute() などのメソッドを呼び出して (非同期別スレッドではなく) 現在のスレッド上で即時に処理します。これにより、RecursiveTask#compute() メソッドの呼び出しで実行されるサブタスクについては非同期処理に関わるオーバーヘッドを完全に回避することができます。

リソース競合を避ける

並列処理でリソースを共有する場合、複数のスレッド上で動く処理がそのリソースに対して同時に変更操作を加えようとするとリソース競合が生じてしまいます。このリソース競合を避けるにはロックを用いる手段がありますが、並列処理においてはロック処理はオーバーヘッドを生み出す要素になるため、なるべくならロックを避ける、ひいてはリソース競合を生じるようなリソース共有を避けることが望まれます。これは Fork/Join フレームワークにおいても同様のことが言えます。

そのため、Fork/Join フレームワークを利用して並列化をするときは、そもそもの話としてタスク間でリソース共有しないような作りを前提とし、リソースの共有が避けられない場合はできる限りロックを必要としないような作りを目指すべき、となります。さらにどうしてもロックが必要となるのであれば、適切な粒度でロックをするような実装を心がけるようにします。

デザインパターン

論文の 4.1 節では、パフォーマンスおよびソフトウェアエンジニアの観点でベストプラクティスとされている Fork/Join フレームワークのデザインパターンとして、次の 4 つのデザインパターンが挙げられています。

Linked Subtasks

Linked Subtasks パターンは、サブタスクへの分割数が不定となる処理を Fork/Join フレームワークで実装するときに適用するパターンです。ここで「サブタスクの分割数が不定となる」とは、例えばあるディレクトリ以下のファイル・ディレクトリを再帰的に辿る処理などが該当します。この類の処理では、fork して非同期させているサブタスクの処理結果を取得する、もしくはサブタスクの完了を待機するために、サブタスクを表すオブジェクトをコレクションなど何かしらのデータ構造を用いて保持しつつ、後で ForkJoinTask#join() メソッドを呼び出せるようにする必要があります。

論文では、リストなどのコレクションでサブタスクの ForkJoinTask のオブジェクトを保持する方法と、サブタスクを表現する ForkJoinTask を継承したクラス自身に兄弟タスクへのリンクを持たせて連結リストを構成する方法の 2 つが紹介されています。前者の方法は、汎用的なコレクションクラスを使うがゆえの時間的・空間的なオーバーヘッドが生じてしまいますが、後者の方法であれば (実装の手間は若干かかるものの) それらオーバーヘッドの最小化が期待できるでしょう。

また前者の汎用的なコレクションを利用する方法についてはさらに、2 種類のバリエーションが存在します。一つはサブタスクのオブジェクトを生成する時点で fork を済ませておき、後で ForkJoinTask#join() メソッドでサブタスクの処理結果を受け取る方法です。もう一つは、すべてのサブタスクを生成し終えてから ForkJoinTask#invokeAll(Collection<ForkJoinTask>) を呼び出して一括 fork/join 操作をする方法です。

Leaf Tasks

Leaf Tasks パターンは、あるタスクをサブタスクに分割するタスクと実際に処理をするタスク、言い換えると、タスクの再帰的な関係を木構造に見立てたときの内部ノードと葉ノードに当たるタスクをそれぞれ異なる ForkJoinTask クラスの継承クラスで表現するパターンです。具体的には、Linked Subtasks パターンでも取り上げたファイル・ディレクトリを再帰的に辿る例の論文中での実装 (下記コード参照) がそれになります。

public class DirectoryTask extends RecursiveTask<Long> {
  // コンストラクタなど一部実装は省略
  protected Long compute() {
    List<RecursiveTask> tasks = new ArrayList<>();
    for (File f : dir.listFiles()) {
      if (f.isDirectory()) {
        tasks.add(new DirectoryTask(f).fork());
      } else {
        tasks.add(new FileTask(f).fork());
      }
    }
    long sum = 0;
    for (RecursiveTask<Long> task : tasks) {
      sum += task.join();
    }
    return sum;
  }
}

この例では、ファイルを辿る処理が葉ノードに当たり FileTask クラスとして表現され、ディレクトリを再帰的に辿る処理は DirectoryTask クラスで表現されています。

Avoid Unnecessary Forking

こちらは 不要な fork 操作を避ける で既に言及しているように、サブタスクのうち一つは親のタスクのスレッド上で直接実行することで非同期処理に関わるオーバーヘッドを回避するパターンとなります。これにより、マージソートのように再帰的に 2 つのタスクに分割実行するケースであれば、生成されるサブタスクのうち半分のサブタスクの fork が回避できるようになります。

なお論文内では ForkJoinTask#invokeAll() における fork について言及がありますが、OpenJDK 8 の時点の実装を確認する限りでは、(直接 RecursiveTask#compute() メソッドなどを呼ぶよりは多少のオーバーヘッドが生じるものの) サブタスクの一つは fork による非同期ではなく即時実行される実装となっているようです。

Sequential Cutoff

こちらも同じく 適切なタスクの粒度を選択する で前述した内容と被りますが、サブタスクへの分割を打ち止めにするしきい値 (cutoff) を適宜設定することで並列性とオーバーヘッドのバランスをとるパターンです。

さて、この sequential cutoff を実現するための肝心なしきい値をどう設定するべきかについて考えると、ForkJoinTask クラスの Javadoc には

非常に大まかな経験則として、1つのタスクでは100より多く10000より少ない基本的な計算ステップを実行するようにし、無期限ループを回避するようにしてください。

と書かれているものの、実際には処理内容や CPU のコア数によっても最適なしきい値は変化しえます。そのため、ベンチマークプログラムなどでパフォーマンスを実測して決めるのが望ましでしょう4

アンチパターン

ここからはアンチパターンについて見ていきます。論文では、以下の 3 つのアンチパターンが紹介されています。

Heavyweight Splitting

配列やリストなどのコレクションで保持されている入力データをサブタスクに分割して与える類の処理において、データの分割に時間的もしくは空間的に非効率な手段を用いてしまうアンチパターンを heavyweight splitting と呼んでいます。

具体例として、ここではマージソートを考えます。マージソートでは与えられた配列を再帰的に 2 つに分割しますが、例えばサブタスクへの分割の際に 2 つの配列を物理的に生成 (new Object[])し、元の配列からそれぞれの配列に System.arrayCopy() メソッドでデータをコピーする実装は線形オーダーの時間計算量になるため、非効率な処理となります。これが heavyweight splitting のアンチパターンに該当します。このマージソートの例であれば配列を物理的に 2 分割する必要はなく、各々のサブタスクが担当すべき配列の範囲を示す情報 (配列上の開始と終了のインデックス) をサブタスクに引き渡すだけで十分です。

なおこの heavyweight splitting アンチパターンは配列だけに限らず、Java のリストでも同様のことが言えます。幸いなことに Java のリストには List#subList(int, int) メソッドが用意されているので、このメソッドを使えばリストの分割は要素のコピーなしに実現できます。しかし、List#subList(int, int) を呼び出すたびにオブジェクトの生成が生じてしまうことを考えると、これも最適な選択肢とは言い切れません。

配列にしてもリストにしてもいずれにせよ、このアンチパターンを避けるにはデータを分割する際は物理的にではなく論理的に分割すること、加えてサブタスクが担当すべき範囲を示すインデックス情報をサブタスクの入力データとして渡すのが最適と言えるでしょう。

Heavyweight Merging

タスクの分割に伴うデータ分割のアンチパターンである heavyweight splitting に対して、こちらの heavyweight merging はサブタスクの処理結果をマージする際のアンチパターンになります。具体的には、一つのサブタスクが複数個の処理結果を返却しようとする場合、リストなどのコレクションに格納した上で親のタスクに返却することになりますが、ここで親タスクでのマージ処理に適さないデータ構造や実装クラスを採用して不必要なオーバーヘッドを生じさせてしまうケースがこのアンチパターンにあたります。

例えば、ディレクトリを再帰的に辿ってファイル名をリストに格納して列挙する処理を考えます。当然のことながらディレクトリ内にあるファイルの数は一定ではありません。従って、タスクの処理結果であるファイル名の一覧はナイーブに実装すれば ArrayList クラスなどのリストに格納して返却することになるでしょう。そして、各サブタスクの結果をマージする処理もナイーブに実装するならば、List#addAll(Collection) メソッドを呼び出してすべてのサブタスクの処理結果を一つの ArrayList オブジェクトにまとめる実装となるでしょう (下記コード参照)。

public class ListFilesInDirTask extends RecursiveTask<List<String>> {
  public List<String> compute() {
    List<RecursiveTask> tasks = new ArrayList<>();
    for (File f : dir.listFiles()) {
      if (f.isDirectory()) {
        tasks.add(new ListFilesInDirTask(f).fork());
      } else {
        tasks.add(new ListFileTask(f).fork());
      }
    }
    List<String> result = new ArrayList<>();
    for (RecursiveTask<List<String>> task : tasks) {
      result.addAll(task.join());
    }
    return result;
  }
}

しかし、ArrayList クラスにおける addAll(Collection) メソッドの実装は内部で System.arrayCopy() メソッドを呼んでいることからも分かるとおり、時間計算量は線形オーダーであるためこのマージ処理は効率的には行われません。このようなマージ処理を論文では heavyweight merging アンチパターンと定義しています。なお LinkedList クラスの addAll(Collection) メソッドも時間計算量が線形オーダーの実装となるため、heavyweight merging アンチパターンを回避するには Java 標準のクラスライブラリに頼らない実装が求められます。例えばこのケースであれば、シンプルな単方向の連結リストを実装することでアンチパターンを回避できるでしょう。

そのほか、サブタスクが返す処理結果の数が事前に明らかであったりもしくは計算可能である場合に限られますが、Fork/Join フレームワークによる並列処理をする前に十分な大きさの配列などを用意しておき、サブタスクが直接その配列に処理結果を書き込むことでもこのアンチパターンを回避することができます。

Inappropriate Sharing Resources Among Tasks

リソース競合を避ける でも触れているように、並列処理におけるリソース共有はなるべく避けるべきではあるもの、現実世界ではリソース共有をせざるを得ない問題に出くわすこともあるかと思います。このアンチパターンでは、そんなリソース共有における 3 つの不適切なケースを挙げています。

そもそもリソース共有が不要であるケース

論文では、グローバルなカウンタをタスク間で共有して各々のタスクでカウンタをインクリメントする操作を例として挙げています。 このグローバルなカウンタは例えば、書き込み操作時に synchronized によるロックを必要とする実装の場合に並列処理のパフォーマンスを低下させる結果に繋がりかねません。

しかし、このカウンタの利用目的が単に数え上げだけであれば、各々のタスク内で用意したローカルなカウンタをインクリメントして結果をタスクの出力とし、親のタスクでサブタスクのカウンタを積算することでリソース共有そのものを排除できるでしょう。

リソース共有が必要ではあるがロックの単位が粗いケース

例えば、タスク間でリストやマップなどのコレクションを共有するとします。そのコレクションに対して各々のタスク内で読み込み/書き込みなどの操作を同時並列で実行しようとする場合、リソース競合を解決するための仕組みが必要になります。

このリソース競合を解決するナイーブな方法としては、 Collections クラスが提供する synchronizedList() メソッドや synchronizedMap() メソッドを使って操作対象の ArrayList オブジェクトや HashMap オブジェクトをラップする方法が挙げられます。これらのメソッドが生成するコレクションのラッパーオブジェクトでは、読み込み・書き込みにかかわらずあらゆる操作においてコレクション単位でのロックが発生します。

一方でタスク内での処理内容次第では、必ずしもすべての操作でロックが必要なわけではなく List#add()Map#put() などの書き込み操作のときだけロックができれば十分であったり、またコレクション全体に対してロックを掛けずとも要素単位のロックで事が足りることもあります。このように、細やかなロック粒度を求めたりロック対象の操作を一部操作に限定したい場合、Collections.synchronizedList() メソッドなどで得られるラッパーオブジェクトの利用はそのロックの粗さ故に不適切と言えます。

このようなケースでは、Java の Concurrent Utilities フレームワークに含まれるコレクションの実装クラス、具体的には CopyOnWriteArrayList クラスや ConcurrentHashMap クラスの利用を検討してみるのがよいでしょう5

リソース共有が必要ではあるがロックを怠っているケース

前述のケースと同様にタスク間でリストやマップなどのコレクションを共有し、かつリソース競合が生じうるような操作をする場合の話ではありますが、こちらはロック等の制御を怠るケースになります。

当然のことながら、ArrayList クラスや HashMap クラスといったスレッドセーフを保証していないコレクション、もしくは配列に対して同時並列に読み込み/書き込み操作をする場合、処理内容次第ではリソース競合となり正しい処理が行われない可能性が出てきます。対処としては、前述のケースと同様に CopyOnWriteArrayList クラスや ConcurrentHashMap クラスを利用するなどの手段があります。

まとめ

Fork/Join Parallelism in the Wild の論文に書かれている内容をもとに、Fork/Join の計算効率を最大化するための原則、ベストプラクティス、そしてデザインパターンやアンチパターンについて見てきました。ここまでの内容の要点をまとめるならば、以下になるでしょう。

  • リソース共有はなるべく避けるようにしよう
    • リソース共有が避けられない場合は、ロックを必要とする操作を限定し、またロックの粒度をなるべく小さくしよう
  • コレクションの実装クラスの特性を意識して、split / merge 処理が重くならないように気をつけよう
  • タスクの粒度をいい感じに調整しよう
  • 不要な fork はしないようにしよう

論文にはこれ以外にも、アンチパターンのケースで実際どれくらいのパフォーマンス劣化が生じるかを実験した結果なども掲載されています。このブログエントを読んで興味を持った方、もしくは Fork/Join フレームワークを利用してアプリケーションを実装する機会が多い方はぜひこの論文をご一読されることをお勧めします。


  1. Fork/Join に関する Oracle の公式ドキュメントAnother implementation of the fork/join framework is used by methods in the java.util.streams package と書かれているように、Stream で並列処理をする場合は間接的に Fork/Join フレームワークを使っていることになります。 

  2. 実際に、Java 6 のレガシーな Arrays#sort(Object[]) ではマージソートが使われていますが、その詳細を確認すると、やはり一定の大きさを下回ったところで挿入ソートに切り替えるような実装になっています。 

  3. もちろん、論文中で挙げられているディレクトリ再帰の問題のように、しきい値の設定がそもそも困難な問題も存在します。 

  4. ちなみに Java のクラスライブラリにおける代表的な Fork/Join フレームワークを利用した実装である Arrays#parallelSort() では、CPU のコア数に応じてしきい値を変化 させています。 

  5. CopyOnWriteArrayList クラスは読み込み時にロックを必要としない一方で、書き込み処理は配列のコピーを生成したりとかなりのオーバーヘッドが生じるため、書き込み操作が多いケースでの利用を検討する場合はベンチマークをとって評価することをお勧めします (参考: What are the pros and cons of CopyOnWriteArrayList vs Collections.SynchronizedList? When do we choose one over another?)。