提交所有代码到 qiaoxinjiu 分支

This commit is contained in:
qiaoxinjiu
2026-05-11 14:29:16 +08:00
parent 01a4ac8ea1
commit 2fea5adb44
59 changed files with 4957 additions and 1603 deletions

Binary file not shown.

View File

@@ -0,0 +1,129 @@
# encoding: UTF-8
from sqlalchemy import func
from logger import logger
from ..model.automationModel import AutoExecution, AutoExecutionCase
from ..model.caseModel import TestCase
from ..model.planModel import PlanCase, TestPlan
class AutomationDao(object):
@staticmethod
def create_execution(session, add_info):
obj = AutoExecution(**add_info)
session.add(obj)
err = session.done(close=False)
if err:
logger.warning(f'AutoExecution新增失败{err}')
return 0, f'新增失败!{err}'
return obj, ''
@staticmethod
def batch_create_execution_cases(session, batch_info_list):
if not batch_info_list:
return [], ''
objs = [AutoExecutionCase(**info) for info in batch_info_list]
session.add_all(objs)
err = session.done(close=False)
if err:
logger.warning(f'AutoExecutionCase批量新增失败{err}')
return [], f'批量新增失败!{err}'
return objs, ''
@staticmethod
def update_execution_by_id(session, execution_id, update_info):
update_res = session.query(AutoExecution).filter(AutoExecution.id == int(execution_id)).update(update_info)
err = session.done(close=False)
if err:
logger.error(f'AutoExecution更新失败id: {execution_id}, err: {err}')
return 0, f'更新失败!{err}'
if not update_res:
return 0, '未查询到对应执行记录!'
return int(execution_id), ''
@staticmethod
def get_execution_by_id(session, execution_id):
return session.query(AutoExecution).filter(AutoExecution.id == int(execution_id)).first()
@staticmethod
def list_execution_by_filters(session, filters, page=1, limit=20):
query = session.query(AutoExecution).filter(*filters)
total = query.count()
items = query.order_by(AutoExecution.created_time.desc()).offset((int(page) - 1) * int(limit)).limit(int(limit)).all()
return items, total
@staticmethod
def get_execution_case_by_id(session, execution_case_id):
return session.query(AutoExecutionCase).filter(AutoExecutionCase.id == int(execution_case_id)).first()
@staticmethod
def get_execution_case_by_unique(session, execution_id, case_id, plan_case_id=None):
filters = [AutoExecutionCase.execution_id == int(execution_id), AutoExecutionCase.case_id == int(case_id)]
if plan_case_id:
filters.append(AutoExecutionCase.plan_case_id == int(plan_case_id))
return session.query(AutoExecutionCase).filter(*filters).order_by(AutoExecutionCase.id.asc()).first()
@staticmethod
def update_execution_case_by_id(session, execution_case_id, update_info):
update_res = session.query(AutoExecutionCase).filter(AutoExecutionCase.id == int(execution_case_id)).update(update_info)
err = session.done(close=False)
if err:
logger.error(f'AutoExecutionCase更新失败id: {execution_case_id}, err: {err}')
return 0, f'更新失败!{err}'
if not update_res:
return 0, '未查询到对应执行明细!'
return int(execution_case_id), ''
@staticmethod
def list_execution_case_by_filters(session, filters, page=1, limit=20):
query = session.query(AutoExecutionCase).filter(*filters)
total = query.count()
items = query.order_by(AutoExecutionCase.id.asc()).offset((int(page) - 1) * int(limit)).limit(int(limit)).all()
return items, total
@staticmethod
def count_execution_case_summary(session, execution_id):
rows = session.query(AutoExecutionCase.status, func.count(AutoExecutionCase.id)).filter(
AutoExecutionCase.execution_id == int(execution_id)
).group_by(AutoExecutionCase.status).all()
summary = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}
for status, count in rows:
summary[int(status)] = int(count)
summary['total'] = sum(summary.values())
return summary
@staticmethod
def query_case_auto_item(session, case_id):
return session.query(TestCase).filter(
TestCase.id == int(case_id), TestCase.is_delete == 0, TestCase.is_auto == 1
).first()
@staticmethod
def query_plan_auto_cases(session, plan_id, round_no=None, case_ids=None):
query = session.query(PlanCase, TestCase).join(
TestCase, PlanCase.case_id == TestCase.id
).filter(
PlanCase.plan_id == int(plan_id),
TestCase.is_delete == 0,
TestCase.is_auto == 1
)
if round_no not in (None, ''):
query = query.filter(PlanCase.round_no == int(round_no))
if case_ids:
query = query.filter(PlanCase.case_id.in_([int(case_id) for case_id in case_ids]))
return query.order_by(PlanCase.id.asc()).all()
@staticmethod
def update_plan_case_result(session, plan_case_id, update_info):
update_res = session.query(PlanCase).filter(PlanCase.id == int(plan_case_id)).update(update_info)
err = session.done(close=False)
if err:
logger.error(f'PlanCase更新失败id: {plan_case_id}, err: {err}')
return 0, f'更新失败!{err}'
if not update_res:
return 0, '未查询到对应计划用例!'
return int(plan_case_id), ''
@staticmethod
def get_plan_by_id(session, plan_id):
return session.query(TestPlan).filter(TestPlan.id == int(plan_id), TestPlan.is_delete == 0).first()

