読者です 読者をやめる 読者になる 読者になる

ましめも

技術系メモ

Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ

こういう人は、あとあと処理が詰まったり理解できない挙動が起きたりして困るので注意。

  • よくわからないけどコンパイル時に怒られるので import scala.concurrent.ExecutionContext.Implicits.global を書いている
  • Future.apply は 「スレッドを立ち上げて非同期に実行する」と理解している
  • 特に何も考えず Future 内で Thread.sleep をしている

ExecutionContextとは

Future#mapやFuture.applyにimplicitパラメータとして要求される*1ExecutionContextって何なのか?
何か渡さないといけないからとりあえず import scala.concurrent.ExecutionContext.Implicits.global と書いている人もいるんじゃないだろうか。

ExecutionContext*2は簡単に説明するといい感じに非同期に実行してくれる仕組み*3。ExecutionContextのexecuteメソッドは、Runnableを受け取り「適当なタイミングで」実行してくれる。適当なタイミングって結局いつよ?というのはExecutionContextの実装による。

scala.concurrent.ExecutionContext.Implicits.global は scalaが標準で提供しているExecutionContextだと思えば良い。

RunnableはJavaのinterfaceで、runという関数を持っているものを表す。
https://docs.oracle.com/javase/7/docs/api/java/lang/Runnable.html

試しにExecutionContextに Thread.sleepしてprintlnするだけ のRunnableを渡してみよう。

val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
// ec: scala.concurrent.ExecutionContext = scala.concurrent.impl.ExecutionContextImpl@6168d3c5

def now() = new java.util.Date();
println(now); ec.execute(new Runnable{def run: Unit = {Thread.sleep(3000); println("fuga! " + now);}}); println(now);
// Sun Nov 23 23:41:25 JST 2014
// Sun Nov 23 23:41:25 JST 2014
// fuga! Sun Nov 23 23:41:28 JST 2014

このようにExecutionContextのexecuteに渡したRunnableは非同期に実行されていることがわかる。

ExecutionContextのありがたみとは

そうすると次のような疑問が出てくると思う

  • ああ、つまりスレッド立ち上がって非同期に実行するってこと?
  • 非同期に動作させたいならjava.lang.Thread使えば同じなんじゃないの?

これはいずれもNoである*4

例えば10個の重いタスクがあったとして、それぞれのタスクに対し new Thread & start (スレッド立ち上げ) して処理した場合は当然次のように10個のスレッドが立ち上がる。

f:id:mashijp:20141124002645p:plain
しかし、この実装は以下のような問題点がある

  • タスク数分スレッドを立ち上げるためメモリ/CPU資源の無駄になる
  • しかも暇しているスレッドがいる

もし次のように最小限のスレッドでタスクを分配できれば最高のパフォーマンスを出せるのではないだろうか?
f:id:mashijp:20141124003318p:plain

このようにタスク(Runnableといったほうが適切か)を「いい感じ」にスレッドに分配するのがExecutionContextの役目だ。繰り返しになるが、「いい感じ」とはどういう感じなのかは実装による。一般には「一定数を最大数とするスレッドプールを持っており、空いてるスレッドを利用してRunnableを処理してくれる」と考えればいいのではないだろうか。
ExecutionContextを使う側は内部実装がどうであるかとかスレッドがどう存在するのかとかを一切意識せずRunnableを渡すだけで良い。

scala.concurrent.ExecutionContext.Implicits.global はどういう実装なのか

ExecutionContext.Implicits.global を使ってるとどういうふうに「いい感じ」に動くのかというと*5 最大でCPU論理プロセッサ数のn倍(nはJavaオプションで指定可能。デフォルトは1)のスレッドを立ち上げ処理してくれるようだ。

実際、8コアのマシンで使うと次のように動く。

val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
def now() = new java.util.Date();
(1 to 30).foreach{_ => ec.execute(new Runnable {def run {Thread.sleep(3000); println(now)}})}
/*
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
(以下略)
*/

このように、8個ずつRunnableが処理されているのがわかる。

Future.applyとは一体なんなのか

Futureを使った簡単なサンプルコードを以下に示す。

import scala.concurrent.ExecutionContext.Implicits.global
val a: Future[String] = Future {
  Thread.sleep(3000)
  "fuga"
}
a.foreach(println)

このFuture.applyを使ったコードは次のように書き表すことができる。

val p: Promise[String] = Promise[String]
val a: Future[String] = p.future
a.foreach(println)
scala.concurrent.ExecutionContext.Implicits.global.execute(
  new Runnable{def run = {
    Thread.sleep(3000)
    p.success("fuga")
  }}
)

後者のコードはPromiseを書いたりしなければならなくて面倒。後者のコードを簡単に実現してくれるのが、Future.applyだ。

Future.apply は CPUコア数分しか同時に処理しない

つまりFuture.applyはCPUコア数分しかスレッドを使わず、同時に処理する数もCPUコア数と同等になる(ExecutionContext.Implicits.global を使った場合は)。Future.applyは決して「スレッドを立ち上げる」という処理と同等でないことに気をつけよう。

import scala.concurrent.ExecutionContext.Implicits.global
(1 to 30).foreach(_ => Future{Thread.sleep(2000);println(now)})

/*
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:46 JST 2014
Mon Nov 24 00:29:46 JST 2014
*/

(追記)

同時並行数を変えたい場合やExecutionContext.Implicits.globalを使う上での注意点をまとめた記事を上げた
Future内でThread.sleepはするな - ましめも

*1:http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future

*2:http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext

*3:やろうとすれば同期するExecutionContextも作れるが基本的に誰も得しないと思う

*4:必ずしもNoではないが(当然ExecutionContextの実装による)

*5:https://github.com/scala/scala/blob/v2.10.3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L65