240 lines
12 KiB
Python
240 lines
12 KiB
Python
# -*- coding:utf-8 -*-
|
||
# @Time : 2023/3/7 13:28
|
||
# @Author: luozhipeng
|
||
# @File : contract_pair_check.py
|
||
import os
|
||
import sys
|
||
input_team_name = sys.argv
|
||
|
||
BASIC_PATH = os.path.dirname(os.path.abspath(__file__))
|
||
TEAM_PATH = os.path.abspath(os.path.join(BASIC_PATH, '../../../{}'.format("base_framework")))
|
||
sys.path.append(TEAM_PATH)
|
||
PROJECT_PATH = os.path.abspath(os.path.join(BASIC_PATH, '../../..'))
|
||
sys.path.append(PROJECT_PATH)
|
||
from base_framework.public_tools.sqlhelper import MySqLHelper
|
||
from base_framework.public_tools.utils import Tools
|
||
import requests
|
||
import re
|
||
import json
|
||
import pymysql
|
||
|
||
obj_my_sql_helper = MySqLHelper()
|
||
|
||
|
||
class ContractPairCheck():
|
||
def __init__(self):
|
||
pass
|
||
|
||
def get_unfinished_interface(self):
|
||
sql_get_interface = "SELECT distinct (x.con_url) 提供方接口,x.pro_url 消费方接口,x.con_method 提供方请求方式,x.con_server 提供方服务,x.pro_server 消费方服务,x.at_num 自动化接口数 FROM sparkatp.contract_pair x WHERE x.at_num =0 and x.status =1 and x.con_server in (select si.server_name from sparkatp.swagger_info si where si.team in ('ubrd','GUE') and access_type IN (2)) and x.pro_server not in (select si.server_name from sparkatp.swagger_info si where si.team ='ubrd' and access_type IN (2)) and x.roles = 0 order by x.pro_server,x.con_url"
|
||
unfinished_interface = obj_my_sql_helper.select_all(sql_get_interface)
|
||
return unfinished_interface
|
||
|
||
def send_reshult(self):
|
||
|
||
headers = {"Content-Type": "application/json;charset=UTF-8"}
|
||
web_hook = "https://open.feishu.cn/open-apis/bot/v2/hook/9f3556b7-cb60-44bf-adbf-24b5b2552014"
|
||
|
||
contract_pair_intf = self.get_unfinished_interface()
|
||
if len(contract_pair_intf) != 0:
|
||
message_data = {"msg_type": "text", "content": {'text': '未完成自动化的契约对接口{}'.format(contract_pair_intf)}}
|
||
|
||
rsp = requests.post(url=web_hook, json=message_data, headers=headers)
|
||
|
||
|
||
class AutoInterfaceCheck():
|
||
def __init__(self):
|
||
pass
|
||
|
||
def get_unfinished_interface(self):
|
||
sql_get_interface = """select id,in_url from sparkatp.interface_info a WHERE( a.swagger_id in (SELECT id FROM sparkatp.swagger_info WHERE team in ("UBRD","GUE") and access_type not IN (0)) ) AND a.created_time > "2023-01-01 00:00:50" AND is_used = 1 and (case_numbers IS NULL or case_numbers='') and offline=0 and jira_id is null order by created_time """
|
||
unfinished_interface_list = obj_my_sql_helper.select_all(sql_get_interface)
|
||
return unfinished_interface_list
|
||
|
||
def get_interface_jira(self,interface):
|
||
sql_req_time = "select max(created_time) created_time from sparkatp.request_parameters rp where rp.interface_id ={}".format(
|
||
interface["id"])
|
||
req_max_time = obj_my_sql_helper.select_one(sql_req_time)['created_time']
|
||
sql_rep_time = "select max(created_time) created_time from sparkatp.response_parameters rp where rp.interface_id ={}".format(
|
||
interface["id"])
|
||
rep_max_time = obj_my_sql_helper.select_one(sql_rep_time)['created_time']
|
||
if rep_max_time and req_max_time:
|
||
if rep_max_time <= req_max_time:
|
||
sql_req_jira = "select distinct (jira_id) from sparkatp.request_parameters rp where rp.interface_id ={0} and created_time = '{1}'".format(
|
||
interface["id"], req_max_time)
|
||
interface_jira = obj_my_sql_helper.select_one(sql_req_jira)["jira_id"]
|
||
else:
|
||
sql_rep_jira = "select distinct (jira_id) from sparkatp.response_parameters rp where rp.interface_id ={0} and created_time = '{1}'".format(
|
||
interface["id"], rep_max_time)
|
||
interface_jira = obj_my_sql_helper.select_one(sql_rep_jira)["jira_id"]
|
||
elif rep_max_time and not req_max_time:
|
||
sql_rep_jira = "select distinct (jira_id) from sparkatp.response_parameters rp where rp.interface_id ={0} and created_time = '{1}'".format(
|
||
interface["id"], rep_max_time)
|
||
interface_jira = obj_my_sql_helper.select_one(sql_rep_jira)["jira_id"]
|
||
elif not rep_max_time and req_max_time:
|
||
sql_req_jira = "select distinct (jira_id) from sparkatp.request_parameters rp where rp.interface_id ={0} and created_time = '{1}'".format(
|
||
interface["id"], req_max_time)
|
||
interface_jira = obj_my_sql_helper.select_one(sql_req_jira)["jira_id"]
|
||
return interface_jira
|
||
|
||
def send_result(self):
|
||
unfinished_interface_list = self.get_unfinished_interface()
|
||
for interface in unfinished_interface_list:
|
||
interface['jira'] = self.get_interface_jira(interface)
|
||
if not interface['jira']:
|
||
interface['qa'] = None
|
||
else:
|
||
interface['qa'] = self.get_jira_qa(interface['jira'])
|
||
headers = {"Content-Type": "application/json;charset=UTF-8"}
|
||
web_hook = "https://open.feishu.cn/open-apis/bot/v2/hook/9f3556b7-cb60-44bf-adbf-24b5b2552014"
|
||
if len(unfinished_interface_list) != 0:
|
||
message_data = {"msg_type": "text", "content": {'text': 'QA公共环境未完成自动化的接口{}'.format(unfinished_interface_list)}}
|
||
rsp = requests.post(url=web_hook, json=message_data, headers=headers)
|
||
|
||
def get_jira_qa(self,jira):
|
||
try:
|
||
conn = pymysql.connect(host='10.250.200.53',user='root',password='peppa@test',database='tools',charset="utf8",port=3306)
|
||
except :
|
||
raise pymysql.OperationalError("连接数据库失败")
|
||
cn =conn.cursor()
|
||
sql = "SELECT tester FROM tools.tm_project where jira_number = '{}'".format(jira)
|
||
cn.execute(sql)
|
||
qa = cn.fetchall()
|
||
cn.close()
|
||
conn.close()
|
||
if qa:
|
||
return qa[0][0]
|
||
else:
|
||
return None
|
||
|
||
|
||
class CoverageCheck():
|
||
|
||
# 连接数据库
|
||
@staticmethod
|
||
def get_select(sql):
|
||
try:
|
||
conn = pymysql.connect(host='10.250.200.53', user='root', password='peppa@test', database='tools',
|
||
charset="utf8", port=3306)
|
||
except:
|
||
raise pymysql.OperationalError("连接数据库失败")
|
||
cn = conn.cursor()
|
||
cn.execute(sql)
|
||
res = cn.fetchall()
|
||
cn.close()
|
||
conn.close()
|
||
return res
|
||
|
||
# 获取未搜集覆盖率的项目
|
||
@staticmethod
|
||
def get_unfinished_coverage():
|
||
start_time = Tools().get_format_date(r_type=15, add_days=-2)
|
||
end_time = Tools().get_format_date(r_type=16, add_days=-1)
|
||
|
||
# 查询最近一天上线有后端代码变动,需要搜集覆盖率项目
|
||
need_sql = """SELECT env FROM tools.project_plan WHERE ID IN
|
||
(SELECT project_id FROM tools.project_tester WHERE tester IN ("陈洁","陈江","罗志鹏","谯新久","刘涛婷"))
|
||
AND status IN (14) AND rd_code_add_line>0 AND (it_start_date IS NOT NULL OR qa_start_date IS NOT null)
|
||
AND online_date BETWEEN '{}' AND '{}'""".format(start_time, end_time)
|
||
need_coverage_list = CoverageCheck().get_select(need_sql)
|
||
|
||
# 查询已搜集覆盖率项目
|
||
sql_implemented_sql = '''SELECT `env_name` FROM `sparkatp`.`build_jacoco` WHERE `team` = 'UBRD' AND `status` = '1'
|
||
AND `is_delete` = '0' AND `is_pass` = '1' '''
|
||
completed_coverage_list = obj_my_sql_helper.select_all(sql_implemented_sql)
|
||
|
||
unfinished_coverage_list = []
|
||
for item1 in range(0, len(need_coverage_list)):
|
||
unfinished_coverage_list.append(need_coverage_list[item1][0])
|
||
|
||
# 返回未搜集覆盖率的项目
|
||
for item in completed_coverage_list:
|
||
completed_jira = item["env_name"]
|
||
if completed_jira in unfinished_coverage_list:
|
||
unfinished_coverage_list.remove(completed_jira)
|
||
# 根据unfinished_coverage_list 查jira_name
|
||
jira_name_list = []
|
||
for item in unfinished_coverage_list:
|
||
env_name = item
|
||
sql = """SELECT jira_number FROM tools.project_plan WHERE env='{}'""".format(env_name)
|
||
jira_name = CoverageCheck().get_select(sql)
|
||
jira_name_list.append(jira_name[0][0])
|
||
return jira_name_list
|
||
|
||
# 获取未构建基线用例项目
|
||
@staticmethod
|
||
def get_not_bulid_jira():
|
||
start_time = Tools().get_format_date(r_type=15, add_days=-2)
|
||
end_time = Tools().get_format_date(r_type=16, add_days=-1)
|
||
|
||
# 返回有后端变动的jira (有后端代码变更&上线时间在3天内 + jira存在服务变更,项目状态为sim测试)
|
||
need_sql = """SELECT env FROM tools.project_plan WHERE
|
||
ID IN (SELECT project_id FROM tools.project_tester WHERE tester IN ("陈洁","陈江","罗志鹏","谯新久","刘涛婷"))
|
||
AND status IN (14) AND rd_code_add_line>0 AND online_date BETWEEN '{}' AND '{}'
|
||
AND qa_start_date IS NOT NULL UNION SELECT env FROM tools.project_plan WHERE ID IN (SELECT project_plan_id
|
||
FROM tools.project_plan_server) AND jira_number IN (SELECT jira_number FROM tools.tm_project WHERE status
|
||
IN (12) AND `tester` IN ("陈洁","陈江","罗志鹏","谯新久","刘涛婷") AND test_qa_time_consume>0)""".format(start_time,
|
||
end_time)
|
||
|
||
need_build_list = CoverageCheck().get_select(need_sql)
|
||
|
||
# 返回有构建基线用例jira
|
||
built_sql = """SELECT DISTINCT special_env FROM sparkatp.build_info WHERE run_type=2 AND team='UBRD' AND STATUS=2"""
|
||
built_list = obj_my_sql_helper.select_all(built_sql)
|
||
|
||
not_bulit_list = []
|
||
# 取出有后端改动得jira
|
||
for item1 in range(0, len(need_build_list)):
|
||
not_bulit_list.append(need_build_list[item1][0])
|
||
|
||
# 返回未构建基线用例jira
|
||
for item in built_list:
|
||
completed_jira = item["special_env"]
|
||
if completed_jira in not_bulit_list:
|
||
not_bulit_list.remove(completed_jira)
|
||
jira_name_list = []
|
||
for item in not_bulit_list:
|
||
env_name = item
|
||
sql = """SELECT jira_number FROM tools.project_plan WHERE env='{}'""".format(env_name)
|
||
jira_name = CoverageCheck().get_select(sql)
|
||
jira_name_list.append(jira_name[0][0])
|
||
return jira_name_list
|
||
|
||
# 发送消息
|
||
def send_result(self, type=1):
|
||
if type == 1:
|
||
jira_list = self.get_unfinished_coverage()
|
||
elif type == 2:
|
||
jira_list = self.get_not_bulid_jira()
|
||
|
||
send_list = []
|
||
for item in jira_list:
|
||
send_dict = {}
|
||
send_dict["jira"] = item
|
||
qa_list = AutoInterfaceCheck().get_jira_qa(item)
|
||
send_dict["qa"] = qa_list
|
||
send_list.append(send_dict)
|
||
|
||
headers = {"Content-Type": "application/json;charset=UTF-8"}
|
||
web_hook = "https://open.feishu.cn/open-apis/bot/v2/hook/b6bf33ae-4239-4bef-a8a8-21896e0d1ba1"
|
||
if len(jira_list) != 0:
|
||
if type == 1:
|
||
message_data = {"msg_type": "text",
|
||
"content": {'text': '未搜集覆盖率已上线项目{}'.format(send_list)}}
|
||
rsp = requests.post(url=web_hook, json=message_data, headers=headers)
|
||
elif type == 2:
|
||
message_data = {"msg_type": "text",
|
||
"content": {'text': '未构建基线用例已上线项目{}'.format(send_list)}}
|
||
rsp = requests.post(url=web_hook, json=message_data, headers=headers)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
A = ContractPairCheck()
|
||
A.send_reshult()
|
||
B = AutoInterfaceCheck()
|
||
# B.get_unfinished_interface()
|
||
# B.get_jira_qa(jira = 'PLATFORM-31791')
|
||
B.send_result()
|
||
C = CoverageCheck()
|
||
C.send_result(type=1)
|
||
C.send_result(type=2) |