# Database models and configuration for Talk2Me application import os from datetime import datetime from typing import Optional, Dict, Any from flask_sqlalchemy import SQLAlchemy from sqlalchemy import Index, text from sqlalchemy.dialects.postgresql import UUID, JSONB from sqlalchemy.ext.hybrid import hybrid_property import uuid db = SQLAlchemy() class Translation(db.Model): """Store translation history for analytics and caching""" __tablename__ = 'translations' id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) session_id = db.Column(db.String(255), nullable=False, index=True) user_id = db.Column(db.String(255), nullable=True, index=True) # Translation data source_text = db.Column(db.Text, nullable=False) source_language = db.Column(db.String(10), nullable=False) target_text = db.Column(db.Text, nullable=False) target_language = db.Column(db.String(10), nullable=False) # Metadata translation_time_ms = db.Column(db.Integer, nullable=True) model_used = db.Column(db.String(50), default='gemma3:27b') confidence_score = db.Column(db.Float, nullable=True) # Timestamps created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) accessed_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) access_count = db.Column(db.Integer, default=1) # Client info ip_address = db.Column(db.String(45), nullable=True) user_agent = db.Column(db.String(500), nullable=True) # Create indexes for better query performance __table_args__ = ( Index('idx_translations_languages', 'source_language', 'target_language'), Index('idx_translations_created_at', 'created_at'), Index('idx_translations_session_user', 'session_id', 'user_id'), ) def to_dict(self) -> Dict[str, Any]: """Convert translation to dictionary""" return { 'id': str(self.id), 'session_id': self.session_id, 'user_id': self.user_id, 'source_text': self.source_text, 'source_language': self.source_language, 'target_text': self.target_text, 'target_language': self.target_language, 'translation_time_ms': self.translation_time_ms, 'model_used': self.model_used, 'confidence_score': self.confidence_score, 'created_at': self.created_at.isoformat(), 'accessed_at': self.accessed_at.isoformat(), 'access_count': self.access_count } class Transcription(db.Model): """Store transcription history""" __tablename__ = 'transcriptions' id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) session_id = db.Column(db.String(255), nullable=False, index=True) user_id = db.Column(db.String(255), nullable=True, index=True) # Transcription data transcribed_text = db.Column(db.Text, nullable=False) detected_language = db.Column(db.String(10), nullable=True) audio_duration_seconds = db.Column(db.Float, nullable=True) # Metadata transcription_time_ms = db.Column(db.Integer, nullable=True) model_used = db.Column(db.String(50), default='whisper-base') confidence_score = db.Column(db.Float, nullable=True) # File info audio_file_size = db.Column(db.Integer, nullable=True) audio_format = db.Column(db.String(10), nullable=True) # Timestamps created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) # Client info ip_address = db.Column(db.String(45), nullable=True) user_agent = db.Column(db.String(500), nullable=True) __table_args__ = ( Index('idx_transcriptions_created_at', 'created_at'), Index('idx_transcriptions_session_user', 'session_id', 'user_id'), ) def to_dict(self) -> Dict[str, Any]: """Convert transcription to dictionary""" return { 'id': str(self.id), 'session_id': self.session_id, 'user_id': self.user_id, 'transcribed_text': self.transcribed_text, 'detected_language': self.detected_language, 'audio_duration_seconds': self.audio_duration_seconds, 'transcription_time_ms': self.transcription_time_ms, 'model_used': self.model_used, 'confidence_score': self.confidence_score, 'audio_file_size': self.audio_file_size, 'audio_format': self.audio_format, 'created_at': self.created_at.isoformat() } class UserPreferences(db.Model): """Store user preferences and settings""" __tablename__ = 'user_preferences' id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) user_id = db.Column(db.String(255), nullable=False, unique=True, index=True) session_id = db.Column(db.String(255), nullable=True) # Preferences preferred_source_language = db.Column(db.String(10), nullable=True) preferred_target_language = db.Column(db.String(10), nullable=True) preferred_voice = db.Column(db.String(50), nullable=True) speech_speed = db.Column(db.Float, default=1.0) # Settings stored as JSONB for flexibility settings = db.Column(JSONB, default={}) # Usage stats total_translations = db.Column(db.Integer, default=0) total_transcriptions = db.Column(db.Integer, default=0) total_tts_requests = db.Column(db.Integer, default=0) # Timestamps created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) last_active_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) def to_dict(self) -> Dict[str, Any]: """Convert preferences to dictionary""" return { 'id': str(self.id), 'user_id': self.user_id, 'preferred_source_language': self.preferred_source_language, 'preferred_target_language': self.preferred_target_language, 'preferred_voice': self.preferred_voice, 'speech_speed': self.speech_speed, 'settings': self.settings or {}, 'total_translations': self.total_translations, 'total_transcriptions': self.total_transcriptions, 'total_tts_requests': self.total_tts_requests, 'created_at': self.created_at.isoformat(), 'updated_at': self.updated_at.isoformat(), 'last_active_at': self.last_active_at.isoformat() } class UsageAnalytics(db.Model): """Store aggregated usage analytics""" __tablename__ = 'usage_analytics' id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) # Time period date = db.Column(db.Date, nullable=False, index=True) hour = db.Column(db.Integer, nullable=True) # 0-23, null for daily aggregates # Metrics total_requests = db.Column(db.Integer, default=0) unique_sessions = db.Column(db.Integer, default=0) unique_users = db.Column(db.Integer, default=0) # Service breakdown transcriptions = db.Column(db.Integer, default=0) translations = db.Column(db.Integer, default=0) tts_requests = db.Column(db.Integer, default=0) # Performance metrics avg_transcription_time_ms = db.Column(db.Float, nullable=True) avg_translation_time_ms = db.Column(db.Float, nullable=True) avg_tts_time_ms = db.Column(db.Float, nullable=True) # Language stats (stored as JSONB) language_pairs = db.Column(JSONB, default={}) # {"en-es": 100, "fr-en": 50} detected_languages = db.Column(JSONB, default={}) # {"en": 150, "es": 100} # Error stats error_count = db.Column(db.Integer, default=0) error_details = db.Column(JSONB, default={}) __table_args__ = ( Index('idx_analytics_date_hour', 'date', 'hour'), db.UniqueConstraint('date', 'hour', name='uq_analytics_date_hour'), ) class ApiKey(db.Model): """Store API keys for authenticated access""" __tablename__ = 'api_keys' id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) key_hash = db.Column(db.String(255), nullable=False, unique=True, index=True) name = db.Column(db.String(100), nullable=False) user_id = db.Column(db.String(255), nullable=True) # Permissions and limits is_active = db.Column(db.Boolean, default=True) rate_limit_per_minute = db.Column(db.Integer, default=60) rate_limit_per_hour = db.Column(db.Integer, default=1000) allowed_endpoints = db.Column(JSONB, default=[]) # Empty = all endpoints # Usage tracking total_requests = db.Column(db.Integer, default=0) last_used_at = db.Column(db.DateTime, nullable=True) # Timestamps created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) expires_at = db.Column(db.DateTime, nullable=True) @hybrid_property def is_expired(self): """Check if API key is expired""" if self.expires_at is None: return False return datetime.utcnow() > self.expires_at def init_db(app): """Initialize database with app""" db.init_app(app) with app.app_context(): # Create tables if they don't exist db.create_all() # Create any custom indexes or functions try: # Create a function for updating updated_at timestamp db.session.execute(text(""" CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ language 'plpgsql'; """)) # Drop existing trigger if it exists and recreate it db.session.execute(text(""" DROP TRIGGER IF EXISTS update_user_preferences_updated_at ON user_preferences; """)) # Create trigger for user_preferences db.session.execute(text(""" CREATE TRIGGER update_user_preferences_updated_at BEFORE UPDATE ON user_preferences FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); """)) db.session.commit() except Exception as e: # Log error but don't fail - database might not support triggers db.session.rollback() app.logger.debug(f"Database initialization note: {e}")