Socket-Based Connection Template

The socket-based connection template provides a real-time way to connect your agent to Snowglobe for testing. This template is designed for applications that maintain persistent connections and require conversational state, such as real-time chat applications or streaming LLM services.

When to Use

Use the socket-based template when:
  • Your application uses WebSocket or persistent connections
  • You need to maintain conversation state across multiple messages
  • You’re working with real-time streaming APIs (like OpenAI’s Realtime API)
  • Your agent requires session persistence between requests
  • You want to test conversational flows that depend on connection state

Template Code

When you run snowglobe-connect init and select the socket template, Snowglobe generates this code:
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
import logging
import websockets
import json
from openai import AsyncOpenAI

LOGGER = logging.getLogger(__name__)
socket_cache = {}
openai_client = AsyncOpenAI()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    """
    When dealing with a realtime socket, we need to create a socket for each conversation.
    We store the socket in a cache and reuse it for the same conversation_id so that we can maintain the conversation context.
    Swap out the websocket client for your preferred realtime client.

    Args:
        request (CompletionRequest): The request object containing messages for the test.

    Returns:
        CompletionFunctionOutputs: The response object with the generated content.
    """
    conversation_id = request.get_conversation_id()
    
    if conversation_id not in socket_cache:
        socket = await websockets.connect(
            "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01&modalities=text",
            additional_headers={
                "Authorization": f"Bearer {openai_client.api_key}",
                "OpenAI-Beta": "realtime=v1"
            }
        )
        socket_cache[conversation_id] = socket
    else:
        socket = socket_cache[conversation_id]
    
    # Send user message
    messages = request.to_openai_messages()
    user_message = messages[-1]["content"]
    
    await socket.send(json.dumps({
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": user_message}]
        }
    }))
    
    # Request a text-only response
    await socket.send(json.dumps({
        "type": "response.create",
        "response": {"modalities": ["text"]}
    }))
    
    # Get response
    response_content = ""
    async for message in socket:
        data = json.loads(message)
        if data.get("type") == "response.text.delta":
            response_content += data.get("delta", "")
        elif data.get("type") == "response.done":
            break
    
    return CompletionFunctionOutputs(response=response_content)

Code Walkthrough

1. Imports and Setup

from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
import logging
import websockets
import json
from openai import AsyncOpenAI

LOGGER = logging.getLogger(__name__)
socket_cache = {}
openai_client = AsyncOpenAI()
  • websockets: Python library for WebSocket connections
  • json: For encoding/decoding WebSocket messages
  • socket_cache: Dictionary to store persistent connections per conversation
  • LOGGER: For debugging WebSocket interactions

2. Conversation-Based Socket Management

conversation_id = request.get_conversation_id()

if conversation_id not in socket_cache:
    # Create new socket for this conversation
    socket = await websockets.connect(...)
    socket_cache[conversation_id] = socket
else:
    # Reuse existing socket
    socket = socket_cache[conversation_id]
Key Concept: Each conversation gets its own persistent WebSocket connection. This allows:
  • Maintaining conversation context across multiple messages
  • Session state preservation
  • More efficient resource usage for ongoing conversations

3. WebSocket Connection Setup

socket = await websockets.connect(
    "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01&modalities=text",
    additional_headers={
        "Authorization": f"Bearer {openai_client.api_key}",
        "OpenAI-Beta": "realtime=v1"
    }
)
This connects to OpenAI’s Realtime API, but you can replace this with:
  • Your own WebSocket server
  • Other real-time LLM services
  • Custom streaming endpoints

4. Message Sending

await socket.send(json.dumps({
    "type": "conversation.item.create",
    "item": {
        "type": "message",
        "role": "user",
        "content": [{"type": "input_text", "text": user_message}]
    }
}))

await socket.send(json.dumps({
    "type": "response.create",
    "response": {"modalities": ["text"]}
}))
The template sends two structured JSON events over the WebSocket: one appends the user message to the conversation, and one requests a text-only response. The exact event format depends on your specific real-time API.
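For the OpenAI Realtime API specifically, session-level settings such as text-only output belong in a separate session.update event sent once after connecting, not in per-message events; an illustrative payload:

```python
import json

# Illustrative payload for the OpenAI Realtime API: per-session settings
# (such as text-only output) go in a "session.update" event sent once after
# the socket is opened, not inside "conversation.item.create".
session_update = json.dumps({
    "type": "session.update",
    "session": {"modalities": ["text"]},
})
# then: await socket.send(session_update)
```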

5. Response Handling

response_content = ""
async for message in socket:
    data = json.loads(message)
    if data.get("type") == "response.audio_transcript.delta":
        response_content += data.get("delta", "")
    elif data.get("type") == "response.done":
        break
This listens for streaming response chunks and assembles the complete response.
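If the server never sends the terminating event, this loop hangs forever and stalls the test run. A sketch of the same loop wrapped in a timeout (assuming text deltas arrive as response.text.delta events; adjust the event names to your API):

```python
import asyncio
import json

# Sketch: the read loop wrapped in asyncio.wait_for so a missing terminal
# event cannot hang the agent. Assumes text deltas arrive as
# "response.text.delta" events; adjust the event names to your API.
async def collect_response(socket, timeout=30.0):
    async def _read():
        content = ""
        async for message in socket:
            data = json.loads(message)
            if data.get("type") == "response.text.delta":
                content += data.get("delta", "")
            elif data.get("type") == "response.done":
                break
        return content
    return await asyncio.wait_for(_read(), timeout=timeout)
```

On timeout this raises `asyncio.TimeoutError` (`TimeoutError` on Python 3.11+), which you can catch to return a fallback response.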

Customization Examples

Generic WebSocket Client

import websockets
import json

socket_cache = {}

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    
    # Connect to your custom WebSocket server
    if conversation_id not in socket_cache:
        socket = await websockets.connect("ws://your-server.com/chat")
        socket_cache[conversation_id] = socket
    else:
        socket = socket_cache[conversation_id]
    
    # Send message in your custom format
    message = {
        "conversation_id": conversation_id,
        "messages": [{"role": msg.role, "content": msg.content} for msg in request.messages],
        "timestamp": request.messages[-1].snowglobe_data.timestamp if request.messages else None
    }
    
    await socket.send(json.dumps(message))
    
    # Wait for response
    response_data = await socket.recv()
    response = json.loads(response_data)
    
    return CompletionFunctionOutputs(response=response.get("content", ""))

Socket Cleanup and Error Handling

import asyncio
import weakref

# WeakValueDictionary evicts an entry as soon as nothing else references the
# socket, so pair it with longer-lived references or explicit TTL cleanup
socket_cache = weakref.WeakValueDictionary()

class ManagedSocket:
    def __init__(self, socket, conversation_id):
        self.socket = socket
        self.conversation_id = conversation_id
        self.last_used = asyncio.get_event_loop().time()
    
    async def send(self, data):
        self.last_used = asyncio.get_event_loop().time()
        await self.socket.send(data)
    
    async def recv(self):
        self.last_used = asyncio.get_event_loop().time()
        return await self.socket.recv()
    
    async def close(self):
        await self.socket.close()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    
    try:
        # Get or create managed socket
        if conversation_id not in socket_cache:
            raw_socket = await websockets.connect("ws://your-server.com/chat")
            managed_socket = ManagedSocket(raw_socket, conversation_id)
            socket_cache[conversation_id] = managed_socket
        else:
            managed_socket = socket_cache[conversation_id]
        
        # Send message
        message = json.dumps({
            "conversation_id": conversation_id,
            "content": request.messages[-1].content
        })
        
        await managed_socket.send(message)
        
        # Receive response
        response_data = await managed_socket.recv()
        response = json.loads(response_data)
        
        return CompletionFunctionOutputs(response=response.get("content", ""))
        
    except websockets.ConnectionClosed:
        # Remove from cache and retry once
        if conversation_id in socket_cache:
            del socket_cache[conversation_id]
        
        # Recursive retry (only once)
        if not hasattr(request, '_retry_attempted'):
            request._retry_attempted = True
            return await acompletion(request)
        else:
            return CompletionFunctionOutputs(response="Connection error - please try again")
            
    except Exception as e:
        LOGGER.error(f"Socket error for conversation {conversation_id}: {e}")
        return CompletionFunctionOutputs(response=f"Error: {str(e)}")

Server-Sent Events (SSE) Alternative

import aiohttp
import json

sse_clients = {}

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    
    # Create or reuse SSE connection (close sessions when conversations end)
    if conversation_id not in sse_clients:
        session = aiohttp.ClientSession()
        sse_clients[conversation_id] = session
    else:
        session = sse_clients[conversation_id]
    
    # Send message via POST
    async with session.post('http://your-server.com/chat', json={
        'conversation_id': conversation_id,
        'message': request.messages[-1].content
    }) as resp:
        if resp.status != 200:
            return CompletionFunctionOutputs(response="Server error")
    
    # Listen to SSE stream
    response_content = ""
    async with session.get(f'http://your-server.com/stream/{conversation_id}') as resp:
        async for line in resp.content:
            if line.startswith(b'data: '):
                data = json.loads(line[6:].decode())
                if data.get('type') == 'content':
                    response_content += data.get('content', '')
                elif data.get('type') == 'done':
                    break
    
    return CompletionFunctionOutputs(response=response_content)
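The inline stream parsing above can be factored into small, independently testable helpers; a sketch assuming the same hypothetical type/content event shape used by the example server:

```python
import json

# Sketch: SSE parsing factored into testable helpers, assuming the same
# hypothetical {"type": ..., "content": ...} event shape as the example server.
def parse_sse_events(lines):
    """Yield decoded JSON payloads from raw `data: ...` SSE byte lines."""
    for line in lines:
        line = line.strip()
        if line.startswith(b"data: "):
            yield json.loads(line[6:].decode())

def assemble_content(lines):
    """Accumulate 'content' events until a 'done' event arrives."""
    content = ""
    for event in parse_sse_events(lines):
        if event.get("type") == "content":
            content += event.get("content", "")
        elif event.get("type") == "done":
            break
    return content
```

Keeping the parsing pure makes it easy to unit-test without a live server.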

Socket Pool Management

import asyncio
from collections import defaultdict
import time

class SocketPool:
    def __init__(self, max_sockets_per_conversation=3, cleanup_interval=300):
        self.sockets = defaultdict(list)
        self.max_sockets = max_sockets_per_conversation
        self.cleanup_interval = cleanup_interval
        self._cleanup_task = None
    
    async def get_socket(self, conversation_id: str):
        """Get an available socket or create a new one"""
        # Start the cleanup loop lazily: asyncio.create_task() in __init__
        # fails if the pool is created before an event loop is running
        if self._cleanup_task is None:
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        available_sockets = self.sockets[conversation_id]
        
        # Find an available socket
        for socket_info in available_sockets:
            if not socket_info['in_use']:
                socket_info['in_use'] = True
                socket_info['last_used'] = time.time()
                return socket_info['socket']
        
        # Create new socket if under limit
        if len(available_sockets) < self.max_sockets:
            socket = await websockets.connect("ws://your-server.com/chat")
            socket_info = {
                'socket': socket,
                'in_use': True,
                'created': time.time(),
                'last_used': time.time()
            }
            available_sockets.append(socket_info)
            return socket
        
        # Wait for available socket
        while True:
            for socket_info in available_sockets:
                if not socket_info['in_use']:
                    socket_info['in_use'] = True
                    socket_info['last_used'] = time.time()
                    return socket_info['socket']
            await asyncio.sleep(0.1)
    
    async def release_socket(self, conversation_id: str, socket):
        """Mark socket as available"""
        for socket_info in self.sockets[conversation_id]:
            if socket_info['socket'] == socket:
                socket_info['in_use'] = False
                break
    
    async def _cleanup_loop(self):
        """Clean up old unused sockets"""
        while True:
            await asyncio.sleep(self.cleanup_interval)
            current_time = time.time()
            
            for conversation_id, sockets in list(self.sockets.items()):
                sockets_to_remove = []
                for i, socket_info in enumerate(sockets):
                    # Remove sockets unused for 30 minutes
                    if (not socket_info['in_use'] and 
                        current_time - socket_info['last_used'] > 1800):
                        await socket_info['socket'].close()
                        sockets_to_remove.append(i)
                
                # Remove closed sockets
                for i in reversed(sockets_to_remove):
                    sockets.pop(i)
                
                # Clean up empty conversations
                if not sockets:
                    del self.sockets[conversation_id]

# Global socket pool
socket_pool = SocketPool()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    
    socket = await socket_pool.get_socket(conversation_id)
    
    try:
        # Use the socket
        await socket.send(json.dumps({
            "conversation_id": conversation_id,
            "message": request.messages[-1].content
        }))
        
        response_data = await socket.recv()
        response = json.loads(response_data)
        
        return CompletionFunctionOutputs(response=response.get("content", ""))
        
    finally:
        # Always release the socket back to the pool
        await socket_pool.release_socket(conversation_id, socket)

Benefits of Socket-Based Connections

Advantages

  • State Persistence: Maintain conversation context across messages
  • Lower Latency: Persistent connections eliminate handshake overhead
  • Real-time Communication: Support for streaming and bi-directional communication
  • Resource Efficiency: Reuse connections for multiple messages
  • Enhanced Features: Support for typing indicators, presence, etc.

Use Cases

  • Conversational AI: Chatbots that need to remember context
  • Real-time Collaboration: Multi-user applications
  • Streaming Responses: Applications that stream LLM responses
  • Gaming or Interactive Apps: Applications requiring low-latency communication

Testing Your Socket Implementation

  1. Test the connection:
    snowglobe-connect test
    
  2. Monitor socket connections: Use logging to track connection lifecycle:
    import logging
    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger(__name__)
    
    # Add logging to your socket operations
    LOGGER.info(f"Created new socket for conversation {conversation_id}")
    LOGGER.debug(f"Sending message: {message}")
    LOGGER.info(f"Socket cache size: {len(socket_cache)}")
    
  3. Start the client:
    snowglobe-connect start
    

Performance Considerations

Memory Management

  • Implement socket cleanup to prevent memory leaks
  • Use weak references for automatic garbage collection
  • Set reasonable limits on concurrent connections

Connection Limits

  • Most services limit WebSocket connections per client
  • Implement connection pooling for high-volume scenarios
  • Monitor and log connection statistics

Error Recovery

  • Handle network interruptions gracefully
  • Implement exponential backoff for reconnection
  • Provide fallback mechanisms for connection failures

Common Pitfalls

1. Socket Leaks

# ❌ Sockets never get cleaned up
socket_cache = {}

# ✅ Use a cleanup mechanism: TTL-based eviction, or weak references
# (note that weak values are evicted once nothing else references them)
import weakref
socket_cache = weakref.WeakValueDictionary()

2. Blocking Operations

# ❌ A synchronous client blocks the event loop; calling an async recv()
# without await just returns an unawaited coroutine
response = socket.recv()

# ✅ Always await async operations
response = await socket.recv()

3. No Error Handling

# ❌ Network errors will crash your agent
await socket.send(message)

# ✅ Handle connection errors
try:
    await socket.send(message)
except websockets.ConnectionClosed:
    # Reconnect and retry, but bound the retries to avoid infinite recursion
    del socket_cache[conversation_id]
    return await acompletion(request)  # Retry once

Next Steps