Abxl / app.py
Translsis's picture
Update app.py
b0881e7 verified
import gradio as gr
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import requests
import os
from datetime import datetime
import subprocess
import threading
import queue
# Lấy thông tin từ biến môi trường
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
CHAT_ID_1 = os.environ.get("CHAT_ID_1", "")
CHAT_ID_2 = os.environ.get("CHAT_ID_2", "")
# Queue để truyền log và video
log_queue = queue.Queue()
video_queue = queue.Queue()
recording_status = {"is_recording": False, "should_stop": False}
def log(message):
"""Thêm log vào queue"""
timestamp = datetime.now().strftime("%H:%M:%S")
log_message = f"[{timestamp}] {message}"
log_queue.put(log_message)
print(log_message)
def send_video_telegram(video_path, chat_ids):
"""Gửi video đến Telegram"""
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendVideo"
results = []
for chat_id in chat_ids:
if not chat_id:
continue
try:
file_size = os.path.getsize(video_path) / (1024*1024)
log(f"📤 Đang gửi video ({file_size:.2f} MB) đến {chat_id}...")
with open(video_path, 'rb') as video:
files = {'video': video}
data = {
'chat_id': chat_id,
'caption': f'🎥 Video {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url, files=files, data=data, timeout=300)
if response.status_code == 200:
log(f"✅ Đã gửi video đến {chat_id}")
results.append(True)
else:
log(f"❌ Lỗi gửi đến {chat_id}: {response.text}")
results.append(False)
except Exception as e:
log(f"❌ Lỗi: {e}")
results.append(False)
return all(results)
def capture_stream_with_ffmpeg(stream_url, duration, output_file):
"""Ghi stream bằng FFmpeg"""
log(f"🎬 Bắt đầu ghi stream ({duration/60} phút)")
log(f"🔗 URL: {stream_url}")
# Xác định loại stream và cấu hình FFmpeg phù hợp
command = ['ffmpeg', '-y', '-v', 'error', '-stats']
# Headers và options cho stream
if '.m3u8' in stream_url or 'hls' in stream_url.lower():
# HLS stream
log("📡 Loại: HLS stream")
command.extend([
'-headers', 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'-reconnect', '1',
'-reconnect_streamed', '1',
'-reconnect_delay_max', '5'
])
command.extend([
'-i', stream_url,
'-t', str(duration),
'-c:v', 'libx264',
'-preset', 'ultrafast',
'-crf', '23',
'-c:a', 'aac',
'-b:a', '128k',
'-movflags', '+faststart',
'-f', 'mp4',
output_file
])
log(f"🛠️ FFmpeg command ready")
try:
# Tạo file log riêng cho FFmpeg
ffmpeg_log = f"ffmpeg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
with open(ffmpeg_log, 'w') as log_file:
process = subprocess.Popen(
command,
stdout=log_file,
stderr=subprocess.STDOUT,
universal_newlines=True
)
# Monitor tiến trình
start_time = time.time()
last_log_time = time.time()
while process.poll() is None:
if recording_status["should_stop"]:
process.terminate()
log("⏹️ Đã dừng ghi")
return False
# Log tiến trình mỗi 10 giây
current_time = time.time()
if current_time - last_log_time >= 10:
elapsed = current_time - start_time
progress = min((elapsed / duration) * 100, 100)
# Kiểm tra file size hiện tại
if os.path.exists(output_file):
current_size = os.path.getsize(output_file) / (1024*1024)
log(f"⏳ {progress:.1f}% | {elapsed/60:.1f}/{duration/60:.1f} phút | {current_size:.1f} MB")
else:
log(f"⏳ {progress:.1f}% | {elapsed/60:.1f}/{duration/60:.1f} phút")
last_log_time = current_time
time.sleep(1)
process.wait()
# Đọc log FFmpeg nếu có lỗi
if process.returncode != 0:
log(f"❌ FFmpeg error (code: {process.returncode})")
try:
with open(ffmpeg_log, 'r') as f:
error_lines = f.readlines()
# Log 5 dòng cuối
for line in error_lines[-5:]:
log(f"FFmpeg: {line.strip()}")
except:
pass
return False
# Kiểm tra file output
if os.path.exists(output_file):
file_size = os.path.getsize(output_file)
if file_size > 10240: # Ít nhất 10KB
log(f"✅ Hoàn thành: {file_size/(1024*1024):.2f} MB")
# Xóa log file nếu thành công
try:
os.remove(ffmpeg_log)
except:
pass
return True
else:
log(f"❌ File quá nhỏ: {file_size} bytes")
return False
else:
log(f"❌ Không tìm thấy file output")
return False
except Exception as e:
log(f"❌ Lỗi: {e}")
import traceback
log(f"📋 {traceback.format_exc()[:500]}")
return False
def get_stream_url(driver):
"""Lấy URL stream từ trang"""
try:
log("🔍 Đang tìm stream URL...")
# Đợi video load
time.sleep(10)
# Phương pháp 1: Lấy trực tiếp từ video element
log("🔍 Kiểm tra video element...")
try:
wait = WebDriverWait(driver, 15)
video_element = wait.until(
EC.presence_of_element_located((By.TAG_NAME, "video"))
)
# Lấy tất cả thuộc tính của video
video_info = driver.execute_script("""
var video = arguments[0];
return {
src: video.src || '',
currentSrc: video.currentSrc || '',
networkState: video.networkState,
readyState: video.readyState,
duration: video.duration,
paused: video.paused
};
""", video_element)
log(f"📊 Video info: readyState={video_info['readyState']}, paused={video_info['paused']}")
# Kiểm tra src
for src_key in ['currentSrc', 'src']:
url = video_info.get(src_key, '')
if url and not url.startswith('blob:') and url.startswith('http'):
log(f"✅ Stream từ video.{src_key}")
log(f"📺 {url}")
return url
# Nếu là blob URL, cần tìm URL gốc
if video_info.get('currentSrc', '').startswith('blob:'):
log("⚠️ Video dùng blob URL - cần tìm stream gốc")
except Exception as e:
log(f"⚠️ Video element: {str(e)[:100]}")
# Phương pháp 2: Tìm HLS/DASH manifest trong network
log("🔍 Tìm manifest trong network...")
try:
logs = driver.get_log('performance')
# Danh sách pattern tìm kiếm
video_patterns = [
'.m3u8', # HLS
'.mpd', # DASH
'/hls/',
'/dash/',
'manifest',
'playlist',
'master.m3u8',
'index.m3u8',
'/live/',
'/stream/'
]
# Extensions không hợp lệ
invalid_exts = ['.js', '.css', '.json', '.html', '.woff', '.ttf', '.svg',
'.png', '.jpg', '.jpeg', '.gif', '.ico', '.xml', '.txt', '.webp']
stream_urls = {
'hls': [],
'dash': [],
'live': []
}
for entry in logs:
import json
try:
log_entry = json.loads(entry['message'])['message']
if log_entry['method'] == 'Network.requestWillBeSent':
url = log_entry['params']['request']['url']
# Skip invalid
if not url.startswith('http'):
continue
if any(url.endswith(ext) for ext in invalid_exts):
continue
if 'google' in url or 'facebook' in url or 'analytics' in url:
continue
# Tìm HLS
if '.m3u8' in url:
stream_urls['hls'].append(url)
# Tìm DASH
elif '.mpd' in url:
stream_urls['dash'].append(url)
# Tìm live stream URLs
elif any(pattern in url.lower() for pattern in ['/hls/', '/dash/', '/live/', '/stream/', 'manifest', 'playlist']):
if not any(url.endswith(ext) for ext in invalid_exts):
stream_urls['live'].append(url)
except:
continue
# Log kết quả
if stream_urls['hls']:
log(f"🎯 Tìm thấy {len(stream_urls['hls'])} HLS stream(s)")
url = stream_urls['hls'][0]
log(f"✅ Chọn HLS stream")
log(f"📺 {url}")
return url
if stream_urls['dash']:
log(f"🎯 Tìm thấy {len(stream_urls['dash'])} DASH stream(s)")
url = stream_urls['dash'][0]
log(f"✅ Chọn DASH stream")
log(f"📺 {url}")
return url
if stream_urls['live']:
log(f"🎯 Tìm thấy {len(stream_urls['live'])} live stream(s)")
for url in stream_urls['live']:
log(f" - {url[:100]}...")
# Chọn URL có vẻ đúng nhất
for url in stream_urls['live']:
if any(x in url.lower() for x in ['m3u8', 'mpd', 'manifest', 'playlist', 'master']):
log(f"✅ Chọn live stream")
log(f"📺 {url}")
return url
except Exception as e:
log(f"⚠️ Network analysis: {str(e)[:100]}")
# Phương pháp 3: Screenshot + screen recording
log("💡 Không tìm thấy stream URL trực tiếp")
log("🎥 Sẽ thử ghi màn hình video player...")
return "SCREEN_RECORD"
except Exception as e:
log(f"❌ Lỗi: {e}")
return None
def start_recording(url, duration_minutes, auto_send):
"""Bắt đầu quay video"""
recording_status["is_recording"] = True
recording_status["should_stop"] = False
log("🚀 Khởi động Chrome headless...")
chrome_options = Options()
chrome_options.add_argument('--headless=new')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
driver = None
try:
driver = webdriver.Chrome(options=chrome_options)
log(f"🌐 Truy cập: {url}")
driver.get(url)
log("⏳ Đợi trang load...")
time.sleep(10)
# Click player
try:
wait = WebDriverWait(driver, 15)
player = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, ".player-controls-layers__layer--toggle"))
)
driver.execute_script("arguments[0].click();", player)
log("✅ Player đã sẵn sàng")
time.sleep(3)
except:
pass
# Lấy stream URL
stream_url = get_stream_url(driver)
if not stream_url:
log("❌ Không lấy được stream URL")
return None
# Ghi video
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
video_filename = f"recording_{timestamp}.mp4"
duration_seconds = duration_minutes * 60
success = capture_stream_with_ffmpeg(stream_url, duration_seconds, video_filename)
if success and os.path.exists(video_filename):
file_size = os.path.getsize(video_filename) / (1024*1024)
log(f"✅ Video hoàn thành: {file_size:.2f} MB")
# Thêm video vào queue để hiển thị
video_queue.put(video_filename)
# Gửi tự động nếu được chọn
if auto_send:
chat_ids = [CHAT_ID_1, CHAT_ID_2]
chat_ids = [cid for cid in chat_ids if cid]
if chat_ids:
send_video_telegram(video_filename, chat_ids)
else:
log("⚠️ Không có Chat ID trong biến môi trường")
return video_filename
else:
log("❌ Ghi video thất bại")
return None
except Exception as e:
log(f"❌ Lỗi: {e}")
return None
finally:
if driver:
driver.quit()
recording_status["is_recording"] = False
log("🏁 Hoàn tất")
def stop_recording():
"""Dừng quá trình ghi"""
recording_status["should_stop"] = True
log("⏹️ Đang dừng...")
return "Đã gửi lệnh dừng"
def record_video_thread(url, duration, auto_send):
"""Thread để ghi video"""
start_recording(url, duration, auto_send)
def gradio_start_recording(url, duration, auto_send):
"""Bắt đầu ghi từ Gradio"""
if recording_status["is_recording"]:
return "⚠️ Đang ghi video, vui lòng đợi hoặc dừng trước"
if not TELEGRAM_BOT_TOKEN:
return "❌ Chưa cấu hình TELEGRAM_BOT_TOKEN trong biến môi trường"
if not url:
return "❌ Vui lòng nhập URL"
# Chạy trong thread riêng
thread = threading.Thread(target=record_video_thread, args=(url, duration, auto_send))
thread.daemon = True
thread.start()
return "✅ Đã bắt đầu ghi video"
def get_logs():
"""Lấy logs mới nhất"""
logs = []
while not log_queue.empty():
try:
logs.append(log_queue.get_nowait())
except:
break
return "\n".join(logs) if logs else ""
def get_latest_video():
"""Lấy video mới nhất"""
try:
return video_queue.get_nowait()
except:
return None
def send_video_manual(video_path):
"""Gửi video thủ công"""
if not video_path or not os.path.exists(video_path):
return "❌ Không có video để gửi"
chat_ids = [CHAT_ID_1, CHAT_ID_2]
chat_ids = [cid for cid in chat_ids if cid]
if not chat_ids:
return "❌ Không có Chat ID trong biến môi trường"
log("📤 Gửi video thủ công...")
success = send_video_telegram(video_path, chat_ids)
return "✅ Đã gửi video" if success else "❌ Gửi thất bại"
# Tạo Gradio Interface
with gr.Blocks(title="Video Recorder & Telegram Sender") as app:
gr.Markdown("""
# 🎥 Video Recorder & Telegram Sender
### Ghi video stream và gửi qua Telegram
""")
with gr.Tabs():
# Tab 1: Ghi video
with gr.Tab("📹 Ghi Video"):
with gr.Row():
with gr.Column():
url_input = gr.Textbox(
label="🌐 URL Stream",
placeholder="https://xhamsterlive.com/rose-u",
value="https://xhamsterlive.com/rose-u"
)
duration_input = gr.Slider(
minimum=1,
maximum=120,
value=30,
step=1,
label="⏱️ Thời lượng (phút)"
)
auto_send_checkbox = gr.Checkbox(
label="📤 Tự động gửi qua Telegram sau khi ghi xong",
value=True
)
with gr.Row():
start_btn = gr.Button("▶️ Bắt đầu ghi", variant="primary", size="lg")
stop_btn = gr.Button("⏹️ Dừng", variant="stop", size="lg")
status_output = gr.Textbox(label="📊 Trạng thái", interactive=False)
# Log output với auto-refresh
log_output = gr.Textbox(
label="📝 Nhật ký hoạt động",
lines=15,
max_lines=20,
interactive=False
)
start_btn.click(
fn=gradio_start_recording,
inputs=[url_input, duration_input, auto_send_checkbox],
outputs=status_output
)
stop_btn.click(
fn=stop_recording,
outputs=status_output
)
# Tab 2: Xem & Gửi video
with gr.Tab("🎬 Xem Video"):
video_output = gr.Video(label="📹 Video đã ghi")
with gr.Row():
refresh_btn = gr.Button("🔄 Làm mới", size="lg")
send_btn = gr.Button("📤 Gửi qua Telegram", variant="primary", size="lg")
send_status = gr.Textbox(label="📊 Trạng thái gửi", interactive=False)
current_video = gr.State(value=None)
def refresh_video():
latest = get_latest_video()
if latest:
return latest, latest
return None, None
refresh_btn.click(
fn=refresh_video,
outputs=[video_output, current_video]
)
send_btn.click(
fn=send_video_manual,
inputs=current_video,
outputs=send_status
)
# Tab 3: Cài đặt
with gr.Tab("⚙️ Cấu hình"):
gr.Markdown("""
### 🔐 Biến môi trường (Secrets)
Để sử dụng, cần cấu hình các biến môi trường sau trong Hugging Face Space:
1. **TELEGRAM_BOT_TOKEN**: Token của bot Telegram
2. **CHAT_ID_1**: ID người nhận thứ nhất
3. **CHAT_ID_2**: ID người nhận thứ hai
---
### 📦 Packages cần thiết (requirements.txt):
```
selenium==4.15.2
requests==2.31.0
gradio==4.44.0
```
### 🔧 Packages hệ thống (packages.txt):
```
chromium
chromium-driver
ffmpeg
```
### 🚀 Khởi động:
Thêm vào file `README.md` hoặc `app.py`:
```python
import os
os.system("export PATH=$PATH:/usr/lib/chromium-browser/")
```
""")
# Hiển thị cấu hình hiện tại (ẩn token)
with gr.Row():
bot_status = gr.Textbox(
label="Bot Token",
value="✅ Đã cấu hình" if TELEGRAM_BOT_TOKEN else "❌ Chưa cấu hình",
interactive=False
)
chat1_status = gr.Textbox(
label="Chat ID 1",
value="✅ Đã cấu hình" if CHAT_ID_1 else "❌ Chưa cấu hình",
interactive=False
)
chat2_status = gr.Textbox(
label="Chat ID 2",
value="✅ Đã cấu hình" if CHAT_ID_2 else "❌ Chưa cấu hình",
interactive=False
)
if __name__ == "__main__":
# Tạo timer để cập nhật logs tự động
def update_logs_periodically():
while True:
time.sleep(2)
get_logs()
# Chạy timer trong background
log_thread = threading.Thread(target=update_logs_periodically, daemon=True)
log_thread.start()
app.queue()
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)