# -*- coding: utf-8 -*-
|
import os
|
import logging
|
from typing import Dict, Optional, Tuple
|
|
from flask import Flask, request, jsonify
|
from transformers import BertTokenizer, BertForSequenceClassification, AutoTokenizer, AutoModelForTokenClassification
|
import torch
|
from werkzeug.exceptions import BadRequest
|
from ner_config import NERConfig, RepaymentNERConfig, IncomeNERConfig
|
import re
|
|
# 配置日志
|
logging.basicConfig(
|
level=logging.INFO,
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
handlers=[
|
logging.FileHandler('app.log'),
|
logging.StreamHandler()
|
]
|
)
|
logger = logging.getLogger(__name__)
|
|
class ModelManager:
|
def __init__(self):
|
self.classifier_path = "./models/classifier"
|
self.ner_path = "./models/ner_model/best_model"
|
self.repayment_path = "./models/repayment_model/best_model"
|
self.income_path = "./models/income_model/best_model"
|
|
# 检查模型文件
|
self._check_model_files()
|
|
# 加载模型
|
self.classifier_tokenizer, self.classifier_model = self._load_classifier()
|
self.ner_tokenizer, self.ner_model = self._load_ner()
|
self.repayment_tokenizer, self.repayment_model = self._load_repayment()
|
self.income_tokenizer, self.income_model = self._load_income()
|
|
# 将模型设置为评估模式
|
self.classifier_model.eval()
|
self.ner_model.eval()
|
self.repayment_model.eval()
|
self.income_model.eval()
|
|
def _check_model_files(self):
|
"""检查模型文件是否存在"""
|
if not os.path.exists(self.classifier_path):
|
raise RuntimeError("分类模型文件不存在,请先运行训练脚本")
|
if not os.path.exists(self.ner_path):
|
raise RuntimeError("NER模型文件不存在,请先运行训练脚本")
|
if not os.path.exists(self.repayment_path):
|
raise RuntimeError("还款模型文件不存在,请先运行训练脚本")
|
if not os.path.exists(self.income_path):
|
raise RuntimeError("收入模型文件不存在,请先运行训练脚本")
|
|
def _load_classifier(self) -> Tuple[BertTokenizer, BertForSequenceClassification]:
|
"""加载分类模型"""
|
try:
|
tokenizer = BertTokenizer.from_pretrained(self.classifier_path)
|
model = BertForSequenceClassification.from_pretrained(self.classifier_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载分类模型失败: {str(e)}")
|
raise
|
|
def _load_ner(self) -> Tuple[AutoTokenizer, AutoModelForTokenClassification]:
|
"""加载NER模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.ner_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.ner_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载NER模型失败: {str(e)}")
|
raise
|
|
def _load_repayment(self):
|
"""加载还款模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.repayment_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.repayment_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载还款模型失败: {str(e)}")
|
raise
|
|
def _load_income(self):
|
"""加载收入模型"""
|
try:
|
tokenizer = AutoTokenizer.from_pretrained(self.income_path)
|
model = AutoModelForTokenClassification.from_pretrained(self.income_path)
|
return tokenizer, model
|
except Exception as e:
|
logger.error(f"加载收入模型失败: {str(e)}")
|
raise
|
|
def classify_sms(self, text: str) -> str:
|
"""对短信进行分类"""
|
try:
|
inputs = self.classifier_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=64
|
)
|
with torch.no_grad():
|
outputs = self.classifier_model(**inputs)
|
pred_id = outputs.logits.argmax().item()
|
return self.classifier_model.config.id2label[pred_id]
|
except Exception as e:
|
logger.error(f"短信分类失败: {str(e)}")
|
raise
|
|
def extract_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取文本中的实体"""
|
try:
|
# 初始化结果字典
|
result = {
|
"post": None, # 快递公司
|
"company": None, # 寄件公司
|
"address": None, # 地址
|
"pickup_code": None, # 取件码
|
"time": None # 时间
|
}
|
|
# 第一阶段:直接从文本中提取取件码
|
pickup_code = self.extract_pickup_code_from_text(text)
|
if pickup_code:
|
result["pickup_code"] = pickup_code
|
|
# 第二阶段:使用NER模型提取其他实体
|
inputs = self.ner_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=64
|
)
|
with torch.no_grad():
|
outputs = self.ner_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.ner_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.ner_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
# 解析实体
|
current_entity = None
|
entity_start = None
|
for i, (token, tag) in enumerate(zip(tokens, tags)):
|
# 跳过取件码实体,因为我们已经单独处理了
|
if tag.startswith("B-") and tag[2:] != "PICKUP_CODE":
|
# 保存之前的实体
|
if current_entity and current_entity["type"] != "PICKUP_CODE":
|
key = current_entity["type"].lower()
|
result[key] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
# 开始新实体
|
current_entity = {"type": tag[2:], "text": token}
|
entity_start = i
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"] and current_entity["type"] != "PICKUP_CODE":
|
current_entity["text"] += token
|
elif tag == "O" or tag.startswith("B-"):
|
# 结束当前实体
|
if current_entity and current_entity["type"] != "PICKUP_CODE":
|
key = current_entity["type"].lower()
|
result[key] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = None
|
|
# 处理最后一个实体
|
if current_entity and current_entity["type"] != "PICKUP_CODE":
|
key = current_entity["type"].lower()
|
result[key] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
|
# 如果第一阶段没有提取到取件码,使用NER模型的结果
|
if not result["pickup_code"]:
|
# 重新解析一次,只提取取件码
|
current_entity = None
|
for i, (token, tag) in enumerate(zip(tokens, tags)):
|
if tag.startswith("B-") and tag[2:] == "PICKUP_CODE":
|
current_entity = {"type": tag[2:], "text": token}
|
elif tag.startswith("I-") and current_entity and tag[2:] == "PICKUP_CODE":
|
current_entity["text"] += token
|
elif (tag == "O" or tag.startswith("B-")) and current_entity:
|
result["pickup_code"] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
current_entity = None
|
|
# 处理最后一个实体
|
if current_entity and current_entity["type"] == "PICKUP_CODE":
|
result["pickup_code"] = current_entity["text"].replace("[UNK]", "").replace("##", "").strip()
|
|
# 后处理:清理和验证取件码
|
if result["pickup_code"]:
|
# 清理取件码
|
code = result["pickup_code"]
|
# 移除取件码后的额外文字
|
for word in ["的", "来", "到", "取", "件", "码"]:
|
if word in code:
|
code = code[:code.index(word)]
|
|
# 只保留字母、数字和连字符
|
code = ''.join(c for c in code if c.isalnum() or c == "-")
|
|
# 确保格式正确
|
parts = code.split("-")
|
valid_parts = []
|
for part in parts:
|
if part and any(c.isalnum() for c in part):
|
valid_parts.append(part)
|
|
if valid_parts:
|
result["pickup_code"] = "-".join(valid_parts)
|
else:
|
result["pickup_code"] = None
|
|
# 清理公司名称
|
if result["company"]:
|
company = result["company"]
|
invalid_words = ["包裹", "快递", "到"]
|
for word in invalid_words:
|
if company.endswith(word):
|
company = company[:-len(word)]
|
result["company"] = company.strip()
|
|
# 清理地址
|
if result["address"]:
|
address = result["address"]
|
invalid_suffixes = [",请尽快取件", ",询", "请尽快取件"]
|
for suffix in invalid_suffixes:
|
if address.endswith(suffix):
|
address = address[:-len(suffix)]
|
result["address"] = address.strip()
|
|
return result
|
except Exception as e:
|
logger.error(f"实体提取失败: {str(e)}")
|
raise
|
|
def extract_pickup_code_from_text(self, text: str) -> Optional[str]:
|
"""直接从文本提取取件码"""
|
# 先尝试直接匹配完整的取件码格式
|
pickup_patterns = [
|
# 带有上下文的取件码模式(包括字母数字组合)
|
r'凭\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})\s*(来|到)?', # 匹配"凭xx-xx-xxxx"格式
|
r'码\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配"码xx-xx-xxxx"格式
|
r'提货码\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配"提货码xx-xx-xxxx"格式
|
r'取件码\s*([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配"取件码xx-xx-xxxx"格式
|
|
# 分组提取模式,对有上下文的取件码
|
r'凭\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})\s*(来|到)?', # 匹配"凭xx-xx-xxxx"
|
r'码\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})', # 匹配"码xx-xx-xxxx"
|
r'提货码\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})', # 匹配"提货码xx-xx-xxxx"
|
r'取件码\s*([A-Za-z0-9]{1,3})-(\d{1,2})-(\d{1,6})', # 匹配"取件码xx-xx-xxxx"
|
|
# 独立的取件码模式
|
r'([A-Za-z0-9]{1,3}-\d{1,2}-\d{1,6})', # 匹配独立的xx-xx-xxxx格式,支持字母
|
r'(\d{1,2}-\d{1,2}-\d{4,6})', # 匹配独立的xx-xx-xxxx格式,纯数字
|
]
|
|
# 首先尝试整体匹配
|
for pattern in pickup_patterns[:4]: # 前4个是整体匹配模式
|
matches = re.findall(pattern, text)
|
if matches:
|
return matches[0] if isinstance(matches[0], str) else matches[0][0]
|
|
# 然后尝试分组匹配
|
for pattern in pickup_patterns[4:8]: # 中间4个是分组匹配模式
|
matches = re.findall(pattern, text)
|
if matches:
|
# 分组匹配,第一部分可以是字母数字组合
|
match = matches[0]
|
if len(match) >= 3:
|
# 确保第2、3部分是数字
|
if match[1].isdigit() and match[2].isdigit():
|
return f"{match[0]}-{match[1]}-{match[2]}"
|
|
# 最后尝试独立模式
|
for pattern in pickup_patterns[8:10]: # 最后2个是独立匹配模式
|
matches = re.findall(pattern, text)
|
if matches:
|
return matches[0]
|
|
# 特殊处理包含"取件码"的文本
|
code_indicators = ["取件码", "提货码"]
|
for indicator in code_indicators:
|
if indicator in text:
|
idx = text.find(indicator)
|
# 检查紧跟着的文本
|
search_text = text[idx:idx+35] # 扩大搜索范围
|
# 尝试匹配"取件码A8-1-15"这样的格式
|
special_match = re.search(r'[码]\s*([A-Za-z0-9]+[-\s]+\d+[-\s]+\d+)', search_text)
|
if special_match:
|
# 规范化格式
|
code = special_match.group(1)
|
# 将空格替换为连字符,确保格式一致
|
code = re.sub(r'\s+', '-', code)
|
# 确保连字符格式正确
|
code = re.sub(r'-+', '-', code)
|
return code
|
|
# 特殊处理"凭"后面的数字序列
|
if "凭" in text:
|
pickup_index = text.find("凭")
|
if pickup_index != -1:
|
# 在"凭"之后的25个字符内寻找取件码
|
search_text = text[pickup_index:pickup_index+35]
|
|
# 尝试提取形如"凭A8-1-15"或"凭22-4-1111"的格式
|
alpha_num_match = re.search(r'凭\s*([A-Za-z0-9]+)[-\s]+(\d+)[-\s]+(\d+)', search_text)
|
if alpha_num_match:
|
return f"{alpha_num_match.group(1)}-{alpha_num_match.group(2)}-{alpha_num_match.group(3)}"
|
|
# 提取所有字母数字序列
|
parts = re.findall(r'[A-Za-z0-9]+', search_text)
|
if len(parts) >= 3:
|
# 组合前三个部分
|
return f"{parts[0]}-{parts[1]}-{parts[2]}"
|
|
# 查找形如"A8-1-15"的取件码
|
alpha_num_codes = re.findall(r'([A-Za-z]\d+)-(\d+)-(\d+)', text)
|
if alpha_num_codes:
|
match = alpha_num_codes[0]
|
return f"{match[0]}-{match[1]}-{match[2]}"
|
|
return None
|
|
def extract_repayment_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取还款相关实体"""
|
try:
|
result = {
|
"bank": None, # 还款主体
|
"type": None, # 还款类型
|
"amount": None, # 还款金额
|
"date": None, # 还款日期
|
"number": None, # 账号尾号
|
"min_amount": None # 最低还款金额
|
}
|
|
inputs = self.repayment_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=RepaymentNERConfig.MAX_LENGTH
|
)
|
|
with torch.no_grad():
|
outputs = self.repayment_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.repayment_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.repayment_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
def clean_amount(amount_text: str, context: str) -> Optional[str]:
|
"""清理和标准化金额"""
|
if not amount_text:
|
return None
|
|
# 在上下文中查找完整金额
|
amount_index = context.find(amount_text)
|
if amount_index != -1:
|
# 扩大搜索范围,查找完整金额
|
search_start = max(0, amount_index - 10) # 增加向前搜索范围
|
search_end = min(len(context), amount_index + len(amount_text) + 10)
|
search_text = context[search_start:search_end]
|
|
# 使用更精确的正则表达式查找金额模式
|
amount_pattern = re.compile(r'(\d{1,10}(?:\.\d{1,2})?)')
|
matches = list(amount_pattern.finditer(search_text))
|
|
# 找到最接近且最长的完整金额
|
best_match = None
|
min_distance = float('inf')
|
max_length = 0
|
target_pos = amount_index - search_start
|
|
for match in matches:
|
match_pos = match.start()
|
distance = abs(match_pos - target_pos)
|
match_text = match.group(1)
|
|
# 优先选择更长的匹配,除非距离差异太大
|
if len(match_text) > max_length or (len(match_text) == max_length and distance < min_distance):
|
try:
|
# 验证金额是否合理
|
value = float(match_text)
|
if value > 0 and value <= 9999999.99: # 设置合理的金额范围
|
best_match = match_text
|
min_distance = distance
|
max_length = len(match_text)
|
except ValueError:
|
continue
|
|
if best_match:
|
amount_text = best_match
|
|
# 移除货币符号和无效词
|
for symbol in RepaymentNERConfig.AMOUNT_CONFIG['currency_symbols']:
|
amount_text = amount_text.replace(symbol, '')
|
for word in RepaymentNERConfig.AMOUNT_CONFIG['invalid_words']:
|
amount_text = amount_text.replace(word, '')
|
|
# 处理金额中的逗号
|
amount_text = amount_text.replace(',', '')
|
|
try:
|
# 转换为浮点数
|
value = float(amount_text)
|
|
# 验证整数位数
|
integer_part = str(int(value))
|
if len(integer_part) <= RepaymentNERConfig.AMOUNT_CONFIG['max_integer_digits']:
|
# 保持原始小数位数
|
if '.' in amount_text:
|
decimal_places = len(amount_text.split('.')[1])
|
return f"{value:.{decimal_places}f}"
|
return str(int(value))
|
except ValueError:
|
pass
|
return None
|
|
# 实体提取
|
current_entity = None
|
entities = {
|
"BANK": [],
|
"TYPE": [],
|
"PICKUP_CODE": [],
|
"DATE": [],
|
"NUMBER": [],
|
"MIN_CODE": []
|
}
|
|
for token, tag in zip(tokens, tags):
|
if tag.startswith("B-"):
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = {"type": tag[2:], "text": token.replace("##", "")}
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"]:
|
current_entity["text"] += token.replace("##", "")
|
else:
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = None
|
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
|
# 处理银行名称
|
if entities["BANK"]:
|
# 修改银行名称处理逻辑
|
bank_parts = []
|
seen = set() # 用于去重
|
for bank in entities["BANK"]:
|
bank = bank.strip()
|
if bank and bank not in seen: # 避免重复
|
bank_parts.append(bank)
|
seen.add(bank)
|
bank = "".join(bank_parts)
|
if len(bank) <= RepaymentNERConfig.MAX_ENTITY_LENGTH["BANK"]:
|
result["bank"] = bank
|
|
# 处理还款类型
|
if entities["TYPE"]:
|
type_ = "".join(entities["TYPE"]).strip()
|
if len(type_) <= RepaymentNERConfig.MAX_ENTITY_LENGTH["TYPE"]:
|
result["type"] = type_
|
|
# 处理账号尾号
|
if entities["NUMBER"]:
|
number = "".join(c for c in "".join(entities["NUMBER"]) if c.isdigit())
|
if number and len(number) <= RepaymentNERConfig.MAX_ENTITY_LENGTH["NUMBER"]:
|
result["number"] = number
|
|
# 处理日期
|
if entities["DATE"]:
|
date = "".join(entities["DATE"])
|
date = ''.join(c for c in date if c.isdigit() or c in ['年', '月', '日'])
|
if date:
|
result["date"] = date
|
|
# 处理金额
|
amount_candidates = []
|
for amount in entities["PICKUP_CODE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
try:
|
value = float(cleaned_amount)
|
amount_candidates.append((cleaned_amount, value))
|
except ValueError:
|
continue
|
|
# 选择最大的有效金额
|
if amount_candidates:
|
# 按金额大小排序,选择最大的
|
result["amount"] = max(amount_candidates, key=lambda x: x[1])[0]
|
|
# 处理最低还款金额
|
for amount in entities["MIN_CODE"]:
|
cleaned_amount = clean_amount(amount, text) # 传入原始文本作为上下文
|
if cleaned_amount:
|
result["min_amount"] = cleaned_amount
|
break
|
|
return result
|
|
except Exception as e:
|
logger.error(f"还款实体提取失败: {str(e)}")
|
raise
|
|
def extract_income_entities(self, text: str) -> Dict[str, Optional[str]]:
|
"""提取收入相关实体"""
|
try:
|
result = {
|
"bank": None, # 银行名称
|
"number": None, # 账号尾号
|
"datetime": None, # 交易时间
|
"amount": None, # 收入金额
|
"balance": None # 余额
|
}
|
|
inputs = self.income_tokenizer(
|
text,
|
return_tensors="pt",
|
truncation=True,
|
max_length=IncomeNERConfig.MAX_LENGTH
|
)
|
|
with torch.no_grad():
|
outputs = self.income_model(**inputs)
|
|
predictions = torch.argmax(outputs.logits, dim=2)
|
tokens = self.income_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
|
tags = [self.income_model.config.id2label[p] for p in predictions[0].numpy()]
|
|
def clean_amount(amount_text: str, context: str) -> Optional[str]:
|
"""清理和标准化金额"""
|
if not amount_text:
|
return None
|
|
# 在上下文中查找完整金额
|
amount_index = context.find(amount_text)
|
if amount_index != -1:
|
# 扩大搜索范围,查找完整金额
|
search_start = max(0, amount_index - 10)
|
search_end = min(len(context), amount_index + len(amount_text) + 10)
|
search_text = context[search_start:search_end]
|
|
# 使用正则表达式查找金额模式
|
import re
|
amount_pattern = re.compile(r'(\d{1,10}(?:\.\d{1,2})?)')
|
matches = list(amount_pattern.finditer(search_text))
|
|
# 找到最接近且最长的完整金额
|
best_match = None
|
min_distance = float('inf')
|
max_length = 0
|
target_pos = amount_index - search_start
|
|
for match in matches:
|
match_pos = match.start()
|
distance = abs(match_pos - target_pos)
|
match_text = match.group(1)
|
|
if len(match_text) > max_length or (len(match_text) == max_length and distance < min_distance):
|
try:
|
value = float(match_text)
|
if value > 0:
|
best_match = match_text
|
min_distance = distance
|
max_length = len(match_text)
|
except ValueError:
|
continue
|
|
if best_match:
|
amount_text = best_match
|
|
# 移除货币符号和无效词
|
for symbol in IncomeNERConfig.AMOUNT_CONFIG['currency_symbols']:
|
amount_text = amount_text.replace(symbol, '')
|
for word in IncomeNERConfig.AMOUNT_CONFIG['invalid_words']:
|
amount_text = amount_text.replace(word, '')
|
|
# 处理金额中的逗号
|
amount_text = amount_text.replace(',', '')
|
|
try:
|
value = float(amount_text)
|
if '.' in amount_text:
|
decimal_places = len(amount_text.split('.')[1])
|
return f"{value:.{decimal_places}f}"
|
return str(int(value))
|
except ValueError:
|
pass
|
return None
|
|
# 实体提取
|
current_entity = None
|
entities = {
|
"BANK": [],
|
"NUMBER": [],
|
"DATATIME": [],
|
"PICKUP_CODE": [],
|
"BALANCE": []
|
}
|
|
for token, tag in zip(tokens, tags):
|
if tag.startswith("B-"):
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = {"type": tag[2:], "text": token.replace("##", "")}
|
elif tag.startswith("I-") and current_entity and tag[2:] == current_entity["type"]:
|
current_entity["text"] += token.replace("##", "")
|
else:
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
current_entity = None
|
|
if current_entity:
|
entities[current_entity["type"]].append(current_entity["text"])
|
|
# 处理银行名称
|
if entities["BANK"]:
|
bank_parts = []
|
seen = set()
|
for bank in entities["BANK"]:
|
bank = bank.strip()
|
if bank and bank not in seen:
|
bank_parts.append(bank)
|
seen.add(bank)
|
bank = "".join(bank_parts)
|
if len(bank) <= IncomeNERConfig.MAX_ENTITY_LENGTH["BANK"]:
|
result["bank"] = bank
|
|
# 处理账号尾号
|
if entities["NUMBER"]:
|
number = "".join(c for c in "".join(entities["NUMBER"]) if c.isdigit())
|
if number and len(number) <= IncomeNERConfig.MAX_ENTITY_LENGTH["NUMBER"]:
|
result["number"] = number
|
|
# 处理交易时间
|
if entities["DATATIME"]:
|
datetime = "".join(entities["DATATIME"])
|
datetime = ''.join(c for c in datetime if c.isdigit() or c in ['年', '月', '日', '时', '分', ':', '-'])
|
if datetime:
|
result["datetime"] = datetime
|
|
# 处理收入金额
|
if entities["PICKUP_CODE"]:
|
for amount in entities["PICKUP_CODE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
result["amount"] = cleaned_amount
|
break
|
|
# 处理余额
|
if entities["BALANCE"]:
|
for amount in entities["BALANCE"]:
|
cleaned_amount = clean_amount(amount, text)
|
if cleaned_amount:
|
result["balance"] = cleaned_amount
|
break
|
|
return result
|
|
except Exception as e:
|
logger.error(f"收入实体提取失败: {str(e)}")
|
raise
|
|
# 创建Flask应用
|
app = Flask(__name__)
|
model_manager = ModelManager()
|
|
@app.route("/health", methods=["GET"])
|
def health_check():
|
"""健康检查接口"""
|
return jsonify({"status": "healthy"})
|
|
@app.route("/process-sms", methods=["POST"])
|
def process_sms():
|
"""处理短信的接口"""
|
try:
|
# 验证请求数据
|
if not request.is_json:
|
raise BadRequest("请求必须是JSON格式")
|
|
data = request.get_json()
|
if "content" not in data:
|
raise BadRequest("请求中必须包含'content'字段")
|
|
text = data["content"]
|
if not isinstance(text, str) or not text.strip():
|
raise BadRequest("短信内容不能为空")
|
|
# 处理短信
|
category = model_manager.classify_sms(text)
|
if category == "快递":
|
details = model_manager.extract_entities(text)
|
elif category == "还款":
|
details = model_manager.extract_repayment_entities(text)
|
elif category == "收入":
|
details = model_manager.extract_income_entities(text)
|
else:
|
details = {}
|
|
# 记录处理结果
|
logger.info(f"Successfully processed SMS: {text[:30]}...")
|
|
return jsonify({
|
"status": "success",
|
"data": {
|
"category": category,
|
"details": details
|
}
|
})
|
|
except BadRequest as e:
|
logger.warning(f"Invalid request: {str(e)}")
|
return jsonify({
|
"status": "error",
|
"message": str(e)
|
}), 400
|
|
except Exception as e:
|
logger.error(f"Error processing SMS: {str(e)}")
|
return jsonify({
|
"status": "error",
|
"message": "服务器内部错误"
|
}), 500
|
|
if __name__ == "__main__":
|
app.run(host="0.0.0.0", port=5000)
|