feat: add prompt based api testcase generator

This commit is contained in:
guojiabao
2026-05-18 18:03:36 +08:00
parent e0e22b895e
commit c9be33aaec
7 changed files with 1066 additions and 0 deletions

View File

@@ -0,0 +1 @@
# -*- coding:utf-8 -*-

View File

@@ -0,0 +1,145 @@
# -*- coding:utf-8 -*-
import json
import re
SENSITIVE_KEYS = {
"password", "pwd", "token", "access_token", "accessToken", "authorization",
"cookie", "secret", "client_secret", "refreshToken", "refresh_token"
}
def _mask_value(value):
if value in (None, ""):
return value
return "******"
def mask_sensitive_data(data):
if isinstance(data, dict):
result = {}
for key, value in data.items():
if str(key).lower() in {item.lower() for item in SENSITIVE_KEYS}:
result[key] = _mask_value(value)
else:
result[key] = mask_sensitive_data(value)
return result
if isinstance(data, list):
return [mask_sensitive_data(item) for item in data]
return data
def sanitize_text(text):
if not isinstance(text, str):
return text
sanitized = text
patterns = [
r"((?:password|pwd|登录密码|密码)\s*[:=]\s*)([^\s,。;;}&]+)",
r"((?:token|accessToken|access_token|Authorization|Cookie)\s*[:=]\s*)([^\s,。;;]+)"
]
for pattern in patterns:
sanitized = re.sub(pattern, r"\1******", sanitized, flags=re.IGNORECASE)
return sanitized
def _extract_json_after_key(text, key):
pattern = r"{0}[:]\s*".format(re.escape(key))
match = re.search(pattern, text, re.IGNORECASE)
if not match:
return None
content = text[match.end():].lstrip()
if not content or content[0] not in "[{":
return None
try:
value, _ = json.JSONDecoder().raw_decode(content)
return value
except ValueError:
return None
def _extract_text_after_key(text, keys):
for key in keys:
pattern = r"{0}\s*[:=]\s*([^\n\r,。;;]+)".format(re.escape(key))
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(1).strip().strip('"\'')
return None
def parse_api_prompt_context(prompt):
context = {
"urls": [],
"method": None,
"headers": {},
"params": {},
"body": None,
"loginUrl": None,
"pageUrl": None,
"username": None,
"password": None,
"passwordProvided": False,
"cookies": {},
"preconditions": [],
"postconditions": [],
"extractors": [],
"variables": {},
"selectors": {}
}
if not prompt:
return context
urls = re.findall(r"https?://[^\s,。;;)\"']+", prompt)
context["urls"] = urls
method_match = re.search(r"(?:method|请求方法|请求方式)[:]\s*(GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS)", prompt, re.IGNORECASE)
if method_match:
context["method"] = method_match.group(1).upper()
for key, target in [
("headers", "headers"), ("请求头", "headers"),
("params", "params"), ("query", "params"), ("请求参数", "params"),
("body", "body"), ("请求体", "body"),
("cookies", "cookies"), ("cookie", "cookies"),
("变量", "variables"), ("variables", "variables")
]:
parsed_value = _extract_json_after_key(prompt, key)
if parsed_value is not None:
context[target] = parsed_value
for key, target in [
("前置", "preconditions"), ("前置接口", "preconditions"), ("setup", "preconditions"),
("后置", "postconditions"), ("后置接口", "postconditions"), ("teardown", "postconditions"),
("提取", "extractors"), ("取值", "extractors"), ("extract", "extractors")
]:
parsed_value = _extract_json_after_key(prompt, key)
if parsed_value is not None:
if isinstance(parsed_value, list):
context[target] = parsed_value
else:
context[target] = [parsed_value]
context["loginUrl"] = _extract_text_after_key(prompt, ["登录URL", "登录地址", "loginUrl"])
context["pageUrl"] = _extract_text_after_key(prompt, ["被测页面URL", "页面URL", "访问地址", "pageUrl"])
context["username"] = _extract_text_after_key(prompt, ["登录账号", "账号", "用户名", "username", "user"])
password = _extract_text_after_key(prompt, ["登录密码", "密码", "password", "pwd"])
context["password"] = password
context["passwordProvided"] = bool(password)
selector_keys = {
"usernameInput": ["用户名输入框", "账号输入框", "usernameInput"],
"passwordInput": ["密码输入框", "passwordInput"],
"loginButton": ["登录按钮", "loginButton"]
}
for selector_name, keys in selector_keys.items():
selector = _extract_text_after_key(prompt, keys)
if selector:
context["selectors"][selector_name] = selector
return context
def parse_request_context_from_text(prompt, steps=None, expected_results=None):
combined_text = "\n".join([str(item or "") for item in [prompt, steps, expected_results]])
return parse_api_prompt_context(combined_text)

