2015年3月30日月曜日

Scala的状態機械/FP編

Scalaにおける状態機械の実装戦略について検討しています。

Scala的状態機械/OOP編」で状態+状態遷移を表現するトレイトであるParseStateを作成しました。ParseStateの具象クラスはcase classまたはcase objectとして実現しています。これらのトレイト、case class、case objectがワンセットでFPで使用できる代数的データ構造となっています。

「OOP編」ではこのParseStateを使用して、オブジェクト版の状態機械とアクター版の状態機械を作成しました。代数的データ構造でもあるParseStateがOOP的に問題なく使用できることが確認できました。

「FP編」では、ParseStateの代数的データ構造の性質を活かしてMonadic Programming(以下MP)版の状態機械を考えてみます。

Stateモナド版状態機械

MPで状態を扱いたい場合には、状態を扱うモナドであるStateモナドが有力な選択肢です。

代数的データ型であるParseStateはそのまま利用し、これをStateモナドでくるむことで状態遷移を実現したものが以下のParserStateMonadです。

package sample

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

  def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }
}
action関数

モナドを使った共通機能を作る場合には、共通機能としていわゆるモナディック関数を提供するのが一つの形になっています。

モナディック関数とは「A→M[B]」(Mはモナド)の形をしている関数です。モナドMのflatMapコンビネータの引数になります。

Stateモナドを使用する場合には、A→State[B]の形の関数を用意することになります。

def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

今回作成したStateモナド用のモナディック関数であるaction関数は「Char→State[ParseState→(ParseState, Char)]」の形をしています。

A→M[B]の形とは以下の対応になります。

  • A : Char
  • M : State
  • B : ParseState→(ParseState, Char)

Stateモナドに設定している関数は「ParseState→(ParseState, Char)」の形をしていますが、action関数全体ではaction関数の引数もパラメタとして利用しているので、結果として「Char→ParseState→(Parse, Char)」の動きをする関数になっています。

action関数が返すStateモナドはParseStateによって表現された状態を、受信したイベントとの組合せ状態遷移する関数が設定されています。

状態+状態遷移を表現するオブジェクト兼代数的データ型であるParseStateがきちんと定義されていれば、Stateモナド用のモナディック関数を定義するのは容易なことが分かります。

parse関数

CharのシーケンスからCSVをパースするparse関数は以下になります。

def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }

parse関数は、Stateモナド用モナディック関数actionと型クラスTraverseのtraverseSコンビネータの組合せで実現しています。

traverseSコンビネータはStateモナド用のtraverseコンビネータです。Scalaの型推論が若干弱いためStateモナド専用のコンビネータを用意していますが、動きは通常のtraverseコンビネータと同じです。

状態遷移のロジックそのものはParseStateオブジェクトにカプセル化したものをaction関数から返されるStateモナド経由で使用します。

traverseSコンビネータとStateモナドを組み合わせると、Traverseで走査対象となるコレクションをイベント列と見立てて、各イベントの発生に対応した状態機械をStateモナドで実現することができます。この最終状態を取得することで、イベント列を消化した最終結果を得ることができます。

OOPオブジェクトであり代数的データ構造でもあるParseStateは、このようにしてStateモナドに包むことで、FP的な状態機械としてもそのまま使用することができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserStateMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserStateMonad.parse(events)
      println(s"ParserStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserStateMonadSpec
ParserStateMonadSpec: List(abc, def, xyz)
[info] ParserStateMonadSpec:
[info] ParserStateMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 400 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 1 s, completed 2015/03/21 17:49:39

scalaz stream版状態機械

Stateモナド版状態機械ではStateモナドと型クラスTraverseのtraverseSコンビネータを使用して、状態機械をMPで実現しました。

この実現方法は、メモリ上に展開したデータに対して一括処理をするのには適していますが、大規模データ処理やストリーム処理への適用は不向きです。

そこで、大規模データ処理やストリーム処理をFunctional Reactive Programming(以下FRP)の枠組みで行うために、scalaz streamを使用して状態機械の実装してみました。

package sample

import scalaz._, Scalaz._
import scalaz.stream._
import scalaz.stream.Process.Process0Syntax

object ParserProcessMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

Processモナドは一種の状態機械なので、この性質を利用してPraseStateによる状態遷移をProcessモナド化します。この処理を行うのがfsm関数です。

fsm関数は状態であるParseStateを引数に、Charを受け取るとParseStateとCharの組合せで計算された新しいParseStateによる状態に遷移しつつ処理結果としてParseStateを返すProcessモナドを返します。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }
parse関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の典型的な使い方です。

