2015年6月29日月曜日

[FP] Scalaへの道

先日、社内勉強会でScala的なプログラムにするためには、という話題が出たのでそのまとめです。

例題

勉強会で出たプログラム例をまとめて例題を作りました。

  • テキストファイル内の欧文単語を単語の長さで以下の三種類に分類してオブジェクトに設定する
  • 短 : 3文字以下
  • 中 : 7文字以下
  • 長 : 8文字以上

以下ではこの例題にそって説明していきます。

準備

分類した単語を設定するオブジェクトとしてcase classのWordsを中心としたクラスとコンパニオンオブジェクトを用意しました。

例題はテキストファイルを解析して、case class Wordsに解析結果を設定する処理になります。

package sample

case class Words(
  smalls: Vector[String],
  middles: Vector[String],
  larges: Vector[String]
) {
  def +(word: String): Words = {
    if (word.length <= 3)
      copy(smalls = smalls :+ word)
    else if (word.length > 7)
      copy(larges = larges :+ word)
    else
      copy(middles = middles :+ word)
  }

  def +(words: Words): Words = {
    copy(
      smalls ++ words.smalls,
      middles ++ words.middles,
      larges ++ words.larges
    )
  }

  def ++(words: Seq[String]): Words = {
    words.foldLeft(this)(_ + _)
  }

  override def toString() = s"small:${smalls.length}, middle:${middles.length}, large:${larges.length}"
}

object Words {
  val empty = Words(Vector.empty, Vector.empty, Vector.empty)

  def apply(word: String): Words = empty + word

  def apply(words: Seq[String]): Words = empty ++ words

  def getTokens(s: String): Vector[String] =
    s.split(" ").filter(_.nonEmpty).toVector
}

Java風プログラム

JavaもJava 8でストリームAPIが追加されましたし、Functional Javaのようなアプローチも色々あるので、モダンJavaは事情が異なりますが、古くからJavaを使っているベテランプログラマ程Java 5〜7時代のプログラミング・スタイルがスタンダードだと思います。

この古めのJavaスタンダード的なプログラムをここではJava風プログラムと呼ぶことにします。

それではJava風プログラムです。

def calcWords(filename: String): Words = {
    val buf = new ArrayBuffer[String] // ArrayBuffer中心ロジック
    val in = new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8"))
    try {
      var s = in.readLine
      while (s != null) {
        buf ++= getTokens(s)
        s = in.readLine
      }
    } finally {
      in.close() // リソースの解放が手動
    }
    val smalls = new ArrayBuffer[String]
    val middles = new ArrayBuffer[String]
    val larges = new ArrayBuffer[String]
    for (x <- buf) { // アルゴリズムの配置がトランザクションスクリプト的
      if (x.length <= 3)
        smalls += x
      else if (x.length > 7)
        larges += x
      else
        middles += x
    }
    Words(smalls.toVector, middles.toVector, larges.toVector)
  }

ボクもJavaからScalaに移行した当初はこのようなプログラムを書いていました。

ポイントは以下の3つです。

  • ArrayBuffer中心ロジック
  • リソースの解放が手動
  • アルゴリズムの配置がトランザクションスクリプト的
問題 : ArrayBuffer中心ロジック

Javaプログラムでは、データの集まりを作成する処理はjava.util.ArrayListとfor文などのループを組み合わせるのが定番です。

JavaからScalaに移行したての時期は、Scalaプログラムでもこの作戦を踏襲して上記のようなプログラムを書くことが多いと思います。java.util.ArrayListの代わりにscala.collection.mutable.ArrayBufferを使います。

Better Javaという意味では問題はないのですが、よりScala的、関数型プログラミング的にステップアップしたい場合には、この作戦を捨てる必要があります。

関数型プログラミングでは不変オブジェクトを使って副作用なしで処理を行うことが基本です。つまり、可変オブジェクトであるscala.collection.mutable.ArrayBufferを使った瞬間にこの基本が崩れてしまうため、関数型プログラミング的ではなくなるわけです。

問題 : リソースの解放が手動

Javaではローンパターン的なDSLを用意することが難しいため、リソースの解放がtry文を使った手動になってしまう事が多いと思います。

Scalaではリソース解放の処理を自動的に行なってくれるDSLが多数用意されているのでできるだけそれらの機能を使うようにするのが得策です。

問題 : アルゴリズムの配置がトランザクションスクリプト的

ArrayBufferを使うことでできる限り性能劣化を避けるという方針の悪い側面として、アルゴリズムの部品化を阻害するという問題があります。

たとえばArrayBufferを使ったアルゴリズムの一部を性能劣化を起こさず部品化する場合、ArrayBufferを持ちまわすようなインタフェースとなり、インタフェースが複雑化します。

また、ArrayBufferを意識した部品の場合、ArrayBuffer以外のデータ構築用のオブジェクトには適用できないので、データ構築用のオブジェクト毎に部品を用意する必要が出てくるという問題もあります。

インタフェースが複雑化した部品は結局使われないようになりがちなので、結果としてアプリケーション・ロジック内によく似たアルゴリズム断片がベタ書きされるようなケースが多くなります。

めぐりめぐって部品化が進まずプログラム開発の効率化が阻害されるという悪循環に入ることになります。

この問題への対応策は前述したように「多少の性能問題には目をつぶる」覚悟をした上で、不変オブジェクトを使った関数型プログラミングに移行することです。

性能問題対策

ArrayBufferを多用するアルゴリズムを採用する意図としては性能問題があります。

上級者ほど性能問題を重要視しているので、関数型的な不変オブジェクト中心のアルゴリズムには本能的な拒否反応があると思います。

この問題への対応ですが「多少の性能問題には目をつぶる」につきます。

Java的な感覚でいうと、そもそもScalaの生成するコードはかなりオーバーヘッドが高いのでArrayBufferを採用して部分的に性能を上げても他の所でもオーバーヘッドが出るため、結局Javaと全く同じものにはなりません。

それであれば中途半端に性能改善するよりScalaのパワーを活かした開発効率を重視する戦略を採るのが得策です。

余談 : ListBuffer

java.util.ArrayListに対応するScala側のコレクションとして、「List」つながりでscala.collection.mutable.ListBufferを選んでしまうケースがあるかもしれません。

scala.collection.mutable.ListBufferはscala.collection.mutable.ArrayBufferと比べると性能的にかなり不利なので、余程のことがなければ採用することはありません。

そもそもArrayBufferは使わないようにしようという議論ですが、仮にjava.util.ArrayList相当のコレクションが必要になった場合はListBufferではなくArrayBufferを使うようにしましょう。

Scala的プログラム

前述のJava風プログラムの改良版としてScala的プログラムを4つ用意しました。

基本形

まず基本形です。

