Files
effekt-interface/app/api/dao/automationDao.py
2026-05-11 14:29:16 +08:00

130 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# encoding: UTF-8
from sqlalchemy import func
from logger import logger
from ..model.automationModel import AutoExecution, AutoExecutionCase
from ..model.caseModel import TestCase
from ..model.planModel import PlanCase, TestPlan
class AutomationDao(object):
@staticmethod
def create_execution(session, add_info):
obj = AutoExecution(**add_info)
session.add(obj)
err = session.done(close=False)
if err:
logger.warning(f'AutoExecution新增失败{err}')
return 0, f'新增失败!{err}'
return obj, ''
@staticmethod
def batch_create_execution_cases(session, batch_info_list):
if not batch_info_list:
return [], ''
objs = [AutoExecutionCase(**info) for info in batch_info_list]
session.add_all(objs)
err = session.done(close=False)
if err:
logger.warning(f'AutoExecutionCase批量新增失败{err}')
return [], f'批量新增失败!{err}'
return objs, ''
@staticmethod
def update_execution_by_id(session, execution_id, update_info):
update_res = session.query(AutoExecution).filter(AutoExecution.id == int(execution_id)).update(update_info)
err = session.done(close=False)
if err:
logger.error(f'AutoExecution更新失败id: {execution_id}, err: {err}')
return 0, f'更新失败!{err}'
if not update_res:
return 0, '未查询到对应执行记录!'
return int(execution_id), ''
@staticmethod
def get_execution_by_id(session, execution_id):
return session.query(AutoExecution).filter(AutoExecution.id == int(execution_id)).first()
@staticmethod
def list_execution_by_filters(session, filters, page=1, limit=20):
query = session.query(AutoExecution).filter(*filters)
total = query.count()
items = query.order_by(AutoExecution.created_time.desc()).offset((int(page) - 1) * int(limit)).limit(int(limit)).all()
return items, total
@staticmethod
def get_execution_case_by_id(session, execution_case_id):
return session.query(AutoExecutionCase).filter(AutoExecutionCase.id == int(execution_case_id)).first()
@staticmethod
def get_execution_case_by_unique(session, execution_id, case_id, plan_case_id=None):
filters = [AutoExecutionCase.execution_id == int(execution_id), AutoExecutionCase.case_id == int(case_id)]
if plan_case_id:
filters.append(AutoExecutionCase.plan_case_id == int(plan_case_id))
return session.query(AutoExecutionCase).filter(*filters).order_by(AutoExecutionCase.id.asc()).first()
@staticmethod
def update_execution_case_by_id(session, execution_case_id, update_info):
update_res = session.query(AutoExecutionCase).filter(AutoExecutionCase.id == int(execution_case_id)).update(update_info)
err = session.done(close=False)
if err:
logger.error(f'AutoExecutionCase更新失败id: {execution_case_id}, err: {err}')
return 0, f'更新失败!{err}'
if not update_res:
return 0, '未查询到对应执行明细!'
return int(execution_case_id), ''
@staticmethod
def list_execution_case_by_filters(session, filters, page=1, limit=20):
query = session.query(AutoExecutionCase).filter(*filters)
total = query.count()
items = query.order_by(AutoExecutionCase.id.asc()).offset((int(page) - 1) * int(limit)).limit(int(limit)).all()
return items, total
@staticmethod
def count_execution_case_summary(session, execution_id):
rows = session.query(AutoExecutionCase.status, func.count(AutoExecutionCase.id)).filter(
AutoExecutionCase.execution_id == int(execution_id)
).group_by(AutoExecutionCase.status).all()
summary = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}
for status, count in rows:
summary[int(status)] = int(count)
summary['total'] = sum(summary.values())
return summary
@staticmethod
def query_case_auto_item(session, case_id):
return session.query(TestCase).filter(
TestCase.id == int(case_id), TestCase.is_delete == 0, TestCase.is_auto == 1
).first()
@staticmethod
def query_plan_auto_cases(session, plan_id, round_no=None, case_ids=None):
query = session.query(PlanCase, TestCase).join(
TestCase, PlanCase.case_id == TestCase.id
).filter(
PlanCase.plan_id == int(plan_id),
TestCase.is_delete == 0,
TestCase.is_auto == 1
)
if round_no not in (None, ''):
query = query.filter(PlanCase.round_no == int(round_no))
if case_ids:
query = query.filter(PlanCase.case_id.in_([int(case_id) for case_id in case_ids]))
return query.order_by(PlanCase.id.asc()).all()
@staticmethod
def update_plan_case_result(session, plan_case_id, update_info):
update_res = session.query(PlanCase).filter(PlanCase.id == int(plan_case_id)).update(update_info)
err = session.done(close=False)
if err:
logger.error(f'PlanCase更新失败id: {plan_case_id}, err: {err}')
return 0, f'更新失败!{err}'
if not update_res:
return 0, '未查询到对应计划用例!'
return int(plan_case_id), ''
@staticmethod
def get_plan_by_id(session, plan_id):
return session.query(TestPlan).filter(TestPlan.id == int(plan_id), TestPlan.is_delete == 0).first()