twenty_questions_game / tests /test_openai_connection.py
Boopster's picture
Implement 20 Questions game for Reachy Mini with OpenAI Realtime API
eaf056f
#!/usr/bin/env python3
"""
Test script for OpenAI Realtime API connection and audio handling.
This script tests:
1. OpenAI API connection
2. Event receiving
3. Audio sending/receiving (if Reachy Mini is available)
4. Audio conversion utilities
Usage:
python test_openai_connection.py
"""
import os
import asyncio
import json
import base64
import logging
from pathlib import Path
from dotenv import load_dotenv
import websockets
# Load environment variables.
# Candidate .env locations, in priority order: next to this script, then the
# current working directory.
env_paths = [Path(__file__).parent / ".env", Path.cwd() / ".env"]

# First candidate that exists on disk, or None when neither does.
env_path = next((candidate for candidate in env_paths if candidate.exists()), None)
if env_path is None:
    # No explicit file found: fall back to python-dotenv's default lookup.
    load_dotenv()
else:
    load_dotenv(env_path)
    print(f"βœ… Loaded .env from {env_path}")
# Root logging setup: INFO and above, each record prefixed with the
# timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# OpenAI Realtime settings shared by the tests below: the model goes into the
# websocket URL, the voice into the session.update payload.
OPENAI_MODEL = "gpt-realtime-2025-08-28"
OPENAI_VOICE = "alloy"
async def test_openai_connection():
    """Test basic OpenAI Realtime API connection.

    Opens a websocket to the Realtime endpoint, reads the first event
    (expected to be ``session.created``), sends a ``session.update`` and a
    ``response.create``, then prints incoming events for up to 10 seconds or
    until ``response.done`` / 20 events, whichever comes first.

    Returns:
        bool: False when ``OPENAI_API_KEY`` is unset or any exception is
        raised during the exchange; True otherwise. Error *events* from the
        server are printed but do not change the result.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("❌ OPENAI_API_KEY not set in environment!")
        return False
    print(f"πŸ”‘ API Key found: {api_key[:10]}...")

    url = f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "OpenAI-Beta": "realtime=v1",
    }
    print("πŸ”Œ Connecting to OpenAI Realtime API...")
    print(f" URL: {url}")

    try:
        # The context manager closes the socket on every exit path, so a
        # failure mid-exchange no longer leaks the connection.
        async with websockets.connect(
            url,
            additional_headers=headers,
            ping_interval=20,
            ping_timeout=10,
        ) as ws:
            print("βœ… Connected to OpenAI!")

            # Wait for session.created
            print("⏳ Waiting for session.created event...")
            response = await asyncio.wait_for(ws.recv(), timeout=10.0)
            event = json.loads(response)
            if event.get("type") == "session.created":
                print(f"βœ… Session created: {event.get('session', {}).get('id', 'unknown')}")
            else:
                print(f"⚠️ Unexpected event: {event.get('type')}")
                print(f" Event: {json.dumps(event, indent=2)}")

            # Configure session
            print("βš™οΈ Configuring session...")
            config = {
                "type": "session.update",
                "session": {
                    "modalities": ["audio", "text"],
                    "instructions": "You are a helpful assistant. Respond briefly.",
                    "voice": OPENAI_VOICE,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "whisper-1"
                    },
                    "turn_detection": {
                        "type": "semantic_vad",
                        "eagerness": "low",
                        "create_response": True,
                        "interrupt_response": True
                    },
                    "temperature": 0.8,
                    "max_response_output_tokens": 500
                }
            }
            await ws.send(json.dumps(config))
            print("βœ… Session configured")

            # Test: Trigger a response
            print("πŸ’¬ Triggering test response...")
            await ws.send(json.dumps({
                "type": "response.create",
                "response": {
                    "instructions": "Say 'Hello! This is a test. Can you hear me?'"
                }
            }))

            # Listen for events
            print("πŸ‘‚ Listening for events (10 seconds)...")
            events_received = 0
            audio_chunks_received = 0
            transcription_received = False

            async def listen_for_events():
                """Print each incoming event and update the counters above."""
                nonlocal events_received, audio_chunks_received, transcription_received
                async for message in ws:
                    event = json.loads(message)
                    event_type = event.get("type", "unknown")
                    events_received += 1
                    print(f"πŸ“¨ Event #{events_received}: {event_type}")

                    if event_type == "response.audio.delta":
                        audio_b64 = event.get("delta", "")
                        if audio_b64:
                            audio_chunks_received += 1
                            # Only report every 10th chunk to keep output short.
                            if audio_chunks_received % 10 == 0:
                                print(f" πŸ”Š Received {audio_chunks_received} audio chunks")
                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        print(f" πŸ“ Transcription: {transcript}")
                        transcription_received = True
                    elif event_type == "response.done":
                        print(" βœ… Response completed")
                        return True
                    elif event_type == "error":
                        error = event.get("error", {})
                        print(f" ❌ Error: {error}")

                    if events_received >= 20:  # Limit events for testing
                        return True

            try:
                await asyncio.wait_for(listen_for_events(), timeout=10.0)
            except asyncio.TimeoutError:
                print("⏱️ Timeout waiting for events")

            # Summary
            print("\nπŸ“Š Test Summary:")
            print(f" Events received: {events_received}")
            print(f" Audio chunks: {audio_chunks_received}")
            print(f" Transcription: {'βœ…' if transcription_received else '❌'}")

        # Leaving the ``async with`` block closed the connection.
        print("βœ… Connection closed")
        return True
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and fail the test.
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
async def test_audio_transcription():
    """Test audio transcription by sending audio to OpenAI.

    Generates a 0.5 s amplitude-modulated 440 Hz tone at 16 kHz, converts it
    with ``prepare_audio_for_openai``, streams it as base64 chunks through
    ``input_audio_buffer.append``, commits the buffer, and waits up to
    5 seconds for a transcription event.

    Returns:
        bool: True if a
        ``conversation.item.input_audio_transcription.completed`` event was
        received; False if the API key is missing, the first event is not
        ``session.created``, no transcription arrived, or an exception was
        raised.
    """
    print("\nπŸ§ͺ Testing audio transcription...")
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print(" ❌ OPENAI_API_KEY not set!")
        return False

    try:
        from twenty_questions_game.audio_utils import prepare_audio_for_openai, OPENAI_SAMPLE_RATE
        import numpy as np

        url = f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "OpenAI-Beta": "realtime=v1",
        }
        print(" πŸ”Œ Connecting to OpenAI...")

        transcription_received = False
        transcript_text = ""
        events_received = 0

        # The context manager closes the socket on every exit path (early
        # return, timeout, exception), so the connection is never leaked.
        async with websockets.connect(
            url,
            additional_headers=headers,
            ping_interval=20,
            ping_timeout=10,
        ) as ws:
            # Wait for session.created. Bounded so a silent server cannot
            # hang the script; a timeout is reported by the outer handler.
            response = await asyncio.wait_for(ws.recv(), timeout=10.0)
            event = json.loads(response)
            if event.get("type") != "session.created":
                print(f" ❌ Unexpected event: {event.get('type')}")
                return False

            # Configure session
            config = {
                "type": "session.update",
                "session": {
                    "modalities": ["audio", "text"],
                    "instructions": "You are a helpful assistant. Transcribe what you hear.",
                    "voice": OPENAI_VOICE,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "whisper-1"
                    },
                    "turn_detection": {
                        "type": "semantic_vad",
                        "eagerness": "low",
                        "create_response": False,  # Don't create response, just transcribe
                        "interrupt_response": False
                    },
                    "temperature": 0.8
                }
            }
            await ws.send(json.dumps(config))

            # Generate test audio (simple sine wave to simulate speech-like audio)
            # OpenAI requires at least 100ms of audio (2400 samples at 24kHz = 4800 bytes)
            print(" 🎡 Generating test audio...")
            sample_rate = 16000  # Input sample rate
            duration = 0.5  # 500ms (well above 100ms minimum)
            frequency = 440  # A4 note
            samples = int(sample_rate * duration)
            t = np.linspace(0, duration, samples, False)
            # Create a more speech-like signal with modulation
            test_audio = np.sin(2 * np.pi * frequency * t) * (1 + 0.5 * np.sin(2 * np.pi * 5 * t))
            test_audio = (test_audio * 0.3 * 32767).astype(np.int16)  # Scale down to avoid clipping

            # Convert to OpenAI format (24kHz, PCM16)
            audio_bytes = prepare_audio_for_openai(test_audio, sample_rate)

            # Calculate expected samples at 24kHz
            expected_samples_24k = int(len(test_audio) * 24000 / sample_rate)
            expected_bytes = expected_samples_24k * 2  # 2 bytes per int16 sample
            print(f" πŸ“Š Audio: {len(test_audio)} samples @ {sample_rate}Hz -> {len(audio_bytes)} bytes @ 24kHz")
            print(f" πŸ“Š Expected: {expected_samples_24k} samples = {expected_bytes} bytes")

            # Split audio BYTES into chunks (not base64 string!) so that each
            # chunk is a complete base64-encoded segment. Aim for ~10 chunks;
            # tiny payloads go out as a single chunk, and the step is never 0
            # so an empty payload does not make range() raise.
            chunk_size_bytes = len(audio_bytes) // 10 or len(audio_bytes) or 1
            chunks = [
                base64.b64encode(audio_bytes[start:start + chunk_size_bytes]).decode('ascii')
                for start in range(0, len(audio_bytes), chunk_size_bytes)
            ]

            print(f" πŸ“€ Sending {len(chunks)} audio chunks ({len(audio_bytes)} total bytes) to OpenAI...")
            for i, chunk in enumerate(chunks):
                await ws.send(json.dumps({
                    "type": "input_audio_buffer.append",
                    "audio": chunk
                }))
                if i < len(chunks) - 1:  # Don't sleep after last chunk
                    await asyncio.sleep(0.01)  # Small delay between chunks

            # Wait a moment for buffer to process
            await asyncio.sleep(0.1)

            # Signal end of input
            print(" βœ… Committing audio buffer...")
            await ws.send(json.dumps({
                "type": "input_audio_buffer.commit"
            }))

            print(" πŸ‘‚ Waiting for transcription (5 seconds)...")

            async def listen_for_transcription():
                """Consume events until a transcription result, error, or 50 events."""
                nonlocal transcription_received, transcript_text, events_received
                async for message in ws:
                    event = json.loads(message)
                    events_received += 1
                    event_type = event.get("type", "unknown")

                    if event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        transcript_text = transcript
                        transcription_received = True
                        print(f" πŸ“ Transcription received: '{transcript}'")
                        return True
                    elif event_type == "conversation.item.input_audio_transcription.failed":
                        error = event.get("error", {})
                        print(f" ❌ Transcription failed: {error}")
                        return False
                    elif event_type == "error":
                        error = event.get("error", {})
                        print(f" ❌ Error: {error}")
                        return False

                    if events_received >= 50:  # Limit events
                        return False

            # The outcome is read from ``transcription_received`` below, so
            # the listener's return value is not needed here.
            try:
                await asyncio.wait_for(listen_for_transcription(), timeout=5.0)
            except asyncio.TimeoutError:
                print(" ⏱️ Timeout waiting for transcription")

        if transcription_received:
            print(f" βœ… Transcription test passed: '{transcript_text}'")
            return True
        else:
            print(f" ❌ No transcription received (got {events_received} events)")
            return False
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and fail the test.
        print(f" ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
async def test_audio_conversion():
    """Test audio conversion utilities.

    Builds a 100 ms, 440 Hz int16 sine wave at 16 kHz, passes it through
    ``prepare_audio_for_openai``, then base64-encodes the result and feeds it
    back through ``decode_audio_from_openai`` and ``prepare_audio_for_reachy``.

    Returns:
        bool: True when every step ran without raising, False otherwise
        (including when the helper module cannot be imported).
    """
    print("\nπŸ§ͺ Testing audio conversion utilities...")
    try:
        from twenty_questions_game.audio_utils import (
            prepare_audio_for_openai,
            decode_audio_from_openai,
            prepare_audio_for_reachy,
            OPENAI_SAMPLE_RATE,
        )
        import numpy as np

        # Synthesize the input tone: A4 (440 Hz) for 0.1 s at 16 kHz.
        sample_rate = 16000
        duration = 0.1
        frequency = 440
        n_samples = int(sample_rate * duration)
        timeline = np.linspace(0, duration, n_samples, False)
        waveform = np.sin(2 * np.pi * frequency * timeline)
        test_audio = (waveform * 32767).astype(np.int16)
        print(f" Created test audio: {len(test_audio)} samples at {sample_rate}Hz")

        # Direction 1: Reachy -> OpenAI
        pcm_payload = prepare_audio_for_openai(test_audio, sample_rate)
        print(f" βœ… Reachy->OpenAI: {len(pcm_payload)} bytes")

        # Direction 2: OpenAI -> Reachy (through the same base64 form used
        # for the websocket payloads in this script)
        encoded = base64.b64encode(pcm_payload).decode('ascii')
        decoded = decode_audio_from_openai(encoded)
        audio_for_reachy = prepare_audio_for_reachy(decoded, 48000)
        print(f" βœ… OpenAI->Reachy: {len(audio_for_reachy)} samples at 48kHz")
    except Exception as e:
        print(f" ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
    return True
async def test_with_reachy():
    """Test with actual Reachy Mini (if available).

    Connects to the robot, polls the microphone 50 times while recording,
    then pushes 0.1 s of silence to the speaker.

    Returns:
        bool | None: True when capture and playback ran without raising,
        None when an ``ImportError`` occurs (reported as skipped by
        ``main``), False on any other exception.
    """
    print("\nπŸ€– Testing with Reachy Mini...")
    try:
        from reachy_mini import ReachyMini

        print(" Connecting to Reachy Mini...")
        reachy = ReachyMini()
        print(" βœ… Connected to Reachy Mini")

        # Test audio
        print(" Testing audio capture...")
        samples_received = 0
        reachy.media.start_recording()
        try:
            # 50 polls. NOTE(review): the original comment says "~1 second at
            # 50Hz", but there is no sleep here, so that only holds if
            # get_audio_sample() itself blocks ~20 ms per call - TODO confirm.
            for _ in range(50):
                audio = reachy.media.get_audio_sample()
                if audio is not None and len(audio) > 0:
                    samples_received += 1
        finally:
            # Always stop the recording, even if a poll raised.
            reachy.media.stop_recording()
        print(f" βœ… Audio capture: {samples_received}/50 samples received")

        # Test playback
        print(" Testing audio playback...")
        import numpy as np
        # Reachy Mini expects float32, normalized -1.0 to 1.0
        test_audio = np.zeros(4800, dtype=np.float32)  # 0.1s at 48kHz
        reachy.media.start_playing()
        try:
            reachy.media.push_audio_sample(test_audio)
            await asyncio.sleep(0.2)
        finally:
            # Always stop playback, even if pushing the sample raised.
            reachy.media.stop_playing()
        print(" βœ… Audio playback test completed")
        return True
    except ImportError:
        print(" ⚠️ Reachy Mini not available (this is OK for testing)")
        return None
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and fail the test.
        print(f" ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False
async def main():
    """Run all tests.

    Executes the four test coroutines in order, printing a banner before
    each, then prints a summary where None is shown as skipped, a truthy
    result as passed and anything else as failed.
    """
    rule = "=" * 60

    # (result key, banner title, coroutine function) in execution order.
    suite = (
        ('openai', "TEST 1: OpenAI Connection", test_openai_connection),
        ('transcription', "TEST 2: Audio Transcription", test_audio_transcription),
        ('audio_conversion', "TEST 3: Audio Conversion Utilities", test_audio_conversion),
        ('reachy', "TEST 4: Reachy Mini Integration (Optional)", test_with_reachy),
    )

    def describe(outcome):
        """Map a test result to the label printed in the summary."""
        if outcome is None:
            return "⚠️ SKIPPED"
        return "βœ… PASSED" if outcome else "❌ FAILED"

    print(rule)
    print("πŸ§ͺ OpenAI Realtime API Test Script")
    print(rule)

    results = {}
    for key, title, run_test in suite:
        print("\n" + rule)
        print(title)
        print(rule)
        results[key] = await run_test()

    # Final Summary
    print("\n" + rule)
    print("πŸ“‹ FINAL SUMMARY")
    print(rule)
    for test_name, result in results.items():
        status = describe(result)
        print(f" {test_name:20s}: {status}")
    print("\n" + rule)
# Script entry point: run the full suite on a fresh event loop only when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())