# -*- coding:utf-8 -*-
"""Command-line driver that generates interface keywords and test cases.

It reads interface descriptions from ``Swagger_scanner/interface.txt``, looks
each one up through ``SwaggerDBInfo``, builds a request demo and hands it to
the keyword and case generators.  Progress messages are logged and appended to
a timestamped result file under ``case_result``.
"""
import argparse
import copy
import datetime
import json
import os
import sys
import time

from base_framework.platform_tools.Keywords_service.keyword_service import SetAttr, SwaggerDBInfo, \
    KWFileOperation, log, KWOperation, run_keyword_generage, Project_Path
# from base_framework.platform_tools.Swagger_scanner.scanner import SwaggerOnlineDebug
from base_framework.platform_tools.Testcase_service.case_service import ITCaseContentOperation, \
    run_interface_case_generate, load_module_from_file
from base_framework.base_config.current_pth import *
from base_framework.public_tools.read_config import ReadConfig
from base_framework.platform_tools.Swagger_scanner.scanner_add import GenerateBody

dir_name = os.path.dirname(__file__)
interface_path = os.path.join(dir_name, "Swagger_scanner/interface.txt")
# Lower-case names of parameters/headers that carry an auth token.
token_header = ["accesstoken", "user-token"]
# Parser used by print_usage(); replaced by the real parser when run as a script.
parser_inst = None


def get_all_interface_info_from_file(path):
    """Return every line of *path* except the first (header) line.

    Lines keep their trailing newline, exactly as ``readlines`` yields them.
    """
    with open(path, encoding="UTF-8") as f:
        content = f.readlines()
    return content[1:]


def write_interface_info_to_file(content, path=interface_path):
    """Rewrite *path* as its existing first line followed by *content*."""
    with open(path, mode="r", encoding="UTF-8") as f:
        line = f.readline().strip("\n")
    # The header is fully read before the file is truncated for writing.
    with open(path, mode="w", encoding="UTF-8") as f1:
        f1.write(line + "\n" + content)


def generate_result_file():
    """Create a timestamped result file and return its absolute path.

    The file is created under ``<this dir>/case_result`` and starts with a
    banner line containing the start time.
    """
    file_name = "reslut_%s.txt" % time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
    result_dir = os.path.join(dir_name, "case_result")
    result_file = os.path.abspath(os.path.join(result_dir, file_name))
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(result_dir, exist_ok=True)
    with open(result_file, "w", encoding="UTF-8") as f:
        f.write("开始自动生成keyword和case! 开始时间:%s\n" % time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime()))
    return result_file


def write_case_result_to_case_file(file_path, content):
    """Append *content* plus a newline to the file at *file_path*.

    Append mode positions the stream at end-of-file, so existing content is
    never read back; each call simply adds one line.
    """
    with open(file_path, "a", encoding="UTF-8") as f:
        f.write(content + "\n")


def _log_and_record(result_file, msg, warn=True):
    """Log *msg* (warning by default, info otherwise) and append it to *result_file*."""
    if warn:
        log.warning(msg)
    else:
        log.info(msg)
    write_case_result_to_case_file(result_file, msg)


def clean_the_interface_info(result_file, interface_info):
    """Validate one interface line and look it up through ``SwaggerDBInfo``.

    *interface_info* is a comma-separated line; its first three fields are
    passed to ``SwaggerDBInfo.get_interface_info_by_url_and_type`` as
    ``(field[1], field[0], field[2])``.

    Returns a ``SetAttr`` wrapping the single matching record when that record
    is in use and has no cases yet.  In every other situation the explanatory
    message (a ``str``) is returned instead.  The message is always logged and
    appended to *result_file*.
    """
    raw = interface_info.strip("\n")
    interface = tuple(raw.split(","))
    if len(interface) < 3:
        # Too few fields to query; report it instead of failing on interface[2].
        msg = "接口信息:%s,格式不正确,请确认!" % raw
        _log_and_record(result_file, msg)
        return msg

    interface_infos = SwaggerDBInfo.get_interface_info_by_url_and_type(interface[1], interface[0], interface[2])
    # Slice so the number of values always matches the number of placeholders.
    if len(interface_infos) > 1:
        msg = "接口url:%s,%s,存在多条数据,请确认!" % interface[:2]
    elif len(interface_infos) == 0:
        msg = "接口url:%s,%s,在数据库中不存在,请确认!" % interface[:2]
    else:
        interface_info_attr = SetAttr(interface_infos[0])
        if interface_info_attr.is_used == 0:
            msg = "接口url:%s,%s,%s,已不在使用,请确认!" % interface[:3]
        elif interface_info_attr.at_numbers > 0:
            msg = "接口url:%s,%s,%s,已存在用例,请确认!" % interface[:3]
        else:
            msg = "接口url:%s,%s,%s,开始生成接口关键字和用例!" % interface[:3]
            _log_and_record(result_file, msg, warn=False)
            return interface_info_attr
    _log_and_record(result_file, msg)
    return msg


