/ Дневник курса / Урок 6 /

LangGraph: агент как машина состояний

LangGraph описывает агента как граф: ветвления, циклы, сохранение прогресса и паузы на подтверждение человека — то, чего линейная цепочка не даёт.

  • langgraph
  • ai-agents
  • orchestration
  • state-machine

Линейная цепочка вход → LLM → tool → выход не умеет ветвиться, циклиться, прерываться и восстанавливаться — а агенту всё это нужно. LangGraph описывает агента как граф из узлов и рёбер поверх единого типизированного состояния. Разберём сборку графа, reducer’ы state, checkpointer для возобновляемости, паузы на человеческий аппрув и time-travel — на актуальном API 2026 года, с пометками, что из старого синтаксиса уже устарело.

1. Агенты — это графы, а не цепочки

Главный тезис урока: агенту нужен контроль потока исполнения, прерывание с восстановлением и управление состоянием — ровно то, чего цепочка не даёт. LangChain-цепочка хороша как API для прототипа: один проход, без циклов, падает на первой ошибке и не воспроизводится. Для прода этого мало.

LangGraph реализует паттерн state machine (машина состояний): каждый шаг агента — это узел графа с явно определённым состоянием. Три примитива, из которых собирается любая логика:

  • State (состояние) — единое строго типизированное хранилище данных, которое ходит между всеми узлами.
  • Node (узел) — Python-функция: читает state, возвращает его частичное обновление.
  • Edge (ребро) — переход между узлами, обычный или условный (функция-маршрутизатор выбирает следующий узел).

Показательная деталь: высокоуровневая create_agent() из LangChain сама написана на LangGraph. «Готовый» ReAct-агент под капотом — тот же граф из узлов и условного ребра.

2. State и reducer’ы: как обновляется состояние

State определяет схему всех данных графа. На уровне графа оно неизменяемо: узел не мутирует объект на месте, а возвращает описание обновления, которое движок применяет сам. Именно из неизменяемости растёт всё остальное — раз на каждом шаге можно снять снимок состояния, его можно сохранить, прервать и переиграть.

Схема задаётся через TypedDict, Pydantic-модель или dataclass. Но вот вопрос: узел вернул {"messages": [...]} — движку заменить старое значение или слить с ним? Это решение и есть reducer. Без него канал работает по принципу last-write-wins (перезапись); кастомное слияние задают через Annotated[...].

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph.message import add_messages

class State(TypedDict):
    simple_value: str                       # перезаписывается (поведение по умолчанию)
    numbers: Annotated[list, add]           # новое значение мержится со старым
    total: Annotated[int, lambda x, y: x + y]  # кастомная логика слияния
    messages: Annotated[list, add_messages]    # история диалога

Самый частый случай — история сообщений. Перезапись здесь потеряла бы весь диалог; нужно накапливать. Reducer add_messages не просто конкатенирует: он дописывает новые сообщения, резолвит обновления по уникальным id (без дублей) и авто-конвертит dict-payload в BaseMessage. Если же reducer надо разово обойти и сбросить канал, обновление оборачивают в Overwrite:

from langgraph.types import Overwrite

def reset_node(state: State) -> dict:
    return {"messages": Overwrite([])}  # сбросить историю в обход add_messages

3. Узлы, рёбра и сборка графа

Логика живёт в узлах, поток — в рёбрах. Маршрут не зашит в код функций, а описан декларативно: его можно ветвить, замыкать в цикл и визуализировать. Узел — функция state -> dict; ребро задаёт порядок. Обычное (add_edge("A", "B")) — жёсткий факт «B после A». Условное (add_conditional_edges) — функция-маршрутизатор возвращает имя следующего узла на основе state, и условием тут служит что угодно, вплоть до решения самой LLM.

Граф собирается билдером StateGraph и фиксируется вызовом .compile().

from langgraph.graph import StateGraph, START, END

def route_by_number(state: State) -> str:   # функция-маршрутизатор
    return "positive" if state["number"] > 0 else "negative"

builder = StateGraph(State)
builder.add_node("check", check_number)
builder.add_node("positive", handle_positive)
builder.add_node("negative", handle_negative)
builder.add_edge(START, "check")
builder.add_conditional_edges("check", route_by_number,
                              {"positive": "positive", "negative": "negative"})
builder.add_edge("positive", END)
builder.add_edge("negative", END)
graph = builder.compile()

Тот же механизм даёт минимальный агентный цикл — ReAct, выраженный как граф. Два узла (agent и tools) замкнуты условным ребром: маршрутизатор смотрит на последнее сообщение модели, есть tool_calls — идём в инструменты и возвращаемся в agent, нет — на выход.

