2015年5月25日月曜日

JSONライブラリ性能比較

Spark SQL 1.3の登場を機にバッチ処理基盤の刷新を考えています。この流れの中でJobSchedulerやSpark SQLをDockerで動かす試み(Docker ComposeでMySQLを使う,DockerでSpark SQL)などを行ってきました。

バッチをSpark SQLで記述し、データや計算量の規模に応じてDocker Cluster(e.g. ECS)またはSpark Cluster(e.g. EMR)を選択してバッチ処理を実行するという枠組みが見えてきました。

次に考えておきたいのはバッチ処理で使用する要素技術の選択です。今回はJSONライブラリについて性能の観点から味見してみました。

なお、あくまでも味見レベルの測定なので、条件を変えると違った結果になる可能性も高いです。また、ありがちですが性能測定プログラムにバグがあって結果が逆にでるようなことがあるかもしれません。このようなリスクがあることを前提に参考にして頂ければと思います。

オンライン処理とバッチ処理

クラウド・アプリケーションはおおまかにいうとWebページやREST APIとして実現するフロント系(オンライン系)と、バックエンドで動作するバッチ系で構成されます。(今後はこれに加えてストリーム系が入ってきます。またバッチ系とはニュアンスの異なる分析系も別立てで考えておくとよさそうです。)

極簡単なアプリケーションではフロント系だけで成立する場合もありますが、ある程度本格的な機能を提供する場合はバッチ系は必須になってきます。また、クラウド・アプリケーション・アーキテクチャの定番であるCQRS+Event Sourcingはバッチ系が主役ともいえるアーキテクチャであり、今後バッチ系の重要度はますます高まってくることは間違いないでしょう。

オンライン系のロジックとバッチ系のロジックでは以下のような特性の違いがあります。

オンライン処理バッチ処理
性能特性レスポンス重視スループット重視
データ規模
計算量
実行時間
エラー処理即時通知ジョブ管理

こうやって整理すると様々な面で求められる特性が大きく異なることが分かります。

これらの特性を念頭にプログラミングしていくことは当然ですが、使用する要素技術やライブラリもこれらの特性に適合したものを選択することが望ましいです。

オンライン処理はデータ規模や計算量が小さいので使いやすさ重視でよいですが、バッチ処理はデータ規模や計算量が大きいので処理性能やメモリ消費量がクリティカルな要因になります。多少使い難くても、高速、メモリ消費量が少ない、ものを選ぶ方がよいでしょう。

バッチ処理の中のJSON

バッチ処理の中でのJSONの利用シーンですが、主に意識しなければいけないのは大規模JSONファイルの入力ということになるかと思います。ログファイルや移入データがJSON形式で用意されているようなケースですね。

通常この用途のデータはCSVやLTSVで用意されるケースが多いと思いますが、データが木構造になっている場合(いわゆる文書型)にはJSONやXMLデータが適しており、JSONで用意されるケースも多いでしょう。

バッチをSpark SQLで組む場合、データがCSVで用意されていたり、JSONであってもフラットな構造の場合はSpark SQLで簡単に取り込めるのでJSONライブラリの出番はありません。

ということで、ある程度複雑な構造を持ったJSONファイルがメインの操作対象となります。

ただ、バッチをSpark SQLではなく通常のScalaプログラムとして組みたいケースも出てくるはずなので、フラットな構造のJSONファイルに対してもペナルティなしで扱える必要はあります。

ユースケース1: 複雑な構造を持ったJSON→CSV

代表的なユースケースとしては、複雑な構造を持った大規模JSONファイルから必要な項目を抽出してCSVに書き出す、といった用途が考えられます。

SparkやHadoopの前処理として頻出しそうなユースケースです。

このケースでは(1)JSONの基本的なパース性能と、(2)JSONからデータを抽出したりデータのフィルタリングをしたりする処理の記述性が論点となります。

ユースケース2: 複雑な構造を持ったJSON→アプリケーションロジック適用

少し複雑なバッチ処理を行う場合は、アプリケーションロジックを適用するためcase classなどで定義したドメイン・オブジェクトにJSONを変換する必要がでてきます。

case classに変換後は、アプリケーションが管理しているドメインロジックを型安全に適用することができるので、プログラムの品質と開発効率を高めることができます。

ただ、JSONをcase classにマッピングする処理はそれなりに重たいので、JSON段階である程度フィルタリングした上で、必要なデータのみcase classにマッピングする形を取りたいところです。

テストプログラム

検証対象のJSONライブラリは以下の4つにしました。

  • Json4s native
  • Json4s jackson
  • Play-json
  • Argonaut

各ライブラリの性能測定対象のプログラムは以下になります。

どのライブラリもほとんど使い方は同じです。ただし、ArgonautのみJSONとオブジェクトのマッピング設定がかなり煩雑になっています。

Json4s native
package sample

import org.json4s._
import org.json4s.native.JsonMethods._

object Json4sNativeSample {
  implicit val formats = DefaultFormats

  def jsonSimple() = {
    val s = Company.example
    parse(s)
  }

  def jsonComplex() = {
    val s = Person.example
    parse(s)
  }

  def simple() = {
    val s = Company.example
    val j = parse(s)
    j.extract[Company]
  }

  def complex() = {
    val s = Person.example
    val j = parse(s)
    j.extract[Person]
  }
}
Json4s jackson
package sample

import org.json4s._
import org.json4s.jackson.JsonMethods._

object Json4sSample {
  implicit val formats = DefaultFormats

  def jsonSimple() = {
    val s = Company.example
    parse(s)
  }

  def jsonComplex() = {
    val s = Person.example
    parse(s)
  }

  def simple() = {
    val s = Company.example
    val j = parse(s)
    j.extract[Company]
  }

  def complex() = {
    val s = Person.example
    val j = parse(s)
    j.extract[Person]
  }
}
Play-json
package sample

import play.api.libs.json._

object PlaySample {
  implicit val companyReads = Json.reads[Company]
  implicit val addressReads = Json.reads[Address]
  implicit val personReads = Json.reads[Person]

  def jsonSimple() = {
    val s = Company.example
    Json.parse(s)
  }

  def jsonComplex() = {
    val s = Person.example
    Json.parse(s)
  }

  def simple() = {
    val s = Company.example
    val j = Json.parse(s)
    Json.fromJson[Company](j)
  }

  def complex() = {
    val s = Person.example
    val j = Json.parse(s)
    Json.fromJson[Person](j)
  }
}
Argonaut
package sample

import scalaz._, Scalaz._
import argonaut._, Argonaut._

object ArgonautSample {
  implicit def decodeCompanyJson: DecodeJson[Company] =
    casecodec1(Company.apply, Company.unapply)("name")

  implicit def encodeCompanyJson: EncodeJson[Company] =
    jencode1L((a: Company) => (a.name))("name")

  implicit def decodeAddressJson: DecodeJson[Address] =
    casecodec2(Address.apply, Address.unapply)("zip", "city")

  implicit def encodeAddressJson: EncodeJson[Address] =
    jencode2L((a: Address) => (a.zip, a.city))("zip", "city")

  implicit def decodePersonJson: DecodeJson[Person] =
    casecodec4(Person.apply, Person.unapply)("name", "age", "address", "company")

  def jsonSimple() = {
    val s = Company.example
    Parse.parse(s)
   }

  def jsonComplex() = {
    val s = Person.example
    Parse.parse(s)
   }

  def simple() = {
    val s = Company.example
    Parse.decodeOption[Company](s)
  }

  def complex() = {
    val s = Person.example
    Parse.decodeOption[Person](s)
  }
}

テストデータ

Company
package sample

case class Company(name: String)

object Company {
  val example = """
{"name": "Yamada Corp"}
"""
}
Person
package sample

case class Person(name: String, age: Int, address: Address, company: Option[Company])
case class Address(zip: String, city: String)

object Person {
  val example = """
{"name": "Yamada Taro", "age": 30, "address": {"zip": "045", "city": "Yokohama", "company": "Yamada Corp"}}
"""
}

測定結果

性能測定は以下のプログラムで行いました。JSONのパース処理を100万回繰り返した時の累積ミリ秒を表示しています。

package sample

object All {
  def go(msg: String)(body: => Unit): Unit = {
    System.gc
    val ts = System.currentTimeMillis
    for (i <- 0 until 1000000) yield {
      body
    }
    println(s"$msg: ${System.currentTimeMillis - ts}")
  }

  def main(args: Array[String]) {
    go(s"Json4s native json simple")(Json4sNativeSample.jsonSimple)
    go(s"Json4s jackson json simple")(Json4sSample.jsonSimple)
    go(s"Play json simple")(PlaySample.jsonSimple)
    go(s"Argonaut json simple")(ArgonautSample.jsonSimple)
    go(s"Json4s native json complex")(Json4sNativeSample.jsonComplex)
    go(s"Json4s jackson json complex")(Json4sSample.jsonComplex)
    go(s"Play json complex")(PlaySample.jsonComplex)
    go(s"Argonaut json complex")(ArgonautSample.jsonComplex)
    go(s"Json4s native simple")(Json4sNativeSample.simple)
    go(s"Json4s jackson simple")(Json4sSample.simple)
    go(s"Play simple")(PlaySample.simple)
    go(s"Argonaut simple")(ArgonautSample.simple)
    go(s"Json4s native complex")(Json4sNativeSample.complex)
    go(s"Json4s jackson complex")(Json4sSample.complex)
    go(s"Play complex")(PlaySample.complex)
    go(s"Argonaut complex")(ArgonautSample.complex)
  }
}

性能測定結果は以下になります。

ネストなし/JSON
ライブラリ性能(ms)
Json4s native747
Json4s jackson788
Play-json640
Argonaut872

ほとんど同じですがPlay-jsonが速く、Argonautが遅いという結果になっています。

ネストあり/JSON
ライブラリ性能(ms)
Json4s native1495
Json4s jackson1310
Play-json1535
Argonaut1724

