CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

TiDBでRAGに対応したChatBotを作ってみました。

こんにちは、CCCCMKホールディングスAIエンジニアの三浦です。

寒いですね。朝寒くてなかなか起きられないので、スマートフォンの目覚ましアラームを10分刻みでセットして二度寝しないようにしています。

最近TiDBというオープンソースの分散型SQLデータベースについて調べていました。TiDBはMySQLと互換性を持ちながら、従来の単一ノードRDBでは難しかった水平方向のスケール(スケールアウト)が可能なデータベースで、PingCAP社を中心に開発が進められています。

こちらの記事ではTiDBの性能検証がまとめられています。

techblog.cccmkhd.co.jp

私の方ではTiDBをアプリからどんな風に使えるかな・・・という観点で調べていました。調べていく中で個人的に面白いな、と思ったのがHybrid Transaction Analytical Processing(HTAP)という、TiDBの中でトランザクション処理(OLTP)に長けたストレージと分析処理(OLAP)に長けたストレージ双方を一本化しているという特徴です。TiDBではTiKVと呼ばれるOLTPのストレージからTiFlashと呼ばれるOLAPのストレージへはリアルタイムで同期を取ることが出来るようになっているそうです。

さらにTiDBはベクトル検索機能も提供しているとのことなので、Retrieval-Augmented Generation (RAG)の情報検索用途でも活用することが出来ます。これらを組み合わせれば、会話の履歴をOLTPのストレージに書き込みながら関連情報をベクトル検索で取得するといったことが全てTiDBで完結させることが出来そうです!

ということで、今回はベクトル検索と会話の履歴の記録までTiDBで完結させたChatBot作りにトライしてみましたので、その内容をご紹介したいと思います。

TiDBのベクトル検索について

TiDBのベクトル検索はテーブル作成時にVECTORデータ型を指定したベクトルデータが格納されたカラムに対し、入力ベクトルとの距離を距離関数で計算し、上位k件の近似ベクトルを探し出す方法(KNN)で行われます。

docs.pingcap.com

この場合、都度テーブル内の全てのデータとの距離計算が行われるため、データ量が増えてくると処理速度が低下してしまう可能性もあります。そこでTiDBではベクトルインデックスによる近似KNN(ANN)のオプションが提供されています。近似アルゴリズムは現在HNSWが利用可能とのことです。

docs.pingcap.com

ベクトルインデックスを使う場合は検索精度が若干下がってしまう可能性があること、OLAPのストレージTiFlashにテーブルの複製を行う必要があるといった制約があるようですが、今回はせっかくなのでこのベクトルインデックスを利用したベクトル検索を試してみました。

Chatbotが使用するテーブル一覧

Chatbot用の以下のテーブルをTiDBのデータベースに構築します。

chunks テーブル(検索対象データ)

カラム名 説明
chunk_id VARCHAR(80) 各チャンクを一意に識別するID(主キー)
content MEDIUMTEXT チャンクテキスト本文
embedding VECTOR(3072) contentから生成した埋め込みベクトル
created_at DATETIME チャンク作成日時

補足

  • embedding カラムに対して HNSW ベースのベクトルインデックスを作成します
  • TiFlash レプリカを有効化することで高速なベクトル検索を実現します

conversations テーブル(会話スレッド)

カラム名 説明
conversation_id VARCHAR(64) 会話スレッドを一意に識別するID(主キー)
user_id VARCHAR(64) 会話を開始したユーザーの識別子
started_at DATETIME 会話開始日時
last_activity_at DATETIME 最後にメッセージが追加された日時

補足

  • 1つのconversationが1つの会話スレッドに対応しています

messages テーブル(会話メッセージ)

カラム名 説明
message_id VARCHAR(64) メッセージを一意に識別するID(主キー)
conversation_id VARCHAR(64) 所属する会話スレッドのID(外部キー)
role VARCHAR(16) 発話者の役割(user / assistant)
content MEDIUMTEXT メッセージ本文
created_at DATETIME メッセージ作成日時

補足

  • 会話削除時はON DELETE CASCADEで関連メッセージも自動削除するようにしました

実装

初期設定

