This guide covers implementing BaseCheckpointSaver from scratch for a custom storage backend. If you already have a working checkpointer and only need to add delta channel support, jump to Delta channel support.

Overview

LangGraph’s persistence layer is built on two storage abstractions:
  • Checkpoints table — one row per superstep; stores the serialized graph state (channel_values, channel_versions, versions_seen) and links to its parent checkpoint.
  • Writes table — one row per node output within a superstep; stores (task_id, channel, value) tuples linked to a checkpoint.
Your checkpointer manages both tables. put writes a checkpoint row; put_writes writes node-output rows; get_tuple reads both back into a CheckpointTuple.

Base contract

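Subclass BaseCheckpointSaver and implement these five methods. All are required: a base method left unimplemented raises NotImplementedError at runtime. The skeleton below shows the async variants; the sync methods (put, put_writes, get_tuple, list, delete_thread) take the same arguments.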
from collections.abc import AsyncIterator, Iterator, Sequence
from typing import Any
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    ChannelVersions,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
)

class MyCheckpointer(BaseCheckpointSaver):
    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        ...

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        ...

    async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        ...

    async def alist(
        self,
        config: RunnableConfig | None,
        *,
        filter: dict[str, Any] | None = None,
        before: RunnableConfig | None = None,
        limit: int | None = None,
    ) -> AsyncIterator[CheckpointTuple]:
        ...
        yield  # make this an async generator

    async def adelete_thread(self, thread_id: str) -> None:
        ...

put / aput

Store one checkpoint row. Return an updated config with the stored checkpoint_id. Key requirements:
  • Serialize the checkpoint using self.serde.dumps_typed(checkpoint) — this handles all LangGraph-native types including _DeltaSnapshot blobs used by delta channels.
  • Store metadata in full — do not strip unknown keys. LangGraph adds new metadata fields (such as counters_since_delta_snapshot for delta channels) in minor releases; discarding them silently breaks features.
  • Store config["configurable"].get("checkpoint_id") as the parent checkpoint ID so get_tuple can populate parent_config.
async def aput(self, config, checkpoint, metadata, new_versions):
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"]["checkpoint_ns"]
    checkpoint_id = checkpoint["id"]
    parent_id = config["configurable"].get("checkpoint_id")

    type_, blob = self.serde.dumps_typed(checkpoint)
    serialized_metadata = self.serde.dumps_typed(metadata)

    await self.db.execute(
        "INSERT INTO checkpoints (...) VALUES (...)",
        thread_id, checkpoint_ns, checkpoint_id, parent_id,
        type_, blob, *serialized_metadata,
    )
    return {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": checkpoint_id,
        }
    }

put_writes / aput_writes

Store node-output rows for a single task within the current superstep. These rows are linked to the checkpoint by (thread_id, checkpoint_ns, checkpoint_id).
async def aput_writes(self, config, writes, task_id, task_path=""):
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"]["checkpoint_ns"]
    checkpoint_id = config["configurable"]["checkpoint_id"]

    rows = []
    for idx, (channel, value) in enumerate(writes):
        type_, blob = self.serde.dumps_typed(value)
        final_idx = WRITES_IDX_MAP.get(channel, idx)
        rows.append((thread_id, checkpoint_ns, checkpoint_id,
                      task_id, task_path, final_idx, channel, type_, blob))

    await self.db.executemany("INSERT INTO writes (...) VALUES (...)", rows)
Import WRITES_IDX_MAP from langgraph.checkpoint.base. It maps special channels (__error__, __interrupt__, etc.) to reserved negative indices so they do not collide with regular write indices.
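The index assignment can be tried without LangGraph installed. The sketch below uses a hypothetical stand-in map, RESERVED_IDX, whose entries are assumptions for illustration only; in a real checkpointer, import WRITES_IDX_MAP and use its values as they are.

```python
# Hypothetical stand-in for WRITES_IDX_MAP. The real channel names and
# values come from langgraph.checkpoint.base and may differ.
RESERVED_IDX = {"__error__": -1, "__interrupt__": -3}


def assign_indices(writes):
    """Return (idx, channel, value) rows for one task's writes."""
    return [
        (RESERVED_IDX.get(channel, position), channel, value)
        for position, (channel, value) in enumerate(writes)
    ]


rows = assign_indices([
    ("messages", "hello"),
    ("__interrupt__", "need approval"),
    ("counter", 1),
])
```

Regular channels keep their position in the writes sequence as idx, while the special channel lands on its reserved negative index, so the two ranges never overlap.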

get_tuple / aget_tuple

Retrieve a checkpoint. The config may contain:
  • No checkpoint_id — return the latest checkpoint for the thread + namespace.
  • A specific checkpoint_id — return that exact checkpoint.
Both paths must work correctly. The specific-id path is used for time travel and — critically — for delta channel state reconstruction on every graph invocation (see Delta channel support). A broken specific-id lookup silently corrupts delta channel state.
async def aget_tuple(self, config):
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    checkpoint_id = config["configurable"].get("checkpoint_id")

    if checkpoint_id:
        row = await self.db.fetchone(
            "SELECT * FROM checkpoints "
            "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id=?",
            thread_id, checkpoint_ns, checkpoint_id,
        )
    else:
        row = await self.db.fetchone(
            "SELECT * FROM checkpoints "
            "WHERE thread_id=? AND checkpoint_ns=? "
            "ORDER BY checkpoint_id DESC LIMIT 1",
            thread_id, checkpoint_ns,
        )

    if row is None:
        return None

    writes = await self.db.fetchall(
        "SELECT task_id, channel, type, value FROM writes "
        "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id=? "
        "ORDER BY task_id, idx",
        thread_id, checkpoint_ns, row["checkpoint_id"],
    )
    pending_writes = [
        (w["task_id"], w["channel"], self.serde.loads_typed((w["type"], w["value"])))
        for w in writes
    ]

    checkpoint = self.serde.loads_typed((row["type"], row["checkpoint"]))
    metadata = self.serde.loads_typed((row["metadata_type"], row["metadata"]))

    parent_config = None
    if row["parent_checkpoint_id"]:
        parent_config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row["parent_checkpoint_id"],
            }
        }

    return CheckpointTuple(
        config={
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": row["checkpoint_id"],
            }
        },
        checkpoint=checkpoint,
        metadata=metadata,
        parent_config=parent_config,
        pending_writes=pending_writes,
    )
Row key / index design matters for the specific-id lookup. If your storage uses a time-ordered key (e.g., a reversed timestamp) that does not embed checkpoint_id, you cannot do a direct row read by id. You must either encode checkpoint_id in the row key, or build a secondary index. A scan with a value filter on every lookup works but does not scale.
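As a rough illustration of the secondary-index option, the sketch below keeps primary rows under a reversed-timestamp key and maintains a separate map from checkpoint_id to row key. Plain dicts stand in for the storage backend, and every name here (rows, id_index, MAX_TS, put_row, get_latest, get_by_id) is hypothetical.

```python
# Hypothetical key-value store: plain dicts stand in for two tables.
rows = {}       # primary rows, keyed by (thread_id, ns, reversed_timestamp)
id_index = {}   # secondary index: (thread_id, ns, checkpoint_id) -> row key

MAX_TS = 10**13  # assumed upper bound used to reverse millisecond timestamps


def put_row(thread_id, ns, checkpoint_id, ts_ms, payload):
    row_key = (thread_id, ns, MAX_TS - ts_ms)  # newest sorts first
    rows[row_key] = {"checkpoint_id": checkpoint_id, "payload": payload}
    id_index[(thread_id, ns, checkpoint_id)] = row_key  # keep the index in step


def get_latest(thread_id, ns=""):
    # A real ordered store would read the first key under the prefix instead.
    keys = [k for k in rows if k[:2] == (thread_id, ns)]
    return rows[min(keys)] if keys else None


def get_by_id(thread_id, ns, checkpoint_id):
    # One index read plus one row read, with no scan over the thread.
    row_key = id_index.get((thread_id, ns, checkpoint_id))
    return rows.get(row_key) if row_key is not None else None


put_row("t1", "", "ckpt-a", 1_000, "first")
put_row("t1", "", "ckpt-b", 2_000, "second")
latest = get_latest("t1")
by_id = get_by_id("t1", "", "ckpt-a")
unknown = get_by_id("t1", "", "ckpt-z")
```

The cost of this design is that every put must write the row and the index entry together; if the two can drift apart, the by-id path fails in exactly the way described above.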

list / alist

Return checkpoints for a thread, newest first. Respect before (return only checkpoints older than that config’s checkpoint_id) and limit.
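A minimal sketch of the ordering, before, and limit handling, using an in-memory SQLite table with a reduced, assumed schema. The helper name list_checkpoint_ids and the four-digit ids are hypothetical, and metadata filtering is left out.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints (thread_id TEXT, checkpoint_ns TEXT, checkpoint_id TEXT)"
)
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?, ?)",
    [("t1", "", f"{n:04d}") for n in range(1, 6)],  # hypothetical sortable ids
)


def list_checkpoint_ids(thread_id, checkpoint_ns="", before_id=None, limit=None):
    """Return checkpoint ids newest first, honoring before and limit."""
    sql = "SELECT checkpoint_id FROM checkpoints WHERE thread_id=? AND checkpoint_ns=?"
    params = [thread_id, checkpoint_ns]
    if before_id is not None:
        sql += " AND checkpoint_id < ?"  # strictly older than `before`
        params.append(before_id)
    sql += " ORDER BY checkpoint_id DESC"
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)
    return [row[0] for row in conn.execute(sql, params)]


all_ids = list_checkpoint_ids("t1")
page = list_checkpoint_ids("t1", before_id="0004", limit=2)
```

In a real alist, before_id would come from before["configurable"]["checkpoint_id"], and each row would be turned into a CheckpointTuple before being yielded.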

delete_thread / adelete_thread

Delete every checkpoint row and every write row for the thread. Removing only the checkpoint rows leaves orphaned write rows behind.
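One way to keep the two deletes together is a single transaction. The sketch below uses in-memory SQLite with a reduced, assumed schema; the function is a sync stand-in for illustration, not the LangGraph method itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, checkpoint_id TEXT)")
conn.execute("CREATE TABLE writes (thread_id TEXT, checkpoint_id TEXT, channel TEXT)")
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?)", [("t1", "0001"), ("t2", "0001")]
)
conn.executemany(
    "INSERT INTO writes VALUES (?, ?, ?)",
    [("t1", "0001", "messages"), ("t2", "0001", "messages")],
)
conn.commit()


def delete_thread(thread_id):
    # `with conn` commits both deletes together or rolls both back on error.
    with conn:
        conn.execute("DELETE FROM writes WHERE thread_id=?", (thread_id,))
        conn.execute("DELETE FROM checkpoints WHERE thread_id=?", (thread_id,))


delete_thread("t1")
remaining_checkpoints = conn.execute("SELECT thread_id FROM checkpoints").fetchall()
remaining_writes = conn.execute("SELECT thread_id FROM writes").fetchall()
```

Other threads are untouched, and a failure between the two statements cannot leave write rows without their checkpoints.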

Row key / index design

How you store and index checkpoints directly affects correctness and performance. Recommended schema (SQL):
CREATE TABLE checkpoints (
    thread_id          TEXT NOT NULL,
    checkpoint_ns      TEXT NOT NULL DEFAULT '',
    checkpoint_id      TEXT NOT NULL,   -- ULID, lexicographically sortable newest-last
    parent_checkpoint_id TEXT,
    type               TEXT,
    checkpoint         BYTEA,
    metadata_type      TEXT,            -- type tag from self.serde.dumps_typed(metadata)
    metadata           BYTEA,           -- serialized metadata bytes
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

CREATE TABLE writes (
    thread_id     TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id       TEXT NOT NULL,
    task_path     TEXT NOT NULL DEFAULT '',
    idx           INTEGER NOT NULL,
    channel       TEXT NOT NULL,
    type          TEXT,
    value         BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, task_path, idx)
);
Because checkpoint_id is a ULID, it sorts lexicographically: larger values are newer. “Get latest” is ORDER BY checkpoint_id DESC LIMIT 1; “get by id” is an equality lookup on the primary key.

For non-SQL stores the same principle applies. Whatever key scheme you use, direct lookup by (thread_id, checkpoint_ns, checkpoint_id) must be O(1) or close to it. Avoid designs where the only way to find a checkpoint by id is to scan all rows for a thread.
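For an ordered key-value store, one possible key scheme is to concatenate the three fields so that the by-id read is a single get and the latest read is the last key under a prefix. The sketch below is an assumption-laden illustration: SortedKV is a toy in-memory stand-in for a real backend, and SEP, make_key, and the sample ids are hypothetical.

```python
import bisect

SEP = "\x00"  # assumed separator; must not appear in ids or namespaces


def make_key(thread_id, checkpoint_ns, checkpoint_id=""):
    return SEP.join((thread_id, checkpoint_ns, checkpoint_id))


class SortedKV:
    """Tiny ordered key-value store standing in for a real backend."""

    def __init__(self):
        self._keys = []
        self._values = {}

    def put(self, key, value):
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def get(self, key):
        return self._values.get(key)

    def last_with_prefix(self, prefix):
        # Keys sharing the prefix form a contiguous run; take its last element.
        end = bisect.bisect_left(self._keys, prefix + "\uffff")
        if end and self._keys[end - 1].startswith(prefix):
            return self._values[self._keys[end - 1]]
        return None


kv = SortedKV()
for cid in ("0001", "0002", "0003"):
    kv.put(make_key("t1", "", cid), {"checkpoint_id": cid})
kv.put(make_key("t2", "", "0009"), {"checkpoint_id": "0009"})

by_id = kv.get(make_key("t1", "", "0002"))           # direct lookup
latest = kv.last_with_prefix(make_key("t1", ""))     # newest for the thread
nothing = kv.last_with_prefix(make_key("t3", ""))    # unknown thread
```

Because the checkpoint id is the last key component and sorts oldest to newest, neither read path needs a scan with a value filter.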

Serialization

Always use self.serde (inherited from BaseCheckpointSaver, defaults to JsonPlusSerializer) for checkpoints, writes, and metadata. Do not use pickle directly for metadata — it works, but JsonPlusSerializer produces human-readable output and handles versioning better. JsonPlusSerializer handles all LangGraph-native types automatically:
  • _DeltaSnapshot — the sentinel blob used by delta channels (msgpack ext code 7)
  • Pydantic v2 models, dataclasses, numpy arrays, datetimes, enums, and more
If you write a custom serializer, make sure it can round-trip _DeltaSnapshot from langgraph.checkpoint.serde.types.

Extended capabilities

These methods are optional but unlock additional Agent Server features. Implement them if your storage backend can support them efficiently.
| Method | What it enables |
| --- | --- |
| adelete_for_runs | Rollback multitask strategy |
| acopy_thread | Efficient thread forking |
| aprune | Thread history pruning |
| aget_delta_channel_history | Efficient delta channel state reconstruction (see below) |
Agent Server auto-detects which capabilities your checkpointer implements at startup and activates the corresponding features.

Delta channel support

DeltaChannel is in beta. The API and on-disk representation may change while the design stabilizes.
DeltaChannel is a reducer channel that stores only a sentinel (MISSING) in checkpoint blobs instead of the full channel value. State is reconstructed by replaying ancestor writes through the reducer. This makes checkpoint blobs O(1) per step instead of O(N) for channels like messages that accumulate over time.

What the runtime needs

When loading a checkpoint whose delta channels are absent from channel_values, LangGraph calls saver.get_delta_channel_history(config=config, channels=[...]). This returns, for each channel:
  • writes — all writes to that channel in the ancestor chain, oldest first, up to the nearest snapshot.
  • seed (optional) — the stored _DeltaSnapshot blob at the nearest ancestor that has one; absent if the walk reaches the root without finding a snapshot.
The runtime then calls channel.from_checkpoint(seed) and channel.replay_writes(writes) to reconstruct the live value.
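The seed-plus-replay idea can be pictured as a plain fold. The sketch below is not the DeltaChannel implementation: reconstruct is a hypothetical helper, the reducer is ordinary list concatenation, and the writes use the (task_id, channel, value) shape of pending writes.

```python
import operator
from functools import reduce


def reconstruct(reducer, writes, seed=None, empty=None):
    """Fold writes (oldest first) into the seed, or into `empty` without a seed."""
    start = seed if seed is not None else empty
    return reduce(reducer, (value for _task_id, _channel, value in writes), start)


writes = [
    ("task-2", "messages", ["how are you?"]),
    ("task-3", "messages", ["fine, thanks"]),
]
# Nearest ancestor snapshot held ["hello"]; later writes are replayed on top.
from_seed = reconstruct(operator.add, writes, seed=["hello"], empty=[])
# No snapshot found before the root: replay starts from the empty value.
from_root = reconstruct(operator.add, writes, empty=[])
```

The order of writes matters for any reducer that is not commutative, which is why the history must be returned oldest first.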

Default implementation

BaseCheckpointSaver provides a default get_delta_channel_history that works with any correct get_tuple implementation:
# Simplified from BaseCheckpointSaver
def get_delta_channel_history(self, *, config, channels):
    target = self.get_tuple(config)          # load the head checkpoint
    cursor = target.parent_config            # walk from its parent
    collected = {ch: [] for ch in channels}
    seed = {}
    remaining = set(channels)

    while cursor and remaining:
        tup = self.get_tuple(cursor)         # ← requires correct by-id lookup
        if tup is None:
            break
        for write in reversed(tup.pending_writes or []):
            if write[1] in remaining:
                collected[write[1]].append(write)
        for ch in list(remaining):
            if ch in tup.checkpoint["channel_values"]:
                seed[ch] = tup.checkpoint["channel_values"][ch]
                remaining.discard(ch)
        cursor = tup.parent_config

    return {
        ch: {"writes": list(reversed(collected[ch])), **({"seed": seed[ch]} if ch in seed else {})}
        for ch in channels
    }
The critical dependency: get_tuple(cursor) is always called with a specific checkpoint_id (the parent’s id). If that lookup returns None, the walk stops immediately and every delta channel reconstructs as empty — silently, with no error. This is why the specific-id path in get_tuple must be correct.
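The failure mode is easy to reproduce in miniature. The sketch below walks a hypothetical in-memory store twice for a single channel, once with a correct by-id lookup and once with a lookup that only serves the latest checkpoint; walk_history, store, correct_get, and broken_get are illustrative names, not LangGraph APIs.

```python
def walk_history(get_tuple, head_id, channel):
    """Collect writes for `channel` from the head's ancestors, oldest first."""
    head = get_tuple(head_id)
    cursor = head["parent_id"] if head else None
    collected = []
    while cursor is not None:
        tup = get_tuple(cursor)
        if tup is None:  # broken lookup: the walk ends without an error
            break
        collected.extend(reversed([w for w in tup["writes"] if w[1] == channel]))
        if channel in tup["channel_values"]:
            break  # reached a seed
        cursor = tup["parent_id"]
    return list(reversed(collected))


# Hypothetical in-memory store keyed by checkpoint id.
store = {
    "0001": {"parent_id": None, "channel_values": {}, "writes": [("t", "messages", "a")]},
    "0002": {"parent_id": "0001", "channel_values": {}, "writes": [("t", "messages", "b")]},
    "0003": {"parent_id": "0002", "channel_values": {}, "writes": []},
}


def correct_get(checkpoint_id):
    return store.get(checkpoint_id)


def broken_get(checkpoint_id):
    # Simulates a backend whose by-id path only finds the latest checkpoint.
    return store[checkpoint_id] if checkpoint_id == "0003" else None


good = walk_history(correct_get, "0003", "messages")
bad = walk_history(broken_get, "0003", "messages")
```

Both calls return normally; only the contents differ, which is what makes this class of bug hard to notice in production.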

Performance override

The default walk issues one get_tuple call per ancestor checkpoint. For backends with good query support, override get_delta_channel_history (and its async twin) to read the ancestor chain in one query and the writes in one further query per channel:
async def aget_delta_channel_history(self, *, config, channels):
    if not channels:
        return {}

    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    checkpoint_id = config["configurable"]["checkpoint_id"]

    # Stage 1: read the head and all older checkpoints newest-first, then follow
    # parent_checkpoint_id links so rows from other branches (forks) are skipped
    ancestors = await self.db.fetchall(
        "SELECT checkpoint_id, parent_checkpoint_id, type, checkpoint "
        "FROM checkpoints "
        "WHERE thread_id=? AND checkpoint_ns=? AND checkpoint_id <= ? "
        "ORDER BY checkpoint_id DESC",
        thread_id, checkpoint_ns, checkpoint_id,
    )

    chain_by_ch: dict[str, list[str]] = {ch: [] for ch in channels}
    seed_by_ch: dict[str, Any] = {}
    remaining = set(channels)
    cur_id = checkpoint_id  # id of the next row expected on the parent chain

    for row in ancestors:
        if not remaining or cur_id is None:
            break
        if row["checkpoint_id"] != cur_id:
            continue  # older checkpoint that is not an ancestor of the head
        cur_id = row["parent_checkpoint_id"]
        if row["checkpoint_id"] == checkpoint_id:
            continue  # the head only supplies its parent id
        ckpt = self.serde.loads_typed((row["type"], row["checkpoint"]))
        cv = ckpt.get("channel_values") or {}
        for ch in list(remaining):
            chain_by_ch[ch].append(row["checkpoint_id"])
            if ch in cv:
                seed_by_ch[ch] = cv[ch]
                remaining.discard(ch)

    # Stage 2: fetch the writes for each channel's ancestor chain, one query per channel
    result: dict[str, DeltaChannelHistory] = {}
    for ch in channels:
        chain = chain_by_ch[ch]
        if not chain:
            entry: DeltaChannelHistory = {"writes": []}
            if ch in seed_by_ch:
                entry["seed"] = seed_by_ch[ch]
            result[ch] = entry
            continue

        write_rows = await self.db.fetchall(
            "SELECT checkpoint_id, task_id, idx, type, value FROM writes "
            "WHERE thread_id=? AND checkpoint_ns=? AND channel=? "
            f"AND checkpoint_id IN ({','.join('?' * len(chain))}) "
            "ORDER BY checkpoint_id, task_id, idx",
            thread_id, checkpoint_ns, ch, *chain,
        )
        writes_by_cid: dict[str, list[PendingWrite]] = {}
        for row in write_rows:
            cid = row["checkpoint_id"]
            value = self.serde.loads_typed((row["type"], row["value"]))
            writes_by_cid.setdefault(cid, []).append((row["task_id"], ch, value))

        # chain is newest-first; iterate oldest-first to get correct replay order
        collected: list[PendingWrite] = []
        for cid in reversed(chain):
            collected.extend(writes_by_cid.get(cid, []))

        entry = {"writes": collected}
        if ch in seed_by_ch:
            entry["seed"] = seed_by_ch[ch]
        result[ch] = entry

    return result

Pruning with delta channels

DeltaChannel state is not self-contained in a single checkpoint — it depends on the ancestor write chain back to the nearest _DeltaSnapshot. If you implement prune or delete_for_runs, you must not delete write rows that a surviving checkpoint’s delta channels depend on. Safe options:
  1. Walk before pruning — for each checkpoint you intend to keep, walk its ancestor chain and mark all write rows up to the nearest _DeltaSnapshot as non-deletable.
  2. Force a snapshot before pruning — rewrite channel_values[ch] = _DeltaSnapshot(reconstructed_value) on the checkpoint you are keeping, then delete ancestors freely.
  3. Skip pruning for delta-channel threads — the safest short-term option if you do not yet need pruning.
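Option 1 can be sketched as a walk that collects the ids whose rows must survive. The example below works on a hypothetical in-memory chain and, like the default history walk, treats the presence of a channel in channel_values as the snapshot that ends the walk; protected_checkpoint_ids and the sample data are illustrative only.

```python
def protected_checkpoint_ids(checkpoints, keep_ids, delta_channels):
    """Return ids of ancestors whose write rows must survive pruning."""
    protected = set()
    for keep_id in keep_ids:
        remaining = set(delta_channels)
        cursor = checkpoints[keep_id]["parent_id"]
        while cursor is not None and remaining:
            ckpt = checkpoints[cursor]
            protected.add(cursor)
            # A channel stops needing older writes once a snapshot is found.
            remaining -= set(ckpt["channel_values"])
            cursor = ckpt["parent_id"]
    return protected


# Hypothetical chain: 0002 carries a snapshot for "messages".
checkpoints = {
    "0001": {"parent_id": None, "channel_values": {}},
    "0002": {"parent_id": "0001", "channel_values": {"messages": "<snapshot>"}},
    "0003": {"parent_id": "0002", "channel_values": {}},
    "0004": {"parent_id": "0003", "channel_values": {}},
}
protected = protected_checkpoint_ids(
    checkpoints, keep_ids=["0004"], delta_channels=["messages"]
)
deletable = set(checkpoints) - protected - {"0004"}
```

Here only the checkpoint older than the snapshot is safe to remove; everything between the kept checkpoint and the snapshot, inclusive, stays.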

Copy thread with delta channels

When implementing copy_thread, copy the complete ancestor chain — not just the head checkpoint. The target thread must have write rows going back to at least one _DeltaSnapshot for every delta channel, or those channels will reconstruct as empty after the copy.
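A minimal sketch of a full-chain copy, using in-memory SQLite with a reduced, assumed schema. The sync copy_thread function and the thread ids are hypothetical; a real implementation would copy every column of both tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE checkpoints (thread_id TEXT, checkpoint_id TEXT, parent_checkpoint_id TEXT)"
)
conn.execute(
    "CREATE TABLE writes (thread_id TEXT, checkpoint_id TEXT, channel TEXT, value TEXT)"
)
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?, ?)",
    [("src", "0001", None), ("src", "0002", "0001"), ("src", "0003", "0002")],
)
conn.executemany(
    "INSERT INTO writes VALUES (?, ?, ?, ?)",
    [("src", "0001", "messages", "a"), ("src", "0002", "messages", "b")],
)
conn.commit()


def copy_thread(source_id, target_id):
    # Copy every checkpoint and every write row, not only the head.
    with conn:
        conn.execute(
            "INSERT INTO checkpoints "
            "SELECT ?, checkpoint_id, parent_checkpoint_id "
            "FROM checkpoints WHERE thread_id=?",
            (target_id, source_id),
        )
        conn.execute(
            "INSERT INTO writes "
            "SELECT ?, checkpoint_id, channel, value FROM writes WHERE thread_id=?",
            (target_id, source_id),
        )


copy_thread("src", "dst")
copied_checkpoints = conn.execute(
    "SELECT COUNT(*) FROM checkpoints WHERE thread_id='dst'"
).fetchone()[0]
copied_writes = [
    row[0]
    for row in conn.execute(
        "SELECT value FROM writes WHERE thread_id='dst' ORDER BY checkpoint_id"
    )
]
```

Keeping the original checkpoint ids and parent links means the copied thread can be walked exactly like the source.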

Testing with the conformance suite

langgraph-checkpoint-conformance validates your implementation against the full contract, including delta channel history:
pip install langgraph-checkpoint-conformance
import asyncio
from langgraph.checkpoint.conformance import checkpointer_test, validate

@checkpointer_test(name="MyCheckpointer")
async def my_checkpointer():
    async with MyCheckpointer.create() as saver:
        yield saver

async def main():
    report = await validate(my_checkpointer)
    report.print_report()
    # Fails the process if any base capability is missing or broken
    if not report.passed_all_base():
        raise RuntimeError("Checkpointer failed conformance suite")

asyncio.run(main())
The suite auto-detects which extended capabilities your checkpointer implements (including aget_delta_channel_history) and runs the relevant tests for each. Run it as part of your CI before shipping.

Next steps