CCCMKホールディングス TECH Labの Tech Blog

TECH Labスタッフによる格闘記録やマーケティング界隈についての記事など

Azure DatabricksのSparkで深層学習モデルの分散学習をしました。

こんにちは、技術開発の三浦です。

この前の休日はとても暑くて、ベランダに打ち水をしてみてもたった30分で乾いてしまうような、それくらいの暑さでした。何か夏らしいことを、ということで、かき氷機を出してかき氷を作ってみました。冷たくて美味しくて大満足だったので、今年の夏はたくさんかき氷を作ろうと思いました。

今回はAzure DatabricksのSpark上で、TensorFlowで書いた深層学習モデルの分散学習をした話を紹介したいと思います。

分散学習の必要性

画像やテキストなどのデータを分析する方法として、深層学習を選択することは当たり前になってきたと思います。

深層学習のモデルは柔軟に構造を決めることが出来、高機能なモデルになるほど構造が複雑になる傾向があります。

モデルの構造が複雑になると学習対象のパラメータの数が膨大になりますし、そもそも入力データのサイズが大きいので、「学習に時間がかかる」「データが大きすぎてメモリに乗りきらない」という問題にちょくちょくぶつかります。

こういった問題に対応する方法として、データや処理ステップを分割して複数のコンピュータやGPUで分散して学習させる、分散学習の必要性が出てきます。

Sparkと深層学習

Databricksで使われているApache Sparkは、主にビッグデータを処理するための分散処理システムです。分散処理の方法として「MapReduce」というプロセスに基づいた方法が採用されています。

一方で深層学習の分散処理においてはMPI(Message Passing Interface)という方法が使用されていて、これを実装したオープンソースのフレームワークがHorovodです。

SparkにおいてもMPIに近い「Barrier Mode」という方法が導入されて、Spark上での深層学習の分散学習が可能になったようです。

DatabricksにおいてはSparkのBarrier ModeによってHorovodを利用することが出来ます。HorovodRunnerというAPIを通じてSparkのClusterを構成するworkerに実行プログラム(関数)を送り、SparkのBarrier ModeによってSparkのJobに組み込まれたHorovodのJobが実行されるようです。

以下を参照にさせて頂きました。

HorovodRunner: distributed deep learning with Horovod | Databricks on AWS

HorovodRunnerによるTensorFlowモデルの分散学習

実際にAzure DatabricksでHorovodRunnerを使ってTensorFlowモデルの分散学習を実現する手順を調べてみました。

1プロセスで実行可能なプログラムの作成

最初に1プロセスで実行可能な学習用プログラムを作成します。今回は「Food-101」データセットを使用して、食べ物の画像を101に分類する分類モデルをTensorFlowで作りました。

モデルの構造は、EfficentNetB0をベースにしてFlatternDenseレイヤを接続して作りました。EfficentNetB0はImageNetデータセットで事前学習済みのものを使用し、モデルの学習時はFlattenレイヤとDenseレイヤの間にDropoutレイヤを接続しました。(dropout rateは0.3)

また、GANsやTransformerを学習するときはfit()メソッドを使わずに独自の学習ステップを実装することが多いので、今回もtf.GradientTapeを使った学習ステップを実装し、学習ループ上で実行する方法でモデルの学習を行いました。

学習データ全体を繰り返す回数epochは12に設定しました。以下にコードを掲載します。

import tensorflow as tf
import tensorflow_datasets as tfds
import mlflow

#あらかじめdownloadしたものを使用
data = tfds.load('food101',data_dir='path/',download=False)

# Data Augmentation用のレイヤ
augmentation = tf.keras.Sequential([
  tf.keras.layers.Resizing(224, 224),
  tf.keras.layers.RandomFlip("horizontal"),
  tf.keras.layers.RandomRotation(0.2),
  tf.keras.layers.RandomZoom(.2, .2),
])