def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

parse関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

まず引数のCharシーケンスからソースとなるProcessモナドを生成します。

val source: Process0[Char] = Process.emitAll(events)

ProcessのemitAll関数は引数に指定されたシーケンスによるストリームを表現するProcessモナドを生成します。

ただし、内部的に非同期実行する本格的なシーケンスではなく、通常のシーケンスに対してストリームのインタフェースでアクセスできるという意味合いのProcessモナドになります。(内部的には実行制御に後述のTaskモナドではなく、Idモナドを使用しています。)

パイプラインの処理を記述

pipeコンビネータでfsm関数にInitStateを適用して得られるProcessモナドをパイプライン本体のProcessモナドに設定しています。

val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))

これが処理の本体です。ここではpipeコンビネータを始めとする各種コンビネータを使ってパイプラインを構築します。

パイプラインで加工後のデータを取得

最後にtoListメソッドで、パイプラインの処理結果をListとして取り出し、ここからパース結果を取り出す処理を行っています。

val result = pipeline.toList.last
    result.endEvent.row
parseTask関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の使用例ですが、より本格的な応用である大規模データ処理やストリーム処理では少し使い方が変わってきます。

そこで、参考のために実行制御にTaskモナドを使ったバージョンのparseTask関数を作りました。

def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process[Task, Char] = Process.emitAll(events).toSource
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }

parse関数と同様にparseTask関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

ProcessモナドのtoSourceメソッドで、Taskモナドを実行制御に使用するProcessモナドに変換されます。

val source: Process[Task, Char] = Process.emitAll(events).toSource
パイプラインの処理を記述

fsm関数から得られたProcessモナドをpipeコンビネータでパイプラインに設定します。

val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))

この処理はTask版でないparse関数と全く同じです。

パイプラインで加工後のデータを取得

Taskモナドを実行制御に使用する場合には、for式などを使ってモナディックに実行結果を取得する形になります。

