2015年8月24日月曜日

[scalaz]Task - 並列処理

先日の「Reactive System Meetup in 西新宿」で「Scalaz-StreamによるFunctional Reactive Programming」のスライドを作るにあたってscalazのTaskについて調べなおしてみたのですが、Taskの実用性について再確認することができました。

色々と切り口がありますが、その中で並列性能が今回のテーマです。

準備

準備として以下のものを用意します。

implicit val scheduler = new java.util.concurrent.ForkJoinPool(1000)

  def go[T](msg: String)(body: => T): Unit = {
    System.gc
    val ts = System.currentTimeMillis
    val r = body
    println(s"$msg(${System.currentTimeMillis - ts}): $r")
  }

  def fa(i: Int): Task[Int] = Task {
    Thread.sleep(i)
    i
  }

スレッド実行コンテキスト(ExecutorService)にはForkJoinPoolを使用します。

ForkJoinPoolは分割統治(devide and conquer)により並行処理を再帰的(recursive)に構成する処理向けのスレッドスケジュールを行います。ざっくりいうと並列で何らかの計算を行う処理全般に向くスケジュールといえるのではないかと思います。IO処理の場合も復数のIO発行後に同期待ち合わせをするケースでは同様にForkJoinPoolが有効と思われます。

今回の性能検証では1000並列させたいのでパラメタで指定しています。

goメソッドは性能測定用のユーティリティです。

fa関数は性能測定対象の関数です。関数faは指定されたマイクロ秒間ウェイトして指定されたマイクロ秒を返す関数をTask化したものです。

問題

以下は性能検証の課題のプログラムです。fa関数の呼出しを1000回逐次型で行います。

Vector.fill(1000)(fa(1000)).map(_.run).sum

関数faは指定されたマイクロ秒間ウェイトして指定されたマイクロ秒を返す関数をTask化したものです。これを1000回繰り返したものを合計したものを計算します。

パラメタでは1000マイクロ秒=1秒を指定しているので1000回繰り返すと16.7分程かかる処理になります。

性能検証

Taskの並列処理を行う以下の関数について性能測定を行いました。

  • Task/gatherUnordered
  • Task/reduceUnordered
  • Nondeterminism/gather
  • Nondeterminism/gatherUnordered
  • Nondeterminism/reduceUnordered
  • Nondeterminism/aggregate
  • Nondeterminism/aggregateCommutative

以下ではそれぞれの関数について説明します。

gatherUnordered(Task)

TaskのgatherUnordered関数の性能測定対象プログラムは以下になります。

def apply_gather_unordered_task: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Task.gatherUnordered(xs).map(_.sum)
    t.run
  }

TaskのgatherUnordered関数はNondeterminismのgatherUnordered関数とよく似ていますが、並列実行しているTaskの1つが例外になった時に処理全体を止める機能を持っている点が異なります。デフォルトではfalseになっているので、ここではこの機能は使っていません。

unorderedつまり結果順序は元の順序を維持していない(計算が終わった順の可能性が高い)ことは待ち合わせ処理を最適化できる可能性があるので実行性能的には好材料です。一方、アルゴリズム的には順序が保たれていることが必要な場合があるので、その場合はgatherUnorderedは使用することは難しくなります。

可換モノイド(commutative monoid)は演算の順序が変わっても結果が同じになることが保証されているので、並列処理結果が可換モノイドであり、並列処理結果の結合処理が可換モノイドの演算である場合は、並列処理結果が元の順序を保持している必要はありません。つまりgatherUnorderedを使っても全く問題ないわけです。

Intは「+」演算に対して可換モノイドなので、並列処理結果の総和を計算するという結合処理向けにgatherUnorderedを使うことができます。

reduceUnordered(Task)

TaskのreduceUnordered関数の性能測定対象プログラムは以下になります。

def apply_reduce_unordered_task: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Task.reduceUnordered(xs)(Reducer.identityReducer)
    t.run
  }

並列処理を行った後で、復数の処理結果をまとめる場合にはreduce機能を使用すると意図が分かりやすいですし、共通処理内での最適化も期待できます。

TaskのreduceUnorderedはscalazのReducerを使って、並列処理結果のreduce処理を行う関数です。NondeterminismのreduceUnordered関数とよく似ていますが、並列実行しているTaskの1つが例外になった時に処理全体を止める機能を持っている点が異なります。デフォルトではfalseになっているので、ここではこの機能は使っていません。

並列処理の結果得られるデータはIntで、Intは可換モノイドですから、Monoidの性質を使ってreduce処理を行うことができます。そこで、ReducerとしてidentityReducer(処理結果のMonoidをそのまま使ってreduce処理を行う)を指定しています。

gather(Nondeterminism)

Nondeterminismのgather関数の性能測定対象プログラムは以下になります。

def apply_gather: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].gather(xs).map(_.sum)
    t.run
  }

「Nondeterminism[Task]」は型クラスNondeterminismのTask向け型インスタンスの意味です。つまりTaskはNondeterminismでもあるので、Nondeterminismのgather関数を実行することができます。

gather関数はNondeterminismデータシーケンスに対してそれぞれの要素を並列処理し、シーケンスの順序を維持した結果を計算します。

上記ではその結果得られたIntシーケンスをsum関数で合算しています。

gatherUnordered(Nondeterminism)

NondeterminismのgatherUnordered関数の性能測定対象プログラムは以下になります。

def apply_gather_unordered: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].gatherUnordered(xs).map(_.sum)
    t.run
  }

TaskのgatherUnordered関数と同様に指定された並列処理の順序を保持せず、処理結果を順不同でシーケンスとして返します。

結果としてIntシーケンスが返ってきますが、Intは可換モノイドの性質を持つため順不同で返ってきてもsum関数で合算して問題ありません。

reduceUnordered(Nondeterminism)

NondeterminismのreduceUnordered関数の性能測定対象プログラムは以下になります。

def apply_reduce_unordered: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].reduceUnordered(xs)(Reducer.identityReducer)
    t.run
  }

型クラスNondeterminismのreduceUnordered関数を使って並列実行と実行結果のreduce処理を行います。

TaskのreduceUnorderedの場合と同じくReducerとしてidentityReducerを指定しています。

aggregate(Nondeterminism)

Nondeterminismのaggregate関数の性能測定対象プログラムは以下になります。

def apply_aggregate: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].aggregate(xs)
    t.run
  }

aggregate関数はreduceUnordered関数と並列実行後にreduce処理を行う点では同じ系統の計算を行いますが、以下の点が異なります。

  • aggregate関数はMonoidを前提としておりMonoidの性質を利用してreduce処理を行う。それに対してreduceUnordered関数はreduce処理を行うアルゴリズムをReducerとして指定する。
  • Monoidは可換モノイドとは限らないので並列計算の順序が保存されている必要がある。このためaggregate関数は並列実行順序を保存する処理を行っている。それに対してreduceUnordered関数は並列実行順序を保存する処理を行っていない。

aggregateは(可換モノイドでないかもしれない)Monoidを処理対象にしているため、並列計算の順序を保存する処理が必要になるので、その分性能的には不利になります。

この問題に対する改良策が次のaggregateCommutative関数です。

aggregateCommutative(Nondeterminism)

NondeterminismのaggregateCommutative関数の性能測定対象プログラムは以下になります。

def apply_aggregate_commutative: Int = {
    val xs = Vector.fill(1000)(fa(1000))
    val t = Nondeterminism[Task].aggregateCommutative(xs)
    t.run
  }

aggregateCommutative関数はaggreagte関数と同様にMonoidを処理対象としていますが、指定されたMonoidが可換モノイドであるという前提で計算を行います。

可換モノイドであるということは、演算の評価順序が異なっても同じ結果になるということなので、指定された並列計算シーケンスの実行順序を保存する処理は不要です。並列計算シーケンスの実行順序を保存する処理が不要になると実行性能的に有利です。

可換モノイドとモノイドの違い(つまり可換性)は並列処理で重要ですが、現在のところScalazには可換モノイドを表現する型クラスは用意されていないので、型を使ってエラーチェックを行ったり(例:可換性前提の関数で可換性なしのデータを使用できないようにチェック)、最適化(例:可換性の有無で実行順序の保存処理の有無を切り替える)を行うようなことはできません。

aggregateCommutative関数で行っているように、使用者側が違いを意識して使う形になります。

性能

クラスメソッド性能(ms)順序保存集約機能キャンセル機能
TaskgatherUnordered2139--
TaskreduceUnordered1869-
Nondeterminismgather1082--
NondeterminismgatherUnordered1048 ---
NondeterminismreduceUnordered1718--
Nondeterminismaggregate1049-
NondeterminismaggregateCommutative1040--

