267 lines
10 KiB
Python
267 lines
10 KiB
Python
# -*- coding:utf-8 -*-
|
||
import argparse
|
||
import copy
|
||
import datetime
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
from base_framework.platform_tools.Keywords_service.keyword_service import SetAttr, SwaggerDBInfo, \
|
||
KWFileOperation, log, KWOperation, run_keyword_generage, Project_Path
|
||
# from base_framework.platform_tools.Swagger_scanner.scanner import SwaggerOnlineDebug
|
||
from base_framework.platform_tools.Testcase_service.case_service import ITCaseContentOperation, \
|
||
run_interface_case_generate, load_module_from_file
|
||
from base_framework.base_config.current_pth import *
|
||
from base_framework.public_tools.read_config import ReadConfig
|
||
from base_framework.platform_tools.Swagger_scanner.scanner_add import GenerateBody
|
||
|
||
# Directory that contains this module; used as the base for the paths below.
dir_name = os.path.dirname(__file__)

# Text file listing the interfaces to process (its first line is a header).
interface_path = os.path.join(dir_name, "Swagger_scanner/interface.txt")

# Lower-case names of parameters / headers that carry a token and are
# stripped before keywords and cases are generated.
token_header = ["accesstoken", "user-token"]
|
||
|
||
|
||
def get_all_interface_info_from_file(path):
    """Return every line of the UTF-8 file at *path* except the first one.

    Lines keep their trailing newline. An empty file yields an empty list.
    """
    with open(path, encoding="UTF-8") as handle:
        # Consume and discard the first (header) line.
        handle.readline()
        return handle.readlines()
|
||
|
||
|
||
def write_interface_info_to_file(content, path=interface_path):
    """Rewrite the file at *path*, keeping its first line and replacing the rest.

    The first line is read (without its trailing newline), then the file is
    truncated and written back as that line, a newline, and *content*.
    """
    with open(path, mode="r", encoding="UTF-8") as reader:
        header = reader.readline().strip("\n")
    with open(path, mode="w", encoding="UTF-8") as writer:
        writer.write("\n".join((header, content)))
|
||
|
||
|
||
def generate_result_file():
    """Create a timestamped result file under ``case_result`` and return its path.

    The ``case_result`` directory (next to this module) is created when it
    does not exist. The new file starts with a single line recording the
    start time. The absolute path of the file is returned.
    """
    stamp = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
    result_dir = os.path.join(dir_name, "case_result")
    result_file = os.path.abspath(os.path.join(result_dir, "reslut_%s.txt" % stamp))
    if not os.path.exists(result_dir):
        os.makedirs(result_dir)
    with open(result_file, "w+", encoding="UTF-8") as out:
        started = time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())
        out.write("开始自动生成keyword和case! 开始时间:%s\n" % started)
    return result_file
|
||
|
||
|
||
def write_case_result_to_case_file(file_path, content):
    """Append *content* as one line to the result file at *file_path*.

    When the file already has text and its last line is not a blank line,
    two newlines are written first so entries are visually separated.

    :param file_path: path of the result file (created if missing).
    :param content: message text to append, without a trailing newline.
    """
    with open(file_path, "a+", encoding="UTF-8") as f:
        # "a+" opens with the position at end-of-file, so without rewinding
        # readlines() always returns [] and the separator logic never runs.
        f.seek(0)
        lines = f.readlines()
        if lines and lines[-1] != "\n":
            text = "\n" + "\n" + content + "\n"
        else:
            text = content + "\n"
        # Append mode writes at the end regardless of the read position.
        f.write(text)