View File

@@ -56,9 +56,166 @@ class CaseDao(object):
return CaseDao.update_by_id(session, model_cls, obj_id, {'is_delete': 1})
@staticmethod
def next_case_key(session, project_id):
count_num = session.query(func.count(TestCase.id)).filter(TestCase.project_id == int(project_id)).scalar() or 0
return 'TC-{:03d}'.format(count_num + 1)
def next_case_key(session, project_id, module_id=None, product_id=None):
from ..model.productModel import Product
from ..model.projectModel import Project
from ..model.caseModel import Module
product_abbr = ''
if product_id:
product = session.query(Product).filter(Product.id == int(product_id), Product.is_delete == 0).first()
if product and product.name:
product_abbr = CaseDao._generate_abbreviation(product.name)
project_abbr = ''
project = session.query(Project).filter(Project.id == int(project_id), Project.is_delete == 0).first()
if project and project.name:
project_abbr = CaseDao._generate_abbreviation(project.name)
module_abbr = ''
if module_id:
module = session.query(Module).filter(Module.id == int(module_id), Module.is_delete == 0).first()
if module and module.name:
module_abbr = CaseDao._generate_abbreviation(module.name)
parts = ['TC']
if product_abbr:
parts.append(product_abbr)
if project_abbr:
parts.append(project_abbr)
if module_abbr:
parts.append(module_abbr)
prefix = '-'.join(parts)
count_num = session.query(func.count(TestCase.id)).filter(
TestCase.project_id == int(project_id),
TestCase.is_delete == 0,
TestCase.case_key.like(f'{prefix}-%')
).scalar() or 0
return '{}-{:03d}'.format(prefix, count_num + 1)
@staticmethod
def _generate_abbreviation(name):
import re
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')
english_pattern = re.compile(r'[a-zA-Z]+')
chinese_chars = chinese_pattern.findall(name)
if chinese_chars:
full_chinese = ''.join(chinese_chars)
abbr = ''.join([CaseDao._get_pinyin_first_char(c) for c in full_chinese[:4]])
abbr = abbr.lower()
if len(abbr) < 2:
abbr = abbr.ljust(2, 'n')
return abbr
english_words = english_pattern.findall(name)
if english_words:
abbr = english_words[0].lower()[:4]
if len(abbr) < 2:
abbr = abbr.ljust(2, 'x')
return abbr
abbr = name.lower()[:4]
if len(abbr) < 2:
abbr = abbr.ljust(2, 'x')
return abbr
@staticmethod
def _get_pinyin_first_char(char):
pinyin_map = {
'': 'Z', '': 'H', '': 'Y', '': 'Y',
'': 'B', '': 'G', '': 'G', '': 'Z', '': 'T',
'': 'C', '': 'S', '': 'Y', '': 'L',
'': 'C', '': 'P', '': 'X', '': 'M',
'': 'M', '': 'K', '': 'G', '': 'L',
'': 'X', '': 'T', '': 'G', '': 'N',
'': 'Y', '': 'M', '': 'C', '': 'X',
'': 'T', '': 'J', '': 'B', '': 'J',
'': 'S', '': 'C', '': 'D', '': 'R',
'': 'D', '': 'C', '': 'P', '': 'L',
'': 'S', '': 'Z', '': 'P', '': 'Z',
'': 'Q', '': 'X', '': 'J', '': 'S',
'': 'Y', '': 'H', '': 'Z', '': 'Z',
'': 'J', '': 'H', '': 'Z', '': 'X',
'': 'B', '': 'G', '': 'T', '': 'J',
'': 'S', '': 'Y', '': 'Y', '': 'B',
'': 'Y', '': 'M', '': 'H', '': 'G',
'': 'J', '': 'C', '': 'J', '': 'K',
'': 'A', '': 'Q', '': 'R', '': 'Z',
'': 'J', '': 'K', '': 'Y', '': 'H',
'': 'X', '': 'N', '': 'S', '': 'W',
'': 'K', '': 'F', '': 'C', '': 'Y',
'': 'W', '': 'S', '': 'J', '': 'Y',
'': 'F', '': 'B', '': 'Z', '': 'G',
'': 'R', '': 'G', '': 'S', '': 'Y',
'': 'X', '': 'Z', '': 'R', '': 'G',
'': 'F', '': 'J', '': 'T', '': 'K',
'': 'J', '': 'W', '': 'L', '': 'X',
'': 'X', '': 'R', '': 'J', '': 'X',
'': 'T', '': 'J', '': 'J', '': 'F',
'': 'A', '': 'F', '': 'W', '': 'Z',
'': 'C', '': 'P', '': 'X', '': 'Z',
'': 'X', '': 'X', '': 'S', '': 'S',
'': 'C', '': 'Y', '': 'Y', '': 'G',
'': 'L', '': 'C', '': 'W', '': 'R',
'': 'L', '': 'Z', '': 'Y', '': 'X',
'': 'Z', '': 'F', '': 'L', '': 'H',
'': 'G', '': 'Z', '': 'L', '': 'A',
'': 'Q', '': 'H', '': 'J', '': 'Z',
'': 'N', '': 'M', '': 'K', '': 'Y',
'': 'M', '': 'C', '': 'K', '': 'B',
'': 'D', '': 'A', '': 'N', '': 'L',
'': 'J', '': 'T', '': 'B', '': 'C',
'': 'D', '': 'D', '': 'H', '': 'S',
'': 'S', '': 'G', '': 'L', '': 'P',
'': 'X', '': 'F', '': 'Y', '': 'D',
'': 'R', '': 'D', '': 'C', '': 'D',
'': 'Y', '': 'D', '': 'C', '': 'S',
'': 'C', '': 'F', '': 'Z', '': 'N',
'': 'T', '': 'J', '': 'Q', '': 'B',
'': 'C', '': 'Q', '': 'X', '': 'Q',
'': 'R', '': 'T', '': 'J', '': 'S',
'': 'H', '': 'P', '': 'Z', '': 'J',
'': 'J', '': 'F', '': 'H', '': 'X',
'': 'G', '': 'C', '': 'K', '': 'X',
'': 'Q', '': 'L', '': 'B', '': 'T',
'': 'J', '': 'T', '': 'B', '': 'B',
'': 'G', '': 'R', '': 'Z', '': 'B',
'': 'Z', '': 'M', '': 'S', '': 'M',
'': 'C', '': 'B', '': 'H', '': 'L',
'': 'X', '': 'Z', '': 'T', '': 'Y',
'': 'X', '': 'J', '': 'B', '': 'Q',
'': 'G', '': 'J', '': 'C', '': 'M',
'': 'S', '': 'F', '': 'J', '': 'L',
'': 'J', '': 'S', '': 'J', '': 'R',
'': 'Q', '': 'S', '': 'L', '': 'J',
'': 'E', '': 'J', '': 'G', '': 'S',
'': 'J', '': 'K', '': 'F', '': 'W',
'': 'Q', '线': 'X', '': 'C', '': 'J',
'': 'D', '': 'S', '': 'D', '': 'X',
'': 'L', '': 'X', '': 'N', '': 'W',
'': 'D', '': 'K', '': 'K', '': 'A',
'': 'Q', '': 'J', '': 'R', '': 'K',
'': 'Z', '': 'S', '': 'J', '': 'G',
'': 'X', '': 'X', '': 'B', '': 'X',
'': 'F', '': 'Y', '': 'H', '': 'Z',
'': 'G', '': 'G', '': 'Z', '': 'Y',
'': 'Z', '': 'J', '': 'C', '': 'J',
'': 'K', '': 'T', '': 'S', '': 'C',
'': 'S', '': 'Y', '': 'Z', '': 'Q',
'': 'R', '': 'H', '': 'G', '': 'Y',
'': 'S', '': 'P', '': 'X', '': 'Z',
'': 'D', '': 'Z', '': 'C', '': 'B',
'': 'Z', '': 'F', '': 'K', '': 'T',
'': 'S', '': 'J', '': 'Y', '': 'P',
'': 'D', '': 'L', '': 'Z', '': 'C',
'': 'G', '': 'G', '': 'Y', '': 'S',
'': 'J', '': 'M', '': 'Y', '': 'D'
}
return pinyin_map.get(char, 'N')
@staticmethod
def next_snapshot_version(session, case_id):

View File

@@ -183,3 +183,23 @@ class RbacDao(object):
def get_role_name_map(session):
items = session.query(Role).filter(Role.is_delete == 0, Role.status == 1).all()
return {item.id: item.name for item in items}
@staticmethod
def get_menu_permission_codes(session, menu_ids):
if not menu_ids:
return []
items = session.query(Menu.permission_code).filter(
Menu.id.in_(menu_ids),
Menu.is_delete == 0
).all()
return [item[0] for item in items if item[0]]
@staticmethod
def get_permission_ids_by_codes(session, permission_codes):
if not permission_codes:
return []
items = session.query(Permission.id).filter(
Permission.code.in_(permission_codes),
Permission.is_delete == 0
).all()
return [item[0] for item in items]