2016年12月9日金曜日

ReaderWriterStateモナドと畳込み

ReaderWriterStateモナドは「Patterns in Types - A look at Reader, Writer and State in Scala」を見てからずっと気になっていたのですが、実務のプログラミングでも汎用的な基盤として使えるのではないかとふたたび自分の中でブームになってきたので、少し試してみました。

例題

レコードを正常なものと異常なものに選別する処理を考えます。異常なレコードは異常と判断した理由つきで記録します。

データ連携処理ではよく出てくる処理だと思います。

この処理を以下の関数として実装することにします。

def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord])

この関数を以下のバリエーションで実装していきます。

Foldable
通常の畳込み
Monoid
モノイドによる畳込み
State
Stateモナドによる畳込み
Traverse&State
TraverseとStateモナドによる畳込み
Reducer
Reducerを使った畳込み
ReaderWriterState
ReaderWriterStateモナドを使った畳込み

直接の目的はボクが常用しているFoldableによる畳込みとReaderWriterStateモナドによる畳込みを比較することです。

同時に色々なアプローチの比較も行い、それぞれのアプローチの使い所を探っていきます。

準備

まずプログラムが扱うドメインのオブジェクトを定義します。

object Domain {
  type Record = Map[String, Any]
  type Errors = Vector[ErrorRecord]
  type Reason = String
  case class ErrorRecord(record: Record, reason: Reason)
  case class RecordsState(totalCount: Int = 0, errorCount: Int = 0) {
    def success = copy(totalCount = totalCount + 1)
    def error = RecordsState(totalCount + 1, errorCount + 1)
  }
  trait ExecutionContext {
    def verify(r: Record): Option[Reason]
  }
  class DefaultExecutionContext() extends ExecutionContext {
    def verify(r: Record): Option[Reason] =
      if (r.get("id").isEmpty)
        Some("No id")
      else
        None
  }
}

以下の型、case class、クラスを定義しました。

Record
レコード
ErrorRecord
エラーとなったレコードと理由
Reason
エラー理由
Errors
ErrorRecordの集まり
RecordState
処理状況を記録
ExecutionContext
実行コンテキストのインタフェース
ExecutionContextImpl
実行コンテキストの実装

単にエラーコードを選り分けるだけでなく、以下の機能を実現できるようにしています。

  • 実行状況をRecordStateに記録して取得可能にしている。
  • エラーの判定のロジックをExecutionContextとして指定可能にしている。

Foldable

FP(Functional Programming)で一般的な畳込み処理です。

object FoldLeft {
  import Domain._
  case class Z(
    context: ExecutionContext,
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) =
    xs.foldLeft(Z(context))(_+_).result
}

VectorのfoldLeft関数を使って畳込み処理を行います。

case classを使う手法はボクが個人的に使っているもので一般的ではないと思いますが、特に難しくはないと思います。(「foldの小技」)

FoldLeftのfold関数はExecutionContextを暗黙パラメタとして受取りcase class Zのパラメタとして渡しています。case class ZはこのExecutionContextを使用してロジックを実行します。ロジックの可変部分をExecutionContextに分離することでfold関数の処理をチューニング可能な構造になっています。

Monoid

次はMonoidを使った畳込みを考えてみます。

object FoldableMonoid {
  import Domain._
  case class Z(
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    val context: ExecutionContext = new ExecutionContextImpl()
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }

    def +(rhs: Z): Z = copy(
      records = records ++ rhs.records,
      errors = errors ++ rhs.errors
    )
  }

  object Z {
    def empty = Z()
    def point(rec: Record) = empty + rec
  }

  implicit object ZMonoid extends Monoid[Z] {
    def zero = Z.empty
    def append(lhs: Z, rhs: => Z) = lhs + rhs
  }

  def fold(xs: Vector[Record]): (Vector[Record], Vector[ErrorRecord]) = {
    xs.foldMap(Z.point).result
  }
}

モノイドの場合、実行コンテキストの意図のExecutionContextを外部から与えることは筋悪そうなので固定のものを使うことにしています。

評価

コーディング的には、Monoid計算に適合するように各種関数を用意したり、型クラスMoonoidの型クラスインスタンを定義したりという手間がかかります。

何回も使用するロジックの場合はよいですが、その場限りのロジックの場合はコーディングのオーバーヘッドの方が大きくなるのでFoldable方式の方がよさそうです。

またExecutionContextを外付けで与えることができないのはかなり大きな問題です。

Monoidについては、すでにMonoidがある場合はそれを利用するのがよいですが、畳込みのために、わざわざMonoidのメカニズムを積極的に使うというほどではないようです。

State

次はStateモナドを使った畳み込みです。

object StateWithTraverse {
  import Domain._
  case class Z(
    context: ExecutionContext,
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) = {
    val ts = xs.traverseS(x => State[Z, Unit] {
      case s => (s + x, ())
    })
    ts.exec(Z(context)).result
  }
}

scalazの型クラスTraverseにはStateモナドを使って走査する機能があります。Stateモナドで畳み込み動作をするようにしておけば、Traverseでの走査の過程で畳み込みを行うことができます。

ここではFoldableで使用したcase class Zと同じものをStateモナドでの状態として使用する実装を行っています。

実行コンテキストであるExecutionContextは状態の一部として受渡しています。

評価

Stateモナドの使い方に慣れていれば、Foldableとほぼ同じような手間でプログラミングすることができます。

ただ、Stateモナド実行のオーバヘッドなどを考えるとFoldableで間に合っているものをわざわざStateモナド化する必然性はなさそうです。

再利用可能なStateモナド部品を作った時に、このロジックで畳み込みに利用することも可能という選択肢として考えておくとよいと思います。

Stateモナドは畳込みの汎用ロジック向けではなく、以下の記事にまとめたように状態遷移/状態機械を作る時のキーパーツとして考えていくのがよさそうに思いました。

Traverse&State

Stateモナドは、1つの処理ごとに型パラメータAで示す処理結果を出力する機能があり、for式などで組み合わせる時にパラメタとして受け渡しすることで、全体として複雑な処理を記述できる機能を持っているのですが、traverseSによる畳込みの場合はここの部分で、走査結果を蓄積する形になります。

このためTraverseとStateを組み合わせる場合、Traverseの機能を活用してStateの実行結果をTraverse側に蓄積させることができるので、その点を活かした実装に改良してみました。

object TraverseState {
  import Domain._
  case class Z(
    context: ExecutionContext,
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }

    def apply(rhs: Record): (Z, Option[Record]) = context.verify(rhs) match {
      case Some(reason) => (copy(errors = errors :+ ErrorRecord(rhs, reason)), None)
      case None => (copy(records = records :+ rhs), Some(rhs))
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) = {
    val ts = xs.traverseS(x => State[Z, Option[Record]] {
      case s => s(x)
    })
    val (s, records) = ts.run(Z(context))
    (records.flatten, s.errors)
  }
}
評価

前節「State」は正常レコードもcase class Z経由で取得することを前提にTraverseの主ルートには「()」を渡していて、事実上封印していました。

ここでは、正常レコードをTraverseの主ルートで受け渡すことができるようにcase class Zにapplyメソッドを追加しました。

case class Zが再利用可能な汎用ロジックを実装できるのであれば、ひと手間かけてapplyメソッドを追加しておくことで、利用範囲が広がります。

ただ、foldLeftよりはやや手間がかかるのは「State」と同じなので、一度限りのロジックに対して普段使いで適用する感じではなさそうです。

Reducer

ちょっと脱線してReducerを使った畳込みを考えてみました。

Reducerは畳み込み処理の中のデータを足し込む処理を汎用化したものです。畳込み対象がMonoidでなく、畳込み結果がMonoidである場合に使用できます。畳み込み処理の走査処理を汎用化(左畳み込み、右畳み込みの最適選択)したGeneratorと組み合わせて使用するのが基本的な使い方のようです。

object Reducer {
  import Domain._
  case class Z(
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    val context: ExecutionContext = new ExecutionContextImpl()
    def result = (records, errors)

    def +(rhs: Record): Z = context.verify(rhs) match {
      case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
      case None => copy(records = records :+ rhs)
    }

    def +(rhs: Z): Z = copy(
      records = records ++ rhs.records,
      errors = errors ++ rhs.errors
    )
  }

  object Z {
    def empty = Z()
    def point(rec: Record) = empty + rec
  }

  implicit object ZMonoid extends Monoid[Z] {
    def zero = Z()
    def append(lhs: Z, rhs: => Z) = lhs + rhs
  }

  def fold(xs: Vector[Record]): (Vector[Record], Vector[ErrorRecord]) = {
    val reducer = UnitReducer((x: Record) => Z.point(x))
    val G = Generator.FoldlGenerator[Vector]
    G.reduce(reducer, xs).result
  }
}

ReducerはMonoid以外の畳込み対象を一度Monoidに変換してから畳み込むというロジックなので、畳込みがMonoidの機能範囲に限定されます。

今回のケースではExecutionContextを外付けにするのが難しいので、Monoidであるcase class Zが内部で固定で持っています。

評価

ReducerはMonoid以外の要素の列をMonoidに畳み込む時のアダプタ的な機能と考えると分かりやすいと思います。ただ、このための機能としてはFoldableのfoldMapコンビネータという非常に汎用的な機能があるので、Reducerをわざわざ使うシーンはあまりなさそうです。

また、色々と糊コードを書かないといけないのとMonoidの制約が入ってくるので、汎用の畳込み機能として使うのはお得ではなさそうということも確認できました。

ReducerはscalazのreduceUnordered関数で並列実行したTaskの結果の順不同畳込みに使用されています。こういった、特別な用途向けの機能と考えておくとよさそうです。

ReaderWriterState

それでは本命のReaderWriterStateモナドを使ってみます。

object ReaderWriterStateFold {
  import scala.language.higherKinds
  import Domain._

  def run[C[_]: Foldable, X, R, W: Monoid, S, A: Monoid](xs: C[X], rws: X => ReaderWriterState[R, W, S, A], r: R, s: S): (W, A, S) = {
    case class RWSZ(
      writer: W = Monoid[W].zero,
      outcome: A = Monoid[A].zero,
      state: S = s
    ) {
      def result = (writer, outcome, state)
      def apply(r: R, x: X) = {
        val (rw, ra, rs) = rws(x).run(r, state)
        RWSZ(rw, ra, rs)
      }
    }
    xs.foldLeft(RWSZ())(_.apply(r, _)).result
  }

  case class Z(
    records: Vector[Record] = Vector.empty,
    errors: Vector[ErrorRecord] = Vector.empty
  ) {
    def result = (records, errors)

    def apply(context: ExecutionContext, rhs: Record) = {
      val z = context.verify(rhs) match {
        case Some(reason) => copy(errors = errors :+ ErrorRecord(rhs, reason))
        case None => copy(records = records :+ rhs)
      }
      (z.errors, z.records, z)
    }
  }

  def fold(
    xs: Vector[Record]
  )(implicit context: ExecutionContext): (Vector[Record], Vector[ErrorRecord]) = {
    def rws(a: Record) = ReaderWriterState[ExecutionContext, Vector[ErrorRecord], Z, Vector[Record]] {
      case (r, s) => s.apply(r, a)
    }
    val (errors, records, z) = run(xs, rws, context, Z())
    (records, errors)
  }
}

run関数は汎用関数なので、今回の用途向けに作成した部分はcase class Zとfold関数だけなのでそれほど大きくはありません。run関数を再利用することを前提にすると、ほとんどStateやTraverse&Stateと同じ手間で畳み込み処理を書くことができます。

ReaderWriterStateモナドは、Stateモナドの持つStateモナド自身と処理結果の出力に加えて実行コンテキストなどの参照専用データの受け渡し(Reader)とログ的な蓄積データの出力(Writer)の機能を持っています。

run関数では、引数に処理対象のVectorとReaderWriterStateモナド、実行コンテキストのExecutionContextと状態を持つcase class Zの初期値を渡しています。実行コンテキストと状態の初期値を外部から与えることができるので、ReaderWriterStateモナドの振る舞いを実行時にカスタマイズできる構造になっています。

run関数の返却値はStateモナドの実行結果の正常レコードとWriterに蓄積されたエラーコード、そしてState(case class Z)の最終結果です。

評価

run関数を事前に用意しておけば、Traverse&Stateモナドとほとんど変わらない使い勝手で使うことができることが確認できました。

Stateモナドの場合は、実行コンテキスト(ExecutionContext)の指定と蓄積データ(エラーレコード)の取得を状態(case class Z)の中に自分で実装する必要がありました。

一方、ReaderWriterStateモナドでは、実行コンテキストの指定は蓄積データの取得はReaderWriterStateモナドの機能としてもっているので、状態と計算結果に実装上の注意を集中することができます。また、実行コンテキストと蓄積データのインタフェースが決まっているので、部品として組み合わせることも可能になります。

本例でもそうであったように、多くの用途ではReaderWriterStateモナドが提供する機能で要件が満たせる事が多いのではないかと思います。そうであるならば、ReaderWriterStateモナドが提供する汎用機能を使って再利用可能な部品を作ることで、部品の再利用を促進できる可能性が高いと考えられます。

考察

畳み込み処理に関しては、一度限りのロジックであるならばFoldableのfoldLeftを使って普通に書くのが一番開発効率がよさそうです。

一方、StateモナドやReaderWriterStateモナドを使って畳込みを実装するのも、それほどの手間でないことも確認できました。StateモナドやReaderWriterStateモナドにピッタリ合うケースでは、一度限りののロジックでもこれらのモナドを使うのもありそうです。

StateモナドやReaderWriterStateモナドは共通部品向けの汎用インタフェースという意味合いが大きいですが、使い方が難しいと積極的には使いづらいところです。どちらのモナドもわりと簡単に使えることが分かったのが収穫でした。StateモナドやReaderWriterStateモナドをつかって再利用可能な部品を整備していく方向性が実現可能という感触を得ることができました。

また、Stateモナド、ReaderWriterStateモナド、Monoid、Reducerの機能差も改めて確認することができました。

当面は以下のような方針で適用していきたいと考えています。

  • 一度限りのロジックはfoldLeft(普通の畳込み)
  • 再利用可能な処理で実行コンテキストがなくMonoid化できるものはMonoid
  • 共通部品はReaderWriterStateモナド化を考える(実行コンテキスト&蓄積データ&計算結果&状態)
  • 必要に応じてStateモナドやReducerを使う

