読者です 読者をやめる 読者になる 読者になる

ましめも

技術系メモ

スノボで必要な用意

服装

レンタル可能な持ち物 - スキーウエア上下 - ニット帽、もしくはヘルメット - スノボ、スノボブーツ - ゴーグル (レンタルは曇りやすい) - スノボ用グローブ

持ち物 - 暖かい下着上下(ブレスサーモとか、なければヒートテックとか) - 長い靴下(スキー用推奨、なければ普通のを重ねて履くとか) - お尻保護用のパッド、もしくは厚手の短パン(こけても痛くない、雪で座っていても冷たくない) - ネックウォーマー

注意点 - できる限り着ていくと楽。更衣室は狭くて混んでることが多い

その他便利グッズ

あってもなくてもいい

  • 貼るカイロ2枚(背中に貼ると断然に暖かい、スマホを暖めておくと電池が切れにくい)
  • スマホを入れて首から下げられるいれもの(スマホ落とすと見つからないので)
  • 現金を持ち歩く入れ物

ScalaのFutureについてのスライド書きました/つまらないシステム

つまらないシステム(1) - Scala の Future と ExecutionContext http://niconare.nicovideo.jp/watch/kn653

つまらないシステム(2) - Scala 書きやすすぎるFutureの罠 http://niconare.nicovideo.jp/watch/kn654 というスライドを公開しました。

Scala の Future を使うのに慣れてきた人向けの、Future の ExecutionContext を上手に使って詰まらない堅牢なシステムを作ろうという話です。

Scala の Future ってどうやって使うの?Promiseって何?

"Scala Future" で検索して出てくるFutureの解説は、Scala公式サイトのドキュメントを除いて大体こんな感じで紹介されてることが多い。

import scala.concurrent._
import ExecutionContext.Implicits.global
Future {
  Thread.sleep(1000)
  println("hoge")
}
println("fuga")
//->
// fuga
// hoge

こういうFuture.applyにThread.sleepやIOのblockingをする例って非常に悪いと思っている。まるで、Futureでsleepするのが普通のコードっぽく見えるじゃん。違うの、単に説明の時に楽だからsleepしてるだけなの。説明コードが短くてすむの。ちょっと使ってるだけ。プロダクトコードでsleepするのやめろ。うわあ!!Future内でブロッキングやめろ!!そこみんなのトイレだから!無駄に占有しないで!

かといって、公式ドキュメント読めっていってもあのドキュメント長いし退屈。詳細に書いてあるのはわかるんだけど、初学者がちょっとFuture使ってみたいわ〜ってときに「ん?長いな別のサイト見るか」ってなってもしょうがない。

じゃあお前がかけよって言われるわけだが、なんか上手に書けない。前々から書こう書こうと思ってるんだけど。。箇条書きレベルでメモを残すので誰か日本語にしてください。

Scala の Future と Promise って何?

Promise なんて興味ない?いや、Futureの本体はPromiseといっても過言じゃないですよ。

Future[A]... いつか型Aの値が与えられる
Promise[A]... いつか型Aの値を与える

Future[A] いつか型Aの値が与えられる

Future にはいつか値が与えられる。値が与えられたときどういう動作をしてほしいのかは、foreach(等)で定義できる。

import scala.concurrent._
import ExecutionContext.Implicits.global
val a: Future[String] = getFuture()
// a にはいつかStringの値が与えられる。
// a に値が与えられたら、printlnは実行される。
a.foreach(e => println("値やっときたわ: " + e))
println("piyopiyo")

// ->
//   piyopiyo
//   値やっときたわ: ???

Promise[A] いつか型Aの値を与える

import scala.concurrent._
val b: Promise[String] = Promise[String]
// b にはいつかStringの値を与える

b.success("値あげるよ〜")
b.success("2回目実行するよ〜") // -> java.lang.IllegalStateException: Promise already completed.

これ何の役にたつの?

Promise[A] から Future[A] を作ることができる

Promise#futureを呼ぶと、Futureを作ることができる。
Promiseのsuccessを呼ぶと、その瞬間にFutureのforeach(等)が呼び出される。

