2015年4月20日月曜日

[scalaz-stream] ストリーミングで状態機械

Functional Reactive Programming(FRP)の眼目の一つはMonadic Programming(MP)によるストリーミング処理です。

MPでFRPを記述できることで安全なプログラムを簡単に書くことができるようになります。

そこで今回は「Scala的状態機械/FP編」で作成したProcessモナド版CSVパーサーをストリーミング処理に適用してみます。

準備

Scala的状態機械/OOP編で作成した状態遷移を記述した代数的データ型ParseStateをストリーミング用に一部手直しします。

package sample

sealed trait ParseState {
  def event(c: Char): ParseState
  def endEvent(): EndState
}

case object InitState extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = EndState(Nil)
}

case class InputState(
  fields: Seq[String],
  candidate: String
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(fields :+ candidate, "")
    case '\n' => EndState(fields :+ candidate)
    case _ => InputState(fields, candidate :+ c)
  }
  def endEvent() = EndState(fields :+ candidate)
}

case class EndState(
  row: Seq[String]
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = this
}

case class FailureState(
  row: Seq[String],
  message: String
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = sys.error("failure")
}

具体的にはEndStateが完全終了ではなく、次のイベントが発生したら入力受付け状態に復帰するようにしました。

Scala的状態機械/FP編で作成したParserStateMonadは変更ありません。今回はこの中で定義しているモナディック関数であるactionを使用します。

package sample

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

  def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }
}

ストリーミング版

それではストリーミング版の作成に入ります。

EventProcessor

まずストリーミング処理の動作環境として、scalaz-streamが提供する非同期キュー(scalaz.stream.async.mutable.Queue)を作成します。

package sample

import scalaz.concurrent.Task
import scalaz.stream._

object EventProcessor {
  val q = async.unboundedQueue[Char]

  val eventStream: Process[Task, Char] = q.dequeue
}

EventProcessorは、scalaz.stream.async.unboundedQueue関数で作成したQueueによって、ストリームに対するイベントと、イベントをハンドリングするProcessモナドを接続します。

Queueに対して送信されたイベントは、Queueのdequeueメソッドで取得できるProcessモナドに転送されます。

StreamingParser

ストリーミング処理用のCSVパーサーは以下になります。

package sample

import scala.language.higherKinds
import scalaz.concurrent.Task
import scalaz.stream._

object StreamingParser {
  def createParser[F[_]](source: Process[F, Char]): Process[F, ParseState] = {
    source.pipe(fsm(InitState))
  }

  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }
}

まずパーサーはProcessモナドに組み込んで使用する必要があるので、組込み可能なモナディック関数fsmを用意します。これは、Scala的状態機械/FP編で作成したParseProcessMonadStateMonadのfsm関数と同じものです。

その上で、このfsm関数をpipeコンビネータでProcessモナドに組み込む関数createParserを用意しました。この関数は便利関数という位置付けのものです。

つまりScala的状態機械/FP編で作成した部品はそのままストリーミング処理にも適用できるということになります。

使い方

動作確認のためのプログラムは以下になります。

package sample

import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object StreamingParserSample {
  def main(args: Array[String]) {
    val stream = EventProcessor.eventStream
    build(stream)
    execute()
  }

  def build(stream: Process[Task, Char]) {
    Future {
      val parser = StreamingParser.createParser(stream).map {
        case EndState(row) => report(row)
        case x => ignore(x)
      }.run.run
    }
  }

  def report(row: Seq[String]) {
    println(s"report: $row")
  }

  def ignore(s: ParseState) {
    println(s"ignore: $s")
  }

  def execute() {
    val queue = EventProcessor.q
    val a = "abc,def,ghi\n"
    val b = "jkl,mno,pqr\n"
    a.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
    b.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
  }
}

まずイベントの送信ですが、EventProcessorのeventStreamメソッドで取得したProcessモナドに対してbuild関数でパーサーの組込みを行っています。パーサーはStreamingParserのcreateParser関数で作成しています。さらにmapコンビネータでパース結果のコンソール出力処理を追加しています。

execute関数は、EventProcessorのメソッドで取得した非同期キューに対してenqueueOneメソッドでイベントを送出しています。

非同期キューに対して送出したイベントが、非同期キューと連動したProcessモナドに対して送られます。

この例では、プログラム内に埋め込まれたデータを使用していますが、WebサーバーのHTTPリクエストやAkkaのメッセージの受信処理でこの非同期キューに転送することで、簡単にProcessモナドによるストリーミング処理を行うことができます。

