#!/usr/bin/env python3 """ Enhanced profiling script for ACE-Step inference with deep LLM analysis This script helps diagnose why LLM generation is slow by tracking: 1. Total tokens generated vs expected throughput (200 tokens/sec baseline) 2. Per-iteration timing to detect compilation overhead or slow operations 3. Constrained decoding overhead 4. CFG overhead (2x forward passes) 5. Model forward time vs sampling/processing time Usage: python profile_inference.py # Standard profiling with warmup python profile_inference.py --no-warmup # Profile first run (includes compilation) python profile_inference.py --llm-debug # Deep LLM performance debugging python profile_inference.py --detailed # Add cProfile function-level analysis Inference mode options: python profile_inference.py --thinking # Enable CoT for code generation python profile_inference.py --use-constrained-decoding # Use FSM constrained decoding python profile_inference.py --use-cot-metas # Enable LM to generate metadata via CoT """ import time import argparse import sys import os from contextlib import contextmanager from collections import defaultdict import json from typing import Tuple, Dict, Any, List from functools import wraps # Add project root to path project_root = os.path.abspath(os.path.dirname(__file__)) if project_root not in sys.path: sys.path.insert(0, project_root) def load_env_config(): """从 .env 文件加载配置""" env_config = { 'ACESTEP_CONFIG_PATH': 'acestep-v15-turbo', 'ACESTEP_LM_MODEL_PATH': 'acestep-5Hz-lm-0.6B', 'ACESTEP_DEVICE': 'auto', 'ACESTEP_LM_BACKEND': 'vllm', } env_file = os.path.join(project_root, '.env') if os.path.exists(env_file): with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() # 跳过空行和注释 if not line or line.startswith('#'): continue # 解析键值对 if '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() if key in env_config and value: env_config[key] = value return env_config import torch from acestep.inference import generate_music, GenerationParams, GenerationConfig from acestep.handler import AceStepHandler from acestep.llm_inference import LLMHandler class PreciseTimer: """High-precision timer with CUDA synchronization for accurate GPU timing""" def __init__(self, device="cuda"): self.device = device self.timings = defaultdict(list) self.enabled = True def sync(self): """Synchronize CUDA operations for accurate timing""" if self.enabled and self.device.startswith("cuda") and torch.cuda.is_available(): torch.cuda.synchronize() @contextmanager def time(self, name: str): """Time a code section with CUDA synchronization""" if not self.enabled: yield return self.sync() start = time.perf_counter() try: yield finally: self.sync() elapsed = time.perf_counter() - start self.timings[name].append(elapsed) def get_total(self, name: str) -> float: """Get total accumulated time for a section""" return sum(self.timings.get(name, [])) def get_mean(self, name: str) -> float: """Get mean time per call for a section""" times = self.timings.get(name, []) return sum(times) / len(times) if times else 0.0 def get_count(self, name: str) -> int: """Get number of calls for a section""" return len(self.timings.get(name, [])) def get_all(self, name: str) -> List[float]: """Get all timing samples for a section""" return self.timings.get(name, []) class LLMDebugger: """Track detailed LLM performance metrics to diagnose slow generation""" def __init__(self): self.reset() def reset(self): """Reset all metrics""" self.total_tokens = 0 self.generation_start = None self.generation_end = None self.output_text = "" self.prompt_length = 0 def start(self, prompt_length: int = 0): """Mark generation start""" self.generation_start = time.perf_counter() self.prompt_length = prompt_length def end(self, output_text: str = ""): """Mark generation end and store output""" self.generation_end = time.perf_counter() self.output_text = output_text def set_token_count(self, count: int): """Set total token count""" self.total_tokens = count def get_throughput(self) -> float: """Calculate actual tokens per second""" if self.generation_start and self.generation_end and self.total_tokens > 0: total_time = self.generation_end - self.generation_start if total_time > 0: return self.total_tokens / total_time return 0.0 def print_analysis(self): """Print detailed LLM performance analysis""" if not self.generation_start or not self.generation_end: return print("\n" + "=" * 100) print("🔍 LLM PERFORMANCE DEEP DIVE") print("=" * 100) total_time = self.generation_end - self.generation_start throughput = self.get_throughput() # Basic metrics table print(f"\n{'Metric':<40} {'Value':<20} {'Notes'}") print("-" * 100) print(f"{'Total Tokens Generated:':<40} {self.total_tokens:<20} (new tokens only)") print(f"{'Prompt Length (estimate):':<40} {self.prompt_length:<20} (input tokens)") print(f"{'Total Generation Time:':<40} {total_time:<20.3f} seconds") print(f"{'Measured Throughput:':<40} {throughput:<20.1f} tokens/sec") print(f"{'Expected Throughput:':<40} {'200':<20} tokens/sec (baseline)") # Calculate performance gap if throughput > 0: slowdown = 200.0 / throughput efficiency = (throughput / 200.0) * 100 print(f"{'Performance vs Baseline:':<40} {efficiency:<20.1f}% of expected") print(f"{'Slowdown Factor:':<40} {slowdown:<20.2f}x slower") # Analyze generated output if self.output_text: print(f"\n{'Output Analysis:':<40}") print(f"{' Output length:':<40} {len(self.output_text):<20} characters") # Count audio codes import re code_pattern = r'<\|audio_code_\d+\|>' codes = re.findall(code_pattern, self.output_text) if codes: print(f"{' Audio codes generated:':<40} {len(codes):<20} codes") print(f"{' Expected audio duration:':<40} {f'~{len(codes)/5:.1f}s':<20} (5 codes per second)") if total_time > 0: print(f"{' Time per audio code:':<40} {f'{total_time/len(codes)*1000:.1f}ms':<20}") # Check for CoT section if '' in self.output_text and '' in self.output_text: cot_start = self.output_text.find('') cot_end = self.output_text.find('') + 8 cot_section = self.output_text[cot_start:cot_end] cot_token_est = len(cot_section) // 4 print(f"{' CoT section tokens (estimate):':<40} {f'~{cot_token_est}':<20}") # Diagnostic guidance print("\n" + "=" * 100) print("🔧 DIAGNOSTIC GUIDANCE") print("=" * 100) if throughput < 50: print("\n⚠️ CRITICAL: Throughput is extremely low (<50 tokens/sec)") print("\nThis is ~4x slower than expected. Likely causes:") print(" 1. ❗ Constrained decoding FSM overhead") print(" → Each token triggers FSM state machine validation") print(" → Try: set use_constrained_decoding=False in config") print(" 2. ❗ CFG with double forward passes") print(" → cfg_scale > 1.0 means running model twice per token") print(" → Check: params.lm_cfg_scale value") print(" 3. ❗ Running in eager mode without compilation") print(" → PyTorch should compile kernels after warmup") print(" → Check: torch._dynamo.config settings") elif throughput < 100: print("\n⚠️ WARNING: Throughput is low (50-100 tokens/sec)") print("\nLikely causes:") print(" 1. Constrained decoding overhead (~30-50% slowdown expected)") print(" 2. CFG enabled (2x compute per token if cfg_scale > 1.0)") print(" 3. Small model or inefficient GPU utilization") elif throughput < 150: print("\n⚠️ Throughput is below baseline but acceptable (100-150 tokens/sec)") print("\nMinor overhead from:") print(" - Constrained decoding: ~20-30% overhead") print(" - Profiling instrumentation: ~5-10% overhead") else: print(f"\n✓ Throughput is good ({throughput:.1f} tokens/sec)") print(" Performance is within acceptable range") # Global instances timer = None llm_debugger = None def wrap_method_with_timing(obj, method_name: str, timing_key: str): """Wrap a method with timing instrumentation""" original_method = getattr(obj, method_name) @wraps(original_method) def timed_wrapper(*args, **kwargs): with timer.time(timing_key): return original_method(*args, **kwargs) setattr(obj, method_name, timed_wrapper) return original_method def wrap_llm_with_debug_tracking(llm_handler): """Wrap LLM generation with detailed performance tracking""" original_method = llm_handler.generate_with_stop_condition @wraps(original_method) def debug_wrapper(*args, **kwargs): # Estimate prompt length caption = kwargs.get('caption', args[0] if len(args) > 0 else "") lyrics = kwargs.get('lyrics', args[1] if len(args) > 1 else "") prompt_estimate = len(caption) + len(lyrics) prompt_tokens_estimate = prompt_estimate // 4 # Start tracking llm_debugger.reset() llm_debugger.start(prompt_length=prompt_tokens_estimate) # Call original with timing with timer.time('llm_inference'): result = original_method(*args, **kwargs) # Extract and analyze output output_text = "" if isinstance(result, tuple) and len(result) >= 2: if isinstance(result[1], list): # Batch mode output_text = "".join(result[1]) else: # Single mode cot_output = "" if isinstance(result[0], dict): for v in result[0].values(): if isinstance(v, str): cot_output += v output_text = cot_output + str(result[1]) # Count tokens import re code_pattern = r'<\|audio_code_\d+\|>' codes = re.findall(code_pattern, output_text) remaining_text = re.sub(code_pattern, '', output_text) cot_tokens_estimate = len(remaining_text) // 4 total_tokens = len(codes) + cot_tokens_estimate llm_debugger.set_token_count(total_tokens) llm_debugger.end(output_text) return result llm_handler.generate_with_stop_condition = debug_wrapper return original_method def instrument_handlers(dit_handler, llm_handler, enable_llm_debug=False): """Add timing instrumentation to handler methods""" originals = {} # Instrument LLM if llm_handler and llm_handler.llm_initialized: if enable_llm_debug: originals['llm_generate'] = wrap_llm_with_debug_tracking(llm_handler) else: originals['llm_generate'] = wrap_method_with_timing( llm_handler, 'generate_with_stop_condition', 'llm_inference' ) # Instrument DiT handler originals['dit_prepare'] = wrap_method_with_timing( dit_handler, 'prepare_batch_data', 'prepare_batch_data' ) originals['dit_generate'] = wrap_method_with_timing( dit_handler, 'service_generate', 'dit_inference' ) originals['dit_decode'] = wrap_method_with_timing( dit_handler, 'tiled_decode', 'vae_decode' ) return originals def restore_handlers(dit_handler, llm_handler, originals): """Restore original handler methods after profiling""" if llm_handler and 'llm_generate' in originals: llm_handler.generate_with_stop_condition = originals['llm_generate'] dit_handler.prepare_batch_data = originals['dit_prepare'] dit_handler.service_generate = originals['dit_generate'] dit_handler.tiled_decode = originals['dit_decode'] def print_profiling_results(total_time: float, show_llm_debug: bool = False): """Print comprehensive profiling results with performance insights""" print("\n" + "=" * 100) print("🎯 PROFILING RESULTS") print("=" * 100) # Define timing categories model_sections = { 'llm_inference': 'LLM Inference (5Hz Language Model)', 'dit_inference': 'DiT Inference (Diffusion Transformer)', 'vae_decode': 'VAE Decode (Audio Decoder)', } non_model_sections = { 'prepare_batch_data': 'Prepare Batch Data (embedding, formatting)', } # Calculate totals model_time = sum(timer.get_total(k) for k in model_sections.keys()) non_model_time = sum(timer.get_total(k) for k in non_model_sections.keys()) other_time = total_time - model_time - non_model_time # Print summary table print(f"\n{'CATEGORY':<50} {'TIME (s)':<12} {'%':<8} {'CALLS':<8}") print("-" * 100) # Model time breakdown print(f"\n{'🤖 MODEL TIME (Total)':<50} {model_time:<12.3f} {100*model_time/total_time:>6.1f}% {'':<8}") for key, desc in model_sections.items(): t = timer.get_total(key) c = timer.get_count(key) if c > 0: mean = timer.get_mean(key) pct = 100 * t / total_time print(f" {'├─ ' + desc:<48} {t:<12.3f} {pct:>6.1f}% {c:<8} (avg: {mean:.3f}s)") # Non-model time breakdown print(f"\n{'⚙️ NON-MODEL TIME (Total)':<50} {non_model_time:<12.3f} {100*non_model_time/total_time:>6.1f}% {'':<8}") for key, desc in non_model_sections.items(): t = timer.get_total(key) c = timer.get_count(key) if c > 0: mean = timer.get_mean(key) pct = 100 * t / total_time print(f" {'├─ ' + desc:<48} {t:<12.3f} {pct:>6.1f}% {c:<8} (avg: {mean:.3f}s)") # Other time if other_time > 0.01: pct = 100 * other_time / total_time print(f"\n{'📦 OTHER TIME (I/O, overhead, audio save)':<50} {other_time:<12.3f} {pct:>6.1f}% {'':<8}") print(f"\n{'📊 TOTAL TIME':<50} {total_time:<12.3f} {'100.0%':>6} {'':<8}") # Show LLM detailed analysis if enabled if show_llm_debug: llm_debugger.print_analysis() # Performance insights print("\n" + "=" * 100) print("💡 PERFORMANCE INSIGHTS") print("=" * 100) llm_t = timer.get_total('llm_inference') dit_t = timer.get_total('dit_inference') vae_t = timer.get_total('vae_decode') prep_t = timer.get_total('prepare_batch_data') # Model time insights if model_time > 0: print(f"\n✓ Model operations: {model_time:.3f}s ({100*model_time/total_time:.1f}% of total)") if llm_t > 0: print(f" - LLM: {llm_t:.3f}s ({100*llm_t/model_time:.1f}% of model time)") if dit_t > 0: print(f" - DiT: {dit_t:.3f}s ({100*dit_t/model_time:.1f}% of model time)") if vae_t > 0: print(f" - VAE: {vae_t:.3f}s ({100*vae_t/model_time:.1f}% of model time)") # LLM bottleneck analysis if llm_t > dit_t and llm_t > 5.0: print(f"\n⚠️ LLM IS THE BOTTLENECK: {llm_t:.3f}s ({100*llm_t/total_time:.1f}% of total)") print(f"\n Possible causes:") print(f" 1. Generating too many tokens → use --llm-debug to verify") print(f" 2. Constrained decoding overhead → FSM validation per token") print(f" 3. CFG overhead → cfg_scale > 1.0 = 2x forward passes") print(f" 4. First-token latency → warmup should help") print(f" 5. KV cache inefficiency → should be ~5-10ms/token") # Non-model insights if non_model_time / total_time > 0.1: print(f"\n⚠️ Non-model operations: {non_model_time:.3f}s ({100*non_model_time/total_time:.1f}%)") if prep_t > 0.1: print(f" - Batch preparation: {prep_t:.3f}s") # I/O overhead if other_time / total_time > 0.2: print(f"\n⚠️ Overhead/I/O: {other_time:.3f}s ({100*other_time/total_time:.1f}%)") # Recommendations print("\n" + "=" * 100) print("🚀 OPTIMIZATION RECOMMENDATIONS") print("=" * 100) if llm_t > dit_t * 2: print("\n🎯 Priority: Optimize LLM") print(" 1. Run: python profile_inference.py --llm-debug") print(" → Shows exact token count and throughput") print(" 2. Check constrained decoding overhead") print(" 3. Check CFG scaling (lm_cfg_scale parameter)") print(" 4. Profile nanovllm engine step() timing") print(" 5. Compare vllm vs transformers backends") def run_profiled_generation(dit_handler, llm_handler, params, config, enable_cprofile=False, enable_llm_debug=False): """Execute generation with full profiling instrumentation""" # Instrument handlers originals = instrument_handlers(dit_handler, llm_handler, enable_llm_debug) try: print("\n[Profiling] Starting generation...") timer.sync() total_start = time.perf_counter() # Optional cProfile prof = None if enable_cprofile: import cProfile prof = cProfile.Profile() prof.enable() # Run generation result = generate_music(dit_handler, llm_handler, params, config, save_dir="./") # Stop timing timer.sync() total_time = time.perf_counter() - total_start # Save cProfile if enabled if enable_cprofile and prof: prof.disable() import pstats import io output_file = "profile_cprofile_detailed.txt" with open(output_file, 'w') as f: ps = pstats.Stats(prof, stream=f) ps.sort_stats('cumulative') ps.print_stats(100) # Print top functions print("\n" + "=" * 100) print("📊 TOP 20 FUNCTIONS BY CUMULATIVE TIME (cProfile)") print("=" * 100) s = io.StringIO() ps = pstats.Stats(prof, stream=s) ps.sort_stats('cumulative') ps.print_stats(20) print(s.getvalue()) print(f"\nFull report: {output_file}") # Print results print_profiling_results(total_time, show_llm_debug=enable_llm_debug) return result, total_time finally: restore_handlers(dit_handler, llm_handler, originals) def load_example_config(example_file: str) -> Tuple[GenerationParams, GenerationConfig]: """Load configuration from example JSON file""" try: with open(example_file, 'r', encoding='utf-8') as f: data = json.load(f) params = GenerationParams( caption=data.get('caption', ''), lyrics=data.get('lyrics', ''), bpm=data.get('bpm'), keyscale=data.get('keyscale', ''), timesignature=data.get('timesignature', ''), vocal_language=data.get('language', 'unknown'), duration=data.get('duration'), thinking=data.get('think', False), inference_steps=data.get('inference_steps', 8), seed=data.get('seed', 42), ) config = GenerationConfig(batch_size=data.get('batch_size', 1), seeds=[42]) return params, config except Exception as e: print(f" ❌ Failed to load: {e}") return None, None def main(): global timer, llm_debugger # 从 .env 文件加载默认配置 env_config = load_env_config() parser = argparse.ArgumentParser( description="Profile ACE-Step inference with LLM debugging" ) parser.add_argument("--checkpoint-dir", type=str, default="./checkpoints") parser.add_argument("--config-path", type=str, default=env_config['ACESTEP_CONFIG_PATH'], help=f"模型配置路径 (默认从 .env: {env_config['ACESTEP_CONFIG_PATH']})") parser.add_argument("--device", type=str, default=env_config['ACESTEP_DEVICE'], help=f"设备 (默认从 .env: {env_config['ACESTEP_DEVICE']})") parser.add_argument("--lm-model", type=str, default=env_config['ACESTEP_LM_MODEL_PATH'], help=f"LLM 模型路径 (默认从 .env: {env_config['ACESTEP_LM_MODEL_PATH']})") parser.add_argument("--lm-backend", type=str, default=env_config['ACESTEP_LM_BACKEND'], help=f"LLM 后端 (默认从 .env: {env_config['ACESTEP_LM_BACKEND']})") parser.add_argument("--no-warmup", action="store_true") parser.add_argument("--detailed", action="store_true") parser.add_argument("--llm-debug", action="store_true", help="Enable deep LLM debugging (token count, throughput)") parser.add_argument("--example", type=str, default="example_05.json") # Inference mode parameters parser.add_argument("--thinking", action="store_true", help="Enable CoT reasoning for LM to generate audio codes") parser.add_argument("--use-constrained-decoding", action="store_true", help="Use FSM-based constrained decoding for meta generation") parser.add_argument("--use-cot-metas", action="store_true", help="Enable LLM to generate music metadata via CoT reasoning") args = parser.parse_args() # Initialize timer = PreciseTimer(device=args.device) llm_debugger = LLMDebugger() print("=" * 100) print("🎵 ACE-Step Inference Profiler (LLM Performance Analysis)") print("=" * 100) print(f"\n模型配置 (从 .env 加载):") print(f" DiT 模型: {args.config_path}") print(f" LLM 模型: {args.lm_model}") print(f"\n运行配置:") print(f" Device: {args.device}") print(f" LLM Backend: {args.lm_backend}") print(f" LLM Debug: {'Enabled' if args.llm_debug else 'Disabled'}") print(f" Warmup: {'Disabled' if args.no_warmup else 'Enabled'}") print(f"\nInference Mode:") print(f" Thinking (CoT): {'Enabled' if args.thinking else 'Disabled'}") print(f" Constrained Decoding: {'Enabled' if args.use_constrained_decoding else 'Disabled'}") print(f" Use CoT for Metas: {'Enabled' if args.use_cot_metas else 'Disabled'}") # Initialize models print(f"\nInitializing models...") dit_handler = AceStepHandler() llm_handler = LLMHandler() print(" 🎹 Initializing DiT...") status_dit, success_dit = dit_handler.initialize_service( project_root=project_root, config_path=args.config_path, device=args.device, use_flash_attention=True, ) if not success_dit: print(f" ❌ Failed: {status_dit}") sys.exit(1) print(f" ✓ DiT ready") print(" 🧠 Initializing LLM...") if args.thinking or args.use_cot_metas: status_llm, success_llm = llm_handler.initialize( checkpoint_dir=args.checkpoint_dir, lm_model_path=args.lm_model, backend=args.lm_backend, device=args.device, ) if success_llm: print(f" ✓ LLM ready ({args.lm_backend})") else: print(f" ⚠ Failed: {status_llm}") else: print(f" ✓ LLM not initialized (thinking or use_cot_metas is disabled)") # Load example example_file = os.path.join(project_root, "examples", "text2music", args.example) if not os.path.exists(example_file): print(f"\n❌ Not found: {example_file}") sys.exit(1) print(f"\n📄 Loading: {args.example}") params, config = load_example_config(example_file) if not params or not config: print("❌ Failed to load config") sys.exit(1) print(f" Caption: {params.caption[:60]}...") print(f" Batch: {config.batch_size}, Steps: {params.inference_steps}, LLM: {params.thinking}") # Warmup if not args.no_warmup: print("\n" + "=" * 100) print("🔥 WARMUP RUN") print("=" * 100) warmup_params = GenerationParams( caption=params.caption, lyrics=params.lyrics, bpm=params.bpm, keyscale=params.keyscale, timesignature=params.timesignature, vocal_language=params.vocal_language, duration=params.duration, thinking=args.thinking, use_cot_metas=args.use_cot_metas, inference_steps=params.inference_steps, seed=params.seed, ) warmup_config = GenerationConfig(batch_size=1, seeds=[42]) warmup_config.use_constrained_decoding = args.use_constrained_decoding warmup_start = time.perf_counter() warmup_result = generate_music(dit_handler, llm_handler, warmup_params, warmup_config, save_dir="./") warmup_time = time.perf_counter() - warmup_start print(f"\n✓ Warmup: {warmup_time:.2f}s") if not warmup_result.success: print(f"⚠️ Warning: {warmup_result.error}") # Reset timer = PreciseTimer(device=args.device) llm_debugger = LLMDebugger() # Profiling run print("\n" + "=" * 100) print("⏱️ PROFILING RUN") print("=" * 100) # Apply inference mode settings config.use_constrained_decoding = args.use_constrained_decoding # Override thinking and use_cot_metas parameters if specified via CLI if args.thinking: params.thinking = True if args.use_cot_metas: params.use_cot_metas = True result, total_time = run_profiled_generation( dit_handler, llm_handler, params, config, enable_cprofile=args.detailed, enable_llm_debug=args.llm_debug ) if not result.success: print(f"\n❌ Failed: {result.error}") sys.exit(1) print(f"\n✅ Success! Generated {len(result.audios)} audio file(s)") # Final tips if args.detailed: print("\n💡 Check profile_cprofile_detailed.txt for function-level analysis") elif not args.llm_debug: print("\n💡 Run with --llm-debug to see LLM token count and throughput analysis") if __name__ == "__main__": main()