CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

Snowflake Cortex AnalystとLangGraphでテーブルデータ分析Agentを作る。

こんにちは、CCCMKホールディングス AIエンジニアの三浦です。

最近はだいぶ暖かくなって、新緑がまぶしい季節になりました。外に出るのが気持ちのいい時期は一年の中で意外と限られているので、しっかりと堪能したいです。

さて前回SnowflakeのCortex Analystという機能を使って自然言語でTableに問い合わせ、データを取得する、という流れを試してみました。

techblog.cccmkhd.co.jp

前回はSnowflakeのWeb UI(Snowsight)上で動作を確認したのですが、やはり外部のアプリケーションからCortex Analystを使えると色々とやれることが広がりそうだなと感じていました。

そこで今回はCortex Analystの機能をAPIで実行し、Tableの分析を行うことが出来るAgentをLangGraphで作ってみました。その内容についてまとめていきたいと思います。

Cortex AnalystのAPIについて

Cortex AnalystのAPIの情報は、公式のドキュメントを参照しました。

docs.snowflake.com

Pythonの具体的な実装方法は以下のサンプルコードを参考にしています。

github.com

ドキュメントを見ると、APIのレスポンスの形式は一般的なLLMのAPIと同様になっていることが分かります。ドキュメントに掲載されているExampleでは以下のような例が提示されています。

{
    "request_id": "75d343ee-699c-483f-83a1-e314609fb563",
    "message": {
        "role": "analyst",
        "content": [
            {
                "type": "text",
                "text": "We interpreted your question as ..."
            },
            {
                "type": "sql",
                "statement": "SELECT * FROM table",
                "confidence": {
                    "verified_query_used": {
                        "name": "My verified query",
                        "question": "What was the total revenue?",
                        "sql": "SELECT * FROM table2",
                        "verified_at": 1714497970,
                        "verified_by": "Jane Doe"
                    }
                }
            }
        ]
    },
    "warnings": [
        {
            "message": "Table table1 has (30) columns, which exceeds the recommended maximum of 10"
        },
        {
            "message": "Table table2 has (40) columns, which exceeds the recommended maximum of 10"
        }
    ]
}

messageというキーに格納されたrolecontentというキーを持つ辞書型のデータを参照すればよいことが分かります。この例を見て知ったのですが、Cortex AnalystのAPIは生成したSQLを返しますが、そのSQLを実行した結果は返しません。なのでTableを分析する機能を実装する場合には、返ってきたSQLを実行する処理の実装が別途必要になることが分かります。

実装

ではここからは具体的な実装の内容を紹介します。今回のコードはAzure DatabricksのNotebookで実装しました。Pythonのバージョンは3.12です。また、Agentで使うLLMはAzure OpenAI Serviceの"gpt-4o"を利用しました。

また、今回使用したデータセットは前回のCortex Analystの検証の時と同様、Hugging Faceで公開されているこちらのデータセットです。

huggingface.co

準備

まず必要なライブラリをインストールします。Snowflakeへの接続情報などは".env"に書き込んだので、それらの情報を読み込んで環境変数にセットするためにpython-dotenvをインストールしています。

%pip install langchain langchain-openai langgraph python-dotenv requests snowflake-connector-python
dbutils.library.restartPython()

それからSnowflake接続に必要な設定やLLMの設定などを行いました。

# initializer
from dotenv import load_dotenv
import os

from langchain_core.tools import tool
from langchain_openai import AzureChatOpenAI
from langgraph.prebuilt import create_react_agent
import snowflake.connector

load_dotenv()

HOST = f'{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com'
WAREHOUSE = os.environ.get("SNOWFLAKE_WAREHOUSE")
ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT")
USER = os.environ.get("SNOWFLAKE_USER")
PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD")
ROLE = os.environ.get("SNOWFLAKE_ROLE")

conn = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        host=HOST,
        port=443,
        warehouse=WAREHOUSE,
        role=ROLE,
    )
token = conn.rest.token

HEADERS = {
    "Authorization": f'Snowflake Token="{token}"',
    "Content-Type": "application/json",
}

CORTEX_ANALYST_ENDPOINT = f'https://{HOST}/api/v2/cortex/analyst/message'

SEMANTIC_MODEL = f'@{os.environ.get("SNOWFLAKE_DB")}' +\
    f'.{os.environ.get("SNOWFLAKE_SCHEMA")}' + \
    f'.{os.environ.get("SNOWFLAKE_STAGE")}' + \
    f'/{os.environ.get("SNOWFLAKE_SEMANTIC")}'

