2016年11月30日水曜日

よこはまクラウド勉強会: OFP & OFAD Deep Dive with Reactive Streams

Qcon Tokyo 2016で「オブジェクト‐関数型プログラミングからオブジェクト‐関数型分析設計へ~クラウド時代のモデリングを考える」と題してOFAD(Object-Functional Analysis and Design)についてお話させていただきました。

テーマはOFADですが、OFADの前提としてモダンなFP(Functional Programming)を前提としたOFP(Object-Functional Programming)の知識、さらに基本的なOOADの知識が必要なので、非常に広範囲の内容を圧縮して詰め込んだ形になってしまいました。

そこで、もう少し時間を取って説明することができればよいと思っていたのですが、横浜クラウド勉強会 で機会を頂くことができました。

QCon Tokyoでは50分に収めましたが、こちらの方は少し脱線しながら2時間程度のセッションになりました。その後、Reactive Streamsのハンズオンという構成です。

Reactive Streamsの立ち位置

QCon向けの資料をまとめていて感じたのはReactive StreamsがOFP(Object-Functional Programming)のキーテクノロジーではないか、ということです。

Reactive Streamsは以下の2つの方向性があると考えています。

  • 即応性、スケーラビリティのためのメカニズム
  • FP(Functional Programming)の適用範囲を広げる

ここでのFPは純粋関数型プログラミングを意図しています。

前者は今後のクラウドアプリケーションの方向としては当然の方向性です。この方向性を追求する場合には、FPの純粋性を犠牲にしても実行時性能やバックプレッシャーなどのプロトコル拡張を追求することになります。

一方、後者はFPの純粋性は守りながら、FPを外部入出力や大規模データ処理、ストリーミング処理に適用することを目指すものです。

Reactive Streamsは即応性やスケーラビリティという点で注目されていますが、ボクの関心事はそのことよりもFPの適用範囲を広げることに大きく寄与するのではないかという点です。ボクは現時点では後者を重視しているので、(Akka Streamsではなく)scalazと組み合わせて使えるscalaz-streamを愛用しています。

セッションでもお話したOFPのコツはつまるところOOP(Object-Oriented Programming)とFPを適材適所で、ということですが整理すると以下の方針になります。

  • アルゴリズム系の副作用を伴わない処理はFP
  • コンポーネントのファサード部(API/SPI)はOOP
  • OOADのモデリングの実現部はOOP
  • その他はできる限りFP

「できる限りFP」にしたい理由は以下のものです。

  • バグの発生率が圧倒的に少ない。
  • アルゴリズムを効率よく記述できる。
  • 並列/並行/分散処理時代への助走。
  • 将来の証明プログラミングへの備え。

現時点での最大の魅力は「バグの発生率が圧倒的に少ない」がリファクタリングに大きく寄与する点です。長期間持続的に開発を続けるシステムはこの理由だけでOFPを採用する価値があると思います。そして、長期的には「並列/並行/分散処理時代への助走」、「将来の証明プログラミングへの備え」という観点から必須のプログラミング・スタイルになるとすると、先取りして取り入れておきたいところです。

ここで問題となるのは「その他はできる限りFP」です。

FPには外部入出力処理や状態を持った処理の記述が大変という問題があります。スライドに書いたように、モナドの登場で外部入出力処理や状態を持った処理の記述が「困難」から「可能」になったのは大きな前進ですが、OFPの観点からはかならずしも「便利」とは言えないと思います。またScalaの特殊事情として文法的な制約でHaskell程簡明には書けないということもあります。

「できる限りFP」とはいえ便利でないものは使わなくなるのが道理で、対策をとらないと結局OOPの部分が大きく残ってしまう事になってしまいます。

Reactive Streamsがキーテクノロジーではないか、という期待はまさにこの問題の解消に大きく寄与するのではないかという点です。

scalaz-streamが提供するProcessモナドも一種のモナドですが、概念的に分かりやすいのとリソース管理やフロー制御という伝統的なOOPによる外部入出力でも難しかった問題が解決されています。実際に製品開発で使ってみてこの便利さを体感しました。

このような経験からReactive Streamを開発の基盤とすることで、OFPの多くの部分をFP側に倒すことが可能になるのではないかと期待しているわけです。

Reactive Streamsハンズオン

そんな思いもあり、セッションの後半はReactive Streamsのハンズオンにしました。以下にソースコードがあります。

このハンズオンのソースはセッションの朝に急ごしらえで作ったものなので、動作確認などはあまりできておらず、当日会場で調整しながら使用したものです。その点はご留意下さい。

ここでの趣旨は、(即応性、スケーラビリティではなく)FP成分を重視したReactive Streamsについて、プログラミングレベルでのイメージをつかんでいただくことです。

以下簡単に説明します。

ステップ1: Hello World

ステップ1はscalaz-streamのHello World的なプログラムです。scalaz-streamの最小限の使い方を体験することが目的です。

mainメソッドの第1引数に入力ファイル名を、第2引数に出力ファイル名を指定すると入力ファイルを出力ファイルに複写するプログラムです。