諸元

  • Scala 2.11.7
  • Scalaz 7.2.0

2016年11月30日水曜日

よこはまクラウド勉強会: OFP & OFAD Deep Dive with Reactive Streams

Qcon Tokyo 2016で「オブジェクト‐関数型プログラミングからオブジェクト‐関数型分析設計へ~クラウド時代のモデリングを考える」と題してOFAD(Object-Functional Analysis and Design)についてお話させていただきました。

テーマはOFADですが、OFADの前提としてモダンなFP(Functional Programming)を前提としたOFP(Object-Functional Programming)の知識、さらに基本的なOOADの知識が必要なので、非常に広範囲の内容を圧縮して詰め込んだ形になってしまいました。

そこで、もう少し時間を取って説明することができればよいと思っていたのですが、横浜クラウド勉強会 で機会を頂くことができました。

QCon Tokyoでは50分に収めましたが、こちらの方は少し脱線しながら2時間程度のセッションになりました。その後、Reactive Streamsのハンズオンという構成です。

Reactive Streamsの立ち位置

QCon向けの資料をまとめていて感じたのはReactive StreamsがOFP(Object-Functional Programming)のキーテクノロジーではないか、ということです。

Reactive Streamsは以下の2つの方向性があると考えています。

  • 即応性、スケーラビリティのためのメカニズム
  • FP(Functional Programming)の適用範囲を広げる

ここでのFPは純粋関数型プログラミングを意図しています。

前者は今後のクラウドアプリケーションの方向としては当然の方向性です。この方向性を追求する場合には、FPの純粋性を犠牲にしても実行時性能やバックプレッシャーなどのプロトコル拡張を追求することになります。

一方、後者はFPの純粋性は守りながら、FPを外部入出力や大規模データ処理、ストリーミング処理に適用することを目指すものです。

Reactive Streamsは即応性やスケーラビリティという点で注目されていますが、ボクの関心事はそのことよりもFPの適用範囲を広げることに大きく寄与するのではないかという点です。ボクは現時点では後者を重視しているので、(Akka Streamsではなく)scalazと組み合わせて使えるscalaz-streamを愛用しています。

セッションでもお話したOFPのコツはつまるところOOP(Object-Oriented Programming)とFPを適材適所で、ということですが整理すると以下の方針になります。

  • アルゴリズム系の副作用を伴わない処理はFP
  • コンポーネントのファサード部(API/SPI)はOOP
  • OOADのモデリングの実現部はOOP
  • その他はできる限りFP

「できる限りFP」にしたい理由は以下のものです。

  • バグの発生率が圧倒的に少ない。
  • アルゴリズムを効率よく記述できる。
  • 並列/並行/分散処理時代への助走。
  • 将来の証明プログラミングへの備え。

現時点での最大の魅力は「バグの発生率が圧倒的に少ない」がリファクタリングに大きく寄与する点です。長期間持続的に開発を続けるシステムはこの理由だけでOFPを採用する価値があると思います。そして、長期的には「並列/並行/分散処理時代への助走」、「将来の証明プログラミングへの備え」という観点から必須のプログラミング・スタイルになるとすると、先取りして取り入れておきたいところです。

ここで問題となるのは「その他はできる限りFP」です。

FPには外部入出力処理や状態を持った処理の記述が大変という問題があります。スライドに書いたように、モナドの登場で外部入出力処理や状態を持った処理の記述が「困難」から「可能」になったのは大きな前進ですが、OFPの観点からはかならずしも「便利」とは言えないと思います。またScalaの特殊事情として文法的な制約でHaskell程簡明には書けないということもあります。

「できる限りFP」とはいえ便利でないものは使わなくなるのが道理で、対策をとらないと結局OOPの部分が大きく残ってしまう事になってしまいます。

Reactive Streamsがキーテクノロジーではないか、という期待はまさにこの問題の解消に大きく寄与するのではないかという点です。

scalaz-streamが提供するProcessモナドも一種のモナドですが、概念的に分かりやすいのとリソース管理やフロー制御という伝統的なOOPによる外部入出力でも難しかった問題が解決されています。実際に製品開発で使ってみてこの便利さを体感しました。

このような経験からReactive Streamを開発の基盤とすることで、OFPの多くの部分をFP側に倒すことが可能になるのではないかと期待しているわけです。

Reactive Streamsハンズオン

そんな思いもあり、セッションの後半はReactive Streamsのハンズオンにしました。以下にソースコードがあります。

このハンズオンのソースはセッションの朝に急ごしらえで作ったものなので、動作確認などはあまりできておらず、当日会場で調整しながら使用したものです。その点はご留意下さい。

ここでの趣旨は、(即応性、スケーラビリティではなく)FP成分を重視したReactive Streamsについて、プログラミングレベルでのイメージをつかんでいただくことです。

以下簡単に説明します。

ステップ1: Hello World

ステップ1はscalaz-streamのHello World的なプログラムです。scalaz-streamの最小限の使い方を体験することが目的です。

mainメソッドの第1引数に入力ファイル名を、第2引数に出力ファイル名を指定すると入力ファイルを出力ファイルに複写するプログラムです。

ハンズオンの問題は以下になります。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step1 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] = ???

  def converterProcess(in: String, out: String): Process[Task, Unit] = ???
}

converter関数とconverterProcess関数を実装します。

実装例

ステップ1の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step1 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] =
    converterProcess(in, out).run

  def converterProcess(in: String, out: String): Process[Task, Unit] =
    io.linesR(in).pipe(text.utf8Encode).to(io.fileChunkW(out))
}

converter関数はconverterProcess関数から返されたProcessモナドをrunしてTaskモナドにします。

converterProcess関数はscalaz-streamのパイプラインをProcessモナドという形で構築して返します。

ステップ2: Monadic API

ステップ1はscalaz-streamの最小限の使い方でした。

ステップ2ではscalaz-streamのパイプラインが、通常のMonadic APIとして使えることを体験します。ここでいうMonadic APIはJavaでいう所のStream APIで、FunctorやMonadによるパイプラインをベースにしたAPIです。(Streamは色々な意味付けがされていてミスリーディングな面もあるのでここでは本ブログではMonadic APIと呼んでいます。)

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step2 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val player = args(2)
    val t = converter(in, out, player)
    t.run
  }

  def converter(in: String, out: String, player: String): Task[Unit] =
    converterProcess(in, out, player).run

  def converterProcess(in: String, out: String, player: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).filter(isPlayer(player)).map(toYear).map(_ + "\n").
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def isPlayer(player: String)(record: Vector[String]): Boolean = ???

  def toYear(record: Vector[String]): String = ???
}

ステップ1との違いは以下の3つの関数をmapコンビネータ、filterコンビネータでパイプラインに組み込んでいることです。

  • toRecord関数
  • isPlayer関数
  • toYear関数

これらの関数が通常の関数であること、そして簡単にProcessモナドのパイプラインに組み込むことができる点を体験するのが目的です。

実装例

ステップ2の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step2 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val player = args(2)
    val t = converter(in, out, player)
    t.run
  }

  def converter(in: String, out: String, player: String): Task[Unit] =
    converterProcess(in, out, player).run

  def converterProcess(in: String, out: String, player: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).filter(isPlayer(player)).map(toYear).map(_ + "\n").
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def isPlayer(player: String)(record: Vector[String]): Boolean =
    record.lift(0) == Some(player)

  def toYear(record: Vector[String]): String =
    record.lift(1) getOrElse "Unknown"
}

toRecord関数、isPlayer関数、toYear関数はごく普通の関数です。これらを簡単にProcessモナドのパイプラインに組み込んで動作することが確認できました。

ステップ3: フロー制御

ステップ3は一種のフロー制御であるチャンク化の体験です。

通常のMonadic APIは構造上フロー制御を行うことができませんが、Processモナドではpipeコンビネータなどの仕組みによってフロー制御を可能にしています。

ここが通常のMonadic APIに対するReactive Steramsの優位点で、外部入出力を効率的に処理することを可能にする拡張となっています。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step3 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] =
    converterProcess(in, out).run

  def converterProcess(in: String, out: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).
      chunk(1000).map(groupByYear).pipe(process1.unchunk).
      map(toYear).
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String = ???

  def groupByYear(records: Vector[Vector[String]]): Vector[Vector[String]] = ???
}

注目するポイントはパイプライン中の「chunk(1000).map(groupByYear).pipe(process1.unchunk)」の部分です。パイプライン中のchunk関数とpipeコンビネータで適用したprocess.unchunk関数によって、パイプラインを流れるデータを1000個単位でチャンク化しています。

mapコンビネータで指定されているgroupByYear関数はチャンク化したデータに対する処理を行うようになっています。

チャンク化は入出力処理で性能向上するための必須処理なので、これを自動的に行なってくれる部品が提供されていることは生産性に大きく寄与します。

実装例

ステップ3の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step3 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] =
    converterProcess(in, out).run

  def converterProcess(in: String, out: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).
      chunk(1000).map(groupByYear).pipe(process1.unchunk).
      map(toYear).
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String =
    record.lift(1) getOrElse "Unknown"

  def groupByYear(records: Vector[Vector[String]]): Vector[Vector[String]] = {
    case class Z(years: Set[String] = Set.empty, result: Vector[Vector[String]] = Vector.empty) {
      def +(rhs: Vector[String]) =
        rhs.lift(1).fold(this) { year =>
          if (years.contains(year))
            this
          else
            Z(years + year, result :+ rhs)
        }
    }
    records.foldLeft(Z())(_ + _).result
  }
}

toYear関数、groupByYear関数を普通に実装するだけです。データをチャンク化する処理はscalaz-streamが提供している部品が自動的に行なってくれます。

フロー制御の例としては以下の記事が参考になると思います。

ステップ4: ストリーム

ステップ4はReactive Streamsでストリーム処理を行う課題です。

まず準備としてストリームを生成する部品EventProcessorを作ります。

package handson.reactive
  
import scalaz.concurrent.Task
import scalaz.stream._  

object EventProcessor {
  val q = async.unboundedQueue[String]

  val eventStream: Process[Task, String] = q.dequeue
}

buildメソッドはscalaz-streamによるパイプラインを作って、これをバックグラウンドで起動します。

最後に呼んでいるstimulusメソッドはEventProcessorを使って入力ファイルの内容を1行づつ読み込み1秒のインターバルでストリームに送出します。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._
import scala.concurrent.Future  
import scala.concurrent.ExecutionContext.Implicits.global
import scalax.io._
import scalax.io.JavaConverters._
import java.io.File

object Step4 {
  def main(args: Array[String]) {
    val in = args(0)

    val stream = EventProcessor.eventStream
    build(stream)
    stimulus(in)
  }

  def build(stream: Process[Task, String]) {
    Future {
      val t = converterProcess(stream).to(io.printLines(System.out))
      t.run.run
    }  
  }  

  def converterProcess(source: Process[Task, String]): Process[Task, String] =
    source.map(toRecord).map(toYear)

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String = ???

  def stimulus(in: String) {
    val queue = EventProcessor.q
    val input = new File(in).asInput
    input.lines() foreach { line =>
      queue.enqueueOne(line).run
      Thread.sleep(1000)
    }
  }
}

scalaz-streamのパイプラインはconvertProcess関数で作成します。このパイプラインにはmapコンビネータでtoYear関数を接続しています。

本課題ではこのtoYear関数を実装します。

toYear関数は通常の関数です。この通常の関数をscalaz-streamのパイプラインに組み込むだけでFPによるストリーム処理が記述できるのを体験するのが本課題の趣旨です。

実装例

ステップ4の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._
import scala.concurrent.Future  
import scala.concurrent.ExecutionContext.Implicits.global
import scalax.io._
import scalax.io.JavaConverters._
import java.io.File

object Step4 {
  def main(args: Array[String]) {
    val in = args(0)

    val stream = EventProcessor.eventStream
    build(stream)
    stimulus(in)
  }

  def build(stream: Process[Task, String]) {
    Future {
      val t = converterProcess(stream).to(io.printLines(System.out))
      t.run.run
    }  
  }  

  def converterProcess(source: Process[Task, String]): Process[Task, String] =
    source.map(toRecord).map(toYear)

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String =
    record.lift(1) getOrElse "Unknown"

  def stimulus(in: String) {
    val queue = EventProcessor.q
    val input = new File(in).asInput
    input.lines() foreach { line =>
      queue.enqueueOne(line).run
      Thread.sleep(1000)
    }
  }
}

ストリーム処理の例としては以下の記事が参考になると思います。

ステップ5: ストリーム&フロー制御

ステップ5はステップ4で作成したストリームに、自前のフロー制御を組み込むという課題です。

この課題はちょっと難しいので、時間が余った人向けを想定しています。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._
import scala.concurrent.Future  
import scala.concurrent.ExecutionContext.Implicits.global
import scalax.io._
import scalax.io.JavaConverters._
import java.io.File

object Step5 {
  def main(args: Array[String]) {
    val in = args(0)

    val stream = EventProcessor.eventStream
    build(stream)
    stimulus(in)
  }

  def build(stream: Process[Task, String]) {
    Future {
      val t = converterProcess(stream).to(io.printLines(System.out))
      t.run.run
    }  
  }  

  def converterProcess(source: Process[Task, String]): Process[Task, String] =
    source.map(toRecord).pipe(toYear)

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear: Process1[Vector[String], String] = ???

  def stimulus(in: String) {
    val queue = EventProcessor.q
    val input = new File(in).asInput
    input.lines() foreach { line =>
      queue.enqueueOne(line).run
      Thread.sleep(1000)
    }
  }
}

ステップ4のようにscalaz-streamのパイプラインにmapコンビネータで関数を合成する場合は、通常の関数を使うことができますが、フロー制御を組み込むことはできません。

フロー制御を組み込むためにはProcessモナドを引数にして、Processモナドを返す関数を作成し、pipeコンビネータでパイプラインに組込みます。

Processモナドを直接操作するので少し難しいプログラミングになりますが、フロー制御を自分で操作できるので色々な用途に適用することができるようになります。

ストリーム処理&フロー制御の例としては前項と同様に以下の記事が参考になると思います。