評価

表では各関数を以下の性質の有無で分類しました。

  • 順序保存
  • 集約機能
  • キャンセル機能

それぞれの性質毎に性能特性を考えます。

順序保存

評価順序保存をすると順序保存なしより少し遅くなります。

可能であれば順序保存なしを選ぶのがよいでしょう。

順序保存なしを選べるかどうかは、各並列処理の計算結果がreduce処理の演算に対して可換モノイド(commutative monoid)であるかどうかで判定できます。

整数の加算や積算は典型的な可換モノイドなので、最終的な計算結果が合算処理(全要素の加算)の場合は「順序保存なし」を選べるわけです。

集約機能

並列実行関数に集約機能が包含されていると、各並列処理の結果を使って直接集約処理を行うことができるので効率的です。一度、並列実行結果をリスト上に保存して、そのリストに対して集約するより、集約対象のデータ(例:整数値)を各並列処理の完了後に直接更新して行く方がオーバーヘッドは少なくなります。

集約機能を提供しているaggregate関数とaggregateCommutative関数が、それぞれ対応するgather関数、gatherUnorderedと比較して若干速いのはそのためだと思われます。

Monoidは集約対象として優れた性質を持っているので、集約機能の対象として使用するのがFunctional Programming(以下FP)のパターンになっています。aggregate関数とaggregateCommutative関数はこのパターンに則って、集約機能の対象としてMonoidを使用します。

一方Reducerを使った集約は大きなオーバーヘッドがあるようなので、積極的に利用する価値があるという感じではないようです。

NondeterminismのgatherUnordered関数とreduceUnordered関数の比較ではreduceUnordered関数がかなり遅くなっています。この場合、Reducer経由でMonoidの集約を行っているので、Monoidを直接集約するよりオーバーヘッドがあるのが原因と思われます。

一方TaskのgatherUnordered関数とreduceUnordered関数の場合、reduceUnordered関数の方が速いので、こちらの場合はreduceUnordered関数の利用は有力な選択肢です。キャンセル機能が重たいためにReducer機能の遅さが隠れてしまうのかもしれません。

キャンセル機能

キャンセル機能はTaskのgatherUnordered関数、reduceUnordered関数が提供しています。

NondeterminismのgatherUnordered関数、reduceUnordered関数と比較すると相当遅くなっています。キャンセル機能が必要でない場合は使わない方がよいでしょう。

まとめ

性能測定の結果、並列処理結果を可換モノイドで受け取り集約処理を行うaggregateCommutative関数が一番高速であることが分かりました。

並列処理実行後の集約処理までを一体化して処理の最適化ができるのは可換モノイドの効果です。

並列処理を設計する際には、各並列処理の結果を可換モノイドで返すような形に持ち込むことができるのかというのが一つの論点になると思います。

また可換モノイドにできない場合も、モノイドにできれば汎用関数で集約まで行うことができるので、並列処理を記述する上で大きな助けになります。

Scalazではモノイドを記述する型クラスMonoidが用意されています。MonoidはScalazによるFPで中心的な役割を担う型クラスの一つですが、並列処理においても重要な役割を担うことが確認できました。

Scalazでは可換モノイドを記述する型クラスはまだ用意されていないので、Monoidで代用することになります。aggregateCommutative関数のように引数の型としてはMonoidを使い、暗黙的に可換モノイドを前提とするような使い方になると思います。

メニーコアによる並列計算が本格化するのはもう少し先になると思いますが、その際のベースとなる要素技術はすでに実用技術として利用可能になっていることが確認できました。FPが並列計算に向いているという期待の大きな部分は、モノイドや可換モノイドのような数学的な概念をプログラミング言語で直接使用できる点にあります。Scala&Scalazはこれを実用プログラミングで可能にしているのが大きな美点といえるかと思います。

諸元

  • Mac OS 10.7.5 (2.6 GHz Intel Core i7)
  • Java 1.7.0_75
  • Scala 2.11.6

2015年8月17日月曜日

クラウドアプリケーション・モデリング考

8月7日に「匠の夏まつり ~モデリングの彼方に未来を見た~」のイベントが行われましたが、この中でパネルディスカッションに参加させていただきました。パネルディスカッションでご一緒させていただいた萩本さん、平鍋さん、高崎さん、会場の皆さん、どうもありがとうございました。

パネルディスカッションがよいきっかけとなって、クラウドアプリケーション開発におけるモデリングについての方向性について腰を落として考えることができました。このところFunctional Reactive Programmingを追いかけていましたが、ちょうどモデリングとの接続を考えられる材料が揃ってきているタイミングでした。

パネルディスカッションの前後に考えたことをこれまでの活動の振り返りも含めてまとめてみました。

基本アプローチ

2008年頃からクラウドアプリケーション開発の手法について以下の3点を軸に検討を進めています。

  • クラウド・アプリケーションのアーキテクチャ
  • メタ・モデルと実装技術
  • モデル駆動開発

検討結果は以下にあげるスライドとブログ記事としてまとめていますが、基本的な考え方は現在も同じです。

ざっくりいうと:

  • クラウド・アプリケーションのバックエンドのアーキテクチャはメッセージ方式になる。
  • クラウド・アプリケーションのモデリングではOOADの構造モデル、状態機械モデルを踏襲。
  • 協調モデルの主力モデルとしてメッセージフローまたはデータフローを採用。
  • OOADの構造モデル、状態機械モデルはモデル駆動開発による自動生成。
  • メッセージフローまたはデータフローはDSLによる直接実行方式が有効の可能性が高い。

という方針&仮説です。「OOADの構造モデル、状態機械モデルはモデル駆動開発による自動生成」についてはSimpleModeler、「メッセージフローまたはデータフローはDSLによる直接実行方式」についてはg3 frameworkで試作を行っていました。

ここまでが2010年から2012年中盤にかけての状況です。

ブログ

2012年以降のアプローチ

2012年の後半にEverforthに参画してApparel Cloudを始めとするCloud Service Platformの開発に注力しています。

前述の論点の中で以下の3点についてはApparel Cloudの開発に直接取り入れています。

  • クラウド・アプリケーションのバックエンドのアーキテクチャはメッセージ方式になる。
  • クラウド・アプリケーションのモデリングではOOADの構造モデル、状態機械モデルを踏襲。
  • OOADの構造モデル、状態機械モデルはモデル駆動開発による自動生成。

「メッセージフローまたはデータフローはDSLによる直接実行方式が有効の可能性が高い」については当初はg3 frameworkという独自DSLによる実装を考えていたのですが、Object-Functional Programmingの核となる技術であるモナドがパイプライン的なセマンティクスを持ち、データフローの記述にも使用できそうという感触を得られたため、ScalazベースのMonadic Programmingを追求して技術的な接点を探るという方針に変更しました。

2012年以降ブログの話題がScalaz中心になるのはこのためです。

その後、まさにドンピシャの技術であるscalaz-streamが登場したので、scalaz-streamをApparel Cloudの構築技術として採用し、「メッセージフローまたはデータフローはDSLによる直接実行方式が有効の可能性が高い」の可能性を実システム構築に適用しながら探っている状況です。

今後のアプローチ

現在懸案として残っている項目は以下のものになります。

  • 協調モデルの主力モデルとしてメッセージフローまたはデータフローを採用。

前述したようにメッセージングのDSLとしてはscalaz-streamをベースにノウハウを積み重ねている状況なので、この部分との連続性をみながらモデリングでの取り扱いを考えていく予定です。

また、ストリーミング指向のアーキテクチャ&プログラミングモデルとしては以下のような技術が登場しています。

このような新技術の状況をみながら実装技術の選択を行っていく予定です。

参考: スライド

パネルディスカッションでのポジション宣言的なスライドとして以下のものを作成しました。

この中で6ページ目の「Cloud時代のモデリング」が今回パネルディスカッションのテーマに合わせて新規に作成したものです。

このスライドで言いたいことは、伝統的なスクラッチ開発とくらべてクラウドアプリケーションではプログラミング量が大幅に減るので、要件定義やその上流であるビジネスモデリングが重要になる、ということです。

  • アプリケーションの大きな部分はCloud Service Platformが実現
  • モデル駆動開発によってドメインモデル(静的構造)の大部分は自動生成される
  • Scalaで実現されているDSL指向のOFP(Object-Functional Programming)は記述の抽象度が高いので設計レベルのモデリングは不要
  • Scalaの開発効率は高いのでプログラミングの比重は下がる
補足:Featureモデル

