2015年3月30日月曜日

Scala的状態機械/FP編

Scalaにおける状態機械の実装戦略について検討しています。

Scala的状態機械/OOP編」で状態+状態遷移を表現するトレイトであるParseStateを作成しました。ParseStateの具象クラスはcase classまたはcase objectとして実現しています。これらのトレイト、case class、case objectがワンセットでFPで使用できる代数的データ構造となっています。

「OOP編」ではこのParseStateを使用して、オブジェクト版の状態機械とアクター版の状態機械を作成しました。代数的データ構造でもあるParseStateがOOP的に問題なく使用できることが確認できました。

「FP編」では、ParseStateの代数的データ構造の性質を活かしてMonadic Programming(以下MP)版の状態機械を考えてみます。

Stateモナド版状態機械

MPで状態を扱いたい場合には、状態を扱うモナドであるStateモナドが有力な選択肢です。

代数的データ型であるParseStateはそのまま利用し、これをStateモナドでくるむことで状態遷移を実現したものが以下のParserStateMonadです。

package sample

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

  def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }
}
action関数

モナドを使った共通機能を作る場合には、共通機能としていわゆるモナディック関数を提供するのが一つの形になっています。

モナディック関数とは「A→M[B]」(Mはモナド)の形をしている関数です。モナドMのflatMapコンビネータの引数になります。

Stateモナドを使用する場合には、A→State[B]の形の関数を用意することになります。

def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

今回作成したStateモナド用のモナディック関数であるaction関数は「Char→State[ParseState→(ParseState, Char)]」の形をしています。

A→M[B]の形とは以下の対応になります。

  • A : Char
  • M : State
  • B : ParseState→(ParseState, Char)

Stateモナドに設定している関数は「ParseState→(ParseState, Char)」の形をしていますが、action関数全体ではaction関数の引数もパラメタとして利用しているので、結果として「Char→ParseState→(Parse, Char)」の動きをする関数になっています。

action関数が返すStateモナドはParseStateによって表現された状態を、受信したイベントとの組合せ状態遷移する関数が設定されています。

状態+状態遷移を表現するオブジェクト兼代数的データ型であるParseStateがきちんと定義されていれば、Stateモナド用のモナディック関数を定義するのは容易なことが分かります。

parse関数

CharのシーケンスからCSVをパースするparse関数は以下になります。

def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }

parse関数は、Stateモナド用モナディック関数actionと型クラスTraverseのtraverseSコンビネータの組合せで実現しています。

traverseSコンビネータはStateモナド用のtraverseコンビネータです。Scalaの型推論が若干弱いためStateモナド専用のコンビネータを用意していますが、動きは通常のtraverseコンビネータと同じです。

状態遷移のロジックそのものはParseStateオブジェクトにカプセル化したものをaction関数から返されるStateモナド経由で使用します。

traverseSコンビネータとStateモナドを組み合わせると、Traverseで走査対象となるコレクションをイベント列と見立てて、各イベントの発生に対応した状態機械をStateモナドで実現することができます。この最終状態を取得することで、イベント列を消化した最終結果を得ることができます。

OOPオブジェクトであり代数的データ構造でもあるParseStateは、このようにしてStateモナドに包むことで、FP的な状態機械としてもそのまま使用することができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserStateMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserStateMonad.parse(events)
      println(s"ParserStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserStateMonadSpec
ParserStateMonadSpec: List(abc, def, xyz)
[info] ParserStateMonadSpec:
[info] ParserStateMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 400 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 1 s, completed 2015/03/21 17:49:39

scalaz stream版状態機械

Stateモナド版状態機械ではStateモナドと型クラスTraverseのtraverseSコンビネータを使用して、状態機械をMPで実現しました。

この実現方法は、メモリ上に展開したデータに対して一括処理をするのには適していますが、大規模データ処理やストリーム処理への適用は不向きです。

そこで、大規模データ処理やストリーム処理をFunctional Reactive Programming(以下FRP)の枠組みで行うために、scalaz streamを使用して状態機械の実装してみました。

package sample

import scalaz._, Scalaz._
import scalaz.stream._
import scalaz.stream.Process.Process0Syntax

object ParserProcessMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

Processモナドは一種の状態機械なので、この性質を利用してPraseStateによる状態遷移をProcessモナド化します。この処理を行うのがfsm関数です。

fsm関数は状態であるParseStateを引数に、Charを受け取るとParseStateとCharの組合せで計算された新しいParseStateによる状態に遷移しつつ処理結果としてParseStateを返すProcessモナドを返します。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }
parse関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の典型的な使い方です。

def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

parse関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

まず引数のCharシーケンスからソースとなるProcessモナドを生成します。

val source: Process0[Char] = Process.emitAll(events)

ProcessのemitAll関数は引数に指定されたシーケンスによるストリームを表現するProcessモナドを生成します。

ただし、内部的に非同期実行する本格的なシーケンスではなく、通常のシーケンスに対してストリームのインタフェースでアクセスできるという意味合いのProcessモナドになります。(内部的には実行制御に後述のTaskモナドではなく、Idモナドを使用しています。)

パイプラインの処理を記述

pipeコンビネータでfsm関数にInitStateを適用して得られるProcessモナドをパイプライン本体のProcessモナドに設定しています。

val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))

これが処理の本体です。ここではpipeコンビネータを始めとする各種コンビネータを使ってパイプラインを構築します。

パイプラインで加工後のデータを取得

最後にtoListメソッドで、パイプラインの処理結果をListとして取り出し、ここからパース結果を取り出す処理を行っています。

val result = pipeline.toList.last
    result.endEvent.row
parseTask関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の使用例ですが、より本格的な応用である大規模データ処理やストリーム処理では少し使い方が変わってきます。

そこで、参考のために実行制御にTaskモナドを使ったバージョンのparseTask関数を作りました。

def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process[Task, Char] = Process.emitAll(events).toSource
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }

parse関数と同様にparseTask関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

ProcessモナドのtoSourceメソッドで、Taskモナドを実行制御に使用するProcessモナドに変換されます。

val source: Process[Task, Char] = Process.emitAll(events).toSource
パイプラインの処理を記述

fsm関数から得られたProcessモナドをpipeコンビネータでパイプラインに設定します。

val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))

この処理はTask版でないparse関数と全く同じです。

パイプラインで加工後のデータを取得

Taskモナドを実行制御に使用する場合には、for式などを使ってモナディックに実行結果を取得する形になります。