この課題は時間切れでボクも実装例を作ることができませんでした。

まとめ

OFPにおけるReactive Streamsの位置付けについて、FPを重視する方向性から整理してみました。

その上でプログラミングレベルでReactive Streamsを把握するための仕掛けとしてハンズオンの資料を紹介しました。

もちろんOFPのReactive StreamsはOFADにも、少なからず影響するはずです。この点はまだ考えが整理できていませんが、ブログで継続して検討していきたいと思います。

2016年10月28日金曜日

QCon Tokyo 2016

QCon Tokyo 2016で「オブジェクト‐関数型プログラミングからオブジェクト‐関数型分析設計へ~クラウド時代のモデリングを考える」と題してお話させて頂きました。


上記の個人用のSlideShareは文字化けが取りきれないので、きちんと読みたい方は会社のSlideShareの方を見ていただくとよいと思います。上記スライドもPDFをダウンロードしたものは文字化けしていません。

Reactive Streams

今回のテーマであるOFADについては別の記事で考えたいと思いますが、今回スライドを作っていて改めて以下のことを感じました。

  • Reactive Streamsは次のブレークスルーの起点になるかも

FP(Functional Programming)でI/Oを扱う技術としてIOモナドがありますが、その発展形として以下の2つの技術があります。

  • Operationalモナド(scalazではFreeモナド+α)
  • Processモナド(scalaz-streamの場合)

製品開発の中で、どちらの技術も使ってみましたがProcessモナドの方が圧倒的に楽なんですね。

Processモナドでは入出力などの作用に対する処理部は始端(source)と終端(sink)をパターンにしたがって実装すれば簡単に実現できます。source, sinkの抽象度が適切なので一度作った部品は色々な用途で再利用できます。また、(フロー制御用の)状態を管理する処理をProcessモナド内に組み込むためのメカニズムも持っていますが、これの実装もそれほど難しくありません。

一方、Operationalモナドは作用に対する処理はインタープリタとして実装して、自然変換のメカニズムでOperationalモナドの実行時に割り当てる必要があります。このメカニズムによりDI(Dependency Injection)の機能も実現できるので、その点では素晴らしいのですが、インタープリタという大きな仕掛けを作らないといけないので、単に入出力をしたいという目的には重たすぎると感じました。Scalaの場合はOOP側で自由に入出力できるので、このメカニズムを使ってまでFP化をすすめるニーズはなかなかないかも、という感触です。

このような感触が得られている中で、スライドページ「OOPとFPの協業」をまとめながら考えたのは「簡単に使えるReactive Streamsを使えば、多くの処理をFP化できる」ということです。

セッション内で説明しましたが、FPの方がOOPに対して、高品質(バグが出にくい)というメリットがあり、さらに持続的開発の中核作業であるリファクタリングで圧倒的な優位性を発揮する、というのがボクの主張点です。

このような観点からFPの範囲を大きく広げるReactive StreamsはOFP(Object-Functional Programming)にとって本質的に重要な技術なのではないかと感じました。

Reactive Streamsは大規模分散処理やストリーミング処理向けの専用機能という観点で取り上げられることが多いと思いますが、それだけではなく日常的なOFPにとって中軸となるプログラミング・モデルとなりうる点がより重要であると感じました。ここにさらに大規模、高頻度、ストリーミングがおまけでついてくるという切り口でのアプローチがよいと思います。

フィードバック

セッション後に2つほどフィードバックを頂きました。

OOPとFPの関係

スライドページ「OOPとFPの関係」ではFPでは以下のことが実現できないと説明しました。

  • 状態の更新
  • 動的束縛によるポリモーフィズム
  • 大規模開発(?)

この点について専門家の方から以下のような趣旨のフィードバックを頂きました。(文意はボクの理解によるものなので、正確な意図とはずれている可能性があります。)

  • 「動的束縛によるポリモーフィズム」と「大規模開発」は理論的に解決されており実用言語での実績もある。

「動的束縛によるポリモーフィズム」については、ボクの理解ではここがOOPとFPの違いだと思っていたので、理論的に解決されていて、さらに実用言語での実績もあるというという点は意外でした。

コンパイル時に継承関係が全て確定していれば、動的束縛部分を直和(+pattern matching)に落とし込むことはできるとは思いますが、共通ライブラリで定義したクラスの継承や、分割コンパイルといったニーズがOOP的には重要なので、この問題をどのように解決しているのか興味のあるところです。

大規模開発については、セッションではあまり深く触れていませんが、ボクのイメージでは以下のようなことがあってFP的には大変なのではと推測しています。

  • コンポーネント内に状態を持てないので、大規模システムの部品として利用するには大きな制約があるのではないか。
  • 「動的束縛によるポリモーフィズム」の問題でOOP的な継承が使えないとすると、API/SPIといったインタフェースを使ったコンポーネント部品化に制約がおきるのではないか。
  • OSGiなどを使った動的ローディングによるplugin機構は実現可能か。

機会があれば、このような観点から技術評価をしてみたいと思います。

どちらの問題も、Haslkellではできていないようですし、Scalaのロードマップにもないと思うので、Scalaで利用できるようになるのは当面なさそうとはいえそうです。

Reactive Streamsでできること

セッション後の質問時に以下のような指摘を頂きました。(文意はボクの理解によるものなので、正確な意図とはずれている可能性があります。)

  • 「HaskellでReactive Streamsを使って線形代数を行おうとしたがメモリが足りなくて動かなかった。セッションではReactive Streamsを使って大規模演算ができるとしているがいいかげんな主張ではないか?」

まず「HaskellでReactive Streams」についてはボクの経験外の話でもあり、Haskellライブラリの実装上の問題の可能性もあるのでここでは取り上げません。

線形代数に関しては、全データをメモリに展開して大規模な行列演算を行おうとすると、どのような技術を使ってもメモリ不足を起こすはずなので、この点でご質問の意図がよくわかりませんでした。

線形代数による大規模行列演算の応用にはReactive StreamsではなくSparkのMLLibのようなアプローチがよいのではないかと思います。

Reactive Streamsについては、ごくざっくりいうと「作用を始端と終端に切り離し、フロー制御ができるようになったイテレータ」なので、イテレータでできる範囲で大規模データ処理ができるということです。

具体的には、一定のウィンドウサイズの範囲でデータの一部をメモリに読込み、その範囲で処理を行ってメモリを開放する、という処理を繰り返す動きになります。

このような動きなので、原理的にはどのような大規模なサイズを扱っても大丈夫はなず、ということでセッション中は「1TBでも」とお話したのですが、この点に違和感を感じられたようです。

原理的には1TBでも大丈夫と思いますが、実証実験したわけではないので、この点は明らかにしておきます。

製品開発の中でReactive Streams(scalaz-stream)を大規模メール配信、大規模PUSH配信処理の実装で非常に便利に使っており、ほとんど問題も出ていないことから実用上は全く問題ないのでは、というのがボクの実感としてあり、その点を「1TB」として表現したしだいです。このサイズ表現問題が難しいのは10GB程度だと、現在のハードウェアではメモリに載せてしまうことも可能なので、わかりやすいインパクトのある例として使うのにはちょっと難しいことがあります。次の機会があれば、このあたりの表現を工夫したいと思います。

2016年10月11日火曜日

Object-Functional Analysis and Designふりかえり

クラウド時代のアプリケーション開発について、「クラウド・アプリケーション・モデリング」、「クラウド・アプリケーション開発のモデル体系」と考察してきました。

クラウド・アプリケーション開発では、実装時のプログラミングで「関数」が重要な構成要素となってきています。そうなると、この「関数」を上流のモデリングでどのように扱っていくのかということが重要な論点になります。

このような観点から、上流のモデリングから実装時のOFP(Object-Functional Programming)まで、オブジェクトと関数を融合させ一気通貫にまとめた開発方法論をModegramming StyleではObject-Functional Analysis and Design(OFAD)と呼んでいます。

Modegramming Styleでは2012年ぐらいからOFADについて考察を進めてきました。クラウド・アプリケーションのモデリングの検討を進めていく上で、OFADが一つの軸となると思います。このOFADについて、2012年に要求開発アライアンスでのセッション『Object-Functional Analysis and Design: 次世代モデリングパラダイムへの道標』向けにまとめたものがあります。

今回はこの2012年版OFADのふりかえりを行い、検討を進めていくうえでの論点整理を行いたいと思います。

Object-Functional Analysis and Design 2012

クラウド・アプリケーションの開発方法論を整備していくためには「関数」を理解した上で、関数とオブジェクトの関係を整理しないといけないという動機もあり、OFPの有力言語であるScalaを2008年から使い始めました。

ある程度「関数」とOFPについて勘所がつかめてきたところで、2012年に『Object-Functional Analysis and Design: 次世代モデリングパラダイムへの道標』というセッションのタイミングで一度まとめるタイミングがありました。

このセッションの内容は以下にまとまっています。

また関連して以下のような考察を行っています。

ふりかえり

今の目でOFAD 2012をチェックしてみましたが、それほど違和感はなく、以下の基本的な考え方については変更はありませんでした。

  • オブジェクトと関数の使い分け
  • オブジェクトと関数の連携
  • デザインパターン(代数的構造、圏論)
  • Domain-Driven Design (DDDD)

ただ、オブジェクトと関数の連携方法は2012年当時よりも手持ちの選択肢が増えたと思うので、その辺りは反映していきたいところです。

このタイミングで再検討したいのが以下の項目です。Reactive Streamsを始め、2012年以降、要素技術が大きく進化しているのでこれらの技術を取り込んだ上で新しい枠組みで考えてみたいと思います。

  • DSL
  • データフロー

以下のアーキテクチャ的な話題については2012年以降、特に大きな動きはなかったと思います。これらについてはOFADの再検討の中で対応を考えていきたいと思います。

  • EDA
  • DCI
  • CQRS

OFAD 2012以降の技術動向

OFAD 2012以降に起きた技術的な大きなムーブメントとしては以下の2つがあります。後者は我々の提案ですが、最近注目されているServerless Architectureに通じるところがあると思います。

  • Reactive Streams
  • Application Cloud Platform
Reactive Streams

より広い枠組みとしてはFunctional Reactive Programming(FRP)という切り口もありますが、Reactive Streamsの方が現状にあっていると思うのでReactive Streamsの用語を使います。

Modegramming Styleではscalaz-streamを中心にReactive Streamsについても考察を行ってきました。

また幾つかのセッションでお話させていただいたのでスライドとしてもまとめました。

純粋な数学的な計算はよいとして、システムの振る舞いをFunctional Programming(FP)でどのように記述するのかという点がFP実用化の重要な論点だと思いますが、モナドベースのReactive Streamsが一つの解としてブレークスルーの起点となりそうです。

そのような意味でOFADでの関数を考える上でReactive Streamsは重要な論点になります。

分析モデルの段階で、Reactive Streamsに対応するモデル要素を見つけることができればモデルから実装まで一気通貫でつなげるルートを確保することができます。

Application Cloud Platform

OFAD 2012の後、クラウド時代のアプリケーション開発ではクラウド・プラットフォームが重要な構成要素になると考え、その製品化を行う活動をしてきました。その成果として「Prefer Cloud Platform」をリリースすることができました。

Prefer Cloud PlatformのようなクラウドプラットフォームをModegramming StyleではApplication Cloud Platform(ACP)と呼んでいます。Prefer Cloud Platform自体もScalaによるOFPによって実装されていますが、この開発の中でOFPに関するノウハウ、OFADに対するヒントを蓄積することができました。

またACPによってアプリケーション開発の大きな部分を省略することができることが期待できます。こうなると、モデリングの目的はビジネスとアプリケーションの連携方法の分析とシステムの拡張方法の分析設計に絞られます。このような文脈の中での開発方法論ということもクラウド時代の開発方法論であるOFADに求められる点といえます。

まとめ

クラウド・アプリケーション・モデリング」を起点に進めている考察は『Object-Functional Analysis and Design: 次世代モデリングパラダイムへの道標』によって始動したOFAD 2012を最新技術動向やApplication Cloud Platform(ACP)の活用を前提に、2016年版OFADとして再構築を行うという目的のものです。

この検討を進めるためのベースとしてOFAD 2012を簡単にふりかえり、論点整理を行いました。

このフィードバックを活かして、次回はOFADのモデル体系について考えてみたいと思います。

お知らせ

10月24日に開催される「QCon Tokyo 2016」で以下のテーマでお話させていただくことになりました。

  • オブジェクト‐関数型プログラミングからオブジェクト‐関数型分析設計へ~クラウド時代のモデリングを考える

現在検討しているOFADについてのチュートリアル的な内容になる予定です。

2016年9月30日金曜日

クラウド・アプリケーション開発のモデル体系

前回「クラウド・アプリケーション・モデリング」ではACP(Application Cloud Platform)時代のアプリケーション開発におけるモデリングについて、開発プロセスの点から考えてみました。

今回はモデル体系(メタモデル/プロファイル)について考えてみることにします。

クラウド・アプリケーションの開発ではアプリケーション開発のみを考えるのではなくBusiness→Development→Operation→Evaluationによる一種のPDCAループをビジネススコープで回していくことが重要になります。

Business→Development→Operation→Evaluationループを回すためには持続化可能な形で各アクション間で情報を持ちまわる必要があります。この情報はDevelopmentで使用するオブジェクト・モデルと連続性のあるものが要求されるので、Businessでもオブジェクト・モデルを使用するのが自然です。

このため、ITシステム開発のアクションであるDevelopment(以下単にDevelopment)はもちろんですが、Businessアクション(以下単にBusiness)でのオブジェクト・モデルが非常に重要になってきます。

今回はこのような枠組みで開発方法論を考える上でベースとなるオブジェクト・モデルのリファレンス・モデルについて考えます。

全体像 

Business ProcessによるBusinessのモデルとDevelopmentでのITシステムのモデルの全体像を整理しました。おおまかな関係を示すのが目的なので、詳細は省いて強調したい部分のみを載せる形にしています。

大きく3つの領域があります。

  • Business Process Model :: ビジネス・プロセスを記述。
  • IT System Model :: ITシステム向けのオブジェクト・モデルを記述。
  • Domain Model :: ビジネスとITで共有するドメイン・モデルを記述。