def calcWords(filename: String): Words = {
    var strings = Vector.empty[String]
    for (in <- resource.managed(new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8")))) {
      var s = in.readLine
      while (s != null) {
        strings = strings ++ getTokens(s)
        s = in.readLine
      }
    }
    strings.foldLeft(Words.empty)(_ + _)
  }
対策 : ArrayBuffer中心ロジック

ArrayBufferを使わず不変オブジェクトであるVectorを使うようにしています。

Vectorは不変オブジェクトですがvarと複写を併用することでアルゴリズム内では内容を更新しながら処理を進める効果を得ています。

var strings = Vector.empty[String]
...
    strings = strings ++ getTokens(s)

varとVectorのこのような使い方は定番のイディオムです。

対策 : リソースの解放が手動

ファイルアクセスを伝統的なjava.ioを使って行う場合は、ストリームの解放処理が問題となります。

このような処理を記述する時によく利用するのがScala ARMです。

Scala ARMの使い方の一つとして上記のプログラムのようにfor式とresource.managedを組み合わせる方式があります。

resource.managedの引数にリソースのライフサイクルを管理したいオブジェクトを設定すると、for式の終了時に自動的にリソースを解放してくれます。

対策 : アルゴリズムの配置がトランザクションスクリプト的

単語の振り分けアルゴリズムの呼出しは以下の場所で行っています。

strings.foldLeft(Words.empty)(_ + _)

振り分けアルゴリズムはcase class Words内で定義した部品(メソッド)を、非常に簡単にcalcWords関数から呼び出して使用できており、トランザクションスクリプト的な問題は解消しています。

アルゴリズム部品の利用に貢献しているのが畳込み処理を抽象化した関数であるfoldLeftコンビネータです。ArrayBufferとfor式を使って記述しているアルゴリズムの多くはfoldLeftまたはfoldRightコンビネータで記述することができます。

VectorとはfoldLeftが相性がよいので、ArrayBufferとfor式を使いたくなったらVectorとfoldLeftの組合せの解を考えてみるのがよいでしょう。

大規模データ問題

上述のScala基本形ですが、Java風版から引き継いだ本質的な問題点として大規模データに対応できないという問題があります。

というのは、ArrayBufferに一度全データを格納するため、データ量がメモリに載らない程大規模になった場合にプログラムがクラッシュしてしまうからです。

ArrayBufferを使わずデータ読み込みのwhile式の中にすべての処理を押し込めばこの問題には対応できますが、部品化とは真逆の一枚岩の見通しの悪いアルゴリズムになってしまいます。

大規模データ問題は後述のパイプライン・プログラミングやMonadicプログラミングで解決することができます。

余談 : Iteratorパターンの本質

forロープで記述したアルゴリズムをより関数的なやり方で記述する方法について参考になるのが以下のページです。

foldLeft/foldRightで記述しきれない複雑なforループもApplicativeとTraverseという機能を使って部品化を進めながら記述できるということのようです。より本格的な関数型プログラミングを追求する場合は、こちらの方面に進むことになります。

Try

関数型プログラミングでは、関数の実行結果は関数のシグネチャに完全に記述されていることが基本です。このため、暗黙的に関数の制御フローを乱す「例外」は関数型プログラミング的には好ましくない言語機能です。

とはいえ実用的には非常に便利なのでScalaプログラムでも日常的に使用するのは問題ないと思いますが、関数ワールド的なプログラムを書きたい場合はscala.util.Tryモナドを使うのが基本です。

Tryモナドを使った版は以下になります。

def calcWords(filename: String): Try[Words] = Try {
    var strings = Vector.empty[String]
    for (in <- resource.managed(new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8")))) {
      var s = in.readLine
      while (s != null) {
        strings = strings ++ getTokens(s)
        s = in.readLine
      }
    }
    strings.foldLeft(Words.empty)(_ + _)
  }

関数の処理全体をTryで囲むだけなので簡単ですね。

scalax.io

基本形ではリソース解放の汎用的な解としてScala ARMを紹介する目的でjava.ioによるファイルアクセスを行いましたが、Scala的にはファイルアクセスにはscalax.ioを使うのがお勧めです。

scalax.ioを使った版は以下になります。

import scalax.io.{Resource, Codec}

  def calcWords(filename: String): Words = {
    implicit val codec = Codec.UTF8
    Resource.fromFile(filename).lines().
      map(getTokens).
      foldLeft(Words.empty)(_ ++ _)
  }
対策 : リソースの解放が手動

ファイルからの入力処理はリソースの解放も含めてResource.fromFile関数が全て行ってくれます。

対策 : ArrayBuffer中心ロジック&アルゴリズムの配置がトランザクションスクリプト的

関数型プログラミングの常道であるパイプライン・プログラミングの一環としてmapコンビネータとfoldLeftコンビネータを使って処理を記述します。

この方式を取ることで自然に「ArrayBuffer中心ロジック」が解消されます。また、アルゴリズムの部品かも進むので「アルゴリズムの配置がトランザクションスクリプト的」も解消されます。

対策 : 大規模データ

scalax.ioの方式では、Scala基本形に残っていた「大規模データ問題」も解消されています。「大規模データ問題」を解消しつつ部品化も行えているのがパイプライン・プログラミングの効用です。

scalaz-stream

Scalaらしさを越えて、Monadicプログラミングを追求したい場合はscalaz-streamを使うとよいでしょう。

まず準備としてWordsをMonoidとして定義しておきます。

implicit object WordsMonoid extends Monoid[Words] {
    def append(lhs: Words, rhs: => Words) = lhs + rhs
    def zero = Words.empty
  }

その上でscalaz-streamを使った版は以下になります。

import scalaz.concurrent.Task

  def calcWords(filename: String): Task[Words] = {
    implicit val codec = Codec.UTF8
    io.linesR(filename).
      map(getTokens).
      runFoldMap(Words(_))
  }

scalax.io版とほぼ同じような処理になります。

Processモナドを使っているので本来は高度なフロー制御ができるのですが、この例のような単純なデータ読込みだとフロー制御を使う場所はないのでscalax.ioとの違いはでてきません。

scalax.io版との違いは、結果がTaskモナドで返される点です。

TaskモナドはScalaのTryモナドとFutureモナドを合わせたような機能を持つモナドです。前出の「Try」版ではTryモナドで結果を返しましたが、同じような用途と理解すればよいと思います。

まとめ

Java風プログラムのScala的でない点を指摘した上で、Scala的プログラムの解を4つ上げました。

それぞれ長短があるので適材適所で使い分けることになります。

なお、今回はScala ARMを紹介する都合上java.ioによるファイルアクセスも使用していますが、実務的には例題のようなテキストファイル処理はscalax.ioを使うのが効率的です。scalax.ioを基本線に考え、機能不足が判明した場合に、java.io(またはjava.nio)やscalaz-streamを使う方式を検討するのがよいでしょう。

2015年6月22日月曜日

[FP] パイプライン・プログラミング

関数型プログラミングとは何か?

この問は深遠すぎてとてもボクの手には負えませんが、実務的なプラクティスとしてはパイプライン・プログラミングとして考えると分かりやすいのではないかと思います。

そこでScalaでのパイプライン・プログラミングの選択肢を整理してみました。

関数呼び出し

関数型プログラミングにおける普通の関数呼び出しもパイプラインと考えることができます。

純粋関数型では副作用は発生しないので、表に見えている関数の引数と復帰値のみで関数の挙動のすべてが表現されているためです。

たとえば以下のプログラムは:

h(g(f(1)))

以下のようなパイプラインと考えることができます。

Functor

文脈を持ったパイプラインはFunctorを使って構成できます。関数呼び出しとの違いは「文脈」を持った計算になるという点です。

ここでいう「文脈」とはパイプラインの裏側で暗黙的に共有、引継ぎされる状態やデータというぐらいの意味です。関数の直接呼び出しにはそのような裏表はないので、Functorで加わった重要な機能ということになります。

以下の図はQCon Tokyo 2015のセッション「ScalaによるMonadic Programmingのススメ - Functional Reactive Programmingへのアプローチ」で使用したスライドから引用です。



Functor, Applicative Functor, Monadが構成するパイプラインを概観しています。

この中でFunctorはmapコンビネータを使ってパイプラインを構築します。

Option(1).map(f).map(g).map(h)

Applicative Functor

上記の図で示した通りApplicative Functorは復数のパイプラインを一つに統合する機能を提供します。

ScalazではApplicative Functorのために以下のような文法を提供しています。

(Option(1) |@| Option(2) |@| Option(3))(i(_, _, _))

以下の図は同スライドからFuture Applicative Functorの例です。



Monad

「Functor, Applicative Functor, Monadが構成するパイプラインを概観」する図の中のMonadは以下のプログラムになっています。このようにMonadではflatMapコンビネータを使ってパイプラインを構築します。

Functorとの違いは「文脈」をプログラム側で制御する機能が加わる点です。

Option(1).flatMap(f).flatMap(g).flatMap(h)

以下の図は同スライドからOption Monadの例をあげています。



こちらではflatMapコンビネータを使う代わりにfor式によるfor内包表記(for comprehension)を使用しています。

def bigCalcO(n: Int): Option[String] = {
    for {
      a <- Option(calcString(n))
      b <- Option(calcInt(n))
      c <- Option(calcFloat(n))
    } yield finalCalc(a, b, c)
  }
}

for内包表記はMonadを使ったパイプラインのための文法糖衣として機能します。

上記の例ではflatMapコンビネータ直接使用とfor内包表記の違いはそれほど分かりませんが以下に示すState Monadのような複雑な構造のMonadを使う場合にfor内包表記の効果が出てきます。



Reactive Stream

Applicative FunctorやMonadを使うとかなり複雑なパイプラインを構築することができますが、あくまでも制御上は関数呼び出しなのでフロー制御を行うことができません。

ここでいうフロー制御とは、一度に処理するデータ量を制限したり、データの発生またはデータの消費の契機で処理を進める制御を指します。

直感的にパイプラインというとこういう制御が入っていて欲しいところですが、Monadそのものには入っていないわけです。

そこで登場するのがReactive Streamです。単にStreamというとscala.collection.immutable.Streamと紛らわしいので、ここではReactive Streamと呼ぶことにします。

Scalaz StreamではProcess Monadを使ってReactive Streamをを実現しています。

大規模データ処理とストリーム処理それぞれでのProcessモナドの使い方は以下になります。

ストリーム処理


大規模データ処理とストリーム処理のいずれもパイプラインに抽象化されたほとんど同じプログラムになっています。

また、普通のMonadと比べても若干おまじないは増えるもののプログラミングとしては同じような形になります。

状態機械

Process MonadはMonadによるパイプラインで状態機械による制御を可能にしたものと考えることもできます。

詳しくは以下の記事を御覧ください。

まとめ

Scalaのパイプライン・プログラミングを以下のパターンに整理してみました。

  • 関数呼び出し
  • Functor
  • Applicative Functor
  • Monad
  • Reactive Stream

これらのパイプラインを適材適所で使い分ける、という切り口で関数型プログラミングを考えてみると面白いかもしれません。

2015年6月15日月曜日

野毛倶楽部夜会:超高速開発

野毛倶楽部(横浜クラウド勉強会)は土曜の昼に行うチュートリアル的な技術セッション(+夜学)を中心に活動してきましたが、これに加えて先端技術についてオフレコで語り合う場として新たに夜会を始めることにしました。

先週の月曜(6月8日)はその第1回でジャスミンソフトの贄さんを招いて超高速開発をテーマに野毛倶楽部の第1回夜会を行いました。

ジャスミンソフトは超高速開発ツールWagbyの開発/提供元で超高速開発コミュニティの幹事会社です。

プログラムの自動生成はボクも長年追いかけてきた技術分野なので、この分野で成功を収められているジャスミンソフトの具体的なお話をビジネス/技術両面からお聞きできてとても参考になりました。

ディスカッションの中では以下のキーワードが印象に残りました。

  • モデリング
  • リポジトリ

以下、夜会の後に考えたことなどをつらつら。

モデリング

プログラムの自動生成も現時点ではモデルからアプリケーション全体を生成できるわけではないですがアプリケーションの相当部分を自動生成(+フレームワーク)でカバーできるところまできています。

また「超高速開発」という枠組みでは仕様から直接システムやサブシステムを動作させるXEAD DriverASTERIA WARPといったアプローチもあります。

いずれにしても伝統的なスクラッチ開発と比べるとテストも含めたプログラム開発工数が劇的に少なくなることは明らかです。

プログラム開発工数が少なくなるのは「仕様」が直接、間接にそのまま動作するためですが、「動作可能な仕様」でなければならないので、動作可能な精密で網羅的な仕様を作成するスキルが必要になってきます。加えてシステムの既存部分やスクラッチ開発する部分との連携を仕様化するスキルも要求されます。

「動作可能な仕様」を記述する言語としてはOOADのUMLやデータモデリングのER図といったものが代表的ですがExcelやプレインテキストを用いるアプローチもあります。記述言語も大事ですが、裏側のメタモデルがより重要です。

顧客要件から動作可能な精密なレベルのUMLやER図などで記述した動作可能な精密な仕様を作成するモデリングのスキルが「超高速開発」を活用するための重要スキルということになります。

伝統的なスクラッチ開発でも基本設計書、構造設計書、詳細設計書という形で仕様書は書かれていたわけですが仕様に曖昧な点があってもプログラマが補完することで補うことが可能でした。というよりプログラマが高度な補完を行うことが前提だったといっても過言ではないでしょう。しかし「超高速開発」の場合は、仕様をそのまま動作させるため仕様段階で実行可能レベルの正確さが要求されるわけです。

こういった問題意識から超高速開発コミュニティでもモデリング分科会の活動を行っているのだと思います。

リポジトリ

「動作可能な仕様」によるシステム開発が軌道に乗ると、次はこの仕様をデータベースで一元管理したくなるのが道理です。「リポジトリ」はこのような機能を提供するものです。

「リポジトリ」はWikipediaでは以下のように説明されています。

  • リポジトリ (英: repository) とは、情報工学において、仕様・デザイン・ソースコード・テスト情報・インシデント情報など、システムの開発プロジェクトに関連するデータの一元的な貯蔵庫を意味する。

リポジトリは少なくてもボクがオブジェクト指向が調べ始めた90年代初頭にはすでにあった概念ですが、格納するメタデータの記述能力が追いつかず現時点でも部分的な応用にとどまっていると認識しています。メタデータと配備するプログラムが連動していないと、絵に描いた餅になってしまうわけですね。

OMGのMOF(Meta Object Facility)やEAI(Enterprise Application Integration)で話題となったMDM(Master Data Management)も同じようなジャンルの技術ですがまだ(一部の大企業のシステムは別として)一般的に幅広く利用されるような段階にはいたっていないと思います。

リポジトリで管理する情報が超高速開発流の「動作可能な仕様」となると、リポジトリで構想されていた本来の運用が見えてきます。

また、もうひとつの流れとしてソフトウェア構成管理技術の進歩があります。現在はCI技術によってGitHubにプログラムのソースコードをプッシュすると自動的にHerokuのような実行コンテナに配備したり、DockerHubのようにDockerイメージを生成したり、ということが可能になっています。

こういった構成管理技術と「動作可能な仕様」の「リポジトリ」を組み合わせることで、より高度な運用ができるはずです。

現時点で実用化されている要素技術を組み合わせれば、もともと「リポジトリ」で構想されていた理想的な運用に近しいところまでもっていけるのではないか。ブレークスルーの寸前まで各要素技術のポテンシャルが高くなってきており、後は誰がどのように最後のピースをつなげるか、が焦点の分野ではないかと思われるわけです。わくわくしますね。

"リポジトリ"のキーワードからそういったことを考えながら、野毛の夜は更けていきました。

SmartDoc

本題とは離れますが、Wagbyではマニュアル作成にSmartDocを現役で使って頂いているということです。うれしいですね。

2015年6月1日月曜日

DBアクセスライブラリ

Spark SQL 1.3の登場を機にプラットフォームのバッチ処理基盤の刷新を考えています。前回「JSONライブラリ性能比較」では、JSONライブラリについて考えました。

今回はDBアクセスライブラリがテーマです。

バッチ処理基盤として観点からDBアクセスライブラリに対して大きく以下の4つの要件を考えています。

  • CRUDの範囲の操作はcase classで定義したレコードで簡単に処理したい。
  • 細かい問合せ処理はSQLを直接使いたい。
  • SQLの組み立てを部品化したい。
  • Dockerコンテナ上でカスタマイズ可能にしたい。

逆に通常ORMに求められる以下の機能はあまり重要視していません。

  • 関連の自動制御
  • データのマイグレーション

「関連の自動制御」ですが関連の取り扱いは必要機能、性能要件、メモリ要件、キャッシュ要件がORMが提供する機能と合致するのかを熟知するのが大変なので、SQLを生で使う方式を採用しているためです。もちろん、うまく合致する場合は使ったほうが楽なので、あるに越したことはありません。

「データのマイグレーション」はアジャイルにカジュアルに行う運用ではなく、システム結合のタイミングで手動で行っているので今のところ本番運用では使う可能性がないためです。

制約

バッチ処理基盤を実現する上で考慮しなければならない制約は以下のものです。

  • バッチ処理はDockerクラスタまたはSparkクラスタ上で動作。
  • case classのパラメタ数制約22個が解消されるのはScala 2.11から。
  • プラットフォームのほとんどの主要DBテーブルのカラム数は22個超なのでScala 2.10ではレコードの表現にcase classは事実上難しい。
  • プラットフォームの主要機能がScala 2.10ベースなのでScala 2.11を全面的に採用するわけにはいかない。
  • Spark SQLがScala 2.11で安定的に動作するか不明(現在使用しているDockerイメージsequenceiq/sparkはScala 2.10みたい)。

上記の制約があるため、以下のような運用を想定しています。

  • プラットフォームとバッチ処理基盤の共通ライブラリはScala 2.10と2.11のクロスビルド。
  • Scala 2.10バッチとScala 2.11バッチを併用。
  • SparkジョブはScala 2.10バッチ。
  • 非SparkジョブはScala 2.11バッチ(case classを活用したいため)。
  • 非SparkジョブはDockerクラスタ上で実行する。
  • Sparkジョブでも小さなものはDockerクラスタ上で実行する。

クロスビルドの問題はかなり重たくて以下のような問題が出ています。

  • Play Frameworkの2.11サポートは2.3からのようだが、AnormのAPIが非互換みたい。
  • finagle-httpは2.10と2.11で別物になる気配。(詳細未調査。Netty 4対応?)

このため共通ライブラリからAnormとFinagleを外すことにしました。

アーキテクチャ選定

DockerでSpark SQL」で述べたようにSpark SQL 1.3をバッチ処理基盤の中軸に据え、1つのSparkジョブプログラムを用途ごとにSparkクラスタとDockerクラスタで実行しわける、というのが今回のバッチ処理基盤刷新の基本アイデアです。

現時点でもこの方向を軸に検討を進めています。

ただ以下のようなニーズもあるので普通のDB操作のためのライブラリも併用する形を考えています。

  • Sparkを使うまでもない小さなジョブの場合Sparkを使わず直接SQLで操作したい。
  • 開発をスケールさせるためSparkの知識なしでもバッチを記述できる方式は担保しておきたい。
  • Sparkで性能要件や機能要件が満たせないケースの回避策は担保しておきたい。
  • Spark処理の場合でもSQLによる入出力が併用できると便利かもしれない。
問題点

プラットフォームのエンジン部では現在はAnormとSquerylを併用しています。SQLを直接使いたいというニーズとORM的なCRUD処理を併用するため、このアーキテクチャを採用しました。

2012年当時の選択としてはよかったと思うのですが2015年新規開発のバッチ処理基盤として引き続き採用するのが望ましいのかという点が論点です。

AnormとSquerylとも非常に便利な機能なのですが経験上以下の問題が判明しています。

まずAnormです。

  • Play 2.2系と2.3系で非互換があるようでクロスビルドは難しそう。
  • ORM的な使い方ができないのでORMを併用する必要がある。

次にSquerylです。

  • 開発が止まっている感じ。
  • テーブル名を完全修飾名(スキーマ名.テーブル名)で指定すると動かない。
  • やりたいことをSquerylのDSLで実現する方法のノウハウが必要。
  • やりたいことがSquerylのDSLでサポートしていないことが判明した場合は対応が大変。
  • SQL組み立ての部品化にノウハウが必要。

AnormとSquerylを併用する際の問題点です。

  • コネクション管理の共通化のノウハウが必要。
  • AnormとSquerylの相互運用は難しい。

上記の問題があるのと有力な代替策があるのでバッチ処理基盤での採用は保留にしました。

Slick

Slickは実際に使ったことはないので資料からの推測ですが、レコードをcase classで表現した上で、関数型的なコレクション/モナド系の抽象操作でDB入出力を可能にしたライブラリ、と理解しています。

大変便利そうですし、関数型的にも筋がよさそうなのですが以下の懸念点があるので今回は採用を見送りました。

  • Scala 2.10系ではcase classの22個制限で事実上使用できない。
  • SQL直接操作ができないと細かい処理の記述が難しい。
  • Anormと併用する場合の親和性が不明。(コネクション管理の共通化など)
  • ORM的な関連の取り扱い方法など、新しいアプローチだけに仕様を調査するのが大変。
Scalikejdbc&Skinny-ORM

以上、色々と検討してきましたがSparkと併用するDB入出力ライブラリとしてはScalikejdbcとSkinny-ORMを採用する方向で考えています。

引き続きAnorm&Squerylをベースに考えても悪くはなさそうですが、Scalikejdbc&Skinny-ORMの方がより適切と判断しました。

ScalikejdbcはSQLベースのDB入出力ライブラリです。Skinny-ORMはScalikejdbc上に構築されたORMです。実用上はScalikejdbcとSkinny-ORMの併用を前提に考えるのがよいのではないかと思います。

Scalikejdbc&Skinny-ORMの採用を考えている理由は以下のものです。

  • Scala 2.10と2.11のクロスビルドで問題がなさそう。
  • SQLでの操作がAnormより若干使いやすそう。
  • Squerylは完全修飾名の問題があるため採用しづらい。
  • ScalikejdbcによるSQL入出力とSkinny-ORMによるORMがシームレスに連携できそう。
  • テーブルのメタデータからのプログラムの自動生成が便利。
  • Typesafe Configを用いたJDBC設定の管理メカニズムを持っている。
  • Joda-timeをサポートしている。

現在判明している問題点としては以下のものがあります。

  • case classの22個制限があるのでSkinny-ORMは2.10系では用途が限定される。
  • scalaz-streamと接続するためには工夫が要りそう。

後者のscalaz-streamの問題については引き続き解決策を探していく予定です。

まとめ

バッチ処理基盤におけるSQLライブラリですが、検討の結果Spark SQLを基本にScalikejdbc&Skinny-ORMを併用する形を基本線で考えていくことにしました。

DBアクセスライブラリのような基本機能は一度製品に採用するとなかなか切替は難しいですが、今回はバッチ処理基盤を刷新するタイミングがあったのでゼロベースで調べなおしてみました。

プロジェクトを開始した2012年当時とはかなり状況も変わってきていてキャッチアップが大変ですが、調度よいタイミングで棚卸しができたと思います。

2015年5月25日月曜日

JSONライブラリ性能比較

Spark SQL 1.3の登場を機にバッチ処理基盤の刷新を考えています。この流れの中でJobSchedulerやSpark SQLをDockerで動かす試み(Docker ComposeでMySQLを使う,DockerでSpark SQL)などを行ってきました。

バッチをSpark SQLで記述し、データや計算量の規模に応じてDocker Cluster(e.g. ECS)またはSpark Cluster(e.g. EMR)を選択してバッチ処理を実行するという枠組みが見えてきました。

次に考えておきたいのはバッチ処理で使用する要素技術の選択です。今回はJSONライブラリについて性能の観点から味見してみました。

なお、あくまでも味見レベルの測定なので、条件を変えると違った結果になる可能性も高いです。また、ありがちですが性能測定プログラムにバグがあって結果が逆にでるようなことがあるかもしれません。このようなリスクがあることを前提に参考にして頂ければと思います。

オンライン処理とバッチ処理

クラウド・アプリケーションはおおまかにいうとWebページやREST APIとして実現するフロント系(オンライン系)と、バックエンドで動作するバッチ系で構成されます。(今後はこれに加えてストリーム系が入ってきます。またバッチ系とはニュアンスの異なる分析系も別立てで考えておくとよさそうです。)

極簡単なアプリケーションではフロント系だけで成立する場合もありますが、ある程度本格的な機能を提供する場合はバッチ系は必須になってきます。また、クラウド・アプリケーション・アーキテクチャの定番であるCQRS+Event Sourcingはバッチ系が主役ともいえるアーキテクチャであり、今後バッチ系の重要度はますます高まってくることは間違いないでしょう。

オンライン系のロジックとバッチ系のロジックでは以下のような特性の違いがあります。

オンライン処理バッチ処理
性能特性レスポンス重視スループット重視
データ規模
計算量
実行時間
エラー処理即時通知ジョブ管理

こうやって整理すると様々な面で求められる特性が大きく異なることが分かります。

これらの特性を念頭にプログラミングしていくことは当然ですが、使用する要素技術やライブラリもこれらの特性に適合したものを選択することが望ましいです。

オンライン処理はデータ規模や計算量が小さいので使いやすさ重視でよいですが、バッチ処理はデータ規模や計算量が大きいので処理性能やメモリ消費量がクリティカルな要因になります。多少使い難くても、高速、メモリ消費量が少ない、ものを選ぶ方がよいでしょう。

バッチ処理の中のJSON

バッチ処理の中でのJSONの利用シーンですが、主に意識しなければいけないのは大規模JSONファイルの入力ということになるかと思います。ログファイルや移入データがJSON形式で用意されているようなケースですね。

通常この用途のデータはCSVやLTSVで用意されるケースが多いと思いますが、データが木構造になっている場合(いわゆる文書型)にはJSONやXMLデータが適しており、JSONで用意されるケースも多いでしょう。

バッチをSpark SQLで組む場合、データがCSVで用意されていたり、JSONであってもフラットな構造の場合はSpark SQLで簡単に取り込めるのでJSONライブラリの出番はありません。

ということで、ある程度複雑な構造を持ったJSONファイルがメインの操作対象となります。

ただ、バッチをSpark SQLではなく通常のScalaプログラムとして組みたいケースも出てくるはずなので、フラットな構造のJSONファイルに対してもペナルティなしで扱える必要はあります。

ユースケース1: 複雑な構造を持ったJSON→CSV

代表的なユースケースとしては、複雑な構造を持った大規模JSONファイルから必要な項目を抽出してCSVに書き出す、といった用途が考えられます。

SparkやHadoopの前処理として頻出しそうなユースケースです。

このケースでは(1)JSONの基本的なパース性能と、(2)JSONからデータを抽出したりデータのフィルタリングをしたりする処理の記述性が論点となります。

ユースケース2: 複雑な構造を持ったJSON→アプリケーションロジック適用

少し複雑なバッチ処理を行う場合は、アプリケーションロジックを適用するためcase classなどで定義したドメイン・オブジェクトにJSONを変換する必要がでてきます。

case classに変換後は、アプリケーションが管理しているドメインロジックを型安全に適用することができるので、プログラムの品質と開発効率を高めることができます。

ただ、JSONをcase classにマッピングする処理はそれなりに重たいので、JSON段階である程度フィルタリングした上で、必要なデータのみcase classにマッピングする形を取りたいところです。

テストプログラム

検証対象のJSONライブラリは以下の4つにしました。

  • Json4s native
  • Json4s jackson
  • Play-json
  • Argonaut

各ライブラリの性能測定対象のプログラムは以下になります。

どのライブラリもほとんど使い方は同じです。ただし、ArgonautのみJSONとオブジェクトのマッピング設定がかなり煩雑になっています。

Json4s native
package sample

import org.json4s._
import org.json4s.native.JsonMethods._

object Json4sNativeSample {
  implicit val formats = DefaultFormats

  def jsonSimple() = {
    val s = Company.example
    parse(s)
  }

  def jsonComplex() = {
    val s = Person.example
    parse(s)
  }

  def simple() = {
    val s = Company.example
    val j = parse(s)
    j.extract[Company]
  }

  def complex() = {
    val s = Person.example
    val j = parse(s)
    j.extract[Person]
  }
}
Json4s jackson
package sample

import org.json4s._
import org.json4s.jackson.JsonMethods._

object Json4sSample {
  implicit val formats = DefaultFormats

  def jsonSimple() = {
    val s = Company.example
    parse(s)
  }

  def jsonComplex() = {
    val s = Person.example
    parse(s)
  }

  def simple() = {
    val s = Company.example
    val j = parse(s)
    j.extract[Company]
  }

  def complex() = {
    val s = Person.example
    val j = parse(s)
    j.extract[Person]
  }
}
Play-json
package sample

import play.api.libs.json._

object PlaySample {
  implicit val companyReads = Json.reads[Company]
  implicit val addressReads = Json.reads[Address]
  implicit val personReads = Json.reads[Person]

  def jsonSimple() = {
    val s = Company.example
    Json.parse(s)
  }

  def jsonComplex() = {
    val s = Person.example
    Json.parse(s)
  }

  def simple() = {
    val s = Company.example
    val j = Json.parse(s)
    Json.fromJson[Company](j)
  }

  def complex() = {
    val s = Person.example
    val j = Json.parse(s)
    Json.fromJson[Person](j)
  }
}
Argonaut
package sample

import scalaz._, Scalaz._
import argonaut._, Argonaut._

object ArgonautSample {
  implicit def decodeCompanyJson: DecodeJson[Company] =
    casecodec1(Company.apply, Company.unapply)("name")

  implicit def encodeCompanyJson: EncodeJson[Company] =
    jencode1L((a: Company) => (a.name))("name")

  implicit def decodeAddressJson: DecodeJson[Address] =
    casecodec2(Address.apply, Address.unapply)("zip", "city")

  implicit def encodeAddressJson: EncodeJson[Address] =
    jencode2L((a: Address) => (a.zip, a.city))("zip", "city")

  implicit def decodePersonJson: DecodeJson[Person] =
    casecodec4(Person.apply, Person.unapply)("name", "age", "address", "company")

  def jsonSimple() = {
    val s = Company.example
    Parse.parse(s)
   }

  def jsonComplex() = {
    val s = Person.example
    Parse.parse(s)
   }

  def simple() = {
    val s = Company.example
    Parse.decodeOption[Company](s)
  }

  def complex() = {
    val s = Person.example
    Parse.decodeOption[Person](s)
  }
}

テストデータ

Company
package sample

case class Company(name: String)

object Company {
  val example = """
{"name": "Yamada Corp"}
"""
}
Person
package sample

case class Person(name: String, age: Int, address: Address, company: Option[Company])
case class Address(zip: String, city: String)

object Person {
  val example = """
{"name": "Yamada Taro", "age": 30, "address": {"zip": "045", "city": "Yokohama", "company": "Yamada Corp"}}
"""
}

測定結果

性能測定は以下のプログラムで行いました。JSONのパース処理を100万回繰り返した時の累積ミリ秒を表示しています。

package sample

object All {
  def go(msg: String)(body: => Unit): Unit = {
    System.gc
    val ts = System.currentTimeMillis
    for (i <- 0 until 1000000) yield {
      body
    }
    println(s"$msg: ${System.currentTimeMillis - ts}")
  }

  def main(args: Array[String]) {
    go(s"Json4s native json simple")(Json4sNativeSample.jsonSimple)
    go(s"Json4s jackson json simple")(Json4sSample.jsonSimple)
    go(s"Play json simple")(PlaySample.jsonSimple)
    go(s"Argonaut json simple")(ArgonautSample.jsonSimple)
    go(s"Json4s native json complex")(Json4sNativeSample.jsonComplex)
    go(s"Json4s jackson json complex")(Json4sSample.jsonComplex)
    go(s"Play json complex")(PlaySample.jsonComplex)
    go(s"Argonaut json complex")(ArgonautSample.jsonComplex)
    go(s"Json4s native simple")(Json4sNativeSample.simple)
    go(s"Json4s jackson simple")(Json4sSample.simple)
    go(s"Play simple")(PlaySample.simple)
    go(s"Argonaut simple")(ArgonautSample.simple)
    go(s"Json4s native complex")(Json4sNativeSample.complex)
    go(s"Json4s jackson complex")(Json4sSample.complex)
    go(s"Play complex")(PlaySample.complex)
    go(s"Argonaut complex")(ArgonautSample.complex)
  }
}

性能測定結果は以下になります。

ネストなし/JSON
ライブラリ性能(ms)
Json4s native747
Json4s jackson788
Play-json640
Argonaut872

ほとんど同じですがPlay-jsonが速く、Argonautが遅いという結果になっています。

ネストあり/JSON
ライブラリ性能(ms)
Json4s native1495
Json4s jackson1310
Play-json1535
Argonaut1724

ネストありのJSONの場合は、Json4s jacksonが速く、Argonautが遅いという結果になりました。

ネストなし/case class
ライブラリ性能(ms)
Json4s native2080
Json4s jackson1601
Play-json1149
Argonaut1151

JSONをcase classにマッピングする場合は、単にJSONをパースするだけの性能とは全く異なる性能特性になりました。

Json4s系はいずれもかなり遅くなっています。

Play-jsonとArgonautはほぼ同じで高速です。

ネストあり/case class
ライブラリ性能(ms)
Json4s native5162
Json4s jackson4786
Play-json4471
Argonaut3840

ネストありのJSONをcase classにマッピングする場合は、Argonautが高速でした。

Json4sはこのケースでもかなり遅くなっています。

評価

性能測定結果を評価の観点で表にまとめました。

ライブラリネストありJSON性能case class性能case class設定
Json4s native
Json4s jackson
Play-json
Argonaut

ネストありJSONのパース性能はJson4sが最速ですが、逆にcase classへのマッピングはかなり遅くなっています。このため、Json4s一択という感じではなさそうです。

ArgonautはJSONパースは遅く、case classへのマッピング性能はよいもののマッピング設定が煩雑なので、使い場所が難しい感触です。Scalazと組合せて関数型的に面白いことが色々できそうなので、その点をどう評価するかということだと思います。

一番バランスがよいのがPlay-jsonで、一択するならPlay-jsonがよさそうです。

ただPlay-jsonはPlayのバージョンと連動しているためdependency hellの可能性があるのが難点です。たとえば、最新版のPlay-jsonを使って共通ライブラリを作っても、サービスで運用しているPlayのバージョンが古いと適用できない、といった問題です。

まとめ

全体的にPlay-jsonが最もバランスがよいという結果が得られました。

個別の性能的には「ユースケース1: 複雑な構造を持ったJSON→CSV」はJson4s jackson、「ユースケース2: 複雑な構造を持ったJSON→アプリケーションロジック適用」はArgonautが適しているという結果が得られました。

ただArgonautはcase classの設定が煩雑なので、広範囲に使える共通処理を作る場合はよいのですが、用途ごとの小さな要件には適用しづらい感じです。そういう意味では、Play-jsonがこの用途にも適していそうです。

以上の点から、次のような方針で考えていこうと思います。

  • Play-jsonを基本にする
  • 必要に応じて「大規模JSON→CSV」の用途向けにJson4s jacksonを併用
  • 必要に応じて「大規模JSON→アプリケーションロジック適用」の用途向けにArgonautを併用

Play-jsonを基本ライブラリとして採用するメリットが大きいので、Play-jsonのdependency hell問題は、運用で回避する作戦を取りたいと思います。

諸元

  • Mac OS 10.7.5 (2.6 GHz Intel Core i7)
  • Java 1.7.0_75
  • Scala 2.11.6
  • Json4s native 2.3.11
  • Json4s jackson 2.3.11
  • Play-json 2.3.9
  • Argonaut 6.1

2015年5月18日月曜日

case class 2015

case classはScalaプログラミングにおける最重要機能の一つです。

case classはいろいろな便利機能が言語機能としてビルトインされているのに加えて、OOP的にはValue Object、関数型プログラミングとしては代数的データ構造として利用することができます。

case classはそのままシンプルに使っても便利ですが、case classを作る時に基本対応しておくとよさそうな各種機能があるので、その辺りの最新事情を取り込んだ2015年版case classについて考えてみます。

対応機能

2015年版case classでは以下の機能に対応することにします。

  • Monoid
  • Argonaut
  • ScalaCheck
Monoid

Scala Tips / Monoid - 新規作成」は2012年6月の記事なので、かれこれ3年になりますがMonoidの重要性は変わる所がありません。Monoidを使うためだけにScalazを導入してもお釣りがくるぐらいです。

case classを作る時は常にMonoidを意識しておいて、可能であればMonoid化しておくのがよいでしょう。

MonoidはScalazで実装します。

2012年版の「Scala Tips / Monoid - 新規作成」ではScalaz 6でしたが、今回はScalaz 7でMonoidの定義の仕方も変更されています。

Argonaut

Finagleなどを使ってRESTベースのmicroservicesアーキテクチャを取る場合、case classをJSONで送受信するニーズが大きくなります。

case classをできるだけ簡単にJSON化する方法としてArgonautが有力なので使ってみました。

ScalaCheck

case classがMonoidである場合は、必ず二項演算があるので、この二項演算のテストコードが必要になります。

Scalaプログラミングでは、こういった演算はScalaCheckでプロパティベーステストを行うのがお約束になっています。

プログラム

build.sbt

build.sbtは特に難しい所はありません。必要なライブラリを登録しているだけです。

scalaVersion := "2.11.6"

val scalazVersion = "7.1.0"

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % scalazVersion,
  "io.argonaut" %% "argonaut" % "6.1-M4",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test",
  "org.scalacheck" %% "scalacheck" % "1.12.2" % "test",
  "junit" % "junit" % "4.12" % "test"
)
Average.scala