import scala.concurrent._
import ExecutionContext.Implicits.global
val promise: Promise[String] = Promise[String]
val future: Future[String] = promise.future
future.foreach(e => println("値やっときたわ: " + e))
println("piyopiyo")
promise.success("こんにちは")
// ->
//  piyopiyo
//  値やっときたわ: こんにちは

じゃあ Future.apply って何

Futureの作り方ってFuture.applyじゃないんですか?!そう聞きました!こういう例で習いました!

val future: Future[A] = Future {
  Thread.sleep(1000)
  function1("fugahoge")
}

それでも作れるけど内部で同じように Promise 使ってます。ただの便利関数です。

まとめ

  • Future[A]... いつか型Aの値が与えられる
  • Promise[A]... いつか型Aの値を与える
  • Promise と Future で イベントハンドリング が楽に書ける
  • FutureでThread.sleepやめろ!(Thread.sleepやブロッキングすると当然暇なスレッドができます。それを意識してやってるなら別にいいです)

というのがわかる5分で読める解説ページができるといいな〜

Future内でThread.sleepはするな

前回の記事(Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ - ましめも) で import scala.concurrent.ExecutionContext.Implicits.global とは何なのか、そもそも ExecutionContext とは ということについて解説した。

おさらい

  • ExecutionContext は スレッドプールを持っていて、そこにタスクを割り当てる機構
  • ExecutionContext.Implicits.global はデフォルトではCPUコア数分のスレッドを持っている

Future内でThread.sleepはご法度

ExecutionContext.Implicits.global を使っている状態でThread.sleepをすると非常に迷惑になることがある。
例えば次のようなコードがあったとして、CPUコア数4のマシンで関数fugaと関数hogeが同時に実行されると一体どういうことになってしまうか?

import scala.concurrent._
import ExecutionContext.Implicits.global
def fuga = {
  // CPUコストのかかる function1 という関数をマルチコアを使って効率よく処理したい
  (1 to 10).map {e =>
    Future {function1(e)}
  }
}
def hoge = {
  // 3秒待ってから処理するのを簡単に書いてみた
  (1 to 3).map {e =>
    Future {Thread.sleep(3000); function2(e)} 
  }
}

ご想像の通り、処理がつっかえてしまう(下図)。

f:id:mashijp:20141207235401p:plain

もちろんCPU使用率が100%であればマシン資源をフルに使っているからいいのだが、緑色のタスクはただ寝てるだけだ!(Thread.sleep しているだけ) 資源を無駄にしてしまっている!
橙色のタスクは律儀にglobalのスレッドが空くのをただただ待っている状態だ…

よって、ExecutionContext.Implicits.global を使う場合に原則 Thread.sleep を行ってはいけない。同様に、ブロッキングが発生する操作を行うべきではない(IOのblockingなど)。ここ、みんなの使うところだから。

Thread.sleepしたい場合やIOのblockingを行う場合どうすりゃいいのか

タイトルは半分ウソで、絶対にThread.sleepしてはいけないわけではない。みんなで共用する(可能性のある) ExecutionContext.Implicits.global でそういうことをすると思わぬパフォーマンスの低下を生む可能性があるわけであって、自分で使う用の ExecutionContext を作ってそれを使えば問題ない。

ExecutionContext のscaladocにもそう書いてある

A custom ExecutionContext may be appropriate to execute code which blocks on IO or performs long-running computations. ExecutionContext.fromExecutorService and ExecutionContext.fromExecutor are good ways to create a custom ExecutionContext.

例えばこんな感じ

import scala.concurrent._
import java.util.concurrent.Executors
val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
(1 to 30).foreach {_ => Future{Thread.sleep(3000); println(new java.util.Date)}(ec)}
/* 2個ずつ動いている
Sun Dec 07 23:47:55 JST 2014
Sun Dec 07 23:47:55 JST 2014
Sun Dec 07 23:47:58 JST 2014
Sun Dec 07 23:47:58 JST 2014
Sun Dec 07 23:48:01 JST 2014
Sun Dec 07 23:48:01 JST 2014
Sun Dec 07 23:48:04 JST 2014
**/

