# -*- coding:utf-8 -*-
"""
PDF customs-declaration extraction and Excel conversion tool.

Extracts data from a draft customs declaration in PDF form and fills it into
a standard Excel customs-declaration template workbook.
"""

import logging
import os
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import openpyxl
import pandas as pd
import pdfplumber
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Separator accepted between a field label and its value: an ASCII colon or a
# full-width colon (U+FF1A). Written as an escape so that the pattern survives
# editors/pipelines that normalise full-width punctuation.
_COLON = r'[:\uFF1A]'

# A non-negative number with any count of thousands groups and an optional
# decimal part, e.g. "12", "1,234", "1,234,567.89". Uses only non-capturing
# groups so it can be embedded without shifting group numbers.
_NUMBER = r'\d+(?:,\d+)*(?:\.\d+)?'

# Fallback pattern for one goods line in plain text:
# <no> <name> <quantity>台 <unit price> <amount>
_ITEM_LINE_RE = re.compile(
    rf'(\d+)\s+([^\d]+?)({_NUMBER})\s*台\s*({_NUMBER})\s*({_NUMBER})'
)

# Ordered rules mapping a table-header cell to an item field. The first rule
# with a keyword contained in the lower-cased header wins. 'unit_price' is
# listed before 'unit' on purpose: a header such as "Unit Price" contains
# "unit" and would otherwise be stored as the unit of measure.
_COLUMN_RULES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
    ('no', ('序号', 'no')),
    ('name', ('名称', 'description')),
    ('spec', ('规格', 'spec')),
    ('quantity', ('数量', 'quantity')),
    ('unit_price', ('单价', 'price')),
    ('unit', ('单位', 'unit')),
    ('amount', ('金额', 'amount')),
)

# Item fields that are stored as floats (via _extract_number) rather than text.
_NUMERIC_FIELDS = frozenset({'quantity', 'unit_price', 'amount'})

# Keywords that identify a table as a goods table when present in its header row.
_ITEM_TABLE_KEYWORDS = ('序号', '货物名称', '商品名称', '规格')

# HS code written when an item carries no 'hs_code' of its own.
DEFAULT_HS_CODE = '9019101000'


