CCCMKホールディングス TECH Labの Tech Blog

TECH Labスタッフによる格闘記録やマーケティング界隈についての記事など

ブログタイトル

Azure Machine Learning(Azure ML)のPipelineを使ってバッチ推論処理を実装してみました。

こんにちは、CCCMKホールディングス技術開発の三浦です。

久しぶりに家の机の周りの片づけをしました。片づけをしていてここのところ頭を悩ませるのが、たくさんの電源コードの上手な扱い方についてです。パソコンをはじめ、スマートフォン、タブレットなど、いろんなデバイスのコードがどうしてもごちゃごちゃしてしまい、なんとかスッキリ出来ないかな・・・と考えています。でももしかしたらあと数年後には電源回りのコードレス化が進むのかな・・・とも想像したりします。

さて、最近毎月一回、多数の画像データに対して機械学習モデルを適用し、推論結果をCSVファイルで出力する方法について色々考えていました。以前Azure Machine Learningを通じてモデルの推論機能をAPIで提供する方法についてご紹介しました。

techblog.cccmk.co.jp

この時は1件のデータを受け取るたびに推論結果を返す、オンラインのモデルの使い方をしていましたが、今回は一度に多数のデータを受け取り、モデルで推論し、推論した結果を1つのCSVにまとめて出力する、という使い方です。

Azure MLではこのようなバッチ処理をPipelineを使って実現することが出来ます。今回はその方法について調べてみたことをご紹介したいと思います。

バッチ推論処理全体像

今回Azure MLで作ったバッチ推論処理の全体像は以下の様になります。

バッチ推論処理全体

まずバッチ処理を施したい画像データをAzure StorageのContainerの中に月別のフォルダを作成し格納します。Azure MLでは処理対象のデータ群はDatasetとして表現されるため、バッチ処理実行月のフォルダを処理対象のDatasetにします。

あとはAzure MLのModelsに予め登録されている機械学習モデルとそれを実行する環境(コンテナ)をEnvironmentsから取得し、Compute Clusterで実行させ、その結果をAzure MLのデフォルトで用意されているStorageにCSVファイルとして出力します。

この全体像を実現するために、Azure MLでは機械学習に必要となる概念をどのように表現しているのかを調べる必要がありました。今回は特にDatastore, DatasetおよびPipelineについて最初に簡単に触れておきたいと思います。

Azure MLのDatastoreとDataset

Azure MLではデータはDatastoreDatasetという概念で管理されます。 DatastoreはAzure Datalake Storageなど、データの格納場所を表現しており、Datasetは処理の対象になるデータの集まりそのものを表現しています。 ですのでAzure Machine Learningで何かデータを処理対象にしたい場合、まずどこにデータを見にいけばよいのか(どこのStorage?どこのDB?など)をDatastoreで示し、どのデータを見るのかをDatasetで示します。DatastoreおよびDatasetの作成はPythonのSDKで行うことが出来ます。

Datastoreを作成するコードの例です。

# Azure Storage AccountのContainerを参照するAzure ML Datastore作成
from azureml.core import Datastore

container_name='***' #接続するContainer
account_name='***' #接続するStorage Account
sas_token='***' #SAS token
datastore_name = '***' #Azure ML上のDatastore名

blob_datastore = Datastore.register_azure_blob_container(workspace=ws, 
                                                         datastore_name=blob_datastore_name, 
                                                         container_name=container_name, 
                                                         account_name=account_name,
                                                         sas_token=sas_token
)

Datasetを作成するコードの例です。

# 当月フォルダ配下のファイルを参照するDatasetの作成
from azureml.core import Dataset
import datetime

dataset_name = '***' #作成するDataset名
datastore = Datastore.get(ws, datastore_name=datastore_name)
month = datetime.datetime.now().strftime('%Y%m')
ds = Dataset.File.from_files((datastore, f'/{month}'))
ds = ds.register(ws, name=dataset_name,create_new_version=True)

Azure MLのPipeline

Azure MLでは1つの機械学習タスクを複数のStepで構成されたPipelineとして表現します。Stepとしては例えばモデルに入力する前に必要になるデータの前処理や、推論処理などが挙げられます。Step間でデータを受け渡すことが可能ですし、Stepごとにその処理に使用する環境やAzure MLのCompute(VMのクラスタ)を指定することが出来ます。

また、Pipelineの実行は何らかのアクションをトリガーに起こすことが可能です。例えば指定の時間になったらPipelineを実行する、といったことも可能です。

Pipeline実行までの手順

ではここからは推論バッチ処理を実行するPipelineを作成し、実行するまでの手順についてご紹介していきます。今回は全てPythonのSDKを使って作業を行いました。

Stepを実行するComputeの作成

PipelineのStepを実行するAzure MLのCompute(VMのクラスタ)を作成します。VMのサイズやクラスタを構成する最大ノード数などを指定します。VMサイズはGPUタイプも選択でき、コストを抑えるため低優先度(LowPriority)での使用も可能です。

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# make gpu cluster
infer_cluster_name = 'infer-cluster'

try:
  infer_cluster = ComputeTarget(workspace=ws, name=infer_cluster_name)
  print('Found existing cluster, use it.')
except ComputeTargetException:
  compute_config = AmlCompute.provisioning_configuration(
    vm_size='Standard_NC4as_T4_v3',
    max_nodes=1,
    vm_priority='lowpriority'
  )
  infer_cluster = ComputeTarget.create(ws, infer_cluster_name, compute_config)
  infer_cluster.wait_for_completion(show_output=True)

Stepで実行する処理を記述したPythonスクリプトファイルの作成

