Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

267 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
import argparse
import copy
import datetime
import json
import os
import sys
import time
from base_framework.platform_tools.Keywords_service.keyword_service import SetAttr, SwaggerDBInfo, \
KWFileOperation, log, KWOperation, run_keyword_generage, Project_Path
# from base_framework.platform_tools.Swagger_scanner.scanner import SwaggerOnlineDebug
from base_framework.platform_tools.Testcase_service.case_service import ITCaseContentOperation, \
run_interface_case_generate, load_module_from_file
from base_framework.base_config.current_pth import *
from base_framework.public_tools.read_config import ReadConfig
from base_framework.platform_tools.Swagger_scanner.scanner_add import GenerateBody
dir_name = os.path.dirname(__file__)
interface_path = os.path.join(dir_name, "Swagger_scanner/interface.txt")
token_header = ["accesstoken", "user-token"]
def get_all_interface_info_from_file(path):
with open(path, encoding="UTF-8") as f:
content = f.readlines()
return content[1:]
def write_interface_info_to_file(content, path=interface_path):
with open(path, mode="r", encoding="UTF-8") as f:
line = f.readline().strip("\n")
with open(path, mode="w", encoding="UTF-8") as f1:
f1.write(line + "\n" + content)
def generate_result_file():
file_name = "reslut_%s.txt" % time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
result_dir = os.path.join(dir_name, "case_result")
result_file = os.path.abspath(os.path.join(result_dir, file_name))
if not os.path.exists(result_dir):
os.makedirs(result_dir)
with open(result_file, "w+", encoding="UTF-8") as f:
f.write("开始自动生成keyword和case! 开始时间:%s\n" % time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime()))
return result_file
def write_case_result_to_case_file(file_path, content):
with open(file_path, "a+", encoding="UTF-8") as f:
lines = f.readlines()
text = content + "\n"
if lines:
if lines[-1] != "\n":
text = "\n" + "\n" + content + "\n"
f.write(text)
def clean_the_interface_info(result_file, interface_info):
interface = tuple(interface_info.strip("\n").split(","))
interface_infos = SwaggerDBInfo.get_interface_info_by_url_and_type(interface[1], interface[0], interface[2])
if len(interface_infos) > 1:
msg = "接口url%s,%s,存在多条数据,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
elif len(interface_infos) == 0:
msg = "接口url%s,%s,在数据库中不存在,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
else:
interface_info_attr = SetAttr(interface_infos[0])
if interface_info_attr.is_used == 0:
msg = "接口url%s,%s,%s,已不在使用,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
elif interface_info_attr.at_numbers > 0:
msg = "接口url%s,%s,%s,已存在用例,请确认!" % interface
log.warning(msg)
write_case_result_to_case_file(result_file, msg)
else:
msg = "接口url%s,%s,%s,开始生成接口关键字和用例!" % interface
log.info(msg)
write_case_result_to_case_file(result_file, msg)
return interface_info_attr
return msg
def generate_swagger_url_and_option(swagger_info):
swagger_info_attr = SetAttr(swagger_info)
url = swagger_info_attr.sw_url
temp_list = url.split("v2")
swagger_url = temp_list[0] + 'swagger-ui.html'
option = '/v2' + temp_list[1]
return swagger_url, option, temp_list[0]
def get_demo_by_interface_info(interface_id):
body_demo = GenerateBody(interface_id)
request_data = body_demo.combine_request_parameters()
if request_data["d"]:
request_demo = body_demo.combine_request_data(request_data["d"][0], request_data["d"][1], key=0)
request_data["d"] = request_demo
return request_data
def clear_request_parameters(parameters):
"""
清除参数中作为token的参数
"""
parameters_result = copy.deepcopy(parameters)
for item in parameters:
if item.get("name").lower() in token_header:
parameters_result.remove(item)
return parameters_result
def clear_header_token_demo(demo_info):
"""
清除请求头中带有token这种无用参数
"""
demo_copy = json.loads(copy.deepcopy(demo_info.request_demo))
header = copy.deepcopy(demo_copy.get('h'))
for key, _ in demo_copy.get('h').items():
if key in token_header:
header.pop(key)
if not header:
demo_copy['h'] = {}
else:
demo_copy['h'] = header
demo_info.request_demo = json.dumps(demo_copy)
return demo_info
def generate_parameter_to_dict(parameters):
"""
将数据库查询到的参数以参数名组成字典
"""
dict_temp = dict()
for parameter in parameters:
parameter_attr = SetAttr(parameter)
dict_temp[parameter_attr.name] = parameter_attr
return dict_temp
def clean_un_use_parameter(demo, parameters):
"""
删除不再使用的参数
"""
if isinstance(demo, SetAttr):
demo_item = json.loads(demo.request_demo)
demo_copy = copy.deepcopy(demo_item)
else:
demo_item = copy.deepcopy(demo)
demo_copy = copy.deepcopy(demo)
parameters_dict = generate_parameter_to_dict(parameters)
if isinstance(demo, list):
demo_copy.clear()
for index in range(len(demo_item)):
demo_copy.append(clean_un_use_parameter(demo_item[index], parameters))
elif isinstance(demo_item, dict):
for key, values in demo_item.items():
if parameters_dict.get(key, None) and int(parameters_dict.get(key).is_need) == 2:
demo_copy.pop(key)
continue
demo_copy[key] = clean_un_use_parameter(values, parameters)
else:
demo_copy = demo_copy
return demo_copy
def confirm_demo_value_string(kw, case, demo_string, parameters):
"""
确认demo中的请求参数是不是纯值并非json说格式
"""
demo = copy.deepcopy(demo_string.request_demo)
demo_value = json.loads(demo)
if (not demo_value["d"]) and (demo_value["d"] != 0):
return demo_value
if (isinstance(demo_value["d"], str) or isinstance(demo_value["d"], int)):
demo_value["d"] = {parameters[0].get("name"): demo_value["d"]}
kw.kw_string = True
case.case_string = True
elif isinstance(demo_value["d"], list):
if not isinstance(demo_value["d"][0], dict):
demo_value["d"] = {parameters[0].get("name"): demo_value["d"]}
kw.kw_string = True
case.case_string = True
elif 'empty' in demo_value['d']:
kw.kw_array = demo_value['d'].pop('empty')
return demo_value
def confirm_correct_team_case(team):
interface_file_path = os.path.abspath(os.path.join(Project_Path,
"{team}".format(team=team), "library",
"{team}_interface.py".format(team=team.upper())))
try:
module = load_module_from_file(interface_file_path, team.upper())
real_team = team.upper()
except Exception as err:
real_team = team.lower()
return real_team
def run_interface_case(team, tags=None, platform=None, server=None, driver=None, normal=None):
team = confirm_correct_team_case(team)
kw_instance = KWFileOperation(team)
result_file = generate_result_file()
interface_contents = get_all_interface_info_from_file(interface_path)
keyword = KWOperation(team, kw_instance, server)
case = ITCaseContentOperation(team, kw_instance)
for interface_temp in interface_contents:
if interface_temp == "\n" or interface_temp.startswith("#"):
continue
interface = clean_the_interface_info(result_file, interface_temp)
if not isinstance(interface, str):
parameters = keyword.get_interface_parameters(interface.id)
parameters = clear_request_parameters(parameters)
if parameters:
demo = get_demo_by_interface_info(interface.id)
else:
demo = {"d": {}, "h": {}, "p": {}, "u": {}}
demo = [{"request_demo": json.dumps(demo)}]
demo_info = SetAttr(demo[0])
demo_info = clear_header_token_demo(demo_info)
demo_info.request_demo = json.dumps(clean_un_use_parameter(demo_info, parameters))
demo_info.request_demo = json.dumps(confirm_demo_value_string(keyword, case, demo_info, parameters))
kw_status = run_keyword_generage(result_file, interface, demo_info, parameters, keyword)
if kw_status:
msg = "接口url%s,%s,生成keyword成功接下来继续生成用例!" % (interface.type, interface.in_url)
log.info(msg)
write_case_result_to_case_file(result_file, msg)
run_interface_case_generate(result_file, interface, demo_info, parameters, case, tags, normal)
return True
else:
return interface
def print_usage():
parser_inst.print_usage()
parser_inst.exit(1, "生成用例失败,请根据使用帮助传入正确参数。")
def write_platform_to_running_env(platform, team):
if not platform:
platform = ""
cfg_opt = ReadConfig(env_choose_path)
cfg_opt.set_section("run_jira_id", "huohua-podenv", platform)
cfg_opt.set_section("run_evn_name", "current_team", team)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="请输入组名!")
parser.add_argument("-team", dest='team', help="TO or TMO")
parser.add_argument("-tag", dest='tags', help="标签", default=None)
parser.add_argument("-p", dest='platform', help="独立环境", default=None)
parser.add_argument("-s", dest='server', help="服务名", default=None)
parser.add_argument("-d", dest='driver', help="驱动路径", default=None)
parser.add_argument("-n", dest='normal', help="是否只生成正常用例", default=None)
args = parser.parse_args()
global parser_inst
parser_inst = parser
if not args.team:
print_usage()
else:
write_platform_to_running_env(args.platform, args.team)
run_interface_case(args.team, args.tags, args.platform, args.server, args.driver, args.normal)