CCCMKホールディングス TECH Labの Tech Blog

TECH Labスタッフによる格闘記録やマーケティング界隈についての記事など

Spark UIを使ってSparkがどんな風に処理を行っているのか見てみました!

こんにちは、CCCMKホールディングス技術開発の三浦です。

2月に入り立春が過ぎたので、冬が終わり春がやってくる頃です。相変わらず寒いのですが、外に出るとなんとなく空気の肌触りが丸くなったかな?と感じることが増えてきました。季節は4つあってそれぞれ季節の変わり目があるのですが、冬から春への季節の変わり目は他に比べてなんだか特別な感じがします。

さて、この頃Sparkを使って機械学習モデルを学習する処理を実行する機会が増えてきて、ようやく使えるようになってきたかな?と感じ始めてきました。そうなると今度はもっと処理を高速化するにはどうしたらいいんだろう、といったことにも興味が出てきて、そのためにもSparkではどんな風に処理を行っているのかを見てみたいと考えるようになりました。Spark UIというWebインターフェースを使うことで色々情報を見ることが出来るのですが、その見方が今一つ分かっていなかったため、今回調べてみました。

PySparkで実行するコード

今回試しに動かしてみるのは以下のような、Food-101データセットのクラス別のデータ数を集計する処理です。

food101_dir = '/***/food101'

def path_to_index(path):
  return labels.index(path.split('/')[-2])

#Sparkで使用するユーザー定義関数にする
path2indexUDF = udf(lambda x:path_to_index(x),LongType()) 

df = spark.read.format('binaryFile')\
    .load(food101_dir + '/images/*')\
    .withColumn('label',path2indexUDF(col('path')))\
    .select('label','content')\
    .groupby(col('label'))\
    .count()

path2indexUDFはファイルパス文字列に含まれるその画像のクラス名を抽出し、インデックスに変換する処理です。

このコードを実行すると、対象になるファイルのリスト化の処理が実行されるものの、データの加工や集計処理はまだ実行されません。データを出力したり表示する指示がトリガーとなって一連の加工や集計処理が実行されます。

実行されるPlanの確認

実行する前に上記のコードがSparkでどのように解釈され、実行されるのかを確認することが出来ます。これらはPlanと呼ばれ、Logical PlanとPhysical Planの2種類があります。Logical Planは抽象的なもので、Logical Planを最適化して実際にSparkで実行される状態になったのがPhysical Planです。

PlanはDataFrameのexplain()メソッドでnotebook上で確認出来、extended=Trueのオプションで両方のLogical/Physical両方のPlanを表示することが出来ます。(extended=FalseだとPhysicalのみ)

