#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# Naive, Distance-Based Baseline
## Introduction
This baseline offers a naive yet fast solution to the
PAN 2022 track on authorship verification. All documents
are represented using a bag-of-character-ngrams model
that is TF-IDF weighted. The cosine similarity between
each document pair in the calibration data set is
calculated. The resulting similarities are then calibrated
and projected through a simple rescaling operation, so
that they can function as pseudo-probabilities, indicating
the likelihood that a document pair is a same-author pair.
Via a grid search, the optimal verification threshold is
determined, taking into account that some difficult
problems can be left unanswered.
By setting `num_iterations` to an integer > 0, a
bootstrapped variant of this procedure can be used. In
that case, the similarity calculation is applied
iteratively to randomly sampled subsets of the available
features, and the average similarity is used downstream.
This procedure is inspired by the imposters approach.
## Dependencies
- Python 3.7+ (we recommend the Anaconda Python distribution)
- scikit-learn (>= 1.0, for `get_feature_names_out`), numpy, scipy
- pan22_verif_evaluator.py
Example usage from the command line to train the model:
>>> python pan22-verif-baseline-cngdist.py \
--train \
--model_dir="models/baseline" \
-p="datasets/pan22-authorship-verification-training" \
-t="datasets/pan22-authorship-verification-training" \
-num_iterations=0
Example usage from the command line to test the model:
>>> python pan22-verif-baseline-cngdist.py \
--model_dir="models/baseline" \
-i="datasets/pan22-authorship-verification-test" \
-num_iterations=0 \
    --output="out"
"""
import argparse
import json
import os
import random
from pathlib import Path
from itertools import combinations
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import f1_score, precision_score, recall_score
import pickle
from pan22_verif_evaluator import evaluate_all
def cosine_sim(a, b):
    """Cosine similarity between two dense vectors `a` and `b`."""
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
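

# A quick, illustrative sanity check of `cosine_sim` (kept as comments so
# it does not run as part of the script; the vectors are made up):
#
#   cosine_sim(np.array([1.0, 0.0]), np.array([1.0, 0.0]))  # -> 1.0
#   cosine_sim(np.array([1.0, 0.0]), np.array([0.0, 1.0]))  # -> 0.0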
def rescale(value, orig_min, orig_max, new_min, new_max):
"""
Rescales a `value` in the old range defined by
`orig_min` and `orig_max`, to the new range
`new_min` and `new_max`. Assumes that
`orig_min` <= value <= `orig_max`.
Parameters
----------
value: float, default=None
The value to be rescaled.
orig_min: float, default=None
The minimum of the original range.
    orig_max: float, default=None
        The maximum of the original range.
    new_min: float, default=None
        The minimum of the new range.
    new_max: float, default=None
        The maximum of the new range.
Returns
----------
new_value: float
The rescaled value.
"""
orig_span = orig_max - orig_min
new_span = new_max - new_min
try:
scaled_value = float(value - orig_min) / float(orig_span)
except ZeroDivisionError:
orig_span += 1e-6
scaled_value = float(value - orig_min) / float(orig_span)
return new_min + (scaled_value * new_span)
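

# Worked example for `rescale` (illustrative numbers, kept as a comment):
# rescaling 0.75 from the range [0.5, 1.0] into [0.51, 1.0] gives
# 0.51 + ((0.75 - 0.5) / 0.5) * 0.49 = 0.755, i.e.
#
#   rescale(0.75, 0.5, 1.0, 0.51, 1.0)  # -> 0.755 (up to float noise)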
def correct_scores(scores, p1, p2):
    """Project raw similarities onto pseudo-probabilities: scores up to
    `p1` are rescaled into [0, 0.49], scores strictly between `p1` and
    `p2` are left unanswered (0.5), and scores of `p2` or above are
    rescaled into [0.51, 1]."""
    for sc in scores:
        if sc <= p1:
            yield rescale(sc, 0, p1, 0, 0.49)
        elif p1 < sc < p2:
            yield 0.5
        else:
            yield rescale(sc, p2, 1, 0.51, 1)
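

# Illustrative mapping with made-up thresholds p1=0.4 and p2=0.6 (the
# real values come out of the grid search in train() below):
#
#   list(correct_scores([0.2, 0.5, 0.8], p1=0.4, p2=0.6))
#   # -> [0.245, 0.5, 0.755]  (up to floating-point noise)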
def train(input_pairs, input_truth, model_directory, vocab_size, ngram_size, num_iterations, dropout):
gold = {}
    for line in open(input_truth, encoding='utf8'):
d = json.loads(line.strip())
gold[d['id']] = int(d['same'])
# truncation for development purposes
cutoff = 0
if cutoff:
        gold = dict(random.sample(list(gold.items()), cutoff))
print(len(gold))
texts = []
    for line in open(input_pairs, encoding='utf8'):
d = json.loads(line.strip())
if d['id'] in gold:
texts.extend(d['pair'])
print('-> constructing vectorizer')
vectorizer = TfidfVectorizer(max_features=vocab_size, analyzer='char',
ngram_range=(ngram_size, ngram_size))
vectorizer.fit(texts)
    if num_iterations:
        # Pre-sample, for each bootstrap iteration, a random subset of
        # feature indices (without replacement) on which to compute the
        # similarity.
        total_feats = len(vectorizer.get_feature_names_out())
keep_feats = int(total_feats * dropout)
rnd_feature_idxs = []
for _ in range(num_iterations):
rnd_feature_idxs.append(np.random.choice(total_feats,
keep_feats,
replace=False))
rnd_feature_idxs = np.array(rnd_feature_idxs)
print('-> calculating pairwise similarities')
similarities, labels = [], []
    for line in open(input_pairs, encoding='utf8'):
d = json.loads(line.strip())
if d['id'] in gold:
x1, x2 = vectorizer.transform(d['pair']).toarray()
if num_iterations:
similarities_ = []
for i in range(num_iterations):
similarities_.append(cosine_sim(x1[rnd_feature_idxs[i, :]],
x2[rnd_feature_idxs[i, :]]))
similarities.append(np.mean(similarities_))
else:
similarities.append(cosine_sim(x1, x2))
labels.append(gold[d['id']])
similarities = np.array(similarities, dtype=np.float64)
labels = np.array(labels, dtype=np.float64)
print('-> grid search p1/p2:')
step_size = 0.01
thresholds = np.arange(0.01, 0.99, step_size)
combs = [(p1, p2) for (p1, p2) in combinations(thresholds, 2) if p1 < p2]
params = {}
for p1, p2 in combs:
corrected_scores = np.array(list(correct_scores(similarities, p1=p1, p2=p2)))
score = evaluate_all(pred_y=corrected_scores, true_y=labels)
params[(p1, p2)] = score['overall']
opt_p1, opt_p2 = max(params, key=params.get)
print('optimal p1/p2:', opt_p1, opt_p2)
corrected_scores = np.array(list(correct_scores(similarities, p1=opt_p1, p2=opt_p2)))
print('optimal score:', evaluate_all(pred_y=corrected_scores, true_y=labels))
print('-> determining optimal threshold')
scores = []
for th in np.linspace(0.25, 0.75, 1000):
adjusted = (corrected_scores >= th) * 1
scores.append((th,
f1_score(labels, adjusted),
precision_score(labels, adjusted),
recall_score(labels, adjusted)))
thresholds, f1s, precisions, recalls = zip(*scores)
max_idx = np.array(f1s).argmax()
max_f1 = f1s[max_idx]
max_th = thresholds[max_idx]
print(f'Dev results -> F1={max_f1} at th={max_th}')
pickle.dump(vectorizer, open(model_directory / 'vectorizer.pickle', 'wb'))
pickle.dump(opt_p1, open(model_directory / 'opt_p1.pickle', 'wb'))
pickle.dump(opt_p2, open(model_directory / 'opt_p2.pickle', 'wb'))
if num_iterations:
pickle.dump(rnd_feature_idxs, open(model_directory / 'rnd_feature_idxs.pickle', 'wb'))
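

# train() leaves the following artifacts in the model directory, which
# test() below reloads: vectorizer.pickle (the fitted TfidfVectorizer),
# opt_p1.pickle and opt_p2.pickle (the grid-searched correction
# thresholds), and, when num_iterations > 0, rnd_feature_idxs.pickle
# (the pre-sampled feature subsets).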
def test(test_pairs, output_dir, model_directory, num_iterations):
vectorizer = pickle.load(open(model_directory / 'vectorizer.pickle', 'rb'))
opt_p1 = pickle.load(open(model_directory / 'opt_p1.pickle', 'rb'))
opt_p2 = pickle.load(open(model_directory / 'opt_p2.pickle', 'rb'))
    print(f'p1 = {opt_p1}, p2 = {opt_p2}')
if num_iterations:
rnd_feature_idxs = pickle.load(open(model_directory / 'rnd_feature_idxs.pickle', 'rb'))
print('-> calculating test similarities')
    with open(output_dir / 'answers.jsonl', 'w') as outf:
        count = 0
        for line in open(test_pairs, encoding='utf8'):
            count += 1
            d = json.loads(line.strip())
problem_id = d['id']
x1, x2 = vectorizer.transform(d['pair']).toarray()
if num_iterations:
similarities_ = []
for i in range(num_iterations):
similarities_.append(cosine_sim(x1[rnd_feature_idxs[i, :]],
x2[rnd_feature_idxs[i, :]]))
similarity = np.mean(similarities_)
else:
similarity = cosine_sim(x1, x2)
similarity = np.array(list(correct_scores([similarity], p1=opt_p1, p2=opt_p2)))[0]
r = {'id': problem_id, 'value': similarity}
outf.write(json.dumps(r) + '\n')
    print(count, 'cases')
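

# A minimal sketch (kept as comments) of scoring a finished answers.jsonl
# against a truth file with evaluate_all(), as train() does above; the
# file paths are illustrative:
#
#   pred = {json.loads(l)['id']: json.loads(l)['value']
#           for l in open('out/answers.jsonl')}
#   gold = {json.loads(l)['id']: int(json.loads(l)['same'])
#           for l in open('truth.jsonl')}
#   ids = sorted(gold)
#   results = evaluate_all(pred_y=np.array([pred[i] for i in ids]),
#                          true_y=np.array([gold[i] for i in ids]))
#   print(results['overall'])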
def main():
parser = argparse.ArgumentParser(
description='PAN-22 Cross-domain Authorship Verification task: Distance-based baseline')
# data settings:
    parser.add_argument('--train', action='store_true', help='If set, train a model from the given '
                                                             'input pairs and input truth. Otherwise, load a '
                                                             'model and test it on the test dir')
    parser.add_argument('-p', '--input_pairs', type=str, help='Path to the directory containing the input pairs.jsonl')
    parser.add_argument('-t', '--input_truth', type=str, help='Path to the directory containing the ground truth truth.jsonl')
parser.add_argument('-i', '--test_dir', type=str, help='Path to the directory that contains the test pairs.jsonl')
    parser.add_argument('-o', '--output', type=str, help='Path to the output folder for the predictions. '
                                                         '(Will be overwritten if it exists already.)')
parser.add_argument('--model_dir', type=str, default='./baseline-distance-data', help='Path to the directory storing the model')
# algorithmic settings:
parser.add_argument('-seed', default=2020, type=int, help='Random seed')
parser.add_argument('-vocab_size', default=3000, type=int,
help='Maximum number of vocabulary items in feature space')
parser.add_argument('-ngram_size', default=4, type=int, help='Size of the ngrams')
parser.add_argument('-num_iterations', default=0, type=int, help='Number of iterations (`k`); zero by default')
parser.add_argument('-dropout', default=.5, type=float, help='Proportion of features to keep in each iteration')
args = parser.parse_args()
print(args)
np.random.seed(args.seed)
random.seed(args.seed)
model_directory = Path(args.model_dir)
if args.train:
if not args.input_pairs or not args.input_truth:
print("STOP. Missing required parameters: --input_pairs or --input_truth")
exit(1)
model_directory.mkdir(parents=True, exist_ok=True)
train(args.input_pairs + os.sep + 'pairs.jsonl', args.input_truth + os.sep + 'truth.jsonl', model_directory,
args.vocab_size, args.ngram_size, args.num_iterations, args.dropout)
else:
if not args.test_dir or not args.output:
print("STOP. Missing required parameters: --test_dir or --output")
exit(1)
if not model_directory.exists():
print("STOP. Model does not exist at " + model_directory)
exit(1)
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
        test(str(Path(args.test_dir)) + os.sep + 'pairs.jsonl',
             output_dir, model_directory, args.num_iterations)
if __name__ == '__main__':
main()