"""URL validation and health checking utilities."""

import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Set
from urllib.parse import urlparse

import requests

logger = logging.getLogger(__name__)


class URLValidator:
    """Validates and health-checks URLs before processing."""

    def __init__(self, max_workers: int = 10, timeout: int = 10):
        self.max_workers = max_workers
        self.timeout = timeout
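        # One shared session gives connection reuse across health checks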
        self.session = requests.Session()

        # Blocked domains that consistently fail or are problematic
        self.blocked_domains = {
            'bodyartguru.com',
            'dcassetcdn.com',
            'warvox.com',
            'jenkins-tpp.blackboard.com',
            'wrdsclassroom.wharton.upenn.edu',
        }

        # User agents for health checks
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]

    def validate_urls(self, urls: List[str]) -> List[str]:
        """Validate multiple URLs concurrently."""
        if not urls:
            return []

        # First, filter out obviously bad URLs
        pre_filtered = self._pre_filter_urls(urls)

        if not pre_filtered:
            return []

        # Health check the remaining URLs
        valid_urls = self._health_check_urls(pre_filtered)

        logger.info(f"URL validation: {len(urls)} -> {len(pre_filtered)} -> {len(valid_urls)}")
        return valid_urls

    def _pre_filter_urls(self, urls: List[str]) -> List[str]:
        """Pre-filter URLs based on basic criteria."""
        filtered = []

        for url in urls:
            if not self._is_valid_url_format(url):
                continue

            if self._is_blocked_domain(url):
                continue

            if not self._has_image_extension(url):
                continue

            if len(url) > 500:  # Skip very long URLs
                continue

            filtered.append(url)

        return filtered

    def _health_check_urls(self, urls: List[str]) -> List[str]:
        """Perform HEAD requests to check URL accessibility."""
        valid_urls = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit health check tasks
            future_to_url = {
                executor.submit(self._check_single_url, url): url
                for url in urls
            }

            # Collect results
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    is_valid = future.result(timeout=self.timeout + 5)
                    if is_valid:
                        valid_urls.append(url)
                except Exception as e:
                    logger.debug(f"Health check failed for {url}: {e}")

                # Brief pause between collecting results; this paces result
                # handling only, since the requests themselves have already
                # been submitted to the worker pool
                time.sleep(0.1)

        return valid_urls

    def _check_single_url(self, url: str) -> bool:
        """Check if a single URL is accessible."""
        try:
            headers = {
                'User-Agent': random.choice(self.user_agents),
                'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Connection': 'keep-alive',
                'DNT': '1',
            }

            # Add platform-specific headers
            if 'pinterest' in url.lower():
                headers.update({
                    'Referer': 'https://www.pinterest.com/',
                    'Origin': 'https://www.pinterest.com',
                })
            elif 'instagram' in url.lower():
                headers.update({
                    'Referer': 'https://www.instagram.com/',
                })
            else:
                headers['Referer'] = 'https://www.google.com/'

            response = self.session.head(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )

            # Check status code; redirects are followed above, so the final
            # status is normally 200, with 301/302 kept as a safety net for
            # redirects that could not be followed
            if response.status_code not in (200, 301, 302):
                return False

            # Check content type if available
            content_type = response.headers.get('content-type', '').lower()
            if content_type and not content_type.startswith('image/'):
                return False

            # Check content length if available
            content_length = response.headers.get('content-length')
            if content_length:
                size = int(content_length)
                if size < 1024 or size > 10 * 1024 * 1024:  # Too small or too large
                    return False

            return True

        except Exception as e:
            logger.debug(f"URL check failed for {url}: {e}")
            return False

    def _is_valid_url_format(self, url: str) -> bool:
        """Check if URL has a valid http(s) format."""
        try:
            parsed = urlparse(url)
            # Only http/https URLs can actually be fetched by the session
            return parsed.scheme in ('http', 'https') and bool(parsed.netloc)
        except Exception:
            return False

    def _is_blocked_domain(self, url: str) -> bool:
        """Check if URL is from a blocked domain or one of its subdomains."""
        try:
            host = urlparse(url).hostname or ''
            # Match the host exactly or as a subdomain; a plain substring
            # test would also reject unrelated domains that merely contain
            # a blocked name
            return not host or any(
                host == blocked or host.endswith('.' + blocked)
                for blocked in self.blocked_domains
            )
        except Exception:
            return True  # Block malformed URLs

    def _has_image_extension(self, url: str) -> bool:
        """Check if URL appears to point to an image."""
        image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
        url_lower = url.lower()
        # Deliberately loose: the extension may sit anywhere in the URL,
        # e.g. inside a query string on a CDN link
        return any(ext in url_lower for ext in image_extensions)

    def add_blocked_domain(self, domain: str) -> None:
        """Add a domain to the blocked list."""
        self.blocked_domains.add(domain.lower())

    def remove_blocked_domain(self, domain: str) -> None:
        """Remove a domain from the blocked list."""
        self.blocked_domains.discard(domain.lower())

    def get_blocked_domains(self) -> Set[str]:
        """Get the set of blocked domains."""
        return self.blocked_domains.copy()
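

# --- Minimal usage sketch ---
# A hedged illustration of how the validator might be driven; the URLs and
# the blocked domain below are hypothetical placeholders, and real results
# depend on network access.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    validator = URLValidator(max_workers=4, timeout=5)
    validator.add_blocked_domain('example-spam-cdn.com')  # hypothetical domain

    candidate_urls = [
        'https://example.com/images/photo.jpg',     # plausible image URL
        'https://example-spam-cdn.com/banner.png',  # dropped: blocked domain
        'not-a-url',                                # dropped: bad format
        'https://example.com/page.html',            # dropped: no image extension
    ]

    valid = validator.validate_urls(candidate_urls)
    print(f"{len(valid)} of {len(candidate_urls)} URLs passed validation")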