ましめも

技術系メモ

Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ

こういう人は、あとあと処理が詰まったり理解できない挙動が起きたりして困るので注意。

  • よくわからないけどコンパイル時に怒られるので import scala.concurrent.ExecutionContext.Implicits.global を書いている
  • Future.apply は 「スレッドを立ち上げて非同期に実行する」と理解している
  • 特に何も考えず Future 内で Thread.sleep をしている

ExecutionContextとは

Future#mapやFuture.applyにimplicitパラメータとして要求される*1ExecutionContextって何なのか?
何か渡さないといけないからとりあえず import scala.concurrent.ExecutionContext.Implicits.global と書いている人もいるんじゃないだろうか。

ExecutionContext*2は簡単に説明するといい感じに非同期に実行してくれる仕組み*3。ExecutionContextのexecuteメソッドは、Runnableを受け取り「適当なタイミングで」実行してくれる。適当なタイミングって結局いつよ?というのはExecutionContextの実装による。

scala.concurrent.ExecutionContext.Implicits.global は scalaが標準で提供しているExecutionContextだと思えば良い。

RunnableはJavaのinterfaceで、runという関数を持っているものを表す。
https://docs.oracle.com/javase/7/docs/api/java/lang/Runnable.html

試しにExecutionContextに Thread.sleepしてprintlnするだけ のRunnableを渡してみよう。

val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
// ec: scala.concurrent.ExecutionContext = scala.concurrent.impl.ExecutionContextImpl@6168d3c5

def now() = new java.util.Date();
println(now); ec.execute(new Runnable{def run: Unit = {Thread.sleep(3000); println("fuga! " + now);}}); println(now);
// Sun Nov 23 23:41:25 JST 2014
// Sun Nov 23 23:41:25 JST 2014
// fuga! Sun Nov 23 23:41:28 JST 2014

このようにExecutionContextのexecuteに渡したRunnableは非同期に実行されていることがわかる。

ExecutionContextのありがたみとは

そうすると次のような疑問が出てくると思う

  • ああ、つまりスレッド立ち上がって非同期に実行するってこと?
  • 非同期に動作させたいならjava.lang.Thread使えば同じなんじゃないの?

これはいずれもNoである*4

例えば10個の重いタスクがあったとして、それぞれのタスクに対し new Thread & start (スレッド立ち上げ) して処理した場合は当然次のように10個のスレッドが立ち上がる。

f:id:mashijp:20141124002645p:plain
しかし、この実装は以下のような問題点がある

  • タスク数分スレッドを立ち上げるためメモリ/CPU資源の無駄になる
  • しかも暇しているスレッドがいる

もし次のように最小限のスレッドでタスクを分配できれば最高のパフォーマンスを出せるのではないだろうか?
f:id:mashijp:20141124003318p:plain

このようにタスク(Runnableといったほうが適切か)を「いい感じ」にスレッドに分配するのがExecutionContextの役目だ。繰り返しになるが、「いい感じ」とはどういう感じなのかは実装による。一般には「一定数を最大数とするスレッドプールを持っており、空いてるスレッドを利用してRunnableを処理してくれる」と考えればいいのではないだろうか。
ExecutionContextを使う側は内部実装がどうであるかとかスレッドがどう存在するのかとかを一切意識せずRunnableを渡すだけで良い。

scala.concurrent.ExecutionContext.Implicits.global はどういう実装なのか

ExecutionContext.Implicits.global を使ってるとどういうふうに「いい感じ」に動くのかというと*5 最大でCPU論理プロセッサ数のn倍(nはJavaオプションで指定可能。デフォルトは1)のスレッドを立ち上げ処理してくれるようだ。

実際、8コアのマシンで使うと次のように動く。

val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
def now() = new java.util.Date();
(1 to 30).foreach{_ => ec.execute(new Runnable {def run {Thread.sleep(3000); println(now)}})}
/*
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:03 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:06 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
Mon Nov 24 00:43:09 JST 2014
(以下略)
*/

このように、8個ずつRunnableが処理されているのがわかる。

Future.applyとは一体なんなのか

Futureを使った簡単なサンプルコードを以下に示す。

import scala.concurrent.ExecutionContext.Implicits.global
val a: Future[String] = Future {
  Thread.sleep(3000)
  "fuga"
}
a.foreach(println)

このFuture.applyを使ったコードは次のように書き表すことができる。

