ThreadStore API

The ThreadStore class provides a unified interface for thread storage with pluggable backends. It supports both in-memory storage for development/testing and SQL backends (PostgreSQL and SQLite) for production use.

Initialization

from tyler.database.thread_store import ThreadStore

# RECOMMENDED: Factory pattern for immediate connection validation
# PostgreSQL
store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")

# SQLite
store = await ThreadStore.create("sqlite+aiosqlite:///path/to/db.sqlite")

# In-memory
store = await ThreadStore.create() # No URL for memory backend

# For backward compatibility: Direct constructor (connects on first operation)
store = ThreadStore("postgresql+asyncpg://user:pass@localhost/dbname")

The factory pattern immediately connects to the database, allowing you to:

  • Validate connection parameters early
  • Fail fast if there are connection problems
  • Ensure the store is fully ready to use
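
Because the factory connects immediately, startup code can wrap it in a small retry loop. The sketch below is one possible approach, not part of Tyler: `create_with_retry` is a hypothetical helper that takes the factory as an argument, so it can be exercised without a database. In real use the factory would be `ThreadStore.create`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def create_with_retry(
    factory: Callable[..., Awaitable[Any]],
    url: Optional[str] = None,
    attempts: int = 3,
    delay: float = 0.5,
) -> Any:
    """Call an async factory until it succeeds or attempts run out.

    Hypothetical helper: `factory` is assumed to behave like
    ThreadStore.create, i.e. to raise if the connection cannot be made.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            # Pass the URL only when one was given (no URL = memory backend).
            return await (factory(url) if url is not None else factory())
        except Exception as exc:  # narrow this to your driver's errors
            last_error = exc
            if attempt < attempts:
                # Linear backoff: wait a little longer after each failure.
                await asyncio.sleep(delay * attempt)
    raise RuntimeError(f"could not connect after {attempts} attempts") from last_error
```

A call would look like `store = await create_with_retry(ThreadStore.create, "postgresql+asyncpg://user:pass@localhost/dbname")`. The final `RuntimeError` chains the last underlying exception, so the original cause stays visible in the traceback.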

Connection Pooling Configuration

Environment variables for connection pooling:

# Connection pool settings
TYLER_DB_POOL_SIZE=5 # Max number of connections to keep open
TYLER_DB_MAX_OVERFLOW=10 # Max additional connections above pool_size
TYLER_DB_POOL_TIMEOUT=30 # Seconds to wait for a connection from pool
TYLER_DB_POOL_RECYCLE=300 # Seconds after which a connection is recycled
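
To make the meaning of these variables concrete, the sketch below parses them into a dictionary keyed by `pool_size`, `max_overflow`, `pool_timeout`, and `pool_recycle`, which are the names SQLAlchemy uses for its pool options. This is an illustration only: `read_pool_settings` is a hypothetical helper, the fallback values are copied from the example above rather than from confirmed library defaults, and how ThreadStore itself reads the variables is not shown here.

```python
import os
from typing import Dict, Mapping, Optional

# Fallback values are assumptions taken from the example above,
# not confirmed library defaults.
ASSUMED_DEFAULTS = {
    "TYLER_DB_POOL_SIZE": 5,
    "TYLER_DB_MAX_OVERFLOW": 10,
    "TYLER_DB_POOL_TIMEOUT": 30,
    "TYLER_DB_POOL_RECYCLE": 300,
}

PREFIX = "TYLER_DB_"


def read_pool_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, int]:
    """Return pool settings keyed by SQLAlchemy-style option names."""
    env = os.environ if env is None else env
    settings: Dict[str, int] = {}
    for var, default in ASSUMED_DEFAULTS.items():
        raw = env.get(var)
        try:
            value = int(raw) if raw is not None else default
        except ValueError:
            raise ValueError(f"{var} must be an integer, got {raw!r}") from None
        # TYLER_DB_POOL_SIZE -> pool_size, TYLER_DB_MAX_OVERFLOW -> max_overflow, ...
        settings[var[len(PREFIX):].lower()] = value
    return settings
```

Validating the values at startup turns a typo such as `TYLER_DB_POOL_TIMEOUT=3O` into an immediate, named error instead of a failure deep inside the database driver.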

Methods

initialize

Initialize the storage backend.

async def initialize(self) -> None

Initializes the underlying storage backend. Called automatically by the factory method or on first operation if using the direct constructor.

save

Save a thread to storage, filtering out system messages.

async def save(self, thread: Thread) -> Thread

Creates or updates the thread and all of its non-system messages. System messages are never saved because they are ephemeral and injected by agents at completion time.

Example:

thread = Thread()
thread.add_message(Message(role="user", content="Hello"))
saved_thread = await store.save(thread)

get

Get a thread by ID.

async def get(self, thread_id: str) -> Optional[Thread]

Returns the thread with all of its stored messages if found, otherwise None.

Example:

thread = await store.get("thread_123")
if thread:
    print(f"Found {len(thread.messages)} messages")

delete

Delete a thread by ID.

async def delete(self, thread_id: str) -> bool

Returns True if the thread was deleted, False if it was not found.

Example:

if await store.delete("thread_123"):
    print("Thread deleted")

list

List threads with pagination.

async def list(
    self,
    limit: int = 100,
    offset: int = 0
) -> List[Thread]

Returns threads sorted by updated_at/created_at.

Example:

# Get first page
threads = await store.list(limit=50, offset=0)

# Get next page
next_page = await store.list(limit=50, offset=50)
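
The limit/offset pair lends itself to an async generator that hides the page bookkeeping. The following is a minimal sketch under stated assumptions: `iter_threads` is a hypothetical helper, and `FakeStore` is a stand-in that only mimics the `list(limit, offset)` signature so the example runs without a database.

```python
from typing import Any, AsyncIterator, List


async def iter_threads(store: Any, page_size: int = 100) -> AsyncIterator[Any]:
    """Yield every thread from store.list(), one page at a time."""
    offset = 0
    while True:
        page = await store.list(limit=page_size, offset=offset)
        if not page:
            break
        for thread in page:
            yield thread
        if len(page) < page_size:
            break  # short page: nothing left to fetch
        offset += page_size


class FakeStore:
    """Stand-in with the same list(limit, offset) shape, for demonstration only."""

    def __init__(self, items: List[str]) -> None:
        self._items = items

    async def list(self, limit: int = 100, offset: int = 0) -> List[str]:
        return self._items[offset:offset + limit]


async def collect(store: Any, page_size: int) -> List[Any]:
    """Drain the generator into a list (handy in tests)."""
    return [thread async for thread in iter_threads(store, page_size)]
```

With a real store the loop would read `async for thread in iter_threads(store): ...`. Offset-based paging can skip or repeat items if threads are added or removed while the loop is running, so it suits batch jobs over data that is mostly stable.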

find_by_attributes

Find threads by matching attributes.

async def find_by_attributes(
    self,
    attributes: Dict[str, Any]
) -> List[Thread]

Returns threads where all specified attributes match.

Example:

threads = await store.find_by_attributes({
    "customer_id": "123",
    "priority": "high"
})
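
The "all specified attributes match" rule can be pictured as a subset test on plain dictionaries. The sketch below illustrates that rule only; it is not how any backend implements the query, and the strict equality it uses (so `"123"` does not match `123`) is an assumption. `matches_all` is a hypothetical name.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel so a stored None is not confused with "absent"


def matches_all(thread_attributes: Dict[str, Any], wanted: Dict[str, Any]) -> bool:
    """True when every key in `wanted` is present with an equal value."""
    return all(
        thread_attributes.get(key, _MISSING) == value
        for key, value in wanted.items()
    )


# Attribute dictionaries of three imaginary threads.
stored = [
    {"customer_id": "123", "priority": "high", "region": "eu"},
    {"customer_id": "123", "priority": "low"},
    {"customer_id": "456", "priority": "high"},
]

# Only the first entry has both requested attributes with matching values.
hits = [
    attrs for attrs in stored
    if matches_all(attrs, {"customer_id": "123", "priority": "high"})
]
```

Extra attributes on a thread (such as `region` above) do not prevent a match; only the requested keys are compared.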

find_by_source

Find threads by source name and properties.

async def find_by_source(
    self,
    source_name: str,
    properties: Dict[str, Any]
) -> List[Thread]

Returns threads matching source name and properties.

Example:

threads = await store.find_by_source(
    "slack",
    {
        "channel": "C123",
        "thread_ts": "1234567890.123"
    }
)
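
A common use of source lookups is to resume an existing conversation or start a new one. The sketch below shows one way to combine `find_by_source` and `save` for that purpose. `get_or_create_by_source` is a hypothetical helper, and the `make_thread` callback is an assumption introduced so the example does not depend on the Thread constructor; with Tyler it could be `lambda source: Thread(source=source)`.

```python
from typing import Any, Callable, Dict


async def get_or_create_by_source(
    store: Any,
    source_name: str,
    properties: Dict[str, Any],
    make_thread: Callable[[Dict[str, Any]], Any],
) -> Any:
    """Return the first thread matching the source, or save a new one.

    Hypothetical helper. `make_thread` receives the source dict
    ({"name": ..., **properties}) and must return an unsaved thread.
    """
    existing = await store.find_by_source(source_name, properties)
    if existing:
        return existing[0]
    # Nothing found: build the source dict and persist a fresh thread.
    source = {"name": source_name, **properties}
    return await store.save(make_thread(source))
```

The lookup and the save are two separate calls, so two concurrent requests for the same source could each create a thread. If that matters, serialize the calls per source or enforce uniqueness in the database.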

list_recent

List recent threads.

async def list_recent(
    self,
    limit: Optional[int] = None
) -> List[Thread]

Returns threads sorted by updated_at/created_at (newest first).

Example:

# Get 10 most recent threads
recent = await store.list_recent(limit=10)

Properties

database_url

Get the database URL if using SQL backend.

@property
def database_url(self) -> Optional[str]

Returns the database URL or None for memory backend.

engine

Get the SQLAlchemy engine if using SQL backend.

@property
def engine(self) -> Optional[Any]

Returns the SQLAlchemy engine or None for memory backend.

Backend Types

MemoryBackend

In-memory storage for development and testing.

# Uses memory backend by default when no configuration is provided
store = await ThreadStore.create() # No URL creates memory backend

SQLBackend

SQL-based storage for production use.

# PostgreSQL
store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")

# SQLite
store = await ThreadStore.create("sqlite+aiosqlite:///path/to/db.sqlite")

System Message Handling

System messages are never saved by the ThreadStore. This is by design:

  1. System messages are ephemeral and controlled by agents
  2. Each agent can inject its own system message at completion time
  3. This allows the same thread to be used by different agents with different system prompts
  4. When a thread is retrieved from storage, the agent must add its system message again

# System messages are never saved
thread = Thread()
thread.add_message(Message(role="system", content="You are an assistant"))
thread.add_message(Message(role="user", content="Hello"))
await store.save(thread)

# When retrieved, only the user message is present
retrieved = await store.get(thread.id)
system_messages = [m for m in retrieved.messages if m.role == "system"]
print(len(system_messages)) # 0
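
The effect of this rule can be modeled with two small functions: one that drops system messages before persisting, and one that puts an agent's own prompt back in front of the stored history. This is a sketch of the behavior described above, not the library's code; `Msg`, `persistable`, and `with_system_prompt` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Msg:
    """Stand-in for a message; only the fields needed for the sketch."""
    role: str
    content: str


def persistable(messages: List[Msg]) -> List[Msg]:
    """Keep everything except system messages, preserving order."""
    return [m for m in messages if m.role != "system"]


def with_system_prompt(stored: List[Msg], prompt: str) -> List[Msg]:
    """Rebuild what an agent would send: its own prompt, then the history."""
    return [Msg("system", prompt)] + persistable(stored)
```

Because the prompt is supplied at call time, two agents can run `with_system_prompt` over the same stored history with different prompts, which is the reuse described in point 3.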

Best Practices

  1. Use Factory Pattern

    # Connect and validate at startup
    try:
        store = await ThreadStore.create("postgresql+asyncpg://user:pass@host/dbname")
        print("Database connection established")
    except Exception as e:
        print(f"Database connection failed: {e}")
        # Handle the error appropriately (e.g., exit application or use fallback)

  2. Backend Selection

    # For development/testing
    store = await ThreadStore.create() # In-memory

    # For local development with persistence
    store = await ThreadStore.create("sqlite+aiosqlite:///app.db")

    # For production
    store = await ThreadStore.create("postgresql+asyncpg://user:pass@host/dbname")

  3. Error Handling

    try:
        # Connect at startup for early error detection
        store = await ThreadStore.create("postgresql+asyncpg://user:pass@host/dbname")
    except Exception as e:
        print(f"Database connection failed: {e}")
        raise  # Stop here: the code below needs a connected store

    try:
        # Handle operation errors
        thread = await store.get(thread_id)
    except Exception as e:
        print(f"Database operation error: {e}")
        # Handle operation error

  4. Batch Operations

    # Use pagination for large datasets
    async def process_all_threads():
        offset = 0
        while True:
            threads = await store.list(limit=100, offset=offset)
            if not threads:
                break
            for thread in threads:
                await process_thread(thread)
            offset += 100

  5. Source Management

    # Track external sources
    thread = Thread(
        source={
            "name": "slack",
            "channel": "C123",
            "thread_ts": "123.456"
        }
    )
    await store.save(thread)

    # Find related threads
    related = await store.find_by_source(
        "slack",
        {"channel": "C123"}
    )

  6. System Message Awareness

    # Remember that system messages aren't stored
    agent = Agent(thread_store=store)

    # When retrieving a thread, the agent will need to add its system message
    thread = await store.get(thread_id)

    # The agent adds its own system message before processing
    await agent.go(thread)
