130 lines
5.5 KiB
Python
130 lines
5.5 KiB
Python
# encoding: UTF-8
|
||
from sqlalchemy import func
|
||
|
||
from logger import logger
|
||
from ..model.automationModel import AutoExecution, AutoExecutionCase
|
||
from ..model.caseModel import TestCase
|
||
from ..model.planModel import PlanCase, TestPlan
|
||
|
||
|
||
class AutomationDao(object):
|
||
@staticmethod
|
||
def create_execution(session, add_info):
|
||
obj = AutoExecution(**add_info)
|
||
session.add(obj)
|
||
err = session.done(close=False)
|
||
if err:
|
||
logger.warning(f'AutoExecution新增失败!{err}')
|
||
return 0, f'新增失败!{err}'
|
||
return obj, ''
|
||
|
||
@staticmethod
|
||
def batch_create_execution_cases(session, batch_info_list):
|
||
if not batch_info_list:
|
||
return [], ''
|
||
objs = [AutoExecutionCase(**info) for info in batch_info_list]
|
||
session.add_all(objs)
|
||
err = session.done(close=False)
|
||
if err:
|
||
logger.warning(f'AutoExecutionCase批量新增失败!{err}')
|
||
return [], f'批量新增失败!{err}'
|
||
return objs, ''
|
||
|
||
@staticmethod
|
||
def update_execution_by_id(session, execution_id, update_info):
|
||
update_res = session.query(AutoExecution).filter(AutoExecution.id == int(execution_id)).update(update_info)
|
||
err = session.done(close=False)
|
||
if err:
|
||
logger.error(f'AutoExecution更新失败!id: {execution_id}, err: {err}')
|
||
return 0, f'更新失败!{err}'
|
||
if not update_res:
|
||
return 0, '未查询到对应执行记录!'
|
||
return int(execution_id), ''
|
||
|
||
@staticmethod
|
||
def get_execution_by_id(session, execution_id):
|
||
return session.query(AutoExecution).filter(AutoExecution.id == int(execution_id)).first()
|
||
|
||
@staticmethod
|
||
def list_execution_by_filters(session, filters, page=1, limit=20):
|
||
query = session.query(AutoExecution).filter(*filters)
|
||
total = query.count()
|
||
items = query.order_by(AutoExecution.created_time.desc()).offset((int(page) - 1) * int(limit)).limit(int(limit)).all()
|
||
return items, total
|
||
|
||
@staticmethod
|
||
def get_execution_case_by_id(session, execution_case_id):
|
||
return session.query(AutoExecutionCase).filter(AutoExecutionCase.id == int(execution_case_id)).first()
|
||
|
||
@staticmethod
|
||
def get_execution_case_by_unique(session, execution_id, case_id, plan_case_id=None):
|
||
filters = [AutoExecutionCase.execution_id == int(execution_id), AutoExecutionCase.case_id == int(case_id)]
|
||
if plan_case_id:
|
||
filters.append(AutoExecutionCase.plan_case_id == int(plan_case_id))
|
||
return session.query(AutoExecutionCase).filter(*filters).order_by(AutoExecutionCase.id.asc()).first()
|
||
|
||
@staticmethod
|
||
def update_execution_case_by_id(session, execution_case_id, update_info):
|
||
update_res = session.query(AutoExecutionCase).filter(AutoExecutionCase.id == int(execution_case_id)).update(update_info)
|
||
err = session.done(close=False)
|
||
if err:
|
||
logger.error(f'AutoExecutionCase更新失败!id: {execution_case_id}, err: {err}')
|
||
return 0, f'更新失败!{err}'
|
||
if not update_res:
|
||
return 0, '未查询到对应执行明细!'
|
||
return int(execution_case_id), ''
|
||
|
||
@staticmethod
|
||
def list_execution_case_by_filters(session, filters, page=1, limit=20):
|
||
query = session.query(AutoExecutionCase).filter(*filters)
|
||
total = query.count()
|
||
items = query.order_by(AutoExecutionCase.id.asc()).offset((int(page) - 1) * int(limit)).limit(int(limit)).all()
|
||
return items, total
|
||
|
||
@staticmethod
|
||
def count_execution_case_summary(session, execution_id):
|
||
rows = session.query(AutoExecutionCase.status, func.count(AutoExecutionCase.id)).filter(
|
||
AutoExecutionCase.execution_id == int(execution_id)
|
||
).group_by(AutoExecutionCase.status).all()
|
||
summary = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}
|
||
for status, count in rows:
|
||
summary[int(status)] = int(count)
|
||
summary['total'] = sum(summary.values())
|
||
return summary
|
||
|
||
@staticmethod
|
||
def query_case_auto_item(session, case_id):
|
||
return session.query(TestCase).filter(
|
||
TestCase.id == int(case_id), TestCase.is_delete == 0, TestCase.is_auto == 1
|
||
).first()
|
||
|
||
@staticmethod
|
||
def query_plan_auto_cases(session, plan_id, round_no=None, case_ids=None):
|
||
query = session.query(PlanCase, TestCase).join(
|
||
TestCase, PlanCase.case_id == TestCase.id
|
||
).filter(
|
||
PlanCase.plan_id == int(plan_id),
|
||
TestCase.is_delete == 0,
|
||
TestCase.is_auto == 1
|
||
)
|
||
if round_no not in (None, ''):
|
||
query = query.filter(PlanCase.round_no == int(round_no))
|
||
if case_ids:
|
||
query = query.filter(PlanCase.case_id.in_([int(case_id) for case_id in case_ids]))
|
||
return query.order_by(PlanCase.id.asc()).all()
|
||
|
||
@staticmethod
|
||
def update_plan_case_result(session, plan_case_id, update_info):
|
||
update_res = session.query(PlanCase).filter(PlanCase.id == int(plan_case_id)).update(update_info)
|
||
err = session.done(close=False)
|
||
if err:
|
||
logger.error(f'PlanCase更新失败!id: {plan_case_id}, err: {err}')
|
||
return 0, f'更新失败!{err}'
|
||
if not update_res:
|
||
return 0, '未查询到对应计划用例!'
|
||
return int(plan_case_id), ''
|
||
|
||
@staticmethod
|
||
def get_plan_by_id(session, plan_id):
|
||
return session.query(TestPlan).filter(TestPlan.id == int(plan_id), TestPlan.is_delete == 0).first()
|