class PDFCustomsExtractor:
    """Extracts customs-declaration data from a PDF file.

    After :meth:`extract` the result is available both as the return value and
    as ``self.extracted_data``, a dict with the keys ``header``, ``contract``,
    ``packing_list``, ``invoice`` (dicts) and ``items`` (list of dicts).
    """

    def __init__(self, pdf_path: str):
        self.pdf_path = pdf_path
        self.text_content = ""
        self.tables = []
        self.extracted_data = self._empty_result()

    @staticmethod
    def _empty_result() -> Dict[str, Any]:
        """Return a fresh, empty result structure."""
        return {
            'header': {},
            'contract': {},
            'packing_list': {},
            'invoice': {},
            'items': []
        }

    def extract(self) -> Dict[str, Any]:
        """Read the PDF and parse every section.

        Returns:
            The populated ``extracted_data`` dict.

        Raises:
            Exception: any error raised while opening or parsing the PDF is
                logged and re-raised unchanged.
        """
        # Start from a clean state so that calling extract() more than once
        # does not append to the text/tables collected by a previous run.
        self.text_content = ""
        self.tables = []
        self.extracted_data = self._empty_result()

        try:
            with pdfplumber.open(self.pdf_path) as pdf:
                page_texts = []
                for page in pdf.pages:
                    # Extract the page text (extract_text may return None)
                    page_texts.append((page.extract_text() or "") + "\n")

                    # Extract the page tables
                    page_tables = page.extract_tables()
                    if page_tables:
                        self.tables.extend(page_tables)

                self.text_content = "".join(page_texts)
                logger.info(f"成功提取PDF文本,共{len(pdf.pages)}页,{len(self.tables)}个表格")

            # Parse each section from the collected text and tables
            self._parse_header()
            self._parse_contract()
            self._parse_packing_list()
            self._parse_invoice()
            self._parse_items()

            return self.extracted_data

        except Exception as e:
            logger.error(f"PDF提取失败: {e}")
            raise

    def _parse_header(self):
        """Parse the declaration header fields into ``extracted_data['header']``."""
        text = self.text_content

        # Pre-entry number, customs code and declaration number
        header_patterns = {
            'pre_entry_no': rf'预录入编号{_COLON}\s*(\d+)',
            'customs_code': rf'10位海关代码{_COLON}\s*(\w+)',
            'declaration_no': rf'海关编号{_COLON}\s*(\w+)',
        }

        for key, pattern in header_patterns.items():
            match = re.search(pattern, text)
            if match:
                self.extracted_data['header'][key] = match.group(1).strip()

        # Shipper: a run of digits followed by the next whitespace-free token.
        # NOTE(review): a shipper code containing letters would be split
        # between code and name by this pattern - confirm against real PDFs.
        shipper_match = re.search(r'境内发货人\s*(\d+)\s*(\S+)', text)
        if shipper_match:
            self.extracted_data['header']['shipper_code'] = shipper_match.group(1)
            self.extracted_data['header']['shipper_name'] = shipper_match.group(2)

    def _parse_contract(self):
        """Parse contract number, parties and date into ``extracted_data['contract']``."""
        text = self.text_content
        contract = self.extracted_data['contract']

        # Contract number
        contract_no_match = re.search(rf'合同号{_COLON}\s*(\w+)', text)
        if contract_no_match:
            contract['contract_no'] = contract_no_match.group(1)

        # Seller and buyer: everything up to the end of the line
        seller_match = re.search(rf'卖方\s*{_COLON}\s*([^\n]+)', text)
        if seller_match:
            contract['seller'] = seller_match.group(1).strip()

        buyer_match = re.search(rf'买方\s*{_COLON}\s*([^\n]+)', text)
        if buyer_match:
            contract['buyer'] = buyer_match.group(1).strip()

        # Date: the first "YYYY年M月D日" in the document, stored as YYYY-MM-DD
        date_match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', text)
        if date_match:
            year, month, day = date_match.groups()
            contract['date'] = f"{year}-{month.zfill(2)}-{day.zfill(2)}"

    def _parse_packing_list(self):
        """Parse packing-list totals into ``extracted_data['packing_list']`` as floats."""
        text = self.text_content

        # Total quantity, package count, gross weight and net weight
        patterns = {
            'total_quantity': rf'总数量{_COLON}\s*(\d+)',
            'total_packages': rf'总件数{_COLON}\s*(\d+)',
            'gross_weight': rf'总毛重{_COLON}\s*(\d+(?:\.\d+)?)',
            'net_weight': rf'总净重{_COLON}\s*(\d+(?:\.\d+)?)',
        }

        for key, pattern in patterns.items():
            match = re.search(pattern, text)
            if match:
                self.extracted_data['packing_list'][key] = float(match.group(1))

    def _parse_invoice(self):
        """Parse invoice number and total amount into ``extracted_data['invoice']``."""
        text = self.text_content

        # Invoice number
        invoice_no_match = re.search(rf'发票编号{_COLON}\s*(\w+)', text)
        if invoice_no_match:
            self.extracted_data['invoice']['invoice_no'] = invoice_no_match.group(1)

        # Total amount; _NUMBER accepts any number of thousands groups so that
        # values such as "1,234,567.89" are not cut off after the second group.
        total_match = re.search(rf'TOTAL{_COLON}\s*({_NUMBER})', text)
        if total_match:
            total_str = total_match.group(1).replace(',', '')
            self.extracted_data['invoice']['total_amount'] = float(total_str)

    @staticmethod
    def _classify_column(col_name: str) -> Optional[str]:
        """Return the item field a header cell maps to, or None if unrecognised."""
        lowered = col_name.lower()
        for field, keywords in _COLUMN_RULES:
            if any(keyword in lowered for keyword in keywords):
                return field
        return None

    def _parse_items(self):
        """Parse goods lines into ``extracted_data['items']``.

        Goods tables are recognised by their header row. Rows without a
        ``name`` value are skipped. When no table yields any item, the plain
        text is scanned instead (see :meth:`_extract_items_from_text`).
        """
        items = []

        # Extract goods from the tables
        for table in self.tables:
            if not table or len(table) < 2:
                continue

            # Only tables whose header row mentions goods columns are used
            header = table[0] if table[0] else []
            header_str = ' '.join(str(cell).lower() if cell else '' for cell in header)
            if not any(keyword in header_str for keyword in _ITEM_TABLE_KEYWORDS):
                continue

            # Classify each column once per table instead of once per cell;
            # columns with an empty header are never mapped.
            fields = [self._classify_column(str(cell)) if cell else None for cell in header]

            for row in table[1:]:
                if not row or all(not cell for cell in row):
                    continue

                item = {}
                # zip() stops at the shorter sequence, so cells beyond the
                # header width are ignored.
                for field, cell in zip(fields, row):
                    if field is None or not cell:
                        continue
                    if field in _NUMERIC_FIELDS:
                        item[field] = self._extract_number(cell)
                    else:
                        item[field] = str(cell)

                if item.get('name'):
                    items.append(item)

        # Fall back to the plain text when the tables produced nothing
        if not items:
            items = self._extract_items_from_text()

        self.extracted_data['items'] = items
        logger.info(f"提取到 {len(items)} 条商品明细")

    def _extract_items_from_text(self) -> List[Dict]:
        """Extract goods lines from the plain text, one match per line.

        Only lines whose quantity is followed by the unit "台" are recognised.
        """
        items = []
        for line in self.text_content.split('\n'):
            match = _ITEM_LINE_RE.search(line)
            if not match:
                continue
            items.append({
                'no': match.group(1),
                'name': match.group(2).strip(),
                'quantity': float(match.group(3).replace(',', '')),
                'unit': '台',
                'unit_price': float(match.group(4).replace(',', '')),
                'amount': float(match.group(5).replace(',', ''))
            })
        return items

    @staticmethod
    def _extract_number(value: Any) -> Optional[float]:
        """Convert a table cell to a float.

        Every character other than a digit or '.' is removed first, so
        thousands separators, currency symbols and unit suffixes are dropped
        (as is a minus sign). Returns None for None, for a cell without any
        digit, or when the remaining text is not a valid number (e.g. "1.2.3").
        """
        if value is None:
            return None

        str_value = str(value).strip()
        # Drop thousands separators and other decoration, keep the decimal point
        str_value = re.sub(r'[^\d.]', '', str_value)

        try:
            return float(str_value) if str_value else None
        except ValueError:
            return None


