Update test framework: fix run_tests.py to support all test files, add auto-import-check for test files

This commit is contained in:
qiaoxinjiu
2026-05-09 15:11:30 +08:00
parent eb053a347f
commit eaba8328da
21739 changed files with 2236758 additions and 719 deletions

View File

@@ -0,0 +1,516 @@
# -*- coding:utf-8 -*-
"""
PDF报关单数据提取与Excel转换工具
功能从PDF报关草单提取数据填充到标准Excel报关单模板
"""
import os
import re
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import pdfplumber
import openpyxl
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PDFCustomsExtractor:
"""PDF报关单数据提取器"""
def __init__(self, pdf_path: str):
self.pdf_path = pdf_path
self.text_content = ""
self.tables = []
self.extracted_data = {
'header': {},
'contract': {},
'packing_list': {},
'invoice': {},
'items': []
}
def extract(self) -> Dict[str, Any]:
"""提取PDF中的所有数据"""
try:
with pdfplumber.open(self.pdf_path) as pdf:
# 提取所有文本
for page in pdf.pages:
page_text = page.extract_text() or ""
self.text_content += page_text + "\n"
# 提取表格
page_tables = page.extract_tables()
if page_tables:
self.tables.extend(page_tables)
logger.info(f"成功提取PDF文本{len(pdf.pages)}页,{len(self.tables)}个表格")
# 解析各个部分
self._parse_header()
self._parse_contract()
self._parse_packing_list()
self._parse_invoice()
self._parse_items()
return self.extracted_data
except Exception as e:
logger.error(f"PDF提取失败: {e}")
raise
def _parse_header(self):
"""解析报关单头信息"""
text = self.text_content
# 提取预录入编号和海关代码
header_patterns = {
'pre_entry_no': r'预录入编号:\s*(\d+)',
'customs_code': r'10位海关代码[:]\s*(\w+)',
'declaration_no': r'海关编号[:]\s*(\w+)',
}
for key, pattern in header_patterns.items():
match = re.search(pattern, text)
if match:
self.extracted_data['header'][key] = match.group(1).strip()
# 提取发货人信息
shipper_match = re.search(r'境内发货人\s*(\d+)\s*(\S+)', text)
if shipper_match:
self.extracted_data['header']['shipper_code'] = shipper_match.group(1)
self.extracted_data['header']['shipper_name'] = shipper_match.group(2)
def _parse_contract(self):
"""解析合同信息"""
text = self.text_content
# 提取合同号
contract_no_match = re.search(r'合同号[:]\s*(\w+)', text)
if contract_no_match:
self.extracted_data['contract']['contract_no'] = contract_no_match.group(1)
# 提取买卖双方信息
seller_match = re.search(r'卖方\s*[:]\s*([^\n]+)', text)
if seller_match:
self.extracted_data['contract']['seller'] = seller_match.group(1).strip()
buyer_match = re.search(r'买方\s*[:]\s*([^\n]+)', text)
if buyer_match:
self.extracted_data['contract']['buyer'] = buyer_match.group(1).strip()
# 提取日期
date_match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', text)
if date_match:
year, month, day = date_match.groups()
self.extracted_data['contract']['date'] = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
def _parse_packing_list(self):
"""解析装箱单信息"""
text = self.text_content
# 提取毛重、净重、件数
patterns = {
'total_quantity': r'总数量[:]\s*(\d+)',
'total_packages': r'总件数[:]\s*(\d+)',
'gross_weight': r'总毛重[:]\s*(\d+(?:\.\d+)?)',
'net_weight': r'总净重[:]\s*(\d+(?:\.\d+)?)',
}
for key, pattern in patterns.items():
match = re.search(pattern, text)
if match:
self.extracted_data['packing_list'][key] = float(match.group(1))
def _parse_invoice(self):
"""解析发票信息"""
text = self.text_content
# 提取发票号
invoice_no_match = re.search(r'发票编号[:]\s*(\w+)', text)
if invoice_no_match:
self.extracted_data['invoice']['invoice_no'] = invoice_no_match.group(1)
# 提取总金额
total_match = re.search(r'TOTAL:\s*(\d+(?:,\d+)?(?:\.\d+)?)', text)
if total_match:
total_str = total_match.group(1).replace(',', '')
self.extracted_data['invoice']['total_amount'] = float(total_str)
def _parse_items(self):
"""解析货物明细"""
items = []
# 从表格中提取商品信息
for table in self.tables:
if not table or len(table) < 2:
continue
# 检查表头是否包含商品信息
header = table[0] if table[0] else []
header_str = ' '.join([str(cell).lower() if cell else '' for cell in header])
if any(keyword in header_str for keyword in ['序号', '货物名称', '商品名称', '规格']):
# 遍历数据行
for row in table[1:]:
if not row or all(not cell for cell in row):
continue
item = {}
for i, cell in enumerate(row):
if i < len(header) and cell:
col_name = str(header[i]) if header[i] else f'col_{i}'
if '序号' in col_name or 'no' in col_name.lower():
item['no'] = str(cell)
elif '名称' in col_name or 'description' in col_name.lower():
item['name'] = str(cell)
elif '规格' in col_name or 'spec' in col_name.lower():
item['spec'] = str(cell)
elif '数量' in col_name or 'quantity' in col_name.lower():
item['quantity'] = self._extract_number(cell)
elif '单位' in col_name or 'unit' in col_name.lower():
item['unit'] = str(cell)
elif '单价' in col_name or 'price' in col_name.lower():
item['unit_price'] = self._extract_number(cell)
elif '金额' in col_name or 'amount' in col_name.lower():
item['amount'] = self._extract_number(cell)
if item.get('name'):
items.append(item)
# 如果表格提取失败,尝试从文本中提取
if not items:
items = self._extract_items_from_text()
self.extracted_data['items'] = items
logger.info(f"提取到 {len(items)} 条商品明细")
def _extract_items_from_text(self) -> List[Dict]:
"""从文本中提取商品信息"""
items = []
lines = self.text_content.split('\n')
# 简单模式匹配
item_pattern = r'(\d+)\s+([^\d]+?)(\d+(?:,\d+)?(?:\.\d+)?)\s*台\s*(\d+(?:\.\d+)?)\s*(\d+(?:,\d+)?(?:\.\d+)?)'
for line in lines:
match = re.search(item_pattern, line)
if match:
item = {
'no': match.group(1),
'name': match.group(2).strip(),
'quantity': float(match.group(3).replace(',', '')),
'unit': '',
'unit_price': float(match.group(4)),
'amount': float(match.group(5).replace(',', ''))
}
items.append(item)
return items
@staticmethod
def _extract_number(value: Any) -> Optional[float]:
"""从单元格提取数字"""
if value is None:
return None
str_value = str(value).strip()
# 移除千位分隔符,保留小数点
str_value = re.sub(r'[^\d.]', '', str_value)
try:
return float(str_value) if str_value else None
except ValueError:
return None
class ExcelCustomsGenerator:
"""Excel报关单生成器"""
def __init__(self, template_path: str, output_path: str):
self.template_path = template_path
self.output_path = output_path
self.wb = None
def load_template(self):
"""加载Excel模板"""
try:
self.wb = load_workbook(self.template_path)
logger.info(f"成功加载模板: {self.template_path}")
except Exception as e:
logger.error(f"加载模板失败: {e}")
raise
def generate(self, data: Dict[str, Any]):
"""生成Excel报关单"""
if not self.wb:
self.load_template()
# 更新各个sheet
self._update_declaration_sheet(data) # 报关单新
self._update_contract_sheet(data) # 合同
self._update_packing_sheet(data) # 装箱单
self._update_invoice_sheet(data) # 发票
self._update_confirmation_sheet(data) # 确认书
self._update_authorization_sheet(data) # 委托书
# 保存文件
self.wb.save(self.output_path)
logger.info(f"Excel文件已生成: {self.output_path}")
def _update_declaration_sheet(self, data: Dict):
"""更新报关单sheet"""
if '报关单新' not in self.wb.sheetnames:
logger.warning("未找到'报关单新' sheet")
return
sheet = self.wb['报关单新']
items = data.get('items', [])
# 更新表头信息
header = data.get('header', {})
if header.get('customs_code'):
sheet['G3'] = header['customs_code'] # 10位海关代码
# 更新发货人信息
if header.get('shipper_name'):
sheet['A5'] = header['shipper_name']
# 更新商品明细
start_row = 24 # 商品开始行
for i, item in enumerate(items[:30]): # 最多30行
row = start_row + i * 2
sheet[f'A{row}'] = i + 1 # 项号
sheet[f'B{row}'] = '9019101000' # HS编码需要从数据中获取
sheet[f'C{row}'] = item.get('name', '')
sheet[f'F{row}'] = item.get('quantity', '')
sheet[f'H{row}'] = item.get('unit_price', '')
sheet[f'I{row}'] = item.get('amount', '')
def _update_contract_sheet(self, data: Dict):
"""更新合同sheet"""
if '合同' not in self.wb.sheetnames:
logger.warning("未找到'合同' sheet")
return
sheet = self.wb['合同']
contract = data.get('contract', {})
if contract.get('seller'):
sheet['B5'] = contract['seller']
if contract.get('buyer'):
sheet['B10'] = contract['buyer']
if contract.get('contract_no'):
sheet['F5'] = contract['contract_no']
if contract.get('date'):
sheet['F6'] = contract['date']
# 更新商品明细
items = data.get('items', [])
start_row = 15
for i, item in enumerate(items[:31]): # 最多31行
row = start_row + i
sheet[f'A{row}'] = i + 1
sheet[f'B{row}'] = item.get('name', '')
sheet[f'C{row}'] = item.get('quantity', '')
sheet[f'D{row}'] = item.get('unit', '')
sheet[f'E{row}'] = item.get('unit_price', '')
sheet[f'F{row}'] = item.get('amount', '')
def _update_packing_sheet(self, data: Dict):
"""更新装箱单sheet"""
if '装箱单' not in self.wb.sheetnames:
logger.warning("未找到'装箱单' sheet")
return
sheet = self.wb['装箱单']
items = data.get('items', [])
# 更新客户信息
contract = data.get('contract', {})
if contract.get('buyer'):
sheet['B10'] = contract['buyer']
# 更新发票号
invoice = data.get('invoice', {})
if invoice.get('invoice_no'):
sheet['G8'] = invoice['invoice_no']
sheet['G10'] = invoice['invoice_no']
# 更新商品明细
start_row = 13
for i, item in enumerate(items[:31]): # 最多31行
row = start_row + i
sheet[f'B{row}'] = i + 1
sheet[f'C{row}'] = item.get('name', '')
sheet[f'D{row}'] = item.get('quantity', '')
# 件数、毛重、净重需要从数据中获取
# 更新合计
packing = data.get('packing_list', {})
if packing.get('total_quantity'):
sheet['D44'] = packing['total_quantity']
if packing.get('gross_weight'):
sheet['F44'] = packing['gross_weight']
if packing.get('net_weight'):
sheet['G44'] = packing['net_weight']
def _update_invoice_sheet(self, data: Dict):
"""更新发票sheet"""
if '发票' not in self.wb.sheetnames:
logger.warning("未找到'发票' sheet")
return
sheet = self.wb['发票']
items = data.get('items', [])
contract = data.get('contract', {})
# 更新客户信息
if contract.get('buyer'):
sheet['B8'] = contract['buyer']
# 更新发票号
invoice = data.get('invoice', {})
if invoice.get('invoice_no'):
sheet['E8'] = invoice['invoice_no']
# 更新日期
if contract.get('date'):
sheet['E9'] = contract['date']
# 更新商品明细
start_row = 13
for i, item in enumerate(items[:31]): # 最多31行
row = start_row + i
sheet[f'A{row}'] = i + 1
sheet[f'B{row}'] = item.get('name', '')
sheet[f'C{row}'] = item.get('quantity', '')
sheet[f'D{row}'] = item.get('unit_price', '')
sheet[f'E{row}'] = item.get('amount', '')
# 更新合计
if items:
total_quantity = sum(item.get('quantity', 0) for item in items)
total_amount = sum(item.get('amount', 0) for item in items)
sheet['C44'] = total_quantity
sheet['E44'] = total_amount
def _update_confirmation_sheet(self, data: Dict):
"""更新确认书sheet"""
if '确认书' not in self.wb.sheetnames:
logger.warning("未找到'确认书' sheet")
return
sheet = self.wb['确认书']
items = data.get('items', [])
contract = data.get('contract', {})
# 更新收货人
if contract.get('buyer'):
sheet['C5'] = contract['buyer']
# 更新合同号
if contract.get('contract_no'):
sheet['K3'] = contract['contract_no']
# 更新截关日期(当前日期+7天
today = datetime.now()
cutoff_date = today + timedelta(days=7)
sheet['K4'] = cutoff_date.strftime('%Y-%m-%d')
# 更新商品明细
start_row = 7
for i, item in enumerate(items[:31]): # 最多31行
row = start_row + i
sheet[f'A{row}'] = i + 1
sheet[f'C{row}'] = item.get('name', '')
sheet[f'F{row}'] = item.get('quantity', '')
sheet[f'K{row}'] = item.get('unit_price', '')
sheet[f'L{row}'] = item.get('amount', '')
# 更新合计
if items:
total_quantity = sum(item.get('quantity', 0) for item in items)
total_amount = sum(item.get('amount', 0) for item in items)
sheet['F38'] = total_quantity
sheet['L38'] = total_amount
# 更新运抵国
sheet['D40'] = '加拿大'
def _update_authorization_sheet(self, data: Dict):
"""更新委托书sheet"""
if '委托书' not in self.wb.sheetnames:
logger.warning("未找到'委托书' sheet")
return
sheet = self.wb['委托书']
header = data.get('header', {})
items = data.get('items', [])
# 更新委托方
if header.get('shipper_name'):
sheet['B17'] = header['shipper_name']
# 更新主要货物名称
if items and len(items) > 0:
sheet['B18'] = items[0].get('name', '')
# 更新HS编码
sheet['B19'] = '9019101000'
# 更新有效期(当前日期+30天
today = datetime.now()
expiry_date = today + timedelta(days=30)
sheet['A5'] = f"本委托书有效期自签字之日起至{expiry_date.strftime('%Y年%m月%d')}"
def main():
"""主函数"""
# 文件路径配置
pdf_path = "D:\工作内容\沃达\智慧运营平台\上传文件\报关草单BGKC2026C013.pdf" # 输入PDF文件
template_path = "D:\Program Files (x86)\download\科技创新-腾飞(1)(1).xlsx" # 模板文件
output_path = "D:\工作内容\沃达\智慧运营平台\上传文件\报关单生成_20260318.xlsx" # 输出文件
try:
# 1. 提取PDF数据
logger.info("开始提取PDF数据...")
extractor = PDFCustomsExtractor(pdf_path)
extracted_data = extractor.extract()
# 打印提取结果概览
logger.info(f"提取完成:")
logger.info(f"- 合同号: {extracted_data['contract'].get('contract_no', 'N/A')}")
logger.info(f"- 买方: {extracted_data['contract'].get('buyer', 'N/A')}")
logger.info(f"- 商品数量: {len(extracted_data.get('items', []))}")
# 2. 生成Excel
logger.info("开始生成Excel文件...")
generator = ExcelCustomsGenerator(template_path, output_path)
generator.generate(extracted_data)
logger.info(f"处理完成!输出文件: {output_path}")
except FileNotFoundError as e:
logger.error(f"文件不存在: {e}")
except Exception as e:
logger.error(f"处理失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()