Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import os | |
| import time | |
| import random | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics import f1_score, precision_score, recall_score | |
| from scipy.spatial.distance import cosine | |
| import pickle | |
| from pathlib import Path | |
| from itertools import combinations | |
| # Import the necessary functions from your existing code | |
| from pan22_verif_evaluator import evaluate_all | |
| # Implement the main logic functions (simplified versions) | |
def cosine_sim(a, b):
    """Return the cosine similarity of two 1-D numeric vectors.

    Args:
        a: First vector (array-like of numbers).
        b: Second vector, with the same length as ``a``.

    Returns:
        ``dot(a, b) / (||a|| * ||b||)``. When either vector has zero norm the
        ratio is undefined (0/0 would give NaN and a runtime warning), so
        0.0 is returned instead.
    """
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    if norm_product == 0:
        # An all-zero vector has no direction; report "no similarity" rather
        # than letting NaN propagate into the calibrated scores.
        return 0.0
    return np.dot(a, b) / norm_product
def rescale(value, orig_min, orig_max, new_min, new_max):
    """Linearly map ``value`` from one interval onto another.

    Args:
        value: Number to convert.
        orig_min: Lower bound of the source interval.
        orig_max: Upper bound of the source interval.
        new_min: Lower bound of the target interval.
        new_max: Upper bound of the target interval.

    Returns:
        ``new_min`` plus the relative position of ``value`` inside the source
        interval multiplied by the width of the target interval. A source
        interval of zero width is widened by 1e-6 so the division is defined.
    """
    source_width = orig_max - orig_min
    target_width = new_max - new_min
    distance = float(value - orig_min)
    divisor = float(source_width)
    if divisor == 0.0:
        # Degenerate source interval: nudge the width instead of dividing by 0.
        divisor = float(source_width + 1e-6)
    fraction = distance / divisor
    return new_min + (fraction * target_width)
def correct_scores(scores, p1, p2):
    """Lazily calibrate raw scores around the two cut-offs ``p1`` and ``p2``.

    Args:
        scores: Iterable of raw scores.
        p1: Lower cut-off; scores at or below it are rescaled from
            ``[0, p1]`` onto ``[0, 0.49]``.
        p2: Upper cut-off; scores at or above it are rescaled from
            ``[p2, 1]`` onto ``[0.51, 1]``.

    Yields:
        One calibrated score per input score, in input order. Scores lying
        strictly between ``p1`` and ``p2`` are yielded as exactly 0.5.
    """
    for raw in scores:
        if p1 < raw < p2:
            # Strictly inside the band between the two cut-offs.
            yield 0.5
            continue
        if raw <= p1:
            yield rescale(raw, 0, p1, 0, 0.49)
        else:
            yield rescale(raw, p2, 1, 0.51, 1)
