CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

DatabricksでOLTPデータベースが作れる!Lakebaseを試してみました!

<この記事は🎄Databricks Advent Calendar 2025🎁におけるシリーズ2の23 日目の記事です。>

こんにちは、CCCMKホールディングスAIエンジニアの三浦です。

この前の日曜日に街に出かけたらクリスマスの雰囲気でとても賑やかでした。普段見慣れた景色がいつもと違うように見えて、特別感があっていいな、と思いました。

さて、今年6月にDatabricksのDATA+AI Summitに参加したのですが、そこで発表されたLakebaseという機能をそういえばちゃんと触っていないことに気が付きました。

www.databricks.com

DATA+AI Summitで初めてLakebaseの話を聞いたときは「フルマネージドでサーバレスのPostgres」や「AI Agent向けのデータベース」という理解をしました。実際その理解は間違っていないのですが、今回もう少し調べ、そして実際に使ってみたところ「DatabricksでOLTPのデータベースが作れる」という点こそが実はLakebaseの真髄なのでは、と感じるようになりました。

DatabricksのDelta Lakeは大量のデータを集計しその結果を取り出すような分析用途に向いています。OLAP(Online Analytical Processing)と呼ばれる処理方式です。一方リアルタイムに書き込んだりIDに紐づいたデータを取ってくる用途、つまりOLTP(Online Transaction Processing)と呼ばれる処理方式にはあまり向いていない認識を持っていました。

なのでリアルタイム性が求められるアプリケーション用のデータベースとしてDatabricksを使うイメージがなかったのですが、OLTPに適したLakebaseによってアプリケーション用のデータベースもDatabricksに持つことが出来るようになります。また、Databricks AppsによってDatabricksの中でアプリケーションを動かすことも出来るので、アプリケーションに必要な構成要素がおおむねDatabricksの中で完結出来るようになります。

こう考えるとLakebaseの発表はすごく意味のあることだったんだな・・・と改めて感じました。

今回はそんなLakebaseを実際に使ってみた話をご紹介します。

※今回検証したAzure DatabricksにおいてLakebaseは現在Previewの状態で、westus, westus2, eastus, eastus2, centralus, southcentralus, northeurope, westeurope, australiaeast, brazilsouth, canadacentral, centralindia, southeastasia, uksouthのリージョンでのみ利用可能です。

Lakebaseの特徴

LakebaseはPostgresをベースにしたフルマネージドなOLTPデータベースです。Postgres用のライブラリを使ってアクセスすることが可能です。一方でDatabricksらしい特徴もあります。

たとえばストレージとコンピューティングが分離されていて、必要に応じてスケーリングすることが出来ます。コンピューティングはサーバレスなので、高速に起動することが可能です。また、Delta LakeのTableと同期出来る機能もあり、これによって大量のデータはDelta Lakeで処理し、処理済みのアプリケーション参照用のデータを自動的にLakebaseに連携する、といったことが実現出来ます。さらにUnity Catalogでの管理が可能でガバナンスを利かせることが出来ます。

Lakebaseを利用するためにはまずLakebaseのDatabase instanceを作ります。Database instanceはストレージとコンピュートリソースを管理・設定したり、ユーザーがDatabaseにアクセスするための接続情報を参照したりすることが出来ます。

Database instanceの作成

こちらの手順を参照し、NotebookからPythonで実行しました。

learn.microsoft.com

以降作業を開始する前には以下のコマンドを実行し、必要なライブラリをインストールしておきます。

%pip install databricks-sdk>=0.61.0
dbutils.library.restartPython()

以下のコマンドでDatabase instanceが作成されます。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import DatabaseInstance

# Initialize the Workspace client
w = WorkspaceClient()

# Create a database instance
instance = w.database.create_database_instance(
    DatabaseInstance(
        name="my-database-instance",
        capacity="CU_1"
    )
)

print(f"Created database instance: {instance.name}")

接続テスト

PythonのPostgres接続用のライブラリpsycopg2を使って接続できるか確認してみました。

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "my-database-instance"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = w.current_user.me().user_name,
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

結果

