| |
| |
# Target device for every model loaded below; only "cpu..." or "cuda..." values are accepted.
device = "cpu"
assert device.startswith(("cpu", "cuda"))
|
|
| import sys |
| from predict import * |
|
|
| from transformers import ( |
| T5ForConditionalGeneration, |
| MT5ForConditionalGeneration, |
| ByT5Tokenizer, |
| PreTrainedTokenizer, |
| T5TokenizerFast as T5Tokenizer, |
| MT5TokenizerFast as MT5Tokenizer, |
| AutoModelForSeq2SeqLM, |
| AutoTokenizer, |
| BertTokenizer, |
| GPT2LMHeadModel, |
| ) |
|
|
| import pandas as pd |
| import numpy as np |
| import re |
| from rapidfuzz import fuzz |
| from tqdm import tqdm |
| import numpy as np |
| import os |
|
|
| import jieba |
def repeat_to_one_f(x):
    """Remove repeated content from ``x`` token by token.

    The text is tokenised with ``jieba.lcut``. While the output is built up:

    * a token consisting of one repeated character is reduced to that character;
    * a non-separator token that already occurs anywhere in the output is dropped;
    * a separator token is dropped when the output already ends with a separator;
    * otherwise, leading characters of the token that equal the output's last
      character are trimmed before the token is appended.

    Returns the stripped result, or ``""`` when tokenisation yields nothing.
    """
    separators = (',', ',', '、', ' ')
    merged = None
    for piece in jieba.lcut(x):
        # Collapse runs such as "哈哈哈" into a single character.
        if len(set(piece)) == 1:
            piece = piece[0]

        # The very first token seeds the output unchanged.
        if merged is None:
            merged = piece
            continue

        piece_is_separator = piece in separators
        # Content that was already emitted is not emitted again.
        if not piece_is_separator and piece in merged:
            continue
        # Never stack one separator directly after another.
        if piece_is_separator and merged and merged[-1] in separators:
            continue

        # Avoid doubling the character at the seam between output and token.
        while merged.endswith(piece[0]):
            piece = piece[1:]
        merged = merged + piece

    return "" if merged is None else merged.strip()
|
|
def shorten_exists(l, sim_threshold = 80, slice_size = 5):
    """Drop elements whose prefix is too similar to an already kept element.

    Elements are visited in order. An element is kept only when the
    ``fuzz.ratio`` between its first ``slice_size`` characters and the first
    ``slice_size`` characters of every previously kept element is below
    ``sim_threshold``. The first element is always kept.

    Returns a new list with the kept elements in their original order.
    """
    kept = []
    for candidate in l:
        is_new = all(
            fuzz.ratio(previous[:slice_size], candidate[:slice_size]) < sim_threshold
            for previous in kept
        )
        if is_new:
            kept.append(candidate)
    return kept
|
|
# Summary -> dialogue model: tokenizer and weights are loaded from the same checkpoint id.
model_path = "svjack/summary-dialogue"
tokenizer0 = T5Tokenizer.from_pretrained(model_path)
model0 = T5ForConditionalGeneration.from_pretrained(model_path)

# Any "cuda..." device setting is mapped to "cuda:0"; everything else runs on "cpu".
model = Obj(
    model0,
    tokenizer0,
    device = "cuda:0" if device.startswith("cuda") else "cpu",
)
|
|
def loop_add(l, names = ["杰克", "安娜"]):
    """Prefix each element of ``l`` with a speaker name, cycling through ``names``.

    Element ``i`` is formatted as ``"<names[i % len(names)]>:<element>"``.
    Returns a new list; ``l`` and ``names`` are not modified.
    """
    speaker_count = len(names)
    return [
        "{}:{}".format(names[int(position % speaker_count)], utterance)
        for position, utterance in enumerate(l)
    ]
