This guide covers implementing BaseCheckpointSaver from scratch for a custom storage backend. If you already have a working checkpointer and only need to add delta channel support, jump to Delta channel support.
Overview
LangGraph’s persistence layer is built on two storage abstractions:
- Checkpoints table: one row per superstep; stores the serialized graph state (channel_values, channel_versions, versions_seen) and links to its parent checkpoint.
- Writes table: one row per channel write made by a task (node) within a superstep; stores (task_id, channel, value) tuples linked to a checkpoint.
Your checkpointer manages both tables. put writes a checkpoint row; put_writes writes node-output rows; get_tuple reads both back into a CheckpointTuple.
Base contract
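Subclass BaseCheckpointSaver and implement these five methods. All are required: the base class raises NotImplementedError for each one, so a missing method fails only when LangGraph first calls it at runtime. The async variants are shown below. If you also run graphs synchronously (for example with invoke or stream), implement the sync twins put, put_writes, get_tuple, list, and delete_thread as well.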
```python
from collections.abc import AsyncIterator, Iterator, Sequence
from typing import Any

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    ChannelVersions,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
)


class MyCheckpointer(BaseCheckpointSaver):
    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        ...

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        ...

    async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        ...

    async def alist(
        self,
        config: RunnableConfig | None,
        *,
        filter: dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None,
    ) -> AsyncIterator[CheckpointTuple]:
        ...
        yield  # make this an async generator

    async def adelete_thread(self, thread_id: str) -> None:
        ...
```
put / aput
Store one checkpoint row. Return an updated config with the stored checkpoint_id.
Key requirements:
- Serialize the checkpoint using self.serde.dumps_typed(checkpoint). This handles all LangGraph-native types, including the _DeltaSnapshot blobs used by delta channels.
- Store metadata in full; do not strip unknown keys. LangGraph adds new metadata fields (such as counters_since_delta_snapshot for delta channels) in minor releases, and discarding them silently breaks features.
- Store config["configurable"].get("checkpoint_id") as the parent checkpoint ID so get_tuple can populate parent_config.
```python
async def aput(self, config, checkpoint, metadata, new_versions):
    # self.db stands in for your storage client.
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    checkpoint_id = checkpoint["id"]
    # The incoming config points at the previous checkpoint (None for the first one).
    parent_id = config["configurable"].get("checkpoint_id")
    # This design keeps channel_values inside the checkpoint blob, so new_versions
    # (the channels that changed this step) is not needed here.
    type_, blob = self.serde.dumps_typed(checkpoint)
    metadata_type, metadata_blob = self.serde.dumps_typed(metadata)
    await self.db.execute(
        "INSERT INTO checkpoints (...) VALUES (...)",
        thread_id, checkpoint_ns, checkpoint_id, parent_id,
        type_, blob, metadata_type, metadata_blob,
    )
    return {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": checkpoint_id,
        }
    }
```
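The parent link is easiest to see without a database. The stdlib-only sketch below uses a hypothetical InMemoryPutSketch class (not LangGraph's own in-memory saver) that keeps only the bookkeeping from aput: it skips serialization, stores plain dicts, and records the incoming checkpoint_id as the parent. Feeding each returned config into the next call builds the chain that get_tuple later exposes as parent_config.

```python
from typing import Any


class InMemoryPutSketch:
    """Hypothetical stand-in that mimics only the parent-linking part of aput."""

    def __init__(self) -> None:
        # (thread_id, checkpoint_ns, checkpoint_id) -> stored row
        self.rows: dict[tuple[str, str, str], dict[str, Any]] = {}

    def put(self, config: dict, checkpoint: dict, metadata: dict) -> dict:
        conf = config["configurable"]
        thread_id = conf["thread_id"]
        checkpoint_ns = conf.get("checkpoint_ns", "")
        # The incoming config points at the previous checkpoint, if there is one.
        parent_id = conf.get("checkpoint_id")
        self.rows[(thread_id, checkpoint_ns, checkpoint["id"])] = {
            "checkpoint": checkpoint,
            "metadata": dict(metadata),  # keep every key, known or not
            "parent_checkpoint_id": parent_id,
        }
        # Hand back a config that points at the checkpoint just stored.
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint["id"],
            }
        }


store = InMemoryPutSketch()
cfg = {"configurable": {"thread_id": "t1", "checkpoint_ns": ""}}
for step, cid in enumerate(["0001", "0002", "0003"]):
    cfg = store.put(cfg, {"id": cid}, {"step": step})

parents = {key[2]: row["parent_checkpoint_id"] for key, row in store.rows.items()}
print(parents)  # {'0001': None, '0002': '0001', '0003': '0002'}
```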
put_writes / aput_writes
Store node-output rows for a single task within the current superstep. These rows are linked to the checkpoint by (thread_id, checkpoint_ns, checkpoint_id).
```python
from langgraph.checkpoint.base import WRITES_IDX_MAP


async def aput_writes(self, config, writes, task_id, task_path=""):
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    checkpoint_id = config["configurable"]["checkpoint_id"]
    rows = []
    for idx, (channel, value) in enumerate(writes):
        type_, blob = self.serde.dumps_typed(value)
        # Special channels get a reserved negative index; regular ones keep their position.
        final_idx = WRITES_IDX_MAP.get(channel, idx)
        rows.append((thread_id, checkpoint_ns, checkpoint_id,
                     task_id, task_path, final_idx, channel, type_, blob))
    await self.db.executemany("INSERT INTO writes (...) VALUES (...)", rows)
```
Import WRITES_IDX_MAP from langgraph.checkpoint.base. It maps special channels (__error__, __interrupt__, etc.) to reserved negative indices so they do not collide with regular write indices.
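The effect of the index mapping can be checked without LangGraph installed. In this sketch, SPECIAL_IDX is a local stand-in for WRITES_IDX_MAP and build_write_rows is a hypothetical helper mirroring the loop in aput_writes. The keys and numbers in SPECIAL_IDX are illustrative assumptions, so import the real map rather than copying them. Values are left unserialized for brevity.

```python
# Illustrative stand-in for langgraph.checkpoint.base.WRITES_IDX_MAP.
# The exact keys and values are assumptions; use the real import in practice.
SPECIAL_IDX = {"__error__": -1, "__interrupt__": -3}


def build_write_rows(thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, writes):
    """Return one row tuple per (channel, value) write, mirroring aput_writes."""
    rows = []
    for idx, (channel, value) in enumerate(writes):
        # Regular channels keep their position; special channels get a reserved
        # negative index so they never collide with positions 0, 1, 2, ...
        final_idx = SPECIAL_IDX.get(channel, idx)
        rows.append((thread_id, checkpoint_ns, checkpoint_id,
                     task_id, task_path, final_idx, channel, value))
    return rows


rows = build_write_rows("t1", "", "0003", "task-a", "",
                        [("messages", "hi"), ("__interrupt__", {"value": "ask"})])
print([(r[5], r[6]) for r in rows])  # [(0, 'messages'), (-3, '__interrupt__')]
```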
get_tuple / aget_tuple
Retrieve a checkpoint. The config may contain:
- No checkpoint_id: return the latest checkpoint for the thread + namespace.
- A specific checkpoint_id: return that exact checkpoint.
Both paths must work correctly. The specific-id path is used for time travel and — critically — for delta channel state reconstruction on every graph invocation (see Delta channel support). A broken specific-id lookup silently corrupts delta channel state.
```python
async def aget_tuple(self, config):
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    checkpoint_id = config["configurable"].get("checkpoint_id")
    if checkpoint_id:
        # Specific id: used for time travel and delta channel reconstruction.
        row = await self.db.fetchone(
            "SELECT * FROM checkpoints "
            "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id=?",
            thread_id, checkpoint_ns, checkpoint_id,
        )
    else:
        # No id: newest checkpoint for the thread + namespace.
        row = await self.db.fetchone(
            "SELECT * FROM checkpoints "
            "WHERE thread_id=? AND checkpoint_ns=? "
            "ORDER BY checkpoint_id DESC LIMIT 1",
            thread_id, checkpoint_ns,
        )
    if row is None:
        return None

    writes = await self.db.fetchall(
        "SELECT task_id, channel, type, value FROM writes "
        "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id=? "
        "ORDER BY task_id, idx",
        thread_id, checkpoint_ns, row["checkpoint_id"],
    )
    pending_writes = [
        (w["task_id"], w["channel"], self.serde.loads_typed((w["type"], w["value"])))
        for w in writes
    ]

    checkpoint = self.serde.loads_typed((row["type"], row["checkpoint"]))
    metadata = self.serde.loads_typed((row["metadata_type"], row["metadata"]))

    parent_config = None
    if row["parent_checkpoint_id"]:
        parent_config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row["parent_checkpoint_id"],
            }
        }

    return CheckpointTuple(
        config={
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row["checkpoint_id"],
            }
        },
        checkpoint=checkpoint,
        metadata=metadata,
        parent_config=parent_config,
        pending_writes=pending_writes,
    )
```
Row key / index design matters for the specific-id lookup. If your storage uses a time-ordered key (e.g., a reversed timestamp) that does not embed checkpoint_id, you cannot do a direct row read by id. You must either encode checkpoint_id in the row key, or build a secondary index. A scan with a value filter on every lookup works but does not scale.
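Both lookup paths can be exercised against an in-memory SQLite database. This sketch assumes a reduced form of the recommended schema from the Row key / index design section (key columns only; blob and metadata columns omitted), uses short strings in place of real checkpoint ids, and wraps the two queries from aget_tuple in a hypothetical fetch_checkpoint_row helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE checkpoints ("
    " thread_id TEXT NOT NULL, checkpoint_ns TEXT NOT NULL DEFAULT '',"
    " checkpoint_id TEXT NOT NULL, parent_checkpoint_id TEXT,"
    " PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id))"
)
# Three checkpoints on one thread; the ids sort oldest to newest.
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?, ?, ?)",
    [("t1", "", "0001", None), ("t1", "", "0002", "0001"), ("t1", "", "0003", "0002")],
)


def fetch_checkpoint_row(thread_id, checkpoint_ns, checkpoint_id=None):
    """Mirror the two lookup paths of aget_tuple."""
    if checkpoint_id:
        # Specific id: an equality lookup on the full primary key.
        return conn.execute(
            "SELECT * FROM checkpoints "
            "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id=?",
            (thread_id, checkpoint_ns, checkpoint_id),
        ).fetchone()
    # No id: newest checkpoint for the thread + namespace.
    return conn.execute(
        "SELECT * FROM checkpoints WHERE thread_id=? AND checkpoint_ns=? "
        "ORDER BY checkpoint_id DESC LIMIT 1",
        (thread_id, checkpoint_ns),
    ).fetchone()


latest = fetch_checkpoint_row("t1", "")
parent = fetch_checkpoint_row("t1", "", latest["parent_checkpoint_id"])
print(latest["checkpoint_id"], parent["checkpoint_id"])  # 0003 0002
```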
list / alist
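Return checkpoints for a thread, newest first. Respect before (return only checkpoints older than that config’s checkpoint_id), limit, and filter (metadata key/value pairs that must all match). When config is None, list checkpoints across all threads.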
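A minimal sketch of the ordering and filtering rules, written over plain dicts rather than a real store. list_checkpoints is a hypothetical helper: it ignores checkpoint_ns for brevity, assumes checkpoint ids sort oldest to newest, and treats filter as exact key/value matches against the stored metadata. A real alist would push these conditions into the query and yield CheckpointTuple objects.

```python
from typing import Any, Iterator, Optional


def list_checkpoints(
    rows: list[dict[str, Any]],
    thread_id: str,
    *,
    filter: Optional[dict[str, Any]] = None,
    before_id: Optional[str] = None,
    limit: Optional[int] = None,
) -> Iterator[dict[str, Any]]:
    """Yield matching rows newest first, applying before, filter and limit."""
    matches = (r for r in rows if r["thread_id"] == thread_id)
    count = 0
    for row in sorted(matches, key=lambda r: r["checkpoint_id"], reverse=True):
        if limit is not None and count >= limit:
            return
        # before: only checkpoints strictly older than the given id.
        if before_id is not None and row["checkpoint_id"] >= before_id:
            continue
        # filter: every key/value pair must match the stored metadata.
        if filter and any(row["metadata"].get(k) != v for k, v in filter.items()):
            continue
        count += 1
        yield row


rows = [
    {"thread_id": "t1", "checkpoint_id": f"000{i}", "metadata": {"source": src}}
    for i, src in enumerate(["input", "loop", "loop", "loop"], start=1)
]
ids = [r["checkpoint_id"] for r in list_checkpoints(
    rows, "t1", filter={"source": "loop"}, before_id="0004", limit=1)]
print(ids)  # ['0003']
```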
delete_thread / adelete_thread
Delete all checkpoints and writes for a thread, across every checkpoint_ns (subgraph namespaces included). Both checkpoint rows and write rows must be deleted.
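With SQL storage, deleting both row types inside one transaction keeps a failure from leaving half a thread behind. The SQLite sketch below uses trimmed tables and a hypothetical delete_thread function; it filters on thread_id alone so rows in every checkpoint_ns are removed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, checkpoint_id TEXT)")
conn.execute("CREATE TABLE writes (thread_id TEXT, checkpoint_id TEXT, channel TEXT)")
conn.executemany("INSERT INTO checkpoints VALUES (?, ?)", [("t1", "0001"), ("t2", "0001")])
conn.executemany("INSERT INTO writes VALUES (?, ?, ?)",
                 [("t1", "0001", "messages"), ("t2", "0001", "messages")])


def delete_thread(conn: sqlite3.Connection, thread_id: str) -> None:
    """Remove every checkpoint and write row for one thread, atomically."""
    # `with conn` commits both DELETEs together, or rolls back if either raises.
    with conn:
        conn.execute("DELETE FROM writes WHERE thread_id=?", (thread_id,))
        conn.execute("DELETE FROM checkpoints WHERE thread_id=?", (thread_id,))


delete_thread(conn, "t1")
remaining = [
    conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0],
    conn.execute("SELECT COUNT(*) FROM writes").fetchone()[0],
]
print(remaining)  # [1, 1]
```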
Row key / index design
How you store and index checkpoints directly affects correctness and performance.
Recommended schema (SQL):
```sql
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,  -- time-ordered UUID (v6); sorts lexicographically, newest last
    parent_checkpoint_id TEXT,
    type TEXT,                    -- serde type tag for the checkpoint blob
    checkpoint BYTEA,
    metadata_type TEXT,           -- serde type tag for the metadata blob
    metadata BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

CREATE TABLE writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    task_path TEXT NOT NULL DEFAULT '',
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    value BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx)
);
```

Because LangGraph generates checkpoint_id as a time-ordered UUID (version 6), its string form sorts lexicographically: larger values are newer. “Get latest” is ORDER BY checkpoint_id DESC LIMIT 1; “get by id” is an equality lookup on the primary key.
For non-SQL stores: the same principle applies. Whatever key scheme you use, direct lookup by (thread_id, checkpoint_ns, checkpoint_id) must be O(1) or close to it. Avoid designs where the only way to find a checkpoint by id is to scan all rows for a thread.
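For an ordered key-value store, one way to meet this is to build the row key from (thread_id, checkpoint_ns, checkpoint_id), so that a by-id read is a point lookup and "latest" is the last key under a prefix. This is an illustration under assumptions: SortedKV is a hypothetical stand-in (a dict for point reads plus a bisect-maintained sorted key list for range scans), and the NUL separator assumes ids and namespaces never contain that character.

```python
import bisect
from typing import Optional

SEP = "\x00"  # assumed never to appear inside thread ids, namespaces or checkpoint ids


def row_key(thread_id: str, checkpoint_ns: str, checkpoint_id: str) -> str:
    # Embedding checkpoint_id in the key makes get-by-id a direct point read.
    return SEP.join((thread_id, checkpoint_ns, checkpoint_id))


class SortedKV:
    """Hypothetical ordered store: dict for point reads, sorted key list for scans."""

    def __init__(self) -> None:
        self.data: dict[str, bytes] = {}
        self.keys: list[str] = []

    def put(self, key: str, value: bytes) -> None:
        if key not in self.data:
            bisect.insort(self.keys, key)
        self.data[key] = value

    def get(self, key: str) -> Optional[bytes]:
        return self.data.get(key)  # O(1) point read

    def latest(self, prefix: str) -> Optional[str]:
        # Keys sharing a prefix are contiguous; the last one holds the newest id.
        hi = bisect.bisect_left(self.keys, prefix + "\uffff")
        if hi and self.keys[hi - 1].startswith(prefix):
            return self.keys[hi - 1]
        return None


kv = SortedKV()
for cid in ["0001", "0002", "0003"]:
    kv.put(row_key("t1", "", cid), cid.encode())
kv.put(row_key("t2", "", "0009"), b"other thread")
prefix = "t1" + SEP + "" + SEP
print(kv.latest(prefix) == row_key("t1", "", "0003"))  # True
print(kv.get(row_key("t1", "", "0002")))  # b'0002'
```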
Serialization
Always use self.serde (inherited from BaseCheckpointSaver; defaults to JsonPlusSerializer) for checkpoints, writes, and metadata. Do not use pickle directly for metadata: it works, but JsonPlusSerializer already covers the types LangGraph puts there, and unpickling bytes read back from storage can execute arbitrary code if that storage is ever tampered with.
JsonPlusSerializer handles all LangGraph-native types automatically:
- _DeltaSnapshot: the sentinel blob used by delta channels (msgpack ext code 7)
- Pydantic v2 models, dataclasses, numpy arrays, datetimes, enums, and more
If you write a custom serializer, make sure it can round-trip _DeltaSnapshot from langgraph.checkpoint.serde.types.
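A small round-trip check helps catch types a custom serializer silently mangles. The sketch is stdlib-only: JsonOnlySerializer is a hypothetical serializer with the same dumps_typed/loads_typed shape as self.serde, and Snapshot is a local stand-in for a wrapper type such as _DeltaSnapshot. In a real test, pass your own serializer and genuine LangGraph values, including _DeltaSnapshot instances.

```python
import json
from dataclasses import dataclass
from typing import Any


class JsonOnlySerializer:
    """Hypothetical custom serializer exposing the dumps_typed / loads_typed pair."""

    def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
        return "json", json.dumps(obj).encode()

    def loads_typed(self, data: tuple[str, bytes]) -> Any:
        type_, blob = data
        if type_ != "json":
            raise ValueError(f"unknown type tag: {type_}")
        return json.loads(blob)


def check_round_trip(serde: Any, samples: list[Any]) -> list[Any]:
    """Return the samples that do not survive dumps_typed -> loads_typed unchanged."""
    failures = []
    for obj in samples:
        try:
            restored = serde.loads_typed(serde.dumps_typed(obj))
        except Exception:
            failures.append(obj)  # could not encode or decode at all
            continue
        # The type check catches lossy conversions such as tuple -> list.
        if restored != obj or type(restored) is not type(obj):
            failures.append(obj)
    return failures


@dataclass
class Snapshot:  # local stand-in for a wrapper type like _DeltaSnapshot
    value: list


failures = check_round_trip(JsonOnlySerializer(), [{"a": 1}, [1, 2], Snapshot([1])])
print(failures)  # [Snapshot(value=[1])]
```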
Extended capabilities
These methods are optional but unlock additional Agent Server features. Implement them if your storage backend can support them efficiently.
| Method | What it enables |
|---|---|
| adelete_for_runs | Rollback multitask strategy |
| acopy_thread | Efficient thread forking |
| aprune | Thread history pruning |
| aget_delta_channel_history | Efficient delta channel state reconstruction (see below) |
Agent Server auto-detects which capabilities your checkpointer implements at startup and activates the corresponding features.
Delta channel support
DeltaChannel is in beta. The API and on-disk representation may change while the design stabilizes.
DeltaChannel is a reducer channel that stores only a sentinel (MISSING) in checkpoint blobs instead of the full channel value. State is reconstructed by replaying ancestor writes through the reducer. This makes checkpoint blobs O(1) per step instead of O(N) for channels like messages that accumulate over time.
What the runtime needs
When loading a checkpoint whose delta channels are absent from channel_values, LangGraph calls saver.get_delta_channel_history(config=config, channels=[...]). This returns, for each channel:
- writes: all writes to that channel along the ancestor chain, oldest first, going back to (and including) the checkpoint that holds the nearest snapshot.
- seed (optional): the stored _DeltaSnapshot blob at the nearest ancestor that has one; absent if the walk reaches the root without finding a snapshot.
The runtime then calls channel.from_checkpoint(seed) and channel.replay_writes(writes) to reconstruct the live value.
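Conceptually, reconstruction folds the writes over the seed with the channel's reducer. The stdlib sketch below models that with plain functions: reconstruct and append_reducer are hypothetical stand-ins for from_checkpoint plus replay_writes and for a messages-style reducer, and the seed is an already-decoded value rather than a _DeltaSnapshot wrapper.

```python
from typing import Any, Callable


def reconstruct(
    history: dict[str, Any],
    reducer: Callable[[Any, Any], Any],
    empty: Any,
) -> Any:
    """Rebuild a channel value from {"writes": [...], "seed": ... (optional)}."""
    # Start from the snapshot if the walk found one, else from the empty value.
    value = history.get("seed", empty)
    # Replay oldest first so the reducer sees writes in the order they happened.
    for _task_id, _channel, update in history["writes"]:
        value = reducer(value, update)
    return value


def append_reducer(current: list, update: list) -> list:
    return current + update  # returns a new list; the seed is not mutated


history = {
    "seed": ["hello"],
    "writes": [("task-a", "messages", ["hi"]), ("task-b", "messages", ["bye"])],
}
print(reconstruct(history, append_reducer, []))  # ['hello', 'hi', 'bye']
print(reconstruct({"writes": [("task-a", "messages", ["hi"])]}, append_reducer, []))  # ['hi']
```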
Default implementation
BaseCheckpointSaver provides a default get_delta_channel_history that works with any correct get_tuple implementation:
```python
# Simplified from BaseCheckpointSaver
def get_delta_channel_history(self, *, config, channels):
    target = self.get_tuple(config)  # load the head checkpoint
    cursor = target.parent_config  # walk from its parent
    collected = {ch: [] for ch in channels}
    seed = {}
    remaining = set(channels)
    while cursor and remaining:
        tup = self.get_tuple(cursor)  # ← requires correct by-id lookup
        if tup is None:
            break
        for write in reversed(tup.pending_writes or []):
            if write[1] in remaining:
                collected[write[1]].append(write)
        for ch in list(remaining):
            if ch in tup.checkpoint["channel_values"]:
                seed[ch] = tup.checkpoint["channel_values"][ch]
                remaining.discard(ch)
        cursor = tup.parent_config
    return {
        ch: {"writes": list(reversed(collected[ch])), **({"seed": seed[ch]} if ch in seed else {})}
        for ch in channels
    }
```
The critical dependency: get_tuple(cursor) is always called with a specific checkpoint_id (the parent’s id). If that lookup returns None, the walk stops immediately and every delta channel reconstructs as empty — silently, with no error. This is why the specific-id path in get_tuple must be correct.
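The silent failure is easy to reproduce with a toy chain. In this stdlib-only sketch, TUPLES and walk are hypothetical simplifications of the default walk above (one channel, dicts instead of CheckpointTuple, head loaded directly); the two calls differ only in whether lookup by a specific id works.

```python
from typing import Any

# Toy ancestor chain: 0001 <- 0002 <- 0003 (head). 0001 holds the snapshot.
TUPLES = {
    "0001": {"parent": None, "channel_values": {"messages": ["seed"]},
             "writes": [("a", "messages", ["m1"])]},
    "0002": {"parent": "0001", "channel_values": {}, "writes": [("b", "messages", ["m2"])]},
    "0003": {"parent": "0002", "channel_values": {}, "writes": []},
}


def walk(get_tuple, head_id: str, channel: str) -> dict[str, Any]:
    """Single-channel version of the default history walk."""
    collected: list = []
    found: dict[str, Any] = {}
    cursor = TUPLES[head_id]["parent"]  # start from the head's parent
    while cursor:
        tup = get_tuple(cursor)
        if tup is None:  # lookup failed: the walk just stops, no error raised
            break
        collected.extend(reversed([w for w in tup["writes"] if w[1] == channel]))
        if channel in tup["channel_values"]:
            found["seed"] = tup["channel_values"][channel]
            break
        cursor = tup["parent"]
    return {"writes": list(reversed(collected)), **found}


correct = walk(lambda cid: TUPLES.get(cid), "0003", "messages")
broken = walk(lambda cid: None, "0003", "messages")  # e.g. by-id lookup unsupported
print(correct)  # {'writes': [('a', 'messages', ['m1']), ('b', 'messages', ['m2'])], 'seed': ['seed']}
print(broken)   # {'writes': []}
```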
The default walk issues one get_tuple call per ancestor checkpoint. For backends with good query support, override get_delta_channel_history (and its async twin) to fetch the ancestor chain in one query and then the writes with one query per requested channel:
```python
from typing import Any

from langgraph.checkpoint.base import PendingWrite

# DeltaChannelHistory (the {"writes": ..., "seed": ...} entry type) ships with the
# delta channel beta; import it from wherever your installed langgraph defines it.


async def aget_delta_channel_history(self, *, config, channels):
    if not channels:
        return {}
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    checkpoint_id = config["configurable"]["checkpoint_id"]

    # Stage 1: load the head and everything older, newest first, then follow
    # parent links so checkpoints on other branches (forks) are skipped.
    ancestors = await self.db.fetchall(
        "SELECT checkpoint_id, parent_checkpoint_id, type, checkpoint "
        "FROM checkpoints "
        "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id <= ? "
        "ORDER BY checkpoint_id DESC",
        thread_id, checkpoint_ns, checkpoint_id,
    )
    chain_by_ch: dict[str, list[str]] = {ch: [] for ch in channels}
    seed_by_ch: dict[str, Any] = {}
    remaining = set(channels)
    cur_id = checkpoint_id
    for row in ancestors:
        if not remaining or cur_id is None:
            break
        if row["checkpoint_id"] != cur_id:
            continue  # not on this checkpoint's ancestor chain
        cur_id = row["parent_checkpoint_id"]
        if row["checkpoint_id"] == checkpoint_id:
            continue  # like the default walk, start from the head's parent
        ckpt = self.serde.loads_typed((row["type"], row["checkpoint"]))
        cv = ckpt.get("channel_values") or {}
        for ch in list(remaining):
            chain_by_ch[ch].append(row["checkpoint_id"])
            if ch in cv:
                seed_by_ch[ch] = cv[ch]
                remaining.discard(ch)

    # Stage 2: one writes query per channel, restricted to that channel's chain
    result: dict[str, DeltaChannelHistory] = {}
    for ch in channels:
        chain = chain_by_ch[ch]
        if not chain:
            result[ch] = {"writes": []}  # head is the root or its parent is missing
            continue
        placeholders = ",".join("?" * len(chain))
        write_rows = await self.db.fetchall(
            "SELECT checkpoint_id, task_id, idx, type, value FROM writes "
            "WHERE thread_id=? AND checkpoint_ns=? AND channel=? "
            f"AND checkpoint_id IN ({placeholders}) "
            "ORDER BY checkpoint_id, task_id, idx",
            thread_id, checkpoint_ns, ch, *chain,
        )
        writes_by_cid: dict[str, list[PendingWrite]] = {}
        for w in write_rows:
            value = self.serde.loads_typed((w["type"], w["value"]))
            writes_by_cid.setdefault(w["checkpoint_id"], []).append((w["task_id"], ch, value))
        # chain is newest-first; iterate oldest-first to get correct replay order
        collected: list[PendingWrite] = []
        for cid in reversed(chain):
            collected.extend(writes_by_cid.get(cid, []))
        entry: DeltaChannelHistory = {"writes": collected}
        if ch in seed_by_ch:
            entry["seed"] = seed_by_ch[ch]
        result[ch] = entry
    return result
```
Pruning with delta channels
DeltaChannel state is not self-contained in a single checkpoint — it depends on the ancestor write chain back to the nearest _DeltaSnapshot. If you implement prune or delete_for_runs, you must not delete write rows that a surviving checkpoint’s delta channels depend on.
Safe options:
- Walk before pruning: for each checkpoint you intend to keep, walk its ancestor chain and mark all write rows up to the nearest _DeltaSnapshot as non-deletable.
- Force a snapshot before pruning: rewrite channel_values[ch] = _DeltaSnapshot(reconstructed_value) on the checkpoint you are keeping, then delete ancestors freely.
- Skip pruning for delta-channel threads: the safest short-term option if you do not yet need pruning.
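Walking before pruning amounts to a bounded walk per kept checkpoint. The sketch below is a hypothetical helper, replay_dependencies, over plain dicts for a single channel: it returns the ancestor checkpoints whose write rows a kept checkpoint still needs, in addition to the kept checkpoints' own writes. Because the default history walk follows parent links through get_tuple, the checkpoint rows for these ancestors must survive as well, not just their writes.

```python
from typing import Optional


def replay_dependencies(
    keep: set[str],
    parent: dict[str, Optional[str]],
    has_snapshot: dict[str, bool],
) -> set[str]:
    """Return ancestor checkpoint ids that kept checkpoints replay through.

    keep: checkpoint ids you intend to retain.
    parent: checkpoint_id -> parent_checkpoint_id (None at the root).
    has_snapshot: checkpoint_id -> True if it stores a snapshot for the channel.
    """
    protected: set[str] = set()
    for head in keep:
        cursor = parent.get(head)  # replay starts at the parent, like the history walk
        while cursor is not None:
            protected.add(cursor)
            if has_snapshot.get(cursor, False):
                break  # nearest snapshot reached: older rows are not needed
            cursor = parent.get(cursor)
    return protected


parent = {"c1": None, "c2": "c1", "c3": "c2", "c4": "c3", "c5": "c4"}
has_snapshot = {"c2": True}
print(sorted(replay_dependencies({"c5"}, parent, has_snapshot)))  # ['c2', 'c3', 'c4']
```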
Copy thread with delta channels
When implementing copy_thread, copy the complete ancestor chain — not just the head checkpoint. The target thread must have write rows going back to at least one _DeltaSnapshot for every delta channel, or those channels will reconstruct as empty after the copy.
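For a SQL backend, copying the complete chain can be two INSERT ... SELECT statements in one transaction. The SQLite sketch below uses trimmed tables (blob, metadata and task_path columns omitted) and a hypothetical copy_thread function. It rewrites only thread_id and keeps checkpoint ids unchanged, so parent_checkpoint_id links still resolve inside the new thread.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints (thread_id TEXT, checkpoint_ns TEXT, checkpoint_id TEXT,"
    " parent_checkpoint_id TEXT, PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id))"
)
conn.execute(
    "CREATE TABLE writes (thread_id TEXT, checkpoint_ns TEXT, checkpoint_id TEXT,"
    " task_id TEXT, idx INTEGER, channel TEXT,"
    " PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx))"
)
conn.executemany("INSERT INTO checkpoints VALUES ('src', '', ?, ?)",
                 [("0001", None), ("0002", "0001"), ("0003", "0002")])
conn.executemany("INSERT INTO writes VALUES ('src', '', ?, 'task', 0, 'messages')",
                 [("0001",), ("0002",)])


def copy_thread(conn: sqlite3.Connection, source: str, target: str) -> None:
    """Copy every checkpoint and write row so parent links and history survive."""
    with conn:  # one transaction: the target never sees a partial copy
        conn.execute(
            "INSERT INTO checkpoints SELECT ?, checkpoint_ns, checkpoint_id, "
            "parent_checkpoint_id FROM checkpoints WHERE thread_id=?",
            (target, source),
        )
        conn.execute(
            "INSERT INTO writes SELECT ?, checkpoint_ns, checkpoint_id, task_id, idx, "
            "channel FROM writes WHERE thread_id=?",
            (target, source),
        )


copy_thread(conn, "src", "dst")
counts = [conn.execute(f"SELECT COUNT(*) FROM {t} WHERE thread_id='dst'").fetchone()[0]
          for t in ("checkpoints", "writes")]
print(counts)  # [3, 2]
```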
langgraph-checkpoint-conformance validates your implementation against the full contract, including delta channel history:
```bash
pip install langgraph-checkpoint-conformance
```

```python
import asyncio

from langgraph.checkpoint.conformance import checkpointer_test, validate


@checkpointer_test(name="MyCheckpointer")
async def my_checkpointer():
    async with MyCheckpointer.create() as saver:
        yield saver


async def main():
    report = await validate(my_checkpointer)
    report.print_report()
    # Fails the process if any base capability is missing or broken
    if not report.passed_all_base():
        raise RuntimeError("Checkpointer failed conformance suite")


asyncio.run(main())
```
The suite auto-detects which extended capabilities your checkpointer implements (including aget_delta_channel_history) and runs the relevant tests for each. Run it as part of your CI before shipping.