from langgraph.prebuilt import ToolNode
from langchain.messages import SystemMessage

model = chat_model.bind_tools([pay])  # модель получает схемы инструментов

def agent(state: State) -> dict:
    return {"messages": [model.invoke([SystemMessage(SYSTEM_PROMPT)] + state["messages"])]}

def router(state: State) -> str:
    last = state["messages"][-1]
    return "tools" if getattr(last, "tool_calls", None) else END

builder = StateGraph(State)
builder.add_node("agent", agent)
builder.add_node("tools", ToolNode([pay]))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", router)
builder.add_edge("tools", "agent")   # замыкаем цикл
app = builder.compile()

Если кастомный цикл не нужен, тот же ReAct-агент собирается одной строкой — но через новый путь from langchain.agents import create_agent (старый create_react_agent из langgraph.prebuilt помечен deprecated):

from langchain.agents import create_agent

app = create_agent(model, tools=[pay], system_prompt=SYSTEM_PROMPT)

Ручной граф берут, когда нужен контроль над циклом, кастомные узлы или нестандартное ветвление; готовая обёртка — когда хватает классического «думай → вызови инструмент → наблюдай».

4. Циклы, лимиты и надёжность

Цикличный граф (узел возвращает управление себе) рискует зациклиться, если маршрутизатор не сработает — например, агент бесконечно дописывает сообщения. Поэтому в циклах обязателен жёсткий лимит итераций плюс условное ребро на выход или fallback при превышении порога.

Второй слой надёжности — RetryPolicy: повтор узла при временных ошибках (таймаут API, невалидный JSON) с экспоненциальной задержкой. Важный нюанс — retry_on оценивается в рантайме, чтобы отличать временную (transient) ошибку от бага в коде: TypeError падает сразу, без повторов.

from langgraph.types import RetryPolicy

retry = RetryPolicy(max_attempts=3, initial_interval=1.0,
                    backoff_factor=2.0, max_interval=10.0, jitter=True)  # 1с→2с→4с, ±50%
builder.add_node("api_call", api_call_node, retry=retry)

Свежие версии добавили ещё два механизма поверх ретраев. Per-node timeout через add_node(..., timeout=...): при превышении поднимается NodeTimeoutError, чистятся pending writes и запускается retry-политика. Node-level error_handler — callback, который вызывается при исчерпании ретраев с контекстом исключения и может вернуть Command для роутинга в compensation-узел (паттерны Saga / rollback):

builder.add_node("charge", charge_node,
                 retry=retry, timeout=30.0, error_handler=compensation_node)

И поверх всего — идемпотентность: повторный вызов узла с теми же входными данными даёт тот же результат. Она обязательна, потому что узел может вызваться снова при retry, восстановлении из чекпоинта или возобновлении после паузы. Приёмы: детерминировать LLM (temperature=0, seed), проверять operation_id перед необратимой операцией, передавать idempotency_key в API, кэшировать результат в state.

5. Checkpointer: возобновляемость по thread_id

По умолчанию состояние живёт в оперативной памяти — сервер перезапустился, и весь прогресс потерян. Решение: подключить checkpointer при компиляции. Он пишет снимок состояния на каждой границе superstep, и любой запуск с тем же thread_id подхватывает сохранённую историю.

   ┌──────────────────────────────────┐
   │           checkpointer           │
   │   (InMemory / Sqlite / Postgres) │
   └──────────────┬───────────────────┘
                  │ снимок на каждом superstep
                  ▼
  START ─▶ узел A ─▶ узел B ─▶ узел C ─▶ END
                  ▲
   thread_id ─────┘  любой запуск с тем же id
                     продолжает с сохранённого места
Рисунок — checkpointer (InMemorySaver / SqliteSaver / PostgresSaver) сохраняет снимок состояния на каждой границе superstep; запуск с тем же thread_id восстанавливает прогресс с последнего чекпоинта, а не с нуля.

thread_id — это идентификатор сессии: каждый пользователь восстанавливается независимо, что заодно даёт многопользовательские диалоги. Канонический класс памяти теперь InMemorySaver (MemorySaver остался алиасом); для отладки — он, для прода — SqliteSaver/PostgresSaver. Код графа при смене хранилища не меняется — только строка подключения.

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

checkpointer = AsyncPostgresSaver(conn)
await checkpointer.setup()                          # инициализация таблиц
app = builder.compile(checkpointer=checkpointer)

thread = {"configurable": {"thread_id": "chat_123"}}
await app.ainvoke({"messages": [...]}, thread)
# после рестарта сервера состояние есть в БД — продолжаем тот же thread_id:
state = await app.aget_state(thread)