|
|
| |
| |
def guess_name_candidates(context, cnt_threshold = 1):
    """Guess the speaker names used as ``name:`` labels inside ``context``.

    A label is any run of CJK (U+4E00-U+9FA5) or ASCII letters immediately
    followed by ``:``. The name of a label (the label without its colon) is
    accepted when at least one of the following holds:

    * the label occurs more than ``cnt_threshold`` times;
    * the name is at most 3 characters long;
    * the name consists of ASCII letters only.

    Args:
        context: text to scan; must be a ``str``.
        cnt_threshold: a label seen strictly more often than this is always
            accepted.

    Returns:
        A list of unique names: the frequent ones first, then the remaining
        accepted ones, each group in order of first appearance. An empty list
        when no label is found.
    """
    import re
    from collections import Counter

    assert isinstance(context, str)

    labels = re.findall(r"[\u4e00-\u9fa5a-zA-Z]+:", context)
    if not labels:
        return []

    # Counter keeps first-appearance order, so the result is deterministic.
    label_counts = Counter(labels)
    names = [
        label[:-1]
        for label, count in label_counts.items()
        if count > cnt_threshold
    ]
    seen = set(names)

    for label in label_counts:
        name = label[:-1]
        # Compare the name itself (not its length) against what was collected.
        if name in seen:
            continue
        if len(name) <= 3 or re.fullmatch(r"[a-zA-Z]+:", label):
            names.append(name)
            seen.add(name)
    return names
|
|
def simple_pred(summary, candidates = ["杰克", "安娜"],
                shorten_it = False, do_sample = True):
    """Generate a dialogue for ``summary`` and relabel its turns with ``candidates``.

    The prompt ``"摘要:<summary> 候选集:<candidates joined by spaces>"`` is sent
    to ``model.predict`` and the first returned element is used as the raw text.
    Speaker labels guessed from that text are used to split it into turns.
    Blank turns are dropped, each turn is cleaned with ``repeat_to_one_f``, and
    the turns are then prefixed with ``candidates`` in rotation via ``loop_add``.

    Args:
        summary: text placed after ``摘要:`` in the prompt.
        candidates: speaker names for the prompt and for relabelling the turns.
        shorten_it: when true, near-duplicate turns are removed with
            ``shorten_exists`` before cleaning.
        do_sample: forwarded to ``model.predict``.

    Returns:
        A list of ``"<name>:<turn>"`` strings (possibly empty).
    """
    pred_text = model.predict(
        "摘要:{} 候选集:{}".format(summary, " ".join(candidates)),
        do_sample = do_sample
    )[0]

    speaker_names = guess_name_candidates(pred_text)
    if speaker_names:
        label_pattern = "|".join("{}:".format(name) for name in speaker_names)
        turns = re.split(label_pattern, pred_text)
    else:
        # With no labels the pattern would be "", and re.split on an empty
        # pattern cuts the text into single characters. Keep it as one turn.
        turns = [pred_text]

    turns = [turn for turn in turns if turn.strip()]
    if shorten_it:
        turns = shorten_exists(turns)
    turns = [repeat_to_one_f(turn) for turn in turns]
    return loop_add(turns, candidates)
|
|
def percentile_sort(df, perc_num = 101):
    """Reorder the rows of ``df`` by the percentile level their scores clear.

    ``df["score_tuple"]`` is stacked into a 2-D array (one row per record, one
    column per score component). For ``perc_num`` evenly spaced percentiles
    between 0 and 100, the per-component percentile values are computed. A
    record's rank is the position, counting down from the 100th percentile, of
    the first percentile row that the record meets or exceeds in every
    component. Records are returned in ascending rank, so records clearing
    higher percentiles come first.

    Returns a reordered selection of ``df`` (via ``df.iloc``).
    """
    scores = np.asarray(df["score_tuple"].values.tolist())
    percentiles = np.linspace(0, 100, perc_num).tolist()
    thresholds = np.stack([np.percentile(scores, p, axis = 0) for p in percentiles])
    # Highest percentile first, so a smaller rank means a better record.
    thresholds_desc = thresholds[::-1]

    def rank_of(row):
        # One flag per percentile level: does every component reach that level?
        clears = [bool(min(gap) >= 0) for gap in row - thresholds_desc]
        return clears.index(True) if True in clears else len(clears)

    ranks = pd.Series([rank_of(row) for row in scores])
    return df.iloc[np.argsort(ranks.values)]
