CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

Azure DatabricksでRAG AgentのModel Servingへのデプロイ・モニタリングまでを実装してみる!

こんにちは、CCCMKホールディングスTECH LABの三浦です。

あっという間に10月も終わりですね。朝は寒いけど昼間はまだ暖かい、今の時期は服を選ぶのにいつも苦労します・・・。

テストからPoCへ

前回、自分が書いたブログの内容を参照して回答してくれるRAG AgentをAzure Databricksで構築した話をまとめました。

techblog.cccmkhd.co.jp

前回はAgentに参照させるテキストデータをDatabricksのVector Searchを使ってVector Indexに格納し、Vector Indexにクエリを実行する処理をFunctionにしてUnity Catalogに登録、LLMにFunctionをToolとして与えてRAG Agentとしての動作をPlaygroundというテスト環境で確認するところまで進めることが出来ました。

Playgroundでは開発者が動作を確認することが出来ますが、本番運用を想定した場合、開発者以外のプロジェクトに関わるメンバー(ステークホルダー)が動作を確認しフィードバックをし、その結果を確認してRAG Agentの品質を向上していく必要が出てきます。

これを実現する仕組みを1から作るのはかなり大変ですが、DatabricksではModel ServingにAgentをデプロイすることでステークホルダーがAgentの回答に対するフィードバックを与えることが出来る機能が利用出来たり、運用後もAI Assistant機能を利用した継続的な回答の品質チェックが行える機能がパブリックプレビューで利用出来ることを知りました。

Agentの品質をモニタリングするダッシュボード

今回はPlaygroundからModel Servingへのデプロイ、モニタリングダッシュボードの起動まで実際に動かしてみたので、どんな作業を行ったのかをまとめてみたいと思います。

作業はサンプルのNotebookに従ってある程度進めることが出来るのですが、自分の環境では上手くいかない箇所があり、ところどころ変更を加えました。そういった箇所については具体的なコードの変更内容もご紹介したいと思います。

RAG Agentの概要

前回私が作ったRAG Agentは、私がこれまで書いた3つのブログの内容について参照し、回答してくれるAgentです。内部で動くLLMと埋め込みモデルはそれぞれ

  • LLM: gpt-4o
  • 埋め込みモデル: text-embedding-3-large

で、どちらもAzure OpenAI Serviceで提供されているものです。それらを外部モデルとしてModel Servingと連携し、Model Servingを経由して利用しています。

PlaygroundからExportする

Model ServingにAgentをデプロイするために、まずAgentをPythonのコードで定義してMLflowを使ってUnity CatalogのModelsに登録します。Playgroundから、それを実現するために必要な実装がある程度済んだNotebook一式を出力することが出来ます。

Playgroundの画面でLLMとToolを選んでAgentとしての動作を確認した後、Playgroundの画面の"Export"というボタンをクリックします。

"Export"をクリックします。

自分のWorkspaceの中に自動的にディレクトリが作られ、以下の様に2つのNotebookとconfigファイルが生成されます。

ExportされたNotebookとconfig

これらのファイルの役割は次の通りです。

  • config.yml
    Agentが依存する他のModel Serving EndpointやFunctionなどが記載された設定ファイル。

  • agent
    Agentを定義する処理が記載されたNotebook。LangGraphで記述されています。

  • driver
    agentで定義されたAgentをUnity CatalogのModelsに登録し、Model ServingにデプロイするNotebook。

デプロイまでの作業をざっくりとまとめると、

  1. agentのNotebookを確認、適宜変更を加えてテスト

  2. driverのNotebookを確認、適宜変更を加えて実行してデプロイ

という流れになります。

Vector Indexにクエリ実行するFunctionが動かない・・・

Playgroundで出力したagentのNotebookを実行したところ、私の環境では上手く動かないところがありました。

私がPlaygroundで試したAgentはToolとしてVector Indexにクエリを実行するUnity Catalog登録済みのFunctionを与えていたのですが、このFunctionをLangChainのToolとして実行すると、1件もデータが取得出来ない、という現象が発生しました。

※実行したコマンド

uc_function_tools[0].invoke({"query":"ブログの作者の三浦"})

※実行結果