|
||
|
||
|
||
def clean_the_interface_info(result_file, interface_info):
    """Validate one line of the interface list against the database.

    *interface_info* is a comma-separated line; its first three fields are
    passed to ``SwaggerDBInfo.get_interface_info_by_url_and_type`` as
    (field 1, field 0, field 2). The outcome is logged and appended to
    *result_file*.

    :param result_file: path of the result file that receives the message.
    :param interface_info: raw line from the interface list file.
    :return: the ``SetAttr`` wrapper of the single matching record when it is
        usable (``is_used`` is not 0 and ``at_numbers`` is not above 0);
        otherwise the message string explaining why it was skipped.
    """
    interface = tuple(interface_info.strip("\n").split(","))
    matches = SwaggerDBInfo.get_interface_info_by_url_and_type(interface[1], interface[0], interface[2])

    usable = None
    if len(matches) > 1:
        # These two messages have two placeholders; formatting them with the
        # full tuple (three or more fields) raised TypeError instead of reporting.
        msg = "接口url:%s,%s,存在多条数据,请确认!" % interface[:2]
    elif len(matches) == 0:
        msg = "接口url:%s,%s,在数据库中不存在,请确认!" % interface[:2]
    else:
        candidate = SetAttr(matches[0])
        # Slice to exactly three fields so extra trailing fields on the line
        # cannot break the three-placeholder messages either.
        if candidate.is_used == 0:
            msg = "接口url:%s,%s,%s,已不在使用,请确认!" % interface[:3]
        elif candidate.at_numbers > 0:
            msg = "接口url:%s,%s,%s,已存在用例,请确认!" % interface[:3]
        else:
            msg = "接口url:%s,%s,%s,开始生成接口关键字和用例!" % interface[:3]
            usable = candidate

    if usable is None:
        log.warning(msg)
    else:
        log.info(msg)
    write_case_result_to_case_file(result_file, msg)
    return msg if usable is None else usable
|
||
|
||
|
||
def generate_swagger_url_and_option(swagger_info):
    """Split a swagger record's ``sw_url`` around the text ``v2``.

    :param swagger_info: record wrapped with ``SetAttr``; must expose ``sw_url``.
    :return: a 3-tuple of (part before ``v2`` + ``swagger-ui.html``,
        ``/v2`` + the part following the first ``v2``, part before ``v2``).
    """
    pieces = SetAttr(swagger_info).sw_url.split("v2")
    base = pieces[0]
    swagger_url = base + 'swagger-ui.html'
    option = '/v2' + pieces[1]
    return swagger_url, option, base
|
||
|
||
|
||
def get_demo_by_interface_info(interface_id):
    """Build the request demo for *interface_id* via ``GenerateBody``.

    The combined request parameters are returned. When their ``"d"`` entry is
    non-empty, it is replaced by the result of ``combine_request_data`` called
    with its first two elements.
    """
    builder = GenerateBody(interface_id)
    request_data = builder.combine_request_parameters()
    body_parts = request_data["d"]
    if body_parts:
        request_data["d"] = builder.combine_request_data(body_parts[0], body_parts[1], key=0)
    return request_data
|
||
|
||
|
||
def clear_request_parameters(parameters):
    """
    Remove the parameters that act as tokens.

    Returns a deep copy of *parameters* without the entries whose ``name``
    (compared in lower case) is listed in ``token_header``; the input is left
    untouched.
    """
    return [
        entry for entry in copy.deepcopy(parameters)
        if entry.get("name").lower() not in token_header
    ]
|
||
|
||
|
||
def clear_header_token_demo(demo_info):
    """
    Remove token entries from the header section of a request demo.

    ``demo_info.request_demo`` is a JSON string; its ``"h"`` object is
    rewritten without the keys listed in ``token_header`` and the result is
    stored back as JSON.

    :param demo_info: object with a ``request_demo`` JSON string attribute;
        it is modified in place.
    :return: the same *demo_info* object.
    """
    demo = json.loads(demo_info.request_demo)
    # A missing or null "h" is treated as "no headers" rather than crashing.
    headers = demo.get('h') or {}
    # Compare in lower case, matching clear_request_parameters, so that e.g.
    # "accessToken" is stripped just like "accesstoken".
    demo['h'] = {
        name: value for name, value in headers.items()
        if name.lower() not in token_header
    }
    demo_info.request_demo = json.dumps(demo)
    return demo_info
|
||
|
||
|
||
def generate_parameter_to_dict(parameters):
    """
    Index the parameters fetched from the database by parameter name.

    Each item is wrapped with ``SetAttr``; the returned dict maps the
    wrapper's ``name`` to the wrapper. Later duplicates overwrite earlier ones.
    """
    return {wrapped.name: wrapped for wrapped in map(SetAttr, parameters)}