for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonad.parse(events)
      println(s"ParserProcessMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadSpec
ParserProcessMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 301 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:12

scalaz stream + Stateモナド版状態機械

「scalaz stream」ではParseStateオブジェクトを直接使用して状態機械を作成しました。通常はこれで十分ですが、Stateモナド用モナディック関数が用意されている場合は、こちらを使用する方法もあります。

この方法では、Stateモナドの使い方だけ理解していればよいので、プログラミングはより簡単かつ汎用的になります。

package sample

import scalaz._, Scalaz._
import scalaz.stream._

object ParserProcessMonadStateMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

ParseStateによる状態機械の動作を行うProcessモナドを生成するfsm関数のStateモナド版は以下になります。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

ParseStateのeventメソッドを直接使用する代わりに、ParseStateを包んだStateモナドを返すモナディック関数actionを使用します。

この版のfsm関数ではaction関数から取得したStateモナドのexecメソッドを使用して、現状態とイベント(Char)から新状態を計算し、この状態を保持した、新たなProcessを生成しています。

この方法のメリットはParseStateの使い方(この場合はeventメソッド)は知る必要がなく、汎用的なStateモナドの使い方だけ知っていればよい点です。

つまりaction関数だけ作っておけば、Traverseを使った状態機械、Processモナドを使った状態機械のどちらも簡単に作ることができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonadStateMonad.parse(events)
      println(s"ParserProcessMonadStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadStateMonadSpec
ParserProcessMonadStateMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadStateMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 286 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:26

状態機械実装戦略

オブジェクト・モデルで状態機械が出てきた場合の実装戦略としては、まずベースとして:

  • sealed trait + case classで状態+状態遷移のオブジェクト&代数的データ型(以下、状態遷移case class)

を作成します。

前回に見たようにこの状態遷移case classを使って、OOP版の状態機械を簡単に作成することができます。

次に、FP用に以下の部品を整備しておくのがよいでしょう。

  • Stateモナド用モナディック関数
  • Processモナド用状態機械関数

どちらの関数も状態遷移case classが用意されていれば、ほとんど定型的な記述のみで作成することができます。この部品を使うことでFP版の状態機械を簡単に作成することができます。

以上、状態機械実装戦略について整理しました。

この戦略で重要なのは、状態+状態遷移を表現するために作成した状態遷移case classは1つだけ作ればよく、これをOOP流、FP流にマルチに展開していける点です。

この点が確定すれば、オブジェクト・モデルに状態機械が出てきたら安心して1つの状態遷移case classの実装に集中することができます。

まとめ

OFPの状態機械の実装についてOOP的実装、FP的実装について整理してみました。

いずれの場合もオブジェクト&代数的データ型である「状態遷移case class」が基本で、ここからOOP的状態機械、FP的状態機械へマルチに展開できることが確認できました。

OFPではアプリケーション・ロジックは可能な限りFP側に寄せておくのがよいので、FPでも状態機械を簡単に実現できることが確認できたのは収穫でした。

また、FP的なアプローチが使えないケースが出てきても、簡単にOOP的アプローチに切り替え可能なのも安心材料です。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2015年3月23日月曜日

Scala的状態機械/OOP編

オブジェクト・モデリングにおける動的モデルは状態機械で記述するのが基本です。つまり状態機械はオブジェクト・モデリングの重要な構成要素の一つということです。

ScalaでObject-Functional Programming(OFP)を行う場合でも、要求・分析・設計の各アクティビティを経て作成されたオブジェクト・モデル内の状態機械をどのように実装していくのかという実装方式が論点になります。

普通のOOP流の実装はすでに議論しつくされていると思いますが、OFPにおけるFPでの実装方式については、これから整備されていくことになると思います。

注意が必要なのはクラウド・アプリケーション開発をターゲットにする場合、伝統的なFPというよりMonadic Programming(以下MP)を経てFunctional Reactive Programming(以下FRP)がゴールになるということです。

このためFRPとして利用可能な状態機械実装を探っておきたいところです。

課題

状態機械の使い所としては、エンティティの状態遷移を記述することで業務ワークフローの構築に用いたり、プロトコルドライバのフロー制御といったものが考えられます。前者はDBに状態を格納することになりエンタープライズ・アプリケーション的なライフサイクルの長い応用ですし、後者は制御系の応用でメモリ内の制御で完結する形のものです。

色々な応用があるわけですが、どの方面でも使える実現方法を念頭におきつつ、ここではCSVのパーサー処理を状態機械で実装してみることにします。FPとの接続部分に興味を集中させるためできるだけ簡単なものを選んでみました。

具体的には以下の課題になります。

  • CSVの1行を「,」区切りのレコードとしてパースし、文字列の列を取得する処理に使用する状態機械

この状態機械は文字列の列の各文字をイベントとして扱い、一連のイベント終了後にパース処理の結果となる「文字列の列」を状態の情報として保持する形になります。

case classで状態機械

状態機械は「状態×イベント→状態」のメカニズムが基本の構成要素です。このメカニズムをcase classで実装します。

いうまでもなくcase classはScalaプログラミングの超重要な構成要素で、OOP的側面とFP的側面を兼ね備えたObject-Functional Programmingの肝となる機構です。

OOP的にはDDDでいうところのvalue objectの実装に適した文法になっています。

FP的には代数的データ型(algebraic data type)として利用することが想定されています。

つまりvalue object兼代数的データ型を実現するための機構がcase classです。

ただしcase classの文法上はvalue objectや代数的データ型に適さない普通のオブジェクトを実装することも可能です。このため、value object兼代数的データ型を実現するための紳士協定を組み込んでおかなければなりません。

この紳士協定は以下の2つです。

  • 不変オブジェクト(immutable object)にする
  • 基底のトレイトや抽象クラスはsealedにする

Value objectと代数的データ型は共に不変オブジェクトである必要があります。

また代数的データ型の要件としてコンパイル時に全インヘリタンス関係が確定している必要があるので、sealedにすることでこれを担保します。

case classはオブジェクト指向の普通のオブジェクトでもあるので、不変オブジェクトの性質さえ守ればオブジェクトのフルスペックを使用することができます。

package sample

sealed trait ParseState {
  def event(c: Char): ParseState
  def endEvent(): EndState
}

case object InitState extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = EndState(Nil)
}

case class InputState(
  fields: Seq[String],
  candidate: String
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(fields :+ candidate, "")
    case '\n' => EndState(fields :+ candidate)
    case _ => InputState(fields, candidate :+ c)
  }
  def endEvent() = EndState(fields :+ candidate)
}

