"""Azure AI conference service backend: user accounts (PDPA/GDPR helpers),
SQLite persistence with blob-storage backup, and Azure batch speech
transcription plus AI-summary job management."""

import asyncio
import hashlib
import json
import os
import re
import shutil
import sqlite3
import subprocess
import tempfile
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Tuple

import requests
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv


load_dotenv()


def _require_env_var(varname):
    value = os.environ.get(varname)
    # Reject missing values and obvious template placeholders like "your-key-here".
    if not value or value.strip() == "" or "your" in value.lower():
        raise ValueError(f"Environment variable {varname} is missing or invalid. Check your .env file.")
    return value
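

# Illustrative .env layout this module expects at import time (placeholder
# values, not working credentials):
#
#   AZURE_SPEECH_KEY=<speech-resource-key>
#   AZURE_SPEECH_KEY_ENDPOINT=https://<region>.api.cognitive.microsoft.com
#   AZURE_REGION=<region>
#   AZURE_BLOB_CONNECTION=<storage-connection-string>
#   AZURE_CONTAINER=<container-name>
#   AZURE_BLOB_SAS_TOKEN=<sas-token>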


AZURE_SPEECH_KEY = _require_env_var("AZURE_SPEECH_KEY")
AZURE_SPEECH_KEY_ENDPOINT = _require_env_var("AZURE_SPEECH_KEY_ENDPOINT").rstrip('/')
AZURE_REGION = _require_env_var("AZURE_REGION")
AZURE_BLOB_CONNECTION = _require_env_var("AZURE_BLOB_CONNECTION")
AZURE_CONTAINER = _require_env_var("AZURE_CONTAINER")
AZURE_BLOB_SAS_TOKEN = _require_env_var("AZURE_BLOB_SAS_TOKEN")
# Optional JSON-encoded dict of allowed languages (defaults to empty).
ALLOWED_LANGS = json.loads(os.environ.get("ALLOWED_LANGS", "{}"))
API_VERSION = os.environ.get("API_VERSION", "v3.2")

# Azure OpenAI settings (optional; used by the AI summarization features).
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT", "")
AZURE_OPENAI_KEY = os.environ.get("AZURE_OPENAI_KEY", "")
AZURE_OPENAI_DEPLOYMENT = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini")

# Blob containers for each artifact type.
TRANSCRIPTS_CONTAINER = AZURE_CONTAINER
AI_SUMMARIES_CONTAINER = os.environ.get("AI_SUMMARIES_CONTAINER", f"{AZURE_CONTAINER}-summaries")
CHAT_RESPONSES_CONTAINER = os.environ.get("CHAT_RESPONSES_CONTAINER", f"{AZURE_CONTAINER}-chats")

# Local working directories.
UPLOAD_DIR = "uploads"
DB_DIR = "database"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(DB_DIR, exist_ok=True)

# Audio formats accepted by the Azure batch transcription pipeline.
AUDIO_FORMATS = [
    "wav", "mp3", "ogg", "opus", "flac", "wma", "aac", "alaw", "mulaw", "amr", "webm", "speex"
]


@dataclass
class User:
    user_id: str
    email: str
    username: str
    password_hash: str
    created_at: str
    last_login: Optional[str] = None
    is_active: bool = True
    gdpr_consent: bool = False
    data_retention_agreed: bool = False
    marketing_consent: bool = False


@dataclass
class TranscriptionJob:
    job_id: str
    user_id: str
    original_filename: str
    audio_url: str
    language: str
    status: str
    created_at: str
    completed_at: Optional[str] = None
    transcript_text: Optional[str] = None
    transcript_url: Optional[str] = None
    error_message: Optional[str] = None
    azure_trans_id: Optional[str] = None
    settings: Optional[Dict] = None


@dataclass
class SummaryJob:
    job_id: str
    user_id: str
    original_files: List[str]
    summary_type: str
    user_prompt: str
    status: str
    created_at: str
    completed_at: Optional[str] = None
    summary_text: Optional[str] = None
    processed_files: Optional[Dict] = None
    extracted_images: Optional[List[str]] = None
    transcript_text: Optional[str] = None
    error_message: Optional[str] = None
    settings: Optional[Dict] = None
    chat_response_url: Optional[str] = None

class AuthManager:
    """Handle user authentication and PDPA compliance"""

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash password using SHA-256 with a static application salt"""
        salt = "azure_ai_conference_service_salt_2024"
        return hashlib.sha256((password + salt).encode()).hexdigest()

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Verify password against hash"""
        return AuthManager.hash_password(password) == password_hash

    @staticmethod
    def validate_email(email: str) -> bool:
        """Validate email format"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(pattern, email) is not None

    @staticmethod
    def validate_username(username: str) -> bool:
        """Validate username format: 3-30 characters, alphanumeric and underscore"""
        pattern = r'^[a-zA-Z0-9_]{3,30}$'
        return re.match(pattern, username) is not None

    @staticmethod
    def validate_password(password: str) -> Tuple[bool, str]:
        """Validate password strength"""
        if len(password) < 8:
            return False, "Password must be at least 8 characters long"
        if not re.search(r'[A-Z]', password):
            return False, "Password must contain at least one uppercase letter"
        if not re.search(r'[a-z]', password):
            return False, "Password must contain at least one lowercase letter"
        if not re.search(r'\d', password):
            return False, "Password must contain at least one number"
        return True, "Password is valid"
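

# Illustrative use of the validators above (hypothetical values):
#   AuthManager.validate_email("alice@example.com")    # -> True
#   AuthManager.validate_username("alice_01")          # -> True
#   AuthManager.validate_password("Sunny2024x")        # -> (True, "Password is valid")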


class DatabaseManager:
    def __init__(self, db_path: str = None):
        self.db_path = db_path or os.path.join(DB_DIR, "ai_conference_service.db")
        self.blob_service = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
        self.db_blob_name = "shared/database/ai_conference_service.db"
        self._lock = threading.Lock()
        self._last_backup_time = 0
        self._backup_interval = 30  # minimum seconds between blob backups

        self.init_database()
    def _download_db_from_blob(self):
        """Download database from Azure Blob Storage if it exists"""
        try:
            blob_client = self.blob_service.get_blob_client(container=TRANSCRIPTS_CONTAINER, blob=self.db_blob_name)

            if blob_client.exists():
                print("📥 Downloading existing shared database from Azure Blob Storage...")

                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_path = temp_file.name

                with open(temp_path, "wb") as download_file:
                    download_file.write(blob_client.download_blob().readall())

                os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
                shutil.move(temp_path, self.db_path)

                print("✅ Shared database downloaded successfully")
                return True
            else:
                print("🔍 No existing shared database found in blob storage, will create new one")
                return False

        except Exception as e:
            print(f"⚠️ Warning: Could not download shared database from blob storage: {e}")
            print("🔍 Will create new local database")
            return False

    def _upload_db_to_blob(self):
        """Upload database to Azure Blob Storage with rate limiting"""
        try:
            current_time = time.time()
            if current_time - self._last_backup_time < self._backup_interval:
                return

            if not os.path.exists(self.db_path):
                return

            blob_client = self.blob_service.get_blob_client(container=TRANSCRIPTS_CONTAINER, blob=self.db_blob_name)

            with open(self.db_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)

            self._last_backup_time = current_time

        except Exception as e:
            print(f"⚠️ Warning: Could not upload shared database to blob storage: {e}")
    def _store_chat_response(self, job_id: str, response_content: str, user_id: str) -> str:
        """Store AI chat response in dedicated blob container"""
        try:
            chat_blob_name = f"users/{user_id}/chats/summary_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

            temp_path = os.path.join(tempfile.gettempdir(), f"chat_response_{job_id}.txt")
            with open(temp_path, "w", encoding="utf-8") as f:
                f.write(response_content)

            chat_blob_client = self.blob_service.get_blob_client(
                container=CHAT_RESPONSES_CONTAINER,
                blob=chat_blob_name
            )

            with open(temp_path, "rb") as data:
                chat_blob_client.upload_blob(data, overwrite=True)

            os.remove(temp_path)

            sas = AZURE_BLOB_SAS_TOKEN.lstrip("?")
            chat_url = f"{chat_blob_client.url}?{sas}"

            print(f"💬 Chat response stored for user {user_id[:8]}...")
            return chat_url

        except Exception as e:
            print(f"⚠️ Error storing chat response: {e}")
            return ""
    @contextmanager
    def get_connection(self):
        with self._lock:
            conn = sqlite3.connect(self.db_path, timeout=30.0)
            conn.row_factory = sqlite3.Row
            try:
                yield conn
            finally:
                conn.close()

        # Once the lock is released, kick off a rate-limited blob backup.
        threading.Thread(target=self._upload_db_to_blob, daemon=True).start()
    def init_database(self):
        # Pull the shared database from blob storage before creating tables.
        self._download_db_from_blob()

        with self.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id TEXT PRIMARY KEY,
                    email TEXT UNIQUE NOT NULL,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    last_login TEXT,
                    is_active BOOLEAN DEFAULT 1,
                    gdpr_consent BOOLEAN DEFAULT 0,
                    data_retention_agreed BOOLEAN DEFAULT 0,
                    marketing_consent BOOLEAN DEFAULT 0
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS transcriptions (
                    job_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    original_filename TEXT NOT NULL,
                    audio_url TEXT,
                    language TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    completed_at TEXT,
                    transcript_text TEXT,
                    transcript_url TEXT,
                    error_message TEXT,
                    azure_trans_id TEXT,
                    settings TEXT,
                    file_size INTEGER DEFAULT 0,
                    processing_duration REAL DEFAULT 0.0,
                    FOREIGN KEY (user_id) REFERENCES users (user_id)
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS ai_summaries (
                    job_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    original_files TEXT NOT NULL,
                    summary_type TEXT NOT NULL,
                    user_prompt TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    completed_at TEXT,
                    summary_text TEXT,
                    processed_files TEXT,
                    extracted_images TEXT,
                    transcript_text TEXT,
                    error_message TEXT,
                    settings TEXT,
                    chat_response_url TEXT,
                    input_token_count INTEGER DEFAULT 0,
                    output_token_count INTEGER DEFAULT 0,
                    processing_duration REAL DEFAULT 0.0,
                    FOREIGN KEY (user_id) REFERENCES users (user_id)
                )
            """)

            conn.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_user_id ON transcriptions(user_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_status ON transcriptions(status)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_created_at ON transcriptions(created_at DESC)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_user_created ON transcriptions(user_id, created_at DESC)")

            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_user_id ON ai_summaries(user_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_status ON ai_summaries(status)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_created_at ON ai_summaries(created_at DESC)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ai_summaries_user_created ON ai_summaries(user_id, created_at DESC)")

            conn.commit()
            print("✅ Enhanced database schema initialized (transcriptions + AI summaries)")
    def create_user(self, email: str, username: str, password: str, gdpr_consent: bool = True,
                    data_retention_agreed: bool = True, marketing_consent: bool = False) -> Tuple[bool, str, Optional[str]]:
        """Create new user account"""
        try:
            if not AuthManager.validate_email(email):
                return False, "Invalid email format", None

            if not AuthManager.validate_username(username):
                return False, "Username must be 3-30 characters, alphanumeric and underscore only", None

            is_valid, message = AuthManager.validate_password(password)
            if not is_valid:
                return False, message, None

            if not gdpr_consent:
                return False, "GDPR consent is required to create an account", None

            if not data_retention_agreed:
                return False, "Data retention agreement is required", None

            user_id = str(uuid.uuid4())
            password_hash = AuthManager.hash_password(password)

            with self.get_connection() as conn:
                existing = conn.execute(
                    "SELECT email, username FROM users WHERE email = ? OR username = ?",
                    (email, username)
                ).fetchone()

                if existing:
                    if existing['email'] == email:
                        return False, "Email already registered", None
                    else:
                        return False, "Username already taken", None

                user = User(
                    user_id=user_id,
                    email=email,
                    username=username,
                    password_hash=password_hash,
                    created_at=datetime.now().isoformat(),
                    gdpr_consent=gdpr_consent,
                    data_retention_agreed=data_retention_agreed,
                    marketing_consent=marketing_consent
                )

                conn.execute("""
                    INSERT INTO users
                    (user_id, email, username, password_hash, created_at, is_active,
                     gdpr_consent, data_retention_agreed, marketing_consent)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    user.user_id, user.email, user.username, user.password_hash,
                    user.created_at, user.is_active, user.gdpr_consent,
                    user.data_retention_agreed, user.marketing_consent
                ))
                conn.commit()

                print(f"👤 New user registered: {username} ({email})")
                return True, "Account created successfully", user_id

        except Exception as e:
            print(f"❌ Error creating user: {str(e)}")
            return False, f"Registration failed: {str(e)}", None
    def authenticate_user(self, login: str, password: str) -> Tuple[bool, str, Optional[User]]:
        """Authenticate user by email or username"""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute("""
                    SELECT * FROM users
                    WHERE (email = ? OR username = ?) AND is_active = 1
                """, (login, login)).fetchone()

                if not user_row:
                    return False, "Invalid credentials", None

                if not AuthManager.verify_password(password, user_row['password_hash']):
                    return False, "Invalid credentials", None

                conn.execute(
                    "UPDATE users SET last_login = ? WHERE user_id = ?",
                    (datetime.now().isoformat(), user_row['user_id'])
                )
                conn.commit()

                user = User(
                    user_id=user_row['user_id'],
                    email=user_row['email'],
                    username=user_row['username'],
                    password_hash=user_row['password_hash'],
                    created_at=user_row['created_at'],
                    last_login=datetime.now().isoformat(),
                    is_active=bool(user_row['is_active']),
                    gdpr_consent=bool(user_row['gdpr_consent']),
                    data_retention_agreed=bool(user_row['data_retention_agreed']),
                    marketing_consent=bool(user_row['marketing_consent'])
                )

                print(f"🔐 User logged in: {user.username} ({user.email})")
                return True, "Login successful", user

        except Exception as e:
            print(f"❌ Authentication error: {str(e)}")
            return False, f"Login failed: {str(e)}", None
    def get_user_by_id(self, user_id: str) -> Optional[User]:
        """Get user by ID"""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute(
                    "SELECT * FROM users WHERE user_id = ? AND is_active = 1",
                    (user_id,)
                ).fetchone()

                if user_row:
                    return User(
                        user_id=user_row['user_id'],
                        email=user_row['email'],
                        username=user_row['username'],
                        password_hash=user_row['password_hash'],
                        created_at=user_row['created_at'],
                        last_login=user_row['last_login'],
                        is_active=bool(user_row['is_active']),
                        gdpr_consent=bool(user_row['gdpr_consent']),
                        data_retention_agreed=bool(user_row['data_retention_agreed']),
                        marketing_consent=bool(user_row['marketing_consent'])
                    )
        except Exception as e:
            print(f"❌ Error getting user: {str(e)}")
        return None
    def update_user_consent(self, user_id: str, marketing_consent: bool) -> bool:
        """Update user marketing consent"""
        try:
            with self.get_connection() as conn:
                conn.execute(
                    "UPDATE users SET marketing_consent = ? WHERE user_id = ?",
                    (marketing_consent, user_id)
                )
                conn.commit()
            return True
        except Exception as e:
            print(f"❌ Error updating consent: {str(e)}")
            return False

    def delete_user_account(self, user_id: str) -> bool:
        """Delete user account and all associated data (GDPR compliance)"""
        try:
            with self.get_connection() as conn:
                conn.execute("DELETE FROM transcriptions WHERE user_id = ?", (user_id,))
                conn.execute("DELETE FROM ai_summaries WHERE user_id = ?", (user_id,))

                # Anonymize rather than drop the user row so foreign keys stay valid.
                conn.execute(
                    "UPDATE users SET is_active = 0, email = ?, username = ? WHERE user_id = ?",
                    (f"deleted_{user_id}@deleted.com", f"deleted_{user_id}", user_id)
                )
                conn.commit()
            print(f"🗑️ Complete user account deleted: {user_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting user account: {str(e)}")
            return False

    def delete_user_summary_data(self, user_id: str) -> bool:
        """Delete user's AI summary data specifically"""
        try:
            with self.get_connection() as conn:
                conn.execute("DELETE FROM ai_summaries WHERE user_id = ?", (user_id,))
                conn.commit()
            print(f"🗑️ User AI summary data deleted: {user_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting user AI summary data: {str(e)}")
            return False
    def save_job(self, job: TranscriptionJob):
        """Save or update transcription job"""
        with self.get_connection() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO transcriptions
                (job_id, user_id, original_filename, audio_url, language, status,
                 created_at, completed_at, transcript_text, transcript_url, error_message,
                 azure_trans_id, settings, file_size, processing_duration)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job.job_id, job.user_id, job.original_filename, job.audio_url,
                job.language, job.status, job.created_at, job.completed_at,
                job.transcript_text, job.transcript_url, job.error_message,
                job.azure_trans_id, json.dumps(job.settings) if job.settings else None,
                0, 0.0
            ))
            conn.commit()
    def save_summary_job(self, job: SummaryJob):
        """Save or update AI summary job"""
        with self.get_connection() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO ai_summaries
                (job_id, user_id, original_files, summary_type, user_prompt, status,
                 created_at, completed_at, summary_text, processed_files, extracted_images,
                 transcript_text, error_message, settings, chat_response_url,
                 input_token_count, output_token_count, processing_duration)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job.job_id, job.user_id, json.dumps(job.original_files), job.summary_type,
                job.user_prompt, job.status, job.created_at, job.completed_at,
                job.summary_text, json.dumps(job.processed_files) if job.processed_files else None,
                json.dumps(job.extracted_images) if job.extracted_images else None,
                job.transcript_text, job.error_message,
                json.dumps(job.settings) if job.settings else None,
                job.chat_response_url, 0, 0, 0.0
            ))
            conn.commit()
    def get_summary_job(self, job_id: str) -> Optional[SummaryJob]:
        """Get AI summary job by ID"""
        with self.get_connection() as conn:
            row = conn.execute(
                "SELECT * FROM ai_summaries WHERE job_id = ?", (job_id,)
            ).fetchone()
            if row:
                return self._row_to_summary_job(row)
            return None

    def get_user_summary_jobs(self, user_id: str, limit: int = 50) -> List[SummaryJob]:
        """Get AI summary jobs for a specific user"""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM ai_summaries
                WHERE user_id = ?
                ORDER BY created_at DESC
                LIMIT ?
            """, (user_id, limit)).fetchall()
            return [self._row_to_summary_job(row) for row in rows]

    def _row_to_summary_job(self, row) -> SummaryJob:
        """Convert database row to SummaryJob object"""
        return SummaryJob(
            job_id=row['job_id'],
            user_id=row['user_id'],
            original_files=json.loads(row['original_files']) if row['original_files'] else [],
            summary_type=row['summary_type'],
            user_prompt=row['user_prompt'],
            status=row['status'],
            created_at=row['created_at'],
            completed_at=row['completed_at'],
            summary_text=row['summary_text'],
            processed_files=json.loads(row['processed_files']) if row['processed_files'] else None,
            extracted_images=json.loads(row['extracted_images']) if row['extracted_images'] else None,
            transcript_text=row['transcript_text'],
            error_message=row['error_message'],
            settings=json.loads(row['settings']) if row['settings'] else None,
            chat_response_url=row['chat_response_url']
        )
    def get_job(self, job_id: str) -> Optional[TranscriptionJob]:
        """Get transcription job by ID"""
        with self.get_connection() as conn:
            row = conn.execute(
                "SELECT * FROM transcriptions WHERE job_id = ?", (job_id,)
            ).fetchone()
            if row:
                return self._row_to_job(row)
            return None

    def get_user_jobs(self, user_id: str, limit: int = 50) -> List[TranscriptionJob]:
        """Get all transcription jobs for a specific user - PDPA compliant"""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM transcriptions
                WHERE user_id = ?
                ORDER BY created_at DESC
                LIMIT ?
            """, (user_id, limit)).fetchall()
            return [self._row_to_job(row) for row in rows]

    def get_all_jobs(self, limit: int = 100) -> List[TranscriptionJob]:
        """Get all transcription jobs across all users (for admin/global view)"""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM transcriptions
                ORDER BY created_at DESC
                LIMIT ?
            """, (limit,)).fetchall()
            return [self._row_to_job(row) for row in rows]

    def get_pending_jobs(self) -> List[TranscriptionJob]:
        """Get pending transcription jobs across all users for background processing"""
        with self.get_connection() as conn:
            rows = conn.execute(
                "SELECT * FROM transcriptions WHERE status IN ('pending', 'processing')"
            ).fetchall()
            return [self._row_to_job(row) for row in rows]

    def get_pending_summary_jobs(self) -> List[SummaryJob]:
        """Get pending AI summary jobs for background processing"""
        with self.get_connection() as conn:
            rows = conn.execute(
                "SELECT * FROM ai_summaries WHERE status IN ('pending', 'processing')"
            ).fetchall()
            return [self._row_to_summary_job(row) for row in rows]
    def get_user_stats(self, user_id: str) -> Dict:
        """Get comprehensive statistics for a specific user (transcriptions)"""
        with self.get_connection() as conn:
            stats = {}

            # Total job count.
            result = conn.execute("""
                SELECT COUNT(*) FROM transcriptions WHERE user_id = ?
            """, (user_id,)).fetchone()
            stats['total_jobs'] = result[0] if result else 0

            # Breakdown by status.
            result = conn.execute("""
                SELECT status, COUNT(*) FROM transcriptions
                WHERE user_id = ?
                GROUP BY status
            """, (user_id,)).fetchall()
            stats['by_status'] = {row[0]: row[1] for row in result}

            # Jobs created in the last seven days.
            week_ago = (datetime.now() - timedelta(days=7)).isoformat()
            result = conn.execute("""
                SELECT COUNT(*) FROM transcriptions
                WHERE user_id = ? AND created_at >= ?
            """, (user_id, week_ago)).fetchone()
            stats['recent_jobs'] = result[0] if result else 0

            return stats

    def get_user_summary_stats(self, user_id: str) -> Dict:
        """Get comprehensive statistics for a specific user (AI summaries)"""
        with self.get_connection() as conn:
            stats = {}

            result = conn.execute("""
                SELECT COUNT(*) FROM ai_summaries WHERE user_id = ?
            """, (user_id,)).fetchone()
            stats['total_jobs'] = result[0] if result else 0

            result = conn.execute("""
                SELECT status, COUNT(*) FROM ai_summaries
                WHERE user_id = ?
                GROUP BY status
            """, (user_id,)).fetchall()
            stats['by_status'] = {row[0]: row[1] for row in result}

            week_ago = (datetime.now() - timedelta(days=7)).isoformat()
            result = conn.execute("""
                SELECT COUNT(*) FROM ai_summaries
                WHERE user_id = ? AND created_at >= ?
            """, (user_id, week_ago)).fetchone()
            stats['recent_jobs'] = result[0] if result else 0

            return stats
    def export_user_data(self, user_id: str) -> Dict:
        """Export comprehensive user data including AI summaries"""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute(
                    "SELECT * FROM users WHERE user_id = ?", (user_id,)
                ).fetchone()

                transcription_rows = conn.execute(
                    "SELECT * FROM transcriptions WHERE user_id = ?", (user_id,)
                ).fetchall()

                summary_rows = conn.execute(
                    "SELECT * FROM ai_summaries WHERE user_id = ?", (user_id,)
                ).fetchall()

            # Build the export outside the connection block: the stats helpers
            # open their own connections and the lock is not reentrant.
            export_data = {
                "export_date": datetime.now().isoformat(),
                "export_type": "comprehensive_ai_conference_service",
                "user_info": dict(user_row) if user_row else {},
                "transcriptions": [dict(row) for row in transcription_rows],
                "ai_summaries": [dict(row) for row in summary_rows],
                "transcription_statistics": self.get_user_stats(user_id),
                "ai_summary_statistics": self.get_user_summary_stats(user_id),
                "service_features": [
                    "speech_transcription",
                    "ai_summarization",
                    "computer_vision",
                    "multi_modal_analysis"
                ]
            }

            return export_data

        except Exception as e:
            print(f"❌ Error exporting comprehensive user data: {str(e)}")
            return {}
    def _row_to_job(self, row) -> TranscriptionJob:
        """Convert database row to TranscriptionJob object"""
        settings = json.loads(row['settings']) if row['settings'] else None
        return TranscriptionJob(
            job_id=row['job_id'],
            user_id=row['user_id'],
            original_filename=row['original_filename'],
            audio_url=row['audio_url'],
            language=row['language'],
            status=row['status'],
            created_at=row['created_at'],
            completed_at=row['completed_at'],
            transcript_text=row['transcript_text'],
            transcript_url=row['transcript_url'],
            error_message=row['error_message'],
            azure_trans_id=row['azure_trans_id'],
            settings=settings
        )
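

# Minimal DatabaseManager round-trip sketch (hypothetical values; assumes valid
# Azure credentials in the environment):
#   db = DatabaseManager()
#   ok, msg, uid = db.create_user("alice@example.com", "alice_01", "Sunny2024x")
#   jobs = db.get_user_jobs(uid)    # newest first, 50 by default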


class TranscriptionManager:
    def __init__(self):
        self.db = DatabaseManager()
        self.executor = ThreadPoolExecutor(max_workers=5)
        self.blob_service = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
        self._job_status_cache = {}

        # Daemon thread that polls for pending jobs and drives them to completion.
        self.running = True
        self.worker_thread = threading.Thread(target=self._background_worker, daemon=True)
        self.worker_thread.start()

        print("✅ Enhanced Transcription Manager initialized with AI integration")
    def _log_status_change(self, job_id: str, old_status: str, new_status: str, filename: str = "", user_id: str = ""):
        """Only log a given status transition the first time it is seen"""
        cache_key = f"{job_id}_{old_status}_{new_status}"
        if cache_key not in self._job_status_cache:
            self._job_status_cache[cache_key] = True
            user_display = f"[{user_id[:8]}...]" if user_id else ""
            if filename:
                print(f"🔄 {user_display} Job {job_id[:8]}... ({filename}): {old_status} → {new_status}")
            else:
                print(f"🔄 {user_display} Job {job_id[:8]}...: {old_status} → {new_status}")
    def _background_worker(self):
        """Enhanced background worker to process both transcriptions and AI summaries"""
        iteration_count = 0
        while self.running:
            try:
                pending_transcription_jobs = self.db.get_pending_jobs()
                pending_summary_jobs = self.db.get_pending_summary_jobs()

                # Log a queue overview at most once per minute (6 x 10 s sleep).
                if (pending_transcription_jobs or pending_summary_jobs) and iteration_count % 6 == 0:
                    active_transcripts = len([j for j in pending_transcription_jobs if j.status == 'processing'])
                    queued_transcripts = len([j for j in pending_transcription_jobs if j.status == 'pending'])
                    active_summaries = len([j for j in pending_summary_jobs if j.status == 'processing'])
                    queued_summaries = len([j for j in pending_summary_jobs if j.status == 'pending'])

                    if any([active_transcripts, queued_transcripts, active_summaries, queued_summaries]):
                        print(f"📊 Background worker: 🎙️ {active_transcripts} transcribing, {queued_transcripts} queued | 🤖 {active_summaries} summarizing, {queued_summaries} queued")

                for job in pending_transcription_jobs:
                    if job.status == 'pending':
                        self.executor.submit(self._process_transcription_job, job.job_id)
                    elif job.status == 'processing' and job.azure_trans_id:
                        self.executor.submit(self._check_transcription_status, job.job_id)

                # NOTE: summary jobs are only tallied for the status line above;
                # this worker does not dispatch them.

                time.sleep(10)
                iteration_count += 1

            except Exception as e:
                print(f"❌ Background worker error: {e}")
                time.sleep(30)
    def submit_transcription(
        self,
        file_bytes: bytes,
        original_filename: str,
        user_id: str,
        language: str,
        settings: Dict
    ) -> str:
        """Submit a new transcription job for an authenticated user"""
        job_id = str(uuid.uuid4())

        print(f"🚀 [{user_id[:8]}...] New transcription: {original_filename} ({len(file_bytes):,} bytes)")

        job = TranscriptionJob(
            job_id=job_id,
            user_id=user_id,
            original_filename=original_filename,
            audio_url="",
            language=language,
            status="pending",
            created_at=datetime.now().isoformat(),
            settings=settings
        )

        self.db.save_job(job)

        # Conversion and upload happen off-thread; the background worker picks
        # the job up once audio_url is set.
        self.executor.submit(self._prepare_audio_file, job_id, file_bytes, original_filename, settings)

        return job_id
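
    # Keys read from the `settings` dict by the pipeline below (defaults in
    # parentheses; see _prepare_audio_file and _process_transcription_job):
    #   audio_format ("wav"), diarization_enabled (False), speakers (2),
    #   profanity ("masked"), punctuation ("automatic"), timestamps (True),
    #   lexical (False), language_id_enabled (False), candidate_locales (None)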

    def _prepare_audio_file(self, job_id: str, file_bytes: bytes, original_filename: str, settings: Dict):
        """Prepare audio file and upload to blob storage with user-specific paths"""
        try:
            job = self.db.get_job(job_id)
            if not job:
                return

            user_id = job.user_id

            # Persist the raw upload locally first.
            src_ext = original_filename.split('.')[-1].lower() if '.' in original_filename else "bin"
            upload_path = os.path.join(UPLOAD_DIR, f"{job_id}_original.{src_ext}")

            with open(upload_path, "wb") as f:
                f.write(file_bytes)

            audio_format = settings.get('audio_format', 'wav')

            if src_ext == audio_format and audio_format == 'wav':
                # Probe the WAV: if it is already 16 kHz mono, skip conversion.
                try:
                    probe_cmd = [
                        'ffprobe', '-v', 'quiet', '-print_format', 'json',
                        '-show_streams', upload_path
                    ]
                    result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)

                    if result.returncode == 0:
                        probe_data = json.loads(result.stdout)
                        audio_stream = (probe_data.get('streams') or [{}])[0]

                        sample_rate = int(audio_stream.get('sample_rate', 0))
                        channels = int(audio_stream.get('channels', 0))

                        if sample_rate == 16000 and channels == 1:
                            out_path = upload_path
                        else:
                            print(f"🔄 [{user_id[:8]}...] Converting {original_filename} to 16kHz mono")
                            out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
                            self._convert_to_audio(upload_path, out_path, audio_format)
                    else:
                        out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
                        self._convert_to_audio(upload_path, out_path, audio_format)

                except Exception as e:
                    print(f"⚠️ [{user_id[:8]}...] Audio probing failed for {original_filename}: {e}")
                    out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
                    self._convert_to_audio(upload_path, out_path, audio_format)
            else:
                # Any other source/target combination: convert with FFmpeg.
                print(f"🔄 [{user_id[:8]}...] Converting {original_filename}: {src_ext} → {audio_format}")
                out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")

                try:
                    self._convert_to_audio(upload_path, out_path, audio_format)
                except Exception as e:
                    print(f"❌ [{user_id[:8]}...] Audio conversion failed for {original_filename}: {str(e)}")
                    job.status = "failed"
                    job.error_message = f"Audio conversion failed: {str(e)}"
                    job.completed_at = datetime.now().isoformat()
                    self.db.save_job(job)

                    try:
                        os.remove(upload_path)
                    except Exception:
                        pass
                    return

            try:
                # Upload the processed audio under a user-specific path.
                final_audio_name = f"users/{user_id}/audio/{job_id}.{audio_format}"
                audio_url = self._upload_blob(out_path, final_audio_name, TRANSCRIPTS_CONTAINER)

                # Keep the original upload alongside the converted audio.
                orig_blob_name = f"users/{user_id}/originals/{job_id}_{original_filename}"
                self._upload_blob(upload_path, orig_blob_name, TRANSCRIPTS_CONTAINER)

                print(f"☁️ [{user_id[:8]}...] {original_filename} uploaded to user-specific blob storage")

                job.audio_url = audio_url
                job.status = "pending"
                self.db.save_job(job)

            except Exception as e:
                print(f"❌ [{user_id[:8]}...] Blob upload failed for {original_filename}: {str(e)}")
                job.status = "failed"
                job.error_message = f"Blob storage upload failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)

            # Clean up local scratch files regardless of upload outcome.
            try:
                if os.path.exists(upload_path):
                    os.remove(upload_path)
                if out_path != upload_path and os.path.exists(out_path):
                    os.remove(out_path)
            except Exception as e:
                print(f"⚠️ [{user_id[:8]}...] Warning: Could not clean up local files for {original_filename}: {e}")

        except Exception as e:
            print(f"❌ File preparation error for {original_filename}: {e}")
            job = self.db.get_job(job_id)
            if job:
                job.status = "failed"
                job.error_message = f"File preparation failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)
    def _process_transcription_job(self, job_id: str):
        """Process a transcription job"""
        try:
            job = self.db.get_job(job_id)
            if not job or job.status != 'pending' or not job.audio_url:
                return

            old_status = job.status
            job.status = "processing"
            self.db.save_job(job)

            self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)

            settings = job.settings or {}
            azure_trans_id = self._create_transcription(
                job.audio_url,
                job.language,
                settings.get('diarization_enabled', False),
                settings.get('speakers', 2),
                settings.get('profanity', 'masked'),
                settings.get('punctuation', 'automatic'),
                settings.get('timestamps', True),
                settings.get('lexical', False),
                settings.get('language_id_enabled', False),
                settings.get('candidate_locales', None)
            )

            job.azure_trans_id = azure_trans_id
            self.db.save_job(job)

        except Exception as e:
            print(f"❌ Transcription submission failed for job {job_id[:8]}...: {str(e)}")
            job = self.db.get_job(job_id)
            if job:
                old_status = job.status
                job.status = "failed"
                job.error_message = f"Transcription submission failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)
                self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
    def _check_transcription_status(self, job_id: str):
        """Check status of Azure transcription"""
        try:
            job = self.db.get_job(job_id)
            if not job or job.status != 'processing' or not job.azure_trans_id:
                return

            url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions/{job.azure_trans_id}"
            headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}

            r = requests.get(url, headers=headers, timeout=30)
            data = r.json()

            if data.get("status") == "Succeeded":
                content_url = self._get_transcription_result_url(job.azure_trans_id)
                if content_url:
                    transcript = self._fetch_transcript(content_url)

                    # Persist the transcript to a user-specific blob path.
                    transcript_blob_name = f"users/{job.user_id}/transcripts/{job_id}.txt"
                    transcript_path = os.path.join(UPLOAD_DIR, f"{job_id}_transcript.txt")

                    with open(transcript_path, "w", encoding="utf-8") as f:
                        f.write(transcript)

                    transcript_url = self._upload_blob(transcript_path, transcript_blob_name, TRANSCRIPTS_CONTAINER)

                    old_status = job.status
                    job.status = "completed"
                    job.transcript_text = transcript
                    job.transcript_url = transcript_url
                    job.completed_at = datetime.now().isoformat()
                    self.db.save_job(job)

                    self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
                    print(f"✅ [{job.user_id[:8]}...] Transcription completed: {job.original_filename}")

                    try:
                        os.remove(transcript_path)
                    except Exception:
                        pass

            elif data.get("status") in ("Failed", "FailedWithPartialResults"):
                error_message = ""
                if "properties" in data and "error" in data["properties"]:
                    error_message = data["properties"]["error"].get("message", "")
                elif "error" in data:
                    error_message = data["error"].get("message", "")

                old_status = job.status
                job.status = "failed"
                job.error_message = f"Azure transcription failed: {error_message}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)

                self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
                print(f"❌ [{job.user_id[:8]}...] Transcription failed: {job.original_filename} - {error_message}")

        except Exception as e:
            print(f"❌ Status check failed for job {job_id[:8]}...: {str(e)}")
            job = self.db.get_job(job_id)
            if job:
                old_status = job.status
                job.status = "failed"
                job.error_message = f"Status check failed: {str(e)}"
                job.completed_at = datetime.now().isoformat()
                self.db.save_job(job)
                self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
    def get_job_status(self, job_id: str) -> Optional[TranscriptionJob]:
        """Get current transcription job status"""
        return self.db.get_job(job_id)

    def get_user_history(self, user_id: str, limit: int = 50) -> List[TranscriptionJob]:
        """Get user's transcription history - PDPA compliant"""
        return self.db.get_user_jobs(user_id, limit)

    def get_all_history(self, limit: int = 100) -> List[TranscriptionJob]:
        """Get all transcription history across all users (admin view)"""
        return self.db.get_all_jobs(limit)

    def get_user_stats(self, user_id: str) -> Dict:
        """Get user transcription statistics"""
        return self.db.get_user_stats(user_id)

    def get_user_summary_stats(self, user_id: str) -> Dict:
        """Get user AI summary statistics"""
        return self.db.get_user_summary_stats(user_id)

    def download_transcript(self, job_id: str, user_id: str) -> Optional[str]:
        """Download transcript content - with user verification for PDPA compliance"""
        job = self.db.get_job(job_id)
        if job and job.user_id == user_id and job.transcript_text:
            return job.transcript_text
        return None

    def save_summary_job(self, job: SummaryJob):
        """Save AI summary job to database"""
        self.db.save_summary_job(job)

    def get_summary_job(self, job_id: str) -> Optional[SummaryJob]:
        """Get AI summary job by ID"""
        return self.db.get_summary_job(job_id)

    def get_user_summary_history(self, user_id: str, limit: int = 50) -> List[SummaryJob]:
        """Get user's AI summary history"""
        return self.db.get_user_summary_jobs(user_id, limit)

    def register_user(self, email: str, username: str, password: str, gdpr_consent: bool = True,
                      data_retention_agreed: bool = True, marketing_consent: bool = False) -> Tuple[bool, str, Optional[str]]:
        """Register new user"""
        return self.db.create_user(email, username, password, gdpr_consent, data_retention_agreed, marketing_consent)

    def login_user(self, login: str, password: str) -> Tuple[bool, str, Optional[User]]:
        """Login user"""
        return self.db.authenticate_user(login, password)

    def get_user(self, user_id: str) -> Optional[User]:
        """Get user by ID"""
        return self.db.get_user_by_id(user_id)

    def update_user_consent(self, user_id: str, marketing_consent: bool) -> bool:
        """Update user marketing consent"""
        return self.db.update_user_consent(user_id, marketing_consent)

    def export_user_data(self, user_id: str) -> Dict:
        """Export comprehensive user data including AI summaries"""
        return self.db.export_user_data(user_id)

    def delete_user_account(self, user_id: str) -> bool:
        """Delete user account and all data"""
        return self.db.delete_user_account(user_id)

    def delete_user_summary_data(self, user_id: str) -> bool:
        """Delete user's AI summary data specifically"""
        return self.db.delete_user_summary_data(user_id)
    def _convert_to_audio(self, input_path, output_path, audio_format="wav"):
        """Convert audio/video file to the specified audio format via FFmpeg"""
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        if audio_format in {"wav", "alaw", "mulaw"}:
            # Force 16 kHz mono for PCM-style targets.
            cmd = [
                "ffmpeg", "-y", "-i", input_path,
                "-ar", "16000", "-ac", "1",
                output_path
            ]
        else:
            cmd = [
                "ffmpeg", "-y", "-i", input_path,
                output_path
            ]

        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=300,
                text=True
            )

            if result.returncode != 0:
                raise Exception(f"FFmpeg conversion failed: {result.stderr}")

            if not os.path.exists(output_path):
                raise Exception(f"Output file was not created: {output_path}")

            if os.path.getsize(output_path) == 0:
                raise Exception(f"Output file is empty: {output_path}")

        except subprocess.TimeoutExpired:
            raise Exception("FFmpeg conversion timed out after 5 minutes")
        except Exception as e:
            if "FFmpeg conversion failed" in str(e):
                raise
            else:
                raise Exception(f"FFmpeg error: {str(e)}")
    def _upload_blob(self, local_file, blob_name, container_name=None):
        """Upload a file to the specified blob container and return a SAS-signed URL"""
        if container_name is None:
            container_name = TRANSCRIPTS_CONTAINER

        blob_client = self.blob_service.get_blob_client(container=container_name, blob=blob_name)
        with open(local_file, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        sas = AZURE_BLOB_SAS_TOKEN.lstrip("?")
        return f"{blob_client.url}?{sas}"
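
    # The returned URL has the shape (illustrative):
    #   https://<account>.blob.core.windows.net/<container>/<blob_name>?<sas-token>
    # which is what _create_transcription passes to Azure in contentUrls.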

    def _create_transcription(self, audio_url, language, diarization_enabled, speakers,
                              profanity, punctuation, timestamps, lexical,
                              language_id_enabled=False, candidate_locales=None):
        """Create Azure Speech batch transcription job"""
        url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions"
        headers = {
            "Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY,
            "Content-Type": "application/json"
        }

        properties = {
            "profanityFilterMode": profanity,
            "punctuationMode": punctuation,
            "wordLevelTimestampsEnabled": timestamps,
            "displayFormWordLevelTimestampsEnabled": timestamps,
            "lexical": lexical
        }
        if diarization_enabled:
            properties["diarizationEnabled"] = True
            properties["diarization"] = {
                "speakers": {
                    "minCount": 1,
                    "maxCount": int(speakers)
                }
            }
        if language_id_enabled and candidate_locales:
            properties["languageIdentification"] = {
                "mode": "continuous",
                "candidateLocales": candidate_locales
            }

        properties = {k: v for k, v in properties.items() if v is not None}
        body = {
            "displayName": f"AI_Conference_Transcription_{uuid.uuid4()}",
            "description": "Enhanced batch speech-to-text with AI integration support",
            "locale": language,
            "contentUrls": [audio_url],
            "properties": properties,
            "customProperties": {}
        }
        r = requests.post(url, headers=headers, json=body, timeout=30)
        r.raise_for_status()
        # The new transcription's ID is the last path segment of the Location header.
        trans_id = r.headers["Location"].split("/")[-1].split("?")[0]
        return trans_id
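
    # Illustrative call (hypothetical values); candidate_locales is a list of
    # locale codes, e.g.
    #   self._create_transcription(audio_url, "en-US", True, 2, "masked",
    #                              "automatic", True, False, True,
    #                              ["en-US", "zh-CN"])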

    def _get_transcription_result_url(self, trans_id):
        """Get transcription result URL from Azure"""
        url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions/{trans_id}"
        headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}

        r = requests.get(url, headers=headers, timeout=30)
        data = r.json()

        if data.get("status") == "Succeeded":
            files_url = data.get("links", {}).get("files")
            if files_url:
                r2 = requests.get(files_url, headers=headers, timeout=30)
                file_list = r2.json().get("values", [])
                for f in file_list:
                    if f.get("kind", "").lower() == "transcription":
                        return f["links"]["contentUrl"]
        return None
    def _fetch_transcript(self, content_url):
        """Enhanced transcript fetching with improved timestamp handling"""
        r = requests.get(content_url, timeout=60)
        try:
            j = r.json()
            out = []

            def get_text(phrase):
                if 'nBest' in phrase and phrase['nBest']:
                    return phrase['nBest'][0].get('display', '') or phrase.get('display', '')
                return phrase.get('display', '')

            def safe_offset(val):
                try:
                    return int(val)
                except (ValueError, TypeError):
                    return None

            def format_time(seconds):
                """Format seconds into HH:MM:SS format"""
                try:
                    td = timedelta(seconds=int(seconds))
                    hours, remainder = divmod(td.total_seconds(), 3600)
                    minutes, secs = divmod(remainder, 60)
                    return f"{int(hours):02d}:{int(minutes):02d}:{int(secs):02d}"
                except Exception:
                    return "00:00:00"

            # Preferred path: per-phrase results with speaker and offset data.
            if 'recognizedPhrases' in j:
                for phrase in j['recognizedPhrases']:
                    speaker_id = phrase.get('speaker', 0)
                    text = get_text(phrase)

                    if not text.strip():
                        continue

                    timestamp_seconds = None

                    # Offsets are reported in 100-nanosecond units.
                    if 'offset' in phrase and phrase['offset'] is not None:
                        offset_100ns = safe_offset(phrase['offset'])
                        if offset_100ns is not None:
                            timestamp_seconds = offset_100ns / 10_000_000

                    # Fall back to the first word's offset.
                    if timestamp_seconds is None and 'words' in phrase and phrase['words']:
                        first_word = phrase['words'][0]
                        if 'offset' in first_word and first_word['offset'] is not None:
                            offset_100ns = safe_offset(first_word['offset'])
                            if offset_100ns is not None:
                                timestamp_seconds = offset_100ns / 10_000_000

                    # Last resort: tick-based offset field.
                    if timestamp_seconds is None and 'offsetInTicks' in phrase:
                        offset_ticks = safe_offset(phrase['offsetInTicks'])
                        if offset_ticks is not None:
                            timestamp_seconds = offset_ticks / 10_000_000

                    if timestamp_seconds is not None:
                        time_str = format_time(timestamp_seconds)
                        if 'speaker' in phrase:
                            out.append(f"[{time_str}] Speaker {speaker_id}: {text}")
                        else:
                            out.append(f"[{time_str}] {text}")
                    else:
                        if 'speaker' in phrase:
                            out.append(f"Speaker {speaker_id}: {text}")
                        else:
                            out.append(text)

            if out:
                return '\n\n'.join(out)

            # Fallback: combined phrases without timestamps.
            if 'combinedRecognizedPhrases' in j:
                combined_results = []
                for combined_phrase in j['combinedRecognizedPhrases']:
                    text = combined_phrase.get('display', '')
                    if text.strip():
                        combined_results.append(text)

                if combined_results:
                    return '\n\n'.join(combined_results)

            # Nothing recognizable: return the raw JSON for debugging.
            return json.dumps(j, ensure_ascii=False, indent=2)

        except Exception as e:
            return f"Unable to parse transcription result: {str(e)}\n\nRaw response: {r.text[:1000]}..."


# Module-level singleton; importing this module starts the background worker.
transcription_manager = TranscriptionManager()


def allowed_file(filename):
    """Check if file extension is supported"""
    if not filename or filename in ["upload.unknown", ""]:
        return True

    if '.' not in filename:
        return True

    ext = filename.rsplit('.', 1)[1].lower()
    supported_extensions = set(AUDIO_FORMATS) | {
        # Audio/video containers handled via FFmpeg extraction.
        'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v',
        'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob',
        # Document and image formats for the AI summary jobs.
        'pdf', 'docx', 'doc', 'pptx', 'ppt', 'xlsx', 'xls', 'csv', 'txt', 'json',
        'jpg', 'jpeg', 'png', 'bmp', 'gif', 'tiff', 'webp'
    }

    return ext in supported_extensions
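
# Illustrative checks (hypothetical filenames):
#   allowed_file("talk.mp4")     # -> True (video; audio extracted via FFmpeg)
#   allowed_file("slides.pptx")  # -> True (document format)
#   allowed_file("notes.xyz")    # -> False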


def generate_user_session():
    """Generate a unique user session ID - kept for compatibility"""
    return str(uuid.uuid4())
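

if __name__ == "__main__":
    # Minimal smoke check (assumes a fully configured .env; importing this
    # module already validates credentials and starts the background worker).
    print(f"Supported audio formats: {', '.join(AUDIO_FORMATS)}")
    print(f"Example session ID: {generate_user_session()}")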