getting started
Installation
OpenLCM requires Python 3.10+ and SQLite, which ships with the Python standard library. A single install includes every framework adapter (LangGraph, Google ADK, AutoGen, CrewAI, LlamaIndex, Haystack), every provider SDK (OpenAI, Anthropic, Gemini), and the live dashboard. No extras are needed.
Tip — reuse your existing LLM client
Every adapter accepts an llm= kwarg so you can pass your existing model client instead of configuring a separate one for summarization. No extra API keys needed.
Quick Start
The minimal pattern: create an engine, bind a session, call compress() before each LLM turn.
minimal example (any framework) (python)
import asyncio
from openlcm import LCMEngine

# 1. Create engine: any LiteLLM model string works
engine = LCMEngine(
    model="anthropic/claude-haiku-4-5-20251001",
    db_path="~/.openlcm/myapp.db",
)

# 2. Bind a session and declare context size
engine.bind_session("session-001", context_length=200_000)

# 3. Call compress() before every LLM turn
async def agent_turn(messages: list[dict], user_input: str) -> tuple[str, list[dict]]:
    messages.append({"role": "user", "content": user_input})
    # LCM compresses automatically when threshold is exceeded
    messages = await engine.compress(messages)
    # my_llm is a placeholder for your own async LLM client
    response = await my_llm.chat(messages)
    # Store the reply in LCM's dict format rather than the raw response object
    messages.append({"role": "assistant", "content": response.content})
    # Report token usage so LCM can track pressure
    engine.update_from_response(response.usage)
    return response.content, messages

reply, history = asyncio.run(agent_turn([], "Hello!"))
LCM internal format
compress() expects and returns a list of dicts: {"role": "user"|"assistant"|"system"|"tool", "content": "string"}. Tool calls are serialized as JSON in the content field. Use the framework message converters (see Message Converters) to convert from framework-native types.
Configuration
All parameters can be set in code via LCMConfig, via environment variables, or via a config.yaml file.
LCMConfig: all knobs (python)
from openlcm.core.config import LCMConfig
from openlcm import LCMEngine
config = LCMConfig.from_env() # starts from defaults + env overrides
# ── Compression trigger ────────────────────────────────────────────────────
config.context_threshold = 0.75 # compress at 75% of context window (default)
# range: 0.30 – 0.95
# ── Fresh tail ─────────────────────────────────────────────────────────────
config.fresh_tail_count = 64 # protect last N messages from compression (default: 64)
# set lower (e.g. 8) for tool-heavy agents
# ── Leaf chunk size ────────────────────────────────────────────────────────
config.leaf_chunk_tokens = 20_000 # tokens per D0 leaf summary (default: 20,000)
# ── DAG arc creation ───────────────────────────────────────────────────────
config.condensation_fanin = 4 # D0 nodes before creating a D1 arc (default: 4)
# lower = arc nodes created sooner
engine = LCMEngine(model="...", config=config)
Environment variables
| Variable | Type | Default | Description |
| LCM_CONTEXT_THRESHOLD | float | 0.75 | Compression trigger as fraction of context window |
| LCM_FRESH_TAIL_COUNT | int | 64 | Messages protected from compression at tail |
| LCM_LEAF_CHUNK_TOKENS | int | 20000 | Tokens per D0 leaf summary chunk |
| LCM_CONDENSATION_FANIN | int | 4 | D0 nodes required before D1 arc is created |
| LCM_DB_PATH | str | ~/.openlcm/lcm.db | SQLite database path |
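To make the override order concrete, here is a minimal sketch of how defaults plus LCM_* environment overrides could be resolved. SketchConfig and trigger_tokens are hypothetical stand-ins, not OpenLCM's own classes; the field names and defaults come from the table above, and rejecting an out-of-range threshold (rather than clamping it) is an assumption.

```python
import os
from dataclasses import dataclass


@dataclass
class SketchConfig:
    """Hypothetical stand-in for LCMConfig; fields mirror the table above."""
    context_threshold: float = 0.75
    fresh_tail_count: int = 64
    leaf_chunk_tokens: int = 20_000
    condensation_fanin: int = 4

    @classmethod
    def from_env(cls, env=None):
        # Start from defaults, then apply any LCM_* overrides that are set.
        env = os.environ if env is None else env
        cfg = cls()
        casts = {
            "context_threshold": float,
            "fresh_tail_count": int,
            "leaf_chunk_tokens": int,
            "condensation_fanin": int,
        }
        for name, cast in casts.items():
            raw = env.get(f"LCM_{name.upper()}")
            if raw is not None:
                setattr(cfg, name, cast(raw))
        # Assumption: values outside the documented 0.30-0.95 range are rejected.
        if not 0.30 <= cfg.context_threshold <= 0.95:
            raise ValueError("context_threshold must be within 0.30-0.95")
        return cfg


def trigger_tokens(cfg, context_length):
    """Prompt size (in tokens) at which compression would fire."""
    return round(cfg.context_threshold * context_length)


cfg = SketchConfig.from_env(
    {"LCM_CONTEXT_THRESHOLD": "0.60", "LCM_FRESH_TAIL_COUNT": "8"}
)
print(cfg, trigger_tokens(cfg, 200_000))
```

Passing a plain dict instead of os.environ keeps the sketch easy to test; with no argument it reads the real environment.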
Core Concepts
Two-layer architecture
LCM has two independent stores that work together:
1. Immutable Message Store: every message is written verbatim to SQLite with a stable store_id. Messages are never modified or deleted, and are FTS5-indexed for full-text search.
2. Summary DAG: a directed acyclic graph of summary nodes (D0 leaf → D1 arc → D2 durable). Each node points back to the source message range it compresses.
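The two stores can be pictured with a small in-memory SQLite sketch. The table and column names below (messages, summary_nodes, first_id, last_id) and the helper functions are hypothetical, chosen only for illustration, and the FTS5 index is left out to keep the example short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE messages (
    store_id   INTEGER PRIMARY KEY AUTOINCREMENT,  -- stable, never reused
    session_id TEXT NOT NULL,
    role       TEXT NOT NULL,
    content    TEXT NOT NULL
);
CREATE TABLE summary_nodes (
    node_id    INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL,
    depth      INTEGER NOT NULL,   -- 0 = leaf, 1 = arc, 2+ = durable
    first_id   INTEGER NOT NULL,   -- first store_id covered by the summary
    last_id    INTEGER NOT NULL,   -- last store_id covered by the summary
    summary    TEXT NOT NULL
);
""")


def append_message(session_id, role, content):
    """Append-only write: rows are inserted, never updated or deleted."""
    cur = conn.execute(
        "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?)",
        (session_id, role, content),
    )
    return cur.lastrowid


def add_leaf(session_id, first_id, last_id, summary):
    """Record a depth-0 summary that points back at a message range."""
    cur = conn.execute(
        "INSERT INTO summary_nodes (session_id, depth, first_id, last_id, summary) "
        "VALUES (?, 0, ?, ?, ?)",
        (session_id, first_id, last_id, summary),
    )
    return cur.lastrowid


def expand(node_id):
    """Recover the verbatim messages a summary node was built from."""
    session_id, first_id, last_id = conn.execute(
        "SELECT session_id, first_id, last_id FROM summary_nodes WHERE node_id = ?",
        (node_id,),
    ).fetchone()
    return conn.execute(
        "SELECT role, content FROM messages "
        "WHERE session_id = ? AND store_id BETWEEN ? AND ? ORDER BY store_id",
        (session_id, first_id, last_id),
    ).fetchall()


ids = [append_message("s1", "user", f"msg {i}") for i in range(3)]
leaf = add_leaf("s1", ids[0], ids[1], "summary of the first two messages")
print(expand(leaf))
```

Because the summary row only stores a range, the original messages stay recoverable after compression.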
DAG depth levels
| Depth | Name | Created when |
| D0 | Leaf node | Context threshold exceeded; oldest messages outside fresh tail are summarized |
| D1 | Session arc | condensation_fanin D0 nodes have accumulated |
| D2+ | Durable history | condensation_fanin D1 nodes have accumulated (unbounded depth) |
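The fan-in rule in the table can be simulated in a few lines. This is a sketch under the assumption that the same condensation_fanin applies at every depth, as the D2+ row suggests; add_leaf and the dict-based nodes are hypothetical, and no real summarization happens here.

```python
def add_leaf(levels, leaf, fanin=4):
    """Add a D0 node; fold full levels upward.

    levels[d] holds the nodes at depth d that have not been condensed yet.
    """
    node, depth = leaf, 0
    while True:
        if depth == len(levels):
            levels.append([])
        levels[depth].append(node)
        if len(levels[depth]) < fanin:
            return levels
        # Fan-in reached: replace the full level with one node a depth higher.
        node = {"depth": depth + 1, "children": levels[depth]}
        levels[depth] = []
        depth += 1


levels = []
for i in range(16):
    add_leaf(levels, {"depth": 0, "summary": f"leaf {i}"})

# Number of uncondensed nodes at D0, D1, D2
print([len(level) for level in levels])
```

With a fan-in of 4, sixteen leaves fold into four D1 arcs, which in turn fold into a single D2 node.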
Active context formula
what the model sees each turn (text)
active_context = system_prompt
               + highest_dag_node   (D2 or D1 if no D2)
               + recent_d0_nodes    (any D0 not yet condensed)
               + fresh_tail         (last N raw messages, verbatim)
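As a rough illustration of the formula, the sketch below splits a history into compressible messages and a protected tail, then assembles the four parts in order. split_fresh_tail and build_active_context are hypothetical helpers, and presenting summaries as system-role messages is an assumption made only for this example.

```python
def split_fresh_tail(messages, fresh_tail_count):
    """Return (older, tail); tail is the last N messages, kept verbatim."""
    if fresh_tail_count <= 0:
        return list(messages), []
    return list(messages[:-fresh_tail_count]), list(messages[-fresh_tail_count:])


def build_active_context(system_prompt, top_summary, pending_d0, fresh_tail):
    """Assemble system prompt + highest DAG node + uncondensed D0s + tail."""
    context = [{"role": "system", "content": system_prompt}]
    if top_summary:
        # Assumption: summaries are injected as system messages.
        context.append({"role": "system", "content": top_summary})
    for leaf_summary in pending_d0:
        context.append({"role": "system", "content": leaf_summary})
    context.extend(fresh_tail)
    return context


history = [{"role": "user", "content": f"turn {i}"} for i in range(10)]
older, tail = split_fresh_tail(history, fresh_tail_count=3)
context = build_active_context(
    "You are a helpful assistant.",
    "D1: summary of the session so far",
    ["D0: summary of turns 4-6"],
    tail,
)
print(len(older), len(tail), len(context))
```

When the history is shorter than fresh_tail_count, everything lands in the tail and nothing is eligible for compression.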
Sessions
One SQLite DB file holds all sessions. bind_session() sets the active session and context window size. Multiple agents can share one DB with different session IDs.
session management (python)
# One DB, multiple sessions
engine = LCMEngine(model="...", db_path="shared.db")
engine.bind_session("user-alice", context_length=128_000)
# Binding again switches the active session to user-bob
engine.bind_session("user-bob", context_length=200_000)
# Get live stats for the current session (user-bob at this point)
status = engine.get_status()
# → {"store_messages": 47, "dag_nodes": 5, "compression_count": 3,
#    "last_prompt_tokens": 14200, ...}
framework adapters
LangGraph
Two integration points: LCMCheckpointer (graph persistence) and LangChainMessages (explicit compression inside a node).
Option A — LCMCheckpointer (recommended)
Drop-in replacement for MemorySaver. LCM compresses checkpoint state automatically before each graph run.
langgraph_agent.py (python)
from langgraph.graph import StateGraph, START, END
from openlcm import LCMEngine
from openlcm.adapters.langgraph import LCMCheckpointer

engine = LCMEngine(model="anthropic/claude-haiku-4-5-20251001")
engine.bind_session("lg-session", context_length=200_000)

# Replace MemorySaver with LCMCheckpointer; no other changes needed.
# MyState is your own state schema; add your nodes and edges before compile().
graph = StateGraph(MyState).compile(
    checkpointer=LCMCheckpointer(engine)
)

# thread_id maps to session_id automatically
config = {"configurable": {"thread_id": "lg-session"}}
# Run this inside an async function
result = await graph.ainvoke({"messages": [...]}, config)
Option B — Manual compression inside a node
Use LangChainMessages to convert messages, check pressure, and compress explicitly. Gives you full control over when compression fires.
langgraph_manual.py: complete working example with tools (python)
import asyncio
from typing import Annotated
from typing_extensions import TypedDict
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from openlcm import LCMEngine
from openlcm.core.config import LCMConfig
from openlcm.adapters.langchain import LangChainMessages

# ── Tools ─────────────────────────────────────────────────────────────────
@tool
def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    return {"city": city, "temp_c": 22, "condition": "Sunny"}

tools = [get_weather]

# ── LLM + Engine ──────────────────────────────────────────────────────────
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
llm_with_tools = llm.bind_tools(tools)
config = LCMConfig.from_env()
config.context_threshold = 0.60
config.fresh_tail_count = 8
engine = LCMEngine(summarize_fn=llm, config=config)
engine.bind_session("demo", context_length=6_000)

# ── State ─────────────────────────────────────────────────────────────────
class State(TypedDict):
    messages: Annotated[list, add_messages]

SYSTEM = SystemMessage(content="You are a helpful assistant with weather tools.")

# ── Nodes ─────────────────────────────────────────────────────────────────
async def chatbot(state: State):
    messages = state["messages"]
    if not messages or not isinstance(messages[0], SystemMessage):
        messages = [SYSTEM] + list(messages)
    # Convert to LCM format → compress if needed → convert back
    lcm_msgs = LangChainMessages.to_lcm(messages)
    if engine.should_compress_preflight(lcm_msgs):
        lcm_msgs = await engine.compress(lcm_msgs)
        messages = LangChainMessages.from_lcm(lcm_msgs)
    response = await llm_with_tools.ainvoke(messages)
    return {"messages": [response]}

tool_node = ToolNode(tools)

# ── Graph ─────────────────────────────────────────────────────────────────
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("tools", tool_node)
builder.add_edge(START, "chatbot")
builder.add_conditional_edges("chatbot", tools_condition)
builder.add_edge("tools", "chatbot")
graph = builder.compile()

async def main():
    conversation = []
    while True:
        user_input = input("You: ")
        conversation.append(HumanMessage(content=user_input))
        result = await graph.ainvoke({"messages": conversation})
        conversation = result["messages"]
        print(f"Agent: {conversation[-1].content}")

asyncio.run(main())
Google ADK
Two components work together: LCMSessionService persists every ADK event to SQLite, and lcm_compress_callback compresses context before each Gemini API call.
Setup
Set GOOGLE_API_KEY in your environment. Install with pip install openlcm[google-adk].
adk_agent.py: complete working example (python)
import asyncio
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.genai import types
from openlcm import LCMEngine
from openlcm.core.config import LCMConfig
from openlcm.adapters.google_adk import LCMSessionService, lcm_compress_callback

# ── Mock tools (no extra API keys) ────────────────────────────────────────
def get_weather(city: str) -> dict:
    """Get weather for a city. Args: city: City name."""
    return {"city": city, "temp_c": 22, "condition": "Sunny"}

def get_stock_price(ticker: str) -> dict:
    """Get stock price. Args: ticker: Stock symbol e.g. AAPL."""
    return {"ticker": ticker, "price": 195.42, "change_pct": 1.2}

# ── LCM Engine ────────────────────────────────────────────────────────────
config = LCMConfig.from_env()
config.context_threshold = 0.60
config.fresh_tail_count = 8
engine = LCMEngine(
    model="gemini/gemini-2.0-flash",
    config=config,
    db_path="adk_demo.db",
)
engine.bind_session("adk-session", context_length=500_000)

# ── ADK Agent with LCM hooks ──────────────────────────────────────────────
agent = LlmAgent(
    name="research_assistant",
    model="gemini-2.0-flash",
    instruction="You are a research assistant. Use your tools proactively.",
    tools=[get_weather, get_stock_price],
    before_model_callback=lcm_compress_callback(engine),  # compression hook
)
session_service = LCMSessionService(engine)  # persistence + dashboard
runner = Runner(
    agent=agent,
    app_name="my-app",
    session_service=session_service,
)

# ── Run ───────────────────────────────────────────────────────────────────
async def run_turn(session_id: str, user_input: str) -> str:
    # Manually ingest user message so it appears in the LCM store
    engine._ingest_messages([{"role": "user", "content": user_input}])
    content = types.Content(
        role="user", parts=[types.Part(text=user_input)]
    )
    final_text = ""
    async for event in runner.run_async(
        user_id="user", session_id=session_id, new_message=content
    ):
        # Consume ALL events; never break early (causes GeneratorExit in OTel)
        if event.is_final_response() and not final_text:
            if event.content and event.content.parts:
                final_text = "".join(
                    getattr(p, "text", "") or ""
                    for p in event.content.parts
                    if getattr(p, "text", None)
                )
    return final_text or "(no response)"

async def main():
    session = await runner.session_service.create_session(
        app_name="my-app", user_id="user", session_id="adk-session"
    )
    while True:
        user_input = input("You: ").strip()
        if not user_input:
            continue
        reply = await run_turn(session.id, user_input)
        print(f"Agent: {reply}")

asyncio.run(main())
Important: consume all events
Never use break after is_final_response(). ADK's run_async generator runs inside OpenTelemetry spans — breaking early throws GeneratorExit into those spans and corrupts the session state. Always drain all events to natural completion.
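The drain-to-completion pattern can be tried without ADK installed. In the sketch below, fake_run_async is a hypothetical stand-in for an event stream that still has bookkeeping to do after its final response; it is not ADK's generator, and the event dicts are invented for illustration.

```python
import asyncio

cleanup_log = []


async def fake_run_async():
    """Hypothetical event stream with work left after the final response."""
    try:
        yield {"final": False, "text": "thinking"}
        yield {"final": True, "text": "answer"}
        yield {"final": False, "text": "bookkeeping"}
        cleanup_log.append("completed")  # only reached if fully drained
    finally:
        cleanup_log.append("closed")


async def drain():
    final_text = ""
    async for event in fake_run_async():
        # Remember the final response but keep iterating instead of breaking.
        if event["final"] and not final_text:
            final_text = event["text"]
    return final_text


final_text = asyncio.run(drain())
print(final_text, cleanup_log)
```

Capturing the reply in a variable and letting the loop finish gives the generator the chance to run its trailing steps in normal order.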
How it works
| Component | Interface | What it does |
| LCMSessionService | BaseSessionService | Wraps InMemorySessionService; mirrors every append_event call to SQLite for dashboard visibility |
| lcm_compress_callback | before_model_callback | Intercepts LlmRequest.contents before each Gemini API call and replaces it with compressed context |
AutoGen
LCMContext is a ChatCompletionContext subclass. Pass it as model_context to any AutoGen agent — no other changes needed.
autogen_agent.py (python)
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from openlcm import LCMEngine
from openlcm.adapters.autogen import LCMContext

model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

# Reuse the same client for LCM summarization; no extra API key
engine = LCMEngine(llm=model_client)
engine.bind_session("autogen-session", context_length=128_000)

agent = AssistantAgent(
    name="assistant",
    model_client=model_client,
    model_context=LCMContext(engine),  # ← LCM drop-in
)

# Multi-agent: each agent gets its own session
planner = AssistantAgent(
    "planner",
    model_client=model_client,
    model_context=LCMContext(llm=model_client, session_id="planner"),
)
executor = AssistantAgent(
    "executor",
    model_client=model_client,
    model_context=LCMContext(llm=model_client, session_id="executor"),
)
LCMContext methods
Satisfies the full ChatCompletionContext ABC:
| Method | Behaviour |
| add_message(msg) | Persists to SQLite, triggers compression if threshold exceeded |
| get_messages() | Returns LCM-optimised context as typed AutoGen LLMMessage objects |
| clear() | Resets in-memory list and deletes session messages from store |
| message_count() | Returns count of messages currently held |
| save_state() | Returns serialisable state dict for checkpointing |
| load_state(state) | Restores context from a saved state dict |
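For readers unfamiliar with the shape of that interface, here is a plain in-memory sketch of the six methods in the table. SketchContext is hypothetical and is not LCMContext: it does no persistence or compression, and it is synchronous for brevity, which the real adapter may not be.

```python
class SketchContext:
    """Hypothetical in-memory context exposing the methods listed above."""

    def __init__(self):
        self._messages = []

    def add_message(self, msg):
        # The real adapter would also persist and possibly compress here.
        self._messages.append(dict(msg))

    def get_messages(self):
        # Return a copy so callers cannot mutate internal state.
        return [dict(m) for m in self._messages]

    def clear(self):
        self._messages.clear()

    def message_count(self):
        return len(self._messages)

    def save_state(self):
        # Plain dicts and lists only, so the state can be serialised.
        return {"messages": [dict(m) for m in self._messages]}

    def load_state(self, state):
        self._messages = [dict(m) for m in state["messages"]]


ctx = SketchContext()
ctx.add_message({"role": "user", "content": "hi"})
ctx.add_message({"role": "assistant", "content": "hello"})
snapshot = ctx.save_state()

restored = SketchContext()
restored.load_state(snapshot)
print(restored.message_count(), restored.get_messages())
```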
CrewAI
LCMStorage plugs into LongTermMemory as a storage backend. All crew memory goes through LCM's immutable store.
crewai_agent.py (python)
from crewai import Agent, Crew, Task
from crewai.memory import LongTermMemory
from openlcm import LCMEngine
from openlcm.adapters.crewai import LCMStorage

engine = LCMEngine(model="openai/gpt-4o-mini")
engine.bind_session("crewai-session", context_length=128_000)

researcher = Agent(
    role="Research Analyst",
    goal="Gather and analyse market data",
    backstory="Expert at finding and synthesising information.",
    verbose=True,
)

crew = Crew(
    agents=[researcher],
    tasks=[
        Task(
            description="Research AI trends in 2025",
            # Recent CrewAI releases require expected_output on every Task
            expected_output="A short bullet list of notable AI trends",
            agent=researcher,
        )
    ],
    memory=True,
    long_term_memory=LongTermMemory(
        storage=LCMStorage(engine)  # ← LCM drop-in
    ),
)
result = crew.kickoff()
OpenAI SDK
OpenAIMessages converts between the OpenAI message format and LCM's internal format. Compatible with Groq, Together, Mistral, Azure, Ollama, vLLM, and any OpenAI-compatible endpoint.
openai_agent.py: with tool calls (python)
import json
from openai import AsyncOpenAI
from openlcm import LCMEngine
from openlcm.adapters.openai import OpenAIMessages

client = AsyncOpenAI()
engine = LCMEngine(model="openai/gpt-4o-mini")
engine.bind_session("openai-session", context_length=128_000)

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]

async def chat(messages: list, user_input: str | None) -> tuple:
    # user_input is None when re-entering after tool results
    if user_input is not None:
        messages.append({"role": "user", "content": user_input})
    # Convert to LCM → compress if needed → convert back to OpenAI format
    lcm = OpenAIMessages.to_lcm(messages)
    if engine.should_compress_preflight(lcm):
        lcm = await engine.compress(lcm)
        messages = OpenAIMessages.from_lcm(lcm)
    response = await client.chat.completions.create(
        model="gpt-4o-mini", messages=messages, tools=tools
    )
    msg = response.choices[0].message
    messages.append(msg.model_dump(exclude_none=True))
    # Report usage on every call, including tool-call turns
    engine.update_from_response({
        "prompt_tokens": response.usage.prompt_tokens,
        "completion_tokens": response.usage.completion_tokens,
    })
    # Handle tool calls
    if msg.tool_calls:
        for tc in msg.tool_calls:
            args = json.loads(tc.function.arguments)
            result = {"city": args["city"], "temp_c": 22}  # mock
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": json.dumps(result),
            })
        # Call again without a new user message to get the final answer
        return await chat(messages, None)
    return messages, msg.content
Groq / Together / Ollama / vLLM
Use the same OpenAIMessages converter — the message format is identical. Just change the client's base_url and the LCMEngine(model=...) string.
Anthropic SDK
AnthropicMessages handles Anthropic's content block format. from_lcm() returns a (system_str, messages) tuple because Anthropic takes system as a separate parameter.
anthropic_agent.py (python)
import asyncio
from anthropic import AsyncAnthropic
from openlcm import LCMEngine
from openlcm.adapters.anthropic import AnthropicMessages

client = AsyncAnthropic()
engine = LCMEngine(model="anthropic/claude-haiku-4-5-20251001")
engine.bind_session("anthropic-session", context_length=200_000)

SYSTEM = "You are a helpful assistant."

async def chat(messages: list, user_input: str) -> tuple:
    # Convert to LCM internal format (system is extracted from messages)
    lcm = AnthropicMessages.to_lcm(messages, system=SYSTEM)
    if engine.should_compress_preflight(lcm):
        lcm = await engine.compress(lcm)
    # Add new user message after compression
    lcm.append({"role": "user", "content": user_input})
    # from_lcm returns (system_str, anthropic_messages)
    system_out, anthropic_msgs = AnthropicMessages.from_lcm(lcm)
    response = await client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=2048,
        system=system_out or SYSTEM,
        messages=anthropic_msgs,
    )
    reply = response.content[0].text
    # Record both sides of the turn so the next call sees the user message too
    messages.append({"role": "user", "content": user_input})
    messages.append({"role": "assistant", "content": reply})
    engine.update_from_response({
        "prompt_tokens": response.usage.input_tokens,
        "completion_tokens": response.usage.output_tokens,
    })
    return messages, reply
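The (system_str, messages) split that from_lcm() is described as returning can be pictured with a small standalone helper. split_system is hypothetical, and joining multiple system entries with a blank line is an assumption; the real converter also handles content blocks, which this sketch ignores.

```python
def split_system(lcm_msgs):
    """Separate system entries from the rest of an LCM-format message list."""
    system_parts = [m["content"] for m in lcm_msgs if m["role"] == "system"]
    conversation = [m for m in lcm_msgs if m["role"] != "system"]
    # Assumption: multiple system entries are joined with a blank line.
    return "\n\n".join(system_parts), conversation


lcm_msgs = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there"},
]
system_text, conversation = split_system(lcm_msgs)
print(repr(system_text), len(conversation))
```

An empty system string falls through naturally, which is why the example above passes system=system_out or SYSTEM.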
LlamaIndex
LlamaIndexMessages converts between ChatMessage objects (with MessageRole enum) and LCM's internal format.
llamaindex_agent.py (python)
from llama_index.core.llms import ChatMessage, MessageRole
from llama_index.llms.anthropic import Anthropic
from openlcm import LCMEngine
from openlcm.adapters.llamaindex import LlamaIndexMessages

llm = Anthropic(model="claude-haiku-4-5-20251001")
engine = LCMEngine(llm=llm)
engine.bind_session("llama-session", context_length=200_000)

history: list[ChatMessage] = []

async def chat(user_input: str) -> str:
    history.append(ChatMessage(role=MessageRole.USER, content=user_input))
    # Convert → compress if needed → convert back
    lcm = LlamaIndexMessages.to_lcm(history)
    if engine.should_compress_preflight(lcm):
        lcm = await engine.compress(lcm)
        history[:] = LlamaIndexMessages.from_lcm(lcm)
    response = await llm.achat(history)
    history.append(ChatMessage(
        role=MessageRole.ASSISTANT,
        content=response.message.content,
    ))
    return response.message.content
Haystack
HaystackMessages handles both Haystack ≥2.3 ToolCall dataclass style and legacy additional_kwargs style.
haystack_agent.py (python)
from haystack.dataclasses import ChatMessage
from haystack.components.generators.chat import OpenAIChatGenerator
from openlcm import LCMEngine
from openlcm.adapters.haystack import HaystackMessages

generator = OpenAIChatGenerator(model="gpt-4o-mini")
engine = LCMEngine(model="openai/gpt-4o-mini")
engine.bind_session("haystack-session", context_length=128_000)

history: list[ChatMessage] = []

async def chat(user_input: str) -> str:
    history.append(ChatMessage.from_user(user_input))
    lcm = HaystackMessages.to_lcm(history)
    if engine.should_compress_preflight(lcm):
        lcm = await engine.compress(lcm)
        history[:] = HaystackMessages.from_lcm(lcm)
    result = generator.run(history)
    reply = result["replies"][0]
    history.append(reply)
    return reply.text
Gemini (raw google-genai)
GeminiMessages converts between types.Content objects (Gemini's native format) and LCM. Also used internally by lcm_compress_callback.
gemini_agent.py (python)
import asyncio
from google import genai
from google.genai import types
from openlcm import LCMEngine
from openlcm.adapters.gemini import GeminiMessages

# google-genai client; picks up the API key from the environment
client = genai.Client()
MODEL = "gemini-2.0-flash"
engine = LCMEngine(model="gemini/gemini-2.0-flash")
engine.bind_session("gemini-session", context_length=1_000_000)

history: list = []  # list[types.Content]

async def chat(user_input: str) -> str:
    history.append(types.Content(
        role="user", parts=[types.Part(text=user_input)]
    ))
    # Convert → compress → convert back
    lcm = GeminiMessages.to_lcm(history)
    if engine.should_compress_preflight(lcm):
        lcm = await engine.compress(lcm)
        _, history[:] = GeminiMessages.from_lcm(lcm)  # (system, contents)
    response = await client.aio.models.generate_content(
        model=MODEL, contents=history
    )
    history.append(response.candidates[0].content)
    return response.text
reference
Message Converters
Every framework adapter ships a static converter class with to_lcm() and from_lcm() methods you can use independently of the higher-level adapters.
all converters at a glance (python)
from openlcm.adapters.openai import OpenAIMessages
from openlcm.adapters.anthropic import AnthropicMessages
from openlcm.adapters.langchain import LangChainMessages
from openlcm.adapters.llamaindex import LlamaIndexMessages
from openlcm.adapters.haystack import HaystackMessages
from openlcm.adapters.gemini import GeminiMessages
from openlcm.adapters.autogen import AutoGenMessages
# All follow the same two-method interface:
lcm_msgs = OpenAIMessages.to_lcm(openai_messages) # → list[dict]
oai_msgs = OpenAIMessages.from_lcm(lcm_msgs) # → list[dict]
# Anthropic and Gemini return a tuple from from_lcm (system is separate):
system, msgs = AnthropicMessages.from_lcm(lcm_msgs) # → (str, list)
system, msgs = GeminiMessages.from_lcm(lcm_msgs) # → (str, list[Content])
# Auto-detect converter from message type:
from openlcm.adapters import auto_detect
converter = auto_detect(messages) # returns the right class
lcm_msgs = converter.to_lcm(messages)
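How auto_detect chooses a converter is not described here, so the following is only a guess at the general idea: inspect the first message and dispatch on its shape. detect_style, FakeContent, and the returned labels are all hypothetical and do not reflect OpenLCM's actual detection rules.

```python
from dataclasses import dataclass, field


@dataclass
class FakeContent:
    """Hypothetical Gemini-like message: a role plus a list of parts."""
    role: str
    parts: list = field(default_factory=list)


def detect_style(messages):
    """Guess a message style from the first element (illustrative only)."""
    if not messages:
        raise ValueError("cannot detect a converter from an empty list")
    first = messages[0]
    if isinstance(first, dict) and "role" in first:
        return "dict-style"    # e.g. OpenAI-like message dicts
    if hasattr(first, "parts"):
        return "parts-style"   # e.g. Gemini-like Content objects
    raise TypeError(f"unrecognised message type: {type(first).__name__}")


print(detect_style([{"role": "user", "content": "hi"}]))
print(detect_style([FakeContent(role="user", parts=["hi"])]))
```

Raising on an empty or unknown input, rather than silently picking a default, keeps a mis-detected format from corrupting the stored history.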
LCM internal format
All converters normalise to this format. Tool calls are JSON-serialised into the content string.
internal message schema (python)
# Plain message
{"role": "user" | "assistant" | "system", "content": "string"}
# Assistant with tool calls (content is JSON string)
{"role": "assistant", "content": "{\"text\": \"...\", \"tool_calls\": [{\"id\": \"tc_1\", \"name\": \"get_weather\", \"args\": {\"city\": \"Tokyo\"}, \"type\": \"function\"}]}"}
# Tool result
{"role": "tool", "content": "{\"temp_c\": 22}", "tool_call_id": "tc_1", "name": "get_weather"}
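A short round-trip sketch shows what JSON-serialising tool calls into the content string looks like in practice. The payload keys (text, tool_calls, id, name, args, type) follow the schema above; encode_assistant and decode_assistant are hypothetical helpers, not part of the converter API.

```python
import json


def encode_assistant(text, tool_calls):
    """Pack text and tool calls into a single JSON content string."""
    payload = {"text": text, "tool_calls": tool_calls}
    return {"role": "assistant", "content": json.dumps(payload)}


def decode_assistant(message):
    """Return (text, tool_calls); plain-text content yields no tool calls."""
    content = message["content"]
    try:
        payload = json.loads(content)
    except (json.JSONDecodeError, TypeError):
        return content, []
    if isinstance(payload, dict) and "tool_calls" in payload:
        return payload.get("text", ""), payload["tool_calls"]
    # Valid JSON that is not a tool-call payload is treated as plain text.
    return content, []


call = {"id": "tc_1", "name": "get_weather", "args": {"city": "Tokyo"}, "type": "function"}
encoded = encode_assistant("Checking the weather.", [call])
text, tool_calls = decode_assistant(encoded)
print(text, tool_calls)
```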
Live Dashboard
Every agent automatically gets a live browser dashboard. No config required.
Start the dashboard
embedded in your agent (recommended) (python)
import threading
from openlcm.viz.server import create_app, serve as viz_serve

def _start_viz():
    app = create_app(engine)
    viz_serve(app, host="127.0.0.1", port=7842, open_browser=True)

threading.Thread(target=_start_viz, daemon=True).start()
# → opens http://localhost:7842 automatically
standalone CLI (shell)
openlcm viz # http://localhost:7842
openlcm viz --port 8080 # custom port
openlcm viz --db ~/.openlcm/app.db # point at a specific DB
Dashboard panels
| Panel | Shows |
| Token Pressure Gauge | Live prompt token count vs threshold and max. Green → amber → red. |
| Summary DAG Viewer | Live tree of all DAG nodes grouped by depth (D0/D1/D2). Compression ratio per node. |
| SQLite Store | Every raw message with role badge, token estimate, and full content viewer. Tool calls shown with amber TOOL badge. |
| Event Log | Chronological stream: session_bound, compaction_start, node_added, compaction_end, token_pressure. |
| Sessions List | All sessions in the DB. Click to drill into any session's full history. |
CLI Reference
all commands (shell)
# Dashboard
openlcm viz [--port 7842] [--db PATH] [--no-browser]
# Full-text search across all sessions (FTS5)
openlcm grep "search term" [--session SESSION_ID] [--limit 20]
# Session statistics
openlcm status [--session SESSION_ID] [--db PATH]
# Export session to JSON
openlcm export SESSION_ID [-o output.json]
# Recover raw messages from a DAG node
openlcm expand NODE_ID [--session SESSION_ID]
API Reference
LCMEngine
| Method / Property | Signature | Description |
| LCMEngine() | model=, config=, db_path=, summarize_fn=, llm= | Create engine. Pass a LiteLLM model string, or an existing LLM via llm= or summarize_fn=. |
| bind_session() | (session_id, context_length=, platform="") | Activate a session and set its context window size. |
| compress() | async (messages: list[dict]) → list[dict] | Compress messages if threshold exceeded. No-op if not. Always returns a valid message list. |
| should_compress_preflight() | (messages: list[dict]) → bool | Check whether compression would fire without actually compressing. |
| update_from_response() | (usage: dict) | Feed token usage from the LLM response back to the engine for pressure tracking. |
| get_status() | () → dict | Returns store_messages, dag_nodes, compression_count, last_prompt_tokens, tokens_freed. |
| _ingest_messages() | (messages: list[dict]) | Write messages directly to the SQLite store without triggering compression. Used by ADK adapter. |
LCMConfig fields
| Field | Type | Default | Description |
| context_threshold | float | 0.75 | Fraction of context_length at which compression fires. |
| fresh_tail_count | int | 64 | Messages at the tail protected from compression. |
| leaf_chunk_tokens | int | 20000 | Approximate token budget per D0 leaf summary. |
| condensation_fanin | int | 4 | Number of D0 nodes before a D1 arc is created. |
| dynamic_leaf_chunk_enabled | bool | False | Auto-tune leaf_chunk_tokens based on observed turn sizes. |
| dynamic_leaf_chunk_max | int | 40000 | Upper bound for dynamic leaf chunk tuning. |