from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers.trainer_callback import EarlyStoppingCallback
import torch
from torch.utils.data import Dataset
import numpy as np
from sklearn.model_selection import train_test_split
# seqeval scores whole entity spans; sklearn's f1_score is aliased separately
# for the token-level binary per-entity metric, which seqeval cannot compute.
from seqeval.metrics import f1_score, precision_score, recall_score
from sklearn.metrics import f1_score as binary_f1_score
import random
import os

from ner_config import NERConfig


# Fix all random seeds for reproducibility.
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


set_seed(NERConfig.SEED)

# Full label list, covering every tag that can appear in the data.
label_list = [
    "O",
    "B-POST", "I-POST",
    "B-COMPANY", "I-COMPANY",
    "B-ADDRESS", "I-ADDRESS",
    "B-PICKUP_CODE", "I-PICKUP_CODE",  # pickup-code tags
    "B-STATION", "I-STATION",          # station tags
]


class NERDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, label_list):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        # Label <-> ID mappings.
        self.label2id = {label: i for i, label in enumerate(label_list)}
        self.id2label = {i: label for i, label in enumerate(label_list)}
        # Print the mapping for debugging.
        print("Label mapping:")
        for label, idx in self.label2id.items():
            print(f"{label}: {idx}")
        # Encode all texts up front.
        self.encodings = self.tokenize_and_align_labels()

    def tokenize_and_align_labels(self):
        # Pass the pre-split word lists directly so that word_ids(i) indexes
        # the annotation exactly. Joining the words into one string would let
        # the pre-tokenizer regroup alphanumeric runs (e.g. pickup codes) into
        # single words and shift every label after them; it would also leak an
        # unused offset_mapping column into the model inputs.
        tokenized_inputs = self.tokenizer(
            self.texts,
            is_split_into_words=True,
            truncation=True,
            padding=True,
            max_length=NERConfig.MAX_LENGTH,
            return_tensors=None,
        )
        labels = []
        for i, label in enumerate(self.labels):
            word_ids = tokenized_inputs.word_ids(i)
            previous_word_idx = None
            label_ids = []
            current_entity = None
            for word_idx in word_ids:
                if word_idx is None:
                    # Special tokens are ignored by the loss.
                    label_ids.append(-100)
                elif word_idx != previous_word_idx:
                    # First sub-token of a new word.
                    label_ids.append(self.label2id[label[word_idx]])
                    if label[word_idx].startswith("B-"):
                        current_entity = label[word_idx][2:]
                    elif label[word_idx] == "O":
                        current_entity = None
                else:
                    # Remaining sub-tokens of the same word continue the entity.
                    if current_entity:
                        label_ids.append(self.label2id[f"I-{current_entity}"])
                    else:
                        label_ids.append(self.label2id["O"])
                previous_word_idx = word_idx
            labels.append(label_ids)
        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    def __getitem__(self, idx):
        return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}

    def __len__(self):
        return len(self.texts)


def load_data(file_path):
    texts, labels = [], []
    current_words, current_labels = [], []

    def clean_pickup_code_labels(words, labels):
        """Clean pickup-code annotations so they contain no surrounding text."""
        i = 0
        while i < len(words):
            if labels[i].startswith("B-PICKUP_CODE"):
                # Find where the pickup code ends.
                j = i + 1
                while j < len(words) and (
                    labels[j].startswith("I-PICKUP_CODE")
                    and (words[j].isalnum() or words[j] == "-")
                ):
                    j += 1
                # If the span contains an invalid character or a trailing
                # function word, cut the code short at that point.
                code_words = words[i:j]
                for k, word in enumerate(code_words):
                    if not (word.isalnum() or word == "-") or word in ["的", "来", "到", "取", "件", "码"]:
                        # Relabel the rest of the span as O.
                        for m in range(i + k, j):
                            labels[m] = "O"
                        break
                i = j
            else:
                i += 1
        return words, labels

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    word, label = line.split(maxsplit=1)
                    current_words.append(word)
                    current_labels.append(label)
                except Exception as e:
                    print(f"Error while processing line '{line}': {e}")
                    continue
            elif current_words:  # a blank line closes the current sample
                current_words, current_labels = clean_pickup_code_labels(current_words, current_labels)
                texts.append(current_words)
                labels.append(current_labels)
                current_words, current_labels = [], []

    if current_words:  # flush the final sample
        current_words, current_labels = clean_pickup_code_labels(current_words, current_labels)
        texts.append(current_words)
        labels.append(current_labels)

    return texts, labels
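
# Illustrative only: the file shape load_data() expects, with a hypothetical
# sample. Each non-empty line holds one token and its BIO tag separated by
# whitespace, and a blank line closes a sample:
#
#     取 O
#     件 O
#     码 O
#     A B-PICKUP_CODE
#     1 I-PICKUP_CODE
#
# Such a file would yield texts == [["取", "件", "码", "A", "1"]] and
# labels == [["O", "O", "O", "B-PICKUP_CODE", "I-PICKUP_CODE"]].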

def compute_metrics(p):
    """Compute evaluation metrics."""
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Drop predictions and labels at special-token positions (label == -100).
    true_predictions = [
        [NERConfig.LABELS[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [NERConfig.LABELS[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    # Overall span-level metrics (seqeval).
    results = {
        "overall_f1": f1_score(true_labels, true_predictions),
        "overall_precision": precision_score(true_labels, true_predictions),
        "overall_recall": recall_score(true_labels, true_predictions),
    }

    # Per-entity-type metrics (STATION added to match the label list).
    for entity_type in ["POST", "COMPANY", "ADDRESS", "PICKUP_CODE", "STATION"]:
        # Reduce each tag to a binary "belongs to this entity type" flag,
        # flattened across all sequences.
        binary_preds = []
        binary_labels = []
        for pred_seq, label_seq in zip(true_predictions, true_labels):
            for pred, label in zip(pred_seq, label_seq):
                binary_preds.append(1 if pred.endswith(entity_type) else 0)
                binary_labels.append(1 if label.endswith(entity_type) else 0)
        # Token-level binary F1 for this entity type. This uses sklearn's
        # f1_score (aliased above): seqeval's expects tag sequences, not flat
        # binary lists, and has no 'binary' averaging mode.
        try:
            entity_f1 = binary_f1_score(binary_labels, binary_preds, average='binary')
            results[f"{entity_type}_f1"] = entity_f1
        except Exception as e:
            print(f"Failed to compute F1 for {entity_type}: {str(e)}")
            results[f"{entity_type}_f1"] = 0.0

    return results
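
# Illustrative check of the two metric styles above (hypothetical tags, run
# manually): seqeval scores whole entity spans, while the per-type metric
# counts individual tokens, so the two can legitimately disagree.
#
#     y_true = [["B-POST", "I-POST", "O"]]
#     y_pred = [["B-POST", "O", "O"]]          # span boundary is wrong
#     f1_score(y_true, y_pred)                 # seqeval span F1  -> 0.0
#     binary_f1_score([1, 1, 0], [1, 0, 0])    # token binary F1 -> ~0.67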

def augment_data(texts, labels):
    """Simple data augmentation: randomly drop O-labelled tokens."""
    augmented_texts = []
    augmented_labels = []
    for text, label in zip(texts, labels):
        # Keep the original sample.
        augmented_texts.append(text)
        augmented_labels.append(label)
        # Add a copy with some non-entity characters removed.
        if NERConfig.USE_DATA_AUGMENTATION:
            new_text = []
            new_label = []
            for t, l in zip(text, label):
                if l == "O" and random.random() < NERConfig.AUGMENTATION_RATIO:
                    continue
                new_text.append(t)
                new_label.append(l)
            augmented_texts.append(new_text)
            augmented_labels.append(new_label)
    return augmented_texts, augmented_labels


def train_with_different_seeds(texts, labels, n_seeds=2):
    """Train several times with different random seeds and keep the best model."""
    best_model = None
    best_f1 = 0
    results = []

    # Create output directories.
    os.makedirs(NERConfig.MODEL_PATH, exist_ok=True)
    os.makedirs(NERConfig.LOG_PATH, exist_ok=True)

    for seed in range(n_seeds):
        print(f"\n=== Training with random seed {seed} ===")
        set_seed(seed)

        # Load tokenizer and model.
        tokenizer = AutoTokenizer.from_pretrained(NERConfig.MODEL_NAME)
        model = AutoModelForTokenClassification.from_pretrained(
            NERConfig.MODEL_NAME,
            num_labels=len(NERConfig.LABELS),
            id2label={i: label for i, label in enumerate(NERConfig.LABELS)},
            label2id={label: i for i, label in enumerate(NERConfig.LABELS)},
        )

        # Split into train and validation sets.
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            texts, labels, test_size=NERConfig.TEST_SIZE, random_state=seed
        )

        # Optional data augmentation, applied to the training split only.
        if NERConfig.USE_DATA_AUGMENTATION:
            train_texts, train_labels = augment_data(train_texts, train_labels)

        train_dataset = NERDataset(train_texts, train_labels, tokenizer, NERConfig.LABELS)
        val_dataset = NERDataset(val_texts, val_labels, tokenizer, NERConfig.LABELS)

        # Training arguments.
        training_args = TrainingArguments(
            output_dir=f"{NERConfig.MODEL_PATH}/seed_{seed}",
            num_train_epochs=NERConfig.EPOCHS,
            per_device_train_batch_size=NERConfig.BATCH_SIZE,
            per_device_eval_batch_size=NERConfig.BATCH_SIZE,
            learning_rate=NERConfig.LEARNING_RATE,
            warmup_ratio=NERConfig.WARMUP_RATIO,
            weight_decay=NERConfig.WEIGHT_DECAY,
            gradient_accumulation_steps=NERConfig.GRADIENT_ACCUMULATION_STEPS,
            logging_steps=NERConfig.LOGGING_STEPS,
            save_total_limit=2,
            no_cuda=True,  # train on CPU
            evaluation_strategy="steps",
            eval_steps=NERConfig.EVAL_STEPS,
            save_strategy="steps",
            save_steps=NERConfig.EVAL_STEPS,
            load_best_model_at_end=True,
            metric_for_best_model="overall_f1",
            greater_is_better=True,
            logging_dir=f"{NERConfig.LOG_PATH}/seed_{seed}",
            logging_first_step=True,
            report_to=["tensorboard"],
        )

        # Trainer with early stopping, using the module-level compute_metrics.
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=NERConfig.EARLY_STOPPING_PATIENCE)],
        )

        # Train and evaluate.
        trainer.train()
        eval_results = trainer.evaluate()
        results.append({
            "seed": seed,
            "f1": eval_results["eval_overall_f1"],  # Trainer prefixes metrics with "eval_"
            "metrics": eval_results,
        })

        # Keep the best model seen so far.
        if eval_results["eval_overall_f1"] > best_f1:
            best_f1 = eval_results["eval_overall_f1"]
            best_model = (model, tokenizer)

    # Summarise all runs.
    print("\n=== Training results summary ===")
    for result in results:
        print(f"\nEvaluation results for seed {result['seed']}:")
        print(f"Overall F1: {result['f1']:.4f}")
        for key, value in result['metrics'].items():
            if key.endswith('_f1'):
                print(f"{key}: {value:.4f}")

    # Save the best model.
    if best_model:
        model, tokenizer = best_model
        save_path = f"{NERConfig.MODEL_PATH}/best_model"
        print(f"\nSaving best model to: {save_path}")
        model.save_pretrained(save_path)
        tokenizer.save_pretrained(save_path)

    return best_model, results


def main():
    # Load the data.
    texts, labels = load_data(NERConfig.DATA_PATH)
    print(f"Loaded dataset size: {len(texts)} samples")

    # Train with several seeds and keep the best run.
    best_model, results = train_with_different_seeds(texts, labels, n_seeds=NERConfig.N_SEEDS)


if __name__ == "__main__":
    main()
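
# Minimal inference sketch (run manually; assumes training has completed and
# f"{NERConfig.MODEL_PATH}/best_model" exists). The example input string is
# hypothetical.
#
#     from transformers import pipeline
#     ner = pipeline(
#         "token-classification",
#         model=f"{NERConfig.MODEL_PATH}/best_model",
#         tokenizer=f"{NERConfig.MODEL_PATH}/best_model",
#         aggregation_strategy="simple",  # merge B-/I- pieces into entity spans
#     )
#     print(ner("您的取件码是A1-2345"))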