Agent API

The Agent class is the core component of Tyler, responsible for managing conversations and executing tasks. It uses Pydantic for data validation and Weave for monitoring and tracing.

Initialization

from tyler.models.agent import Agent

agent = Agent(
    model_name="gpt-4o",
    temperature=0.7,
    name="Tyler",
    purpose="To be a helpful assistant.",
    notes="",
    tools=[],
    max_tool_iterations=10,
    thread_store=None  # defaults to a new in-memory ThreadStore
)

Parameters

| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| model_name | str | No | "gpt-4o" | The LLM model to use |
| temperature | float | No | 0.7 | Response creativity (0.0-1.0) |
| name | str | No | "Tyler" | Name of the agent |
| purpose | str | No | "To be a helpful assistant." | The agent's purpose or role |
| notes | str | No | "" | Additional notes for the agent |
| tools | List[Union[str, Dict]] | No | [] | List of tools (strings for built-in modules, dicts for custom tools) |
| max_tool_iterations | int | No | 10 | Maximum number of tool iterations |
| thread_store | Optional[ThreadStore] | No | ThreadStore() | Thread storage implementation |
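
For example, a fully configured agent might look like this (the purpose and tool choices are illustrative; every parameter used is documented in the table above):

agent = Agent(
    model_name="gpt-4o",
    temperature=0.3,                       # lower temperature for more deterministic answers
    name="Researcher",
    purpose="To research topics on the web and summarize findings.",
    notes="Prefer primary sources.",
    tools=["web"],                         # built-in web tool module
    max_tool_iterations=15
)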

Methods

go

Process a thread and generate responses.

async def go(
    self,
    thread_or_id: Union[str, Thread],
    new_messages: Optional[List[Message]] = None
) -> Tuple[Thread, List[Message]]

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| thread_or_id | Union[str, Thread] | Yes | Thread object or thread ID to process |
| new_messages | Optional[List[Message]] | No | Messages added during this processing round |

Returns

| Type | Description |
|------|-------------|
| Tuple[Thread, List[Message]] | The processed thread and the list of new non-user messages |

Example

from tyler.models.thread import Thread
from tyler.models.message import Message

thread = Thread()
message = Message(role="user", content="Hello!")
thread.add_message(message)

processed_thread, new_messages = await agent.go(thread)
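
Because thread_or_id also accepts a thread ID, a persisted thread can be processed by ID. The following is a hedged sketch: it assumes ThreadStore exposes an async save() method and that threads carry an id attribute, neither of which is documented in this section:

from tyler.database.thread_store import ThreadStore

store = ThreadStore()
await store.save(thread)  # assumed persistence API

agent = Agent(thread_store=store)
processed_thread, new_messages = await agent.go(thread.id)  # assumed id attribute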

go_stream

Process a thread with streaming updates. This method provides real-time updates as the agent processes the request.

async def go_stream(
    self,
    thread: Thread
) -> AsyncGenerator[StreamUpdate, None]

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| thread | Thread | Yes | Thread object to process |

Returns

An async generator that yields StreamUpdate objects containing:

  • Content chunks as they arrive
  • Complete assistant messages with tool calls
  • Tool execution results
  • Final thread state
  • Any errors that occur

StreamUpdate Types

| Type | Description | Data Content |
|------|-------------|--------------|
| CONTENT_CHUNK | Partial content from assistant | str |
| ASSISTANT_MESSAGE | Complete assistant message | Message |
| TOOL_MESSAGE | Tool execution result | Message |
| COMPLETE | Final thread state | Tuple[Thread, List[Message]] |
| ERROR | Error during processing | str |

Example

thread = Thread()
message = Message(role="user", content="Hello!")
thread.add_message(message)

print("Assistant: ", end="", flush=True)
async for update in agent.go_stream(thread):
    if update.type == StreamUpdate.Type.CONTENT_CHUNK:
        # Print content chunks as they arrive
        print(update.data, end="", flush=True)
    elif update.type == StreamUpdate.Type.TOOL_MESSAGE:
        # Print tool results on new lines
        tool_message = update.data
        print(f"\nTool ({tool_message.name}): {tool_message.content}")
    elif update.type == StreamUpdate.Type.ERROR:
        # Print any errors that occur
        print(f"\nError: {update.data}")
    elif update.type == StreamUpdate.Type.COMPLETE:
        # Final update contains (thread, new_messages)
        final_thread, new_messages = update.data
        print()  # Add newline after completion
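
The await and async for statements above must run inside an event loop. In a standalone script, wrap them in a coroutine and hand it to asyncio (a minimal sketch reusing the names from the examples above):

import asyncio

async def main():
    thread = Thread()
    thread.add_message(Message(role="user", content="Stream me a reply!"))
    async for update in agent.go_stream(thread):
        if update.type == StreamUpdate.Type.CONTENT_CHUNK:
            print(update.data, end="", flush=True)

asyncio.run(main())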

step

Execute a single step of the agent's processing.

@weave.op()
async def step(
    self,
    thread: Thread,
    stream: bool = False
) -> Tuple[Any, Dict]

Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| thread | Thread | Yes | The thread to process |
| stream | bool | No | Whether to stream the response |

Returns

| Type | Description |
|------|-------------|
| Tuple[Any, Dict] | The completion response and metrics |
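
A single step can also be invoked directly, e.g. when building a custom processing loop. In this sketch, only the (response, metrics) return shape is documented above; treating the response as an OpenAI-style completion object is an assumption:

response, metrics = await agent.step(thread)
print(response.choices[0].message.content)  # assumed OpenAI-style response shape
print(metrics)                              # metrics dict (contents not documented here)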

Private Methods

_get_completion

Get a completion from the LLM, with Weave tracing applied.

@weave.op()
async def _get_completion(
    self,
    **completion_params
) -> Any

_get_thread

Get a thread object by ID, or return it directly if a Thread object was passed.

@weave.op()
async def _get_thread(
    self,
    thread_or_id: Union[str, Thread]
) -> Thread

_serialize_tool_calls

Serialize tool calls to a list of dictionaries.

@weave.op()
def _serialize_tool_calls(
    self,
    tool_calls: Optional[List[Any]]
) -> Optional[List[Dict]]

_process_tool_call

Process a single tool call and return whether to break the iteration.

@weave.op()
async def _process_tool_call(
    self,
    tool_call,
    thread: Thread,
    new_messages: List[Message]
) -> bool

_handle_max_iterations

Handle the case when max iterations is reached.

@weave.op()
async def _handle_max_iterations(
    self,
    thread: Thread,
    new_messages: List[Message]
) -> Tuple[Thread, List[Message]]

_handle_tool_execution

Execute a single tool call and format the result message.

@weave.op()
async def _handle_tool_execution(
    self,
    tool_call
) -> dict

_normalize_tool_call

Convert a tool_call dict to an object with attributes so it can be used by tool_runner.

@weave.op()
def _normalize_tool_call(
    self,
    tool_call
)

_process_streaming_chunks

Process streaming chunks from the LLM.

@weave.op()
async def _process_streaming_chunks(
    self,
    chunks
) -> Tuple[str, str, List[Dict], Dict]

Private Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| _prompt | AgentPrompt | Handles system prompt generation |
| _iteration_count | int | Tracks tool iteration count |
| _processed_tools | List[Dict] | Stores processed tool definitions |

StreamUpdate Class

The StreamUpdate class is used to represent updates from streaming responses.

from enum import Enum
from typing import Any

class StreamUpdate:
    class Type(Enum):
        CONTENT_CHUNK = "content_chunk"          # Partial content from assistant
        ASSISTANT_MESSAGE = "assistant_message"  # Complete assistant message with tool calls
        TOOL_MESSAGE = "tool_message"            # Tool execution result
        COMPLETE = "complete"                    # Final thread state and messages
        ERROR = "error"                          # Error during processing

    def __init__(self, type: Type, data: Any):
        self.type = type
        self.data = data

Error Handling

try:
    processed_thread, new_messages = await agent.go(thread)
except ValueError as e:
    # Handle validation errors (e.g., thread not found)
    print(f"Validation error: {e}")
except Exception as e:
    # Handle other errors
    print(f"Error: {e}")

Tool Configuration

Built-in Tool Modules

agent = Agent(
    tools=[
        "web",           # Use built-in web tools
        "slack",         # Use built-in Slack tools
        "notion",        # Use built-in Notion tools
        "image",         # Use built-in image tools
        "command_line"   # Use built-in command line tools
    ]
)

Custom Tools

custom_tool = {
    "definition": {
        "type": "function",
        "function": {
            "name": "custom_tool",
            "description": "Tool description",
            "parameters": {
                "type": "object",
                "properties": {
                    "param1": {"type": "string"}
                }
            }
        }
    },
    "implementation": lambda param1: f"Result: {param1}",
    "attributes": {
        "type": "standard"  # or "interrupt"
    }
}

agent = Agent(tools=[custom_tool])
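
The attributes block above also allows "type": "interrupt". As a hedged sketch (the interrupt semantics and the ask_user tool name are assumptions, not documented in this section), an interrupt-style tool is declared the same way, and built-in modules and custom tools can be mixed in one list:

interrupt_tool = {
    "definition": {
        "type": "function",
        "function": {
            "name": "ask_user",  # hypothetical tool name
            "description": "Pause processing and ask the user a clarifying question",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string"}
                },
                "required": ["question"]
            }
        }
    },
    "implementation": lambda question: f"Waiting for user input: {question}",
    "attributes": {
        "type": "interrupt"  # assumed to pause the tool-iteration loop
    }
}

agent = Agent(tools=["web", interrupt_tool])  # strings and dicts can be mixed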

Best Practices

  1. Thread Store Selection

    # For development/testing
    agent = Agent() # Uses in-memory store

    # For production
    from tyler.database.thread_store import ThreadStore
    store = ThreadStore("postgresql://...")
    agent = Agent(thread_store=store)
  2. Tool Iteration Management

    # For complex workflows
    agent = Agent(max_tool_iterations=20)

    # For simple tasks
    agent = Agent(max_tool_iterations=5)
  3. Monitoring with Weave

    # Enable Weave tracing
    import weave
    weave.init("tyler")

    # Traces will be available for step() and go() methods