Monoid化とArgonaut化したcase class Averageは以下になります。

package sample

import scalaz._, Scalaz._
import argonaut._, Argonaut._

case class Average(count: Int, total: Int) {
  import Average.Implicits._

  def value: Float = total / count.toFloat

  def +(rhs: Average): Average = {
    Average(count + rhs.count, total + rhs.total)
  }

  def marshall: String = this.asJson.nospaces
}

object Average {
  import Implicits._

  val empty = Average(0, 0)

  def unmarshall(s: String): Validation[String, Average] = s.decodeValidation[Average]

  object Implicits {
    implicit object AverageMonoid extends Monoid[Average] {
      def append(lhs: Average, rhs: => Average) = lhs + rhs
      def zero = Average.empty
    }

    implicit def decodeAverageJson: DecodeJson[Average] =
      casecodec2(Average.apply, Average.unapply)("count", "total")

    implicit def encodeAverageJson: EncodeJson[Average] =
      jencode2L((d: Average) => (d.count, d.total))("count", "total")
  }
}

case classの定義に難しいところはないと思います。

以下ではMonoidとArgonautの定義について説明します。

Monoid

Monoidは、Scalazの型クラスMonoidの型クラスインスタンスを作成して暗黙オブジェクトとして定義します。

implicit object AverageMonoid extends Monoid[Average] {
      def append(lhs: Average, rhs: => Average) = lhs + rhs
      def zero = Average.empty
    }

