CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

LangGraphとDatabricksのGenieによるTable参照AgentアプリをModel Servingで動かしてみました。

はじめに

こんにちは、CCCMKホールディングスAIエンジニアの三浦です。

前回Azure DatabricksでUnity CatalogのTableの内容を参照して回答するAgentアプリケーションをLangGraphで構築した話をご紹介しました。

techblog.cccmkhd.co.jp

Notebookで動かすところまでは進められたものの、この機能をAPIで提供しようとDatabricksのModel Servingにデプロイしたところ、Model ServingからTableが参照できない、といった課題が見つかりました。

どうやったら解決できるんだろう、と色々調べていたところ、"Genie"というDatabricksの機能を使うとやりたいことが実現できることが分かりました。

今回はGenieとLangGraphでAgentアプリを構築し、Model Servingにデプロイ、Model Serving Endpointにリクエストを投げて結果を得るところまでを実現することが出来たので、ご紹介したいと思います!

Genie

Genieは自然言語でUnity Catalogに格納されたデータに問い合わせをすることが出来るDatabricksの機能です。

learn.microsoft.com

このドキュメントにはGenieがどのように動作するのかが説明されています。その中で特に

  • Genieはユーザーの質問に対しSQLクエリを生成し、クエリを実行し、結果を返す
  • SQLクエリを生成する際はユーザーの質問と対象のTableのメタデータを参照する
  • SQLの生成はAzureの場合、Azure OpenAI Serviceのモデルを使用する
  • Azure OpenAI ServiceにはTableのローデータは送信されない

といった点が重要なポイントだと思います。

ドキュメントを読み、Genieの処理の内容をまとめてみた図がこちらです。

Genieの処理をまとめてみました

アプリケーションの処理の全体図

前回作ったアプリケーションの処理は以下のようになっていました。

前回作成したアプリケーションの処理の概要

この図の中の"create_sql"と"SQLクエリ実行"の部分を、今回はGenieに置き換えます。GenieへのアクセスはAPIで可能です。前回と同様、全体の処理はAgent開発フレームワークLangGraphで実装し、Nodeの中にGenieの呼び出し処理を実装します。

今回の処理は次のようになります。

TableへのアクセスはGenieが担当する

また、今回もHugging Faceのこちらのデータセットを参照データとして使用しています。

huggingface.co

アプリケーション構築手順

ここからはアプリケーションを構築し、Model Servingにデプロイするまでの手順をまとめます。

Genieの作成

最初にDatabricksでGenieの作成を行います。具体的には参照させるUnity CatalogのTableの指定と、質問とSQLクエリの例を設定しました。

Genieの設定

触ってみた感じですが、質問とSQLクエリの例は含めたほうがよいと思いました。たとえば"エネルギーにあふれた曲を教えて"という質問に対し、SQLクエリ例がないと属性値"energy"の降順でソートした結果を全件返すSQLクエリが生成されたのですが、理想はその上位数件を返す"LIMIT"を含めたSQLクエリです。そこでそれを例として与えると、望ましい動きを実現することが出来ました。

また、Genieの設定画面のURL文字列を見ると"https://adb-xxxxx.azuredatabricks.net/genie/rooms/xxxxx?sp=..."のようになっているのですが、"room/"以降の文字列は"Genie Space ID"と呼ばれ、APIでGenieにアクセスする際に必要になる情報です。

PATのシークレットへの登録

GenieにAPIでアクセスする際にPersonal Access Token(PAT)が必要になります。PATを作成し、シークレットに登録する作業を行いました。

PATの発行はこちらの手順で行いました。

learn.microsoft.com

シークレットへの登録はこちらの手順に記載されている、Databricks CLIを使用した方法で登録しました。

learn.microsoft.com

NotebookからGenieにアクセスする

ここからは具体的な実装に入りますが、実装はこちらのページに掲載されているExample Notebookの内容に沿っています。

learn.microsoft.com

まずLangGraphでアプリを開発し、Unity CatalogのModelとして登録するまでに必要になるライブラリ一式をインストールします。

%pip install databricks-langchain databricks-agents langgraph>=0.3 uv
dbutils.library.restartPython()

"uv"というライブラリはPythonのPackageとProject管理の役割を担っていて、これから作成するアプリケーションの実行環境をModel Serving上に再現する時に必要になります。

次にGenieにアクセスするために必要な各種情報を設定します。必要になるのは"Genie Space ID"とDatabricksのWorkspaceのURL、そしてPATです。PATはシークレットから取得します。

import os

from dbruntime.databricks_repl_context import get_context

GENIE_SPACE_ID = "Genie Space ID"

