こんにちは、CCCMKホールディングス AIエンジニアの三浦です。
最近はだいぶ暖かくなって、新緑がまぶしい季節になりました。外に出るのが気持ちのいい時期は一年の中で意外と限られているので、しっかりと堪能したいです。
さて前回SnowflakeのCortex Analystという機能を使って自然言語でTableに問い合わせ、データを取得する、という流れを試してみました。
前回はSnowflakeのWeb UI(Snowsight)上で動作を確認したのですが、やはり外部のアプリケーションからCortex Analystを使えると色々とやれることが広がりそうだなと感じていました。
そこで今回はCortex Analystの機能をAPIで実行し、Tableの分析を行うことが出来るAgentをLangGraphで作ってみました。その内容についてまとめていきたいと思います。
Cortex AnalystのAPIについて
Cortex AnalystのAPIの情報は、公式のドキュメントを参照しました。
Pythonの具体的な実装方法は以下のサンプルコードを参考にしています。
ドキュメントを見ると、APIのレスポンスの形式は一般的なLLMのAPIと同様になっていることが分かります。ドキュメントに掲載されているExampleでは以下のような例が提示されています。
{ "request_id": "75d343ee-699c-483f-83a1-e314609fb563", "message": { "role": "analyst", "content": [ { "type": "text", "text": "We interpreted your question as ..." }, { "type": "sql", "statement": "SELECT * FROM table", "confidence": { "verified_query_used": { "name": "My verified query", "question": "What was the total revenue?", "sql": "SELECT * FROM table2", "verified_at": 1714497970, "verified_by": "Jane Doe" } } } ] }, "warnings": [ { "message": "Table table1 has (30) columns, which exceeds the recommended maximum of 10" }, { "message": "Table table2 has (40) columns, which exceeds the recommended maximum of 10" } ] }
message
というキーに格納されたrole
とcontent
というキーを持つ辞書型のデータを参照すればよいことが分かります。この例を見て知ったのですが、Cortex AnalystのAPIは生成したSQLを返しますが、そのSQLを実行した結果は返しません。なのでTableを分析する機能を実装する場合には、返ってきたSQLを実行する処理の実装が別途必要になることが分かります。
実装
ではここからは具体的な実装の内容を紹介します。今回のコードはAzure DatabricksのNotebookで実装しました。Pythonのバージョンは3.12です。また、Agentで使うLLMはAzure OpenAI Serviceの"gpt-4o"を利用しました。
また、今回使用したデータセットは前回のCortex Analystの検証の時と同様、Hugging Faceで公開されているこちらのデータセットです。
準備
まず必要なライブラリをインストールします。Snowflakeへの接続情報などは".env"に書き込んだので、それらの情報を読み込んで環境変数にセットするためにpython-dotenv
をインストールしています。
%pip install langchain langchain-openai langgraph python-dotenv requests snowflake-connector-python dbutils.library.restartPython()
それからSnowflake接続に必要な設定やLLMの設定などを行いました。
# initializer from dotenv import load_dotenv import os from langchain_core.tools import tool from langchain_openai import AzureChatOpenAI from langgraph.prebuilt import create_react_agent import snowflake.connector load_dotenv() HOST = f'{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com' WAREHOUSE = os.environ.get("SNOWFLAKE_WAREHOUSE") ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT") USER = os.environ.get("SNOWFLAKE_USER") PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD") ROLE = os.environ.get("SNOWFLAKE_ROLE") conn = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, host=HOST, port=443, warehouse=WAREHOUSE, role=ROLE, ) token = conn.rest.token HEADERS = { "Authorization": f'Snowflake Token="{token}"', "Content-Type": "application/json", } CORTEX_ANALYST_ENDPOINT = f'https://{HOST}/api/v2/cortex/analyst/message' SEMANTIC_MODEL = f'@{os.environ.get("SNOWFLAKE_DB")}' +\ f'.{os.environ.get("SNOWFLAKE_SCHEMA")}' + \ f'.{os.environ.get("SNOWFLAKE_STAGE")}' + \ f'/{os.environ.get("SNOWFLAKE_SEMANTIC")}' LLM = AzureChatOpenAI(model="gpt-4o", api_version="2025-01-01-preview") PROMPT = "あなたは様々な楽曲について答えることが出来ます。"
Cortex Analystを呼び出すToolの定義
今回のAgentには自然言語の質問を入力すると、それに対応するデータを取得することが出来るToolを渡します。このToolは以下のようにquery_table
という関数で実装しました。
# define tool from datetime import datetime import json import requests from typing import Dict, List from langchain_core.tools import tool import pandas as pd @tool def query_table(query: str) -> Dict: """ 楽曲の情報、アーティスト、アルバムの情報が格納されたテーブルに自然言語で問い合わせを行い結果を得ることが出来る。 格納されている情報は定量的な情報がメインなので、質問は曖昧なものにせず、具体的なものにすること。 input: query: 入力クエリ 例: 最もエネルギッシュな曲のタイトルは? output: 問い合わせ結果(辞書型) sql: 実施したSQL result: SQLを実行して取得したデータ """ data = { "messages": [ { "role": "user", "content": [ { "type": "text", "text": query } ] } ], "semantic_model_file": SEMANTIC_MODEL } # Cortex Analyst APIを実行する。 response = requests.post(url=CORTEX_ANALYST_ENDPOINT, data=json.dumps(data), headers=HEADERS) # レスポンスのパース response_json = json.loads(response.content) sql = "" for message in response_json["message"]["content"]: # SQLだけ取り出す。 if message["type"] == "sql": sql = message["statement"] # SnowflakeにSQLクエリを発行し、結果を取得する。 result_df = pd.read_sql(sql,conn) # テーブルの行数が大きすぎる場合を想定し、最初の30件までに制限する。 result_df = result_df.head(30) # カンマ区切りの文字列に変換 result_df_str = result_df.to_csv(index=False) return {"sql": sql, "result": result_df_str}
Cortex Analyst APIを実行した後に生成されたSQLをpandas
のread_sql
を使って実行しました。返ってきた結果をカンマ区切りの文字列に変換してLLMに渡すのですが、SQLの実行結果の行数が大きくなるとLLMのトークン数の制限に引っかかり、LLMに入力出来なくなるため、多くても30件になるようにしました。
ReAct Agentの構築
あとはlanggraph
のcreate_react_agent
を使い、定義したToolを渡してAgentを構築します。ReActはAgentの動作を決めるフレームワークで、Agentにユーザーの要望を満たすために必要なToolの選択や実行、実行結果による次の行動を自律的に実行することを可能にします。
tools = [query_table] agent = create_react_agent(LLM, tools, prompt=PROMPT)
実行結果
たとえば連休明けなので"連休明けなので、少しずつテンションがあげられる曲を探して。"というクエリを与えてみます。
query = "連休明けなので、少しずつテンションがあげられる曲を探して。" messages = {"messages":[{"role":"user","content": query}]} agent.invoke(messages)
Agentの処理の様子を見ると、まずLLMが以下のように2つのクエリを生成しCortex Analyst APIに渡していました。
"テンポが平均以上の曲トップ5を知りたい。"に対し、Cortex Analystが生成したSQLは以下のようになっていました。
WITH __spotify_tracks AS ( SELECT "track_name" AS track_name, "tempo" AS tempo FROM db.schema.spotify_tracks), avg_tempo AS ( SELECT AVG(tempo) AS avg_tempo FROM __spotify_tracks ), top_tempo_tracks AS ( SELECT track_name, tempo FROM __spotify_tracks WHERE tempo > ( SELECT avg_tempo FROM avg_tempo ) ) SELECT track_name, tempo FROM top_tempo_tracks ORDER BY tempo DESC NULLS LAST LIMIT 5
結果、次のような結果をAgentが生成しました。
音楽に疎いのでこの結果が良いのか判断がつかないのですが、途中で実行されている処理は納得感があるように感じます。今度この曲を聞いてみたいな、と思いました。
まとめ
ということで今回はSnowflakeのCortex AnalystのAPIを利用してLangGraphのAgentにToolとして与えることで、楽曲について教えてくれるAgentを作ってみました。前回の記事の内容と合わせ、Cortex Analystの作り方から実践的な使い方までを触れることが出来たと思います。あとは回答精度をどうやって向上できるのか、といったことに今後取り組んでみたいと思います。