Advanced Usage¶
Advanced features and customization options for power users.
LogFire Integration¶
LayerCode Gym integrates with LogFire for real-time observability and debugging.
Setup¶
# Install LogFire dependencies (already included in layercode-gym)
uv add logfire
# Configure LogFire
logfire configure
Enable in LayerCode Gym¶
LogFire is enabled automatically when a LOGFIRE_TOKEN is provided, for example as an environment variable.
When the token is present, LayerCode Gym also instruments PydanticAI and OpenAI automatically.
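As a quick pre-flight check, a script can confirm that the token is visible before a run starts. The following is a minimal sketch, not part of LayerCode Gym: `logfire_token_present` is a hypothetical helper, and it assumes the token is supplied through the `LOGFIRE_TOKEN` environment variable and that an empty value counts as absent.

```python
import os
from typing import Mapping, Optional


def logfire_token_present(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when a non-empty LOGFIRE_TOKEN is set (hypothetical helper)."""
    source = os.environ if env is None else env
    # Treat whitespace-only values as missing (an assumption of this sketch)
    return bool(source.get("LOGFIRE_TOKEN", "").strip())


if __name__ == "__main__":
    state = "enabled" if logfire_token_present() else "disabled"
    print(f"LogFire instrumentation expected to be {state}")
```

Passing a plain dictionary instead of the real environment makes the check easy to exercise in unit tests.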
What You Get¶
With LogFire enabled, you get:
- Real-time conversation tracking in the LogFire UI
- Performance metrics and spans for each operation
- WebSocket event streaming visualization
- Error tracking and stack traces
- Timeline view of conversation flow
View in LogFire Dashboard¶
You'll see:
- Conversation spans with nested operations
- WebSocket events (connect, message, disconnect)
- TTS synthesis operations
- LLM API calls (for AI personas)
- Timing metrics for each operation
Custom LogFire Spans¶
Add your own instrumentation:
import logfire

from layercode_gym import LayercodeClient

async def my_custom_callback(
    turn_number: int,
    user_message: str,
    agent_message: str,
    conversation_id: str
) -> None:
    with logfire.span("custom_analysis"):
        # Your analysis code (analyze_sentiment is a placeholder for your own function)
        sentiment = analyze_sentiment(agent_message)
        logfire.info(
            "Sentiment analysis",
            turn=turn_number,
            sentiment=sentiment
        )

client = LayercodeClient(
    simulator=simulator,
    turn_callback=my_custom_callback
)
Custom TTS Engines¶
Use alternative TTS providers like ElevenLabs, Azure, or local engines.
ElevenLabs Example¶
import hashlib
from pathlib import Path

import httpx

from layercode_gym import UserSimulator
from layercode_gym.simulator import TTSEngineProtocol

class ElevenLabsTTS(TTSEngineProtocol):
    def __init__(self, api_key: str, voice_id: str):
        self.api_key = api_key
        self.voice_id = voice_id
        self.base_url = "https://api.elevenlabs.io/v1"

    async def synthesize(self, text: str, **kwargs) -> Path:
        url = f"{self.base_url}/text-to-speech/{self.voice_id}"
        headers = {"xi-api-key": self.api_key}
        data = {
            "text": text,
            "model_id": "eleven_monolingual_v1",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.5
            }
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(url, headers=headers, json=data)
            response.raise_for_status()

        # Save audio under a stable name: the built-in hash() is salted per
        # process and can be negative, so use a content digest instead
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        output_path = Path(f"tts_{digest}.mp3")
        output_path.write_bytes(response.content)
        return output_path

# Use it
tts = ElevenLabsTTS(
    api_key="your_elevenlabs_key",
    voice_id="your_voice_id"
)
simulator = UserSimulator.from_text(
    messages=["Hello!", "How are you?"],
    send_as_text=False,
    tts_engine=tts
)
Azure TTS Example¶
import asyncio
import hashlib
from pathlib import Path

import azure.cognitiveservices.speech as speechsdk

from layercode_gym.simulator import TTSEngineProtocol

class AzureTTS(TTSEngineProtocol):
    def __init__(self, subscription_key: str, region: str):
        self.speech_config = speechsdk.SpeechConfig(
            subscription=subscription_key,
            region=region
        )
        self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"

    async def synthesize(self, text: str, **kwargs) -> Path:
        # Stable file name: the built-in hash() differs between processes
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        output_path = Path(f"tts_{digest}.wav")
        audio_config = speechsdk.audio.AudioOutputConfig(
            filename=str(output_path)
        )
        synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=self.speech_config,
            audio_config=audio_config
        )
        # .get() blocks until synthesis finishes, so run it in a worker
        # thread to keep the event loop responsive
        result = await asyncio.to_thread(
            lambda: synthesizer.speak_text_async(text).get()
        )
        if result.reason != speechsdk.ResultReason.SynthesizingAudioCompleted:
            raise RuntimeError(f"TTS failed: {result.reason}")
        return output_path
Custom LLM Providers¶
LayerCode Gym uses PydanticAI, which supports many LLM providers.
Anthropic Claude¶
from layercode_gym import UserSimulator, Persona

simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are a technical user",
        intent="You want detailed information"
    ),
    model="anthropic:claude-3-5-sonnet",  # Use Claude
    max_turns=5
)
Local Models (Ollama)¶
simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are a casual user",
        intent="You want simple answers"
    ),
    model="ollama:llama3",  # Use local model
    max_turns=5
)
Google Gemini¶
simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are a researcher",
        intent="You want comprehensive information"
    ),
    model="gemini:gemini-1.5-pro",  # Use Gemini
    max_turns=5
)
Custom PydanticAI Agent¶
For full control, create a custom agent:
from pydantic_ai import Agent
from layercode_gym import UserSimulator

# Define dependencies
class ConversationDeps:
    def __init__(self):
        self.history = []

    def add_message(self, role: str, content: str):
        self.history.append({"role": role, "content": content})

# Create custom agent
agent = Agent(
    "openai:gpt-5",
    system_prompt="""
    You are simulating a frustrated customer who has been on hold
    for 30 minutes. You are impatient and want quick resolution.
    """,
    deps_type=ConversationDeps
)

# Use it
deps = ConversationDeps()
simulator = UserSimulator.from_agent(
    agent=agent,
    deps=deps,
    max_turns=5
)
Testing Long-Running Operations¶
When your voice agent performs operations that take time (API calls, database queries, file processing), the AI simulator uses wait handling to behave realistically.
How Wait Handling Works¶
The AI agent simulator automatically detects when to wait based on the assistant's message:
# The assistant says: "Processing your request... please wait about 10 seconds."
# The AI simulator will:
# 1. Recognize this as a wait scenario
# 2. Return WaitForAssistant(wait_seconds=12) # 10s + 20% buffer
# 3. System waits, then calls simulator again with updated message
# 4. If assistant says "Done! Here are your results:", simulator responds normally
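The buffer arithmetic in step 2 can be sketched in a few lines. This is an illustration only: in LayerCode Gym the wait decision is made by the AI simulator, whereas the regex below is a simplified stand-in, and `estimate_wait_seconds` is a hypothetical helper name. The 20% buffer is taken from the comment above.

```python
import re
from typing import Optional

# Matches phrases such as "10 seconds" or "2.5 second" (illustrative pattern)
_SECONDS_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*seconds?", re.IGNORECASE)


def estimate_wait_seconds(message: str, buffer: float = 0.2) -> Optional[float]:
    """Return the stated wait plus a safety buffer, or None if no wait is stated."""
    match = _SECONDS_PATTERN.search(message)
    if match is None:
        return None
    stated = float(match.group(1))
    # 10 s stated with a 20% buffer gives 12 s
    return round(stated * (1.0 + buffer), 2)


if __name__ == "__main__":
    print(estimate_wait_seconds("Processing your request... please wait about 10 seconds."))
    print(estimate_wait_seconds("Done! Here are your results:"))
```

A message without a stated duration yields `None`, which corresponds to the simulator responding normally instead of waiting.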
Example: Testing a Delay-Based Agent¶
import asyncio

from layercode_gym import LayercodeClient, UserSimulator, Persona

# Create an AI persona that will naturally wait
simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are testing a data processing system",
        intent="You want to process a large dataset and get the results"
    ),
    model="openai:gpt-5-mini",
    max_turns=5
)

# The conversation might go:
# User: "Please process my dataset"
# Assistant: "Processing your dataset... this will take about 15 seconds."
# [AI simulator waits ~18 seconds]
# Assistant: "Done! Your dataset has 1,234 records processed."
# User: "Great, can you show me the summary?"

async def main() -> None:
    client = LayercodeClient(simulator=simulator)
    await client.run()

# await is only valid inside a coroutine, so run the client from an entry point
asyncio.run(main())
Debugging Wait Behavior¶
Enable debug logging to see wait decisions:
import logging
logging.getLogger("layercode_gym").setLevel(logging.DEBUG)
# You'll see logs like:
# DEBUG: Simulator requested wait (wait #1, total 12.0s). Scheduling idle timer in 12.0s
# DEBUG: Wait context: waited 1 time(s), new content arrived: True
Custom Wait Logic¶
For advanced scenarios, implement a custom simulator:
from layercode_gym.simulator import (
    UserSimulatorProtocol,
    UserRequest,
    UserResponse,
    WaitContext
)

class SmartWaitSimulator(UserSimulatorProtocol):
    def __init__(self, max_total_wait: float = 60.0):
        self.max_total_wait = max_total_wait

    async def get_response(self, request: UserRequest) -> UserResponse | None:
        text = request.text or ""
        wait_ctx = request.wait_context

        # Check if we've waited too long overall
        if wait_ctx and wait_ctx.total_wait_seconds >= self.max_total_wait:
            return UserResponse(
                text="I've been waiting a while. Is everything okay?",
                audio_path=None,
                data=()
            )

        # Detect if assistant is still processing
        if any(phrase in text.lower() for phrase in ["please wait", "processing", "one moment"]):
            # Check if this is new content or same as before
            if wait_ctx and not wait_ctx.has_new_content(len(text)):
                # No new content after waiting - wait a bit more
                return UserResponse(
                    text=None, audio_path=None, data=(),
                    wait_seconds=15.0
                )
            # New content but still processing
            return UserResponse(
                text=None, audio_path=None, data=(),
                wait_seconds=10.0
            )

        # Assistant is done - respond normally
        return UserResponse(
            text="Thanks for the update!",
            audio_path=None,
            data=()
        )
Audio Processing¶
Background Noise Injection¶
Add realistic background noise to test transcription:
from pathlib import Path

from pydub import AudioSegment
from pydub.generators import WhiteNoise

def add_background_noise(
    audio_path: Path,
    noise_level_db: float = -30.0
) -> Path:
    # Load audio
    audio = AudioSegment.from_wav(str(audio_path))
    # Generate white noise; pydub takes volume in dBFS (0.0 is full scale),
    # so quieter noise needs a more negative value
    noise = WhiteNoise().to_audio_segment(
        duration=len(audio),
        volume=noise_level_db
    )
    # Mix audio with noise
    mixed = audio.overlay(noise)
    # Save
    output_path = audio_path.parent / f"{audio_path.stem}_noisy.wav"
    mixed.export(str(output_path), format="wav")
    return output_path

# Use in simulator
from layercode_gym import UserSimulator

# Generate noisy versions of audio files
noisy_files = [
    add_background_noise(Path("audio/msg1.wav")),
    add_background_noise(Path("audio/msg2.wav"))
]
simulator = UserSimulator.from_files(files=noisy_files)
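When choosing a noise level, keep in mind that pydub's generators take volume in dBFS (decibels relative to full scale), not as a linear fraction of the signal. The conversion between the two is a standard formula; the helper names below are hypothetical and only illustrate the arithmetic.

```python
import math


def dbfs_to_amplitude_ratio(dbfs: float) -> float:
    """Convert a level in dBFS to a linear amplitude ratio (1.0 is full scale)."""
    return 10.0 ** (dbfs / 20.0)


def amplitude_ratio_to_dbfs(ratio: float) -> float:
    """Convert a linear amplitude ratio to dBFS; the ratio must be positive."""
    if ratio <= 0:
        raise ValueError("ratio must be positive")
    return 20.0 * math.log10(ratio)


if __name__ == "__main__":
    # Noise at one tenth of full-scale amplitude corresponds to -20 dBFS
    print(amplitude_ratio_to_dbfs(0.1))
    print(dbfs_to_amplitude_ratio(-30.0))
```

A value near 0 dBFS therefore means noise close to full scale, which will usually drown out speech rather than sit in the background.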
Speed Variation¶
Test with different speaking speeds:
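from pathlib import Path

from pydub import AudioSegment

def change_speed(audio_path: Path, speed: float = 1.0) -> Path:
    # speed > 1.0 = faster, speed < 1.0 = slower
    audio = AudioSegment.from_wav(str(audio_path))
    # Reinterpret the same samples at a scaled frame rate (this also shifts pitch)
    sound_with_altered_frame_rate = audio._spawn(
        audio.raw_data,
        overrides={"frame_rate": int(audio.frame_rate * speed)}
    )
    # Convert back to original frame rate
    adjusted = sound_with_altered_frame_rate.set_frame_rate(audio.frame_rate)
    # Save and return the path, matching the declared return type
    output_path = audio_path.parent / f"{audio_path.stem}_x{speed}.wav"
    adjusted.export(str(output_path), format="wav")
    return output_path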
Batch Processing Patterns¶
Parallel Processing with Resource Limits¶
import asyncio

from layercode_gym import LayercodeClient, UserSimulator

async def run_with_semaphore(
    message: str,
    semaphore: asyncio.Semaphore
) -> str:
    async with semaphore:
        simulator = UserSimulator.from_text(
            messages=[message],
            send_as_text=True
        )
        client = LayercodeClient(simulator=simulator)
        return await client.run()

async def main():
    # Limit to 10 concurrent conversations
    semaphore = asyncio.Semaphore(10)
    scenarios = ["Message " + str(i) for i in range(100)]
    tasks = [
        run_with_semaphore(msg, semaphore)
        for msg in scenarios
    ]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} conversations")

asyncio.run(main())
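To see the semaphore cap in isolation, the pattern can be exercised without a backend. In this sketch `fake_conversation` is a hypothetical stand-in for `client.run()`, and `run_limited` reports the highest number of tasks that were inside the semaphore at the same time.

```python
import asyncio


async def run_limited(n_tasks: int, limit: int) -> int:
    """Run n_tasks dummy conversations and return the peak concurrency observed."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_conversation(index: int) -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real conversation
            active -= 1

    await asyncio.gather(*(fake_conversation(i) for i in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print("Peak concurrency:", asyncio.run(run_limited(n_tasks=25, limit=5)))
```

All tasks are created up front, but the reported peak never exceeds the limit, which is what keeps connection counts bounded when testing against a real server.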
Retry Logic¶
import asyncio
from typing import Optional

from layercode_gym import LayercodeClient
from layercode_gym.simulator import UserSimulatorProtocol

async def run_with_retry(
    simulator: UserSimulatorProtocol,
    max_retries: int = 3
) -> Optional[str]:
    for attempt in range(max_retries):
        try:
            client = LayercodeClient(simulator=simulator)
            return await client.run()
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed after {max_retries} attempts: {e}")
                return None
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    # Only reached when max_retries is 0
    return None
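The retry loop above sleeps `2 ** attempt` seconds between attempts, so three attempts wait 1 s and then 2 s. The sketch below generalizes the same pattern to any coroutine and makes the sleep function injectable so the schedule can be tested without real delays. `backoff_delays` and `retry_async` are hypothetical names, not part of LayerCode Gym.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, base: float = 2.0) -> List[float]:
    """Delays between attempts: base**0, base**1, ... (no delay after the last)."""
    return [base ** attempt for attempt in range(max(max_retries - 1, 0))]


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Optional[T]:
    """Call operation until it succeeds or max_retries attempts have failed."""
    delays = backoff_delays(max_retries)
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                return None
            await sleep(delays[attempt])
    return None


if __name__ == "__main__":
    print(backoff_delays(3))
```

Passing a factory such as `lambda: LayercodeClient(simulator=simulator).run()` would reproduce the original behaviour, assuming a fresh client per attempt is what you want.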
Progress Tracking¶
import asyncio

from tqdm import tqdm

from layercode_gym import LayercodeClient, UserSimulator

async def run_with_progress(scenarios: list[str]):
    results = []
    with tqdm(total=len(scenarios), desc="Running conversations") as pbar:
        # Conversations run one at a time; the bar advances after each one
        for scenario in scenarios:
            simulator = UserSimulator.from_text(
                messages=[scenario],
                send_as_text=True
            )
            client = LayercodeClient(simulator=simulator)
            conv_id = await client.run()
            results.append(conv_id)
            pbar.update(1)
    return results
Evaluation Frameworks¶
Custom Scoring System¶
from layercode_gym.models import ConversationLog
from typing import Dict

class ConversationScorer:
    def __init__(self):
        self.scores: Dict[str, float] = {}

    async def score_conversation(
        self,
        log: ConversationLog
    ) -> float:
        score = 0.0
        # Score based on duration (prefer shorter)
        if log.stats["duration_seconds"] < 60:
            score += 2.0
        elif log.stats["duration_seconds"] < 120:
            score += 1.0
        # Score based on latency
        if log.stats["avg_latency_ms"] < 500:
            score += 2.0
        elif log.stats["avg_latency_ms"] < 1000:
            score += 1.0
        # Score based on turn count
        if log.stats["total_turns"] >= 3:
            score += 1.0
        return score

# Use it
scorer = ConversationScorer()

async def evaluate_callback(log: ConversationLog) -> None:
    score = await scorer.score_conversation(log)
    print(f"Conversation {log.conversation_id} scored: {score}/5.0")

client = LayercodeClient(
    simulator=simulator,
    conversation_callback=evaluate_callback
)
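The scoring rules can be checked by hand against a plain dictionary before wiring them into a callback. This sketch mirrors the thresholds above; `score_stats` is a hypothetical standalone function, and the example values are made up for illustration rather than measured.

```python
from typing import Mapping


def score_stats(stats: Mapping[str, float]) -> float:
    """Apply the duration, latency, and turn-count rules to a stats mapping."""
    score = 0.0
    # Duration: up to 2 points, shorter is better
    if stats["duration_seconds"] < 60:
        score += 2.0
    elif stats["duration_seconds"] < 120:
        score += 1.0
    # Latency: up to 2 points
    if stats["avg_latency_ms"] < 500:
        score += 2.0
    elif stats["avg_latency_ms"] < 1000:
        score += 1.0
    # Turn count: 1 point for at least three turns
    if stats["total_turns"] >= 3:
        score += 1.0
    return score


# Illustrative values only: 2.0 (duration) + 1.0 (latency) + 1.0 (turns) = 4.0
example = {"duration_seconds": 45.0, "avg_latency_ms": 700.0, "total_turns": 4}

if __name__ == "__main__":
    print(score_stats(example))
```

The maximum is 2 + 2 + 1 = 5 points, which is why the callback reports scores out of 5.0.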
A/B Testing¶
from enum import Enum
from typing import List
import statistics

class AgentVersion(Enum):
    V1 = "agent_v1_id"
    V2 = "agent_v2_id"

async def ab_test(
    scenarios: List[str],
    num_runs_per_version: int = 10
):
    results = {AgentVersion.V1: [], AgentVersion.V2: []}
    for version in AgentVersion:
        settings = Settings(
            server_url="http://localhost:8001",
            agent_id=version.value
        )
        for scenario in scenarios[:num_runs_per_version]:
            simulator = UserSimulator.from_text(
                messages=[scenario],
                send_as_text=True
            )
            client = LayercodeClient(
                simulator=simulator,
                settings=settings
            )
            conv_id = await client.run()
            # Collect metrics
            # ... analyze conversation ...
            results[version].append(conv_id)

    # Compare results
    print("A/B Test Results:")
    for version, conv_ids in results.items():
        print(f"{version.name}: {len(conv_ids)} conversations")
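Once per-conversation metrics have been collected, the comparison step can use the `statistics` module. This is a minimal sketch under stated assumptions: `summarize_latencies` is a hypothetical helper, the input is a mapping from version name to latency samples in milliseconds, and the numbers in the usage example are placeholders, not real results.

```python
import statistics
from typing import Dict, List


def summarize_latencies(samples: Dict[str, List[float]]) -> Dict[str, Dict[str, float]]:
    """Compute mean, median, and sample standard deviation per version."""
    summary: Dict[str, Dict[str, float]] = {}
    for version, values in samples.items():
        summary[version] = {
            "mean": statistics.mean(values),
            "median": statistics.median(values),
            # stdev needs at least two samples
            "stdev": statistics.stdev(values) if len(values) > 1 else 0.0,
        }
    return summary


if __name__ == "__main__":
    # Placeholder samples for illustration only
    placeholder = {"V1": [400.0, 500.0, 600.0], "V2": [300.0, 300.0, 300.0]}
    for name, stats in summarize_latencies(placeholder).items():
        print(name, stats)
```

With only a handful of runs per version, differences in the mean are rarely conclusive, so it helps to report the spread alongside it.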
CI/CD Integration¶
GitHub Actions Example¶
name: Voice Agent Tests
on:
  pull_request:
  push:
    branches: [main]
jobs:
  test-agent:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install uv
        run: curl -LsSf https://astral.sh/uv/install.sh | sh
      - name: Install dependencies
        run: uv sync
      - name: Start backend server
        run: |
          uvx layercode-create-app run &
          sleep 5
      - name: Run tests
        env:
          SERVER_URL: http://localhost:8001
          LAYERCODE_AGENT_ID: ${{ secrets.AGENT_ID }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python examples/01_text_messages.py
          python examples/05_batch_evaluation.py
      - name: Upload results
        uses: actions/upload-artifact@v4
        with:
          name: conversation-logs
          path: conversations/
Performance Optimization¶
Reuse WebSocket Connections¶
For high-volume testing, consider connection pooling (advanced, requires modification):
# This is a conceptual example - would require changes to core client
class ClientPool:
    def __init__(self, pool_size: int = 10):
        self.pool_size = pool_size
        self.clients = []

    async def get_client(self) -> LayercodeClient:
        # Return a client from the pool
        # This would require refactoring LayercodeClient
        pass
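The pooling idea itself is independent of the client class. As a rough sketch, a generic pool can be built on `asyncio.Queue`, with a context manager that returns the item to the pool after use. `AsyncPool` is a hypothetical class; whether LayercodeClient instances can safely be reused this way is an open assumption that the conceptual example above already flags.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable, Generic, TypeVar

T = TypeVar("T")


class AsyncPool(Generic[T]):
    """Fixed-size pool of reusable objects (hypothetical, generic sketch)."""

    def __init__(self, factory: Callable[[], T], pool_size: int = 10) -> None:
        self._queue: "asyncio.Queue[T]" = asyncio.Queue()
        for _ in range(pool_size):
            self._queue.put_nowait(factory())

    @property
    def available(self) -> int:
        """Number of items currently idle in the pool."""
        return self._queue.qsize()

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[T]:
        # Waits here when every item is in use
        item = await self._queue.get()
        try:
            yield item
        finally:
            # Always hand the item back, even if the caller raised
            self._queue.put_nowait(item)
```

Usage would look like `async with pool.acquire() as client: ...`, with the pool created inside the running event loop.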
Disable Audio File Saving¶
If you only need metrics:
# Modify storage settings (conceptual - would need implementation)
settings = Settings(
    server_url="http://localhost:8001",
    agent_id="your_agent_id",
    save_audio=False  # Don't save audio files
)
Use Text Mode¶
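For maximum speed, skip audio synthesis and send messages as text by passing send_as_text=True to UserSimulator.from_text, as in the batch processing examples above.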
Next Steps¶
- API Reference - Full API documentation
- Examples - Practical usage examples
- Roadmap - Upcoming features