# -*- coding: utf-8 -*-
|
import os
|
import logging
|
from typing import Dict, Optional, Tuple
|
|
from flask import Flask, request, jsonify
|
from transformers import BertTokenizer, BertForSequenceClassification, AutoTokenizer, AutoModelForTokenClassification
|
import torch
|
from werkzeug.exceptions import BadRequest
|
from ner_config import NERConfig, RepaymentNERConfig, IncomeNERConfig, FlightNERConfig, TrainNERConfig
|
import re
|
|
# 配置日志
|
logging.basicConfig(
|
level=logging.INFO,
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
handlers=[
|
logging.FileHandler('app.log'),
|
logging.StreamHandler()
|
]
|
)
|
logger = logging.getLogger(__name__)
|
|
class ModelManager:
|
def __init__(self):
|
self.classifier_path = "./models/classifier"
|
self.ner_path = "./models/ner_model/best_model"
|
self.repayment_path = "./models/repayment_model/best_model"
|
self.income_path = "./models/income_model/best_model"
|
# self.flight_path = "./models/flight_model/best_model"
|
# self.train_path = "./models/train_model/best_model" # 添加火车票模型路径
|
|
# 检查模型文件
|
self._check_model_files()
|
|
# 加载模型
|
self.classifier_tokenizer, self.classifier_model = self._load_classifier()
|
self.ner_tokenizer, self.ner_model = self._load_ner()
|
self.repayment_tokenizer, self.repayment_model = self._load_repayment()
|
self.income_tokenizer, self.income_model = self._load_income()
|
# self.flight_tokenizer, self.flight_model = self._load_flight()
|
# self.train_tokenizer, self.train_model = self._load_train() # 加载火车票模型
|
|
# 将模型设置为评估模式
|
self.classifier_model.eval()
|
self.ner_model.eval()
|
self.repayment_model.eval()
|
self.income_model.eval()
|
# self.flight_model.eval()
|
# self.train_model.eval() # 设置火车票模型为评估模式
|
|
def _check_model_files(self):
|
"""检查模型文件是否存在"""
|
if not os.path.exists(self.classifier_path):
|
raise RuntimeError("分类模型文件不存在,请先运行训练脚本")
|
if not os.path.exists(self.ner_path):
|
raise RuntimeError("NER模型文件不存在,请先运行训练脚本")
|
if not os.path.exists(self.repayment_path):
|
raise RuntimeError("还款模型文件不存在,请先运行训练脚本")
|
if not os.path.exists(self.income_path):
|
raise RuntimeError("收入模型文件不存在,请先运行训练脚本")
|
# if not os.path.exists(self.flight_path):
|
# raise RuntimeError("航班模型文件不存在,请先运行训练脚本")
|
# if not os.path.exists(self.train_path):
|
# raise RuntimeError("火车票模型文件不存在,请先运行训练脚本")
|
|
def _load_classifier(self) -> Tuple[BertTokenizer, BertForSequenceClassification]:
|
"""加载分类模型"""
|
try:
|
tokenizer = BertTokenizer.from_pretrained(self.classifier_path)
|
model = BertForSequenceClassification.from_pretrained(self.classifier_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载分类模型失败: {str(e)}")
|
raise
|
|
def _load_ner(self) -> Tuple[AutoTokenizer, AutoModelForTokenClassification]:
|
"""加载NER模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.ner_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.ner_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载NER模型失败: {str(e)}")
|
raise
|
|
def _load_repayment(self):
|
"""加载还款模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.repayment_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.repayment_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载还款模型失败: {str(e)}")
|
raise
|
|
def _load_income(self):
|
"""加载收入模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.income_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.income_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载收入模型失败: {str(e)}")
|
raise
|
|
def _load_flight(self):
|
"""加载航班模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.flight_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.flight_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载航班模型失败: {str(e)}")
|
raise
|
|
def _load_train(self):
|
"""加载火车票模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.train_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.train_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载火车票模型失败: {str(e)}")
|
raise
|
|
def classify_sms(self, text: str) -> str:
|
"""对短信进行分类"""
|
try:
|
inputs = self.classifier_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=64
|
)
|
with torch.no_grad():
|
outputs = self.classifier_model(**inputs)
|
pred_id = outputs.logits.argmax().item()
|
return self.classifier_model.config.id2label[pred_id]
|
except Exception as e:
|
logger.error(f"短信分类失败: {str(e)}")
|
raise
|
|
def extract_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取文本中的实体"""
|
try:
|
# 初始化结果字典
|
result = {
|
"post": None, # 快递公司
|
"company": None, # 寄件公司
|
"address": None, # 地址
|
"pickup_code": None, # 取件码
|
"time": None # 添加时间字段
|
}
|
|
# 第一阶段:直接从文本中提取取件码
|
pickup_code = self.extract_pickup_code_from_text(text)
|
if pickup_code:
|
result["pickup_code"] = pickup_code
|
|
# 第二阶段:使用NER模型提取其他实体
|
inputs = self.ner_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=64
|
)
|
with torch.no_grad():
|
outputs = self.ner_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.ner_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.ner_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
# 解析实体
|
current_entity = None
|
entity_start = None
|
for i, (token, tag) in enumerate(zip(tokens, tags)):
|
# 跳过取件码实体,因为我们已经单独处理了
|
if tag.startswith("B-") and tag[2:] != "PICKUP_CODE":
|
# 保存之前的实体
|
if current_entity and current_entity["type"] != "PICKUP_CODE":
|
key = current_entity["type"].lower()
|
result[key] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
# 开始新实体
|
current_entity = {"type": tag[2:], "text": token}
|
entity_start = i
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"] and current_entity["type"] != "PICKUP_CODE":
|
current_entity["text"] += token
|
elif tag == "O" or tag.startswith("B-"):
|
# 结束当前实体
|
if current_entity and current_entity["type"] != "PICKUP_CODE":
|
key = current_entity["type"].lower()
|
result[key] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = None
|
|
# 处理最后一个实体
|
if current_entity and current_entity["type"] != "PICKUP_CODE":
|
key = current_entity["type"].lower()
|
result[key] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
|
# 如果第一阶段没有提取到取件码,使用NER模型的结果
|
if not result["pickup_code"]:
|
# 重新解析一次,只提取取件码
|
current_entity = None
|
for i, (token, tag) in enumerate(zip(tokens, tags)):
|
if tag.startswith("B-") and tag[2:] == "PICKUP_CODE":
|
current_entity = {"type": tag[2:], "text": token}
|
elif tag.startswith("I-") and current_entity and tag[2:] == "PICKUP_CODE":
|
current_entity["text"] += token
|
elif (tag == "O" or tag.startswith("B-")) and current_entity:
|
result["pickup_code"] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = None
|
|
# 处理最后一个实体
|
if current_entity and current_entity["type"] == "PICKUP_CODE":
|
result["pickup_code"] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
|
# 后处理:清理和验证取件码
|
if result["pickup_code"]:
|
# 清理取件码
|
code = result["pickup_code"]
|
# 移除取件码后的额外文字
|
for word in ["的", "来", "到", "取", "件", "码"]:
|
if word in code:
|
code = code[:code.index(word)]
|
|
# 只保留字母、数字和连字符
|
code = ''.join(c for c in code if c.isalnum() or c == "-")
|
|
# 确保格式正确
|
parts = code.split("-")
|
valid_parts = []
|
for part in parts:
|
if part and any(c.isalnum() for c in part):
|
valid_parts.append(part)
|
|
if valid_parts:
|
result["pickup_code"] = "-".join(valid_parts)
|
else:
|
result["pickup_code"] = None
|
|
# 清理公司名称
|
if result["company"]:
|
company = result["company"]
|
invalid_words = ["包裹", "快递", "到"]
|
for word in invalid_words:
|
if company.endswith(word):
|
company = company[:-len(word)]
|
result["company"] = company.strip()
|
|
# 清理地址
|
if result["address"]:
|
address = result["address"]
|
invalid_suffixes = [",请尽快取件", ",询", "请尽快取件"]
|
for suffix in invalid_suffixes:
|
if address.endswith(suffix):
|
address = address[:-len(suffix)]
|
result["address"] = address.strip()
|
|
return result
|
except Exception as e:
|
logger.error(f"实体提取失败: {str(e)}")
|
raise
|
|
def extract_pickup_code_from_text(self, text: str) -> Optional[str]:
|
"""直接从文本提取取件码"""
|
# 先尝试直接匹配完整的取件码格式
|
pickup_patterns = [
|
# 带有上下文的取件码模式(包括字母数字组合)
|
r'凭\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})\s*(来|到)?', # 匹配"凭xx-xx-xxxx"格式
|
r'码\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配"码xx-xx-xxxx"格式
|
r'提货码\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配"提货码xx-xx-xxxx"格式
|
r'取件码\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配"取件码xx-xx-xxxx"格式
|
|
# 分组提取模式,对有上下文的取件码
|
r'凭\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})\s*(来|到)?', # 匹配"凭xx-xx-xxxx"
|
r'码\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})', # 匹配"码xx-xx-xxxx"
|
r'提货码\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})', # 匹配"提货码xx-xx-xxxx"
|
r'取件码\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})', # 匹配"取件码xx-xx-xxxx"
|
|
# 独立的取件码模式
|
r'([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配独立的xx-xx-xxxx格式,支持字母
|
r'(\d{1,2}-\d{1,2}-\d{4,6})', # 匹配独立的xx-xx-xxxx格式,纯数字
|
]
|
|
# 首先尝试整体匹配
|
for pattern in pickup_patterns[:4]: # 前4个是整体匹配模式
|
matches = re.findall(pattern, text)
|
if matches:
|
return matches[0] if isinstance(matches[0], str) else matches[0][0]
|
|
# 然后尝试分组匹配
|
for pattern in pickup_patterns[4:8]: # 中间4个是分组匹配模式
|
matches = re.findall(pattern, text)
|
if matches:
|
# 分组匹配,第一部分可以是字母数字组合
|
match = matches[0]
|
if len(match) >= 3:
|
# 确保第2、3部分是数字
|
if match[1].isdigit() and match[2].isdigit():
|
return f"{match[0]}-{match[1]}-{match[2]}"
|
|
# 最后尝试独立模式
|
for pattern in pickup_patterns[8:10]: # 最后2个是独立匹配模式
|
matches = re.findall(pattern, text)
|
if matches:
|
return matches[0]
|
|
# 特殊处理包含"取件码"的文本
|
code_indicators = ["取件码", "提货码"]
|
for indicator in code_indicators:
|
if indicator in text:
|
idx = text.find(indicator)
|
# 检查紧跟着的文本
|
search_text = text[idx:idx+35] # 扩大搜索范围
|
# 尝试匹配"取件码A8-1-15"这样的格式
|
special_match = re.search(r'[码]\s*([A-Za-z0-9]+[-\s]+\d+[-\s]+\d+)', search_text)
|
if special_match:
|
# 规范化格式
|
code = special_match.group(1)
|
# 将空格替换为连字符,确保格式一致
|
code = re.sub(r'\s+', '-', code)
|
# 确保连字符格式正确
|
code = re.sub(r'-+', '-', code)
|
return code
|
|
# 特殊处理"凭"后面的数字序列
|
if "凭" in text:
|
pickup_index = text.find("凭")
|
if pickup_index != -1:
|
# 在"凭"之后的25个字符内寻找取件码
|
search_text = text[pickup_index:pickup_index+35]
|
|
# 尝试提取形如"凭A8-1-15"或"凭22-4-1111"的格式
|
alpha_num_match = re.search(r'凭\s*([A-Za-z0-9]+)[-\s]+(\d+)[-\s]+(\d+)', search_text)
|
if alpha_num_match:
|
return f"{alpha_num_match.group(1)}-{alpha_num_match.group(2)}-{alpha_num_match.group(3)}"
|
|
# 提取所有字母数字序列
|
parts = re.findall(r'[A-Za-z0-9]+', search_text)
|
if len(parts) >= 3:
|
# 组合前三个部分
|
return f"{parts[0]}-{parts[1]}-{parts[2]}"
|
|
# 查找形如"A8-1-15"的取件码
|
alpha_num_codes = re.findall(r'([A-Za-z]\d+)-(\d+)-(\d+)', text)
|
if alpha_num_codes:
|
match = alpha_num_codes[0]
|
return f"{match[0]}-{match[1]}-{match[2]}"
|
|
return None
|
|
def extract_repayment_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取还款相关实体"""
|
try:
|
result = {
|
"bank": None, # 还款主体
|
"type": None, # 还款类型
|
"amount": None, # 还款金额
|
"date": None, # 还款日期
|
"number": None, # 账号尾号
|
"min_amount": None # 最低还款金额
|
}
|
|
inputs = self.repayment_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=RepaymentNERConfig.MAX_LENGTH
|
)
|
|
with torch.no_grad():
|
outputs = self.repayment_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.repayment_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.repayment_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
def clean_amount(amount_text: str, context: str) -> Optional[str]:
|
"""清理和标准化金额"""
|
if not amount_text:
|
return None
|
|
# 尝试直接在上下文中使用正则表达式查找更完整的金额
|
# 如果在同一句话里有类似"应还金额5,800元"这样的模式
|
amount_match = re.search(r'(?:应还|还款)?金额([\d,]+\.?\d*)(?:元|块钱|块|万元|万)?', context)
|
if amount_match:
|
return amount_match.group(1) # 直接返回匹配到的金额,保留原始格式
|
|
# 尝试查找最低还款金额
|
min_amount_match = re.search(r'最低还款([\d,]+\.?\d*)(?:元|块钱|块|万元|万)?', context)
|
if min_amount_match and "MIN_CODE" in current_entity["type"]:
|
return min_amount_match.group(1) # 直接返回匹配到的最低还款金额,保留原始格式
|
|
# 在上下文中查找完整金额
|
amount_index = context.find(amount_text)
|
if amount_index != -1:
|
# 扩大搜索范围,查找完整金额
|
search_start = max(0, amount_index - 10)
|
search_end = min(len(context), amount_index + len(amount_text) + 10)
|
search_text = context[search_start:search_end]
|
|
# 使用正则表达式查找金额
|
amount_pattern = re.compile(r'([\d,]+\.?\d*)(?:元|块钱|块|万元|万)?')
|
matches = list(amount_pattern.finditer(search_text))
|
|
if matches:
|
# 选择最接近的匹配结果
|
best_match = None
|
min_distance = float('inf')
|
|
for match in matches:
|
distance = abs(match.start() - (amount_index - search_start))
|
if distance < min_distance:
|
min_distance = distance
|
best_match = match.group(1) # 只取数字部分,保留逗号
|
|
if best_match:
|
return best_match
|
|
# 如果上述方法都没找到,则保留原始提取结果但验证其有效性
|
# 移除货币符号和无效词
|
for symbol in RepaymentNERConfig.AMOUNT_CONFIG['currency_symbols']:
|
amount_text = amount_text.replace(symbol, '')
|
for word in RepaymentNERConfig.AMOUNT_CONFIG['invalid_words']:
|
amount_text = amount_text.replace(word, '')
|
|
# 验证金额有效性
|
clean_amount = amount_text.replace(',', '')
|
try:
|
value = float(clean_amount)
|
if value > 0:
|
# 返回原始格式
|
return amount_text
|
except ValueError:
|
pass
|
|
return None
|
|
# 实体提取
|
current_entity = None
|
entities = {
|
"BANK": [],
|
"TYPE": [],
|
"PICKUP_CODE": [],
|
"DATE": [],
|
"NUMBER": [],
|
"MIN_CODE": []
|
}
|
|
for token, tag in zip(tokens, tags):
|
if tag.startswith("B-"):
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = {"type": tag[2:], "text": token.replace("##", "")}
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"]:
|
current_entity["text"] += token.replace("##", "")
|
else:
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = None
|
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
|
# 处理银行名称
|
if entities["BANK"]:
|
bank_parts = []
|
seen = set()
|
for bank in entities["BANK"]:
|
bank = bank.strip()
|
if bank and bank not in seen:
|
bank_parts.append(bank)
|
seen.add(bank)
|
bank = "".join(bank_parts)
|
if len(bank) <= RepaymentNERConfig.MAX_ENTITY_LENGTH["BANK"]:
|
result["bank"] = bank
|
|
# 处理还款类型
|
if entities["TYPE"]:
|
type_parts = []
|
seen = set()
|
for type_ in entities["TYPE"]:
|
type_ = type_.strip()
|
if type_ and type_ not in seen:
|
type_parts.append(type_)
|
seen.add(type_)
|
type_ = "".join(type_parts)
|
if len(type_) <= RepaymentNERConfig.MAX_ENTITY_LENGTH["TYPE"]:
|
result["type"] = type_
|
|
# 处理账号尾号
|
if entities["NUMBER"]:
|
number = "".join(c for c in "".join(entities["NUMBER"]) if c.isdigit())
|
if number and len(number) <= RepaymentNERConfig.MAX_ENTITY_LENGTH["NUMBER"]:
|
result["number"] = number
|
|
# 处理日期
|
if entities["DATE"]:
|
date = "".join(entities["DATE"])
|
date = ''.join(c for c in date if c.isdigit() or c in ['年', '月', '日', '-'])
|
if date:
|
result["date"] = date
|
|
# 处理金额
|
# 先尝试使用正则表达式直接匹配金额
|
amount_match = re.search(r'(?:应还|还款)?金额([\d,]+\.?\d*)(?:元|块钱|块|万元|万)?', text)
|
if amount_match:
|
amount = amount_match.group(1) # 保留原始格式(带逗号)
|
# 验证金额有效性
|
try:
|
value = float(amount.replace(',', ''))
|
if value > 0:
|
result["amount"] = amount
|
except ValueError:
|
pass
|
|
# 如果正则没有匹配到,使用NER结果
|
if not result["amount"]:
|
amount_candidates = []
|
# 从识别的实体中获取
|
for amount in entities["PICKUP_CODE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
try:
|
value = float(cleaned_amount.replace(',', ''))
|
amount_candidates.append((cleaned_amount, value))
|
except ValueError:
|
continue
|
|
# 如果还是没有找到,尝试从文本中提取
|
if not amount_candidates:
|
# 使用更宽松的正则表达式匹配金额
|
amount_pattern = re.compile(r'([\d,]+\.?\d*)(?:元|块钱|块|万元|万)')
|
matches = list(amount_pattern.finditer(text))
|
|
for match in matches:
|
amount_text = match.group(1) # 获取数字部分,保留逗号
|
try:
|
value = float(amount_text.replace(',', ''))
|
amount_candidates.append((amount_text, value))
|
except ValueError:
|
continue
|
|
# 选择最大的有效金额
|
if amount_candidates:
|
result["amount"] = max(amount_candidates, key=lambda x: x[1])[0]
|
|
# 处理最低还款金额
|
# 先尝试使用正则表达式直接匹配最低还款金额
|
min_amount_match = re.search(r'最低还款([\d,]+\.?\d*)(?:元|块钱|块|万元|万)?', text)
|
if min_amount_match:
|
min_amount = min_amount_match.group(1) # 保留原始格式(带逗号)
|
# 验证金额有效性
|
try:
|
value = float(min_amount.replace(',', ''))
|
if value > 0:
|
result["min_amount"] = min_amount
|
except ValueError:
|
pass
|
|
# 如果正则没有匹配到,使用NER结果
|
if not result["min_amount"] and entities["MIN_CODE"]:
|
for amount in entities["MIN_CODE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
result["min_amount"] = cleaned_amount
|
break
|
|
return result
|
|
except Exception as e:
|
logger.error(f"还款实体提取失败: {str(e)}")
|
raise
|
|
def extract_income_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取收入相关实体"""
|
try:
|
result = {
|
"bank": None, # 银行名称
|
"number": None, # 账号尾号
|
"datetime": None, # 交易时间
|
"amount": None, # 收入金额
|
"balance": None # 余额
|
}
|
|
inputs = self.income_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=IncomeNERConfig.MAX_LENGTH
|
)
|
|
with torch.no_grad():
|
outputs = self.income_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.income_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.income_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
def clean_amount(amount_text: str, context: str) -> Optional[str]:
|
"""清理和标准化金额"""
|
if not amount_text:
|
return None
|
|
# 在上下文中查找完整金额
|
amount_index = context.find(amount_text)
|
if amount_index != -1:
|
# 扩大搜索范围,查找完整金额
|
search_start = max(0, amount_index - 10)
|
search_end = min(len(context), amount_index + len(amount_text) + 10)
|
search_text = context[search_start:search_end]
|
|
# 使用更精确的正则表达式查找金额模式,支持带逗号的金额
|
amount_pattern = re.compile(r'(\d{1,3}(?:,\d{3})*(?:\.\d{1,2})?|\d+(?:\.\d{1,2})?)')
|
matches = list(amount_pattern.finditer(search_text))
|
|
# 找到最接近且最长的完整金额
|
best_match = None
|
min_distance = float('inf')
|
max_length = 0
|
target_pos = amount_index - search_start
|
|
for match in matches:
|
match_pos = match.start()
|
distance = abs(match_pos - target_pos)
|
match_text = match.group(1)
|
|
if len(match_text) > max_length or (len(match_text) == max_length and distance < min_distance):
|
try:
|
value = float(match_text)
|
if value > 0:
|
best_match = match_text
|
min_distance = distance
|
max_length = len(match_text)
|
except ValueError:
|
continue
|
|
if best_match:
|
amount_text = best_match
|
|
# 移除货币符号和无效词
|
for symbol in IncomeNERConfig.AMOUNT_CONFIG['currency_symbols']:
|
amount_text = amount_text.replace(symbol, '')
|
for word in IncomeNERConfig.AMOUNT_CONFIG['invalid_words']:
|
amount_text = amount_text.replace(word, '')
|
|
# 处理金额中的逗号
|
amount_text = amount_text.replace(',', '')
|
|
try:
|
value = float(amount_text)
|
if '.' in amount_text:
|
decimal_places = len(amount_text.split('.')[1])
|
return f"{value:.{decimal_places}f}"
|
return str(int(value))
|
except ValueError:
|
pass
|
return None
|
|
# 实体提取
|
current_entity = None
|
entities = {
|
"BANK": [],
|
"NUMBER": [],
|
"DATATIME": [],
|
"PICKUP_CODE": [],
|
"BALANCE": []
|
}
|
|
for token, tag in zip(tokens, tags):
|
if tag.startswith("B-"):
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = {"type": tag[2:], "text": token.replace("##", "")}
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"]:
|
current_entity["text"] += token.replace("##", "")
|
else:
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = None
|
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
|
# 处理银行名称
|
if entities["BANK"]:
|
bank_parts = []
|
seen = set()
|
for bank in entities["BANK"]:
|
bank = bank.strip()
|
if bank and bank not in seen:
|
bank_parts.append(bank)
|
seen.add(bank)
|
bank = "".join(bank_parts)
|
if len(bank) <= IncomeNERConfig.MAX_ENTITY_LENGTH["BANK"]:
|
result["bank"] = bank
|
|
# 处理账号尾号
|
if entities["NUMBER"]:
|
number = "".join(c for c in "".join(entities["NUMBER"]) if c.isdigit())
|
if number and len(number) <= IncomeNERConfig.MAX_ENTITY_LENGTH["NUMBER"]:
|
result["number"] = number
|
|
# 处理交易时间
|
if entities["DATATIME"]:
|
datetime = "".join(entities["DATATIME"])
|
datetime = ''.join(c for c in datetime if c.isdigit() or c in ['年', '月', '日', '时', '分', ':', '-'])
|
if datetime:
|
result["datetime"] = datetime
|
|
# 处理收入金额
|
amount_candidates = []
|
# 首先从识别的实体中获取
|
for amount in entities["PICKUP_CODE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
try:
|
value = float(cleaned_amount)
|
amount_candidates.append((cleaned_amount, value))
|
except ValueError:
|
continue
|
|
# 如果没有找到有效金额,直接从文本中尝试提取
|
if not amount_candidates:
|
# 直接在整个文本中寻找金额模式
|
amount_pattern = re.compile(r'(\d{1,3}(?:,\d{3})*(?:\.\d{1,2})?|\d+(?:\.\d{1,2})?)')
|
matches = list(amount_pattern.finditer(text))
|
|
for match in matches:
|
amount_text = match.group(1)
|
try:
|
value = float(amount_text.replace(',', ''))
|
amount_candidates.append((amount_text, value))
|
except ValueError:
|
continue
|
|
# 选择最合适的有效金额
|
if amount_candidates:
|
result["amount"] = max(amount_candidates, key=lambda x: x[1])[0]
|
|
# 处理余额
|
if entities["BALANCE"]:
|
for amount in entities["BALANCE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
result["balance"] = cleaned_amount
|
break
|
|
return result
|
|
except Exception as e:
|
logger.error(f"收入实体提取失败: {str(e)}")
|
raise
|
|
def extract_flight_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取航班相关实体"""
|
try:
|
# 初始化结果字典
|
result = {
|
"flight": None, # 航班号
|
"company": None, # 航空公司
|
"start": None, # 出发地
|
"end": None, # 目的地
|
"date": None, # 日期
|
"time": None, # 时间
|
"departure_time": None, # 起飞时间
|
"arrival_time": None, # 到达时间
|
"ticket_num": None, # 机票号码
|
"seat": None # 座位等信息
|
}
|
|
# 使用NER模型提取实体
|
inputs = self.flight_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=FlightNERConfig.MAX_LENGTH
|
)
|
|
with torch.no_grad():
|
outputs = self.flight_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.flight_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.flight_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
# 解析实体
|
current_entity = None
|
|
for token, tag in zip(tokens, tags):
|
if tag.startswith("B-"):
|
if current_entity:
|
entity_type = current_entity["type"].lower()
|
result[entity_type] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = {"type": tag[2:], "text": token}
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"]:
|
current_entity["text"] += token
|
else:
|
if current_entity:
|
entity_type = current_entity["type"].lower()
|
result[entity_type] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = None
|
|
# 处理最后一个实体
|
if current_entity:
|
entity_type = current_entity["type"].lower()
|
result[entity_type] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
|
# 处理航班号格式
|
if result["flight"]:
|
flight_no = result["flight"].upper()
|
# 清理航班号,只保留字母和数字
|
flight_no = ''.join(c for c in flight_no if c.isalnum())
|
# 验证航班号格式
|
valid_pattern = re.compile(FlightNERConfig.FLIGHT_CONFIG['pattern'])
|
if valid_pattern.match(flight_no):
|
result["flight"] = flight_no
|
else:
|
# 尝试修复常见错误
|
if len(flight_no) >= FlightNERConfig.FLIGHT_CONFIG['min_length'] and flight_no[:2].isalpha() and flight_no[2:].isdigit():
|
result["flight"] = flight_no
|
else:
|
result["flight"] = None
|
|
# 清理日期格式
|
if result["date"]:
|
date_str = result["date"]
|
# 保留数字和常见日期分隔符
|
date_str = ''.join(c for c in date_str if c.isdigit() or c in ['年', '月', '日', '-', '/', '.'])
|
result["date"] = date_str
|
|
# 清理时间格式
|
for time_field in ["time", "departure_time", "arrival_time"]:
|
if result[time_field]:
|
time_str = result[time_field]
|
# 保留数字和常见时间分隔符
|
time_str = ''.join(c for c in time_str if c.isdigit() or c in [':', '时', '分', '点'])
|
result[time_field] = time_str
|
|
# 处理机票号码
|
if result["ticket_num"]:
|
ticket_num = result["ticket_num"]
|
# 清理机票号码,只保留字母和数字
|
ticket_num = ''.join(c for c in ticket_num if c.isalnum())
|
result["ticket_num"] = ticket_num
|
|
# 处理座位信息
|
if result["seat"]:
|
seat_str = result["seat"]
|
# 移除可能的额外空格和特殊字符
|
seat_str = seat_str.replace(" ", "").strip()
|
result["seat"] = seat_str
|
|
return result
|
except Exception as e:
|
logger.error(f"航班实体提取失败: {str(e)}")
|
raise
|
|
def extract_train_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取火车票相关实体"""
|
try:
|
# 初始化结果字典
|
result = {
|
"company": None, # 12306
|
"trips": None, # 车次
|
"start": None, # 出发站
|
"end": None, # 到达站
|
"date": None, # 日期
|
"time": None, # 时间
|
"seat": None, # 座位等信息
|
"name": None # 用户姓名
|
}
|
|
# 使用NER模型提取实体
|
inputs = self.train_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=TrainNERConfig.MAX_LENGTH
|
)
|
|
with torch.no_grad():
|
outputs = self.train_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.train_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.train_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
# 解析实体
|
current_entity = None
|
|
for token, tag in zip(tokens, tags):
|
if tag.startswith("B-"):
|
if current_entity:
|
entity_type = current_entity["type"].lower()
|
result[entity_type] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = {"type": tag[2:], "text": token}
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"]:
|
current_entity["text"] += token
|
else:
|
if current_entity:
|
entity_type = current_entity["type"].lower()
|
result[entity_type] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = None
|
|
# 处理最后一个实体
|
if current_entity:
|
entity_type = current_entity["type"].lower()
|
result[entity_type] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
|
# 处理公司名称,通常为12306
|
if result["company"]:
|
company = result["company"].strip()
|
# 如果文本中检测不到公司名称,但包含12306,则默认为12306
|
result["company"] = company
|
elif "12306" in text:
|
result["company"] = "12306"
|
|
# 处理车次格式
|
if result["trips"]:
|
trips_no = result["trips"].upper()
|
# 清理车次号,只保留字母和数字
|
trips_no = ''.join(c for c in trips_no if c.isalnum() or c in ['/', '-'])
|
|
# 验证车次格式
|
valid_patterns = [re.compile(pattern) for pattern in TrainNERConfig.TRIPS_CONFIG['patterns']]
|
if any(pattern.match(trips_no) for pattern in valid_patterns):
|
result["trips"] = trips_no
|
else:
|
# 尝试修复常见错误
|
if len(trips_no) >= TrainNERConfig.TRIPS_CONFIG['min_length'] and any(trips_no.startswith(t) for t in TrainNERConfig.TRIPS_CONFIG['train_types']):
|
result["trips"] = trips_no
|
elif trips_no.isdigit() and 1 <= len(trips_no) <= TrainNERConfig.TRIPS_CONFIG['max_length']:
|
result["trips"] = trips_no
|
else:
|
result["trips"] = None
|
|
# 清理日期格式
|
if result["date"]:
|
date_str = result["date"]
|
# 保留数字和常见日期分隔符
|
date_str = ''.join(c for c in date_str if c.isdigit() or c in ['年', '月', '日', '-', '/', '.'])
|
result["date"] = date_str
|
|
# 清理时间格式
|
if result["time"]:
|
time_str = result["time"]
|
# 保留数字和常见时间分隔符
|
time_str = ''.join(c for c in time_str if c.isdigit() or c in [':', '时', '分', '点'])
|
result["time"] = time_str
|
|
# 处理座位信息
|
if result["seat"]:
|
seat_str = result["seat"]
|
# 移除可能的额外空格和特殊字符
|
seat_str = seat_str.replace(" ", "").strip()
|
result["seat"] = seat_str
|
|
# 处理乘客姓名
|
if result["name"]:
|
name = result["name"].strip()
|
# 移除可能的标点符号
|
name = ''.join(c for c in name if c.isalnum() or c in ['*', '·'])
|
result["name"] = name
|
|
return result
|
except Exception as e:
|
logger.error(f"火车票实体提取失败: {str(e)}")
|
raise
|
|
# 创建Flask应用
|
app = Flask(__name__)
|
model_manager = ModelManager()
|
|
@app.route("/health", methods=["GET"])
|
def health_check():
|
"""健康检查接口"""
|
return jsonify({"status": "healthy"})
|
|
@app.route("/process-sms", methods=["POST"])
|
def process_sms():
|
"""处理短信的接口"""
|
try:
|
# 验证请求数据
|
if not request.is_json:
|
raise BadRequest("请求必须是JSON格式")
|
|
data = request.get_json()
|
if "content" not in data:
|
raise BadRequest("请求中必须包含'content'字段")
|
|
text = data["content"]
|
if not isinstance(text, str) or not text.strip():
|
raise BadRequest("短信内容不能为空")
|
|
# 处理短信
|
category = model_manager.classify_sms(text)
|
if category == "快递":
|
details = model_manager.extract_entities(text)
|
elif category == "还款":
|
details = model_manager.extract_repayment_entities(text)
|
elif category == "收入":
|
details = model_manager.extract_income_entities(text)
|
elif category == "航班":
|
details = model_manager.extract_flight_entities(text)
|
elif category == "火车票": # 添加火车票类别处理
|
details = model_manager.extract_train_entities(text)
|
else:
|
details = {}
|
|
# 记录处理结果
|
logger.info(f"Successfully processed SMS: {text[:30]}...")
|
|
return jsonify({
|
"status": "success",
|
"data": {
|
"category": category,
|
"details": details
|
}
|
})
|
|
except BadRequest as e:
|
logger.warning(f"Invalid request: {str(e)}")
|
return jsonify({
|
"status": "error",
|
"message": str(e)
|
}), 400
|
|
except Exception as e:
|
logger.error(f"Error processing SMS: {str(e)}")
|
return jsonify({
|
"status": "error",
|
"message": "服务器内部错误"
|
}), 500
|
|
if __name__ == "__main__":
|
app.run(host="0.0.0.0", port=5000)
|