2015年3月2日月曜日

[scalaz-stream] シーケンス番号とエンドマーク

プロダクトの開発中にわりと難しい処理がscalaz-streamでうまくさばけたのでメモ。

近い将来Buzzwordになってきそうな「Functional Reactive Programming」ですが、monadicなstreaming系のアプローチが筋がよさそうということで、scalaz-streamを実プロダクト開発に導入して試行錯誤しているところです。大規模データ系のいろいろな処理に適用していますが、かなりいい感触を得ています。

そのような応用の一つである大規模メール配信処理で出てきた要件が以下のものです。

  • 大規模なデータ列を一つの固まりとしてストリーム上に流したい
  • 事前にデータ列の総量は分からない
  • データ規模が大きいので復数パケットに分割する必要がある
  • 性能向上のため、可能な範囲で1パケットに復数データ列を格納したい

ネットワーク系のプログラムではわりとよく出てくる処理だと思います。

やろうとしていることはそれほど難しくないのですが、プログラムを組んでみると思いの外、複雑なものになってしまうといった系統の処理です。

こういった処理をいかに簡単に書くことができるようにするのかというのがプログラミング・モデルの重要なテーマの一つです。OOPであれば専用フレームワーク的なものを用意する感じですが、FPの場合はComposableな関数のCompositionでさばいてみたいところです。

サンプルロジック

上記の要件をscalaz-streamで実現することができるかを検証するためにサンプルプログラムをつくってみました。

以下の要件を実装しています。

  • 事前にすべてデータが読み込まれていることは前提としない
  • 3データごとに1つのパケットに集約
  • シーケンス番号をつける
  • 最終パケットにエンドマークをつける

scalaz-streamのProcessモナドを使用しています。

package sample

import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._

object Main {
  case class Packet(seqno: Int, end: Boolean, content: String)

  val source: Process[Task, Int] = {
    Process.range(0, 10)
  }

  def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }

  def main(args: Array[String]) {
    execute(source, sink)
  }

  def execute(source: Process[Task, Int], sink: Sink[Task, Packet]) {
    import process1._
    val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)
    val task = pipeline.run
    task.run
  }

  def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }
}
Source

ストリームの起点となるProcessモナドをSourceと呼びます。

例題なのでProcess#range関数で0から10の数値のストリームを生成しています。

val source: Process[Task, Int] = {
    Process.range(0, 10)
  }
Sink

ストリームの終端となるProcessモナドをSinkと呼びます。

例題なのでscalaz.stream.io#channel関数を使って、コンソール出力するSinkを定義しています。

def sink: Sink[Task, Packet] = {
    io.channel((a: Packet) => Task.delay { println(a) })
  }
ストリームの構築

SourceとSinkはサンプル用のものなので、ここからが本題です。

ストリームの構築は、以下のようにProcessモナドが提供するコンビネータをつないでいく形になります。

FunctorやMonadが定義している基本コンビネータであるmapやflatMap以外にProcessモナドが提供するpipeといったコンビネータを多用することになるのが特徴です。

val pipeline = source.
      chunk(3).
      pipe(zipWithNext).
      pipe(zipWithIndex).
      map(toPacket).
      to(sink)

ストリームを処理するパイプラインはSourceから始まってtoコンビネータで指定するSinkが終端になります。途中、chunk、pipe、mapの各メソッド/コンビネータがパイプラインを構成しています。

chunk

ストリーム内を流れるデータを1つのチャンクにまとめる処理はchunkメソッドを使います。チャンクにまとめる個数の3を引数に指定します。

ストリームにはsourceからデータであるIntが流れてきますが、これがSeq[Int]に変換されます。

pipe

pipeコンビネータはProcessモナドをストリーム処理のパイプライン部品として埋め込みます。

ここではscalaz.stream.process1にある以下の関数を使って生成したProcessモナドをパイプラインに埋め込んでいます。

  • zipWithNext
  • zipWithIndex

