"""URL validation and health checking utilities."""
import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Set
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)


class URLValidator:
    """Validates and health-checks URLs before processing."""

    def __init__(self, max_workers: int = 10, timeout: int = 10):
        self.max_workers = max_workers
        self.timeout = timeout
        self.session = requests.Session()
        # Blocked domains that consistently fail or are problematic
        self.blocked_domains = {
            'bodyartguru.com',
            'dcassetcdn.com',
            'warvox.com',
            'jenkins-tpp.blackboard.com',
            'wrdsclassroom.wharton.upenn.edu',
        }
        # User agents rotated across health checks
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]

    def validate_urls(self, urls: List[str]) -> List[str]:
        """Validate multiple URLs concurrently."""
        if not urls:
            return []

        # First, filter out obviously bad URLs
        pre_filtered = self._pre_filter_urls(urls)
        if not pre_filtered:
            return []

        # Health-check the remaining URLs
        valid_urls = self._health_check_urls(pre_filtered)

        logger.info(
            f"URL validation: {len(urls)} -> {len(pre_filtered)} -> {len(valid_urls)}"
        )
        return valid_urls

    def _pre_filter_urls(self, urls: List[str]) -> List[str]:
        """Pre-filter URLs based on basic criteria."""
        filtered = []
        for url in urls:
            if not self._is_valid_url_format(url):
                continue
            if self._is_blocked_domain(url):
                continue
            if not self._has_image_extension(url):
                continue
            if len(url) > 500:  # Skip very long URLs
                continue
            filtered.append(url)
        return filtered

    def _health_check_urls(self, urls: List[str]) -> List[str]:
        """Perform HEAD requests to check URL accessibility."""
        valid_urls = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit health check tasks
            future_to_url = {
                executor.submit(self._check_single_url, url): url
                for url in urls
            }
            # Collect results as they complete; pacing of the requests
            # themselves happens inside _check_single_url
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    is_valid = future.result(timeout=self.timeout + 5)
                    if is_valid:
                        valid_urls.append(url)
                except Exception as e:
                    logger.debug(f"Health check failed for {url}: {e}")
        return valid_urls

    def _check_single_url(self, url: str) -> bool:
        """Check if a single URL is accessible."""
        try:
            # Small randomized delay so concurrent workers don't fire
            # requests in lockstep against the same host
            time.sleep(random.uniform(0.05, 0.15))

            headers = {
                'User-Agent': random.choice(self.user_agents),
                'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Connection': 'keep-alive',
                'DNT': '1',
            }
            # Add platform-specific headers
            if 'pinterest' in url.lower():
                headers.update({
                    'Referer': 'https://www.pinterest.com/',
                    'Origin': 'https://www.pinterest.com',
                })
            elif 'instagram' in url.lower():
                headers.update({
                    'Referer': 'https://www.instagram.com/',
                })
            else:
                headers['Referer'] = 'https://www.google.com/'

            response = self.session.head(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True,
            )

            # Check status code (redirects are followed, so 200 is the common case)
            if response.status_code not in (200, 301, 302):
                return False

            # Check content type if available
            content_type = response.headers.get('content-type', '').lower()
            if content_type and not content_type.startswith('image/'):
                return False

            # Check content length if available
            content_length = response.headers.get('content-length')
            if content_length:
                size = int(content_length)
                if size < 1024 or size > 10 * 1024 * 1024:  # Too small or too large
                    return False

            return True
        except Exception as e:
            logger.debug(f"URL check failed for {url}: {e}")
            return False

    def _is_valid_url_format(self, url: str) -> bool:
        """Check if URL has a valid format."""
        try:
            parsed = urlparse(url)
            return all([parsed.scheme, parsed.netloc])
        except Exception:
            return False

    def _is_blocked_domain(self, url: str) -> bool:
        """Check if URL is from a blocked domain."""
        try:
            # hostname is lowercased and has any port stripped
            host = urlparse(url).hostname or ''
            # Match the blocked domain itself or any subdomain of it,
            # rather than a bare substring anywhere in the host
            return any(
                host == blocked or host.endswith('.' + blocked)
                for blocked in self.blocked_domains
            )
        except Exception:
            return True  # Block malformed URLs

    def _has_image_extension(self, url: str) -> bool:
        """Check if URL appears to point to an image."""
        image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
        url_lower = url.lower()
        # Substring match (not endswith) so CDN URLs that carry the
        # extension in a query string still pass
        return any(ext in url_lower for ext in image_extensions)

    def add_blocked_domain(self, domain: str) -> None:
        """Add a domain to the blocked list."""
        self.blocked_domains.add(domain.lower())

    def remove_blocked_domain(self, domain: str) -> None:
        """Remove a domain from the blocked list."""
        self.blocked_domains.discard(domain.lower())

    def get_blocked_domains(self) -> Set[str]:
        """Get the set of blocked domains."""
        return self.blocked_domains.copy()
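

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: the URLs below
    # are hypothetical examples chosen to exercise each pre-filter branch.
    logging.basicConfig(level=logging.INFO)

    validator = URLValidator(max_workers=4, timeout=5)
    candidates = [
        "https://example.com/photos/cat.jpg",   # plausible image URL
        "not-a-url",                            # fails the format check
        "https://bodyartguru.com/pic.png",      # blocked domain
        "https://example.com/page.html",        # no image extension
    ]
    # Only the first candidate survives pre-filtering; whether it passes
    # the health check depends on the live HEAD response.
    print(validator.validate_urls(candidates))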