
こんにちは、CCCCMKホールディングスAIエンジニアの三浦です。
寒いですね。朝寒くてなかなか起きられないので、スマートフォンの目覚ましアラームを10分刻みでセットして二度寝しないようにしています。
最近TiDBというオープンソースの分散型SQLデータベースについて調べていました。TiDBはMySQLと互換性を持ちながら、従来の単一ノードRDBでは難しかった水平方向のスケール(スケールアウト)が可能なデータベースで、PingCAP社を中心に開発が進められています。
こちらの記事ではTiDBの性能検証がまとめられています。
私の方ではTiDBをアプリからどんな風に使えるかな・・・という観点で調べていました。調べていく中で個人的に面白いな、と思ったのがHybrid Transaction Analytical Processing(HTAP)という、TiDBの中でトランザクション処理(OLTP)に長けたストレージと分析処理(OLAP)に長けたストレージ双方を一本化しているという特徴です。TiDBではTiKVと呼ばれるOLTPのストレージからTiFlashと呼ばれるOLAPのストレージへはリアルタイムで同期を取ることが出来るようになっているそうです。
さらにTiDBはベクトル検索機能も提供しているとのことなので、Retrieval-Augmented Generation (RAG)の情報検索用途でも活用することが出来ます。これらを組み合わせれば、会話の履歴をOLTPのストレージに書き込みながら関連情報をベクトル検索で取得するといったことが全てTiDBで完結させることが出来そうです!
ということで、今回はベクトル検索と会話の履歴の記録までTiDBで完結させたChatBot作りにトライしてみましたので、その内容をご紹介したいと思います。
TiDBのベクトル検索について
TiDBのベクトル検索はテーブル作成時にVECTORデータ型を指定したベクトルデータが格納されたカラムに対し、入力ベクトルとの距離を距離関数で計算し、上位k件の近似ベクトルを探し出す方法(KNN)で行われます。
この場合、都度テーブル内の全てのデータとの距離計算が行われるため、データ量が増えてくると処理速度が低下してしまう可能性もあります。そこでTiDBではベクトルインデックスによる近似KNN(ANN)のオプションが提供されています。近似アルゴリズムは現在HNSWが利用可能とのことです。
ベクトルインデックスを使う場合は検索精度が若干下がってしまう可能性があること、OLAPのストレージTiFlashにテーブルの複製を行う必要があるといった制約があるようですが、今回はせっかくなのでこのベクトルインデックスを利用したベクトル検索を試してみました。
Chatbotが使用するテーブル一覧
Chatbot用の以下のテーブルをTiDBのデータベースに構築します。
chunks テーブル(検索対象データ)
| カラム名 | 型 | 説明 |
|---|---|---|
| chunk_id | VARCHAR(80) | 各チャンクを一意に識別するID(主キー) |
| content | MEDIUMTEXT | チャンクテキスト本文 |
| embedding | VECTOR(3072) | contentから生成した埋め込みベクトル |
| created_at | DATETIME | チャンク作成日時 |
補足
embeddingカラムに対して HNSW ベースのベクトルインデックスを作成します- TiFlash レプリカを有効化することで高速なベクトル検索を実現します
conversations テーブル(会話スレッド)
| カラム名 | 型 | 説明 |
|---|---|---|
| conversation_id | VARCHAR(64) | 会話スレッドを一意に識別するID(主キー) |
| user_id | VARCHAR(64) | 会話を開始したユーザーの識別子 |
| started_at | DATETIME | 会話開始日時 |
| last_activity_at | DATETIME | 最後にメッセージが追加された日時 |
補足
- 1つのconversationが1つの会話スレッドに対応しています
messages テーブル(会話メッセージ)
| カラム名 | 型 | 説明 |
|---|---|---|
| message_id | VARCHAR(64) | メッセージを一意に識別するID(主キー) |
| conversation_id | VARCHAR(64) | 所属する会話スレッドのID(外部キー) |
| role | VARCHAR(16) | 発話者の役割(user / assistant) |
| content | MEDIUMTEXT | メッセージ本文 |
| created_at | DATETIME | メッセージ作成日時 |
補足
- 会話削除時は
ON DELETE CASCADEで関連メッセージも自動削除するようにしました
実装
初期設定
実装はPythonで行いました。TiDBはMySQLと互換性があり、PythonのMySQLクライアントpymysqlを使って操作することが可能です。また、埋め込みベクトルやLLMはAzure OpenAI Serviceのものを使用しました。
TiDBの環境はクラウドですぐに使える"TiDB Cloud"と自分で環境を構築する"TiDB Self-Managed"で用意できますが、今回は"TiDB Cloud"で用意しました。
TiDB CloudのWeb UIからClusterをStarter Planで作成し、作成したClusterへの接続情報を取得しておきます。pymysqlを使った場合の詳細な手順は以下のドキュメントに記載されています。
以下初期設定です。
import os from datetime import datetime import json import pymysql from openai import AzureOpenAI # TiDB 接続情報 TIDB_HOST = os.getenv("TIDB_HOST", "your-tidb-host") TIDB_PORT = int(os.getenv("TIDB_PORT", "4000")) TIDB_USER = os.getenv("TIDB_USER", "your-user") TIDB_PASSWORD = os.getenv("TIDB_PASSWORD", "your-password") TIDB_DB = "ragbot" # Azure OpenAI AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "https://xxx.openai.azure.com/") AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "your-key") AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview") AZURE_OPENAI_EMBEDDING_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT", "text-embedding-3-large") # 埋め込み次元 EMBED_DIM = int(os.getenv("EMBED_DIM", "3072")) aoai = AzureOpenAI( api_key=AZURE_OPENAI_API_KEY, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_version=AZURE_OPENAI_API_VERSION, ) def initial_tidb_conn(): """DDLを実行しDB環境を作るときに使う""" return pymysql.connect( host=TIDB_HOST, port=TIDB_PORT, user=TIDB_USER, password=TIDB_PASSWORD, ssl_verify_cert = True, ssl_verify_identity = True, ssl_ca = "<CA_PATH>", autocommit=True, ) def tidb_conn(): """DB構築後はこちらを使う""" return pymysql.connect( host=TIDB_HOST, port=TIDB_PORT, user=TIDB_USER, password=TIDB_PASSWORD, database=TIDB_DB, ssl_verify_cert = True, ssl_verify_identity = True, ssl_ca = "<CA_PATH>", autocommit=True, )
データベースとテーブル作成
次にデータベースとテーブルを作成します。特にチャンク用のテーブルを作った後、ベクトルインデックスを作成するためALTER TABLE chunks SET TIFLASH REPLICA 1;を実行してTiFlashへのレプリカ作成を行っているところがポイントです。
# DDL(データベース&テーブル作成) ddl = f""" CREATE DATABASE IF NOT EXISTS {TIDB_DB}; USE {TIDB_DB}; -- チャンク CREATE TABLE IF NOT EXISTS chunks ( chunk_id VARCHAR(80) PRIMARY KEY, content MEDIUMTEXT NOT NULL, embedding VECTOR(3072) NOT NULL, created_at DATETIME ); -- TiFlashへのレプリカ作成 ALTER TABLE chunks SET TIFLASH REPLICA 1; -- ベクトルインデックス(HNSW): Cosine距離 CREATE VECTOR INDEX IF NOT EXISTS idx_chunks_embedding ON chunks ((VEC_COSINE_DISTANCE(embedding))) USING HNSW; -- 会話(=スレッド) CREATE TABLE IF NOT EXISTS conversations ( conversation_id VARCHAR(64) PRIMARY KEY, user_id VARCHAR(64), started_at DATETIME, last_activity_at DATETIME ); -- メッセージ CREATE TABLE IF NOT EXISTS messages ( message_id VARCHAR(64) PRIMARY KEY, conversation_id VARCHAR(64) NOT NULL, role VARCHAR(16) NOT NULL, -- user / assistant content MEDIUMTEXT NOT NULL, created_at DATETIME, CONSTRAINT fk_msg_conv FOREIGN KEY (conversation_id) REFERENCES conversations(conversation_id) ON DELETE CASCADE ); """ with initial_tidb_conn() as conn: with conn.cursor() as cur: for stmt in [s.strip() for s in ddl.split(";") if s.strip()]: print(stmt) cur.execute(stmt) print("DDL done")
ベクトル埋め込みの保存
次にベクトル埋め込み表現を先ほど作成したchunkテーブルに格納します。以下のドキュメントを参考にすると、INSERTにベクトルを渡す際には'[1,2,1]'のようなベクトルの文字列表現にする必要があるようです。
それを踏まえ、以下のようにしてベクトル埋め込みの保存を行いました。 (今回はChatGPTを使って作成したTiDBの機能に関するドキュメントを検証に使用しています)
from datetime import datetime import json from uuid import uuid4 def get_embedding(text: str) -> list[float]: """埋め込みベクトルの生成""" text = " ".join(text.split()) resp = aoai.embeddings.create( model=AZURE_OPENAI_EMBEDDING_DEPLOYMENT, input=text, ) return resp.data[0].embedding def vec_text(vec: list[float]) -> str: """TiDBにベクトルを渡すためにベクトルの文字列表現を生成する""" return json.dumps(vec, separators=(",", ":")) def insert_text(text): """テキストを埋め込みベクトルとともにTiDBのchunkテーブルに格納する""" chunk_id = uuid4().hex created_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") embedding = vec_text(get_embedding(text)) with tidb_conn() as conn: with conn.cursor() as cur: # text insert cur.execute( """ INSERT INTO chunks(chunk_id, content, embedding, created_at) VALUES(%s, %s, %s, %s) """, (chunk_id, text, embedding, created_at) ) # テキストを埋め込みベクトルとともにTiDBのchunkテーブルに格納する for t in texts: insert_text(t)
ベクトル検索の実装
ベクトル検索処理は、以下のようにしました。
def search_chunks(query: str, k: int = 5): qvec = get_embedding(query) # 次元チェック(テーブル定義 VECTOR(EMBED_DIM) と一致しているか) if len(qvec) != EMBED_DIM: raise ValueError(f"Query embedding dim mismatch: got {len(qvec)} but table expects {EMBED_DIM}") qvec_txt = vec_text(qvec) sql = """ SELECT chunk_id, content, VEC_COSINE_DISTANCE(embedding, %s) AS score FROM chunks ORDER BY score LIMIT %s """ with tidb_conn() as conn: with conn.cursor() as cur: cur.execute(sql, (qvec_txt, k)) rows = cur.fetchall() results = [] for r in rows: results.append({ "chunk_id": r[0], "content": r[1], "score": float(r[2]), }) return results # 検索テスト query = "HTAP" hits = search_chunks(query, k=5) hits
結果は以下のようになり、"HTAP"に関連するドキュメントが取得できていました。
[{'chunk_id': 'df34d78a021248df9e471decbb950d8f',
'content': 'HTAPの利点\n\nHTAPの利点は、...',
'score': 0.6322445588694952},
{'chunk_id': '487dbc00d6f5404d8c535f97d45e0e73',
'content': 'TiDBとは何か...'}
...
]
RAGBotの実装
最後はベクトル検索機能を持ち、ユーザーとの会話の履歴をTiDBに保持するBotクラスの実装をしました。
import os import uuid from dataclasses import dataclass from datetime import datetime from typing import Callable, Any @dataclass class RagBotConfig: chat_deployment: str = "gpt-4.1-mini" temperature: float = 0.2 max_context_chars: int = 2500 class TiDBRagBot: """ TiDBに会話ログを保存しつつ、検索結果(hits)をコンテキストにして回答するBot。 """ def __init__( self, tidb_conn_factory: Callable[[], Any], aoai_client: Any, search_fn: Callable[[str, int], list[dict]], config: RagBotConfig | None = None, ): self._conn_factory = tidb_conn_factory self._aoai = aoai_client self._search = search_fn self._cfg = config or RagBotConfig() @staticmethod def _now_utc_str() -> str: return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") @staticmethod def _new_id() -> str: return str(uuid.uuid4()) def create_conversation(self, user_id: str) -> str: """会話の最初に会話(スレッドを作成する)""" conv_id = self._new_id() now = self._now_utc_str() with self._conn_factory() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO conversations(conversation_id, user_id, started_at, last_activity_at) VALUES(%s,%s,%s,%s)", (conv_id, user_id, now, now), ) return conv_id def save_message(self, conversation_id: str, role: str, content: str) -> str: """メッセージ発生時にメッセージをTiDBに保存する""" msg_id = self._new_id() now = self._now_utc_str() with self._conn_factory() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO messages(message_id, conversation_id, role, content, created_at) VALUES(%s,%s,%s,%s,%s)", (msg_id, conversation_id, role, content, now), ) cur.execute( "UPDATE conversations SET last_activity_at=%s WHERE conversation_id=%s", (now, conversation_id), ) return msg_id def build_context(self, hits: list[dict]) -> str: """取得した情報をプロンプトに組み込むよう整形する""" parts: list[str] = [] total = 0 for h in hits: block = f"[{h['chunk_id']}] {h['content']}" if total + len(block) > self._cfg.max_context_chars: break parts.append(block) total += len(block) return "\n\n".join(parts) def _make_prompt(self, question: str, hits: list[dict]) -> str: """プロンプトの生成""" context = self.build_context(hits) return f"""あなたはユーザーの質問に与えられたコンテキストを使って回答します。推測は避け、根拠が薄い場合はそう述べてください。 # コンテキスト {context} # 質問 {question} """ def answer_with_context(self, question: str, hits: list[dict]) -> str: prompt = self._make_prompt(question, hits) resp = self._aoai.chat.completions.create( model=self._cfg.chat_deployment, messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}, ], temperature=self._cfg.temperature, ) return resp.choices[0].message.content def chat( self, user_id: str, question: str, conversation_id: str | None = None, k: int = 5, log_citations: bool = True, ) -> dict: """ 1) 会話IDがなければ作る 2) userメッセージ保存 3) 検索 4) 回答生成 5) assistantメッセージ保存 """ conv_id = conversation_id or self.create_conversation(user_id=user_id) user_msg_id = self.save_message(conv_id, "user", question) hits = self._search(question, k) answer = self.answer_with_context(question, hits) assistant_msg_id = self.save_message(conv_id, "assistant", answer) return { "conversation_id": conv_id, "user_message_id": user_msg_id, "assistant_message_id": assistant_msg_id, "answer": answer, "hits": hits, }
以下のように呼び出すことが出来ます。
bot = TiDBRagBot(
tidb_conn_factory=tidb_conn,
aoai_client=aoai,
search_fn=search_chunks,
)
r1 = bot.chat(user_id="miura", question="TiDBの強みは?", k=5)
回答は次のように得られました。
'TiDBの強みは以下の点にまとめられます。\n\n1. **MySQL互換の分散SQLデータベース** \n MySQL互換のインターフェースを持ち、既存のMySQL環境からの移行や利用が比較的容易である。...
参照した情報も、関連するものが取れていました。
{'chunk_id': '487dbc00d6f5404d8c535f97d45e0e73',
'content': 'TiDBとは何か:概要・特徴・機能のまとめ(検証用テキスト)'...
}...
TiDB CloudのUIでmessagesテーブルの中を確認すると、会話のやり取りが記録できていることが確認できました。

まとめ
ということで今回はTiDBでRAGに対応したChatBotを作ってみた話をまとめてみました。TiDBを使うとすぐに会話記録用のテーブルからベクトル検索までRAGに必要な環境が全て用意できていいな、と思いました。 今回はTiDB Cloudを使ってサクッとTiDBを使う準備をしましたが、今度はTiDB Self-Managedも試してみたいです。