for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonad.parse(events)
      println(s"ParserProcessMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadSpec
ParserProcessMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 301 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:12

scalaz stream + Stateモナド版状態機械

「scalaz stream」ではParseStateオブジェクトを直接使用して状態機械を作成しました。通常はこれで十分ですが、Stateモナド用モナディック関数が用意されている場合は、こちらを使用する方法もあります。

この方法では、Stateモナドの使い方だけ理解していればよいので、プログラミングはより簡単かつ汎用的になります。

package sample

import scalaz._, Scalaz._
import scalaz.stream._

object ParserProcessMonadStateMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

ParseStateによる状態機械の動作を行うProcessモナドを生成するfsm関数のStateモナド版は以下になります。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

ParseStateのeventメソッドを直接使用する代わりに、ParseStateを包んだStateモナドを返すモナディック関数actionを使用します。

この版のfsm関数ではaction関数から取得したStateモナドのexecメソッドを使用して、現状態とイベント(Char)から新状態を計算し、この状態を保持した、新たなProcessを生成しています。

この方法のメリットはParseStateの使い方(この場合はeventメソッド)は知る必要がなく、汎用的なStateモナドの使い方だけ知っていればよい点です。

つまりaction関数だけ作っておけば、Traverseを使った状態機械、Processモナドを使った状態機械のどちらも簡単に作ることができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonadStateMonad.parse(events)
      println(s"ParserProcessMonadStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadStateMonadSpec
ParserProcessMonadStateMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadStateMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 286 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:26

状態機械実装戦略

オブジェクト・モデルで状態機械が出てきた場合の実装戦略としては、まずベースとして:

  • sealed trait + case classで状態+状態遷移のオブジェクト&代数的データ型(以下、状態遷移case class)

を作成します。

前回に見たようにこの状態遷移case classを使って、OOP版の状態機械を簡単に作成することができます。

次に、FP用に以下の部品を整備しておくのがよいでしょう。

  • Stateモナド用モナディック関数
  • Processモナド用状態機械関数

どちらの関数も状態遷移case classが用意されていれば、ほとんど定型的な記述のみで作成することができます。この部品を使うことでFP版の状態機械を簡単に作成することができます。

以上、状態機械実装戦略について整理しました。

この戦略で重要なのは、状態+状態遷移を表現するために作成した状態遷移case classは1つだけ作ればよく、これをOOP流、FP流にマルチに展開していける点です。

この点が確定すれば、オブジェクト・モデルに状態機械が出てきたら安心して1つの状態遷移case classの実装に集中することができます。

まとめ

OFPの状態機械の実装についてOOP的実装、FP的実装について整理してみました。

いずれの場合もオブジェクト&代数的データ型である「状態遷移case class」が基本で、ここからOOP的状態機械、FP的状態機械へマルチに展開できることが確認できました。

OFPではアプリケーション・ロジックは可能な限りFP側に寄せておくのがよいので、FPでも状態機械を簡単に実現できることが確認できたのは収穫でした。

また、FP的なアプローチが使えないケースが出てきても、簡単にOOP的アプローチに切り替え可能なのも安心材料です。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2015年3月23日月曜日

Scala的状態機械/OOP編

オブジェクト・モデリングにおける動的モデルは状態機械で記述するのが基本です。つまり状態機械はオブジェクト・モデリングの重要な構成要素の一つということです。

ScalaでObject-Functional Programming(OFP)を行う場合でも、要求・分析・設計の各アクティビティを経て作成されたオブジェクト・モデル内の状態機械をどのように実装していくのかという実装方式が論点になります。

普通のOOP流の実装はすでに議論しつくされていると思いますが、OFPにおけるFPでの実装方式については、これから整備されていくことになると思います。

注意が必要なのはクラウド・アプリケーション開発をターゲットにする場合、伝統的なFPというよりMonadic Programming(以下MP)を経てFunctional Reactive Programming(以下FRP)がゴールになるということです。

このためFRPとして利用可能な状態機械実装を探っておきたいところです。

課題

状態機械の使い所としては、エンティティの状態遷移を記述することで業務ワークフローの構築に用いたり、プロトコルドライバのフロー制御といったものが考えられます。前者はDBに状態を格納することになりエンタープライズ・アプリケーション的なライフサイクルの長い応用ですし、後者は制御系の応用でメモリ内の制御で完結する形のものです。

色々な応用があるわけですが、どの方面でも使える実現方法を念頭におきつつ、ここではCSVのパーサー処理を状態機械で実装してみることにします。FPとの接続部分に興味を集中させるためできるだけ簡単なものを選んでみました。

具体的には以下の課題になります。

  • CSVの1行を「,」区切りのレコードとしてパースし、文字列の列を取得する処理に使用する状態機械

この状態機械は文字列の列の各文字をイベントとして扱い、一連のイベント終了後にパース処理の結果となる「文字列の列」を状態の情報として保持する形になります。

case classで状態機械

状態機械は「状態×イベント→状態」のメカニズムが基本の構成要素です。このメカニズムをcase classで実装します。

いうまでもなくcase classはScalaプログラミングの超重要な構成要素で、OOP的側面とFP的側面を兼ね備えたObject-Functional Programmingの肝となる機構です。

OOP的にはDDDでいうところのvalue objectの実装に適した文法になっています。

FP的には代数的データ型(algebraic data type)として利用することが想定されています。

つまりvalue object兼代数的データ型を実現するための機構がcase classです。

ただしcase classの文法上はvalue objectや代数的データ型に適さない普通のオブジェクトを実装することも可能です。このため、value object兼代数的データ型を実現するための紳士協定を組み込んでおかなければなりません。

この紳士協定は以下の2つです。

  • 不変オブジェクト(immutable object)にする
  • 基底のトレイトや抽象クラスはsealedにする

Value objectと代数的データ型は共に不変オブジェクトである必要があります。

また代数的データ型の要件としてコンパイル時に全インヘリタンス関係が確定している必要があるので、sealedにすることでこれを担保します。

case classはオブジェクト指向の普通のオブジェクトでもあるので、不変オブジェクトの性質さえ守ればオブジェクトのフルスペックを使用することができます。

package sample

sealed trait ParseState {
  def event(c: Char): ParseState
  def endEvent(): EndState
}

case object InitState extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = EndState(Nil)
}

case class InputState(
  fields: Seq[String],
  candidate: String
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(fields :+ candidate, "")
    case '\n' => EndState(fields :+ candidate)
    case _ => InputState(fields, candidate :+ c)
  }
  def endEvent() = EndState(fields :+ candidate)
}

case class EndState(
  row: Seq[String]
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = this
}

case class FailureState(
  row: Seq[String],
  message: String
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = sys.error("failure")
}

状態機械は、「状態×イベント→状態」の情報を記述したマトリックスとして表現できます。このマトリクスを実現するデータ構造と評価戦略は色々考えられますが、ここではOOP的なアプローチで実現しました。

状態

まず状態機械全体を表すトレイトとしてParseStateを定義します。sealedにすることで代数的データ構造の要件の1つを満たします。

ParseStateのサブクラスとして具体的な状態を定義します。情報を持つInitState, InputState, EndState, FailureStateはcase classとして、情報を持たないInitStateはcase objectとして定義しました。

イベント

発生するイベントはメソッドで表現しました。

event
1文字入力イベント発生
endEvent
パース終了イベント発生
状態×イベント→状態