zipWithNext関数が返すProcessモナドは、処理中のデータに加えてその次のデータをOptionとして渡してきます。処理中のデータが最終データの場合は、「その次のデータ」はNoneになります。これを使用すると、次のデータの有無、次のデータの内容によって処理を変更することができるわけです。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Seq[Int], Option[Seq[Int]]]に変換されます。

zipWithIndex関数が返すProcessモナドは、処理中のデータにインデックスをつけます。

ここまでの処理でストリームからSeq[Int]が流れてきますが、この段でTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]変換されます。

map

mapコンビネータで以下のtoPacket関数をパイプラインに組み込んでいます。

def toPacket(x: ((Seq[Int], Option[Seq[Int]]), Int)): Packet = {
    val ((current, next), index) = x
    val content = current.mkString("-")
    Packet(index + 1, next.isEmpty, content)
  }

ここまでの処理でストリームからTuple2[Tuple2[Seq[Int], Option[Seq[Int]]], Int]が流れてきますが、この段でPacketに変換されます。

toPacket関数の処理は簡単でストリームから流れてきた情報を元に:

インデックス
ストリーム上の情報は0起点なので1起点に変換
エンドマーク
Nextがない場合は最終パケットなのでtrue

の情報をPacketに設定しています。

サンプルコード内で自前のアルゴリズムらしい物を書いているはここだけです。

ストリームの実行

ストリームの実行はrunメソッドで行います。

今回はTaskモナドを使う指定をしているので、ProcessモナドのrunメソッドではTaskモナドが返ってきます。

Taskモナドのrunメソッドを実行するとストリームが動作します。

val task = pipeline.run
    task.run

実行

実行とすると以下の結果が出力されました。

10個のデータ列が4つのパケット列として出力され、最後のパケットにはエンドマークがついています。

Packet(1,false,0-1-2)
Packet(2,false,3-4-5)
Packet(3,false,6-7-8)
Packet(4,true,9)

受信側の処理

このパケット列を受け取った側の処理としては:

  • エンドマークがついているパケットまでパケットを読み込む
  • シーケンス番号を監視して、欠損があれば再送を依頼する

というような処理になるかと思います。

こういった処理をscalaz-streamのProcessモナドで実装可能か、というのも面白そうなテーマなので機会があれば試してみたいと思います。

まとめ

大規模データのパケット分割処理ですが、自前のロジックはtoPacket関数のものぐらいで、後はscalaz-streamの用意する部品を組み合わせるだけで構築できました。

パケット分割や復元は自分でロジックを組むとそれなりにバグが出る所なので、既存部品を組み合わせるだけで、ほとんどの処理が組み上がって、最後の仕上げだけ自前のロジックを差し込むことができるのはほんとうに楽です。

また、OOP的なフレームワークだと:

  • パイプラインの構成をXMLなどの外部DSLで定義
  • 自前ロジックをパイプラインに組み込むためのボイラープレイト作成

といった作業が必要になるので、それなりに大変です。

それに対して、scalaz-streamの方式はmonadic programmingの作法に則っていれば通常の関数型プログラミングでOKなのが非常に使いやすいところです。具体的にはOOP的なフレームワークアプローチに対して以下のメリットがあります。

  • パイプラインを外部DSLで構築すると、型安全でなくなる、デバッグがしづらいといった問題も出るが、いずれの問題もなくなる。
  • 自前ロジックを通常の関数で記述すればよく、特別なボイラープレイトのコードは必要ない。
いいことずくめのようですが:

  • monadic programmingの習得
  • processモナドの理解

といった難点があるので、これはこれで一定のハードルがあります。

こういったハードルをcoding idiomやdesin patternといった技法でクリアすることができれば、大規模データ処理にはなかなか有力なアプローチだと思います。

注意

本稿の「大規模」は処理対象としてメモリに一度に乗らない規模のデータ量を指していて、最大1GB程度のものを想定しています。

これを超えてくるデータの処理はHadoopやSpark的な並列分散処理が必要になってくるので、本稿のスコープ外です。このような並列分散処理もSparkのRDDといったものを使ったmonadic programmingが有力と思うので、いずれ取り上げてみたいと思います。

諸元

  • Scala 2.10.3
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a

0 件のコメント:

コメントを投稿