型クラスMonoidの型クラスインスタンスのappendメソッドとzeroメソッドを実装します。

appendメソッドはMonoid対象の二項演算を定義します。通常、Monoid対象となるcase classで「+」メソッドを提供しているはずなのでこれを呼び出す形になります。

zeroメソッドは空の値を返すようにします。通常、case classの空値はコンパニオン/オブジェクトの変数emptyに定義するのが普通なので、これをzeroメソッドの値として返す形になります。

Argonaut

まずArgonautの基本定義として、ScalaオブジェクトとJSON間の相互変換をする暗黙関数を2つ用意します。

ここも自動化できればよいのですが、マクロを使う形になるはずなので、マクロの仕様がフィックスしていない現状だとやや時期尚早かもしれません。

implicit def decodeAverageJson: DecodeJson[Average] =
      casecodec2(Average.apply, Average.unapply)("count", "total")

    implicit def encodeAverageJson: EncodeJson[Average] =
      jencode2L((d: Average) => (d.count, d.total))("count", "total")

次に、マーシャル関数(Scala→JSON)とアンマーシャル関数(JSON→Scala)を定義します。

まず、マーシャル関数はcase class Averageのメソッドとして定義しました。

def marshall: String = this.asJson.nospaces

アンマーシャル関数はAverageのコンパニオンオブジェクトに定義しました。