Business Process

Businessのモデリングの方法としては色々なアプローチが考えられるところですが、ITシステム開発とのシームレスな連携を念頭に置いた場合、Business Processを用いるのがよいのではないかと考えています。

ここでは『Business Modeling with UML: Business Patterns at Work』をベースにカスタマイズしたものを用います。

Business Process Modelはざっくり以下のモデル要素で構成されています。

  • Vision / Value / Goal :: ビジョン・価値・ゴールなど「何」を目標にしたいのかを記述。
  • Business UseCase/UX :: ビジネス要求を物語で記述。
  • Business Flow :: ビジネスの流れ/仕組みを記述。

「Vision / Value / Goal」はビジネスで「何」をするのかを記述するのに必要そうなモデル要素を代表させた項目です。ここに何を持ってくるのかはビジネス・モデリングの方法論で変わってきます。「Vision / Value / Goal」はその1例と考えてください。

IT System Modelとの連携で必要なのはBusiness UseCase/UXとBusiness Flowです。

また、Domain ModelをIT System Modelと共用する点が本質的に重要です。

要求仕様の記述にはBusiness UseCaseを使用します。UX(User Experience)とUseCaseの棲み分けは難しい課題ですが、いずれ考えてみたいと思います。ここではUseCaseを中心に、必要に応じてUXも併用、というような形でとらえて下さい。

IT System Model

IT System Modelはざっくり以下のモデル要素で構成されています。

  • UseCase/UX :: 要求仕様を物語形式で記述
  • Collaboration :: 物語を実現するためのシステム内外の協調動作
  • Responsibility :: システムが果たすべき責務
  • System/SubSystem :: システム/サブシステム。
  • Module/Component :: システム/サビスシステムを構成するソフトウェア部品。
  • Class/Object :: クラスとオブジェクト。オブジェクト・モデルの基本分類子。

Bisuness Processと同様にUseCaseとUXの関係は微妙ですが、ここではざっくりUseCase/UXとひとかたまりにして扱います。

IT System ModelではUseCase/UXからDomain Modelを睨みつつCollaborationを経てResponsibilityを抽出します。このResponsibityをIT System ModelおよびDomain ModelのSystem/Subsystem, Module/Component, Class/Objectに配置していきます。

基本的にオーソドックスな手順ですが、オブジェクト・モデリングは最近はロストテクノロジーになってしまっているきらいもあるので、ACPの要素も加味しつつ一度まとめる予定です。

Domain Model

Domain Modelは、Business Process ModelとIT System Modelから共用します。

  • Context Map :: Bounded Contextの対応関係のマップ。
  • Bounded Context :: ドメインの領域。Entityの集まり。
  • Entity :: エンティティ(ドメイン・オブジェクト)。
  • Relashionship :: 各種分類子間の関係。
  • StateMachine :: 分類子の振る舞いを記述する状態機械。
  • Business Rule :: ビジネスルール。

オブジェクト指向技術は、上流のビジネス・モデリングからオブジェクト指向プログラミングまでシームレスに連携できる点が最大の美点であるといえるでしょう。この美点の中軸となるのがDomain Modelです。

Domain ModelはBusiness ProcessとIT System Modelから共通のモデルとして共有されます。

基本的には通常のオブジェクト・モデルですが、以下の拡張を行っています。

  • Context MapとBounded ContextはDDD(Domain-Driven Design)の用語を採用。
  • Business Ruleを採用。

モデルの対応関係

図に示すモデル間の対応関係について説明します。

まず大事な点は、Domain ModelをBusiness Process ModelとIT System Modelから共用することです。Domain ModelはBusiness Process Modelの一部でもあり、IT System Modelの一部でもあるという関係になります。

Business Process ModelのBusiness UseCase/UXとBusiness FlowからIT System ModelのUseCase/UXを作成します。IT System ModelのUseCase/UXとDomain Modelが作成できれば、ここからは通常のオブジェクト指向開発の手順で開発を進めることができます。

まとめ

ACP向けのモデリングを考えるベースとなるオブジェクト・モデルについて整理しました。

オブジェクト・モデルをBusiness Process, IT System Model, Domain Modelの3つの領域に分類する枠組みとそれぞれの対応関係を考えてみました。

大きな枠組としては、2000年代前半に完成されたオブジェクト指向技術がベースで、セマンティクス的に大きな拡張は行っていません。業界全体でもここ10年程大きな動きはなかったと思います。

このベースに対して、ACPの登場、また関数型言語、リアクティブといった要素技術の実用化がどのようなインパクトを与えていくのかという点が1つの論点になります。

いずれにしても、新しい時代向けに開発方法論が進化するタイミングになっていると思います。本ブログでもこの観点から引き続き考察を続けていきたいと思います。

2016年8月31日水曜日

クラウド・アプリケーション・モデリング

製品開発を通じて色々と経験を積むことができたので、一度腰を落としてクラウドアプリケーション開発向けの開発手法について考えてみることにしました。何回かに分けて考察していきたいと思います。

考察する開発手法は以下の2つの機能で構成されています。

  • 開発プロセス
  • モデル体系(メタモデル)

今回は開発プロセスの全体像についてまとめます。

クラウドアプリケーションは、スクラッチで開発するケースもあると思いますが、主流はクラウドアプリケーション向けのプラットフォーム上で必要な機能のみを開発することになるのではないかと推測しています。

この場合、クラウドアプリケーションの開発手法はクラウドアプリケーションのプラットフォーム上での開発を前提としたものが重要になってきます。

本稿ではこの前提の上で、クラウドアプリケーションのプラットフォーム上でのクラウドアプリケーション開発向けの開発手法をテーマにします。

アプリケーション開発の基盤となるクラウドプラットフォームをApplication Cloud Platform(以下ACP)と呼ぶことにします。

背景

クラウドアプリケーションは以下のような既存のアプリケーションの複合体と考えることができます。

  • a) 消費者向けUX指向のBtoCアプリケーション
  • b) 基幹システムや外部システムとの連携を行うInB, BtoB的な業務アプリケーション
  • c) 利用者同士を結びつけてコミュニティを構築するPtoPアプリケーション
  • d) 統合運用・管理システム
  • e) 蓄積されたデータに対する分析・評価基盤

このような複雑なシステムをスクラッチで開発することはコストもかかりますし、長期の開発期間を必要とするためビジネス的にタイムリーなタイミングでリリースすることが難しくなります。

ACPは上記のような機能をビルトインしたプラットフォームであり、UXを実現するWeb/モバイルアプリと必要最小限のサーバー機能を開発するだけで、アプリケーションを開発することができます。

このようにACP上で動作するアプリケーション開発を行う場合、スクラッチでアプリケーション全体を構築する場合とは、異なった開発手法になってきます。

また、クラウドアプリケーションではアプリケーション開発の力点が、「アプリケーションを開発」することから、「ビジネスを駆動」することに移っていることも見逃すことができません。 

単にアプリケーションを構築するだけでは不十分で、ビジネスのライフサイクルの中でアプリケーションの開発、運用、評価から次のビジネス設計に繋がるサイクルをまわしていく仕掛けを構築し、ビジネスに組込んでいくことが求められます。

以上のような点を念頭に置いたアプリケーションの開発手法が必要になってきます。これが本稿のテーマです。

開発プロセスの全体像

開発プロセスは4つのアクションで構成されます。

  • Business Modeling :: ビジネス設計を行います。
  • Development :: ACPアプリケーションの開発を行います。
  • Operation :: アプリケーションを配備して運用します。
  • Evaluation :: アプリケーションの動作結果を分析しビジネスや開発にフィードバックします。

これらの4つのアクションで一種のPDCAサイクルを構成します。



Business Modelingは、ビジネスレイヤーでの目標、価値などの分析を行いビジネスプロセスなどを用いてビジネス設計を行います。このビジネス設計の中で必要とされるアプリケーション・プログラムの責務が明らかになるので、これをDevelopmentアクションで開発を行います。

Developmentアクションでは、Business Modelingの成果物をベースにアプリケーションの開発を行います。Developmentアクションは必要に応じて繰り返し開発を行います。

クラウドアプリケーションにおいて重要な点として、アプリケーションの運用によって得られた各種データを分析してビジネスに対するフィードバックするというビジネス改善サイクルをどのようにして構築してビジネスに組込んでいくのかという点があります。

伝統的なアプリケーション開発ではこの活動はあまりフォーカスされていなかったので開発プロセスとしては中心的な話題にはなってこなかったと思います。(そもそもプログラムを作るのが大変だったので、そちらへの対応で力を使いきってしまっていたといえるかもしれません。)

一方、クラウドアプリケーションの場合ビジネス改善サイクルの構築とビジネスへの組み込みをいかに短く効率よく行うのかという点が主要な論点の一つとなっていると思います。1つにはACPによって複雑なバックエンドシステムを持つプログラムを開発することが簡単になったこと。また、ビッグデータ的なデータ活用技術の実用化によってビジネスにフィードバックするデータの分析が安価かつ容易にできるようになりました。

Operationで分析に必要なデータを蓄積すること、Evaluationで蓄積したデータを効率よく解析・分析してビジネスやアプリケーションの改善に必要なフィードバックを作成する作業をいかに行うのかという点が重要になります。

これらの2つのアクションではACPがプラットフォームとして提供するデータ分析基盤が非常に重要になります。

ACPのデータ分析基盤では、Operationアクションで各種データを標準形式で採取し、分析に適した形の分析データに再構成して管理します。Evaluationアクションではこれらの分析データを元に、各種分析を行いビジネスやアプリケーションへのフィードバックを作成します。

また、Business Modelingでは単にアプリケーションの要求仕様を抽出するだけでなく、全体ビジネスの中でのアプリケーションの位置付け、ビジネスの目標、価値といったものを踏まえてビジネス改善サイクルを組み込んだビジネス設計も同時に行うことになるでしょう。

開発の流れ

Developmentアクションでの開発の流れは以下になります。

  • Requirement
  • Design
  • Implementation
  • Test
  • Integration Test

開発の各アクティビティは標準的なものです。

ただし、ACPを使う場合ターゲットのプラットフォームは決まっているためPSM(Platform Indepent Model)を作成するAnalysisは効果が薄いので省略しています。必要に応じてDesignの中で行うとよいでしょう

これらのアクティビティは流れ作業的に順番にこなしていくのではなく、適切なプロジェクト運営のもと効率のよい順序で作業していくことになります。

目安としては、図で示した通りDevelopmentアクション全体のイテレーション、Design, Implementation, Testのイテレーションを想定しています。

サブシステム毎の開発手順カスタマイズ

図はDevelopmentアクションのRealization部を詳細化したものです。



ここでは複数のサブシステムを個別に開発し、最終的に結合する開発を想定しています。

図では特に重要なものとしてPresentation, Service Extension, Configurationを示しています。

Presentation

PresentationはWebやiOS/AndroidといったMobileアプリの開発です。このRealizationは以下の3つのactivityで構成されることを想定しています。

  • UX Design :: 画面設計を行います。
  • Implementation :: アプリケーションを実装します。
  • Test :: アプリケーションのテストを行います。

UX Designは画面設計や画面とAPCの提供するAPIなどの接続関係を設計します。

UX Designで行う画面設計はRequirementとの境界線が曖昧になりますが、開発中に顧客の意見を取り入れながら画面の調整を行う作業はUX Designの一環とします。(逆にRequirementでは、ペルソナやユースケースで利用者の体験する物語とドメインモデルの定義といった抽象度の高い成果物が中心となります。)

プロジェクト運営はアジャイル的なものを想定しています。

UX Design, Implementation, Testはこの順番に行うのではなく、作業の都合に合わせて任意の順番で行っていきます。

Service Extension

Service Extensionは、ACPが提供する拡張メカニズムに則ったプラグイン的な機能です。

ACP内に配備することで、ACPに対してアプリケーションが必要とする機能拡張を行います。

プロジェクト運営は計画駆動型を想定しています。

Service Extensionでは、基幹システムとの接続など、仕様がrequirement activityで明確に定義されるケースが多いと考えられるので計画駆動型が適していると考えています。もちろん、小規模開発など、場合によってはアジャイル型のプロジェクト運営が適しているケースも多いでしょう。

Configuration

アプリケーション向けのACPの振る舞いをカスタマイズパラメータとして設定します。

ここでは意図を明確にするために、独立した開発として示していますが、小規模開発の場合はPresentationなどの作業内で行われることになると思います。

Web/Mobileアプリ開発プロセス

「概要」の図に示す開発プロセスは汎用的な大きめの開発手順になっています。

さらに「サブシステム毎の開発手順カスタマイズ」の図ではPresentation, Service Extension, Configurationといった開発ターゲットごとの開発手順としてカスタマイズしています。



クラウドアプリケーションで多い形態は、フロントエンドのWeb/MobileアプリケーションのみUX重視で開発し、バックエンドサービスはACPのものをそのまま利用するものです。

このような開発では、多少ACPに対するconfigurationが発生しますが、基本的には従来技術であるWeb/Mobileアプリケーションの開発手法をそのまま用いてクラウドアプリケーションを開発できます。

Web/Mobileアプリ・プロファイルではこのような開発を想定したカスタマイズを行っています。

OperationやEvaluationはACPが自動的に提供してくれるため、開発エンジニアが特別に何かを開発する必要はありません。ただ、ACPが提供するAPIを使用する手順などで、何らかの意識は必要なのでWeb/Mobile開発者向けのユーザーガイドなどで、この点は明確化しておくことになるでしょう。

まとめ

今回はACPをベースにした、クラウドアプリケーションの開発プロセスの全体像についてざっくり考えてみました。

面白いテーマなので、引き続き考えていることをブログ化していきたいと思います。

2016年7月31日日曜日

[SDN] Generalized type constraints

Scalaの持つ型機能にGeneralized type constraintsがあります。訳語がよく分からなかったのでここでは型制約と呼びます。

Scalaプログラミングの要諦は、いかにプログラムのバグをコンパイルエラーで検出するか、だとすると型制約はこの目的を進めるために有効な言語機能です。使えるところではきっちりと使っていきたいところです。

そんなこともあり型制約の利用方法の最新状況が気になったので調べてみました。

型制約については先達の素晴らしい解説があります。

