あけましておめでとうございます。 Impulse開発チームの木村です。
今回は、Spark 1.6.0で導入されたDataset APIを、spark-shell上で触ってみました。
Dataset APIとは
Dataset APIは、RDDやDataFrameと同じく、データのまとまりを扱うためのAPIです。 RDDとDataFrame双方の長所を合わせ持つAPIとして、開発が進められています。 (なお、ver1.6.0でのDataset API導入は、あくまで実験的なものであり、ver2.0.0での本リリースが予定されているようです。)
対応するIssueによれば、Datasetの要件として次が掲げられています。
- Fast
- ほとんどのケースで、RDD以上のパフォーマンス
- Typesafe
- Support for a variety of object models
- デフォルトで様々なオブジェクトを、Datasetの要素としてエンコード可能
- Java Compatible
- Interoperates with DataFrames
- DatasetとDataFrame間での(お決まりのコードを書く必要のない)シームレスな変換
ver1.6.0時点では、ScalaとJavaからDataset APIを使用できます。 Pythonのサポートは、先のリリースとなるそうです。
Datasetの作り方
- Seqから作る
toDS
を使って簡単に、SeqをDatasetに変換できます。
// spark-shell上では、sqlContext.implicits._のimportは不要 // 数が要素のDataset scala> val numberDS = Seq(1, 2, 3).toDS() ds: org.apache.spark.sql.Dataset[Int] = [value: int] scala> numberDS.show() +-----+ |value| +-----+ | 1| | 2| | 3| +-----+ // case classのインスタンスが要素のDataset scala> :paste case class Player( name: String, weapon: String, rank: Int ) defined class Player scala> :paste val playerDS = Seq( Player("Ponce", "Splattershot Jr.", 9), Player("Paciorek", "E-Liter 3K", 50), Player("Petrick", "Splattershot Jr.", 12) ).toDS() playerDS: org.apache.spark.sql.Dataset[Player] = [name: string, weapon: string, rank: int] scala> playerDS.show() +--------+----------------+----+ | name| weapon|rank| +--------+----------------+----+ | Ponce|Splattershot Jr.| 9| |Paciorek| E-Liter 3K| 50| | Petrick|Splattershot Jr.| 12| +--------+----------------+----+
- DataFrameから作る
DataFrame#as[T]
を使って、DataFrameをDatasetに変換できます。
// 適当なDataFrameを作成 scala> val df = sqlContext.read.json(sc.parallelize("""{"x": 11, "y": 22}""" :: Nil)) df: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint] scala> case class Coordinate(x: Long, y: Long) defined class Coordinate // DataFrameのカラムとcase classのフィールドの対応は、名前で判断される scala> val ds = df.as[Coordinate] ds: org.apache.spark.sql.Dataset[Coordinate] = [x: bigint, y: bigint] scala> ds.show() +---+---+ | x| y| +---+---+ | 11| 22| +---+---+
Datasetへの操作
今までと似た感覚で操作を行えます。
- フィルタ
scala> val numberDS = Seq(1, 2, 3).toDS() numberDS: org.apache.spark.sql.Dataset[Int] = [value: int] scala> numberDS.show() +-----+ |value| +-----+ | 1| | 2| | 3| +-----+ scala> :paste numberDS .filter(_ >= 2) .show() +-----+ |value| +-----+ | 2| | 3| +-----+
- 集計
scala> :paste case class Player( name: String, weapon: String, rank: Int ) defined class Player scala> :paste val playerDS = Seq( Player("Ponce", "Splattershot Jr.", 9), Player("Paciorek", "E-Liter 3K", 50), Player("Petrick", "Splattershot Jr.", 12) ).toDS() playerDS: org.apache.spark.sql.Dataset[Player] = [name: string, weapon: string, rank: int] scala> playerDS.show() +--------+----------------+----+ | name| weapon|rank| +--------+----------------+----+ | Ponce|Splattershot Jr.| 9| |Paciorek| E-Liter 3K| 50| | Petrick|Splattershot Jr.| 12| +--------+----------------+----+ scala> :paste playerDS .groupBy(_.weapon) .agg(count("weapon")) .show() +----------------+-------------+ | value|count(weapon)| +----------------+-------------+ | E-Liter 3K| 1| |Splattershot Jr.| 2| +----------------+-------------+
ただし、RDDやDataFrameに存在する操作の一部(orderBy
やdrop
など)は、現時点では実装されていません。
このような操作を行う場合は、一度RDDやDataFrameに変換する必要があります。
(Dataset#rdd
でRDDに、Dataset#toDF
でDataFrameに変換することができます)
おわりに
Dataset APIについて紹介しました。
Sparkはバージョンアップの間隔が短く、新しい機能がどんどん出てきます。 追っていくのは比較的大変ですが、良い新機能はどんどん取り入れるべく、精進していこうと思います。
(deprecatedが頻発しないといいなあ…)