状態とイベントの組から新しい状態を決定するアルゴリズムは「発生するイベント」で定義したメソッドの実装になります。

具体的には各case class, case objectのeventメソッド、endEventメソッドの実装を参照して下さい。

オブジェクト版状態機械

case classによる状態機械の表現はimmutableなので、そのままでは状態機械としては動きません。あくまでも「状態×イベント→状態」のメカニズムを提供するまでになります。

状態機械の状態遷移は状態を遷移させるので本質的にはmutableな処理です。

OOP的には、オブジェクトのインスタンス変数でmutableな状態を保持する実現方法になります。

OOP版の状態機械をParserObjectとして実装しました。

package sample

class ParserObject {
  var state: ParseState = InitState

  def charEvent(c: Char) {
    state = state.event(c)
  }

  def parseEnd(): Seq[String] = {
    val end = state.endEvent()
    state = end
    end.row
  }
}

PaserObjectクラスでは、変更可能なインスタンス変数としてstateを定義しています。

イベントの発生をメソッドで受取り、状態とイベントの組合せで新たな状態を計算し、これをstateに設定することで状態遷移を実現しています。

新しい状態の計算ロジックは、ParseStateオブジェクトにカプセル化しています。

代数的データ構造で表現した状態機械のimmutableなオブジェクトを、インスタンス変数で管理することで、OOP的な状態機械が実現できました。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserObjectSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserObject" should {
    "parse" in {
      val parser = new ParserObject()
      val text = "abc,def,xyz"
      for (c <- text) {
        parser.charEvent(c) // 一文字づつパーサーに送信
      }
      val r = parser.parseEnd() // 解析終了のイベントを送信
      println(s"ParserObject: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserObjectSpec
ParserObject: List(abc, def, xyz)
[info] ParserObjectSpec:
[info] ParserObject
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 180 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:47:46

アクター版状態機械

FRP的な応用を考える場合、並行・並列処理で枠組みの中で状態機械を実現していく必要があります。

この実現方式として有力なのがアクターです。

ScalaではAkkaというアクター基盤を使用することができます。

このアクターは(理論的には色々あるでしょうが)状態を持ったactive objectを実現する方式になっています。

active objectであるアクター間はメッセージボックスでキューイングされたメッセージ通信で協調動作するので、アプリケーションレベルで特別な排他制御を行わなくても並行・並列処理を簡潔に記述することができます。

アクター版のCSVパーサーであるParserActorは以下になります。

package sample

import akka.actor._

case class ParseCharEvent(c: Char)
case object ParseEnd
case class ParseResult(result: Seq[String])

class ParserActor extends Actor {
  var state: ParseState = InitState

  def receive = {
    case ParseCharEvent(c) =>
      state = state.event(c)
    case ParseEnd =>
      val end = state.endEvent()
      state = end
      sender ! ParseResult(end.row)
  }
}

基本的に行っているのは(passive objectである)ParserObjectと同じです。オブジェクトのインスタンス変数でmutableな状態を保持する実現方法になります。

ParserObjectとの違いは、メソッド呼び出しではなくアクター間でのメッセージ通信によって処理が実行されるため、以下のようなメッセージ通信のためのコードを用意しないといけない点です。

  • アクター間で送受信されるメッセージの定義
  • メッセージを受け取り処理を行うイベントハンドラ

ParserObjectではメソッドとして簡単に定義できる処理ですが、これをメッセージ処理ように仕立て直さないといけないわけです。

アクターはとても簡単に利用できるのですが、少しボイラープレイトのコードを書く必要があります。

使い方
package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
class ParserActorSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParseActor" should {
    "parse" in {
      implicit val timeout = Timeout(5.seconds)
      val system = ActorSystem("state")
      implicit val context = system.dispatcher
      val actor = system.actorOf(Props[ParserActor])

      val text = "abc,def,xyz"
      for (c <- text) {
        actor ! ParseCharEvent(c)
      }
      for (r <- actor ? ParseEnd) yield {
        r match {
          case ParseResult(r) =>
            println(s"ParserActorSpec: $r")
            r should be (Vector("abc", "def", "xyz"))
        }
      }
      system.shutdown()
    }
  }
}

アプリケーションロジックは以下の部分です。

val text = "abc,def,xyz"
      for (c <- text) {
        actor ! ParseCharEvent(c)
      }
      for (r <- actor ? ParseEnd) yield {
        r match {
          case ParseResult(r) =>
            println(s"ParserActorSpec: $r")
            r should be (Vector("abc", "def", "xyz"))
        }
      }

アクターを使うために以下のような準備が必要になります。

// タイムアウト値の暗黙知を定義
      implicit val timeout = Timeout(5.seconds)
      // アクターの実行基盤を作成
      val system = ActorSystem("state")
      // スレッドの実行文脈を定義
      implicit val context = system.dispatcher
      // アクターの生成
      val actor = system.actorOf(Props[ParserActor])
      ....
      // アクターの実行基盤をシャットダウン
      system.shutdown()

アクターはJava流のスレッドによる並行・並列プログラミングと比較するとはるかに楽でバグも出にくいアプローチなのですが、それでも少し込み入ったボイラープレイトが必要になります。

実行

実行結果は以下になります。

$ sbt test-only sample.ParserActorSpec
[info] Compiling 1 Scala source to /Users/asami/src/workspace2015/0317.blog.statemachine/target/scala-2.11/test-classes...
ParserActorSpec: List(abc, def, xyz)
[info] ParserActorSpec:
[info] ParseActor
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 408 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 2 s, completed 2015/03/21 17:48:09

まとめ

今回は状態機械のベースとなるcase class群(ParseState)を作成した後、通常のOOPのアプローチとして普通のオブジェクト版(passive object)のパーサーとアクター版(active object)のパーサーを作成しました。

ParseStateはOOPのvalue objectであると同時にFPの代数的データ型でもあります。このParseStateを普通のオブジェクト、アクターの両方で普通に利用して状態機械を作成できることが確認できました。

次回は今回のParseStateをベースにMonadic Programming, Functional Reactive Programmingにおける状態機械の実現方法について考えます。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2015年3月9日月曜日

[scalaz] Tryモナド問題

Scalazでmonadic programmingする上で困っているのがTryモナドの問題です。

scala.util.TryはScalaで例外処理をmonadicに処理するためにScala 2.10で導入されたScalaの基本機能です。Scalaのモナドとしての要件(flatMapメソッドが定義されている等)は満たしているのでfor式で使用することができます。

しかし、ScalazのMonadではないのでScalazの提供する各種機能の恩恵を得ることができません。このことがScalazを軸としたmonadic programmingの阻害要因になっています。

例外をハンドリングするためにはTryを使うのが自然ですが、Tryを使うとScalazのmonadic programmingがやりづらくなる、という構図です。

問題

TryがScalaz Monadとなっていない理由は、Tryがモナド則の1つであるleft identityを満たしていないとされているからのようです。