調べた範囲では、型制約の利用方法は上記ブログで取り上げている以下の2つにつきるようです。

  • 型によって有効になるメソッド
  • ビルダ

また上記の調査をしている最中に以下の用途もあるかもというのを思いつきました。今の所使用例は見かけていませんが、もしかして有効かもという使い方です。

  • 状態/状態遷移

おさらいも含めて上記3つの使い方について説明します。

型によって有効になるメソッド

「型によって有効になるメソッド」は型制約の基本的な使い方です。型制約が元々想定している利用方法だと思います。

以下では型パラメタTを持ったケースクラスResourceを定義しています。

openFileメソッドは「<:<」による型制約定義でResourceが保持しているリソースがFile(またはFileのサブクラス)だった時のみ有効となります。

case class Resource[T](r: T) {
    def openFile(implicit ev: T <:< File): InputStream = {
      new FileInputStream(r)
    }
  }

この目的で型制約を使うことによって以下の効能があります。

  • よく使われる特定のリソース向けのメソッドを簡単に定義できる。(この機能がないとトレイトの継承といった大掛かりな仕組みを使う必要がある。)
  • 有効でないリソースに対して誤ったメソッドが呼ばれることを防ぐ。
  • リソースの型がFileに定義された状態でメソッドの実装を行うことができる。
コンパイルエラーによる検出

コンパイルエラーとなる例です。

数値100を格納したResourceを作成し、openFileメソッドを呼び出します。

val r = Resource(100)
    val in = r.openFile

このコードは以下のように型チェックでコンパイルエラーとなります。このように誤った使い方をした場合、コンパイルエラーとして検出することができるわけです。

[error] .../Main.scala:45: Cannot prove that Int <:< java.io.File.
[error]     val in = r.openFile
[error]                ^
[error] one error found

ビルダ

型制約の使い方として有名なのがビルダです。

ビルダで必須項目が設定されていない状態でビルドを行うコードはコンパイルエラーになります。

以下のUrlBuilderはjava.net.URL用ビルダの例です。

import java.net.URL
import UrlBuilder._

case class UrlBuilder[HasProtocol <: YesNo, HasHost <: YesNo] private (
  private val _protocol: Option[String] = None,
  private val _host: Option[String] = None,
  private val _port: Option[Int] = None,
  private val _file: Option[String] = None
) {
  def isCompleted = _protocol.isDefined && _host.isDefined

  def protocol(s: String) = new UrlBuilder[Yes, HasHost](
    Some(s), _host, _port, _file)
  def host(s: String) = new UrlBuilder[HasProtocol, Yes](
    _protocol, Some(s), _port, _file)
  def port(s: Int) = copy(_port = Some(s))
  def file(s: String) = copy(_file = Some(s))

  def build(implicit ev1: HasProtocol =:= Yes, ev2: HasHost =:= Yes): URL =
    (_protocol, _host, _port, _file) match {
      case (Some(s), Some(a), Some(p), Some(f)) => new URL(s, a, p, f)
      case (Some(s), Some(a), Some(p), None) => new URL(s, a, p, "")
      case (Some(s), Some(a), None, None) => new URL(s, a, "")
      case (Some(s), Some(a), None, Some(f)) => new URL(s, a, f)
      case _ => throw new IllegalStateException(s"Illegal parameters $this")
    }
}

object UrlBuilder {
  sealed trait YesNo
  sealed trait Yes extends YesNo
  sealed trait No extends YesNo

  def builder = new UrlBuilder[No, No]()
}

型パラメタに設定する目的のトレイトYesNoとそのサブトレイトYes、Noを定義しています。

UrlBuilderは2つの型パラメタHasProtocolとHasHostを定義していて、いずれもトレイトYesNoとそのサブトレイトを型として設定できるようにしています。

ポイントはbuildメソッドで型制約「=:=」を使って型パラメタHasProtocolとHasHostの型をYesに限定しているところです。こうすることで型パラメタHasProtocolまたはHasHostにYesが設定されていない場合はコンパイルエラーとなります。

型パラメタHasProtocolにYesが設定されるのは、protocolメソッドが呼ばれてprotocolが設定された時です。また型パラメタHasHostにYesが設定されるのは、hostメソッドが呼ばれてHostが設定された時です。

つまり必須項目であるプロトコルとホストが設定されていない状態でbuildメソッドを使ってURLを生成しようとするとコンパイルエラーとなるわけです。

使い方

以下のようにprotocolとhostを設定後にbuildメソッドを呼び出すとコンパイルエラーにはなりません。

UrlBuilder.builder.protocol("http").host("example.com").build
コンパイルエラーによる検出

UrlBuilderにパラメタを設定しないでbuildメソッドを呼び出します。

UrlBuilder.builder.build

すると以下のようにコンパイルエラーとなります。

[error] .../Main.scala:16: Cannot prove that sample.UrlBuilder.No =:= sample.UrlBuilder.Yes.
[error]     UrlBuilder.builder.build
[error]                        ^

状態

型制約の使い方として「型によって有効になるメソッド」と「ビルダ」以外に、状態や状態遷移の記述にも使えるのではと思いついたので、そのアイデアの紹介です。

状態や状態遷移の操作に対するバグをコンパイル時に検出できればメリットは非常に大きいと思います。ただ、イベント駆動プログラムのような動的な処理で使用できる範囲は狭いと思われるので、ぴったりフィットするユースケースがあるかは将来課題です。

GreetingServiceは、指定された範囲に挨拶メッセージを送るサービスです。

以下の2つの状態を持ちます。

  • Openされているか否か
  • Assignされているか否か

GreetingServiceは全世界にメッセージを送る場合は、大量配信になるため復数のリクエスをと同時実行できないので、事前にリソースをアサインする必要があるという設定です。

import java.net.URL
import GreetingService._

case class GreetingService[IsOpened <: YesNo, IsAssigned <: YesNo] private (private val _url: URL) {
  def open()(implicit ev1: IsOpened =:= No, ev2: IsAssigned =:= No) = new GreetingService[Yes, IsAssigned](_url)
  def close()(implicit ev: IsOpened =:= Yes) = new GreetingService[No, IsAssigned](_url)
  def assign()(implicit ev1: IsOpened =:= Yes, ev2: IsAssigned =:= No) = new GreetingService[IsOpened, Yes](_url)
  def release()(implicit ev1: IsOpened =:= Yes, ev2: IsAssigned =:= Yes) = new GreetingService[IsOpened, No](_url)

  def helloLocalArea(msg: String)(implicit ev: IsOpened =:= Yes) {
    // do send to local area
  }

  def helloWorldWide(msg: String)(implicit ev1: IsOpened =:= Yes, ev2: IsAssigned =:= Yes) {
    // do send to world wide
  }
}

object GreetingService {
  sealed trait YesNo
  sealed trait Yes extends YesNo
  sealed trait No extends YesNo

  def create(url: URL) = new GreetingService[No, No](url)
}

上記2つの状態を型パラメタIsOpenedとIsAssingedで記述します。

型パラメタに設定する型はビルダで使用したYesNo, Yes, Noと同じ方式のものです。

GreetingServiceのポイントはopen, close, assign, releaseの各メソッドが「=:=」による型制約定義でGreetingServiceの状態によって以下の制約を持っていることです。

open
オープンもアサインのされていない時のみ可
close
オープン済みの時のみ可
assign
オープン済みで未アサインの時のみ可
release
オープン済み、アサイン済みの時のみ可

closeメソッドはエラー処理などでアサイン状態にかかわらずクローズしたいケースを想定してアサインの制約は設定していません。

使い方

GreetingServiceは以下のようにして使います。

val url = UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    val opened = srv.open()
    opened.helloLocalArea("local")
    val assigned = opened.assign()
    assigned.helloLocalArea("local")
    assigned.helloWorldWide("world")

openメソッドでオープンした後のGreetingServiceではhelloLocalAreaメソッドでローカル向けのメッセージ送信ができます。

assignメソッドでアサインした後のGreetingServiceではhelloLocalAreaメソッドでのローカル向けのメッセージ送信に加えて、helloWorldWideメソッドで全世界向けのメッセージ送信が可能になっています。

コンパイルエラーによる検出

誤った使用をコンパイルエラーで検出できるか見ていきます。

まずオープン前のGreetingServiceでメッセージを送る場合です。

val url = UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    srv.helloLocalArea("local")

無事helloLocalAreaメソッドがコンパイルエラーになりました。

[error] .../Main.scala:50: Cannot prove that sample.GreetingService.No =:= sample.GreetingService.Yes.
[error]     srv.helloLocalArea("local")
[error]                       ^

次にオープン済みで未アサインの場合に全世界向けメッセージ送信する場合です。

val url = UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    val opened = srv.open()
    opened.helloLocalArea("local")
    opened.helloWorldWide("world")

こちらも無事helloWorldWideメソッドがコンパイルエラーになりました。

[error] .../Main.scala:58: Cannot prove that sample.GreetingService.No =:= sample.GreetingService.Yes.
[error]     opened.helloWorldWide("world")
[error]                          ^
ユーティリティメソッド

GreetingServiceを引数にするユーティリティメソッドでも型パラメタのチェックを行うことができます。型チェックをしたくない項目は「_」にしておけばよいようです。

def sendToLocalAreal(srv: GreetingService[Yes, _]) {
    srv.helloLocalArea("local")
  }

  def sendToWorldWide(srv: GreetingService[Yes, Yes]) {
    srv.helloWorldWide("world")
  }

使い方は以下になります。

val url = UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    val opened = srv.open()
    sendToLocalAreal(opened)
    val assigned = opened.assign()
    sendToWorldWide(assigned)

状態遷移

「状態」の発展形です。

表現できる範囲は限定的ですが、状態遷移を型パラメタを使って実現することも可能です。

以下のGreetingServiceは前述のGreetingServiceと機能は同じですが、型制約の実現方法を変更しています。

具体的には、GreetingServiceは未オープン⇔オープン済⇔アサイン済の階層構造で状態遷移を行うので、この状態遷移の構造を反映した型を使用します。

import java.net.URL
import GreetingService._

case class GreetingService[+State <: OpenState] private (private val _url: URL) {
  def open()(implicit ev: State <:< Closed) = new GreetingService[Opened](_url)
  def close()(implicit ev: State <:< Opened) = new GreetingService[Closed](_url)
  def assign()(implicit ev: State <:< Opened) = new GreetingService[Assigned](_url)
  def release()(implicit ev: State <:< Assigned) = new GreetingService[Opened](_url)

  def helloLocalArea(msg: String)(implicit ev: State <:< Opened) {
    // do send to local area
  }

  def helloWorldWide(msg: String)(implicit ev1: State <:< Assigned) {
    // do send to world wide
  }
}

object GreetingService {
  sealed trait OpenState
  sealed trait Closed extends OpenState
  sealed trait Opened extends OpenState
  sealed trait Assigned extends Opened

  def create(url: URL) = new GreetingService[Closed](url)
}

まず型制約の用の型としてOpenStateトレイトを定義し、このサブトレイトとしてClosedトレイトとOpenedトレイトを、OpenedトレイトのサブトレイトとしてAssignedトレイトを定義しました。

OpenedトレイトとAssignedトレイトのサブトレイト関係で状態遷移の階層構造を表現しています。

OpenState, Opened, Assignedのトレイトを使用してopen, close, assign, releaseの各メソッドが「<:<」による型制約定義によりGreetingServiceの状態によって以下の制約を課しています。この例ではトレイト間のサブクラス関係も利用するので「=:=」ではなく「<:<」を使用しています。

open
Closedの時のみ使用可(オープンもアサインのされていない時のみ可)
close
Openedの時のみ使用可(オープン済みの時のみ可)
assign
Openedの時のみ使用可(オープン済みの時のみ可)
release
Assginedの時のみ使用可(オープン済み、アサイン済みの時のみ可)

前出の「状態」との違いはassignメソッドの制約条件が若干違うことですが、基本的には同等のものとなっています。

使い方

GreetingServiceの使い方は前出の「状態」のものと同じです。

val url = sample.UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    val opened = srv.open()
    opened.helloLocalArea("local")
    val assigned = opened.assign()
    assigned.helloLocalArea("local")
    assigned.helloWorldWide("world")
コンパイルエラーによる検出

コンパイルエラーによるエラー検出も前出の「状態」のもの基本的には同じになります。

まずオープン前のGreetingServiceでメッセージを送る場合です。

val url = sample.UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    srv.helloLocalArea("local")

無事helloLocalAreaメソッドがコンパイルエラーになりました。

[error] .../Main.scala:36: Cannot prove that sample2.GreetingService.Closed <:< sample2.GreetingService.Opened.
[error]     srv.helloLocalArea("local")
[error]                       ^

次にオープン済みで未アサインの場合に全世界向けメッセージ送信する場合です。

val url = sample.UrlBuilder.builder.protocol("http").host("example.com").build
    val srv = GreetingService.create(url)
    val opened = srv.open()
    opened.helloLocalArea("local")
    opened.helloWorldWide("world")

こちらも無事helloWorldWideメソッドがコンパイルエラーになりました。

[error] .../Main.scala:44: Cannot prove that sample2.GreetingService.Opened <:< sample2.GreetingService.Assigned.
[error]     opened.helloWorldWide("world")
[error]                          ^

まとめ

Generalized type constraintsはかなり以前(2.8)からある機能ですが、新しい使い方などが登場していないかという確認の意味もあり利用方法について調べてみました。

特に新たしい使用方法は見つけることはできませんでしたが、調べている中で「状態/状態遷移」の実装方法について思いついたことがあったので形にしてみました。うまくするとフィットする利用方法が見つかるかもしれません。

Generalized type constraintsについては、当初は暗黙変換も対象にする型制約「<%<」があったのですが、最近の版では使えなくなっているようです。この型制約があると「型によって有効になるメソッド」の応用で色々と技が使えそうだったのですが、色々負担の大きそうな機能なのでいたしかたなさそうです。

いずれにしてもGeneralized type constraintsが強力な機能であることを再認識しました。使えそうなポイントを見つけて製品開発にも適用していきたいと思います。

諸元

  • Java 1.7.0_75
  • Scala 2.11.7

2016年6月30日木曜日

foldの小技

関数型プログラミングではfold系の畳込み処理が多用されます。

