2015年4月13日月曜日

[OFAD]Everforthのモデル体系

前回「クラウド・サービスのモデリング」で、クラウド・サービスを開発する際のモデリングの枠組みについて考えました。

クラウド・サービスの開発はSPaaS(Service Platform as-a-Service), OFP(Object-Functional Programming), OFAD(Object-Functional Analysis and Design)の3つの要素から構成されるという枠組みを提示しました。

今回は、この枠組の中のOFADを実現するためにEverforthが採用しているモデル体系についてご紹介します。

モデリングの参照モデル

クラウド・サービスのモデリングを議論するための参照モデルとして以下のものを使用します。



この参照モデルでは、モデルは大きく以下の2系統に分かれます。

  • ドメイン・モデル
  • アプリケーション・モデル

ドメイン・モデルは、利用者やサービス提供者、開発者といったステークホルダー間で共有する事業ドメインのモデルです。最上流では用語集にまとめられたドメイン・オブジェクトが、最終的にはデータベースで管理されたり、センサーなどの外部デバイスとして実現されます。

アプリケーション・モデルは、各ステークホルダーの要件を定義し、ここからサービスとして実現する方法を定義するモデルです。ビジネス・ユースケースからユースケースとして定義した物語を、サービスに落としこみます。

クラウド・サービスの設計と実装

分析と設計のアクティビティによって以下の2つのモデルが作成されます。

  • ドメイン・モデル
  • サービス・モデル

この2つのモデルから、サービス・プラットフォーム上にクラウド・サービスを構築する際の、設計と実装を行う際の流れを以下にまとめました。


スクラッチのシステム開発の場合には、この2つのモデルからシステム全体を開発するわけですが、サービス・プラットフォーム上でクラウドサービスを開発する場合、サービス・プラットフォームが提供するDSLに載せる形で開発することになります。

ドメイン・モデルについては、業界全体の現時点での技術レベル的に80%程度は自動生成することを前提にするのが合理的です。サービス・プラットフォームを使う場合は、サービス・プラットフォームが提供するDSLに合わせた実装を自動生成することになります。

サービス・モデルはOFPを用いて実装することになります。この場合も、サービス・プラットフォームが提供するDSLに合わせた実装になります。

Everforthでのモデリング

「モデリングの全体像」と「クラウド・サービスの設計と実装」の参照モデルについて説明しました。この参照モデルにそった形でEverforthで採用しているモデリングの枠組みは以下になります。




モデルは大きくサマリ・モデルと詳細モデルの2つに分かれます。

サマリ・モデル

サービス毎にサマリ・モデルとして以下のモデルを作成します。

  • マインドマップ・モデル
  • WireFrame(WF)
  • API利用一覧
  • サービス記述
マインドマップ・モデル

マインドマップ・モデルは拙著「マインドマップではじめるモデリング講座」のものをベースにしています。ざっくりいうとマインドマップでビジネス・ユースケースとドメイン・モデルを記述するための手法です。

マインドマップ・モデルで作成したモデルは基本的にOOADのモデルなので、必要に応じて本格的なOOADモデリングに展開可能です。

WireFrame

WireFrame(WF)はWebやiOS/Androidアプリ開発で一般的に使われているものを採用しました。画面設計を中心にしたモデルです。ただし、画面とクラウド・サービスが提供するAPIの関係を定義する拡張を行っています。

API利用一覧

API利用一覧は、WFで定義したものも含めて、WebやiOS/Androidアプリケーションからクラウド・サービスのAPI利用方法一覧です。

サービス記述

サービス記述は、開発するサービスの概要情報を定義したものです。

このサービス・プラットフォームのカスタマイズ情報を定義することがサービス記述の重要な目的の一つになっています。

レギュラー・モデル

またサービスの要件が複雑な場合は、必要に応じてレギュラー・モデルとして以下のモデルを作成します。

  • ユースケース・モデル
  • ドメイン・モデル

ユースケース・モデルは拙著「上流工程UMLモデリング」で定義したユースケース一覧とユースケース詳細の2つの帳票を用いています。いずれもExcelやGoogleスプレッドシートなどの表計算ソフトで作成します。ユースケース一覧の作成が主で、必要に応じてユースケース詳細を作成するバランスで運用しています。

ドメイン・モデルはモデル・コンパイラEverforthModelerのDSLとして記述します。DSLはemacs-org形式のプレインドキュメントです。EverforthModelerはDSLのモデルから、サービス・プラットフォームが定義するエンティティ管理のDSLを自動生成します。

ユースケース・モデルを作成することで、自然にドメイン・モデルが整備されます。このドメイン・モデルの実装はモデル・コンパイラで自動生成してサービスに組込みます。そして、このドメイン・モデルを操作するサービスをユースケース・モデルをベースにAPI仕様としてまとめOFPによる実装につなげていきます。

まとめ

EverforthでApparel Cloudを開発する際に使用しているモデル体系についてご紹介しました。

実装技術としてはScalaによるOFPを使用していますが、要件定義や分析といった上流工程におけるモデリングの重要性は通常のエンタープライズ開発と変わるところはありません。ベースとなる方法論としてはOOADが引き続き有力です。

ただ、(1)クラウド・サービス・プラットフォームを前提とする、(2)実装技術で関数型成分が重要になる、という2つの要因によってモデリングの具体的な手法には少なからず影響が出てきます。

このいった点も含めて、クラウド・サービスのモデリングの方法論について、実践の経験をベースに今後もブログで検討を進めていきたいと思います。

2015年4月6日月曜日

[OFAD] クラウド・サービスのモデリング

ここ数年Apparel Cloudの開発に携わっています。

Apparel Cloudはアパレル向けのクラウド・サービスを実現するためのサービス・プラットフォームで、サーバーサイドの実装はScala+ScalazによるMonadic Programmingを採用しています。

また、サービス企画からクラウド・サービス向けの仕様策定にはオブジェクト指向開発の伝統的なモデル体系をクラウド・サービス向けにチューニングしたものを用いています。

実システムの構築にこれらの新しい技術を適用して一通り材料も揃ってきたので、クラウド・サービス開発の枠組みとその要素技術について整理していこうと思います。

今回はモデリング体系の枠組みの整理です。この枠組みをベースに、個々の要素技術の詳細化を行っていく予定です。

枠組み

大枠では以下のような枠組みを考えています。

  • Service Platform as-a-Service
  • Object-Functional Programming
  • Object-Functional Analysis and Design

