どうも、ポンセです。
前回の続きです(タイトルを微妙に変えていますが)。
SparkというよりSQLのWindow関数周りの話になっている気がしますが、気にせず書きます。
今回はSQLの形式で書きたいと思います。
ROWS
前回と同様にグループ単位での平均値を行毎で算出するのですが、
windowの行数を指定して集計したいとします。
そんな時は ROWS BETWEEN ~ AND ~ 。
データ
sample +-------------------+-------+-----+ | timestamp|metrics|value| +-------------------+-------+-----+ |2015-08-01 00:00:00| cpu| 40| |2015-08-01 00:00:00| memory| 50| |2015-08-01 00:00:00|disk_io| 70| |2015-08-01 01:00:00| cpu| 20| |2015-08-01 01:00:00| memory| 90| |2015-08-01 01:00:00|disk_io| 70| |2015-08-01 02:00:00| cpu| 100| |2015-08-01 02:00:00| memory| 55| |2015-08-01 02:00:00|disk_io| 72| |2015-08-01 03:00:00| cpu| 30| |2015-08-01 03:00:00| memory| 60| |2015-08-01 03:00:00|disk_io| 72| |2015-08-01 04:00:00| cpu| 50| |2015-08-01 04:00:00| memory| 65| |2015-08-01 04:00:00|disk_io| 76| |2015-08-01 05:00:00| cpu| 45| |2015-08-01 05:00:00| memory| 40| |2015-08-01 05:00:00|disk_io| 90| +-------------------+-------+-----+
SQL
SELECT timestamp, metrics, value, AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as average FROM sample
結果
+-------------------+-------+-----+------------------+ | timestamp|metrics|value| average| +-------------------+-------+-----+------------------+ |2015-08-01 00:00:00| cpu| 40| 30.0| |2015-08-01 01:00:00| cpu| 20|53.333333333333336| |2015-08-01 02:00:00| cpu| 100| 50.0|<- *** |2015-08-01 03:00:00| cpu| 30| 60.0| |2015-08-01 04:00:00| cpu| 50|41.666666666666664| |2015-08-01 05:00:00| cpu| 45| 47.5| |2015-08-01 00:00:00|disk_io| 70| 70.0| |2015-08-01 01:00:00|disk_io| 70| 70.66666666666667| |2015-08-01 02:00:00|disk_io| 72| 71.33333333333333| |2015-08-01 03:00:00|disk_io| 72| 73.33333333333333| |2015-08-01 04:00:00|disk_io| 76| 79.33333333333333| |2015-08-01 05:00:00|disk_io| 90| 83.0| |2015-08-01 00:00:00| memory| 50| 70.0| |2015-08-01 01:00:00| memory| 90| 65.0| |2015-08-01 02:00:00| memory| 55| 68.33333333333333| |2015-08-01 03:00:00| memory| 60| 60.0| |2015-08-01 04:00:00| memory| 65| 55.0| |2015-08-01 05:00:00| memory| 40| 52.5| +-------------------+-------+-----+------------------+
例えば3行目を見てみます。
3行目はmetrics cpuに対して、20(1つ前の行)、 100(現在行)、30(1つ後ろの行)の平均値(すなわち50)となっています。
このようにROWS BETWEEN ~ AND ~ の構文で行数単位での集計が可能となります。
RANGE
今度はある値より大きいまたは小さい行でグルーピングして集計したいとします。
そんな時は RANGE BETWEEN ~ AND ~ 。
データ
データは上記例とは違い、timestampの間隔をバラバラにしています。
ex_sample +-------------------+-------+-----+ | timestamp|metrics|value| +-------------------+-------+-----+ |2015-08-01 00:00:00| cpu| 40| |2015-08-01 00:02:00| memory| 50| |2015-08-01 00:01:00|disk_io| 70| |2015-08-01 00:05:00| cpu| 20| |2015-08-01 00:04:00| memory| 90| |2015-08-01 00:04:00|disk_io| 70| |2015-08-01 00:07:00| cpu| 100| |2015-08-01 00:10:00| memory| 55| |2015-08-01 00:15:00|disk_io| 72| |2015-08-01 00:15:00| cpu| 30| |2015-08-01 00:30:00| memory| 60| |2015-08-01 00:20:00|disk_io| 72| |2015-08-01 00:27:00| cpu| 50| |2015-08-01 00:35:00| memory| 65| |2015-08-01 00:24:00|disk_io| 76| |2015-08-01 00:32:00| cpu| 45| |2015-08-01 00:54:00| memory| 40| |2015-08-01 00:40:00|disk_io| 90| +-------------------+-------+-----+
SQL
SELECT timestamp, unix_timestamp(timestamp) as unix_time, metrics, value, AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) as average FROM ex_sample
結果
+-------------------+----------+-------+-----+------------------+ | timestamp| unix_time|metrics|value| average| +-------------------+----------+-------+-----+------------------+ |2015-08-01 00:00:00|1438354800| cpu| 40| 40.0| |2015-08-01 00:05:00|1438355100| cpu| 20| 30.0| |2015-08-01 00:07:00|1438355220| cpu| 100|53.333333333333336| |2015-08-01 00:15:00|1438355700| cpu| 30| 50.0|<- *** |2015-08-01 00:27:00|1438356420| cpu| 50| 50.0| |2015-08-01 00:32:00|1438356720| cpu| 45| 47.5| |2015-08-01 00:01:00|1438354860|disk_io| 70| 70.0| |2015-08-01 00:04:00|1438355040|disk_io| 70| 70.0| |2015-08-01 00:15:00|1438355700|disk_io| 72| 72.0| |2015-08-01 00:20:00|1438356000|disk_io| 72| 72.0| |2015-08-01 00:24:00|1438356240|disk_io| 76| 73.33333333333333| |2015-08-01 00:40:00|1438357200|disk_io| 90| 90.0| |2015-08-01 00:02:00|1438354920| memory| 50| 50.0| |2015-08-01 00:04:00|1438355040| memory| 90| 70.0| |2015-08-01 00:10:00|1438355400| memory| 55| 65.0| |2015-08-01 00:30:00|1438356600| memory| 60| 60.0| |2015-08-01 00:35:00|1438356900| memory| 65| 62.5| |2015-08-01 00:54:00|1438358040| memory| 40| 40.0| +-------------------+----------+-------+-----+------------------+
わかりやすくするために、unixtimeの行を追加しています。
例えば4行目を見てみますと、
この行ではORDER BY句で指定しているunixtimeに対して、600秒前から現在行すなわち、30(現在行)、100(1つ前)、20(2つ前)の平均(すなわち50)となっています。
このようにRANGE BETWEEN ~ AND ~ の構文で特定値より大きいまたは小さい行による集計が可能となります。
まとめ
・ROWSとRANGEを使って行数指定、値の幅指定を行ってSparkで集計してみました(全然大規模なデータじゃないけど)。
さいごに
Window関数に加えて、Spark1.5では Date time や String 周りの関数がかなりサポートされているので、集計周りは相当充実してきた印象です。もうすぐ1.6もリリースされそうなのでそちらにも注目していきたいところですね。
今回の例で使ったpythonのコードを載せておきます。
df = sqlCtx.createDataFrame([ ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:00:00','memory',50), ('2015-08-01 00:00:00','disk_io',70), ('2015-08-01 01:00:00','cpu',20), ('2015-08-01 01:00:00','memory',90), ('2015-08-01 01:00:00','disk_io',70), ('2015-08-01 02:00:00','cpu',100), ('2015-08-01 02:00:00','memory',55), ('2015-08-01 02:00:00','disk_io',72), ('2015-08-01 03:00:00','cpu',30), ('2015-08-01 03:00:00','memory',60), ('2015-08-01 03:00:00','disk_io',72), ('2015-08-01 04:00:00','cpu',50), ('2015-08-01 04:00:00','memory',65), ('2015-08-01 04:00:00','disk_io',76), ('2015-08-01 05:00:00','cpu',45), ('2015-08-01 05:00:00','memory',40), ('2015-08-01 05:00:00','disk_io',90) ], ['timestamp','metrics','value']) df.registerTempTable('sample') q = """ SELECT timestamp, metrics, value, AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as average FROM sample """ sqlContext.sql(q).show() ex_df = sqlCtx.createDataFrame([ ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:02:00','memory',50), ('2015-08-01 00:01:00','disk_io',70), ('2015-08-01 00:05:00','cpu',20), ('2015-08-01 00:04:00','memory',90), ('2015-08-01 00:04:00','disk_io',70), ('2015-08-01 00:07:00','cpu',100), ('2015-08-01 00:10:00','memory',55), ('2015-08-01 00:15:00','disk_io',72), ('2015-08-01 00:15:00','cpu',30), ('2015-08-01 00:30:00','memory',60), ('2015-08-01 00:20:00','disk_io',72), ('2015-08-01 00:27:00','cpu',50), ('2015-08-01 00:35:00','memory',65), ('2015-08-01 00:24:00','disk_io',76), ('2015-08-01 00:32:00','cpu',45), ('2015-08-01 00:54:00','memory',40), ('2015-08-01 00:40:00','disk_io',90) ], ['timestamp','metrics','value']) ex_df.registerTempTable('ex_sample') q = """ SELECT timestamp, unix_timestamp(timestamp) as unix_time, metrics, value, AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) as average FROM ex_sample """ sqlContext.sql(q).show()