|
||
|
||
|
||
def clean_un_use_parameter(demo, parameters):
    """
    Remove parameters that are no longer used from a request demo.

    *demo* may be a ``SetAttr`` (its ``request_demo`` JSON is parsed), a list,
    a dict or a plain value. Dicts are walked recursively and every key whose
    parameter record has ``is_need`` equal to 2 is dropped; lists are cleaned
    element by element; anything else is returned as a copy.
    """
    if isinstance(demo, SetAttr):
        source = json.loads(demo.request_demo)
    else:
        source = copy.deepcopy(demo)
    cleaned = copy.deepcopy(source)
    lookup = generate_parameter_to_dict(parameters)

    # The list test looks at the argument itself, so a SetAttr whose JSON
    # parses to a list is returned unchanged.
    if isinstance(demo, list):
        cleaned.clear()
        cleaned.extend(clean_un_use_parameter(element, parameters) for element in source)
    elif isinstance(source, dict):
        for name, value in source.items():
            spec = lookup.get(name, None)
            if spec and int(spec.is_need) == 2:
                cleaned.pop(name)
            else:
                cleaned[name] = clean_un_use_parameter(value, parameters)
    return cleaned
|
||
|
||
|
||
def confirm_demo_value_string(kw, case, demo_string, parameters):
    """
    Check whether the demo's request body is a bare value rather than a JSON object.

    ``demo_string.request_demo`` is parsed as JSON and its ``"d"`` entry is
    inspected:

    * empty values (``{}``, ``[]``, ``""``, ``null``) are returned untouched;
    * a bare string / number, or a list whose first element is not a dict, is
      wrapped as ``{<name of first parameter>: value}`` and both
      ``kw.kw_string`` and ``case.case_string`` are set to ``True``;
    * a dict containing the key ``"empty"`` has that key popped into
      ``kw.kw_array``.

    :param kw: keyword object; receives ``kw_string`` / ``kw_array``.
    :param case: case object; receives ``case_string``.
    :param demo_string: object exposing a ``request_demo`` JSON string.
    :param parameters: list of dicts; only ``parameters[0]["name"]`` is read.
    :return: the parsed (and possibly rewritten) demo dict.
    """
    demo_value = json.loads(demo_string.request_demo)
    body = demo_value["d"]

    # Nothing to wrap for empty bodies; a numeric zero is a real value though.
    if not body and body != 0:
        return demo_value

    if isinstance(body, (str, int, float)):
        # float is included: previously it fell through to the 'empty' test
        # below and raised TypeError ("argument of type 'float' is not iterable").
        is_bare_value = True
    elif isinstance(body, list):
        is_bare_value = not isinstance(body[0], dict)
    else:
        is_bare_value = False
        if isinstance(body, dict) and 'empty' in body:
            kw.kw_array = body.pop('empty')

    if is_bare_value:
        demo_value["d"] = {parameters[0].get("name"): body}
        kw.kw_string = True
        case.case_string = True
    return demo_value
|
||
|
||
|
||
def confirm_correct_team_case(team):
    """Return *team* in the letter case that matches its interface module.

    The file ``<Project_Path>/<team>/library/<TEAM>_interface.py`` is loaded
    with ``load_module_from_file``. When loading succeeds the upper-case team
    name is returned; when it raises any ``Exception`` the lower-case name is
    returned instead.
    """
    upper_name = team.upper()
    interface_file_path = os.path.abspath(
        os.path.join(
            Project_Path,
            "{team}".format(team=team),
            "library",
            "{team}_interface.py".format(team=upper_name),
        )
    )
    try:
        load_module_from_file(interface_file_path, upper_name)
    except Exception:
        # Any load failure falls back to the lower-case name.
        return team.lower()
    return upper_name
