Upload nightly test metrics to GH artifacts #17696
Conversation
Summary of Changes

Hello @Kangyan-Zhou, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers get up to speed. This pull request enhances the continuous integration pipeline by introducing a system for collecting, processing, and consolidating performance metrics from nightly benchmarks. The new scripts gather detailed performance data, including latency and throughput, from the various benchmark runs and merge it into a single artifact. This gives better insight into performance trends and regressions over time and makes it easier to monitor the impact of code changes.
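Once the consolidated artifact from a nightly run has been downloaded, it can be inspected with a few lines of Python. This is a sketch only: the file name is a placeholder, and the fields referenced are the ones visible in the structures introduced by this PR.

import json

# Sketch only: read a downloaded consolidated-metrics artifact and print a quick summary.
# "consolidated-metrics.json" is a placeholder name, not necessarily what the workflow uses.
with open("consolidated-metrics.json", encoding="utf-8") as f:
    consolidated = json.load(f)

print(f"run {consolidated['run_id']} on {consolidated['run_date']} "
      f"(commit {consolidated['commit_sha']}, branch {consolidated.get('branch')})")
for entry in consolidated["results"]:
    # Each entry is expected to carry latency/throughput figures; exact keys may vary.
    print(entry.get("latency_ms"), entry.get("output_throughput"))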
Code Review
This pull request introduces two new scripts for collecting and merging test metrics, which is a great addition for CI monitoring. The code is well-structured and includes good error handling. I've provided a few suggestions to improve maintainability by reducing code duplication and simplifying some logic. Overall, this is a solid contribution.
def find_partition_files(input_dir: str) -> list[str]:
    """Find all partition metric files in the input directory."""
    patterns = [
        os.path.join(input_dir, "metrics-*.json"),
        os.path.join(input_dir, "**/metrics-*.json"),
    ]
    files = []
    for pattern in patterns:
        files.extend(glob.glob(pattern, recursive=True))
    return list(set(files))
The logic for finding partition files can be simplified. The **/metrics-*.json pattern with recursive=True is sufficient to find files in both the input directory and its subdirectories, making the other pattern and the subsequent deduplication with set() unnecessary.
def find_partition_files(input_dir: str) -> list[str]:
    """Find all partition metric files in the input directory."""
    pattern = os.path.join(input_dir, "**/metrics-*.json")
    return glob.glob(pattern, recursive=True)
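To see why the single recursive pattern suffices, here is a small self-contained check (illustrative only, using a throwaway temporary directory): with recursive=True, ** also matches zero directories, so files directly under input_dir are picked up along with files in subdirectories.

import glob
import os
import tempfile

# Illustrative check: "**" with recursive=True matches zero or more directories,
# so a single pattern finds metrics files at the top level and in subdirectories.
with tempfile.TemporaryDirectory() as d:
    os.makedirs(os.path.join(d, "sub"))
    open(os.path.join(d, "metrics-1.json"), "w").close()
    open(os.path.join(d, "sub", "metrics-2.json"), "w").close()
    found = glob.glob(os.path.join(d, "**/metrics-*.json"), recursive=True)
    print(sorted(found))  # both files are listed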
def merge_metrics(
    input_dir: str,
    output_file: str,
    run_id: str,
    commit_sha: str,
    branch: str | None = None,
) -> bool:
    """Merge all partition metrics into a consolidated file."""
    # Find all partition files
    partition_files = find_partition_files(input_dir)
    print(f"Found {len(partition_files)} partition file(s)")

    if not partition_files:
        print("No partition metrics files found")
        # Create an empty consolidated file
        consolidated = {
            "run_id": run_id,
            "run_date": datetime.now(timezone.utc).isoformat(),
            "commit_sha": commit_sha,
            "branch": branch,
            "results": [],
        }
        try:
            os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(consolidated, f, indent=2)
            print(f"Created empty consolidated file: {output_file}")
            return True
        except OSError as e:
            print(f"Error writing empty consolidated file: {e}")
            return False

    # Load all partition files
    all_results = []
    for filepath in sorted(partition_files):
        print(f" Reading: {filepath}")
        metrics = load_partition_metrics(filepath)
        if metrics and "results" in metrics:
            all_results.extend(metrics["results"])

    print(f"Total results collected: {len(all_results)}")

    # Create consolidated structure
    consolidated = {
        "run_id": run_id,
        "run_date": datetime.now(timezone.utc).isoformat(),
        "commit_sha": commit_sha,
        "branch": branch,
        "results": all_results,
    }

    # Ensure output directory exists and write output
    try:
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(consolidated, f, indent=2)
        print(f"Saved consolidated metrics to: {output_file}")
        return True
    except OSError as e:
        print(f"Error writing consolidated file: {e}")
        return False
This function can be refactored to improve clarity and reduce code duplication. Specifically:

- The timestamp from datetime.now() is fetched in two separate branches. It's better to fetch it once at the beginning for consistency.
- The logic for creating the output directory and writing the JSON file is duplicated. This can be consolidated into a single block at the end.
Here's a suggested refactoring that addresses these points:
def merge_metrics(
    input_dir: str,
    output_file: str,
    run_id: str,
    commit_sha: str,
    branch: str | None = None,
) -> bool:
    """Merge all partition metrics into a consolidated file."""
    run_date = datetime.now(timezone.utc).isoformat()

    # Find all partition files
    partition_files = find_partition_files(input_dir)
    print(f"Found {len(partition_files)} partition file(s)")

    all_results = []
    if not partition_files:
        print("No partition metrics files found")
    else:
        # Load all partition files
        for filepath in sorted(partition_files):
            print(f" Reading: {filepath}")
            metrics = load_partition_metrics(filepath)
            if metrics and "results" in metrics:
                all_results.extend(metrics["results"])
        print(f"Total results collected: {len(all_results)}")

    # Create consolidated structure
    consolidated = {
        "run_id": run_id,
        "run_date": run_date,
        "commit_sha": commit_sha,
        "branch": branch,
        "results": all_results,
    }

    # Ensure output directory exists and write output
    try:
        os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(consolidated, f, indent=2)
        if not partition_files:
            print(f"Created empty consolidated file: {output_file}")
        else:
            print(f"Saved consolidated metrics to: {output_file}")
        return True
    except OSError as e:
        print(f"Error writing consolidated file: {e}")
        return False
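For context, a hypothetical call site for the refactored function could look like the following; the argument values are placeholders rather than anything defined in this PR.

# Hypothetical invocation (sketch only); all values below are placeholders.
ok = merge_metrics(
    input_dir="downloaded-artifacts",
    output_file="consolidated-metrics.json",
    run_id="1234567890",
    commit_sha="abcdef0",
    branch="main",
)
raise SystemExit(0 if ok else 1)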
| """Find all results_*.json files in the given directories.""" | ||
| result_files = [] | ||
| for search_dir in search_dirs: | ||
| if os.path.exists(search_dir): | ||
| pattern = os.path.join(search_dir, "results_*.json") | ||
| files = glob.glob(pattern) | ||
| result_files.extend(files) | ||
|
|
||
| # Also search in subdirectories (profile directories) | ||
| subdir_pattern = os.path.join(search_dir, "*", "results_*.json") | ||
| result_files.extend(glob.glob(subdir_pattern)) | ||
|
|
||
| # Also search in current directory and test directory | ||
| for pattern in ["results_*.json", "test/results_*.json"]: | ||
| result_files.extend(glob.glob(pattern)) | ||
|
|
||
| return list(set(result_files)) # Remove duplicates |
This function can be simplified and made more robust. Using glob with a **/results_*.json pattern and recursive=True will handle searching in subdirectories more cleanly. The hardcoded paths at the end are redundant given the default search_dirs in main. Using a set to collect files will also prevent duplicates from overlapping search paths.
def find_result_files(search_dirs: list[str]) -> list[str]:
    """Find all results_*.json files in the given directories."""
    result_files = set()
    for search_dir in search_dirs:
        if os.path.exists(search_dir):
            pattern = os.path.join(search_dir, "**/results_*.json")
            result_files.update(glob.glob(pattern, recursive=True))
    return list(result_files)
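A quick illustration of the point about overlapping search paths (the directory names here are hypothetical): because matches are collected into a set, a file reachable from two entries in search_dirs is still reported only once.

# Hypothetical, overlapping search directories; a file under "results/profile_a"
# matches both recursive patterns, but the set-based collection reports it once.
files = find_result_files(["results", "results/profile_a"])
assert len(files) == len(set(files))  # no duplicate entries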
| "input_throughput": result.get("input_throughput"), | ||
| "output_throughput": result.get("output_throughput"), | ||
| "overall_throughput": result.get("overall_throughput"), | ||
| "ttft_ms": (last_ttft or 0) * 1000 if last_ttft is not None else None, |
The expressions for calculating latency_ms and ttft_ms can be simplified. The or 0 is redundant because of the if ... is not None check.
"latency_ms": latency * 1000 if latency is not None else None,
"input_throughput": result.get("input_throughput"),
"output_throughput": result.get("output_throughput"),
"overall_throughput": result.get("overall_throughput"),
"ttft_ms": last_ttft * 1000 if last_ttft is not None else None,| def save_metrics( | ||
def save_metrics(
    gpu_config: str,
    partition: int,
    run_id: str,
    output_file: str,
    search_dirs: list[str],
) -> bool:
    """Collect metrics and save to output file."""
    # Find all result files
    result_files = find_result_files(search_dirs)
    print(f"Found {len(result_files)} result file(s)")

    if not result_files:
        print("No benchmark result files found")
        # Create an empty metrics file to indicate the job ran but had no results
        metrics = {
            "run_id": run_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "gpu_config": gpu_config,
            "partition": partition,
            "results": [],
        }
        try:
            os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(metrics, f, indent=2)
            print(f"Created empty metrics file: {output_file}")
            return True
        except OSError as e:
            print(f"Error writing empty metrics file: {e}")
            return False

    # Parse all result files
    all_results = []
    for filepath in result_files:
        print(f" Reading: {filepath}")
        results = parse_result_file(filepath)
        all_results.extend(results)

    print(f"Total benchmark results: {len(all_results)}")

    # Group by model/variant
    grouped = group_results_by_model(all_results, gpu_config, partition)

    # Create metrics structure
    metrics = {
        "run_id": run_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "gpu_config": gpu_config,
        "partition": partition,
        "results": grouped,
    }

    # Ensure output directory exists and write output
    try:
        os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(metrics, f, indent=2)
        print(f"Saved metrics to: {output_file}")
        return True
    except OSError as e:
        print(f"Error writing metrics file: {e}")
        return False
This function can be refactored to improve clarity and reduce code duplication, similar to merge_metrics.py. Specifically:

- The timestamp from datetime.now() is fetched in two separate branches. It's better to fetch it once at the beginning for consistency.
- The logic for creating the output directory and writing the JSON file is duplicated. This can be consolidated into a single block at the end.
Here's a suggested refactoring:
def save_metrics(
    gpu_config: str,
    partition: int,
    run_id: str,
    output_file: str,
    search_dirs: list[str],
) -> bool:
    """Collect metrics and save to output file."""
    timestamp = datetime.now(timezone.utc).isoformat()

    # Find all result files
    result_files = find_result_files(search_dirs)
    print(f"Found {len(result_files)} result file(s)")

    grouped = []
    if not result_files:
        print("No benchmark result files found")
    else:
        # Parse all result files
        all_results = []
        for filepath in sorted(result_files):
            print(f" Reading: {filepath}")
            results = parse_result_file(filepath)
            all_results.extend(results)
        print(f"Total benchmark results: {len(all_results)}")

        # Group by model/variant
        grouped = group_results_by_model(all_results, gpu_config, partition)

    # Create metrics structure
    metrics = {
        "run_id": run_id,
        "timestamp": timestamp,
        "gpu_config": gpu_config,
        "partition": partition,
        "results": grouped,
    }

    # Ensure output directory exists and write output
    try:
        os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(metrics, f, indent=2)
        if not result_files:
            print(f"Created empty metrics file: {output_file}")
        else:
            print(f"Saved metrics to: {output_file}")
        return True
    except OSError as e:
        print(f"Error writing metrics file: {e}")
        return False
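Since both scripts repeat the same ensure-directory-and-write-JSON block, one further option is to factor it into a small shared helper. This is a sketch only; the helper name and where it would live are assumptions, not part of this PR.

import json
import os

def write_json(output_file: str, payload: dict) -> bool:
    """Write payload as pretty-printed JSON, creating the parent directory if needed."""
    # Hypothetical shared helper; both scripts could call this instead of duplicating
    # the makedirs/open/json.dump/error-handling block.
    try:
        os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        return True
    except OSError as e:
        print(f"Error writing {output_file}: {e}")
        return False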
Motivation
Modifications
Accuracy Tests
Benchmarking and Profiling
Checklist
Review Process
/tag-run-ci-label, /rerun-failed-ci, /tag-and-rerun-ci