ネストありのJSONの場合は、Json4s jacksonが速く、Argonautが遅いという結果になりました。

ネストなし/case class
ライブラリ性能(ms)
Json4s native2080
Json4s jackson1601
Play-json1149
Argonaut1151

JSONをcase classにマッピングする場合は、単にJSONをパースするだけの性能とは全く異なる性能特性になりました。

Json4s系はいずれもかなり遅くなっています。

Play-jsonとArgonautはほぼ同じで高速です。

ネストあり/case class
ライブラリ性能(ms)
Json4s native5162
Json4s jackson4786
Play-json4471
Argonaut3840

ネストありのJSONをcase classにマッピングする場合は、Argonautが高速でした。

Json4sはこのケースでもかなり遅くなっています。

評価

性能測定結果を評価の観点で表にまとめました。

ライブラリネストありJSON性能case class性能case class設定
Json4s native
Json4s jackson
Play-json
Argonaut

ネストありJSONのパース性能はJson4sが最速ですが、逆にcase classへのマッピングはかなり遅くなっています。このため、Json4s一択という感じではなさそうです。

ArgonautはJSONパースは遅く、case classへのマッピング性能はよいもののマッピング設定が煩雑なので、使い場所が難しい感触です。Scalazと組合せて関数型的に面白いことが色々できそうなので、その点をどう評価するかということだと思います。

一番バランスがよいのがPlay-jsonで、一択するならPlay-jsonがよさそうです。

ただPlay-jsonはPlayのバージョンと連動しているためdependency hellの可能性があるのが難点です。たとえば、最新版のPlay-jsonを使って共通ライブラリを作っても、サービスで運用しているPlayのバージョンが古いと適用できない、といった問題です。

まとめ

全体的にPlay-jsonが最もバランスがよいという結果が得られました。

個別の性能的には「ユースケース1: 複雑な構造を持ったJSON→CSV」はJson4s jackson、「ユースケース2: 複雑な構造を持ったJSON→アプリケーションロジック適用」はArgonautが適しているという結果が得られました。

ただArgonautはcase classの設定が煩雑なので、広範囲に使える共通処理を作る場合はよいのですが、用途ごとの小さな要件には適用しづらい感じです。そういう意味では、Play-jsonがこの用途にも適していそうです。

以上の点から、次のような方針で考えていこうと思います。

  • Play-jsonを基本にする
  • 必要に応じて「大規模JSON→CSV」の用途向けにJson4s jacksonを併用
  • 必要に応じて「大規模JSON→アプリケーションロジック適用」の用途向けにArgonautを併用

Play-jsonを基本ライブラリとして採用するメリットが大きいので、Play-jsonのdependency hell問題は、運用で回避する作戦を取りたいと思います。

諸元

  • Mac OS 10.7.5 (2.6 GHz Intel Core i7)
  • Java 1.7.0_75
  • Scala 2.11.6
  • Json4s native 2.3.11
  • Json4s jackson 2.3.11
  • Play-json 2.3.9
  • Argonaut 6.1

2015年5月18日月曜日

case class 2015

case classはScalaプログラミングにおける最重要機能の一つです。

case classはいろいろな便利機能が言語機能としてビルトインされているのに加えて、OOP的にはValue Object、関数型プログラミングとしては代数的データ構造として利用することができます。

case classはそのままシンプルに使っても便利ですが、case classを作る時に基本対応しておくとよさそうな各種機能があるので、その辺りの最新事情を取り込んだ2015年版case classについて考えてみます。

対応機能

2015年版case classでは以下の機能に対応することにします。

  • Monoid
  • Argonaut
  • ScalaCheck
Monoid

Scala Tips / Monoid - 新規作成」は2012年6月の記事なので、かれこれ3年になりますがMonoidの重要性は変わる所がありません。Monoidを使うためだけにScalazを導入してもお釣りがくるぐらいです。

case classを作る時は常にMonoidを意識しておいて、可能であればMonoid化しておくのがよいでしょう。

MonoidはScalazで実装します。

2012年版の「Scala Tips / Monoid - 新規作成」ではScalaz 6でしたが、今回はScalaz 7でMonoidの定義の仕方も変更されています。

Argonaut

Finagleなどを使ってRESTベースのmicroservicesアーキテクチャを取る場合、case classをJSONで送受信するニーズが大きくなります。

case classをできるだけ簡単にJSON化する方法としてArgonautが有力なので使ってみました。

ScalaCheck

case classがMonoidである場合は、必ず二項演算があるので、この二項演算のテストコードが必要になります。

Scalaプログラミングでは、こういった演算はScalaCheckでプロパティベーステストを行うのがお約束になっています。

プログラム

build.sbt

build.sbtは特に難しい所はありません。必要なライブラリを登録しているだけです。

scalaVersion := "2.11.6"

val scalazVersion = "7.1.0"

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % scalazVersion,
  "io.argonaut" %% "argonaut" % "6.1-M4",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test",
  "org.scalacheck" %% "scalacheck" % "1.12.2" % "test",
  "junit" % "junit" % "4.12" % "test"
)
Average.scala

Monoid化とArgonaut化したcase class Averageは以下になります。

package sample

import scalaz._, Scalaz._
import argonaut._, Argonaut._

case class Average(count: Int, total: Int) {
  import Average.Implicits._

  def value: Float = total / count.toFloat

  def +(rhs: Average): Average = {
    Average(count + rhs.count, total + rhs.total)
  }

  def marshall: String = this.asJson.nospaces
}

object Average {
  import Implicits._

  val empty = Average(0, 0)

  def unmarshall(s: String): Validation[String, Average] = s.decodeValidation[Average]

  object Implicits {
    implicit object AverageMonoid extends Monoid[Average] {
      def append(lhs: Average, rhs: => Average) = lhs + rhs
      def zero = Average.empty
    }

    implicit def decodeAverageJson: DecodeJson[Average] =
      casecodec2(Average.apply, Average.unapply)("count", "total")

    implicit def encodeAverageJson: EncodeJson[Average] =
      jencode2L((d: Average) => (d.count, d.total))("count", "total")
  }
}

case classの定義に難しいところはないと思います。

以下ではMonoidとArgonautの定義について説明します。

Monoid

Monoidは、Scalazの型クラスMonoidの型クラスインスタンスを作成して暗黙オブジェクトとして定義します。

implicit object AverageMonoid extends Monoid[Average] {
      def append(lhs: Average, rhs: => Average) = lhs + rhs
      def zero = Average.empty
    }

型クラスMonoidの型クラスインスタンスのappendメソッドとzeroメソッドを実装します。

appendメソッドはMonoid対象の二項演算を定義します。通常、Monoid対象となるcase classで「+」メソッドを提供しているはずなのでこれを呼び出す形になります。

zeroメソッドは空の値を返すようにします。通常、case classの空値はコンパニオン/オブジェクトの変数emptyに定義するのが普通なので、これをzeroメソッドの値として返す形になります。

Argonaut

まずArgonautの基本定義として、ScalaオブジェクトとJSON間の相互変換をする暗黙関数を2つ用意します。

ここも自動化できればよいのですが、マクロを使う形になるはずなので、マクロの仕様がフィックスしていない現状だとやや時期尚早かもしれません。

implicit def decodeAverageJson: DecodeJson[Average] =
      casecodec2(Average.apply, Average.unapply)("count", "total")

    implicit def encodeAverageJson: EncodeJson[Average] =
      jencode2L((d: Average) => (d.count, d.total))("count", "total")

次に、マーシャル関数(Scala→JSON)とアンマーシャル関数(JSON→Scala)を定義します。

まず、マーシャル関数はcase class Averageのメソッドとして定義しました。

def marshall: String = this.asJson.nospaces

アンマーシャル関数はAverageのコンパニオンオブジェクトに定義しました。

def unmarshall(s: String): Validation[String, Average] = s.decodeValidation[Average]
AverageSpec.scala

case class Averageのテストコードとして、ScalaCheckによるプロパティベーステストを行うAverageSpecを作りました。

package sample

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest._
import org.scalatest.prop.GeneratorDrivenPropertyChecks

@RunWith(classOf[JUnitRunner])
class AverageSpec extends WordSpec with Matchers with GivenWhenThen with GeneratorDrivenPropertyChecks {
  "Average" should {
    "value" in {
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val x = Average(ns1.length, ns1.sum)
          x.value should be (toAverageValue(ns1))
        }
      }
    }
    "value gen" in {
      forAll ((Gen.nonEmptyListOf(Gen.posNum[Int]), "ns")) { ns =>
        val x = Average(ns.length, ns.sum)
        x.value should be (toAverageValue(ns))
      }
    }
    "+" in {
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val xs = toAverages(ns1)
          val r = xs.foldLeft(Average.empty)((z, x) => z + x)
          r.value should be (toAverageValue(ns1))
        }
      }
    }
    "monoid concatenate" in {
      import scalaz._, Scalaz._
      import Average.Implicits.AverageMonoid
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val xs = toAverages(ns1)
          val r = xs.concatenate
          r.value should be (toAverageValue(ns1))
        }
      }
    }

    def toAverages(ns: List[Int]): List[Average] = {
      ns.map(Average(1, _))
    }

    def toAverageValue(ns: List[Int]): Float = {
      ns.sum.toFloat / ns.length
    }
  }
}

ScalaCheckのプロパティベーステストを行う方法はいくつかありますが、ここではScalaTestのGeneratorDrivenPropertyChecksを使ってみました。

GeneratorDrivenPropertyChecksを使うと「forAll」を使って、指定された型の値をワンセット自動生成してくれるので、この値を用いてテストを行うことができます。

forAllの内部定義として個々のテストを書いていきますが、これは通常のテストコードと同様です。

"value" in {
      forAll ("ns") { (ns: List[Int]) =>
        val ns1 = ns.filter(_ >= 0)
        if (ns1.nonEmpty) {
          val x = Average(ns1.length, ns1.sum)
          x.value should be (toAverageValue(ns1))
        }
      }
    }