case class EndState(
  row: Seq[String]
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = this
}

case class FailureState(
  row: Seq[String],
  message: String
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = sys.error("failure")
}

状態機械は、「状態×イベント→状態」の情報を記述したマトリックスとして表現できます。このマトリクスを実現するデータ構造と評価戦略は色々考えられますが、ここではOOP的なアプローチで実現しました。

状態

まず状態機械全体を表すトレイトとしてParseStateを定義します。sealedにすることで代数的データ構造の要件の1つを満たします。

ParseStateのサブクラスとして具体的な状態を定義します。情報を持つInitState, InputState, EndState, FailureStateはcase classとして、情報を持たないInitStateはcase objectとして定義しました。

イベント

発生するイベントはメソッドで表現しました。

event
1文字入力イベント発生
endEvent
パース終了イベント発生
状態×イベント→状態

状態とイベントの組から新しい状態を決定するアルゴリズムは「発生するイベント」で定義したメソッドの実装になります。

具体的には各case class, case objectのeventメソッド、endEventメソッドの実装を参照して下さい。

オブジェクト版状態機械

case classによる状態機械の表現はimmutableなので、そのままでは状態機械としては動きません。あくまでも「状態×イベント→状態」のメカニズムを提供するまでになります。

状態機械の状態遷移は状態を遷移させるので本質的にはmutableな処理です。

OOP的には、オブジェクトのインスタンス変数でmutableな状態を保持する実現方法になります。

OOP版の状態機械をParserObjectとして実装しました。

package sample

class ParserObject {
  var state: ParseState = InitState

  def charEvent(c: Char) {
    state = state.event(c)
  }

  def parseEnd(): Seq[String] = {
    val end = state.endEvent()
    state = end
    end.row
  }
}

PaserObjectクラスでは、変更可能なインスタンス変数としてstateを定義しています。

イベントの発生をメソッドで受取り、状態とイベントの組合せで新たな状態を計算し、これをstateに設定することで状態遷移を実現しています。

新しい状態の計算ロジックは、ParseStateオブジェクトにカプセル化しています。

代数的データ構造で表現した状態機械のimmutableなオブジェクトを、インスタンス変数で管理することで、OOP的な状態機械が実現できました。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserObjectSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserObject" should {
    "parse" in {
      val parser = new ParserObject()
      val text = "abc,def,xyz"
      for (c <- text) {
        parser.charEvent(c) // 一文字づつパーサーに送信
      }
      val r = parser.parseEnd() // 解析終了のイベントを送信
      println(s"ParserObject: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserObjectSpec
ParserObject: List(abc, def, xyz)
[info] ParserObjectSpec:
[info] ParserObject
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 180 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:47:46

アクター版状態機械

FRP的な応用を考える場合、並行・並列処理で枠組みの中で状態機械を実現していく必要があります。

この実現方式として有力なのがアクターです。

ScalaではAkkaというアクター基盤を使用することができます。

このアクターは(理論的には色々あるでしょうが)状態を持ったactive objectを実現する方式になっています。

active objectであるアクター間はメッセージボックスでキューイングされたメッセージ通信で協調動作するので、アプリケーションレベルで特別な排他制御を行わなくても並行・並列処理を簡潔に記述することができます。

アクター版のCSVパーサーであるParserActorは以下になります。

package sample

import akka.actor._

case class ParseCharEvent(c: Char)
case object ParseEnd
case class ParseResult(result: Seq[String])

class ParserActor extends Actor {
  var state: ParseState = InitState

  def receive = {
    case ParseCharEvent(c) =>
      state = state.event(c)
    case ParseEnd =>
      val end = state.endEvent()
      state = end
      sender ! ParseResult(end.row)
  }
}

基本的に行っているのは(passive objectである)ParserObjectと同じです。オブジェクトのインスタンス変数でmutableな状態を保持する実現方法になります。

ParserObjectとの違いは、メソッド呼び出しではなくアクター間でのメッセージ通信によって処理が実行されるため、以下のようなメッセージ通信のためのコードを用意しないといけない点です。

  • アクター間で送受信されるメッセージの定義
  • メッセージを受け取り処理を行うイベントハンドラ

ParserObjectではメソッドとして簡単に定義できる処理ですが、これをメッセージ処理ように仕立て直さないといけないわけです。

アクターはとても簡単に利用できるのですが、少しボイラープレイトのコードを書く必要があります。

使い方
package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
class ParserActorSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParseActor" should {
    "parse" in {
      implicit val timeout = Timeout(5.seconds)
      val system = ActorSystem("state")
      implicit val context = system.dispatcher
      val actor = system.actorOf(Props[ParserActor])

      val text = "abc,def,xyz"
      for (c <- text) {
        actor ! ParseCharEvent(c)
      }
      for (r <- actor ? ParseEnd) yield {
        r match {
          case ParseResult(r) =>
            println(s"ParserActorSpec: $r")
            r should be (Vector("abc", "def", "xyz"))
        }
      }
      system.shutdown()
    }
  }
}