def generate_swagger_url_and_option(swagger_info):
    """Split a swagger document URL around its first ``"v2"``.

    Returns ``(swagger_ui_url, option, base)`` where *base* is the text before
    the first ``"v2"``, *swagger_ui_url* is ``base + 'swagger-ui.html'`` and
    *option* is ``'/v2'`` followed by the rest of the URL.

    Raises ``IndexError`` when the URL contains no ``"v2"``.
    """
    swagger_info_attr = SetAttr(swagger_info)
    url = swagger_info_attr.sw_url
    # maxsplit=1 keeps any later "v2" inside the option instead of dropping it.
    temp_list = url.split("v2", 1)
    swagger_url = temp_list[0] + 'swagger-ui.html'
    option = '/v2' + temp_list[1]
    return swagger_url, option, temp_list[0]


def get_demo_by_interface_info(interface_id):
    """Build the request demo for *interface_id* via ``GenerateBody``.

    When the combined parameters have a truthy ``"d"`` entry, it is replaced by
    the result of ``combine_request_data`` applied to its first two items.
    """
    body_demo = GenerateBody(interface_id)
    request_data = body_demo.combine_request_parameters()
    if request_data["d"]:
        request_demo = body_demo.combine_request_data(request_data["d"][0], request_data["d"][1], key=0)
        request_data["d"] = request_demo
    return request_data


def clear_request_parameters(parameters):
    """Return a deep copy of *parameters* without token parameters.

    A parameter is dropped when its ``"name"``, lower-cased, is listed in
    ``token_header``.  Parameters without a name are kept.
    """
    return copy.deepcopy([
        item for item in parameters
        if (item.get("name") or "").lower() not in token_header
    ])


def clear_header_token_demo(demo_info):
    """Remove token headers from ``demo_info.request_demo`` and return *demo_info*.

    ``request_demo`` is a JSON string; its ``"h"`` entry is rewritten without
    the keys listed in ``token_header``.  Matching is case-insensitive, in line
    with ``clear_request_parameters``.  A missing or null ``"h"`` becomes ``{}``.
    """
    demo_copy = json.loads(demo_info.request_demo)
    headers = demo_copy.get('h') or {}
    demo_copy['h'] = {
        key: value for key, value in headers.items()
        if key.lower() not in token_header
    }
    demo_info.request_demo = json.dumps(demo_copy)
    return demo_info


def generate_parameter_to_dict(parameters):
    """Map each parameter's name to its ``SetAttr`` wrapper."""
    dict_temp = dict()
    for parameter in parameters:
        parameter_attr = SetAttr(parameter)
        dict_temp[parameter_attr.name] = parameter_attr
    return dict_temp


def _strip_unused(node, parameters_dict):
    """Return a copy of *node* without dict keys whose parameter has ``is_need == 2``."""
    if isinstance(node, list):
        return [_strip_unused(child, parameters_dict) for child in node]
    if isinstance(node, dict):
        cleaned = {}
        for key, value in node.items():
            parameter = parameters_dict.get(key)
            if parameter and int(parameter.is_need) == 2:
                continue
            cleaned[key] = _strip_unused(value, parameters_dict)
        return cleaned
    return copy.deepcopy(node)


def clean_un_use_parameter(demo, parameters):
    """Return a copy of the demo with no-longer-used parameters removed.

    *demo* is either a ``SetAttr`` whose ``request_demo`` is a JSON string, or
    an already-parsed JSON value.  At every nesting level, a dict key is
    dropped when *parameters* has an entry of that name with ``is_need`` equal
    to 2.  The input is not modified.
    """
    if isinstance(demo, SetAttr):
        demo_item = json.loads(demo.request_demo)
    else:
        demo_item = demo
    # Build the name lookup once rather than at every level of the recursion.
    parameters_dict = generate_parameter_to_dict(parameters)
    return _strip_unused(demo_item, parameters_dict)


def confirm_demo_value_string(kw, case, demo_string, parameters):
    """Normalise a demo whose request data is a bare value rather than an object.

    Parses ``demo_string.request_demo`` and inspects its ``"d"`` entry:

    * empty (but not the number 0): returned unchanged;
    * a string or number, or a list whose first element is not a dict: wrapped
      as ``{<name of first parameter>: value}``, and ``kw.kw_string`` and
      ``case.case_string`` are set to ``True``;
    * a dict containing ``'empty'``: that entry is popped into ``kw.kw_array``.

    Returns the parsed (and possibly modified) demo dict.
    """
    demo_value = json.loads(demo_string.request_demo)
    data = demo_value["d"]
    if (not data) and (data != 0):
        return demo_value
    # float is included so numeric payloads like 1.5 do not reach the dict check.
    is_scalar = isinstance(data, (str, int, float))
    is_plain_list = isinstance(data, list) and not isinstance(data[0], dict)
    if is_scalar or is_plain_list:
        demo_value["d"] = {parameters[0].get("name"): data}
        kw.kw_string = True
        case.case_string = True
    elif not isinstance(data, list) and 'empty' in data:
        kw.kw_array = data.pop('empty')
    return demo_value