|
|
def repeat_score(l, slice_size = 200 ,sim_threshold = 70):
    """Count how many elements of ``l`` closely repeat another element.

    Elements are processed in sorted order. When an element contains ``:``,
    everything up to the first ``:`` is dropped and the remaining pieces are
    joined without colons. An element counts as a repeat when its first
    ``slice_size`` characters have a ``fuzz.ratio`` above ``sim_threshold``
    with any text already processed.

    ``l`` must be a ``list``; it is not modified. Returns the repeat count.
    """
    assert type(l) is list
    repeats = 0
    seen_texts = set()
    for entry in sorted(l):
        text = "".join(entry.split(":")[1:]) if ":" in entry else entry
        head = text[:slice_size]
        if any(fuzz.ratio(seen[:slice_size], head) > sim_threshold for seen in seen_texts):
            repeats += 1
        seen_texts.add(text)
    return repeats
|
|
| |
| |
# Chinese prompt-extension model: tokenizer and weights are loaded from the same checkpoint id.
model_path = "svjack/prompt-extend-chinese-gpt"
tokenizer1 = BertTokenizer.from_pretrained(model_path)
model1 = GPT2LMHeadModel.from_pretrained(model_path)

# Any "cuda..." device setting is mapped to "cuda:0"; everything else runs on "cpu".
zh_pe_model = Obj(
    model1,
    tokenizer1,
    device = "cuda:0" if device.startswith("cuda") else "cpu",
)
|
|
def one_ele_trans(x):
    """Strip surrounding whitespace and at most one layer of quotes from ``x``.

    After trimming whitespace, one leading and one trailing single quote are
    removed (each only if present), then the same is done for double quotes.
    """
    text = x.strip()
    # Single quotes are peeled before double quotes; the order is significant.
    for quote in ("'", '"'):
        if text.startswith(quote):
            text = text[1:]
        if text.endswith(quote):
            text = text[:-1]
    return text
|
|
def stdf_prompt_expander(x):
    """Expand the prompt ``x`` with ``zh_pe_model``.

    ``x`` (which must be a ``str``) is trimmed and unquoted with
    ``one_ele_trans``, then passed to ``zh_pe_model.predict`` with
    ``max_length = 128``. The first returned element is returned with all
    spaces removed and surrounding whitespace stripped.
    """
    assert type(x) is str
    prompt = one_ele_trans(x.strip()).strip()
    generated = zh_pe_model.predict(prompt, max_length = 128)
    return generated[0].replace(" ", "").strip()
|
|
def sample_pred(context, times = 5, stdf_prompt_expander = lambda _: _):
    """Sample several dialogues for ``context`` and rank them.

    For each of ``times`` rounds, ``context`` is passed through
    ``stdf_prompt_expander`` (identity by default) and the result is turned
    into a dialogue with ``simple_pred(..., do_sample = True)``.

    The returned DataFrame has one row per round with the columns:

    * ``context``: the (possibly expanded) text given to ``simple_pred``;
    * ``dialogue``: the list of turns produced;
    * ``fuzz``: ``fuzz.ratio`` between the original ``context`` and the turns
      joined by spaces;
    * ``max_fuzz``: highest ``fuzz.ratio`` between any single turn and the
      original ``context`` (``0`` for a dialogue with no turns);
    * ``length``: number of turns;
    * ``rpt_score``: ``repeat_score`` of the dialogue;
    * ``score_tuple``: ``(fuzz, -max_fuzz, length, -rpt_score)``.

    Rows are ordered by ``percentile_sort`` on ``score_tuple``.
    """
    rows = []
    for _ in tqdm(range(times)):
        expanded = stdf_prompt_expander(context)
        rows.append([expanded, simple_pred(expanded, do_sample = True)])

    df = pd.DataFrame(rows)
    df.columns = ["context", "dialogue"]
    df["fuzz"] = df["dialogue"].map(
        lambda turns: fuzz.ratio(context, " ".join(turns))
    )
    # A generation can yield no turns at all; default = 0 keeps max() from
    # raising ValueError and aborting the whole sampling run.
    df["max_fuzz"] = df["dialogue"].map(
        lambda turns: max((fuzz.ratio(turn, context) for turn in turns), default = 0)
    )
    df["length"] = df["dialogue"].map(len)
    df["rpt_score"] = df["dialogue"].map(repeat_score)
    # Components to be minimised are negated so that larger is always better.
    df["score_tuple"] = df.apply(
        lambda row: (row["fuzz"], -1 * row["max_fuzz"], row["length"], -1 * row["rpt_score"]),
        axis = 1
    )
    return percentile_sort(df)