後日スライドのキーワードページに入れておくべきキーワードとしてFeatureモデルがあることに気付いたので、上記のスライドには追加しておきました。

スライドの想定する世界では、クラウドアプリケーションはクラウドサービスプラットフォーム上で動作するため、クラウドサービスプラットフォームが提供している機能とクラウドアプリケーションの機能の差分をモデル化し、このモデルを元に実際に開発する所、カスタマイズで済ませる所などを具体化していく必要があります。この目的にはSoftware ProductlineのFeatureモデルが有効ではないかと考えています。

2015年7月27日月曜日

[SDN]List性能問題

懸案だったMonadic Programming、Functional Reactive Programmingもある程度目処がついてきたこともあり、要件定義で作成されたモデルを実装に落とし込むという流れの中での総合的なScalaプログラミングの方法論をScala Design Noteという切り口で考察していこうと思います。

大昔にJava World誌で「Java Design Note」という連載を書いていましたが、そのScala版をイメージしています。

問題

Scalaを実務に使う場合に注意が必要なのがListの性能問題です。

Scalaでは関数型言語の伝統を踏襲してLispのList由来のListを基本データ構造としています。システムの各種デフォルトもListを使う方向になっていますし、文法を説明する場合にもListを使用するケースが多いと思います。

Listは関数型プログラミングとの相性はとてもよいので妥当な選択ではあるのですが、要素の後方追加が致命的に遅いという問題を持っています。この問題があるためListの使い方には注意が必要です。

致命的に遅いといっても小さなプログラムではほぼ問題はありませんが、プログラムの処理が本格化し、扱うデータ規模が大きくなってくると顕在化してきます。

ListはScalaの基本データ構造であり、普通にScalaを使っていると色々な所でListが自然に使用されますが、これがプロダクションコードとしてはアンチパターンという点がScalaプログラミングのハマりどころとなっています。

きっかけ

前回「Scalaへの道」で以下のcase class Wordsを実装しました。

この実装を行う中で、このように復数のオブジェクトを保持しているcase classを足し込んでいくような処理におけるListの危険性が判明したのが、今回の記事のきっかけになっています。

package sample

case class Words(
  smalls: Vector[String],
  middles: Vector[String],
  larges: Vector[String]
) {
  def +(word: String): Words = {
    if (word.length <= 3)
      copy(smalls = smalls :+ word)
    else if (word.length > 7)
      copy(larges = larges :+ word)
    else
      copy(middles = middles :+ word)
  }

  def +(words: Words): Words = {
    copy(
      smalls ++ words.smalls,
      middles ++ words.middles,
      larges ++ words.larges
    )
  }

  def ++(words: Seq[String]): Words = {
    words.foldLeft(this)(_ + _)
  }

  override def toString() = s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
}

object Words {
  val empty = Words(Vector.empty, Vector.empty, Vector.empty)

  def apply(word: String): Words = empty + word

  def apply(words: Seq[String]): Words = empty ++ words

  def getTokens(s: String): Vector[String] =
    s.split(" ").filter(_.nonEmpty).toVector
}
問題

性能測定のため前述のcase class Wordsを単純化し、測定パターンごとに復数の実装を用意しました。以下は、宣言にIndexedSeq、実装にVectorを採用した実装であるWordsIVです。これを含めて16パターンの測定を行いました。(後述の表参照)