Тонкий момент для прода: в state часто лежат API-ключи и креды. Чекпоинтер оборачивают в EncryptedSerializer.from_pycryptodome_aes(key) — шифрование at-rest, чтобы секреты не легли в БД открытым текстом.

6. Human-in-the-loop: пауза на аппрув

Доверите финансовому агенту свои деньги без единой проверки? Вот и нет. Human-in-the-loop (человек в цикле) — пауза исполнения в критической точке, чтобы человек подтвердил или отредактировал действие, после чего граф продолжает с того же места. Работает только поверх checkpointer: прервать и возобновить можно лишь персистентное состояние.

Каноничный способ — динамический interrupt() прямо внутри узла. При вызове узел приостанавливается, состояние пишется в checkpointer, а JSON-сериализуемый payload возвращается клиенту; поток ждёт. Возобновление — Command(resume=value), и это значение возвращается как результат исходного вызова interrupt().

from langgraph.types import Command, interrupt

def human_node(state: State) -> dict:
    feedback = interrupt({"question": "Одобряете заказ?", "amount": state["amount"]})
    return {"approved": feedback["approved"]}

# первый запуск прервётся внутри human_node:
async for event in app.astream(input_data, thread):
    print(event)
# человек нажал «Approve» — возобновляем:
await app.ainvoke(Command(resume={"approved": True}), thread)

Раньше точки паузы объявляли статически при компиляции — compile(interrupt_before=[...], interrupt_after=[...]). Этот синтаксис помечен deprecated в пользу динамического interrupt(): решение прервать принимается в рантайме внутри узла, а не зашивается в граф заранее.

   агент ──▶  human_node ──interrupt()──▶  [ПАУЗА]
                                              │ state в checkpointer
                                              ▼
                                       внешний аппрув
                                       (человек: approve / edit)
                                              │
   агент  ◀──Command(resume=…)──◀─────────────┘
Рисунок — HITL-пауза: узел вызывает interrupt(), состояние сохраняется в checkpointer, поток ждёт внешнего решения человека; Command(resume=value) возвращает это значение в точку вызова interrupt() и граф продолжает.

Здесь есть ловушка идемпотентности: узел перезапускается с начала, поэтому необратимый код нельзя ставить до interrupt(). Классический баг — charge_card() перед прерыванием: первый проход списал деньги и встал на паузу, после resume узел стартует заново и списывает второй раз. Лечится выносом необратимой операции за узел с interrupt() (см. runbook ниже).

7. Time-travel: отладка недетерминизма

Недетерминизм LLM делает классическую отладку почти невозможной — один и тот же вход даёт разные ответы, баг не воспроизвести как в обычном бэкенде. Раз каждый шаг сохранён как чекпоинт, появляется time-travel: вернуться в любую точку истории и либо воспроизвести её (Replay), либо подменить состояние и пойти альтернативным путём (Fork). Это и превращает недетерминированного агента в воспроизводимый процесс.

История доступна через get_state_history — список снимков с checkpoint_id и значениями. Fork применяет значения к выбранному чекпоинту через его reducer’ы, создавая новую ветку без изменения исходной истории:

history = [s async for s in app.aget_state_history(thread)]
old = history[1]                                  # откат к нужному чекпоинту

fork = {"configurable": {"thread_id": "fork_456"}}   # новый thread — новая ветка
await app.aupdate_state(fork, old.values,
        checkpoint_id=old.config["configurable"]["checkpoint_id"])
await app.ainvoke({"messages": [...]}, fork)         # «а что если?» на других данных

Нюанс с подграфами: по умолчанию subgraph наследует чекпоинтер родителя и трактуется как один superstep — внутрь него time-travel не зайдёт. Для пошаговой отладки subgraph компилируют со своим чекпоинтером (compile(checkpointer=True)), а доступ к его истории — через get_state(config, subgraphs=True). Отдельно стоит держать в уме стоимость replay: если на пути дорогие LLM-узлы, повторный проход недёшев — их результаты кэшируют в state, чтобы переигрывать только дешёвую часть.

8. Собираем контролируемый workflow

Все примитивы складываются в одну архитектуру production-агента:

  • Персистентное хранилище state настроено (Postgres-checkpointer).
  • Узел LLM не вызывает опасные операции без HITL и не выполняет детерминированную логику.
  • Узел Tools выполняет всю детерминированную работу.
  • Узел Human Approval проверяет рискованные действия через прерывание.
  • Узел Error Handler обрабатывает ошибки: retry, fallback, compensation.