実装はPythonで行いました。TiDBはMySQLと互換性があり、PythonのMySQLクライアントpymysqlを使って操作することが可能です。また、埋め込みベクトルやLLMはAzure OpenAI Serviceのものを使用しました。

TiDBの環境はクラウドですぐに使える"TiDB Cloud"と自分で環境を構築する"TiDB Self-Managed"で用意できますが、今回は"TiDB Cloud"で用意しました。

pingcap.co.jp

TiDB CloudのWeb UIからClusterをStarter Planで作成し、作成したClusterへの接続情報を取得しておきます。pymysqlを使った場合の詳細な手順は以下のドキュメントに記載されています。

docs.pingcap.com

以下初期設定です。

import os
from datetime import datetime
import json
import pymysql
from openai import AzureOpenAI

# TiDB 接続情報
TIDB_HOST = os.getenv("TIDB_HOST", "your-tidb-host")
TIDB_PORT = int(os.getenv("TIDB_PORT", "4000")) 
TIDB_USER = os.getenv("TIDB_USER", "your-user")
TIDB_PASSWORD = os.getenv("TIDB_PASSWORD", "your-password")
TIDB_DB = "ragbot"

# Azure OpenAI
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "https://xxx.openai.azure.com/")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "your-key")
AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")
AZURE_OPENAI_EMBEDDING_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-3-large")

# 埋め込み次元
EMBED_DIM = int(os.getenv("EMBED_DIM", "3072"))

aoai = AzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_version=AZURE_OPENAI_API_VERSION,
)

def initial_tidb_conn():
    """DDLを実行しDB環境を作るときに使う"""
    return pymysql.connect(
        host=TIDB_HOST,
        port=TIDB_PORT,
        user=TIDB_USER,
        password=TIDB_PASSWORD,
        ssl_verify_cert = True,
        ssl_verify_identity = True,
        ssl_ca = "<CA_PATH>",
        autocommit=True,
    )

def tidb_conn():
    """DB構築後はこちらを使う"""
    return pymysql.connect(
        host=TIDB_HOST,
        port=TIDB_PORT,
        user=TIDB_USER,
        password=TIDB_PASSWORD,
        database=TIDB_DB,
        ssl_verify_cert = True,
        ssl_verify_identity = True,
        ssl_ca = "<CA_PATH>",
        autocommit=True,
    )

データベースとテーブル作成

次にデータベースとテーブルを作成します。特にチャンク用のテーブルを作った後、ベクトルインデックスを作成するためALTER TABLE chunks SET TIFLASH REPLICA 1;を実行してTiFlashへのレプリカ作成を行っているところがポイントです。

# DDL(データベース&テーブル作成)

ddl = f"""
CREATE DATABASE IF NOT EXISTS {TIDB_DB};
USE {TIDB_DB};

-- チャンク
CREATE TABLE IF NOT EXISTS chunks (
  chunk_id      VARCHAR(80) PRIMARY KEY,
  content       MEDIUMTEXT NOT NULL,
  embedding     VECTOR(3072) NOT NULL,
  created_at    DATETIME
);

-- TiFlashへのレプリカ作成
ALTER TABLE chunks SET TIFLASH REPLICA 1;

-- ベクトルインデックス(HNSW): Cosine距離
CREATE VECTOR INDEX IF NOT EXISTS idx_chunks_embedding
ON chunks ((VEC_COSINE_DISTANCE(embedding)))
USING HNSW;

-- 会話(=スレッド)
CREATE TABLE IF NOT EXISTS conversations (
  conversation_id VARCHAR(64) PRIMARY KEY,
  user_id         VARCHAR(64),
  started_at      DATETIME,
  last_activity_at DATETIME
);

-- メッセージ
CREATE TABLE IF NOT EXISTS messages (
  message_id      VARCHAR(64) PRIMARY KEY,
  conversation_id VARCHAR(64) NOT NULL,
  role            VARCHAR(16) NOT NULL,  -- user / assistant
  content         MEDIUMTEXT NOT NULL,
  created_at      DATETIME,
  CONSTRAINT fk_msg_conv FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id) ON DELETE CASCADE
);
"""