def unmarshall(s: String): Validation[String, Average] = s.decodeValidation[Average]
AverageSpec.scala

case class Averageのテストコードとして、ScalaCheckによるプロパティベーステストを行うAverageSpecを作りました。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class AverageSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "Average" should {
    "value" in {
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val x = Average(ns1.length, ns1.sum)
          x.value should be (toAverageValue(ns1))
        }
      }
    }
    "value gen" in {
      forAll ((Gen.nonEmptyListOf(Gen.posNum[Int]), "ns")) { ns =>
        val x = Average(ns.length, ns.sum)
        x.value should be (toAverageValue(ns))
      }
    }
    "+" in {
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val xs = toAverages(ns1)
          val r = xs.foldLeft(Average.empty)((z, x) => z + x)
          r.value should be (toAverageValue(ns1))
        }
      }
    }
    "monoid concatenate" in {
      import scalaz._, Scalaz._
      import Average.Implicits.AverageMonoid
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val xs = toAverages(ns1)
          val r = xs.concatenate
          r.value should be (toAverageValue(ns1))
        }
      }
    }

    def toAverages(ns: List[Int]): List[Average] = {
      ns.map(Average(1, _))
    }

    def toAverageValue(ns: List[Int]): Float = {
      ns.sum.toFloat / ns.length
    }
  }
}

