Spaces:
Build error
Build error
File size: 7,450 Bytes
8ef403e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
#!/usr/bin/env python3
"""
Timing utilities for AbMelt inference pipeline.
Provides context managers and report generation for measuring pipeline performance.
"""
import time
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from contextlib import contextmanager
logger = logging.getLogger(__name__)
@dataclass
class TimingEntry:
"""Single timing entry with start/end times."""
name: str
start_time: float = 0.0
end_time: float = 0.0
duration: float = 0.0
parent: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"name": self.name,
"duration_seconds": self.duration,
"parent": self.parent
}
class TimingReport:
"""
Aggregates timing data with hierarchical structure.
Usage:
report = TimingReport()
with report.time("Step 1"):
# do work
with report.time("Sub-step 1.1", parent="Step 1"):
# do sub-work
print(report.format_summary())
report.save_json("timing.json")
"""
def __init__(self):
self.entries: Dict[str, TimingEntry] = {}
self.order: List[str] = [] # Preserve insertion order
self.start_time: float = 0.0
self.end_time: float = 0.0
def start(self):
"""Mark the start of the overall timing."""
self.start_time = time.perf_counter()
def stop(self):
"""Mark the end of the overall timing."""
self.end_time = time.perf_counter()
@property
def total_duration(self) -> float:
"""Total pipeline duration in seconds."""
if self.end_time > 0:
return self.end_time - self.start_time
return time.perf_counter() - self.start_time
@contextmanager
def time(self, name: str, parent: Optional[str] = None):
"""
Context manager for timing a code block.
Args:
name: Name of the step being timed
parent: Optional parent step name for hierarchical display
"""
entry = TimingEntry(name=name, parent=parent)
entry.start_time = time.perf_counter()
try:
yield entry
finally:
entry.end_time = time.perf_counter()
entry.duration = entry.end_time - entry.start_time
self.entries[name] = entry
if name not in self.order:
self.order.append(name)
logger.info(f"[TIMING] {name}: {self._format_duration(entry.duration)}")
def add_entry(self, name: str, duration: float, parent: Optional[str] = None):
"""Manually add a timing entry."""
entry = TimingEntry(name=name, duration=duration, parent=parent)
self.entries[name] = entry
if name not in self.order:
self.order.append(name)
@staticmethod
def _format_duration(seconds: float) -> str:
"""Format duration in human-readable format."""
if seconds < 60:
return f"{seconds:.2f}s"
elif seconds < 3600:
minutes = int(seconds // 60)
secs = seconds % 60
return f"{minutes}m {secs:.1f}s"
else:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
return f"{hours}h {minutes}m {secs:.0f}s"
def format_summary(self) -> str:
"""Generate human-readable timing summary."""
lines = []
lines.append("")
lines.append("=" * 60)
lines.append("PIPELINE TIMING REPORT")
lines.append("=" * 60)
lines.append(f"Total Pipeline Time: {self._format_duration(self.total_duration)}")
lines.append("")
lines.append(f"{'Step':<45} {'Time':>10} {'%':>6}")
lines.append("-" * 60)
# Group by parent
top_level = [name for name in self.order if self.entries[name].parent is None]
for i, name in enumerate(top_level, 1):
entry = self.entries[name]
pct = (entry.duration / self.total_duration * 100) if self.total_duration > 0 else 0
lines.append(f"{i}. {name:<42} {self._format_duration(entry.duration):>10} {pct:>5.1f}%")
# Find children
children = [n for n in self.order if self.entries[n].parent == name]
for j, child_name in enumerate(children):
child = self.entries[child_name]
child_pct = (child.duration / self.total_duration * 100) if self.total_duration > 0 else 0
prefix = "└─" if j == len(children) - 1 else "├─"
lines.append(f" {prefix} {child_name:<39} {self._format_duration(child.duration):>10} {child_pct:>5.1f}%")
lines.append("=" * 60)
return "\n".join(lines)
def to_dict(self) -> Dict[str, Any]:
"""Convert report to dictionary for JSON serialization."""
return {
"total_duration_seconds": self.total_duration,
"total_duration_formatted": self._format_duration(self.total_duration),
"entries": [self.entries[name].to_dict() for name in self.order]
}
def save_json(self, filepath: str):
"""Save timing report to JSON file."""
path = Path(filepath)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w') as f:
json.dump(self.to_dict(), f, indent=2)
logger.info(f"Timing report saved to: {filepath}")
def save_csv(self, filepath: str):
"""Save timing report to CSV file."""
path = Path(filepath)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w') as f:
f.write("step,parent,duration_seconds,duration_formatted,percentage\n")
for name in self.order:
entry = self.entries[name]
pct = (entry.duration / self.total_duration * 100) if self.total_duration > 0 else 0
parent = entry.parent or ""
f.write(f'"{name}","{parent}",{entry.duration:.3f},"{self._format_duration(entry.duration)}",{pct:.2f}\n')
logger.info(f"Timing report saved to: {filepath}")
# Global timing report instance for pipeline use
_global_report: Optional[TimingReport] = None
def get_timing_report() -> TimingReport:
"""Get or create the global timing report instance."""
global _global_report
if _global_report is None:
_global_report = TimingReport()
return _global_report
def reset_timing_report():
"""Reset the global timing report."""
global _global_report
_global_report = None
@contextmanager
def time_step(name: str, parent: Optional[str] = None):
"""
Convenience function to time a step using the global report.
Args:
name: Name of the step
parent: Optional parent step name
"""
report = get_timing_report()
with report.time(name, parent=parent) as entry:
yield entry
|