secret_scope_name = "PATが登録されたシークレットスコープ名"
secret_key_name = "PATが登録されたシークレットキー名"

os.environ["DB_MODEL_SERVING_HOST_URL"] = "https://" + get_context().workspaceUrl
os.environ["DATABRICKS_GENIE_PAT"] = dbutils.secrets.get(
    scope=secret_scope_name, key=secret_key_name
)

これらの情報を使って、Genieにアクセスすることが出来るようになります。

import os

from databricks_langchain.genie import GenieAgent
from databricks.sdk import WorkspaceClient

description = \
"""トラックID、アーティスト名、アルバム名、トラック名、人気度、トラックの長さ、ダンス適性、エネルギー、キー、ラウドネス、モード、スピーチネス、アコースティックネス、インストゥルメンタルネス、ライブネス、ヴァレンス、テンポ、タイムシグネチャー、トラックジャンルといった、音楽に関する情報を持ち、検索することが出来ます。"""

genie = GenieAgent(
    genie_space_id=GENIE_SPACE_ID,
    genie_agent_name="music-answer-agent",
    description=description,
    client=WorkspaceClient(
        host=os.getenv("DB_MODEL_SERVING_HOST_URL"),
        token=os.getenv("DATABRICKS_GENIE_PAT")
    )
)

input = {
    "messages": [{"role":"user","content":"落ち着いた雰囲気にマッチしたトラックを検索して。"}]
}
genie.invoke(input)

次のような結果が得られました。入力した自然言語から、SQLクエリが生成され、そのクエリを実行したことで得られた結果が返ってきています。

NotebookでGenieを実行した結果

LangGraphのGraphをスクリプトに書き出す

Notebookの1つのセルに、LangGraphのGraphを記述し、スクリプトに書き出します。セルに先頭に%%writefile genie_agent.pyのように記述して実行すると、セルの内容がスクリプトに書き出されます。

%%writefile genie_agent.py
import os

from typing import Annotated, Any, Generator, Literal, Optional
from typing_extensions import TypedDict

from databricks_langchain import ChatDatabricks
from databricks_langchain.genie import GenieAgent
from databricks.sdk import WorkspaceClient
from langgraph.graph import StateGraph, END
from langgraph.graph.state import CompiledStateGraph
from langgraph.graph.message import add_messages
from langgraph.types import Command
import mlflow
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import(
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)


LLM_ENDPOINT = "databricks-meta-llama-3-3-70b-instruct"
GENIE_SPACE_ID = "Genie Space ID"

# Genie
description = \
"""トラックID、アーティスト名、アルバム名、トラック名、人気度、トラックの長さ、ダンス適性、エネルギー、キー、ラウドネス、モード、スピーチネス、アコースティックネス、インストゥルメンタルネス、ライブネス、ヴァレンス、テンポ、タイムシグネチャー、トラックジャンルといった、音楽に関する情報を持ち、検索することが出来ます。"""

genie = GenieAgent(
    genie_space_id=GENIE_SPACE_ID,
    genie_agent_name="music-answer-agent",
    description=description,
    client=WorkspaceClient(
        host=os.getenv("DB_MODEL_SERVING_HOST_URL"),
        token=os.getenv("DATABRICKS_GENIE_PAT")
    )
)

# LLM
llm = ChatDatabricks(endpoint=LLM_ENDPOINT)

class State(TypedDict):
    messages: Annotated[list, add_messages]
    query_result: str

class Rooting(TypedDict):
    direction: Literal["refer_table", "normal_chat"]

def rooting(state: State):
    """StructuredOutputでクエリに対してDB問い合わせが必要かどうか判断する"""
    query = state["messages"][-1].content
    llm_with_structured = llm.with_structured_output(Rooting)
    prompt = \
    f"""与えられた質問に対し、音楽作品やアーティストの参照が必要な場合は"refer_table"を、それ以外は"normal_chat"を出力してください。
    
    質問: {query}"""
    output = llm_with_structured.invoke(prompt)

    if output["direction"] == "refer_table":
        return Command(goto="refer_table")
    else:
        return Command(goto="normal_chat")

def normal_chat(state: State):
    """ユーザーの入力に対してデータベースを参照せずに回答する"""
    query = state["messages"][-1].content
    prompt = \
    f"""ユーザーの入力に回答してください。
    入力: {query}
    回答: """
    output = llm.invoke(prompt)
    return Command(
        goto=END, 
        update={
            "messages": [
                {"role":"assistant","content":output.content,"id":output.id}
            ]
        }
    )

def refer_table(state: State):
    """ユーザーのクエリに回答するために必要な情報をGenieを使って取得する"""
    genie_response = genie.invoke(
        {"messages":[state["messages"][-1]]}
    )["messages"][-1].content
    return Command(goto="answer", update ={"query_result": genie_response})