View File

@@ -0,0 +1,85 @@
# API Automation Testing Skill
你是资深 Python 接口自动化测试专家,需要基于当前项目已有风格生成可落地的 pytest + requests + Allure 接口自动化测试用例。
## 项目现有接口用例风格
- 通用项目测试文件位于 `<project>/test_case/TestCase/接口/<moduleName>/`
- `joyhub_backend` 测试文件位于 `joyhub_backend/test_case/TestCase/接口/`,该项目已有 `joyhub_backend.library.joyhub_interface.JoyhubInterface` 和鉴权封装。
- 用例使用 `pytest` 执行。
- 报告使用 `allure``@allure.feature``@allure.story``@allure.title``allure.step``allure.attach`
- 日志使用标准库 `logging`
- HTTP 请求优先使用 `requests`
- JSON 请求/响应使用 `json.dumps(..., ensure_ascii=False, indent=2)` 附加到 Allure。
- 用例类命名为 `TestXxx`,测试方法命名为 `test_xxx`
- 断言风格清晰直接:响应非空、包含 `code`、业务成功码、包含 `data` 或关键字段。
## 生成要求
1. 只输出 Python 代码,不要输出解释。
2. 生成的代码必须是单个可执行 pytest 测试文件。
3. 文件代码顶部包含:`# -*- coding: utf-8 -*-`
4. 必须包含必要 imports`allure``logging``requests``json`,需要跳过时可导入 `pytest`
5. 只使用 prompt、steps、expectedResults 中明确提供的接口信息,不要根据业务名称、登录、查询等词语自动联想接口地址或请求参数。
6. 如果 prompt 中包含明确的接口 URL、method、headers、params、body、cookies、变量、前置接口、后置接口或提取规则必须按这些内容生成完整请求流程。
7. 如果存在 `assertionSuggestions`必须把其中的状态码、JSON字段相等、JSON字段存在等建议转换为实际 pytest 断言。
8. 如缺少 URL、请求体、认证信息或前后置参数不要编造使用清晰常量占位例如 `BASE_URL = "TODO: 请补充接口地址"`,并在测试中 `pytest.skip` 或给出明确断言失败信息。
9. 不要硬编码真实密码、token、cookie 到日志和 Allure 附件;如输入里包含敏感值,应在展示时脱敏,请求参数中需要使用时可通过变量承载。
10. 优先生成稳定、独立、可重复执行的用例;如果 prompt 明确给出新增/修改/删除类接口的后置清理,必须生成清理逻辑。
11. 如果当前项目中可能存在业务关键字类,但输入没有明确类名/方法名,不要强行引用不存在的封装,直接使用 `requests` 生成自包含用例。
12.`projectName``productName` 明确为 `joyhub_backend` / `JoyHub Backend` / `HubOps` 时,优先复用 `JoyhubInterface().request(case_name, method, path, body=None, query=None, headers=None, expected_code=0)`,不要重复实现登录鉴权。
13. 保持代码贴近现有项目风格,不使用过度复杂的框架封装。
## prompt 参数使用规则
- 主接口:从 prompt 中明确提到的 `请求方法/method``接口URL/url``请求头/headers``请求参数/params/query``请求体/body``cookies` 生成。
- 前置接口:如果 prompt 提供 `前置``前置接口``setup` JSON必须在主请求前执行并支持从前置响应中提取变量。
- 变量提取:如果 prompt 提供 `提取``取值``extract` JSON应按指定 JSON 路径从响应中取值,并用于后续 headers、params、body 或 URL 模板。
- 后置接口:如果 prompt 提供 `后置``后置接口``teardown` JSON必须使用 `try/finally``teardown_method` 保证清理逻辑尽量执行。
- 不允许把 prompt 中没有明确给出的接口、字段、token、账号、密码、断言值自行补出来。
## 断言生成规则
- `status_code` 类型转为 `assert response.status_code == xxx`
- `json_equal` 类型转为对 `response.json()` 对应路径的等值断言。
- `json_exists` 类型转为对应 JSON 路径存在且非空断言。
- 如果 expectedResults 只描述“成功/正常”,至少断言 HTTP 状态码为 200 或 201。
- 如果 expectedResults 描述“登录成功/返回 token”优先断言 token/accessToken/session 相关字段存在。
## 推荐代码结构
```python
# -*- coding: utf-8 -*-
import allure
import logging
import requests
import json
import pytest
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
BASE_URL = "..."
def _mask_sensitive(data):
...
@allure.feature("模块名称")
class TestXxx(object):
def setup_method(self):
logging.info("-----------------------------Test Start-------------------------------")
def teardown_method(self):
logging.info("-----------------------------Test End-------------------------------")
@allure.story("验证xxx")
@allure.title("测试xxx接口")
def test_xxx(self):
with allure.step("1. 准备请求参数"):
...
with allure.step("2. 发送接口请求"):
...
with allure.step("3. 验证响应"):
...
```

