from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers.trainer_callback import EarlyStoppingCallback
import torch
from torch.utils.data import Dataset
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score as binary_f1_score
from seqeval.metrics import f1_score, precision_score, recall_score
import random
import os
from ner_config import NERConfig

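# The fields below are assumed to exist on NERConfig, based solely on how they
# are used in this file (an illustrative sketch, not the actual config module;
# the example values are hypothetical):
#
#   class NERConfig:
#       SEED = 42
#       MODEL_NAME = "bert-base-chinese"   # any token-classification checkpoint
#       LABELS = [...]                     # same tag set as `label_list` below
#       MAX_LENGTH = 128
#       DATA_PATH = "data/train.txt"
#       MODEL_PATH = "models"
#       LOG_PATH = "logs"
#       TEST_SIZE = 0.2
#       EPOCHS, BATCH_SIZE, LEARNING_RATE, WARMUP_RATIO, WEIGHT_DECAY = ...
#       GRADIENT_ACCUMULATION_STEPS, LOGGING_STEPS, EVAL_STEPS = ...
#       EARLY_STOPPING_PATIENCE, N_SEEDS = ...
#       USE_DATA_AUGMENTATION = True
#       AUGMENTATION_RATIO = 0.1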
# Set random seeds for reproducibility
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


set_seed(NERConfig.SEED)

# Updated label list; make sure it covers every label that can appear
label_list = [
    "O",
    "B-POST", "I-POST",
    "B-COMPANY", "I-COMPANY",
    "B-ADDRESS", "I-ADDRESS",
    "B-PICKUP_CODE", "I-PICKUP_CODE",  # pickup-code labels
    "B-STATION", "I-STATION"  # station labels
]

class NERDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, label_list):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        # Build the label <-> ID mappings
        self.label2id = {label: i for i, label in enumerate(label_list)}
        self.id2label = {i: label for i, label in enumerate(label_list)}

        # Print the label mapping for debugging
        print("Label mapping:")
        for label, idx in self.label2id.items():
            print(f"{label}: {idx}")

        # Encode the texts
        self.encodings = self.tokenize_and_align_labels()

    def tokenize_and_align_labels(self):
        # `texts` holds pre-split word lists, so tokenize with
        # is_split_into_words=True; this keeps word_ids() aligned with the
        # per-word label sequences (joining the words into one string would
        # break that alignment for multi-character words).
        tokenized_inputs = self.tokenizer(
            self.texts,
            is_split_into_words=True,
            truncation=True,
            padding=True,
            max_length=NERConfig.MAX_LENGTH,
            return_tensors=None
        )

        labels = []
        for i, label in enumerate(self.labels):
            word_ids = tokenized_inputs.word_ids(i)
            previous_word_idx = None
            label_ids = []
            current_entity = None

            for word_idx in word_ids:
                if word_idx is None:
                    # Special tokens ([CLS], [SEP], padding) are ignored by the loss
                    label_ids.append(-100)
                elif word_idx != previous_word_idx:
                    # First token of a new word
                    label_ids.append(self.label2id[label[word_idx]])
                    if label[word_idx] == "O":
                        current_entity = None
                    else:
                        # Remember the entity type (from B-X or I-X) so that
                        # continuation sub-tokens get the matching I-X tag
                        current_entity = label[word_idx][2:]
                else:
                    # Subsequent sub-tokens of the same word
                    if current_entity:
                        label_ids.append(self.label2id[f"I-{current_entity}"])
                    else:
                        label_ids.append(self.label2id["O"])

                previous_word_idx = word_idx

            labels.append(label_ids)

        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    def __getitem__(self, idx):
        return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}

    def __len__(self):
        return len(self.texts)

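# Illustrative usage of NERDataset (the checkpoint name is a hypothetical
# stand-in; any fast tokenizer for token classification works):
#
#   tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
#   dataset = NERDataset(
#       texts=[["取", "件", "码", "1", "2", "3"]],
#       labels=[["O", "O", "O", "B-PICKUP_CODE", "I-PICKUP_CODE", "I-PICKUP_CODE"]],
#       tokenizer=tokenizer,
#       label_list=label_list,
#   )
#   print(dataset[0]["labels"])  # per-token label IDs, -100 on special tokens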
def load_data(file_path):
    texts, labels = [], []
    current_words, current_labels = [], []

    def clean_pickup_code_labels(words, labels):
        """Clean up pickup-code annotations so they contain no extra text."""
        i = 0
        while i < len(words):
            if labels[i].startswith("B-PICKUP_CODE"):
                # Find where the pickup code ends
                j = i + 1
                while j < len(words) and (
                    labels[j].startswith("I-PICKUP_CODE") and
                    (words[j].isalnum() or words[j] == "-")
                ):
                    j += 1

                # Check the code span and fix it if needed
                code_words = words[i:j]

                # On an invalid character or a stop word, end the code early
                for k, word in enumerate(code_words):
                    if not (word.isalnum() or word == "-") or word in ["的", "来", "到", "取", "件", "码"]:
                        # Relabel the rest of the span as O
                        for m in range(i + k, j):
                            labels[m] = "O"
                        break

                i = j
            else:
                i += 1

        return words, labels

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    word, label = line.split(maxsplit=1)
                    current_words.append(word)
                    current_labels.append(label)
                except ValueError as e:
                    print(f"Error while processing line '{line}': {e}")
                    continue
            elif current_words:  # blank line and we have a pending sample
                # Clean up pickup-code annotations
                current_words, current_labels = clean_pickup_code_labels(current_words, current_labels)
                texts.append(current_words)
                labels.append(current_labels)
                current_words, current_labels = [], []

    if current_words:  # handle the final sample
        current_words, current_labels = clean_pickup_code_labels(current_words, current_labels)
        texts.append(current_words)
        labels.append(current_labels)

    return texts, labels

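# load_data expects a CoNLL-style file: one `token label` pair per line, with
# blank lines separating samples. An illustrative snippet (hypothetical data):
#
#   您 O
#   的 O
#   取 O
#   件 O
#   码 O
#   8 B-PICKUP_CODE
#   8 I-PICKUP_CODE
#   6 I-PICKUP_CODE
#
#   圆 B-COMPANY
#   通 I-COMPANY
#   快 O
#   递 O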
def compute_metrics(p):
    """Compute evaluation metrics."""
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Drop predictions/labels for special tokens (label -100)
    true_predictions = [
        [NERConfig.LABELS[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [NERConfig.LABELS[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    # Overall entity-level metrics (seqeval)
    results = {
        "overall_f1": f1_score(true_labels, true_predictions),
        "overall_precision": precision_score(true_labels, true_predictions),
        "overall_recall": recall_score(true_labels, true_predictions)
    }

    # Per-entity-type metrics
    for entity_type in ["POST", "COMPANY", "ADDRESS", "PICKUP_CODE", "STATION"]:
        # Binarize the tags: does each token belong to the current entity type?
        binary_preds = []
        binary_labels = []

        for pred_seq, label_seq in zip(true_predictions, true_labels):
            pred_binary = []
            label_binary = []

            for pred, label in zip(pred_seq, label_seq):
                # Check whether the tag belongs to the current entity type
                pred_is_entity = pred.endswith(entity_type)
                label_is_entity = label.endswith(entity_type)

                pred_binary.append(1 if pred_is_entity else 0)
                label_binary.append(1 if label_is_entity else 0)

        # Token-level binary F1 for the current entity type; this needs
        # sklearn's f1_score (seqeval's variant expects tag sequences, not 0/1 lists)
            binary_preds.append(pred_binary)
            binary_labels.append(label_binary)

        try:
            entity_f1 = binary_f1_score(
                sum(binary_labels, []),  # flatten the nested lists
                sum(binary_preds, []),   # flatten the nested lists
                average='binary'
            )
            results[f"{entity_type}_f1"] = entity_f1
        except Exception as e:
            print(f"Error computing F1 for {entity_type}: {e}")
            results[f"{entity_type}_f1"] = 0.0

    return results

def augment_data(texts, labels):
    """Data augmentation."""
    augmented_texts = []
    augmented_labels = []
    for text, label in zip(texts, labels):
        # Keep the original sample
        augmented_texts.append(text)
        augmented_labels.append(label)

        # Randomly drop some non-entity characters
        if NERConfig.USE_DATA_AUGMENTATION:
            new_text = []
            new_label = []
            for t, l in zip(text, label):
                if l == "O" and random.random() < NERConfig.AUGMENTATION_RATIO:
                    continue
                new_text.append(t)
                new_label.append(l)
            augmented_texts.append(new_text)
            augmented_labels.append(new_label)

    return augmented_texts, augmented_labels

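# Illustrative effect of the augmentation above (hypothetical sample): with
# AUGMENTATION_RATIO = 0.3, an input like
#
#   ["请", "到", "菜", "鸟", "驿", "站"]   O O B-STATION I-STATION I-STATION I-STATION
#
# may yield an extra copy such as
#
#   ["到", "菜", "鸟", "驿", "站"]         O B-STATION I-STATION I-STATION I-STATION
#
# Only "O"-labeled characters can be dropped, so entity spans stay intact.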
def train_with_different_seeds(texts, labels, n_seeds=2):
    """Run several training rounds with different random seeds."""
    best_model = None
    best_f1 = 0
    results = []

    # Create the output directories
    os.makedirs(NERConfig.MODEL_PATH, exist_ok=True)
    os.makedirs(NERConfig.LOG_PATH, exist_ok=True)

    for seed in range(n_seeds):
        print(f"\n=== Starting training with seed {seed} ===")

        # Set the random seed
        set_seed(seed)

        # Load the tokenizer and model
        tokenizer = AutoTokenizer.from_pretrained(NERConfig.MODEL_NAME)
        model = AutoModelForTokenClassification.from_pretrained(
            NERConfig.MODEL_NAME,
            num_labels=len(NERConfig.LABELS),
            id2label={i: label for i, label in enumerate(NERConfig.LABELS)},
            label2id={label: i for i, label in enumerate(NERConfig.LABELS)}
        )

        # Build the datasets
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            texts, labels, test_size=NERConfig.TEST_SIZE, random_state=seed
        )

        # Data augmentation
        if NERConfig.USE_DATA_AUGMENTATION:
            train_texts, train_labels = augment_data(train_texts, train_labels)

        train_dataset = NERDataset(train_texts, train_labels, tokenizer, NERConfig.LABELS)
        val_dataset = NERDataset(val_texts, val_labels, tokenizer, NERConfig.LABELS)

        # Training arguments
        training_args = TrainingArguments(
            output_dir=f"{NERConfig.MODEL_PATH}/seed_{seed}",
            num_train_epochs=NERConfig.EPOCHS,
            per_device_train_batch_size=NERConfig.BATCH_SIZE,
            per_device_eval_batch_size=NERConfig.BATCH_SIZE,
            learning_rate=NERConfig.LEARNING_RATE,
            warmup_ratio=NERConfig.WARMUP_RATIO,
            weight_decay=NERConfig.WEIGHT_DECAY,
            gradient_accumulation_steps=NERConfig.GRADIENT_ACCUMULATION_STEPS,
            logging_steps=NERConfig.LOGGING_STEPS,
            save_total_limit=2,
            no_cuda=True,  # train on CPU
            evaluation_strategy="steps",
            eval_steps=NERConfig.EVAL_STEPS,
            save_strategy="steps",
            save_steps=NERConfig.EVAL_STEPS,
            load_best_model_at_end=True,
            metric_for_best_model="overall_f1",
            greater_is_better=True,
            logging_dir=f"{NERConfig.LOG_PATH}/seed_{seed}",
            logging_first_step=True,
            report_to=["tensorboard"],
        )

        # Trainer
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,  # the module-level compute_metrics
            callbacks=[EarlyStoppingCallback(early_stopping_patience=NERConfig.EARLY_STOPPING_PATIENCE)]
        )

        # Train the model
        trainer.train()

        # Evaluate
        eval_results = trainer.evaluate()
        results.append({
            "seed": seed,
            "f1": eval_results["eval_overall_f1"],  # Trainer prefixes metrics with "eval_"
            "metrics": eval_results
        })

        # Keep the best model so far
        if eval_results["eval_overall_f1"] > best_f1:
            best_f1 = eval_results["eval_overall_f1"]
            best_model = (model, tokenizer)

    # Print a summary of all runs
    print("\n=== Training results summary ===")
    for result in results:
        print(f"\nEvaluation results for seed {result['seed']}:")
        print(f"Overall F1: {result['f1']:.4f}")
        for key, value in result['metrics'].items():
            if key.endswith('_f1'):
                print(f"{key}: {value:.4f}")

    # Save the best model
    if best_model:
        model, tokenizer = best_model
        save_path = f"{NERConfig.MODEL_PATH}/best_model"
        print(f"\nSaving the best model to: {save_path}")
        model.save_pretrained(save_path)
        tokenizer.save_pretrained(save_path)

    return best_model, results

def main():
    # Load the data
    texts, labels = load_data(NERConfig.DATA_PATH)
    print(f"Loaded dataset size: {len(texts)} samples")

    # Train multiple times with different seeds
    best_model, results = train_with_different_seeds(texts, labels, n_seeds=NERConfig.N_SEEDS)


if __name__ == "__main__":
    main()