CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

Databricks Jobsでのエラー通知の実装

こんにちは、テックラボの井上です。

テックラボでは内製で社内向けのモデリング&スコアリングツールを開発しています。 学習・推論エンジンにDatabricks/Notebookを使って開発・運用しています。

幸いなことにこれまでエラーは滅多に起きないのでエラー通知機能を実装していなかったのですが、 機能追加によりエラーが起きる状況も出てきたためエラー通知機能を実装しました。

実装方法として以下の2案を検討しました。
案1)Databricksのジョブ機能の活用
案2)IPythonのエラーハンドリング機能の活用

それぞれの案について説明をします。

案1)Databricksのジョブ機能の活用
以下のようなジョブ構成にします。タスクが失敗したときに分岐して通知するタスクを追加します。 手順としてはmodeling_errorというタスクを追加し、以下のような設定をします。

そして以下のようにタスクに紐づいたNotebookのコードを修正します。

後続タスクでget()で取得するときのtaskKeyは依存先のタスク名(Task Name)です。地味にに迷うポイントです。
pkey = dbutils.jobs.taskValues.get(taskKey="modeling", key="pkey")

taskValuesというものが出てきましたが、
これは高速で確実な「軽量 K/V メタデータ」交換を目的とした機能でジョブ実行に紐づく一時的な共有コンテキストです。
数KB程度までの小容量データの受け渡しが可能です。
参照は実行ジョブ内でのみ可能で、タスク間での参照範囲も以下の図のようになっています。


案2)IPythonのエラーハンドリング機能の活用
IPython (Interactive Python) は、Pythonをより効率的に、対話形式で使うために開発された「強化版の対話型シェル」でJupyterNotebookのカーネル基盤となっています。 IPythonにはイベントハンドラが設定でき、特定の動作(セルの実行前、実行後、シェルの起動時、または異常発生時)が発生した際に、自作の関数を自動的に実行させることができるようになります。
・get_ipython().events.register() ・・・主に正常系のフック
・get_ipython().set_custom_exec() ・・・主に異常系のフック
最初のセルに以下のようなコードを設定することで以降のどのセルでエラーが起きてもエラー通知がされるようになります。

import requests
from IPython import get_ipython #①

def alert_on_teams(shell, etype, evalue, tb, tb_offset=None): #②
    pj_info = f“fuga fuga"
    try:

        message = {
  …        
        }
        resp = requests.post(
                url=“https//hoge.hoge.teams”,
                data=json.dumps(message),
                headers={'Content-Type': 'application/json'}
        )
        resp.raise_for_status()
    except Exception as notify_err:
        print(f" Teamsに送信できませんでした。")

    return None

get_ipython().set_custom_exc((Exception,), alert_on_teams) #③

①get_ipythonをインポートして②エラーのときに実行する関数を用意し③ハンドラーの設定という流れでとても簡単です。

設計思想面では役割によりNotebook/タスクを分離できる案1がよいのですが、 Notebookが2つだけという現状を踏まえ、手軽に対応できる案2を採用しました。 今後、Notebookが増えてきとき改めて案1へのシフトを検討してみようと思います。