CCCMKホールディングス TECH LABの Tech Blog

TECH LABのエンジニアが技術情報を発信しています

ブログタイトル

LangGraphでMap-reduce型のテキスト参照処理を作ってみました。

こんにちは、CCCMKホールディングスTECH LABの三浦です。

最近寒いですね。朝吐く息が白くなったのを見て、寒くなったなぁと感じました。

はじめに

LLMアプリケーションの開発をグラフ構造を組み立てて進めることが出来るLangGraphというライブラリがあります。最近ドキュメントを読んでいたところ、「これ、使ってみたい!」という機能がありました。Sendというクラスで、これを使うことで複数の処理を並列で実行し、その結果を集約する、というMap-reduceの処理をLangGraphで組むことが出来るようになります。

実は最近Retrieval-Augmented Generation(RAG)を使った取り組みの中でなかなか解決出来ない課題がありました。それは与えられた文章を要約させる、というタスクが従来のRAGだと上手くこなせない、という課題です。

基本形のRAGは、ユーザーの質問文と関係性がある部分だけを参照ドキュメントから抽出し、回答生成に利用します。要約系のタスクは参照ドキュメント全体に目を通す必要があるため、基本形のRAGでは対応することが出来ません。

そこで参照ドキュメントを複数個の小さなパーツ(chunk)に分割し、それぞれのchunkごとに要約を生成し、最後にそれらの要約を集約して参照ドキュメントとして与えて要約文を生成すれば、1つ1つの処理で扱うテキストのサイズは小さくても大きなドキュメントの要約を生成することが出来そうです。そして各chunkごとに要約を生成し、それらを集約する処理はMap-reduceの処理で書くことが出来ると考えました。

今回はLangChainのSendクラスを使ってMap-reduceの処理を書いて、大きなドキュメントの要約文を生成する処理を組んでみましたのでご紹介したいと思います。

※注意 記事の終わりの"課題"にも書いたのですが今回のアプローチは参照するドキュメントによっては大量のトークンを消費するので、小さいサイズのドキュメントで試すことをお勧めします。

参照するドキュメント

参照するドキュメントは、以前私が書いたこちらの記事です。

techblog.cccmkhd.co.jp

この記事の内容をテキストファイル(txt)に保存したものを参照するドキュメントとして使用しました。

Graphの構造

今回組んだアプリケーションを構成するGraph構造は以下の様になります。

Graph構造

  1. 質問文の回答にドキュメントの参照が必要か否かで処理を分岐します。必要な場合は対象のドキュメントをchunkに分割する"split_text"Nodeに遷移、必要ない場合はLLMに回答を生成させる"chat"のNodeに遷移します。"chat"Node実行後はアプリケーションを終了します。
  2. 参照ドキュメントのテキストをchunk_sizeに基づいて分割します。
  3. chunkごとにSendオブジェクトを生成し、並列処理を実行します。
  4. Graphの状態(State)に格納される各プロセスの実行結果を参照して質問に対する回答を生成します。

実装

実装です。今回私はAzure Databricksでこの実験を行いました。使用するLLMはAzure DatabricksのFoundation Model APIで提供されているものを使ってみました。

ライブラリのインストール

以下のライブラリをインストールしました。

  • langchain==0.3.4
  • databricks-langchain
  • langgraph==0.2.39

GraphのState

LangGraphはアプリケーション実行中、各NodeにStateを通じてデータを送ることが出来ます。 また今回は並列処理を実行するのですが、並列処理で実行されるNodeにデータを渡すためのStateも定義しました。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_databricks import ChatDatabricks
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph 
from pydantic import BaseModel, Field


class ChunkTextState(TypedDict):
  """並列実行時に各プロセスに渡されるstate"""
  chunk_text: str # 1chunk
  question: str # 質問

class GraphState(TypedDict):
  """Graph全体で保持するState"""
  text: str # 参照ドキュメント
  chunk_answers: Annotated[list, operator.add] # 各chunkに対するanswer
  chunk_texts: list[ChunkTextState] # ドキュメントのchunk
  question: str # 質問
  response: str # 回答

ルーティング機能の実装

アプリケーションは質問に対し、ドキュメントの参照が必要であると判断した場合とそうでない場合で遷移先のNodeが分岐します。(split_textchat)

それを実現するために、Graphにルーティング機能を実装します。どちらのNodeに進むべきかはLLMに判断させるのですが、判断結果の形式に表記ゆれ等があると制御が難しくなります。langchainには"Structured Output"という機能があり、これを使うと出力形式を制御出来るようになります。ただしStructured Outputが可能なLLMは"Tool Calling"に対応したLLMである必要があります。

endpoint = "databricks-meta-llama-3-1-70b-instruct"

class Route(BaseModel):
    route: Literal["chat", "splitted_text"] = Field(
        ...,
        description="ユーザーから与えられたクエリに対し, chatかsplitted_textかを選択する。",
    )

def route(state: GraphState):
  """questionを見て次のNodeを決める"""
  question = state["question"]
  system_prompt = """あなたはユーザーのクエリに対し、適切なルーティングを行うことが出来るアシスタントです。クエリに回答するために追加の情報が必要な場合は"splitted_text"を、それ以外は"chat"を選択してください。"""
  prompt = ChatPromptTemplate.from_messages(
    [("system", system_prompt),("user","{question}")]
  )
  llm = ChatDatabricks(endpoint=endpoint, temperature=0.0)
  structured_llm_router = llm.with_structured_output(Route)
  router = prompt | structured_llm_router

  response = router.invoke({"question": question})

  if response.route == "split_text":
    return "split_text"
  else:
    return "chat"