実行

実行結果は以下になります。

ignore: InputState(List(),a)
ignore: InputState(List(),ab)
ignore: InputState(List(),abc)
ignore: InputState(List(abc),)
ignore: InputState(List(abc),d)
ignore: InputState(List(abc),de)
ignore: InputState(List(abc),def)
ignore: InputState(List(abc, def),)
ignore: InputState(List(abc, def),g)
ignore: InputState(List(abc, def),gh)
ignore: InputState(List(abc, def),ghi)
report: List(abc, def, ghi)
ignore: InputState(List(),j)
ignore: InputState(List(),jk)
ignore: InputState(List(),jkl)
ignore: InputState(List(jkl),)
ignore: InputState(List(jkl),m)
ignore: InputState(List(jkl),mn)
ignore: InputState(List(jkl),mno)
ignore: InputState(List(jkl, mno),)
ignore: InputState(List(jkl, mno),p)
ignore: InputState(List(jkl, mno),pq)
ignore: InputState(List(jkl, mno),pqr)
report: List(jkl, mno, pqr)

「ignore: InputState(List(),a)」といった形のパース処理の途中結果が流れた後に、「report: List(abc, def, ghi)」といった形のパース結果が流れてくることが確認できました。アプリケーション側ではパース結果のみをmatch式で拾いだして処理をすることになります。

フロー制御

StreamingParserのfsm関数はパース処理の途中結果もすべてストリームを流れてきます。

これはちょっとクールではないので、簡単なフロー制御を入れて対処することにしましょう。

この目的でStreamingParserを改修してStreamingParserRevisedを作成しました。

package sample

import scala.language.higherKinds
import scalaz.concurrent.Task
import scalaz.stream._

object StreamingParserRevised {
  def createParser[F[_]](source: Process[F, Char]): Process[F, Seq[String]] = {
    source.pipe(fsm(InitState))
  }

  def fsm(state: ParseState): Process1[Char, Seq[String]] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      s match {
        case EndState(row) => Process.emit(row) fby fsm(InitState)
        case FailureState(row, message) => log(message); fsm(InitState)
        case x => fsm(s)
      }
    }
  }

  def log(msg: String) {
    println(s"error: $msg")
  }
}

fsm関数ではaction関数から返されるStateモナドの実行結果によって以下のように処理を切り替えています。

EndStateの場合
パース結果の文字列をストリームに送出し、状態機械は初期状態に戻す
FailureStateの場合
エラーをログに出力し、状態機械は初期状態に戻す
その他
パース途中結果の状態に状態機械を遷移させる

上記の処理を行うことでEndStateの場合のみ、ストリーム上にデータを送出するようになっています。

これは一種のフロー制御ですが、このようなフロー制御が簡単に記述できることが確認できました。

使い方

StreamingParserRevisedの使用方法は以下になります。

package sample

import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object StreamingParserRevisedSample {
  def main(args: Array[String]) {
    val stream = EventProcessor.eventStream
    build(stream)
    execute()
  }

  def build(stream: Process[Task, Char]) {
    Future {
      val parser = StreamingParserRevised.createParser(stream).
        map(row => report(row)).run.run
    }
  }

  def report(row: Seq[String]) {
    println(s"report: $row")
  }

  def execute() {
    val queue = EventProcessor.q
    val a = "abc,def,ghi\n"
    val b = "jkl,mno,pqr\n"
    a.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
    b.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
  }
}

StreamingParserの場合はストリーミングに処理途中、処理結果を問わずParseStateが流れてくるので、このParseStateのハンドリングを行っていました。

StreamingParserRevisedでは、処理結果の文字列のみが流れてくるので、処理結果に対する処理のみを記述する形になっています。

実行

実行結果は以下になります。

report: List(abc, def, ghi)
report: List(jkl, mno, pqr)

まとめ

ストリーミング処理をProcessモナドで記述するメリットは、Processモナド用に作成した部品やアプリケーション・ロジックをそのままストリーミング処理に適用することができる点です。

今回の例でも簡単な改修で適用することができました。

ただしProcessモナド用に作成した部品もストリーミング処理で使用すると効率的でない部分も出てくるので、必要に応じて最適化を行っていくことになります。

ストリーミング処理での最適化ではフロー制御が重要な要因となります。

フロー制御を実現するためには、ストリーミング処理内で状態機械を記述できる必要があります。Processモナドでは、この状態機械の記述が可能なので、フロー制御も簡単に実現できることが確認できました。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

0 件のコメント:

コメントを投稿