Technology Topics by Brains

ブレインズテクノロジーの研究開発機関「未来工場」で働くエンジニアが、先端オープン技術、機械学習×データ分析(異常検知、予兆検知)に関する取組みをご紹介します。

Spark1.6.0のDataset APIを触ってみた

あけましておめでとうございます。 Impulse開発チームの木村です。

今回は、Spark 1.6.0で導入されたDataset APIを、spark-shell上で触ってみました。

Dataset APIとは

Dataset APIは、RDDやDataFrameと同じく、データのまとまりを扱うためのAPIです。 RDDとDataFrame双方の長所を合わせ持つAPIとして、開発が進められています。 (なお、ver1.6.0でのDataset API導入は、あくまで実験的なものであり、ver2.0.0での本リリースが予定されているようです。)

対応するIssueによれば、Datasetの要件として次が掲げられています。

  • Fast
    • ほとんどのケースで、RDD以上のパフォーマンス
  • Typesafe
  • Support for a variety of object models
    • デフォルトで様々なオブジェクトを、Datasetの要素としてエンコード可能
  • Java Compatible
  • Interoperates with DataFrames
    • DatasetとDataFrame間での(お決まりのコードを書く必要のない)シームレスな変換

ver1.6.0時点では、ScalaJavaからDataset APIを使用できます。 Pythonのサポートは、先のリリースとなるそうです。

Datasetの作り方

  • Seqから作る

toDSを使って簡単に、SeqをDatasetに変換できます。

// spark-shell上では、sqlContext.implicits._のimportは不要

// 数が要素のDataset
scala> val numberDS = Seq(1, 2, 3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> numberDS.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

// case classのインスタンスが要素のDataset
scala> :paste
case class Player(
  name: String,
  weapon: String,
  rank: Int
)
defined class Player

scala> :paste
val playerDS = Seq(
  Player("Ponce", "Splattershot Jr.", 9), 
  Player("Paciorek", "E-Liter 3K", 50),
  Player("Petrick", "Splattershot Jr.", 12)
).toDS()
playerDS: org.apache.spark.sql.Dataset[Player] = [name: string, weapon: string, rank: int]

scala> playerDS.show()
+--------+----------------+----+
|    name|          weapon|rank|
+--------+----------------+----+
|   Ponce|Splattershot Jr.|   9|
|Paciorek|      E-Liter 3K|  50|
| Petrick|Splattershot Jr.|  12|
+--------+----------------+----+
  • DataFrameから作る

DataFrame#as[T]を使って、DataFrameをDatasetに変換できます。

// 適当なDataFrameを作成
scala> val df = sqlContext.read.json(sc.parallelize("""{"x": 11, "y": 22}""" :: Nil))
df: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]

scala> case class Coordinate(x: Long, y: Long)
defined class Coordinate

// DataFrameのカラムとcase classのフィールドの対応は、名前で判断される
scala> val ds = df.as[Coordinate]
ds: org.apache.spark.sql.Dataset[Coordinate] = [x: bigint, y: bigint]

scala> ds.show()
+---+---+
|  x|  y|
+---+---+
| 11| 22|
+---+---+

Datasetへの操作

今までと似た感覚で操作を行えます。

  • フィルタ
scala> val numberDS = Seq(1, 2, 3).toDS()
numberDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> numberDS.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

scala> :paste
numberDS
  .filter(_ >= 2)
  .show()
+-----+
|value|
+-----+
|    2|
|    3|
+-----+
  • 集計
scala> :paste
case class Player(
  name: String,
  weapon: String,
  rank: Int
)
defined class Player

scala> :paste
val playerDS = Seq(
  Player("Ponce", "Splattershot Jr.", 9), 
  Player("Paciorek", "E-Liter 3K", 50),
  Player("Petrick", "Splattershot Jr.", 12)
).toDS()
playerDS: org.apache.spark.sql.Dataset[Player] = [name: string, weapon: string, rank: int]

scala> playerDS.show()
+--------+----------------+----+
|    name|          weapon|rank|
+--------+----------------+----+
|   Ponce|Splattershot Jr.|   9|
|Paciorek|      E-Liter 3K|  50|
| Petrick|Splattershot Jr.|  12|
+--------+----------------+----+

scala> :paste
playerDS
  .groupBy(_.weapon)
  .agg(count("weapon"))
  .show()
+----------------+-------------+
|           value|count(weapon)|
+----------------+-------------+
|      E-Liter 3K|            1|
|Splattershot Jr.|            2|
+----------------+-------------+

ただし、RDDやDataFrameに存在する操作の一部(orderBydropなど)は、現時点では実装されていません。 このような操作を行う場合は、一度RDDやDataFrameに変換する必要があります。 (Dataset#rddRDDに、Dataset#toDFでDataFrameに変換することができます)

おわりに

Dataset APIについて紹介しました。

Sparkはバージョンアップの間隔が短く、新しい機能がどんどん出てきます。 追っていくのは比較的大変ですが、良い新機能はどんどん取り入れるべく、精進していこうと思います。

(deprecatedが頻発しないといいなあ…)

参考