ScalaCheckのプロパティベーステストを行う方法はいくつかありますが、ここではScalaTestのGeneratorDrivenPropertyChecksを使ってみました。

GeneratorDrivenPropertyChecksを使うと「forAll」を使って、指定された型の値をワンセット自動生成してくれるので、この値を用いてテストを行うことができます。

forAllの内部定義として個々のテストを書いていきますが、これは通常のテストコードと同様です。

"value" in {
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val x = Average(ns1.length, ns1.sum)
          x.value should be (toAverageValue(ns1))
        }
      }
    }

一つポイントとなるのは、テスト用データの自動生成は指定された型(ここでは List[Int])の任意の値を取る可能性があるので、これをテストコード側で排除する必要がある点です。

この問題への対処方法として、テスト用データ生成器(org.scalacheck.Gen)で値域を指定する方法があります。

org.scalacheck.Genを使って値域を指定するテスト"value gen"は以下になります。org.scalacheck.Genを使うとクロージャの引数の型(List[Int])も省略できます。

"value gen" in {
      forAll ((Gen.nonEmptyListOf(Gen.posNum[Int]), "ns")) { ns =>
        val x = Average(ns.length, ns.sum)
        x.value should be (toAverageValue(ns))
      }
    }

いずれの方法を取るにしても、テストプログラムを書く時に、テストデータを準備する必要はないのは大変便利です。

また、テストプログラムが、テストをする手続きというより、より仕様定義に近いものになるのもよい感触です。

実行

Sbtのtestでテストプログラムを実行することができます。

$ sbt test
...略...
[info] AverageSpec:
[info] Average
[info] - should value
[info] - should value gen
[info] - should +
[info] - should monoid concatenate
[info] ScalaTest
[info] 36mRun completed in 1 second, 870 milliseconds.0m
[info] 36mTotal number of tests run: 40m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 4, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 4, Failed 0, Errors 0, Passed 4
[success] Total time: 5 s, completed 2015/05/15 16:31:57