ハンズオンの問題は以下になります。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step1 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] = ???

  def converterProcess(in: String, out: String): Process[Task, Unit] = ???
}

converter関数とconverterProcess関数を実装します。

実装例

ステップ1の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step1 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] =
    converterProcess(in, out).run

  def converterProcess(in: String, out: String): Process[Task, Unit] =
    io.linesR(in).pipe(text.utf8Encode).to(io.fileChunkW(out))
}

converter関数はconverterProcess関数から返されたProcessモナドをrunしてTaskモナドにします。

converterProcess関数はscalaz-streamのパイプラインをProcessモナドという形で構築して返します。

ステップ2: Monadic API

ステップ1はscalaz-streamの最小限の使い方でした。

ステップ2ではscalaz-streamのパイプラインが、通常のMonadic APIとして使えることを体験します。ここでいうMonadic APIはJavaでいう所のStream APIで、FunctorやMonadによるパイプラインをベースにしたAPIです。(Streamは色々な意味付けがされていてミスリーディングな面もあるのでここでは本ブログではMonadic APIと呼んでいます。)

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step2 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val player = args(2)
    val t = converter(in, out, player)
    t.run
  }

  def converter(in: String, out: String, player: String): Task[Unit] =
    converterProcess(in, out, player).run

  def converterProcess(in: String, out: String, player: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).filter(isPlayer(player)).map(toYear).map(_ + "\n").
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def isPlayer(player: String)(record: Vector[String]): Boolean = ???

  def toYear(record: Vector[String]): String = ???
}

ステップ1との違いは以下の3つの関数をmapコンビネータ、filterコンビネータでパイプラインに組み込んでいることです。

  • toRecord関数
  • isPlayer関数
  • toYear関数

これらの関数が通常の関数であること、そして簡単にProcessモナドのパイプラインに組み込むことができる点を体験するのが目的です。

実装例

ステップ2の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step2 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val player = args(2)
    val t = converter(in, out, player)
    t.run
  }

  def converter(in: String, out: String, player: String): Task[Unit] =
    converterProcess(in, out, player).run

  def converterProcess(in: String, out: String, player: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).filter(isPlayer(player)).map(toYear).map(_ + "\n").
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def isPlayer(player: String)(record: Vector[String]): Boolean =
    record.lift(0) == Some(player)

  def toYear(record: Vector[String]): String =
    record.lift(1) getOrElse "Unknown"
}

toRecord関数、isPlayer関数、toYear関数はごく普通の関数です。これらを簡単にProcessモナドのパイプラインに組み込んで動作することが確認できました。

ステップ3: フロー制御

ステップ3は一種のフロー制御であるチャンク化の体験です。

通常のMonadic APIは構造上フロー制御を行うことができませんが、Processモナドではpipeコンビネータなどの仕組みによってフロー制御を可能にしています。

ここが通常のMonadic APIに対するReactive Steramsの優位点で、外部入出力を効率的に処理することを可能にする拡張となっています。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step3 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] =
    converterProcess(in, out).run

  def converterProcess(in: String, out: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).
      chunk(1000).map(groupByYear).pipe(process1.unchunk).
      map(toYear).
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String = ???

  def groupByYear(records: Vector[Vector[String]]): Vector[Vector[String]] = ???
}

注目するポイントはパイプライン中の「chunk(1000).map(groupByYear).pipe(process1.unchunk)」の部分です。パイプライン中のchunk関数とpipeコンビネータで適用したprocess.unchunk関数によって、パイプラインを流れるデータを1000個単位でチャンク化しています。

mapコンビネータで指定されているgroupByYear関数はチャンク化したデータに対する処理を行うようになっています。

チャンク化は入出力処理で性能向上するための必須処理なので、これを自動的に行なってくれる部品が提供されていることは生産性に大きく寄与します。

実装例

ステップ3の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._

object Step3 {
  def main(args: Array[String]) {
    val in = args(0)
    val out = args(1)
    val t = converter(in, out)
    t.run
  }

  def converter(in: String, out: String): Task[Unit] =
    converterProcess(in, out).run

  def converterProcess(in: String, out: String): Process[Task, Unit] =
    io.linesR(in).
      map(toRecord).
      chunk(1000).map(groupByYear).pipe(process1.unchunk).
      map(toYear).
      pipe(text.utf8Encode).to(io.fileChunkW(out))

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String =
    record.lift(1) getOrElse "Unknown"

  def groupByYear(records: Vector[Vector[String]]): Vector[Vector[String]] = {
    case class Z(years: Set[String] = Set.empty, result: Vector[Vector[String]] = Vector.empty) {
      def +(rhs: Vector[String]) =
        rhs.lift(1).fold(this) { year =>
          if (years.contains(year))
            this
          else
            Z(years + year, result :+ rhs)
        }
    }
    records.foldLeft(Z())(_ + _).result
  }
}

toYear関数、groupByYear関数を普通に実装するだけです。データをチャンク化する処理はscalaz-streamが提供している部品が自動的に行なってくれます。

フロー制御の例としては以下の記事が参考になると思います。

ステップ4: ストリーム

ステップ4はReactive Streamsでストリーム処理を行う課題です。

