517 lines
18 KiB
Python
517 lines
18 KiB
Python
# -*- coding:utf-8 -*-
|
||
|
||
"""
|
||
PDF报关单数据提取与Excel转换工具
|
||
功能:从PDF报关草单提取数据,填充到标准Excel报关单模板
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import pandas as pd
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Any, Optional
|
||
import pdfplumber
|
||
import openpyxl
|
||
from openpyxl import load_workbook
|
||
from openpyxl.utils import get_column_letter
|
||
import logging
|
||
|
||
# Configure root logging once at import time and create the module logger
# used by every class and function in this file.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class PDFCustomsExtractor:
    """Extract structured data from a draft customs-declaration PDF.

    The page text and tables are read with pdfplumber and then mined with
    regular expressions. Results are collected in ``extracted_data`` under
    the keys ``header``, ``contract``, ``packing_list`` and ``invoice``
    (dicts) and ``items`` (a list with one dict per product row).
    """

    # Label/value separator: ASCII colon or full-width (CJK) colon.
    _COLON = r'[::]'
    # Unsigned number with any count of thousands groups and optional decimals.
    _NUMBER = r'\d+(?:,\d+)*(?:\.\d+)?'
    # Item fields whose cell text is converted to float.
    _NUMERIC_FIELDS = frozenset({'quantity', 'unit_price', 'amount'})

    def __init__(self, pdf_path: str):
        """Remember the PDF path and prepare empty result containers.

        Args:
            pdf_path: Path of the PDF file that ``extract`` will open.
        """
        self.pdf_path = pdf_path
        self.text_content = ""
        self.tables = []
        self.extracted_data = {
            'header': {},
            'contract': {},
            'packing_list': {},
            'invoice': {},
            'items': [],
        }

    def extract(self) -> Dict[str, Any]:
        """Read the PDF and run every parser over its content.

        Returns:
            The populated ``extracted_data`` dictionary.

        Raises:
            Exception: Any error raised while opening or parsing the PDF is
                logged and re-raised unchanged.
        """
        try:
            # Start from a clean slate so that calling extract() twice does
            # not duplicate the text and tables (and therefore the items).
            self.text_content = ""
            self.tables = []

            with pdfplumber.open(self.pdf_path) as pdf:
                page_texts = []
                for page in pdf.pages:
                    # extract_text() may yield None for a page without text.
                    page_texts.append((page.extract_text() or "") + "\n")

                    page_tables = page.extract_tables()
                    if page_tables:
                        self.tables.extend(page_tables)

                self.text_content = "".join(page_texts)
                logger.info(f"成功提取PDF文本,共{len(pdf.pages)}页,{len(self.tables)}个表格")

            # Parse each logical section of the document.
            self._parse_header()
            self._parse_contract()
            self._parse_packing_list()
            self._parse_invoice()
            self._parse_items()

            return self.extracted_data

        except Exception as e:
            logger.error(f"PDF提取失败: {e}")
            raise

    def _parse_header(self):
        """Fill ``extracted_data['header']`` from the declaration header text.

        Keys are only set when the corresponding pattern is found.
        """
        text = self.text_content
        colon = self._COLON

        # Pre-entry number, customs code and declaration number.
        header_patterns = {
            'pre_entry_no': rf'预录入编号{colon}\s*(\d+)',
            'customs_code': rf'10位海关代码{colon}\s*(\w+)',
            'declaration_no': rf'海关编号{colon}\s*(\w+)',
        }

        for key, pattern in header_patterns.items():
            match = re.search(pattern, text)
            if match:
                self.extracted_data['header'][key] = match.group(1).strip()

        # Domestic shipper: a numeric code followed by the first
        # whitespace-free token, stored as the shipper name.
        shipper_match = re.search(r'境内发货人\s*(\d+)\s*(\S+)', text)
        if shipper_match:
            self.extracted_data['header']['shipper_code'] = shipper_match.group(1)
            self.extracted_data['header']['shipper_name'] = shipper_match.group(2)

    def _parse_contract(self):
        """Fill ``extracted_data['contract']`` (number, seller, buyer, date)."""
        text = self.text_content
        colon = self._COLON

        # Contract number.
        contract_no_match = re.search(rf'合同号{colon}\s*(\w+)', text)
        if contract_no_match:
            self.extracted_data['contract']['contract_no'] = contract_no_match.group(1)

        # Seller and buyer: everything up to the end of the line.
        seller_match = re.search(rf'卖方\s*{colon}\s*([^\n]+)', text)
        if seller_match:
            self.extracted_data['contract']['seller'] = seller_match.group(1).strip()

        buyer_match = re.search(rf'买方\s*{colon}\s*([^\n]+)', text)
        if buyer_match:
            self.extracted_data['contract']['buyer'] = buyer_match.group(1).strip()

        # The first "YYYY年M月D日" date found anywhere in the text is used
        # and stored as an ISO-style "YYYY-MM-DD" string.
        date_match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', text)
        if date_match:
            year, month, day = date_match.groups()
            self.extracted_data['contract']['date'] = f"{year}-{month.zfill(2)}-{day.zfill(2)}"

    def _parse_packing_list(self):
        """Fill ``extracted_data['packing_list']`` with totals as floats."""
        text = self.text_content
        colon = self._COLON
        number = self._NUMBER

        # Total quantity, package count, gross weight and net weight.
        patterns = {
            'total_quantity': rf'总数量{colon}\s*({number})',
            'total_packages': rf'总件数{colon}\s*({number})',
            'gross_weight': rf'总毛重{colon}\s*({number})',
            'net_weight': rf'总净重{colon}\s*({number})',
        }

        for key, pattern in patterns.items():
            match = re.search(pattern, text)
            if match:
                # Strip thousands separators before converting.
                self.extracted_data['packing_list'][key] = float(match.group(1).replace(',', ''))

    def _parse_invoice(self):
        """Fill ``extracted_data['invoice']`` (invoice number, total amount)."""
        text = self.text_content
        colon = self._COLON

        # Invoice number.
        invoice_no_match = re.search(rf'发票编号{colon}\s*(\w+)', text)
        if invoice_no_match:
            self.extracted_data['invoice']['invoice_no'] = invoice_no_match.group(1)

        # Grand total; the number may contain several thousands groups.
        total_match = re.search(rf'TOTAL{colon}\s*({self._NUMBER})', text)
        if total_match:
            total_str = total_match.group(1).replace(',', '')
            self.extracted_data['invoice']['total_amount'] = float(total_str)

    @staticmethod
    def _column_field(col_name: str) -> Optional[str]:
        """Map a table header cell to an item field name, or None if unknown.

        'price' is tested before 'unit' so that an English "Unit Price"
        header is recognised as the unit price rather than the unit.
        """
        lowered = col_name.lower()
        if '序号' in col_name or 'no' in lowered:
            return 'no'
        if '名称' in col_name or 'description' in lowered:
            return 'name'
        if '规格' in col_name or 'spec' in lowered:
            return 'spec'
        if '数量' in col_name or 'quantity' in lowered:
            return 'quantity'
        if '单价' in col_name or 'price' in lowered:
            return 'unit_price'
        if '单位' in col_name or 'unit' in lowered:
            return 'unit'
        if '金额' in col_name or 'amount' in lowered:
            return 'amount'
        return None

    @classmethod
    def _row_to_item(cls, header: List[Any], row: List[Any]) -> Dict[str, Any]:
        """Convert one table row into an item dict using the header cells."""
        item = {}
        for i, cell in enumerate(row):
            # Skip empty cells and cells that have no header above them.
            if i >= len(header) or not cell:
                continue
            col_name = str(header[i]) if header[i] else f'col_{i}'
            field = cls._column_field(col_name)
            if field is None:
                continue
            if field in cls._NUMERIC_FIELDS:
                item[field] = cls._extract_number(cell)
            else:
                item[field] = str(cell)
        return item

    def _parse_items(self):
        """Collect product rows into ``extracted_data['items']``.

        Tables whose first row looks like a product header are used first;
        if they yield nothing, the raw text is scanned line by line instead.
        """
        items = []

        for table in self.tables:
            if not table or len(table) < 2:
                continue

            # Only tables whose header row mentions product columns count.
            header = table[0] if table[0] else []
            header_str = ' '.join([str(cell).lower() if cell else '' for cell in header])
            if not any(keyword in header_str for keyword in ['序号', '货物名称', '商品名称', '规格']):
                continue

            for row in table[1:]:
                if not row or all(not cell for cell in row):
                    continue
                item = self._row_to_item(header, row)
                # A row without a product name is not treated as an item.
                if item.get('name'):
                    items.append(item)

        # Fall back to text scanning when the tables produced nothing.
        if not items:
            items = self._extract_items_from_text()

        self.extracted_data['items'] = items
        logger.info(f"提取到 {len(items)} 条商品明细")

    def _extract_items_from_text(self) -> List[Dict]:
        """Scan the text line by line for product rows.

        A line matches when it has the shape
        ``<no> <name> <quantity> 台 <unit price> <amount>``; the unit is
        always recorded as '台'.
        """
        items = []
        number = self._NUMBER
        item_pattern = re.compile(
            rf'(\d+)\s+([^\d]+?)({number})\s*台\s*({number})\s*({number})'
        )

        for line in self.text_content.split('\n'):
            match = item_pattern.search(line)
            if not match:
                continue
            items.append({
                'no': match.group(1),
                'name': match.group(2).strip(),
                'quantity': float(match.group(3).replace(',', '')),
                'unit': '台',
                'unit_price': float(match.group(4).replace(',', '')),
                'amount': float(match.group(5).replace(',', '')),
            })

        return items

    @staticmethod
    def _extract_number(value: Any) -> Optional[float]:
        """Return the number contained in a table cell, or None.

        Every character other than digits and '.' is removed before the
        conversion, so thousands separators, currency symbols and signs are
        dropped. None is returned for None input, for text without digits
        and for text that still is not a valid float afterwards.
        """
        if value is None:
            return None

        str_value = re.sub(r'[^\d.]', '', str(value).strip())

        try:
            return float(str_value) if str_value else None
        except ValueError:
            return None
|
||
|
||
|
||
class ExcelCustomsGenerator:
    """Fill an Excel customs-declaration template with extracted data.

    The template workbook is loaded with openpyxl, a fixed set of sheets is
    updated at hard-coded cell addresses, and the result is saved to
    ``output_path``. Sheets missing from the template are skipped with a
    warning.
    """

    # HS code written when an item does not carry its own 'hs_code' value.
    DEFAULT_HS_CODE = '9019101000'

    def __init__(self, template_path: str, output_path: str):
        """Store the template and output paths; nothing is loaded yet.

        Args:
            template_path: Path of the Excel template workbook.
            output_path: Path the filled workbook is saved to.
        """
        self.template_path = template_path
        self.output_path = output_path
        self.wb = None

    def load_template(self):
        """Load the template workbook into ``self.wb``.

        Raises:
            Exception: Any loading error is logged and re-raised unchanged.
        """
        try:
            self.wb = load_workbook(self.template_path)
            logger.info(f"成功加载模板: {self.template_path}")
        except Exception as e:
            logger.error(f"加载模板失败: {e}")
            raise

    def generate(self, data: Dict[str, Any]):
        """Update every known sheet from ``data`` and save the workbook.

        Args:
            data: Dictionary with the optional keys ``header``, ``contract``,
                ``packing_list``, ``invoice`` (dicts) and ``items`` (list of
                dicts); missing keys are treated as empty.
        """
        if self.wb is None:
            self.load_template()

        self._update_declaration_sheet(data)    # declaration form
        self._update_contract_sheet(data)       # contract
        self._update_packing_sheet(data)        # packing list
        self._update_invoice_sheet(data)        # invoice
        self._update_confirmation_sheet(data)   # confirmation letter
        self._update_authorization_sheet(data)  # authorization letter

        self.wb.save(self.output_path)
        logger.info(f"Excel文件已生成: {self.output_path}")

    def _get_sheet(self, name: str):
        """Return the worksheet called ``name``, or None (with a warning)."""
        if name not in self.wb.sheetnames:
            logger.warning(f"未找到'{name}' sheet")
            return None
        return self.wb[name]

    @staticmethod
    def _sum_field(items: List[Dict], key: str):
        """Sum ``item[key]`` over all items, counting missing/None as 0."""
        return sum((item.get(key) or 0) for item in items)

    def _update_declaration_sheet(self, data: Dict):
        """Write header fields and up to 30 item rows to the '报关单新' sheet."""
        sheet = self._get_sheet('报关单新')
        if sheet is None:
            return

        items = data.get('items', [])
        header = data.get('header', {})

        if header.get('customs_code'):
            sheet['G3'] = header['customs_code']  # 10-digit customs code

        if header.get('shipper_name'):
            sheet['A5'] = header['shipper_name']

        # Items start at row 24 and each one advances the row by two;
        # at most 30 items are written.
        start_row = 24
        for i, item in enumerate(items[:30]):
            row = start_row + i * 2
            sheet[f'A{row}'] = i + 1  # item number
            # Prefer an HS code supplied with the item, else the default.
            sheet[f'B{row}'] = item.get('hs_code') or self.DEFAULT_HS_CODE
            sheet[f'C{row}'] = item.get('name', '')
            sheet[f'F{row}'] = item.get('quantity', '')
            sheet[f'H{row}'] = item.get('unit_price', '')
            sheet[f'I{row}'] = item.get('amount', '')

    def _update_contract_sheet(self, data: Dict):
        """Write parties, contract number/date and items to the '合同' sheet."""
        sheet = self._get_sheet('合同')
        if sheet is None:
            return

        contract = data.get('contract', {})

        if contract.get('seller'):
            sheet['B5'] = contract['seller']

        if contract.get('buyer'):
            sheet['B10'] = contract['buyer']

        if contract.get('contract_no'):
            sheet['F5'] = contract['contract_no']

        if contract.get('date'):
            sheet['F6'] = contract['date']

        # Item rows start at row 15; at most 31 items are written.
        items = data.get('items', [])
        start_row = 15
        for i, item in enumerate(items[:31]):
            row = start_row + i
            sheet[f'A{row}'] = i + 1
            sheet[f'B{row}'] = item.get('name', '')
            sheet[f'C{row}'] = item.get('quantity', '')
            sheet[f'D{row}'] = item.get('unit', '台')
            sheet[f'E{row}'] = item.get('unit_price', '')
            sheet[f'F{row}'] = item.get('amount', '')

    def _update_packing_sheet(self, data: Dict):
        """Write buyer, invoice number, items and totals to the '装箱单' sheet."""
        sheet = self._get_sheet('装箱单')
        if sheet is None:
            return

        items = data.get('items', [])

        contract = data.get('contract', {})
        if contract.get('buyer'):
            sheet['B10'] = contract['buyer']

        # The invoice number is written to two cells.
        invoice = data.get('invoice', {})
        if invoice.get('invoice_no'):
            sheet['G8'] = invoice['invoice_no']
            sheet['G10'] = invoice['invoice_no']

        # Item rows start at row 13; at most 31 items are written.
        # Per-item package count and weights are not written here.
        start_row = 13
        for i, item in enumerate(items[:31]):
            row = start_row + i
            sheet[f'B{row}'] = i + 1
            sheet[f'C{row}'] = item.get('name', '')
            sheet[f'D{row}'] = item.get('quantity', '')

        # Totals come from the parsed packing list, not from the items.
        packing = data.get('packing_list', {})
        if packing.get('total_quantity'):
            sheet['D44'] = packing['total_quantity']

        if packing.get('gross_weight'):
            sheet['F44'] = packing['gross_weight']

        if packing.get('net_weight'):
            sheet['G44'] = packing['net_weight']

    def _update_invoice_sheet(self, data: Dict):
        """Write buyer, invoice number/date, items and totals to '发票'."""
        sheet = self._get_sheet('发票')
        if sheet is None:
            return

        items = data.get('items', [])
        contract = data.get('contract', {})

        if contract.get('buyer'):
            sheet['B8'] = contract['buyer']

        invoice = data.get('invoice', {})
        if invoice.get('invoice_no'):
            sheet['E8'] = invoice['invoice_no']

        # The contract date is used as the invoice date.
        if contract.get('date'):
            sheet['E9'] = contract['date']

        # Item rows start at row 13; at most 31 items are written.
        start_row = 13
        for i, item in enumerate(items[:31]):
            row = start_row + i
            sheet[f'A{row}'] = i + 1
            sheet[f'B{row}'] = item.get('name', '')
            sheet[f'C{row}'] = item.get('quantity', '')
            sheet[f'D{row}'] = item.get('unit_price', '')
            sheet[f'E{row}'] = item.get('amount', '')

        # Totals cover all items (not only the rows written above); values
        # that are missing or None count as 0 instead of raising TypeError.
        if items:
            sheet['C44'] = self._sum_field(items, 'quantity')
            sheet['E44'] = self._sum_field(items, 'amount')

    def _update_confirmation_sheet(self, data: Dict):
        """Write consignee, contract number, dates, items and totals to '确认书'."""
        sheet = self._get_sheet('确认书')
        if sheet is None:
            return

        items = data.get('items', [])
        contract = data.get('contract', {})

        if contract.get('buyer'):
            sheet['C5'] = contract['buyer']

        if contract.get('contract_no'):
            sheet['K3'] = contract['contract_no']

        # Cut-off date: seven days after the current local date.
        cutoff_date = datetime.now() + timedelta(days=7)
        sheet['K4'] = cutoff_date.strftime('%Y-%m-%d')

        # Item rows start at row 7; at most 31 items are written.
        start_row = 7
        for i, item in enumerate(items[:31]):
            row = start_row + i
            sheet[f'A{row}'] = i + 1
            sheet[f'C{row}'] = item.get('name', '')
            sheet[f'F{row}'] = item.get('quantity', '')
            sheet[f'K{row}'] = item.get('unit_price', '')
            sheet[f'L{row}'] = item.get('amount', '')

        # Totals cover all items; missing or None values count as 0.
        if items:
            sheet['F38'] = self._sum_field(items, 'quantity')
            sheet['L38'] = self._sum_field(items, 'amount')

        # Destination country is a fixed value.
        sheet['D40'] = '加拿大'

    def _update_authorization_sheet(self, data: Dict):
        """Write client, main product, HS code and validity text to '委托书'."""
        sheet = self._get_sheet('委托书')
        if sheet is None:
            return

        header = data.get('header', {})
        items = data.get('items', [])

        if header.get('shipper_name'):
            sheet['B17'] = header['shipper_name']

        # The first item is used as the main product.
        if items:
            sheet['B18'] = items[0].get('name', '')

        # HS code of the first item when it has one, else the default.
        first_hs_code = items[0].get('hs_code') if items else None
        sheet['B19'] = first_hs_code or self.DEFAULT_HS_CODE

        # Validity text: expires 30 days after the current local date.
        expiry_date = datetime.now() + timedelta(days=30)
        sheet['A5'] = f"本委托书有效期自签字之日起至{expiry_date.strftime('%Y年%m月%d日')}止"
|
||
|
||
|
||
def main():
    """Run the PDF-to-Excel conversion using the hard-coded file paths.

    Extracts data from the draft-declaration PDF, logs a short summary and
    fills the Excel template. Errors are logged rather than propagated, so
    the function always returns None.
    """
    # Raw strings: the Windows paths contain backslashes that must not be
    # read as escape sequences ("\P", "\d", ... are invalid escapes).
    pdf_path = r"D:\工作内容\沃达\智慧运营平台\上传文件\报关草单BGKC2026C013.pdf"  # input PDF
    template_path = r"D:\Program Files (x86)\download\科技创新-腾飞(1)(1).xlsx"  # template workbook
    output_path = r"D:\工作内容\沃达\智慧运营平台\上传文件\报关单生成_20260318.xlsx"  # output workbook

    try:
        # 1. Extract the data from the PDF.
        logger.info("开始提取PDF数据...")
        extractor = PDFCustomsExtractor(pdf_path)
        extracted_data = extractor.extract()

        # Log a short overview of what was found.
        logger.info("提取完成:")
        logger.info(f"- 合同号: {extracted_data['contract'].get('contract_no', 'N/A')}")
        logger.info(f"- 买方: {extracted_data['contract'].get('buyer', 'N/A')}")
        logger.info(f"- 商品数量: {len(extracted_data.get('items', []))}")

        # 2. Fill the template and save the workbook.
        logger.info("开始生成Excel文件...")
        generator = ExcelCustomsGenerator(template_path, output_path)
        generator.generate(extracted_data)

        logger.info(f"处理完成!输出文件: {output_path}")

    except FileNotFoundError as e:
        logger.error(f"文件不存在: {e}")
    except Exception as e:
        # logger.exception logs the message together with the traceback.
        logger.exception(f"处理失败: {e}")
|
||
|
||
|
||
# Run the conversion only when the file is executed as a script.
if __name__ == "__main__":
    main()
|