一つポイントとなるのは、テスト用データの自動生成は指定された型(ここでは List[Int])の任意の値を取る可能性があるので、これをテストコード側で排除する必要がある点です。

この問題への対処方法として、テスト用データ生成器(org.scalacheck.Gen)で値域を指定する方法があります。

org.scalacheck.Genを使って値域を指定するテスト"value gen"は以下になります。org.scalacheck.Genを使うとクロージャの引数の型(List[Int])も省略できます。

"value gen" in {
      forAll ((Gen.nonEmptyListOf(Gen.posNum[Int]), "ns")) { ns =>
        val x = Average(ns.length, ns.sum)
        x.value should be (toAverageValue(ns))
      }
    }

いずれの方法を取るにしても、テストプログラムを書く時に、テストデータを準備する必要はないのは大変便利です。

また、テストプログラムが、テストをする手続きというより、より仕様定義に近いものになるのもよい感触です。

実行

Sbtのtestでテストプログラムを実行することができます。

$ sbt test
...略...
[info] AverageSpec:
[info] Average
[info] - should value
[info] - should value gen
[info] - should +
[info] - should monoid concatenate
[info] ScalaTest
[info] 36mRun completed in 1 second, 870 milliseconds.0m
[info] 36mTotal number of tests run: 40m
[info] 36mSuites: completed 1, aborted 00m
[info] 36mTests: succeeded 4, failed 0, canceled 0, ignored 0, pending 00m
[info] 32mAll tests passed.0m
[info] Passed: Total 4, Failed 0, Errors 0, Passed 4
[success] Total time: 5 s, completed 2015/05/15 16:31:57

まとめ

case classを作る時に意識しておきたい基本形を考えてみました。

プログラミング時には、この基本形を念頭に置いて、不要な機能は削除、必要な機能を追加という形でcase classを組み立てていくことをイメージしています。

なお、Monoidに対するテストに関してはScalazのSpecLiteを使うともっと強力なテストができますが、この話題は別途取り上げたいと思います。

諸元

  • Scala 2.11.6
  • Scalaz 7.1.0
  • Argonaut 6.1-M4
  • ScalaTest 2.2.4
  • ScalaCheck 1.12.2

2015年5月11日月曜日

[docker] DockerでSpark SQL

Spark SQL 1.3から以下の2つの機能が導入されています。

  • DataSourceとしてJDBCが使えるようになった
  • DataFrame

この2つの機能追加によってSpark SQLを汎用のバッチ処理基盤にできるのではないかというインスピレーションが湧きました。

この実現目的でSparkバッチをスタンドアロンで実行するためのDockerイメージspark-sql-scala-dockerを作ってみた、というのが今回のお話です。

Spark SQL

Spark SQLは、Sparkの分散計算処理をSQLで記述できるようにしたものです。SQLとSpark本来のmonadicなAPI(e.g. filter, map, flatMap)を併用して計算処理を記述することができます。

このプログラミングモデルは非常に強力で、大枠の絞り込みはSQLで行っておいて、アプリケーションに特化した検索ロジックをScalaで記述したUDF(User Defined Function)で補完するといった処理を、プログラミング言語的に簡潔に記述することができます。

Spark SQLの基本機能に加えて1.3から以下の機能も使えるようになりました。

DataSourceのJDBC対応

DataSourceとしてJDBCが使えるようになったことで、RedShift上にためた分析データなどから直接データを取得できるようになりました。MySQLやPostreSQLなどのデータを一旦S3に変換するといった準備タスクが不要になったので、ジョブ作成の手間が大きく低減すると思います。

小さな機能追加ですが、実運用上のインパクトは大きいのではないかと思います。

DataFrame

大きな機能追加としてはDataFrameが導入されました。

DataFrameは表形式の大規模データを抽象化したAPIで、元々はR/Pythonで実績のある機能のようです。

DataFrameは分析専用のAPIではなく、表形式データ操作の汎用APIとして使用できるのではないかと期待しています。計算結果を外部出力する際の汎用機能としても期待できます。

もちろんR/Pythonなどのデータ分析処理系との連携も期待できそうです。

Spark SQLの用途

Spark SQLの基本機能と上記の2つの機能追加によって、Sparkバッチを大規模(データ量/計算量)向けデータ処理基盤としてだけではなく、汎用のバッチ実行基盤として使えるようになるのではないかとインスピレーションが湧いたわけです。

データ集計用のバッチをSparkバッチとして作成して、データ量、計算量に応じてスタンドアロンジョブとSparkクラスタ上でのジョブのいずれかでジョブ実行するというユースケースです。

そのベースとして、Sparkクラスタを用いないスタンドアロンジョブとして実行するためのDockerイメージを作ってみました。

spark-sql-scala-docker

spark-sql-scala-dockerはSparkアプリケーションをスタンドアロンで実行するためのDockerイメージです。

GitHubにソースコードがありますので、詳細はこちらを参照して下さい。

以下では、実装上のポイントと使い方について説明します。

Dockerfile

spark-sql-scala-dockerのDockerfileは以下になります。

FROM sequenceiq/spark:1.3.0

RUN mkdir -p /opt/spark/lib
RUN cd /opt/spark/lib && curl -L 'http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.30.tar.gz' -o - | tar -xz --strip-components=1 mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar
RUN curl -L 'http://jdbc.postgresql.org/download/postgresql-9.2-1002.jdbc4.jar' -o /opt/spark/lib/postgresql-9.2-1002.jdbc4.jar

ENV SPARK_CLASSPATH /opt/spark/lib/mysql-connector-java-5.1.30-bin.jar:/opt/spark/lib/postgresql-9.2-1002.jdbc4.jar

RUN rpm -ivh http://ftp-srv2.kddilabs.jp/Linux/distributions/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm

RUN yum -y install redis --enablerepo=epel

COPY spark-defaults.conf /opt/spark-defaults.conf

COPY entrypoint.sh /opt/entrypoint.sh

ENV COMMAND_JAR_DIR /opt/command.d

ENV COMMAND_JAR_NAME command.jar

VOLUME [$COMMAND_JAR_DIR"]

ENTRYPOINT ["/opt/entrypoint.sh"]

Dockerイメージsequenceiq/spark:1.3.0をベースにしていて以下の調整だけ行っています。

  • MySQLとPostgreSQLのJDBCドライバのインストール
  • Sparkアプリケーションの登録処理
entrypoint.sh

spark-sql-scala-dockerのentrypoint.shは以下になります。

#! /bin/bash

# WAIT_CONTAINER_TIMER
# WAIT_CONTAINER_FILE
# WAIT_CONTAINER_KEY

# set -x

set -e

echo MySQL host: ${MYSQL_SERVER_HOST:=$MYSQL_PORT_3306_TCP_ADDR}
echo MySQL port: ${MYSQL_SERVER_PORT:=$MYSQL_PORT_3306_TCP_PORT}
echo PostgreSQL host: ${POSTGRESQL_SERVER_HOST:=$POSTGRESQL_PORT_5432_TCP_ADDR}
echo PostgreSQL port: ${POSTGRESQL_SERVER_PORT:=$POSTGRESQL_PORT_5432_TCP_PORT}
echo Redis host: ${REDIS_SERVER_HOST:=$REDIS_PORT_6379_TCP_ADDR}
echo Redis port: ${REDIS_SERVER_PORT:=$REDIS_PORT_6379_TCP_PORT}
export MYSQL_SERVER_HOST
export MYSQL_SERVER_PORT
export POSTGRESQL_SERVER_HOST
export POSTGRESQL_SERVER_PORT
export REDIS_SERVER_HOST
export REDIS_SERVER_PORT

function wait_container {
    if [ -n "$REDIS_SERVER_HOST" ]; then
 wait_container_redis
    elif [ -n "$WAIT_CONTAINER_FILE" ]; then
 wait_container_file
    fi
}

function wait_container_redis {
    result=1
    for i in $(seq 1 ${WAIT_CONTAINER_TIMER:-100})
    do
 sleep 1s
 result=0
 if [ $(redis-cli -h $REDIS_SERVER_HOST -p $REDIS_SERVER_PORT GET $WAIT_CONTAINER_KEY)'' = "up" ]; then
     break
 fi
 echo spark-sql-scala-docker wait: $REDIS_SERVER_HOST
 result=1
    done
    if [ $result = 1 ]; then
 exit 1
    fi
}

function wait_container_file {
    result=1
    for i in $(seq 1 ${WAIT_CONTAINER_TIMER:-100})
    do
 sleep 1s
 result=0
 if [ -e $WAIT_CONTAINER_FILE ]; then
     break
 fi
 echo spark-sql-scala-docker wait: $WAIT_CONTAINER_FILE
 result=1
    done
    if [ $result = 1 ]; then
 exit 1
    fi
}

COMMAND_JAR=$COMMAND_JAR_DIR/$COMMAND_JAR_NAME

wait_container

sed -i "s!hdfs://.*:9000!file:\/\/\/tmp!g" /usr/local/hadoop/etc/hadoop/core-site.xml

spark-submit --properties-file /opt/spark-defaults.conf $COMMAND_JAR

基本的にはspark-submitでSparkアプリケーションのジョブをサブミットしているだけですが、以下の2つの調整を行っています。

  • Redisを使って他のコンテナの待ち合わせ
  • 中間データのローディング先をHDFSではなくローカルファイルに変更する
コンテナの待ち合わせ

Sparkアプリケーションを動作させる前の準備を他のコンテナで進める場合は、コンテナの待ち合わせが必要になります。この待ち合わせをmysql-java-embulk-dockerと同様にRedisを用いて実現しています。

典型的な使用例は、Sparkアプリケーションのテスト実行時でのテストDBの準備です。この実例は後ほどサンプルで説明します。

中間データのローディング先

core-site.xmlの変更処理です。

sed -i "s!hdfs://.*:9000!file:\/\/\/tmp!g" /usr/local/hadoop/etc/hadoop/core-site.xml

ここの設定を変更しないとDocker環境内でスタンドアロンでは動かなかったので設定変更しています。

設定変更の方法としてはHDFSを動くようにするという方式もあるのですが、スタンドアプリケーションなのでここではローカルのファイルを使う方式で対応しています。

Docker Hub

mysql-java-embulk-dockerと同様にspark-sql-scala-dockerもDocker Hubの自動ビルドの設定を行っているので、以下の場所にDockerイメージが自動ビルドされます。

このイメージは「asami/spark-sql-scala-docker」という名前で利用することができます。

サンプル

Dockerイメージ「asami/spark-sql-scala-docker」を利用してテストデータの投入を行うサンプルを作ってみます。

手元の環境上でテスト目的で動作させるためmysql-java-embulk-dockerを併用してテストデータの投入を行っています。

サンプルのコードはGitHubのspark-sql-scala-dockerのsampleディレクトリにあるので、詳細はこちらを参照して下さい。

SimpleApp.scala

サンプルのSparkバッチであるSimpleAppのプログラムは以下になります。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, DataFrame}