with initial_tidb_conn() as conn:
    with conn.cursor() as cur:
        for stmt in [s.strip() for s in ddl.split(";") if s.strip()]:
            print(stmt)
            cur.execute(stmt)
print("DDL done")

ベクトル埋め込みの保存

次にベクトル埋め込み表現を先ほど作成したchunkテーブルに格納します。以下のドキュメントを参考にすると、INSERTにベクトルを渡す際には'[1,2,1]'のようなベクトルの文字列表現にする必要があるようです。

docs.pingcap.com

それを踏まえ、以下のようにしてベクトル埋め込みの保存を行いました。 (今回はChatGPTを使って作成したTiDBの機能に関するドキュメントを検証に使用しています)

from datetime import datetime
import json
from uuid import uuid4

def get_embedding(text: str) -> list[float]:
    """埋め込みベクトルの生成"""
    text = " ".join(text.split())
    resp = aoai.embeddings.create(
        model=AZURE_OPENAI_EMBEDDING_DEPLOYMENT,
        input=text,
    )
    return resp.data[0].embedding

def vec_text(vec: list[float]) -> str:
    """TiDBにベクトルを渡すためにベクトルの文字列表現を生成する""" 
    return json.dumps(vec, separators=(",", ":"))

def insert_text(text):
    """テキストを埋め込みベクトルとともにTiDBのchunkテーブルに格納する""" 
    chunk_id = uuid4().hex
    created_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    embedding = vec_text(get_embedding(text))

    with tidb_conn() as conn:
        with conn.cursor() as cur:
            # text insert
            cur.execute(
                """
                INSERT INTO chunks(chunk_id, content, embedding, created_at)
                VALUES(%s, %s, %s, %s)
                """,
                (chunk_id, text, embedding, created_at)
            )

# テキストを埋め込みベクトルとともにTiDBのchunkテーブルに格納する
for t in texts:
    insert_text(t)

ベクトル検索の実装

ベクトル検索処理は、以下のようにしました。

def search_chunks(query: str, k: int = 5):
    qvec = get_embedding(query)

    # 次元チェック(テーブル定義 VECTOR(EMBED_DIM) と一致しているか)
    if len(qvec) != EMBED_DIM:
        raise ValueError(f"Query embedding dim mismatch: got {len(qvec)} but table expects {EMBED_DIM}")

    qvec_txt = vec_text(qvec)

    sql = """
    SELECT
      chunk_id, 
      content,
      VEC_COSINE_DISTANCE(embedding, %s) AS score
    FROM chunks
    ORDER BY score
    LIMIT %s
    """
    with tidb_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, (qvec_txt, k))
            rows = cur.fetchall()

    results = []
    for r in rows:
        results.append({
            "chunk_id": r[0],
            "content": r[1],
            "score": float(r[2]),
        })
    return results

# 検索テスト
query = "HTAP"
hits = search_chunks(query, k=5)
hits

結果は以下のようになり、"HTAP"に関連するドキュメントが取得できていました。

[{'chunk_id': 'df34d78a021248df9e471decbb950d8f',
  'content': 'HTAPの利点\n\nHTAPの利点は、...',
  'score': 0.6322445588694952},
 {'chunk_id': '487dbc00d6f5404d8c535f97d45e0e73',
  'content': 'TiDBとは何か...'}
  ...
]

RAGBotの実装

最後はベクトル検索機能を持ち、ユーザーとの会話の履歴をTiDBに保持するBotクラスの実装をしました。

import os
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Any

@dataclass
class RagBotConfig:
    chat_deployment: str = "gpt-4.1-mini"
    temperature: float = 0.2
    max_context_chars: int = 2500

