# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
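"""Evaluation metrics for text and formula recognition models.

This module provides:
    RecMetric      - exact-match accuracy and normalized edit distance for text recognition.
    CNTMetric      - exact-match accuracy for count predictions.
    CANMetric      - word rate and expression rate for the CAN formula-recognition model.
    LaTeXOCRMetric - expression rate, edit distance, and optional BLEU score for LaTeX OCR.

Typical usage of RecMetric (a minimal sketch; the (text, confidence) pair format is
inferred from how ``__call__`` unpacks its input, not from a documented API):

    metric = RecMetric()
    metric(([("hello", 0.98)], [("hello", 1.0)]))  # accumulate one batch
    print(metric.get_metric())                     # {'acc': ..., 'norm_edit_dis': ...}
"""
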
from rapidfuzz.distance import Levenshtein
from difflib import SequenceMatcher

import numpy as np
import string

from .bleu import compute_blue_score, compute_edit_distance


class RecMetric(object):
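    """Text-recognition metric: exact-match accuracy and normalized edit distance.

    ``norm_edit_dis`` is reported as ``1 - Levenshtein.normalized_distance``,
    averaged over all evaluated samples.
    """
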
    def __init__(
        self, main_indicator="acc", is_filter=False, ignore_space=True, **kwargs
    ):
        self.main_indicator = main_indicator
        self.is_filter = is_filter
        self.ignore_space = ignore_space
        self.eps = 1e-5
        self.reset()

    def _normalize_text(self, text):
        # Keep only ASCII letters and digits and compare case-insensitively.
        text = "".join(
            filter(lambda x: x in (string.digits + string.ascii_letters), text)
        )
        return text.lower()

    def __call__(self, pred_label, *args, **kwargs):
        preds, labels = pred_label
        correct_num = 0
        all_num = 0
        norm_edit_dis = 0.0
        for (pred, pred_conf), (target, _) in zip(preds, labels):
            if self.ignore_space:
                pred = pred.replace(" ", "")
                target = target.replace(" ", "")
            if self.is_filter:
                pred = self._normalize_text(pred)
                target = self._normalize_text(target)
            norm_edit_dis += Levenshtein.normalized_distance(pred, target)
            if pred == target:
                correct_num += 1
            all_num += 1
        # Accumulate across batches for get_metric(); also return this batch's numbers.
        self.correct_num += correct_num
        self.all_num += all_num
        self.norm_edit_dis += norm_edit_dis
        return {
            "acc": correct_num / (all_num + self.eps),
            "norm_edit_dis": 1 - norm_edit_dis / (all_num + self.eps),
        }

    def get_metric(self):
        """Return the accumulated metrics and reset the counters.

        return metrics {
            'acc': 0,
            'norm_edit_dis': 0,
        }
        """
        acc = 1.0 * self.correct_num / (self.all_num + self.eps)
        norm_edit_dis = 1 - self.norm_edit_dis / (self.all_num + self.eps)
        self.reset()
        return {"acc": acc, "norm_edit_dis": norm_edit_dis}

    def reset(self):
        self.correct_num = 0
        self.all_num = 0
        self.norm_edit_dis = 0


class CNTMetric(object):
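    """Exact-match accuracy for count predictions (each pred compared directly to its target)."""
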
    def __init__(self, main_indicator="acc", **kwargs):
        self.main_indicator = main_indicator
        self.eps = 1e-5
        self.reset()

    def __call__(self, pred_label, *args, **kwargs):
        preds, labels = pred_label
        correct_num = 0
        all_num = 0
        for pred, target in zip(preds, labels):
            if pred == target:
                correct_num += 1
            all_num += 1
        self.correct_num += correct_num
        self.all_num += all_num
        return {
            "acc": correct_num / (all_num + self.eps),
        }

    def get_metric(self):
        """Return the accumulated metrics and reset the counters.

        return metrics {
            'acc': 0,
        }
        """
        acc = 1.0 * self.correct_num / (self.all_num + self.eps)
        self.reset()
        return {"acc": acc}

    def reset(self):
        self.correct_num = 0
        self.all_num = 0


class CANMetric(object):
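    """Word rate and expression rate for handwritten formula recognition (CAN).

    Per-sample word scores come from difflib.SequenceMatcher over label/prediction
    sequences truncated by the label mask; a sample counts toward exp_rate only
    when its score is exactly 1.
    """
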
    def __init__(self, main_indicator="exp_rate", **kwargs):
        self.main_indicator = main_indicator
        self.word_right = []
        self.exp_right = []
        self.word_total_length = 0
        self.exp_total_num = 0
        self.word_rate = 0
        self.exp_rate = 0
        self.reset()
        self.epoch_reset()

    def __call__(self, preds, batch, **kwargs):
        # Clear the accumulated epoch statistics when the caller passes
        # epoch_reset=True; .get() keeps the call safe when no kwargs are given.
        if kwargs.get("epoch_reset", False):
            self.epoch_reset()
        word_probs = preds
        word_label, word_label_mask = batch
        line_right = 0
        if word_probs is not None:
            word_pred = word_probs.argmax(2)
            word_pred = word_pred.cpu().detach().numpy()
            # Similarity of each prediction to its label, truncated to the valid
            # length given by the label mask.
            word_scores = [
                SequenceMatcher(
                    None, s1[: int(np.sum(s3))], s2[: int(np.sum(s3))], autojunk=False
                ).ratio()
                * (len(s1[: int(np.sum(s3))]) + len(s2[: int(np.sum(s3))]))
                / len(s1[: int(np.sum(s3))])
                / 2
                for s1, s2, s3 in zip(word_label, word_pred, word_label_mask)
            ]
            batch_size = len(word_scores)
            for i in range(batch_size):
                if word_scores[i] == 1:
                    line_right += 1
            self.word_rate = np.mean(word_scores)  # float
            self.exp_rate = line_right / batch_size  # float
            exp_length, word_length = word_label.shape[:2]
            self.word_right.append(self.word_rate * word_length)
            self.exp_right.append(self.exp_rate * exp_length)
            self.word_total_length = self.word_total_length + word_length
            self.exp_total_num = self.exp_total_num + exp_length

    def get_metric(self):
        """Return the metrics accumulated since the last epoch reset.

        return {
            'word_rate': 0,
            "exp_rate": 0,
        }
        """
        cur_word_rate = sum(self.word_right) / self.word_total_length
        cur_exp_rate = sum(self.exp_right) / self.exp_total_num
        self.reset()
        return {"word_rate": cur_word_rate, "exp_rate": cur_exp_rate}

    def reset(self):
        self.word_rate = 0
        self.exp_rate = 0

    def epoch_reset(self):
        self.word_right = []
        self.exp_right = []
        self.word_total_length = 0
        self.exp_total_num = 0


class LaTeXOCRMetric(object):
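    """Metrics for LaTeX-OCR formula recognition.

    Reports expression-level exact-match rate (exp_rate), the mean normalized
    Levenshtein distance ("edit distance"), exp_rate at edit-distance thresholds
    of 1, 2 and 3, and optionally a BLEU score (cal_blue_score=True).
    """
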
    def __init__(self, main_indicator="exp_rate", cal_blue_score=False, **kwargs):
        self.main_indicator = main_indicator
        self.cal_blue_score = cal_blue_score
        self.edit_right = []
        self.exp_right = []
        self.blue_right = []
        self.e1_right = []
        self.e2_right = []
        self.e3_right = []
        self.editdistance_total_length = 0
        self.exp_total_num = 0
        self.edit_dist = 0
        self.exp_rate = 0
        if self.cal_blue_score:
            self.blue_score = 0
        self.e1 = 0
        self.e2 = 0
        self.e3 = 0
        self.reset()
        self.epoch_reset()

    def __call__(self, preds, batch, **kwargs):
        # Clear the accumulated epoch statistics when the caller passes
        # epoch_reset=True; .get() keeps the call safe when no kwargs are given.
        if kwargs.get("epoch_reset", False):
            self.epoch_reset()
        word_pred = preds
        word_label = batch
        line_right, e1, e2, e3 = 0, 0, 0, 0
        lev_dist = []
        for labels, prediction in zip(word_label, word_pred):
            if prediction == labels:
                line_right += 1
            distance = compute_edit_distance(prediction, labels)
            lev_dist.append(Levenshtein.normalized_distance(prediction, labels))
            if distance <= 1:
                e1 += 1
            if distance <= 2:
                e2 += 1
            if distance <= 3:
                e3 += 1
        batch_size = len(lev_dist)
        self.edit_dist = sum(lev_dist)  # float
        self.exp_rate = line_right  # float
        if self.cal_blue_score:
            self.blue_score = compute_blue_score(word_pred, word_label)
        self.e1 = e1
        self.e2 = e2
        self.e3 = e3
        exp_length = len(word_label)
        self.edit_right.append(self.edit_dist)
        self.exp_right.append(self.exp_rate)
        if self.cal_blue_score:
            self.blue_right.append(self.blue_score * batch_size)
        self.e1_right.append(self.e1)
        self.e2_right.append(self.e2)
        self.e3_right.append(self.e3)
        self.editdistance_total_length = self.editdistance_total_length + exp_length
        self.exp_total_num = self.exp_total_num + exp_length

    def get_metric(self):
        """Return the metrics accumulated since the last epoch reset.

        return {
            'edit distance': 0,
            "blue_score": 0,
            "exp_rate": 0,
        }
        """
        cur_edit_distance = sum(self.edit_right) / self.exp_total_num
        cur_exp_rate = sum(self.exp_right) / self.exp_total_num
        if self.cal_blue_score:
            cur_blue_score = sum(self.blue_right) / self.editdistance_total_length
        cur_exp_1 = sum(self.e1_right) / self.exp_total_num
        cur_exp_2 = sum(self.e2_right) / self.exp_total_num
        cur_exp_3 = sum(self.e3_right) / self.exp_total_num
        self.reset()
        if self.cal_blue_score:
            return {
                "blue_score": cur_blue_score,
                "edit distance": cur_edit_distance,
                "exp_rate": cur_exp_rate,
                "exp_rate<=1 ": cur_exp_1,
                "exp_rate<=2 ": cur_exp_2,
                "exp_rate<=3 ": cur_exp_3,
            }
        else:
            return {
                "edit distance": cur_edit_distance,
                "exp_rate": cur_exp_rate,
                "exp_rate<=1 ": cur_exp_1,
                "exp_rate<=2 ": cur_exp_2,
                "exp_rate<=3 ": cur_exp_3,
            }

    def reset(self):
        self.edit_dist = 0
        self.exp_rate = 0
        if self.cal_blue_score:
            self.blue_score = 0
        self.e1 = 0
        self.e2 = 0
        self.e3 = 0

    def epoch_reset(self):
        self.edit_right = []
        self.exp_right = []
        if self.cal_blue_score:
            self.blue_right = []
        self.e1_right = []
        self.e2_right = []
        self.e3_right = []
        self.editdistance_total_length = 0
        self.exp_total_num = 0