object SimpleApp extends App {
  val batting = SparkSqlUtils.createMysqlDataFrame("Simple Application", "batting")
  val count = batting.count()
  println(s"count = ${batting.count()}")
}

object SparkSqlUtils {
  def createSqlContext(name: String): SQLContext = {
    val conf = new SparkConf().setAppName(name)
    val sc = new SparkContext(conf)
    new SQLContext(sc)
  }

  def createMysqlDataFrame(name: String, table: String): DataFrame = {
    val sqlc = createSqlContext(name)
    createMysqlDataFrame(sqlc, table)
  }

  def createMysqlDataFrame(sqlc: SQLContext, table: String): DataFrame = {
    val host = System.getenv("MYSQL_SERVER_HOST")
    val port = System.getenv("MYSQL_SERVER_PORT")
    val user = System.getenv("MYSQL_SERVER_USER")
    val password = System.getenv("MYSQL_SERVER_PASSWORD")
    sqlc.load("jdbc", Map(
      "url" -> s"jdbc:mysql://$host:$port/baseball?user=$user&password=$password",
      "dbtable" -> table
    ))
  }
}

SparkSqlUtilsにDataFrame取得処理をまとめています。ここは汎用ライブラリ化できるところです。

この処理を除いた以下の処理がSparkバッチの本体です。

val batting = SparkSqlUtils.createMysqlDataFrame("Simple Application", "batting")
  val count = batting.count()
  println(s"count = ${batting.count()}")
アプリケーションロジック

指定したテーブル"batting"に対応したDataFrameを取得し、countメソッドでレコード総数を取得し、その結果をコンソールに出力しています。とても簡単ですね。

この部分を以下の機能を用いて記述することで高度なバッチ処理を簡単に記述できます。

  • DataFrameによる表データ操作
  • DataFrameから変換したRDDを用いてSpark計算処理

前述したように「大枠の絞り込みはSQLで行っておいて、アプリケーションに特化した検索ロジックをScalaで記述したUDF(User Defined Function)で補完するといった処理を、プログラミング言語的に簡潔に記述することができます。」

移入・移出

テーブル"batting"をDataFrameとしてローディングしているのは、前述のSpark 1.3の機能追加「DataSourceとしてJDBCが使えるようになった」によるものです。

また、ここでは外部出力をコンソール出力にしていますが、RDDのsaveAsTextFileメソッドやDataFrameを用いることで、S3やデータベースなどに集計結果を簡単に出力することができます。

データベースなどへの外部出力が簡単に行えるのもSpark 1.3の機能追加「DataFrame」の効果です。

ここからも分かるように、Spark SQL 1.3で導入された「DataSourceとしてJDBCが使えるようになった」と「DataFrame」により、Sparkバッチ処理の難題であったデータの移入/移出処理が極めて簡単に記述できるようになったわけです。

SBTの設定

SBTによるScalaプログラムのビルドの設定は以下になります。

Spark本体とSpark SQLを依存ライブラリとして設定している、ごくオーソドックスな設定です。

Sparkバッチ用にすべての依存ライブラリをまとめたJARファイルを作る必要があるので、sbt-assemblyの設定を行ってます。

ポイントとしては、Sparkバッチの実行環境にScalaの基本ライブラリとSpark本体/Spark SQLのライブラリが用意されているので、sbt-assemblyでまとめるJARファイルから排除する設定を行っています。

  • Spark本体とSpark SQLの依存ライブラリの設定を"provided"にしてリンク対象から外す
  • sbt-assemblyの設定で"includeScala = false"としてScala基本ライブラリをリンク対象から外す

これらの設定はなくても動作しますが、JARファイルが巨大になってしまいます。

name := "simple"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.1" % "provided"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

sbt-assemblyプラグインが必要なのでproject/assembly.sbtに以下の設定をしておきます。

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
docker-compose.yml

サンプルプログラムのdocker-compose.ymlは以下になります。

spark:
  image: asami/spark-sql-scala-docker
  links:
    - mysql
    - redis
  volumes:
    - target/scala-2.10:/opt/command.d
  environment:
    COMMAND_JAR_NAME: simple-assembly-1.0.jar
    WAIT_CONTAINER_KEY: mysql-java-embulk-docker
    MYSQL_SERVER_USER: baseball
    MYSQL_SERVER_PASSWORD: baseball
mysql:
  image: asami/mysql-java-embulk-docker
  links:
    - redis
  ports:
    - ":3306"
  volumes:
    - setup.d:/opt/setup.d
  environment:
    MYSQL_USER: baseball
    MYSQL_PASSWORD: baseball
    MYSQL_ROOT_PASSWORD: baseball
    MYSQL_DATABASE: baseball
redis:
  image: redis
  ports:
    - ":6379"

自前ではDockerイメージを作らず、以下の3つの汎用Dockerイメージを再利用しています。

  • asami/spark-sql-scala-docker
  • asami/mysql-java-embulk-docker
  • redis
asami/spark-sql-scala-docker

ボリュームと環境変数の記述で、targetscala-2.10simple-assembly-1.0.jarがSparkバッチプログラムとして認識されるようにしています。

simple-assembly-1.0.jarはsbt-assemblyで作成した「全部入り(SparkとScala以外)」のJARファイルです。

それ以外は、mysql-java-embulk-dockerと同期をとるためのおまじないです。

asami/mysql-java-embulk-docker

前回の記事「Docker Composeでデータ投入」と同じ設定です。テスト用のMySQLデータベースにテストデータを投入しています。

Batting.csvは以下のサイトからデータを取得しました。

redis

asami/mysql-java-embulk-dockerによるテストデータ投入の待ち合わせにredisを用いています。

ビルド

Sparkバッチのビルドはsbtで行います。

$ sbt assembly

テスト環境はdocker-composeのbuildコマンドでビルドします。

$ docker-compose build
実行

docker-composeのupコマンドで実行します。

$ docker-compose up

動作過程がコンソールに出力されますが、最後の方で以下のような出力があります。

spark_1 | count = 99846

無事Sparkバッチでデータ集計ができました。

まとめ

Spark SQLを汎用のバッチ処理基盤として運用する目的でSparkバッチをスタンドアロンで実行するためのDockerイメージspark-sql-scala-dockerを作ってみましたが、無事動作しました。

このことによってspark-sql-scala-dockerとmysql-java-embulk-dockerを使って手元で簡単にSparkバッチをテストできるようになりました。

汎用Dockerイメージをdocker-composeで組み合わせるだけなので運用的にも大変、楽だと思います。

今回は試していませんが、spark-sql-scala-dockerを使ってSparkバッチをECS(EC2 Container Service)などのDocker環境上でスタンドアロンバッチとして実行するという運用も可能ではないかと考えています。

もちろん、SparkバッチのJARファイルをspark-submitコマンドによるジョブ投入により直接Sparkクラスタ上で実行することでSpark本来の大規模(データ量/計算量)処理を行うことができます。

いずれの場合も、基本的に開発するのは、Scalaによる通常のSparkバッチプログラムだけです。テストやDocker環境上でのスタンドアロンバッチのいずれも汎用Dockerイメージを活用することで、簡単な設定のみで運用することができそうです。

今回の作業で上記の3つのユースケースを同時に満たせることの目処が立ちました。この成果をベースにSpark SQLを汎用のバッチ処理基盤として利用するためのノウハウの積み上げをしてきたいと思います。

諸元

  • Mac OS 10.7.5
  • docker 1.6
  • docker-compose 1.2.0
  • Spark SQL 1.3

2015年5月7日木曜日

[docker] Docker Composeでデータ投入

Docker ComposeでMySQLを使う」ではDocker Composeを使ってJobSchedulerからMySQLをそれぞれ別のDockerコンテナ上で動作させ連携して使用しました。

テストなどでMySQL公式Dockerコンテナを使う際の問題点として、データ投入があります。事前に用意したデータをMySQLに投入後に、テスト対象のアプリケーションが起動されると理想的なのですが、MySQL公式Dockerイメージではデータ投入する機能は提供されていません。

また、MySQL公式DockerイメージではMySQLの起動完了を待ち合わせる機能を持っていないことも問題です。

これらの問題に対応するためMySQL公式Dockerイメージを元に、マイDockerイメージであるmysql-java-embulk-dockerを作ってみました。

mysql-java-embulk-docker

mysql-java-embulk-dockerはdocker-composeを使ってアプリケーションDockerイメージのテストを行う際に、事前にデータ投入したMySQLデータベースを提供するためのDockerイメージです。

GitHubにソースコードがありますので、詳細はこちらを参照して下さい。

以下では、実装上のポイントと使い方について説明します。

Dockerfile

mysql-java-embulk-dockerのDockerfileは以下になります。

FROM mysql:5.6
MAINTAINER asami

RUN apt-get update && apt-get -y install wget curl

