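"""Gradio app for the Agent Leaderboard Space.

Shows SWE-Bench and USACO results as leaderboards with cost/accuracy scatter
plots, plus an explorer for analyzed USACO agent traces.
"""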
import gradio as gr
from gradio_leaderboard import Leaderboard, SelectColumns, ColumnFilter
import config
from envs import RESULTS_REPO_ID, REPO_ID, API, HF_TOKEN
from pathlib import Path
import pandas as pd
import os
import json
from utils import parse_json_files, create_scatter_plot, create_flow_chart
from huggingface_hub import snapshot_download
from apscheduler.schedulers.background import BackgroundScheduler
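

# Restart the Space so the app picks up newly downloaded results.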
def restart_space():
    API.restart_space(repo_id=REPO_ID, token=HF_TOKEN)


# Download the latest evaluation results from the results dataset on the Hub.
def download_latest_results():
    print("Downloading latest results...")
    snapshot_download(RESULTS_REPO_ID,
                      local_dir=abs_path / "evals",
                      repo_type='dataset',
                      tqdm_class=None,
                      etag_timeout=30,
                      max_workers=4,
                      )
    print("Download complete.")


abs_path = Path(__file__).parent

# Load the pre-computed task analyses from the evals/usaco_traces folder.
with open(os.path.join(abs_path, "evals", "usaco_traces", "task_analyses.json"), "r") as f:
    analyzed_traces = json.load(f)
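

# Gradio callback: for a selected USACO task, return the overview markdown,
# a flow chart of the agent's steps, the step dropdown, and cleared step details.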
def update_task_analysis(task_id):
    if task_id not in analyzed_traces:
        return "No analysis available for this task.", None, [], ""

    analysis = analyzed_traces[task_id]
    summary = analysis['summary']

    # The summary may be stored as a JSON string; normalize it to a dict.
    if isinstance(summary, str):
        try:
            summary = json.loads(summary)
        except json.JSONDecodeError:
            return "Error: Unable to parse summary data.", None, [], ""
    elif not isinstance(summary, dict):
        return "Error: Summary data is in an unexpected format.", None, [], ""

    overview = f"# Task Overview\n\n{summary.get('overview', 'No overview available.')}\n\n"
    overview += f"## Successes\n{summary.get('successes', 'No successes listed.')}\n\n"
    overview += f"## Challenges\n{summary.get('challenges', 'No challenges listed.')}\n\n"

    steps = [(f"Step {i+1}", i) for i in range(len(analysis['steps']))]
    flow_chart = create_flow_chart(analysis['steps'])

    return overview, flow_chart, gr.Dropdown(choices=steps, label="Agent Steps"), ""
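

# Gradio callback: render the description and assessment of a single agent step.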
def update_step_details(task_id, step_index):
    if task_id not in analyzed_traces:
        return "No analysis available for this task."
    if step_index is None:
        return "Please select a step to view details."

    steps = analyzed_traces[task_id]['steps']

    # The dropdown may pass either the (label, index) tuple or the label string.
    if isinstance(step_index, tuple):
        step_index = step_index[1]
    elif isinstance(step_index, str):
        step_index = int(step_index.split()[-1]) - 1

    if step_index < 0 or step_index >= len(steps):
        return f"Invalid step index: {step_index}"

    step = steps[step_index]
    analysis = step['analysis']

    # The per-step analysis may also be stored as a JSON string.
    if isinstance(analysis, str):
        try:
            analysis = json.loads(analysis)
        except json.JSONDecodeError:
            return "Error: Unable to parse step analysis data."
    elif not isinstance(analysis, dict):
        return "Error: Step analysis data is in an unexpected format."

    details = f"# Step {step_index + 1} Details\n\n"
    details += f"## Description\n{analysis.get('description', 'No description available.')}\n\n"
    details += f"## Assessment\n{analysis.get('assessment', 'No assessment available.')}\n\n"

    return details
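

# Build the UI: a tab per benchmark (cost/accuracy scatter plot + leaderboard),
# a USACO trace explorer, and an About tab rendered from about.md.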
with gr.Blocks() as demo:
    gr.Markdown("""
    # 🥇 Agent Leaderboard
    """)
    with gr.Tabs():
        with gr.Tab("SWE-Bench"):
            with gr.Row():
                with gr.Column(scale=1):
                    scatter_plot = gr.Plot(create_scatter_plot(
                        parse_json_files(os.path.join(abs_path, "evals"), 'swebench_lite'),
                        "results_total_cost", "results_accuracy",
                        "Cost (in USD)", "Accuracy", ["agent_name"]))
                with gr.Column(scale=1):
                    Leaderboard(
                        value=parse_json_files(os.path.join(abs_path, "evals"), 'swebench_lite'),
                        select_columns=SelectColumns(
                            default_selection=config.SWEBENCH_ON_LOAD_COLUMNS,
                            cant_deselect=["agent_name"],
                            label="Select Columns to Display:",
                        ),
                        search_columns=config.SWEBENCH_SEARCH_COLUMNS,
                        column_widths={"agent_name": 40,
                                       "results_accuracy": 20,
                                       "results_total_cost": 20},
                    )
with gr.Tab("USACO"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
scatter_plot = gr.Plot(create_scatter_plot(parse_json_files(os.path.join(abs_path, "evals"), 'usaco'), "results_total_cost", "results_accuracy", "Cost", "Accuracy", ["agent_name"])) | |
with gr.Column(scale=1): | |
Leaderboard( | |
value=parse_json_files(os.path.join(abs_path, "evals"), 'usaco'), | |
select_columns=SelectColumns( | |
default_selection=config.USACO_ON_LOAD_COLUMNS, | |
cant_deselect=["agent_name"], | |
label="Select Columns to Display:", | |
), | |
search_columns=config.USACO_SEARCH_COLUMNS, | |
column_widths={"agent_name": 40, | |
"results_accuracy": 20, | |
"results_total_cost": 20}, | |
) | |
gr.Markdown("## USACO Task Trace Explorer") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
task_dropdown = gr.Dropdown(choices=list(analyzed_traces.keys()), label="Select USACO Task") | |
task_overview = gr.Markdown() | |
with gr.Column(scale=1): | |
steps_dropdown = gr.Dropdown(label="Agent Steps") | |
step_details = gr.Markdown() | |
with gr.Row(): | |
flow_chart = gr.Plot(label="Task Flow") | |
task_dropdown.change(update_task_analysis, | |
inputs=[task_dropdown], | |
outputs=[task_overview, flow_chart, steps_dropdown, step_details]) | |
steps_dropdown.change(update_step_details, | |
inputs=[task_dropdown, steps_dropdown], | |
outputs=[step_details]) | |
with gr.Tab("About"): | |
gr.Markdown((Path(__file__).parent / "about.md").read_text()) | |
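

# On startup, fetch the latest results once, then keep the Space fresh with
# hourly restart and re-download jobs.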
if __name__ == "__main__":
    # Download the results from the Hugging Face Hub
    download_latest_results()

    scheduler = BackgroundScheduler()
    scheduler.add_job(restart_space, "interval", hours=1)             # restart the Space every hour
    scheduler.add_job(download_latest_results, "interval", hours=1)   # refresh results every hour
    scheduler.start()

    demo.launch()