|
||
|
||
|
||
def run_interface_case(team, tags=None, platform=None, server=None, driver=None, normal=None):
    """Generate keywords and test cases for the interfaces listed in ``interface_path``.

    For each non-blank, non-``#`` line of the interface list, the interface is
    validated, its request demo is built and cleaned, a keyword is generated
    and, on success, the interface cases are generated.

    :param team: team name; normalised with ``confirm_correct_team_case``.
    :param tags: forwarded to ``run_interface_case_generate``.
    :param platform: accepted but not used in this function.
    :param server: forwarded to ``KWOperation``.
    :param driver: accepted but not used in this function.
    :param normal: forwarded to ``run_interface_case_generate``.
    :return: ``True`` after the first interface whose keyword generation
        succeeds, the validation message string for the first interface that
        fails validation, or ``None`` when the loop ends without either.
    """
    # NOTE(review): the returns below sit inside the loop, so processing stops
    # at the first validated or rejected interface. The original nesting of
    # these returns could not be confirmed; verify against the intended flow.
    team = confirm_correct_team_case(team)
    kw_instance = KWFileOperation(team)
    result_file = generate_result_file()
    interface_contents = get_all_interface_info_from_file(interface_path)
    keyword = KWOperation(team, kw_instance, server)
    case = ITCaseContentOperation(team, kw_instance)

    for raw_line in interface_contents:
        # Skip blank lines and commented-out entries.
        if raw_line == "\n" or raw_line.startswith("#"):
            continue

        interface = clean_the_interface_info(result_file, raw_line)
        if isinstance(interface, str):
            # Validation failed: the string is the explanatory message.
            return interface

        parameters = clear_request_parameters(keyword.get_interface_parameters(interface.id))
        if parameters:
            demo = get_demo_by_interface_info(interface.id)
        else:
            demo = {"d": {}, "h": {}, "p": {}, "u": {}}

        demo_info = SetAttr({"request_demo": json.dumps(demo)})
        demo_info = clear_header_token_demo(demo_info)
        demo_info.request_demo = json.dumps(clean_un_use_parameter(demo_info, parameters))
        demo_info.request_demo = json.dumps(confirm_demo_value_string(keyword, case, demo_info, parameters))

        kw_status = run_keyword_generage(result_file, interface, demo_info, parameters, keyword)
        if kw_status:
            msg = "接口url:%s,%s,生成keyword成功!接下来继续生成用例!" % (interface.type, interface.in_url)
            log.info(msg)
            write_case_result_to_case_file(result_file, msg)
            run_interface_case_generate(result_file, interface, demo_info, parameters, case, tags, normal)
            return True
|
||
|
||
|
||
def print_usage():
    """Print the command-line usage and exit with status 1.

    Relies on the module-level ``parser_inst`` set by the ``__main__`` block.
    """
    active_parser = parser_inst
    active_parser.print_usage()
    active_parser.exit(1, "生成用例失败,请根据使用帮助传入正确参数。")
|
||
|
||
|
||
def write_platform_to_running_env(platform, team):
    """Record the chosen platform and team in the environment config file.

    A falsy *platform* is stored as an empty string. The values are written
    through ``ReadConfig(env_choose_path).set_section``.
    """
    platform = platform if platform else ""
    cfg_opt = ReadConfig(env_choose_path)
    for section, option, value in (
        ("run_jira_id", "huohua-podenv", platform),
        ("run_evn_name", "current_team", team),
    ):
        cfg_opt.set_section(section, option, value)
|
||
|
||
|
||
if __name__ == '__main__':
    # Command-line entry point: parse the options, then generate keywords and
    # cases for the given team.
    parser = argparse.ArgumentParser(description="请输入组名!")
    # (flag, destination attribute, help text) in registration order.
    for flag, dest, help_text in (
        ("-team", 'team', "TO or TMO"),
        ("-tag", 'tags', "标签"),
        ("-p", 'platform', "独立环境"),
        ("-s", 'server', "服务名"),
        ("-d", 'driver', "驱动路径"),
        ("-n", 'normal', "是否只生成正常用例"),
    ):
        parser.add_argument(flag, dest=dest, help=help_text, default=None)
    args = parser.parse_args()

    # print_usage() reads this module-level name. A `global` statement is not
    # needed here because this code already runs at module scope.
    parser_inst = parser

    if args.team:
        write_platform_to_running_env(args.platform, args.team)
        run_interface_case(args.team, args.tags, args.platform, args.server, args.driver, args.normal)
    else:
        print_usage()
|