これで、global側を邪魔しない。

なおPlay frameworkを使っている場合だと、内包してる akka で簡単に ExecutionContext を作ることができたりする ("Many specific thread pools" のところ参照)
https://www.playframework.com/documentation/2.3.x/ThreadPools

ExecutionContext.Implicits.global のご利用は計画的に

ここまで書いて、ExecutionContext とトイレ(個室)はほとんど同じなんじゃと思い始めた。ExecutionContext.Implicits.global は共用トイレと考えると…

  • 個室の数(= Thread数 = 同時に処理できる数)には限りがある。先着順で処理する
  • 空いてるときは100歩譲って寝てようと何してようと問題にならない
  • 待ち行列ができてるにも関わらずトイレ(ExecutionContext)で寝てるのは許せない。本当に用を足してるならしょうがない
  • 待ち行列ができることに何も問題ないと言い切れるなら別に構わない
  • 自分で作った自分専用のトイレ(ExecutionContext)なら自由にしてくれ

うん!大体あってる気がする!それでは快適なトイレライフを!

Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ

こういう人は、あとあと処理が詰まったり理解できない挙動が起きたりして困るので注意。

  • よくわからないけどコンパイル時に怒られるので import scala.concurrent.ExecutionContext.Implicits.global を書いている
  • Future.apply は 「スレッドを立ち上げて非同期に実行する」と理解している
  • 特に何も考えず Future 内で Thread.sleep をしている

ExecutionContextとは

Future#mapやFuture.applyにimplicitパラメータとして要求される*1ExecutionContextって何なのか?
何か渡さないといけないからとりあえず import scala.concurrent.ExecutionContext.Implicits.global と書いている人もいるんじゃないだろうか。

ExecutionContext*2は簡単に説明するといい感じに非同期に実行してくれる仕組み*3。ExecutionContextのexecuteメソッドは、Runnableを受け取り「適当なタイミングで」実行してくれる。適当なタイミングって結局いつよ?というのはExecutionContextの実装による。

scala.concurrent.ExecutionContext.Implicits.global は scalaが標準で提供しているExecutionContextだと思えば良い。

RunnableはJavaのinterfaceで、runという関数を持っているものを表す。
https://docs.oracle.com/javase/7/docs/api/java/lang/Runnable.html

試しにExecutionContextに Thread.sleepしてprintlnするだけ のRunnableを渡してみよう。

val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
// ec: scala.concurrent.ExecutionContext = scala.concurrent.impl.ExecutionContextImpl@6168d3c5

def now() = new java.util.Date();
println(now); ec.execute(new Runnable{def run: Unit = {Thread.sleep(3000); println("fuga! " + now);}}); println(now);
// Sun Nov 23 23:41:25 JST 2014
// Sun Nov 23 23:41:25 JST 2014
// fuga! Sun Nov 23 23:41:28 JST 2014

このようにExecutionContextのexecuteに渡したRunnableは非同期に実行されていることがわかる。

ExecutionContextのありがたみとは

そうすると次のような疑問が出てくると思う

  • ああ、つまりスレッド立ち上がって非同期に実行するってこと?
  • 非同期に動作させたいならjava.lang.Thread使えば同じなんじゃないの?

これはいずれもNoである*4

例えば10個の重いタスクがあったとして、それぞれのタスクに対し new Thread & start (スレッド立ち上げ) して処理した場合は当然次のように10個のスレッドが立ち上がる。

f:id:mashijp:20141124002645p:plain
しかし、この実装は以下のような問題点がある

  • タスク数分スレッドを立ち上げるためメモリ/CPU資源の無駄になる
  • しかも暇しているスレッドがいる

もし次のように最小限のスレッドでタスクを分配できれば最高のパフォーマンスを出せるのではないだろうか?
f:id:mashijp:20141124003318p:plain