まず、クラウド・サービスの開発はスクラッチ開発ではなく、Service Platform上でのカスタマイズや追加機能をプラグインとして開発する形を想定しています。

また、プログラミング・モデルはオブジェクト指向と関数型を併用したObject-Functional Programmingです。並列、並行、分散、ストリーミングといったクラウド時代の要件を満たすためには関数型の導入が必須となるためです。

スクラッチ開発ではなくサービス・プラットフォーム上でのカスタマイズ+プラグイン、オブジェクト指向と関数型を併用したプログラミング・モデルという2つの大きな枠組みの変更は、当然ながら要件定義から分析・設計の一連の技術にも影響を与えます。

Service Platform as-a-Service

クラウド・サービスの開発では、スクラッチでシステムを組むというより、既存のサービスを組み合わせた上で、必要な部分だけ開発するという形が基本です。

このアプローチの核になるのがService Platformです。さらにService Platformをクラウドサービスとして提供したものをService Platform as-a-Service(SPaaS)と呼ぶことにします。

SPaaSが提供するプラットフォームを利用することで、クラウド・サービスの開発と運用をより簡単に実現することができます。

Apparel Cloudは、アパレル業界でのO2O用途向けのSPaaSということになります。

本稿を起点とする一連のブログ記事では、SPaaSの具体的な紹介や利用方法というよりも、SPaaSの存在を前提としたクラウド・サービス開発のモデリング(Object-Functional Analysis and Design)やプログラミング・モデル(Object-Functional Programming)を整理し、可能な範囲で体系化していきたいと考えています。

Object-Functional Programming

Object-Functional Programming(OFP)はObject-Oriented Programming(OOP)とFunctional Programming(FP)を融合させたプログラミング・モデルです。

FPはアルゴリズム記述の容易性や信頼性、保守性の向上がメリットですがOOPと比べると以下の問題点もあり、エンタープライズ分野では限定的な利用にとどまっていました。

  • 実行性能の低下
  • メモリ消費の増大
  • 難易度の高いプログラミング・モデル
  • (エンタープライズ的な意味で実績のある)安定した実行環境
  • 開発エコシステム(開発環境、クラスライブラリ、コミュニティなど)

しかし、クラウド時代に入って以下のような目的により積極的に採用する必要性が出てきています。

  • 並列・並行・分散プログラミング
  • 大規模データ処理
  • ストリーミング処理
  • 問合せ処理
  • DSL(Domain Specific Language)

特に、モナド(monad)という新しい概念をベースとしたMonadic Programming(MP)、MPをベースにしたFunctional Reactive Programming(FRP)が、本質的なプログラミング・モデルの変革をもたらします。

さらに、既存のOOPとの併用・融合も新しいテーマとなってきます。

本ブログでは今までも、これらのテーマについてScala+Scalazによるソリューションについて検討してきました。今後も引き続きこのテーマについて検討を進めていきますが、可能な範囲で今回提示した枠組みであるSPaaS、OFADとの関係についても考えていきたいと思います。

Object-Functional Analysis and Design

サービス企画の要求をまとめて、サービス開発に落とし込むためのメソッドはObject-Oriented Analysis and Design(OOAD)が基本になりますが、Service Platform as-a-Service(SPaaS)とObject-Functional Programming(OFP)という2つの中核的な技術変革により要件定義から分析・設計に至る一連のアクティビティにも大きなインパクトが出てくることが予想されます。

OOADについては、以前大学で教えていた時の内容を以下の2冊にまとめています。

現時点でもこの内容がボクとしての結論で、大きな枠組としては変わっていません。

ざっくりいうとボキャブラリとなるドメイン・モデルと物語であるユースケースを起点としたアプリケーション・モデルの二系統のモデルを軸に業務モデルからObject-Oriented Programming(OOP)による実装までの一連の開発アクティビティを一気通貫でカバーしています。

このOOADに対して、SPaaSとOFPの成分をどのように織り込んでいくのかという点が論点となります。本ブログでは、これらの要素を織り込んでOOADを拡張した方法論をObject-Functional Analysis and Design(OFAD)と呼ぶことにします。

まとめ

材料が揃ってきたので、クラウド・サービス開発の方法論の整備を始めることにしました。今回は議論のベースとなる枠組みを整理しました。

Monadic ProgrammingやFunctional Reactive Programmingを中心とするObject-Functional Programmingは、今まさに技術革新が進行中のホットな技術分野であり、本ブログの中心的なテーマとして取り上げてきました。今後も基本的には変わらないスタンスですが、今回提示した枠組みであるSPaaS、OFADとの関係についても併せて考えていきたいと思います。

Object-Functional Analysis and Designは、既存のOOADがベースなので既存のものと大きな違いはない想定ですが、SPaaS成分、OFP成分が入ることで相応の拡張が必要になりそうです。Apparel Cloudでの実践で得られた経験をもとに体系化を進めていきたいと思います。

2015年3月30日月曜日

Scala的状態機械/FP編

Scalaにおける状態機械の実装戦略について検討しています。

Scala的状態機械/OOP編」で状態+状態遷移を表現するトレイトであるParseStateを作成しました。ParseStateの具象クラスはcase classまたはcase objectとして実現しています。これらのトレイト、case class、case objectがワンセットでFPで使用できる代数的データ構造となっています。

「OOP編」ではこのParseStateを使用して、オブジェクト版の状態機械とアクター版の状態機械を作成しました。代数的データ構造でもあるParseStateがOOP的に問題なく使用できることが確認できました。

「FP編」では、ParseStateの代数的データ構造の性質を活かしてMonadic Programming(以下MP)版の状態機械を考えてみます。

Stateモナド版状態機械

MPで状態を扱いたい場合には、状態を扱うモナドであるStateモナドが有力な選択肢です。

代数的データ型であるParseStateはそのまま利用し、これをStateモナドでくるむことで状態遷移を実現したものが以下のParserStateMonadです。

package sample

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

  def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }
}
action関数

モナドを使った共通機能を作る場合には、共通機能としていわゆるモナディック関数を提供するのが一つの形になっています。

モナディック関数とは「A→M[B]」(Mはモナド)の形をしている関数です。モナドMのflatMapコンビネータの引数になります。

Stateモナドを使用する場合には、A→State[B]の形の関数を用意することになります。

def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