この理由は:

の記事の解説によると:

def foo[A, B](a: A): Try[B] = throw new Exception("oops")

foo(1) // exception is thrown

Try(1).flatMap(foo) // scala.util.Failure

ということなので:

  • foo関数が例外を返す場合、TryのflatMapコンビネータにfoo関数を適用した時にflatMapコンビネータが例外を返さないとMonad則のleft identity law違反

ということだと思います。

さて、それではTryのflatMapコンビネータで例外を返すようにすればよいかというと、これはOOP的には困ってしまう仕様です。

OOP的にはTryは例外を包んで外に出さないことを期待したいところです。というのは、flatMapコンビネータが例外を返す可能性があるとすると、Tryを使っているにもかからわずTryも例外を出す可能性があるということなので、Tryの外側をTryで包まなければならなくなります。

これはプログラミング的にも煩雑ですし、Tryをネストさせなければならない条件をプログラマが意識しないといけないのでバグの出やすいインタフェースです。

こういう事情もありScalaの基本ライブラリは、あえて現在の仕様を選択しているようです。

解決策

本稿の趣旨ですが、Try Monad問題を解釈変更で解決するという試みです。

left identityの解釈

Scalaのようなハイブリッドな関数型言語で純粋関数型の計算を行う場合、いくつかの紳士協定を前提とします。

代表的なものとしては以下のようなものがあります。

  • var変数は用いない
  • mutableなコレクションは用いない
  • eqメソッドは用いない

つまりScalaにおける純粋関数型の演算は紳士協定が前提なので、Tryのleft identity問題も紳士協定で解決すればよいのではないかということです。

具体的には返却値にTryを返す:

f(a: A): Try[B]

というシグネチャの関数では例外をスローしない、という紳士協定を導入するというアプローチはどうでしょうか。もちろん絶対ということはないので、例外をスローしたらプログラムが致命的状態(e.g. バグ発生、メモリ不足)に入ったとして扱うという形になります。

プログラミング的には以下をコーディングの基本形にするということなので、特に煩雑な点はありません。

f(a: A): Try[B] = Try {
  ...
}

この紳士協定上ではTryをScalaz Monad化しても問題ないように思います。

実行タイミングの解釈

FPでは参照透過性が重要な要件になっていて、関数を評価した時の内部処理の実行タイミングや実行順序は結果に影響を与えないことになっています。

TryのようなFunctor系のコンテナの場合、コンテナの内部情報を取り出すメソッドの実行前の任意の時点で評価が行われていればよいわけです。

TryのflatMapコンビネータでも、同様のことが言えるはずです。つまりflatMapコンビネータ内で必ずしもMonadのbind演算をする必要はなく、もっと後のタイミングでしても大丈夫ではないかということです。と考えると、flatMapコンビネータが例外をスローすることは必須ではなく、このことがMonad則違反というのは必ずしも真とは限らないというわけです。

整理すると、以下になります。

  • Tryの内部実行がコンビネータの呼び出しと同時に行われていなければならないという計算モデルではMonad則違反
  • Tryの内部実行がコンテナの内部を取り出すメソッドの実行前の任意の時点で評価が行われていればよいという計算モデルではMonad則OK

Tryの計算モデルを後者であると解釈すると、Monad則が成り立っていると考えてよいのではないかと思います。

実際にScalazのFuture MonadやTask Monadも同様の問題があるはずですが、いずれもScalaz Monadとして定義されています。

Futureの場合は、関数型的な意味での遅延評価ではなくて、別スレッドで非同期実行されるためflatMapで例外を返すようにはできないのだと推測されます。

Taskの場合は、(通常は)runメソッドで実行を指示されるまで実行は遅延されます。

いずれの場合もmapコンビネータやflatMapコンビネータが例外を返すことはありません。そして例外は実行結果を取り出すときに、必要に応じてスローされるようになっています。

FP的なMonadの定義としてこれが許されるならTryについてもgetメソッドで例外のスルーは行われるわけですから、flatMapコンビネータで例外を返さなくてもよいと考えることは十分可能と思います。

まとめ

考察の結果left identityの解釈、実行タイミングの解釈のどちらか一つでもOKであればScala TryをScalaz Monadとして定義しても問題ないのではないかという結論に落ち着きました。

個人的にはいずれの解釈もOKと思えるので、プロダクションのコードでTry Monadを使用することにしました。

Try Monadの実装としてはscalaz-outlawsを使うのが一案ですが、現状ではdeprecatedの警告が出るので諦めて、自前のライブラリ内で定義して使うことにしました。

TryをScalaz Monad化できると、さらに広い範囲でmonadic programmingを適用できるようになります。Try Monadの存在を軸に例外処理戦略を練りなおしてみたいと思います。

おまけ

Scalazによるmonadic programmingでTryを使う場合、TryがApplicativeであるだけでも、色々と応用範囲が広がります。(e.g. Traverse)

TryはApplicativeの要件は満たしていると思うので、Try Monadが不安な場合は(Validationと同じように)TryをApplicativeとして定義して使用するのも有用だと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0

2015年3月2日月曜日

[scalaz-stream] シーケンス番号とエンドマーク

プロダクトの開発中にわりと難しい処理がscalaz-streamでうまくさばけたのでメモ。

近い将来Buzzwordになってきそうな「Functional Reactive Programming」ですが、monadicなstreaming系のアプローチが筋がよさそうということで、scalaz-streamを実プロダクト開発に導入して試行錯誤しているところです。大規模データ系のいろいろな処理に適用していますが、かなりいい感触を得ています。

そのような応用の一つである大規模メール配信処理で出てきた要件が以下のものです。

  • 大規模なデータ列を一つの固まりとしてストリーム上に流したい
  • 事前にデータ列の総量は分からない
  • データ規模が大きいので復数パケットに分割する必要がある
  • 性能向上のため、可能な範囲で1パケットに復数データ列を格納したい

ネットワーク系のプログラムではわりとよく出てくる処理だと思います。

やろうとしていることはそれほど難しくないのですが、プログラムを組んでみると思いの外、複雑なものになってしまうといった系統の処理です。

こういった処理をいかに簡単に書くことができるようにするのかというのがプログラミング・モデルの重要なテーマの一つです。OOPであれば専用フレームワーク的なものを用意する感じですが、FPの場合はComposableな関数のCompositionでさばいてみたいところです。

サンプルロジック

上記の要件をscalaz-streamで実現することができるかを検証するためにサンプルプログラムをつくってみました。

以下の要件を実装しています。

  • 事前にすべてデータが読み込まれていることは前提としない
  • 3データごとに1つのパケットに集約
  • シーケンス番号をつける
  • 最終パケットにエンドマークをつける

scalaz-streamのProcessモナドを使用しています。

package sample

import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._

object Main {
  case class Packet(seqno: Int, end: Boolean, content: String)

  val source: Process[Task, Int] = {
    Process.range(0, 10)
  }