def confirm_correct_team_case(team):
    """Return *team* upper-cased if its interface module loads, else lower-cased.

    The module probed is ``<Project_Path>/<team>/library/<TEAM>_interface.py``.
    """
    interface_file_path = os.path.abspath(os.path.join(
        Project_Path, "{team}".format(team=team), "library",
        "{team}_interface.py".format(team=team.upper())))
    try:
        load_module_from_file(interface_file_path, team.upper())
    except Exception:
        # Deliberate best-effort probe: any load failure selects the lower-case name.
        return team.lower()
    return team.upper()


def _prepare_demo_info(interface, parameters, keyword, case):
    """Build the cleaned ``SetAttr`` demo used by the keyword and case generators."""
    if parameters:
        demo = get_demo_by_interface_info(interface.id)
    else:
        demo = {"d": {}, "h": {}, "p": {}, "u": {}}
    demo_info = SetAttr({"request_demo": json.dumps(demo)})
    demo_info = clear_header_token_demo(demo_info)
    demo_info.request_demo = json.dumps(clean_un_use_parameter(demo_info, parameters))
    demo_info.request_demo = json.dumps(confirm_demo_value_string(keyword, case, demo_info, parameters))
    return demo_info


def run_interface_case(team, tags=None, platform=None, server=None, driver=None, normal=None):
    """Generate the keyword and cases for the interface listed in ``interface_path``.

    Blank lines and lines starting with ``#`` are skipped.  For the first
    remaining line the interface is validated; if validation yields a message
    that message is returned, otherwise the keyword is generated, cases are
    generated when the keyword succeeded, and ``True`` is returned.  ``None``
    is returned when the file has no usable line.

    *platform* and *driver* are accepted for interface compatibility and are
    not used here.
    """
    team = confirm_correct_team_case(team)
    kw_instance = KWFileOperation(team)
    result_file = generate_result_file()
    interface_contents = get_all_interface_info_from_file(interface_path)
    keyword = KWOperation(team, kw_instance, server)
    case = ITCaseContentOperation(team, kw_instance)
    for interface_temp in interface_contents:
        if not interface_temp.strip() or interface_temp.startswith("#"):
            continue
        interface = clean_the_interface_info(result_file, interface_temp)
        # NOTE(review): this function was reconstructed from text that had lost
        # its indentation.  The two returns are placed so that processing stops
        # after the first usable line -- confirm that this is the intended flow.
        if isinstance(interface, str):
            return interface
        parameters = keyword.get_interface_parameters(interface.id)
        parameters = clear_request_parameters(parameters)
        demo_info = _prepare_demo_info(interface, parameters, keyword, case)
        kw_status = run_keyword_generage(result_file, interface, demo_info, parameters, keyword)
        if kw_status:
            msg = "接口url:%s,%s,生成keyword成功!接下来继续生成用例!" % (interface.type, interface.in_url)
            log.info(msg)
            write_case_result_to_case_file(result_file, msg)
            run_interface_case_generate(result_file, interface, demo_info, parameters, case, tags, normal)
        return True
    return None


def _build_parser():
    """Create the command-line argument parser for this script."""
    parser = argparse.ArgumentParser(description="请输入组名!")
    parser.add_argument("-team", dest='team', help="TO or TMO")
    parser.add_argument("-tag", dest='tags', help="标签", default=None)
    parser.add_argument("-p", dest='platform', help="独立环境", default=None)
    parser.add_argument("-s", dest='server', help="服务名", default=None)
    parser.add_argument("-d", dest='driver', help="驱动路径", default=None)
    parser.add_argument("-n", dest='normal', help="是否只生成正常用例", default=None)
    return parser


def print_usage():
    """Print the usage line and exit with status 1 and a failure message."""
    # Fall back to a fresh parser when the module is imported rather than run.
    parser = parser_inst if parser_inst is not None else _build_parser()
    parser.print_usage()
    parser.exit(1, "生成用例失败,请根据使用帮助传入正确参数。")


def write_platform_to_running_env(platform, team):
    """Store *platform* and *team* in the config file at ``env_choose_path``.

    A falsy *platform* is stored as an empty string.  The arguments to
    ``set_section`` are presumably (section, option, value) -- verify against
    ``ReadConfig``.
    """
    if not platform:
        platform = ""
    cfg_opt = ReadConfig(env_choose_path)
    cfg_opt.set_section("run_jira_id", "huohua-podenv", platform)
    cfg_opt.set_section("run_evn_name", "current_team", team)


if __name__ == '__main__':
    parser = _build_parser()
    args = parser.parse_args()
    parser_inst = parser
    if not args.team:
        print_usage()
    else:
        write_platform_to_running_env(args.platform, args.team)
        run_interface_case(args.team, args.tags, args.platform, args.server, args.driver, args.normal)