composableという観点からはMonoid (Scala Tips / multiplication) やState Monad (State的状態機械2016)を使う形に持ち込むのが筋がよいと思いますが、1回限りのロジックに適用するには手間が掛かり過ぎる感じです。

このためListやVectorのfoldLeft/foldRightメソッドを使った畳込みを使うケースが多くなります。

たとえばリストを逆にするリバース処理をfoldLeftによる畳込みで書くと以下のようになるのが普通です。

def reverse[T](xs: List[T]): List[T] = {
    xs.foldLeft(List.empty[T])((z, x) => x :: z)
  }

この畳込み処理で最近愛用しているのが以下のようにcase classを用いる方法です。

def reverse[T](xs: List[T]): List[T] = {
    case class Z(result: List[T] = Nil) {
      def apply(rhs: T) = Z(rhs :: result)
    }

    xs.foldLeft(Z())(_(_)).result
  }

リバース処理のような簡単なロジックだと「普通」の書き方の方がシンプルですが、少し複雑な処理になってくると逆にcase class方式の方がいい感じになります。(あくまで私見です)

少し複雑な場合

たとえばFloatのListによるデータの平均値を求める処理を以下のような畳込み処理で記述するとします。

def average(xs: List[Float]): Float = {
    val a = xs.foldLeft((0.0F, 0))((z, x) => (z._1 + x, z._2 + 1))
    a._1 / a._2
  }

この方式で、複雑なデータ構造の畳込みをしようとするとタプルを使うケースが多くなりますが、「_1」といった記述がでてくるため可読性がよくありません。少し複雑な処理を書いているとかなりストレスになります。

また、畳込み処理後に「a._1 / a._2」で平均値の計算をしますが、ロジックが畳込み部と最終結果計算部の2つに別れてしまうのも個人的にはやや不満です。

case class方式

この平均値の計算処理はcase classを用いる方式では以下のようになります。

def average(xs: List[Float]): Float = {
    case class Z(sum: Float = 0.0F, length: Int = 0) {
      def apply(rhs: Float) = Z(sum + rhs, length + 1)
      def result = sum / length
    }
    xs.foldLeft(Z())(_(_)).result
  }

まず重要なのは、case class内のインスタンス変数として適切な名前をつけることができるので可読性がよくなる点です。

case class Zでは、畳込み処理の中の一要素の累積処理をapplyメソッドで行っています。また、最終結果の平均値の計算をresultメソッドで行っています。このように処理を適切なメソッドに分けてさらに1つのクラス内にまとめて記述できるので、プログラムも書きやすいですし、可読性も高まります。

畳込み処理の記述は「xs.foldLeft(Z())(()).result」として書いていますが、これは必ずこの形になります。「xs.foldLeft(Z())(()).result」は決まり文句として覚えてしまえばよいので、ロジックの記述はクラスZに集中することができます。

また、この例では使っていませんがcase classのcopyコンストラクタ機能が積算処理の記述にとても便利です。

まとめ

foldLeft/foldRightによる畳込み処理を記述する際にcase classを使ってみたところ、具合がよかったので自分用のコーディングパターンにしているものを紹介しました。

case classは関数型言語的にはタプルの進化形で直積の意味を持ちますが、オブジェクト指向言語的には通常のクラスであり、インスタンス変数やメソッドなどを定義して使用することができます。今回のイディオムはある意味両方のパラダイムを融合させたものということができるかもしれません。

実用的な観点から重要なのは畳込み処理の関数型的な記述を、case classの実装というオブジェクト指向的な記述に置き換える事ができることです。ボクの場合はオブジェクト指向の方がプログラムは書きやすいので、case class方式の方が書いていて楽ということだと思います。

諸元

  • Java 1.7.0_75
  • Scala 2.11.7

2016年5月8日日曜日

Breezeで行列式

大規模データに対する本格的な分析を行うためには線形代数は避けて通れません。

そこでScalaで広く使われている線形代数ライブラリBreezeを調べてみることにしました。

最終的には大規模データに対して分散演算を行いたいのでSparkのMLlibが有力候補ですが、MLlibもBreezeを使っているようなのでMLlibを使うときにもそのまま役に立ちそうです。

一応電気工学科を出ているので線形代数は習ったはずなのですが、すっかり忘れてしまったので「ゼロから学ぶ線形代数」(以下、ゼロ線)を参考にしています。以下に出てくる用語はこのゼロ線をベースにします。

目的

Breezeの基本的な使い方として以下の項目を調べます。

  • ベクトルの表現
  • 行列の表現
  • 行列式の計算
  • 連立一次方程式の解

準備

build.sbtは以下になります。

name := "breeze-sample"

version := "1.0"

scalaVersion := "2.11.7"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

val scalazVersion = "7.2.0"

libraryDependencies ++= Seq(
  "org.scalanlp" %% "breeze" % "0.12",
  "org.scalanlp" %% "breeze-natives" % "0.12",
  "org.scalanlp" %% "breeze-viz" % "0.12"
)

initialCommands in console := "import breeze.linalg._"

sbtをconsoleモードで起動する場合は以下になります。

$ sbt console

ベクトルの表現

BreezeではベクトルはDenseVectorオブジェクトで記述します。

ベクトル\(\vec a = \begin{pmatrix}1 \cr2 \cr3\end{pmatrix}\)をDenseVectorで作成すると以下になります。

scala> val a = DenseVector(1, 2, 3)
a: breeze.linalg.DenseVector[Int] = DenseVector(1, 2, 3)

行列の表現

行列式を求めるために、2つのベクトル\(\vec{a} = \begin{pmatrix}3 \cr2\end{pmatrix}\), \(\vec{b} = \begin{pmatrix}7 \cr{-}5\end{pmatrix}\)を作成します。

scala> val a = DenseVector(3, 2)
a: breeze.linalg.DenseVector[Int] = DenseVector(3, 2)
scala> val b = DenseVector(7, -5)
a: breeze.linalg.DenseVector[Int] = DenseVector(7, -5)

次に\(\vec{a}, \vec{b}\)から行列を作成します。

scala> val m = DenseMatrix(a, b)
m: breeze.linalg.DenseMatrix[Int] =
3  2
7  -5

ここで気になることがあります。

ゼロ線の記法では\(\vec{a} = \begin{pmatrix}3 \cr2\end{pmatrix}\), \(\vec{b} = \begin{pmatrix}7 \cr{-}5\end{pmatrix}\)から\(\begin{pmatrix}3 & 7 \cr2 & {-}5\end{pmatrix}\)という行列を作っていますが、DenseMatrixの表示形式では\(\begin{pmatrix}3 & 2 \cr7 & {-}5\end{pmatrix}\)になっています。

ゼロ線の表記方法と比べると転置した形になっています。これはDenseVectorがカラムベクトルであるものの要素を横に並べた表記になっているので、これに合わせてカラムベクトルを横に並べる表記にしているのではないかと推測します。

ここではこの解釈で確認を先にすすめることにします。

行列式の計算

行列の1次従属/1次独立の判定式は以下になります。(ゼロ線 P.26)

\(\vec{a} = \begin{pmatrix}a \cr c\end{pmatrix}\), \(\vec{b} = \begin{pmatrix}b \cr d\end{pmatrix}\)に対して、

\[\begin{matrix}ad {-} bc = 0 & ⇔ & \vec{a}と\vec{b}は1次従属 \cr ad {-} bc \ne 0 & ⇔ & \vec{a}と\vec{b}は1次独立\end{matrix}\]

この\(ad {-} bc\)の式を行列式(determinan)と呼び以下のように定義します。(ゼロ線 P.28)

\[\rm{det}\begin{pmatrix}a & b \cr c & d\end{pmatrix} = ad {-} bc\]

Breezeでは行列式の計算を行う関数detを提供しています。2つのベクトルからdet関数で行列式を計算する関数は以下になります。

def 行列式(a: DenseVector[Double], b: DenseVector[Double]): Double =
    det(DenseMatrix(a, b))

この関数を使って行列式の各種法則が動くことを以下のプログラムで確認しました。法則名はゼロ線によります。(ゼロ線 P.30)

val a = DenseVector(3.0, 2.0)
    val b = DenseVector(7.0, -5.0)
    val c = DenseVector(-5.0, 4.0)
    val k = 10.0
    println(s"行列式: det(a,b) = ${行列式(a, b)}")
    println(s"一致退化法則: det(a, a) = ${行列式(a, a)}")
    println(s"交代法則: det(a,b) = ${行列式(a, b)}, det(b,a) = ${行列式(b, a)}")
    println(s"分配法則: det(a, b + c) = ${行列式(a, b + c)}")
    println(s"分配法則: det(a, b) + det(a, c) = ${行列式(a, b) + 行列式(a, c)}")
    println(s"分配法則: det(a + b, c) = ${行列式(a + b, c)}")
    println(s"分配法則: det(a, c) + det(b, c) = ${行列式(a, c) + 行列式(b, c)}")
    println(s"スカラー倍法則: det(ka, b) = ${行列式(k * a, b)}")
    println(s"スカラー倍法則: det(a, kb) = ${行列式(a, k * b)}")
    println(s"スカラー倍法則: kdet(a, b) = ${k * 行列式(a, b)}")

結果は以下になります。

5 07, 2016 10:44:10 午前 com.github.fommil.jni.JniLoader liberalLoad
情報: successfully loaded /var/folders/s_/qy3xzz_s22g99nh0x8dgrfjm0000gn/T/jniloader5473535094494407732netlib-native_system-osx-x86_64.jnilib
行列式: det(a,b) = -28.999999999999996
一致退化法則: det(a, a) = 0.0
交代法則: det(a,b) = -28.999999999999996, det(b,a) = 28.999999999999996
分配法則: det(a, b + c) = -6.999999999999999
分配法則: det(a, b) + det(a, c) = -6.9999999999999964
分配法則: det(a + b, c) = 25.0
分配法則: det(a, c) + det(b, c) = 25.0
スカラー倍法則: det(ka, b) = -290.00000000000006
スカラー倍法則: det(a, kb) = -289.99999999999994
スカラー倍法則: kdet(a, b) = -289.99999999999994

以下の部分はBreezeがネイティブの数値計算ライブラリを使用していることを情報として出力しているようです。

5 07, 2016 10:44:10 午前 com.github.fommil.jni.JniLoader liberalLoad
情報: successfully loaded /var/folders/s_/qy3xzz_s22g99nh0x8dgrfjm0000gn/T/jniloader5473535094494407732netlib-native_system-osx-x86_64.jnilib
行列式

行列式の計算結果は-28.999999999999996となりました。期待していた結果は-29ですから、少し丸め誤差が出るようです。

\(ad {-} bc\)を文字通り計算すると丸め誤差が出るはずはないので、行列演算の汎用ロジックとして内部で何か難しい計算を行っているのだと推測できます。

Breezeを使う場合(あるいは線形代数ライブラリの一般的な仕様として)、丸め誤差が出ることを前提で考えて、必要な有効桁数で四捨五入していく必要があるということのようです。

この前提を受け入れると、(適当なところで四捨五入すると)行列式の計算結果として-29を得ることができました。

一致退化法則

一致退化法則は0.0を得ることができました。

交代法則

交代法則はベクトルを入れ替えることで行列値の符号が変わることが確認できました。

分配法則

以下の2つの計算が丸め誤差を許容すると同じ値-7になることが確認できました。

\begin{matrix}\rm{det}(\vec{a}, \vec{b} + \vec{c}) & ⇒ & \rm{det}\begin{pmatrix}3 & 7 + ({-}5) \cr 2 & {-}5 + 4 \end{pmatrix} \cr\rm{det}(\vec{a}, \vec{b}) + \rm{det}(\vec{a}, \vec{c}) & ⇒ & \rm{det}\begin{pmatrix}3 & 7 \cr 2 & {-}5\end{pmatrix} + \rm{det}\begin{pmatrix}3 & {-}5 \cr 2 & 4\end{pmatrix} \cr\end{matrix}

また以下の2つの計算が同じ値25になることが確認できました。

\begin{matrix}\rm{det}(\vec{a} + \vec{b}, \vec{c}) & ⇒ & \rm{det}\begin{pmatrix}3 + 7 & {-}5 \cr 2 + ({-}5) & 4 \end{pmatrix} \cr\rm{det}(\vec{a}, \vec{c}) + \rm{det}(\vec{b}, \vec{c}) & ⇒ & \rm{det}\begin{pmatrix}3 & {-}5 \cr 2 & 4\end{pmatrix} + \rm{det}\begin{pmatrix}7 & {-}5 \cr {-}5 & 4\end{pmatrix} \cr\end{matrix}

スカラー倍法則

以下の3つ計算とも丸め誤差を許容すると同じ値-290になることが確認できました。

\begin{matrix}\rm{det}(k\vec{a}, \vec{b}) & ⇒ & \rm{det}\begin{pmatrix}10 × 3 & 7 \cr 10 × 2 & {-}5 \end{pmatrix} \cr\rm{det}(\vec{a}, k\vec{b}) & ⇒ & \rm{det}\begin{pmatrix}3 & 10 × 7 \cr 2 & 10 × ({-}5)\end{pmatrix} \cr {}k\rm{det}(\vec{a}, \vec{b}) & ⇒ & 10 × \rm{det}\begin{pmatrix}3 & 7 \cr 2 & {-}5\end{pmatrix}\end{matrix}

連立一次方程式の解

行列式の応用として連立一次方程式の解を求めてみます。

行列式を使って連立一次方程式を解くための公式としてクラメールの法則があります。(ゼロ線 P.35)