class TiDBRagBot:
    """
    TiDBに会話ログを保存しつつ、検索結果(hits)をコンテキストにして回答するBot。
    """
    def __init__(
        self,
        tidb_conn_factory: Callable[[], Any],
        aoai_client: Any,
        search_fn: Callable[[str, int], list[dict]],
        config: RagBotConfig | None = None,
    ):
        self._conn_factory = tidb_conn_factory
        self._aoai = aoai_client
        self._search = search_fn
        self._cfg = config or RagBotConfig()

    @staticmethod
    def _now_utc_str() -> str:
        return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _new_id() -> str:
        return str(uuid.uuid4())

    def create_conversation(self, user_id: str) -> str:
        """会話の最初に会話(スレッドを作成する)"""
        conv_id = self._new_id()
        now = self._now_utc_str()
        with self._conn_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO conversations(conversation_id, user_id, started_at, last_activity_at) VALUES(%s,%s,%s,%s)",
                    (conv_id, user_id, now, now),
                )
        return conv_id

    def save_message(self, conversation_id: str, role: str, content: str) -> str:
        """メッセージ発生時にメッセージをTiDBに保存する"""
        msg_id = self._new_id()
        now = self._now_utc_str()
        with self._conn_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO messages(message_id, conversation_id, role, content, created_at) VALUES(%s,%s,%s,%s,%s)",
                    (msg_id, conversation_id, role, content, now),
                )
                cur.execute(
                    "UPDATE conversations SET last_activity_at=%s WHERE conversation_id=%s",
                    (now, conversation_id),
                )
        return msg_id
    
    def build_context(self, hits: list[dict]) -> str:
        """取得した情報をプロンプトに組み込むよう整形する"""
        parts: list[str] = []
        total = 0
        for h in hits:
            block = f"[{h['chunk_id']}] {h['content']}"
            if total + len(block) > self._cfg.max_context_chars:
                break
            parts.append(block)
            total += len(block)
        return "\n\n".join(parts)

    def _make_prompt(self, question: str, hits: list[dict]) -> str:
        """プロンプトの生成"""
        context = self.build_context(hits)
        return f"""あなたはユーザーの質問に与えられたコンテキストを使って回答します。推測は避け、根拠が薄い場合はそう述べてください。

# コンテキスト
{context}

# 質問
{question}
"""

    def answer_with_context(self, question: str, hits: list[dict]) -> str:
        prompt = self._make_prompt(question, hits)
        resp = self._aoai.chat.completions.create(
            model=self._cfg.chat_deployment,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            temperature=self._cfg.temperature,
        )
        return resp.choices[0].message.content

    def chat(
        self,
        user_id: str,
        question: str,
        conversation_id: str | None = None,
        k: int = 5,
        log_citations: bool = True,
    ) -> dict:
        """
        1) 会話IDがなければ作る
        2) userメッセージ保存
        3) 検索
        4) 回答生成
        5) assistantメッセージ保存
        """
        conv_id = conversation_id or self.create_conversation(user_id=user_id)
        user_msg_id = self.save_message(conv_id, "user", question)

        hits = self._search(question, k)
        answer = self.answer_with_context(question, hits)

        assistant_msg_id = self.save_message(conv_id, "assistant", answer)

        return {
            "conversation_id": conv_id,
            "user_message_id": user_msg_id,
            "assistant_message_id": assistant_msg_id,
            "answer": answer,
            "hits": hits,
        }

以下のように呼び出すことが出来ます。

bot = TiDBRagBot(
    tidb_conn_factory=tidb_conn,
    aoai_client=aoai,
    search_fn=search_chunks, 
)

r1 = bot.chat(user_id="miura", question="TiDBの強みは?", k=5)

回答は次のように得られました。

'TiDBの強みは以下の点にまとめられます。\n\n1. **MySQL互換の分散SQLデータベース**  \n   MySQL互換のインターフェースを持ち、既存のMySQL環境からの移行や利用が比較的容易である。...

参照した情報も、関連するものが取れていました。

{'chunk_id': '487dbc00d6f5404d8c535f97d45e0e73',
  'content': 'TiDBとは何か:概要・特徴・機能のまとめ(検証用テキスト)'...
}...

TiDB CloudのUIでmessagesテーブルの中を確認すると、会話のやり取りが記録できていることが確認できました。

まとめ

ということで今回はTiDBでRAGに対応したChatBotを作ってみた話をまとめてみました。TiDBを使うとすぐに会話記録用のテーブルからベクトル検索までRAGに必要な環境が全て用意できていいな、と思いました。 今回はTiDB Cloudを使ってサクッとTiDBを使う準備をしましたが、今度はTiDB Self-Managedも試してみたいです。