AssertionError: Statement execution succeeded but no result was provided.

実行はしたものの、結果が1件も取得出来ていないようです。

ただ次のようにして同じクエリを実行した場合は結果を取得することが出来ました。

※実行コマンド

spark.sql(
    """
    SELECT 
        * 
    FROM catalog_name.schema_name.get_blog_contents(query=>'ブログの作者の三浦')
    """
).show()

私が作ったVector Indexは埋め込みモデルとしてAzure OpenAI Serviceのものを使っているため、もしかしたらそちらの認証周りで失敗しているのかな?などなど、色々原因が考えられそうです。ちょっと解決が難しそうだったので、今回はUnity CatalogのFunctionは使わずに実装することにしました。

agent Notebookの変更箇所

PlaygroundからExportしたagent Notebookに対して変更を加えた場所に絞って変更内容をご紹介します。

追加パッケージのインストール

Vector Indexにアクセスする処理を実装するため、databricks-vectorsearchというライブラリを追加でインストールしました。

%pip install -U -qqqq mlflow-skinny langchain==0.2.16 langgraph-checkpoint==1.0.12 langchain_core langchain-community==0.2.16 langgraph==0.2.16 pydantic databricks-vectorsearch
dbutils.library.restartPython()

configの記述と出力

Exportされた"config.yml"はFunctionを使う想定で書かれていて内容に相違があるため、自分でconfigの内容を再設定して出力するようにしました。

またAgentをデプロイした後にエラーが発生したのですが、その原因がAgentがAzure OpenAI ServiceのモデルをModel Serving経由で使おうとした時に発生した権限エラーによるものでした。

Agentはデプロイ後、Workspace内のさまざまなリソースにアクセスします。一部のリソースはデプロイしたユーザーと同じ権限でアクセス出来るように自動的に設定されるようですが、外部モデルと連携したModel Servingについては手動で設定が必要なようです。

解決策として今回はconfigファイルにDatabricksのホストURLやAccess Tokenを書き込んで渡す方法を取りました。Access TokenはDatabricksのSecretを経由してconfigに書き込むようにしたのですが、もう少しいい方法がないかな、と考えています。

※DatabricksのSecretを作成し、Access Tokenを設定した時の手順

  1. Databricks CLIをインストール
     WindowsのWSL(Ubuntu)に入れました。

  2. ログイン
    terminalなどで、databricks auth login --host https:xxx.azuredatabricks.net/を入力してログイン

  3. Scopeの作成
    databricks secrets create-scope Scope名
     を実行

  4. Secretの作成
    databricks secrets put-secret Scope名 Secret名
     を実行

  5. Secretの値入力 設定する値を聞かれるので、Databricksで発行したAccess Tokenを入力

Secret設定後、次の処理をagent Notebookで実行します。

# オリジナルのconfigファイルを作成
import mlflow
import yaml

config_file_name = "rag_agent_config.yaml"

databricks_host = "https://xxxxx.azuredatabricks.net"
databricks_token = dbutils.secrets.get(
  # tokenを設定したSecretのScope名とSecret名を指定する
    scope="scope_name", 
    key="secret_name"
)

catalog_name = "catalog_name"
schema_name = "schema_name"
index_name = "blog_chunked_table_index"

rag_agent_config = {
    "agent_prompt": "あなたはユーザーの質問に回答出来るアシスタントです。ステップバイステップで考え、回答してください。",
    "databricks_resources": {
        "databricks-host": databricks_host,
        "databricks-token": databricks_token,
        "llm_endpoint_name": "gpt-4o",
        "vector_search_endpoint_name": "blog_vector_search",
    },
    "input_example": {
        "messages": [{"content": "ブログを書いた三浦さんの所属会社は?", "role": "user"}]
    },
    "llm_config": {
        "llm_parameters": {"max_tokens": 1500, "temperature": 0.01},
    },
    "retriever_config": {
        "chunk_template": "Passage: {chunk_text}\n",
        "data_pipeline_tag": "poc",
        "parameters": {"k": 5, "query_type": "hybrid"},
        "schema": {"chunk_text": "text", "primary_key": "id"},
        "vector_search_index": f"{catalog_name}.{schema_name}.{index_name}",
    },
}

