Rakib Hossain
Add complete Bangla sentiment analysis: data, fine-tuned model, and visualizations
49c214c
| """ | |
| Fine-tune multilingual sentiment model on Bangla dataset | |
| Optimized for Bangla news classification (positive / neutral / negative) | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from datasets import Dataset | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| TrainingArguments, | |
| Trainer, | |
| DataCollatorWithPadding, | |
| pipeline | |
| ) | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| # ------------------------------------------------------------------- | |
| # ⬇ Helper: Clean & map sentiment labels (base model has 5 classes) | |
| # ------------------------------------------------------------------- | |
def map_to_three_class(label: str):
    """Collapse a five-way sentiment label into negative / neutral / positive.

    Matching is case-insensitive. Any label that is not one of the five
    known names falls back to "neutral".
    """
    coarse_by_fine = {
        "very negative": "negative",
        "negative": "negative",
        "neutral": "neutral",
        "positive": "positive",
        "very positive": "positive",
    }
    # Unknown labels deliberately land in the middle class.
    return coarse_by_fine.get(label.lower(), "neutral")
| # ------------------------------------------------------------------- | |
| # ⬇ Fine-tuning class | |
| # ------------------------------------------------------------------- | |
class BanglaSentimentFineTuner:
    """Fine-tune a multilingual sentiment checkpoint into a 3-class Bangla model.

    Typical use is ``prepare_dataset()`` followed by ``fine_tune()``.
    """

    # Sentiment name -> integer class id used as the training label.
    LABEL2ID = {"negative": 0, "neutral": 1, "positive": 2}
    # Inverse mapping, stored in the model config so predictions carry names.
    ID2LABEL = {0: "negative", 1: "neutral", 2: "positive"}

    def __init__(self, base_model="tabularisai/multilingual-sentiment-analysis"):
        """Load the tokenizer for ``base_model``; the model itself is loaded
        later by ``fine_tune()``."""
        self.base_model = base_model
        self.tokenizer = AutoTokenizer.from_pretrained(base_model)
        # Ensure tokenizer has a pad_token
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = None

    # ---------------------------------------------------------------
    def prepare_dataset(self, csv_file='data/raw/bangla_news_labeled.csv'):
        """Load the labeled CSV, clean it and split it 80/20.

        Args:
            csv_file: Path (or file-like object) of a CSV with ``text`` and
                ``sentiment`` columns. Sentiment values are matched against
                negative / neutral / positive, ignoring case and surrounding
                whitespace.

        Returns:
            ``(train_dataset, test_dataset)`` with ``text`` and integer
            ``label`` columns.

        Raises:
            FileNotFoundError: ``csv_file`` does not exist.
            RuntimeError: the file exists but could not be read.
            ValueError: required columns are missing, or no usable rows
                remain after cleaning.
        """
        print("π Loading dataset...")
        try:
            df = pd.read_csv(csv_file)
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Dataset file not found: {csv_file}") from e
        except Exception as e:
            raise RuntimeError(f"Error loading dataset: {str(e)}") from e

        # Check required columns
        if "text" not in df.columns or "sentiment" not in df.columns:
            raise ValueError("Dataset must contain 'text' and 'sentiment' columns")

        # Label mapping to 3 classes; normalise first so "Positive" or
        # " negative " are not discarded as unmapped values.
        initial_count = len(df)
        normalized = df["sentiment"].astype(str).str.strip().str.lower()
        df["label"] = normalized.map(self.LABEL2ID)

        # map() leaves NaN for unmapped values, which makes the column float.
        # Drop those rows and cast back to integers so the trainer receives
        # class ids rather than float targets.
        df = df.dropna(subset=["label"]).astype({"label": "int64"})
        removed = initial_count - len(df)
        if removed > 0:
            print(f"β οΈ Removed {removed} rows with unmapped sentiment values")
        if df.empty:
            raise ValueError("Dataset is empty after processing")

        # Remove rows with missing or blank text
        df = df.dropna(subset=["text"])
        df = df[df["text"].astype(str).str.strip() != ""]
        if df.empty:
            raise ValueError("Dataset is empty after removing invalid rows")

        # Check class coverage only now: the text filtering above can still
        # remove the last row of a class.
        missing = set(self.ID2LABEL) - set(df["label"].unique().tolist())
        if missing:
            print(f"β οΈ Warning: Missing label classes: {missing}")

        try:
            train_df, test_df = train_test_split(
                df, test_size=0.2, random_state=42, stratify=df["label"]
            )
        except ValueError:
            # Stratification fails when a class has too few samples.
            print("β οΈ Using regular split (stratification not possible)")
            train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

        print(f"β Training samples: {len(train_df)}")
        print(f"β Test samples: {len(test_df)}")
        print(f"π Label distribution in training set:")
        print(train_df["label"].value_counts().sort_index())

        # preserve_index=False keeps the shuffled pandas index from turning
        # into an extra "__index_level_0__" column.
        train_dataset = Dataset.from_pandas(
            train_df[["text", "label"]], preserve_index=False
        )
        test_dataset = Dataset.from_pandas(
            test_df[["text", "label"]], preserve_index=False
        )
        return train_dataset, test_dataset

    # ---------------------------------------------------------------
    def tokenize_function(self, examples):
        """Tokenize a batch's ``text`` field, padded/truncated to 128 tokens."""
        return self.tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=128
        )

    # ---------------------------------------------------------------
    def compute_metrics(self, eval_pred):
        """Return accuracy and weighted precision / recall / F1.

        Args:
            eval_pred: ``(logits, labels)`` pair; the predicted class is the
                argmax over axis 1 of the logits.
        """
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        accuracy = accuracy_score(labels, predictions)
        # zero_division=0 makes the score for a never-predicted class an
        # explicit 0 instead of relying on a warning-emitting default.
        precision, recall, f1, _ = precision_recall_fscore_support(
            labels, predictions, average="weighted", zero_division=0
        )
        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1
        }

    # ---------------------------------------------------------------
    def fine_tune(self, train_dataset, test_dataset,
                  output_dir="models/bangla-sentiment-finetuned"):
        """Fine-tune the base model on the given splits and save the result.

        Args:
            train_dataset: Training split with ``text`` and ``label`` columns.
            test_dataset: Evaluation split with the same columns.
            output_dir: Directory for checkpoints and the final model and
                tokenizer.

        Returns:
            The metrics dict produced by the final ``trainer.evaluate()``.

        Raises:
            ValueError: either split is empty.
            RuntimeError: the base model could not be loaded.
        """
        print("\nπ Starting Fine-tuning Process...")
        print("=" * 60)

        # Validate datasets
        if len(train_dataset) == 0:
            raise ValueError("Training dataset is empty")
        if len(test_dataset) == 0:
            raise ValueError("Test dataset is empty")

        print("π₯ Loading base model...")
        try:
            self.model = AutoModelForSequenceClassification.from_pretrained(
                self.base_model,
                num_labels=len(self.LABEL2ID),
                # Store readable class names in the saved config.
                id2label=self.ID2LABEL,
                label2id=self.LABEL2ID,
                # The checkpoint's head has a different number of classes;
                # allow it to be replaced by a fresh 3-way head.
                ignore_mismatched_sizes=True
            )
        except Exception as e:
            raise RuntimeError(f"Error loading model: {str(e)}") from e

        # Set pad_token_id if needed
        if self.model.config.pad_token_id is None:
            self.model.config.pad_token_id = self.tokenizer.pad_token_id

        # Tokenize, then drop the raw text the model cannot consume
        train_dataset = train_dataset.map(self.tokenize_function, batched=True)
        test_dataset = test_dataset.map(self.tokenize_function, batched=True)
        train_dataset = train_dataset.remove_columns(["text"])
        test_dataset = test_dataset.remove_columns(["text"])

        # Training config
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=4,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            warmup_steps=100,
            weight_decay=0.01,
            logging_dir="./logs",
            logging_steps=50,
            # Newer transformers releases call this `eval_strategy`; older
            # ones only accept `evaluation_strategy`.
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1"
        )
        data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=test_dataset,
            data_collator=data_collator,
            tokenizer=self.tokenizer,
            compute_metrics=self.compute_metrics
        )

        print("\nπ― Training started...")
        trainer.train()

        print("\nπ Evaluating model...")
        results = trainer.evaluate()

        print("\n" + "=" * 60)
        print("β Fine-tuning Complete!")
        print("=" * 60)
        print(f"Accuracy: {results['eval_accuracy']:.4f}")
        print(f"Precision: {results['eval_precision']:.4f}")
        print(f"Recall: {results['eval_recall']:.4f}")
        print(f"F1 Score: {results['eval_f1']:.4f}")
        print("=" * 60)

        # Save
        self.model.save_pretrained(output_dir)
        self.tokenizer.save_pretrained(output_dir)
        print(f"\nπΎ Model saved to {output_dir}")
        return results
