addproject

This commit is contained in:
qiaoxinjiu
2026-01-22 19:10:37 +08:00
commit 6994b185a3
184 changed files with 21039 additions and 0 deletions

View File

@@ -0,0 +1,264 @@
# -*- coding:utf-8 -*-
import argparse
import copy
import datetime
import json
import os
import sys
import time
from base_framework.platform_tools.Keywords_service.keyword_service1 import SetAttr, SwaggerDBInfo, \
KWFileOperation, log, KWOperation, run_keyword_generage, Project_Path
# from base_framework.platform_tools.Swagger_scanner.scanner import SwaggerOnlineDebug
from base_framework.platform_tools.Testcase_service.case_service1 import ITCaseContentOperation, \
run_interface_case_generate, load_module_from_file
from base_framework.base_config.current_pth import *
from base_framework.public_tools.read_config import ReadConfig
from base_framework.platform_tools.Swagger_scanner.scanner_add import GenerateBody
dir_name = os.path.dirname(__file__)
interface_path = os.path.join(dir_name, "Swagger_scanner/interface.txt")
token_header = ["accesstoken", "user-token"]
def get_all_interface_info_from_file(path):
with open(path, encoding="UTF-8") as f:
content = f.readlines()
return content[1:]
def write_interface_info_to_file(content, path=interface_path):
with open(path, mode="r", encoding="UTF-8") as f:
line = f.readline().strip("\n")
with open(path, mode="w", encoding="UTF-8") as f1:
f1.write(line + "\n" + content)
def generate_result_file():
file_name = "reslut_%s.txt" % time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
result_dir = os.path.join(dir_name, "case_result")
result_file = os.path.abspath(os.path.join(result_dir, file_name))
if not os.path.exists(result_dir):
os.makedirs(result_dir)
with open(result_file, "w+", encoding="UTF-8") as f:
f.write("开始自动生成keyword和case! 开始时间:%s\n" % time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime()))
return result_file
def write_case_result_to_case_file(file_path, content):
with open(file_path, "a+", encoding="UTF-8") as f:
lines = f.readlines()
text = content + "\n"
if lines:
if lines[-1] != "\n":
text = "\n" + "\n" + content + "\n"
f.write(text)
def clean_the_interface_info(result_file, interface_info, team):
interface = tuple(interface_info.strip("\n").split(","))
interface_infos = SwaggerDBInfo.get_interface_info_by_url_and_type(interface[1], interface[0], team)
if len(interface_infos) > 1:
msg = "接口url%s,%s,存在多条数据,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
elif len(interface_infos) == 0:
msg = "接口url%s,%s,在数据库中不存在,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
else:
interface_info_attr = SetAttr(interface_infos[0])
if interface_info_attr.is_used == 0:
msg = "接口url%s,%s,已不在使用,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
elif interface_info_attr.at_numbers > 0:
msg = "接口url%s,%s,已存在用例,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
else:
msg = "接口url%s,%s,开始生成接口关键字和用例!" % interface
log.info(msg)
write_case_result_to_case_file(result_file, msg)
return interface_info_attr
return msg
def generate_swagger_url_and_option(swagger_info):
swagger_info_attr = SetAttr(swagger_info)
url = swagger_info_attr.sw_url
temp_list = url.split("v2")
swagger_url = temp_list[0] + 'swagger-ui.html'
option = '/v2' + temp_list[1]
return swagger_url, option, temp_list[0]
def get_demo_by_interface_info(interface_id):
body_demo = GenerateBody(interface_id)
request_data = body_demo.combine_request_parameters()
if request_data["d"]:
request_demo = body_demo.combine_request_data(request_data["d"][0], request_data["d"][1], key=0)
request_data["d"] = request_demo
return request_data
def clear_request_parameters(parameters):
"""
清除参数中作为token的参数
"""
parameters_result = copy.deepcopy(parameters)
for item in parameters:
if item.get("name").lower() in token_header:
parameters_result.remove(item)
return parameters_result
def clear_header_token_demo(demo_info):
"""
清除请求头中带有token这种无用参数
"""
demo_copy = json.loads(copy.deepcopy(demo_info.request_demo))
header = copy.deepcopy(demo_copy.get('h'))
for key, _ in demo_copy.get('h').items():
if key in token_header:
header.pop(key)
if not header:
demo_copy['h'] = {}
else:
demo_copy['h'] = header
demo_info.request_demo = json.dumps(demo_copy)
return demo_info
def generate_parameter_to_dict(parameters):
"""
将数据库查询到的参数以参数名组成字典
"""
dict_temp = dict()
for parameter in parameters:
parameter_attr = SetAttr(parameter)
dict_temp[parameter_attr.name] = parameter_attr
return dict_temp
def clean_un_use_parameter(demo, parameters):
"""
删除不再使用的参数
"""
if isinstance(demo, SetAttr):
demo_item = json.loads(demo.request_demo)
demo_copy = copy.deepcopy(demo_item)
else:
demo_item = copy.deepcopy(demo)
demo_copy = copy.deepcopy(demo)
parameters_dict = generate_parameter_to_dict(parameters)
if isinstance(demo, list):
demo_copy.clear()
for index in range(len(demo_item)):
demo_copy.append(clean_un_use_parameter(demo_item[index], parameters))
elif isinstance(demo_item, dict):
for key, values in demo_item.items():
if parameters_dict.get(key, None) and int(parameters_dict.get(key).is_need) == 2:
demo_copy.pop(key)
continue
demo_copy[key] = clean_un_use_parameter(values, parameters)
else:
demo_copy = demo_copy
return demo_copy
def confirm_demo_value_string(kw, case, demo_string, parameters):
"""
确认demo中的请求参数是不是纯值并非json说格式
"""
demo = copy.deepcopy(demo_string.request_demo)
demo_value = json.loads(demo)
if (not demo_value["d"]) and (demo_value["d"] != 0):
return demo_value
if (isinstance(demo_value["d"], str) or isinstance(demo_value["d"], int)):
demo_value["d"] = {parameters[0].get("name"): demo_value["d"]}
kw.kw_string = True
case.case_string = True
elif isinstance(demo_value["d"], list):
if not isinstance(demo_value["d"][0], dict):
demo_value["d"] = {parameters[0].get("name"): demo_value["d"]}
kw.kw_string = True
case.case_string = True
return demo_value
def confirm_correct_team_case(team):
interface_file_path = os.path.abspath(os.path.join(Project_Path,
"{team}".format(team=team), "library",
"{team}_interface.py".format(team=team.upper())))
try:
module = load_module_from_file(interface_file_path, team.upper())
real_team = team.upper()
except Exception as err:
real_team = team.lower()
return real_team
def run_interface_case(team, tags=None, server=None, normal=None):
team = confirm_correct_team_case(team)
kw_instance = KWFileOperation(team, server)
result_file = generate_result_file()
interface_contents = get_all_interface_info_from_file(interface_path)
keyword = KWOperation(team, kw_instance)
case = ITCaseContentOperation(team, kw_instance)
for interface_temp in interface_contents:
if interface_temp == "\n" or interface_temp.startswith("#"):
continue
interface = clean_the_interface_info(result_file, interface_temp, team)
if not isinstance(interface, str):
parameters = keyword.get_interface_parameters(interface.id)
parameters = clear_request_parameters(parameters)
if parameters:
demo = get_demo_by_interface_info(interface.id)
else:
demo = {"d": {}, "h": {}, "p": {}, "u": {}}
demo = [{"request_demo": json.dumps(demo)}]
demo_info = SetAttr(demo[0])
demo_info = clear_header_token_demo(demo_info)
demo_info.request_demo = json.dumps(clean_un_use_parameter(demo_info, parameters))
demo_info.request_demo = json.dumps(confirm_demo_value_string(keyword, case, demo_info, parameters))
kw_status = run_keyword_generage(result_file, interface, demo_info, parameters, keyword)
if kw_status:
msg = "接口url%s,%s,生成keyword成功接下来继续生成用例!" % (interface.type, interface.in_url)
log.info(msg)
write_case_result_to_case_file(result_file, msg)
run_interface_case_generate(result_file, interface, demo_info, parameters, case, tags, normal)
return True
else:
return interface
def print_usage():
parser_inst.print_usage()
parser_inst.exit(1, "生成用例失败,请根据使用帮助传入正确参数。")
def write_platform_to_running_env(platform, team):
if not platform:
platform = ""
cfg_opt = ReadConfig(env_choose_path)
cfg_opt.set_section("run_jira_id", "huohua-podenv", platform)
cfg_opt.set_section("run_evn_name", "current_team", team)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="请输入组名!")
parser.add_argument("-team", dest='team', help="TO or TMO")
parser.add_argument("-tag", dest='tags', help="标签", default=None)
parser.add_argument("-p", dest='platform', help="独立环境", default=None)
parser.add_argument("-s", dest='server', help="服务名", default=None)
parser.add_argument("-d", dest='driver', help="驱动路径", default=None)
parser.add_argument("-n", dest='normal', help="是否只生成正常用例", default=None)
args = parser.parse_args()
global parser_inst
parser_inst = parser
if not args.team:
print_usage()
else:
write_platform_to_running_env(args.platform, args.team)
run_interface_case(args.team, args.tags, args.platform, args.server, args.driver, args.normal)