# configファイルの書きこみ。
try:
    with open(config_file_name, 'w') as f:
        yaml.dump(rag_agent_config, f)
except:
    print('pass to work on build job')

# configの読み込み
config = mlflow.models.ModelConfig(development_config=config_file_name)

# set up MLflow traces
mlflow.langchain.autolog()

LLMとToolの定義

LangChainを使ってLLMとToolを定義します。

import os

from databricks.vector_search.client import VectorSearchClient
from langchain_community.chat_models import ChatDatabricks
from langchain_community.vectorstores import DatabricksVectorSearch
from langchain.tools.retriever import create_retriever_tool

# 設定
db_config = config.get("databricks_resources")
rv_config = config.get("retriever_config")
vs_schema = rv_config.get("schema")

# 認証の設定
os.environ["DATABRICKS_HOST"] = db_config.get("databricks-host")
os.environ["DATABRICKS_TOKEN"] = db_config.get("databricks-token")
    
# Create the llm
llm = ChatDatabricks(endpoint=db_config.get("llm_endpoint_name"))

# Create Retrieval Tool
client = VectorSearchClient(disable_notice=True)

# Vector Index
vs_index = client.get_index(
    endpoint_name=db_config.get("vector_search_endpoint_name"),
    index_name=rv_config.get("vector_search_index")
)

# Retriever
vector_search_as_retriever = DatabricksVectorSearch(
    vs_index,
    text_column=vs_schema.get("chunk_text"),
    columns=[
        vs_schema.get("chunk_text")
    ],
).as_retriever(search_kwargs=rv_config.get("parameters"))


# Retrieverを呼び出すToolの作成
retrieval_tool = create_retriever_tool(
    retriever=vector_search_as_retriever,
    name="get_blog_contents",
    description="このツールはテクロジーについてのブログの内容から情報を検索することが出来ます。"
)

tools = [retrieval_tool]

agentの中でLangGraphのcreate_tool_calling_agentを用いてAgentを定義する箇所があります。そこで渡されるToolはPlaygroundでExportされたものだとuc_function_toolsになっていますが、上記のコードで定義したtoolsを渡すように変更しました。

agent Notebookの変更箇所は以上です。

次はdriver Notebookの変更箇所についてまとめます。

driverの変更箇所

追加パッケージのインストール

agentと同様Vector Indexにアクセス出来るようdatabricks-vectorsearchを追加でインストールしました。

%pip install -U -qqqq databricks-agents mlflow langchain==0.2.16 langgraph-checkpoint==1.0.12  langchain_core langchain-community==0.2.16 langgraph==0.2.16 pydantic databricks-vectorsearch
dbutils.library.restartPython()

Agentのロギングの見直し

MLflowでAgentを登録する箇所にも変更を加えました。input_sampleを登録するAgentにマッチしたものに変更し、model_configをagent Notebookで新しく作成したrag_agent_config.yamlを参照するように変更しました。

また、pip_requirementsを指定した場合、Model Servingにデプロイする時のビルド処理で失敗することがあったので、今回は省くようにしました。

# Log the model to MLflow
import os
import mlflow

input_example = {
    "messages": [
        {
            "role": "user",
            "content": "ブログを書いた、三浦さんの所属は?"
        }
    ]
}

with mlflow.start_run():
    logged_agent_info = mlflow.langchain.log_model(
        lc_model=os.path.join(
            os.getcwd(),
            'agent',
        ),
#        pip_requirements=[
#            f"langchain==0.2.16",
#            f"langchain-community==0.2.16",
#            f"langgraph-checkpoint==1.0.12",
#            f"langgraph==0.2.16",
#            f"pydantic",
#        ],
#        model_config="config.yml",
        model_config="rag_agent_config.yaml",
        artifact_path='agent',
        input_example=input_example,
    )

Evaluate Dataの変更

デプロイ前にAgentの検証を行う処理がdriver Notebookに含まれています。検証に使うデータを今回のAgentにマッチしたものに変更しました。

import pandas as pd