まとめ

case classを作る時に意識しておきたい基本形を考えてみました。

プログラミング時には、この基本形を念頭に置いて、不要な機能は削除、必要な機能を追加という形でcase classを組み立てていくことをイメージしています。

なお、Monoidに対するテストに関してはScalazのSpecLiteを使うともっと強力なテストができますが、この話題は別途取り上げたいと思います。

諸元

  • Scala 2.11.6
  • Scalaz 7.1.0
  • Argonaut 6.1-M4
  • ScalaTest 2.2.4
  • ScalaCheck 1.12.2

2015年5月11日月曜日

[docker] DockerでSpark SQL

Spark SQL 1.3から以下の2つの機能が導入されています。

  • DataSourceとしてJDBCが使えるようになった
  • DataFrame

この2つの機能追加によってSpark SQLを汎用のバッチ処理基盤にできるのではないかというインスピレーションが湧きました。

この実現目的でSparkバッチをスタンドアロンで実行するためのDockerイメージspark-sql-scala-dockerを作ってみた、というのが今回のお話です。

Spark SQL

Spark SQLは、Sparkの分散計算処理をSQLで記述できるようにしたものです。SQLとSpark本来のmonadicなAPI(e.g. filter, map, flatMap)を併用して計算処理を記述することができます。

このプログラミングモデルは非常に強力で、大枠の絞り込みはSQLで行っておいて、アプリケーションに特化した検索ロジックをScalaで記述したUDF(User Defined Function)で補完するといった処理を、プログラミング言語的に簡潔に記述することができます。

Spark SQLの基本機能に加えて1.3から以下の機能も使えるようになりました。

DataSourceのJDBC対応

DataSourceとしてJDBCが使えるようになったことで、RedShift上にためた分析データなどから直接データを取得できるようになりました。MySQLやPostreSQLなどのデータを一旦S3に変換するといった準備タスクが不要になったので、ジョブ作成の手間が大きく低減すると思います。

小さな機能追加ですが、実運用上のインパクトは大きいのではないかと思います。

DataFrame

大きな機能追加としてはDataFrameが導入されました。

DataFrameは表形式の大規模データを抽象化したAPIで、元々はR/Pythonで実績のある機能のようです。

DataFrameは分析専用のAPIではなく、表形式データ操作の汎用APIとして使用できるのではないかと期待しています。計算結果を外部出力する際の汎用機能としても期待できます。

もちろんR/Pythonなどのデータ分析処理系との連携も期待できそうです。

Spark SQLの用途

Spark SQLの基本機能と上記の2つの機能追加によって、Sparkバッチを大規模(データ量/計算量)向けデータ処理基盤としてだけではなく、汎用のバッチ実行基盤として使えるようになるのではないかとインスピレーションが湧いたわけです。

データ集計用のバッチをSparkバッチとして作成して、データ量、計算量に応じてスタンドアロンジョブとSparkクラスタ上でのジョブのいずれかでジョブ実行するというユースケースです。

そのベースとして、Sparkクラスタを用いないスタンドアロンジョブとして実行するためのDockerイメージを作ってみました。

spark-sql-scala-docker

spark-sql-scala-dockerはSparkアプリケーションをスタンドアロンで実行するためのDockerイメージです。

GitHubにソースコードがありますので、詳細はこちらを参照して下さい。

以下では、実装上のポイントと使い方について説明します。

Dockerfile

spark-sql-scala-dockerのDockerfileは以下になります。

FROM sequenceiq/spark:1.3.0

RUN mkdir -p /opt/spark/lib
RUN cd /opt/spark/lib && curl -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.30.tar.gz' -o - | tar -xz --strip-components=1 mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar
RUN curl -L 'http://jdbc.postgresql.org/download/postgresql-9.2-1002.jdbc4.jar' -o /opt/spark/lib/postgresql-9.2-1002.jdbc4.jar

ENV SPARK_CLASSPATH /opt/spark/lib/mysql-connector-java-5.1.30-bin.jar:/opt/spark/lib/postgresql-9.2-1002.jdbc4.jar

RUN rpm -ivh http://ftp-srv2.kddilabs.jp/Linux/distributions/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm

RUN yum -y install redis --enablerepo=epel

COPY spark-defaults.conf /opt/spark-defaults.conf

COPY entrypoint.sh /opt/entrypoint.sh

ENV COMMAND_JAR_DIR /opt/command.d

ENV COMMAND_JAR_NAME command.jar

VOLUME [$COMMAND_JAR_DIR"]

ENTRYPOINT ["/opt/entrypoint.sh"]

Dockerイメージsequenceiq/spark:1.3.0をベースにしていて以下の調整だけ行っています。

  • MySQLとPostgreSQLのJDBCドライバのインストール
  • Sparkアプリケーションの登録処理
entrypoint.sh

spark-sql-scala-dockerのentrypoint.shは以下になります。

#! /bin/bash

# WAIT_CONTAINER_TIMER
# WAIT_CONTAINER_FILE
# WAIT_CONTAINER_KEY

# set -x

set -e

echo MySQL host: ${MYSQL_SERVER_HOST:=$MYSQL_PORT_3306_TCP_ADDR}
echo MySQL port: ${MYSQL_SERVER_PORT:=$MYSQL_PORT_3306_TCP_PORT}
echo PostgreSQL host: ${POSTGRESQL_SERVER_HOST:=$POSTGRESQL_PORT_5432_TCP_ADDR}
echo PostgreSQL port: ${POSTGRESQL_SERVER_PORT:=$POSTGRESQL_PORT_5432_TCP_PORT}
echo Redis host: ${REDIS_SERVER_HOST:=$REDIS_PORT_6379_TCP_ADDR}
echo Redis port: ${REDIS_SERVER_PORT:=$REDIS_PORT_6379_TCP_PORT}
export MYSQL_SERVER_HOST
export MYSQL_SERVER_PORT
export POSTGRESQL_SERVER_HOST
export POSTGRESQL_SERVER_PORT
export REDIS_SERVER_HOST
export REDIS_SERVER_PORT

function wait_container {
    if [ -n "$REDIS_SERVER_HOST" ]; then
 wait_container_redis
    elif [ -n "$WAIT_CONTAINER_FILE" ]; then
 wait_container_file
    fi
}

function wait_container_redis {
    result=1
    for i in $(seq 1 ${WAIT_CONTAINER_TIMER:-100})
    do
 sleep 1s
 result=0
 if [ $(redis-cli -h $REDIS_SERVER_HOST -p $REDIS_SERVER_PORT GET $WAIT_CONTAINER_KEY)'' = "up" ]; then
     break
 fi
 echo spark-sql-scala-docker wait: $REDIS_SERVER_HOST
 result=1
    done
    if [ $result = 1 ]; then
 exit 1
    fi
}

function wait_container_file {
    result=1
    for i in $(seq 1 ${WAIT_CONTAINER_TIMER:-100})
    do
 sleep 1s
 result=0
 if [ -e $WAIT_CONTAINER_FILE ]; then
     break
 fi
 echo spark-sql-scala-docker wait: $WAIT_CONTAINER_FILE
 result=1
    done
    if [ $result = 1 ]; then
 exit 1
    fi
}

COMMAND_JAR=$COMMAND_JAR_DIR/$COMMAND_JAR_NAME

wait_container

sed -i "s!hdfs://.*:9000!file:\/\/\/tmp!g" /usr/local/hadoop/etc/hadoop/core-site.xml

spark-submit --properties-file /opt/spark-defaults.conf $COMMAND_JAR

基本的にはspark-submitでSparkアプリケーションのジョブをサブミットしているだけですが、以下の2つの調整を行っています。

  • Redisを使って他のコンテナの待ち合わせ
  • 中間データのローディング先をHDFSではなくローカルファイルに変更する
コンテナの待ち合わせ

Sparkアプリケーションを動作させる前の準備を他のコンテナで進める場合は、コンテナの待ち合わせが必要になります。この待ち合わせをmysql-java-embulk-dockerと同様にRedisを用いて実現しています。

典型的な使用例は、Sparkアプリケーションのテスト実行時でのテストDBの準備です。この実例は後ほどサンプルで説明します。

中間データのローディング先

core-site.xmlの変更処理です。

sed -i "s!hdfs://.*:9000!file:\/\/\/tmp!g" /usr/local/hadoop/etc/hadoop/core-site.xml

ここの設定を変更しないとDocker環境内でスタンドアロンでは動かなかったので設定変更しています。

設定変更の方法としてはHDFSを動くようにするという方式もあるのですが、スタンドアプリケーションなのでここではローカルのファイルを使う方式で対応しています。

Docker Hub

mysql-java-embulk-dockerと同様にspark-sql-scala-dockerもDocker Hubの自動ビルドの設定を行っているので、以下の場所にDockerイメージが自動ビルドされます。

このイメージは「asami/spark-sql-scala-docker」という名前で利用することができます。

サンプル

Dockerイメージ「asami/spark-sql-scala-docker」を利用してテストデータの投入を行うサンプルを作ってみます。

手元の環境上でテスト目的で動作させるためmysql-java-embulk-dockerを併用してテストデータの投入を行っています。

サンプルのコードはGitHubのspark-sql-scala-dockerのsampleディレクトリにあるので、詳細はこちらを参照して下さい。

SimpleApp.scala

サンプルのSparkバッチであるSimpleAppのプログラムは以下になります。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, DataFrame}