Nodeの実装

Nodeを以下の様に実装しました。

def chat(state: GraphState):
  """回答に参照ドキュメントを必要としない時にLLMに回答を生成させる"""
  question = state["question"]
  system_prompt = """あなたは親切なアシスタントです。ユーザーのクエリに回答してください。"""
  prompt = ChatPromptTemplate.from_messages(
    [("system", system_prompt),("user","{question}")]
  )
  llm = ChatDatabricks(endpoint=endpoint, temperature=0.0)
  chain = prompt|llm|StrOutputParser()
  response = chain.invoke({"question":question})
  return {"response": response}


def split_text(state: GraphState):
  """参照ドキュメントを分割する"""
  text = state["text"]
  chunk_size = 1500
  text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size,
    chunk_overlap=0
  )
  question = state["question"]
  splitted_docs = text_splitter.split_text(text)
  return {
    "chunk_text": [
      ChunkTextState(chunk_text=t, question=question) for t in splitted_docs
    ],
    "question": question
  }

def generate_answer_from_chunk(state: ChunkTextState):
  """chunkに対する回答を生成する"""
  prompt = PromptTemplate.from_template(
    """
    与えられた情報を参照してユーザーの指示に回答するために必要な情報を抜き出し、200文字程度でまとめてください。
    もし答えられない場合は答えられない理由を述べて下さい。
    指示: {question}
    情報: {chunk_text}
    回答: """
  )
  question = state["question"]
  chunk_text = state["chunk_text"]
  llm = ChatDatabricks(endpoint=endpoint, temperature=0.0)
  chain = prompt|llm|StrOutputParser()

  response = chain.invoke({"question": question, "chunk_text": chunk_text})
  return {"chunk_answers":[response]}

def generate_answer(state: GraphState):
  """chunkに対する回答を参照して回答を生成する。"""
  prompt = PromptTemplate.from_template(
    """
    ユーザーの指示に与えられた情報を使って回答してください。
    与えられた情報はある文章からユーザーの指示に回答するために必要な内容だけを事前に抽出したものです。
    もし答えられない場合は分からない、と答えて下さい。
    指示: {question}
    情報: {answers}
    回答: """
  )
  question = state["question"]
  answers = "\n".join(state["chunk_answers"])
  llm = ChatDatabricks(endpoint=endpoint, temperature=0.0)
  chain = prompt|llm|StrOutputParser()
  response = chain.invoke({"question": question, "answers": answers})
  return {"response": response, "chunk_answers": state["chunk_answers"]}

並列処理の実行部分

まず以下のような関数を作成します。

def continue_generate_answer_from_chunk(state: GraphState):
  """並列処理を実行する"""
  return [Send("generate_answer_from_chunk", t) for t in state["chunk_text"]]

この関数をGraph構築時にadd_conditional_edgesで指定することで並列処理を実行することが出来ます。

Graphの構築

Graphを構築します。

graph_builder = StateGraph(GraphState)

# Nodeの追加
graph_builder.add_node("chat", chat)
graph_builder.add_node("split_text", split_text)
graph_builder.add_node("generate_answer_from_chunk", generate_answer_from_chunk)
graph_builder.add_node("generate_answer", generate_answer)

# Edgeの追加
graph_builder.add_edge(START, "split_text")
graph_builder.add_edge("generate_answer_from_chunk", "generate_answer")
graph_builder.add_edge("generate_answer", END)
graph_builder.add_edge("chat", END)

# Conditonal Edgeの追加
graph_builder.add_conditional_edges(
  START,
  route,
  {
    "chat": "chat",
    "split_text": "split_text"
  }
)
graph_builder.add_conditional_edges(
  "split_text", 
  continue_generate_answer_from_chunk, 
  ["generate_answer_from_chunk"]
)

# Graph構築
graph = graph_builder.compile()

実行する

作ったアプリケーションを実行してみます。ブログのテキストファイルを読み込んで、質問と一緒に入力しています。

with open("./blog.txt", "r") as f:
  blog_content = f.read()

question = "この文章で書かれていることの中で特に面白い点を5つ説明して。"

result = graph.invoke(
  {"text": blog_content,"question": question}
)

結果はこちらです。通常のRAGでは難しい、文章全体からトピックを抜き出すことが出来ていることが分かります。

'1.  人格設定したLLMが生成した文章の評価方法として"LIWC-22"というアプリケーションを利用する方法があることを知ることができた。\n2.  BFIテストを用いることで人格を定量化して評価できることが分かった。\n3.  評価を人に依頼する際には"その文章をLLMが書いた"ことを知らせるか否かも大事な要素であることを知ることができた。\n4.  論文で参照されている他の研究内容もとても面白そうだった。\n5.  Big Five personality traitsに基づいたLLM personaにストーリーを書かせてみた。'

課題

このアプローチには課題があります。まずトークン数の消費がとても大きい点です。参照ドキュメントの情報を全てLLMに与えるためです。また、処理時間もかなりかかります。特に外部で提供されているLLMをAPIで利用する場合は、レスポンスが返って来ない、といったことも発生するかもしれません。chunk_sizeを大きくし、並列処理数を小さくする、などの工夫が必要だと考えられます。

まとめ

ということで今回はLangGraphのMap-reduceの機能を使って文章全体を見て回答が出来るアプリケーションを作ってみた話をご紹介しました。色々と課題もありますが、比較的短いコードで実現出来ることが分かりました。結構色々なシーンで活用できる機能だと感じました。