連立方程式\(\left\{\begin{array}{ll}ax + by = e\\cx + dy = f\\\end{array}\right.\)

を\(\vec{a}=\begin{pmatrix}a \cr c\end{pmatrix}\)と\(\vec{b}=\begin{pmatrix}b \cr d\end{pmatrix}\)を用いて、\(x\vec{a} + y\vec{b} = \vec{p}\)と表すとき、\(\rm{det}(\vec{a}, \vec{b}) \ne 0\)ならば、解$x$, $y$がただ1つ存在し、

\[x={\rm{det}(\vec{p}, \vec{b}) \over \rm{det}(\vec{a}, \vec{b})},  y={\rm{det}(\vec{a}, \vec{p}) \over \rm{det}(\vec{a}, \vec{b})}\]

となる。

クラメールの法則を使って連立一次方程式を解く関数を以下に定義します。

def 連立一次方程式(a: DenseVector[Double], b: DenseVector[Double], p: DenseVector[Double]): (Double, Double) = {
    val x = det(DenseMatrix(p, b)) / det(DenseMatrix(a, b))
    val y = det(DenseMatrix(a, p)) / det(DenseMatrix(a, b))
    (x, y)
  }

この関数を使って以下の2元連立1次方程式を解きます。

\[\left\{\begin{array}{ll}3x + 7y = 1\\2x {-} 5y = 20\\\end{array}\right.\]

この方程式を以下の3つのベクトルで記述し、これに先ほどの関数を適用します。

\begin{matrix}\vec{a} & = & \begin{pmatrix}3 \cr 2\end{pmatrix} \cr\vec{b} & = & \begin{pmatrix}7 \cr {-}5\end{pmatrix} \cr\vec{p} & = & \begin{pmatrix}1 \cr 20\end{pmatrix}\end{matrix}

val a = DenseVector(3.0, 2.0)
    val b = DenseVector(7.0, -5.0)
    val p = DenseVector(1.0, 20.0)
    val (x, y) = 連立一次方程式(a, b, p)
    println(s"x = $x, y = $y")

実行結果は以下になりました。

x = 5.000000000000001, y = -2.0000000000000004

丸め誤差をならすと、結果は x = 5, y = 2 となります。

正しい解が得られたようです。

まとめ

線形代数ライブラリBreezeをお試しで使ってみました。

まず入り口ということで行列式と連立一次方程式を動かしてみましたが、法則通り動くことが確認できました。

実際に使ってみた結果の注意点として以下の2つが出てきました。

  • 行列の表記方法(ベクトルから作成した時の縦横方向)
  • 丸め誤差

行列の表記方法については、ドキュメントなどを調べてみた範囲ではよく分かりませんでした。連立一次方程式の解は正しくでてきたので、今回の解釈でよさそうなのではないかと考えています。継続して、使いながら解釈の補強をしていく予定です。

丸め誤差に関しては、確率計算のような用途ではほぼ問題にならないと推測しますが、今回応用例として用いた連立1次方程式では無視できない問題です。このあたりは使用時に注意する必要がありそうです。

諸元

  • Java 1.7.0_75
  • Scala 2.11.7
  • Breeze 0.12

2016年4月30日土曜日

型安全イコール判定 - ScalazとScalactic

Scalaプログラミングのはまりポイントとして頻出頻度が高く影響も甚大なのは、==メソッドとcontainsメソッドの型チェックだと思います。

この2つのメソッドは(多分Javaとの互換性の問題で)、引数に定義されている型がAnyであるため事実上型チェックが効かない仕様になっています。

具体的には以下のような処理がコンパイルエラーにならず通ってしまうという問題です。

scala> "1" == 1
"1" == 1
res112: Boolean = false

scala> List(1, 2, 3).contains("1")
List(1, 2, 3).contains("1")
res113: Boolean = false

いずれの場合も、常に判定結果はfalseになり、意図した処理でないことは明らかですがScalaコンパイラはエラーとして弾いてくれません。

Scalaは型チェックが厳しいので、きちんとプログラミングしていればケアレスミス的なバグはほとんどでないのですが、逆に出る場合は==メソッドとcontainsメソッドのあたりに集中するというのがボクの実感です。

例題

==メソッドとcontainsメソッドの問題がよく発生する例として、クラスで保持しているプロパティの型がOptionのケースがあります。

以下の例ではcityの型がOption[String]となっています。

case class Person(name: String, city: Option[String])

特によくバグになるパターンとしては最初に:

case class Person(name: String, city: String)

として(Optionなしの)String型で定義していたものを、機能追加などでOption[String]に変更した場合です。このような場合には、コンパイルエラーで影響箇所が検出されることを期待しているわけですが、==メソッドとcontainsメソッドを使っている場所はエラーにならず、ロジック的に常にfalseとなるため、即バグになってしまいます。

準備

説明では以下のクラスとデータを使用します。

case class Person(name: String, city: Option[String])
  val cities = Vector("Yokohama", "Kawasaki", "Kamakura")
  val person = Person("Taro", Some("Yokohama"))
  val persons = Vector(
    Person("Taro", Some("Yokohama")),
    Person("Jiro", Some("Kawasaki")))
ScalazとScalactic

==メソッドとcontainsメソッドの問題に対応するための機能を提供するライブラリとしてScalazとScalacticを使ってみます。

ScalazはMonadicプログラミング向けのライブラリの1機能(型クラスEqual)として==メソッド問題の解を提供しています。

一方Scalacticは、Scalatestのスピンオフ機能で==メソッド問題を中心にオブジェクト操作の便利機能を提供するライブラリです。

ダメロジック

ダメロジックとして==メソッドとcontainsメソッドの例を順にみていきます。

==メソッド

Personのcityプロパティの方はOption[String]なのでStringと比較しても意味がないのですが、以下のように==メソッドでは比較できてしまいます。

person.city == "Yokohama"

コンパイルは通りますが、実行結果は必ずfalseになってしまいます。

containsメソッド

Personの集まりからcitiesに登録された市に所属する人を抽出する処理です。

persons.filter(x => cities.contains(x.city))

本来比較できないPersonのcityプロパティのOption[String]と、citiesに入っているStringを比較していますが、コンパイルエラーになりません。

しかし、containsメソッドの結果は必ずfalseになるため、全体の実行結果は必ず空の集まりが返ってきてしまいます。

Scalazによる解

まずScalazの提供する機能を使ってみます。

準備として以下のimportを行います。

import scalaz._, Scalaz._
==メソッド

Scalazでは、型チェックあり版の==メソッドとして===メソッドを提供しています。

エラー検出

オブジェクトの同定比較として==メソッドの代わりに===メソッドを用いると型チェックしてエラー検出するようになります。

person.city === "Yokohama"
[error] .../src/main/scala/sample/Main.scala:66: type mismatch;
[error]  found   : String("Yokohama")
[error]  required: Option[String]
[error]       person.city === "Yokohama"
[error]                       ^

簡単に使えて効果抜群です。

対応

コンパイルエラーに対する対応は、比較の型を合わせる修正です。

person.city === Some("Yokohama")
containsメソッド

Scalazでは、型チェックあり版のcontainsメソッドとしてelementメソッドを提供しています。

エラー検出

オブジェクトの集まりでの存在確認にcontainsメソッドの代わりにelementメソッドを用いると型チェックしてエラー検出するようになります。

persons.filter(x => cities.element(x.city))
[error] .../src/main/scala/sample/Main.scala:67: type mismatch;
[error]  found   : Option[String]
[error]  required: String
[error]       persons.filter(x => cities.element(x.city))
[error]                                            ^

こちらも簡単に使えて効果抜群です。

対応

コンパイルエラーに対する対応は、比較の型を合わせる修正です。以下の例では、Optionのfoldメソッドを使ってみました。

persons.filter(_.city.fold(false)(cities.element))

Scalacticによる解

Scalacticで同定比較によるコンパイラエラー抽出を行うためには以下のimportを行います。

import scalactic._
import TypeCheckedTripleEquals._

「import TypeCheckedTripleEquals._」を他のものに変えると、同定比較の方式を変更することができます。

==メソッド

ScalacticではScalazと同様に同定比較によるコンパイラエラー抽出用に型チェックあり版の==メソッドとして===メソッドを提供しています。

エラー検出

オブジェクトの同定比較として==メソッドの代わりに===メソッドを用いると型チェックしてエラー検出するようになります。

person.city === "Yokohama"
[error] .../src/main/scala/sample/Main.scala:28: types Option[String] and String do not adhere to the type constraint selected for the === and !== operators; the missing implicit parameter is of type org.scalactic.CanEqual[Option[String],String]
[error]     person.city === "Yokohama"
[error]                 ^

こちらも簡単に使えて効果抜群です。

対応

コンパイルエラーに対する対応は、比較の型を合わせる修正です。

person.city === Some("Yokohama")
containsメソッド

ScalacticではScalazのelementメソッドに相当する機能はないようなので===メソッドを使って対応してみます。

エラー検出

containsメソッドの代わりにexistsメソッドを使い、同定比較に===メソッドを使ってみました。

persons.filter(x => cities.exists(_ === x.city))
[error] .../src/main/scala/sample/Main.scala:29: types String and Option[String] do not adhere to the type constraint selected for the === and !== operators; the missing implicit parameter is of type org.scalactic.CanEqual[String,Option[String]]
[error]     persons.filter(x => cities.exists(_ === x.city))
[error]                                         ^

containsメソッドを使うより若干ロジックが入りますが、気になるほどではないと思います。

対応

コンパイルエラーに対する対応は、比較の型を合わせる修正です。

persons.filter(x => cities.exists(y => x.city === Some(y)))

ScalazとScalacticの使い分け

型安全な同定比較機能としてScalazとScalacticで同等の使い勝手の機能が提供されていることが確認できました。

Scalazは汎用のMonadicプログラミング向けライブラリなので、Scalazを使用している場合はありがたく===メソッド、elementメソッドを使用するのがよいと思います。

Scalacticは同定比較機能として以下の機能を提供しています。

  • Torerance (値の誤差範囲を考慮した比較)
  • 同定比較方法のカスタマイズ (大文字小文字の区別の有無など)

また以下のユーティリティ機能も提供しています。

  • Or/Every (Validation結果の格納)
  • 事前条件の文字列補完
  • スナップショット用の文字列補完

こういったユーティリティ機能を使う場合にはScalacticが選択肢になります。

===メソッドだけ欲しい場合は、どちらを選んでもよいですが暗黙定義の影響範囲が少なそうなScalaticの方が若干使い勝手がよさそうです。

まとめ

地味な話題ですが結構開発工数に影響する==メソッドとcontainsメソッドの問題と、その解決策についてご紹介しました。

防御的プログラミングを心がけるなら==メソッドとcontainsメソッドは使わないぐらいの心持ちでもよいと思います。

Scalazは===メソッドだけのために採用するのは心理的障壁が高そうですが、その場合はScalacticを選ぶとよいでしょう。

Scalacticは、事前条件の文字列補完、スナップショット用の文字列補完の機能が地味に便利なので、そういう意味でも使い出があると思います。

諸元

  • Java 1.7.0_75
  • Scala 2.11.7
  • Scalaz 7.2.0
  • Scalactic 3.0.0-M15

2016年3月31日木曜日

State的状態機械2016

昨年Scalaにおける状態機械の実装戦略について以下の記事で検討しました。

OOP編はcase classで作った汎用の状態機械オブジェクトをOOP的な枠組みで利用しました。

そして、FP編ではOOP編で作った汎用の状態機械オブジェクトをそのままFP的な枠組みで利用できることを確認しました。

もちろん、これはOOPでもFPでも使用できる汎用の状態機械オブジェクトの作り方を確認するのが目的で、幸いどちらの目的にも使用できるものを作成することができました。

この記事で検討した方針で一年間プログラミングをしてきましたが、改良のポイントが出てきたので、これを反映した2016年版の状態機械オブジェクトを考えてみます。

ParseState

「状態+状態遷移を表現するオブジェクト兼代数的データ型」であるParseStateですが、2016年版では以下の点を変更しています。

  • イベントの入力用メソッドであるeventメソッドとendEventメソッドを統合
  • イベントをParseEventオブジェクトで表現
  • イベントに対する反応を新状態のParseStateと処理結果ParseResultの組を返すようにした

2015年版と機能的には変わらないのですが、Stateモナドの形に合わせることでストリーム処理で使いやすい構造になっています。

sealed trait ParseState {
  def apply(event: ParseEvent): (ParseState, Option[ParseResult])
}

case object InitState extends ParseState {
  def apply(event: ParseEvent): (ParseState, Option[ParseResult]) = event match {
    case CharEvent(',') => (InputState(Vector(""), ""), None)
    case CharEvent('\n') => (InitState, Some(ParseResult.empty))
    case CharEvent(c) => (InputState(Vector.empty, c.toString), None)
    case EndEvent => (InitState, None)
  }
}

case class InputState(
  fields: Vector[String],
  candidate: String
) extends ParseState {
  def apply(event: ParseEvent): (ParseState, Option[ParseResult]) = event match {
    case CharEvent(',') => (InputState(fields :+ candidate, ""), None)
    case CharEvent('\n') => (InitState, Some(RecordParseResult(fields :+ candidate)))
    case CharEvent(c) => (InputState(fields, candidate :+ c), None)
    case EndEvent => (InitState, Some(RecordParseResult(fields :+ candidate)))
  }
}
ParseEvent

2016年版では入力パラメタとして入力イベントを表現するParseEventを使用するようにしました。

2015年版では入力の終了をendEventメソッドでハンドリングしていましたが、2016年版ではEndEventイベントで扱うようになっています。

製品開発で実際に使用した経験で、この構造の方がストリーミング処理に適していることが分かりました。

具体的には、EndEventなどのイベントで状態をクリアな状態に戻すことができるので、scalaz-streamといったReactive-Streams的なパイプラインの部品として使用することが容易になります。

trait ParseEvent
case class CharEvent(c: Char) extends ParseEvent
case object EndEvent extends ParseEvent
ParseResult

計算結果を表現するParseResultは以下になります。

2015年版では、ParseStateが結果を管理していましたが、2016年版では結果をParseResultで外部に出力するようにしました。

ParseEventと同様に製品開発で実際に使用した経験で、Stateモナドとの相性がよいことが分かりました。

sealed trait ParseResult {
  def getRecord: Option[Vector[String]]
}
case object NoneParseResult extends ParseResult {
  def getRecord = None
}
case class RecordParseResult(record: Vector[String]) extends ParseResult {
  def getRecord = Some(record)
}

object ParseResult {
  val empty = RecordParseResult(Vector.empty)
}

Stateモナド

それでは、新版のParseStateをStateモナドで使ってみましょう。

2015年版に比べると結果を取り出す処理が若干複雑になっていますが、それほど大きな違いはありません。

Stateモナドで問題なく使えることが確認できました。

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: ParseEvent) = State((s: ParseState) => s.apply(event))

  def parse(events: Seq[Char]): Seq[String] = {
    val xs = events.toStream.map(CharEvent) :+ EndEvent
    val t = xs.traverseS(action)
    val r = t.eval(InitState)
    r.flatten.flatMap(_.getRecord).flatten
  }

  def parseAnnotated(events: Seq[Char]): Seq[String] = {
    val xs: Stream[ParseEvent] = events.toStream.map(CharEvent) :+ EndEvent
    val t: State[ParseState, Stream[Option[ParseResult]]] = xs.traverseS(action)
    val r: Stream[Option[ParseResult]] = t.eval(InitState)
    r.flatten.flatMap(_.getRecord).flatten
  }
}