| # Main training function | |
def train_model(pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout):
    """Fit a character n-gram TF-IDF model, calibrate its scores and pickle it.

    Both input files are read as JSON lines. Each truths line must provide
    ``id`` and ``same``; each pairs line must provide ``id`` and ``pair``
    (two texts). Only pairs whose ``id`` appears in the truths file are used.

    Args:
        pairs_file: Path of the JSON-lines file holding the text pairs.
        truths_file: Path of the JSON-lines file holding the gold labels.
        vocab_size: ``max_features`` passed to ``TfidfVectorizer``.
        ngram_size: Character n-gram length (used as both ends of
            ``ngram_range``).
        num_iterations: Number of random feature subsets to average the
            cosine similarity over; 0 uses the full feature vectors once.
        dropout: Fraction of the features that is *kept* in every random
            subset (``int(total_feats * dropout)`` indices are sampled).

    Returns:
        Tuple ``(message, opt_p1, opt_p2, evaluation_result, pickle_path)``
        where ``evaluation_result`` is what ``evaluate_all`` returned for the
        best ``p1``/``p2`` and ``pickle_path`` is the saved ``model.pkl``.

    Raises:
        ValueError: If no pair id matches an id from the truths file.
    """
    # range() and TfidfVectorizer need real ints; callers such as UI widgets
    # may hand these values over as floats.
    vocab_size = int(vocab_size)
    ngram_size = int(ngram_size)
    num_iterations = int(num_iterations)

    gold = {}
    with open(truths_file, encoding='utf8') as truths_fh:
        for line in truths_fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            d = json.loads(line)
            gold[d['id']] = int(d['same'])

    # truncation for development purposes
    cutoff = 0
    if cutoff:
        # random.sample() needs a sequence; dict views are rejected on 3.11+.
        gold = dict(random.sample(list(gold.items()), cutoff))

    # Parse the pairs file once and keep (label, pair) for the labelled pairs.
    labelled_pairs = []
    with open(pairs_file, encoding='utf8') as pairs_fh:
        for line in pairs_fh:
            line = line.strip()
            if not line:
                continue
            d = json.loads(line)
            if d['id'] in gold:
                labelled_pairs.append((gold[d['id']], d['pair']))

    if not labelled_pairs:
        raise ValueError('No pair id matches an id from the truths file.')

    texts = [text for _, pair in labelled_pairs for text in pair]

    # Process the data and train the model
    vectorizer = TfidfVectorizer(max_features=vocab_size, analyzer='char',
                                 ngram_range=(ngram_size, ngram_size))
    vectorizer.fit(texts)

    rnd_feature_idxs = None
    if num_iterations:
        total_feats = len(vectorizer.get_feature_names_out())
        keep_feats = int(total_feats * dropout)
        # One row of randomly chosen feature indices per iteration.
        rnd_feature_idxs = np.array([
            np.random.choice(total_feats, keep_feats, replace=False)
            for _ in range(num_iterations)
        ])

    similarities, labels = [], []
    for label, pair in labelled_pairs:
        x1, x2 = vectorizer.transform(pair).toarray()
        if num_iterations:
            # Average the similarity over all random feature subsets.
            similarities.append(np.mean([
                cosine_sim(x1[idxs], x2[idxs]) for idxs in rnd_feature_idxs
            ]))
        else:
            similarities.append(cosine_sim(x1, x2))
        labels.append(label)

    similarities = np.array(similarities, dtype=np.float64)
    labels = np.array(labels, dtype=np.float64)

    print('-> grid search p1/p2:')
    step_size = 0.01
    thresholds = np.arange(0.01, 0.99, step_size)
    # thresholds is strictly increasing, so every combination has p1 < p2.
    params = {}
    for p1, p2 in combinations(thresholds, 2):
        corrected_scores = np.array(list(correct_scores(similarities, p1=p1, p2=p2)))
        score = evaluate_all(pred_y=corrected_scores, true_y=labels)
        params[(p1, p2)] = score['overall']
    opt_p1, opt_p2 = max(params, key=params.get)
    print('optimal p1/p2:', opt_p1, opt_p2)

    corrected_scores = np.array(list(correct_scores(similarities, p1=opt_p1, p2=opt_p2)))
    evaluation_result = evaluate_all(pred_y=corrected_scores, true_y=labels)
    print('optimal score:', evaluation_result)

    print('-> determining optimal threshold')
    scores = []
    for th in np.linspace(0.25, 0.75, 1000):
        adjusted = (corrected_scores >= th) * 1
        scores.append((th,
                       f1_score(labels, adjusted),
                       precision_score(labels, adjusted),
                       recall_score(labels, adjusted)))
    thresholds, f1s, precisions, recalls = zip(*scores)
    max_idx = np.array(f1s).argmax()
    max_f1 = f1s[max_idx]
    max_th = thresholds[max_idx]
    print(f'Dev results -> F1={max_f1} at th={max_th}')

    # Save the model
    model = {
        'vectorizer': vectorizer,
        'opt_p1': opt_p1,
        'opt_p2': opt_p2,
        'rnd_feature_idxs': rnd_feature_idxs,
        'evaluation_result': evaluation_result,
    }
    pickle_path = os.path.join(os.getcwd(), 'model.pkl')
    with open(pickle_path, 'wb') as f:
        pickle.dump(model, f)

    return "Training complete. Model files saved.", opt_p1, opt_p2, evaluation_result, pickle_path
| # Gradio interface | |
def gradio_interface(pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout):
    """Run a training job for the "Train" button and shape its outputs.

    Args:
        pairs_file: Uploaded pairs file object (its ``.name`` is used as the
            path), or None when nothing was uploaded.
        truths_file: Uploaded truths file object, or None.
        vocab_size: Forwarded unchanged to ``train_model``.
        ngram_size: Forwarded unchanged to ``train_model``.
        num_iterations: Forwarded unchanged to ``train_model``.
        dropout: Forwarded unchanged to ``train_model``.

    Returns:
        A 5-tuple ``(status text, metrics DataFrame or None, gr.Group with
        the desired visibility, model path or None, model path or None)``.
        Any exception raised while training or building the table is turned
        into an error status with the results group hidden.
    """
    uploads = (pairs_file, truths_file)
    if any(upload is None for upload in uploads):
        return "Please upload both JSON files.", None, gr.Group(visible=False), None, None

    try:
        started_at = time.time()
        message, p1, p2, metrics, saved_path = train_model(
            pairs_file.name, truths_file.name, vocab_size, ngram_size, num_iterations, dropout
        )
        elapsed = time.time() - started_at  # wall-clock duration of training

        # (label, value) rows shown in the results table, in display order.
        rows = [
            ('p1', p1),
            ('p2', p2),
            ('AUC', metrics['auc']),
            ('c@1', metrics['c@1']),
            ('f_05_u', metrics['f_05_u']),
            ('F1', metrics['F1']),
            ('Brier', metrics['brier']),
            ('Overall', metrics['overall']),
            ('Execution Time', round(elapsed, 2)),
        ]
        table = pd.DataFrame({
            'Metric': [label for label, _ in rows],
            'Value': [value for _, value in rows],
        })
        return message, table, gr.Group(visible=True), saved_path, saved_path
    except Exception as exc:
        return f"An error occurred: {str(exc)}", None, gr.Group(visible=False), None, None