def answer(state: State):
    """ユーザーの質問にデータベースの結果を使って回答する"""

    query = state["messages"][-1].content
    query_result = state["query_result"]
    analyze_prompt = \
    f"""あなたは音楽に詳しいアシスタントです。
    与えられた情報は、ユーザーの質問に対し答えるために必要な情報をデータベースから抽出したものです。
    この情報を使ってユーザーの質問に答えて下さい。

    # データベースから取得した情報
    {query_result}

    質問: {query}
    回答: """

    output = llm.invoke(analyze_prompt)
    return Command(
        goto=END, 
        update ={"messages": [{"role":"assistant","content":output.content,"id":output.id}]}
    )

# Build Graph
def build_graph():
    graph_builder = StateGraph(State)
    graph_builder.add_node("rooting", rooting)
    graph_builder.add_node("normal_chat", normal_chat)
    graph_builder.add_node("refer_table", refer_table)
    graph_builder.add_node("answer", answer)
    graph_builder.set_entry_point("rooting")
    return graph_builder.compile()

class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent
    
    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:     
        request = {"messages": self._convert_messages_to_dict(messages)}

        messages = []
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                if not node_data:
                    continue
                messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {"messages": self._convert_messages_to_dict(messages)}
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
                )

mlflow.langchain.autolog()
app = build_graph()
AGENT = LangGraphChatAgent(app)
mlflow.models.set_model(AGENT)

Graphの登録

スクリプトに書き出したGraphをUnity CatalogのModelに登録します。最初にMLflowに記録し、そこからUnity Catalogに登録する、という手順を取ります。

以下でMLflowへの記録を行います。

import mlflow
from mlflow.models.resources import (
  DatabricksServingEndpoint,
  DatabricksGenieSpace
)
from pkg_resources import get_distribution

from genie_agent import GENIE_SPACE_ID, LLM_ENDPOINT 

# 関連するリソース
resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT),
    DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID)
]
input_example = {"messages":[{"role":"user","content":"こんにちは"}]}

with mlflow.start_run():
  logged_agent_info = mlflow.pyfunc.log_model(
    python_model="genie_agent.py",
    artifact_path="agent",
    input_example=input_example,
    extra_pip_requirements=[
        f"databricks-connect=={get_distribution('databricks-connect').version}"
    ],
    resources=resources
  )

次にUnity CatalogのModelに登録します。

mlflow.set_registry_uri("databricks-uc")

catalog = "Catalog名"
schema = "Schema名"
model_name = "Model名"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

ここまではすべて1つのNotebook内で実行しました。

Model Servingへのデプロイ

ここからは別のNotebookで作業を進めました。

必要なライブラリのインストールをします。

%pip install databricks-agents
dbutils.library.restartPython()

次を実行すると、Unity Catalogに登録したModelをModel Servingにデプロイすることが出来ます。 環境変数にPATを登録したシークレットの情報を渡す必要があります。

from databricks import agents

secret_scope_name = "PATが登録されたシークレットスコープ名"
secret_key_name = "PATが登録されたシークレットキー名"

uc_model_name = "Catalog名.Schema名.Model名"
version = 1
deployment = agents.deploy(
    uc_model_name, 
    version,
    environment_vars={
        "DATABRICKS_GENIE_PAT": f"{{{{secrets/{secret_scope_name}/{secret_key_name}}}}}"
    }
)

# Retrieve the query endpoint URL for making API requests
deployment.query_endpoint

これで新しいModel Serving Endpointが生成され、デプロイが完了するとモデルを利用することが出来るようになります。

Model Serving Endpointの使い方

デプロイが完了すると、DatabricksのUI上でReview Appを起動して会話形式でテストをすることが出来ます。

Review Appでテスト

また、次のようにCurlコマンドを利用して利用することも出来ます。PATとEndpointのURLをセットする必要があります。

curl \
  -u token:xxxxxxx \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"messages":[{"role":"user","content":"気分が上がる音楽教えて"}]}'\
  https://xxxxxxx/invocations

結果

{"messages":[{"role":"assistant","content":"気分が上がる音楽ですね。\n\nデータベースから抽出した情報によると、以下の楽曲が気分を上げるのに適しているようです。\n\n* Amigo Charly Brown by Jürgen Drews....."}}

まとめ

今回は前回実現することが出来なかった、Tableの情報を参照して回答出来るアプリケーションをModel Servingで提供するということを、DatabricksのGenieを使って実現してみました。以前から実現してみたかったことだったので、一通り自分で完遂することが出来てよかったです!