# Install JDK 1.7
RUN cd /opt; wget --no-cookies --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u51-b13/jdk-7u51-linux-x64.tar.gz" -O /opt/jdk-7-linux-x64.tar.gz

# Install in /usr/java/jdk1.7.0_51 
RUN mkdir /usr/java && (cd /usr/java; tar xzf /opt/jdk-7-linux-x64.tar.gz)
RUN rm /opt/jdk-7-linux-x64.tar.gz
RUN update-alternatives --install /usr/bin/java java /usr/java/jdk1.7.0_51/jre/bin/java 20000; update-alternatives --install /usr/bin/jar jar /usr/java/jdk1.7.0_51/bin/jar 20000; update-alternatives --install /usr/bin/javac javac /usr/java/jdk1.7.0_51/bin/javac 20000; update-alternatives --install /usr/bin/javaws javaws /usr/java/jdk1.7.0_51/jre/bin/javaws 20000; update-alternatives --set java /usr/java/jdk1.7.0_51/jre/bin/java; update-alternatives --set javaws /usr/java/jdk1.7.0_51/jre/bin/javaws; update-alternatives --set javac /usr/java/jdk1.7.0_51/bin/javac; update-alternatives --set jar /usr/java/jdk1.7.0_51/bin/jar;

RUN curl --create-dirs -o /opt/embulk -L "http://dl.embulk.org/embulk-latest.jar" && chmod +x /opt/embulk

RUN /opt/embulk gem install embulk-output-mysql

RUN apt-get -y install redis-server

COPY charset.cnf /etc/mysql/conf.d/charset.cnf
COPY entrypoint.sh /opt/entrypoint.sh
RUN chmod +x /opt/entrypoint.sh

VOLUME ["/var/lib/mysql", "/etc/mysql/conf.d", "/opt/setup.d"]

ENTRYPOINT ["/opt/entrypoint.sh"]

CMD ["mysqld"]
Embulk

Embulkはビッグデータスケールのデータローダです。

データ投入にEmbulkを利用できると大変便利なので組み込んでみました。

EmbulkはJava VM上で動作するので、Embulk用にJDKをインストールしています。

また、MySQLにデータ投入するので、Embulkにembulk-output-mysqlプラグインを追加しています。

Redis

Docker Compose上でmysql-java-embulk-dockerコンテナの起動時にデータ投入をする際に問題点として、mysql-java-embulk-dockerコンテナの起動とアプリケーションコンテナの起動の同期が行われないというものがあります。

mysql-java-embulk-dockerコンテナの起動とアプリケーションコンテナの起動が同時に行われてしまうために、mysql-java-embulk-dockerコンテナの起動時に行われるデータ投入が完了する前に、アプリケーションコンテナが動き出してしまい、想定したデータがない状態なので誤動作する、という問題です。

現段階ではDocker Composeにはこの問題を解決するための機能は提供されていないようなので、Redisを使って対応することにしました。

この目的でRedisをインストールしています。

本来はRedisのクライアントのみがインストールできればよいのですが、簡単にできるよい方法がみつからなかったのでRedisをまるごとインストールしています。

charset.cnf

MySQLで日本語を使うための設定として、サーバーのコード系をUTF-8に設定します。

この目的で以下のchaset.cnfを用意して、Dockerコンテナの/etc/mysql/conf.d/charset.cnfにCOPYします。

[mysqld]
character-set-server = utf8
entrypoint.sh

mysql-java-embulk-dockerのentrypoint.shは以下になります。

#!/bin/bash

# WAIT_DB_TIMER
# WAIT_CONTAINER_KEY

# set -x

set -e

echo Wait contaner key: ${WAIT_CONTAINER_KEY:=mysql-java-embulk-docker}
echo Redis host: ${REDIS_SERVER_HOST:=$REDIS_PORT_6379_TCP_ADDR}
echo Redis port: ${REDIS_SERVER_PORT:=$REDIS_PORT_6379_TCP_PORT}

function check_db {
    if [ "$MYSQL_ROOT_PASSWORD" ]; then
 mysql -u root -p"$MYSQL_ROOT_PASSWORD" -e "status"
    elif [ "$MYSQL_ALLOW_EMPTY_PASSWORD" ]; then
 mysql -e "status"
    else
 exit 1
    fi
}

function wait_db {
    result=1
    for i in $(seq 1 ${WAIT_DB_TIMER:-10})
    do
 sleep 1s
 result=0
 check_db && break
 result=1
    done
    if [ $result = 1 ]; then
 exit 1
    fi
}

if [ "${1:0:1}" = '-' ]; then
    set -- mysqld "$@"
fi

is_install=false

if [ "$1" = 'mysqld' ]; then
    # read DATADIR from the MySQL config
    DATADIR="$("$@" --verbose --help 2>/dev/null | awk '$1 == "datadir" { print $2; exit }')"

    if [ ! -d "$DATADIR/mysql" ]; then
        if [ -z "$MYSQL_ROOT_PASSWORD" -a -z "$MYSQL_ALLOW_EMPTY_PASSWORD" ]; then
            echo >&2 'error: database is uninitialized and MYSQL_ROOT_PASSWORD not set'
            echo >&2 '  Did you forget to add -e MYSQL_ROOT_PASSWORD=... ?'
            exit 1
        fi

 is_install=true

        echo 'Running mysql_install_db ...'
        mysql_install_db --datadir="$DATADIR"
        echo 'Finished mysql_install_db'

        # These statements _must_ be on individual lines, and _must_ end with
        # semicolons (no line breaks or comments are permitted).
        # TODO proper SQL escaping on ALL the things D:

        tempSqlFile='/tmp/mysql-first-time.sql'
        cat > "$tempSqlFile" <<-EOSQL
            DELETE FROM mysql.user ;
            CREATE USER 'root'@'%' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}' ;
            GRANT ALL ON *.* TO 'root'@'%' WITH GRANT OPTION ;
            DROP DATABASE IF EXISTS test ;
EOSQL

        if [ "$MYSQL_DATABASE" ]; then
            echo "CREATE DATABASE IF NOT EXISTS \`$MYSQL_DATABASE\` ;" >> "$tempSqlFile"
        fi

        if [ "$MYSQL_USER" -a "$MYSQL_PASSWORD" ]; then
            echo "CREATE USER '$MYSQL_USER'@'%' IDENTIFIED BY '$MYSQL_PASSWORD' ;" >> "$tempSqlFile"

            if [ "$MYSQL_DATABASE" ]; then
                echo "GRANT ALL ON \`$MYSQL_DATABASE\`.* TO '$MYSQL_USER'@'%' ;" >> "$tempSqlFile"
            fi
        fi

        echo 'FLUSH PRIVILEGES ;' >> "$tempSqlFile"

 # http://qiita.com/toritori0318/items/242274d4f5794e2f68e5
        # setup
        echo "use $MYSQL_DATABASE;" >> "$tempSqlFile"
 if [ -e "/opt/setup.d/setup.sql"]; then
            cat /opt/setup.d/setup.sql >> "$tempSqlFile"
 fi
        # start mysql
        set -- "$@" --init-file="$tempSqlFile"
    fi

    chown -R mysql:mysql "$DATADIR"
fi

exec "$@" &

wait_db

if [ -e "/opt/setup.d/setup.yml" ]; then
    if [ $is_install=true ]; then
 echo "embulk run setup.yml"
 cd /opt/setup.d && /opt/embulk run setup.yml
    fi
fi

if [ -n "$REDIS_SERVER_HOST" ]; then
    redis-cli -h $REDIS_SERVER_HOST -p $REDIS_SERVER_PORT SET $WAIT_CONTAINER_KEY up
fi

sleep infinity

MySQL公式をベースに、データ投入用SQLおよびEmbulkでデータ投入するように拡張したものです。

setup.sql

/opt/setup.d/setup.sqlとしてデータ投入用SQLが存在する場合は、MySQLの初期起動スクリプトにこの内容を追加することで、起動時にデータ投入されるようになっています。

/opt/setup.dはDockerfileでVolumeなっていて、外部からディレクトリをマウントして使用することを想定しています。

データ投入用SQLは「Docker公式のmysqlイメージを使いつつ初期データも投入する」の記事を参考にしました。

setup.yml

/opt/setup.d/setup.ymlとしてデータ投入用Embulk記述ファイルが存在する場合は、Embulkを使ってデータ投入するようになっています。

ただし、MySQLの起動が完了した後でないとデータ投入ができないのでmysqlコマンドを使って待ち合わせ処理を行っています。

Redisによる同期

mysql-java-embulk-dockerコンテナの起動終了の待ち合わせのためRedisを使用します。

具体的には以下のように、外部コンテナでRedisが起動されている場合に、redis-cliコマンドを使って環境変数WAIT_CONTAINER_KEYで指定されたスロットに「up」という文字列を設定しています。

if [ -n "$REDIS_SERVER_HOST" ]; then
    redis-cli -h $REDIS_SERVER_HOST -p $REDIS_SERVER_PORT SET $WAIT_CONTAINER_KEY up
fi

アプリケーション側は、Redisのこのスロットがupになるまでポーリングで待ち合わせることで同期を取ることになります。

終了抑止

最後にDockerでサービスを記述する時のお約束としてsleepコマンドで終了抑止を行っています。

Docker Hub

Docker HubはGitHubやBitBucketと連動した自動ビルド機能を提供しています。

mysql-java-embulk-dockerもこの設定を行っているので、以下の場所にDockerイメージが自動ビルドされます。

このイメージは「asami/mysql-java-embulk-docker」という名前で利用することができます。

サンプル

Dockerイメージ「asami/mysql-java-embulk-docker」を利用してテストデータの投入を行うサンプルを作ってみます。

サンプルのコードはGitHubのmysql-java-embulk-dockerのsampleディレクトリにあるので、詳細はこちらを参照して下さい。

docker-compose.yml

サンプルプログラムのdocker-compose.ymlは以下になります。