  def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }

  def main(args: Array[String]) {
    execute(source, sink)
  }

  def execute(source: Process[Task, Int], sink: Sink[Task, Packet]) {
    import process1._
    val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)
    val task = pipeline.run
    task.run
  }

  def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }
}
Source

ストリームの起点となるProcessモナドをSourceと呼びます。

例題なのでProcess#range関数で0から10の数値のストリームを生成しています。

val source: Process[Task, Int] = {
    Process.range(0, 10)
  }
Sink

ストリームの終端となるProcessモナドをSinkと呼びます。

例題なのでscalaz.stream.io#channel関数を使って、コンソール出力するSinkを定義しています。

def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }
ストリームの構築

SourceとSinkはサンプル用のものなので、ここからが本題です。

ストリームの構築は、以下のようにProcessモナドが提供するコンビネータをつないでいく形になります。

FunctorやMonadが定義している基本コンビネータであるmapやflatMap以外にProcessモナドが提供するpipeといったコンビネータを多用することになるのが特徴です。

val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)

ストリームを処理するパイプラインはSourceから始まってtoコンビネータで指定するSinkが終端になります。途中、chunk、pipe、mapの各メソッド/コンビネータがパイプラインを構成しています。

chunk

ストリーム内を流れるデータを1つのチャンクにまとめる処理はchunkメソッドを使います。チャンクにまとめる個数の3を引数に指定します。

ストリームにはsourceからデータであるIntが流れてきますが、これがSeq[Int]に変換されます。

pipe

pipeコンビネータはProcessモナドをストリーム処理のパイプライン部品として埋め込みます。

ここではscalaz.stream.process1にある以下の関数を使って生成したProcessモナドをパイプラインに埋め込んでいます。

  • zipWithNext
  • zipWithIndex

zipWithNext関数が返すProcessモナドは、処理中のデータに加えてその次のデータをOptionとして渡してきます。処理中のデータが最終データの場合は、「その次のデータ」はNoneになります。これを使用すると、次のデータの有無、次のデータの内容によって処理を変更することができるわけです。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Seq[Int], Option[Seq[Int]]]に変換されます。

zipWithIndex関数が返すProcessモナドは、処理中のデータにインデックスをつけます。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]変換されます。

map

mapコンビネータで以下のtoPacket関数をパイプラインに組み込んでいます。

def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }

ここまでの処理でストリームからTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]が流れてきますが、この段でPacketに変換されます。

toPacket関数の処理は簡単でストリームから流れてきた情報を元に:

インデックス
ストリーム上の情報は0起点なので1起点に変換
エンドマーク
Nextがない場合は最終パケットなのでtrue

の情報をPacketに設定しています。

サンプルコード内で自前のアルゴリズムらしい物を書いているはここだけです。

ストリームの実行

ストリームの実行はrunメソッドで行います。

今回はTaskモナドを使う指定をしているので、ProcessモナドのrunメソッドではTaskモナドが返ってきます。

Taskモナドのrunメソッドを実行するとストリームが動作します。

val task = pipeline.run
    task.run

実行

実行とすると以下の結果が出力されました。

10個のデータ列が4つのパケット列として出力され、最後のパケットにはエンドマークがついています。

Packet(1,false,0-1-2)
Packet(2,false,3-4-5)
Packet(3,false,6-7-8)
Packet(4,true,9)

受信側の処理

このパケット列を受け取った側の処理としては:

  • エンドマークがついているパケットまでパケットを読み込む
  • シーケンス番号を監視して、欠損があれば再送を依頼する

というような処理になるかと思います。

こういった処理をscalaz-streamのProcessモナドで実装可能か、というのも面白そうなテーマなので機会があれば試してみたいと思います。

まとめ

大規模データのパケット分割処理ですが、自前のロジックはtoPacket関数のものぐらいで、後はscalaz-streamの用意する部品を組み合わせるだけで構築できました。

パケット分割や復元は自分でロジックを組むとそれなりにバグが出る所なので、既存部品を組み合わせるだけで、ほとんどの処理が組み上がって、最後の仕上げだけ自前のロジックを差し込むことができるのはほんとうに楽です。

また、OOP的なフレームワークだと:

  • パイプラインの構成をXMLなどの外部DSLで定義
  • 自前ロジックをパイプラインに組み込むためのボイラープレイト作成

といった作業が必要になるので、それなりに大変です。

それに対して、scalaz-streamの方式はmonadic programmingの作法に則っていれば通常の関数型プログラミングでOKなのが非常に使いやすいところです。具体的にはOOP的なフレームワークアプローチに対して以下のメリットがあります。

  • パイプラインを外部DSLで構築すると、型安全でなくなる、デバッグがしづらいといった問題も出るが、いずれの問題もなくなる。
  • 自前ロジックを通常の関数で記述すればよく、特別なボイラープレイトのコードは必要ない。
いいことずくめのようですが:

  • monadic programmingの習得
  • processモナドの理解

といった難点があるので、これはこれで一定のハードルがあります。

こういったハードルをcoding idiomやdesin patternといった技法でクリアすることができれば、大規模データ処理にはなかなか有力なアプローチだと思います。

注意

本稿の「大規模」は処理対象としてメモリに一度に乗らない規模のデータ量を指していて、最大1GB程度のものを想定しています。

これを超えてくるデータの処理はHadoopやSpark的な並列分散処理が必要になってくるので、本稿のスコープ外です。このような並列分散処理もSparkのRDDといったものを使ったmonadic programmingが有力と思うので、いずれ取り上げてみたいと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a

2015年2月16日月曜日

Scalaz 7.1の非互換項目

プロダクトで使っているScalazをscalaz 7.0.6からScalaz 7.1に上げたわけですが(Operational Monad)、その際に判明したScalaz 7.1の非互換というか仕様変更についてのまとめです。

非互換対応している時に気付いたものが中心なので、仕様変更項目の網羅にはなっていません。(公式仕様変更項目)

ここに出ている項目以外は、スムーズに対応できました。(他の非互換項目にも遭遇していると思いますが、記憶に残らないレベル)

Validation

Scalaz 7.1からValidationがMonadではなくなってしまいました。deprecatedのコメント「flatMap does not accumulate errors, use `scalaz.\/` or `import scalaz.Validation.FlatMap._` instead」からすると、エラーをMonoidとして蓄積する処理がモナド則を破る、ということが判明したのではないかと思われます。

このことでエラー処理戦略のバランスが少し変わってしまいました。

このことを加味した上でのエラー処理戦略に関する最新の状況は以下がよくまとまっていると思います。(この記事は2014年2月のものなので(Scalaz 7.1のリリースは2014年8月)、リリース版に対応した記事ではないと思いますが、ValidationがMonadでないことが前提になっているので、7.1の開発バージョンで仕様を先取りした記事と思われます。)

ValidationをMonadとして使っていたコードに対する当面の回避策としては:

import scalaz.Validation.FlatMap._

として互換機能を有効にすることですが、本格対応としては

"10".parseInt.disjunction

といった感じで「\/」(disjunction)モナドに変換するのがひとつの形です。

Validationは引き続きApplicativeとしては有効なので:

("10".parseInt.toValidationNel |@| "20".parseInt.toValidationNel)(_ + _)

といった形を基本線に、for式で使いたい場合はdisjunctionメソッドで\/モナドに変換して使う戦略となりそうです。

for {
  a <- "10".parseInt.disjunction
  b <- "20".parseInt.disjunction
} yield a + b

ただ、Validationの価値はかなり下がったと思うので、エラー処理戦略は\/モナドを軸に考えて、エラーの積算が必要な処理のみValidationで補完というバランスがよさそうに思います。

具体的には以下のようなイメージです。

for {
  a <- ("10".parseInt.toValidationNel |@| "20".parseInt.toValidationNel)(_ + _).disjunction
+ _).disjunction // エラーの積算が必要な処理をValidation + Applicativeで記述し、Disjunctionに変換
  b <- "30".parseInt.disjunction // for式内は\/モナドで揃える
} yield a + b

Tagged type

以下のページにある通り「Scalaz 7.1 以降は明示的にタグを unwrap しなければいけなくなった」ため、かなり使いにくくなってしまいました。

元々Scala言語仕様のぎりぎりの所を使っている機能だと思うので、将来にわたっての安全性(Scalaのバージョンが上がると動かなくなる等)と利便性のバランスを考えるとプロダクションコードでは使いにくくなりました。

OptionやBooleanの振舞いをカスタマイズするのに便利だったのでちょっと残念。

PromiseとFuture

Scalaz 7.1では、Promiseモナドが非推奨になっていて、代わり(?)にFutureモナドが追加されているようです。Futureモナドは、Scalaの基本機能であるscala.concurrent.FutureをScalazのMonad化したものです。

Operational Monad」では「scala.concurrent.FutureはScalaz 7.1.0的にはモナドとして定義されていない」と書きましたが、間違いだったので訂正します。

Scala Tips / Scala 2.10味見(4) - Future(2)」で書いた通り、ほとんど同じ機能であるPromiseとFutureが(Scalaz上の扱いが異なる状態(PromiseはMonad、Futureは非Monad)で)並立している状況が懸念点だったので、この点が払拭されたのは喜ばしいと思います。

今後は安心してFutureを軸に並列プログラミングの戦略を考えていくことができます。

また、並列プログラミングの部品となるモナドとしてはscalaz.concurrent.Taskは引き続き有効です。(注: scalaz.concurrent.Taskが内部的に使用している部品にscalaz.concurrent.Futureがありますが、scala.concurrent.Futureと紛らわしいこともありここでは触れないことにします。本稿ではscala.concurrent.FutureをFutureと呼びます。)

FutureとTaskはざっくりいうと以下のような性質の違いがあると理解しています。

Future
スレッドを積極的に生成して処理を行う
Task
スレッドはできるだけ生成しない戦略

I/Oバウンドの処理はFutureが有効です。

一方CPUバウンドの処理に関しては、100コア級のメニーコア時代になればFutureを使うのがよさそうですが、せいぜい4コア級、AWSのローエンドは1コアの状況では、Futureで細粒度の並列プログラミングをするのは時期尚早の感じです。そういう意味で、当面はTaskもFutureと同様の重み付けの選択肢と考えています。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0

2015年1月14日水曜日

Operationalモナドの合成

ScalazでOperationalモナドが簡単に使えることが分かったので、次の段階としてxuweiさんの書かれた「CoproductとInjectを使ったFree Monadの合成とExtensible Effects」を参考に、Operationalモナド(Freeモナド)の合成を試してみました。

CoproductとInjectについて、理論や動作メカニズムを把握できていないのでほぼ写経の状態です。

ConsoleService

まず最初は前回作成したConsoleServiceを合成可能にチューニングします。

プログラムの見通しをよくするため今回のテーマに関係しないinterpreterTaskとrunTaskは省いています。

package sample

import scala.language.higherKinds
import scalaz._, Scalaz._

object ConsoleService {
  sealed trait ConsoleOperation[_]
  case class PrintLine(msg: String) extends ConsoleOperation[Unit]
  case object ReadLine extends ConsoleOperation[String]

  def printLine(msg: String) = Free.liftFC(PrintLine(msg))
  def readLine = Free.liftFC(ReadLine)

  val interpreter = new (ConsoleOperation ~> Id) {
    def apply[T](c: ConsoleOperation[T]): Id[T] = {
      c match {
        case PrintLine(msg) => println(msg)
        case ReadLine => scala.io.StdIn.readLine()
      }
    }
  }

  def run[T](f: Free.FreeC[ConsoleOperation, T]): T = {
    Free.runFC(f)(interpreter)
  }

  class ConsolePart[F[_]](implicit I: Inject[ConsoleOperation, F]) {
    def printLine(msg: String): Free.FreeC[F, Unit] = Free.liftFC(I.inj(PrintLine(msg)))
    def readLine: Free.FreeC[F, String] = Free.liftFC(I.inj(ReadLine))
  }

  object ConsolePart {
    implicit def instance[F[_]](implicit I: Inject[ConsoleOperation, F]): ConsolePart[F] = new ConsolePart[F]
  }
}

追加したのは、scalaz.InjectによってOperationalモナドの合成を行うための受け皿となるクラスConsolePartとそのコンパニオンオブジェクトです。

class ConsolePart[F[_]](implicit I: Inject[ConsoleOperation, F]) {
    def printLine(msg: String): Free.FreeC[F, Unit] = Free.liftFC(I.inj(PrintLine(msg)))
    def readLine: Free.FreeC[F, String] = Free.liftFC(I.inj(ReadLine))
  }

  object ConsolePart {
    implicit def instance[F[_]](implicit I: Inject[ConsoleOperation, F]): ConsolePart[F] = new ConsolePart[F]
  }

ConsoleServiceオブジェクト本体に定義されているprintLine関数、readLine関数の合成可能版を定義しています。

暗黙パラメタで渡されてきたInjectのinjメソッドを使ってcase class(PrintLine, ReadLine)をインジェクト可能にしたもの(?)をFree.liftFCでOperationalモナド化しています。

コンパニオンオブジェクトの方にはおまじないの暗黙変換関数を定義しています。

AuthService

次にConsoleServiceに合成して使用するOperationalモナドとしてAuthOperationをAuthServiceに定義します。

Operationalモナド化のターゲットとなる1階カインド型のトレイトとしてAuthOperationを、具体的なコマンドとなるcase classとしてLoginを定義してます。

インタープリターの実行エンジンはinterpreterとして定義しています。

その後ろにあるAuthPartクラスとコンパニオンオブジェクトがOperationalモナド合成のために必要な定義です。

package sample

import scala.language.higherKinds
import scalaz._, Scalaz._

