Inside LangGraph: How Agentic Loops Are Actually Wired
A deep technical breakdown of LangGraph's execution model — state, nodes, edges, checkpointing, and multi-agent patterns used in production by Uber, LinkedIn, and Klarna.
Agents feel like magic until they fail. LangGraph is what you reach for when you want the magic to behave like software: bounded execution, inspectable state transitions, resumability, and a control plane you can actually reason about.
Why LangGraph Over Other Frameworks
LangChain makes it easy to wire prompts, tools, and memory, but the default abstraction is still a function call that returns a string. AutoGen and CrewAI push multi-agent coordination, but the execution model is often implicit and debugging becomes log archaeology. LangGraph is opinionated about the missing piece: agent execution is a state machine, and you should model it as a graph you can checkpoint, interrupt, and replay.
The Four Primitives
State
State is the only thing your graph mutates. Every node is a pure-ish function from state to a partial state update, and annotated reducers like add_messages decide how each update merges into its channel.
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
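The Annotated reducer is the load-bearing part: without it, a node returning {"messages": [...]} would overwrite the channel instead of appending. A minimal sketch of the merge behavior (the messages themselves are illustrative):

from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph.message import add_messages

existing = [HumanMessage(content="hi")]
update = [AIMessage(content="hello")]
# add_messages appends new messages and upserts by id, so a node's partial
# return is merged into state rather than clobbering it.
merged = add_messages(existing, update)
assert len(merged) == 2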
Nodes
Nodes are callables that read state and return partial updates. They can be sync or async.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def llm_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
Edges
Edges define control flow. A normal edge is deterministic. A conditional edge selects the next node based on state.
def route(state: AgentState) -> str:
    last = state["messages"][-1]
    if getattr(last, "tool_calls", None):
        return "tools"
    return "end"
Graph
A graph is the wiring: add nodes, add edges, compile to a runnable.
from langgraph.graph import StateGraph, START, END
graph = StateGraph(AgentState)
graph.add_node("llm", llm_node)
graph.add_edge(START, "llm")
graph.add_edge("llm", END)
app = graph.compile()
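compile() returns a runnable with the standard invoke/stream surface, so the smallest possible smoke test is one call:

result = app.invoke({"messages": [("user", "hello")]})
print(result["messages"][-1].content)  # the model's reply, appended by add_messages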
The Agent Loop Internals
This is the loop most production agents end up implementing, whether they admit it or not.
Here is a full, working implementation with a real tool, a real ToolNode, a conditional edge, compilation, and a trace you can eyeball.
pip install -U langgraph langchain-core langchain-openai
export OPENAI_API_KEY="..."
from __future__ import annotations

import json
from typing import Annotated

from typing_extensions import TypedDict

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    city = city.strip().lower()
    if city in {"sf", "san francisco", "san-francisco"}:
        return "foggy, 57F"
    if city in {"nyc", "new york", "new york city"}:
        return "clear, 72F"
    return "unknown, 65F"

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

tools = [get_weather]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)

def llm_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: AgentState) -> str:
    # If the last AI message requested tools, take the tool edge; else stop.
    last = state["messages"][-1]
    if getattr(last, "tool_calls", None):
        return "tools"
    return "end"

tool_node = ToolNode(tools)

graph = StateGraph(AgentState)
graph.add_node("llm", llm_node)
graph.add_node("tools", tool_node)
graph.add_edge(START, "llm")
graph.add_conditional_edges(
    "llm",
    should_continue,
    {
        "tools": "tools",
        "end": END,
    },
)
graph.add_edge("tools", "llm")
app = graph.compile()

def pretty_message(m) -> str:
    t = type(m).__name__
    if t == "AIMessage":
        tool_calls = getattr(m, "tool_calls", None)
        if tool_calls:
            return f"AI(tool_calls={json.dumps(tool_calls)})"
        return f"AI({m.content!r})"
    if t == "HumanMessage":
        return f"HUMAN({m.content!r})"
    if t == "ToolMessage":
        return f"TOOL(name={m.name!r}, content={m.content!r})"
    return f"{t}({getattr(m, 'content', None)!r})"

inputs = {"messages": [("user", "What's the weather in SF? Reply in one line.")]}
final = app.invoke(inputs)
for m in final["messages"]:
    print(pretty_message(m))
HUMAN("What's the weather in SF? Reply in one line.")
AI(tool_calls=[{"name": "get_weather", "args": {"city": "SF"}, "id": "call_...", "type": "tool_call"}])
TOOL(name="get_weather", content="foggy, 57F")
AI("SF: foggy, 57F")
That loop is not “an agent”. It is deterministic graph traversal with a state accumulator. The only nondeterminism is the model output that decides whether a tool edge is taken.
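invoke only hands back the final state. To watch the traversal happen, stream the same input with stream_mode="values", which yields the full state after every superstep:

for step in app.stream(inputs, stream_mode="values"):
    # One entry per superstep: human turn, AI tool call, tool result, final AI.
    print(pretty_message(step["messages"][-1]))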
State Persistence and Checkpointing
Checkpointing is how you stop treating agents like a one-shot script. You want a stable thread_id so multiple invocations continue the same state, and you want the graph to persist intermediate steps so you can resume after a crash or a human approval.
# pip install -U langgraph-checkpoint-sqlite
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# In recent releases SqliteSaver.from_conn_string is a context manager, so for
# a long-lived process construct the saver from a connection directly.
memory = SqliteSaver(sqlite3.connect("langgraph_checkpoints.sqlite", check_same_thread=False))
app = graph.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "support-ticket-9f2c"}}
# Turn 1
out1 = app.invoke({"messages": [("user", "Remember that my favorite editor is vim.")]}, config=config)
print(out1["messages"][-1].content)
# Turn 2 (same thread_id, state is loaded from the checkpoint store)
out2 = app.invoke({"messages": [("user", "What editor do I like?")]}, config=config)
print(out2["messages"][-1].content)
The pattern is simple: treat thread_id as the durable conversation key, and treat your checkpointer as a database-backed event log for graph steps.
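That event log is directly queryable on the compiled graph: get_state returns the latest snapshot for a thread, and get_state_history walks every stored checkpoint, which is what audit and time-travel tooling builds on:

snapshot = app.get_state(config)
print(snapshot.values["messages"][-1].content)  # latest persisted state

for past in app.get_state_history(config):
    # Each snapshot carries the state values plus the nodes queued to run next.
    print(past.config["configurable"]["checkpoint_id"], past.next)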
Human-in-the-Loop
Human-in-the-loop is not “ask the user a question”. It is “pause execution before a boundary, persist state, and resume from the same point without re-running side effects”. LangGraph supports this by compiling with interrupts.
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver(sqlite3.connect("hitl.sqlite", check_same_thread=False))
# Pause right before tool execution.
app = graph.compile(checkpointer=memory, interrupt_before=["tools"])
config = {"configurable": {"thread_id": "deploy-approval-17"}}
# First run stops before "tools".
for event in app.stream(
    {"messages": [("user", "What's the weather in NYC?")]},
    config=config,
    stream_mode="values",
):
    last = event["messages"][-1]
    print(type(last).__name__, getattr(last, "content", ""))
# At this point, you can inspect stored state in your DB, UI, or logs.
# To resume from a static breakpoint, stream None with the same config; the
# graph continues from the saved checkpoint instead of re-running prior steps.
for event in app.stream(
    None,
    config=config,
    stream_mode="values",
):
    last = event["messages"][-1]
    print(type(last).__name__, getattr(last, "content", ""))
When you do this for real, the “human” is usually an approval UI that reads the latest checkpoint, shows the pending tool call payload, then resumes the graph after an explicit action.
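A sketch of what that approval surface reads before resuming: the pending tool calls sit on the last AI message in the checkpointed state, and snapshot.next confirms the graph is parked at the interrupt boundary:

snapshot = app.get_state(config)
if "tools" in snapshot.next:  # parked at the interrupt_before boundary
    for call in snapshot.values["messages"][-1].tool_calls:
        print(f"Approve? {call['name']}({call['args']})")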
Building a Multi-Agent System
Most “multi-agent” systems in production collapse into one pattern: a supervisor that routes work to specialized subgraphs, where each subgraph is itself an agent loop with tools and memory.
from __future__ import annotations

from typing import Annotated, Literal

from typing_extensions import TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class TeamState(TypedDict):
    messages: Annotated[list, add_messages]
    next: str

supervisor_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def supervisor_node(state: TeamState) -> dict:
    prompt = (
        "You are a supervisor. Choose next worker: 'research' or 'coder' or 'end'. "
        "Return exactly one token.\n\n"
        "If the user asks for code, choose 'coder'. If the user asks for facts, choose 'research'."
    )
    decision = supervisor_llm.invoke([("system", prompt)] + state["messages"])
    token = decision.content.strip().lower()
    if token not in {"research", "coder", "end"}:
        token = "end"  # fail closed on any unexpected routing token
    return {"next": token}

def research_agent(state: TeamState) -> dict:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    msg = llm.invoke([("system", "You are a research agent. Be brief and cite assumptions.")] + state["messages"])
    return {"messages": [msg]}

def coder_agent(state: TeamState) -> dict:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    msg = llm.invoke([("system", "You are a coding agent. Output runnable Python only.")] + state["messages"])
    return {"messages": [msg]}

def route(state: TeamState) -> Literal["research", "coder", "end"]:
    return state.get("next", "end")  # type: ignore[return-value]

team = StateGraph(TeamState)
team.add_node("supervisor", supervisor_node)
team.add_node("research", research_agent)
team.add_node("coder", coder_agent)
team.add_edge(START, "supervisor")
team.add_conditional_edges(
    "supervisor",
    route,
    {
        "research": "research",
        "coder": "coder",
        "end": END,
    },
)
team.add_edge("research", "supervisor")
team.add_edge("coder", "supervisor")
multi_agent_app = team.compile()

out = multi_agent_app.invoke({"messages": [("user", "Write a Python function to chunk a list into batches of 32.")]})
print(out["messages"][-1].content)
The important part is not the two workers. It is the explicit routing edge and the fact that each worker is just another node you can checkpoint, interrupt, and unit test.
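That testability claim is cheap to cash in: workers and routers are plain functions over dicts, so the routing contract pins down without a single network call. A minimal sketch:

def test_route():
    # route() only reads the "next" channel; a bare dict is a valid TeamState
    # at runtime, so no LLM or graph is needed.
    assert route({"messages": [], "next": "coder"}) == "coder"
    assert route({"messages": []}) == "end"  # missing key fails closed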
Who’s Running This in Production
Uber has used networks of agents for internal developer workflows such as large-scale code migration and unit test generation, where separate agents handle planning, refactoring, and verification, and the graph is what bounds and audits the rollout.
LinkedIn has built NL to SQL assistants where the system is multi-agent by necessity: one agent parses intent, another generates SQL, another enforces permissioned data access, and a final step validates or explains results before anything touches production data.
Klarna has reported an assistant serving 85M users for customer support with an 80% reduction in resolution time, which only works if you treat tool access, routing, and escalation as explicit control flow, not prompt vibes.
Elastic has pushed agentic systems for real-time threat detection and SecOps automation, where you need strict tooling boundaries, reproducible state, and human approvals for destructive actions.
AppFolio (Realm-X) has described a property management copilot that achieved roughly 2x accuracy improvements and saves operators 10+ hours per week, which is exactly the kind of workload that benefits from checkpointing and resumable execution.
Replit has leaned into human-in-the-loop and multi-agent coordination, pairing a supervisor style router with explicit pause and resume points so developers can approve changes and recover from failures without restarting the whole run.
What Actually Breaks in Production
Unbounded loops happen when the model keeps calling tools or re-asking itself. Put a hard cap in state with max_iterations and route to END when you hit it.
from typing_extensions import TypedDict

class BoundedState(TypedDict):
    iterations: int
    max_iterations: int
    messages: list

def bump_iterations(state: BoundedState) -> dict:
    # Run this node once per loop round so the counter tracks supersteps.
    return {"iterations": state.get("iterations", 0) + 1}

def should_continue_bounded(state: BoundedState) -> str:
    if state.get("iterations", 0) >= state.get("max_iterations", 8):
        return "end"
    last = state["messages"][-1]
    if getattr(last, "tool_calls", None):
        return "tools"
    return "end"
State bloat happens when you keep appending full transcripts, tool payloads, and retrieved documents. Trim aggressively at graph boundaries.
from langchain_core.messages import RemoveMessage

def post_process(state: AgentState) -> dict:
    # add_messages upserts by message id, so returning a shorter list does not
    # delete anything; emit RemoveMessage markers for the dropped prefix instead.
    overflow = state["messages"][:-12]
    return {"messages": [RemoveMessage(id=m.id) for m in overflow]}
Tool errors not handled becomes a silent failure mode when exceptions bubble out and you lose the chance to convert failures into state. Wrap tool implementations and return structured errors the model can react to.
from langchain_core.tools import tool

@tool
def safe_divide(a: float, b: float) -> str:
    """Divide a by b, returning errors as text the model can read."""
    try:
        return str(a / b)
    except Exception as e:
        return f"TOOL_ERROR: {type(e).__name__}: {e}"
Checkpoint stores become a bottleneck when you serialize huge states or checkpoint too frequently. Keep state minimal, store large artifacts out-of-band, and checkpoint at meaningful boundaries.
# Practical rule: store big blobs (documents, images, logs) in object storage,
# and keep only references (URLs, IDs, hashes) in LangGraph state.
artifact_ref = {"doc_id": "kb:contracts:3921", "sha256": "..."}
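A runnable sketch of the reference-passing pattern; OBJECT_STORE stands in for S3/GCS/blob storage, and only the small reference dict above ever reaches the checkpointer:

OBJECT_STORE: dict[str, str] = {"kb:contracts:3921": "<full contract text>"}

def resolve_artifact(state: AgentState) -> dict:
    doc = OBJECT_STORE[artifact_ref["doc_id"]]  # fetched on demand, never checkpointed
    return {"messages": [("system", f"Loaded contract context ({len(doc)} chars).")]}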
The mental model that holds up is boring: agent execution is stateful graph traversal, not a function call. LangGraph runs a state machine where nodes transform state, edges pick control flow, and checkpoints make execution durable so you can inspect, replay, and resume instead of restarting when something breaks.