このようにタスク(Runnableといったほうが適切か)を「いい感じ」にスレッドに分配するのがExecutionContextの役目だ。繰り返しになるが、「いい感じ」とはどういう感じなのかは実装による。一般には「一定数を最大数とするスレッドプールを持っており、空いてるスレッドを利用してRunnableを処理してくれる」と考えればいいのではないだろうか。
ExecutionContextを使う側は内部実装がどうであるかとかスレッドがどう存在するのかとかを一切意識せずRunnableを渡すだけで良い。

scala.concurrent.ExecutionContext.Implicits.global はどういう実装なのか

ExecutionContext.Implicits.global を使ってるとどういうふうに「いい感じ」に動くのかというと*5 最大でCPU論理プロセッサ数のn倍(nはJavaオプションで指定可能。デフォルトは1)のスレッドを立ち上げ処理してくれるようだ。

実際、8コアのマシンで使うと次のように動く。

val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
def now() = new java.util.Date();
(1 to 30).foreach{_ => ec.execute(new Runnable {def run {Thread.sleep(3000); println(now)}})}
/*
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
(以下略)
*/

このように、8個ずつRunnableが処理されているのがわかる。

Future.applyとは一体なんなのか

Futureを使った簡単なサンプルコードを以下に示す。

import scala.concurrent.ExecutionContext.Implicits.global
val a: Future[String] = Future {
  Thread.sleep(3000)
  "fuga"
}
a.foreach(println)

このFuture.applyを使ったコードは次のように書き表すことができる。

val p: Promise[String] = Promise[String]
val a: Future[String] = p.future
a.foreach(println)
scala.concurrent.ExecutionContext.Implicits.global.execute(
  new Runnable{def run = {
    Thread.sleep(3000)
    p.success("fuga")
  }}
)

後者のコードはPromiseを書いたりしなければならなくて面倒。後者のコードを簡単に実現してくれるのが、Future.applyだ。

Future.apply は CPUコア数分しか同時に処理しない

つまりFuture.applyはCPUコア数分しかスレッドを使わず、同時に処理する数もCPUコア数と同等になる(ExecutionContext.Implicits.global を使った場合は)。Future.applyは決して「スレッドを立ち上げる」という処理と同等でないことに気をつけよう。

import scala.concurrent.ExecutionContext.Implicits.global
(1 to 30).foreach(_ => Future{Thread.sleep(2000);println(now)})

/*
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:46 JST 2014
Mon Nov 24 00:29:46 JST 2014
*/

(追記)

同時並行数を変えたい場合やExecutionContext.Implicits.globalを使う上での注意点をまとめた記事を上げた
Future内でThread.sleepはするな - ましめも

*1:http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future

*2:http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext

*3:やろうとすれば同期するExecutionContextも作れるが基本的に誰も得しないと思う

*4:必ずしもNoではないが(当然ExecutionContextの実装による)

*5:https://github.com/scala/scala/blob/v2.10.3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L65

IntelliJ IDEAさんがplayの自動生成ファイルを認識してくれない

play2.3で自動生成するコード(ReverseRoutingやView)をIntelliJ IDEAが認識してくれないことがあるのでその解決方法をメモ

認識されていない様子(Assets.atが赤い)
f:id:mashijp:20140607123209p:plain

1) File -> Project Structure
f:id:mashijp:20140607123556p:plain

2) Modulesを開き、targetの下を見る
src_managed/main/controllers と twirl/main/views がSourcesになっている
f:id:mashijp:20140607123415p:plain

3) 上の"Mark as"のところをポチポチしてsrc_managed/main と twirl/main がSourcesになるように設定する
f:id:mashijp:20140607123424p:plain

4) 直った!
f:id:mashijp:20140607123430p:plain


play2.2のときもこういう問題が起きていたけど、play2.3になってscala templateが分離されたので作業が増えた…
activator ideaせずにsbt projectとしてimportしたせいかもしれない(play2.2時代はplay ideaしてもこの問題が発生していた)。

お手軽Akka Schedulerとハマりポイント

定期実行処理を簡単に行えるAkka Schedulerというものがある。Akkaはplay framework上から簡単に使うことができるので、バッチ処理等を行うのにとても便利。
Schedulerには次のようなメソッドが定義されている*1

  • def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit)
    • initialDelay後, intervalの間隔で関数fを実行する
  • def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, message: Any)
    • initialDelay後, intervalの間隔で receiver(Actor)に message を送る