scalaz-stream版状態機械

次はscalaz-streamのProcessモナドです。

Scala的状態機械/FP編の「scalaz-stream版状態機械」と基本的には同じで、ParseStateの入出力が変更された部分に対応しています。

大きな違いとしては2015年版はParseState自身がストリームを流れてくる構造になっており、ParseStateから計算結果を取り出す処理になっていました。

それに対して、今回はParseStateによる処理結果であるParseResultがストリームを流れてくるので、これを取り出す構造になっています。

いずれにしても、OO版としても使える汎用の状態機械オブジェクトをscalaz-streamのProcessモナドで使えることが確認できました。

import scalaz._, Scalaz._
import scalaz.stream._

object ParserProcessMonadStateMonad {
  def fsm(state: ParseState): Process1[ParseEvent, ParseResult] = {
    Process.receive1 { evt: ParseEvent =>
      ParserStateMonad.action(evt).run(state) match {
        case (s, Some(r)) => Process.emit(r) fby fsm(s)
        case (s, None) => fsm(s)
      }
    }
  }

  def parse(xs: Seq[Char]): Seq[String] = {
    val events = xs.map(CharEvent) :+ EndEvent
    val source: Process0[ParseEvent] = Process.emitAll(events)
    val pipeline: Process0[ParseResult] = source.pipe(fsm(InitState))
    pipeline.map(_.getRecord).pipe(process1.stripNone).toList.last
  }

  import scalaz.concurrent.Task

  def parseTask(xs: Seq[Char]): Task[Seq[String]] = {
    val events = xs.map(CharEvent) :+ EndEvent
    val source: Process0[ParseEvent] = Process.emitAll(events)
    val pipeline: Process[Task, ParseResult] = source.pipe(fsm(InitState)).toSource
    pipeline.map(_.getRecord).pipe(process1.stripNone).runLastOr(Nil)
  }
}

まとめ

状態機械の実装方法について、1年間の実践経験を踏まえて改良版をまとめました。

状態機械の実装方法は色々な選択肢がありますが、OOPかつFPを同時に満たすものとなると選択肢が限られてきます。

OOP化はそれほど難しくないので、焦点はFP化ですが、StateモナドやProcessモナドで使用することを前提にした上で、自然な形の実装方法に落とし込めたと思います。

諸元

  • Java 1.7.0_75
  • Scala 2.11.7

2016年2月29日月曜日

ScalaでXSLT

Scalaで半構造データ的なデータ処理に対してどのようなアプローチをとっていくのかは重要な論点の一つだと思いますが、引き続きXMLも有力な選択肢だと思います。

XMLはXML文書をプログラム内にデータ構造として取り込むだけだとあまり面白みはありませんが、XPath, XSLT, XQueryといったデータ操作用の機能を使用すると応用の範囲がぐっと広がります。

この中でもXPathとXSLTはJavaの基本APIに入っているので、Scalaからもシームレスに使うことができます。

そこでScalaでのXSLT使い方を整理してみました。

準備

処理対象とするXML文書です。

<userlist>
  <user zip="221" city="Yokohama" name="Taro"/>
  <user zip="248" city="Kamakura" name="Hanako"/>
</userlist>

XSLTのスタイルシートとして以下のものを使用します。

上記のXML文書をHTMLの表に整形します。

<?xml version="1.0"?>
<xsl:stylesheet version="1.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns="http://www.w3.org/1999/xhtml">
<xsl:strip-space elements="*"/>
<xsl:template match="/">
    <html>
      <table border="true">
 <thead>
   <th>Name</th>
   <th>Zip</th>
   <th>City</th>
 </thead>
 <tbody>
          <xsl:apply-templates select="/userlist/user"/>
 </tbody>
      </table>
    </html>
  </xsl:template> 

  <xsl:template match="user">
    <tr>
      <td><xsl:value-of select="@name"/></td>
      <td><xsl:value-of select="@zip"/></td>
      <td><xsl:value-of select="@city"/></td>
    </tr>
  </xsl:template>
</xsl:stylesheet>

xsltprocコマンドでXML文書の変換を行うと以下のようにXHTML文書が出力されます。

$ xsltproc app.xsl data.xml
<?xml version="1.0"?>
<html xmlns="http://www.w3.org/1999/xhtml"><table border="true"><thead><th>Name</th><th>Zip</th><th>City</th></thead><tbody><tr><td>Taro</td><td>221</td><td>Yokohama</td></tr><tr><td>Hanako</td><td>248</td><td>Kamakura</td></tr></tbody></table></html>

このXHTML文書を見やすいように整形すると以下になります。

<html xmlns="http://www.w3.org/1999/xhtml">
  <table border="true">
    <thead>
      <th>Name</th>
      <th>Zip</th>
      <th>City</th>
    </thead>
    <tbody>
      <tr>
        <td>Taro</td>
        <td>221</td>
        <td>Yokohama</td>
      </tr>
      <tr>
        <td>Hanako</td>
        <td>248</td>
        <td>Kamakura</td>
      </tr>
    </tbody>
  </table>
</html>

XSLTプロセッサを使う

スタイルシートとXMLデータの両方をStringで受取り、変換結果をStringで返す処理を関数「StringのスタイルシートでStringのXMLデータを変換」として作成しました。

def StringのスタイルシートでStringのXMLデータを変換(stylesheet: String, data: String): String = {
    val stylesource = new StreamSource(new StringReader(stylesheet))
    val transformer = TransformerFactory.newInstance().newTransformer(stylesource)
    val source = new StreamSource(new StringReader(data))
    val buf = new StringWriter()
    val result = new StreamResult(buf)
    transformer.transform(source, result)
    buf.toString
  }

JavaのXSLTプロセッサであるjavax.xml.transform.Transformerは、スタイルシート、XML文書、変換結果のそれぞれに対して以下の入力の選択肢があります。

  • 文字列
  • DOM
  • SAX

ここでは、スタイルシート、XML文書、変換結果のすべてに文字列を使う組合せを実装しています。

XMLリテラルとDOM

ScalaでXMLを扱う時に少しややこしいのはXMLリテラルの存在です。

XMLリテラルはScalaの文法に組み入れられており、さらにScala的、DSL的なAPIで木構造の操作を簡単に行えるようになっているメリットがある反面、XMLの共通機能であるXPath、XSLTを使用することができない、というデメリットがあります。

この問題への対応方法は、DOMやSAXといったJavaが提供しているパーサーとXMLリテラルの相互変換です。

以下ではこの相互変換で一番楽な方法である文字列を媒介とした変換を採用しました。スタイルシートをXMLリテラルのオブジェクトであるscala.xml.Nodeで受取り、それを文字列表現に変換して「StringのスタイルシートでStringのXMLデータを変換」関数に渡しています。

def XMLリテラルのスタイルシートでStringのXMLデータを変換(stylesheet: scala.xml.Node, data: String): String =
    StringのスタイルシートでStringのXMLデータを変換(stylesheet.toString, data)

文字列表現を媒介にする方法は一般的には十分ですが、大規模データ操作などで性能が求められる場合は適さないかもしれません。

そのようなケースでは、javax.xml.transform.sax.SAXSourceをextendsしたXMLリテラル用のSourceを作成して対処するとよいでしょう。XMLリテラル用のSAXパーサーorg.xml.sax.XMLReaderを作成し、このSAXSourceでラップするような形になります。

使ってみる

スタイルシートをXMLリテラルで、XML文書を文字列で受け取って、変換結果を文字列で受け取る関数「XMLリテラルのスタイルシートでStringのXMLデータを変換」で処理するプログラムは以下になります。

生文字列リテラルでも問題はないですが、やはりXML専用のXMLリテラルでXSLTによる変換処理を記述できると便利です。

def main(args: Array[String]) {
    val stylesheet = <xsl:stylesheet version="1.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns="http://www.w3.org/1999/xhtml">
<xsl:strip-space elements="*"/>
<xsl:template match="/">
    <html>
      <table border="true">
 <thead>
   <th>Name</th>
   <th>Zip</th>
   <th>City</th>
 </thead>
 <tbody>
          <xsl:apply-templates select="/userlist/user"/>
 </tbody>
      </table>
    </html>
  </xsl:template> 

  <xsl:template match="user">
    <tr>
      <td><xsl:value-of select="@name"/></td>
      <td><xsl:value-of select="@zip"/></td>
      <td><xsl:value-of select="@city"/></td>
    </tr>
  </xsl:template>
</xsl:stylesheet>
    val data = """<userlist>
  <user zip="1234567" city="Yokohama" name="Taro"/>
  <user zip="1234567" city="Kamakura" name="Hanako"/>
</userlist>
"""
    val result = XMLリテラルのスタイルシートでStringのXMLデータを変換(
      stylesheet, data)
    println(result)
  }

プログラム実行の結果、無事以下の出力が得られました。

<?xml version="1.0" encoding="UTF-8"?><html xmlns="http://www.w3.org/1999/xhtml"><table border="true"><thead><th>Name</th><th>Zip</th><th>City</th></thead><tbody><tr><td>Taro</td><td>1234567</td><td>Yokohama</td></tr><tr><td>Hanako</td><td>1234567</td><td>Kamakura</td></tr></tbody></table></html>

諸元

  • Java 1.7.0_75
  • Scala 2.11.7

2016年1月31日日曜日

[SDN] 例外とNothing

try/catchで例外処理を書く場合、以下のように例外に対する共通処理を羅列する形になることがあります。

すべての例外に対して同じ処理を行う場合には、大本のjava.lang.Exception(場合によってはjava.lang.Throwable)でキャッチすればよいですが、以下のように例外毎に少しずつ処理が異なる場合には羅列せざるを得ません。

def doSometing(): String = {
  try {
    doSomesing()
  } catch {
    case e: FileNotFoundException =>
      log("file not found")
      throw e
    case e: InterruptedIOException =>
      log("intterupted io")
      throw e
    case e: IOException =>
      log("io")
      throw e
    case NonFatal(e) =>
      log("non fatal")
      throw e
  }
}

上記のプログラムでは以下の処理が例外の共通処理になります。

log("file not found")
throw e

「例外毎に異なったメッセージをログに出力して、受け取った例外を再送出する処理」ですが、ログに出力するメッセージの文言が例外毎に変わってきます。

共通処理化

「例外毎に異なったメッセージをログに出力して、受け取った例外を再送出する処理」を関数化して再利用するために以下の関数を作ってみました。

この処理は例題なので単純なものになっていますが、製品開発ではもっと細かくて複雑な処理が必要になってくる可能性があります。また、システム共通のエラー処理を変更する場合には、エラー処理のロジックが一箇所に集まっていた方が取り回しが楽ということもあります。

def handleError(message: String, e: Throwable) {
  log(message)
  throw e
}

この関数は以下のように例外処理に組み込みます。

try {
    doSomesing()
  } catch {
    case e: FileNotFoundException =>
      handleError("file not found", e)
    case e: InterruptedIOException =>
      handleError("intterupted io", e)
    case e: IOException =>
      handleError("io", e)
    case NonFatal(e) =>
      handleError("non fatal", e)
  }

一見これでよいように思いますが、実は以下のようなコンパイルエラーが出てしまいます。

...
[error]  found   : Unit
[error]  required: String
[error]         handleError("file not found", e)
[error]                    ^
...
[error]  found   : Unit
[error]  required: String
[error]         handleError("intterupted io", e)
[error]                    ^
...
[error]  found   : Unit
[error]  required: String
[error]         handleError("io", e)
[error]                    ^
...
[error]  found   : Unit
[error]  required: String
[error]         handleError("non fatal", e)
[error]                    ^
[error] four errors found

その理由は、handleError関数の返却値がUnitであるため、例外処理を行っているdoSomething関数の返却値Stringと型が合わないためです。

この問題の対策が今回のテーマです。

Stringを返す

利用者側の関数の返却値とhandleError関数の返却値が合わない問題を解決する方法として、返却する型を明示したhandleError関数を用意する方法があります。

たとえば以下のようなhandleErrorString関数を用意する方法です。

def handleErrorString(message: String, e: Throwable): String = {
  log(message)
  throw e
}

しかし、この方法を採ると必要な型ごとに1つ関数を用意する必要があります。用途によっては考えられる解決策ですが、汎用的に適用できる方法ではありません。

型パラメータ

1つの解としては以下のように型パラメータを使う方法があります。

def handleError[T](message: String, e: Throwable): T = {
  log(message)
  throw e
}

handleError関数の使用元が必要とする型が自動的に型Tに設定されhandleError関数の返却値となります。このためコンパイルエラーになりません。

Nothing

前述の型パラメータを使う方法でもよいのですが、Scalaにはこのような目的に使用できるNothingという型があります。

Nothingはすべての型のサブクラス、という特殊な位置付けの型です。一種のワイルドカードのような型です。

以下のようにhandleError関数の返却値をNothingとすると、doSomething関数がどのような型を返すことになっても、そのまま利用することができます。

def handleError(message: String, e: Throwable): Nothing = {
  log(message)
  throw e
}

この例のように最後に例外を送出する処理を行う関数は、実際には値は返すことはないので、返却値の型をNothingしても問題はなく、利用範囲は大きく広がります。

今回の問題に対しては前述の型パラメータを使う方法でも対処可能ですが、Nothingを使った方がより意図が明確になると思います。

参考

未実装のメソッドを定義するのに便利な「???」メソッドも返却値にこのNothingを使っています。「???」メソッドの定義は以下のようになっています。

def ??? : Nothing = throw new NotImplementedError

諸元

  • Java 1.7.0_75
  • Scala 2.11.7