# train/validデータに分ける
train_datasets = data['train']
train_datasets = train_datasets.map(lambda x: (augmentation(x['image']),x['label']))
valid_datasets = data['validation']
valid_datasets = valid_datasets.map(lambda x: (tf.keras.layers.Resizing(224,224)(x['image']),x['label']))

# モデル定義
class FoodClassifier(tf.keras.Model):
  def __init__(self, num_class, drop_out_rate):
    super(FoodClassifier,self).__init__()
    self.num_class = num_class
    self.base_model = tf.keras.applications.efficientnet.EfficientNetB0(
      include_top=False,
      weights='imagenet'
    )
    self.drop_out = tf.keras.layers.Dropout(drop_out_rate)
    self.last_layer = tf.keras.layers.Dense(num_class)
    self.flatten_layer = tf.keras.layers.Flatten()
    
  def call(self, img, training):
    x = tf.keras.layers.Resizing(224, 224)(img)
    x = tf.keras.applications.efficientnet.preprocess_input(x)
    x = self.base_model(x)
    x = self.flatten_layer(x)
    x = self.drop_out(x, training=training)
    output = self.last_layer(x)
    return output

# loss
def loss(pred, real):
  loss_ = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True
  )
  return loss_(real, pred)

# optimizer
optimizer = tf.keras.optimizers.Adam()

# 学習状況の記録用
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_loss = tf.keras.metrics.Mean(name='validation_loss')
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='validation_accuracy')

# モデル構築
model = FoodClassifier(101, 0.3)

# 学習ステップ
@tf.function
def train_step(img, label):
  with tf.GradientTape() as tape:
    pred = model(img, training=True)
    loss_value = loss(pred, label)
  
  gradients = tape.gradient(loss_value, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  train_loss.update_state(loss_value)
  train_accuracy.update_state(label, pred)

# 学習ループ
import time
# setting
run_name = 'base_line'
epochs = 12
bacth_size = 64
fix_train_datasets = train_datasets.shuffle(50000).batch(bacth_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
fix_valid_datasets = valid_datasets.batch(bacth_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

experiment = mlflow.set_experiment('path/')

with mlflow.start_run(run_name=run_name) as run:
  for epoch in range(epochs):
    epoch_start = time.time()
    train_loss.reset_states()
    train_accuracy.reset_states()
    for (iter,(img, label)) in enumerate(fix_train_datasets):
      train_step(img, label)
      if iter % 100 == 0:
        mlflow.log_metrics({
          'train_loss':train_loss.result().numpy(),
          'train_accuracy': train_accuracy.result().numpy()
        })
    # validationデータによる測定
    val_loss.reset_states()
    val_accuracy.reset_states()

    for (img, label) in fix_valid_datasets:
      pred = model.predict(img)
      val_loss.update_state(loss(pred, label))
      val_accuracy.update_state(label, pred)
    
    #1epoch実行にかかった時間
    epoch_end = time.time()
    duration = epoch_end - epoch_start
    
    mlflow.log_metrics({
      'validation_loss':val_loss.result().numpy(),
      'validation_accuracy':val_accuracy.result().numpy(),
      'duration':duration
    },step=epoch)
  mlflow.keras.log_model(model,'model')

結果はMLflowを使い、Azure DatabricksのExperimentsに記録するようにしました。

問題なく動くことを確認したら、このプログラムをHorovodRunnerで実行できるように変更していきます。

MLflowの設定

まずworkerからMLflowを使用するため認証情報を取得します。HorovodRunnerで動かす上で、今回一番苦戦したところかもしれません。

必要な情報はDatabricksのホストURLユーザのAccessトークンです。ホストURLはAzure DatabricksポータルのURLの、「~xxxx.azuredatabricks.net/」の部分で、AccessトークンはAzure DatabricksポータルのUser Settingsから取得することが出来ます。

Accessトークンの作成画面

それぞれの値を変数に保存しておきます。

databricks_token = "access token"
databricks_host = 'host url'

分散学習の方針を決める

分散学習の方法として、Batchサイズを分割する、学習データを分割する、などを選択することが出来ますが、今回はepoch数を分割し、1つのworkerで実行するepoch数を減らすことで全体の処理時間を短くする方針を取りました。

一連の処理をまとめた関数を用意する

次に1プロセスで動作確認したプログラム全体をまとめて関数にします。この関数がHorovodRunnerによってworkerに送られます。

def train_hvd():
  import tensorflow as tf
  import tensorflow_datasets as tfds
  import mlflow
  import horovod.tensorflow as hvd
  import os

  # Initialize Horovod
  hvd.init()

  gpus = tf.config.experimental.list_physical_devices('GPU')
  for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
  if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],'GPU')
...

train_hvd()の中身は先のコードとほぼ同じなのですが、いくつか変更が必要です。具体的な実装内容についてはこちらの記事を参考にしました。

Horovod with TensorFlow — Horovod documentation

上の記事に従って変更を加えていけば動かすことは出来たので、MLflowの設定やlearning rateの変更などについてのみ、ここで説明します。

MLflowの設定

先ほど定義した変数を使って、workerの環境変数に接続情報をセットします。

mlflow.set_tracking_uri("databricks")
os.environ['DATABRICKS_HOST'] = databricks_host
os.environ['DATABRICKS_TOKEN'] = databricks_token
experiment = mlflow.set_experiment('path/')
learning rateの変更

各Horovodのworkerで計算された重みの勾配は、初期設定ではHorovodのworker全体で平均されます。すると1プロセスで実行したときと比べ、Horovodのworker数(hvd.size())分縮小された値になってしまうのでlearning rateをhvd.size()倍して対処します。(後ほど触れますが、実はこの変更では上手くいきませんでした。)

optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())