app:
  build: .
  links:
    - mysql
    - redis
  environment:
    WAIT_CONTAINER_KEY: mysql-java-embulk-docker
    MYSQL_SERVER_USER: baseball
    MYSQL_SERVER_PASSWORD: baseball
mysql:
  image: asami/mysql-java-embulk-docker
  links:
    - redis
  ports:
    - ":3306"
  volumes:
    - setup.d:/opt/setup.d
  environment:
    MYSQL_USER: baseball
    MYSQL_PASSWORD: baseball
    MYSQL_ROOT_PASSWORD: baseball
    MYSQL_DATABASE: baseball
redis:
  image: redis
  ports:
    - ":6379"

setup.dをコンテナの/opt/setup.dにマウントしています。

setup.dには後述のsetup.ymlとデータファイルBatting.csvが格納されています。

Batting.csvは以下のサイトからデータを取得しました。

setup.yml

setup.ymlはembulkで移入するデータの情報を記述したものです。

in:
  type: file
  path_prefix: Batting.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - {name: playerID, type: string}
    - {name: yearID, type: long}
    - {name: stint, type: long}
    - {name: teamID, type: string}
    - {name: lgID, type: string}
    - {name: G, type: long}
    - {name: AB, type: long}
    - {name: R, type: long}
    - {name: H, type: long}
    - {name: 2B, type: long}
    - {name: 3B, type: long}
    - {name: HR, type: long}
    - {name: RBI, type: long}
    - {name: SB, type: long}
    - {name: CS, type: long}
    - {name: BB, type: long}
    - {name: SO, type: long}
    - {name: IBB, type: long}
    - {name: HBP, type: long}
    - {name: SH, type: long}
    - {name: SF, type: long}
    - {name: GIDP, type: long}
exec: {}
out:
  type: mysql
  host: localhost
  database: baseball
  user: baseball
  password: baseball
  table: batting
  mode: insert

CSVファイルから入力したデータをMySQLに投入する際の標準的な指定と思います。

Dockerfile

サンプルアプリケーションのDockerfileは以下になります。

FROM mysql
MAINTAINER asami

ENV MYSQL_ALLOW_EMPTY_PASSWORD true

RUN apt-get update && apt-get -y install redis-server

COPY app.sh /opt/app.sh
RUN chmod +x /opt/app.sh
ADD https://raw.githubusercontent.com/asami/mysql-java-embulk-docker/master/lib/mysql-java-embulk-docker-lib.sh /opt/mysql-java-embulk-docker-lib.sh

ENTRYPOINT /opt/app.sh

アプリケーションでmysqlコマンドを使うので、MySQL公式Dockerイメージをベースにしました。

mysql-java-embulk-dockerコンテナとの同期にRedisを使うのでRedisをインストールしています。

また、アプリケーション起動シェルの共通ライブラリmysql-java-embulk-docker-lib.shをGitHubからコンテナ内にコピーしています。

アプリケーション起動の動きは大きく以下の3つの部分に分かれます。

  • パラメタの取り込み
  • データ投入の待ち合わせ
  • アプリケーションロジック

この中の「パラメタの取り込み」と「データ投入の待ち合わせ」をmysql-java-embulk-docker-lib.shが行います。

app.sh

サンプルアプリケーションの実行スクリプトapp.shは以下になります。

#! /bin/bash

# set -x

set -e

DIR="${BASH_SOURCE%/*}"
if [[ ! -d "$DIR" ]]; then DIR="$PWD"; fi
source $DIR/mysql-java-embulk-docker-lib.sh

mysql -u $MYSQL_SERVER_USER -p$MYSQL_SERVER_PASSWORD --host=$MYSQL_SERVER_HOST --port=$MYSQL_SERVER_PORT -e "select count(*) from baseball.batting"

まず、共通ライブラリmysql-java-embulk-docker-lib.shをsourceで取り込んでいます。

この中でRedisを使った同期が行われ、Embulkによるデータ投入が完了した状態でアプリケーションロジックに入ってきます。

今回のアプリケーションロジックは非常に単純で以下の処理を行います。

  • baseball.battingテーブルの総レコード数を取得する

この問合せ処理をmysqlコマンドを使って行っています。

実行

docker-composeのbuildコマンドでビルドします。

$ docker-compose build

docker-composeのupコマンドでビルドします。

$ docker-compose up

動作過程がコンソールに出力されますが、最後の方で以下のような出力があります。

app_1   | count(*)
app_1   | 99846

無事、Batting.csvをMySQLのbaseball.battingテーブルに投入した後、baseball.battingテーブルの総レコード数を取得することができました。 

まとめ

SQLとEmbulkを使ってデータ投入できるMySQL用のDockerコンテナを作ってみました。

アプリケーション開発では、テスト用データベースの準備とデータ投入が大きな手間であり、テスト自動化の障壁にもなっていたので、今回開発したDockerイメージをアプリケーション開発に適用していきたいと思います。

それにしても、Dockerのイメージ開発はシェルスクリプトプログラミングということを実感しました。相当錆び付いていましたが、なんとか動くものができました。

諸元

  • Mac OS 10.7.5
  • docker 1.6
  • docker-compose 1.2.0

2015年5月4日月曜日

[docker] Docker ComposeでMySQLを使う

製品の評価や、開発時のテストなどDBをカジュアルに作成して消したいシチュエーションは多々あります。

開発マシンにDBを入れてデータ投入する運用だと、開発マシン環境が汚れてきたり、復数の設定が共存して収集がつかなくなったり、DBやライブラリのバージョンに依存する場合の切り替えに困ったりということになりがちです。

そこで以前だとVMを使ったり、AWSのようなクラウド上にDBを立てたりしていたわけですが、設定はそのものが難しくはないものの、毎回環境を構築する作業が必要になったりと、まだまだ手間のかかる作業でした。

言うまでもなく、このような問題がDockerで一気に解決したのは大変画期的なことです。

今回はDockerを使ってDBをカジュアルに構築する方法について考えてみます。

SoS JobScheduler

今回はジョブ管理製品のSoS JobSchedulerを使ってみました。

JobSchedulerはバックエンドにDBを使っているので、このDBをDocker上でどのようにして構築して接続するのかという点がポイントです。

またJobSchedulerはapt-getやrpmといったインストーラに対応しておらずインストールが手作業になります。このような製品の配布方式はけっこうありますが、この部分をDockerfile側、起動スクリプト側双方のシェルスクリプトで対応することになります。

MySQL公式Dockerイメージ

MySQLをインストールしたマイDockerイメージを作る方法も有力ですが、MySQLの公式Dockerイメージが機能豊富で結構便利だったので使ってみました。

また、JobSchedulerとMySQLを同じDockerイメージにインストールする方法もありますが、JobScheduler用Dockerイメージとしての汎用性が損なわれるので、避けたほうがよいでしょう。

このため、今回使用するDockerイメージはMySQL公式イメージとJobSchedulerをインストールしたマイイメージの2つになります。このように復数のイメージを接続する場合、dockerコマンドののパラメタを設定する方法もありますが、かなり煩雑です。

この問題に対応するため今回はDocker Composeを使ってみました。

Docker Compose

Docker Compose(旧fig)は復数のDockerイメージを連携動作させるための機能です。

定義ファイルdocker-compose.ymlの設定に従って、復数のDockerコンテナを同時に立ち上げ、Dockerのlinking systemを用いて各Dockerコンテナをリンクで接続する処理を行います。

設定

それでは、Docker Composeを用いた設定を行います。設定結果の全体はGitHubにありますので必要に応じて参照して下さい。

docker-compose.yml

docker-compose.ymlの設定は以下になります。

jobscheduler:
  build: .
  links:
    - db
  ports:
    - "4444:4444"
db:
  image: mysql
  ports:
    - ":3306"
  volumes:
    - conf.d/etc.mysql.conf.d:/etc/mysql/conf.d
  environment:
    MYSQL_USER: jobscheduler
    MYSQL_PASSWORD: jobscheduler
    MYSQL_ROOT_PASSWORD: jobscheduler
    MYSQL_DATABASE: jobscheduler

jobschedulerとdbの2つのDockerコンテナの定義をしています。

jobschedulerコンテナは自前のDockerfileを使ったコンテナです。linksでdbコンテナをリンクする設定を行っています。

dbコンテナはMySQL公式イメージをそのまま使っています。ポイントとなるのはenvironmentで指定している4つの環境変数です。これらの環境変数を設定することで、自動的に必要な初期設定をしてくれるようになっています。

多くのケースで、MySQL公式イメージが提供している環境変数で目的が足りると思われるので、MySQL公式イメージはかなり使い出がありそうです。

Dockerfile

JobSchedulerを実行するDockerfileの設定は以下になります。

FROM dockerfile/java:oracle-java8
MAINTAINER asami

ENV JOBSCHEDULER_VERSION 1.9.0

RUN mkdir -p /opt/jobscheduler && cd /opt/jobscheduler; curl -L http://freefr.dl.sourceforge.net/project/jobscheduler/jobscheduler_linux-x64.$JOBSCHEDULER_VERSION.tar.gz -o - | tar -xz --strip-components=1

# SSH, API/HTTP, API/HTTPS, JOC
EXPOSE 22 44440 8443 4444

USER root

COPY scheduler_install.xml /opt/jobscheduler/scheduler_install.xml

# Set the default command to run when starting the container
COPY startup-jobscheduler.sh /opt/startup-jobscheduler.sh
CMD ["/opt/startup-jobscheduler.sh"]

JobSchedulerがJava 8依存なので、基盤イメージとしてjava8版のJava公式イメージを指定しています。

wgetはインストールが必要なので用いずcurlを使っています。curlとtarを連動させている以下の行は:

