こんにちは、CCCMKホールディングス技術開発の三浦です。
久しぶりに家の机の周りの片づけをしました。片づけをしていてここのところ頭を悩ませるのが、たくさんの電源コードの上手な扱い方についてです。パソコンをはじめ、スマートフォン、タブレットなど、いろんなデバイスのコードがどうしてもごちゃごちゃしてしまい、なんとかスッキリ出来ないかな・・・と考えています。でももしかしたらあと数年後には電源回りのコードレス化が進むのかな・・・とも想像したりします。
さて、最近毎月一回、多数の画像データに対して機械学習モデルを適用し、推論結果をCSVファイルで出力する方法について色々考えていました。以前Azure Machine Learningを通じてモデルの推論機能をAPIで提供する方法についてご紹介しました。
この時は1件のデータを受け取るたびに推論結果を返す、オンラインのモデルの使い方をしていましたが、今回は一度に多数のデータを受け取り、モデルで推論し、推論した結果を1つのCSVにまとめて出力する、という使い方です。
Azure MLではこのようなバッチ処理をPipelineを使って実現することが出来ます。今回はその方法について調べてみたことをご紹介したいと思います。
バッチ推論処理全体像
今回Azure MLで作ったバッチ推論処理の全体像は以下の様になります。
まずバッチ処理を施したい画像データをAzure StorageのContainerの中に月別のフォルダを作成し格納します。Azure MLでは処理対象のデータ群はDatasetとして表現されるため、バッチ処理実行月のフォルダを処理対象のDatasetにします。
あとはAzure MLのModelsに予め登録されている機械学習モデルとそれを実行する環境(コンテナ)をEnvironmentsから取得し、Compute Clusterで実行させ、その結果をAzure MLのデフォルトで用意されているStorageにCSVファイルとして出力します。
この全体像を実現するために、Azure MLでは機械学習に必要となる概念をどのように表現しているのかを調べる必要がありました。今回は特にDatastore, DatasetおよびPipelineについて最初に簡単に触れておきたいと思います。
Azure MLのDatastoreとDataset
Azure MLではデータはDatastoreとDatasetという概念で管理されます。 DatastoreはAzure Datalake Storageなど、データの格納場所を表現しており、Datasetは処理の対象になるデータの集まりそのものを表現しています。 ですのでAzure Machine Learningで何かデータを処理対象にしたい場合、まずどこにデータを見にいけばよいのか(どこのStorage?どこのDB?など)をDatastoreで示し、どのデータを見るのかをDatasetで示します。DatastoreおよびDatasetの作成はPythonのSDKで行うことが出来ます。
Datastoreを作成するコードの例です。
# Azure Storage AccountのContainerを参照するAzure ML Datastore作成 from azureml.core import Datastore container_name='***' #接続するContainer account_name='***' #接続するStorage Account sas_token='***' #SAS token datastore_name = '***' #Azure ML上のDatastore名 blob_datastore = Datastore.register_azure_blob_container(workspace=ws, datastore_name=blob_datastore_name, container_name=container_name, account_name=account_name, sas_token=sas_token )
Datasetを作成するコードの例です。
# 当月フォルダ配下のファイルを参照するDatasetの作成 from azureml.core import Dataset import datetime dataset_name = '***' #作成するDataset名 datastore = Datastore.get(ws, datastore_name=datastore_name) month = datetime.datetime.now().strftime('%Y%m') ds = Dataset.File.from_files((datastore, f'/{month}')) ds = ds.register(ws, name=dataset_name,create_new_version=True)
Azure MLのPipeline
Azure MLでは1つの機械学習タスクを複数のStepで構成されたPipelineとして表現します。Stepとしては例えばモデルに入力する前に必要になるデータの前処理や、推論処理などが挙げられます。Step間でデータを受け渡すことが可能ですし、Stepごとにその処理に使用する環境やAzure MLのCompute(VMのクラスタ)を指定することが出来ます。
また、Pipelineの実行は何らかのアクションをトリガーに起こすことが可能です。例えば指定の時間になったらPipelineを実行する、といったことも可能です。
Pipeline実行までの手順
ではここからは推論バッチ処理を実行するPipelineを作成し、実行するまでの手順についてご紹介していきます。今回は全てPythonのSDKを使って作業を行いました。
Stepを実行するComputeの作成
PipelineのStepを実行するAzure MLのCompute(VMのクラスタ)を作成します。VMのサイズやクラスタを構成する最大ノード数などを指定します。VMサイズはGPUタイプも選択でき、コストを抑えるため低優先度(LowPriority)での使用も可能です。
from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # make gpu cluster infer_cluster_name = 'infer-cluster' try: infer_cluster = ComputeTarget(workspace=ws, name=infer_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration( vm_size='Standard_NC4as_T4_v3', max_nodes=1, vm_priority='lowpriority' ) infer_cluster = ComputeTarget.create(ws, infer_cluster_name, compute_config) infer_cluster.wait_for_completion(show_output=True)
Stepで実行する処理を記述したPythonスクリプトファイルの作成
PipelineのStepで実行する処理はPythonのスクリプトで記述出来ます。Azure MLに登録したモデルをAzure MLのModelsからロードし入力されたデータに対して推論を行う、という処理を実行します。たとえば以下のような内容のスクリプトになります。
from azureml.core import Model from PIL import Image import torch def preprocess(img): ... def init(): # Runs when the pipeline step is initialized global model # load the model model_path = Model.get_model_path('***') # Azure ML Modelsに登録されているModel名 model = torch.load(model_path).eval() def run(mini_batch): # This runs for each batch resultList = [] # process each file in the batch for f in mini_batch: input_img = Image.open(f) input_tensor = torch.unsqueeze(preprocess(input_img), axis=0) predict = model(input_tensor).detach().tolist() resultList.append({'img':f, 'score':predict}) return resultList
init()
関数はStepの最初に実行され、この中で推論に使用するモデルをロードします。
そしてrun()
関数が推論処理のメインになる部分で、引数には複数のデータがバッチで渡されます。run()
関数の中でバッチに含まれるデータを一つずつ処理していく流れになります。バッチを取得するDatasetはこのスクリプトを実行するStepを作成する時に指定します。またバッチサイズはStepの実行に関する設定で指定することが出来ます。
Step実行に関する設定
Step実行に関する各種設定を行います。Stepを作成するクラスはいくつかの種類がありますが、大量のデータを非同期で並列処理する用途に長けたParallelRunStep
クラスを使用して推論用のStepを作成します。ParallelRunStep
の設定には、ParallelRunConfig
クラスを使用します。
ParallelRunConfig
にはStepの実行時に使用するAzure MLのEnvironment(Pythonのバージョンや依存ライブラリ等の環境)や実行するPythonスクリプトファイル、バッチサイズや処理を実行するComputeなどを指定することが出来ます。
from azureml.pipeline.steps import ParallelRunConfig from azureml.core import Environment env = Environment.get(ws,'***') #Azure MLに登録済みのEnvironment名 parallel_run_config = ParallelRunConfig( source_directory='/script_dir_path/', entry_script="infer.py", mini_batch_size="5", error_threshold=10, output_action="append_row", environment=env, compute_target=infer_cluster, node_count=1 )
Stepの作成
PipelineのStepを作成します。今回の推論StepはParallelRunStep
クラスを使用して作成します。作成時に指定が必要になるのが先ほど作成したParallelRunConfig
オブジェクトと入力するDataset、そして結果を出力するDatasetです。入力するDatasetの指定はDatasetConsumptionConfig
を、出力するDatasetの指定はOutputFileDatasetConfig
を使用します。入力するDatasetはバッチ処理を実行する月によって変わるため、Stepを含むPipeline全体を実行する時にパラメータとして指定できるようにしたいと考えました。Pipeline実行時に指定するパラメータはPipelineParameter
によって取得することが出来ます。
from azureml.core import Dataset from azureml.pipeline.steps import PythonScriptStep,ParallelRunStep from azureml.data.dataset_consumption_config import DatasetConsumptionConfig from azureml.data import OutputFileDatasetConfig from azureml.pipeline.core import PipelineParameter # input default_ds = Dataset.get_by_name(ws, name='inference_data') input_ds = PipelineParameter(name="input_ds", default_value=default_ds) infer_data_config = DatasetConsumptionConfig("infer_data", input_ds).as_mount() # output # DatastoreはAzure MLのデフォルトを使用 outputstore = Datastore.get(ws, datastore_name='workspaceblobstore') output_config = OutputFileDatasetConfig('score_data',(outputstore,'scoring/predictions.csv')) infer_step = ParallelRunStep( name='inference', parallel_run_config=parallel_run_config, inputs=[infer_data_config], output=output_config, arguments=[], allow_reuse=True )
Pipelineの作成と実行
Stepを使用してPipelineを作成します。複数のStepを繋げて1つのPipelineを作成出来ますが、今回は1つのStepで構成されたPipelineです。Pipelineを作成した後、その時点で分かるエラーがあるかどうかをvalidate()
メソッドを呼び出すことで確認することが出来ます。
from azureml.pipeline.core import Pipeline pipeline = Pipeline(ws,steps=[infer_step]) pipeline.validate()
Azure MLでのPipelineの実行はExperimentクラスを通じて行います。Experimentクラスのsubmit()
にPipelineとパラメータを渡すことでPipelineがAzure MLで実行されます。パラメータにこのPipelineを実行する月に応じたDatasetを指定するようにしました。
from azureml.core import Experiment import datetime #実行月の取得 month = datetime.datetime.now().strftime('%Y%m') exp = Experiment(ws, "azureml-pipeline") input_ds = Dataset.File.from_files((datastore, f'/{month}')) input_ds = input_ds.register(ws, name="inference_data",create_new_version=True) run = exp.submit(pipeline,pipeline_parameters={'input_ds':input_ds})
Azure Machine Learning StudioでPipelineの実行の様子を確認することが出来ます。
実行が正常に完了したら、OutputFileDatasetConfig
で指定した場所にCSVファイルが生成され、推論結果が出力されていることを確認することが出来ます。
まとめ
今回はAzure MLのPipelineを使い、多数のデータを一度にまとめて推論処理をする方法について調べたことをご紹介しました。以前調べたオンライン(リアルタイム)推論も含めると、機械学習モデルの活用方法の幅がだいぶ広がったな、と感じました。Azure MLではオンラインでもバッチでも、推論処理を行うスクリプトはほとんど同じ内容で対応出来るため、どちらの形式でモデルを利用するか、スムーズに切り替えられて便利です。
今回、Pipelineの実行は手動で行いましたが、スケジュールを組んで自動実行出来る方法もあるようなのでそちらも調べていきたいと思います。