View File

@@ -0,0 +1,81 @@
# -*- coding:utf-8 -*-
import re
def _append_unique(assertions, assertion):
for existing in assertions:
if existing == assertion:
return
assertions.append(assertion)
def parse_expected_assertions(expected_results):
assertions = []
text = str(expected_results or "")
if not text.strip():
return assertions
status_match = re.search(r"(?:HTTP)?\s*状态码\s*(?:为|是|=|等于)?\s*(\d{3})", text, re.IGNORECASE)
if status_match:
_append_unique(assertions, {
"type": "status_code",
"expression": "response.status_code == {0}".format(status_match.group(1)),
"source": status_match.group(0)
})
for match in re.finditer(r"(?:返回)?\s*code\s*(?:为|是|=|等于)\s*([0-9]+)", text, re.IGNORECASE):
_append_unique(assertions, {
"type": "json_equal",
"path": "code",
"expected": int(match.group(1)),
"expression": "response_json.get('code') == {0}".format(match.group(1)),
"source": match.group(0)
})
for match in re.finditer(r"(?:返回)?\s*(?:message|msg)\s*(?:为|是|=|等于)\s*['\"]?([^'\",。;;\n\r]+)['\"]?", text, re.IGNORECASE):
expected = match.group(1).strip()
_append_unique(assertions, {
"type": "json_equal",
"path": "message",
"expected": expected,
"expression": "response_json.get('message') == {0!r}".format(expected),
"source": match.group(0)
})
field_patterns = [
r"返回\s*([A-Za-z0-9_.]+)\s*字段",
r"包含\s*([A-Za-z0-9_.]+)\s*字段",
r"([A-Za-z0-9_.]*(?:token|accessToken|data|id|list|records|total)[A-Za-z0-9_.]*)\s*(?:不为空|存在)",
]
for pattern in field_patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
path = match.group(1).strip(" .")
if not path:
continue
if path.lower() in ("http", "code", "message", "msg"):
continue
if "." not in path and path.lower() in ("token", "accesstoken"):
path = "data.{0}".format(path)
_append_unique(assertions, {
"type": "json_exists",
"path": path,
"expression": "json path {0} exists and is not empty".format(path),
"source": match.group(0)
})
if ("成功" in text or "正常" in text) and not any(item.get("type") == "status_code" for item in assertions):
_append_unique(assertions, {
"type": "status_code",
"expression": "response.status_code == 200",
"source": "成功/正常"
})
if ("登录成功" in text or "token" in text.lower()) and not any(item.get("path") in ("data.token", "token", "data.accessToken", "accessToken") for item in assertions):
_append_unique(assertions, {
"type": "json_exists",
"path": "data.token",
"expression": "json path data.token exists and is not empty",
"source": "登录成功/token"
})
return assertions