今回作成したStateモナド用のモナディック関数であるaction関数は「Char→State[ParseState→(ParseState, Char)]」の形をしています。

A→M[B]の形とは以下の対応になります。

  • A : Char
  • M : State
  • B : ParseState→(ParseState, Char)

Stateモナドに設定している関数は「ParseState→(ParseState, Char)」の形をしていますが、action関数全体ではaction関数の引数もパラメタとして利用しているので、結果として「Char→ParseState→(Parse, Char)」の動きをする関数になっています。

action関数が返すStateモナドはParseStateによって表現された状態を、受信したイベントとの組合せ状態遷移する関数が設定されています。

状態+状態遷移を表現するオブジェクト兼代数的データ型であるParseStateがきちんと定義されていれば、Stateモナド用のモナディック関数を定義するのは容易なことが分かります。

parse関数

CharのシーケンスからCSVをパースするparse関数は以下になります。

def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }

parse関数は、Stateモナド用モナディック関数actionと型クラスTraverseのtraverseSコンビネータの組合せで実現しています。

traverseSコンビネータはStateモナド用のtraverseコンビネータです。Scalaの型推論が若干弱いためStateモナド専用のコンビネータを用意していますが、動きは通常のtraverseコンビネータと同じです。

状態遷移のロジックそのものはParseStateオブジェクトにカプセル化したものをaction関数から返されるStateモナド経由で使用します。

traverseSコンビネータとStateモナドを組み合わせると、Traverseで走査対象となるコレクションをイベント列と見立てて、各イベントの発生に対応した状態機械をStateモナドで実現することができます。この最終状態を取得することで、イベント列を消化した最終結果を得ることができます。

OOPオブジェクトであり代数的データ構造でもあるParseStateは、このようにしてStateモナドに包むことで、FP的な状態機械としてもそのまま使用することができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserStateMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserStateMonad.parse(events)
      println(s"ParserStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserStateMonadSpec
ParserStateMonadSpec: List(abc, def, xyz)
[info] ParserStateMonadSpec:
[info] ParserStateMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 400 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 1 s, completed 2015/03/21 17:49:39

scalaz stream版状態機械

Stateモナド版状態機械ではStateモナドと型クラスTraverseのtraverseSコンビネータを使用して、状態機械をMPで実現しました。

この実現方法は、メモリ上に展開したデータに対して一括処理をするのには適していますが、大規模データ処理やストリーム処理への適用は不向きです。

そこで、大規模データ処理やストリーム処理をFunctional Reactive Programming(以下FRP)の枠組みで行うために、scalaz streamを使用して状態機械の実装してみました。

package sample

import scalaz._, Scalaz._
import scalaz.stream._
import scalaz.stream.Process.Process0Syntax

object ParserProcessMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

Processモナドは一種の状態機械なので、この性質を利用してPraseStateによる状態遷移をProcessモナド化します。この処理を行うのがfsm関数です。

fsm関数は状態であるParseStateを引数に、Charを受け取るとParseStateとCharの組合せで計算された新しいParseStateによる状態に遷移しつつ処理結果としてParseStateを返すProcessモナドを返します。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = state.event(c)
      Process.emit(s) fby fsm(s)
    }
  }
parse関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の典型的な使い方です。

def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

parse関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

まず引数のCharシーケンスからソースとなるProcessモナドを生成します。

val source: Process0[Char] = Process.emitAll(events)

ProcessのemitAll関数は引数に指定されたシーケンスによるストリームを表現するProcessモナドを生成します。

ただし、内部的に非同期実行する本格的なシーケンスではなく、通常のシーケンスに対してストリームのインタフェースでアクセスできるという意味合いのProcessモナドになります。(内部的には実行制御に後述のTaskモナドではなく、Idモナドを使用しています。)

パイプラインの処理を記述

pipeコンビネータでfsm関数にInitStateを適用して得られるProcessモナドをパイプライン本体のProcessモナドに設定しています。

val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))

これが処理の本体です。ここではpipeコンビネータを始めとする各種コンビネータを使ってパイプラインを構築します。

パイプラインで加工後のデータを取得

最後にtoListメソッドで、パイプラインの処理結果をListとして取り出し、ここからパース結果を取り出す処理を行っています。

val result = pipeline.toList.last
    result.endEvent.row
parseTask関数

parse関数はscalaz streamをメモリ上の小規模データに適用する際の使用例ですが、より本格的な応用である大規模データ処理やストリーム処理では少し使い方が変わってきます。

そこで、参考のために実行制御にTaskモナドを使ったバージョンのparseTask関数を作りました。

def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process[Task, Char] = Process.emitAll(events).toSource
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }

parse関数と同様にparseTask関数のおおまかな流れは以下になります。

  1. パイプライン(Processモナド)を生成
  2. パイプラインの処理を記述
  3. パイプラインで加工後のデータを取得
パイプラインの生成

ProcessモナドのtoSourceメソッドで、Taskモナドを実行制御に使用するProcessモナドに変換されます。

val source: Process[Task, Char] = Process.emitAll(events).toSource
パイプラインの処理を記述

fsm関数から得られたProcessモナドをpipeコンビネータでパイプラインに設定します。

val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState))

この処理はTask版でないparse関数と全く同じです。

パイプラインで加工後のデータを取得

Taskモナドを実行制御に使用する場合には、for式などを使ってモナディックに実行結果を取得する形になります。

for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonad.parse(events)
      println(s"ParserProcessMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadSpec
ParserProcessMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 301 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:12

scalaz stream + Stateモナド版状態機械

「scalaz stream」ではParseStateオブジェクトを直接使用して状態機械を作成しました。通常はこれで十分ですが、Stateモナド用モナディック関数が用意されている場合は、こちらを使用する方法もあります。

この方法では、Stateモナドの使い方だけ理解していればよいので、プログラミングはより簡単かつ汎用的になります。

package sample

import scalaz._, Scalaz._
import scalaz.stream._

object ParserProcessMonadStateMonad {
  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

  def parse(events: Seq[Char]): Seq[String] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process0[ParseState] = source.pipe(fsm(InitState))
    val result = pipeline.toList.last
    result.endEvent.row
  }

  import scalaz.concurrent.Task

  def parseTask(events: Seq[Char]): Task[Seq[String]] = {
    val source: Process0[Char] = Process.emitAll(events)
    val pipeline: Process[Task, ParseState] = source.pipe(fsm(InitState)).toSource
    for {
      lastoption <- pipeline.runLast
      last = lastoption.get
    } yield last.endEvent.row
  }
}
fsm関数

