Asynchronous Connection Template

The asynchronous connection template provides a high-performance way to connect your agent to Snowglobe for testing. This template is ideal when your application needs to handle multiple concurrent requests efficiently or uses async APIs.

When to Use

Use the asynchronous template when:
  • Your application needs to handle high volumes of concurrent requests
  • You’re using async LLM clients (like AsyncOpenAI)
  • Your agent performs multiple I/O operations that can be parallelized
  • You want optimal performance and resource utilization
  • Your existing codebase is already async-based

Template Code

When you run snowglobe-connect init and select the asynchronous template, Snowglobe generates this code:
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
from openai import AsyncOpenAI
import os
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    """
    Process a scenario request from Snowglobe.
    
    This function is called by the Snowglobe client to process test requests. It should return a
    CompletionFunctionOutputs object with the response content.
    
    Args:
        request (CompletionRequest): The request object containing messages for the test.

    Returns:
        CompletionFunctionOutputs: The response object with the generated content.
    """

    # Process the request using the messages. Example using OpenAI:
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return CompletionFunctionOutputs(response=response.choices[0].message.content)

Code Walkthrough

1. Imports and Setup

from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
from openai import AsyncOpenAI
import os
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
  • AsyncOpenAI: The asynchronous version of OpenAI’s client for non-blocking API calls
  • Same Snowglobe imports: Uses the same request/response objects as the sync template
  • Environment variable: Loads your OpenAI API key from the environment rather than hardcoding it

2. Main Async Function

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
The acompletion function is the async entry point that Snowglobe calls. Key differences from the sync template (contrasted in the sketch below):
  • Function name is acompletion (not completion)
  • Defined with the async keyword
  • Can use await for non-blocking operations
  • Snowglobe automatically handles the async execution
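
For reference, a minimal side-by-side sketch of the two entry points. The sync signature assumes the completion name used by the synchronous template; both bodies are elided:
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs

# Synchronous template: blocks its worker while waiting on I/O
def completion(request: CompletionRequest) -> CompletionFunctionOutputs:
    ...

# Asynchronous template: yields control to the event loop at each await
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    ...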

3. Message Processing (Same as Sync)

messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
Message processing works identically to the synchronous version. The CompletionRequest object provides the same methods and data.
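
For orientation, the result follows the standard OpenAI chat message format. An illustrative sketch of the list produced above (the actual contents depend on the scenario Snowglobe is running):
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "<message from the Snowglobe scenario>"},
]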

4. Async API Call

response = await client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages
)
The key difference: using await for the API call. This allows other requests to be processed concurrently while waiting for the LLM response.

5. Response Formatting (Same as Sync)

return CompletionFunctionOutputs(response=response.choices[0].message.content)
Response formatting is identical to the synchronous version.

Performance Benefits

The asynchronous template provides several advantages (see the timing sketch after this list):
  • Concurrent Processing: Handle multiple requests simultaneously
  • Better Resource Utilization: CPU isn’t blocked during I/O operations
  • Scalability: Handle higher request volumes with the same hardware
  • Reduced Latency: Other requests don’t wait when one request is slow
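
To make the latency benefit concrete, here is a self-contained sketch that simulates ten 1-second I/O-bound calls with asyncio.sleep. The numbers illustrate the mechanism, not measurements of Snowglobe itself:
import asyncio
import time

async def fake_llm_call(i: int) -> str:
    await asyncio.sleep(1)  # stand-in for a ~1s network round trip
    return f"response {i}"

async def main():
    start = time.perf_counter()
    # All ten "calls" overlap their waiting time on one event loop
    await asyncio.gather(*(fake_llm_call(i) for i in range(10)))
    print(f"concurrent: {time.perf_counter() - start:.1f}s")  # ~1s, not ~10s

asyncio.run(main())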

Advanced Customization Examples

Multiple Concurrent API Calls

import asyncio

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    
    # Make multiple API calls concurrently
    tasks = []
    for i in range(3):  # Generate 3 responses
        task = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.7 + (i * 0.1)  # Different temperatures
        )
        tasks.append(task)
    
    # Wait for all responses
    responses = await asyncio.gather(*tasks)
    
    # Pick one response to return (here, simply the longest) or combine them
    best_response = max(responses, key=lambda r: len(r.choices[0].message.content))
    
    return CompletionFunctionOutputs(response=best_response.choices[0].message.content)

Async Database Operations

import asyncpg
import asyncio

# Database connection pool (initialize once)
db_pool = None

async def init_db_pool():
    global db_pool
    if db_pool is None:
        db_pool = await asyncpg.create_pool("postgresql://...")

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    await init_db_pool()
    
    # Get conversation history from database concurrently with LLM call
    conversation_id = request.get_conversation_id()
    
    # Start both operations concurrently
    history_task = get_conversation_history(conversation_id)
    llm_task = generate_response(request)
    
    # Wait for both to complete
    history, response = await asyncio.gather(history_task, llm_task)
    
    # Store the interaction
    await store_interaction(conversation_id, request, response)
    
    return CompletionFunctionOutputs(response=response)

async def get_conversation_history(conversation_id: str):
    async with db_pool.acquire() as conn:
        return await conn.fetch(
            "SELECT * FROM conversations WHERE id = $1", 
            conversation_id
        )

async def generate_response(request: CompletionRequest):
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content

async def store_interaction(conversation_id: str, request: CompletionRequest, response: str):
    async with db_pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO interactions (conversation_id, request, response) VALUES ($1, $2, $3)",
            conversation_id, str(request.messages), response
        )
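
One caveat: with the lazy initialization above, several early requests can race to create the pool. A minimal sketch that guards init_db_pool with an asyncio.Lock, reusing db_pool and asyncpg from the example above:
import asyncio

_pool_lock = asyncio.Lock()

async def init_db_pool():
    global db_pool
    # Only one coroutine creates the pool; the others wait, then reuse it
    async with _pool_lock:
        if db_pool is None:
            db_pool = await asyncpg.create_pool("postgresql://...")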

Async External API Integration

import aiohttp
import asyncio

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    
    # Concurrently fetch user context and generate response
    async with aiohttp.ClientSession() as session:
        context_task = fetch_user_context(session, conversation_id)
        response_task = generate_llm_response(request)
        
        context, llm_response = await asyncio.gather(context_task, response_task)
        
        # Enhance response with context
        enhanced_response = f"Based on your history: {context}\n\n{llm_response}"
        
        return CompletionFunctionOutputs(response=enhanced_response)

async def fetch_user_context(session, conversation_id):
    async with session.get(f"https://api.example.com/users/{conversation_id}") as resp:
        if resp.status == 200:
            data = await resp.json()
            return data.get("context", "No additional context")
        return "Context unavailable"

async def generate_llm_response(request):
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content

Error Handling with Retries

import asyncio

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    max_retries = 3
    for attempt in range(max_retries):
        try:
            messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
            response = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return CompletionFunctionOutputs(response=response.choices[0].message.content)
            
        except Exception as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                return CompletionFunctionOutputs(
                    response=f"Service temporarily unavailable: {str(e)}"
                )
            else:
                # Wait before retrying (exponential backoff)
                await asyncio.sleep(2 ** attempt)
                continue
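
Catching bare Exception retries even on permanent failures such as an invalid request. A tighter variant, assuming the exception classes exposed by the openai v1 client, retries only on transient errors:
import asyncio
import openai

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return CompletionFunctionOutputs(response=response.choices[0].message.content)
        except (openai.RateLimitError, openai.APITimeoutError, openai.APIConnectionError) as e:
            if attempt == max_retries - 1:
                return CompletionFunctionOutputs(
                    response=f"Service temporarily unavailable: {e}"
                )
            await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...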

Testing Your Async Implementation

  1. Test the connection:
    snowglobe-connect test
    
  2. Start the client:
    snowglobe-connect start
    
  3. Monitor performance: Under concurrent load, the async template should sustain noticeably more simultaneous conversations than the synchronous one

Performance Comparison

Feature                   | Synchronous         | Asynchronous
--------------------------|---------------------|--------------------------------
Concurrent Requests       | Limited             | High
Memory Usage              | Higher per request  | Lower overall
CPU Utilization           | Blocked during I/O  | Efficient
Implementation Complexity | Simple              | Moderate
Best For                  | Low-medium traffic  | High traffic, complex workflows

Common Pitfalls

1. Blocking Operations

# ❌ Don't do this - blocks the event loop
def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    time.sleep(1)  # Blocks everything!
    
# ✅ Do this instead
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    await asyncio.sleep(1)  # Non-blocking
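
If you genuinely need a blocking call (a sync-only SDK, file I/O), you can offload it with asyncio.to_thread (Python 3.9+) instead of blocking the loop; blocking_lookup here is a hypothetical synchronous helper:
import asyncio

def blocking_lookup(key: str) -> str:
    ...  # stand-in for any synchronous, blocking function

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    # Runs the blocking function in a worker thread; the event loop stays responsive
    context = await asyncio.to_thread(blocking_lookup, "user-context")
    ...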

2. Forgetting await

# ❌ This won't work - returns a coroutine object
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = client.chat.completions.create(...)  # Missing await!
    
# ✅ Always await async operations
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = await client.chat.completions.create(...)

3. Mixing Sync and Async Incorrectly

# ❌ Don't mix sync client in async function
from openai import OpenAI  # Sync client
client = OpenAI()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = client.chat.completions.create(...)  # Blocks event loop
    
# ✅ Use async client
from openai import AsyncOpenAI
client = AsyncOpenAI()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = await client.chat.completions.create(...)

Next Steps