eval_examples_messages =  [
    {
        "role": "user",
        "content": "Foundation Model APIを使って何かソリューションを考えて。"

    },
    {
        "role": "user",
        "content": "ブログの作者の三浦さんの所属している会社は?"
    },
    {
        "role": "user",
        "content": "GraphRAGはどんな時に役立ちますか?"
    },
    {
        "role": "user",
        "content": "GraphRAGで作成したKnowledge-GraphをPlotlyで描画するコードを作って。"
    },
    {
        "role": "user",
        "content": "GraphRAGでKnowledge-Graphが生成される過程を教えて。"
    },
]

eval_data = [
    {
        "request": {
            "messages":[message]
        }
    } for message in eval_examples_messages
]

eval_dataset = pd.DataFrame(eval_data)
display(eval_dataset)

このデータセットに対してAgentに回答を生成させ、その結果について検証が行われます。これらの処理は mlflow.evaluatemodel_type="databricks-agent"というオプション付きで実行することで行われます。

内部ではAgentが回答を生成、DatabricksのAI Assistantが回答の評価検証を行い結果をMLflowのEvaluation resultsに記録しています。検証後は以下のような画面で結果を確認することが出来ました。

検証結果

あんまり合格率(Pass: 2/5)が良くないですね・・・。さらに詳細を確認することも出来ます。

個々のQ&Aについて詳細を確認出来ます。

抽出した参考情報と質問の関連性が低い、と判断される傾向があるようです。取るべき情報としては間違っていないと思うのですが、もう少しチャンクのサイズを小さくした方がいいかも、といった改善策が考えられます。

今回はこのままデプロイまで進めました。

Agent(モデル)を登録する先の指定

現在の環境に合わせたCatalog名、Schema名、Model名を設定し、AgentをUnity CatalogのModels配下に登録します。

mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
# 使用している環境に合わせて設定する。
catalog = "Catalog名" 
schema = "Schema名"
model_name = "my_blog_rag_agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, 
    name=UC_MODEL_NAME
)

あとはデプロイを実行するだけです。

from databricks import agents

# Deploy the model to the review app and a model serving endpoint
agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version)

完了までは結構時間がかかります(15分くらい)。デプロイ開始からModel ServingのServing endpointsのリストにAgentが追加され、Stateが"Ready"になれば無事にデプロイが済み、利用出来るようになります。

Stateが"Ready"になると利用出来るようになります。

Review Appを使う

デプロイが完了すると、Review AppというAgent評価用のアプリが立ち上がり、アクセス出来るようになります。

Serving endpointsからAgentの詳細ページに入り、右上の"Use"のリストから"Open review app"を選択します。

"Open review app"をクリック

Review AppではPlaygroundと同じようにチャット形式でAgentの動作を確認することが出来ます。それだけでなく、Review AppではAgentの結果に対するフィードバックを与えることも出来ます。なかなか自前でフィードバックもらう仕組みを作るのは大変なので、とても便利だな、と思いました。

Review Appでチャット形式で確認

それに対するフィードバックを与えることが出来ました。

APIで利用する

Model ServingにAgentがデプロイされると、APIを使ってDatabricksの外からもAgentを利用出来るようになります。

たとえば次のようなPythonのコードで利用することが出来ます。

import json
import requests

endpoint = "https://xxxx.azuredatabricks.net/serving-endpoints/{endpoint_name}/invocations"
token = "{databrick-token}"

headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

data = {
    "messages":[
        {
            "role":"user",
            "content": "ブログを書いた三浦さんの所属"
        }
    ],
    "max_tokens": 100,
    "temperature": 0.1
}
response = requests.post(
    endpoint,
    data=json.dumps(data),
    headers=headers
)

print(response.json())

Agentのモニタリングダッシュボードを起動する

Agentがデプロイされると"{agent_name}_payload", "{agent_name}_payload_assessment_logs", "{agent_name}_payload_request_logs"という3つのTableがAgentと同じCatalog/Schema配下に生成され、Agentの呼び出し履歴が自動的に記録されるようになります。

最後にこれらのデータとmlflow.evaluateによる評価結果を可視化出来るAgentモニタリングダッシュボードの起動方法について、まとめます。

ダッシュボード起動Notebookの取得

ダッシュボードを起動するNotebookはこちらのページからダウロードすることが出来ます。