RUN mkdir -p /opt/jobscheduler && cd /opt/jobscheduler; curl -L http://freefr.dl.sourceforge.net/project/jobscheduler/jobscheduler_linux-x64.$JOBSCHEDULER_VERSION.tar.gz -o - | tar -xz --strip-components=1
  • 中間ファイルを残さない。
  • 配布アーカイブにあるディレクトリ"jobscheduler.1.9.0"を"jobscheduler"に付け替え。こうすることで、後続の処理でバージョン番号を意識する処理を減らすことができます。

という意図です。このようなケースのイディオム的なスクリプトです。

startup-jobscheduler.sh

dockerの定義で比較的難しいのは起動スクリプトのところです。

基本的には、ターゲットのプログラムを起動するだけなのですが、環境との整合性を取るための処理を色々と書く必要があります。

今回の起動スクリプトはstartup-jobscheduler.shで、DockerfileのCMDで指定しています。

#! /bin/bash

sleep 10s

sed -i -e "s/{{DB_PORT_3306_TCP_ADDR}}/$DB_PORT_3306_TCP_ADDR/g" /opt/jobscheduler/scheduler_install.xml
sed -i -e "s/{{DB_PORT_3306_TCP_PORT}}/$DB_PORT_3306_TCP_PORT/g" /opt/jobscheduler/scheduler_install.xml

(cd /opt/jobscheduler;/usr/bin/java -jar jobscheduler_linux-x64.$JOBSCHEDULER_VERSION.jar scheduler_install.xml)

sleep infinity

DBの起動を待ち合わせるためsleepコマンドで10秒ウエイト入れています。

scheduler_install.xmlの設定を、Docker実行時の環境に適合するようにsedコマンドで書き換えています。環境変数DB_PORT_3306_TCP_ADDRとDB_PORT_3306_TCP_PORTはDockerのlinking systemが設定してくる環境情報です。これらの情報の取り込みを起動スクリプトで対応する必要があります。

JobSchedulerは起動後、自動的にバックグラウンドになってしまうため、そのままstartup-jobscheduler.shが終わるとそのままDockerも終わってしまいます。そこでsleepコマンドで永久にウエイトするようにしています。

実行

Docker Composeを使ってJobSchedulerを立ち上げてみましょう。

設定の取得

GitHubから設定一式を取得します。

$ git clone https://github.com/asami/SoS-JobScheduler-docker.git
ビルド

docker-composeコマンドのbuildを実行します。

$ docker-compose build
実行

docker-composeコマンドのupを実行します。

$ docker-compose up

起動は以上で終了です。

以下のアドレスにアクセスするとJobSchedulerの管理画面が表示されます。(Macの場合)

まとめ

Docker Composeを使って、自前のJobSchedulerコンテナとMySQL公式イメージを連動させ、JobSchedulerの実行環境を作ってみました。

MySQL公式イメージの機能が結構豊富なので、DB側は設定だけで使用することができました。

Dockerも便利ですが、Docker Composeもかなり便利で、アイデア次第で色々と応用がありそうです。

諸元

Mac OS 10.7.5 docker 1.6 docker-compose 1.2.0

2015年4月27日月曜日

Finagle+Karaf+Dockerでmicroservices

流行りなのでmicroservicesという用語を使ってみましたが、意図としては(microservicesも含む)マルチサーバー構成によるクラウド・システムを構成するサービス群がターゲットです。

この実行基盤としてFinagle、Karaf、Dockerの組合せが有力ではないかということで、試しにサンプルプロジェクトを作ってみました。このサンプルプロジェクトは、新しいクラウド・システム内サービスを作る時の雛形を想定しています。

このサンプルプロジェクトを実際に作ってみることで、Karafがどのぐらいの手間で使えるのか、ScalaやFinagleとの相性はどうなのか、といったことを試してみるのが目的です。この手のプロジェクト構築ではライブラリ間の依存関係の解決が難しいのですが、Finagleをリンクしても大丈夫な設定を発見できたので、この問題はクリアできていると思います。

以下ではクラウド・システム内サービスをサービスの粒度によらずmicroserviceと呼ぶことにします。

Apache Karaf

Apache Karafは軽量OSGiコンテナです。

Karafを導入する目的は大きく以下の3つです。

  • microserviceの環境設定共有
  • dependency hell対策
  • CamelによるEIP(Enterprise Integration Patterns)
環境設定共有

第一の目的はクラウド・システムを構成するmicroservice群の環境設定を共通化することです。

microserviceをmainメソッドを作るなどして自前でデーモン化すると各種設定もすべて自前で行わなければならなくなります。

たとえば、SLF4J+fluentedなどによるロギング、JMX(Java Management Extensions)による運用監視、JNDI(Java Naming and Directory Interface)によるディレクトリ管理といった各種設定をサービス毎に設定する必要があります。

これは(1)設定そのものが大変な手間、(2)クラウド・システム内の共通設定情報の共有の手間、(3)設定漏れの危険、(4)ハードコーディングによるカスタマイザビリティの欠如、といった問題があります。

microserviceをコンテナ内で動作させることで、各種設定はコンテナに対する設定として共通化する上記の問題を解消することができます。

この目的で導入するコンテナとして、軽量OSGiコンテナであるKarafがよいのではないか考えています。

dependency hell対策

Java VM上での開発における未解決問題の一つにdependency hellがあります。

この問題はJava 9でModule機能として解決される見込みですが、当面の対策としてはOSGiコンテナを使用するのが現実解となっています。

最近はちょっとしたライブラリをリンクしても、その裏でApache Commons、Spring、JBossといった巨大なライブラリがついてくることが多いので、思ったより切実な問題です。

CamelによるEIP

Karafを採用したボーナスのようなものですが、KarafからはCamelを簡単に使えるようになっています。(KarafはESB(Enterprise Service Bus)のServiceMixのOSGiコンテナ部を切り出して製品化したものです。)

Camelを使うと、EIPのパターンを使用して様々な通信プロトコルを使って外部サービスと連携する処理を簡単に作成することができます。またCamelにはScala DSLも用意されています。

Twitter Finagle

Twitter Finagleはmicroservices指向のRPCシステムです。

FinagleはFutureモナドによる非同期実行、耐故障性の抽象化を行っており、Monadic Programmingとの相性がよいのもFunctional Reactive Programming指向のmicroservicesの通信基盤として魅力です。

準備

Karafを使うためのベースとしてKarafの実行環境をDockerイメージ化しました。

Karaf Docker Image

Dockerイメージを作成するプロジェクトは以下になります。

Dockerfileだけの簡単なプロジェクトです。

実際にmicroserviceをインストールして使うことを想定しているので、余分な設定は行わずプレインな簡単なものにしています。

Docker Hub

このDockerイメージをDocker Hubに登録しました。

以下のようにして利用できます。

$ docker pull asami/karaf-docker

サンプル・プロジェクト

サンプル・プロジェクトは以下のGitHubプロジェクトとして作成しました。

サンプル・サーバー

Finagleによるサンプル・サーバーは以下のものです。

package sample

import com.twitter.finagle.{Http, Service, ListeningServer}
import com.twitter.util.{Await, Future}
import java.net.InetSocketAddress
import org.jboss.netty.handler.codec.http._

object Server {
  val service = new Service[HttpRequest, HttpResponse] {
    def apply(req: HttpRequest): Future[HttpResponse] =
      Future.value(new DefaultHttpResponse(
        req.getProtocolVersion, HttpResponseStatus.OK))
  }

  def start(): ListeningServer = {
    Http.serve(":8080", service)
  }

  def stop(server: ListeningServer) {
    server.close()
  }

  def main(args: Array[String]) {
    val server = start()
    Await.ready(server)
  }
}
ビルド

ビルドは以下の手順で行います。

  • Karaf KAR
  • Dockerfile
  • Dockerイメージ
Karaf KAR

Karafに配備するKARファイルの作成にはkarafタスクを使用します。

$ sbt karaf

karafタスクはbuild.sbtで定義しています。

Dockerfile

KARファイルを配備したKaraf実行環境のDockerイメージを作成するためのDockerfileの作成にはdockerタスクを使用します。

$ sbt docker

dockerタスクはbuild.sbtで定義しています。

Dockerイメージ

Dockerイメージの作成は以下になります。

$ docker build -t sample-finagle-karaf-docker .

dockerタスクを実行するとDockerfileが作成されるので、このDockerfileでDockerイメージをビルドします。この時、前述したDockerイメージasami/karaf-dockerを内部的に使用します。

実行

作成したDockerイメージsample-finagle-karaf-dockerを実行してみましょう。

$ docker run -t -i --rm -p 1099:1099 -p 8101:8101 -p 44444:44444 -p 8080:8080 sample-finagle-karaf-docker

ポートは1099, 8101, 44444, 8080の4ポートを使用します。

ポート1099, 8101, 44444はKarafが使用するポートです。

ポート用途
1099JMX RMI registry
8101SSHコンソール
44444JMX RMI server

ポート8080はサンプルのFinagleサービスが使用するポートです。

確認

Dockeイメージを実行するとKarafコンテナ内で自動的にサンプルのFinagleサービスが起動されます。

curlコマンドでサンプルのFinagleサービスにアクセスすると以下の結果が得られます。無事動作していることが確認できました。