object SimpleApp extends App {
  val batting = SparkSqlUtils.createMysqlDataFrame("Simple Application", "batting")
  val count = batting.count()
  println(s"count = ${batting.count()}")
}

object SparkSqlUtils {
  def createSqlContext(name: String): SQLContext = {
    val conf = new SparkConf().setAppName(name)
    val sc = new SparkContext(conf)
    new SQLContext(sc)
  }

  def createMysqlDataFrame(name: String, table: String): DataFrame = {
    val sqlc = createSqlContext(name)
    createMysqlDataFrame(sqlc, table)
  }

  def createMysqlDataFrame(sqlc: SQLContext, table: String): DataFrame = {
    val host = System.getenv("MYSQL_SERVER_HOST")
    val port = System.getenv("MYSQL_SERVER_PORT")
    val user = System.getenv("MYSQL_SERVER_USER")
    val password = System.getenv("MYSQL_SERVER_PASSWORD")
    sqlc.load("jdbc", Map(
      "url" -> s"jdbc:mysql://$host:$port/baseball?user=$user&password=$password",
      "dbtable" -> table
    ))
  }
}

SparkSqlUtilsにDataFrame取得処理をまとめています。ここは汎用ライブラリ化できるところです。

この処理を除いた以下の処理がSparkバッチの本体です。

val batting = SparkSqlUtils.createMysqlDataFrame("Simple Application", "batting")
  val count = batting.count()
  println(s"count = ${batting.count()}")
アプリケーションロジック

指定したテーブル"batting"に対応したDataFrameを取得し、countメソッドでレコード総数を取得し、その結果をコンソールに出力しています。とても簡単ですね。

この部分を以下の機能を用いて記述することで高度なバッチ処理を簡単に記述できます。

  • DataFrameによる表データ操作
  • DataFrameから変換したRDDを用いてSpark計算処理

前述したように「大枠の絞り込みはSQLで行っておいて、アプリケーションに特化した検索ロジックをScalaで記述したUDF(User Defined Function)で補完するといった処理を、プログラミング言語的に簡潔に記述することができます。」

移入・移出

テーブル"batting"をDataFrameとしてローディングしているのは、前述のSpark 1.3の機能追加「DataSourceとしてJDBCが使えるようになった」によるものです。

また、ここでは外部出力をコンソール出力にしていますが、RDDのsaveAsTextFileメソッドやDataFrameを用いることで、S3やデータベースなどに集計結果を簡単に出力することができます。

データベースなどへの外部出力が簡単に行えるのもSpark 1.3の機能追加「DataFrame」の効果です。

ここからも分かるように、Spark SQL 1.3で導入された「DataSourceとしてJDBCが使えるようになった」と「DataFrame」により、Sparkバッチ処理の難題であったデータの移入/移出処理が極めて簡単に記述できるようになったわけです。

SBTの設定

SBTによるScalaプログラムのビルドの設定は以下になります。

Spark本体とSpark SQLを依存ライブラリとして設定している、ごくオーソドックスな設定です。

Sparkバッチ用にすべての依存ライブラリをまとめたJARファイルを作る必要があるので、sbt-assemblyの設定を行ってます。

ポイントとしては、Sparkバッチの実行環境にScalaの基本ライブラリとSpark本体/Spark SQLのライブラリが用意されているので、sbt-assemblyでまとめるJARファイルから排除する設定を行っています。

  • Spark本体とSpark SQLの依存ライブラリの設定を"provided"にしてリンク対象から外す
  • sbt-assemblyの設定で"includeScala = false"としてScala基本ライブラリをリンク対象から外す

これらの設定はなくても動作しますが、JARファイルが巨大になってしまいます。

name := "simple"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.1" % "provided"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

sbt-assemblyプラグインが必要なのでproject/assembly.sbtに以下の設定をしておきます。

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
docker-compose.yml

サンプルプログラムのdocker-compose.ymlは以下になります。

spark:
  image: asami/spark-sql-scala-docker
  links:
    - mysql
    - redis
  volumes:
    - target/scala-2.10:/opt/command.d
  environment:
    COMMAND_JAR_NAME: simple-assembly-1.0.jar
    WAIT_CONTAINER_KEY: mysql-java-embulk-docker
    MYSQL_SERVER_USER: baseball
    MYSQL_SERVER_PASSWORD: baseball
mysql:
  image: asami/mysql-java-embulk-docker
  links:
    - redis
  ports:
    - ":3306"
  volumes:
    - setup.d:/opt/setup.d
  environment:
    MYSQL_USER: baseball
    MYSQL_PASSWORD: baseball
    MYSQL_ROOT_PASSWORD: baseball
    MYSQL_DATABASE: baseball
redis:
  image: redis
  ports:
    - ":6379"

自前ではDockerイメージを作らず、以下の3つの汎用Dockerイメージを再利用しています。

  • asami/spark-sql-scala-docker
  • asami/mysql-java-embulk-docker
  • redis
asami/spark-sql-scala-docker

ボリュームと環境変数の記述で、targetscala-2.10simple-assembly-1.0.jarがSparkバッチプログラムとして認識されるようにしています。

simple-assembly-1.0.jarはsbt-assemblyで作成した「全部入り(SparkとScala以外)」のJARファイルです。

それ以外は、mysql-java-embulk-dockerと同期をとるためのおまじないです。

asami/mysql-java-embulk-docker

前回の記事「Docker Composeでデータ投入」と同じ設定です。テスト用のMySQLデータベースにテストデータを投入しています。

Batting.csvは以下のサイトからデータを取得しました。

redis

asami/mysql-java-embulk-dockerによるテストデータ投入の待ち合わせにredisを用いています。

ビルド

Sparkバッチのビルドはsbtで行います。

$ sbt assembly

テスト環境はdocker-composeのbuildコマンドでビルドします。

$ docker-compose build
実行

docker-composeのupコマンドで実行します。

$ docker-compose up

動作過程がコンソールに出力されますが、最後の方で以下のような出力があります。

spark_1 | count = 99846

無事Sparkバッチでデータ集計ができました。

まとめ

Spark SQLを汎用のバッチ処理基盤として運用する目的でSparkバッチをスタンドアロンで実行するためのDockerイメージspark-sql-scala-dockerを作ってみましたが、無事動作しました。

このことによってspark-sql-scala-dockerとmysql-java-embulk-dockerを使って手元で簡単にSparkバッチをテストできるようになりました。

汎用Dockerイメージをdocker-composeで組み合わせるだけなので運用的にも大変、楽だと思います。

今回は試していませんが、spark-sql-scala-dockerを使ってSparkバッチをECS(EC2 Container Service)などのDocker環境上でスタンドアロンバッチとして実行するという運用も可能ではないかと考えています。

もちろん、SparkバッチのJARファイルをspark-submitコマンドによるジョブ投入により直接Sparkクラスタ上で実行することでSpark本来の大規模(データ量/計算量)処理を行うことができます。

いずれの場合も、基本的に開発するのは、Scalaによる通常のSparkバッチプログラムだけです。テストやDocker環境上でのスタンドアロンバッチのいずれも汎用Dockerイメージを活用することで、簡単な設定のみで運用することができそうです。

今回の作業で上記の3つのユースケースを同時に満たせることの目処が立ちました。この成果をベースにSpark SQLを汎用のバッチ処理基盤として利用するためのノウハウの積み上げをしてきたいと思います。

諸元

  • Mac OS 10.7.5
  • docker 1.6
  • docker-compose 1.2.0
  • Spark SQL 1.3