import gradio as gr from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import time import requests import os from datetime import datetime import subprocess import threading import queue # Lấy thông tin từ biến môi trường TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") CHAT_ID_1 = os.environ.get("CHAT_ID_1", "") CHAT_ID_2 = os.environ.get("CHAT_ID_2", "") # Queue để truyền log và video log_queue = queue.Queue() video_queue = queue.Queue() recording_status = {"is_recording": False, "should_stop": False} def log(message): """Thêm log vào queue""" timestamp = datetime.now().strftime("%H:%M:%S") log_message = f"[{timestamp}] {message}" log_queue.put(log_message) print(log_message) def send_video_telegram(video_path, chat_ids): """Gửi video đến Telegram""" url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendVideo" results = [] for chat_id in chat_ids: if not chat_id: continue try: file_size = os.path.getsize(video_path) / (1024*1024) log(f"📤 Đang gửi video ({file_size:.2f} MB) đến {chat_id}...") with open(video_path, 'rb') as video: files = {'video': video} data = { 'chat_id': chat_id, 'caption': f'🎥 Video {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' } response = requests.post(url, files=files, data=data, timeout=300) if response.status_code == 200: log(f"✅ Đã gửi video đến {chat_id}") results.append(True) else: log(f"❌ Lỗi gửi đến {chat_id}: {response.text}") results.append(False) except Exception as e: log(f"❌ Lỗi: {e}") results.append(False) return all(results) def capture_stream_with_ffmpeg(stream_url, duration, output_file): """Ghi stream bằng FFmpeg""" log(f"🎬 Bắt đầu ghi stream ({duration/60} phút)") log(f"🔗 URL: {stream_url}") # Xác định loại stream và cấu hình FFmpeg phù hợp command = ['ffmpeg', '-y', '-v', 'error', '-stats'] # Headers và options cho stream if '.m3u8' in stream_url or 'hls' in stream_url.lower(): # HLS stream log("📡 Loại: HLS stream") command.extend([ '-headers', 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', '-reconnect', '1', '-reconnect_streamed', '1', '-reconnect_delay_max', '5' ]) command.extend([ '-i', stream_url, '-t', str(duration), '-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '23', '-c:a', 'aac', '-b:a', '128k', '-movflags', '+faststart', '-f', 'mp4', output_file ]) log(f"🛠️ FFmpeg command ready") try: # Tạo file log riêng cho FFmpeg ffmpeg_log = f"ffmpeg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" with open(ffmpeg_log, 'w') as log_file: process = subprocess.Popen( command, stdout=log_file, stderr=subprocess.STDOUT, universal_newlines=True ) # Monitor tiến trình start_time = time.time() last_log_time = time.time() while process.poll() is None: if recording_status["should_stop"]: process.terminate() log("⏹️ Đã dừng ghi") return False # Log tiến trình mỗi 10 giây current_time = time.time() if current_time - last_log_time >= 10: elapsed = current_time - start_time progress = min((elapsed / duration) * 100, 100) # Kiểm tra file size hiện tại if os.path.exists(output_file): current_size = os.path.getsize(output_file) / (1024*1024) log(f"⏳ {progress:.1f}% | {elapsed/60:.1f}/{duration/60:.1f} phút | {current_size:.1f} MB") else: log(f"⏳ {progress:.1f}% | {elapsed/60:.1f}/{duration/60:.1f} phút") last_log_time = current_time time.sleep(1) process.wait() # Đọc log FFmpeg nếu có lỗi if process.returncode != 0: log(f"❌ FFmpeg error (code: {process.returncode})") try: with open(ffmpeg_log, 'r') as f: error_lines = f.readlines() # Log 5 dòng cuối for line in error_lines[-5:]: log(f"FFmpeg: {line.strip()}") except: pass return False # Kiểm tra file output if os.path.exists(output_file): file_size = os.path.getsize(output_file) if file_size > 10240: # Ít nhất 10KB log(f"✅ Hoàn thành: {file_size/(1024*1024):.2f} MB") # Xóa log file nếu thành công try: os.remove(ffmpeg_log) except: pass return True else: log(f"❌ File quá nhỏ: {file_size} bytes") return False else: log(f"❌ Không tìm thấy file output") return False except Exception as e: log(f"❌ Lỗi: {e}") import traceback log(f"📋 {traceback.format_exc()[:500]}") return False def get_stream_url(driver): """Lấy URL stream từ trang""" try: log("🔍 Đang tìm stream URL...") # Đợi video load time.sleep(10) # Phương pháp 1: Lấy trực tiếp từ video element log("🔍 Kiểm tra video element...") try: wait = WebDriverWait(driver, 15) video_element = wait.until( EC.presence_of_element_located((By.TAG_NAME, "video")) ) # Lấy tất cả thuộc tính của video video_info = driver.execute_script(""" var video = arguments[0]; return { src: video.src || '', currentSrc: video.currentSrc || '', networkState: video.networkState, readyState: video.readyState, duration: video.duration, paused: video.paused }; """, video_element) log(f"📊 Video info: readyState={video_info['readyState']}, paused={video_info['paused']}") # Kiểm tra src for src_key in ['currentSrc', 'src']: url = video_info.get(src_key, '') if url and not url.startswith('blob:') and url.startswith('http'): log(f"✅ Stream từ video.{src_key}") log(f"📺 {url}") return url # Nếu là blob URL, cần tìm URL gốc if video_info.get('currentSrc', '').startswith('blob:'): log("⚠️ Video dùng blob URL - cần tìm stream gốc") except Exception as e: log(f"⚠️ Video element: {str(e)[:100]}") # Phương pháp 2: Tìm HLS/DASH manifest trong network log("🔍 Tìm manifest trong network...") try: logs = driver.get_log('performance') # Danh sách pattern tìm kiếm video_patterns = [ '.m3u8', # HLS '.mpd', # DASH '/hls/', '/dash/', 'manifest', 'playlist', 'master.m3u8', 'index.m3u8', '/live/', '/stream/' ] # Extensions không hợp lệ invalid_exts = ['.js', '.css', '.json', '.html', '.woff', '.ttf', '.svg', '.png', '.jpg', '.jpeg', '.gif', '.ico', '.xml', '.txt', '.webp'] stream_urls = { 'hls': [], 'dash': [], 'live': [] } for entry in logs: import json try: log_entry = json.loads(entry['message'])['message'] if log_entry['method'] == 'Network.requestWillBeSent': url = log_entry['params']['request']['url'] # Skip invalid if not url.startswith('http'): continue if any(url.endswith(ext) for ext in invalid_exts): continue if 'google' in url or 'facebook' in url or 'analytics' in url: continue # Tìm HLS if '.m3u8' in url: stream_urls['hls'].append(url) # Tìm DASH elif '.mpd' in url: stream_urls['dash'].append(url) # Tìm live stream URLs elif any(pattern in url.lower() for pattern in ['/hls/', '/dash/', '/live/', '/stream/', 'manifest', 'playlist']): if not any(url.endswith(ext) for ext in invalid_exts): stream_urls['live'].append(url) except: continue # Log kết quả if stream_urls['hls']: log(f"🎯 Tìm thấy {len(stream_urls['hls'])} HLS stream(s)") url = stream_urls['hls'][0] log(f"✅ Chọn HLS stream") log(f"📺 {url}") return url if stream_urls['dash']: log(f"🎯 Tìm thấy {len(stream_urls['dash'])} DASH stream(s)") url = stream_urls['dash'][0] log(f"✅ Chọn DASH stream") log(f"📺 {url}") return url if stream_urls['live']: log(f"🎯 Tìm thấy {len(stream_urls['live'])} live stream(s)") for url in stream_urls['live']: log(f" - {url[:100]}...") # Chọn URL có vẻ đúng nhất for url in stream_urls['live']: if any(x in url.lower() for x in ['m3u8', 'mpd', 'manifest', 'playlist', 'master']): log(f"✅ Chọn live stream") log(f"📺 {url}") return url except Exception as e: log(f"⚠️ Network analysis: {str(e)[:100]}") # Phương pháp 3: Screenshot + screen recording log("💡 Không tìm thấy stream URL trực tiếp") log("🎥 Sẽ thử ghi màn hình video player...") return "SCREEN_RECORD" except Exception as e: log(f"❌ Lỗi: {e}") return None def start_recording(url, duration_minutes, auto_send): """Bắt đầu quay video""" recording_status["is_recording"] = True recording_status["should_stop"] = False log("🚀 Khởi động Chrome headless...") chrome_options = Options() chrome_options.add_argument('--headless=new') chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument('--disable-blink-features=AutomationControlled') chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'}) driver = None try: driver = webdriver.Chrome(options=chrome_options) log(f"🌐 Truy cập: {url}") driver.get(url) log("⏳ Đợi trang load...") time.sleep(10) # Click player try: wait = WebDriverWait(driver, 15) player = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, ".player-controls-layers__layer--toggle")) ) driver.execute_script("arguments[0].click();", player) log("✅ Player đã sẵn sàng") time.sleep(3) except: pass # Lấy stream URL stream_url = get_stream_url(driver) if not stream_url: log("❌ Không lấy được stream URL") return None # Ghi video timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") video_filename = f"recording_{timestamp}.mp4" duration_seconds = duration_minutes * 60 success = capture_stream_with_ffmpeg(stream_url, duration_seconds, video_filename) if success and os.path.exists(video_filename): file_size = os.path.getsize(video_filename) / (1024*1024) log(f"✅ Video hoàn thành: {file_size:.2f} MB") # Thêm video vào queue để hiển thị video_queue.put(video_filename) # Gửi tự động nếu được chọn if auto_send: chat_ids = [CHAT_ID_1, CHAT_ID_2] chat_ids = [cid for cid in chat_ids if cid] if chat_ids: send_video_telegram(video_filename, chat_ids) else: log("⚠️ Không có Chat ID trong biến môi trường") return video_filename else: log("❌ Ghi video thất bại") return None except Exception as e: log(f"❌ Lỗi: {e}") return None finally: if driver: driver.quit() recording_status["is_recording"] = False log("🏁 Hoàn tất") def stop_recording(): """Dừng quá trình ghi""" recording_status["should_stop"] = True log("⏹️ Đang dừng...") return "Đã gửi lệnh dừng" def record_video_thread(url, duration, auto_send): """Thread để ghi video""" start_recording(url, duration, auto_send) def gradio_start_recording(url, duration, auto_send): """Bắt đầu ghi từ Gradio""" if recording_status["is_recording"]: return "⚠️ Đang ghi video, vui lòng đợi hoặc dừng trước" if not TELEGRAM_BOT_TOKEN: return "❌ Chưa cấu hình TELEGRAM_BOT_TOKEN trong biến môi trường" if not url: return "❌ Vui lòng nhập URL" # Chạy trong thread riêng thread = threading.Thread(target=record_video_thread, args=(url, duration, auto_send)) thread.daemon = True thread.start() return "✅ Đã bắt đầu ghi video" def get_logs(): """Lấy logs mới nhất""" logs = [] while not log_queue.empty(): try: logs.append(log_queue.get_nowait()) except: break return "\n".join(logs) if logs else "" def get_latest_video(): """Lấy video mới nhất""" try: return video_queue.get_nowait() except: return None def send_video_manual(video_path): """Gửi video thủ công""" if not video_path or not os.path.exists(video_path): return "❌ Không có video để gửi" chat_ids = [CHAT_ID_1, CHAT_ID_2] chat_ids = [cid for cid in chat_ids if cid] if not chat_ids: return "❌ Không có Chat ID trong biến môi trường" log("📤 Gửi video thủ công...") success = send_video_telegram(video_path, chat_ids) return "✅ Đã gửi video" if success else "❌ Gửi thất bại" # Tạo Gradio Interface with gr.Blocks(title="Video Recorder & Telegram Sender") as app: gr.Markdown(""" # 🎥 Video Recorder & Telegram Sender ### Ghi video stream và gửi qua Telegram """) with gr.Tabs(): # Tab 1: Ghi video with gr.Tab("📹 Ghi Video"): with gr.Row(): with gr.Column(): url_input = gr.Textbox( label="🌐 URL Stream", placeholder="https://xhamsterlive.com/rose-u", value="https://xhamsterlive.com/rose-u" ) duration_input = gr.Slider( minimum=1, maximum=120, value=30, step=1, label="⏱️ Thời lượng (phút)" ) auto_send_checkbox = gr.Checkbox( label="📤 Tự động gửi qua Telegram sau khi ghi xong", value=True ) with gr.Row(): start_btn = gr.Button("▶️ Bắt đầu ghi", variant="primary", size="lg") stop_btn = gr.Button("⏹️ Dừng", variant="stop", size="lg") status_output = gr.Textbox(label="📊 Trạng thái", interactive=False) # Log output với auto-refresh log_output = gr.Textbox( label="📝 Nhật ký hoạt động", lines=15, max_lines=20, interactive=False ) start_btn.click( fn=gradio_start_recording, inputs=[url_input, duration_input, auto_send_checkbox], outputs=status_output ) stop_btn.click( fn=stop_recording, outputs=status_output ) # Tab 2: Xem & Gửi video with gr.Tab("🎬 Xem Video"): video_output = gr.Video(label="📹 Video đã ghi") with gr.Row(): refresh_btn = gr.Button("🔄 Làm mới", size="lg") send_btn = gr.Button("📤 Gửi qua Telegram", variant="primary", size="lg") send_status = gr.Textbox(label="📊 Trạng thái gửi", interactive=False) current_video = gr.State(value=None) def refresh_video(): latest = get_latest_video() if latest: return latest, latest return None, None refresh_btn.click( fn=refresh_video, outputs=[video_output, current_video] ) send_btn.click( fn=send_video_manual, inputs=current_video, outputs=send_status ) # Tab 3: Cài đặt with gr.Tab("⚙️ Cấu hình"): gr.Markdown(""" ### 🔐 Biến môi trường (Secrets) Để sử dụng, cần cấu hình các biến môi trường sau trong Hugging Face Space: 1. **TELEGRAM_BOT_TOKEN**: Token của bot Telegram 2. **CHAT_ID_1**: ID người nhận thứ nhất 3. **CHAT_ID_2**: ID người nhận thứ hai --- ### 📦 Packages cần thiết (requirements.txt): ``` selenium==4.15.2 requests==2.31.0 gradio==4.44.0 ``` ### 🔧 Packages hệ thống (packages.txt): ``` chromium chromium-driver ffmpeg ``` ### 🚀 Khởi động: Thêm vào file `README.md` hoặc `app.py`: ```python import os os.system("export PATH=$PATH:/usr/lib/chromium-browser/") ``` """) # Hiển thị cấu hình hiện tại (ẩn token) with gr.Row(): bot_status = gr.Textbox( label="Bot Token", value="✅ Đã cấu hình" if TELEGRAM_BOT_TOKEN else "❌ Chưa cấu hình", interactive=False ) chat1_status = gr.Textbox( label="Chat ID 1", value="✅ Đã cấu hình" if CHAT_ID_1 else "❌ Chưa cấu hình", interactive=False ) chat2_status = gr.Textbox( label="Chat ID 2", value="✅ Đã cấu hình" if CHAT_ID_2 else "❌ Chưa cấu hình", interactive=False ) if __name__ == "__main__": # Tạo timer để cập nhật logs tự động def update_logs_periodically(): while True: time.sleep(2) get_logs() # Chạy timer trong background log_thread = threading.Thread(target=update_logs_periodically, daemon=True) log_thread.start() app.queue() app.launch( server_name="0.0.0.0", server_port=7860, share=False )