$ curl http://192.168.59.103:8080 -v
* Rebuilt URL to: http://192.168.59.103:8080/
* Hostname was NOT found in DNS cache
*   Trying 192.168.59.103...
* Connected to 192.168.59.103 (127.0.0.1) port 8080 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.37.1
> Host: 192.168.59.103:8080
> Accept: */*
> 
< HTTP/1.1 200 OK
< Content-Length: 0
< 
* Connection #0 to host 192.168.59.103 left intact

感想

現時点だとSBTからKaraf向けのKARファイルの作成のための設定が大変なので、まだまだKarafは気軽には使えないような感じです。

ただAPPREL CLOUD級の規模になってくると設定の大変さよりも運用管理上のメリットの方が大きくなるのでそろそろ導入を考えてもよさそうです。

諸元

  • Scala 2.10.5
  • Finagle 6.25.0
  • Karaf 4.0.0.M2
  • Docker 1.3.2
  • sbt 0.13.7

2015年4月20日月曜日

[scalaz-stream] ストリーミングで状態機械

Functional Reactive Programming(FRP)の眼目の一つはMonadic Programming(MP)によるストリーミング処理です。

MPでFRPを記述できることで安全なプログラムを簡単に書くことができるようになります。

そこで今回は「Scala的状態機械/FP編」で作成したProcessモナド版CSVパーサーをストリーミング処理に適用してみます。

準備

Scala的状態機械/OOP編で作成した状態遷移を記述した代数的データ型ParseStateをストリーミング用に一部手直しします。

package sample

sealed trait ParseState {
  def event(c: Char): ParseState
  def endEvent(): EndState
}

case object InitState extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = EndState(Nil)
}

case class InputState(
  fields: Seq[String],
  candidate: String
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(fields :+ candidate, "")
    case '\n' => EndState(fields :+ candidate)
    case _ => InputState(fields, candidate :+ c)
  }
  def endEvent() = EndState(fields :+ candidate)
}

case class EndState(
  row: Seq[String]
) extends ParseState {
  def event(c: Char) = c match {
    case ',' => InputState(Vector(""), "")
    case '\n' => EndState(Nil)
    case _ => InputState(Nil, c.toString)
  }
  def endEvent() = this
}

case class FailureState(
  row: Seq[String],
  message: String
) extends ParseState {
  def event(c: Char) = this
  def endEvent() = sys.error("failure")
}

具体的にはEndStateが完全終了ではなく、次のイベントが発生したら入力受付け状態に復帰するようにしました。

Scala的状態機械/FP編で作成したParserStateMonadは変更ありません。今回はこの中で定義しているモナディック関数であるactionを使用します。

package sample

import scalaz._, Scalaz._

object ParserStateMonad {
  def action(event: Char) = State((s: ParseState) => {
    (s.event(event), event)
  })

  def parse(events: Seq[Char]): Seq[String] = {
    val s = events.toVector.traverseS(action)
    val r = s.run(InitState)
    r._1.endEvent.row
  }
}

ストリーミング版

それではストリーミング版の作成に入ります。

EventProcessor

まずストリーミング処理の動作環境として、scalaz-streamが提供する非同期キュー(scalaz.stream.async.mutable.Queue)を作成します。

package sample

import scalaz.concurrent.Task
import scalaz.stream._

object EventProcessor {
  val q = async.unboundedQueue[Char]

  val eventStream: Process[Task, Char] = q.dequeue
}

EventProcessorは、scalaz.stream.async.unboundedQueue関数で作成したQueueによって、ストリームに対するイベントと、イベントをハンドリングするProcessモナドを接続します。

Queueに対して送信されたイベントは、Queueのdequeueメソッドで取得できるProcessモナドに転送されます。

StreamingParser

ストリーミング処理用のCSVパーサーは以下になります。

package sample

import scala.language.higherKinds
import scalaz.concurrent.Task
import scalaz.stream._

object StreamingParser {
  def createParser[F[_]](source: Process[F, Char]): Process[F, ParseState] = {
    source.pipe(fsm(InitState))
  }

  def fsm(state: ParseState): Process1[Char, ParseState] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      Process.emit(s) fby fsm(s)
    }
  }
}

まずパーサーはProcessモナドに組み込んで使用する必要があるので、組込み可能なモナディック関数fsmを用意します。これは、Scala的状態機械/FP編で作成したParseProcessMonadStateMonadのfsm関数と同じものです。

その上で、このfsm関数をpipeコンビネータでProcessモナドに組み込む関数createParserを用意しました。この関数は便利関数という位置付けのものです。

つまりScala的状態機械/FP編で作成した部品はそのままストリーミング処理にも適用できるということになります。

使い方

動作確認のためのプログラムは以下になります。

package sample

import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object StreamingParserSample {
  def main(args: Array[String]) {
    val stream = EventProcessor.eventStream
    build(stream)
    execute()
  }

  def build(stream: Process[Task, Char]) {
    Future {
      val parser = StreamingParser.createParser(stream).map {
        case EndState(row) => report(row)
        case x => ignore(x)
      }.run.run
    }
  }

  def report(row: Seq[String]) {
    println(s"report: $row")
  }

  def ignore(s: ParseState) {
    println(s"ignore: $s")
  }

  def execute() {
    val queue = EventProcessor.q
    val a = "abc,def,ghi\n"
    val b = "jkl,mno,pqr\n"
    a.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
    b.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
  }
}

まずイベントの送信ですが、EventProcessorのeventStreamメソッドで取得したProcessモナドに対してbuild関数でパーサーの組込みを行っています。パーサーはStreamingParserのcreateParser関数で作成しています。さらにmapコンビネータでパース結果のコンソール出力処理を追加しています。

execute関数は、EventProcessorのメソッドで取得した非同期キューに対してenqueueOneメソッドでイベントを送出しています。

非同期キューに対して送出したイベントが、非同期キューと連動したProcessモナドに対して送られます。

この例では、プログラム内に埋め込まれたデータを使用していますが、WebサーバーのHTTPリクエストやAkkaのメッセージの受信処理でこの非同期キューに転送することで、簡単にProcessモナドによるストリーミング処理を行うことができます。

実行

実行結果は以下になります。

ignore: InputState(List(),a)
ignore: InputState(List(),ab)
ignore: InputState(List(),abc)
ignore: InputState(List(abc),)
ignore: InputState(List(abc),d)
ignore: InputState(List(abc),de)
ignore: InputState(List(abc),def)
ignore: InputState(List(abc, def),)
ignore: InputState(List(abc, def),g)
ignore: InputState(List(abc, def),gh)
ignore: InputState(List(abc, def),ghi)
report: List(abc, def, ghi)
ignore: InputState(List(),j)
ignore: InputState(List(),jk)
ignore: InputState(List(),jkl)
ignore: InputState(List(jkl),)
ignore: InputState(List(jkl),m)
ignore: InputState(List(jkl),mn)
ignore: InputState(List(jkl),mno)
ignore: InputState(List(jkl, mno),)
ignore: InputState(List(jkl, mno),p)
ignore: InputState(List(jkl, mno),pq)
ignore: InputState(List(jkl, mno),pqr)
report: List(jkl, mno, pqr)

「ignore: InputState(List(),a)」といった形のパース処理の途中結果が流れた後に、「report: List(abc, def, ghi)」といった形のパース結果が流れてくることが確認できました。アプリケーション側ではパース結果のみをmatch式で拾いだして処理をすることになります。

フロー制御

StreamingParserのfsm関数はパース処理の途中結果もすべてストリームを流れてきます。

これはちょっとクールではないので、簡単なフロー制御を入れて対処することにしましょう。

この目的でStreamingParserを改修してStreamingParserRevisedを作成しました。

package sample

import scala.language.higherKinds
import scalaz.concurrent.Task
import scalaz.stream._

object StreamingParserRevised {
  def createParser[F[_]](source: Process[F, Char]): Process[F, Seq[String]] = {
    source.pipe(fsm(InitState))
  }

  def fsm(state: ParseState): Process1[Char, Seq[String]] = {
    Process.receive1 { c: Char =>
      val s = ParserStateMonad.action(c).exec(state)
      s match {
        case EndState(row) => Process.emit(row) fby fsm(InitState)
        case FailureState(row, message) => log(message); fsm(InitState)
        case x => fsm(s)
      }
    }
  }

  def log(msg: String) {
    println(s"error: $msg")
  }
}

fsm関数ではaction関数から返されるStateモナドの実行結果によって以下のように処理を切り替えています。

EndStateの場合
パース結果の文字列をストリームに送出し、状態機械は初期状態に戻す
FailureStateの場合
エラーをログに出力し、状態機械は初期状態に戻す
その他
パース途中結果の状態に状態機械を遷移させる

上記の処理を行うことでEndStateの場合のみ、ストリーム上にデータを送出するようになっています。

これは一種のフロー制御ですが、このようなフロー制御が簡単に記述できることが確認できました。

使い方

StreamingParserRevisedの使用方法は以下になります。

package sample

import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object StreamingParserRevisedSample {
  def main(args: Array[String]) {
    val stream = EventProcessor.eventStream
    build(stream)
    execute()
  }

  def build(stream: Process[Task, Char]) {
    Future {
      val parser = StreamingParserRevised.createParser(stream).
        map(row => report(row)).run.run
    }
  }

  def report(row: Seq[String]) {
    println(s"report: $row")
  }

  def execute() {
    val queue = EventProcessor.q
    val a = "abc,def,ghi\n"
    val b = "jkl,mno,pqr\n"
    a.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
    b.foreach(queue.enqueueOne(_).run)
    Thread.sleep(2000)
  }
}

StreamingParserの場合はストリーミングに処理途中、処理結果を問わずParseStateが流れてくるので、このParseStateのハンドリングを行っていました。

StreamingParserRevisedでは、処理結果の文字列のみが流れてくるので、処理結果に対する処理のみを記述する形になっています。

実行

実行結果は以下になります。

report: List(abc, def, ghi)
report: List(jkl, mno, pqr)

まとめ

ストリーミング処理をProcessモナドで記述するメリットは、Processモナド用に作成した部品やアプリケーション・ロジックをそのままストリーミング処理に適用することができる点です。

今回の例でも簡単な改修で適用することができました。

ただしProcessモナド用に作成した部品もストリーミング処理で使用すると効率的でない部分も出てくるので、必要に応じて最適化を行っていくことになります。

ストリーミング処理での最適化ではフロー制御が重要な要因となります。

フロー制御を実現するためには、ストリーミング処理内で状態機械を記述できる必要があります。Processモナドでは、この状態機械の記述が可能なので、フロー制御も簡単に実現できることが確認できました。

諸元

  • Scala 2.11.4
  • Scalaz 7.1.0
  • Scalaz-stream 0.6a
  • Scalatest 2.2.4