「ちょっと定期実行したいだけだし&Actor定義するのめんどくさいし前者を使おう」と思って使ったら1つ問題点に気づいた。

例外発生時、定期実行されなくなってしまう

この方法の問題点は、scheduleに渡した関数内で例外が発生すると定期実行されなくなってしまうということだ*2。この方法で確実に定期実行するなら決して例外が発生しないようにしないといけない。

    var i = 0
    Akka.system.scheduler.schedule(5 seconds, 1 seconds) {
      i = i + 1
      Logger.info(s"$i 回目 実行します")
      if (i > 5) {
        throw new IllegalArgumentException("ごめん")
      }
      Logger.info("実行に成功しました!")
    }

実行結果(6回で実行が止まってしまう)

[info] application - 1 回目 実行します
[info] application - 実行に成功しました!
[info] application - 2 回目 実行します
[info] application - 実行に成功しました!
[info] application - 3 回目 実行します
[info] application - 実行に成功しました!
[info] application - 4 回目 実行します
[info] application - 実行に成功しました!
[info] application - 5 回目 実行します
[info] application - 実行に成功しました!
[info] application - 6 回目 実行します
java.lang.IllegalArgumentException: ごめん
	at Global$$anonfun$autoCleanUpFileStorage$1.apply$mcV$sp(Global.scala:35)
...
※以降何も出力されない

Actorを使った場合は例外が発生しても止まらない

一方でActorを定義しActorに定期的にメッセージを送るように設定した場合は、処理中に例外が発生しても止まることはない*3

  var j = 0
  class BatchActor extends Actor {
    def receive: Actor.Receive = {
      case e: String =>
        j = j + 1
        Logger.info(s"$j 回目 実行します")
        if (j > 5) {
          throw new IllegalArgumentException("ごめん")
        }
        Logger.info("実行に成功しました!")
    }
  }
// 1秒毎にBatchActorにメッセージ"はい"を送る
Akka.system.scheduler.schedule(5 seconds, 1 seconds, Akka.system.actorOf(Props[BatchActor]), "はい")

実行結果(6回目以降もずっと呼ばれ続けている)

[info] application - 1 回目 実行します
[info] application - 実行に成功しました!
[info] application - 2 回目 実行します
[info] application - 実行に成功しました!
[info] application - 3 回目 実行します
[info] application - 実行に成功しました!
[info] application - 4 回目 実行します
[info] application - 実行に成功しました!
[info] application - 5 回目 実行します
[info] application - 実行に成功しました!
[info] application - 6 回目 実行します
[ERROR] [02/11/2014 20:26:07.746] [application-akka.actor.default-dispatcher-5] [akka://application/user/$a] ごめん
java.lang.IllegalArgumentException: ごめん
	at Global$BatchActor$$anonfun$receive$1.applyOrElse(Global.scala:37)
..

[info] application - 7 回目 実行します
[ERROR] [02/11/2014 20:26:08.741] [application-akka.actor.default-dispatcher-5] [akka://application/user/$a] ごめん
java.lang.IllegalArgumentException: ごめん
	at Global$BatchActor$$anonfun$receive$1.applyOrElse(Global.scala:37)
..
[info] application - 8 回目 実行します
[ERROR] [02/11/2014 20:26:09.741] [application-akka.actor.default-dispatcher-2] [akka://application/user/$a] ごめん
..

この仕様を知らずに前者の関数を渡すschedule関数を使っている&例外処理をちゃんとしてないと、「あれれー?気づいたら定期実行されなくなってるぞー」ということになるので注意。

*1:http://doc.akka.io/docs/akka/2.1.0/scala/scheduler.html

*2:https://github.com/akka/akka/blob/v2.2.0/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L527 例外が発生した場合は次の実行のためのスケジューリングがされない

*3:https://github.com/akka/akka/blob/v2.2.0/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L62 メッセージの送信自体で例外が発生した場合やActorが終了している場合は止まる