ParseStateによる状態機械の動作を行うProcessモナドを生成するfsm関数のStateモナド版は以下になります。

def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }

ParseStateのeventメソッドを直接使用する代わりに、ParseStateを包んだStateモナドを返すモナディック関数actionを使用します。

この版のfsm関数ではaction関数から取得したStateモナドのexecメソッドを使用して、現状態とイベント(Char)から新状態を計算し、この状態を保持した、新たなProcessを生成しています。

この方法のメリットはParseStateの使い方(この場合はeventメソッド)は知る必要がなく、汎用的なStateモナドの使い方だけ知っていればよい点です。

つまりaction関数だけ作っておけば、Traverseを使った状態機械、Processモナドを使った状態機械のどちらも簡単に作ることができるわけです。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserProcessMonadStateMonadSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserProcessMonad" should {
    "parse" in {
      val events = "abc,def,xyz".toVector
      val r = ParserProcessMonadStateMonad.parse(events)
      println(s"ParserProcessMonadStateMonadSpec: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserProcessMonadStateMonadSpec
ParserProcessMonadStateMonadSpec: List(abc, def, xyz)
[info] ParserProcessMonadStateMonadSpec:
[info] ParserProcessMonad
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 286 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:49:26

状態機械実装戦略

オブジェクト・モデルで状態機械が出てきた場合の実装戦略としては、まずベースとして:

  • sealed trait + case classで状態+状態遷移のオブジェクト&代数的データ型(以下、状態遷移case class)

を作成します。

前回に見たようにこの状態遷移case classを使って、OOP版の状態機械を簡単に作成することができます。

次に、FP用に以下の部品を整備しておくのがよいでしょう。

  • Stateモナド用モナディック関数
  • Processモナド用状態機械関数

どちらの関数も状態遷移case classが用意されていれば、ほとんど定型的な記述のみで作成することができます。この部品を使うことでFP版の状態機械を簡単に作成することができます。

以上、状態機械実装戦略について整理しました。

この戦略で重要なのは、状態+状態遷移を表現するために作成した状態遷移case classは1つだけ作ればよく、これをOOP流、FP流にマルチに展開していける点です。

この点が確定すれば、オブジェクト・モデルに状態機械が出てきたら安心して1つの状態遷移case classの実装に集中することができます。

まとめ

OFPの状態機械の実装についてOOP的実装、FP的実装について整理してみました。

いずれの場合もオブジェクト&代数的データ型である「状態遷移case class」が基本で、ここからOOP的状態機械、FP的状態機械へマルチに展開できることが確認できました。

OFPではアプリケーション・ロジックは可能な限りFP側に寄せておくのがよいので、FPでも状態機械を簡単に実現できることが確認できたのは収穫でした。

また、FP的なアプローチが使えないケースが出てきても、簡単にOOP的アプローチに切り替え可能なのも安心材料です。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2015年3月23日月曜日

Scala的状態機械/OOP編

オブジェクト・モデリングにおける動的モデルは状態機械で記述するのが基本です。つまり状態機械はオブジェクト・モデリングの重要な構成要素の一つということです。

ScalaでObject-Functional Programming(OFP)を行う場合でも、要求・分析・設計の各アクティビティを経て作成されたオブジェクト・モデル内の状態機械をどのように実装していくのかという実装方式が論点になります。

普通のOOP流の実装はすでに議論しつくされていると思いますが、OFPにおけるFPでの実装方式については、これから整備されていくことになると思います。

注意が必要なのはクラウド・アプリケーション開発をターゲットにする場合、伝統的なFPというよりMonadic Programming(以下MP)を経てFunctional Reactive Programming(以下FRP)がゴールになるということです。

このためFRPとして利用可能な状態機械実装を探っておきたいところです。

課題

状態機械の使い所としては、エンティティの状態遷移を記述することで業務ワークフローの構築に用いたり、プロトコルドライバのフロー制御といったものが考えられます。前者はDBに状態を格納することになりエンタープライズ・アプリケーション的なライフサイクルの長い応用ですし、後者は制御系の応用でメモリ内の制御で完結する形のものです。

色々な応用があるわけですが、どの方面でも使える実現方法を念頭におきつつ、ここではCSVのパーサー処理を状態機械で実装してみることにします。FPとの接続部分に興味を集中させるためできるだけ簡単なものを選んでみました。

具体的には以下の課題になります。

  • CSVの1行を「,」区切りのレコードとしてパースし、文字列の列を取得する処理に使用する状態機械

この状態機械は文字列の列の各文字をイベントとして扱い、一連のイベント終了後にパース処理の結果となる「文字列の列」を状態の情報として保持する形になります。

case classで状態機械

状態機械は「状態×イベント→状態」のメカニズムが基本の構成要素です。このメカニズムをcase classで実装します。

いうまでもなくcase classはScalaプログラミングの超重要な構成要素で、OOP的側面とFP的側面を兼ね備えたObject-Functional Programmingの肝となる機構です。

OOP的にはDDDでいうところのvalue objectの実装に適した文法になっています。

FP的には代数的データ型(algebraic data type)として利用することが想定されています。

つまりvalue object兼代数的データ型を実現するための機構がcase classです。

ただしcase classの文法上はvalue objectや代数的データ型に適さない普通のオブジェクトを実装することも可能です。このため、value object兼代数的データ型を実現するための紳士協定を組み込んでおかなければなりません。

この紳士協定は以下の2つです。

  • 不変オブジェクト(immutable object)にする
  • 基底のトレイトや抽象クラスはsealedにする

Value objectと代数的データ型は共に不変オブジェクトである必要があります。

また代数的データ型の要件としてコンパイル時に全インヘリタンス関係が確定している必要があるので、sealedにすることでこれを担保します。

case classはオブジェクト指向の普通のオブジェクトでもあるので、不変オブジェクトの性質さえ守ればオブジェクトのフルスペックを使用することができます。

package sample

sealed trait ParseState {
  def event(c: Char): ParseState
  def endEvent(): EndState
}

case object InitState extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = EndState(Nil)
}

case class InputState(
  fields: Seq[String],
  candidate: String
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(fields :+ candidate, "")
    case '\n' => EndState(fields :+ candidate)
    case _ => InputState(fields, candidate :+ c)
  }
  def endEvent() = EndState(fields :+ candidate)
}

