Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Timing utilities for AbMelt inference pipeline. | |
| Provides context managers and report generation for measuring pipeline performance. | |
| """ | |
| import time | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass, field | |
| from contextlib import contextmanager | |
| logger = logging.getLogger(__name__) | |
| class TimingEntry: | |
| """Single timing entry with start/end times.""" | |
| name: str | |
| start_time: float = 0.0 | |
| end_time: float = 0.0 | |
| duration: float = 0.0 | |
| parent: Optional[str] = None | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert to dictionary for JSON serialization.""" | |
| return { | |
| "name": self.name, | |
| "duration_seconds": self.duration, | |
| "parent": self.parent | |
| } | |
| class TimingReport: | |
| """ | |
| Aggregates timing data with hierarchical structure. | |
| Usage: | |
| report = TimingReport() | |
| with report.time("Step 1"): | |
| # do work | |
| with report.time("Sub-step 1.1", parent="Step 1"): | |
| # do sub-work | |
| print(report.format_summary()) | |
| report.save_json("timing.json") | |
| """ | |
| def __init__(self): | |
| self.entries: Dict[str, TimingEntry] = {} | |
| self.order: List[str] = [] # Preserve insertion order | |
| self.start_time: float = 0.0 | |
| self.end_time: float = 0.0 | |
| def start(self): | |
| """Mark the start of the overall timing.""" | |
| self.start_time = time.perf_counter() | |
| def stop(self): | |
| """Mark the end of the overall timing.""" | |
| self.end_time = time.perf_counter() | |
| def total_duration(self) -> float: | |
| """Total pipeline duration in seconds.""" | |
| if self.end_time > 0: | |
| return self.end_time - self.start_time | |
| return time.perf_counter() - self.start_time | |
| def time(self, name: str, parent: Optional[str] = None): | |
| """ | |
| Context manager for timing a code block. | |
| Args: | |
| name: Name of the step being timed | |
| parent: Optional parent step name for hierarchical display | |
| """ | |
| entry = TimingEntry(name=name, parent=parent) | |
| entry.start_time = time.perf_counter() | |
| try: | |
| yield entry | |
| finally: | |
| entry.end_time = time.perf_counter() | |
| entry.duration = entry.end_time - entry.start_time | |
| self.entries[name] = entry | |
| if name not in self.order: | |
| self.order.append(name) | |
| logger.info(f"[TIMING] {name}: {self._format_duration(entry.duration)}") | |
| def add_entry(self, name: str, duration: float, parent: Optional[str] = None): | |
| """Manually add a timing entry.""" | |
| entry = TimingEntry(name=name, duration=duration, parent=parent) | |
| self.entries[name] = entry | |
| if name not in self.order: | |
| self.order.append(name) | |
| def _format_duration(seconds: float) -> str: | |
| """Format duration in human-readable format.""" | |
| if seconds < 60: | |
| return f"{seconds:.2f}s" | |
| elif seconds < 3600: | |
| minutes = int(seconds // 60) | |
| secs = seconds % 60 | |
| return f"{minutes}m {secs:.1f}s" | |
| else: | |
| hours = int(seconds // 3600) | |
| minutes = int((seconds % 3600) // 60) | |
| secs = seconds % 60 | |
| return f"{hours}h {minutes}m {secs:.0f}s" | |
| def format_summary(self) -> str: | |
| """Generate human-readable timing summary.""" | |
| lines = [] | |
| lines.append("") | |
| lines.append("=" * 60) | |
| lines.append("PIPELINE TIMING REPORT") | |
| lines.append("=" * 60) | |
| lines.append(f"Total Pipeline Time: {self._format_duration(self.total_duration)}") | |
| lines.append("") | |
| lines.append(f"{'Step':<45} {'Time':>10} {'%':>6}") | |
| lines.append("-" * 60) | |
| # Group by parent | |
| top_level = [name for name in self.order if self.entries[name].parent is None] | |
| for i, name in enumerate(top_level, 1): | |
| entry = self.entries[name] | |
| pct = (entry.duration / self.total_duration * 100) if self.total_duration > 0 else 0 | |
| lines.append(f"{i}. {name:<42} {self._format_duration(entry.duration):>10} {pct:>5.1f}%") | |
| # Find children | |
| children = [n for n in self.order if self.entries[n].parent == name] | |
| for j, child_name in enumerate(children): | |
| child = self.entries[child_name] | |
| child_pct = (child.duration / self.total_duration * 100) if self.total_duration > 0 else 0 | |
| prefix = "└─" if j == len(children) - 1 else "├─" | |
| lines.append(f" {prefix} {child_name:<39} {self._format_duration(child.duration):>10} {child_pct:>5.1f}%") | |
| lines.append("=" * 60) | |
| return "\n".join(lines) | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert report to dictionary for JSON serialization.""" | |
| return { | |
| "total_duration_seconds": self.total_duration, | |
| "total_duration_formatted": self._format_duration(self.total_duration), | |
| "entries": [self.entries[name].to_dict() for name in self.order] | |
| } | |
| def save_json(self, filepath: str): | |
| """Save timing report to JSON file.""" | |
| path = Path(filepath) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with open(path, 'w') as f: | |
| json.dump(self.to_dict(), f, indent=2) | |
| logger.info(f"Timing report saved to: {filepath}") | |
| def save_csv(self, filepath: str): | |
| """Save timing report to CSV file.""" | |
| path = Path(filepath) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with open(path, 'w') as f: | |
| f.write("step,parent,duration_seconds,duration_formatted,percentage\n") | |
| for name in self.order: | |
| entry = self.entries[name] | |
| pct = (entry.duration / self.total_duration * 100) if self.total_duration > 0 else 0 | |
| parent = entry.parent or "" | |
| f.write(f'"{name}","{parent}",{entry.duration:.3f},"{self._format_duration(entry.duration)}",{pct:.2f}\n') | |
| logger.info(f"Timing report saved to: {filepath}") | |
| # Global timing report instance for pipeline use | |
| _global_report: Optional[TimingReport] = None | |
| def get_timing_report() -> TimingReport: | |
| """Get or create the global timing report instance.""" | |
| global _global_report | |
| if _global_report is None: | |
| _global_report = TimingReport() | |
| return _global_report | |
| def reset_timing_report(): | |
| """Reset the global timing report.""" | |
| global _global_report | |
| _global_report = None | |
| def time_step(name: str, parent: Optional[str] = None): | |
| """ | |
| Convenience function to time a step using the global report. | |
| Args: | |
| name: Name of the step | |
| parent: Optional parent step name | |
| """ | |
| report = get_timing_report() | |
| with report.time(name, parent=parent) as entry: | |
| yield entry | |