Что в итоге получает такой граф по сравнению с цепочкой: воспроизводимость (любую цепочку можно повторить), наблюдаемость (трейсинг шагов через LangGraph Studio или LangFuse), управляемость (человек вмешивается на любом шаге), безопасность (опасные действия за HITL) и отказоустойчивость (не падает на таймаутах API и невалидном JSON от модели).

Типовые отказы и их лечение удобно держать как runbook:

Отказ Решение
Потеря состояния PostgreSQL-checkpointer
Бесконечные циклы Лимит итераций + fallback
Неидемпотентный код до interrupt() Вынести код за узел с interrupt()
Опасные действия агента interrupt на каждый опасный tool
Дорогие LLM-узлы при replay Кэшировать результаты в state

Особняком стоит Functional API — императивная альтернатива графу. @task оборачивает вычислительный шаг, @entrypoint(checkpointer=...) оркеструет его обычными Python-конструкциями (if, for, вызовы функций) без явной схемы state и рёбер. Состояние тут scoped к локальным переменным функции, не шарится глобально, и визуального time-travel нет — императивный call stack его не поддерживает. Выбор простой: декларативный граф берут под циклы, HITL и time-travel; Functional API — под линейную логику, где граф только добавляет шаблонный код (boilerplate).

Итог

  • Агент — это граф поверх типизированного state, а не линейная цепочка: ветвление, циклы, паузы и восстановление встроены в модель, а не прикручены сбоку.
  • Reducer решает, как сливать обновления state; add_messages — для истории диалога. Без reducer’а — перезапись.
  • Checkpointer + thread_id дают возобновляемость и заодно включают HITL и time-travel — без персистентности их попросту нет.
  • HITL — это динамический interrupt() + Command(resume=...); статические брейкпойнты устарели. Главный риск — неидемпотентный код до прерывания.
  • Надёжность собирается слоями: лимит итераций, RetryPolicy, per-node timeout, error_handler и идемпотентность узлов.
  • Актуальность API в 2026: InMemorySaver (не MemorySaver), create_agent (не create_react_agent), динамический interrupt() (не interrupt_before/after), Python ≥ 3.10.

FAQ

Чем LangGraph отличается от LangChain?

LangChain — это API для прототипирования: цепочки прогоняют данные линейно за один проход, без циклов и без восстановления после сбоя. LangGraph оркеструет агента как граф состояний с ветвлением, циклами, retry, персистентностью и human-in-the-loop. Показательно, что высокоуровневая create_agent() из LangChain сама написана на LangGraph.

Что такое reducer в LangGraph и когда он нужен?

Reducer — функция, которая решает, как обновление от узла соединяется со старым значением поля state. Без reducer’а поле перезаписывается (last-write-wins). Он нужен, когда значения надо накапливать, а не заменять: самый частый случай — история сообщений через add_messages, которая дописывает новые сообщения и резолвит дубли по id.

Зачем нужен checkpointer и thread_id?

Checkpointer сохраняет снимок состояния графа на каждом шаге в хранилище (память, SQLite, Postgres), а thread_id идентифицирует сессию. Вместе они дают возобновляемость: после перезапуска сервера агент продолжает с того же места, а не с нуля. На этой же персистентности стоят human-in-the-loop и time-travel — без неё прервать и переиграть исполнение нельзя.

Как сделать паузу на подтверждение человеком?

Вызвать interrupt(payload) прямо внутри узла: исполнение приостановится, состояние запишется в checkpointer, а payload вернётся клиенту. После решения человека граф возобновляют через Command(resume=value), и это значение приходит как результат исходного вызова interrupt(). Статические interrupt_before/interrupt_after для этого устарели.

Какой API LangGraph устарел к 2026 году?

create_react_agent из langgraph.prebuilt заменён на create_agent из langchain.agents. Статические брейкпойнты interrupt_before/interrupt_after уступили динамическому interrupt(). Канонический класс памяти теперь InMemorySaver (MemorySaver — алиас). Python 3.9 снят с поддержки, минимум по экосистеме — 3.10.

Что такое time-travel и чем Replay отличается от Fork?

Time-travel — отладка через историю чекпоинтов: возврат в любую сохранённую точку исполнения. Replay воспроизводит состояние с прошлого чекпоинта на том же thread_id. Fork применяет новые значения к выбранному чекпоинту через его reducer’ы и запускает альтернативную ветку в новом thread_id, не трогая исходную историю — это способ проверить гипотезу «а что если» на недетерминированном агенте.

Источники

Числовые ориентиры из текста (интервалы ретраев, лимиты итераций, размер истории чекпоинтов) — это дефолты и примеры из документации; конкретные значения подбираются под профиль нагрузки, стоимость LLM-узлов и требования к latency вашего workflow.