Files
effekt-interface/app/api/dao/planDao.py
2026-05-07 19:21:19 +08:00

89 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# encoding: UTF-8
from sqlalchemy import func
from ..model.planModel import PlanCase, TestPlan, TestRound
from logger import logger
class PlanDao(object):
    """Generic data-access helpers for the test-plan models.

    All methods are static and receive the caller's session explicitly.
    Write helpers return a ``(value, message)`` tuple in which an empty
    message means success and ``value`` is ``0`` on failure.

    NOTE(review): ``session.done(close=False)`` is a project helper that is
    not defined in this file; it is treated here as "finish the pending work
    and return a falsy value on success or an error description on failure",
    which is how every caller below uses its result.
    """

    @staticmethod
    def create(session, model_cls, add_info):
        """Insert a single ``model_cls`` row built from ``add_info``.

        Args:
            session: Session object providing ``add`` and ``done``.
            model_cls: Model class to instantiate.
            add_info: Mapping of keyword arguments passed to ``model_cls``.

        Returns:
            ``(obj.id, '')`` on success, ``(0, message)`` when ``done``
            reports an error.
        """
        obj = model_cls(**add_info)
        session.add(obj)
        err = session.done(close=False)
        if err:
            logger.warning(f'{model_cls.__name__}新增失败!{err}')
            return 0, f'新增失败!{err}'
        return obj.id, ''

    @staticmethod
    def batch_create(session, model_cls, batch_info_list):
        """Insert several ``model_cls`` rows in one call.

        Args:
            session: Session object providing ``add_all`` and ``done``.
            model_cls: Model class to instantiate for every entry.
            batch_info_list: Iterable of keyword-argument mappings; an empty
                or ``None`` value is a no-op.

        Returns:
            ``(number_of_rows, '')`` on success (``(0, '')`` for an empty
            input), ``(0, message)`` when ``done`` reports an error.
        """
        if not batch_info_list:
            return 0, ''
        objs = [model_cls(**info) for info in batch_info_list]
        session.add_all(objs)
        err = session.done(close=False)
        if err:
            logger.warning(f'{model_cls.__name__}批量新增失败!{err}')
            return 0, f'批量新增失败!{err}'
        return len(objs), ''

    @staticmethod
    def update_by_id(session, model_cls, obj_id, update_info, soft_delete=True):
        """Update the row whose primary key equals ``obj_id``.

        Args:
            session: Session object providing ``query`` and ``done``.
            model_cls: Model class to update.
            obj_id: Primary key; converted with ``int()``.
            update_info: Mapping of column names to new values.
            soft_delete: When true and the model has an ``is_delete``
                attribute, only rows with ``is_delete == 0`` are updated.

        Returns:
            ``(int(obj_id), '')`` on success, ``(0, message)`` when ``done``
            reports an error or no row matched.
        """
        filters = [model_cls.id == int(obj_id)]
        if soft_delete and hasattr(model_cls, 'is_delete'):
            filters.append(model_cls.is_delete == 0)
        update_res = session.query(model_cls).filter(*filters).update(update_info)
        err = session.done(close=False)
        if err:
            logger.error(f'{model_cls.__name__}更新失败id: {obj_id}, err: {err}')
            return 0, f'更新失败!{err}'
        if not update_res:
            return 0, '未查询到对应记录!'
        return int(obj_id), ''

    @staticmethod
    def get_by_id(session, model_cls, obj_id, soft_delete=True):
        """Return the first row whose primary key equals ``obj_id``.

        Args:
            session: Session object providing ``query``.
            model_cls: Model class to query.
            obj_id: Primary key; converted with ``int()``.
            soft_delete: When true and the model has an ``is_delete``
                attribute, rows with ``is_delete != 0`` are excluded.

        Returns:
            The result of ``.first()`` on the filtered query.
        """
        filters = [model_cls.id == int(obj_id)]
        if soft_delete and hasattr(model_cls, 'is_delete'):
            filters.append(model_cls.is_delete == 0)
        return session.query(model_cls).filter(*filters).first()

    @staticmethod
    def list_by_filters(session, model_cls, filter_list, page=1, limit=20, order_column=None, asc=False):
        """Return one page of rows matching ``filter_list`` plus the total count.

        Args:
            session: Session object providing ``query``.
            model_cls: Model class to query.
            filter_list: Iterable of filter expressions applied together.
            page: 1-based page number; values below 1 are served as page 1.
            limit: Page size.
            order_column: Optional column to sort by; no ordering when ``None``.
            asc: Sort ascending when true, descending otherwise.

        Returns:
            ``(rows, total)`` where ``total`` is the match count before paging.
        """
        query = session.query(model_cls).filter(*filter_list)
        if hasattr(model_cls, 'is_delete'):
            query = query.filter(model_cls.is_delete == 0)
        total = query.count()
        if order_column is not None:
            query = query.order_by(order_column.asc() if asc else order_column.desc())
        page_size = int(limit)
        # A page below 1 would otherwise yield a negative OFFSET, which the
        # database rejects or handles inconsistently; clamp to the first row.
        start = max((int(page) - 1) * page_size, 0)
        rets = query.offset(start).limit(page_size).all()
        return rets, total

    @staticmethod
    def delete_by_id(session, model_cls, obj_id):
        """Soft-delete a row by setting ``is_delete`` to 1.

        Delegates to ``update_by_id`` and returns its ``(value, message)``
        tuple unchanged.
        """
        return PlanDao.update_by_id(session, model_cls, obj_id, {'is_delete': 1})

    @staticmethod
    def plan_stats(session, plan_id):
        """Aggregate plan execution progress.

        Used for the plan detail view, the progress dashboard and report
        generation.

        Args:
            session: Session object providing ``query``.
            plan_id: Plan identifier; converted with ``int()``.

        Returns:
            A dict with the keys ``total_cases``, ``completed``, ``passed``,
            ``failed``, ``blocked`` and ``pass_rate``. ``pass_rate`` is a
            percentage rounded to two decimals, or ``0`` when the plan has no
            cases. Status values 1, 2 and 3 are counted as passed, failed and
            blocked respectively.
        """
        base_filters = [PlanCase.plan_id == int(plan_id)]
        # Keep the statistics consistent with the other read helpers: rows
        # that were soft-deleted must not inflate the totals or the pass rate.
        if hasattr(PlanCase, 'is_delete'):
            base_filters.append(PlanCase.is_delete == 0)

        def count_cases(*extra_filters):
            # One COUNT query over the plan's cases, optionally narrowed further.
            counter = session.query(func.count(PlanCase.id))
            return counter.filter(*base_filters, *extra_filters).scalar() or 0

        total = count_cases()
        passed = count_cases(PlanCase.status == 1)
        failed = count_cases(PlanCase.status == 2)
        blocked = count_cases(PlanCase.status == 3)
        completed = passed + failed + blocked
        pass_rate = round(passed / total * 100, 2) if total else 0
        return {'total_cases': total, 'completed': completed, 'passed': passed, 'failed': failed, 'blocked': blocked, 'pass_rate': pass_rate}

    @staticmethod
    def plan_model():
        """Return the ``TestPlan`` model class."""
        return TestPlan

    @staticmethod
    def plan_case_model():
        """Return the ``PlanCase`` model class."""
        return PlanCase

    @staticmethod
    def round_model():
        """Return the ``TestRound`` model class."""
        return TestRound