# UI definition: a "Train" tab that fits and evaluates the model, and a
# "Test" tab that scores a pair of texts with the most recently trained model.
with gr.Blocks() as iface:
    gr.Markdown("# Character 4-grams Model")
    # Holds the path of the pickled model produced by the last training run.
    model_path = gr.State(None)

    with gr.Tab("Train"):
        gr.Markdown("Upload pairs.json and truths.json files, adjust parameters, then click 'Train' to train and evaluate the model.")
        with gr.Row():
            pairs_file = gr.File(label="Upload pairs.json")
            truths_file = gr.File(label="Upload truths.json")
        with gr.Row():
            vocab_size = gr.Slider(minimum=1000, maximum=50000, step=100, value=3000, label="Vocabulary Size")
            ngram_size = gr.Slider(minimum=2, maximum=6, step=1, value=4, label="N-gram Size")
            num_iterations = gr.Slider(minimum=0, maximum=100, step=1, value=0, label="Number of Iterations")
            dropout = gr.Slider(minimum=0.1, maximum=0.9, step=0.1, value=0.5, label="Dropout")
        submit_btn = gr.Button("Train")
        status_box = gr.Textbox(label="Status")
        # Hidden until a training run succeeds.
        with gr.Group(visible=False) as output_group:
            gr.Markdown("## Evaluation Metrics")
            output_table = gr.DataFrame()
            download_button = gr.File(label="Download Model")

    with gr.Tab('Test'):
        gr.Markdown("Enter two texts to compare and click 'Predict' to estimate their similarity.")
        text1 = gr.Textbox(label="Text 1")
        text2 = gr.Textbox(label="Text 2")
        predict_btn = gr.Button("Predict")
        similarity_output = gr.Textbox(label="Similarity Result")

    def test_model(text1, text2, model_path):
        """Score two texts with the pickled model and return a status string.

        Args:
            text1: First text to compare.
            text2: Second text to compare.
            model_path: Path of the pickled model dict, or None when no model
                has been trained in this session.

        Returns:
            ``"Similarity: <score>"`` with the calibrated score formatted to
            four decimals, or an explanatory message when no model exists or
            a text yields an all-zero feature vector.
        """
        if model_path is None:
            return "Please train the model first."

        # NOTE: pickle.load executes arbitrary code from the file; this is
        # only acceptable because model_path is set by the training step,
        # never taken from user input.
        with open(model_path, 'rb') as model_fh:
            model = pickle.load(model_fh)
        vectorizer = model['vectorizer']
        opt_p1 = model['opt_p1']
        opt_p2 = model['opt_p2']
        rnd_feature_idxs = model['rnd_feature_idxs']

        x1, x2 = vectorizer.transform([text1, text2]).toarray()
        if not x1.any() or not x2.any():
            # An all-zero vector makes the cosine undefined (NaN).
            return "Cannot compute similarity: at least one text contains no n-grams known to the model."

        if rnd_feature_idxs is not None:
            # Average over the same random feature subsets used in training.
            similarity = np.mean([
                cosine_sim(x1[idxs], x2[idxs]) for idxs in rnd_feature_idxs
            ])
        else:
            similarity = cosine_sim(x1, x2)

        similarity = next(correct_scores([similarity], p1=opt_p1, p2=opt_p2))
        return f"Similarity: {similarity:.4f}"

    submit_btn.click(
        gradio_interface,
        inputs=[pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout],
        outputs=[status_box, output_table, output_group, download_button, model_path]
    )
    predict_btn.click(
        test_model,
        inputs=[text1, text2, model_path],
        outputs=[similarity_output]
    )

if __name__ == "__main__":
    iface.launch()