CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

LangGraphで"Human-In-The-Loop"を組んでみました。

こんにちは、CCCMKホールディングス AIエンジニアの三浦です。

4月ですね!今日インターネットでニュースを見ていたら、個人的にすごくびっくりするニュースを見つけました。その後、今日がエイプリルフールだということを思い出しました・・・。

さて最近Agent開発フレームワークのLangGraphについて調べていたのですが、その中で面白そうなトピックを見つけました。それはAgentの"Human-In-The-Loop"の実装に関するもので、今後Agentシステムを開発する際に導入したい、と思う内容でした。

Agentシステムにおける"Human-In-The-Loop"の役割について考えてみます。Agentシステムは複雑なワークフローを状況に応じて自律的に判断して実行する仕組みですが、その中で影響が大きいタスクを実行する場合、実行前に処理を一時停止し、ユーザーに実行内容の確認と承認を求めることが必要です。たとえば商品を購入する、コストが発生するAPIを実行する金銭に関わるタスク、メールの送信、SNSへの投稿といった他者に影響を与えるタスクが該当します。

LangGraphではこういったユーザーの承認を得たり、実行内容を確認・更新する機能をどのように実装するのかを調べ、実際に試してみました。

LangGraphにおけるHuman-In-The-Loop

LangGraphではどのようにHuman-In-The-Loopを実現しているのか。それに関してはこちらのページに記されています。

blog.langchain.dev

重要な点は、処理を中断しユーザーからの指示を受け取って処理を再開するinterruptという関数と、システムの状態を読み書きし永続化する、LangGraphに組み込まれた"persistence layer"です。

LangGraphではシステム(アプリケーション)をグラフ構造で構築します。グラフ内のNodeは関数に相当し、接続されたNode間で逐次処理が実行されていきます。あるNode内でinterrupt関数が呼ばれると、処理がそこで中断され、グラフの状態がpersistence layerに保存されます。再開する時にはユーザーは追加でグラフにパラメータを渡すことが出来、このパラメータはinterrupt関数の戻り値で受け取ることが出来ます。パラメータの値に応じてinterrupt以降の処理を制御することが出来ます。

たとえば再開時のパラメータに"approval"という文字列を渡し、"approval"を受け取った場合のみAPIを実行する、といったことが可能です。

具体的な実装例

では具体的にどのように実装するのかをまとめてみました。こちらの内容は以下を参考にしています。

langchain-ai.github.io

ここで取り上げるアプリケーションは、ユーザーの質問に対してDuckDuckGo APIを呼び出してウェブ検索を実行し、ウェブ検索結果を使って質問に回答する、という動作をします。そしてDuckDuckGo APIを呼び出す前に一度処理を中断し、ユーザーの承認を受けてから処理を再開する、といったことを実現します。

グラフの構造は以下の様になります。

グラフ構造

使用するツール群は以下の通りです。

from langchain_community.tools import DuckDuckGoSearchRun
from langchain_openai import AzureChatOpenAI

# テキスト生成
llm = AzureChatOpenAI(
    temperature=0.0,
    api_version="2024-10-21",
    azure_deployment="gpt-4o"
)

# DuckDuckGo API(ウェブ検索ツール)
search_tool = DuckDuckGoSearchRun()

処理を再開する際には"継続の承認"と"ユーザーによる実行内容の更新・実行"といったパターンを取ることが出来ます。まずユーザーからの"承認"を得る"Approval or Reject"パターンからまとめていきます。

Approval or Reject

LangGraphにおける"Human-In-The-Loop"の実装で特に重要な内容が、先に掲載したグラフの図の中の"human_approval"Nodeに詰まっています。このNodeで実行される関数の内容は以下の様になっています。

def human_approval(state: State) -> Command[Literal["search", END]]:
    question = state["question"]
    query = state["query"]
    is_approval = interrupt(
        {
            "question": "これで検索します。",
            "query": query
        }
    )
    if is_approval:
        return Command(goto="search", update={"question":question, "query": query})
    else:
        return Command(goto=END, update={"answer":"検索をやめました。"})

最初にCommandについて説明します。Commandクラスを使うと次に遷移するNodeと渡すパラメータを同時に指定することが出来ます。Commandを使用するとadd_edgeadd_conditional_edgesを使ったNode間の接続設定が必要なくなります。

interruptを実行するとパラメータで与えたメッセージを含むGraphInterruptという例外が発生し、処理が中断します。

※参考 langchain-ai.github.io

再開時にユーザーが入力したパラメータはinterrupt関数の戻り値(ここだとis_approval)で受け渡すことが出来ます。この例だとTrueまたはFalseを受け取ることを想定していて、Trueが入力された場合に限りDuckDuckGo APIを実行するsearchNodeに移行し、それ以外の場合はENDNodeに移行し、"検索をやめました。"というメッセージを"answer"キーに格納してアプリケーションを終了します。

また、interruptによって処理を中断する際のグラフの状態の保存、再開時の状態の復元のために、LangGraphのpersistence layerへの読み書きが必要になります。これを実現するために、LangGraphのcheckpointerを利用します。ストレージタイプに応じたcheckpointerが用意されていますが、今回はメモリへの格納を行うMemorySaverを使用しました。