アプリケーションロジックは以下の部分です。

val text = "abc,def,xyz"
      for (c <- text) {
        actor ! ParseCharEvent(c)
      }
      for (r <- actor ? ParseEnd) yield {
        r match {
          case ParseResult(r) =>
            println(s"ParserActorSpec: $r")
            r should be (Vector("abc", "def", "xyz"))
        }
      }

アクターを使うために以下のような準備が必要になります。

// タイムアウト値の暗黙知を定義
      implicit val timeout = Timeout(5.seconds)
      // アクターの実行基盤を作成
      val system = ActorSystem("state")
      // スレッドの実行文脈を定義
      implicit val context = system.dispatcher
      // アクターの生成
      val actor = system.actorOf(Props[ParserActor])
      ....
      // アクターの実行基盤をシャットダウン
      system.shutdown()

アクターはJava流のスレッドによる並行・並列プログラミングと比較するとはるかに楽でバグも出にくいアプローチなのですが、それでも少し込み入ったボイラープレイトが必要になります。

実行

実行結果は以下になります。

$ sbt test-only sample.ParserActorSpec
[info] Compiling 1 Scala source to /Users/asami/src/workspace2015/0317.blog.statemachine/target/scala-2.11/test-classes...
ParserActorSpec: List(abc, def, xyz)
[info] ParserActorSpec:
[info] ParseActor
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 408 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 2 s, completed 2015/03/21 17:48:09

まとめ

今回は状態機械のベースとなるcase class群(ParseState)を作成した後、通常のOOPのアプローチとして普通のオブジェクト版(passive object)のパーサーとアクター版(active object)のパーサーを作成しました。

ParseStateはOOPのvalue objectであると同時にFPの代数的データ型でもあります。このParseStateを普通のオブジェクト、アクターの両方で普通に利用して状態機械を作成できることが確認できました。

次回は今回のParseStateをベースにMonadic Programming, Functional Reactive Programmingにおける状態機械の実現方法について考えます。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2015年3月9日月曜日

[scalaz] Tryモナド問題

Scalazでmonadic programmingする上で困っているのがTryモナドの問題です。

scala.util.TryはScalaで例外処理をmonadicに処理するためにScala 2.10で導入されたScalaの基本機能です。Scalaのモナドとしての要件(flatMapメソッドが定義されている等)は満たしているのでfor式で使用することができます。

しかし、ScalazのMonadではないのでScalazの提供する各種機能の恩恵を得ることができません。このことがScalazを軸としたmonadic programmingの阻害要因になっています。

例外をハンドリングするためにはTryを使うのが自然ですが、Tryを使うとScalazのmonadic programmingがやりづらくなる、という構図です。

問題

TryがScalaz Monadとなっていない理由は、Tryがモナド則の1つであるleft identityを満たしていないとされているからのようです。