val p: Promise[String] = Promise[String]
val a: Future[String] = p.future
a.foreach(println)
scala.concurrent.ExecutionContext.Implicits.global.execute(
  new Runnable{def run = {
    Thread.sleep(3000)
    p.success("fuga")
  }}
)

後者のコードはPromiseを書いたりしなければならなくて面倒。後者のコードを簡単に実現してくれるのが、Future.applyだ。

Future.apply は CPUコア数分しか同時に処理しない

つまりFuture.applyはCPUコア数分しかスレッドを使わず、同時に処理する数もCPUコア数と同等になる(ExecutionContext.Implicits.global を使った場合は)。Future.applyは決して「スレッドを立ち上げる」という処理と同等でないことに気をつけよう。

import scala.concurrent.ExecutionContext.Implicits.global
(1 to 30).foreach(_ => Future{Thread.sleep(2000);println(now)})

/*
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:42 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:44 JST 2014
Mon Nov 24 00:29:46 JST 2014
Mon Nov 24 00:29:46 JST 2014
*/

(追記)

同時並行数を変えたい場合やExecutionContext.Implicits.globalを使う上での注意点をまとめた記事を上げた
Future内でThread.sleepはするな - ましめも

*1:http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future

*2:http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext

*3:やろうとすれば同期するExecutionContextも作れるが基本的に誰も得しないと思う

*4:必ずしもNoではないが(当然ExecutionContextの実装による)

*5:https://github.com/scala/scala/blob/v2.10.3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L65

IntelliJ IDEAさんがplayの自動生成ファイルを認識してくれない

play2.3で自動生成するコード(ReverseRoutingやView)をIntelliJ IDEAが認識してくれないことがあるのでその解決方法をメモ

認識されていない様子(Assets.atが赤い)
f:id:mashijp:20140607123209p:plain

1) File -> Project Structure
f:id:mashijp:20140607123556p:plain

2) Modulesを開き、targetの下を見る
src_managed/main/controllers と twirl/main/views がSourcesになっている
f:id:mashijp:20140607123415p:plain

3) 上の"Mark as"のところをポチポチしてsrc_managed/main と twirl/main がSourcesになるように設定する
f:id:mashijp:20140607123424p:plain

4) 直った!
f:id:mashijp:20140607123430p:plain


play2.2のときもこういう問題が起きていたけど、play2.3になってscala templateが分離されたので作業が増えた…
activator ideaせずにsbt projectとしてimportしたせいかもしれない(play2.2時代はplay ideaしてもこの問題が発生していた)。

お手軽Akka Schedulerとハマりポイント

定期実行処理を簡単に行えるAkka Schedulerというものがある。Akkaはplay framework上から簡単に使うことができるので、バッチ処理等を行うのにとても便利。
Schedulerには次のようなメソッドが定義されている*1

  • def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit)
    • initialDelay後, intervalの間隔で関数fを実行する
  • def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, message: Any)
    • initialDelay後, intervalの間隔で receiver(Actor)に message を送る

「ちょっと定期実行したいだけだし&Actor定義するのめんどくさいし前者を使おう」と思って使ったら1つ問題点に気づいた。

例外発生時、定期実行されなくなってしまう

この方法の問題点は、scheduleに渡した関数内で例外が発生すると定期実行されなくなってしまうということだ*2。この方法で確実に定期実行するなら決して例外が発生しないようにしないといけない。

    var i = 0
    Akka.system.scheduler.schedule(5 seconds, 1 seconds) {
      i = i + 1
      Logger.info(s"$i 回目 実行します")
      if (i > 5) {
        throw new IllegalArgumentException("ごめん")
      }
      Logger.info("実行に成功しました!")
    }

実行結果(6回で実行が止まってしまう)

[info] application - 1 回目 実行します
[info] application - 実行に成功しました!
[info] application - 2 回目 実行します
[info] application - 実行に成功しました!
[info] application - 3 回目 実行します
[info] application - 実行に成功しました!
[info] application - 4 回目 実行します
[info] application - 実行に成功しました!
[info] application - 5 回目 実行します
[info] application - 実行に成功しました!
[info] application - 6 回目 実行します
java.lang.IllegalArgumentException: ごめん
	at Global$$anonfun$autoCleanUpFileStorage$1.apply$mcV$sp(Global.scala:35)
...
※以降何も出力されない

Actorを使った場合は例外が発生しても止まらない