PipelineのStepで実行する処理はPythonのスクリプトで記述出来ます。Azure MLに登録したモデルをAzure MLのModelsからロードし入力されたデータに対して推論を行う、という処理を実行します。たとえば以下のような内容のスクリプトになります。

from azureml.core import Model
from PIL import Image
import torch

def preprocess(img):
  ...

def init():
  # Runs when the pipeline step is initialized
  global model

  # load the model
  model_path = Model.get_model_path('***') # Azure ML Modelsに登録されているModel名
  model = torch.load(model_path).eval()

def run(mini_batch):
  # This runs for each batch
  resultList = []

  # process each file in the batch
  for f in mini_batch:
    input_img = Image.open(f)
    input_tensor = torch.unsqueeze(preprocess(input_img), axis=0)
    predict = model(input_tensor).detach().tolist()
    resultList.append({'img':f, 'score':predict})
    
  return resultList

init()関数はStepの最初に実行され、この中で推論に使用するモデルをロードします。

そしてrun()関数が推論処理のメインになる部分で、引数には複数のデータがバッチで渡されます。run()関数の中でバッチに含まれるデータを一つずつ処理していく流れになります。バッチを取得するDatasetはこのスクリプトを実行するStepを作成する時に指定します。またバッチサイズはStepの実行に関する設定で指定することが出来ます。

Step実行に関する設定

Step実行に関する各種設定を行います。Stepを作成するクラスはいくつかの種類がありますが、大量のデータを非同期で並列処理する用途に長けたParallelRunStepクラスを使用して推論用のStepを作成します。ParallelRunStepの設定には、ParallelRunConfigクラスを使用します。

ParallelRunConfigにはStepの実行時に使用するAzure MLのEnvironment(Pythonのバージョンや依存ライブラリ等の環境)や実行するPythonスクリプトファイル、バッチサイズや処理を実行するComputeなどを指定することが出来ます。

from azureml.pipeline.steps import ParallelRunConfig
from azureml.core import Environment

env = Environment.get(ws,'***') #Azure MLに登録済みのEnvironment名

parallel_run_config = ParallelRunConfig(
    source_directory='/script_dir_path/',
    entry_script="infer.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=env,
    compute_target=infer_cluster,
    node_count=1
)

Stepの作成

PipelineのStepを作成します。今回の推論StepはParallelRunStepクラスを使用して作成します。作成時に指定が必要になるのが先ほど作成したParallelRunConfigオブジェクトと入力するDataset、そして結果を出力するDatasetです。入力するDatasetの指定はDatasetConsumptionConfigを、出力するDatasetの指定はOutputFileDatasetConfigを使用します。入力するDatasetはバッチ処理を実行する月によって変わるため、Stepを含むPipeline全体を実行する時にパラメータとして指定できるようにしたいと考えました。Pipeline実行時に指定するパラメータはPipelineParameterによって取得することが出来ます。

from azureml.core import Dataset
from azureml.pipeline.steps import PythonScriptStep,ParallelRunStep
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import PipelineParameter

# input
default_ds = Dataset.get_by_name(ws, name='inference_data')
input_ds = PipelineParameter(name="input_ds", default_value=default_ds)
infer_data_config = DatasetConsumptionConfig("infer_data", input_ds).as_mount()

# output
# DatastoreはAzure MLのデフォルトを使用
outputstore = Datastore.get(ws, datastore_name='workspaceblobstore')
output_config = OutputFileDatasetConfig('score_data',(outputstore,'scoring/predictions.csv'))

infer_step = ParallelRunStep(
    name='inference',
    parallel_run_config=parallel_run_config,
    inputs=[infer_data_config],
    output=output_config,
    arguments=[],
    allow_reuse=True
)

Pipelineの作成と実行

Stepを使用してPipelineを作成します。複数のStepを繋げて1つのPipelineを作成出来ますが、今回は1つのStepで構成されたPipelineです。Pipelineを作成した後、その時点で分かるエラーがあるかどうかをvalidate()メソッドを呼び出すことで確認することが出来ます。

from azureml.pipeline.core import Pipeline
pipeline = Pipeline(ws,steps=[infer_step])
pipeline.validate()

Azure MLでのPipelineの実行はExperimentクラスを通じて行います。Experimentクラスのsubmit()にPipelineとパラメータを渡すことでPipelineがAzure MLで実行されます。パラメータにこのPipelineを実行する月に応じたDatasetを指定するようにしました。

from azureml.core import Experiment
import datetime

#実行月の取得
month = datetime.datetime.now().strftime('%Y%m')
exp = Experiment(ws, "azureml-pipeline")
input_ds = Dataset.File.from_files((datastore, f'/{month}'))
input_ds = input_ds.register(ws, name="inference_data",create_new_version=True)
run = exp.submit(pipeline,pipeline_parameters={'input_ds':input_ds})

Azure Machine Learning StudioでPipelineの実行の様子を確認することが出来ます。

Azure Machine Learning Studioで表示したPipeline

実行が正常に完了したら、OutputFileDatasetConfigで指定した場所にCSVファイルが生成され、推論結果が出力されていることを確認することが出来ます。

バッチ推論処理で生成されたCSVファイル

まとめ

今回はAzure MLのPipelineを使い、多数のデータを一度にまとめて推論処理をする方法について調べたことをご紹介しました。以前調べたオンライン(リアルタイム)推論も含めると、機械学習モデルの活用方法の幅がだいぶ広がったな、と感じました。Azure MLではオンラインでもバッチでも、推論処理を行うスクリプトはほとんど同じ内容で対応出来るため、どちらの形式でモデルを利用するか、スムーズに切り替えられて便利です。

今回、Pipelineの実行は手動で行いましたが、スケジュールを組んで自動実行出来る方法もあるようなのでそちらも調べていきたいと思います。