LLM = AzureChatOpenAI(model="gpt-4o", api_version="2025-01-01-preview")
PROMPT = "あなたは様々な楽曲について答えることが出来ます。"

Cortex Analystを呼び出すToolの定義

今回のAgentには自然言語の質問を入力すると、それに対応するデータを取得することが出来るToolを渡します。このToolは以下のようにquery_tableという関数で実装しました。

# define tool
from datetime import datetime
import json

import requests
from typing import Dict, List
from langchain_core.tools import tool
import pandas as pd

@tool
def query_table(query: str) -> Dict:
    """
    楽曲の情報、アーティスト、アルバムの情報が格納されたテーブルに自然言語で問い合わせを行い結果を得ることが出来る。
    格納されている情報は定量的な情報がメインなので、質問は曖昧なものにせず、具体的なものにすること。
    input:
     query: 入力クエリ
      例: 最もエネルギッシュな曲のタイトルは?
    output:
      問い合わせ結果(辞書型)
       sql: 実施したSQL
       result: SQLを実行して取得したデータ
    """
    data = {
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": query
                    }
                ]
            }
        ],
        "semantic_model_file": SEMANTIC_MODEL
    }

    # Cortex Analyst APIを実行する。
    response = requests.post(url=CORTEX_ANALYST_ENDPOINT, data=json.dumps(data), headers=HEADERS)

    # レスポンスのパース
    response_json = json.loads(response.content)
    sql = ""
    for message in response_json["message"]["content"]:
        # SQLだけ取り出す。
        if message["type"] == "sql":
            sql = message["statement"]
            
    # SnowflakeにSQLクエリを発行し、結果を取得する。
    result_df = pd.read_sql(sql,conn)
    
    # テーブルの行数が大きすぎる場合を想定し、最初の30件までに制限する。
    result_df = result_df.head(30)
    
    # カンマ区切りの文字列に変換
    result_df_str = result_df.to_csv(index=False)
    return {"sql": sql, "result": result_df_str}

Cortex Analyst APIを実行した後に生成されたSQLをpandasread_sqlを使って実行しました。返ってきた結果をカンマ区切りの文字列に変換してLLMに渡すのですが、SQLの実行結果の行数が大きくなるとLLMのトークン数の制限に引っかかり、LLMに入力出来なくなるため、多くても30件になるようにしました。

ReAct Agentの構築

あとはlanggraphcreate_react_agentを使い、定義したToolを渡してAgentを構築します。ReActはAgentの動作を決めるフレームワークで、Agentにユーザーの要望を満たすために必要なToolの選択や実行、実行結果による次の行動を自律的に実行することを可能にします。

tools = [query_table]
agent = create_react_agent(LLM, tools, prompt=PROMPT)

実行結果

たとえば連休明けなので"連休明けなので、少しずつテンションがあげられる曲を探して。"というクエリを与えてみます。

query = "連休明けなので、少しずつテンションがあげられる曲を探して。"
messages = {"messages":[{"role":"user","content": query}]}

agent.invoke(messages)

Agentの処理の様子を見ると、まずLLMが以下のように2つのクエリを生成しCortex Analyst APIに渡していました。

"テンポが平均以上の曲トップ5を知りたい。"に対し、Cortex Analystが生成したSQLは以下のようになっていました。

WITH __spotify_tracks AS (
  SELECT
    "track_name" AS track_name,
    "tempo" AS tempo
  FROM db.schema.spotify_tracks), avg_tempo AS (
    SELECT
      AVG(tempo) AS avg_tempo
    FROM __spotify_tracks
  ), top_tempo_tracks AS (
    SELECT
      track_name,
      tempo
    FROM __spotify_tracks
    WHERE
      tempo > (
         SELECT
           avg_tempo
         FROM avg_tempo
        )
  )
  SELECT
   track_name,
   tempo
  FROM top_tempo_tracks
  ORDER BY
    tempo DESC NULLS LAST
  LIMIT 5

結果、次のような結果をAgentが生成しました。

音楽に疎いのでこの結果が良いのか判断がつかないのですが、途中で実行されている処理は納得感があるように感じます。今度この曲を聞いてみたいな、と思いました。

まとめ

ということで今回はSnowflakeのCortex AnalystのAPIを利用してLangGraphのAgentにToolとして与えることで、楽曲について教えてくれるAgentを作ってみました。前回の記事の内容と合わせ、Cortex Analystの作り方から実践的な使い方までを触れることが出来たと思います。あとは回答精度をどうやって向上できるのか、といったことに今後取り組んでみたいと思います。