import os import time import tempfile import requests import json import logging import threading import queue from flask import Flask, render_template, request, jsonify, Response, send_file, send_from_directory from flask_socketio import SocketIO, emit import whisper import torch import ollama # Initialize logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp() app.config['TTS_SERVER'] = os.environ.get('TTS_SERVER_URL', 'http://localhost:5050/v1/audio/speech') app.config['TTS_API_KEY'] = os.environ.get('TTS_API_KEY', '56461d8b44607f2cfcb8030dee313a8e') # Initialize Flask-SocketIO socketio = SocketIO(app, cors_allowed_origins="*") @app.route('/') def root_files(filename): # Check if requested file is one of the common icon filenames common_icons = [ 'favicon.ico', 'apple-touch-icon.png', 'apple-touch-icon-precomposed.png', 'apple-touch-icon-120x120.png', 'apple-touch-icon-120x120-precomposed.png' ] if filename in common_icons: # Map to appropriate icon in static/icons icon_mapping = { 'favicon.ico': 'favicon.ico', 'apple-touch-icon.png': 'apple-icon-180x180.png', 'apple-touch-icon-precomposed.png': 'apple-icon-180x180.png', 'apple-touch-icon-120x120.png': 'apple-icon-120x120.png', 'apple-touch-icon-120x120-precomposed.png': 'apple-icon-120x120.png' } return send_from_directory('static/icons', icon_mapping.get(filename, 'apple-icon-180x180.png')) # If not an icon, return 404 return "File not found", 404 @app.route('/favicon.ico') def favicon(): return send_from_directory('static/icons', 'favicon.ico') @app.route('/apple-touch-icon.png') def apple_touch_icon(): return send_from_directory('static/icons', 'apple-icon-180x180.png') @app.route('/apple-touch-icon-precomposed.png') def apple_touch_icon_precomposed(): return send_from_directory('static/icons', 'apple-icon-180x180.png') @app.route('/apple-touch-icon-120x120.png') def apple_touch_icon_120(): return send_from_directory('static/icons', 'apple-icon-120x120.png') @app.route('/apple-touch-icon-120x120-precomposed.png') def apple_touch_icon_120_precomposed(): return send_from_directory('static/icons', 'apple-icon-120x120.png') # Add this route to your Flask app @app.route('/service-worker.js') def service_worker(): return app.send_static_file('service-worker.js') # Make sure static files are served properly app.static_folder = 'static' @app.route('/static/icons/') def serve_icon(filename): return send_from_directory('static/icons', filename) @app.route('/api/push-public-key', methods=['GET']) def push_public_key(): # For now, return a placeholder. In production, you'd use a real VAPID key return jsonify({'publicKey': 'BDHyDgdhVgJWaKOBQZVPTMvK0ZMFD6c7eXvUMBP16NoRQ9PM-eX-3_hJYy3il8TpN9YVJnQKUQhLCBxBSP5Rxj0'}) @app.route('/api/push-subscribe', methods=['POST']) def push_subscribe(): # This would store subscription info in a database # For now, just acknowledge receipt return jsonify({'success': True}) # Add a route to check TTS server status @app.route('/check_tts_server', methods=['GET']) def check_tts_server(): try: # Get current TTS server configuration tts_server_url = app.config['TTS_SERVER'] tts_api_key = app.config['TTS_API_KEY'] # Try a simple request to the TTS server with a minimal payload headers = { "Content-Type": "application/json", "Authorization": f"Bearer {tts_api_key}" } # For status check, we'll just check if the server responds to a HEAD request # or a minimal POST with a very short text to minimize bandwidth usage try: response = requests.head( tts_server_url.split('/v1/audio/speech')[0] + '/v1/models', headers=headers, timeout=5 ) status_code = response.status_code except: # If HEAD request fails, try minimal POST response = requests.post( tts_server_url, headers=headers, json={ "input": "Test", "voice": "echo", "response_format": "mp3", "speed": 1.0 }, timeout=5 ) status_code = response.status_code if status_code in [200, 401, 403]: # Even auth errors mean server is running logger.info(f"TTS server is reachable at {tts_server_url}") return jsonify({ 'status': 'online' if status_code == 200 else 'auth_error', 'message': 'TTS server is online' if status_code == 200 else 'Authentication error. Check API key.', 'url': tts_server_url, 'code': status_code }) else: logger.warning(f"TTS server returned status code {status_code}") return jsonify({ 'status': 'error', 'message': f'TTS server returned status code {status_code}', 'url': tts_server_url, 'code': status_code }) except requests.exceptions.RequestException as e: logger.error(f"Cannot connect to TTS server: {str(e)}") return jsonify({ 'status': 'error', 'message': f'Cannot connect to TTS server: {str(e)}', 'url': app.config['TTS_SERVER'] }) @app.route('/update_tts_config', methods=['POST']) def update_tts_config(): try: data = request.json tts_server_url = data.get('server_url') tts_api_key = data.get('api_key') if tts_server_url: app.config['TTS_SERVER'] = tts_server_url logger.info(f"Updated TTS server URL to {tts_server_url}") if tts_api_key: app.config['TTS_API_KEY'] = tts_api_key logger.info("Updated TTS API key") return jsonify({ 'success': True, 'message': 'TTS configuration updated', 'url': app.config['TTS_SERVER'] }) except Exception as e: logger.error(f"Failed to update TTS config: {str(e)}") return jsonify({ 'success': False, 'error': f'Failed to update TTS config: {str(e)}' }), 500 # Load Whisper model logger.info("Loading Whisper model...") whisper_model = whisper.load_model("base") device = torch.device("cuda" if torch.cuda.is_available() else "cpu") whisper_model = whisper_model.to(device) logger.info("Whisper model loaded successfully") # Supported languages SUPPORTED_LANGUAGES = { "ar": "Arabic", "hy": "Armenian", "az": "Azerbaijani", "en": "English", "fr": "French", "ka": "Georgian", "kk": "Kazakh", "zh": "Mandarin", "fa": "Farsi", "pt": "Portuguese", "ru": "Russian", "es": "Spanish", "tr": "Turkish", "uz": "Uzbek" } # Map language names to language codes LANGUAGE_TO_CODE = {v: k for k, v in SUPPORTED_LANGUAGES.items()} # Map language names to OpenAI TTS voice options LANGUAGE_TO_VOICE = { "Arabic": "ar-EG-ShakirNeural", # Using OpenAI general voices "Armenian": "echo", # as OpenAI doesn't have specific voices "Azerbaijani": "az-AZ-BanuNeural", # for all these languages "English": "en-GB-RyanNeural", # We'll use the available voices "French": "fr-FR-DeniseNeural", # and rely on the translation being "Georgian": "ka-GE-GiorgiNeural", # in the correct language text "Kazakh": "kk-KZ-DauletNeural", "Mandarin": "zh-CN-YunjianNeural", "Farsi": "fa-IR-FaridNeural", "Portuguese": "pt-BR-ThalitaNeural", "Russian": "ru-RU-SvetlanaNeural", "Spanish": "es-CR-MariaNeural", "Turkish": "tr-TR-EmelNeural", "Uzbek": "uz-UZ-SardorNeural" } @app.route('/') def index(): return render_template('index.html', languages=sorted(SUPPORTED_LANGUAGES.values())) @app.route('/transcribe', methods=['POST']) def transcribe(): if 'audio' not in request.files: return jsonify({'error': 'No audio file provided'}), 400 audio_file = request.files['audio'] source_lang = request.form.get('source_lang', '') # Save the audio file temporarily temp_path = os.path.join(app.config['UPLOAD_FOLDER'], 'input_audio.wav') audio_file.save(temp_path) try: # Use Whisper for transcription result = whisper_model.transcribe( temp_path, language=LANGUAGE_TO_CODE.get(source_lang, None) ) transcribed_text = result["text"] return jsonify({ 'success': True, 'text': transcribed_text }) except Exception as e: logger.error(f"Transcription error: {str(e)}") return jsonify({'error': f'Transcription failed: {str(e)}'}), 500 finally: # Clean up the temporary file if os.path.exists(temp_path): os.remove(temp_path) @app.route('/translate', methods=['POST']) def translate(): try: data = request.json text = data.get('text', '') source_lang = data.get('source_lang', '') target_lang = data.get('target_lang', '') if not text or not source_lang or not target_lang: return jsonify({'error': 'Missing required parameters'}), 400 # Create a prompt for Gemma 3 translation prompt = f""" Translate the following text from {source_lang} to {target_lang}: "{text}" Provide only the translation without any additional text. """ # Use Ollama to interact with Gemma 3 response = ollama.chat( model="gemma3:27b", messages=[ { "role": "user", "content": prompt } ] ) translated_text = response['message']['content'].strip() return jsonify({ 'success': True, 'translation': translated_text }) except Exception as e: logger.error(f"Translation error: {str(e)}") return jsonify({'error': f'Translation failed: {str(e)}'}), 500 @app.route('/speak', methods=['POST']) def speak(): try: data = request.json text = data.get('text', '') language = data.get('language', '') if not text or not language: return jsonify({'error': 'Missing required parameters'}), 400 voice = LANGUAGE_TO_VOICE.get(language, 'echo') # Default to echo if language not found # Get TTS server URL and API key from config tts_server_url = app.config['TTS_SERVER'] tts_api_key = app.config['TTS_API_KEY'] try: # Request TTS from the OpenAI Edge TTS server logger.info(f"Sending TTS request to {tts_server_url}") headers = { "Content-Type": "application/json", "Authorization": f"Bearer {tts_api_key}" } # Log request details for debugging logger.info(f"Text for TTS: {text}") logger.info(f"Selected voice: {voice}") # Proper OpenAI TTS payload payload = { "input": text, "voice": voice, "response_format": "mp3", "speed": 1.0 } logger.debug(f"Full TTS request payload: {payload}") # Dump the payload to ensure proper JSON formatting payload_json = json.dumps(payload) logger.debug(f"Serialized payload: {payload_json}") tts_response = requests.post( tts_server_url, headers=headers, json=payload, # Use json parameter to ensure proper serialization timeout=15 # Longer timeout for audio generation ) logger.info(f"TTS response status: {tts_response.status_code}") if tts_response.status_code != 200: error_msg = f'TTS request failed with status {tts_response.status_code}' logger.error(error_msg) # Try to get error details from response if possible try: error_details = tts_response.json() logger.error(f"Error details: {error_details}") error_msg = f"{error_msg}: {error_details.get('error', {}).get('message', 'Unknown error')}" except Exception as e: logger.error(f"Could not parse error response: {str(e)}") # Log the raw response content logger.error(f"Raw response: {tts_response.text[:200]}") return jsonify({'error': error_msg}), 500 # The response contains the audio data directly temp_audio_path = os.path.join(app.config['UPLOAD_FOLDER'], f'output_{int(time.time())}.mp3') with open(temp_audio_path, 'wb') as f: f.write(tts_response.content) return jsonify({ 'success': True, 'audio_url': f'/get_audio/{os.path.basename(temp_audio_path)}' }) except requests.exceptions.RequestException as e: error_msg = f'Failed to connect to TTS server: {str(e)}' logger.error(error_msg) return jsonify({'error': error_msg}), 500 except Exception as e: logger.error(f"TTS error: {str(e)}") return jsonify({'error': f'TTS failed: {str(e)}'}), 500 @app.route('/get_audio/') def get_audio(filename): try: file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) return send_file(file_path, mimetype='audio/mpeg') except Exception as e: logger.error(f"Audio retrieval error: {str(e)}") return jsonify({'error': f'Audio retrieval failed: {str(e)}'}), 500 # Real-time interpreter functions def setup_realtime_interpreter(): global whisper_model logger.info("Setting up realtime interpreter...") # Create processing queues audio_queue = queue.Queue() transcription_queue = queue.Queue() translation_queue = queue.Queue() # Audio processing thread function def process_audio_chunks(): while True: try: audio_data, source_lang, session_id = audio_queue.get(timeout=1) if audio_data is None: # Poison pill to stop thread break # Log received audio chunk size logger.info(f"Processing audio chunk: {len(audio_data)} bytes for session {session_id}") # Save audio chunk to a temporary file temp_path = os.path.join(app.config['UPLOAD_FOLDER'], f'chunk_{session_id}_{int(time.time())}.wav') with open(temp_path, 'wb') as f: f.write(audio_data) logger.info(f"Saved audio chunk to {temp_path}, starting transcription") # Use whisper to transcribe try: result = whisper_model.transcribe( temp_path, language=LANGUAGE_TO_CODE.get(source_lang, None) ) # Log the result length to see if we got anything text_result = result["text"].strip() logger.info(f"Transcription result: {len(text_result)} characters") # If we got a meaningful result, send it for translation if text_result: logger.info(f"Transcription: '{text_result}'") transcription_queue.put((text_result, source_lang, session_id)) # Emit transcription result to client socketio.emit('transcription_result', { 'text': text_result, 'session_id': session_id }) else: logger.info("No transcription text detected in the audio") except Exception as e: logger.error(f"Error transcribing audio chunk: {str(e)}") # Clean up temp file if os.path.exists(temp_path): os.remove(temp_path) audio_queue.task_done() except queue.Empty: continue except Exception as e: logger.error(f"Error in audio processing thread: {str(e)}") # Translation processing thread function def process_transcriptions(): while True: try: text, source_lang, session_id = transcription_queue.get(timeout=1) if text is None: # Poison pill to stop thread break # Get the target language from the active sessions target_lang = active_sessions.get(session_id, {}).get('target_lang', 'English') # Create a prompt for Gemma 3 translation prompt = f""" Translate the following text from {source_lang} to {target_lang}: "{text}" Provide only the translation without any additional text. """ # Use Ollama to interact with Gemma 3 try: response = ollama.chat( model="gemma3:27b", messages=[ { "role": "user", "content": prompt } ] ) translated_text = response['message']['content'].strip() translation_queue.put((translated_text, target_lang, session_id)) # Emit translation result to client socketio.emit('translation_result', { 'text': translated_text, 'session_id': session_id }) except Exception as e: logger.error(f"Error translating text: {str(e)}") transcription_queue.task_done() except queue.Empty: continue except Exception as e: logger.error(f"Error in translation thread: {str(e)}") # TTS processing thread function def process_translations(): while True: try: text, language, session_id = translation_queue.get(timeout=1) if text is None: # Poison pill to stop thread break voice = LANGUAGE_TO_VOICE.get(language, 'echo') # Get TTS server URL and API key from config tts_server_url = app.config['TTS_SERVER'] tts_api_key = app.config['TTS_API_KEY'] # Request TTS from the OpenAI Edge TTS server try: headers = { "Content-Type": "application/json", "Authorization": f"Bearer {tts_api_key}" } # Proper OpenAI TTS payload payload = { "input": text, "voice": voice, "response_format": "mp3", "speed": 1.0 } tts_response = requests.post( tts_server_url, headers=headers, json=payload, timeout=15 ) if tts_response.status_code == 200: # Save audio temporarily temp_audio_path = os.path.join(app.config['UPLOAD_FOLDER'], f'output_{session_id}_{int(time.time())}.mp3') with open(temp_audio_path, 'wb') as f: f.write(tts_response.content) # Emit audio URL to client audio_url = f'/get_audio/{os.path.basename(temp_audio_path)}' socketio.emit('audio_ready', { 'audio_url': audio_url, 'session_id': session_id }) except Exception as e: logger.error(f"Error generating TTS: {str(e)}") translation_queue.task_done() except queue.Empty: continue except Exception as e: logger.error(f"Error in TTS thread: {str(e)}") # Start worker threads threading.Thread(target=process_audio_chunks, daemon=True).start() threading.Thread(target=process_transcriptions, daemon=True).start() threading.Thread(target=process_translations, daemon=True).start() logger.info("Realtime interpreter threads started") return audio_queue # Dictionary to store active interpretation sessions active_sessions = {} # Socket.IO event handlers @socketio.on('connect') def handle_connect(): logger.info(f"Client connected: {request.sid}") @socketio.on('disconnect') def handle_disconnect(): logger.info(f"Client disconnected: {request.sid}") # Clean up any session data if request.sid in active_sessions: del active_sessions[request.sid] @socketio.on('start_interpreter_session') def handle_start_session(data): source_lang = data.get('source_lang', 'English') target_lang = data.get('target_lang', 'Spanish') # Store session info active_sessions[request.sid] = { 'source_lang': source_lang, 'target_lang': target_lang, 'start_time': time.time() } logger.info(f"Started interpreter session {request.sid}: {source_lang} -> {target_lang}") emit('session_started', {'session_id': request.sid}) @socketio.on('end_interpreter_session') def handle_end_session(): if request.sid in active_sessions: logger.info(f"Ended interpreter session {request.sid}") del active_sessions[request.sid] emit('session_ended') @socketio.on('audio_chunk') def handle_audio_chunk(data): if request.sid not in active_sessions: emit('error', {'message': 'No active session'}) return # Get audio data from the client audio_data = data.get('audio') if not audio_data: emit('error', {'message': 'No audio data received'}) return # Get session details source_lang = active_sessions[request.sid]['source_lang'] # Add audio chunk to processing queue audio_processing_queue.put((audio_data, source_lang, request.sid)) emit('chunk_received') # Initialize the audio processing queue audio_processing_queue = None if __name__ == '__main__': # Setup real-time interpreter audio_processing_queue = setup_realtime_interpreter() # Run with SocketIO instead of regular Flask socketio.run(app, host='0.0.0.0', port=5005, debug=True)