# char_4grams_av / app.py
import gradio as gr
import json
import os
import time
import random
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import f1_score, precision_score, recall_score
import pickle
from itertools import combinations
# PAN 2022 authorship-verification evaluator (AUC, c@1, f_05_u, F1, Brier, overall)
from pan22_verif_evaluator import evaluate_all
# Core scoring helpers
def cosine_sim(a, b):
    """Cosine similarity between two dense 1-D vectors."""
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
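# Worked example: cosine_sim(np.array([1., 0.]), np.array([1., 1.]))
# returns 1 / sqrt(2) ≈ 0.7071.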
def rescale(value, orig_min, orig_max, new_min, new_max):
    """Linearly map `value` from [orig_min, orig_max] onto [new_min, new_max]."""
    orig_span = orig_max - orig_min
    new_span = new_max - new_min
    try:
        scaled_value = float(value - orig_min) / float(orig_span)
    except ZeroDivisionError:
        # Degenerate input range: pad it slightly to avoid dividing by zero
        orig_span += 1e-6
        scaled_value = float(value - orig_min) / float(orig_span)
    return new_min + (scaled_value * new_span)
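# Worked example: rescale(0.3, 0, 0.6, 0, 0.49) maps the midpoint of [0, 0.6]
# to the midpoint of [0, 0.49], i.e. 0.245.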
def correct_scores(scores, p1, p2):
    """Calibrate raw similarities to the PAN convention: scores <= p1 are
    squeezed into [0, 0.49], scores strictly between p1 and p2 become the
    non-decision value 0.5, and scores >= p2 are squeezed into [0.51, 1]."""
    for sc in scores:
        if sc <= p1:
            yield rescale(sc, 0, p1, 0, 0.49)
        elif p1 < sc < p2:
            yield 0.5
        else:
            yield rescale(sc, p2, 1, 0.51, 1)
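# Worked example: with p1=0.4 and p2=0.6, raw scores [0.2, 0.5, 0.9] become
# [0.245, 0.5, 0.8775] -- the low score is squeezed into [0, 0.49], the middle
# one collapses to the non-decision value 0.5, and the high one lands in [0.51, 1].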
# Main training function: fit the vectorizer, calibrate scores, and pickle the model
def train_model(pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout):
    gold = {}
    with open(truths_file, encoding='utf8') as f:
        for line in f:
            d = json.loads(line.strip())
            gold[d['id']] = int(d['same'])
    # Optional truncation for development purposes (disabled while cutoff == 0)
    cutoff = 0
    if cutoff:
        # list() is required: random.sample() no longer accepts dict views (Python 3.11+)
        gold = dict(random.sample(list(gold.items()), cutoff))
    texts = []
    with open(pairs_file, encoding='utf8') as f:
        for line in f:
            d = json.loads(line.strip())
            if d['id'] in gold:
                texts.extend(d['pair'])
    # Fit a character n-gram TF-IDF vectorizer on all texts from the labelled pairs
    vectorizer = TfidfVectorizer(max_features=vocab_size, analyzer='char',
                                 ngram_range=(ngram_size, ngram_size))
    vectorizer.fit(texts)
if num_iterations:
total_feats = len(vectorizer.get_feature_names_out())
keep_feats = int(total_feats * dropout)
rnd_feature_idxs = []
for _ in range(num_iterations):
rnd_feature_idxs.append(np.random.choice(total_feats,
keep_feats,
replace=False))
rnd_feature_idxs = np.array(rnd_feature_idxs)
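        # rnd_feature_idxs has shape (num_iterations, keep_feats): one random
        # subset of the vocabulary per bagging iteration, e.g. 3000 features
        # with dropout=0.5 gives rows of 1500 distinct indices each.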
    similarities, labels = [], []
    with open(pairs_file, encoding='utf8') as f:
        for line in f:
            d = json.loads(line.strip())
            if d['id'] in gold:
                x1, x2 = vectorizer.transform(d['pair']).toarray()
                if num_iterations:
                    # Average the cosine similarity over the random feature subsets
                    similarities_ = []
                    for i in range(num_iterations):
                        similarities_.append(cosine_sim(x1[rnd_feature_idxs[i, :]],
                                                        x2[rnd_feature_idxs[i, :]]))
                    similarities.append(np.mean(similarities_))
                else:
                    similarities.append(cosine_sim(x1, x2))
                labels.append(gold[d['id']])
similarities = np.array(similarities, dtype=np.float64)
labels = np.array(labels, dtype=np.float64)
    print('-> grid search p1/p2:')
    step_size = 0.01
    thresholds = np.arange(0.01, 0.99, step_size)
    # combinations() over the sorted grid already yields pairs with p1 < p2
    combs = list(combinations(thresholds, 2))
params = {}
for p1, p2 in combs:
corrected_scores = np.array(list(correct_scores(similarities, p1=p1, p2=p2)))
score = evaluate_all(pred_y=corrected_scores, true_y=labels)
params[(p1, p2)] = score['overall']
opt_p1, opt_p2 = max(params, key=params.get)
print('optimal p1/p2:', opt_p1, opt_p2)
corrected_scores = np.array(list(correct_scores(similarities, p1=opt_p1, p2=opt_p2)))
evaluation_result = evaluate_all(pred_y=corrected_scores, true_y=labels)
print('optimal score:', evaluation_result)
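    # 'overall' is the evaluator's aggregate of the five PAN22 metrics
    # (AUC, c@1, f_05_u, F1, Brier); it is the quantity the grid search maximizes.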
    print('-> determining optimal threshold')
    scores = []
    for th in np.linspace(0.25, 0.75, 1000):
        adjusted = (corrected_scores >= th) * 1
        scores.append((th,
                       f1_score(labels, adjusted),
                       precision_score(labels, adjusted),
                       recall_score(labels, adjusted)))
    ths, f1s, precisions, recalls = zip(*scores)  # 'ths' avoids shadowing the grid above
    max_idx = np.array(f1s).argmax()
    max_f1 = f1s[max_idx]
    max_th = ths[max_idx]
    print(f'Dev results -> F1={max_f1:.4f} at th={max_th:.4f}')
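    # Note: this F1 sweep is diagnostic only; the best threshold is printed but
    # not saved, since prediction relies solely on opt_p1/opt_p2 below.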
    # Save the model: the fitted vectorizer, the calibration thresholds, the
    # feature subsets (None when bagging is off), and the dev evaluation scores
    model = {
        'vectorizer': vectorizer,
        'opt_p1': opt_p1,
        'opt_p2': opt_p2,
        'rnd_feature_idxs': rnd_feature_idxs if num_iterations else None,
        'evaluation_result': evaluation_result
    }
pickle_path = os.path.join(os.getcwd(), 'model.pkl')
with open(pickle_path, 'wb') as f:
pickle.dump(model, f)
return "Training complete. Model files saved.", opt_p1, opt_p2, evaluation_result, pickle_path
# Gradio interface
def gradio_interface(pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout):
if pairs_file is None or truths_file is None:
return "Please upload both JSON files.", None, gr.Group(visible=False), None, None
try:
start_time = time.time()
training_message, opt_p1, opt_p2, evaluation_result, pickle_path = train_model(
pairs_file.name, truths_file.name, vocab_size, ngram_size, num_iterations, dropout
)
        end_time = time.time()
        execution_time = end_time - start_time  # wall-clock seconds
        # Build a one-row-per-metric table for display
        data = {
            'Metric': ['p1', 'p2', 'AUC', 'c@1', 'f_05_u', 'F1', 'Brier', 'Overall', 'Execution Time (s)'],
'Value': [
opt_p1,
opt_p2,
evaluation_result['auc'],
evaluation_result['c@1'],
evaluation_result['f_05_u'],
evaluation_result['F1'],
evaluation_result['brier'],
evaluation_result['overall'],
round(execution_time, 2)
]
}
df = pd.DataFrame(data)
return training_message, df, gr.Group(visible=True), pickle_path, pickle_path
except Exception as e:
return f"An error occurred: {str(e)}", None, gr.Group(visible=False), None, None
with gr.Blocks() as iface:
gr.Markdown("# Character 4-grams Model")
model_path = gr.State(None)
with gr.Tab("Train"):
gr.Markdown("Upload pairs.json and truths.json files, adjust parameters, then click 'Train' to train and evaluate the model.")
with gr.Row():
pairs_file = gr.File(label="Upload pairs.json")
truths_file = gr.File(label="Upload truths.json")
with gr.Row():
vocab_size = gr.Slider(minimum=1000, maximum=50000, step=100, value=3000, label="Vocabulary Size")
ngram_size = gr.Slider(minimum=2, maximum=6, step=1, value=4, label="N-gram Size")
            num_iterations = gr.Slider(minimum=0, maximum=100, step=1, value=0, label="Feature Bagging Iterations (0 = disabled)")
            dropout = gr.Slider(minimum=0.1, maximum=0.9, step=0.1, value=0.5, label="Dropout (fraction of features kept)")
submit_btn = gr.Button("Train")
status_box = gr.Textbox(label="Status")
with gr.Group(visible=False) as output_group:
gr.Markdown("## Evaluation Metrics")
output_table = gr.DataFrame()
download_button = gr.File(label="Download Model")
with gr.Tab('Test'):
gr.Markdown("Enter two texts to compare and click 'Predict' to estimate their similarity.")
text1 = gr.Textbox(label="Text 1")
text2 = gr.Textbox(label="Text 2")
predict_btn = gr.Button("Predict")
similarity_output = gr.Textbox(label="Similarity Result")
    def test_model(text1, text2, model_path):
        if model_path is None:
            return "Please train the model first."
        with open(model_path, 'rb') as f:
            model = pickle.load(f)
        vectorizer = model['vectorizer']
        opt_p1 = model['opt_p1']
        opt_p2 = model['opt_p2']
        rnd_feature_idxs = model['rnd_feature_idxs']
        x1, x2 = vectorizer.transform([text1, text2]).toarray()
        if rnd_feature_idxs is not None:
            # Average the cosine similarity over the stored feature subsets
            similarities_ = []
            for i in range(len(rnd_feature_idxs)):
                similarities_.append(cosine_sim(x1[rnd_feature_idxs[i, :]], x2[rnd_feature_idxs[i, :]]))
            similarity = np.mean(similarities_)
        else:
            similarity = cosine_sim(x1, x2)
        # Calibrate with the trained thresholds: >= 0.51 suggests same author,
        # <= 0.49 different authors, exactly 0.5 is a non-decision
        similarity = next(correct_scores([similarity], p1=opt_p1, p2=opt_p2))
        return f"Similarity: {similarity:.4f}"
submit_btn.click(
gradio_interface,
inputs=[pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout],
outputs=[status_box, output_table, output_group, download_button, model_path]
)
predict_btn.click(
test_model,
inputs=[text1, text2, model_path],
outputs=[similarity_output]
)
if __name__ == "__main__":
iface.launch()