# tattoo_search_engine/utils/url_validator.py
# onurcopur's picture
# change dockerfile
# e01c07b
"""URL validation and health checking utilities."""
import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Set
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)
class URLValidator:
    """Validate candidate image URLs and health-check them before processing.

    Validation runs in two stages:

    1. An offline pre-filter (URL shape, blocked domains, image extension,
       URL length) that needs no network access.
    2. Concurrent HTTP ``HEAD`` requests that confirm each surviving URL
       answers with an acceptable status, an image content type and a
       plausible size.

    The validator owns a ``requests.Session``; call :meth:`close` (or use the
    instance as a context manager) to release it when done.
    """

    # Domains that consistently fail or are otherwise problematic.
    DEFAULT_BLOCKED_DOMAINS = frozenset({
        'bodyartguru.com',
        'dcassetcdn.com',
        'warvox.com',
        'jenkins-tpp.blackboard.com',
        'wrdsclassroom.wharton.upenn.edu',
    })

    # User agents picked at random for each health-check request.
    DEFAULT_USER_AGENTS = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    )

    # Substrings that mark a URL as "probably an image".
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.webp', '.gif')

    # URLs longer than this are skipped by the pre-filter.
    MAX_URL_LENGTH = 500

    # Final HTTP status codes accepted by the health check.
    ACCEPTED_STATUS_CODES = frozenset({200, 301, 302})

    # Inclusive bounds (in bytes) for an advertised Content-Length.
    MIN_CONTENT_LENGTH = 1024
    MAX_CONTENT_LENGTH = 10 * 1024 * 1024

    def __init__(self, max_workers: int = 10, timeout: int = 10):
        """Create a validator.

        Args:
            max_workers: Maximum number of concurrent health-check threads.
            timeout: Timeout in seconds passed to each ``HEAD`` request.
        """
        self.max_workers = max_workers
        self.timeout = timeout
        self.session = requests.Session()
        # Per-instance copies so add/remove never mutate the class defaults.
        self.blocked_domains = set(self.DEFAULT_BLOCKED_DOMAINS)
        self.user_agents = list(self.DEFAULT_USER_AGENTS)

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "URLValidator":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def validate_urls(self, urls: List[str]) -> List[str]:
        """Validate multiple URLs concurrently.

        Args:
            urls: Candidate URLs.

        Returns:
            The URLs that pass both the pre-filter and the health check, in
            the same relative order as ``urls``.
        """
        if not urls:
            return []

        # First, filter out obviously bad URLs without touching the network.
        pre_filtered = self._pre_filter_urls(urls)
        if not pre_filtered:
            return []

        # Health check the remaining URLs.
        valid_urls = self._health_check_urls(pre_filtered)
        logger.info(
            "URL validation: %d -> %d -> %d",
            len(urls), len(pre_filtered), len(valid_urls),
        )
        return valid_urls

    def _pre_filter_urls(self, urls: List[str]) -> List[str]:
        """Return the URLs that pass every offline check, in input order."""
        return [url for url in urls if self._passes_pre_filter(url)]

    def _passes_pre_filter(self, url: str) -> bool:
        """Apply the offline checks to one URL.

        The format check runs first so that malformed values are rejected
        before the remaining checks inspect them.
        """
        return (
            self._is_valid_url_format(url)
            and not self._is_blocked_domain(url)
            and self._has_image_extension(url)
            and len(url) <= self.MAX_URL_LENGTH
        )

    def _health_check_urls(self, urls: List[str]) -> List[str]:
        """Perform HEAD requests to check URL accessibility.

        Checks run on a thread pool bounded by ``max_workers``.

        Args:
            urls: URLs that already passed the pre-filter.

        Returns:
            The accessible URLs, in the same order as ``urls`` (not in
            completion order), so results are deterministic.
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            submitted = [
                (url, executor.submit(self._check_single_url, url))
                for url in urls
            ]
        # Leaving the ``with`` block waits for every task, so each future is
        # already complete here and ``result()`` returns without blocking.

        valid_urls = []
        for url, future in submitted:
            try:
                if future.result():
                    valid_urls.append(url)
            except Exception as e:
                # Best effort: one failing check must not abort the batch.
                logger.debug("Health check failed for %s: %s", url, e)
        return valid_urls

    def _check_single_url(self, url: str) -> bool:
        """Check if a single URL is accessible.

        Sends a ``HEAD`` request (following redirects) and inspects the
        response status, content type and content length.

        Returns:
            True when the response looks like a usable image; False on any
            rejection or on any error raised while making the request.
        """
        try:
            response = self.session.head(
                url,
                headers=self._build_headers(url),
                timeout=self.timeout,
                allow_redirects=True,
            )
            return self._is_acceptable_response(response)
        except Exception as e:
            # Deliberately broad: any failure simply marks the URL as unusable.
            logger.debug("URL check failed for %s: %s", url, e)
            return False

    def _build_headers(self, url: str) -> dict:
        """Build browser-like request headers, with a platform-specific Referer."""
        headers = {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive',
            'DNT': '1',
        }

        # Add platform-specific headers.
        url_lower = url.lower()
        if 'pinterest' in url_lower:
            headers['Referer'] = 'https://www.pinterest.com/'
            headers['Origin'] = 'https://www.pinterest.com'
        elif 'instagram' in url_lower:
            headers['Referer'] = 'https://www.instagram.com/'
        else:
            headers['Referer'] = 'https://www.google.com/'
        return headers

    def _is_acceptable_response(self, response) -> bool:
        """Decide whether a HEAD response describes a usable image.

        Content type and content length are only checked when the server
        sent them; a missing header is not a reason to reject.
        """
        if response.status_code not in self.ACCEPTED_STATUS_CODES:
            return False

        content_type = response.headers.get('content-type', '').lower()
        if content_type and not content_type.startswith('image/'):
            return False

        content_length = response.headers.get('content-length')
        if content_length:
            try:
                size = int(content_length)
            except ValueError:
                # A Content-Length that is not an integer is rejected.
                return False
            # Too small or too large.
            if not self.MIN_CONTENT_LENGTH <= size <= self.MAX_CONTENT_LENGTH:
                return False

        return True

    def _is_valid_url_format(self, url: str) -> bool:
        """Check if URL has valid format (both a scheme and a network location)."""
        try:
            parsed = urlparse(url)
            return all([parsed.scheme, parsed.netloc])
        except Exception:
            return False

    def _is_blocked_domain(self, url: str) -> bool:
        """Check if URL is from a blocked domain.

        A URL is blocked when its hostname equals a blocked domain or is a
        subdomain of one. Matching uses the parsed hostname, so ports and
        userinfo are ignored and unrelated hosts that merely contain a
        blocked name (e.g. ``notwarvox.com``) are not blocked.

        Returns:
            True if the URL is blocked or cannot be parsed.
        """
        try:
            # ``hostname`` is lower-cased by urlparse; drop a trailing root dot.
            host = (urlparse(url).hostname or '').rstrip('.')
            return any(
                host == blocked or host.endswith('.' + blocked)
                for blocked in self.blocked_domains
            )
        except Exception:
            return True  # Block malformed URLs

    def _has_image_extension(self, url: str) -> bool:
        """Check if URL appears to point to an image.

        The extension may appear anywhere in the URL (case-insensitively),
        not only at the end.
        """
        url_lower = url.lower()
        return any(ext in url_lower for ext in self.IMAGE_EXTENSIONS)

    def add_blocked_domain(self, domain: str) -> None:
        """Add a domain (stored lower-cased) to the blocked list."""
        self.blocked_domains.add(domain.lower())

    def remove_blocked_domain(self, domain: str) -> None:
        """Remove a domain from the blocked list; unknown domains are ignored."""
        self.blocked_domains.discard(domain.lower())

    def get_blocked_domains(self) -> Set[str]:
        """Get a copy of the set of blocked domains."""
        return self.blocked_domains.copy()