| # ------------------------------------------------------------------- | |
| # ⬇ Auto-labeled Dataset Creator (improved) | |
| # ------------------------------------------------------------------- | |
def create_labeled_dataset(
    source_csv="data/raw/bangla_news.csv",
    output_csv="data/raw/bangla_news_labeled.csv",
    model_name="tabularisai/multilingual-sentiment-analysis",
):
    """Auto-label raw news text with a sentiment pipeline and save it as CSV.

    Each non-empty ``text`` value is classified, the predicted label is
    collapsed to negative / neutral / positive, and the result is written
    with ``text``, ``sentiment`` and ``source`` columns. Rows that are empty,
    missing, or fail during inference are skipped and counted.

    Args:
        source_csv: CSV to read; must contain a ``text`` column.
        output_csv: Destination path for the labeled CSV.
        model_name: Model identifier passed to the sentiment pipeline.

    Returns:
        The labeled rows as a DataFrame.

    Raises:
        FileNotFoundError: ``source_csv`` does not exist.
        RuntimeError: the source file exists but could not be read.
        ValueError: the ``text`` column is missing, or no row could be labeled.
    """
    print("π Creating labeled dataset...")
    try:
        df = pd.read_csv(source_csv)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Source dataset not found: {source_csv}") from e
    except Exception as e:
        raise RuntimeError(f"Error loading source dataset: {str(e)}") from e

    if "text" not in df.columns:
        raise ValueError("Source dataset must contain 'text' column")

    print("π€ Loading sentiment analysis model...")
    sentiment_analyzer = pipeline("sentiment-analysis", model=model_name)
    print("β Model loaded!")

    labeled_rows = []
    skipped = 0
    first_error = None
    for _, row in df.iterrows():
        raw_text = row["text"]
        # A missing cell is NaN; str(NaN) would be the literal "nan" and
        # would otherwise be labeled as if it were real text.
        if pd.isna(raw_text):
            skipped += 1
            continue
        text = str(raw_text).strip()
        if not text:
            skipped += 1
            continue
        try:
            # The 512-character slice does not bound the token count, so
            # also let the tokenizer truncate to the model's limit.
            pred = sentiment_analyzer(text[:512], truncation=True)[0]
            three_class = map_to_three_class(pred["label"])
        except Exception as e:
            # Best effort: one bad row must not abort the whole run, but
            # remember the first failure so it can be reported.
            skipped += 1
            if first_error is None:
                first_error = e
            continue
        labeled_rows.append({
            "text": text,
            "sentiment": three_class,
            "source": row.get("source", "Unknown")
        })

    if first_error is not None:
        print(f"First inference error: {first_error!r}")
    if not labeled_rows:
        raise ValueError(
            "No valid labeled samples created. Check your source dataset."
        ) from first_error

    df_labeled = pd.DataFrame(labeled_rows)
    df_labeled.to_csv(output_csv, index=False)
    print(f"β Created labeled dataset with {len(df_labeled)} samples")
    if skipped > 0:
        print(f"β οΈ Skipped {skipped} invalid or problematic rows")
    return df_labeled
