# -*- coding: utf-8 -*- __author__ = 'huaxuemin' from logbook import Logger import shutil import errno import os import sqlite3 from pathlib import Path import random import socket import configparser import codecs import time import json import re import requests logging = Logger(__name__) HERE = os.path.dirname(os.path.abspath(__file__)) ROBOT_LOG_LEVEL = 'INFO' from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) class OSType: WIN, LINUX, UNKNOWN = range(3) def __init__(self): pass @staticmethod def get_type(): import platform system_name = platform.system() if system_name.lower() == 'windows': return OSType.WIN elif system_name.lower() == 'linux': return OSType.LINUX else: return OSType.UNKNOWN def run_process(cmd_str, out_p=False): """ run command cmd_str unicode string. """ if OSType.WIN == OSType.get_type(): # cmd_str = cmd_str.encode('gbk') cmd_str = cmd_str elif OSType.LINUX == OSType.get_type(): cmd_str = cmd_str.encode('utf-8') else: raise RuntimeError("your os is not support.") logging.info('cmd: %s' % cmd_str) print(cmd_str) import subprocess from subprocess import PIPE close_fds = False if OSType.WIN == OSType.get_type() else True if out_p: p = subprocess.Popen(cmd_str, shell=True, close_fds=close_fds, stdout=PIPE) p.wait() return p.returncode, p.stdout.read() else: p = subprocess.Popen(cmd_str, shell=True, close_fds=close_fds, stdout=subprocess.DEVNULL) p.wait() return p.returncode, None class CaseRunner: def __init__(self, workspace, case_path, test_case, include, exclude, rerun="true", special_env="", team="", physics_env="QA", business="hh"): self.workspace = workspace self.case_path = case_path self.test_case = test_case self.include = include self.exclude = exclude self.rerun = rerun self.special_env = special_env self.team = team self.physics_env = physics_env self.business = business self.con = sqlite3.connect(HERE + "/case_runner.db") self.cur = self.con.cursor() self.def_port = set("9" + "".join(map(str, random.choices(range(10), k=3))) for i in range(500)) self.env_port = None self.pid = None self.wait_time = 120 self.all_dir_name = set() self.galaxy_server_name_to_swagger = {"peppa-teach-opt-cms-api": "teach-opt-cms-api", "peppa-teach-biz-server": "peppa-teach-biz", "peppa-market-server": "peppa-market", "peppa-teach-parker-server": "peppa-teach-parker", "peppa-course-server": "peppa-course"} self.swagger_name_to_galaxy = {"teach-opt-cms-api": "peppa-teach-opt-cms-api", "peppa-teach-biz": "peppa-teach-biz-server", "peppa-market": "peppa-market-server", "peppa-teach-parker": "peppa-teach-parker-server", "peppa-course": "peppa-course-server"} # 获取opengalaxy ssotoken相关变量 self.ops_uri = "http://opengalaxy.bg.huohua.cn" self.showUsername = "luohong" self.username = "luohong" self.password = "Lh123456789@" self.sso_login_url = "https://sso.huohua.cn/authentication/form" self.redirect_url = "https://sso.huohua.cn/oauth/authorize?client_id=open-galaxy&response_type=code&redirect_uri=http://opengalaxy.bg.huohua.cn/api/v1/users/user/ssologin/" def getSsoToken(self): session = requests.session() post_data = dict() post_data['showUsername'] = self.showUsername post_data['username'] = self.username post_data['password'] = self.password session.post(url=self.sso_login_url, data=post_data, allow_redirects=True, verify=False) resp = session.get( url=self.redirect_url, allow_redirects=False, verify=False) resp1 = session.get( url=resp.headers['Location'], allow_redirects=False, verify=False) ssoToken = resp1.headers["Set-Cookie"].split(";")[4].split("=")[2] return ssoToken def update_platform(self): if len(self.special_env) > 0: config_file = str(Path(self.workspace) / "base_framework/base_config/env_choose.ini") fd = open(config_file, encoding='utf-8') data = fd.read() if data[:3] == codecs.BOM_UTF8: data = data[3:] files = codecs.open(config_file, "w") files.write(data) files.close() fd.close() cf = configparser.ConfigParser(allow_no_value=True) cf.read(config_file, encoding='utf-8') cf.set("run_jira_id", "huohua-podenv", self.special_env) with open(config_file, 'w') as fw: # 循环写入 cf.write(fw) def assign_port(self): workspace_base_name = Path(self.workspace).name query_port = "SELECT PORT FROM ENV_PORT WHERE NAME='{}'".format(workspace_base_name) res_port = self.cur.execute(query_port).fetchall() query_all_port = "SELECT PORT FROM ENV_PORT" res_all_port = self.cur.execute(query_all_port).fetchall() not_use_port = (self.def_port - set(str(item[0]) for item in res_all_port)).pop() print("not use port is {}".format(not_use_port)) self.env_port = not_use_port if len(res_port) == 0: insert_name_port_sql = "INSERT INTO ENV_PORT(NAME,PORT)VALUES(?,?)" self.cur.execute(insert_name_port_sql, (workspace_base_name, not_use_port)).fetchall() self.con.commit() else: update_name_port_sql = "UPDATE ENV_PORT SET PORT='{}' WHERE NAME='{}'".format(self.env_port, workspace_base_name) self.cur.execute(update_name_port_sql).fetchall() self.con.commit() self.cur.close() self.con.close() print("use port is {}".format(self.env_port)) # def assign_port(self): # workspace_base_name = Path(self.workspace).name # query_port = "SELECT PORT FROM ENV_PORT WHERE NAME='{}'".format(workspace_base_name) # res = self.cur.execute(query_port).fetchall() # if len(res) == 0: # query_all_port = "SELECT PORT FROM ENV_PORT" # res_all_port = self.cur.execute(query_all_port).fetchall() # not_use_port = (self.def_port - set(str(item[0]) for item in res_all_port)).pop() # print("not use port is {}".format(not_use_port)) # insert_name_port_sql = "INSERT INTO ENV_PORT(NAME,PORT)VALUES(?,?)" # self.cur.execute(insert_name_port_sql, (workspace_base_name, not_use_port)).fetchall() # self.con.commit() # self.env_port = not_use_port # else: # self.env_port = str(res[0][0]) # self.cur.close() # self.con.close() # print("use port is {}".format(self.env_port)) def _get_not_used_port(self): find_pid_linux_cmd = "lsof -i:{}".format(self.env_port) res_code, res_context = run_process(find_pid_linux_cmd, out_p=True) if len(res_context) > 0: return True else: return False def check_port(self): if OSType.WIN == OSType.get_type(): find_pid_win_cmd = 'netstat -ano | findstr {} | findstr LISTENING'.format(self.env_port) res_code, res_context = run_process(find_pid_win_cmd, out_p=True) if res_code == 0: print(res_context) if len(res_context) > 0: try: self.pid = str(res_context).split()[-1].replace("\\r\\n'", "") self._kill_pid() except IndexError: pass elif OSType.LINUX == OSType.get_type(): find_pid_linux_cmd = "lsof -i:{}".format(self.env_port) res_code, res_context = run_process(find_pid_linux_cmd, out_p=True) if res_code == 0: print(res_context) # 获取pid if len(res_context) > 0: try: self.pid = str(res_context).split("\\n")[1].split()[1] self._kill_3_pid() except IndexError: pass else: raise RuntimeError("your os is not support.") def _kill_3_pid(self): self._kill_pid() count = 3 while count > 0: find_pid_linux_cmd = "lsof -i:{}".format(self.env_port) res_code, res_context = run_process(find_pid_linux_cmd, out_p=True) if len(res_context) > 0: time.sleep(2) self.pid = str(res_context).split("\\n")[1].split()[1] self._kill_pid() count -= 1 continue else: break def _kill_pid(self): if OSType.WIN == OSType.get_type(): kill_pid_cmd = "taskkill /f /pid {}".format(self.pid) elif OSType.LINUX == OSType.get_type(): kill_pid_cmd = "kill -9 {}".format(self.pid) else: raise RuntimeError("your os is not support.") res_code, res_context = run_process(kill_pid_cmd) if res_code: raise RuntimeError("kill pid: {} failed. error: {}".format(self.pid, res_context)) def update_port(self): main_py = Path(self.workspace) / "base_framework/main.py" tmp_py = Path(self.workspace) / "base_framework/tmp.py" shutil.copy(main_py, tmp_py) src_context = "port=9999" dst_context = "port={}".format(self.env_port) self._replace_file_context(tmp_py, main_py, src_context, dst_context) env_robot = Path(self.workspace) / "{}/test_case/Resource/AdapterKws/env.robot".format(self.team) tmp_robot = Path(self.workspace) / "{}/test_case/Resource/AdapterKws/tmp.robot".format(self.team) shutil.copy(env_robot, tmp_robot) src_context = "127.0.0.1:9999" dst_context = "127.0.0.1:{}".format(self.env_port) self._replace_file_context(tmp_robot, env_robot, src_context, dst_context) def update_suite(self): case_dir = Path(self.workspace) / "{}/test_case".format(self.team) self._set_path(case_dir) case_src_path = Path(self.case_path) if case_src_path.is_file(): base_name = case_src_path.name.split(case_src_path.suffix)[0] if base_name.upper() in self.all_dir_name: self.case_path = str(case_src_path.with_name(base_name + "9" + case_src_path.suffix)) self._replace_path(case_dir) def _set_path(self, src_path): for p in src_path.iterdir(): if p.is_dir(): self.all_dir_name.add(p.name.upper()) self._set_path(p) def _replace_path(self, src_path): for p in src_path.iterdir(): if p.is_dir(): self._replace_path(p) elif p.is_file(): base_name = p.name.split(p.suffix)[0] if base_name.upper() in self.all_dir_name: p.rename(p.with_name(base_name + "9" + p.suffix)) def start_kwl(self): startup_path = Path(self.workspace) / "base_framework" startup = "startup.py" exec_cmd_params = "" if len(self.special_env) > 0: exec_cmd_params = exec_cmd_params + "-j {} ".format(self.special_env) if len(self.team) > 0: exec_cmd_params = exec_cmd_params + "-t {} ".format(self.team) if len(self.business) > 0: exec_cmd_params = exec_cmd_params + "-b {} ".format(self.business) if OSType.WIN == OSType.get_type(): exec_cmd_path = "cd {} && python {} ".format(startup_path, startup) exec_cmd = exec_cmd_path + exec_cmd_params elif OSType.LINUX == OSType.get_type(): exec_cmd_path = "cd {} && python3 {} ".format(startup_path, startup) exec_cmd = exec_cmd_path + exec_cmd_params else: raise RuntimeError("your os is not support.") exec_cmd = exec_cmd[:-1] # import subprocess # from subprocess import PIPE # close_fds = False if OSType.WIN == OSType.get_type() else True # subprocess.Popen(exec_cmd, shell=True, close_fds=close_fds, stdout=subprocess.DEVNULL) try: pro = self._start_kwl_server(exec_cmd) if not pro.is_alive(): print("--------pro.exec_status: ", pro.is_alive()) pro.kill() time.sleep(5) self._kill_3_pid() self._start_kwl_server(exec_cmd) except Exception as e: print("--------error: ", e) @staticmethod def _start_kwl_server(exec_cmd): from multiprocessing import Process p = Process(target=run_process, args=(exec_cmd,)) p.daemon = True p.start() time.sleep(30) return p @staticmethod def _replace_file_context(src_file, dst_file, src_context, dst_context): with open(src_file, "r") as f_src: with open(dst_file, "w") as f_dst: for line in f_src: if src_context in line: f_dst.writelines(line.replace(src_context, dst_context)) else: f_dst.writelines(line) def _get_report_dir(self): """ Create %WORKSPACE%/Report 用于存放对应构建的构建日志 """ report_dir = os.path.join(self.workspace, 'Report') if not os.path.exists(report_dir): logging.info(u'create report directory: %s' % report_dir) os.makedirs(report_dir) return report_dir def _get_report_ci_out_dir(self): """ Create %WORKSPACE%/Report/ci_out,用于存放构建日志 """ ci_out_dir = os.path.join(self.workspace, 'Report', 'ci_out') if os.path.exists(ci_out_dir): logging.info(u'create ci output directory: %s' % ci_out_dir) shutil.rmtree(ci_out_dir) return ci_out_dir @staticmethod def copy_any_thing(src, dst): try: shutil.copytree(src, dst) except OSError as exc: if exc.errno == errno.ENOTDIR: shutil.copy(src, dst) else: raise def _wait_kwl_run(self): i = 0 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while i < self.wait_time: try: s.connect(("127.0.0.1", int(self.env_port))) s.shutdown(2) s.close() return except socket.error: i += 1 s.close() def run_cases(self): """ run cases by workspace include exclude. """ self._wait_kwl_run() report_dir = os.path.join(self.workspace, 'Report') if os.path.exists(report_dir): logging.info(u'clean report directory: %s' % report_dir) shutil.rmtree(report_dir) os.makedirs(report_dir) if self.rerun == 'true' or self.rerun == '': self._rerun_failed_cases() else: self._default_run_cases() out_dir = os.path.join(self._get_report_dir(), 'out') if os.path.exists(out_dir): # copy report to ci_out. self.copy_any_thing(self._get_report_dir(), self._get_report_ci_out_dir()) else: raise RuntimeError(u'%s directory is not exist.' % out_dir) def _default_run_cases(self): """ exec robot and output log to {report_dir}/out directory. return command execute exit code. """ test_case_cmdstr = [] if len(self.test_case) > 0: for tc in self.test_case.split(','): test_case_cmdstr.append(u'--test %s' % tc) includes_cmdstr = [] if len(self.include) > 0: for include in self.include.split(','): includes_cmdstr.append(u'--include %s' % include) excludes_cmdstr = [] if len(self.exclude) > 0: for exclude in self.exclude.split(','): excludes_cmdstr.append(u'--exclude %s' % exclude) output_directory = os.path.join(self._get_report_dir(), 'out') Path(output_directory).mkdir(exist_ok=True) # Set ROBOT_SYSLOG_FILE and ROBOT_SYSLOG_LEVEL environment variable. os.environ['ROBOT_SYSLOG_FILE'] = os.path.join(self._get_report_dir(), 'robot_syslog.txt') os.environ['ROBOT_SYSLOG_LEVEL'] = ROBOT_LOG_LEVEL logging.info(u'building cases...') res_code, _ = run_process(' '.join( [u'robot'] + test_case_cmdstr + excludes_cmdstr + includes_cmdstr + [u'-d', output_directory, self.case_path])) # clear ROBOT_SYSLOG_FILE(NONE) and ROBOT_SYSLOG_LEVEL environment variable. os.environ['ROBOT_SYSLOG_FILE'] = 'NONE' return res_code def _rerun_failed_cases(self): """ execute robot twice(the second execute is only failed in first.) and remerge output to {report_dir}/out directory. """ logging.info('rerunfailed cases mode...') return_code = self._default_run_cases() logging.info('first run exit code: %s' % return_code) if not return_code: logging.info('first cases built successfully') return logging.info('the first cases to build there is failure cases') output_directory = os.path.join(self._get_report_dir(), u'out') if not os.path.exists(output_directory): raise RuntimeError(u'the first cases to built throw exception.') output_directory_r1 = os.path.join(self._get_report_dir(), u'first_out') output_directory_r2 = os.path.join(self._get_report_dir(), u'second_out') logging.info('rename the first cases to build the output directory') cur_dir = os.getcwd() os.chdir(self._get_report_dir()) if OSType.WIN == OSType.get_type(): cmd_str = 'ren out first_out' elif OSType.LINUX == OSType.get_type(): cmd_str = 'cp -R out first_out' else: raise RuntimeError("your os is not support.") os.system(cmd_str) os.chdir(cur_dir) output_directory_cmdstr = [u'-d', output_directory_r2] rerun_failed_cmdstr = [u'--rerunfailed', os.path.join(output_directory_r1, u'output.xml')] logging.info('rerunfailed test cases...') run_process(' '.join([u'robot'] + output_directory_cmdstr + rerun_failed_cmdstr + [self.case_path])) if not os.path.exists(output_directory_r2): raise RuntimeError(u'the second cases to built throw throw exception.') # Set ROBOT_SYSLOG_FILE and ROBOT_SYSLOG_LEVEL environment variable. os.environ['ROBOT_SYSLOG_FILE'] = os.path.join(self._get_report_dir(), 'robot_syslog2.txt') os.environ['ROBOT_SYSLOG_LEVEL'] = ROBOT_LOG_LEVEL logging.info('merge report...') run_process(' '.join([u'rebot', u'-d', output_directory, u'-o', u'output.xml', u'--merge', os.path.join(output_directory_r1, u'output.xml'), os.path.join(output_directory_r2, u'output.xml')])) # clear ROBOT_SYSLOG_FILE(NONE) and ROBOT_SYSLOG_LEVEL environment variable. os.environ['ROBOT_SYSLOG_FILE'] = 'NONE' def record_build_url(self, build_id, report_url): try: if build_id: import pymysql db = pymysql.connect(host="mysql.qa.huohua.cn", user="qa-dev", password="jaeg3SCQt0", database="sparkatp", charset='utf8') cursor = db.cursor() if build_id and report_url: update_sql = "UPDATE sparkatp.build_info set report_url='{}',status=2 WHERE id={}".format( report_url, build_id) cursor.execute(update_sql) cursor.fetchall() try: self.get_jacoco_report(cursor, build_id) except Exception as e: print(e) db.commit() cursor.close() db.close() except Exception as e: print(e) def get_tester_by_project_id(self): """ 获取project_id和tester :return: """ try: import pymysql db = pymysql.connect(host="10.250.200.53", user="root", password="peppa@test", database="tools", charset='utf8') cursor = db.cursor() db.commit() cursor.close() db.close() except Exception as e: print(e) def get_project_id_by_build_id(self, cursor, build_id): try: if build_id: get_scene_id_sql = "SELECT scene_id FROM build_info where id='{}'".format( build_id) cursor.execute(get_scene_id_sql) scene_id_info = cursor.fetchone() if scene_id_info: scene_id = scene_id_info[0] get_project_id_sql = "SELECT project_id FROM scene_new where id='{}'".format( scene_id) cursor.execute(get_project_id_sql) project_id_info = cursor.fetchone() if project_id_info: project_id = project_id_info[0] return project_id return 0 return 0 except Exception as e: print(e) def get_jacoco_report(self, cursor, build_id): get_server_name_sql = "SELECT run_server_list FROM build_info where id='{}' and is_jacoco=1".format(build_id) cursor.execute(get_server_name_sql) server_name_info = cursor.fetchone() if server_name_info: server_name_list = eval(server_name_info[0]) for server_name in server_name_list: if server_name == "PEPPA-TEACH-API" or server_name == "peppa-teach-api" or "-EXECUTOR" in server_name.upper(): continue self._do_jacoco_report(server_name, build_id, cursor) def _do_jacoco_report(self, project_name, build_id, cursor): if not self.special_env: insert_data = "INSERT INTO `sparkatp`.`build_jacoco`(`build_info_id`, `jacoco_report_id`, `team`, `server_name`, `now_version`, `base_version`, `env_name`, `status`, `report_url`, `remark`) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( build_id, "", self.team, project_name, self.special_env, "master", self.special_env, "1", "", "QA环境构建,不需要收集增量覆盖率") cursor.execute(insert_data) return elif self.special_env.upper() == "NONE" or self.special_env.upper() == "QA": insert_data = "INSERT INTO `sparkatp`.`build_jacoco`(`build_info_id`, `jacoco_report_id`, `team`, `server_name`, `now_version`, `base_version`, `env_name`, `status`, `report_url`, `remark`) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( build_id, "", self.team, project_name, self.special_env, "master", self.special_env, "1", "", "QA环境构建,不需要收集增量覆盖率") cursor.execute(insert_data) return # 收集覆盖率报告 import requests import json if self.team.upper() in ["CC", "LALIVE", "H2R"]: self.team = "CC" if self.team.upper() in ["SCM", "ES"]: self.team = "ES" if self.team.upper() in ["TO", "TMO"]: self.team = "TTS" current_version = self.get_branch_from_open_galaxy(self.special_env, project_name) if current_version == "master": insert_data = "INSERT INTO `sparkatp`.`build_jacoco`(`build_info_id`, `jacoco_report_id`, `team`, `server_name`, `now_version`, `base_version`, `env_name`, `status`, `report_url`, `remark`) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( build_id, "", self.team, project_name, current_version, current_version, self.special_env, "1", "", "master不需要收集增量覆盖率") cursor.execute(insert_data) return if not current_version: print("-----------------: {}, 独立环境: {}, 服务名: {}".format("未在独立环境中服务,不收集覆盖率", self.special_env, project_name)) return if project_name.lower() in self.galaxy_server_name_to_swagger.keys(): project_name = self.galaxy_server_name_to_swagger[project_name.lower()] base_version = "master" url = "http://10.250.0.252:8989/cov/syncCollectionCov" params = {"baseVersion": base_version, "businessName": self.team, "currentVersion": current_version, "departmentName": "质量保障中心", "envName": self.special_env, "isBranch": 1, "isDiff": 2, "projectName": project_name} res = requests.post(url, json=params) if res.status_code == 200: if json.loads(res.text)["msg"] != "success": logging.error("{}收集覆盖率报告失败,err: {}".format(project_name, res.text)) insert_data = "INSERT INTO `sparkatp`.`build_jacoco`(`build_info_id`, `jacoco_report_id`, `team`, `server_name`, `now_version`, `base_version`, `env_name`, `status`, `report_url`, `remark`) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( build_id, "", self.team, project_name, current_version, base_version, self.special_env, "3", "", res.text) cursor.execute(insert_data) else: jacoco_report_id = json.loads(res.text)["data"] insert_data = "INSERT INTO `sparkatp`.`build_jacoco`(`build_info_id`, `jacoco_report_id`, `team`, `server_name`, `now_version`, `base_version`, `env_name`, `status`, `report_url`, `remark`) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( build_id, int(jacoco_report_id), self.team, project_name, current_version, base_version, self.special_env, "0", "", "") cursor.execute(insert_data) else: logging.error("{}收集覆盖率报告失败,status:{},err: {}".format(project_name, str(res.status_code), res.text)) insert_data = "INSERT INTO `sparkatp`.`build_jacoco`(`build_info_id`, `jacoco_report_id`, `team`, `server_name`, `now_version`, `base_version`, `env_name`, `status`, `report_url`, `remark`) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');".format( build_id, "", self.team, project_name, current_version, base_version, self.special_env, "3", "", res.text) cursor.execute(insert_data) # print(res.text) def get_branch_from_open_galaxy(self, special_env, server_name): if server_name.lower() in self.swagger_name_to_galaxy.keys(): server_name = self.swagger_name_to_galaxy[server_name.lower()] try: headers = { "Api-Token": "2a1cbc25e0d183e1ec9fe5872c1433617c96da9e2905c4371d8f979479910aa1"} open_galaxy_url = "http://opengalaxy.bg.huohua.cn/api/v1/hcloud/tree/node/sec/application?sec_name={}".format( special_env) res = requests.get(open_galaxy_url, headers=headers) if res.status_code == 200: result = json.loads(res.text) for item in result["data"]["results"]: if item["name"].lower() == server_name.lower(): branch = item["branch"] return branch return "" else: print("获取运维服务列表失败,status: {}, res: {}".format(res.status_code, res.text)) return "" except Exception as e: print(e) return "" if __name__ == '__main__': # workspace = "/root/workspaces/huaxuemin-dev" workspace = r"E:\huohua\auto\huaxuemin-dev" # case_path = "/root/workspaces/huaxuemin-dev/HuoHuaTestCase/EN/1.接口/Peppa-Eng-Live/play_back.robot" case_path = r"E:\huohua\auto\huaxuemin-dev\HuoHuaTestCase\EN\1.接口\Peppa-Eng-Live\play_back.robot" test_case = "" include = "" exclude = "" test = CaseRunner(workspace, case_path, test_case, include, exclude) # test.assign_port() # test.check_port() # test.update_port() # test.start_kwl() # test.run_cases() # test.check_port() # build_info_id = '16329' # build_url = 'http://10.250.200.1:8080/jenkins/view/QE_JOB/job/qe_job1/63/' # test.record_build_url(build_info_id, build_url) res = test.get_branch_from_open_galaxy("HHC-92692", "peppa-asset-server") print(res)