PostgreSQL 16.10 (0374078) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 11.4.0-1ubuntu1~22.04.2) 11.4.0, 64-bit

ちゃんとpsycopg2で接続できている・・・!

Delta LakeのTableと同期をとる

Lakebaseには"Synced Table"という機能があって、Delta LakeのTableと同期を取ることが出来ます。Synced TableはLakebaseからはRead-Onlyでアクセスすることが出来ます。

これを使うと、Delta Lakeで大量のデータを集計しID単位のデータまで加工した後、Lakebaseに連携し、アプリケーションで参照する、といった流れを作ることが出来そうです。

データ加工をDelta Lake側で行ってLakebaseに同期する

こちらのドキュメントを参考にしました。

learn.microsoft.com

まず、Delta Lakeにサンプル用のTableを作成しました。

import pandas as pd

sample_df = pd.DataFrame(
    {
        "name": ["apple", "banana"],
        "price": [120, 100]
    }
)

spark_df = spark.createDataFrame(sample_df)
spark_df.write.format("delta")\
    .option("delta.enableChangeDataFeed", "true")\
    .mode("overwrite")\
    .saveAsTable("catalog_name.schema_name.fruits_master")

次のコードでSynced Tableを作成します。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy

w = WorkspaceClient()

synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="catalog_name.schema_name.synced_fruits_master", 
        database_instance_name="my-database-instance",
        logical_database_name="databricks_postgres",
        spec=SyncedTableSpec(
            source_table_full_name="catalog_name.schema_name.fruits_master",
            primary_key_columns=["name"],
            scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
            create_database_objects_if_missing=True,
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="catalog_name",
                storage_schema="schema_name"
            )
        ),
    )
)

Synced Tableにアクセスしてみます。データ分析でおなじみのpandasを使ってみました。SELECT文でアクセスする際のSynced Tableの名前は"schema_name.synced_table_name"のようになります。

import psycopg2
import pandas as pd

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "my-database-instance"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = w.current_user.me().user_name,
    password = cred.token,
    sslmode = "require"
)

# pandasでtableにアクセス
df = pd.read_sql(
    'SELECT * FROM "schema_name"."synced_fruits_master"',
    conn
)
display(df)

結果

ちゃんと参照出来ています!次に参照元のDelta Tableにデータを追加してみてそれがSynced Tableにも反映されるか確認してみました。

追加するコードを実行します。

# 追加する行を作成
new_rows = pd.DataFrame(
    {
        "name": ["orange", "grape"],
        "price": [150, 200]
    }
)

new_spark_df = spark.createDataFrame(new_rows)
new_spark_df.write.format("delta")\
    .mode("append")\
    .saveAsTable("catalog_name.schema_name.fruits_master")

Synced Tableを確認してみます。

# pandasでtableにアクセス
df = pd.read_sql(
    'SELECT * FROM "schema_name"."synced_fruits_master"',
    conn
)
display(df)

結果

追加されたデータが確認できました。データの同期が取れているようです。

Agentの会話ログを記録する

最後は個人的に試しておきたかった、AI Agentの会話のログの保存先としてLakebaseを利用する、というケースを想定したテストです。Agent開発フレームワークLangGraphにはCheckpointという機能があって、会話の内容を記録したり、呼び出して再開することが可能になります。会話の保存先としてLakebaseを利用することが出来ます。

参考にしたドキュメントはこちらです。

learn.microsoft.com

まずライブラリをインストール。

%pip install -U "psycopg[binary,pool]" databricks-langchain langgraph langgraph-checkpoint-postgres
%pip install databricks-sdk>=0.61.0
dbutils.library.restartPython()

Agentに使用するLLMはFoundation Model APIで利用できる"databricks-claude-3-7-sonnet"にしました。

from databricks_langchain import ChatDatabricks

chat_model = ChatDatabricks(
    endpoint="databricks-claude-3-7-sonnet",
    temperature=0.1,
)

Databaseへの接続情報を取得します。

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "my-database-instance"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

Checkpointを使うために初期セットアップを行います。Checkpoint用のTableが生成されます。

from databricks_langchain import CheckpointSaver