object AuthService {
  sealed trait AuthOperation[_]
  case class Login(user: String, password: String) extends AuthOperation[Unit]

  val interpreter = new (AuthOperation ~> Id) {
    def apply[T](c: AuthOperation[T]): Id[T] = {
      c match {
        case Login(login, password) => println(s"$login:$password")
      }
    }
  }

  class AuthPart[F[_]](implicit I: Inject[AuthOperation, F]) {
    def login(user: String, password: String): Free.FreeC[F, Unit] = Free.liftFC(I.inj(Login(user, password)))
  }

  object AuthPart {
    implicit def instance[F[_]](implicit I: Inject[AuthOperation, F]): AuthPart[F] = new AuthPart[F]
  }
}

この例では、プログラムの意図を見やすくするためAuthService本体ではユーティリティ関数を定義していませんが、実応用時にはAuthService本体とAuthPartの両方で定義して、合成非使用時、合成使用時のどちらのケースでも動作可能にしておくことになります。

Utility

Operationalモナド合成のために必要なユーティリティ関数orを定義します。これは「CoproductとInjectを使ったFree Monadの合成とExtensible Effects」にある関数をそのまま持ってきています。

package sample

import scala.language.higherKinds
import scalaz._, Scalaz._

object Utility {
  def or[F[_], H[_], G[_]](
    fg: F ~> G, hg: H ~> G
  ): ({ type f[x] = Coproduct[F, H, x]})#f ~> G = {
    new (({type f[x] = Coproduct[F,H,x]})#f ~> G) {
      def apply[A](c: Coproduct[F,H,A]): G[A] = c.run match {
        case -\/(fa) => fg(fa)
        case \/-(ha) => hg(ha)
      }
    }
  }
}

2つの自然変換(NaturalTransformation)を合成した自然変換を作成する関数のようです。

積(product)の双対であるCoproduct(余積)は直和と同等ということらしく、型レベルのEither(Disjoint union)と考えてよさそうです。Scalazの内部実装もScalazの「\/」を使っています。

合成後の自然変換はCoproductとして渡されてきた型が合成された2つの自然変換のどちらに該当するかを判定して、該当する自然変換によって変換を行う、というロジックだと思います。

ConsoleAuthService

さていよいよ合成です。

ConsoleServiceとAuthServiceを合成したConsoleAuthServiceを定義してみました。

package sample

import scalaz._, Scalaz._

object ConsoleAuthService {
  import ConsoleService._, AuthService._

  type ConsoleAuth[A] = Coproduct[ConsoleOperation, AuthOperation, A]
  val interpreter = Utility.or(ConsoleService.interpreter, AuthService.interpreter)
  def run[T](f: Free.FreeC[ConsoleAuth, T]) = Free.runFC(f)(interpreter)
}

定義はとても簡単で以下の3行だけです。

  • CoproductによるOperationalモナドの合成
  • 実行エンジンの合成
  • runメソッド
CoproductによるOperationalモナドの合成

Coproductを使って2つのOperationalモナド(ConsoleOperation, AuthOperation)を合成します。

type ConsoleAuth[A] = Coproduct[ConsoleOperation, AuthOperation, A]

正確には、2つのOperationalモナドのいずれかを保持するCoproductを定義する、ということになろうかと思います。

実行エンジンの合成

Utilityのor関数でConsoleServiceの実行エンジンとAuthServiceの実行エンジンを合成します。実行エンジンは自然変換なので、汎用の自然変換の合成機能で合成することができます。

val interpreter = Utility.or(ConsoleService.interpreter, AuthService.interpreter)

意味的には、「2つのOperationalモナドのいずれかを保持するCoproduct」をターゲットの関手に変換する自然変換、ということになろうかと思います。

runメソッド

runメソッドでは、「2つのOperationalモナドをCoproductを使って合成したもの」をFreeモナド化したものをパラメタで受取り、「2つのOperationalモナドをCoproductを使って合成したもの」の実行エンジンをFree.runFC関数で適用して処理を実行しています。

def run[T](f: Free.FreeC[ConsoleAuth, T]) = Free.runFC(f)(interpreter)

App

2つのOperationalモナドを合成したConsoleAuthServiceの使用方法は以下になります。

package sample

import scala.language.higherKinds
import scalaz._, Scalaz._

object App {
  import ConsoleService.ConsolePart, AuthService.AuthPart

  def program[F[_]](implicit C: ConsolePart[F], A: AuthPart[F]) = {
    import C._, A._
    for {
      password <- readLine
      _ <- login("user", password)
    } yield ()
  }

  def main(args: Array[String]) {
    ConsoleAuthService.run(program)
  }
}

program関数の実装はConsoleServiceやAuthServiceを単独で使う場合と同様にfor式によるものですが以下の点が異なっています。

  • 暗黙パラメタとしてscalaz.Injectを使った合成用のクラスConsolePartとAuthPartを受け取っている。
  • ConsoleServiceとAuthServiceのユーティリティ関数として暗黙パラメタで受け取ったクラスのメソッドを使用している。

このあたりをおまじないとして考えると、ConsoleServiceやAuthServiceを単独で使う場合とほとんど変わらない手間で、ConsoleServiceとAuthServiceを合成したConsoleAuthServiceを使用できています。

まとめ

Scalaz 7.1.0でOperationalモナド(Freeモナド)の合成が簡単にできることが確認できました。

モナドを合成できるのもよいですが、モナドの実行エンジンも自然変換を使って簡単に合成できるのも好感触です。

今まではモナドは合成できないためモナド変換子(e.g. WriteT)を都度作るアプローチだったと思いますが、Operationalモナド/Freeモナドを使うことで合成によるアプローチを取ることも可能になったと理解してよいのかもしれません。

どこまで実用性があるかは未知数ですが、この検証も含みにアプリケーションやフレームワークで定義するモナドはOperationalモナドを第1候補に考えてみたいと思っています。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0

2015年1月8日木曜日

Operationalモナド

おぼろげな記憶ではFreeモナドが話題になったのが一昨々年、Operationalモナドは一昨年だったと思うので、いまさらの感もありますが、ScalazでOperationalモナドが簡単に使えることが判明したのでメモしておきます。

Freeモナドは、FunctorをMonad化することができるモナドですが、以下の性質を持ったインタープリターのDSLを簡単に作れることで知られています。

  • トランポリン化によりjoin演算時の再帰呼び出しによるスタックオーバーフローを回避
  • インタープリターの実行エンジンが疎結合になっており取替え可能
  • 副作用を実行エンジン側に分離(2015-01-14追記)

Operationlモナドは、Coyonedaにより型パラメータが1つの1階カインド型のデータ(Scala的にはcase classで実装。以下case class。)をFunctor化する事が可能な性質を利用して、case classから直接Freeモナド化したものです。

ざっくりいうとcase classでインタープリターのオペレーションを定義すれば、ここから簡単にインタープリターの(monadicな)DSLを作ることができるわけです。

