""" |
|
|
Test script for OpenAI Realtime API connection and audio handling. |
|
|
|
|
|
This script tests: |
|
|
1. OpenAI API connection |
|
|
2. Event receiving |
|
|
3. Audio sending/receiving (if Reachy Mini is available) |
|
|
4. Audio conversion utilities |
|
|
|
|
|
Usage: |
|
|
python test_openai_connection.py |
|
|
""" |
|
|
|
|
|
import os
import asyncio
import json
import base64
import logging
from pathlib import Path

from dotenv import load_dotenv
import websockets

# Load environment variables from the first .env file found; fall back to
# python-dotenv's default lookup if neither location has one.
env_paths = [
    Path(__file__).parent / ".env",
    Path.cwd() / ".env",
]
for env_path in env_paths:
    if env_path.exists():
        load_dotenv(env_path)
        print(f"✅ Loaded .env from {env_path}")
        break
else:
    load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

OPENAI_MODEL = "gpt-realtime-2025-08-28"
OPENAI_VOICE = "alloy"


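# Each test below opens its own WebSocket to the Realtime API, drives a short
# JSON-event exchange, and reports pass/fail on stdout.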
async def test_openai_connection():
    """Test basic OpenAI Realtime API connection."""
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("❌ OPENAI_API_KEY not set in environment!")
        return False

    print(f"🔑 API Key found: {api_key[:10]}...")

    url = f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "OpenAI-Beta": "realtime=v1"
    }

    print("🔌 Connecting to OpenAI Realtime API...")
    print(f"   URL: {url}")

    try:
        ws = await websockets.connect(
            url,
            additional_headers=headers,
            ping_interval=20,
            ping_timeout=10
        )
        print("✅ Connected to OpenAI!")

        # The server sends session.created as its first event.
        print("⏳ Waiting for session.created event...")
        response = await asyncio.wait_for(ws.recv(), timeout=10.0)
        event = json.loads(response)

        if event.get("type") == "session.created":
            print(f"✅ Session created: {event.get('session', {}).get('id', 'unknown')}")
        else:
            print(f"⚠️ Unexpected event: {event.get('type')}")
            print(f"   Event: {json.dumps(event, indent=2)}")

        # semantic_vad lets the server decide when a speaker has finished a turn.
        print("⚙️ Configuring session...")
        config = {
            "type": "session.update",
            "session": {
                "modalities": ["audio", "text"],
                "instructions": "You are a helpful assistant. Respond briefly.",
                "voice": OPENAI_VOICE,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                "turn_detection": {
                    "type": "semantic_vad",
                    "eagerness": "low",
                    "create_response": True,
                    "interrupt_response": True
                },
                "temperature": 0.8,
                "max_response_output_tokens": 500
            }
        }

        await ws.send(json.dumps(config))
        print("✅ Session configured")

        print("💬 Triggering test response...")
        await ws.send(json.dumps({
            "type": "response.create",
            "response": {
                "instructions": "Say 'Hello! This is a test. Can you hear me?'"
            }
        }))

        print("👂 Listening for events (10 seconds)...")
        events_received = 0
        audio_chunks_received = 0
        transcription_received = False

        async def listen_for_events():
            # Stop after response.done or 20 events, whichever comes first.
            nonlocal events_received, audio_chunks_received, transcription_received
            async for message in ws:
                event = json.loads(message)
                event_type = event.get("type", "unknown")
                events_received += 1

                print(f"📨 Event #{events_received}: {event_type}")

                if event_type == "response.audio.delta":
                    audio_b64 = event.get("delta", "")
                    if audio_b64:
                        audio_chunks_received += 1
                        if audio_chunks_received % 10 == 0:
                            print(f"   🔊 Received {audio_chunks_received} audio chunks")

                elif event_type == "conversation.item.input_audio_transcription.completed":
                    transcript = event.get("transcript", "")
                    print(f"   📝 Transcription: {transcript}")
                    transcription_received = True

                elif event_type == "response.done":
                    print("   ✅ Response completed")
                    return True

                elif event_type == "error":
                    error = event.get("error", {})
                    print(f"   ❌ Error: {error}")

                if events_received >= 20:
                    return True

        try:
            await asyncio.wait_for(listen_for_events(), timeout=10.0)
        except asyncio.TimeoutError:
            print("⏱️ Timeout waiting for events")

        print("\n📊 Test Summary:")
        print(f"   Events received: {events_received}")
        print(f"   Audio chunks: {audio_chunks_received}")
        print(f"   Transcription: {'✅' if transcription_received else '❌'}")

        await ws.close()
        print("✅ Connection closed")

        return True

    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


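# Size arithmetic for the check below: PCM16 is 2 bytes per sample, and the
# script expects prepare_audio_for_openai to resample to 24 kHz, so
# 0.5 s @ 16 kHz = 8000 samples -> 12000 samples @ 24 kHz = 24000 bytes.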
async def test_audio_transcription():
    """Test audio transcription by sending audio to OpenAI."""
    print("\n🧪 Testing audio transcription...")

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("   ❌ OPENAI_API_KEY not set!")
        return False

    try:
        from twenty_questions_game.audio_utils import prepare_audio_for_openai, OPENAI_SAMPLE_RATE
        import numpy as np

        url = f"wss://api.openai.com/v1/realtime?model={OPENAI_MODEL}"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "OpenAI-Beta": "realtime=v1"
        }

        print("   🔌 Connecting to OpenAI...")
        ws = await websockets.connect(
            url,
            additional_headers=headers,
            ping_interval=20,
            ping_timeout=10
        )

        response = await ws.recv()
        event = json.loads(response)
        if event.get("type") != "session.created":
            print(f"   ❌ Unexpected event: {event.get('type')}")
            await ws.close()
            return False

        # create_response/interrupt_response are disabled: we only want the
        # transcription back, not a spoken reply.
        config = {
            "type": "session.update",
            "session": {
                "modalities": ["audio", "text"],
                "instructions": "You are a helpful assistant. Transcribe what you hear.",
                "voice": OPENAI_VOICE,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                "turn_detection": {
                    "type": "semantic_vad",
                    "eagerness": "low",
                    "create_response": False,
                    "interrupt_response": False
                },
                "temperature": 0.8
            }
        }
        await ws.send(json.dumps(config))

        # Generate a short amplitude-modulated 440 Hz tone. It is unlikely to
        # transcribe to meaningful text, but it exercises the full audio path.
        print("   🎵 Generating test audio...")
        sample_rate = 16000
        duration = 0.5
        frequency = 440
        samples = int(sample_rate * duration)
        t = np.linspace(0, duration, samples, False)

        test_audio = np.sin(2 * np.pi * frequency * t) * (1 + 0.5 * np.sin(2 * np.pi * 5 * t))
        test_audio = (test_audio * 0.3 * 32767).astype(np.int16)

        audio_bytes = prepare_audio_for_openai(test_audio, sample_rate)

        expected_samples_24k = int(len(test_audio) * 24000 / sample_rate)
        expected_bytes = expected_samples_24k * 2
        print(f"   📊 Audio: {len(test_audio)} samples @ {sample_rate}Hz -> {len(audio_bytes)} bytes @ 24kHz")
        print(f"   📊 Expected: {expected_samples_24k} samples = {expected_bytes} bytes")

        # Send the audio in ~10 chunks to mimic streaming capture.
        chunk_size_bytes = len(audio_bytes) // 10
        if chunk_size_bytes == 0:
            chunk_size_bytes = len(audio_bytes)

        chunks = []
        for i in range(0, len(audio_bytes), chunk_size_bytes):
            chunk_bytes = audio_bytes[i:i+chunk_size_bytes]
            chunk_b64 = base64.b64encode(chunk_bytes).decode('ascii')
            chunks.append(chunk_b64)

        print(f"   📤 Sending {len(chunks)} audio chunks ({len(audio_bytes)} total bytes) to OpenAI...")
        for i, chunk in enumerate(chunks):
            await ws.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": chunk
            }))
            if i < len(chunks) - 1:
                await asyncio.sleep(0.01)

        await asyncio.sleep(0.1)

        print("   ✅ Committing audio buffer...")
        await ws.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))

        print("   👂 Waiting for transcription (5 seconds)...")
        transcription_received = False
        transcript_text = ""
        events_received = 0

        async def listen_for_transcription():
            nonlocal transcription_received, transcript_text, events_received
            async for message in ws:
                event = json.loads(message)
                events_received += 1
                event_type = event.get("type", "unknown")

                if event_type == "conversation.item.input_audio_transcription.completed":
                    transcript = event.get("transcript", "")
                    transcript_text = transcript
                    transcription_received = True
                    print(f"   📝 Transcription received: '{transcript}'")
                    return True
                elif event_type == "conversation.item.input_audio_transcription.failed":
                    error = event.get("error", {})
                    print(f"   ❌ Transcription failed: {error}")
                    return False
                elif event_type == "error":
                    error = event.get("error", {})
                    print(f"   ❌ Error: {error}")
                    return False

                if events_received >= 50:
                    return False

        try:
            result = await asyncio.wait_for(listen_for_transcription(), timeout=5.0)
        except asyncio.TimeoutError:
            print("   ⏱️ Timeout waiting for transcription")
            result = False

        await ws.close()

        if transcription_received:
            print(f"   ✅ Transcription test passed: '{transcript_text}'")
            return True
        else:
            print(f"   ❌ No transcription received (got {events_received} events)")
            return False

    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


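# Round-trip sanity check: int16 samples -> OpenAI PCM16 bytes -> base64 ->
# decoded samples -> 48 kHz audio for Reachy. Only sizes are verified here,
# not audio fidelity.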
async def test_audio_conversion():
    """Test audio conversion utilities."""
    print("\n🧪 Testing audio conversion utilities...")

    try:
        from twenty_questions_game.audio_utils import (
            prepare_audio_for_openai,
            decode_audio_from_openai,
            prepare_audio_for_reachy,
            OPENAI_SAMPLE_RATE
        )
        import numpy as np

        # Create a 100 ms, 440 Hz test tone at 16 kHz.
        sample_rate = 16000
        duration = 0.1
        frequency = 440
        samples = int(sample_rate * duration)
        t = np.linspace(0, duration, samples, False)
        test_audio = np.sin(2 * np.pi * frequency * t)
        test_audio = (test_audio * 32767).astype(np.int16)

        print(f"   Created test audio: {len(test_audio)} samples at {sample_rate}Hz")

        audio_bytes = prepare_audio_for_openai(test_audio, sample_rate)
        print(f"   ✅ Reachy->OpenAI: {len(audio_bytes)} bytes")

        audio_b64 = base64.b64encode(audio_bytes).decode('ascii')
        audio_decoded = decode_audio_from_openai(audio_b64)
        audio_for_reachy = prepare_audio_for_reachy(audio_decoded, 48000)
        print(f"   ✅ OpenAI->Reachy: {len(audio_for_reachy)} samples at 48kHz")

        return True

    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


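# Hardware smoke test: exercises the Reachy Mini microphone and speaker via
# the reachy_mini SDK, and skips (returns None) when the package is missing.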
async def test_with_reachy():
    """Test with an actual Reachy Mini (if available)."""
    print("\n🤖 Testing with Reachy Mini...")

    try:
        from reachy_mini import ReachyMini

        print("   Connecting to Reachy Mini...")
        reachy = ReachyMini()
        print("   ✅ Connected to Reachy Mini")

        # Poll the microphone 50 times and count non-empty samples.
        print("   Testing audio capture...")
        reachy.media.start_recording()

        samples_received = 0
        for i in range(50):
            audio = reachy.media.get_audio_sample()
            if audio is not None and len(audio) > 0:
                samples_received += 1

        reachy.media.stop_recording()

        print(f"   ✅ Audio capture: {samples_received}/50 samples received")

        # Push 100 ms of silence (4800 samples at 48 kHz) to the speaker.
        print("   Testing audio playback...")
        import numpy as np

        test_audio = np.zeros(4800, dtype=np.float32)
        reachy.media.start_playing()
        reachy.media.push_audio_sample(test_audio)
        await asyncio.sleep(0.2)
        reachy.media.stop_playing()
        print("   ✅ Audio playback test completed")

        return True

    except ImportError:
        print("   ⚠️ Reachy Mini not available (this is OK for testing)")
        return None
    except Exception as e:
        print(f"   ❌ Error: {e}")
        import traceback
        traceback.print_exc()
        return False


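# Result convention: True = passed, False = failed, None = skipped.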
async def main():
    """Run all tests."""
    print("=" * 60)
    print("🧪 OpenAI Realtime API Test Script")
    print("=" * 60)

    results = {}

    print("\n" + "=" * 60)
    print("TEST 1: OpenAI Connection")
    print("=" * 60)
    results['openai'] = await test_openai_connection()

    print("\n" + "=" * 60)
    print("TEST 2: Audio Transcription")
    print("=" * 60)
    results['transcription'] = await test_audio_transcription()

    print("\n" + "=" * 60)
    print("TEST 3: Audio Conversion Utilities")
    print("=" * 60)
    results['audio_conversion'] = await test_audio_conversion()

    print("\n" + "=" * 60)
    print("TEST 4: Reachy Mini Integration (Optional)")
    print("=" * 60)
    results['reachy'] = await test_with_reachy()

    print("\n" + "=" * 60)
    print("📊 FINAL SUMMARY")
    print("=" * 60)
    for test_name, result in results.items():
        if result is None:
            status = "⚠️ SKIPPED"
        elif result:
            status = "✅ PASSED"
        else:
            status = "❌ FAILED"
        print(f"   {test_name:20s}: {status}")

    print("\n" + "=" * 60)


if __name__ == "__main__":
    asyncio.run(main())