一方でActorを定義しActorに定期的にメッセージを送るように設定した場合は、処理中に例外が発生しても止まることはない*3

  var j = 0
  class BatchActor extends Actor {
    def receive: Actor.Receive = {
      case e: String =>
        j = j + 1
        Logger.info(s"$j 回目 実行します")
        if (j > 5) {
          throw new IllegalArgumentException("ごめん")
        }
        Logger.info("実行に成功しました!")
    }
  }
// 1秒毎にBatchActorにメッセージ"はい"を送る
Akka.system.scheduler.schedule(5 seconds, 1 seconds, Akka.system.actorOf(Props[BatchActor]), "はい")

実行結果(6回目以降もずっと呼ばれ続けている)

[info] application - 1 回目 実行します
[info] application - 実行に成功しました!
[info] application - 2 回目 実行します
[info] application - 実行に成功しました!
[info] application - 3 回目 実行します
[info] application - 実行に成功しました!
[info] application - 4 回目 実行します
[info] application - 実行に成功しました!
[info] application - 5 回目 実行します
[info] application - 実行に成功しました!
[info] application - 6 回目 実行します
[ERROR] [02/11/2014 20:26:07.746] [application-akka.actor.default-dispatcher-5] [akka://application/user/$a] ごめん
java.lang.IllegalArgumentException: ごめん
	at Global$BatchActor$$anonfun$receive$1.applyOrElse(Global.scala:37)
..

[info] application - 7 回目 実行します
[ERROR] [02/11/2014 20:26:08.741] [application-akka.actor.default-dispatcher-5] [akka://application/user/$a] ごめん
java.lang.IllegalArgumentException: ごめん
	at Global$BatchActor$$anonfun$receive$1.applyOrElse(Global.scala:37)
..
[info] application - 8 回目 実行します
[ERROR] [02/11/2014 20:26:09.741] [application-akka.actor.default-dispatcher-2] [akka://application/user/$a] ごめん
..

この仕様を知らずに前者の関数を渡すschedule関数を使っている&例外処理をちゃんとしてないと、「あれれー?気づいたら定期実行されなくなってるぞー」ということになるので注意。

*1:http://doc.akka.io/docs/akka/2.1.0/scala/scheduler.html

*2:https://github.com/akka/akka/blob/v2.2.0/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L527 例外が発生した場合は次の実行のためのスケジューリングがされない

*3:https://github.com/akka/akka/blob/v2.2.0/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L62 メッセージの送信自体で例外が発生した場合やActorが終了している場合は止まる

playのRequest#remoteAddressで取れるのはIPアドレスではない

ちゃんと細かくドキュメントを読まず http://www.playframework.com/documentation/2.1.0/HTTPServer の書くままに設定してると痛い目にあう

突然カンマ区切りのIPアドレスがやってくる

FugaLogging.write(id, "投稿しました", request.remoteAddress)

こんな感じでログを残していたのだが突然IPアドレスがカンマ区切りで記録されていることが…

2014-02-10 23:36:41,933 XXXXXX    A.B.C.D
2014-02-10 23:36:47,432 XXXXXX    A.B.C.D
2014-02-10 23:37:47,142 XXXXXX    A.B.C.D, E.F.G.H
2014-02-10 23:38:15,870 XXXXXX    A.B.C.D
2014-02-10 23:38:20,336 XXXXXX    A.B.C.D

うっ…

Request#remoteAddressはX-Forwarded-Forを返す

http://www.playframework.com/documentation/2.1.0/HTTPServerの下に記載されている通り次の場合はRequest#remoteAddressの返り値は接続元のIPアドレスではなくX-Forwarded-Forとなる。これは、nginxやApache等のリバースプロキシ存在下を考慮した仕様。

  • 127.0.0.1からのアクセス
  • application.conf等にtrustxforwarded=trueが書かれている

ドキュメントの通り次のようにnginxでproxy_set_headerを設定しているとX-Forwarded-Forには "接続元のIPアドレス" もしくは "接続元のIPアドレス, nginxにアクセスした時点で付加されていたX-Forwarded-For" という形になる。

  proxy_set_header   X-Real-IP $remote_addr;
  proxy_set_header   X-Scheme $scheme;
  proxy_set_header   X-Forwarded-For $proxy_add_x_forwarded_for;

アクセス元のIPアドレスだけを取る方法

今回のケースでは迷惑行為防止としてログに記録するのが目的なのでX-Forwarded-Forが全て記録されているのは好都合だったが、本当に「アクセス元のIPアドレス」がほしい場合があるかもしれない。

そういう場合は次のいずれかの方法をとれば良い。

  • Request#remoteAddress の結果を分解して一番右のIPアドレスを得る (おすすめ)
    • メリット: 確実
    • デメリット: ちょっとだけ面倒
    • (僕の頭でplayが勝手にX-Forwarded-Forの一番右のIPアドレスを取り出すのかと思い込んでいた…)
  • 上記のドキュメント通りのnginxの設定だった場合、X-Real-IPヘッダを取り出し使う
    • メリット: 簡単
    • デメリット: playには直接アクセスできないことが条件(X-Real-IPヘッダが信頼できる状況)
      • パフォーマンスの関係上、クライアントが直接playにリクエストを送りたい場合がある(一部の通信はnginxを通して, 一部の通信は直接playに…とか)と、X-Real-IPヘッダが信頼できるのかどうか判定できない。
  • proxy_set_header X-Forwarded-For $remote_addr としてしまう
    • メリット: 簡単
    • デメリット: X-Forwarded-Forの意味からズレてる


Request#remoteAddress が playへの直接アクセスのときとリバースプロキシ通したときとで意味が変わるのはややこしいなあ…

play frameworkでPUT, DELETE, PATCH等のリクエストを<form>から受け取る

RESTfulなパスにしたいとき、<form>タグでmethod=PUT, DELETE等を送れない問題*1にぶち当たることがある。
全部POSTで送ればいいのだが、どうしてもPUTやDELETEを使いたいRESTful脳な人のための解決方法。

解決方法

Railsはこの願いを叶えるために_methodというパラメータを渡した場合はそのメソッドとしてリクエストを解釈するという仕様がある。(formヘルパーは、勝手に<input type="hidden" name="_method" value="put">というhiddenパラメータをつけてくれる *2 )play frameworkでもこれとほぼ同様の手段で解決できるよう、@helper.formの代わりになるヘルパーと、HTTPリクエストの処理の仕方を変えるようにした。

app/views/helper/formExtended.scala.html

GET or POSTでない場合のみGETパラメータに"_method=METHOD"をつけるヘルパー。@helper.formの代わりに@helper.formExtendedとしてあげれば良い。

@(action: Call, args: (Symbol, String)*)(body: => Html)

@availableFormMethod(method: String) = @{
  method.toUpperCase match {
    case "GET" | "POST" | "" => true
    case _ => false
  }
}
@appendMethod(action: Call) = @{
  val (url, method) = (action.url, action.method)
  if(availableFormMethod(method)) {
    url
  } else {
    url + (if(url.contains('?')) "&" else "?") + "_method=" + method
  }
}

<form action="@appendMethod(action)" method="@if(availableFormMethod(action.method)) {@action.method} else {POST}" @toHtmlArgs(args.toMap)>
  @body
</form>

(大抵のケースだとCSRF対策でtokenを送っていると思うので、送るロジックをついでにここに書いちゃえばいい)

app/Global.scala

あとはGlobal.scalaでリクエストのメソッドを書き換えてあげれば完成

object Global extends GlobalSettings with Results {
....
  override def onRouteRequest(request: RequestHeader): Option[Handler] = {
    val requestRewrited = request.method.toUpperCase match {
      case "POST" =>
        request.copy(method = request.getQueryString("_method").getOrElse("POST"))
      case _ => request
    }
    super.onRouteRequest(requestRewrited)
  }
}


(追記) @formExtended というヘルパーを作るより、単純にCallを書き換えてくれるやつ作ったほうが良いかも

*1:HTML4だとGET, POSTしか定義されていない。2014/02/02時点のChrome, Firefoxもこれら以外のmethodで送れないように思える…。要検証。

*2:http://guides.rubyonrails.org/form_helpers.html

play framework 2.2でテスト用configを設定する

FakeApplicationで指定のconfigを読み込ませたい場合(例えばconf/test.conf)は次のようにすれば読み込むことができる。

  "テストケース" should {
    "ふがほげ" in {
      running(new FakeApplication with DevSettings {
        def devSettings: Map[String, String] = Map(
          "config.file" -> "conf/test.conf"
        )
      }) {
        // テストコード
      }
    }
  }

JSONをHTML/CSSで表現する part2

再チャレンジしていいかんじにコピペできる版を作った。

コピペ例

{
  "a": "fuga",
  "b": [
    1,
    2,
    true
  ]
}

実現方法

見えない文字列を挿入することによりインデントを実現…。結局泥臭い方法になっちゃったなあ。