case class EndState(
  row: Seq[String]
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = this
}

case class FailureState(
  row: Seq[String],
  message: String
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = sys.error("failure")
}

状態機械は、「状態×イベント→状態」の情報を記述したマトリックスとして表現できます。このマトリクスを実現するデータ構造と評価戦略は色々考えられますが、ここではOOP的なアプローチで実現しました。

状態

まず状態機械全体を表すトレイトとしてParseStateを定義します。sealedにすることで代数的データ構造の要件の1つを満たします。

ParseStateのサブクラスとして具体的な状態を定義します。情報を持つInitState, InputState, EndState, FailureStateはcase classとして、情報を持たないInitStateはcase objectとして定義しました。

イベント

発生するイベントはメソッドで表現しました。

event
1文字入力イベント発生
endEvent
パース終了イベント発生
状態×イベント→状態

状態とイベントの組から新しい状態を決定するアルゴリズムは「発生するイベント」で定義したメソッドの実装になります。

具体的には各case class, case objectのeventメソッド、endEventメソッドの実装を参照して下さい。

オブジェクト版状態機械

case classによる状態機械の表現はimmutableなので、そのままでは状態機械としては動きません。あくまでも「状態×イベント→状態」のメカニズムを提供するまでになります。

状態機械の状態遷移は状態を遷移させるので本質的にはmutableな処理です。

OOP的には、オブジェクトのインスタンス変数でmutableな状態を保持する実現方法になります。

OOP版の状態機械をParserObjectとして実装しました。

package sample

class ParserObject {
  var state: ParseState = InitState

  def charEvent(c: Char) {
    state = state.event(c)
  }

  def parseEnd(): Seq[String] = {
    val end = state.endEvent()
    state = end
    end.row
  }
}

PaserObjectクラスでは、変更可能なインスタンス変数としてstateを定義しています。

イベントの発生をメソッドで受取り、状態とイベントの組合せで新たな状態を計算し、これをstateに設定することで状態遷移を実現しています。

新しい状態の計算ロジックは、ParseStateオブジェクトにカプセル化しています。

代数的データ構造で表現した状態機械のimmutableなオブジェクトを、インスタンス変数で管理することで、OOP的な状態機械が実現できました。

使い方

プログラムを実行するためのSpecは以下になります。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class ParserObjectSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParserObject" should {
    "parse" in {
      val parser = new ParserObject()
      val text = "abc,def,xyz"
      for (c <- text) {
        parser.charEvent(c) // 一文字づつパーサーに送信
      }
      val r = parser.parseEnd() // 解析終了のイベントを送信
      println(s"ParserObject: $r")
      r should be (Vector("abc", "def", "xyz"))
    }
  }
}
実行

実行結果は以下になります。

$ sbt test-only sample.ParserObjectSpec
ParserObject: List(abc, def, xyz)
[info] ParserObjectSpec:
[info] ParserObject
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 180 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 0 s, completed 2015/03/21 17:47:46

アクター版状態機械

FRP的な応用を考える場合、並行・並列処理で枠組みの中で状態機械を実現していく必要があります。

この実現方式として有力なのがアクターです。

ScalaではAkkaというアクター基盤を使用することができます。

このアクターは(理論的には色々あるでしょうが)状態を持ったactive objectを実現する方式になっています。

active objectであるアクター間はメッセージボックスでキューイングされたメッセージ通信で協調動作するので、アプリケーションレベルで特別な排他制御を行わなくても並行・並列処理を簡潔に記述することができます。

アクター版のCSVパーサーであるParserActorは以下になります。

package sample

import akka.actor._

case class ParseCharEvent(c: Char)
case object ParseEnd
case class ParseResult(result: Seq[String])

class ParserActor extends Actor {
  var state: ParseState = InitState

  def receive = {
    case ParseCharEvent(c) =>
      state = state.event(c)
    case ParseEnd =>
      val end = state.endEvent()
      state = end
      sender ! ParseResult(end.row)
  }
}

基本的に行っているのは(passive objectである)ParserObjectと同じです。オブジェクトのインスタンス変数でmutableな状態を保持する実現方法になります。

ParserObjectとの違いは、メソッド呼び出しではなくアクター間でのメッセージ通信によって処理が実行されるため、以下のようなメッセージ通信のためのコードを用意しないといけない点です。

  • アクター間で送受信されるメッセージの定義
  • メッセージを受け取り処理を行うイベントハンドラ

ParserObjectではメソッドとして簡単に定義できる処理ですが、これをメッセージ処理ように仕立て直さないといけないわけです。

アクターはとても簡単に利用できるのですが、少しボイラープレイトのコードを書く必要があります。

使い方
package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
class ParserActorSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "ParseActor" should {
    "parse" in {
      implicit val timeout = Timeout(5.seconds)
      val system = ActorSystem("state")
      implicit val context = system.dispatcher
      val actor = system.actorOf(Props[ParserActor])

      val text = "abc,def,xyz"
      for (c <- text) {
        actor ! ParseCharEvent(c)
      }
      for (r <- actor ? ParseEnd) yield {
        r match {
          case ParseResult(r) =>
            println(s"ParserActorSpec: $r")
            r should be (Vector("abc", "def", "xyz"))
        }
      }
      system.shutdown()
    }
  }
}

アプリケーションロジックは以下の部分です。

val text = "abc,def,xyz"
      for (c <- text) {
        actor ! ParseCharEvent(c)
      }
      for (r <- actor ? ParseEnd) yield {
        r match {
          case ParseResult(r) =>
            println(s"ParserActorSpec: $r")
            r should be (Vector("abc", "def", "xyz"))
        }
      }

アクターを使うために以下のような準備が必要になります。

// タイムアウト値の暗黙知を定義
      implicit val timeout = Timeout(5.seconds)
      // アクターの実行基盤を作成
      val system = ActorSystem("state")
      // スレッドの実行文脈を定義
      implicit val context = system.dispatcher
      // アクターの生成
      val actor = system.actorOf(Props[ParserActor])
      ....
      // アクターの実行基盤をシャットダウン
      system.shutdown()

アクターはJava流のスレッドによる並行・並列プログラミングと比較するとはるかに楽でバグも出にくいアプローチなのですが、それでも少し込み入ったボイラープレイトが必要になります。

実行

実行結果は以下になります。