|
|
def sample_pred_wrapper(context, i2c_obj, times = 5, extend_by_diffusion = False):
    """Run ``sample_pred`` on a text, or on the caption of an image.

    When ``context`` ends with ``.jpg``, ``.png`` or ``.jpeg`` it is treated as
    an image location: ``i2c_obj.predict_to_df([context])`` is called and the
    first value of its ``"caption"`` column replaces ``context``.

    Args:
        context: text, or image location; must be a ``str``.
        i2c_obj: object providing ``predict_to_df``; only used for images.
        times: number of samples, forwarded to ``sample_pred``.
        extend_by_diffusion: when true, ``stdf_prompt_expander`` is applied to
            the context in each sampling round; otherwise it is used as is.

    Returns:
        The DataFrame produced by ``sample_pred``.
    """
    assert type(context) is str
    if context.endswith((".jpg", ".png", ".jpeg")):
        caption_df = i2c_obj.predict_to_df([context])
        assert caption_df.size > 0
        context = caption_df["caption"].iloc[0]
    # The caption must itself be a plain string.
    assert type(context) is str

    expander = stdf_prompt_expander if extend_by_diffusion else (lambda _: _)
    return sample_pred(context, times = times, stdf_prompt_expander = expander)
|
|
# Module-level captioner instance, created as a side effect of importing this
# file. `OFA` is presumably provided by the wildcard import from `ofa` -- TODO
# confirm. The __main__ section passes this object as `i2c_obj` to
# sample_pred_wrapper, which calls its `predict_to_df` method.
from ofa import *
ofa_obj = OFA()
|
|
if __name__ == "__main__":
    # Demo driver: generates dialogues from a local image, from an image URL
    # and from one row of a CSV file, printing each result.
    #
    # The module-level `ofa_obj` is reused here; constructing OFA() a second
    # time would only repeat the model set-up.
    #
    # Alternative captioner, kept for reference:
    #   from image2caption import *
    #   i2c_tiny_zh_obj = Image2Caption("svjack/vit-gpt-diffusion-zh",
    #       overwrite_encoder_checkpoint_path = "google/vit-base-patch16-224",
    #       overwrite_token_model_path = "IDEA-CCNL/Wenzhong-GPT2-110M",
    #       device = device
    #   )

    # Other sample images: "../pic/bug.jpg", "../pic/baobao.jpeg", "../pic/cat0.jpg"
    img_path = "../pic/cat.jpg"
    print(os.path.exists(img_path))

    df = sample_pred_wrapper(img_path, i2c_obj = ofa_obj)
    print(df["dialogue"].values.tolist())

    # Other sample URL:
    #   "https://datasets-server.huggingface.co/assets/metashift/--/metashift/train/2/image/image.jpg"
    img_url = "https://datasets-server.huggingface.co/assets/metashift/--/metashift/train/6/image/image.jpg"

    df = sample_pred_wrapper(img_url, i2c_obj = ofa_obj)
    print(df["dialogue"].values.tolist())

    # The last column of the CSV is used as the summary text.
    ds_en_zh_df = pd.read_csv("../ds_en_zh_df.csv")

    idx = 3
    print(ds_en_zh_df.iloc[:, -1].iloc[idx])

    df = sample_pred(ds_en_zh_df.iloc[:, -1].iloc[idx])
    print(df["dialogue"].values.tolist())
|
|