こんにちは。春休みにブレインズテクノロジーのインターンシップに参加した、現在学部4年生の松井です。
インターン中にやったこと、ハマったことなどをまとめてみました。
- やったこと
- S3に置かれているログデータをローカルに保存
- ローカルに保存したデータをDataframeとして読み出し
- DataFrameに対して、意味のある情報の抜き出し・データ整形といった処理を行う
- SQL文を記述してグラフ出力
やったこと
Apache Zeppelinというブラウザ上で動くインタラクティブシェルを用いて、S3に置かれているサーバ4台のメトリクス1週間分のログデータを可視化しました。
可視化までのステップを大雑把に区切ると以下のような感じです。
- S3に置かれているログデータを必要な分ローカルに保存
- ローカルに保存したデータをDataframeとして読み出し
- DataFrameに対し、意味のある情報の抜き出し・データ整形といった処理を行う
- SQL文を記述してグラフ出力
Spark SQLについて
Apache Zeppelinには分散処理のフレームワークであるApache Sparkのインタプリタが組み込まれており、データの整形・可視化にはApache Sparkの1コンポーネントであるSpark SQLを使いました。
Spark SQLは、関係データベースと似ているDataFrameというデータ構造を使っています。
DataFrameのAPIはScala, Java, Python, Rで利用することができ、今回のコードはすべてScalaで記述しています。
実行環境について
メモリ16GBのMacBook Proで実行しました。
Zeppelinのデフォルトの設定だと、メモリ領域やヒープ領域が足りずOutOfMemoryErrorやStackOverflowErrorが出たので、InterpreterからSparkインタプリタの設定を変更してみましたがどうも上手くいかず。。。
最終的には、設定ファイルconf/zeppelin-env.shに以下の行を追加することで解決しました。
export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16" # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16" export ZEPPELIN_MEM="-Xms4096m -Xmx4096m -Xss4096k -XX:PermSize=256m -XX:MaxPermSize=256m" # Zeppelin jvm mem options Default -Xms1024m -Xmx1024m -XX:MaxPermSize=512m
ちなみに、同ファイルにS3へのアクセスキーなども記述する必要があります。
export AWS_SECRET_ACCESS_KEY=#### export AWS_ACCESS_KEY_ID=#### export AWS_DEFAULT_REGION=####
S3に置かれているログデータをローカルに保存
まず、取得する期間やパスなどを初期変数として定めておきます。この変数は今後も使います。
///初期変数 //startDateのhh/mm/ssは無視されて、各日のデータはすべて取得される //サーバからローカルに保存する場合も、ローカルからDataFrameに保存する場合も、同じこれらの初期変数を使う val dayRange = z.input("ログ取得日数", 7).toString.toInt val baseKey = z.input("S3パス", "Your Server's Path").toString val baseLocalPath = z.input("保存先パス", "Your Local Path").toString val startDate = z.input("ログ取得開始日時(yyyy/MM/dd HH:mm:ss)", "2017/02/21 23:00:00").toString
日付のパースなどを行い、4台あるサーバのデータをそれぞれローカルに保存していきます。
///S3からログデータ取得、ローカルに保存 import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.io.{LongWritable, Text} import java.text.SimpleDateFormat import java.util.Calendar import java.util.TimeZone import java.util.ArrayList import org.apache.http.NameValuePair import org.apache.http.client.entity.UrlEncodedFormEntity import org.apache.http.client.methods.HttpPost import org.apache.http.impl.client.DefaultHttpClient import org.apache.http.message.BasicNameValuePair //Dateのパース var sdf:SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm") sdf.setTimeZone(TimeZone.getTimeZone("JST")) var nowdt:Calendar = Calendar.getInstance nowdt.setTimeZone(TimeZone.getTimeZone("JST")) if(startDate.length > 0) { var startDateTmp = startDate.split(" ") var sYear = startDateTmp(0).split("/")(0).toInt var sMonth = startDateTmp(0).split("/")(1).toInt-1 var sDay = startDateTmp(0).split("/")(2).toInt var sHour = startDateTmp(1).split(":")(0).toInt var sMinute = startDateTmp(1).split(":")(1).toInt var sSecond = startDateTmp(1).split(":")(2).toInt nowdt.set(sYear,sMonth,sDay,sHour,sMinute,sSecond) } var nowdate = sdf.format(nowdt.getTime) def saveAsTextFile(rdd:RDD[String], localPath:String) = { rdd.isEmpty() // InvalidInputException (matches 0 files) 対策 rdd.saveAsTextFile(localPath,classOf[org.apache.hadoop.io.compress.GzipCodec]) println("save as text file =>"+localPath) } def get(inputPath:String, host:String) = { try { var textFileRDD = sc.textFile(baseKey+inputPath+"*") saveAsTextFile(textFileRDD.repartition(1), baseLocalPath+inputPath+"0") } catch { case faee:org.apache.hadoop.mapred.FileAlreadyExistsException => throw faee } } def getDate(year:Int, month:Int, day:Int, add:Int) : String = { val resDate = "%tY/%<tm/%<td" format new java.util.GregorianCalendar(year, month-1, day-add); return resDate; } def zeroPadding2(str:String) : String = { var res = str if(str.length == 1) res = "0"+str return res } val nowdatesplit = nowdate.split("/") val nowYear = nowdatesplit(0).toInt val nowMonth = nowdatesplit(1).toInt val nowDay = nowdatesplit(2).toInt val nowHour = nowdatesplit(3).toInt val nowMinuteStr = nowdatesplit(4).substring(0,1)+"0" // example 27 -> 20 //ログデータを10分ごとに切り出しまとめて保存 try { for(addDay <- 0 to (dayRange-1)) { for(addHour <- ((24-1) to 0 by -1)) { //(nowHour to 0 by -1) for(addMinute <- List(50,40,30,20,10,0)) { var analysisDate = getDate(nowYear, nowMonth, nowDay, addDay) var fileDate = analysisDate.replaceAll("/","") var dateSplit = analysisDate.split("/") var year = dateSplit(0) var month = dateSplit(1) var day = dateSplit(2) var hourStr = zeroPadding2(addHour.toString) var minuteStr = zeroPadding2(addMinute.toString) var minutePath = minuteStr.substring(0,1) get("SERVER-MG/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-MG") get("SERVER-SR01/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-SR01") get("SERVER-SR02/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-SR02") get("SERVER-SR03/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-SR03") } } } } catch { // 取得するログの日付を遡り取得済みであれば処理終了 case faee:org.apache.hadoop.mapred.FileAlreadyExistsException => println("exit : "+faee) }
ローカルに保存したデータをDataframeとして読み出し
前の処理でローカルに保存したログデータを、まずは1つのDataframeに格納します。
///ローカルに保存してあるログデータをDataFrame形式で読み込み import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.io.{LongWritable, Text} import java.text.SimpleDateFormat import java.util.Calendar import java.util.TimeZone import java.util.ArrayList import org.apache.http.NameValuePair import org.apache.http.client.entity.UrlEncodedFormEntity import org.apache.http.client.methods.HttpPost import org.apache.http.impl.client.DefaultHttpClient import org.apache.http.message.BasicNameValuePair //Dateのパースは同様 var sdf:SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm") sdf.setTimeZone(TimeZone.getTimeZone("JST")) var nowdt:Calendar = Calendar.getInstance nowdt.setTimeZone(TimeZone.getTimeZone("JST")) if(startDate.length > 0) { var startDateTmp = startDate.split(" ") var sYear = startDateTmp(0).split("/")(0).toInt var sMonth = startDateTmp(0).split("/")(1).toInt-1 var sDay = startDateTmp(0).split("/")(2).toInt var sHour = startDateTmp(1).split(":")(0).toInt var sMinute = startDateTmp(1).split(":")(1).toInt var sSecond = startDateTmp(1).split(":")(2).toInt nowdt.set(sYear,sMonth,sDay,sHour,sMinute,sSecond) } var nowdate = sdf.format(nowdt.getTime) def getDate(year:Int, month:Int, day:Int, add:Int) : String = { val resDate = "%tY/%<tm/%<td" format new java.util.GregorianCalendar(year, month-1, day-add); return resDate; } def zeroPadding2(str:String) : String = { var res = str if(str.length == 1) res = "0"+str return res } val nowdatesplit = nowdate.split("/") val nowYear = nowdatesplit(0).toInt val nowMonth = nowdatesplit(1).toInt val nowDay = nowdatesplit(2).toInt val nowHourStr = nowdatesplit(3) val nowMinuteStr = nowdatesplit(4).substring(0,1)+"0" //全データをmetricDfに格納 var metricDf:org.apache.spark.sql.DataFrame = null try { for(addDay <- (0 to (dayRange-1))) { for(addHour <- ((24-1) to 0 by -1)) { for(addMinute <- List(50,40,30,20,10,0)) { //50,40,30,20,10,0 var analysisDate = getDate(nowYear, nowMonth, nowDay, addDay) var fileDate = analysisDate.replaceAll("/","") var dateSplit = analysisDate.split("/") var year = dateSplit(0) var month = dateSplit(1) var day = dateSplit(2) var hourStr = zeroPadding2(addHour.toString) var minuteStr = zeroPadding2(addMinute.toString) var minutePath = minuteStr.substring(0,1) var minutePathForDisplay = minuteStr.substring(0,2) var metricDfMg = sqlContext.read.json(baseLocalPath+"SERVER-MG/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-MG") var metricDfS1 = sqlContext.read.json(baseLocalPath+"SERVER-SR01/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-SR01") var metricDfS2 = sqlContext.read.json(baseLocalPath+"SERVER-SR02/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-SR02") var metricDfS3 = sqlContext.read.json(baseLocalPath+"SERVER-SR03/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-SR03") val path = year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePathForDisplay println(path) if (metricDf == null) { metricDf = metricDfMg.unionAll(metricDfS1).unionAll(metricDfS2).unionAll(metricDfS3) metricDf = metricDfMg } else { metricDf = metricDf.unionAll(metricDfMg).unionAll(metricDfS1).unionAll(metricDfS2).unionAll(metricDfS3) } metricDfMg = null; metricDfS1 = null; metricDfS2 = null; metricDfS3 = null; } } } } catch { case iie:java.io.IOException => println(iie) }
DataFrameに対して、意味のある情報の抜き出し・データ整形といった処理を行う
今回取り出したい情報は、CPU利用率、ディスクI/O、ファイルシステム利用率、メモリ利用率、ネットワーク流量の5つです。
metricDfには余計な情報も含まれているので、これらの情報のみを抜き出します。
さらに、メトリクスには累計ネットワーク流量が記録されていたので、これを使って単位時間あたりのネットワーク流量を計算します。
まずは前の処理で得たmetricDfから上記の項目ごとの切り出しを行い、それぞれの項目ごとにDataFrameを作成します。
import sqlContext.implicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.functions.{unix_timestamp, to_date} //DataFrameから項目ごとに切り出し、それぞれの項目ごとにDataFrame作成 var cpuDf = metricDf.where("metricset.name like '%cpu%'") var diskDf = metricDf.where("metricset.name like '%diskio%'") var fsDf = metricDf.where("metricset.name like '%filesystem%'") var memoryDf = metricDf.where("metricset.name like '%memory%'") var networkDf = metricDf.where("metricset.name like '%network%'") //必要なカラムだけ取り出す cpuDf = cpuDf.select("@timestamp", "beat.hostname", "system.cpu.system.pct", "system.cpu.user.pct") val newNamesCpu = Seq("Date", "Host", "CPU_Usage_By_System", "CPU_Usage_By_User") //パーセント表示 cpuDf = cpuDf.toDF(newNamesCpu:_*) diskDf = diskDf.select("@timestamp", "beat.hostname", "system.diskio.read.bytes", "system.diskio.write.bytes") val newNamesDisk = Seq("Date", "Host", "Disk_IO_Reading_Bytes", "Disk_IO_Writing_Bytes") //バイト表示 diskDf = diskDf.toDF(newNamesDisk:_*) fsDf = fsDf.select("@timestamp", "beat.hostname", "system.filesystem.used.pct") val newNamesFs = Seq("Date", "Host", "Filesystem_Usage") //パーセント表示 fsDf = fsDf.toDF(newNamesFs:_*) memoryDf = memoryDf.select("@timestamp", "beat.hostname", "system.memory.used.pct") val newNamesMemory = Seq("Date", "Host", "Memory_Usage") //パーセント表示 memoryDf = memoryDf.toDF(newNamesMemory:_*) networkDf = networkDf.select("@timestamp", "beat.hostname", "system.network.in.bytes", "system.network.out.bytes") val newNamesNetwork = Seq("Date", "Host", "Network_In_Bytes", "Network_Out_Bytes") //バイト表示 networkDf = networkDf.toDF(newNamesNetwork:_*)
このあと、DataFrameのままSQLライクなメソッドチェーンを繋げて処理をしていたのですが、ものすごく実行時間が掛かってしまっていました。
そこで、DataFrameをRDD経由でScalaのArrayに変換し、Arrayに対して処理を行うことで、処理速度が大幅に改善されました。
DataFrameをScalaのArrayとして操作する
CPU利用率についての処理を行います。
まず、Dataをパースし、最小時間単位をhourに直します。
//Dateをmapして、最小時間単位をhourにするよう部分文字列切り出し val DateTime = ("""([0-9]{4}-[0-9]{2}-[0-9]{2})""" + "T" + """([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})""" + "Z").r def get(s:String) = s match { case DateTime(d,t) => d + " " + t.slice(0,2) } var rowsCpu: RDD[Row] = cpuDf.rdd var cpuDfMapped = rowsCpu.map({ case Row(date: String, host: String, cPU_Usage_By_System: Double, cPU_Usage_By_User: Double) => (get(date), host, cPU_Usage_By_System, cPU_Usage_By_User) }).toDF("Date", "Host", "CPU_Usage_By_System", "CPU_Usage_By_User")
1週間分のデータを可視化するので、毎時間最大値の情報のみを使うことにします。
import org.apache.spark.sql.functions.{lead, lag} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ //Cpuログをホストごとに分類 <- この部分、処理を軽くするために行っているので場合によっては分けなくてよい(groupByでDate,Hostを指定しているため) var cpuDf_filtered_mg = cpuDfMapped.where('Host === "SERVER-MG") var cpuDf_filtered_1 = cpuDfMapped.where('Host === "SERVER-SR01") var cpuDf_filtered_2 = cpuDfMapped.where('Host === "SERVER-SR02") var cpuDf_filtered_3 = cpuDfMapped.where('Host === "SERVER-SR03") //毎時間最大値を抽出 //rowscpuDf_mgの型: Array[org.apache.spark.sql.Row] var rowscpuDf_mg = cpuDf_filtered_mg.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect() var rowscpuDf_1 = cpuDf_filtered_1.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect() var rowscpuDf_2 = cpuDf_filtered_2.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect() var rowscpuDf_3 = cpuDf_filtered_3.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect() //cpuDf_mg: Array[(String, String, Double, Double)] var cpuDf_mg = rowscpuDf_mg.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) } var cpuDf_1 = rowscpuDf_1.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) } var cpuDf_2 = rowscpuDf_2.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) } var cpuDf_3 = rowscpuDf_3.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) } //Arrayを統合し、DataFrameに戻しテーブル登録 var cpuDfs = cpuDf_mg ++ cpuDf_1 ++ cpuDf_2 ++ cpuDf_3 var cpuDfs_modified = sc.makeRDD(cpuDfs).toDF("Date", "Host", "CPU_Usage_By_System", "CPU_Usage_By_User") cpuDfs_modified.registerTempTable("T_CPU")
ディスクI/O・ファイルシステム利用率・メモリ利用率の他3つのデータについての操作は、これと同様に行いました。
Scalaのコレクションメソッドmapを使った処理
ネットワーク流量についての処理を行います。
メトリクスに記録されていたネットワーク情報は、上り・下り転送量の累積バイト数であり、2^32-1バイトで循環的になっていました。
そこで、1つ前のデータとの差分を取ることで、単位時間あたりのデータ転送量を計算しました。
//Dateをmapして、最小時間単位をhourにするよう部分文字列切り出し val DateTime = ("""([0-9]{4}-[0-9]{2}-[0-9]{2})""" + "T" + """([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})""" + "Z").r def get(s:String) = s match { case DateTime(d,t) => d + " " + t.slice(0,2) } var rowsNetwork:RDD[Row] = networkDf.rdd var networkDfMapped = rowsNetwork.map({ case Row(date: String, host: String, network_In_Bytes: Long, network_Out_Bytes: Long) => (get(date), host, network_In_Bytes, network_Out_Bytes) }).toDF("Date", "Host", "Network_In_Bytes", "Network_Out_Bytes")
Scalaのコレクションのメソッドであるmapを使って差分の計算を行います。負になった場合は循環の切れ目が入っていると判断し、カウンタ値2^32 - 1を使って処理しました。
SERVER-MGサーバのデータだけであとの3台分は省略しています。
import org.apache.spark.sql.functions.{lead, lag} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ //Networkログをホストごとに分類 var networkDf_filtered_mg = networkDfMapped.where('Host === "SERVER-MG") //毎時間最大値を抽出 //rowsNetwork_mg: Array[org.apache.spark.sql.Row] var rowsNetwork_mg = networkDf_filtered_mg.groupBy("Date", "Host").agg(max("Network_In_Bytes"), max("Network_Out_Bytes")).sort($"Date").rdd.collect() //差分を計算するための関数 def delta(a:Any, b:Any): Long = { var atmp: Long = a.asInstanceOf[Long] var btmp: Long = b.asInstanceOf[Long] if (atmp - btmp > 0) { return atmp - btmp } else { return 4294967295L - btmp + atmp //4294967295Lはカウンタ値 } } //tailで先頭の要素を除いたArrayを、zipでもとのArrayの同インデックス要素とのタプルにし、mapで差分を計算 var rowsNetwork_mg_delta = rowsNetwork_mg.zip(rowsNetwork_mg.tail).map{ u => (u._2.apply(0).toString, u._2.apply(1).toString, delta(u._2.apply(2), u._1.apply(2)), delta(u._2.apply(3), u._1.apply(3))) } //DataFrameに戻し、列の名前を付ける val networkDf_mg_modified = sc.makeRDD(rowsNetwork_mg_delta).toDF("Date", "Host", "Network_In_Per_DeltaTime", "Network_Out_Per_DeltaTime") //テーブル登録 networkDf_mg_modified.registerTempTable("T_NETWORK_MG")