この理由は:

の記事の解説によると:

def foo[A, B](a: A): Try[B] = throw new Exception("oops")

foo(1) // exception is thrown

Try(1).flatMap(foo) // scala.util.Failure

ということなので:

  • foo関数が例外を返す場合、TryのflatMapコンビネータにfoo関数を適用した時にflatMapコンビネータが例外を返さないとMonad則のleft identity law違反

ということだと思います。

さて、それではTryのflatMapコンビネータで例外を返すようにすればよいかというと、これはOOP的には困ってしまう仕様です。

OOP的にはTryは例外を包んで外に出さないことを期待したいところです。というのは、flatMapコンビネータが例外を返す可能性があるとすると、Tryを使っているにもかからわずTryも例外を出す可能性があるということなので、Tryの外側をTryで包まなければならなくなります。

これはプログラミング的にも煩雑ですし、Tryをネストさせなければならない条件をプログラマが意識しないといけないのでバグの出やすいインタフェースです。

こういう事情もありScalaの基本ライブラリは、あえて現在の仕様を選択しているようです。

解決策

本稿の趣旨ですが、Try Monad問題を解釈変更で解決するという試みです。

left identityの解釈

Scalaのようなハイブリッドな関数型言語で純粋関数型の計算を行う場合、いくつかの紳士協定を前提とします。

代表的なものとしては以下のようなものがあります。

  • var変数は用いない
  • mutableなコレクションは用いない
  • eqメソッドは用いない

つまりScalaにおける純粋関数型の演算は紳士協定が前提なので、Tryのleft identity問題も紳士協定で解決すればよいのではないかということです。

具体的には返却値にTryを返す:

f(a: A): Try[B]

というシグネチャの関数では例外をスローしない、という紳士協定を導入するというアプローチはどうでしょうか。もちろん絶対ということはないので、例外をスローしたらプログラムが致命的状態(e.g. バグ発生、メモリ不足)に入ったとして扱うという形になります。

プログラミング的には以下をコーディングの基本形にするということなので、特に煩雑な点はありません。

f(a: A): Try[B] = Try {
  ...
}

この紳士協定上ではTryをScalaz Monad化しても問題ないように思います。

実行タイミングの解釈

FPでは参照透過性が重要な要件になっていて、関数を評価した時の内部処理の実行タイミングや実行順序は結果に影響を与えないことになっています。

TryのようなFunctor系のコンテナの場合、コンテナの内部情報を取り出すメソッドの実行前の任意の時点で評価が行われていればよいわけです。

TryのflatMapコンビネータでも、同様のことが言えるはずです。つまりflatMapコンビネータ内で必ずしもMonadのbind演算をする必要はなく、もっと後のタイミングでしても大丈夫ではないかということです。と考えると、flatMapコンビネータが例外をスローすることは必須ではなく、このことがMonad則違反というのは必ずしも真とは限らないというわけです。

整理すると、以下になります。

  • Tryの内部実行がコンビネータの呼び出しと同時に行われていなければならないという計算モデルではMonad則違反
  • Tryの内部実行がコンテナの内部を取り出すメソッドの実行前の任意の時点で評価が行われていればよいという計算モデルではMonad則OK

Tryの計算モデルを後者であると解釈すると、Monad則が成り立っていると考えてよいのではないかと思います。

実際にScalazのFuture MonadやTask Monadも同様の問題があるはずですが、いずれもScalaz Monadとして定義されています。

Futureの場合は、関数型的な意味での遅延評価ではなくて、別スレッドで非同期実行されるためflatMapで例外を返すようにはできないのだと推測されます。

Taskの場合は、(通常は)runメソッドで実行を指示されるまで実行は遅延されます。

いずれの場合もmapコンビネータやflatMapコンビネータが例外を返すことはありません。そして例外は実行結果を取り出すときに、必要に応じてスローされるようになっています。

FP的なMonadの定義としてこれが許されるならTryについてもgetメソッドで例外のスルーは行われるわけですから、flatMapコンビネータで例外を返さなくてもよいと考えることは十分可能と思います。

まとめ