HorovodRunnerの実行

あとはHorovodRunnerを実行して、先ほど作成したtrain_hvd()関数を各Horovod workerに実行させます。HorovodRunner(np=np)npで動かすHorovodのworker数を指定することが出来ます。

from sparkdl import HorovodRunner

with mlflow.start_run(experiment_id=experiment.experiment_id,run_name=run_name) as run:
  active_run_uuid = mlflow.active_run().info.run_uuid
  hr = HorovodRunner(np=np)
  model = hr.run(train_hvd)
  #モデルの保存
  mlflow.keras.log_model(model,'model')

実行結果

Horovodを使わずに1プロセスで実行した結果をベースライン(base_line)にして、プロセス数npを2~4まで変更させて実行した結果(dl_with_horovod_np_x)をまとめてみました。

比較表

処理時間(Duration)

np=4の時が圧倒的に処理時間が短い結果になりました。

精度(train_accuracy/train_loss, validation_accuracy/validation_loss)

しかし、npを増やすと精度が落ちてしまう傾向があるようです。

考察

Horovodを使うと処理時間はかなり短縮することが出来たのですが、モデルの精度も落ちてしまう結果となりました。

気になっていたのは先述したlearning rateをHorovod worker数(hvd.size())倍している点でした。今回lossの計算をする時にreduce_mean()などで集計をしていないので、もしかしたらlearning reteをhvd.size()倍する処理は要らないのでは・・・と気になっていた箇所です。

learning rateを変更せずに再度np=4で試してみました。

#optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())
optimizer = tf.keras.optimizers.Adam(0.001)

ベースライン、np=4でlearning rateを変更した場合、np=4でlearning rateを変更しなかった場合の結果を以下に掲載します。

真ん中がnp=4でlearning rateをそのまま使用した場合です

ベースラインに比べるとまだ精度は劣りますが、先ほどのlearning rateを変更した場合に比べればかなり精度を向上させることが出来ました。処理時間が大幅に減少していることを考慮すれば、Horovodを使った分散学習はかなり効果的だと言えそうです。

まとめ

ということで、今回はAzure DatabricksのSparkで深層学習モデルの分散学習をした話を紹介させて頂きました。Azure DatabricksではHorovodRunnerやHorovodがあらかじめインストールされているのですぐに使うことが出来ます。今度はGANsの分散学習にチャレンジしたいと思います!