$ sbt test-only sample.ParserActorSpec
[info] Compiling 1 Scala source to /Users/asami/src/workspace2015/0317.blog.statemachine/target/scala-2.11/test-classes...
ParserActorSpec: List(abc, def, xyz)
[info] ParserActorSpec:
[info] ParseActor
[info] - should parse
[info] ScalaTest
[info] 36mRun completed in 408 milliseconds.0m
[info] 36mTotal number of tests run: 10m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 1, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 2 s, completed 2015/03/21 17:48:09

まとめ

今回は状態機械のベースとなるcase class群(ParseState)を作成した後、通常のOOPのアプローチとして普通のオブジェクト版(passive object)のパーサーとアクター版(active object)のパーサーを作成しました。

ParseStateはOOPのvalue objectであると同時にFPの代数的データ型でもあります。このParseStateを普通のオブジェクト、アクターの両方で普通に利用して状態機械を作成できることが確認できました。

次回は今回のParseStateをベースにMonadic Programming, Functional Reactive Programmingにおける状態機械の実現方法について考えます。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4

2015年3月9日月曜日

[scalaz] Tryモナド問題

Scalazでmonadic programmingする上で困っているのがTryモナドの問題です。

scala.util.TryはScalaで例外処理をmonadicに処理するためにScala 2.10で導入されたScalaの基本機能です。Scalaのモナドとしての要件(flatMapメソッドが定義されている等)は満たしているのでfor式で使用することができます。

しかし、ScalazのMonadではないのでScalazの提供する各種機能の恩恵を得ることができません。このことがScalazを軸としたmonadic programmingの阻害要因になっています。

例外をハンドリングするためにはTryを使うのが自然ですが、Tryを使うとScalazのmonadic programmingがやりづらくなる、という構図です。

問題

TryがScalaz Monadとなっていない理由は、Tryがモナド則の1つであるleft identityを満たしていないとされているからのようです。

この理由は:

の記事の解説によると:

def foo[A, B](a: A): Try[B] = throw new Exception("oops")

foo(1) // exception is thrown

Try(1).flatMap(foo) // scala.util.Failure

ということなので:

  • foo関数が例外を返す場合、TryのflatMapコンビネータにfoo関数を適用した時にflatMapコンビネータが例外を返さないとMonad則のleft identity law違反

ということだと思います。

さて、それではTryのflatMapコンビネータで例外を返すようにすればよいかというと、これはOOP的には困ってしまう仕様です。

OOP的にはTryは例外を包んで外に出さないことを期待したいところです。というのは、flatMapコンビネータが例外を返す可能性があるとすると、Tryを使っているにもかからわずTryも例外を出す可能性があるということなので、Tryの外側をTryで包まなければならなくなります。

これはプログラミング的にも煩雑ですし、Tryをネストさせなければならない条件をプログラマが意識しないといけないのでバグの出やすいインタフェースです。

こういう事情もありScalaの基本ライブラリは、あえて現在の仕様を選択しているようです。

解決策

本稿の趣旨ですが、Try Monad問題を解釈変更で解決するという試みです。

left identityの解釈

Scalaのようなハイブリッドな関数型言語で純粋関数型の計算を行う場合、いくつかの紳士協定を前提とします。

代表的なものとしては以下のようなものがあります。

  • var変数は用いない
  • mutableなコレクションは用いない
  • eqメソッドは用いない

つまりScalaにおける純粋関数型の演算は紳士協定が前提なので、Tryのleft identity問題も紳士協定で解決すればよいのではないかということです。

具体的には返却値にTryを返す:

f(a: A): Try[B]

というシグネチャの関数では例外をスローしない、という紳士協定を導入するというアプローチはどうでしょうか。もちろん絶対ということはないので、例外をスローしたらプログラムが致命的状態(e.g. バグ発生、メモリ不足)に入ったとして扱うという形になります。

プログラミング的には以下をコーディングの基本形にするということなので、特に煩雑な点はありません。

f(a: A): Try[B] = Try {
  ...
}

この紳士協定上ではTryをScalaz Monad化しても問題ないように思います。

実行タイミングの解釈

FPでは参照透過性が重要な要件になっていて、関数を評価した時の内部処理の実行タイミングや実行順序は結果に影響を与えないことになっています。

TryのようなFunctor系のコンテナの場合、コンテナの内部情報を取り出すメソッドの実行前の任意の時点で評価が行われていればよいわけです。

TryのflatMapコンビネータでも、同様のことが言えるはずです。つまりflatMapコンビネータ内で必ずしもMonadのbind演算をする必要はなく、もっと後のタイミングでしても大丈夫ではないかということです。と考えると、flatMapコンビネータが例外をスローすることは必須ではなく、このことがMonad則違反というのは必ずしも真とは限らないというわけです。

整理すると、以下になります。

  • Tryの内部実行がコンビネータの呼び出しと同時に行われていなければならないという計算モデルではMonad則違反
  • Tryの内部実行がコンテナの内部を取り出すメソッドの実行前の任意の時点で評価が行われていればよいという計算モデルではMonad則OK

Tryの計算モデルを後者であると解釈すると、Monad則が成り立っていると考えてよいのではないかと思います。

実際にScalazのFuture MonadやTask Monadも同様の問題があるはずですが、いずれもScalaz Monadとして定義されています。

Futureの場合は、関数型的な意味での遅延評価ではなくて、別スレッドで非同期実行されるためflatMapで例外を返すようにはできないのだと推測されます。

Taskの場合は、(通常は)runメソッドで実行を指示されるまで実行は遅延されます。

いずれの場合もmapコンビネータやflatMapコンビネータが例外を返すことはありません。そして例外は実行結果を取り出すときに、必要に応じてスローされるようになっています。

FP的なMonadの定義としてこれが許されるならTryについてもgetメソッドで例外のスルーは行われるわけですから、flatMapコンビネータで例外を返さなくてもよいと考えることは十分可能と思います。

まとめ

考察の結果left identityの解釈、実行タイミングの解釈のどちらか一つでもOKであればScala TryをScalaz Monadとして定義しても問題ないのではないかという結論に落ち着きました。

個人的にはいずれの解釈もOKと思えるので、プロダクションのコードでTry Monadを使用することにしました。

Try Monadの実装としてはscalaz-outlawsを使うのが一案ですが、現状ではdeprecatedの警告が出るので諦めて、自前のライブラリ内で定義して使うことにしました。

