Advanced Usage

Advanced features and customization options for power users.

LogFire Integration

LayerCode Gym integrates with LogFire for real-time observability and debugging.

Setup

# Install LogFire dependencies (already included in layercode-gym)
uv add logfire

# Configure LogFire
logfire configure

Enable in LayerCode Gym

LogFire is enabled automatically when the LOGFIRE_TOKEN environment variable is set:

export LOGFIRE_TOKEN="your_token_here"

LayerCode Gym automatically instruments PydanticAI and OpenAI when a LogFire token is present.
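
If you want to control instrumentation yourself, the same behaviour can be approximated by hand. The following is a minimal sketch, not layercode-gym's actual code: the helper name `enable_logfire_if_configured` is hypothetical, and it assumes a recent `logfire` release that provides `instrument_pydantic_ai()` and `instrument_openai()`.

```python
import os


def enable_logfire_if_configured() -> bool:
    """Configure LogFire only when LOGFIRE_TOKEN is set (hypothetical helper)."""
    token = os.environ.get("LOGFIRE_TOKEN")
    if not token:
        # No token: leave instrumentation off
        return False

    # Imported lazily so the helper is harmless when logfire is not installed
    import logfire

    logfire.configure(token=token)
    logfire.instrument_pydantic_ai()  # trace PydanticAI agent runs
    logfire.instrument_openai()       # trace OpenAI client calls
    return True
```

The function returns a boolean so calling code can log whether tracing is active.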

What You Get

With LogFire enabled, you get:

  • Real-time conversation tracking in the LogFire UI
  • Performance metrics and spans for each operation
  • WebSocket event streaming visualization
  • Error tracking and stack traces
  • Timeline view of conversation flow

View in LogFire Dashboard

# Start LogFire UI
logfire view

# Or visit https://logfire.pydantic.dev

You'll see:

  • Conversation spans with nested operations
  • WebSocket events (connect, message, disconnect)
  • TTS synthesis operations
  • LLM API calls (for AI personas)
  • Timing metrics for each operation

Custom LogFire Spans

Add your own instrumentation:

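import logfire
from layercode_gym import LayercodeClient

async def my_custom_callback(
    turn_number: int,
    user_message: str,
    agent_message: str,
    conversation_id: str
) -> None:
    # Attach the conversation ID so spans can be filtered in the LogFire UI
    with logfire.span("custom_analysis", conversation_id=conversation_id):
        # analyze_sentiment is a placeholder for your own analysis function
        sentiment = analyze_sentiment(agent_message)
        logfire.info(
            "Sentiment analysis",
            turn=turn_number,
            sentiment=sentiment
        )

# simulator is a UserSimulator you have already created
client = LayercodeClient(
    simulator=simulator,
    turn_callback=my_custom_callback
)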

Custom TTS Engines

Use alternative TTS providers like ElevenLabs, Azure, or local engines.

ElevenLabs Example

import hashlib
from pathlib import Path

import httpx
from layercode_gym import UserSimulator
from layercode_gym.simulator import TTSEngineProtocol

class ElevenLabsTTS(TTSEngineProtocol):
    def __init__(self, api_key: str, voice_id: str):
        self.api_key = api_key
        self.voice_id = voice_id
        self.base_url = "https://api.elevenlabs.io/v1"

    async def synthesize(self, text: str, **kwargs) -> Path:
        url = f"{self.base_url}/text-to-speech/{self.voice_id}"
        headers = {"xi-api-key": self.api_key}
        data = {
            "text": text,
            "model_id": "eleven_monolingual_v1",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.5
            }
        }

        # Explicit timeout so a stalled request does not hang the test run
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, headers=headers, json=data)
            response.raise_for_status()

        # Name the file after a stable digest of the text; the built-in hash()
        # is randomized per process and can be negative
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        output_path = Path(f"tts_{digest}.mp3")
        output_path.write_bytes(response.content)

        return output_path

# Use it
tts = ElevenLabsTTS(
    api_key="your_elevenlabs_key",
    voice_id="your_voice_id"
)

simulator = UserSimulator.from_text(
    messages=["Hello!", "How are you?"],
    send_as_text=False,
    tts_engine=tts
)

Azure TTS Example

import asyncio
import hashlib
from pathlib import Path

import azure.cognitiveservices.speech as speechsdk
from layercode_gym.simulator import TTSEngineProtocol

class AzureTTS(TTSEngineProtocol):
    def __init__(self, subscription_key: str, region: str):
        self.speech_config = speechsdk.SpeechConfig(
            subscription=subscription_key,
            region=region
        )
        self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"

    async def synthesize(self, text: str, **kwargs) -> Path:
        # Stable file name; the built-in hash() differs between processes
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        output_path = Path(f"tts_{digest}.wav")

        audio_config = speechsdk.audio.AudioOutputConfig(
            filename=str(output_path)
        )

        synthesizer = speechsdk.SpeechSynthesizer(
            speech_config=self.speech_config,
            audio_config=audio_config
        )

        # .get() blocks until synthesis finishes, so run it in a worker thread
        # to keep the event loop responsive
        result = await asyncio.to_thread(synthesizer.speak_text_async(text).get)

        if result.reason != speechsdk.ResultReason.SynthesizingAudioCompleted:
            raise RuntimeError(f"TTS failed: {result.reason}")

        return output_path
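
For offline runs or CI, where calling a paid TTS API is undesirable, a stand-in engine can be handy. The sketch below is an illustration under assumptions: `SilentTTS` is a hypothetical class that relies only on the `synthesize(text, **kwargs) -> Path` shape used in the examples above, and it writes silence whose length scales with the text rather than real speech. It does not subclass `TTSEngineProtocol` so that it runs standalone; whether the simulator accepts a duck-typed engine should be checked against your installed version.

```python
import asyncio
import hashlib
import tempfile
import wave
from pathlib import Path


class SilentTTS:
    """Hypothetical stand-in engine: writes silent 16-bit mono WAV files."""

    def __init__(self, output_dir: Path, sample_rate: int = 16000,
                 seconds_per_char: float = 0.05):
        self.output_dir = output_dir
        self.sample_rate = sample_rate
        self.seconds_per_char = seconds_per_char

    async def synthesize(self, text: str, **kwargs) -> Path:
        # Stable file name derived from the text
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        output_path = self.output_dir / f"silent_{digest}.wav"

        # At least one frame so the file is never empty
        n_frames = max(1, int(len(text) * self.seconds_per_char * self.sample_rate))
        with wave.open(str(output_path), "wb") as wav:
            wav.setnchannels(1)
            wav.setsampwidth(2)  # 2 bytes = 16-bit samples
            wav.setframerate(self.sample_rate)
            wav.writeframes(b"\x00\x00" * n_frames)
        return output_path


def demo() -> tuple[Path, int]:
    """Synthesize one phrase into a temp directory; return path and frame count."""
    out_dir = Path(tempfile.mkdtemp())
    path = asyncio.run(SilentTTS(out_dir).synthesize("Hello!"))
    with wave.open(str(path), "rb") as wav:
        return path, wav.getnframes()
```

Because the audio is silent, this only exercises the audio transport path. Transcription on the agent side will produce no text.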

Custom LLM Providers

LayerCode Gym uses PydanticAI, which supports many LLM providers.

Anthropic Claude

from layercode_gym import UserSimulator, Persona

simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are a technical user",
        intent="You want detailed information"
    ),
    model="anthropic:claude-3-5-sonnet",  # Use Claude
    max_turns=5
)

Local Models (Ollama)

# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh

# Pull a model
ollama pull llama3

Then point the simulator at the local model:

simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are a casual user",
        intent="You want simple answers"
    ),
    model="ollama:llama3",  # Use local model
    max_turns=5
)

Google Gemini

simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are a researcher",
        intent="You want comprehensive information"
    ),
    model="gemini:gemini-1.5-pro",  # Use Gemini
    max_turns=5
)

Custom PydanticAI Agent

For full control, create a custom agent:

from pydantic_ai import Agent
from layercode_gym import UserSimulator

# Define dependencies
class ConversationDeps:
    def __init__(self):
        self.history = []

    def add_message(self, role: str, content: str):
        self.history.append({"role": role, "content": content})

# Create custom agent
agent = Agent(
    "openai:gpt-5",
    system_prompt="""
    You are simulating a frustrated customer who has been on hold
    for 30 minutes. You are impatient and want quick resolution.
    """,
    deps_type=ConversationDeps
)

# Use it
deps = ConversationDeps()
simulator = UserSimulator.from_agent(
    agent=agent,
    deps=deps,
    max_turns=5
)

Testing Long-Running Operations

When your voice agent performs operations that take time (API calls, database queries, file processing), the AI simulator uses wait handling to behave realistically.

How Wait Handling Works

The AI agent simulator automatically detects when to wait based on the assistant's message:

# The assistant says: "Processing your request... please wait about 10 seconds."
# The AI simulator will:
# 1. Recognize this as a wait scenario
# 2. Return WaitForAssistant(wait_seconds=12)  # 10s + 20% buffer
# 3. System waits, then calls simulator again with updated message
# 4. If assistant says "Done! Here are your results:", simulator responds normally
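
The 20% buffer in step 2 can be made concrete with a small rule-based sketch. It is only an illustration: the built-in simulator makes this decision through the LLM, and `estimate_wait_seconds`, its regular expression, and the `buffer` parameter are hypothetical names that are not part of layercode-gym.

```python
import re
from typing import Optional

# Matches phrases such as "about 10 seconds" or "2 minutes" (illustrative only)
_DURATION = re.compile(r"(\d+(?:\.\d+)?)\s*(second|sec|minute|min)s?\b", re.IGNORECASE)


def estimate_wait_seconds(message: str, buffer: float = 0.2) -> Optional[float]:
    """Return the stated duration plus a safety buffer, or None if none is stated."""
    match = _DURATION.search(message)
    if match is None:
        return None
    value = float(match.group(1))
    unit = match.group(2).lower()
    seconds = value * 60 if unit.startswith("min") else value
    return round(seconds * (1 + buffer), 1)


print(estimate_wait_seconds("Processing your request... please wait about 10 seconds."))
print(estimate_wait_seconds("Done! Here are your results:"))
```

The first call yields 12.0 (10 seconds plus 20%), and the second yields None because the message states no duration.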

Example: Testing a Delay-Based Agent

import asyncio

from layercode_gym import LayercodeClient, UserSimulator, Persona

# Create an AI persona that will naturally wait
simulator = UserSimulator.from_agent(
    persona=Persona(
        background_context="You are testing a data processing system",
        intent="You want to process a large dataset and get the results"
    ),
    model="openai:gpt-5-mini",
    max_turns=5
)

# The conversation might go:
# User: "Please process my dataset"
# Assistant: "Processing your dataset... this will take about 15 seconds."
# [AI simulator waits ~18 seconds]
# Assistant: "Done! Your dataset has 1,234 records processed."
# User: "Great, can you show me the summary?"

async def main() -> None:
    client = LayercodeClient(simulator=simulator)
    await client.run()

# await is only valid inside a coroutine, so run the client from an entry point
asyncio.run(main())

Debugging Wait Behavior

Enable debug logging to see wait decisions:

import logging
logging.getLogger("layercode_gym").setLevel(logging.DEBUG)

# You'll see logs like:
# DEBUG: Simulator requested wait (wait #1, total 12.0s). Scheduling idle timer in 12.0s
# DEBUG: Wait context: waited 1 time(s), new content arrived: True

Custom Wait Logic

For advanced scenarios, implement a custom simulator:

from layercode_gym.simulator import (
    UserSimulatorProtocol,
    UserRequest,
    UserResponse,
    WaitContext
)

class SmartWaitSimulator(UserSimulatorProtocol):
    def __init__(self, max_total_wait: float = 60.0):
        self.max_total_wait = max_total_wait

    async def get_response(self, request: UserRequest) -> UserResponse | None:
        text = request.text or ""
        wait_ctx = request.wait_context

        # Check if we've waited too long overall
        if wait_ctx and wait_ctx.total_wait_seconds >= self.max_total_wait:
            return UserResponse(
                text="I've been waiting a while. Is everything okay?",
                audio_path=None,
                data=()
            )

        # Detect if assistant is still processing
        if any(phrase in text.lower() for phrase in ["please wait", "processing", "one moment"]):
            # Check if this is new content or same as before
            if wait_ctx and not wait_ctx.has_new_content(len(text)):
                # No new content after waiting - wait a bit more
                return UserResponse(
                    text=None, audio_path=None, data=(),
                    wait_seconds=15.0
                )
            # New content but still processing
            return UserResponse(
                text=None, audio_path=None, data=(),
                wait_seconds=10.0
            )

        # Assistant is done - respond normally
        return UserResponse(
            text="Thanks for the update!",
            audio_path=None,
            data=()
        )

Audio Processing

Background Noise Injection

Add realistic background noise to test transcription:

from pydub import AudioSegment
from pydub.generators import WhiteNoise
from pathlib import Path

def add_background_noise(
    audio_path: Path,
    noise_dbfs: float = -30.0
) -> Path:
    # Load audio
    audio = AudioSegment.from_wav(audio_path)

    # Generate white noise; pydub expects volume in dBFS (0 is full scale),
    # so use a negative value such as -30.0 for quiet background noise
    noise = WhiteNoise().to_audio_segment(
        duration=len(audio),
        volume=noise_dbfs
    )

    # Mix audio with noise
    mixed = audio.overlay(noise)

    # Save
    output_path = audio_path.parent / f"{audio_path.stem}_noisy.wav"
    mixed.export(output_path, format="wav")

    return output_path

# Use in simulator
from layercode_gym import UserSimulator

# Generate noisy versions of audio files
noisy_files = [
    add_background_noise(Path("audio/msg1.wav")),
    add_background_noise(Path("audio/msg2.wav"))
]

simulator = UserSimulator.from_files(files=noisy_files)

Speed Variation

Test with different speaking speeds:

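from pathlib import Path

from pydub import AudioSegment

def change_speed(audio_path: Path, speed: float = 1.0) -> Path:
    # speed > 1.0 = faster, speed < 1.0 = slower (pitch shifts along with speed)
    audio = AudioSegment.from_wav(audio_path)

    # Reinterpret the same samples at a scaled frame rate
    altered = audio._spawn(
        audio.raw_data,
        overrides={"frame_rate": int(audio.frame_rate * speed)}
    )

    # Resample back to the original frame rate so downstream tools accept it
    resampled = altered.set_frame_rate(audio.frame_rate)

    # Save next to the source file and return the new path
    output_path = audio_path.parent / f"{audio_path.stem}_x{speed}.wav"
    resampled.export(output_path, format="wav")

    return output_path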

Batch Processing Patterns

Parallel Processing with Resource Limits

import asyncio
from layercode_gym import LayercodeClient, UserSimulator

async def run_with_semaphore(
    message: str,
    semaphore: asyncio.Semaphore
) -> str:
    async with semaphore:
        simulator = UserSimulator.from_text(
            messages=[message],
            send_as_text=True
        )
        client = LayercodeClient(simulator=simulator)
        return await client.run()

async def main():
    # Limit to 10 concurrent conversations
    semaphore = asyncio.Semaphore(10)

    scenarios = ["Message " + str(i) for i in range(100)]
    tasks = [
        run_with_semaphore(msg, semaphore)
        for msg in scenarios
    ]

    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} conversations")

asyncio.run(main())

Retry Logic

import asyncio
from typing import Optional

from layercode_gym import LayercodeClient
from layercode_gym.simulator import UserSimulatorProtocol

async def run_with_retry(
    simulator: UserSimulatorProtocol,
    max_retries: int = 3
) -> Optional[str]:
    for attempt in range(max_retries):
        try:
            # Create a fresh client for every attempt
            client = LayercodeClient(simulator=simulator)
            return await client.run()
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"Failed after {max_retries} attempts: {e}")
                return None
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
    return None  # Only reached when max_retries is 0
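
When many conversations fail at the same moment (for example, after a backend restart), retrying them all on the same schedule can produce synchronized bursts. A common refinement is to add random jitter to the exponential backoff. The sketch below is generic rather than specific to layercode-gym: `retry_async` and its parameters are hypothetical, and `flaky` merely stands in for a call such as `client.run()`.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    make_call: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Retry an async call with exponential backoff and full jitter."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            # Build a fresh awaitable each time; coroutines cannot be re-awaited
            return await make_call()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Sleep a random time in [0, min(max_delay, base_delay * 2**attempt)]
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")


def demo() -> tuple[str, int]:
    """Run a call that fails twice, then succeeds; return result and attempt count."""
    attempts = 0

    async def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionError("simulated failure")
        return "conversation-id"

    result = asyncio.run(retry_async(flaky, max_retries=5, base_delay=0.01))
    return result, attempts
```

Unlike the version above, this sketch re-raises the final error instead of returning `None`, which keeps the failure visible to the caller.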

Progress Tracking

from tqdm import tqdm

from layercode_gym import LayercodeClient, UserSimulator

async def run_with_progress(scenarios: list[str]):
    results = []

    with tqdm(total=len(scenarios), desc="Running conversations") as pbar:
        for scenario in scenarios:
            simulator = UserSimulator.from_text(
                messages=[scenario],
                send_as_text=True
            )
            client = LayercodeClient(simulator=simulator)
            conv_id = await client.run()
            results.append(conv_id)
            pbar.update(1)

    return results
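
The loop above runs conversations one at a time. If progress reporting is wanted together with the semaphore pattern shown earlier, `asyncio.as_completed` yields results as they finish. This sketch uses a plain callback instead of tqdm so that it runs with the standard library alone. `run_all_with_progress`, `fake_conversation`, and `on_progress` are hypothetical names, and `fake_conversation` stands in for creating a client and awaiting `client.run()`.

```python
import asyncio
from typing import Awaitable, Callable


async def run_all_with_progress(
    scenarios: list[str],
    run_one: Callable[[str], Awaitable[str]],
    on_progress: Callable[[int, int], None],
    max_concurrency: int = 10,
) -> list[str]:
    """Run scenarios concurrently, reporting (done, total) after each finishes."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(scenario: str) -> str:
        async with semaphore:
            return await run_one(scenario)

    tasks = [asyncio.ensure_future(guarded(s)) for s in scenarios]
    results = []
    # as_completed yields in completion order, not submission order
    for done, future in enumerate(asyncio.as_completed(tasks), start=1):
        results.append(await future)
        on_progress(done, len(tasks))
    return results


async def fake_conversation(scenario: str) -> str:
    # Stand-in for: client = LayercodeClient(...); return await client.run()
    await asyncio.sleep(0.01)
    return f"conv-{scenario}"


def demo() -> tuple[list[str], list[int]]:
    """Run five fake conversations, two at a time; return results and progress ticks."""
    ticks: list[int] = []
    scenarios = [f"Message {i}" for i in range(5)]
    results = asyncio.run(
        run_all_with_progress(
            scenarios, fake_conversation, lambda done, total: ticks.append(done),
            max_concurrency=2,
        )
    )
    return sorted(results), ticks
```

With tqdm, the callback could simply call `pbar.update(1)`.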

Evaluation Frameworks

Custom Scoring System

from layercode_gym.models import ConversationLog
from typing import Dict

class ConversationScorer:
    def __init__(self):
        self.scores: Dict[str, float] = {}

    async def score_conversation(
        self,
        log: ConversationLog
    ) -> float:
        score = 0.0

        # Score based on duration (prefer shorter)
        if log.stats["duration_seconds"] < 60:
            score += 2.0
        elif log.stats["duration_seconds"] < 120:
            score += 1.0

        # Score based on latency
        if log.stats["avg_latency_ms"] < 500:
            score += 2.0
        elif log.stats["avg_latency_ms"] < 1000:
            score += 1.0

        # Score based on turn count
        if log.stats["total_turns"] >= 3:
            score += 1.0

        return score

# Use it
scorer = ConversationScorer()

async def evaluate_callback(log: ConversationLog) -> None:
    score = await scorer.score_conversation(log)
    print(f"Conversation {log.conversation_id} scored: {score}/5.0")

client = LayercodeClient(
    simulator=simulator,
    conversation_callback=evaluate_callback
)

A/B Testing

from enum import Enum
from typing import List
import statistics

class AgentVersion(Enum):
    V1 = "agent_v1_id"
    V2 = "agent_v2_id"

async def ab_test(
    scenarios: List[str],
    num_runs_per_version: int = 10
):
    results = {AgentVersion.V1: [], AgentVersion.V2: []}

    for version in AgentVersion:
        settings = Settings(
            server_url="http://localhost:8001",
            agent_id=version.value
        )

        for scenario in scenarios[:num_runs_per_version]:
            simulator = UserSimulator.from_text(
                messages=[scenario],
                send_as_text=True
            )

            client = LayercodeClient(
                simulator=simulator,
                settings=settings
            )

            conv_id = await client.run()

            # Collect metrics
            # ... analyze conversation ...

            results[version].append(conv_id)

    # Compare results
    print("A/B Test Results:")
    for version, conv_ids in results.items():
        print(f"{version.name}: {len(conv_ids)} conversations")
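
The comparison step above only counts conversations. Once each conversation has a numeric score (for example from `ConversationScorer`), the `statistics` module already imported in the example can summarize each version. This is a minimal sketch that assumes scores have been collected into plain lists. `summarize` is a hypothetical helper, and the sample numbers are made up for illustration, not measured results.

```python
import statistics


def summarize(scores_by_version: dict[str, list[float]]) -> dict[str, dict[str, float]]:
    """Compute mean and sample standard deviation of scores per version."""
    summary = {}
    for version, scores in scores_by_version.items():
        summary[version] = {
            "n": len(scores),
            "mean": statistics.mean(scores),
            # stdev needs at least two data points
            "stdev": statistics.stdev(scores) if len(scores) > 1 else 0.0,
        }
    return summary


# Made-up scores, for illustration only
example = {"V1": [3.0, 4.0, 5.0], "V2": [4.0, 4.0, 4.0]}
for version, stats in summarize(example).items():
    print(f"{version}: n={stats['n']} mean={stats['mean']:.2f} stdev={stats['stdev']:.2f}")
```

With only a handful of runs, a difference in means can easily be noise. More runs per version, or a significance test, would be needed before drawing conclusions.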

CI/CD Integration

GitHub Actions Example

name: Voice Agent Tests

on:
  pull_request:
  push:
    branches: [main]

jobs:
  test-agent:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install uv
        run: curl -LsSf https://astral.sh/uv/install.sh | sh

      - name: Install dependencies
        run: uv sync

      - name: Start backend server
        run: |
          uvx layercode-create-app run &
          sleep 5

      - name: Run tests
        env:
          SERVER_URL: http://localhost:8001
          LAYERCODE_AGENT_ID: ${{ secrets.AGENT_ID }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          uv run python examples/01_text_messages.py
          uv run python examples/05_batch_evaluation.py

      - name: Upload results
        uses: actions/upload-artifact@v4
        with:
          name: conversation-logs
          path: conversations/
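
A fixed `sleep 5` can be too short on a slow runner and wastes time on a fast one. One alternative is to poll the server port until it accepts connections. The sketch below is a hypothetical helper (`wait_for_port` is not part of layercode-gym) and assumes the backend listens on a TCP port such as 8001, as in the `SERVER_URL` above.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0,
                  interval: float = 0.5) -> bool:
    """Return True once host:port accepts TCP connections, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet: wait and try again
            time.sleep(interval)
    return False
```

In the workflow, this could be saved as a small script and called right after the server is started in the background, failing the job if it returns `False`.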

Performance Optimization

Reuse WebSocket Connections

For high-volume testing, connection pooling may help, but LayercodeClient does not support it out of the box. The following is a conceptual outline only:

# This is a conceptual example - would require changes to core client
class ClientPool:
    def __init__(self, pool_size: int = 10):
        self.pool_size = pool_size
        self.clients = []

    async def get_client(self) -> LayercodeClient:
        # Return a client from the pool
        # This would require refactoring LayercodeClient
        raise NotImplementedError("Pooling requires changes to LayercodeClient")

Disable Audio File Saving

If you only need metrics:

# Modify storage settings (conceptual - would need implementation)
settings = Settings(
    server_url="http://localhost:8001",
    agent_id="your_agent_id",
    save_audio=False  # Don't save audio files
)

Use Text Mode

For maximum speed:

simulator = UserSimulator.from_text(
    messages=["Hello!"],
    send_as_text=True  # Fastest mode
)

Next Steps