考察の結果left identityの解釈、実行タイミングの解釈のどちらか一つでもOKであればScala TryをScalaz Monadとして定義しても問題ないのではないかという結論に落ち着きました。

個人的にはいずれの解釈もOKと思えるので、プロダクションのコードでTry Monadを使用することにしました。

Try Monadの実装としてはscalaz-outlawsを使うのが一案ですが、現状ではdeprecatedの警告が出るので諦めて、自前のライブラリ内で定義して使うことにしました。

TryをScalaz Monad化できると、さらに広い範囲でmonadic programmingを適用できるようになります。Try Monadの存在を軸に例外処理戦略を練りなおしてみたいと思います。

おまけ

Scalazによるmonadic programmingでTryを使う場合、TryがApplicativeであるだけでも、色々と応用範囲が広がります。(e.g. Traverse)

TryはApplicativeの要件は満たしていると思うので、Try Monadが不安な場合は(Validationと同じように)TryをApplicativeとして定義して使用するのも有用だと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0

2015年3月2日月曜日

[scalaz-stream] シーケンス番号とエンドマーク

プロダクトの開発中にわりと難しい処理がscalaz-streamでうまくさばけたのでメモ。

近い将来Buzzwordになってきそうな「Functional Reactive Programming」ですが、monadicなstreaming系のアプローチが筋がよさそうということで、scalaz-streamを実プロダクト開発に導入して試行錯誤しているところです。大規模データ系のいろいろな処理に適用していますが、かなりいい感触を得ています。

そのような応用の一つである大規模メール配信処理で出てきた要件が以下のものです。

  • 大規模なデータ列を一つの固まりとしてストリーム上に流したい
  • 事前にデータ列の総量は分からない
  • データ規模が大きいので復数パケットに分割する必要がある
  • 性能向上のため、可能な範囲で1パケットに復数データ列を格納したい

ネットワーク系のプログラムではわりとよく出てくる処理だと思います。

やろうとしていることはそれほど難しくないのですが、プログラムを組んでみると思いの外、複雑なものになってしまうといった系統の処理です。

こういった処理をいかに簡単に書くことができるようにするのかというのがプログラミング・モデルの重要なテーマの一つです。OOPであれば専用フレームワーク的なものを用意する感じですが、FPの場合はComposableな関数のCompositionでさばいてみたいところです。

サンプルロジック

上記の要件をscalaz-streamで実現することができるかを検証するためにサンプルプログラムをつくってみました。

以下の要件を実装しています。

  • 事前にすべてデータが読み込まれていることは前提としない
  • 3データごとに1つのパケットに集約
  • シーケンス番号をつける
  • 最終パケットにエンドマークをつける

scalaz-streamのProcessモナドを使用しています。

package sample

import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._

object Main {
  case class Packet(seqno: Int, end: Boolean, content: String)

  val source: Process[Task, Int] = {
    Process.range(0, 10)
  }

  def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }

  def main(args: Array[String]) {
    execute(source, sink)
  }

  def execute(source: Process[Task, Int], sink: Sink[Task, Packet]) {
    import process1._
    val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)
    val task = pipeline.run
    task.run
  }

  def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }
}
Source

ストリームの起点となるProcessモナドをSourceと呼びます。

例題なのでProcess#range関数で0から10の数値のストリームを生成しています。

val source: Process[Task, Int] = {
    Process.range(0, 10)
  }
Sink

ストリームの終端となるProcessモナドをSinkと呼びます。

例題なのでscalaz.stream.io#channel関数を使って、コンソール出力するSinkを定義しています。

def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }
ストリームの構築

SourceとSinkはサンプル用のものなので、ここからが本題です。

ストリームの構築は、以下のようにProcessモナドが提供するコンビネータをつないでいく形になります。

FunctorやMonadが定義している基本コンビネータであるmapやflatMap以外にProcessモナドが提供するpipeといったコンビネータを多用することになるのが特徴です。

val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)

ストリームを処理するパイプラインはSourceから始まってtoコンビネータで指定するSinkが終端になります。途中、chunk、pipe、mapの各メソッド/コンビネータがパイプラインを構成しています。

chunk

ストリーム内を流れるデータを1つのチャンクにまとめる処理はchunkメソッドを使います。チャンクにまとめる個数の3を引数に指定します。