TryをScalaz Monad化できると、さらに広い範囲でmonadic programmingを適用できるようになります。Try Monadの存在を軸に例外処理戦略を練りなおしてみたいと思います。

おまけ

Scalazによるmonadic programmingでTryを使う場合、TryがApplicativeであるだけでも、色々と応用範囲が広がります。(e.g. Traverse)

TryはApplicativeの要件は満たしていると思うので、Try Monadが不安な場合は(Validationと同じように)TryをApplicativeとして定義して使用するのも有用だと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0

2015年3月2日月曜日

[scalaz-stream] シーケンス番号とエンドマーク

プロダクトの開発中にわりと難しい処理がscalaz-streamでうまくさばけたのでメモ。

近い将来Buzzwordになってきそうな「Functional Reactive Programming」ですが、monadicなstreaming系のアプローチが筋がよさそうということで、scalaz-streamを実プロダクト開発に導入して試行錯誤しているところです。大規模データ系のいろいろな処理に適用していますが、かなりいい感触を得ています。

そのような応用の一つである大規模メール配信処理で出てきた要件が以下のものです。

  • 大規模なデータ列を一つの固まりとしてストリーム上に流したい
  • 事前にデータ列の総量は分からない
  • データ規模が大きいので復数パケットに分割する必要がある
  • 性能向上のため、可能な範囲で1パケットに復数データ列を格納したい

ネットワーク系のプログラムではわりとよく出てくる処理だと思います。

やろうとしていることはそれほど難しくないのですが、プログラムを組んでみると思いの外、複雑なものになってしまうといった系統の処理です。

こういった処理をいかに簡単に書くことができるようにするのかというのがプログラミング・モデルの重要なテーマの一つです。OOPであれば専用フレームワーク的なものを用意する感じですが、FPの場合はComposableな関数のCompositionでさばいてみたいところです。

サンプルロジック

上記の要件をscalaz-streamで実現することができるかを検証するためにサンプルプログラムをつくってみました。

以下の要件を実装しています。

  • 事前にすべてデータが読み込まれていることは前提としない
  • 3データごとに1つのパケットに集約
  • シーケンス番号をつける
  • 最終パケットにエンドマークをつける

scalaz-streamのProcessモナドを使用しています。

package sample

import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._

object Main {
  case class Packet(seqno: Int, end: Boolean, content: String)

  val source: Process[Task, Int] = {
    Process.range(0, 10)
  }

  def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }

  def main(args: Array[String]) {
    execute(source, sink)
  }

  def execute(source: Process[Task, Int], sink: Sink[Task, Packet]) {
    import process1._
    val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)
    val task = pipeline.run
    task.run
  }

  def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }
}
Source

ストリームの起点となるProcessモナドをSourceと呼びます。

例題なのでProcess#range関数で0から10の数値のストリームを生成しています。

val source: Process[Task, Int] = {
    Process.range(0, 10)
  }
Sink

ストリームの終端となるProcessモナドをSinkと呼びます。

例題なのでscalaz.stream.io#channel関数を使って、コンソール出力するSinkを定義しています。

def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }
ストリームの構築

SourceとSinkはサンプル用のものなので、ここからが本題です。

ストリームの構築は、以下のようにProcessモナドが提供するコンビネータをつないでいく形になります。

FunctorやMonadが定義している基本コンビネータであるmapやflatMap以外にProcessモナドが提供するpipeといったコンビネータを多用することになるのが特徴です。

val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)

ストリームを処理するパイプラインはSourceから始まってtoコンビネータで指定するSinkが終端になります。途中、chunk、pipe、mapの各メソッド/コンビネータがパイプラインを構成しています。

chunk

ストリーム内を流れるデータを1つのチャンクにまとめる処理はchunkメソッドを使います。チャンクにまとめる個数の3を引数に指定します。

ストリームにはsourceからデータであるIntが流れてきますが、これがSeq[Int]に変換されます。

pipe

pipeコンビネータはProcessモナドをストリーム処理のパイプライン部品として埋め込みます。

ここではscalaz.stream.process1にある以下の関数を使って生成したProcessモナドをパイプラインに埋め込んでいます。

  • zipWithNext
  • zipWithIndex

zipWithNext関数が返すProcessモナドは、処理中のデータに加えてその次のデータをOptionとして渡してきます。処理中のデータが最終データの場合は、「その次のデータ」はNoneになります。これを使用すると、次のデータの有無、次のデータの内容によって処理を変更することができるわけです。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Seq[Int], Option[Seq[Int]]]に変換されます。

zipWithIndex関数が返すProcessモナドは、処理中のデータにインデックスをつけます。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]変換されます。

map

mapコンビネータで以下のtoPacket関数をパイプラインに組み込んでいます。

def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }

ここまでの処理でストリームからTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]が流れてきますが、この段でPacketに変換されます。

toPacket関数の処理は簡単でストリームから流れてきた情報を元に:

インデックス
ストリーム上の情報は0起点なので1起点に変換
エンドマーク
Nextがない場合は最終パケットなのでtrue

の情報をPacketに設定しています。

サンプルコード内で自前のアルゴリズムらしい物を書いているはここだけです。

ストリームの実行

ストリームの実行はrunメソッドで行います。

今回はTaskモナドを使う指定をしているので、ProcessモナドのrunメソッドではTaskモナドが返ってきます。

Taskモナドのrunメソッドを実行するとストリームが動作します。

val task = pipeline.run
    task.run

実行

実行とすると以下の結果が出力されました。

10個のデータ列が4つのパケット列として出力され、最後のパケットにはエンドマークがついています。

Packet(1,false,0-1-2)
Packet(2,false,3-4-5)
Packet(3,false,6-7-8)
Packet(4,true,9)

受信側の処理

このパケット列を受け取った側の処理としては:

  • エンドマークがついているパケットまでパケットを読み込む
  • シーケンス番号を監視して、欠損があれば再送を依頼する

というような処理になるかと思います。

こういった処理をscalaz-streamのProcessモナドで実装可能か、というのも面白そうなテーマなので機会があれば試してみたいと思います。

まとめ

大規模データのパケット分割処理ですが、自前のロジックはtoPacket関数のものぐらいで、後はscalaz-streamの用意する部品を組み合わせるだけで構築できました。

