"""
Web scraper for Bangla news articles
Multiple sources with pagination and large-scale scraping support
Enhanced for scraping 50,000+ articles
"""
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime, timedelta
from tqdm import tqdm
import os
import random
from urllib.parse import urljoin, urlparse
import json
class BanglaNewsScraper:
def __init__(self, target_count=50000):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9,bn;q=0.8',
'Accept-Encoding': 'gzip, deflate', # Exclude br (Brotli) to avoid decoding issues
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
self.target_count = target_count
self.scraped_count = 0
self.session = requests.Session()
self.session.headers.update(self.headers)
        # stream=False (requests' default) fetches response bodies eagerly;
        # content-encoding problems are retried in safe_get()
        self.session.stream = False
self.articles = []
self.seen_urls = set()
self.seen_texts = set() # To avoid duplicates
def safe_get(self, url, timeout=15, max_retries=2):
"""Safely get URL content, handling Brotli and other encoding issues"""
for attempt in range(max_retries):
try:
# First try with session
response = self.session.get(url, timeout=timeout, stream=False)
if response.status_code == 200:
return response
except Exception as e:
error_str = str(e).lower()
# If Brotli error, try with explicit headers
if 'brotli' in error_str or 'br' in error_str or 'encoding' in error_str:
try:
headers_no_br = {
'User-Agent': self.headers['User-Agent'],
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9,bn;q=0.8',
'Accept-Encoding': 'gzip, deflate', # Explicitly exclude br
'Connection': 'keep-alive'
}
response = requests.get(url, headers=headers_no_br, timeout=timeout)
if response.status_code == 200:
return response
                    except Exception:
if attempt < max_retries - 1:
time.sleep(1)
continue
else:
if attempt < max_retries - 1:
time.sleep(1)
continue
return None
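    # Note: safe_get() returns a requests.Response only for HTTP 200 and None on any
    # failure, so the scrape_* methods below simply skip a page when they get None.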
def extract_text_elements(self, soup, source_name="Unknown", page=1):
"""Extract text elements from soup using multiple strategies"""
# Multiple selector strategies - comprehensive
selectors = [
'h2.headline', 'h2', 'h3', 'h4',
'a[class*="headline"]', 'a[class*="title"]', 'a[class*="news"]',
'div[class*="story"] h2', 'div[class*="story"] h3',
'article h2', 'article h3',
'div[class*="title"]', 'div[class*="headline"]',
'span[class*="headline"]', 'span[class*="title"]',
'p[class*="headline"]', 'p[class*="title"]',
'a[href*="/article/"]', 'a[href*="/news/"]', 'a[href*="/story/"]',
'div[class*="card"] h2', 'div[class*="item"] h2',
'li[class*="news"]', 'li[class*="article"]',
]
elements = []
for selector in selectors:
try:
found = soup.select(selector)
if found:
elements.extend(found)
            except Exception:
continue
# Remove duplicates while preserving order
seen_elements = set()
unique_elements = []
for elem in elements:
elem_id = id(elem)
if elem_id not in seen_elements:
seen_elements.add(elem_id)
unique_elements.append(elem)
# Fallback 1: get all links with text that look like news
if not unique_elements:
links = soup.find_all('a', href=True)
for link in links:
text = link.get_text().strip()
href = link.get('href', '')
# Check if it looks like a news article link
if (text and len(text) > 15 and len(text) < 200 and
('/article/' in href or '/news/' in href or '/story/' in href or
'/bangla/' in href or '/bengali/' in href)):
unique_elements.append(link)
# Fallback 2: get all paragraphs with substantial text
if not unique_elements:
paragraphs = soup.find_all('p')
for p in paragraphs:
text = p.get_text().strip()
if len(text) > 30 and len(text) < 300:
unique_elements.append(p)
# Fallback 3: get any div with substantial text
if not unique_elements:
divs = soup.find_all('div', class_=True)
for div in divs:
text = div.get_text().strip()
classes = ' '.join(div.get('class', []))
if (text and len(text) > 20 and len(text) < 250 and
('news' in classes.lower() or 'article' in classes.lower() or
'story' in classes.lower() or 'title' in classes.lower())):
unique_elements.append(div)
return unique_elements
def scrape_prothom_alo(self, max_pages=100):
"""Scrape Prothom Alo articles with pagination"""
print("🔍 Scraping Prothom Alo...")
articles = []
base_url = "https://www.prothomalo.com/"
for page in range(1, max_pages + 1):
if self.scraped_count >= self.target_count:
break
try:
# Try different URL patterns for pagination
urls_to_try = [
f"{base_url}?page={page}",
f"{base_url}latest?page={page}",
f"{base_url}archive?page={page}",
base_url if page == 1 else None
]
url = None
response = None
for u in urls_to_try:
if u:
resp = self.safe_get(u, timeout=15)
if resp:
url = u
response = resp
break
if not url or not response:
continue
try:
soup = BeautifulSoup(response.content, 'html.parser')
except Exception as e:
if page == 1:
print(f"⚠️ Error parsing page {page}: {e}")
continue
# Use shared extraction method
unique_elements = self.extract_text_elements(soup, 'Prothom Alo', page)
page_articles = 0
for element in unique_elements:
if self.scraped_count >= self.target_count:
break
try:
text = element.get_text().strip()
# Clean text - remove extra whitespace
text = ' '.join(text.split())
# Check for duplicates and minimum length
if (text and len(text) > 15 and
text not in self.seen_texts and
len(text) < 500): # Reasonable max length
self.seen_texts.add(text)
articles.append({
'text': text,
'source': 'Prothom Alo',
'date': datetime.now().strftime('%Y-%m-%d'),
'category': 'news'
})
self.scraped_count += 1
page_articles += 1
                    except Exception:
continue
# Debug info for first page
if page == 1 and page_articles == 0:
print(f"⚠️ Page 1: Found {len(unique_headlines)} potential elements but extracted 0 articles")
if len(unique_headlines) > 0:
sample_text = unique_headlines[0].get_text().strip()[:100]
print(f" Sample text: {sample_text}...")
                if page_articles == 0:
                    if page <= 3:
                        # Early pages are sometimes empty; keep trying a few more before giving up
                        continue
                    else:
                        # No more articles found, stop pagination
                        break
# Rate limiting
time.sleep(random.uniform(1, 3))
except Exception as e:
print(f"⚠️ Error on page {page}: {e}")
continue
print(f"✅ Scraped {len(articles)} articles from Prothom Alo")
return articles
def scrape_bdnews24(self, max_pages=100):
"""Scrape bdnews24.com with pagination"""
print("🔍 Scraping bdnews24...")
articles = []
base_url = "https://bangla.bdnews24.com/"
for page in range(1, max_pages + 1):
if self.scraped_count >= self.target_count:
break
try:
urls_to_try = [
f"{base_url}?page={page}",
f"{base_url}latest?page={page}",
base_url if page == 1 else None
]
url = None
response = None
for u in urls_to_try:
if u:
resp = self.safe_get(u, timeout=15)
if resp:
url = u
response = resp
break
if not url or not response:
continue
try:
soup = BeautifulSoup(response.content, 'html.parser')
except Exception as e:
if page == 1:
print(f"⚠️ Error parsing page {page}: {e}")
continue
# Use shared extraction method
unique_elements = self.extract_text_elements(soup, 'bdnews24', page)
page_articles = 0
for element in unique_elements:
if self.scraped_count >= self.target_count:
break
try:
text = element.get_text().strip()
text = ' '.join(text.split()) # Clean whitespace
if (text and len(text) > 15 and
text not in self.seen_texts and
len(text) < 500):
self.seen_texts.add(text)
articles.append({
'text': text,
'source': 'bdnews24',
'date': datetime.now().strftime('%Y-%m-%d'),
'category': 'news'
})
self.scraped_count += 1
page_articles += 1
                    except Exception:
continue
# Debug info for first page
if page == 1 and page_articles == 0:
print(f"⚠️ Page 1: Found {len(unique_elements)} potential elements but extracted 0 articles")
if page_articles == 0:
break
time.sleep(random.uniform(1, 3))
except Exception as e:
print(f"⚠️ Error on page {page}: {e}")
continue
print(f"✅ Scraped {len(articles)} articles from bdnews24")
return articles
def scrape_bbc_bangla(self, max_pages=100):
"""Scrape BBC Bangla with pagination"""
print("🔍 Scraping BBC Bangla...")
articles = []
base_url = "https://www.bbc.com/bengali"
for page in range(1, max_pages + 1):
if self.scraped_count >= self.target_count:
break
try:
urls_to_try = [
f"{base_url}?page={page}",
base_url if page == 1 else None
]
url = None
response = None
for u in urls_to_try:
if u:
resp = self.safe_get(u, timeout=15)
if resp:
url = u
response = resp
break
if not url or not response:
continue
try:
soup = BeautifulSoup(response.content, 'html.parser')
except Exception as e:
if page == 1:
print(f"⚠️ Error parsing page {page}: {e}")
continue
# Use shared extraction method
unique_elements = self.extract_text_elements(soup, 'BBC Bangla', page)
page_articles = 0
for element in unique_elements:
if self.scraped_count >= self.target_count:
break
try:
text = element.get_text().strip()
text = ' '.join(text.split()) # Clean whitespace
if (text and len(text) > 15 and
text not in self.seen_texts and
len(text) < 500):
self.seen_texts.add(text)
articles.append({
'text': text,
'source': 'BBC Bangla',
'date': datetime.now().strftime('%Y-%m-%d'),
'category': 'news'
})
self.scraped_count += 1
page_articles += 1
                    except Exception:
continue
# Debug info for first page
if page == 1 and page_articles == 0:
print(f"⚠️ Page 1: Found {len(unique_elements)} potential elements but extracted 0 articles")
if page_articles == 0:
break
time.sleep(random.uniform(1, 3))
except Exception as e:
print(f"⚠️ Error on page {page}: {e}")
continue
print(f"✅ Scraped {len(articles)} articles from BBC Bangla")
return articles
def scrape_jugantor(self, max_pages=100):
"""Scrape Jugantor newspaper"""
print("🔍 Scraping Jugantor...")
articles = []
base_url = "https://www.jugantor.com/"
for page in range(1, max_pages + 1):
if self.scraped_count >= self.target_count:
break
try:
urls_to_try = [
f"{base_url}?page={page}",
base_url if page == 1 else None
]
url = None
response = None
for u in urls_to_try:
if u:
resp = self.safe_get(u, timeout=15)
if resp:
url = u
response = resp
break
if not url or not response:
continue
try:
soup = BeautifulSoup(response.content, 'html.parser')
except Exception as e:
print(f"⚠️ Error parsing page {page}: {e}")
continue
# Use shared extraction method
unique_elements = self.extract_text_elements(soup, 'Jugantor', page)
page_articles = 0
for element in unique_elements:
if self.scraped_count >= self.target_count:
break
try:
text = element.get_text().strip()
text = ' '.join(text.split()) # Clean whitespace
if (text and len(text) > 15 and
text not in self.seen_texts and
len(text) < 500):
self.seen_texts.add(text)
articles.append({
'text': text,
'source': 'Jugantor',
'date': datetime.now().strftime('%Y-%m-%d'),
'category': 'news'
})
self.scraped_count += 1
page_articles += 1
                    except Exception:
continue
if page_articles == 0:
break
time.sleep(random.uniform(1, 3))
except Exception as e:
continue
print(f"✅ Scraped {len(articles)} articles from Jugantor")
return articles
def scrape_kaler_kantho(self, max_pages=100):
"""Scrape Kaler Kantho newspaper"""
print("🔍 Scraping Kaler Kantho...")
articles = []
base_url = "https://www.kalerkantho.com/"
for page in range(1, max_pages + 1):
if self.scraped_count >= self.target_count:
break
try:
urls_to_try = [
f"{base_url}?page={page}",
base_url if page == 1 else None
]
url = None
response = None
for u in urls_to_try:
if u:
resp = self.safe_get(u, timeout=15)
if resp:
url = u
response = resp
break
if not url or not response:
continue
try:
soup = BeautifulSoup(response.content, 'html.parser')
except Exception as e:
print(f"⚠️ Error parsing page {page}: {e}")
continue
# Use shared extraction method
unique_elements = self.extract_text_elements(soup, 'Kaler Kantho', page)
page_articles = 0
for element in unique_elements:
if self.scraped_count >= self.target_count:
break
try:
text = element.get_text().strip()
text = ' '.join(text.split()) # Clean whitespace
if (text and len(text) > 15 and
text not in self.seen_texts and
len(text) < 500):
self.seen_texts.add(text)
articles.append({
'text': text,
'source': 'Kaler Kantho',
'date': datetime.now().strftime('%Y-%m-%d'),
'category': 'news'
})
self.scraped_count += 1
page_articles += 1
                    except Exception:
continue
# Debug info for first page
if page == 1 and page_articles == 0:
print(f"⚠️ Page 1: Found {len(unique_elements)} potential elements but extracted 0 articles")
if page_articles == 0:
break
time.sleep(random.uniform(1, 3))
except Exception as e:
continue
print(f"✅ Scraped {len(articles)} articles from Kaler Kantho")
return articles
def scrape_daily_star(self, max_pages=100):
"""Scrape The Daily Star Bangla"""
print("🔍 Scraping The Daily Star...")
articles = []
base_url = "https://www.thedailystar.net/bangla"
for page in range(1, max_pages + 1):
if self.scraped_count >= self.target_count:
break
try:
urls_to_try = [
f"{base_url}?page={page}",
base_url if page == 1 else None
]
url = None
response = None
for u in urls_to_try:
if u:
resp = self.safe_get(u, timeout=15)
if resp:
url = u
response = resp
break
if not url or not response:
continue
try:
soup = BeautifulSoup(response.content, 'html.parser')
except Exception as e:
print(f"⚠️ Error parsing page {page}: {e}")
continue
# Use shared extraction method
unique_elements = self.extract_text_elements(soup, 'The Daily Star', page)
page_articles = 0
for element in unique_elements:
if self.scraped_count >= self.target_count:
break
try:
text = element.get_text().strip()
text = ' '.join(text.split()) # Clean whitespace
if (text and len(text) > 15 and
text not in self.seen_texts and
len(text) < 500):
self.seen_texts.add(text)
articles.append({
'text': text,
'source': 'The Daily Star',
'date': datetime.now().strftime('%Y-%m-%d'),
'category': 'news'
})
self.scraped_count += 1
page_articles += 1
                    except Exception:
continue
# Debug info for first page
if page == 1 and page_articles == 0:
print(f"⚠️ Page 1: Found {len(unique_elements)} potential elements but extracted 0 articles")
if page_articles == 0:
break
time.sleep(random.uniform(1, 3))
except Exception as e:
continue
print(f"✅ Scraped {len(articles)} articles from The Daily Star")
return articles
def create_sample_dataset(self, num_samples=1000):
"""Create expanded sample dataset with variations"""
print("📝 Creating sample dataset...")
base_texts = [
"বাংলাদেশ ক্রিকেট দল দুর্দান্ত পারফরম্যান্স করেছে আজকের ম্যাচে",
"সরকারের নতুন নীতি নিয়ে জনগণ অসন্তুষ্ট",
"আজকের আবহাওয়া মোটামুটি ভালো থাকবে সারাদিন",
"শিক্ষা ব্যবস্থায় সংস্কার প্রয়োজন বলে মনে করেন বিশেষজ্ঞরা",
"দেশের অর্থনীতি দ্রুত উন্নতি করছে",
"দুর্নীতির কারণে উন্নয়ন প্রকল্পে বিলম্ব হচ্ছে",
"নতুন প্রযুক্তি ব্যবহার করে কৃষকরা বেশি ফসল ফলাচ্ছেন",
"যানজট ঢাকার একটি বড় সমস্যা হয়ে দাঁড়িয়েছে",
"স্বাস্থ্য সেবার মান উন্নতি করতে হবে",
"পরিবেশ রক্ষায় সবাইকে সচেতন হতে হবে",
"খেলাধুলায় বাংলাদেশ ভালো করছে",
"তরুণরা উদ্যোক্তা হয়ে ব্যবসা শুরু করছেন",
"গ্রামীণ এলাকায় বিদ্যুৎ সরবরাহ বাড়ছে",
"শহরে বায়ু দূষণ মারাত্মক আকার ধারণ করেছে",
"নতুন সেতু যোগাযোগ ব্যবস্থা উন্নত করবে",
"বাংলাদেশের রপ্তানি আয় বৃদ্ধি পাচ্ছে",
"শিক্ষার্থীদের জন্য নতুন সুযোগ তৈরি হচ্ছে",
"স্বাস্থ্য সেবা খাতে বিনিয়োগ বাড়ছে",
"কৃষি ক্ষেত্রে আধুনিক প্রযুক্তির ব্যবহার",
"তরুণ উদ্যোক্তাদের জন্য সহায়তা প্রকল্প",
]
articles = []
sources = ['Sample News', 'Demo Source', 'Test Data', 'Generated Data']
categories = ['politics', 'sports', 'economy', 'technology', 'health', 'education', 'environment']
for i in range(num_samples):
base_text = base_texts[i % len(base_texts)]
# Add slight variations
            if i >= len(base_texts):
# Add variations for diversity
variations = [
f"{base_text} এটি একটি গুরুত্বপূর্ণ বিষয়।",
f"সম্প্রতি {base_text}",
f"{base_text} বিশেষজ্ঞরা জানিয়েছেন।",
]
text = variations[i % len(variations)]
else:
text = base_text
articles.append({
'text': text,
'source': sources[i % len(sources)],
'date': (datetime.now() - timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d'),
'category': categories[i % len(categories)]
})
print(f"✅ Created {len(articles)} sample articles")
return articles
def save_to_csv(self, articles, filename='data/raw/bangla_news.csv', append=False):
"""Save scraped articles to CSV with append support"""
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(filename), exist_ok=True)
if not articles:
print("⚠️ No articles to save!")
return None
df = pd.DataFrame(articles)
if append and os.path.exists(filename):
# Append to existing file
existing_df = pd.read_csv(filename)
df = pd.concat([existing_df, df], ignore_index=True)
df = df.drop_duplicates(subset=['text'], keep='first')
df.to_csv(filename, index=False, encoding='utf-8-sig')
print(f"\n✅ Saved {len(df)} articles to {filename}")
return df
def save_progress(self, articles, checkpoint_file='data/raw/scraping_progress.json'):
"""Save scraping progress"""
os.makedirs(os.path.dirname(checkpoint_file), exist_ok=True)
progress = {
'scraped_count': self.scraped_count,
'target_count': self.target_count,
'timestamp': datetime.now().isoformat()
}
with open(checkpoint_file, 'w', encoding='utf-8') as f:
json.dump(progress, f, indent=2)
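# Minimal usage sketch (the output file name here is hypothetical; everything else
# uses only the methods defined above): scrape a single source as a quick smoke
# test instead of running the full main() pipeline.
#
#   scraper = BanglaNewsScraper(target_count=500)
#   articles = scraper.scrape_bbc_bangla(max_pages=5)
#   scraper.save_to_csv(articles, filename='data/raw/bbc_bangla_sample.csv')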
def main(target_count=50000):
scraper = BanglaNewsScraper(target_count=target_count)
print("=" * 60)
print(f"🌐 Starting Large-Scale Web Scraping Process")
print(f"🎯 Target: {target_count:,} articles")
print("=" * 60)
all_articles = []
sources = [
('Prothom Alo', scraper.scrape_prothom_alo, 200),
('bdnews24', scraper.scrape_bdnews24, 200),
('BBC Bangla', scraper.scrape_bbc_bangla, 200),
('Jugantor', scraper.scrape_jugantor, 200),
('Kaler Kantho', scraper.scrape_kaler_kantho, 200),
('The Daily Star', scraper.scrape_daily_star, 200),
]
# Scrape from all sources with progress tracking
with tqdm(total=target_count, desc="Scraping Progress", unit="articles") as pbar:
for source_name, scrape_func, max_pages in sources:
if scraper.scraped_count >= target_count:
break
print(f"\n{'='*60}")
print(f"📰 Scraping from {source_name}...")
print(f"{'='*60}")
try:
articles = scrape_func(max_pages=max_pages)
all_articles.extend(articles)
pbar.update(len(articles))
                # Checkpoint after each source so a failed run still keeps partial progress
                if all_articles:
scraper.save_to_csv(all_articles, append=True)
scraper.save_progress(all_articles)
print(f"\n💾 Progress saved: {scraper.scraped_count:,}/{target_count:,} articles")
# Be respectful to servers
time.sleep(random.uniform(2, 5))
except Exception as e:
print(f"❌ Error scraping {source_name}: {e}")
continue
# If we haven't reached target, supplement with sample data
if scraper.scraped_count < target_count:
needed = target_count - scraper.scraped_count
print(f"\n⚠️ Only scraped {scraper.scraped_count:,} articles. Creating {needed:,} sample articles...")
sample_articles = scraper.create_sample_dataset(num_samples=needed)
all_articles.extend(sample_articles)
scraper.scraped_count += len(sample_articles)
# Final save
print("\n" + "=" * 60)
print("💾 Saving final dataset...")
print("=" * 60)
df = scraper.save_to_csv(all_articles)
if df is not None and len(df) > 0:
# Show statistics
print("\n" + "=" * 60)
print("📊 Scraping Statistics")
print("=" * 60)
print(f"Total articles: {len(df):,}")
print(f"Target: {target_count:,}")
print(f"Completion: {len(df)/target_count*100:.1f}%")
if 'source' in df.columns:
print(f"\n📰 By source:")
source_counts = df['source'].value_counts()
for source, count in source_counts.items():
print(f" {source}: {count:,} ({count/len(df)*100:.1f}%)")
if 'category' in df.columns:
print(f"\n📑 By category:")
category_counts = df['category'].value_counts()
for category, count in category_counts.items():
print(f" {category}: {count:,}")
print(f"\n📅 Date range: {df['date'].min()} to {df['date'].max()}")
# Text length statistics
df['text_length'] = df['text'].str.len()
print(f"\n📏 Text length statistics:")
print(f" Average: {df['text_length'].mean():.1f} characters")
print(f" Min: {df['text_length'].min()} characters")
print(f" Max: {df['text_length'].max()} characters")
# Show sample
print("\n📝 Sample articles:")
print("-" * 60)
for i, row in df.head(5).iterrows():
print(f"{i + 1}. [{row['source']}] {row['text'][:70]}...")
print("=" * 60)
print(f"\n✅ Scraping complete! Dataset saved to data/raw/bangla_news.csv")
else:
print("❌ Failed to create dataset")
if __name__ == "__main__":
import sys
# Allow custom target count from command line
target_count = 50000
if len(sys.argv) > 1:
try:
target_count = int(sys.argv[1])
except ValueError:
print(f"⚠️ Invalid target count: {sys.argv[1]}. Using default: 50000")
main(target_count=target_count)