team_16trial / app.py
dcrey7's picture
Update app.py
0f05b68 verified
raw
history blame
5.44 kB
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit, join_room
from flask_cors import CORS
import os
import requests
import json
import uuid
from datetime import datetime
from dotenv import load_dotenv
import logging
from werkzeug.utils import secure_filename
import random
# Initialize Flask with CORS
app = Flask(__name__)
CORS(app)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
# Configure Socket.IO for Hugging Face Spaces
socketio = SocketIO(app,
cors_allowed_origins="*",
async_mode='eventlet',
logger=True,
engineio_logger=True
)
# Load environment variables
load_dotenv()
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GameState:
"""Manages all active game sessions with WebSocket room support"""
def __init__(self):
self.games = {}
self.cleanup_interval = 3600
def create_game(self):
game_id = str(uuid.uuid4())
self.games[game_id] = {
'players': [],
'current_phase': 'setup',
'recordings': {},
'impostor': None,
'votes': {},
'question': None,
'impostor_answer': None,
'modified_recording': None,
'round_number': 1,
'start_time': datetime.now().isoformat(),
'completed_rounds': [],
'score': {'impostor_wins': 0, 'player_wins': 0},
'socket_room': f'game_{game_id}'
}
return game_id
def cleanup_inactive_games(self):
current_time = datetime.now()
for game_id, game in list(self.games.items()):
if (current_time - datetime.fromisoformat(game['start_time'])).total_seconds() > 7200:
del self.games[game_id]
game_state = GameState()
# WebSocket Handlers
@socketio.on('connect')
def handle_connect():
logger.info('Client connected: %s', request.sid)
@socketio.on('disconnect')
def handle_disconnect():
logger.info('Client disconnected: %s', request.sid)
@socketio.on('create_game')
def handle_create_game():
try:
game_id = game_state.create_game()
emit('game_created', {
'status': 'success',
'gameId': game_id,
'message': 'Game created successfully'
})
logger.info('Created game: %s', game_id)
except Exception as e:
logger.error('Game creation failed: %s', str(e))
emit('game_created', {
'status': 'error',
'error': 'Game creation failed',
'details': str(e)
})
@socketio.on('join_game')
def handle_join_game(data):
try:
game_id = data.get('game_id')
player_name = data.get('player_name')
if not game_id or not player_name:
raise ValueError('Missing game ID or player name')
if game_id not in game_state.games:
raise KeyError('Game not found')
game = game_state.games[game_id]
if len(game['players']) >= 5:
raise ValueError('Game is full')
player_id = len(game['players']) + 1
new_player = {
'id': player_id,
'name': player_name,
'socket_id': request.sid
}
game['players'].append(new_player)
join_room(game['socket_room'])
emit('player_joined', {
'status': 'success',
'player': new_player
}, room=game['socket_room'])
logger.info('Player %s joined game %s', player_name, game_id)
except Exception as e:
logger.error('Join game error: %s', str(e))
emit('join_failed', {
'status': 'error',
'error': str(e)
})
# REST API Endpoints
@app.route('/')
def home():
return render_template('index.html')
@app.route('/api/start_game', methods=['POST'])
def start_game():
try:
data = request.get_json()
game_id = data.get('game_id')
if not game_id or game_id not in game_state.games:
return jsonify({'status': 'error', 'error': 'Invalid game ID'}), 400
game = game_state.games[game_id]
game['current_phase'] = 'recording'
socketio.emit('round_start', {
'phase': 'recording',
'duration': 30
}, room=game['socket_room'])
return jsonify({'status': 'success', 'message': 'Game started'})
except Exception as e:
logger.error('Start game error: %s', str(e))
return jsonify({'status': 'error', 'error': str(e)}), 500
# Voice Processing Functions
async def generate_question():
# Implementation remains same as before
pass
async def generate_impostor_answer(question):
# Implementation remains same as before
pass
async def clone_voice(audio_file):
# Implementation remains same as before
pass
async def generate_cloned_speech(voice_id, text):
# Implementation remains same as before
pass
if __name__ == '__main__':
os.makedirs('temp', exist_ok=True)
socketio.run(app,
host='0.0.0.0',
port=7860,
debug=True,
allow_unsafe_werkzeug=True,
use_reloader=False
)