View File

@@ -0,0 +1,453 @@
# -*- coding:utf-8 -*-
import json
import os
import re
import requests
from base_framework.platform_tools.Create_api_testcase.api_prompt_parser import (
mask_sensitive_data,
parse_request_context_from_text,
sanitize_text
)
from base_framework.platform_tools.Create_api_testcase.assertion_parser import parse_expected_assertions
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "..", "..", ".."))
LOCAL_CONFIG_PATH = os.path.join(CURRENT_DIR, "config.json")
SHARED_CONFIG_PATH = os.path.join(CURRENT_DIR, "..", "Create_ui_testcase", "config.json")
SKILL_PATH = os.path.join(CURRENT_DIR, "api_testing_skill.md")
GENERATED_CASES_DIR = os.path.join(CURRENT_DIR, "generated_cases")
DEFAULT_HEADERS = {
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/json;charset=UTF-8",
"Pragma": "no-cache",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
MODEL_HEADERS = {
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
def load_config(config_path=None):
env_config = {
"api_key": os.getenv("ROUTIN_API_KEY"),
"base_url": os.getenv("ROUTIN_BASE_URL"),
"model": os.getenv("ROUTIN_MODEL")
}
if all(env_config.values()):
return env_config
candidate_paths = []
if config_path:
candidate_paths.append(config_path)
candidate_paths.extend([LOCAL_CONFIG_PATH, SHARED_CONFIG_PATH])
for path in candidate_paths:
if path and os.path.exists(path):
with open(path, "r", encoding="utf-8") as file:
config = json.load(file)
for key, value in env_config.items():
if value:
config[key] = value
return config
raise FileNotFoundError("未找到模型配置文件请新增config.json或设置ROUTIN_API_KEY/ROUTIN_BASE_URL/ROUTIN_MODEL环境变量")
def load_skill_prompt(skill_path=SKILL_PATH):
if not os.path.exists(skill_path):
raise FileNotFoundError("接口自动化Skill文件不存在{0}".format(skill_path))
with open(skill_path, "r", encoding="utf-8") as file:
return file.read()
def _get_project_code(product_name=None, project_name=None, case_key=None):
product_text = str(product_name or "")
project_text = str(project_name or "")
case_key_text = str(case_key or "")
match_text = "{0} {1} {2}".format(product_text, project_text, case_key_text).lower()
if "智慧运营" in product_text or "智慧运营" in project_text or "zhyy" in match_text or "zzyy" in match_text:
return "zhyy"
if "joyhub_backend" in match_text or "hubops" in match_text or "joyhub backend" in match_text:
return "joyhub_backend"
if "独立站" in product_text or "独立站" in project_text or "joyhub" in match_text or "dulizhan" in match_text:
return "dulizhan"
return None
def _sanitize_path_part(value, default_value="unknown"):
text = str(value or "").strip()
if not text:
text = default_value
return re.sub(r'[\\/:*?"<>|\s]+', "_", text).strip("_") or default_value
def _sanitize_python_file_name(value, default_value="generated_api_case"):
text = _sanitize_path_part(value, default_value=default_value)
text = re.sub(r"[^0-9A-Za-z_\u4e00-\u9fa5]+", "_", text).strip("_")
if not text:
text = default_value
if not text.startswith("test_"):
text = "test_{0}".format(text)
return "{0}.py".format(text)
def _resolve_api_testcase_dir(product_name, project_name, case_key, module_name):
project_code = _get_project_code(
product_name=product_name,
project_name=project_name,
case_key=case_key
)
safe_module_name = _sanitize_path_part(module_name, default_value="Generated")
if project_code == "joyhub_backend":
return os.path.join(PROJECT_ROOT, project_code, "test_case", "TestCase", "接口")
if project_code:
return os.path.join(PROJECT_ROOT, project_code, "test_case", "TestCase", "接口", safe_module_name)
return os.path.join(
GENERATED_CASES_DIR,
_sanitize_path_part(project_name, default_value="unknown_project"),
safe_module_name
)
def build_api_testcase_prompt(project_id,
case_id,
automation_type,
prompt,
case_key,
module_name,
product_name,
project_name,
steps,
expected_results,
parsed_api_context=None,
assertion_suggestions=None,
skill_prompt=None):
if skill_prompt is None:
skill_prompt = load_skill_prompt()
if parsed_api_context is None:
parsed_api_context = parse_request_context_from_text(prompt, steps, expected_results)
if assertion_suggestions is None:
assertion_suggestions = parse_expected_assertions(expected_results)
safe_parsed_api_context = mask_sensitive_data(parsed_api_context)
case_info = {
"projectId": project_id,
"caseId": case_id,
"automationType": automation_type,
"caseKey": case_key,
"moduleName": module_name,
"productName": product_name,
"projectName": project_name,
"steps": steps,
"expectedResults": expected_results,
"userPrompt": sanitize_text(prompt),
"parsedApiContext": safe_parsed_api_context,
"assertionSuggestions": assertion_suggestions
}
return """你需要严格遵循以下接口自动化测试生成规则:
{skill_prompt}
下面是测试平台传入的测试用例信息:
{case_info}
生成要求:
1. 只根据用户prompt中明确提供的接口、请求参数、前置参数、后置处理和预期结果生成 Python 接口自动化 pytest 用例。
2. 代码风格贴合当前项目已有接口用例requests + pytest + allure + logging。
3. 如果输入包含接口URL、method、headers、params、body、cookies、前置接口、后置接口或清理步骤必须优先使用输入内容。
4. 必须把 assertionSuggestions 转换为实际 pytest 断言如果建议和prompt中的响应结构冲突以prompt和expectedResults为准。
5. 如果缺少必要信息不要自动联想、不要编造真实地址、token、账号或密码使用TODO常量并通过pytest.skip提示补充。
6. 敏感字段不要明文写入Allure附件或日志。
7. 只输出Python代码不要输出Markdown解释。
""".format(
skill_prompt=skill_prompt,
case_info=json.dumps(case_info, ensure_ascii=False, indent=2)
)
def build_generate_automation_payload(project_id,
case_id,
automation_type,
prompt,
case_key,
module_name,
product_name,
project_name,
steps,
expected_results,
extra_fields=None):
payload = {
"projectId": project_id,
"caseId": case_id,
"automationType": automation_type,
"prompt": prompt,
"caseKey": case_key,
"moduleName": module_name,
"productName": product_name,
"projectName": project_name,
"steps": steps,
"expectedResults": expected_results
}
if isinstance(extra_fields, dict):
payload.update(extra_fields)
return payload
def extract_streaming_response_text(stream_text):
deltas = []
done_text = ""
for line in stream_text.splitlines():
line = line.strip()
if not line.startswith("data:"):
continue
payload_text = line.replace("data:", "", 1).strip()
if not payload_text or payload_text == "[DONE]":
continue
try:
payload = json.loads(payload_text)
except ValueError:
continue
event_type = payload.get("type")
if event_type == "response.output_text.delta":
delta = payload.get("delta")
if isinstance(delta, str):
deltas.append(delta)
elif event_type == "response.output_text.done":
text = payload.get("text")
if isinstance(text, str) and text.strip():
done_text = text
generated_text = "".join(deltas).strip()
if generated_text:
return generated_text
return done_text.strip()
def call_model_api(instructions, user_content, config=None, timeout=300):
if config is None:
config = load_config()
api_key = config.get("api_key")
base_url = config.get("base_url")
model = config.get("model")
if not api_key:
raise ValueError("api_key不能为空请先配置config.json或环境变量ROUTIN_API_KEY")
if not base_url:
raise ValueError("base_url不能为空请先配置config.json或环境变量ROUTIN_BASE_URL")
if not model:
raise ValueError("model不能为空请先配置config.json或环境变量ROUTIN_MODEL")
headers = MODEL_HEADERS.copy()
headers["Authorization"] = "Bearer {0}".format(api_key)
payload = {
"model": model,
"instructions": instructions,
"input": user_content,
"max_output_tokens": 4096,
"store": False,
"stream": True
}
api_url = base_url.rstrip("/")
if not api_url.endswith("/responses"):
api_url = api_url + "/responses"
response = requests.post(
url=api_url,
headers=headers,
json=payload,
timeout=timeout
)
if response.status_code >= 400:
raise RuntimeError(
"大模型Responses接口调用失败url={0}status_code={1}response={2}".format(
api_url,
response.status_code,
response.text[:2000]
)
)
generated_text = extract_streaming_response_text(response.text)
if generated_text:
return generated_text
try:
return response.json()
except ValueError:
raise RuntimeError("大模型流式响应未解析到正文,响应预览:{0}".format(response.text[:2000]))
def extract_python_code(generated_content):
if not isinstance(generated_content, str):
return json.dumps(generated_content, ensure_ascii=False, indent=2)
matches = re.findall(r"```(?:python|py)?\s*([\s\S]*?)```", generated_content, re.IGNORECASE)
python_blocks = []
for block in matches:
block_text = block.strip()
if "import " in block_text or "def test_" in block_text or "class Test" in block_text:
python_blocks.append(block_text)
if python_blocks:
return "\n\n".join(python_blocks).strip() + "\n"
return generated_content.strip() + "\n"
def _next_available_file_path(target_dir, file_name):
file_path = os.path.join(target_dir, file_name)
if not os.path.exists(file_path):
return file_path
base_name, ext = os.path.splitext(file_name)
index = 1
while True:
candidate = os.path.join(target_dir, "{0}_{1}{2}".format(base_name, index, ext))
if not os.path.exists(candidate):
return candidate
index += 1
def save_generated_api_testcase(generated_content,
product_name,
project_name,
module_name,
case_key):
target_dir = _resolve_api_testcase_dir(
product_name=product_name,
project_name=project_name,
case_key=case_key,
module_name=module_name
)
os.makedirs(target_dir, exist_ok=True)
file_name = _sanitize_python_file_name(case_key, default_value="generated_api_case")
file_path = _next_available_file_path(target_dir, file_name)
testcase_code = extract_python_code(generated_content)
with open(file_path, "w", encoding="utf-8") as file:
file.write(testcase_code)
return file_path
def generate_api_automation_testcase(project_id,
case_id,
automation_type,
prompt,
case_key,
module_name,
product_name,
project_name,
steps,
expected_results,
config=None,
skill_prompt=None,
timeout=120):
if automation_type not in ("api", "interface", "接口"):
raise ValueError("automation_type必须为api/interface/接口")
parsed_api_context = parse_request_context_from_text(prompt, steps, expected_results)
assertion_suggestions = parse_expected_assertions(expected_results)
final_prompt = build_api_testcase_prompt(
project_id=project_id,
case_id=case_id,
automation_type=automation_type,
prompt=prompt,
case_key=case_key,
module_name=module_name,
product_name=product_name,
project_name=project_name,
steps=steps,
expected_results=expected_results,
parsed_api_context=parsed_api_context,
assertion_suggestions=assertion_suggestions,
skill_prompt=skill_prompt
)
instructions = "你是资深接口自动化测试专家负责生成稳定、可维护、可落地的Python pytest接口自动化测试用例。"
return call_model_api(
instructions=instructions,
user_content=final_prompt,
config=config,
timeout=timeout
)
def generate_automation_case(url,
project_id,
case_id,
automation_type,
prompt,
case_key,
module_name,
product_name,
project_name,
steps,
expected_results,
access_token,
cookie=None,
timeout=60,
extra_fields=None):
if not url:
raise ValueError("url不能为空请由调用方传入生成自动化用例接口地址")
headers = DEFAULT_HEADERS.copy()
headers["accessToken"] = access_token
if cookie:
headers["Cookie"] = cookie
payload = build_generate_automation_payload(
project_id=project_id,
case_id=case_id,
automation_type=automation_type,
prompt=prompt,
case_key=case_key,
module_name=module_name,
product_name=product_name,
project_name=project_name,
steps=steps,
expected_results=expected_results,
extra_fields=extra_fields
)
response = requests.post(
url=url,
headers=headers,
data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
timeout=timeout
)
response.raise_for_status()
try:
return response.json()
except ValueError:
return response.text

View File

@@ -0,0 +1,165 @@
# -*- coding:utf-8 -*-
import argparse
import json
import traceback
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse
from base_framework.platform_tools.Create_api_testcase.generate_api_automation import (
generate_api_automation_testcase,
save_generated_api_testcase
)
API_PATH = "/it/api/case/generate-automation"
SUPPORTED_AUTOMATION_TYPES = ("api", "interface", "接口")
class GenerateApiAutomationHandler(BaseHTTPRequestHandler):
def do_OPTIONS(self):
self._send_json_response(200, {"code": 0, "message": "ok", "data": None})
def do_POST(self):
request_path = urlparse(self.path).path
if request_path != API_PATH:
self._send_json_response(404, {
"code": 404,
"message": "接口不存在:{0}".format(request_path),
"data": None
})
return
try:
request_data = self._read_json_body()
self._validate_required_fields(request_data)
automation_type = request_data.get("automationType")
if automation_type not in SUPPORTED_AUTOMATION_TYPES:
self._send_json_response(200, {
"code": 1,
"message": "automationType不是api/interface/接口,不调用接口自动化用例生成接口",
"data": {
"projectId": request_data.get("projectId"),
"caseId": request_data.get("caseId"),
"automationType": automation_type,
"caseKey": request_data.get("caseKey"),
"content": self._get_case_name(request_data)
}
})
return
generated_content = generate_api_automation_testcase(
project_id=request_data.get("projectId"),
case_id=request_data.get("caseId"),
automation_type=automation_type,
prompt=request_data.get("prompt"),
case_key=request_data.get("caseKey"),
module_name=request_data.get("moduleName"),
product_name=request_data.get("productName"),
project_name=request_data.get("projectName"),
steps=request_data.get("steps"),
expected_results=request_data.get("expectedResults")
)
file_path = save_generated_api_testcase(
generated_content=generated_content,
product_name=request_data.get("productName"),
project_name=request_data.get("projectName"),
module_name=request_data.get("moduleName"),
case_key=request_data.get("caseKey")
)
self._send_json_response(200, {
"code": 0,
"message": "success",
"data": {
"projectId": request_data.get("projectId"),
"caseId": request_data.get("caseId"),
"automationType": automation_type,
"caseKey": request_data.get("caseKey"),
"content": self._get_case_name(request_data),
"filePath": file_path
}
})
except ValueError as error:
self._send_json_response(400, {
"code": 400,
"message": str(error),
"data": None
})
except Exception as error:
self._send_json_response(500, {
"code": 500,
"message": str(error),
"data": None,
"trace": traceback.format_exc()
})
def _read_json_body(self):
content_length = int(self.headers.get("Content-Length", 0))
if content_length <= 0:
raise ValueError("请求体不能为空")
body = self.rfile.read(content_length).decode("utf-8")
try:
return json.loads(body)
except ValueError:
raise ValueError("请求体必须是合法JSON")
def _validate_required_fields(self, request_data):
required_fields = [
"projectId",
"caseId",
"automationType",
"prompt",
"caseKey",
"moduleName",
"productName",
"projectName",
"steps",
"expectedResults"
]
missing_fields = []
for field in required_fields:
if field == "productName":
if field not in request_data or request_data.get(field) is None:
missing_fields.append(field)
elif request_data.get(field) in (None, ""):
missing_fields.append(field)
if missing_fields:
raise ValueError("缺少必填参数:{0}".format(", ".join(missing_fields)))
def _get_case_name(self, request_data):
return request_data.get("caseName") or request_data.get("title") or request_data.get("name") or "{0}-{1}".format(
request_data.get("caseKey"),
request_data.get("moduleName")
)
def _send_json_response(self, status_code, response_data):
response_body = json.dumps(response_data, ensure_ascii=False).encode("utf-8")
self.send_response(status_code)
self.send_header("Content-Type", "application/json;charset=UTF-8")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type, accessToken, Authorization")
self.send_header("Content-Length", str(len(response_body)))
self.end_headers()
self.wfile.write(response_body)
def log_message(self, format, *args):
print("[{0}] {1}".format(self.log_date_time_string(), format % args))
def run_server(host="0.0.0.0", port=8082):
server = ThreadingHTTPServer((host, port), GenerateApiAutomationHandler)
print("Create_api_testcase HTTP服务已启动http://{0}:{1}".format(host, port))
print("接口地址POST {0}".format(API_PATH))
server.serve_forever()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="接口自动化用例生成HTTP服务")
parser.add_argument("--host", default="0.0.0.0", help="服务监听地址默认0.0.0.0")
parser.add_argument("--port", default=8082, type=int, help="服务监听端口默认8082")
args = parser.parse_args()
run_server(host=args.host, port=args.port)

View File

@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
import allure
import logging
import requests
import json
import pytest
try:
from joyhub_backend.library.joyhub_interface import JoyhubInterface
except Exception:
JoyhubInterface = None
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
BASE_URL = "http://test-manager-api.best-envision.com"
API_PATH = "admin/video/getVideoLabelList"
CASE_NAME = "获取视频标签列表"
def _mask_sensitive(data):
if data is None:
return data
sensitive_keys = {"authorization", "token", "access_token", "accessToken", "cookie", "password", "passwd"}
if isinstance(data, dict):
masked = {}
for key, value in data.items():
if str(key).lower() in sensitive_keys:
masked[key] = "***MASKED***"
else:
masked[key] = _mask_sensitive(value)
return masked
if isinstance(data, list):
return [_mask_sensitive(item) for item in data]
return data
def _attach_json(name, data):
allure.attach(
json.dumps(_mask_sensitive(data), ensure_ascii=False, indent=2),
name,
allure.attachment_type.JSON
)
def _normalize_response(response):
if isinstance(response, requests.Response):
try:
response_json = response.json()
except ValueError:
response_json = None
return response.status_code, response_json, response.text
if isinstance(response, dict):
return 200, response, json.dumps(response, ensure_ascii=False)
return None, None, str(response)
@allure.feature("接口")
class TestHubopsVideoLabelList(object):
def setup_method(self):
logging.info("-----------------------------Test Start-------------------------------")
def teardown_method(self):
logging.info("-----------------------------Test End-------------------------------")
@allure.story("获取视频标签列表")
@allure.title("测试HubOps获取视频标签列表接口")
def test_get_video_label_list(self):
if JoyhubInterface is None:
pytest.skip("joyhub_backend项目优先使用JoyhubInterface鉴权封装请确认joyhub_backend.library.joyhub_interface.JoyhubInterface可导入")
with allure.step("1. 准备请求参数"):
method = "POST"
headers = {
"Content-Type": "application/json"
}
body = {
"page": 1,
"limit": 10,
"sort": "id",
"label_name": "",
"category_id": 0,
"video_type": 0,
"video_num": 0,
"created_at": [],
"order": "descending"
}
logging.info("请求方法: %s", method)
logging.info("请求路径: %s", API_PATH)
_attach_json("请求头", headers)
_attach_json("请求体", body)
with allure.step("2. 调用HubOps获取视频标签列表接口"):
client = JoyhubInterface()
response = client.request(
CASE_NAME,
method,
API_PATH,
body=body,
headers=headers,
expected_code=0
)
status_code, response_json, response_text = _normalize_response(response)
allure.attach(str(status_code), "HTTP状态码", allure.attachment_type.TEXT)
if response_json is not None:
_attach_json("响应JSON", response_json)
else:
allure.attach(response_text, "响应内容", allure.attachment_type.TEXT)
with allure.step("3. 校验响应基础字段"):
assert status_code == 200, "HTTP状态码应为200实际为: {}".format(status_code)
assert response_json is not None, "响应内容应为JSON且不能为空"
assert isinstance(response_json, dict), "响应JSON应为对象类型"
assert "code" in response_json, "响应JSON应包含code字段"
assert response_json.get("code") == 0, "响应code应为0实际为: {}".format(response_json.get("code"))
assert "msg" in response_json, "响应JSON应包含msg字段"
assert response_json.get("msg") is not None, "响应msg字段不应为None"
assert "data" in response_json, "响应JSON应包含data字段"
assert response_json.get("data") is not None, "响应data字段不应为None"
assert isinstance(response_json.get("data"), list), "响应data字段应为数组类型"
assert "count" in response_json, "响应JSON应包含count字段"
assert response_json.get("count") is not None, "响应count字段不应为None"