case class WordsIV(
    smalls: IndexedSeq[String],
    middles: IndexedSeq[String],
    larges: IndexedSeq[String]
  ) {
    def +(word: String): WordsIV = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsIV {
    val empty = WordsIV(Vector.empty, Vector.empty, Vector.empty)
  }

case class WordsVは以下のようにして積算した時の実行時間を計測しました。

go("WordsIV") {
      inputs.foldLeft(WordsIV.empty)(_ + _)
    }
測定のパターン

性能測定パターンは以下になります。この中の「WordsIV」のプログラムは前述しました。その他のパターンのプログラムは一番最後に一括で置いておきます。

測定名性能(ms)宣言実装追加場所備考
WordsJ30ArrayBufferArrayBuffer
WordsV33VectorVector
WordsL27717ListList
WordsVP32VectorVector
WordsLP28ListList
WordsSV19840SeqVector
WordsSVV32SeqVectorロジック工夫
WordsS27620SeqSeq
WordsSP31SeqSeq
WordsIV30IndexedSeqVector
WordsIA614IndexedSeqArray
WordsI29IndexedSeqIndexedSeq
WordsIP29IndexedSeqIndexedSeq
WordsBuilderV25VectorArrayBufferBuilder
WordsBuilderL29ListArrayBufferBuilder
WordsBuilderS25SeqArrayBufferBuilder

考察

性能測定の結果をまとめると以下のような結果が得られました。

  • Listに追記する処理は圧倒的に遅い (WordsL, WordsSV, WordsS)
  • ArrayBuffer、Vectorへの追記、List、Vectorへの前置挿入はほとんど同じ性能 (WordsJ, WordsV, WordsVP, WordsLP)
  • SeqとVectorの組合せはロジックを工夫しないと非常に遅くなることがある (WordsSV)
  • Vectorへの前置挿入はほとんどListと変わらない (WordsVP, WordsLP)

Seqのデフォルト実装はListなのでWordsSも事実上Listの追記処理のパターンになります。WordLとWordsSの結果からListの追記処理が非常に低速なのは明らかです。

また、宣言にSeq、初期実装にVectorを使うWordsSVは実装にVectorを使う意図なのですが、ロジックの綾で実際はListが使われるようになるらしく非常に低速です。

ただWordsSVと同じパターンのWordsSVVは宣言Vector、実装VectorのWordsVなどと同じ性能が出ています。宣言にSeq、初期実装にVectorのパターンは実装時のロジックの綾で非常に低速になったり、通常性能になったりしてリスクが高い処理パターンといえます。

コンテナの選択

まずコンテナの候補としてはVector一択かなと考えています。

Listは追記性能が非常に遅いので、使い方に注意が必要な点がデメリットです。また、単方向リストによる実装方式からメモリも多目に消費することが推測されます。

VectorとListの性質を比較した場合、Listは前置挿入が得意で追記が苦手、Vectorは追記が得意で前置挿入が苦手、という整理の仕方がありますが、Vectorは追記が得意で前置挿入も大丈夫、というの実際のところです。

Vectorは万能で用途を選ばないのに対して、Listは追記が苦手という弱点があるので、利用局面に応じて意識して使用する必要があります。

このためコンテナの選択は弱点がないVectorの一択と考えてよいと思います。

Listは関数型言語の伝統的なデータ構造なので変な話ですが、ScalaにおけるListはHashSetなどと同様の特殊用途向けコンテナとして割り切って使うぐらいがベストプラクティスではないかと思います。

Listの長所

Listが向いている「特殊用途」は何かという話ですが、伝統的な関数型プログラムということになります。

関数型プログラミングでよく出てくる再帰呼び出しでの利用は、専用の文法が用意されていて便利です。たとえば、以下のようなコーディングパターンです。

def f(a: List[Int]): Int = {
  a match {
    case Nil => 0
    case x :: xs => x + f(xs)
  }
}

ただVectorを始めとするSeqでも以下のように書けるので使えないと困るという程ではありません。

def f(a: Vector[Int]): Int = {
  a.headOption match {
    case None => 0
    case Some(x) => x + f(a.tail)
  }
}
補足:末尾再帰呼び出し

念のために補足です。

「Listの長所」で取り上げたコードはList処理を分かりやすく提示するのが目的なので再帰呼び出しは自然な方法を用いていますが、実はこの「自然な方法」はプロダクションコードとしては適切ではありません。というのは、このコードでは末尾再帰の最適化が得られないのでスタックオーバーフローのリスクがあるためです。

今回の場合は実務的には以下のようなコーディングになります。

def f(a: List[Int]): Int = {
  @annotation.tailrec
  def go(b: List[Int], sum: Int): Int = {
    b match {
      case Nil => sum
      case x :: xs => go(xs, x + sum)
    }
  }
  go(a, 0)
}
def f(a: Vector[Int]): Int = {
  @annotation.tailrec
  def go(b: Vector[Int], sum: Int): Int = {
    b.headOption match {
      case None => sum
      case Some(x) => go(b.tail, x + sum)
    }
  }
  go(a, 0)
}

宣言の選択

オブジェクトの集まりを実現する場合に使用するコンテナとして概ね以下の4つを日常的に使用します。

コンテナ種別説明
Seqトレイト先頭からのシーケンシャルアクセス
IndexedSeqトレイトランダムアクセスに適したシーケンス
List具象クラスList構造のSeq
Vector具象クラス配列をベースとしたIndexedSq

SeqとIndexSeqはトレイトなので、それぞれ復数の具象クラスに対応します。関数の引数やオブジェクトのインスタンス変数にSeqやIndexSeqを指定することで、プログラムの実行時に具象クラスを使い分ける事ができるようになります。

ListとVectorは具象クラスなので、引数やインスタンス変数の型として指定すると型を最終決定してしまうことになります。逆に明確な決定が行われるので、予想外の動きによる性能劣化という事態は避ける事ができます。

一般的には、できるだけ抽象度の高いものを選択するのが得策です。その一方で、Seqを使うと前述のListの性能問題が発生する可能性があるので、一定のリスクがあります。

つまり汎用性と性能リスクのバランスを勘案して実装戦略を考えることになります。

実装戦略

case classの属性としてオブジェクトの集まりが必要になった時の実装戦略です。

以下が論点になります。

  • case class内での宣言
  • 実装に使用するコンテナの選択

まず実装に使用するコンテナの選択は前述した通りVector一択としました。

Listは追記性能が非常に遅いので、使い方に注意が必要な点がデメリットです。また、単方向リストによる実装方式からメモリも多目に消費することが推測されます。

つまり、実装の選択はVectorのみなので、残る選択は宣言に何を使うのかになります。

シンプル戦略

アプリケーション内で閉じた範囲で使用するcase classであれば、あまり凝ったつくりにしてもメリットは少ないので、宣言と実装の両方にVectorを使うのが簡明です。

性能測定パターンではWordsVが対応します。

安全戦略

宣言にSeqまたはIndexedSeq、実装にVectorを使うのが本格的な戦略です。

用途に応じてSeqとIndexedSeqを使い分けることができれば理想的ですが、Seqは前述のList性能問題が発生する可能性が出てくるので、安全策を採るのであれば常にIndexedSeqを使う戦略が有効です。

性能測定パターンではWordsIVが対応します。

上級戦略

対象となるcase classを操作する時のアクセスパターンが先頭からのシーケンシャルアクセスのみである場合、宣言には一番汎用性の高いSeqを使うのが理想的です。

性能測定パターンではWordsSVVが対応します。

Seqを使うことで、以下のように他のSeqコンテナをそのまま設定することとができるようになります。

WordsSVV(List("a"), List("abcde"), List("abcdefghijk"))

設定されたListをアクセスする場合は、そのままListでよいですが、後方追加の積算をそのままListで行うと性能問題が出てきます。

そこで、前出の「SeqとVectorの組合せはロジックを工夫」が必要になってきます。

具体的にはWordsSVで使用している以下のロジックでは不適切です。

def +(word: String): WordsSV = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

WordsSVVで使用している以下のロジックにする必要があります。このロジックのポイントは後方追加の演算の前にSeqをVectorに変換しているところです。こうすることによって、以降の計算に使用されるコンテナはVectorになるのでList性能問題は発生しなくなります。

def +(word: String): WordsSVV = {
      if (word.length <= 3)
        copy(smalls = smalls.toVector :+ word)
      else if (word.length > 7)
        copy(larges = larges.toVector :+ word)
      else
        copy(middles = middles.toVector :+ word)
    }

まとめ

case classを漫然と作っているとListの性能問題に遭遇してしまう可能性があるので、実装戦略としてまとめてみました。

結論としては以下の3パターンを適材適所で選んでいくのがよさそうです。

  • シンプル戦略 : 宣言 => Vector、実装 => Vector
  • 安全戦略 : 宣言 => IndexedSeq、実装 => Vector
  • 上級戦略 : 宣言 => Seq、実装 => Vector

上級戦略は「SeqとVectorの組合せはロジックを工夫」を意識しておく必要があるのが難点です。このような考慮が負担になるような場合は、安全戦略を基本戦略にしておくのが簡明でよいと思います。

測定プログラム

package sample

import scala.util.Try
import scala.collection.mutable.ArrayBuffer
import scalaz._, Scalaz._
import scalaz.stream._
import scalaz.concurrent.Task

object Performance {
  class WordsJ(
    val smalls: ArrayBuffer[String] = new ArrayBuffer[String],
    val middles: ArrayBuffer[String] = new ArrayBuffer[String],
    val larges: ArrayBuffer[String] = new ArrayBuffer[String]
  ) {
    def +(word: String): WordsJ = {
      if (word.length <= 3)
        smalls += word
      else if (word.length > 7)
        larges += word
      else
        middles += word
      this
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  case class WordsV(
    smalls: Vector[String],
    middles: Vector[String],
    larges: Vector[String]
  ) {
    def +(word: String): WordsV = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsV {
    val empty = WordsV(Vector.empty, Vector.empty, Vector.empty)
  }

  case class WordsL(
    smalls: List[String],
    middles: List[String],
    larges: List[String]
  ) {
    def +(word: String): WordsL = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsL {
    val empty = WordsL(List.empty, List.empty, List.empty)
  }

  case class WordsVP(
    smalls: Vector[String],
    middles: Vector[String],
    larges: Vector[String]
  ) {
    def +(word: String): WordsVP = {
      if (word.length <= 3)
        copy(smalls = word +: smalls)
      else if (word.length > 7)
        copy(larges = word +: larges)
      else
        copy(middles = word +: middles)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsVP {
    val empty = WordsVP(Vector.empty, Vector.empty, Vector.empty)
  }

  case class WordsLP(
    smalls: List[String],
    middles: List[String],
    larges: List[String]
  ) {
    def +(word: String): WordsLP = {
      if (word.length <= 3)
        copy(smalls = word :: smalls)
      else if (word.length > 7)
        copy(larges = word :: larges)
      else
        copy(middles = word :: middles)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsLP {
    val empty = WordsLP(List.empty, List.empty, List.empty)
  }

  case class WordsSV(
    smalls: Seq[String],
    middles: Seq[String],
    larges: Seq[String]
  ) {
    def +(word: String): WordsSV = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsSV {
    val empty = WordsSV(Vector.empty, Vector.empty, Vector.empty)
  }

  case class WordsSVV(
    smalls: Seq[String],
    middles: Seq[String],
    larges: Seq[String]
  ) {
    def +(word: String): WordsSVV = {
      if (word.length <= 3)
        copy(smalls = smalls.toVector :+ word)
      else if (word.length > 7)
        copy(larges = larges.toVector :+ word)
      else
        copy(middles = middles.toVector :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsSVV {
    val empty = WordsSVV(Vector.empty, Vector.empty, Vector.empty)
  }

  case class WordsS(
    smalls: Seq[String],
    middles: Seq[String],
    larges: Seq[String]
  ) {
    def +(word: String): WordsS = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsS {
    val empty = WordsS(Seq.empty, Seq.empty, Seq.empty)
  }

  case class WordsSP(
    smalls: Seq[String],
    middles: Seq[String],
    larges: Seq[String]
  ) {
    def +(word: String): WordsSP = {
      if (word.length <= 3)
        copy(smalls = word +: smalls)
      else if (word.length > 7)
        copy(larges = word +: larges)
      else
        copy(middles = word +: middles)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsSP {
    val empty = WordsSP(Seq.empty, Seq.empty, Seq.empty)
  }

  case class WordsIV(
    smalls: IndexedSeq[String],
    middles: IndexedSeq[String],
    larges: IndexedSeq[String]
  ) {
    def +(word: String): WordsIV = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsIV {
    val empty = WordsIV(Vector.empty, Vector.empty, Vector.empty)
  }

  case class WordsIA(
    smalls: IndexedSeq[String],
    middles: IndexedSeq[String],
    larges: IndexedSeq[String]
  ) {
    def +(word: String): WordsIA = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsIA {
    val empty = WordsIA(Array[String](), Array[String](), Array[String]())
  }

  case class WordsI(
    smalls: IndexedSeq[String],
    middles: IndexedSeq[String],
    larges: IndexedSeq[String]
  ) {
    def +(word: String): WordsI = {
      if (word.length <= 3)
        copy(smalls = smalls :+ word)
      else if (word.length > 7)
        copy(larges = larges :+ word)
      else
        copy(middles = middles :+ word)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsI {
    val empty = WordsI(IndexedSeq.empty, IndexedSeq.empty, IndexedSeq.empty)
  }

  case class WordsIP(
    smalls: IndexedSeq[String],
    middles: IndexedSeq[String],
    larges: IndexedSeq[String]
  ) {
    def +(word: String): WordsIP = {
      if (word.length <= 3)
        copy(smalls = word +: smalls)
      else if (word.length > 7)
        copy(larges = word +: larges)
      else
        copy(middles = word +: middles)
    }

    override def toString() = {
      s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
    }
  }

  object WordsIP {
    val empty = WordsIP(IndexedSeq.empty, IndexedSeq.empty, IndexedSeq.empty)
  }

  class WordsBuilderV {
    private val smalls = new ArrayBuffer[String]
    private val middles = new ArrayBuffer[String]
    private val larges = new ArrayBuffer[String]

    def +(word: String): WordsBuilderV = {
      if (word.length <= 3)
        smalls += word
      else if (word.length > 7)
        larges += word
      else
        middles += word
      this
    }

    def toWords = WordsV(smalls.toVector, middles.toVector, larges.toVector)
  }

  class WordsBuilderL {
    private val smalls = new ArrayBuffer[String]
    private val middles = new ArrayBuffer[String]
    private val larges = new ArrayBuffer[String]

    def +(word: String): WordsBuilderL = {
      if (word.length <= 3)
        smalls += word
      else if (word.length > 7)
        larges += word
      else
        middles += word
      this
    }

    def toWords = WordsL(smalls.toList, middles.toList, larges.toList)
  }

  class WordsBuilderS {
    private val smalls = new ArrayBuffer[String]
    private val middles = new ArrayBuffer[String]
    private val larges = new ArrayBuffer[String]

    def +(word: String): WordsBuilderS = {
      if (word.length <= 3)
        smalls += word
      else if (word.length > 7)
        larges += word
      else
        middles += word
      this
    }

    def toWords = WordsS(smalls, middles, larges)
  }

  def go[T](label: String)(body: => T) {
    val start = System.currentTimeMillis
    try {
      val r = body
      val result = r match {
        case x: Try[_] => x.toString
        case x: Task[_] => x.toString
        case x => x.toString
      }
      val end = System.currentTimeMillis
      println(s"$label (${end - start}): ${result}")
    } catch {
      case e: Throwable =>
        val end = System.currentTimeMillis
      println(s"$label (${end - start}): ${e}")
    }
  }

  import scalax.io.{Resource, Codec}

  def main(args: Array[String]) {
    import Words.getTokens
    implicit val codec = Codec.UTF8
    val filename = "PrincesOfMars.txt"
    val inputs = Resource.fromFile(filename).lines().
      map(getTokens).
      foldLeft(new ArrayBuffer[String])(_ ++ _)
    go("WordsJ") {
      inputs.foldLeft(new WordsJ)(_ + _)
    }
    go("WordsV") {
      inputs.foldLeft(WordsV.empty)(_ + _)
    }
    go("WordsVP") {
      inputs.foldLeft(WordsVP.empty)(_ + _)
    }
    go("WordsL") {
      inputs.foldLeft(WordsL.empty)(_ + _)
    }
    go("WordsLP") {
      inputs.foldLeft(WordsLP.empty)(_ + _)
    }
    go("WordsS") {
      inputs.foldLeft(WordsS.empty)(_ + _)
    }
    go("WordsSP") {
      inputs.foldLeft(WordsSP.empty)(_ + _)
    }
    go("WordsSV") {
      inputs.foldLeft(WordsSV.empty)(_ + _)
    }
    go("WordsSVV") {
      inputs.foldLeft(WordsSVV.empty)(_ + _)
    }
    go("WordsI") {
      inputs.foldLeft(WordsI.empty)(_ + _)
    }
    go("WordsIP") {
      inputs.foldLeft(WordsIP.empty)(_ + _)
    }
    go("WordsIV") {
      inputs.foldLeft(WordsIV.empty)(_ + _)
    }
    go("WordsIA") {
      inputs.foldLeft(WordsIA.empty)(_ + _)
    }
    go("WordsBuilderV") {
      inputs.foldLeft(new WordsBuilderV)(_ + _).toWords
    }
    go("WordsBuilderL") {
      inputs.foldLeft(new WordsBuilderL)(_ + _).toWords
    }
    go("WordsBuilderS") {
      inputs.foldLeft(new WordsBuilderS)(_ + _).toWords
    }
  }
}

諸元

  • Mac OS 10.7.5 (2.6 GHz Intel Core i7)
  • Java 1.7.0_75
  • Scala 2.11.6

2015年6月29日月曜日

[FP] Scalaへの道

先日、社内勉強会でScala的なプログラムにするためには、という話題が出たのでそのまとめです。

例題

勉強会で出たプログラム例をまとめて例題を作りました。

  • テキストファイル内の欧文単語を単語の長さで以下の三種類に分類してオブジェクトに設定する
  • 短 : 3文字以下
  • 中 : 7文字以下
  • 長 : 8文字以上

以下ではこの例題にそって説明していきます。

準備

分類した単語を設定するオブジェクトとしてcase classのWordsを中心としたクラスとコンパニオンオブジェクトを用意しました。

例題はテキストファイルを解析して、case class Wordsに解析結果を設定する処理になります。

package sample

case class Words(
  smalls: Vector[String],
  middles: Vector[String],
  larges: Vector[String]
) {
  def +(word: String): Words = {
    if (word.length <= 3)
      copy(smalls = smalls :+ word)
    else if (word.length > 7)
      copy(larges = larges :+ word)
    else
      copy(middles = middles :+ word)
  }

  def +(words: Words): Words = {
    copy(
      smalls ++ words.smalls,
      middles ++ words.middles,
      larges ++ words.larges
    )
  }

  def ++(words: Seq[String]): Words = {
    words.foldLeft(this)(_ + _)
  }

  override def toString() = s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
}

object Words {
  val empty = Words(Vector.empty, Vector.empty, Vector.empty)

  def apply(word: String): Words = empty + word

  def apply(words: Seq[String]): Words = empty ++ words

  def getTokens(s: String): Vector[String] =
    s.split(" ").filter(_.nonEmpty).toVector
}

Java風プログラム

JavaもJava 8でストリームAPIが追加されましたし、Functional Javaのようなアプローチも色々あるので、モダンJavaは事情が異なりますが、古くからJavaを使っているベテランプログラマ程Java 5〜7時代のプログラミング・スタイルがスタンダードだと思います。

この古めのJavaスタンダード的なプログラムをここではJava風プログラムと呼ぶことにします。

それではJava風プログラムです。

def calcWords(filename: String): Words = {
    val buf = new ArrayBuffer[String] // ArrayBuffer中心ロジック
    val in = new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8"))
    try {
      var s = in.readLine
      while (s != null) {
        buf ++= getTokens(s)
        s = in.readLine
      }
    } finally {
      in.close() // リソースの解放が手動
    }
    val smalls = new ArrayBuffer[String]
    val middles = new ArrayBuffer[String]
    val larges = new ArrayBuffer[String]
    for (x <- buf) { // アルゴリズムの配置がトランザクションスクリプト的
      if (x.length <= 3)
        smalls += x
      else if (x.length > 7)
        larges += x
      else
        middles += x
    }
    Words(smalls.toVector, middles.toVector, larges.toVector)
  }

ボクもJavaからScalaに移行した当初はこのようなプログラムを書いていました。

ポイントは以下の3つです。

  • ArrayBuffer中心ロジック
  • リソースの解放が手動
  • アルゴリズムの配置がトランザクションスクリプト的
問題 : ArrayBuffer中心ロジック

Javaプログラムでは、データの集まりを作成する処理はjava.util.ArrayListとfor文などのループを組み合わせるのが定番です。

JavaからScalaに移行したての時期は、Scalaプログラムでもこの作戦を踏襲して上記のようなプログラムを書くことが多いと思います。java.util.ArrayListの代わりにscala.collection.mutable.ArrayBufferを使います。

Better Javaという意味では問題はないのですが、よりScala的、関数型プログラミング的にステップアップしたい場合には、この作戦を捨てる必要があります。

関数型プログラミングでは不変オブジェクトを使って副作用なしで処理を行うことが基本です。つまり、可変オブジェクトであるscala.collection.mutable.ArrayBufferを使った瞬間にこの基本が崩れてしまうため、関数型プログラミング的ではなくなるわけです。

問題 : リソースの解放が手動

Javaではローンパターン的なDSLを用意することが難しいため、リソースの解放がtry文を使った手動になってしまう事が多いと思います。

Scalaではリソース解放の処理を自動的に行なってくれるDSLが多数用意されているのでできるだけそれらの機能を使うようにするのが得策です。

問題 : アルゴリズムの配置がトランザクションスクリプト的

ArrayBufferを使うことでできる限り性能劣化を避けるという方針の悪い側面として、アルゴリズムの部品化を阻害するという問題があります。

たとえばArrayBufferを使ったアルゴリズムの一部を性能劣化を起こさず部品化する場合、ArrayBufferを持ちまわすようなインタフェースとなり、インタフェースが複雑化します。

また、ArrayBufferを意識した部品の場合、ArrayBuffer以外のデータ構築用のオブジェクトには適用できないので、データ構築用のオブジェクト毎に部品を用意する必要が出てくるという問題もあります。

インタフェースが複雑化した部品は結局使われないようになりがちなので、結果としてアプリケーション・ロジック内によく似たアルゴリズム断片がベタ書きされるようなケースが多くなります。

めぐりめぐって部品化が進まずプログラム開発の効率化が阻害されるという悪循環に入ることになります。

この問題への対応策は前述したように「多少の性能問題には目をつぶる」覚悟をした上で、不変オブジェクトを使った関数型プログラミングに移行することです。

性能問題対策

ArrayBufferを多用するアルゴリズムを採用する意図としては性能問題があります。

上級者ほど性能問題を重要視しているので、関数型的な不変オブジェクト中心のアルゴリズムには本能的な拒否反応があると思います。

この問題への対応ですが「多少の性能問題には目をつぶる」につきます。

Java的な感覚でいうと、そもそもScalaの生成するコードはかなりオーバーヘッドが高いのでArrayBufferを採用して部分的に性能を上げても他の所でもオーバーヘッドが出るため、結局Javaと全く同じものにはなりません。

それであれば中途半端に性能改善するよりScalaのパワーを活かした開発効率を重視する戦略を採るのが得策です。

余談 : ListBuffer

java.util.ArrayListに対応するScala側のコレクションとして、「List」つながりでscala.collection.mutable.ListBufferを選んでしまうケースがあるかもしれません。

scala.collection.mutable.ListBufferはscala.collection.mutable.ArrayBufferと比べると性能的にかなり不利なので、余程のことがなければ採用することはありません。

そもそもArrayBufferは使わないようにしようという議論ですが、仮にjava.util.ArrayList相当のコレクションが必要になった場合はListBufferではなくArrayBufferを使うようにしましょう。

Scala的プログラム

前述のJava風プログラムの改良版としてScala的プログラムを4つ用意しました。

基本形

まず基本形です。

def calcWords(filename: String): Words = {
    var strings = Vector.empty[String]
    for (in <- resource.managed(new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8")))) {
      var s = in.readLine
      while (s != null) {
        strings = strings ++ getTokens(s)
        s = in.readLine
      }
    }
    strings.foldLeft(Words.empty)(_ + _)
  }
対策 : ArrayBuffer中心ロジック

ArrayBufferを使わず不変オブジェクトであるVectorを使うようにしています。

Vectorは不変オブジェクトですがvarと複写を併用することでアルゴリズム内では内容を更新しながら処理を進める効果を得ています。

var strings = Vector.empty[String]
...
    strings = strings ++ getTokens(s)

varとVectorのこのような使い方は定番のイディオムです。

対策 : リソースの解放が手動

ファイルアクセスを伝統的なjava.ioを使って行う場合は、ストリームの解放処理が問題となります。

このような処理を記述する時によく利用するのがScala ARMです。

Scala ARMの使い方の一つとして上記のプログラムのようにfor式とresource.managedを組み合わせる方式があります。

resource.managedの引数にリソースのライフサイクルを管理したいオブジェクトを設定すると、for式の終了時に自動的にリソースを解放してくれます。

対策 : アルゴリズムの配置がトランザクションスクリプト的

単語の振り分けアルゴリズムの呼出しは以下の場所で行っています。

strings.foldLeft(Words.empty)(_ + _)

振り分けアルゴリズムはcase class Words内で定義した部品(メソッド)を、非常に簡単にcalcWords関数から呼び出して使用できており、トランザクションスクリプト的な問題は解消しています。

アルゴリズム部品の利用に貢献しているのが畳込み処理を抽象化した関数であるfoldLeftコンビネータです。ArrayBufferとfor式を使って記述しているアルゴリズムの多くはfoldLeftまたはfoldRightコンビネータで記述することができます。

VectorとはfoldLeftが相性がよいので、ArrayBufferとfor式を使いたくなったらVectorとfoldLeftの組合せの解を考えてみるのがよいでしょう。

大規模データ問題

上述のScala基本形ですが、Java風版から引き継いだ本質的な問題点として大規模データに対応できないという問題があります。

というのは、ArrayBufferに一度全データを格納するため、データ量がメモリに載らない程大規模になった場合にプログラムがクラッシュしてしまうからです。

ArrayBufferを使わずデータ読み込みのwhile式の中にすべての処理を押し込めばこの問題には対応できますが、部品化とは真逆の一枚岩の見通しの悪いアルゴリズムになってしまいます。

大規模データ問題は後述のパイプライン・プログラミングやMonadicプログラミングで解決することができます。

余談 : Iteratorパターンの本質

forロープで記述したアルゴリズムをより関数的なやり方で記述する方法について参考になるのが以下のページです。

foldLeft/foldRightで記述しきれない複雑なforループもApplicativeとTraverseという機能を使って部品化を進めながら記述できるということのようです。より本格的な関数型プログラミングを追求する場合は、こちらの方面に進むことになります。

Try

関数型プログラミングでは、関数の実行結果は関数のシグネチャに完全に記述されていることが基本です。このため、暗黙的に関数の制御フローを乱す「例外」は関数型プログラミング的には好ましくない言語機能です。

とはいえ実用的には非常に便利なのでScalaプログラムでも日常的に使用するのは問題ないと思いますが、関数ワールド的なプログラムを書きたい場合はscala.util.Tryモナドを使うのが基本です。

Tryモナドを使った版は以下になります。

def calcWords(filename: String): Try[Words] = Try {
    var strings = Vector.empty[String]
    for (in <- resource.managed(new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8")))) {
      var s = in.readLine
      while (s != null) {
        strings = strings ++ getTokens(s)
        s = in.readLine
      }
    }
    strings.foldLeft(Words.empty)(_ + _)
  }

関数の処理全体をTryで囲むだけなので簡単ですね。

scalax.io

基本形ではリソース解放の汎用的な解としてScala ARMを紹介する目的でjava.ioによるファイルアクセスを行いましたが、Scala的にはファイルアクセスにはscalax.ioを使うのがお勧めです。

scalax.ioを使った版は以下になります。

import scalax.io.{Resource, Codec}

  def calcWords(filename: String): Words = {
    implicit val codec = Codec.UTF8
    Resource.fromFile(filename).lines().
      map(getTokens).
      foldLeft(Words.empty)(_ ++ _)
  }
対策 : リソースの解放が手動

ファイルからの入力処理はリソースの解放も含めてResource.fromFile関数が全て行ってくれます。

対策 : ArrayBuffer中心ロジック&アルゴリズムの配置がトランザクションスクリプト的

関数型プログラミングの常道であるパイプライン・プログラミングの一環としてmapコンビネータとfoldLeftコンビネータを使って処理を記述します。

この方式を取ることで自然に「ArrayBuffer中心ロジック」が解消されます。また、アルゴリズムの部品かも進むので「アルゴリズムの配置がトランザクションスクリプト的」も解消されます。

対策 : 大規模データ

scalax.ioの方式では、Scala基本形に残っていた「大規模データ問題」も解消されています。「大規模データ問題」を解消しつつ部品化も行えているのがパイプライン・プログラミングの効用です。

scalaz-stream

Scalaらしさを越えて、Monadicプログラミングを追求したい場合はscalaz-streamを使うとよいでしょう。

まず準備としてWordsをMonoidとして定義しておきます。

implicit object WordsMonoid extends Monoid[Words] {
    def append(lhs: Words, rhs: => Words) = lhs + rhs
    def zero = Words.empty
  }

その上でscalaz-streamを使った版は以下になります。

import scalaz.concurrent.Task

  def calcWords(filename: String): Task[Words] = {
    implicit val codec = Codec.UTF8
    io.linesR(filename).
      map(getTokens).
      runFoldMap(Words(_))
  }

scalax.io版とほぼ同じような処理になります。

Processモナドを使っているので本来は高度なフロー制御ができるのですが、この例のような単純なデータ読込みだとフロー制御を使う場所はないのでscalax.ioとの違いはでてきません。

scalax.io版との違いは、結果がTaskモナドで返される点です。

TaskモナドはScalaのTryモナドとFutureモナドを合わせたような機能を持つモナドです。前出の「Try」版ではTryモナドで結果を返しましたが、同じような用途と理解すればよいと思います。

まとめ

Java風プログラムのScala的でない点を指摘した上で、Scala的プログラムの解を4つ上げました。

それぞれ長短があるので適材適所で使い分けることになります。

なお、今回はScala ARMを紹介する都合上java.ioによるファイルアクセスも使用していますが、実務的には例題のようなテキストファイル処理はscalax.ioを使うのが効率的です。scalax.ioを基本線に考え、機能不足が判明した場合に、java.io(またはjava.nio)やscalaz-streamを使う方式を検討するのがよいでしょう。

2015年6月22日月曜日

[FP] パイプライン・プログラミング

関数型プログラミングとは何か?

この問は深遠すぎてとてもボクの手には負えませんが、実務的なプラクティスとしてはパイプライン・プログラミングとして考えると分かりやすいのではないかと思います。

そこでScalaでのパイプライン・プログラミングの選択肢を整理してみました。

関数呼び出し

関数型プログラミングにおける普通の関数呼び出しもパイプラインと考えることができます。

純粋関数型では副作用は発生しないので、表に見えている関数の引数と復帰値のみで関数の挙動のすべてが表現されているためです。

たとえば以下のプログラムは:

h(g(f(1)))

以下のようなパイプラインと考えることができます。

Functor

文脈を持ったパイプラインはFunctorを使って構成できます。関数呼び出しとの違いは「文脈」を持った計算になるという点です。

ここでいう「文脈」とはパイプラインの裏側で暗黙的に共有、引継ぎされる状態やデータというぐらいの意味です。関数の直接呼び出しにはそのような裏表はないので、Functorで加わった重要な機能ということになります。

以下の図はQCon Tokyo 2015のセッション「ScalaによるMonadic Programmingのススメ - Functional Reactive Programmingへのアプローチ」で使用したスライドから引用です。



Functor, Applicative Functor, Monadが構成するパイプラインを概観しています。

この中でFunctorはmapコンビネータを使ってパイプラインを構築します。

Option(1).map(f).map(g).map(h)

Applicative Functor

上記の図で示した通りApplicative Functorは復数のパイプラインを一つに統合する機能を提供します。

ScalazではApplicative Functorのために以下のような文法を提供しています。

(Option(1) |@| Option(2) |@| Option(3))(i(_, _, _))

以下の図は同スライドからFuture Applicative Functorの例です。



Monad

「Functor, Applicative Functor, Monadが構成するパイプラインを概観」する図の中のMonadは以下のプログラムになっています。このようにMonadではflatMapコンビネータを使ってパイプラインを構築します。

Functorとの違いは「文脈」をプログラム側で制御する機能が加わる点です。

Option(1).flatMap(f).flatMap(g).flatMap(h)

以下の図は同スライドからOption Monadの例をあげています。



こちらではflatMapコンビネータを使う代わりにfor式によるfor内包表記(for comprehension)を使用しています。

def bigCalcO(n: Int): Option[String] = {
    for {
      a <- Option(calcString(n))
      b <- Option(calcInt(n))
      c <- Option(calcFloat(n))
    } yield finalCalc(a, b, c)
  }
}

for内包表記はMonadを使ったパイプラインのための文法糖衣として機能します。

上記の例ではflatMapコンビネータ直接使用とfor内包表記の違いはそれほど分かりませんが以下に示すState Monadのような複雑な構造のMonadを使う場合にfor内包表記の効果が出てきます。



Reactive Stream

Applicative FunctorやMonadを使うとかなり複雑なパイプラインを構築することができますが、あくまでも制御上は関数呼び出しなのでフロー制御を行うことができません。

ここでいうフロー制御とは、一度に処理するデータ量を制限したり、データの発生またはデータの消費の契機で処理を進める制御を指します。

直感的にパイプラインというとこういう制御が入っていて欲しいところですが、Monadそのものには入っていないわけです。

そこで登場するのがReactive Streamです。単にStreamというとscala.collection.immutable.Streamと紛らわしいので、ここではReactive Streamと呼ぶことにします。

Scalaz StreamではProcess Monadを使ってReactive Streamをを実現しています。

大規模データ処理とストリーム処理それぞれでのProcessモナドの使い方は以下になります。

ストリーム処理


大規模データ処理とストリーム処理のいずれもパイプラインに抽象化されたほとんど同じプログラムになっています。

また、普通のMonadと比べても若干おまじないは増えるもののプログラミングとしては同じような形になります。

状態機械

Process MonadはMonadによるパイプラインで状態機械による制御を可能にしたものと考えることもできます。

詳しくは以下の記事を御覧ください。

まとめ

Scalaのパイプライン・プログラミングを以下のパターンに整理してみました。

  • 関数呼び出し
  • Functor
  • Applicative Functor
  • Monad
  • Reactive Stream

これらのパイプラインを適材適所で使い分ける、という切り口で関数型プログラミングを考えてみると面白いかもしれません。

2015年6月15日月曜日

野毛倶楽部夜会:超高速開発

野毛倶楽部(横浜クラウド勉強会)は土曜の昼に行うチュートリアル的な技術セッション(+夜学)を中心に活動してきましたが、これに加えて先端技術についてオフレコで語り合う場として新たに夜会を始めることにしました。

先週の月曜(6月8日)はその第1回でジャスミンソフトの贄さんを招いて超高速開発をテーマに野毛倶楽部の第1回夜会を行いました。

ジャスミンソフトは超高速開発ツールWagbyの開発/提供元で超高速開発コミュニティの幹事会社です。

プログラムの自動生成はボクも長年追いかけてきた技術分野なので、この分野で成功を収められているジャスミンソフトの具体的なお話をビジネス/技術両面からお聞きできてとても参考になりました。

ディスカッションの中では以下のキーワードが印象に残りました。

  • モデリング
  • リポジトリ

以下、夜会の後に考えたことなどをつらつら。

モデリング

プログラムの自動生成も現時点ではモデルからアプリケーション全体を生成できるわけではないですがアプリケーションの相当部分を自動生成(+フレームワーク)でカバーできるところまできています。

また「超高速開発」という枠組みでは仕様から直接システムやサブシステムを動作させるXEAD DriverASTERIA WARPといったアプローチもあります。

いずれにしても伝統的なスクラッチ開発と比べるとテストも含めたプログラム開発工数が劇的に少なくなることは明らかです。

プログラム開発工数が少なくなるのは「仕様」が直接、間接にそのまま動作するためですが、「動作可能な仕様」でなければならないので、動作可能な精密で網羅的な仕様を作成するスキルが必要になってきます。加えてシステムの既存部分やスクラッチ開発する部分との連携を仕様化するスキルも要求されます。

「動作可能な仕様」を記述する言語としてはOOADのUMLやデータモデリングのER図といったものが代表的ですがExcelやプレインテキストを用いるアプローチもあります。記述言語も大事ですが、裏側のメタモデルがより重要です。

顧客要件から動作可能な精密なレベルのUMLやER図などで記述した動作可能な精密な仕様を作成するモデリングのスキルが「超高速開発」を活用するための重要スキルということになります。

伝統的なスクラッチ開発でも基本設計書、構造設計書、詳細設計書という形で仕様書は書かれていたわけですが仕様に曖昧な点があってもプログラマが補完することで補うことが可能でした。というよりプログラマが高度な補完を行うことが前提だったといっても過言ではないでしょう。しかし「超高速開発」の場合は、仕様をそのまま動作させるため仕様段階で実行可能レベルの正確さが要求されるわけです。

こういった問題意識から超高速開発コミュニティでもモデリング分科会の活動を行っているのだと思います。

リポジトリ

「動作可能な仕様」によるシステム開発が軌道に乗ると、次はこの仕様をデータベースで一元管理したくなるのが道理です。「リポジトリ」はこのような機能を提供するものです。

「リポジトリ」はWikipediaでは以下のように説明されています。

  • リポジトリ (英: repository) とは、情報工学において、仕様・デザイン・ソースコード・テスト情報・インシデント情報など、システムの開発プロジェクトに関連するデータの一元的な貯蔵庫を意味する。

リポジトリは少なくてもボクがオブジェクト指向が調べ始めた90年代初頭にはすでにあった概念ですが、格納するメタデータの記述能力が追いつかず現時点でも部分的な応用にとどまっていると認識しています。メタデータと配備するプログラムが連動していないと、絵に描いた餅になってしまうわけですね。

OMGのMOF(Meta Object Facility)やEAI(Enterprise Application Integration)で話題となったMDM(Master Data Management)も同じようなジャンルの技術ですがまだ(一部の大企業のシステムは別として)一般的に幅広く利用されるような段階にはいたっていないと思います。

リポジトリで管理する情報が超高速開発流の「動作可能な仕様」となると、リポジトリで構想されていた本来の運用が見えてきます。

また、もうひとつの流れとしてソフトウェア構成管理技術の進歩があります。現在はCI技術によってGitHubにプログラムのソースコードをプッシュすると自動的にHerokuのような実行コンテナに配備したり、DockerHubのようにDockerイメージを生成したり、ということが可能になっています。

こういった構成管理技術と「動作可能な仕様」の「リポジトリ」を組み合わせることで、より高度な運用ができるはずです。

現時点で実用化されている要素技術を組み合わせれば、もともと「リポジトリ」で構想されていた理想的な運用に近しいところまでもっていけるのではないか。ブレークスルーの寸前まで各要素技術のポテンシャルが高くなってきており、後は誰がどのように最後のピースをつなげるか、が焦点の分野ではないかと思われるわけです。わくわくしますね。

"リポジトリ"のキーワードからそういったことを考えながら、野毛の夜は更けていきました。

SmartDoc

本題とは離れますが、Wagbyではマニュアル作成にSmartDocを現役で使って頂いているということです。うれしいですね。

2015年6月1日月曜日

DBアクセスライブラリ

Spark SQL 1.3の登場を機にプラットフォームのバッチ処理基盤の刷新を考えています。前回「JSONライブラリ性能比較」では、JSONライブラリについて考えました。

今回はDBアクセスライブラリがテーマです。

バッチ処理基盤として観点からDBアクセスライブラリに対して大きく以下の4つの要件を考えています。

  • CRUDの範囲の操作はcase classで定義したレコードで簡単に処理したい。
  • 細かい問合せ処理はSQLを直接使いたい。
  • SQLの組み立てを部品化したい。
  • Dockerコンテナ上でカスタマイズ可能にしたい。

逆に通常ORMに求められる以下の機能はあまり重要視していません。

  • 関連の自動制御
  • データのマイグレーション

「関連の自動制御」ですが関連の取り扱いは必要機能、性能要件、メモリ要件、キャッシュ要件がORMが提供する機能と合致するのかを熟知するのが大変なので、SQLを生で使う方式を採用しているためです。もちろん、うまく合致する場合は使ったほうが楽なので、あるに越したことはありません。

「データのマイグレーション」はアジャイルにカジュアルに行う運用ではなく、システム結合のタイミングで手動で行っているので今のところ本番運用では使う可能性がないためです。

制約

バッチ処理基盤を実現する上で考慮しなければならない制約は以下のものです。

  • バッチ処理はDockerクラスタまたはSparkクラスタ上で動作。
  • case classのパラメタ数制約22個が解消されるのはScala 2.11から。
  • プラットフォームのほとんどの主要DBテーブルのカラム数は22個超なのでScala 2.10ではレコードの表現にcase classは事実上難しい。
  • プラットフォームの主要機能がScala 2.10ベースなのでScala 2.11を全面的に採用するわけにはいかない。
  • Spark SQLがScala 2.11で安定的に動作するか不明(現在使用しているDockerイメージsequenceiq/sparkはScala 2.10みたい)。

上記の制約があるため、以下のような運用を想定しています。

  • プラットフォームとバッチ処理基盤の共通ライブラリはScala 2.10と2.11のクロスビルド。
  • Scala 2.10バッチとScala 2.11バッチを併用。
  • SparkジョブはScala 2.10バッチ。
  • 非SparkジョブはScala 2.11バッチ(case classを活用したいため)。
  • 非SparkジョブはDockerクラスタ上で実行する。
  • Sparkジョブでも小さなものはDockerクラスタ上で実行する。

クロスビルドの問題はかなり重たくて以下のような問題が出ています。

  • Play Frameworkの2.11サポートは2.3からのようだが、AnormのAPIが非互換みたい。
  • finagle-httpは2.10と2.11で別物になる気配。(詳細未調査。Netty 4対応?)

このため共通ライブラリからAnormとFinagleを外すことにしました。

アーキテクチャ選定

DockerでSpark SQL」で述べたようにSpark SQL 1.3をバッチ処理基盤の中軸に据え、1つのSparkジョブプログラムを用途ごとにSparkクラスタとDockerクラスタで実行しわける、というのが今回のバッチ処理基盤刷新の基本アイデアです。

現時点でもこの方向を軸に検討を進めています。

ただ以下のようなニーズもあるので普通のDB操作のためのライブラリも併用する形を考えています。

  • Sparkを使うまでもない小さなジョブの場合Sparkを使わず直接SQLで操作したい。
  • 開発をスケールさせるためSparkの知識なしでもバッチを記述できる方式は担保しておきたい。
  • Sparkで性能要件や機能要件が満たせないケースの回避策は担保しておきたい。
  • Spark処理の場合でもSQLによる入出力が併用できると便利かもしれない。
問題点

プラットフォームのエンジン部では現在はAnormとSquerylを併用しています。SQLを直接使いたいというニーズとORM的なCRUD処理を併用するため、このアーキテクチャを採用しました。

2012年当時の選択としてはよかったと思うのですが2015年新規開発のバッチ処理基盤として引き続き採用するのが望ましいのかという点が論点です。

AnormとSquerylとも非常に便利な機能なのですが経験上以下の問題が判明しています。

まずAnormです。

  • Play 2.2系と2.3系で非互換があるようでクロスビルドは難しそう。
  • ORM的な使い方ができないのでORMを併用する必要がある。

次にSquerylです。

  • 開発が止まっている感じ。
  • テーブル名を完全修飾名(スキーマ名.テーブル名)で指定すると動かない。
  • やりたいことをSquerylのDSLで実現する方法のノウハウが必要。
  • やりたいことがSquerylのDSLでサポートしていないことが判明した場合は対応が大変。
  • SQL組み立ての部品化にノウハウが必要。

AnormとSquerylを併用する際の問題点です。

  • コネクション管理の共通化のノウハウが必要。
  • AnormとSquerylの相互運用は難しい。

上記の問題があるのと有力な代替策があるのでバッチ処理基盤での採用は保留にしました。

Slick

Slickは実際に使ったことはないので資料からの推測ですが、レコードをcase classで表現した上で、関数型的なコレクション/モナド系の抽象操作でDB入出力を可能にしたライブラリ、と理解しています。

大変便利そうですし、関数型的にも筋がよさそうなのですが以下の懸念点があるので今回は採用を見送りました。

  • Scala 2.10系ではcase classの22個制限で事実上使用できない。
  • SQL直接操作ができないと細かい処理の記述が難しい。
  • Anormと併用する場合の親和性が不明。(コネクション管理の共通化など)
  • ORM的な関連の取り扱い方法など、新しいアプローチだけに仕様を調査するのが大変。
Scalikejdbc&Skinny-ORM

以上、色々と検討してきましたがSparkと併用するDB入出力ライブラリとしてはScalikejdbcとSkinny-ORMを採用する方向で考えています。

引き続きAnorm&Squerylをベースに考えても悪くはなさそうですが、Scalikejdbc&Skinny-ORMの方がより適切と判断しました。

ScalikejdbcはSQLベースのDB入出力ライブラリです。Skinny-ORMはScalikejdbc上に構築されたORMです。実用上はScalikejdbcとSkinny-ORMの併用を前提に考えるのがよいのではないかと思います。

Scalikejdbc&Skinny-ORMの採用を考えている理由は以下のものです。

  • Scala 2.10と2.11のクロスビルドで問題がなさそう。
  • SQLでの操作がAnormより若干使いやすそう。
  • Squerylは完全修飾名の問題があるため採用しづらい。
  • ScalikejdbcによるSQL入出力とSkinny-ORMによるORMがシームレスに連携できそう。
  • テーブルのメタデータからのプログラムの自動生成が便利。
  • Typesafe Configを用いたJDBC設定の管理メカニズムを持っている。
  • Joda-timeをサポートしている。

現在判明している問題点としては以下のものがあります。

  • case classの22個制限があるのでSkinny-ORMは2.10系では用途が限定される。
  • scalaz-streamと接続するためには工夫が要りそう。

後者のscalaz-streamの問題については引き続き解決策を探していく予定です。

まとめ

バッチ処理基盤におけるSQLライブラリですが、検討の結果Spark SQLを基本にScalikejdbc&Skinny-ORMを併用する形を基本線で考えていくことにしました。

DBアクセスライブラリのような基本機能は一度製品に採用するとなかなか切替は難しいですが、今回はバッチ処理基盤を刷新するタイミングがあったのでゼロベースで調べなおしてみました。

プロジェクトを開始した2012年当時とはかなり状況も変わってきていてキャッチアップが大変ですが、調度よいタイミングで棚卸しができたと思います。