learn.microsoft.com

Notebookの変更

このNotebookをインポートして対象のAgentやデータ取得サンプリングレートなどを設定するとダッシュボードが起動出来るのですが、そのまま実行するとエラーが発生しました。

具体的にはAgentが検索処理(Retriever)で取得した内容に"None"が含まれている場合に起因するエラーで、Notebookの該当箇所は"Run evaluate on unprocessed rows"というタイトルがついたセルです。

あまりきれいな対応ではないのですが、暫定的に次のように変更しました。

※タイトル: "Run evaluate on unprocessed rows"のセルに対する変更

from databricks.rag_eval import env_vars

eval_df = spark.sql(f"""
                    SELECT 
                      `timestamp`,
                      databricks_request_id as request_id, 
                      from_json(request_raw, 'STRUCT<messages ARRAY<STRUCT<role STRING, content STRING>>>') AS request,
                      trace
                    FROM {eval_requests_log_table_name} 
                    WHERE run_id="to_process"                  
                    """)

eval_pdf = eval_df.toPandas().drop_duplicates(subset=["request_id"])

# 追加した関数
def fill_null_retrieved_context(rcs):
  """
  eval_resultsの"retrieved_context"の欠損値を埋める
  """
  if rcs is None:
    return [{"doc_id":"No Result","content":"No Result"}]
  else:
    filled_rcs = []
    for rc in rcs:
      if rc["doc_uri"] is None:
        rc["doc_uri"] = "No Result"
      elif rc["content"] is None:
        rc["content"] = "No Result"
      filled_rcs.append(rc)
    return filled_rcs

if eval_pdf.empty:
  print("[Warning] No new rows to process.")
else:
  with mlflow.start_run() as run:
    ###############
    # CONFIG: Adjust mlflow.evaluate(...) to change which Databricks LLM judges are run. By default, judges that do not require ground truths
    # are run, including groundedness, safety, chunk relevance, and relevance to query. For more details, see the documentation:
    # AWS documentation: https://docs.databricks.com/en/generative-ai/agent-evaluation/advanced-agent-eval.html#evaluate-agents-using-a-subset-of-llm-judges
    # Azure documentation: https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/advanced-agent-eval#evaluate-agents-using-a-subset-of-llm-judges
    ###############
    for eval_pdf_batch in split_df_by_hour(eval_pdf, max_samples_per_hours=env_vars.RAG_EVAL_MAX_INPUT_ROWS.get()):
      eval_results = mlflow.evaluate(data=eval_pdf_batch, model_type="databricks-agent")


      # "eval_results"の"retrieved_context"がRetrieverToolを使わないとNoneになり、書き込み時にエラーになるので暫定対応
      eval_results_table = eval_results.tables['eval_results']
      eval_results_table["retrieved_context"] = \
        eval_results_table["retrieved_context"].apply(fill_null_retrieved_context)
      
      results_df = (
        spark
        #.createDataFrame(eval_results.tables['eval_results']) # 差し替え
        .createDataFrame(eval_results_table)
        .withColumn("databricks_request_id", F.col("request_id"))
        .withColumn("run_id", F.lit(run.info.run_id).cast("string"))
        .withColumn("experiment_id", F.lit(experiment_id).cast("string"))
 ・・・

起動するとユーザーによるフィードバックの結果やAI Assistantによる評価結果を可視化することが出来ます。

AI Assistantによる評価結果

ユーザーによるフィードバックの確認

Notebookを実行すると"{agent_name}_payload_request_logs_eval"というテーブルが生成され、AI Assistantによる評価結果が格納されます。このNotebookをjobを通じて定期実行することで、本番運用しながらAgentの回答結果を継続的に評価し、モニタリングをすることが可能になります。

まとめ

前回の記事と今回の記事で、DatabricksでRAG Agentに必要になるデータの収集から開発、本番運用、モニタリングまでの一連の流れを実装することが出来ました。RAG Agentの開発は試行錯誤したり、たくさんのステークホルダーからの要望が必要になることが多いのですが、これらが考慮された機能になっていると感じました。RAG Agentの精度改善に向けた実験環境としても、活用していきたいと思います。