パケット分割や復元は自分でロジックを組むとそれなりにバグが出る所なので、既存部品を組み合わせるだけで、ほとんどの処理が組み上がって、最後の仕上げだけ自前のロジックを差し込むことができるのはほんとうに楽です。

また、OOP的なフレームワークだと:

  • パイプラインの構成をXMLなどの外部DSLで定義
  • 自前ロジックをパイプラインに組み込むためのボイラープレイト作成

といった作業が必要になるので、それなりに大変です。

それに対して、scalaz-streamの方式はmonadic programmingの作法に則っていれば通常の関数型プログラミングでOKなのが非常に使いやすいところです。具体的にはOOP的なフレームワークアプローチに対して以下のメリットがあります。

  • パイプラインを外部DSLで構築すると、型安全でなくなる、デバッグがしづらいといった問題も出るが、いずれの問題もなくなる。
  • 自前ロジックを通常の関数で記述すればよく、特別なボイラープレイトのコードは必要ない。
いいことずくめのようですが:

  • monadic programmingの習得
  • processモナドの理解

といった難点があるので、これはこれで一定のハードルがあります。

こういったハードルをcoding idiomやdesin patternといった技法でクリアすることができれば、大規模データ処理にはなかなか有力なアプローチだと思います。

注意

本稿の「大規模」は処理対象としてメモリに一度に乗らない規模のデータ量を指していて、最大1GB程度のものを想定しています。

これを超えてくるデータの処理はHadoopやSpark的な並列分散処理が必要になってくるので、本稿のスコープ外です。このような並列分散処理もSparkのRDDといったものを使ったmonadic programmingが有力と思うので、いずれ取り上げてみたいと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a

2015年2月16日月曜日

Scalaz 7.1の非互換項目

プロダクトで使っているScalazをscalaz 7.0.6からScalaz 7.1に上げたわけですが(Operational Monad)、その際に判明したScalaz 7.1の非互換というか仕様変更についてのまとめです。

非互換対応している時に気付いたものが中心なので、仕様変更項目の網羅にはなっていません。(公式仕様変更項目)

ここに出ている項目以外は、スムーズに対応できました。(他の非互換項目にも遭遇していると思いますが、記憶に残らないレベル)

Validation

Scalaz 7.1からValidationがMonadではなくなってしまいました。deprecatedのコメント「flatMap does not accumulate errors, use `scalaz.\/` or `import scalaz.Validation.FlatMap._` instead」からすると、エラーをMonoidとして蓄積する処理がモナド則を破る、ということが判明したのではないかと思われます。

このことでエラー処理戦略のバランスが少し変わってしまいました。

このことを加味した上でのエラー処理戦略に関する最新の状況は以下がよくまとまっていると思います。(この記事は2014年2月のものなので(Scalaz 7.1のリリースは2014年8月)、リリース版に対応した記事ではないと思いますが、ValidationがMonadでないことが前提になっているので、7.1の開発バージョンで仕様を先取りした記事と思われます。)

ValidationをMonadとして使っていたコードに対する当面の回避策としては:

import scalaz.Validation.FlatMap._

として互換機能を有効にすることですが、本格対応としては

"10".parseInt.disjunction

といった感じで「\/」(disjunction)モナドに変換するのがひとつの形です。

Validationは引き続きApplicativeとしては有効なので:

("10".parseInt.toValidationNel |@| "20".parseInt.toValidationNel)(_ + _)

といった形を基本線に、for式で使いたい場合はdisjunctionメソッドで\/モナドに変換して使う戦略となりそうです。

for {
  a <- "10".parseInt.disjunction
  b <- "20".parseInt.disjunction
} yield a + b

ただ、Validationの価値はかなり下がったと思うので、エラー処理戦略は\/モナドを軸に考えて、エラーの積算が必要な処理のみValidationで補完というバランスがよさそうに思います。

具体的には以下のようなイメージです。

for {
  a <- ("10".parseInt.toValidationNel |@| "20".parseInt.toValidationNel)(_ + _).disjunction
+ _).disjunction // エラーの積算が必要な処理をValidation + Applicativeで記述し、Disjunctionに変換
  b <- "30".parseInt.disjunction // for式内は\/モナドで揃える
} yield a + b

Tagged type

以下のページにある通り「Scalaz 7.1 以降は明示的にタグを unwrap しなければいけなくなった」ため、かなり使いにくくなってしまいました。

元々Scala言語仕様のぎりぎりの所を使っている機能だと思うので、将来にわたっての安全性(Scalaのバージョンが上がると動かなくなる等)と利便性のバランスを考えるとプロダクションコードでは使いにくくなりました。

OptionやBooleanの振舞いをカスタマイズするのに便利だったのでちょっと残念。

PromiseとFuture

Scalaz 7.1では、Promiseモナドが非推奨になっていて、代わり(?)にFutureモナドが追加されているようです。Futureモナドは、Scalaの基本機能であるscala.concurrent.FutureをScalazのMonad化したものです。

Operational Monad」では「scala.concurrent.FutureはScalaz 7.1.0的にはモナドとして定義されていない」と書きましたが、間違いだったので訂正します。

Scala Tips / Scala 2.10味見(4) - Future(2)」で書いた通り、ほとんど同じ機能であるPromiseとFutureが(Scalaz上の扱いが異なる状態(PromiseはMonad、Futureは非Monad)で)並立している状況が懸念点だったので、この点が払拭されたのは喜ばしいと思います。

今後は安心してFutureを軸に並列プログラミングの戦略を考えていくことができます。

また、並列プログラミングの部品となるモナドとしてはscalaz.concurrent.Taskは引き続き有効です。(注: scalaz.concurrent.Taskが内部的に使用している部品にscalaz.concurrent.Futureがありますが、scala.concurrent.Futureと紛らわしいこともありここでは触れないことにします。本稿ではscala.concurrent.FutureをFutureと呼びます。)

FutureとTaskはざっくりいうと以下のような性質の違いがあると理解しています。

Future
スレッドを積極的に生成して処理を行う
Task
スレッドはできるだけ生成しない戦略

I/Oバウンドの処理はFutureが有効です。

一方CPUバウンドの処理に関しては、100コア級のメニーコア時代になればFutureを使うのがよさそうですが、せいぜい4コア級、AWSのローエンドは1コアの状況では、Futureで細粒度の並列プログラミングをするのは時期尚早の感じです。そういう意味で、当面はTaskもFutureと同様の重み付けの選択肢と考えています。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0