ThreadStore API
The ThreadStore
class provides a unified interface for thread storage with pluggable backends. It supports both in-memory storage for development/testing and SQL backends (PostgreSQL and SQLite) for production use.
Initialization
from tyler.database.thread_store import ThreadStore
# RECOMMENDED: Factory pattern for immediate connection validation
# PostgreSQL
store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")
# SQLite
store = await ThreadStore.create("sqlite+aiosqlite:///path/to/db.sqlite")
# In-memory
store = await ThreadStore.create() # No URL for memory backend
# For backward compatibility: Direct constructor (connects on first operation)
store = ThreadStore("postgresql+asyncpg://user:pass@localhost/dbname")
The factory pattern immediately connects to the database, allowing you to:
- Validate connection parameters early
- Fail fast if there are connection problems
- Ensure the store is fully ready to use
Connection Pooling Configuration
Environment variables for connection pooling:
# Connection pool settings
TYLER_DB_POOL_SIZE=5 # Max number of connections to keep open
TYLER_DB_MAX_OVERFLOW=10 # Max additional connections above pool_size
TYLER_DB_POOL_TIMEOUT=30 # Seconds to wait for a connection from pool
TYLER_DB_POOL_RECYCLE=300 # Seconds after which a connection is recycled
Methods
initialize
Initialize the storage backend.
async def initialize(self) -> None
Initializes the underlying storage backend. Called automatically by the factory method or on first operation if using the direct constructor.
save
Save a thread to storage, filtering out system messages.
async def save(self, thread: Thread) -> Thread
Creates or updates thread and all non-system messages. System messages are never saved because they are ephemeral and injected by agents at completion time.
Example:
thread = Thread()
thread.add_message(Message(role="user", content="Hello"))
saved_thread = await store.save(thread)
get
Get a thread by ID.
async def get(self, thread_id: str) -> Optional[Thread]
Returns thread with all messages if found, None otherwise.
Example:
thread = await store.get("thread_123")
if thread:
print(f"Found {len(thread.messages)} messages")
delete
Delete a thread by ID.
async def delete(self, thread_id: str) -> bool
Returns True if thread was deleted, False if not found.
Example:
if await store.delete("thread_123"):
print("Thread deleted")
list
List threads with pagination.
async def list(
self,
limit: int = 100,
offset: int = 0
) -> List[Thread]
Returns threads sorted by updated_at/created_at.
Example:
# Get first page
threads = await store.list(limit=50, offset=0)
# Get next page
next_page = await store.list(limit=50, offset=50)
find_by_attributes
Find threads by matching attributes.
async def find_by_attributes(
self,
attributes: Dict[str, Any]
) -> List[Thread]
Returns threads where all specified attributes match.
Example:
threads = await store.find_by_attributes({
"customer_id": "123",
"priority": "high"
})
find_by_source
Find threads by source name and properties.
async def find_by_source(
self,
source_name: str,
properties: Dict[str, Any]
) -> List[Thread]
Returns threads matching source name and properties.
Example:
threads = await store.find_by_source(
"slack",
{
"channel": "C123",
"thread_ts": "1234567890.123"
}
)
list_recent
List recent threads.
async def list_recent(
self,
limit: Optional[int] = None
) -> List[Thread]
Returns threads sorted by updated_at/created_at (newest first).
Example:
# Get 10 most recent threads
recent = await store.list_recent(limit=10)
Properties
database_url
Get the database URL if using SQL backend.
@property
def database_url(self) -> Optional[str]
Returns the database URL or None for memory backend.
engine
Get the SQLAlchemy engine if using SQL backend.
@property
def engine(self) -> Optional[Any]
Returns the SQLAlchemy engine or None for memory backend.
Backend Types
MemoryBackend
In-memory storage for development and testing.
# Uses memory backend by default when no configuration is provided
store = await ThreadStore.create() # No URL creates memory backend
SQLBackend
SQL-based storage for production use.
# PostgreSQL
store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")
# SQLite
store = await ThreadStore.create("sqlite+aiosqlite:///path/to/db.sqlite")
System Message Handling
System messages are never saved by the ThreadStore
. This is by design:
- System messages are ephemeral and controlled by agents
- Each agent can inject its own system message at completion time
- This allows the same thread to be used by different agents with different system prompts
- When a thread is retrieved from storage, any system messages will need to be added again by the agent
# System messages are never saved
thread = Thread()
thread.add_message(Message(role="system", content="You are an assistant"))
thread.add_message(Message(role="user", content="Hello"))
await thread_store.save(thread)
# When retrieved, only the user message is present
retrieved = await thread_store.get(thread.id)
system_messages = [m for m in retrieved.messages if m.role == "system"]
print(len(system_messages)) # 0
Best Practices
-
Use Factory Pattern
# Connect and validate at startup
try:
store = await ThreadStore.create("postgresql+asyncpg://user:pass@host/dbname")
print("Database connection established")
except Exception as e:
print(f"Database connection failed: {e}")
# Handle the error appropriately (e.g., exit application or use fallback) -
Backend Selection
# For development/testing
store = await ThreadStore.create() # In-memory
# For local development with persistence
store = await ThreadStore.create("sqlite+aiosqlite:///app.db")
# For production
store = await ThreadStore.create("postgresql+asyncpg://user:pass@host/dbname") -
Error Handling
try:
# Connect at startup for early error detection
store = await ThreadStore.create("postgresql+asyncpg://user:pass@host/dbname")
except Exception as e:
print(f"Database connection failed: {e}")
# Handle startup error
try:
# Handle operation errors
thread = await store.get(thread_id)
except Exception as e:
print(f"Database operation error: {e}")
# Handle operation error -
Batch Operations
# Use pagination for large datasets
async def process_all_threads():
offset = 0
while True:
threads = await store.list(limit=100, offset=offset)
if not threads:
break
for thread in threads:
await process_thread(thread)
offset += 100 -
Source Management
# Track external sources
thread = Thread(
source={
"name": "slack",
"channel": "C123",
"thread_ts": "123.456"
}
)
await store.save(thread)
# Find related threads
related = await store.find_by_source(
"slack",
{"channel": "C123"}
) -
System Message Awareness
# Remember that system messages aren't stored
agent = Agent(thread_store=store)
# When retrieving a thread, the agent will need to add its system message
thread = await store.get(thread_id)
# The agent adds its own system message before processing
await agent.go(thread)