class ExcelCustomsGenerator:
    """Fills an Excel customs-declaration template with extracted data."""

    def __init__(self, template_path: str, output_path: str):
        self.template_path = template_path
        self.output_path = output_path
        self.wb = None

    def load_template(self):
        """Load the template workbook into ``self.wb``.

        Raises:
            Exception: any error raised by ``load_workbook`` is logged and
                re-raised unchanged.
        """
        try:
            self.wb = load_workbook(self.template_path)
            logger.info(f"成功加载模板: {self.template_path}")
        except Exception as e:
            logger.error(f"加载模板失败: {e}")
            raise

    def generate(self, data: Dict[str, Any]):
        """Fill every known sheet from ``data`` and save to ``output_path``.

        Args:
            data: the structure produced by ``PDFCustomsExtractor.extract``.
        """
        if self.wb is None:
            self.load_template()

        # Update each sheet; a sheet missing from the template is skipped
        # with a warning.
        self._update_declaration_sheet(data)    # declaration
        self._update_contract_sheet(data)       # contract
        self._update_packing_sheet(data)        # packing list
        self._update_invoice_sheet(data)        # invoice
        self._update_confirmation_sheet(data)   # confirmation
        self._update_authorization_sheet(data)  # authorization

        # Save the workbook
        self.wb.save(self.output_path)
        logger.info(f"Excel文件已生成: {self.output_path}")

    def _get_sheet(self, name: str):
        """Return the worksheet called ``name``, or None (with a warning) if absent."""
        if name not in self.wb.sheetnames:
            logger.warning(f"未找到'{name}' sheet")
            return None
        return self.wb[name]

    @staticmethod
    def _sum_field(items: List[Dict], key: str) -> float:
        """Sum ``key`` over all items, counting missing or None values as 0.

        ``_extract_number`` stores None for unparsable cells, so the value has
        to be guarded with ``or 0`` rather than a ``.get`` default.
        """
        return sum(item.get(key) or 0 for item in items)

    @staticmethod
    def _write_item_rows(sheet, items: List[Dict], *, start_row: int, max_rows: int,
                         index_col: str, columns: Dict[str, Tuple[str, Any]],
                         row_step: int = 1):
        """Write up to ``max_rows`` items into ``sheet``.

        Args:
            sheet: target worksheet.
            items: goods lines to write.
            start_row: row of the first item.
            max_rows: number of item rows the sheet provides.
            index_col: column letter receiving the 1-based item number.
            columns: ``{column letter: (item key, default)}``; each cell gets
                ``item.get(key, default)``.
            row_step: distance in rows between consecutive items.
        """
        if len(items) > max_rows:
            # Make the truncation visible instead of silently dropping goods.
            logger.warning(
                f"'{sheet.title}' sheet最多容纳{max_rows}条商品明细,"
                f"已忽略其余{len(items) - max_rows}条"
            )

        for i, item in enumerate(items[:max_rows]):
            row = start_row + i * row_step
            sheet[f'{index_col}{row}'] = i + 1
            for col, (key, default) in columns.items():
                sheet[f'{col}{row}'] = item.get(key, default)

    def _update_declaration_sheet(self, data: Dict):
        """Fill the '报关单新' (declaration) sheet."""
        sheet = self._get_sheet('报关单新')
        if sheet is None:
            return

        items = data.get('items', [])

        # Header information
        header = data.get('header', {})
        if header.get('customs_code'):
            sheet['G3'] = header['customs_code']  # 10-digit customs code

        # Shipper information
        if header.get('shipper_name'):
            sheet['A5'] = header['shipper_name']

        # Goods lines: at most 30, starting at row 24, one item every 2 rows.
        # The HS code comes from the item when it has an 'hs_code', otherwise
        # the default is used.
        self._write_item_rows(
            sheet, items, start_row=24, max_rows=30, index_col='A', row_step=2,
            columns={
                'B': ('hs_code', DEFAULT_HS_CODE),
                'C': ('name', ''),
                'F': ('quantity', ''),
                'H': ('unit_price', ''),
                'I': ('amount', ''),
            },
        )

    def _update_contract_sheet(self, data: Dict):
        """Fill the '合同' (contract) sheet."""
        sheet = self._get_sheet('合同')
        if sheet is None:
            return

        contract = data.get('contract', {})

        if contract.get('seller'):
            sheet['B5'] = contract['seller']
        if contract.get('buyer'):
            sheet['B10'] = contract['buyer']
        if contract.get('contract_no'):
            sheet['F5'] = contract['contract_no']
        if contract.get('date'):
            sheet['F6'] = contract['date']

        # Goods lines: at most 31, starting at row 15
        self._write_item_rows(
            sheet, data.get('items', []), start_row=15, max_rows=31, index_col='A',
            columns={
                'B': ('name', ''),
                'C': ('quantity', ''),
                'D': ('unit', '台'),
                'E': ('unit_price', ''),
                'F': ('amount', ''),
            },
        )

    def _update_packing_sheet(self, data: Dict):
        """Fill the '装箱单' (packing list) sheet."""
        sheet = self._get_sheet('装箱单')
        if sheet is None:
            return

        items = data.get('items', [])

        # Customer information
        contract = data.get('contract', {})
        if contract.get('buyer'):
            sheet['B10'] = contract['buyer']

        # Invoice number (written to both G8 and G10)
        invoice = data.get('invoice', {})
        if invoice.get('invoice_no'):
            sheet['G8'] = invoice['invoice_no']
            sheet['G10'] = invoice['invoice_no']

        # Goods lines: at most 31, starting at row 13. Per-item package count
        # and weights are not available in the extracted data.
        self._write_item_rows(
            sheet, items, start_row=13, max_rows=31, index_col='B',
            columns={
                'C': ('name', ''),
                'D': ('quantity', ''),
            },
        )

        # Totals
        packing = data.get('packing_list', {})
        if packing.get('total_quantity'):
            sheet['D44'] = packing['total_quantity']
        if packing.get('gross_weight'):
            sheet['F44'] = packing['gross_weight']
        if packing.get('net_weight'):
            sheet['G44'] = packing['net_weight']

    def _update_invoice_sheet(self, data: Dict):
        """Fill the '发票' (invoice) sheet."""
        sheet = self._get_sheet('发票')
        if sheet is None:
            return

        items = data.get('items', [])
        contract = data.get('contract', {})

        # Customer information
        if contract.get('buyer'):
            sheet['B8'] = contract['buyer']

        # Invoice number
        invoice = data.get('invoice', {})
        if invoice.get('invoice_no'):
            sheet['E8'] = invoice['invoice_no']

        # Date
        if contract.get('date'):
            sheet['E9'] = contract['date']

        # Goods lines: at most 31, starting at row 13
        self._write_item_rows(
            sheet, items, start_row=13, max_rows=31, index_col='A',
            columns={
                'B': ('name', ''),
                'C': ('quantity', ''),
                'D': ('unit_price', ''),
                'E': ('amount', ''),
            },
        )

        # Totals are computed over all extracted items
        if items:
            sheet['C44'] = self._sum_field(items, 'quantity')
            sheet['E44'] = self._sum_field(items, 'amount')

    def _update_confirmation_sheet(self, data: Dict):
        """Fill the '确认书' (confirmation) sheet."""
        sheet = self._get_sheet('确认书')
        if sheet is None:
            return

        items = data.get('items', [])
        contract = data.get('contract', {})

        # Consignee
        if contract.get('buyer'):
            sheet['C5'] = contract['buyer']

        # Contract number
        if contract.get('contract_no'):
            sheet['K3'] = contract['contract_no']

        # Customs cut-off date: today + 7 days
        cutoff_date = datetime.now() + timedelta(days=7)
        sheet['K4'] = cutoff_date.strftime('%Y-%m-%d')

        # Goods lines: at most 31, starting at row 7
        self._write_item_rows(
            sheet, items, start_row=7, max_rows=31, index_col='A',
            columns={
                'C': ('name', ''),
                'F': ('quantity', ''),
                'K': ('unit_price', ''),
                'L': ('amount', ''),
            },
        )

        # Totals are computed over all extracted items
        if items:
            sheet['F38'] = self._sum_field(items, 'quantity')
            sheet['L38'] = self._sum_field(items, 'amount')

        # Destination country (fixed value)
        sheet['D40'] = '加拿大'

    def _update_authorization_sheet(self, data: Dict):
        """Fill the '委托书' (authorization) sheet."""
        sheet = self._get_sheet('委托书')
        if sheet is None:
            return

        header = data.get('header', {})
        items = data.get('items', [])

        # Principal
        if header.get('shipper_name'):
            sheet['B17'] = header['shipper_name']

        # Main goods name: taken from the first item
        if items:
            sheet['B18'] = items[0].get('name', '')

        # HS code: the first item's 'hs_code' when present, else the default
        sheet['B19'] = items[0].get('hs_code', DEFAULT_HS_CODE) if items else DEFAULT_HS_CODE

        # Validity: today + 30 days. The date is assembled from ASCII-only
        # format specs because a strftime format string containing non-ASCII
        # characters depends on the platform locale encoding.
        expiry_date = datetime.now() + timedelta(days=30)
        expiry_text = f"{expiry_date:%Y}年{expiry_date:%m}月{expiry_date:%d}日"
        sheet['A5'] = f"本委托书有效期自签字之日起至{expiry_text}止"


