From c9be33aaec774da72a481455017cdde83634a8e3 Mon Sep 17 00:00:00 2001 From: guojiabao Date: Mon, 18 May 2026 18:03:36 +0800 Subject: [PATCH] feat: add prompt based api testcase generator --- .../Create_api_testcase/__init__.py | 1 + .../Create_api_testcase/api_prompt_parser.py | 145 ++++++ .../Create_api_testcase/api_testing_skill.md | 85 ++++ .../Create_api_testcase/assertion_parser.py | 81 ++++ .../generate_api_automation.py | 453 ++++++++++++++++++ .../Create_api_testcase/server.py | 165 +++++++ .../test_TC_hubops_video_label_list_001.py | 136 ++++++ 7 files changed, 1066 insertions(+) create mode 100644 base_framework/platform_tools/Create_api_testcase/__init__.py create mode 100644 base_framework/platform_tools/Create_api_testcase/api_prompt_parser.py create mode 100644 base_framework/platform_tools/Create_api_testcase/api_testing_skill.md create mode 100644 base_framework/platform_tools/Create_api_testcase/assertion_parser.py create mode 100644 base_framework/platform_tools/Create_api_testcase/generate_api_automation.py create mode 100644 base_framework/platform_tools/Create_api_testcase/server.py create mode 100644 joyhub_backend/test_case/TestCase/接口/test_TC_hubops_video_label_list_001.py diff --git a/base_framework/platform_tools/Create_api_testcase/__init__.py b/base_framework/platform_tools/Create_api_testcase/__init__.py new file mode 100644 index 0000000..380474e --- /dev/null +++ b/base_framework/platform_tools/Create_api_testcase/__init__.py @@ -0,0 +1 @@ +# -*- coding:utf-8 -*- diff --git a/base_framework/platform_tools/Create_api_testcase/api_prompt_parser.py b/base_framework/platform_tools/Create_api_testcase/api_prompt_parser.py new file mode 100644 index 0000000..2c2c038 --- /dev/null +++ b/base_framework/platform_tools/Create_api_testcase/api_prompt_parser.py @@ -0,0 +1,145 @@ +# -*- coding:utf-8 -*- +import json +import re + + +SENSITIVE_KEYS = { + "password", "pwd", "token", "access_token", "accessToken", "authorization", + "cookie", "secret", "client_secret", "refreshToken", "refresh_token" +} + + +def _mask_value(value): + if value in (None, ""): + return value + return "******" + + +def mask_sensitive_data(data): + if isinstance(data, dict): + result = {} + for key, value in data.items(): + if str(key).lower() in {item.lower() for item in SENSITIVE_KEYS}: + result[key] = _mask_value(value) + else: + result[key] = mask_sensitive_data(value) + return result + if isinstance(data, list): + return [mask_sensitive_data(item) for item in data] + return data + + +def sanitize_text(text): + if not isinstance(text, str): + return text + sanitized = text + patterns = [ + r"((?:password|pwd|登录密码|密码)\s*[:=:]\s*)([^\s,,。;;}&]+)", + r"((?:token|accessToken|access_token|Authorization|Cookie)\s*[:=:]\s*)([^\s,,。;;]+)" + ] + for pattern in patterns: + sanitized = re.sub(pattern, r"\1******", sanitized, flags=re.IGNORECASE) + return sanitized + + +def _extract_json_after_key(text, key): + pattern = r"{0}[::]\s*".format(re.escape(key)) + match = re.search(pattern, text, re.IGNORECASE) + if not match: + return None + + content = text[match.end():].lstrip() + if not content or content[0] not in "[{": + return None + + try: + value, _ = json.JSONDecoder().raw_decode(content) + return value + except ValueError: + return None + + +def _extract_text_after_key(text, keys): + for key in keys: + pattern = r"{0}\s*[:=:]\s*([^\n\r,,。;;]+)".format(re.escape(key)) + match = re.search(pattern, text, re.IGNORECASE) + if match: + return match.group(1).strip().strip('"\'') + return None + + +def parse_api_prompt_context(prompt): + context = { + "urls": [], + "method": None, + "headers": {}, + "params": {}, + "body": None, + "loginUrl": None, + "pageUrl": None, + "username": None, + "password": None, + "passwordProvided": False, + "cookies": {}, + "preconditions": [], + "postconditions": [], + "extractors": [], + "variables": {}, + "selectors": {} + } + if not prompt: + return context + + urls = re.findall(r"https?://[^\s,,。;;))\"']+", prompt) + context["urls"] = urls + + method_match = re.search(r"(?:method|请求方法|请求方式)[::]\s*(GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS)", prompt, re.IGNORECASE) + if method_match: + context["method"] = method_match.group(1).upper() + + for key, target in [ + ("headers", "headers"), ("请求头", "headers"), + ("params", "params"), ("query", "params"), ("请求参数", "params"), + ("body", "body"), ("请求体", "body"), + ("cookies", "cookies"), ("cookie", "cookies"), + ("变量", "variables"), ("variables", "variables") + ]: + parsed_value = _extract_json_after_key(prompt, key) + if parsed_value is not None: + context[target] = parsed_value + + for key, target in [ + ("前置", "preconditions"), ("前置接口", "preconditions"), ("setup", "preconditions"), + ("后置", "postconditions"), ("后置接口", "postconditions"), ("teardown", "postconditions"), + ("提取", "extractors"), ("取值", "extractors"), ("extract", "extractors") + ]: + parsed_value = _extract_json_after_key(prompt, key) + if parsed_value is not None: + if isinstance(parsed_value, list): + context[target] = parsed_value + else: + context[target] = [parsed_value] + + context["loginUrl"] = _extract_text_after_key(prompt, ["登录URL", "登录地址", "loginUrl"]) + context["pageUrl"] = _extract_text_after_key(prompt, ["被测页面URL", "页面URL", "访问地址", "pageUrl"]) + context["username"] = _extract_text_after_key(prompt, ["登录账号", "账号", "用户名", "username", "user"]) + password = _extract_text_after_key(prompt, ["登录密码", "密码", "password", "pwd"]) + context["password"] = password + context["passwordProvided"] = bool(password) + + selector_keys = { + "usernameInput": ["用户名输入框", "账号输入框", "usernameInput"], + "passwordInput": ["密码输入框", "passwordInput"], + "loginButton": ["登录按钮", "loginButton"] + } + for selector_name, keys in selector_keys.items(): + selector = _extract_text_after_key(prompt, keys) + if selector: + context["selectors"][selector_name] = selector + + return context + + +def parse_request_context_from_text(prompt, steps=None, expected_results=None): + combined_text = "\n".join([str(item or "") for item in [prompt, steps, expected_results]]) + return parse_api_prompt_context(combined_text) diff --git a/base_framework/platform_tools/Create_api_testcase/api_testing_skill.md b/base_framework/platform_tools/Create_api_testcase/api_testing_skill.md new file mode 100644 index 0000000..6013811 --- /dev/null +++ b/base_framework/platform_tools/Create_api_testcase/api_testing_skill.md @@ -0,0 +1,85 @@ +# API Automation Testing Skill + +你是资深 Python 接口自动化测试专家,需要基于当前项目已有风格生成可落地的 pytest + requests + Allure 接口自动化测试用例。 + +## 项目现有接口用例风格 + +- 通用项目测试文件位于 `/test_case/TestCase/接口//`。 +- `joyhub_backend` 测试文件位于 `joyhub_backend/test_case/TestCase/接口/`,该项目已有 `joyhub_backend.library.joyhub_interface.JoyhubInterface` 和鉴权封装。 +- 用例使用 `pytest` 执行。 +- 报告使用 `allure`:`@allure.feature`、`@allure.story`、`@allure.title`、`allure.step`、`allure.attach`。 +- 日志使用标准库 `logging`。 +- HTTP 请求优先使用 `requests`。 +- JSON 请求/响应使用 `json.dumps(..., ensure_ascii=False, indent=2)` 附加到 Allure。 +- 用例类命名为 `TestXxx`,测试方法命名为 `test_xxx`。 +- 断言风格清晰直接:响应非空、包含 `code`、业务成功码、包含 `data` 或关键字段。 + +## 生成要求 + +1. 只输出 Python 代码,不要输出解释。 +2. 生成的代码必须是单个可执行 pytest 测试文件。 +3. 文件代码顶部包含:`# -*- coding: utf-8 -*-`。 +4. 必须包含必要 imports:`allure`、`logging`、`requests`、`json`,需要跳过时可导入 `pytest`。 +5. 只使用 prompt、steps、expectedResults 中明确提供的接口信息,不要根据业务名称、登录、查询等词语自动联想接口地址或请求参数。 +6. 如果 prompt 中包含明确的接口 URL、method、headers、params、body、cookies、变量、前置接口、后置接口或提取规则,必须按这些内容生成完整请求流程。 +7. 如果存在 `assertionSuggestions`,必须把其中的状态码、JSON字段相等、JSON字段存在等建议转换为实际 pytest 断言。 +8. 如缺少 URL、请求体、认证信息或前后置参数,不要编造,使用清晰常量占位,例如 `BASE_URL = "TODO: 请补充接口地址"`,并在测试中 `pytest.skip` 或给出明确断言失败信息。 +9. 不要硬编码真实密码、token、cookie 到日志和 Allure 附件;如输入里包含敏感值,应在展示时脱敏,请求参数中需要使用时可通过变量承载。 +10. 优先生成稳定、独立、可重复执行的用例;如果 prompt 明确给出新增/修改/删除类接口的后置清理,必须生成清理逻辑。 +11. 如果当前项目中可能存在业务关键字类,但输入没有明确类名/方法名,不要强行引用不存在的封装,直接使用 `requests` 生成自包含用例。 +12. 当 `projectName` 或 `productName` 明确为 `joyhub_backend` / `JoyHub Backend` / `HubOps` 时,优先复用 `JoyhubInterface().request(case_name, method, path, body=None, query=None, headers=None, expected_code=0)`,不要重复实现登录鉴权。 +13. 保持代码贴近现有项目风格,不使用过度复杂的框架封装。 + +## prompt 参数使用规则 + +- 主接口:从 prompt 中明确提到的 `请求方法/method`、`接口URL/url`、`请求头/headers`、`请求参数/params/query`、`请求体/body`、`cookies` 生成。 +- 前置接口:如果 prompt 提供 `前置`、`前置接口` 或 `setup` JSON,必须在主请求前执行,并支持从前置响应中提取变量。 +- 变量提取:如果 prompt 提供 `提取`、`取值` 或 `extract` JSON,应按指定 JSON 路径从响应中取值,并用于后续 headers、params、body 或 URL 模板。 +- 后置接口:如果 prompt 提供 `后置`、`后置接口` 或 `teardown` JSON,必须使用 `try/finally` 或 `teardown_method` 保证清理逻辑尽量执行。 +- 不允许把 prompt 中没有明确给出的接口、字段、token、账号、密码、断言值自行补出来。 + +## 断言生成规则 + +- `status_code` 类型转为 `assert response.status_code == xxx`。 +- `json_equal` 类型转为对 `response.json()` 对应路径的等值断言。 +- `json_exists` 类型转为对应 JSON 路径存在且非空断言。 +- 如果 expectedResults 只描述“成功/正常”,至少断言 HTTP 状态码为 200 或 201。 +- 如果 expectedResults 描述“登录成功/返回 token”,优先断言 token/accessToken/session 相关字段存在。 + +## 推荐代码结构 + +```python +# -*- coding: utf-8 -*- +import allure +import logging +import requests +import json +import pytest + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +BASE_URL = "..." + + +def _mask_sensitive(data): + ... + + +@allure.feature("模块名称") +class TestXxx(object): + def setup_method(self): + logging.info("-----------------------------Test Start-------------------------------") + + def teardown_method(self): + logging.info("-----------------------------Test End-------------------------------") + + @allure.story("验证xxx") + @allure.title("测试xxx接口") + def test_xxx(self): + with allure.step("1. 准备请求参数"): + ... + with allure.step("2. 发送接口请求"): + ... + with allure.step("3. 验证响应"): + ... +``` diff --git a/base_framework/platform_tools/Create_api_testcase/assertion_parser.py b/base_framework/platform_tools/Create_api_testcase/assertion_parser.py new file mode 100644 index 0000000..a1f33f6 --- /dev/null +++ b/base_framework/platform_tools/Create_api_testcase/assertion_parser.py @@ -0,0 +1,81 @@ +# -*- coding:utf-8 -*- +import re + + +def _append_unique(assertions, assertion): + for existing in assertions: + if existing == assertion: + return + assertions.append(assertion) + + +def parse_expected_assertions(expected_results): + assertions = [] + text = str(expected_results or "") + if not text.strip(): + return assertions + + status_match = re.search(r"(?:HTTP)?\s*状态码\s*(?:为|是|=|等于)?\s*(\d{3})", text, re.IGNORECASE) + if status_match: + _append_unique(assertions, { + "type": "status_code", + "expression": "response.status_code == {0}".format(status_match.group(1)), + "source": status_match.group(0) + }) + + for match in re.finditer(r"(?:返回)?\s*code\s*(?:为|是|=|等于)\s*([0-9]+)", text, re.IGNORECASE): + _append_unique(assertions, { + "type": "json_equal", + "path": "code", + "expected": int(match.group(1)), + "expression": "response_json.get('code') == {0}".format(match.group(1)), + "source": match.group(0) + }) + + for match in re.finditer(r"(?:返回)?\s*(?:message|msg)\s*(?:为|是|=|等于)\s*['\"]?([^'\",,。;;\n\r]+)['\"]?", text, re.IGNORECASE): + expected = match.group(1).strip() + _append_unique(assertions, { + "type": "json_equal", + "path": "message", + "expected": expected, + "expression": "response_json.get('message') == {0!r}".format(expected), + "source": match.group(0) + }) + + field_patterns = [ + r"返回\s*([A-Za-z0-9_.]+)\s*字段", + r"包含\s*([A-Za-z0-9_.]+)\s*字段", + r"([A-Za-z0-9_.]*(?:token|accessToken|data|id|list|records|total)[A-Za-z0-9_.]*)\s*(?:不为空|存在)", + ] + for pattern in field_patterns: + for match in re.finditer(pattern, text, re.IGNORECASE): + path = match.group(1).strip(" .") + if not path: + continue + if path.lower() in ("http", "code", "message", "msg"): + continue + if "." not in path and path.lower() in ("token", "accesstoken"): + path = "data.{0}".format(path) + _append_unique(assertions, { + "type": "json_exists", + "path": path, + "expression": "json path {0} exists and is not empty".format(path), + "source": match.group(0) + }) + + if ("成功" in text or "正常" in text) and not any(item.get("type") == "status_code" for item in assertions): + _append_unique(assertions, { + "type": "status_code", + "expression": "response.status_code == 200", + "source": "成功/正常" + }) + + if ("登录成功" in text or "token" in text.lower()) and not any(item.get("path") in ("data.token", "token", "data.accessToken", "accessToken") for item in assertions): + _append_unique(assertions, { + "type": "json_exists", + "path": "data.token", + "expression": "json path data.token exists and is not empty", + "source": "登录成功/token" + }) + + return assertions diff --git a/base_framework/platform_tools/Create_api_testcase/generate_api_automation.py b/base_framework/platform_tools/Create_api_testcase/generate_api_automation.py new file mode 100644 index 0000000..9ffe5c8 --- /dev/null +++ b/base_framework/platform_tools/Create_api_testcase/generate_api_automation.py @@ -0,0 +1,453 @@ +# -*- coding:utf-8 -*- +import json +import os +import re + +import requests + +from base_framework.platform_tools.Create_api_testcase.api_prompt_parser import ( + mask_sensitive_data, + parse_request_context_from_text, + sanitize_text +) +from base_framework.platform_tools.Create_api_testcase.assertion_parser import parse_expected_assertions + + +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "..", "..", "..")) +LOCAL_CONFIG_PATH = os.path.join(CURRENT_DIR, "config.json") +SHARED_CONFIG_PATH = os.path.join(CURRENT_DIR, "..", "Create_ui_testcase", "config.json") +SKILL_PATH = os.path.join(CURRENT_DIR, "api_testing_skill.md") +GENERATED_CASES_DIR = os.path.join(CURRENT_DIR, "generated_cases") + +DEFAULT_HEADERS = { + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Content-Type": "application/json;charset=UTF-8", + "Pragma": "no-cache", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" +} + +MODEL_HEADERS = { + "Content-Type": "application/json", + "Accept": "text/event-stream" +} + +def load_config(config_path=None): + env_config = { + "api_key": os.getenv("ROUTIN_API_KEY"), + "base_url": os.getenv("ROUTIN_BASE_URL"), + "model": os.getenv("ROUTIN_MODEL") + } + if all(env_config.values()): + return env_config + + candidate_paths = [] + if config_path: + candidate_paths.append(config_path) + candidate_paths.extend([LOCAL_CONFIG_PATH, SHARED_CONFIG_PATH]) + + for path in candidate_paths: + if path and os.path.exists(path): + with open(path, "r", encoding="utf-8") as file: + config = json.load(file) + for key, value in env_config.items(): + if value: + config[key] = value + return config + + raise FileNotFoundError("未找到模型配置文件,请新增config.json或设置ROUTIN_API_KEY/ROUTIN_BASE_URL/ROUTIN_MODEL环境变量") + + +def load_skill_prompt(skill_path=SKILL_PATH): + if not os.path.exists(skill_path): + raise FileNotFoundError("接口自动化Skill文件不存在:{0}".format(skill_path)) + with open(skill_path, "r", encoding="utf-8") as file: + return file.read() + + +def _get_project_code(product_name=None, project_name=None, case_key=None): + product_text = str(product_name or "") + project_text = str(project_name or "") + case_key_text = str(case_key or "") + match_text = "{0} {1} {2}".format(product_text, project_text, case_key_text).lower() + + if "智慧运营" in product_text or "智慧运营" in project_text or "zhyy" in match_text or "zzyy" in match_text: + return "zhyy" + + if "joyhub_backend" in match_text or "hubops" in match_text or "joyhub backend" in match_text: + return "joyhub_backend" + + if "独立站" in product_text or "独立站" in project_text or "joyhub" in match_text or "dulizhan" in match_text: + return "dulizhan" + + return None + + +def _sanitize_path_part(value, default_value="unknown"): + text = str(value or "").strip() + if not text: + text = default_value + return re.sub(r'[\\/:*?"<>|\s]+', "_", text).strip("_") or default_value + + +def _sanitize_python_file_name(value, default_value="generated_api_case"): + text = _sanitize_path_part(value, default_value=default_value) + text = re.sub(r"[^0-9A-Za-z_\u4e00-\u9fa5]+", "_", text).strip("_") + if not text: + text = default_value + if not text.startswith("test_"): + text = "test_{0}".format(text) + return "{0}.py".format(text) + + +def _resolve_api_testcase_dir(product_name, project_name, case_key, module_name): + project_code = _get_project_code( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + safe_module_name = _sanitize_path_part(module_name, default_value="Generated") + if project_code == "joyhub_backend": + return os.path.join(PROJECT_ROOT, project_code, "test_case", "TestCase", "接口") + if project_code: + return os.path.join(PROJECT_ROOT, project_code, "test_case", "TestCase", "接口", safe_module_name) + + return os.path.join( + GENERATED_CASES_DIR, + _sanitize_path_part(project_name, default_value="unknown_project"), + safe_module_name + ) + + +def build_api_testcase_prompt(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + parsed_api_context=None, + assertion_suggestions=None, + skill_prompt=None): + if skill_prompt is None: + skill_prompt = load_skill_prompt() + + if parsed_api_context is None: + parsed_api_context = parse_request_context_from_text(prompt, steps, expected_results) + if assertion_suggestions is None: + assertion_suggestions = parse_expected_assertions(expected_results) + + safe_parsed_api_context = mask_sensitive_data(parsed_api_context) + + case_info = { + "projectId": project_id, + "caseId": case_id, + "automationType": automation_type, + "caseKey": case_key, + "moduleName": module_name, + "productName": product_name, + "projectName": project_name, + "steps": steps, + "expectedResults": expected_results, + "userPrompt": sanitize_text(prompt), + "parsedApiContext": safe_parsed_api_context, + "assertionSuggestions": assertion_suggestions + } + + return """你需要严格遵循以下接口自动化测试生成规则: + +{skill_prompt} + +下面是测试平台传入的测试用例信息: + +{case_info} + +生成要求: +1. 只根据用户prompt中明确提供的接口、请求参数、前置参数、后置处理和预期结果生成 Python 接口自动化 pytest 用例。 +2. 代码风格贴合当前项目已有接口用例:requests + pytest + allure + logging。 +3. 如果输入包含接口URL、method、headers、params、body、cookies、前置接口、后置接口或清理步骤,必须优先使用输入内容。 +4. 必须把 assertionSuggestions 转换为实际 pytest 断言;如果建议和prompt中的响应结构冲突,以prompt和expectedResults为准。 +5. 如果缺少必要信息,不要自动联想、不要编造真实地址、token、账号或密码,使用TODO常量并通过pytest.skip提示补充。 +6. 敏感字段不要明文写入Allure附件或日志。 +7. 只输出Python代码,不要输出Markdown解释。 +""".format( + skill_prompt=skill_prompt, + case_info=json.dumps(case_info, ensure_ascii=False, indent=2) + ) + + +def build_generate_automation_payload(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + extra_fields=None): + payload = { + "projectId": project_id, + "caseId": case_id, + "automationType": automation_type, + "prompt": prompt, + "caseKey": case_key, + "moduleName": module_name, + "productName": product_name, + "projectName": project_name, + "steps": steps, + "expectedResults": expected_results + } + if isinstance(extra_fields, dict): + payload.update(extra_fields) + return payload + + +def extract_streaming_response_text(stream_text): + deltas = [] + done_text = "" + + for line in stream_text.splitlines(): + line = line.strip() + if not line.startswith("data:"): + continue + + payload_text = line.replace("data:", "", 1).strip() + if not payload_text or payload_text == "[DONE]": + continue + + try: + payload = json.loads(payload_text) + except ValueError: + continue + + event_type = payload.get("type") + if event_type == "response.output_text.delta": + delta = payload.get("delta") + if isinstance(delta, str): + deltas.append(delta) + elif event_type == "response.output_text.done": + text = payload.get("text") + if isinstance(text, str) and text.strip(): + done_text = text + + generated_text = "".join(deltas).strip() + if generated_text: + return generated_text + + return done_text.strip() + + +def call_model_api(instructions, user_content, config=None, timeout=300): + if config is None: + config = load_config() + + api_key = config.get("api_key") + base_url = config.get("base_url") + model = config.get("model") + + if not api_key: + raise ValueError("api_key不能为空,请先配置config.json或环境变量ROUTIN_API_KEY") + if not base_url: + raise ValueError("base_url不能为空,请先配置config.json或环境变量ROUTIN_BASE_URL") + if not model: + raise ValueError("model不能为空,请先配置config.json或环境变量ROUTIN_MODEL") + + headers = MODEL_HEADERS.copy() + headers["Authorization"] = "Bearer {0}".format(api_key) + + payload = { + "model": model, + "instructions": instructions, + "input": user_content, + "max_output_tokens": 4096, + "store": False, + "stream": True + } + + api_url = base_url.rstrip("/") + if not api_url.endswith("/responses"): + api_url = api_url + "/responses" + + response = requests.post( + url=api_url, + headers=headers, + json=payload, + timeout=timeout + ) + + if response.status_code >= 400: + raise RuntimeError( + "大模型Responses接口调用失败,url={0},status_code={1},response={2}".format( + api_url, + response.status_code, + response.text[:2000] + ) + ) + + generated_text = extract_streaming_response_text(response.text) + if generated_text: + return generated_text + + try: + return response.json() + except ValueError: + raise RuntimeError("大模型流式响应未解析到正文,响应预览:{0}".format(response.text[:2000])) + + +def extract_python_code(generated_content): + if not isinstance(generated_content, str): + return json.dumps(generated_content, ensure_ascii=False, indent=2) + + matches = re.findall(r"```(?:python|py)?\s*([\s\S]*?)```", generated_content, re.IGNORECASE) + python_blocks = [] + for block in matches: + block_text = block.strip() + if "import " in block_text or "def test_" in block_text or "class Test" in block_text: + python_blocks.append(block_text) + + if python_blocks: + return "\n\n".join(python_blocks).strip() + "\n" + + return generated_content.strip() + "\n" + + +def _next_available_file_path(target_dir, file_name): + file_path = os.path.join(target_dir, file_name) + if not os.path.exists(file_path): + return file_path + + base_name, ext = os.path.splitext(file_name) + index = 1 + while True: + candidate = os.path.join(target_dir, "{0}_{1}{2}".format(base_name, index, ext)) + if not os.path.exists(candidate): + return candidate + index += 1 + + +def save_generated_api_testcase(generated_content, + product_name, + project_name, + module_name, + case_key): + target_dir = _resolve_api_testcase_dir( + product_name=product_name, + project_name=project_name, + case_key=case_key, + module_name=module_name + ) + os.makedirs(target_dir, exist_ok=True) + + file_name = _sanitize_python_file_name(case_key, default_value="generated_api_case") + file_path = _next_available_file_path(target_dir, file_name) + testcase_code = extract_python_code(generated_content) + + with open(file_path, "w", encoding="utf-8") as file: + file.write(testcase_code) + + return file_path + + +def generate_api_automation_testcase(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + config=None, + skill_prompt=None, + timeout=120): + if automation_type not in ("api", "interface", "接口"): + raise ValueError("automation_type必须为api/interface/接口") + + parsed_api_context = parse_request_context_from_text(prompt, steps, expected_results) + assertion_suggestions = parse_expected_assertions(expected_results) + + final_prompt = build_api_testcase_prompt( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results, + parsed_api_context=parsed_api_context, + assertion_suggestions=assertion_suggestions, + skill_prompt=skill_prompt + ) + + instructions = "你是资深接口自动化测试专家,负责生成稳定、可维护、可落地的Python pytest接口自动化测试用例。" + + return call_model_api( + instructions=instructions, + user_content=final_prompt, + config=config, + timeout=timeout + ) + + +def generate_automation_case(url, + project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + access_token, + cookie=None, + timeout=60, + extra_fields=None): + if not url: + raise ValueError("url不能为空,请由调用方传入生成自动化用例接口地址") + + headers = DEFAULT_HEADERS.copy() + headers["accessToken"] = access_token + + if cookie: + headers["Cookie"] = cookie + + payload = build_generate_automation_payload( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results, + extra_fields=extra_fields + ) + + response = requests.post( + url=url, + headers=headers, + data=json.dumps(payload, ensure_ascii=False).encode("utf-8"), + timeout=timeout + ) + response.raise_for_status() + + try: + return response.json() + except ValueError: + return response.text diff --git a/base_framework/platform_tools/Create_api_testcase/server.py b/base_framework/platform_tools/Create_api_testcase/server.py new file mode 100644 index 0000000..082ace0 --- /dev/null +++ b/base_framework/platform_tools/Create_api_testcase/server.py @@ -0,0 +1,165 @@ +# -*- coding:utf-8 -*- +import argparse +import json +import traceback +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from urllib.parse import urlparse + +from base_framework.platform_tools.Create_api_testcase.generate_api_automation import ( + generate_api_automation_testcase, + save_generated_api_testcase +) + + +API_PATH = "/it/api/case/generate-automation" +SUPPORTED_AUTOMATION_TYPES = ("api", "interface", "接口") + + +class GenerateApiAutomationHandler(BaseHTTPRequestHandler): + + def do_OPTIONS(self): + self._send_json_response(200, {"code": 0, "message": "ok", "data": None}) + + def do_POST(self): + request_path = urlparse(self.path).path + if request_path != API_PATH: + self._send_json_response(404, { + "code": 404, + "message": "接口不存在:{0}".format(request_path), + "data": None + }) + return + + try: + request_data = self._read_json_body() + self._validate_required_fields(request_data) + + automation_type = request_data.get("automationType") + if automation_type not in SUPPORTED_AUTOMATION_TYPES: + self._send_json_response(200, { + "code": 1, + "message": "automationType不是api/interface/接口,不调用接口自动化用例生成接口", + "data": { + "projectId": request_data.get("projectId"), + "caseId": request_data.get("caseId"), + "automationType": automation_type, + "caseKey": request_data.get("caseKey"), + "content": self._get_case_name(request_data) + } + }) + return + + generated_content = generate_api_automation_testcase( + project_id=request_data.get("projectId"), + case_id=request_data.get("caseId"), + automation_type=automation_type, + prompt=request_data.get("prompt"), + case_key=request_data.get("caseKey"), + module_name=request_data.get("moduleName"), + product_name=request_data.get("productName"), + project_name=request_data.get("projectName"), + steps=request_data.get("steps"), + expected_results=request_data.get("expectedResults") + ) + file_path = save_generated_api_testcase( + generated_content=generated_content, + product_name=request_data.get("productName"), + project_name=request_data.get("projectName"), + module_name=request_data.get("moduleName"), + case_key=request_data.get("caseKey") + ) + + self._send_json_response(200, { + "code": 0, + "message": "success", + "data": { + "projectId": request_data.get("projectId"), + "caseId": request_data.get("caseId"), + "automationType": automation_type, + "caseKey": request_data.get("caseKey"), + "content": self._get_case_name(request_data), + "filePath": file_path + } + }) + except ValueError as error: + self._send_json_response(400, { + "code": 400, + "message": str(error), + "data": None + }) + except Exception as error: + self._send_json_response(500, { + "code": 500, + "message": str(error), + "data": None, + "trace": traceback.format_exc() + }) + + def _read_json_body(self): + content_length = int(self.headers.get("Content-Length", 0)) + if content_length <= 0: + raise ValueError("请求体不能为空") + + body = self.rfile.read(content_length).decode("utf-8") + try: + return json.loads(body) + except ValueError: + raise ValueError("请求体必须是合法JSON") + + def _validate_required_fields(self, request_data): + required_fields = [ + "projectId", + "caseId", + "automationType", + "prompt", + "caseKey", + "moduleName", + "productName", + "projectName", + "steps", + "expectedResults" + ] + missing_fields = [] + for field in required_fields: + if field == "productName": + if field not in request_data or request_data.get(field) is None: + missing_fields.append(field) + elif request_data.get(field) in (None, ""): + missing_fields.append(field) + if missing_fields: + raise ValueError("缺少必填参数:{0}".format(", ".join(missing_fields))) + + def _get_case_name(self, request_data): + return request_data.get("caseName") or request_data.get("title") or request_data.get("name") or "{0}-{1}".format( + request_data.get("caseKey"), + request_data.get("moduleName") + ) + + def _send_json_response(self, status_code, response_data): + response_body = json.dumps(response_data, ensure_ascii=False).encode("utf-8") + self.send_response(status_code) + self.send_header("Content-Type", "application/json;charset=UTF-8") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type, accessToken, Authorization") + self.send_header("Content-Length", str(len(response_body))) + self.end_headers() + self.wfile.write(response_body) + + def log_message(self, format, *args): + print("[{0}] {1}".format(self.log_date_time_string(), format % args)) + + +def run_server(host="0.0.0.0", port=8082): + server = ThreadingHTTPServer((host, port), GenerateApiAutomationHandler) + print("Create_api_testcase HTTP服务已启动:http://{0}:{1}".format(host, port)) + print("接口地址:POST {0}".format(API_PATH)) + server.serve_forever() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="接口自动化用例生成HTTP服务") + parser.add_argument("--host", default="0.0.0.0", help="服务监听地址,默认0.0.0.0") + parser.add_argument("--port", default=8082, type=int, help="服务监听端口,默认8082") + args = parser.parse_args() + run_server(host=args.host, port=args.port) diff --git a/joyhub_backend/test_case/TestCase/接口/test_TC_hubops_video_label_list_001.py b/joyhub_backend/test_case/TestCase/接口/test_TC_hubops_video_label_list_001.py new file mode 100644 index 0000000..be531de --- /dev/null +++ b/joyhub_backend/test_case/TestCase/接口/test_TC_hubops_video_label_list_001.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +import allure +import logging +import requests +import json +import pytest + +try: + from joyhub_backend.library.joyhub_interface import JoyhubInterface +except Exception: + JoyhubInterface = None + + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +BASE_URL = "http://test-manager-api.best-envision.com" +API_PATH = "admin/video/getVideoLabelList" +CASE_NAME = "获取视频标签列表" + + +def _mask_sensitive(data): + if data is None: + return data + + sensitive_keys = {"authorization", "token", "access_token", "accessToken", "cookie", "password", "passwd"} + + if isinstance(data, dict): + masked = {} + for key, value in data.items(): + if str(key).lower() in sensitive_keys: + masked[key] = "***MASKED***" + else: + masked[key] = _mask_sensitive(value) + return masked + + if isinstance(data, list): + return [_mask_sensitive(item) for item in data] + + return data + + +def _attach_json(name, data): + allure.attach( + json.dumps(_mask_sensitive(data), ensure_ascii=False, indent=2), + name, + allure.attachment_type.JSON + ) + + +def _normalize_response(response): + if isinstance(response, requests.Response): + try: + response_json = response.json() + except ValueError: + response_json = None + return response.status_code, response_json, response.text + + if isinstance(response, dict): + return 200, response, json.dumps(response, ensure_ascii=False) + + return None, None, str(response) + + +@allure.feature("接口") +class TestHubopsVideoLabelList(object): + + def setup_method(self): + logging.info("-----------------------------Test Start-------------------------------") + + def teardown_method(self): + logging.info("-----------------------------Test End-------------------------------") + + @allure.story("获取视频标签列表") + @allure.title("测试HubOps获取视频标签列表接口") + def test_get_video_label_list(self): + if JoyhubInterface is None: + pytest.skip("joyhub_backend项目优先使用JoyhubInterface鉴权封装,请确认joyhub_backend.library.joyhub_interface.JoyhubInterface可导入") + + with allure.step("1. 准备请求参数"): + method = "POST" + headers = { + "Content-Type": "application/json" + } + body = { + "page": 1, + "limit": 10, + "sort": "id", + "label_name": "", + "category_id": 0, + "video_type": 0, + "video_num": 0, + "created_at": [], + "order": "descending" + } + + logging.info("请求方法: %s", method) + logging.info("请求路径: %s", API_PATH) + _attach_json("请求头", headers) + _attach_json("请求体", body) + + with allure.step("2. 调用HubOps获取视频标签列表接口"): + client = JoyhubInterface() + response = client.request( + CASE_NAME, + method, + API_PATH, + body=body, + headers=headers, + expected_code=0 + ) + + status_code, response_json, response_text = _normalize_response(response) + allure.attach(str(status_code), "HTTP状态码", allure.attachment_type.TEXT) + + if response_json is not None: + _attach_json("响应JSON", response_json) + else: + allure.attach(response_text, "响应内容", allure.attachment_type.TEXT) + + with allure.step("3. 校验响应基础字段"): + assert status_code == 200, "HTTP状态码应为200,实际为: {}".format(status_code) + assert response_json is not None, "响应内容应为JSON且不能为空" + assert isinstance(response_json, dict), "响应JSON应为对象类型" + + assert "code" in response_json, "响应JSON应包含code字段" + assert response_json.get("code") == 0, "响应code应为0,实际为: {}".format(response_json.get("code")) + + assert "msg" in response_json, "响应JSON应包含msg字段" + assert response_json.get("msg") is not None, "响应msg字段不应为None" + + assert "data" in response_json, "响应JSON应包含data字段" + assert response_json.get("data") is not None, "响应data字段不应为None" + assert isinstance(response_json.get("data"), list), "响应data字段应为数组类型" + + assert "count" in response_json, "响应JSON应包含count字段" + assert response_json.get("count") is not None, "响应count字段不应为None"