1. 新增文件: - __sync_prod_db.py - 数据库同步脚本 - uploads/ 目录下的PDF文档 2. 修改文件: - app/api/service/documentSourceService.py - 文档源服务 - requirements.txt - 依赖配置
515 lines
20 KiB
Python
515 lines
20 KiB
Python
# encoding: UTF-8
|
||
import os
|
||
import re
|
||
|
||
from ..model.documentSourceModel import DocumentSource
|
||
from ..model.caseModel import TestCase, Module
|
||
from ..dao.documentSourceDao import DocumentSourceDao
|
||
from ..dao.caseDao import CaseDao
|
||
from ..dao.skillDao import SkillDao
|
||
from .aiService import AIService
|
||
|
||
|
||
class DocumentSourceService:
|
||
|
||
DOCUMENT_TYPE_PDF = 1
|
||
DOCUMENT_TYPE_FEISHU = 2
|
||
|
||
DOCUMENT_STATUS_PENDING = 0
|
||
DOCUMENT_STATUS_PARSED = 1
|
||
DOCUMENT_STATUS_GENERATED = 2
|
||
|
||
@staticmethod
|
||
def create(session, data):
|
||
product_id = data.get('productId') or data.get('product_id')
|
||
project_id = data.get('projectId') or data.get('project_id')
|
||
document_type = data.get('type', 1)
|
||
source = data.get('source')
|
||
content = data.get('content', '')
|
||
created_by = data.get('createdBy') or data.get('created_by')
|
||
|
||
if not product_id or not project_id or not source:
|
||
return 0, 'productId、projectId、source 为必传参数'
|
||
|
||
max_version = DocumentSourceDao.get_max_version(session, product_id, project_id, source)
|
||
|
||
document_source = DocumentSource(
|
||
product_id=product_id,
|
||
project_id=project_id,
|
||
type=document_type,
|
||
source=source,
|
||
content=content,
|
||
version=max_version + 1,
|
||
status=DocumentSourceService.DOCUMENT_STATUS_PENDING,
|
||
created_by=created_by,
|
||
is_delete=0
|
||
)
|
||
|
||
if document_type == DocumentSourceService.DOCUMENT_TYPE_FEISHU:
|
||
content = DocumentSourceService._fetch_feishu_content(source)
|
||
if content:
|
||
document_source.content = content
|
||
document_source.status = DocumentSourceService.DOCUMENT_STATUS_PARSED
|
||
|
||
doc_id = DocumentSourceDao.create(session, document_source)
|
||
return doc_id, ''
|
||
|
||
@staticmethod
|
||
def _fetch_feishu_content(url):
|
||
try:
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||
}
|
||
response = requests.get(url, headers=headers, timeout=30)
|
||
if response.status_code == 200:
|
||
soup = BeautifulSoup(response.content, 'html.parser')
|
||
return soup.get_text(strip=True)[:10000]
|
||
return None
|
||
except Exception:
|
||
return None
|
||
|
||
@staticmethod
|
||
def parse_pdf_content(pdf_path):
|
||
try:
|
||
from PyPDF2 import PdfReader
|
||
|
||
reader = PdfReader(pdf_path)
|
||
content = ''
|
||
for page in reader.pages:
|
||
text = page.extract_text()
|
||
if text:
|
||
content += text
|
||
return content
|
||
except Exception:
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_by_id(session, document_id):
|
||
return DocumentSourceDao.get_by_id(session, document_id)
|
||
|
||
@staticmethod
|
||
def list(session, req_data):
|
||
filters = [DocumentSource.is_delete == 0]
|
||
|
||
product_id = req_data.get('productId') or req_data.get('product_id')
|
||
if product_id:
|
||
filters.append(DocumentSource.product_id == product_id)
|
||
|
||
project_id = req_data.get('projectId') or req_data.get('project_id')
|
||
if project_id:
|
||
filters.append(DocumentSource.project_id == project_id)
|
||
|
||
doc_type = req_data.get('type')
|
||
if doc_type is not None:
|
||
filters.append(DocumentSource.type == doc_type)
|
||
|
||
status = req_data.get('status')
|
||
if status is not None:
|
||
filters.append(DocumentSource.status == status)
|
||
|
||
keyword = req_data.get('keyword')
|
||
if keyword:
|
||
filters.append(DocumentSource.source.like(f'%{keyword}%'))
|
||
|
||
page_no = int(req_data.get('pageNo', req_data.get('page', 1)))
|
||
page_size = int(req_data.get('pageSize', req_data.get('size', 20)))
|
||
|
||
items, total = DocumentSourceDao.list_by_filters(
|
||
session, filters, page_no, page_size, DocumentSource.created_time.desc()
|
||
)
|
||
|
||
return items, total
|
||
|
||
@staticmethod
|
||
def update(session, document_id, data):
|
||
update_info = {}
|
||
|
||
fields = ['type', 'source', 'content', 'ai_model']
|
||
for field in fields:
|
||
if field in data:
|
||
update_info[field] = data[field]
|
||
|
||
if update_info:
|
||
return DocumentSourceDao.update_by_id(session, document_id, update_info)
|
||
return 1
|
||
|
||
@staticmethod
|
||
def delete(session, document_id):
|
||
import os
|
||
from flask import current_app
|
||
|
||
# 先查询文档信息
|
||
document = DocumentSourceDao.get_by_id(session, document_id)
|
||
if not document:
|
||
return 0, '文档不存在'
|
||
|
||
# 如果是PDF文件类型,删除对应的文件
|
||
if document.type == DocumentSourceService.DOCUMENT_TYPE_PDF and document.source:
|
||
# source字段存储的是相对路径,如:uploads/zhyy/v2.0/xxx.pdf
|
||
file_path = os.path.join(os.getcwd(), document.source)
|
||
|
||
try:
|
||
if os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
current_app.logger.info(f'已删除文件: {file_path}')
|
||
except Exception as e:
|
||
current_app.logger.error(f'删除文件失败: {file_path}, 错误: {str(e)}')
|
||
|
||
# 软删除数据库记录
|
||
result = DocumentSourceDao.delete_by_id(session, document_id)
|
||
return result, ''
|
||
|
||
@staticmethod
|
||
def refresh_content(session, document_id):
|
||
document = DocumentSourceDao.get_by_id(session, document_id)
|
||
if not document:
|
||
return False, '文档不存在'
|
||
|
||
if document.type == DocumentSourceService.DOCUMENT_TYPE_FEISHU:
|
||
content = DocumentSourceService._fetch_feishu_content(document.source)
|
||
if content:
|
||
DocumentSourceDao.update_by_id(session, document_id, {
|
||
'content': content,
|
||
'status': DocumentSourceService.DOCUMENT_STATUS_PARSED
|
||
})
|
||
return True, ''
|
||
return False, '获取飞书内容失败'
|
||
|
||
return False, '仅支持刷新飞书链接内容'
|
||
|
||
@staticmethod
|
||
def generate_cases(session, document_id, template=None):
|
||
document = DocumentSourceDao.get_by_id(session, document_id)
|
||
if not document:
|
||
return [], '文档不存在'
|
||
|
||
# 如果是PDF类型且内容为空,先解析PDF
|
||
if document.type == DocumentSourceService.DOCUMENT_TYPE_PDF and not document.content:
|
||
# 解析PDF内容
|
||
pdf_path = os.path.join(os.getcwd(), document.source)
|
||
if not os.path.exists(pdf_path):
|
||
return [], 'PDF文件不存在'
|
||
|
||
# 使用AI服务解析PDF并生成用例
|
||
cases, msg = AIService.parse_pdf_and_generate_cases(pdf_path, template)
|
||
if msg:
|
||
return [], msg
|
||
|
||
# 更新文档内容和状态
|
||
DocumentSourceDao.update_by_id(session, document_id, {
|
||
'content': DocumentSourceService._extract_content_from_pdf(pdf_path),
|
||
'status': DocumentSourceService.DOCUMENT_STATUS_GENERATED
|
||
})
|
||
|
||
return cases, ''
|
||
|
||
if not document.content:
|
||
return [], '文档内容为空'
|
||
|
||
# 使用AI服务生成测试用例
|
||
cases, msg = AIService.generate_test_cases(document.content, template)
|
||
if msg:
|
||
return [], msg
|
||
|
||
# 更新文档状态为已生成用例
|
||
DocumentSourceDao.update_by_id(session, document_id, {
|
||
'status': DocumentSourceService.DOCUMENT_STATUS_GENERATED
|
||
})
|
||
|
||
return cases, ''
|
||
|
||
@staticmethod
|
||
def _extract_content_from_pdf(pdf_path):
|
||
"""提取PDF内容"""
|
||
try:
|
||
from flask import current_app
|
||
from PyPDF2 import PdfReader
|
||
file_size = os.path.getsize(pdf_path) if os.path.exists(pdf_path) else 0
|
||
current_app.logger.info(f'开始提取PDF内容: path={pdf_path}, size={file_size}')
|
||
reader = PdfReader(pdf_path)
|
||
content = ''
|
||
for page in reader.pages:
|
||
page_content = page.extract_text()
|
||
if page_content:
|
||
content += page_content + '\n'
|
||
current_app.logger.info(f'PDF内容提取完成: path={pdf_path}, pages={len(reader.pages)}, content_length={len(content)}')
|
||
return content
|
||
except Exception as e:
|
||
from flask import current_app
|
||
current_app.logger.exception(f'PDF内容提取失败: path={pdf_path}, error={str(e)}')
|
||
return ''
|
||
|
||
@staticmethod
|
||
def generate_cases_batch(session, document_ids, template=None):
|
||
"""
|
||
批量生成测试用例,支持多个文档
|
||
|
||
:param session: 数据库会话
|
||
:param document_ids: 文档ID列表
|
||
:param template: 用例模板配置
|
||
:return: 所有测试用例列表,失败文档列表
|
||
"""
|
||
all_cases = []
|
||
failed_docs = []
|
||
combined_content = []
|
||
template = template or {}
|
||
|
||
for doc_id in document_ids:
|
||
document = DocumentSourceDao.get_by_id(session, doc_id)
|
||
if not document:
|
||
failed_docs.append({'documentId': doc_id, 'error': '文档不存在'})
|
||
continue
|
||
|
||
content = document.content
|
||
|
||
# 如果是PDF类型且内容为空,先解析PDF
|
||
if document.type == DocumentSourceService.DOCUMENT_TYPE_PDF and not content:
|
||
pdf_path = os.path.join(os.getcwd(), document.source)
|
||
if not os.path.exists(pdf_path):
|
||
failed_docs.append({'documentId': doc_id, 'error': 'PDF文件不存在'})
|
||
continue
|
||
|
||
# 提取PDF内容
|
||
content = DocumentSourceService._extract_content_from_pdf(pdf_path)
|
||
if not content:
|
||
file_size = os.path.getsize(pdf_path) if os.path.exists(pdf_path) else 0
|
||
failed_docs.append({'documentId': doc_id, 'error': f'PDF内容为空,文件大小:{file_size} bytes。请检查服务器是否安装PyPDF2、文件是否为扫描件或加密PDF'})
|
||
continue
|
||
|
||
# 更新文档内容
|
||
DocumentSourceDao.update_by_id(session, doc_id, {
|
||
'content': content,
|
||
'status': DocumentSourceService.DOCUMENT_STATUS_PARSED
|
||
})
|
||
|
||
if not content:
|
||
failed_docs.append({'documentId': doc_id, 'error': '文档内容为空'})
|
||
continue
|
||
|
||
# 添加文档标识
|
||
combined_content.append(f"【文档ID: {doc_id}】\n{content}\n")
|
||
|
||
if not combined_content:
|
||
return [], failed_docs
|
||
|
||
# 合并所有文档内容
|
||
merged_content = "\n---\n".join(combined_content)
|
||
|
||
context_template, context_err = DocumentSourceService._attach_generation_context(session, template)
|
||
if context_err:
|
||
return [], [{'documentId': 'all', 'error': context_err}]
|
||
|
||
# 使用AI服务生成测试用例(基于合并后的内容)
|
||
cases, msg = AIService.generate_test_cases(merged_content, context_template)
|
||
if msg:
|
||
return [], [{'documentId': 'all', 'error': msg}]
|
||
|
||
# 更新所有文档状态为已生成用例
|
||
for doc_id in document_ids:
|
||
if doc_id not in [f['documentId'] for f in failed_docs]:
|
||
DocumentSourceDao.update_by_id(session, doc_id, {
|
||
'status': DocumentSourceService.DOCUMENT_STATUS_GENERATED
|
||
})
|
||
|
||
return cases, failed_docs
|
||
|
||
@staticmethod
|
||
def _attach_generation_context(session, template):
|
||
template = dict(template or {})
|
||
skill_ids = template.get('skill_ids') or []
|
||
rule_ids = template.get('rule_ids') or []
|
||
if not skill_ids and not rule_ids:
|
||
return template, ''
|
||
project_id = template.get('project_id')
|
||
if not project_id:
|
||
return template, 'projectId 为必传参数'
|
||
try:
|
||
skill_ids = [int(item) for item in skill_ids]
|
||
rule_ids = [int(item) for item in rule_ids]
|
||
except (TypeError, ValueError):
|
||
return template, 'skillIds、ruleIds 必须是数字数组'
|
||
|
||
skills = SkillDao.list_skills_by_ids(session, project_id, skill_ids)
|
||
rules = SkillDao.list_business_rules_by_ids(session, project_id, rule_ids)
|
||
if len(skills) != len(set(skill_ids)):
|
||
return template, '存在未查询到的 Skill 或 Skill 不属于当前项目'
|
||
if len(rules) != len(set(rule_ids)):
|
||
return template, '存在未查询到的业务规则或业务规则不属于当前项目'
|
||
|
||
skill_contexts, err_msg = DocumentSourceService._load_asset_contexts(skills, 'skill_file_path', 'Skill')
|
||
if err_msg:
|
||
return template, err_msg
|
||
rule_contexts, err_msg = DocumentSourceService._load_asset_contexts(rules, 'rule_file_path', '业务规则')
|
||
if err_msg:
|
||
return template, err_msg
|
||
template['skill_contexts'] = skill_contexts
|
||
template['rule_contexts'] = rule_contexts
|
||
return template, ''
|
||
|
||
@staticmethod
|
||
def _load_asset_contexts(items, path_field, source_label):
|
||
contexts = []
|
||
workspace_root = os.getcwd()
|
||
for item in items:
|
||
file_path = getattr(item, path_field, None)
|
||
if not file_path:
|
||
return [], f'{source_label}「{getattr(item, "name", "")}」未配置文件路径'
|
||
if not os.path.isabs(file_path):
|
||
file_path = os.path.join(workspace_root, file_path)
|
||
normalized_path = os.path.abspath(file_path)
|
||
if not os.path.exists(normalized_path):
|
||
return [], f'{source_label}「{getattr(item, "name", "")}」文件不存在'
|
||
try:
|
||
with open(normalized_path, 'r', encoding='utf-8') as file_obj:
|
||
content = file_obj.read()
|
||
except Exception as e:
|
||
return [], f'{source_label}「{getattr(item, "name", "")}」文件读取失败:{str(e)}'
|
||
contexts.append({
|
||
'id': item.id,
|
||
'name': item.name,
|
||
'path': normalized_path,
|
||
'content': content
|
||
})
|
||
return contexts, ''
|
||
|
||
@staticmethod
|
||
def match_modules(session, project_id, cases):
|
||
for case in cases:
|
||
module_name = case.get('module_name')
|
||
case['module_id'] = DocumentSourceService._find_module_by_path(session, project_id, module_name) if module_name else None
|
||
return cases
|
||
|
||
@staticmethod
|
||
def import_cases(session, document_id, cases, user_id, auto_create_module=False):
|
||
document = DocumentSourceDao.get_by_id(session, document_id)
|
||
if not document:
|
||
return 0, '文档不存在'
|
||
|
||
success_count = 0
|
||
for case_data in cases:
|
||
if not case_data.get('selected', True):
|
||
continue
|
||
|
||
module_id = case_data.get('module_id')
|
||
module_name = case_data.get('module_name', '未分类')
|
||
|
||
if not module_id:
|
||
if auto_create_module:
|
||
module_id = DocumentSourceService._get_or_create_module_path(session, document.project_id, module_name)
|
||
else:
|
||
module_id = DocumentSourceService._find_module_by_path(session, document.project_id, module_name)
|
||
if not module_id:
|
||
continue
|
||
|
||
case_info = {
|
||
'project_id': document.project_id,
|
||
'module_id': module_id,
|
||
'case_key': CaseDao.next_case_key(session, document.project_id, module_id, document.product_id),
|
||
'title': case_data.get('title', ''),
|
||
'preconditions': case_data.get('precondition', ''),
|
||
'steps': case_data.get('steps', ''),
|
||
'expected_results': case_data.get('expected_result', ''),
|
||
'priority': case_data.get('priority', 2),
|
||
'case_type': case_data.get('case_type', 1),
|
||
'tags': case_data.get('tags', []),
|
||
'is_ai_generated': 1,
|
||
'status': 0,
|
||
'is_delete': 0,
|
||
'created_by': user_id
|
||
}
|
||
|
||
case_id, err_msg = CaseDao.create(session, TestCase, case_info)
|
||
if err_msg:
|
||
return success_count, err_msg
|
||
success_count += 1
|
||
|
||
DocumentSourceDao.update_by_id(session, document_id, {
|
||
'status': DocumentSourceService.DOCUMENT_STATUS_GENERATED
|
||
})
|
||
|
||
return success_count, ''
|
||
|
||
@staticmethod
|
||
def batch_create_modules(session, project_id, module_names):
|
||
created_modules = []
|
||
for name in module_names:
|
||
module = DocumentSourceService._get_or_create_module_path(session, project_id, name, return_model=True)
|
||
if module:
|
||
created_modules.append(module)
|
||
session.flush()
|
||
return created_modules
|
||
|
||
@staticmethod
|
||
def _find_module_by_path(session, project_id, module_name):
|
||
parts = DocumentSourceService._parse_module_path(module_name)
|
||
parent_id = 0
|
||
module_id = None
|
||
for name in parts:
|
||
module = session.query(Module).filter(
|
||
Module.project_id == project_id,
|
||
Module.parent_id == parent_id,
|
||
Module.name == name,
|
||
Module.is_delete == 0
|
||
).first()
|
||
if not module:
|
||
return None
|
||
module_id = module.id
|
||
parent_id = module.id
|
||
return module_id
|
||
|
||
@staticmethod
|
||
def _get_or_create_module_path(session, project_id, module_name, return_model=False):
|
||
parts = DocumentSourceService._parse_module_path(module_name)
|
||
parent_id = 0
|
||
current_module = None
|
||
for name in parts:
|
||
current_module = session.query(Module).filter(
|
||
Module.project_id == project_id,
|
||
Module.parent_id == parent_id,
|
||
Module.name == name,
|
||
Module.is_delete == 0
|
||
).first()
|
||
if not current_module:
|
||
current_module = Module(
|
||
project_id=project_id,
|
||
parent_id=parent_id,
|
||
name=name,
|
||
sort_order=DocumentSourceService._next_module_sort_order(session, project_id, parent_id),
|
||
path=DocumentSourceService._build_module_path(session, parent_id, name),
|
||
is_delete=0,
|
||
status=0
|
||
)
|
||
session.add(current_module)
|
||
session.flush()
|
||
parent_id = current_module.id
|
||
return current_module if return_model else current_module.id
|
||
|
||
@staticmethod
|
||
def _parse_module_path(module_name):
|
||
module_name = str(module_name or '').strip() or '未分类'
|
||
parts = [part.strip() for part in re.split(r'[/\\>>||]', module_name) if part.strip()]
|
||
return (parts or ['未分类'])[:3]
|
||
|
||
@staticmethod
|
||
def _next_module_sort_order(session, project_id, parent_id):
|
||
last_module = session.query(Module).filter(
|
||
Module.project_id == project_id,
|
||
Module.parent_id == parent_id,
|
||
Module.is_delete == 0
|
||
).order_by(Module.sort_order.desc()).first()
|
||
return (last_module.sort_order if last_module and last_module.sort_order is not None else 0) + 1
|
||
|
||
@staticmethod
|
||
def _build_module_path(session, parent_id, name):
|
||
if not parent_id:
|
||
return name
|
||
parent = session.query(Module).filter(Module.id == parent_id, Module.is_delete == 0).first()
|
||
if parent and parent.path:
|
||
return f'{parent.path}/{name}'
|
||
if parent:
|
||
return f'{parent.name}/{name}'
|
||
return name
|