import json
from pathlib import Path
import sqlite3
import pickle
from functools import lru_cache
import threading
import pandas as pd
import ast
import yaml


class TracePreprocessor:
    def __init__(self, db_path='preprocessed_traces.db'):
        self.db_path = db_path
        self.local = threading.local()

    def get_conn(self):
        if not hasattr(self.local, 'conn'):
            self.local.conn = sqlite3.connect(self.db_path)
        return self.local.conn

    def create_tables(self):
        with self.get_conn() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS preprocessed_traces (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    raw_logging_results BLOB,
                    PRIMARY KEY (benchmark_name, agent_name)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS failure_reports (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    failure_report BLOB,
                    PRIMARY KEY (benchmark_name, agent_name)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS parsed_results (
                    benchmark_name TEXT,
                    agent_name TEXT,
                    date TEXT,
                    successful_tasks TEXT,
                    failed_tasks TEXT,
                    total_cost REAL,
                    accuracy REAL,
                    precision REAL,
                    recall REAL,
                    f1_score REAL,
                    auc REAL,
                    overall_score REAL,
                    vectorization_score REAL,
                    fathomnet_score REAL,
                    feedback_score REAL,
                    house_price_score REAL,
                    spaceship_titanic_score REAL,
                    amp_parkinsons_disease_progression_prediction_score REAL,
                    cifar10_score REAL,
                    imdb_score REAL,
                    PRIMARY KEY (benchmark_name, agent_name)
                )
            ''')

    def preprocess_traces(self, processed_dir="evals_live"):
        self.create_tables()
        processed_dir = Path(processed_dir)
        for file in processed_dir.glob('*.json'):
            with open(file, 'r') as f:
                data = json.load(f)
            agent_name = data['config']['agent_name']
            benchmark_name = data['config']['benchmark_name']

            try:
                raw_logging_results = pickle.dumps(data['raw_logging_results'])
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO preprocessed_traces
                        (benchmark_name, agent_name, raw_logging_results)
                        VALUES (?, ?, ?)
                    ''', (benchmark_name, agent_name, raw_logging_results))
            except Exception as e:
                print(f"Error preprocessing raw_logging_results in {file}: {e}")

            try:
                failure_report = pickle.dumps(data['failure_report'])
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO failure_reports
                        (benchmark_name, agent_name, failure_report)
                        VALUES (?, ?, ?)
                    ''', (benchmark_name, agent_name, failure_report))
            except Exception as e:
                print(f"Error preprocessing failure_report in {file}: {e}")

            try:
                config = data['config']
                results = data['results']
                with self.get_conn() as conn:
                    conn.execute('''
                        INSERT OR REPLACE INTO parsed_results
                        (benchmark_name, agent_name, date, successful_tasks, failed_tasks,
                         total_cost, accuracy, precision, recall, f1_score, auc,
                         overall_score, vectorization_score, fathomnet_score, feedback_score,
                         house_price_score, spaceship_titanic_score,
                         amp_parkinsons_disease_progression_prediction_score,
                         cifar10_score, imdb_score)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        benchmark_name,
                        agent_name,
                        config['date'],
                        str(results.get('successful_tasks')),
                        str(results.get('failed_tasks')),
                        results.get('total_cost'),
                        results.get('accuracy'),
                        results.get('precision'),
                        results.get('recall'),
                        results.get('f1_score'),
                        results.get('auc'),
                        results.get('overall_score'),
                        results.get('vectorization_score'),
                        results.get('fathomnet_score'),
                        results.get('feedback_score'),
                        results.get('house-price_score'),
                        results.get('spaceship-titanic_score'),
                        results.get('amp-parkinsons-disease-progression-prediction_score'),
                        results.get('cifar10_score'),
                        results.get('imdb_score')
                    ))
            except Exception as e:
                print(f"Error preprocessing parsed results in {file}: {e}")

    @lru_cache(maxsize=100)
    def get_analyzed_traces(self, agent_name, benchmark_name):
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT raw_logging_results FROM preprocessed_traces
                WHERE benchmark_name = ? AND agent_name = ?
            ''', (benchmark_name, agent_name))
            result = cursor.fetchone()
            if result:
                return pickle.loads(result[0])
        return None

    @lru_cache(maxsize=100)
    def get_failure_report(self, agent_name, benchmark_name):
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT failure_report FROM failure_reports
                WHERE benchmark_name = ? AND agent_name = ?
            ''', (benchmark_name, agent_name))
            result = cursor.fetchone()
            if result:
                return pickle.loads(result[0])
        return None

    def get_parsed_results(self, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT * FROM parsed_results
                WHERE benchmark_name = ?
                ORDER BY accuracy DESC
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name,))

        # Load verified agents
        verified_agents = self.load_verified_agents()

        # Add 'Verified' column
        df['Verified'] = df.apply(
            lambda row: '✓' if (benchmark_name, row['agent_name']) in verified_agents else '',
            axis=1
        )

        # Round float columns to 3 decimal places
        # (names must match the underscore-separated columns of parsed_results)
        float_columns = [
            'total_cost', 'accuracy', 'precision', 'recall', 'f1_score', 'auc',
            'overall_score', 'vectorization_score', 'fathomnet_score', 'feedback_score',
            'house_price_score', 'spaceship_titanic_score',
            'amp_parkinsons_disease_progression_prediction_score',
            'cifar10_score', 'imdb_score'
        ]
        for column in float_columns:
            if column in df.columns:
                df[column] = df[column].round(3)

        df = df.drop(columns=['successful_tasks', 'failed_tasks'])

        # Rename columns
        df = df.rename(columns={
            'agent_name': 'Agent Name',
            'date': 'Date',
            'total_cost': 'Total Cost',
            'accuracy': 'Accuracy',
            'precision': 'Precision',
            'recall': 'Recall',
            'f1_score': 'F1 Score',
            'auc': 'AUC',
            'overall_score': 'Overall Score',
            'vectorization_score': 'Vectorization Score',
            'fathomnet_score': 'Fathomnet Score',
            'feedback_score': 'Feedback Score',
            'house_price_score': 'House Price Score',
            'spaceship_titanic_score': 'Spaceship Titanic Score',
            'amp_parkinsons_disease_progression_prediction_score': 'AMP Parkinsons Disease Progression Prediction Score',
            'cifar10_score': 'CIFAR10 Score',
            'imdb_score': 'IMDB Score'
        })

        return df

    def get_task_success_data(self, benchmark_name):
        with self.get_conn() as conn:
            query = '''
                SELECT agent_name, successful_tasks, failed_tasks FROM parsed_results
                WHERE benchmark_name = ?
            '''
            df = pd.read_sql_query(query, conn, params=(benchmark_name,))

        # Get all unique task IDs
        task_ids = set()
        for tasks in df['successful_tasks']:
            parsed = ast.literal_eval(tasks)
            if parsed is not None:
                task_ids.update(parsed)
        for tasks in df['failed_tasks']:
            parsed = ast.literal_eval(tasks)
            if parsed is not None:
                task_ids.update(parsed)

        # Create a DataFrame with agent_name, task_ids, and success columns
        data_list = []
        for _, row in df.iterrows():
            agent_name = row['agent_name']
            # Parse the stored list so membership is checked against task IDs,
            # not against the raw string representation.
            successful_tasks = ast.literal_eval(row['successful_tasks']) or []
            for task_id in task_ids:
                success = 1 if task_id in successful_tasks else 0
                data_list.append({
                    'agent_name': agent_name,
                    'task_id': task_id,
                    'success': success
                })
        df = pd.DataFrame(data_list)

        df = df.rename(columns={
            'agent_name': 'Agent Name',
            'task_id': 'Task ID',
            'success': 'Success'
        })

        return df

    def load_verified_agents(self, file_path='verified_agents.yaml'):
        with open(file_path, 'r') as f:
            verified_data = yaml.safe_load(f)

        verified_agents = set()
        for benchmark, agents in verified_data.items():
            for agent in agents:
                verified_agents.add((benchmark, agent['agent_name']))

        return verified_agents


if __name__ == '__main__':
    preprocessor = TracePreprocessor()
    preprocessor.preprocess_traces()
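
    # Illustrative follow-up, not part of the original pipeline: a minimal sketch of
    # how the preprocessed database could be queried afterwards. The benchmark name
    # below is a hypothetical placeholder, and the leaderboard query assumes traces
    # for it were preprocessed and that verified_agents.yaml exists alongside the DB.
    example_benchmark = 'example_benchmark'  # placeholder benchmark name
    if Path('verified_agents.yaml').exists():
        leaderboard_df = preprocessor.get_parsed_results(example_benchmark)
        task_success_df = preprocessor.get_task_success_data(example_benchmark)
        print(leaderboard_df.head())
        print(task_success_df.head())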