def main():
    """Extract the configured PDF and generate the Excel workbook.

    Errors are logged rather than propagated.
    """
    # File path configuration. Raw strings keep the Windows backslashes
    # literal; the values are identical to the unprefixed literals, but no
    # longer rely on invalid escape sequences such as "\P" or "\d".
    pdf_path = r"D:\工作内容\沃达\智慧运营平台\上传文件\报关草单BGKC2026C013.pdf"  # input PDF
    template_path = r"D:\Program Files (x86)\download\科技创新-腾飞(1)(1).xlsx"  # template
    output_path = r"D:\工作内容\沃达\智慧运营平台\上传文件\报关单生成_20260318.xlsx"  # output

    try:
        # 1. Extract the PDF data
        logger.info("开始提取PDF数据...")
        extractor = PDFCustomsExtractor(pdf_path)
        extracted_data = extractor.extract()

        # Log an overview of the extraction result
        logger.info("提取完成:")
        logger.info(f"- 合同号: {extracted_data['contract'].get('contract_no', 'N/A')}")
        logger.info(f"- 买方: {extracted_data['contract'].get('buyer', 'N/A')}")
        logger.info(f"- 商品数量: {len(extracted_data.get('items', []))}")

        # 2. Generate the Excel file
        logger.info("开始生成Excel文件...")
        generator = ExcelCustomsGenerator(template_path, output_path)
        generator.generate(extracted_data)

        logger.info(f"处理完成!输出文件: {output_path}")

    except FileNotFoundError as e:
        logger.error(f"文件不存在: {e}")
    except Exception as e:
        # logger.exception appends the active traceback to the log record
        logger.exception(f"处理失败: {e}")


if __name__ == "__main__":
    main()