checkpointer = MemorySaver()

グラフをコンパイルする際にcheckpointerというパラメータにこれを渡します。

app = graph_builder.compile(checkpointer=checkpointer)

グラフを定義する全体の実装は以下の様になります。

from typing import Literal, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt, Command

checkpointer = MemorySaver()

class State(TypedDict):
    question: str
    query: str
    info: str
    answer: str

def create_query(state: State)->Command[Literal["human_approval"]]:
    question = state["question"]
    rewrite_query_prompt = \
        f"""この質問に回答するために必要な情報を検索するためのクエリを作成し、クエリのみ出力してください。
        質問: {question}
        """
    return Command(
        goto="human_approval",
        update={
            "question": question,
            "query": llm.invoke(rewrite_query_prompt).content
        }
    )

def human_approval(state: State) -> Command[Literal["search", END]]:
    question = state["question"]
    query = state["query"]
    is_approval = interrupt(
        {
            "question": "これで検索します。",
            "query": query
        }
    )
    if is_approval:
        return Command(goto="search", update={"question":question, "query": query})
    else:
        return Command(goto=END, update={"answer":"検索をやめました。"})

def search(state: State)->Command[Literal["generate_answer"]]:
    query = state["query"]
    info = search_tool.invoke(query)
    return Command(
        goto="generate_answer",
        update={
            "question": state["question"],
            "info": info
        }
    )

def generate_answer(state: State) -> Command[Literal[END]]:
    info = state["info"]
    question = state["question"]
    prompt = \
        f"""与えられた情報を使って質問に回答してください。
        情報: {info}
        質問: {question}
        """
    answer = llm.invoke(prompt).content
    return Command(goto=END, update={"answer": answer})

graph_builder = StateGraph(State)
graph_builder.add_node("create_query",create_query)
graph_builder.add_node("human_approval", human_approval)
graph_builder.add_node("search", search)
graph_builder.add_node("generate_answer", generate_answer)
graph_builder.set_entry_point("create_query")

app = graph_builder.compile(checkpointer=checkpointer)

動作を確認してみます。"thred id"に紐づけてグラフの状態を保存するので、実行時に"thread id"を指定する必要があります。

thread_config = {"configurable": {"thread_id": "1"}}

input_data = {
    "question":"日本で一番高い山の高さは?"
}

app.invoke(input_data, config=thread_config)

このコードを実行すると、アプリケーションはhuman_approvalNodeのinterruptの箇所で停止している状態になります。処理を再開し、searchNodeでのAPI実行に進めるためには次のように再度invokeを呼び出します。

app.invoke(Command(resume=True), config=thread_config)

thread_configに紐づいて状態を復元するので、configには先と同じthread_configを指定しなければなりません。再開時はCommandオブジェクトを通じてパラメータを渡すことが出来ます。resumeに渡した値がinterrupt関数の戻り値として使われることになります。

APIを実行せずに終了させる場合は

app.invoke(Command(resume=False), config=thread_config)

のようにresumeFalseを渡します。以下の結果が返ってきました。

{'question': '日本で一番高い山の高さは?', 'query': '日本で一番高い山の高さ', 'answer': '検索をやめました。'}

Edit

Commandresumeパラメータにはたとえば辞書型で値を渡すことが出来ます。これを利用し、単純に"承認"だけでなく、グラフの状態を更新することも可能です。たとえば今回のアプリケーションではDuckDuck Go APIを実行する前に検索用クエリを生成しますが、この検索用クエリを人が書き換える、といったことが可能になります。

human_approvalの中身を次のようにしてみます。

def human_approval(state: State) -> Command[Literal["search", END]]:
    question = state["question"]
    query = state["query"]
    # ここから変更
    human_review = interrupt(
        {
            "question": "これで検索します。",
            "query": query
        }
    )

    review_type = human_review["type"]
    review_data = human_review["data"]
    if review_type == "approval":
        if review_data:
            return Command(goto="search", update={"question":question, "query": query})
        else:
            return Command(goto=END, update={"answer":"検索をやめました。"})
    elif review_type == "update":
        update_query = review_data
        return Command(goto="search", update={"question":question, "query": update_query})
    else:
        return Command(goto=END, update={"answer":"検索をやめました。"})

human_review["type"]で介入タイプ("approval":承認 or "update":更新)を判定し、human_review["data"]で値を取得します。たとえばで介入タイプが"approval"の場合はTrueまたはFalseが格納され、"update"の場合は更新したい検索クエリが格納する、といったことが出来ます。

Agentが作成した検索クエリを更新してアプリケーションを再開する場合は、次のようになります。

app.invoke(
    Command(resume={
        "type": "update",
        "data": "日本 最も高い 山"
    })
    , config=thread_config
)

まとめ

いかがでしょうか。今回の内容を応用することで、Agentシステムが重要な処理を実行する前にユーザーによる承認や実行内容のコントロールが可能になります。ドキュメントを読むだけだと具体的にどんな動作になるのかがイメージが湧かなかったのですが、実際に実装して動かしてみると"こういうことだったのか"とスッキリしました。LangGraphにはAgentシステムを実装するための機能が一通り揃っているので、Agentシステムを組む際に考えなければならない事項を知ることが出来、とても面白いです!