プロダクトではScalaz 7.0.6を使っているのですが、このバージョンでOperationalモナドを使おうとするとかなり込み入った呪文を唱えないといけません。

Operationalモナドは非常に便利そうなんですが、呪文が必要になるとすると、あまり気軽に使えなくなってしまいます。

この問題がScalaz 7.1.0で解消されていることが判明したわけです。(Scalaz 7.1.0のリリースが昨年8月のようなのでいまさらですが。)

ConsoleService

Scalaz 7.1.0の機能を使用したOperationalモナドの実装は以下になります。

package sample

import scalaz._, Scalaz._
import scalaz.concurrent.Task

object ConsoleService {
  sealed trait ConsoleOperation[_]
  case class PrintLine(msg: String) extends ConsoleOperation[Unit]
  case object ReadLine extends ConsoleOperation[String]

  def printLine(msg: String) = Free.liftFC(PrintLine(msg))
  def readLine: Int = Free.liftFC(ReadLine)

  val interpreter = new (ConsoleOperation ~> Id) {
    def apply[T](c: ConsoleOperation[T]): Id[T] = {
      c match {
        case PrintLine(msg) => println(msg)
        case ReadLine => scala.io.StdIn.readLine()
      }
    }
  }

  def run[T](f: Free.FreeC[ConsoleOperation, T]): T = {
    Free.runFC(f)(interpreter)
  }

  val interpreterTask = new (ConsoleOperation ~> Task) {
    def apply[T](c: ConsoleOperation[T]): Task[T] = {
      Task.delay {
        c match {
          case PrintLine(msg) => println(msg)
          case ReadLine => scala.io.StdIn.readLine()
        }
      }
    }
  }

  def runTask[T](f: Free.FreeC[ConsoleOperation, T]): Task[T] = {
    Free.runFC(f)(interpreterTask)
  }
}
case classの定義

まずインタープリターのコマンドとなるcase classを定義します。

ここではコンソールに文字列を出力するPrintLineとコンソールから文字列を入力するReadLineを定義しています。この2つのcase classの親クラスとしてConsoleOperationトレイトを定義しています。このConsoleOperationトレイトが型パラメータを1つ取る構造になっているためCoyonedaによるOperationalモナド化が可能になっています。

定義時のイメージとしてはcase classの引数にコマンドの引数、case classの親クラスであるConsoleOperationの型パラメータにコマンドの返り値を定義します。

sealed trait ConsoleOperation[_]
  // PrintLine(msg: String): Unit
  case class PrintLine(msg: String) extends ConsoleOperation[Unit]
  // ReadLine(): String
  case object ReadLine extends ConsoleOperation[String]
ユーティリティ関数

次にFreeモナドとして使いやすくするためのユーティリティ関数を定義します。

定義は簡単で、scalaz.FreeのliftFC関数に(条件に合致した)case classを渡すだけです。

Scalaz 7.0.6ではこのliftFC関数がなかったために呪文が必要だったわけです。

def printLine(msg: String) = Free.liftFC(PrintLine(msg))
  def readLine: Int = Free.liftFC(ReadLine)
インタープリター

ConsoleServiceの利用時には、Freeモナドの内部構造としてPrintLineやReadLineの列が構築されますが、これを解釈して実行するインタープリターを定義します。

これはScalazのNaturalTransformation(自然変換)を用いて実装します。NaturalTransformation自体は1階カインド型間の変換(圏論的には関手間の射?)の機能ですが、ここではモナド間の変換に適用します。

まずConsoleOperationモナド(Operationalモナド化したもの)をIdモナドに変換する自然変換によるインタープリターの実行エンジンです。

val interpreter = new (ConsoleOperation ~> Id) {
    def apply[T](c: ConsoleOperation[T]): Id[T] = {
      c match {
        case PrintLine(msg) => println(msg)
        case ReadLine => scala.io.StdIn.readLine()
      }
    }
  }

Idモナドは実行結果をモナドにくるまず直接取得する時に使用するモナドです。

変換先としてIDモナドを指定することで、インタープリターの実行結果の値を取得できるようになります。

実行ユーティリティ

NaturalTransformationとして実装されたインタープリターはscalaz.FreeのrunFC関数で実行することができます。

Scalaz 7.0.6ではこのrunFC関数もなかったために、ここでも呪文が必要でした。

def run[T](f: Free.FreeC[ConsoleOperation, T]): T = {
    Free.runFC(f)(interpreter)
  }
Task版インタープリターと実行ユーティリティ

scalaz.concurrent.Taskは、scala.util.Tryとscala.concurrent.Futureの機能を併せ持ったようなモナドです。例外のキャッチ、遅延評価、並列実行といった目的で汎用的に使用することができます。

scala.util.Tryとscala.concurrent.FutureはScalaz 7.1.0的にはモナドとして定義されていないので、Monadicプログラミングをする上ではTaskの方が都合がよいです。

ここではFreeモナドConsoleOperationをTaskに変換するインタープリターを実装しました。正確にはインタープリターの実行を行う関数が束縛されたTaskが返ってきます。

実行はID版と同様にscalaz.FreeのrunFC関数を用います。

val interpreterTask = new (ConsoleOperation ~> Task) {
    def apply[T](c: ConsoleOperation[T]): Task[T] = {
      Task.delay {
        c match {
          case PrintLine(msg) => println(msg)
          case ReadLine => scala.io.StdIn.readLine()
        }
      }
    }
  }

  def runTask[T](f: Free.FreeC[ConsoleOperation, T]): Task[T] = {
    Free.runFC(f)(interpreterTask)
  }

Operationalモナドを使う

OperationalモナドによるConsoleServiceの使用方法は以下になります。

package sample

import ConsoleService._

object App {
  def main(args: Array[String]) {
    console_id
    console_task
  }

  def console_id {
    val program = for {
      msg <- readLine
      _ <- printLine(msg)
    } yield Unit
    run(program)
  }

  def console_task {
    val program = for {
      msg <- readLine
      _ <- printLine(msg)
    } yield msg
    val task = runTask(program)
    task.run
  }
}
IDモナド

console_idメソッドではConsoleServiceのrunメソッドを使用して、プログラムの実行を行います。IDモナドを使っているため、(処理を行う関数が束縛された)モナドが返されるのではなく処理が直接実行されます。

Taskモナド

console_taskメソッドではConsoleServiceのrunTaskメソッドを使用してプログラムの実行を行います。

runTaskメソッドは処理を行う関数が束縛されたTaskモナドを返すので、さらにrunメソッドでプログラムの実行を行います。

まとめ

Operationalモナドは敷居が高いので今まで使っていなかったのですが、Scalaz 7.1.0で簡単に利用できるようになっているのが分かりました。

プロダクトで使っているライブラリのバージョンを上げるのは勇気が要るところですが、Operationalモナド目的で、そろそろ上げておくのがよさそうです。