こんにちは、CCCMKホールディングス AIエンジニアの三浦です。
4月ですね!今日インターネットでニュースを見ていたら、個人的にすごくびっくりするニュースを見つけました。その後、今日がエイプリルフールだということを思い出しました・・・。
さて最近Agent開発フレームワークのLangGraphについて調べていたのですが、その中で面白そうなトピックを見つけました。それはAgentの"Human-In-The-Loop"の実装に関するもので、今後Agentシステムを開発する際に導入したい、と思う内容でした。
Agentシステムにおける"Human-In-The-Loop"の役割について考えてみます。Agentシステムは複雑なワークフローを状況に応じて自律的に判断して実行する仕組みですが、その中で影響が大きいタスクを実行する場合、実行前に処理を一時停止し、ユーザーに実行内容の確認と承認を求めることが必要です。たとえば商品を購入する、コストが発生するAPIを実行する金銭に関わるタスク、メールの送信、SNSへの投稿といった他者に影響を与えるタスクが該当します。
LangGraphではこういったユーザーの承認を得たり、実行内容を確認・更新する機能をどのように実装するのかを調べ、実際に試してみました。
LangGraphにおけるHuman-In-The-Loop
LangGraphではどのようにHuman-In-The-Loopを実現しているのか。それに関してはこちらのページに記されています。
重要な点は、処理を中断しユーザーからの指示を受け取って処理を再開するinterrupt
という関数と、システムの状態を読み書きし永続化する、LangGraphに組み込まれた"persistence layer"です。
LangGraphではシステム(アプリケーション)をグラフ構造で構築します。グラフ内のNodeは関数に相当し、接続されたNode間で逐次処理が実行されていきます。あるNode内でinterrupt
関数が呼ばれると、処理がそこで中断され、グラフの状態がpersistence layerに保存されます。再開する時にはユーザーは追加でグラフにパラメータを渡すことが出来、このパラメータはinterrupt
関数の戻り値で受け取ることが出来ます。パラメータの値に応じてinterrupt
以降の処理を制御することが出来ます。
たとえば再開時のパラメータに"approval"という文字列を渡し、"approval"を受け取った場合のみAPIを実行する、といったことが可能です。
具体的な実装例
では具体的にどのように実装するのかをまとめてみました。こちらの内容は以下を参考にしています。
ここで取り上げるアプリケーションは、ユーザーの質問に対してDuckDuckGo APIを呼び出してウェブ検索を実行し、ウェブ検索結果を使って質問に回答する、という動作をします。そしてDuckDuckGo APIを呼び出す前に一度処理を中断し、ユーザーの承認を受けてから処理を再開する、といったことを実現します。
グラフの構造は以下の様になります。
使用するツール群は以下の通りです。
from langchain_community.tools import DuckDuckGoSearchRun from langchain_openai import AzureChatOpenAI # テキスト生成 llm = AzureChatOpenAI( temperature=0.0, api_version="2024-10-21", azure_deployment="gpt-4o" ) # DuckDuckGo API(ウェブ検索ツール) search_tool = DuckDuckGoSearchRun()
処理を再開する際には"継続の承認"と"ユーザーによる実行内容の更新・実行"といったパターンを取ることが出来ます。まずユーザーからの"承認"を得る"Approval or Reject"パターンからまとめていきます。
Approval or Reject
LangGraphにおける"Human-In-The-Loop"の実装で特に重要な内容が、先に掲載したグラフの図の中の"human_approval"Nodeに詰まっています。このNodeで実行される関数の内容は以下の様になっています。
def human_approval(state: State) -> Command[Literal["search", END]]: question = state["question"] query = state["query"] is_approval = interrupt( { "question": "これで検索します。", "query": query } ) if is_approval: return Command(goto="search", update={"question":question, "query": query}) else: return Command(goto=END, update={"answer":"検索をやめました。"})
最初にCommand
について説明します。Command
クラスを使うと次に遷移するNodeと渡すパラメータを同時に指定することが出来ます。Command
を使用するとadd_edge
やadd_conditional_edges
を使ったNode間の接続設定が必要なくなります。
interrupt
を実行するとパラメータで与えたメッセージを含むGraphInterrupt
という例外が発生し、処理が中断します。
再開時にユーザーが入力したパラメータはinterrupt
関数の戻り値(ここだとis_approval
)で受け渡すことが出来ます。この例だとTrue
またはFalse
を受け取ることを想定していて、True
が入力された場合に限りDuckDuckGo APIを実行するsearch
Nodeに移行し、それ以外の場合はEND
Nodeに移行し、"検索をやめました。"というメッセージを"answer"キーに格納してアプリケーションを終了します。
また、interrupt
によって処理を中断する際のグラフの状態の保存、再開時の状態の復元のために、LangGraphのpersistence layerへの読み書きが必要になります。これを実現するために、LangGraphのcheckpointer
を利用します。ストレージタイプに応じたcheckpointer
が用意されていますが、今回はメモリへの格納を行うMemorySaver
を使用しました。
checkpointer = MemorySaver()
グラフをコンパイルする際にcheckpointer
というパラメータにこれを渡します。
app = graph_builder.compile(checkpointer=checkpointer)
グラフを定義する全体の実装は以下の様になります。
from typing import Literal, TypedDict from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, StateGraph from langgraph.prebuilt import ToolNode from langgraph.types import interrupt, Command checkpointer = MemorySaver() class State(TypedDict): question: str query: str info: str answer: str def create_query(state: State)->Command[Literal["human_approval"]]: question = state["question"] rewrite_query_prompt = \ f"""この質問に回答するために必要な情報を検索するためのクエリを作成し、クエリのみ出力してください。 質問: {question} """ return Command( goto="human_approval", update={ "question": question, "query": llm.invoke(rewrite_query_prompt).content } ) def human_approval(state: State) -> Command[Literal["search", END]]: question = state["question"] query = state["query"] is_approval = interrupt( { "question": "これで検索します。", "query": query } ) if is_approval: return Command(goto="search", update={"question":question, "query": query}) else: return Command(goto=END, update={"answer":"検索をやめました。"}) def search(state: State)->Command[Literal["generate_answer"]]: query = state["query"] info = search_tool.invoke(query) return Command( goto="generate_answer", update={ "question": state["question"], "info": info } ) def generate_answer(state: State) -> Command[Literal[END]]: info = state["info"] question = state["question"] prompt = \ f"""与えられた情報を使って質問に回答してください。 情報: {info} 質問: {question} """ answer = llm.invoke(prompt).content return Command(goto=END, update={"answer": answer}) graph_builder = StateGraph(State) graph_builder.add_node("create_query",create_query) graph_builder.add_node("human_approval", human_approval) graph_builder.add_node("search", search) graph_builder.add_node("generate_answer", generate_answer) graph_builder.set_entry_point("create_query") app = graph_builder.compile(checkpointer=checkpointer)
動作を確認してみます。"thred id"に紐づけてグラフの状態を保存するので、実行時に"thread id"を指定する必要があります。
thread_config = {"configurable": {"thread_id": "1"}} input_data = { "question":"日本で一番高い山の高さは?" } app.invoke(input_data, config=thread_config)
このコードを実行すると、アプリケーションはhuman_approval
Nodeのinterrupt
の箇所で停止している状態になります。処理を再開し、search
NodeでのAPI実行に進めるためには次のように再度invoke
を呼び出します。
app.invoke(Command(resume=True), config=thread_config)
thread_config
に紐づいて状態を復元するので、config
には先と同じthread_config
を指定しなければなりません。再開時はCommand
オブジェクトを通じてパラメータを渡すことが出来ます。resume
に渡した値がinterrupt
関数の戻り値として使われることになります。
APIを実行せずに終了させる場合は
app.invoke(Command(resume=False), config=thread_config)
のようにresume
にFalse
を渡します。以下の結果が返ってきました。
{'question': '日本で一番高い山の高さは?', 'query': '日本で一番高い山の高さ', 'answer': '検索をやめました。'}
Edit
Command
のresume
パラメータにはたとえば辞書型で値を渡すことが出来ます。これを利用し、単純に"承認"だけでなく、グラフの状態を更新することも可能です。たとえば今回のアプリケーションではDuckDuck Go APIを実行する前に検索用クエリを生成しますが、この検索用クエリを人が書き換える、といったことが可能になります。
human_approval
の中身を次のようにしてみます。
def human_approval(state: State) -> Command[Literal["search", END]]: question = state["question"] query = state["query"] # ここから変更 human_review = interrupt( { "question": "これで検索します。", "query": query } ) review_type = human_review["type"] review_data = human_review["data"] if review_type == "approval": if review_data: return Command(goto="search", update={"question":question, "query": query}) else: return Command(goto=END, update={"answer":"検索をやめました。"}) elif review_type == "update": update_query = review_data return Command(goto="search", update={"question":question, "query": update_query}) else: return Command(goto=END, update={"answer":"検索をやめました。"})
human_review["type"]
で介入タイプ("approval":承認 or "update":更新)を判定し、human_review["data"]
で値を取得します。たとえばで介入タイプが"approval"の場合はTrue
またはFalse
が格納され、"update"の場合は更新したい検索クエリが格納する、といったことが出来ます。
Agentが作成した検索クエリを更新してアプリケーションを再開する場合は、次のようになります。
app.invoke( Command(resume={ "type": "update", "data": "日本 最も高い 山" }) , config=thread_config )
まとめ
いかがでしょうか。今回の内容を応用することで、Agentシステムが重要な処理を実行する前にユーザーによる承認や実行内容のコントロールが可能になります。ドキュメントを読むだけだと具体的にどんな動作になるのかがイメージが湧かなかったのですが、実際に実装して動かしてみると"こういうことだったのか"とスッキリしました。LangGraphにはAgentシステムを実装するための機能が一通り揃っているので、Agentシステムを組む際に考えなければならない事項を知ることが出来、とても面白いです!