df.explain(extended=True)
== Optimized Logical Plan ==
Aggregate [label#47L], [label#47L, count(1) AS count#59L]
+- Project [pythonUDF0#64L AS label#47L]
   +- BatchEvalPython [<lambda>(path#38)#46L], [pythonUDF0#64L]
      +- Project [path#38]
         +- Relation [path#38,modificationTime#39,length#40L,content#41] binaryFile

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[label#47L], functions=[finalmerge_count(merge count#63L) AS count(1)#58L], output=[label#47L, count#59L])
   +- Exchange hashpartitioning(label#47L, 200), ENSURE_REQUIREMENTS, [plan_id=39]
      +- HashAggregate(keys=[label#47L], functions=[partial_count(1) AS count#63L], output=[label#47L, count#63L])
         +- Project [pythonUDF0#64L AS label#47L]
            +- BatchEvalPython [<lambda>(path#38)#46L], [pythonUDF0#64L]
               +- FileScan binaryFile [path#38] Batched: false, DataFilters: [], Format: org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat@7e54979e, Location: InMemoryFileIndex(101 paths)[dbfs:/***/food101/images/Apple pie, dbfs:/mnt/s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<path:string>

PySparkで書いた処理がどんな順序でSparkによって実行されるか、大体掴むことが出来ます。"BatchEvalPython"でユーザー定義関数path2indexUDFが実行されるようです。"Project"は"プロジェクト"の意味ではなく"Projection"("投影")の意味です。Physical Planの方を眺めていると、"HashAggregate"が2回実行されていることが分かります。

HashAggregateは記述内容から、labelごとにデータ数をカウントする.groupby(col('label')).count()に当たる処理を行っていることが分かりますが、この処理が"Exchange"を挟んで2回実行されています。これはSparkによる並列処理を効率的に行う工夫で、最初に分散処理を担う各Workerで個別にHashAggregateを施してデータをコンパクトにしたあと、Exchangeによってその結果を集約し、次のHashAggregateで全体の集計処理を行っています。Exchangeで転送されるデータのサイズを出来るだけ小さくすることで、全体の処理時間を短くする工夫がSparkによって自動的に行われます。

Spark UI上での確認

上で書いた処理を実行し、その結果を20行表示させます。

df.show()

この行を実行することで、Planに従って処理が行われていきます。内部で実際にどんなことが行われているのかはSpark UIで確認することが出来、どこがボトルネックになったのかなど、診断することが出来ます。

Job/Stage/Task

Sparkでは一連の処理をまとめてJob, Stage, Taskとして扱います。Jobを細かくしたのがStage, Stageを細かくしたのがTaskです。Spark UIのJobsタブでは実行されたJobのタイムラインを確認することが出来ます。3つのJobが実行されていることが分かります。

Spark UIのJobsタブを表示

1つ目は指定されたディレクトリ配下のファイルやサブディレクトリのリスト化、2つ目が加工と集計処理、3つ目がshow()による結果の出力を行っており、やはり加工や集計処理の2つ目のJobが一番重たく、時間を費やしていることが分かります。

Jobについて、さらに詳細を確認することが出来ます。そのJobを構成するStageの情報を有向非巡回グラフ(DAG)などで確認することが出来ます。

加工集計をしているJobの詳細

ここからStageに関する詳細ページに飛ぶことが出来ます。Stageの詳細ページでは、そのStageで実行されたタスクのタイムラインを確認することが出来ます。途中でWorkerが追加され、それによってタスクの実行が並列で行われていることが分かります。

Spark UIのStagesタブで確認

Query

df.explain(extended=True)で確認したPlanがどのように実行されたのかを確認することが出来ます。こちらもDAGで確認することが出来ます。

Spark UIのSQL / DataFrameタブで確認出来ます。

それぞれのボックスを展開すると、その処理にどれくらいの時間がかかったのかなどを見ることが出来ます。(おそらくここで表示されているのは各Taskに費やした時間の合計で、並列化によって処理時間自体は圧縮されています)

Exchange前後のHashAggregateの詳細の比較

下のHashAggregateではかなりの処理時間をかけていますが、上のHashAggregateではわずかな時間で処理が完了できていることが分かります。並列数を増やす(Workerを増やす)ことで下のHashAggregateの処理の実時間は短縮出来そうなので、全体の処理時間をもう少し圧縮出来るのかもしれません。

また最初のHashAggregateの後のWorker間でデータをやり取りするExchangeについても詳細を確認出来ます。

Exchangeの詳細

ここでは費やした時間は3.9sで、全体で見ればそれほど気になる時間ではありません。ただこの処理を何回も繰り返すとボトルネックになりそうです。モデルの分散学習時は繰り返しこの処理が走りそうなので、気にした方が良さそうです。

まとめ

今回はSparkで処理を実行するとどんな処理が行われるのかについて、Spark UIを見ながら確認してみた話をご紹介しました。公開されているドキュメントや投稿を見ていると、処理の順序を変更するなどほんの少しの工夫で、Sparkの処理時間を改善出来た、といったこともあるようです。モデルの分散学習の時、どうしてこんなに時間がかかっているんだろう?と不思議に思うことが時々あるので、これからはSpark UIを見ながらボトルネックになっている箇所を特定し、対処できるようになりたいと思います。