まず準備としてストリームを生成する部品EventProcessorを作ります。

package handson.reactive
  
import scalaz.concurrent.Task
import scalaz.stream._  

object EventProcessor {
  val q = async.unboundedQueue[String]

  val eventStream: Process[Task, String] = q.dequeue
}

buildメソッドはscalaz-streamによるパイプラインを作って、これをバックグラウンドで起動します。

最後に呼んでいるstimulusメソッドはEventProcessorを使って入力ファイルの内容を1行づつ読み込み1秒のインターバルでストリームに送出します。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._
import scala.concurrent.Future  
import scala.concurrent.ExecutionContext.Implicits.global
import scalax.io._
import scalax.io.JavaConverters._
import java.io.File

object Step4 {
  def main(args: Array[String]) {
    val in = args(0)

    val stream = EventProcessor.eventStream
    build(stream)
    stimulus(in)
  }

  def build(stream: Process[Task, String]) {
    Future {
      val t = converterProcess(stream).to(io.printLines(System.out))
      t.run.run
    }  
  }  

  def converterProcess(source: Process[Task, String]): Process[Task, String] =
    source.map(toRecord).map(toYear)

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String = ???

  def stimulus(in: String) {
    val queue = EventProcessor.q
    val input = new File(in).asInput
    input.lines() foreach { line =>
      queue.enqueueOne(line).run
      Thread.sleep(1000)
    }
  }
}

scalaz-streamのパイプラインはconvertProcess関数で作成します。このパイプラインにはmapコンビネータでtoYear関数を接続しています。

本課題ではこのtoYear関数を実装します。

toYear関数は通常の関数です。この通常の関数をscalaz-streamのパイプラインに組み込むだけでFPによるストリーム処理が記述できるのを体験するのが本課題の趣旨です。

実装例

ステップ4の実装例です。

package answer.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._
import scala.concurrent.Future  
import scala.concurrent.ExecutionContext.Implicits.global
import scalax.io._
import scalax.io.JavaConverters._
import java.io.File

object Step4 {
  def main(args: Array[String]) {
    val in = args(0)

    val stream = EventProcessor.eventStream
    build(stream)
    stimulus(in)
  }

  def build(stream: Process[Task, String]) {
    Future {
      val t = converterProcess(stream).to(io.printLines(System.out))
      t.run.run
    }  
  }  

  def converterProcess(source: Process[Task, String]): Process[Task, String] =
    source.map(toRecord).map(toYear)

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear(record: Vector[String]): String =
    record.lift(1) getOrElse "Unknown"

  def stimulus(in: String) {
    val queue = EventProcessor.q
    val input = new File(in).asInput
    input.lines() foreach { line =>
      queue.enqueueOne(line).run
      Thread.sleep(1000)
    }
  }
}

ストリーム処理の例としては以下の記事が参考になると思います。

ステップ5: ストリーム&フロー制御

ステップ5はステップ4で作成したストリームに、自前のフロー制御を組み込むという課題です。

この課題はちょっと難しいので、時間が余った人向けを想定しています。

package handson.reactive

import scalaz._, Scalaz._  
import scalaz.concurrent.Task  
import scalaz.stream._
import scala.concurrent.Future  
import scala.concurrent.ExecutionContext.Implicits.global
import scalax.io._
import scalax.io.JavaConverters._
import java.io.File

object Step5 {
  def main(args: Array[String]) {
    val in = args(0)

    val stream = EventProcessor.eventStream
    build(stream)
    stimulus(in)
  }

  def build(stream: Process[Task, String]) {
    Future {
      val t = converterProcess(stream).to(io.printLines(System.out))
      t.run.run
    }  
  }  

  def converterProcess(source: Process[Task, String]): Process[Task, String] =
    source.map(toRecord).pipe(toYear)

  def toRecord(s: String): Vector[String] = s.split(",").toVector

  def toYear: Process1[Vector[String], String] = ???

  def stimulus(in: String) {
    val queue = EventProcessor.q
    val input = new File(in).asInput
    input.lines() foreach { line =>
      queue.enqueueOne(line).run
      Thread.sleep(1000)
    }
  }
}

ステップ4のようにscalaz-streamのパイプラインにmapコンビネータで関数を合成する場合は、通常の関数を使うことができますが、フロー制御を組み込むことはできません。

フロー制御を組み込むためにはProcessモナドを引数にして、Processモナドを返す関数を作成し、pipeコンビネータでパイプラインに組込みます。

Processモナドを直接操作するので少し難しいプログラミングになりますが、フロー制御を自分で操作できるので色々な用途に適用することができるようになります。

ストリーム処理&フロー制御の例としては前項と同様に以下の記事が参考になると思います。

この課題は時間切れでボクも実装例を作ることができませんでした。

まとめ

OFPにおけるReactive Streamsの位置付けについて、FPを重視する方向性から整理してみました。

その上でプログラミングレベルでReactive Streamsを把握するための仕掛けとしてハンズオンの資料を紹介しました。

もちろんOFPのReactive StreamsはOFADにも、少なからず影響するはずです。この点はまだ考えが整理できていませんが、ブログで継続して検討していきたいと思います。