Update UI automation test files

This commit is contained in:
2026-05-19 18:04:34 +08:00
parent e0e22b895e
commit 6d7dba25d8
14 changed files with 1076 additions and 52 deletions

View File

@@ -737,19 +737,64 @@ def _build_import_lines(project_code, page_file_name, class_names):
return import_lines
def _normalize_generated_imports(code, project_code, resource_files, remove_local_resource_imports=False):
resource_modules = [
os.path.splitext(resource_file.get("fileName", ""))[0]
for resource_file in resource_files
if resource_file.get("fileName", "").endswith(".py")
]
normalized_code = str(code or "")
normalized_code = re.sub(r"^from\s+pages\.[^\n]+\n", "", normalized_code, flags=re.MULTILINE)
for module_name in resource_modules:
if not module_name:
continue
if remove_local_resource_imports:
normalized_code = re.sub(
r"^from\s+{0}\s+import\s+[^\n]+\n".format(re.escape(module_name)),
"",
normalized_code,
flags=re.MULTILINE
)
else:
normalized_code = re.sub(
r"^from\s+{0}\s+import\s+([^\n]+)".format(re.escape(module_name)),
r"from {0}.test_case.Resource.UI.{1} import \1".format(project_code, module_name),
normalized_code,
flags=re.MULTILINE
)
return normalized_code.strip() + "\n"
def _python_string_literal(value):
return json.dumps(str(value or ""), ensure_ascii=False)
def _prepend_import_lines_preserving_future(code, import_lines):
unique_import_lines = list(dict.fromkeys(line for line in import_lines if line))
if not unique_import_lines:
return str(code or "").strip() + "\n"
lines = str(code or "").strip().splitlines()
future_lines = []
while lines and lines[0].startswith("from __future__ import "):
future_lines.append(lines.pop(0))
if lines and not lines[0].strip():
lines.pop(0)
result_lines = []
result_lines.extend(future_lines)
result_lines.extend(unique_import_lines)
result_lines.append("")
result_lines.extend(lines)
return "\n".join(result_lines).strip() + "\n"
def _ensure_allure_markers(testcase_code, module_name=None, case_name=None, project_name=None):
code = testcase_code.strip() + "\n"
if "import allure" not in code:
import_match = list(re.finditer(r"^(?:import\s+[^\n]+|from\s+[^\n]+\s+import\s+[^\n]+)$", code, re.MULTILINE))
if import_match:
insert_at = import_match[-1].end()
code = code[:insert_at] + "\nimport allure" + code[insert_at:]
else:
code = "import allure\n" + code
code = _prepend_import_lines_preserving_future(code, ["import allure"])
decorators = []
if project_name and "@allure.feature" not in code:
@@ -770,6 +815,19 @@ def _ensure_allure_markers(testcase_code, module_name=None, case_name=None, proj
return code.strip() + "\n"
def _normalize_missing_secret_asserts(testcase_code):
code = str(testcase_code or "")
if "pytest.skip" in code:
return code.strip() + "\n"
code = re.sub(
r"^(\s*)assert\s+([^\n,]+(?:password|PASSWORD|token|TOKEN|secret|SECRET)[^\n,]*),\s*([^\n]+)$",
r"\1if not \2:\n\1 pytest.skip(\3)",
code,
flags=re.MULTILINE
)
return code.strip() + "\n"
def split_ui_generated_code(generated_content, product_name, project_name, case_key, module_name=None):
explicit_blocks = extract_python_file_blocks(generated_content)
project_code = _resolve_project_code_or_default(product_name, project_name, case_key)
@@ -799,9 +857,26 @@ def split_ui_generated_code(generated_content, product_name, project_name, case_
class_name
))
test_code = "\n\n".join(test_code_parts).strip() + "\n"
test_code = re.sub(r"^from\s+pages\.[^\n]+\n", "", test_code, flags=re.MULTILINE)
test_code = _normalize_generated_imports(
code=test_code,
project_code=project_code,
resource_files=resource_files,
remove_local_resource_imports=True
)
if import_lines:
test_code = "\n".join(dict.fromkeys(import_lines)) + "\n" + test_code
test_code = _prepend_import_lines_preserving_future(test_code, import_lines)
resource_files = [
{
"fileName": resource_file["fileName"],
"code": _normalize_generated_imports(
code=resource_file["code"],
project_code=project_code,
resource_files=resource_files,
remove_local_resource_imports=False
)
}
for resource_file in resource_files
]
return test_code, resource_files
full_code = extract_python_code(generated_content)
@@ -827,7 +902,7 @@ def split_ui_generated_code(generated_content, product_name, project_name, case_
class_names.extend(_extract_defined_class_names(block))
import_lines = _build_import_lines(project_code, page_file_name, class_names)
if import_lines:
test_code = "\n".join(import_lines) + "\n" + test_code
test_code = _prepend_import_lines_preserving_future(test_code, import_lines)
test_code = re.sub(r"^from\s+pages\.[^\n]+\n", "", test_code, flags=re.MULTILINE)
return test_code.strip() + "\n", resource_files
@@ -881,6 +956,7 @@ def save_generated_ui_testcase(generated_content,
case_name=case_name or case_key,
project_name=project_name
)
testcase_code = _normalize_missing_secret_asserts(testcase_code)
with open(file_path, "w", encoding="utf-8") as file:
file.write(testcase_code)
@@ -888,6 +964,58 @@ def save_generated_ui_testcase(generated_content,
return file_path
def _classify_pytest_result(return_code, output):
output_text = str(output or "")
if return_code == -1:
return "timeout"
if return_code != 0:
return "failed"
if re.search(r"\b\d+\s+skipped\b", output_text, re.IGNORECASE):
return "skipped"
return "passed"
def _build_verification_message(status, failure_reason=None):
if status == "passed":
return "用例已生成并执行通过"
if status == "skipped":
return "用例已生成,但 pytest 执行被跳过,请检查环境变量、测试数据或跳过条件"
if status == "timeout":
return "用例已生成,但 pytest 执行超时"
return failure_reason or "用例已生成,但 pytest 执行失败"
def analyze_generated_case_quality(testcase_path, product_name, project_name, case_key):
files = read_generated_case_files(
testcase_path=testcase_path,
product_name=product_name,
project_name=project_name,
case_key=case_key,
only_imported_resources=True
)
todo_items = []
for relative_path, content in files.items():
for line_number, line in enumerate(str(content or "").splitlines(), 1):
if "TODO" in line or "PLACEHOLDER" in line:
todo_items.append({
"file": relative_path,
"line": line_number,
"text": line.strip()[:200]
})
warnings = []
if todo_items:
warnings.append("生成文件包含 TODO/PLACEHOLDER需要人工补充页面元素定位或测试数据")
return {
"hasTodoSelectors": bool(todo_items),
"todoCount": len(todo_items),
"requiresManualCompletion": bool(todo_items),
"warnings": warnings,
"todoItems": todo_items[:20]
}
def run_generated_ui_testcase(testcase_path, timeout=180):
command = [
sys.executable,
@@ -908,12 +1036,16 @@ def run_generated_ui_testcase(testcase_path, timeout=180):
stdout = process.stdout or ""
stderr = process.stderr or ""
output = (stdout + "\n" + stderr).strip()
verification_status = _classify_pytest_result(process.returncode, output)
failure_reason = summarize_test_failure(output)
return {
"success": process.returncode == 0,
"returnCode": process.returncode,
"stdout": stdout,
"stderr": stderr,
"failureReason": summarize_test_failure(output)
"verificationStatus": verification_status,
"verificationMessage": _build_verification_message(verification_status, failure_reason),
"failureReason": failure_reason
}
except subprocess.TimeoutExpired as error:
output = ((error.stdout or "") + "\n" + (error.stderr or "")).strip()
@@ -925,15 +1057,20 @@ def run_generated_ui_testcase(testcase_path, timeout=180):
"returnCode": -1,
"stdout": error.stdout or "",
"stderr": error.stderr or "",
"verificationStatus": "timeout",
"verificationMessage": _build_verification_message("timeout", failure_reason),
"failureReason": failure_reason
}
except Exception as error:
failure_reason = "pytest执行异常{0}".format(str(error))
return {
"success": False,
"returnCode": -1,
"stdout": "",
"stderr": str(error),
"failureReason": "pytest执行异常{0}".format(str(error))
"verificationStatus": "failed",
"verificationMessage": _build_verification_message("failed", failure_reason),
"failureReason": failure_reason
}
@@ -959,18 +1096,35 @@ def summarize_test_failure(output, max_length=3000):
return summary[-max_length:]
def read_generated_case_files(testcase_path, product_name, project_name, case_key):
def _extract_imported_resource_files(testcase_path, project_code):
if not os.path.exists(testcase_path):
return []
with open(testcase_path, "r", encoding="utf-8") as file:
content = file.read()
modules = re.findall(
r"^from\s+{0}\.test_case\.Resource\.UI\.([A-Za-z_][A-Za-z0-9_]*)\s+import\s+".format(re.escape(project_code)),
content,
flags=re.MULTILINE
)
return ["{0}.py".format(module_name) for module_name in modules]
def read_generated_case_files(testcase_path, product_name, project_name, case_key, only_imported_resources=False):
files = {}
candidate_paths = [testcase_path]
project_code = _resolve_project_code_or_default(product_name, project_name, case_key)
resource_dir = _resolve_ui_resource_dir(
product_name=product_name,
project_name=project_name,
case_key=case_key
)
if os.path.isdir(resource_dir):
for file_name in os.listdir(resource_dir):
if file_name.endswith(".py"):
candidate_paths.append(os.path.join(resource_dir, file_name))
if only_imported_resources:
resource_file_names = _extract_imported_resource_files(testcase_path, project_code)
else:
resource_file_names = [file_name for file_name in os.listdir(resource_dir) if file_name.endswith(".py")]
for file_name in resource_file_names:
candidate_paths.append(os.path.join(resource_dir, file_name))
for path in candidate_paths:
if os.path.exists(path):
@@ -1119,10 +1273,18 @@ def generate_save_and_verify_ui_testcase(project_id,
last_failure_reason = None
for attempt_index in range(1, max_attempts + 1):
run_result = run_generated_ui_testcase(testcase_path)
quality = analyze_generated_case_quality(
testcase_path=testcase_path,
product_name=product_name,
project_name=project_name,
case_key=case_key
)
attempts.append({
"attempt": attempt_index,
"success": run_result["success"],
"returnCode": run_result["returnCode"],
"verificationStatus": run_result.get("verificationStatus"),
"verificationMessage": run_result.get("verificationMessage"),
"failureReason": run_result["failureReason"]
})
if run_result["success"]:
@@ -1130,6 +1292,9 @@ def generate_save_and_verify_ui_testcase(project_id,
"success": True,
"testcasePath": testcase_path,
"attempts": attempts,
"quality": quality,
"verificationStatus": run_result.get("verificationStatus"),
"verificationMessage": run_result.get("verificationMessage"),
"failureReason": None
}
@@ -1164,10 +1329,19 @@ def generate_save_and_verify_ui_testcase(project_id,
case_name=case_name
)
quality = analyze_generated_case_quality(
testcase_path=testcase_path,
product_name=product_name,
project_name=project_name,
case_key=case_key
)
return {
"success": False,
"testcasePath": testcase_path,
"attempts": attempts,
"quality": quality,
"verificationStatus": attempts[-1].get("verificationStatus") if attempts else "failed",
"verificationMessage": attempts[-1].get("verificationMessage") if attempts else "用例生成后未执行",
"failureReason": "用例生成后执行并自动修复 {0} 次仍失败:\n{1}".format(max_attempts, last_failure_reason)
}

View File

@@ -1,7 +1,6 @@
# -*- coding:utf-8 -*-
import argparse
import json
import traceback
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse
@@ -46,52 +45,42 @@ class GenerateAutomationHandler(BaseHTTPRequestHandler):
})
return
generation_mode = self._get_generation_mode(request_data)
normalized_prompt = self._build_effective_prompt(request_data)
normalized_steps = self._normalize_text_list(request_data.get("steps"))
normalized_expected_results = self._normalize_text_list(request_data.get("expectedResults"))
max_attempts = self._get_max_attempts(request_data, generation_mode)
verify_result = generate_save_and_verify_ui_testcase(
project_id=request_data.get("projectId"),
case_id=request_data.get("caseId"),
automation_type=request_data.get("automationType"),
prompt=request_data.get("prompt"),
prompt=normalized_prompt,
case_key=request_data.get("caseKey"),
module_name=request_data.get("moduleName"),
product_name=request_data.get("productName"),
project_name=request_data.get("projectName"),
steps=request_data.get("steps"),
expected_results=request_data.get("expectedResults"),
steps=normalized_steps,
expected_results=normalized_expected_results,
case_name=self._get_case_name(request_data),
enable_reconnaissance=request_data.get("enableReconnaissance", True),
enable_reconnaissance=self._should_enable_reconnaissance(request_data, generation_mode),
headless=request_data.get("headless", True),
max_attempts=request_data.get("maxAttempts", 3)
max_attempts=max_attempts
)
response_data = self._build_response_data(request_data, verify_result, generation_mode)
if not verify_result.get("success"):
self._send_json_response(200, {
"code": 1,
"message": "用例生成后执行失败,自动修复{0}次仍未通过".format(request_data.get("maxAttempts", 3)),
"data": {
"projectId": request_data.get("projectId"),
"caseId": request_data.get("caseId"),
"automationType": request_data.get("automationType"),
"caseKey": request_data.get("caseKey"),
"content": self._get_case_name(request_data),
"testcasePath": verify_result.get("testcasePath"),
"attempts": verify_result.get("attempts"),
"failureReason": verify_result.get("failureReason")
}
"code": self._build_failure_code(verify_result),
"message": verify_result.get("verificationMessage") or "用例生成后执行失败,自动修复{0}次仍未通过".format(max_attempts),
"data": response_data
})
return
self._send_json_response(200, {
"code": 0,
"message": "success",
"data": {
"projectId": request_data.get("projectId"),
"caseId": request_data.get("caseId"),
"automationType": request_data.get("automationType"),
"caseKey": request_data.get("caseKey"),
"content": self._get_case_name(request_data),
"testcasePath": verify_result.get("testcasePath"),
"attempts": verify_result.get("attempts")
}
"message": verify_result.get("verificationMessage") or "success",
"data": response_data
})
except ValueError as error:
self._send_json_response(400, {
@@ -102,9 +91,8 @@ class GenerateAutomationHandler(BaseHTTPRequestHandler):
except Exception as error:
self._send_json_response(500, {
"code": 500,
"message": str(error),
"data": None,
"trace": traceback.format_exc()
"message": self._sanitize_error_message(str(error)),
"data": None
})
def _read_json_body(self):
@@ -147,6 +135,67 @@ class GenerateAutomationHandler(BaseHTTPRequestHandler):
request_data.get("moduleName")
)
def _normalize_text_list(self, value):
if isinstance(value, list):
return "\n".join(str(item) for item in value)
return str(value or "")
def _get_generation_mode(self, request_data):
generation_mode = str(request_data.get("generationMode") or "fast").strip().lower()
if generation_mode not in ("fast", "recon"):
raise ValueError("generationMode仅支持fast或recon")
return generation_mode
def _should_enable_reconnaissance(self, request_data, generation_mode):
if "enableReconnaissance" in request_data:
return bool(request_data.get("enableReconnaissance"))
return generation_mode == "recon"
def _get_max_attempts(self, request_data, generation_mode):
default_attempts = 1 if generation_mode == "fast" else 2
try:
max_attempts = int(request_data.get("maxAttempts", default_attempts))
except (TypeError, ValueError):
raise ValueError("maxAttempts必须是整数")
return max(1, min(max_attempts, 3))
def _build_effective_prompt(self, request_data):
prompt_parts = [self._normalize_text_list(request_data.get("prompt"))]
selectors = request_data.get("selectors")
test_data = request_data.get("testData")
if isinstance(selectors, dict) and selectors:
prompt_parts.append("前端传入的页面元素 selectors\n{0}".format(json.dumps(selectors, ensure_ascii=False, indent=2)))
if isinstance(test_data, dict) and test_data:
prompt_parts.append("前端传入的测试数据 testData\n{0}".format(json.dumps(test_data, ensure_ascii=False, indent=2)))
return "\n\n".join(part for part in prompt_parts if part)
def _build_response_data(self, request_data, verify_result, generation_mode):
return {
"projectId": request_data.get("projectId"),
"caseId": request_data.get("caseId"),
"automationType": request_data.get("automationType"),
"caseKey": request_data.get("caseKey"),
"content": self._get_case_name(request_data),
"generationMode": generation_mode,
"testcasePath": verify_result.get("testcasePath"),
"attempts": verify_result.get("attempts"),
"quality": verify_result.get("quality"),
"verificationStatus": verify_result.get("verificationStatus"),
"verificationMessage": verify_result.get("verificationMessage"),
"failureReason": verify_result.get("failureReason")
}
def _build_failure_code(self, verify_result):
verification_status = verify_result.get("verificationStatus")
if verification_status == "skipped":
return 3002
if verification_status == "timeout":
return 3004
return 3001
def _sanitize_error_message(self, message):
return str(message or "").replace("password", "******").replace("密码", "密码******")
def _send_json_response(self, status_code, response_data):
response_body = json.dumps(response_data, ensure_ascii=False).encode("utf-8")
self.send_response(status_code)