| # ------------------------------------------------------------------- | |
| # ⬇ Main | |
| # ------------------------------------------------------------------- | |
def main():
    """Run both pipeline steps: auto-label the raw data, then fine-tune.

    Failures are reported on stdout instead of being propagated.

    Returns:
        0 when both steps finish, 1 when any step raises.
    """
    divider = "=" * 60
    try:
        for line in (divider, "STEP 1: Creating Labeled Dataset", divider):
            print(line)
        create_labeled_dataset()

        for line in ("\n" + divider, "STEP 2: Fine-tuning Model", divider):
            print(line)
        tuner = BanglaSentimentFineTuner()
        train_split, eval_split = tuner.prepare_dataset()
        tuner.fine_tune(train_split, eval_split)

        print("\nβ Process Complete! Fine-tuned sentiment model is ready.")
    except FileNotFoundError as missing:
        print(f"\nβ Error: {missing}")
        print("Please ensure the required data files exist.")
        status = 1
    except ValueError as invalid:
        print(f"\nβ Error: {invalid}")
        status = 1
    except Exception as unexpected:
        # Anything else is unplanned, so show the full traceback as well.
        print(f"\nβ Unexpected error: {unexpected}")
        import traceback
        traceback.print_exc()
        status = 1
    else:
        status = 0
    return status
if __name__ == "__main__":
    # main() returns 0 on success and 1 on failure; hand that to the shell
    # so a failed run does not exit with status 0.
    raise SystemExit(main())