ストリームにはsourceからデータであるIntが流れてきますが、これがSeq[Int]に変換されます。

pipe

pipeコンビネータはProcessモナドをストリーム処理のパイプライン部品として埋め込みます。

ここではscalaz.stream.process1にある以下の関数を使って生成したProcessモナドをパイプラインに埋め込んでいます。

  • zipWithNext
  • zipWithIndex

zipWithNext関数が返すProcessモナドは、処理中のデータに加えてその次のデータをOptionとして渡してきます。処理中のデータが最終データの場合は、「その次のデータ」はNoneになります。これを使用すると、次のデータの有無、次のデータの内容によって処理を変更することができるわけです。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Seq[Int], Option[Seq[Int]]]に変換されます。

zipWithIndex関数が返すProcessモナドは、処理中のデータにインデックスをつけます。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]変換されます。

map

mapコンビネータで以下のtoPacket関数をパイプラインに組み込んでいます。

def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }

ここまでの処理でストリームからTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]が流れてきますが、この段でPacketに変換されます。

toPacket関数の処理は簡単でストリームから流れてきた情報を元に:

インデックス
ストリーム上の情報は0起点なので1起点に変換
エンドマーク
Nextがない場合は最終パケットなのでtrue

の情報をPacketに設定しています。

サンプルコード内で自前のアルゴリズムらしい物を書いているはここだけです。

ストリームの実行

ストリームの実行はrunメソッドで行います。

今回はTaskモナドを使う指定をしているので、ProcessモナドのrunメソッドではTaskモナドが返ってきます。

Taskモナドのrunメソッドを実行するとストリームが動作します。

val task = pipeline.run
    task.run

実行

実行とすると以下の結果が出力されました。

10個のデータ列が4つのパケット列として出力され、最後のパケットにはエンドマークがついています。

Packet(1,false,0-1-2)
Packet(2,false,3-4-5)
Packet(3,false,6-7-8)
Packet(4,true,9)

受信側の処理

このパケット列を受け取った側の処理としては:

  • エンドマークがついているパケットまでパケットを読み込む
  • シーケンス番号を監視して、欠損があれば再送を依頼する

というような処理になるかと思います。

こういった処理をscalaz-streamのProcessモナドで実装可能か、というのも面白そうなテーマなので機会があれば試してみたいと思います。

まとめ

大規模データのパケット分割処理ですが、自前のロジックはtoPacket関数のものぐらいで、後はscalaz-streamの用意する部品を組み合わせるだけで構築できました。

パケット分割や復元は自分でロジックを組むとそれなりにバグが出る所なので、既存部品を組み合わせるだけで、ほとんどの処理が組み上がって、最後の仕上げだけ自前のロジックを差し込むことができるのはほんとうに楽です。

また、OOP的なフレームワークだと:

  • パイプラインの構成をXMLなどの外部DSLで定義
  • 自前ロジックをパイプラインに組み込むためのボイラープレイト作成

といった作業が必要になるので、それなりに大変です。

それに対して、scalaz-streamの方式はmonadic programmingの作法に則っていれば通常の関数型プログラミングでOKなのが非常に使いやすいところです。具体的にはOOP的なフレームワークアプローチに対して以下のメリットがあります。

  • パイプラインを外部DSLで構築すると、型安全でなくなる、デバッグがしづらいといった問題も出るが、いずれの問題もなくなる。
  • 自前ロジックを通常の関数で記述すればよく、特別なボイラープレイトのコードは必要ない。
いいことずくめのようですが:

  • monadic programmingの習得
  • processモナドの理解

といった難点があるので、これはこれで一定のハードルがあります。

こういったハードルをcoding idiomやdesin patternといった技法でクリアすることができれば、大規模データ処理にはなかなか有力なアプローチだと思います。

注意

本稿の「大規模」は処理対象としてメモリに一度に乗らない規模のデータ量を指していて、最大1GB程度のものを想定しています。

これを超えてくるデータの処理はHadoopやSpark的な並列分散処理が必要になってくるので、本稿のスコープ外です。このような並列分散処理もSparkのRDDといったものを使ったmonadic programmingが有力と思うので、いずれ取り上げてみたいと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a