# Create tables if missing
with CheckpointSaver(instance_name=instance_name) as saver:
    saver.setup()           # sets up checkpoint tables
    print("✅ Checkpoint tables are ready.")

今回はToolなどは持たない、単純に会話だけ出来るAgentをLangGraphで構築しました。

from langgraph.graph import StateGraph, MessagesState, START

def create_agent(chat_model,checkpointer):
    """Agentを構築する"""
    def call_model(state: MessagesState):
        """応答Node"""
        response = chat_model.invoke(state["messages"])
        return {"messages": response}

    builder = StateGraph(MessagesState)
    builder.add_node(call_model)
    builder.add_edge(START, "call_model")
    graph = builder.compile(checkpointer=checkpointer)
    return graph

def call_agent(message, thread_id):
    """Agentを呼び出して応答を生成。"""
    with CheckpointSaver(instance_name=INSTANCE_NAME) as checkpointer:  
        agent = create_agent(chat_model, checkpointer)
        config = {
            "configurable": {
                "thread_id": str(thread_id)
            }
        }
        results = []
        for chunk in agent.stream(
            {"messages": [message]},
            config,  
            stream_mode="values"
        ):
            chunk["messages"][-1].pretty_print()
            results.append(chunk)
        return results

call_agentはユーザーの入力とthread_idを受け取ります。thread_idに紐づいてスレッド単位で会話が記録されます。

テストをしてみます。

call_agent({"role":"user","content":"こんにちは、三浦だよ!今日はいい天気だね。"},thread_id=1)
call_agent({"role":"user","content":"僕の名前と今日の天気を教えて。"},thread_id=1)

結果

================================ Human Message =================================

こんにちは、三浦だよ!今日はいい天気だね。
================================== Ai Message ==================================

こんにちは、三浦さん!はい、今日は本当にいい天気ですね。散歩したり、外出したりするのに最適な日ですね。何か特別な予定はありますか?

(Translation: Hello, Miura-san! Yes, today is really nice weather. It's perfect for taking a walk or going out. Do you have any special plans today?)
================================ Human Message =================================

僕の名前と今日の天気を教えて。
================================== Ai Message ==================================

あなたの名前は三浦さんです。そして、今日の天気については、あなたが「今日はいい天気だね」とおっしゃっていたので、いい天気のようですね。ただ、私からは実際の天気を確認することはできません。現在の正確な天気情報については、天気予報サービスをご確認いただくことをお勧めします。

2回目の会話で、ちゃんと1回目の内容を覚えていることが確認できました。

別のスレッド(thread_id=2)で会話をしてみました。

call_agent({"role":"user","content":"こんにちは、Bobだよ!今日は雪が降ってるよ。"},thread_id=2)
call_agent({"role":"user","content":"僕の名前と今日の天気を教えて。"},thread_id=2)

結果

================================ Human Message =================================

こんにちは、Bobだよ!今日は雪が降ってるよ。
================================== Ai Message ==================================

こんにちは、Bob!雪が降っているんですね。素敵な冬の日ですね。雪景色はいかがですか?積もっていますか?
================================ Human Message =================================

僕の名前と今日の天気を教えて。
================================== Ai Message ==================================

あなたの名前はBobです。そして、あなたが先ほど教えてくれたように、今日は雪が降っているようですね。

こちらも会話の内容が記録できているようです。thread_id=2では出てきていない"三浦"の話をしてみます。

call_agent({"role":"user","content":"三浦との会話の内容覚えてる?"},thread_id=2)

結果

================================ Human Message =================================

三浦との会話の内容覚えてる?
================================== Ai Message ==================================

いいえ、三浦さんとの会話の内容は覚えていません。あなた(Bob)と私の間で三浦さんについての会話はこれまでありませんでした。もし三浦さんとの会話について質問されているなら、私はその会話に参加していないため、内容を知ることができません。

知らないそうです。つまりそれぞれのスレッドごとに会話が別々に記録されていることが確認できました!

まとめ

ということで、今回はDatabricksのOLTPデータベースLakebaseについてまとめてみました。けっこう色々な用途で使うことが出来そうで、ワクワクしました。はやく日本リージョンでGA版が出るとうれしいなぁと思いました。