diff --git a/base_framework/platform_tools/Create_ui_testcase/__init__.py b/base_framework/platform_tools/Create_ui_testcase/__init__.py new file mode 100644 index 0000000..380474e --- /dev/null +++ b/base_framework/platform_tools/Create_ui_testcase/__init__.py @@ -0,0 +1 @@ +# -*- coding:utf-8 -*- diff --git a/base_framework/platform_tools/Create_ui_testcase/generate_automation_api.py b/base_framework/platform_tools/Create_ui_testcase/generate_automation_api.py new file mode 100644 index 0000000..f0ef66c --- /dev/null +++ b/base_framework/platform_tools/Create_ui_testcase/generate_automation_api.py @@ -0,0 +1,1278 @@ +# -*- coding:utf-8 -*- +import json +import os +import re +import subprocess +import sys +from urllib.parse import urlparse + +import requests + + +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +CONFIG_PATH = os.path.join(CURRENT_DIR, "config.json") +SKILL_PATH = os.path.abspath(os.path.join(CURRENT_DIR, "..", "..", "..", "skills", "webapp-testing", "SKILL.md")) +PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "..", "..", "..")) +LEGACY_SCREENSHOT_DIR = os.path.join(CURRENT_DIR, "screenshots") +GENERATED_CASES_DIR = os.path.join(CURRENT_DIR, "generated_cases") + +DEFAULT_HEADERS = { + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Content-Type": "application/json;charset=UTF-8", + "Pragma": "no-cache", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36" +} + +MODEL_HEADERS = { + "Content-Type": "application/json", + "Accept": "text/event-stream" +} + + +def load_config(config_path=CONFIG_PATH): + with open(config_path, "r", encoding="utf-8") as file: + return json.load(file) + + +def load_skill_prompt(skill_path=SKILL_PATH): + if not os.path.exists(skill_path): + raise FileNotFoundError("Skill提示词文件不存在:{0}".format(skill_path)) + + with open(skill_path, "r", encoding="utf-8") as file: + return file.read() + + +def parse_prompt_context(prompt): + context = { + "urls": [], + "pageUrl": None, + "loginUrl": None, + "username": None, + "password": None, + "passwordProvided": False, + "testData": {}, + "selectors": {} + } + + if not prompt: + return context + + urls = re.findall(r"https?://[^\s,,。;;))]+", prompt) + context["urls"] = urls + if urls: + context["pageUrl"] = urls[0] + + field_patterns = [ + ("pageUrl", [r"(?:被测系统URL|被测页面URL|页面URL|访问地址|系统地址|pageUrl|url)[::]\s*(https?://[^\s,,。;;))]+)"]), + ("loginUrl", [r"(?:登录URL|登录地址|loginUrl)[::]\s*(https?://[^\s,,。;;))]+)"]), + ("username", [r"(?:登录账号|账号|用户名|username|user)[::]\s*([^\s,,。;;]+)"]), + ("password", [r"(?:登录密码|密码|password|pwd)[::]\s*([^\s,,。;;]+)"]) + ] + for field, patterns in field_patterns: + for pattern in patterns: + match = re.search(pattern, prompt, re.IGNORECASE) + if match: + context[field] = match.group(1).strip() + if field == "password": + context["passwordProvided"] = True + break + + keyword_match = re.search(r"(?:搜索关键字|关键字|keyword)[::]\s*([^\n,,。;;]+)", prompt, re.IGNORECASE) + if keyword_match: + context["testData"]["keyword"] = keyword_match.group(1).strip() + + selector_patterns = [ + ("usernameInput", r"(?:用户名输入框|账号输入框|usernameInput)[::]\s*([^\n]+)"), + ("passwordInput", r"(?:密码输入框|passwordInput)[::]\s*([^\n]+)"), + ("loginButton", r"(?:登录按钮|loginButton)[::]\s*([^\n]+)"), + ("searchInput", r"(?:搜索输入框|搜索框|searchInput)[::]\s*([^\n]+)"), + ("searchButton", r"(?:搜索按钮|查询按钮|searchButton)[::]\s*([^\n]+)"), + ("resultTable", r"(?:结果表格|列表表格|resultTable)[::]\s*([^\n]+)") + ] + for selector_name, pattern in selector_patterns: + match = re.search(pattern, prompt, re.IGNORECASE) + if match: + context["selectors"][selector_name] = match.group(1).strip() + + if context.get("loginUrl") is None and len(urls) > 1: + context["loginUrl"] = urls[1] + + return context + + +def _get_project_code(product_name=None, project_name=None, case_key=None): + product_text = str(product_name or "") + project_text = str(project_name or "") + case_key_text = str(case_key or "") + match_text = "{0} {1} {2}".format(product_text, project_text, case_key_text).lower() + + if "智慧运营" in product_text or "智慧运营" in project_text or "zhyy" in match_text or "zzyy" in match_text: + return "zhyy" + + if "独立站" in product_text or "独立站" in project_text or "joyhub" in match_text or "dulizhan" in match_text: + return "dulizhan" + + return None + + +def resolve_project_screenshot_dir(product_name=None, project_name=None, case_key=None): + project_code = _get_project_code( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + if project_code: + return os.path.join(PROJECT_ROOT, project_code, "screenshots") + + return os.path.join( + GENERATED_CASES_DIR, + _sanitize_path_part(project_name, default_value="unknown_project"), + "screenshots" + ) + + +def sanitize_prompt(prompt, context=None): + if not prompt: + return prompt + + if context is None: + context = parse_prompt_context(prompt) + + sanitized_prompt = prompt + password = context.get("password") + if password: + sanitized_prompt = sanitized_prompt.replace(password, "******") + + sanitized_prompt = re.sub( + r"((?:登录密码|密码|password|pwd)[::]\s*)([^\s,,。;;]+)", + r"\1******", + sanitized_prompt, + flags=re.IGNORECASE + ) + return sanitized_prompt + + +def sanitize_reconnaissance_context(context): + sanitized_context = dict(context or {}) + if sanitized_context.get("password"): + sanitized_context["password"] = "******" + return sanitized_context + + +def _safe_locator_count(page, selector): + try: + return page.locator(selector).count() + except Exception: + return 0 + + +def _collect_locator_samples(page, selector, limit=20): + samples = [] + try: + locators = page.locator(selector) + count = min(locators.count(), limit) + for index in range(count): + item = locators.nth(index) + try: + samples.append({ + "tag": item.evaluate("el => el.tagName.toLowerCase()"), + "text": item.inner_text(timeout=1000).strip()[:120], + "id": item.get_attribute("id"), + "name": item.get_attribute("name"), + "type": item.get_attribute("type"), + "placeholder": item.get_attribute("placeholder"), + "role": item.get_attribute("role"), + "ariaLabel": item.get_attribute("aria-label"), + "dataTestId": item.get_attribute("data-testid") or item.get_attribute("data-test") + }) + except Exception: + continue + except Exception: + pass + return samples + + +def _build_candidate_summary(page): + return { + "inputs": _collect_locator_samples(page, "input, textarea, [contenteditable='true']", limit=30), + "buttons": _collect_locator_samples(page, "button, [role='button'], input[type='button'], input[type='submit']", limit=30), + "links": _collect_locator_samples(page, "a", limit=20), + "tables": _collect_locator_samples(page, "table, [role='table'], .el-table, .ant-table", limit=10), + "counts": { + "input": _safe_locator_count(page, "input"), + "textarea": _safe_locator_count(page, "textarea"), + "button": _safe_locator_count(page, "button"), + "link": _safe_locator_count(page, "a"), + "table": _safe_locator_count(page, "table") + } + } + + +def reconnaissance_web_page(prompt, + case_key=None, + product_name=None, + project_name=None, + timeout=30000, + headless=True, + required=True): + context = parse_prompt_context(prompt) + page_url = context.get("pageUrl") + sanitized_context = sanitize_reconnaissance_context(context) + + if not page_url: + message = "prompt中未解析到被测页面URL" + if required: + raise ValueError(message) + return { + "enabled": False, + "reason": message, + "parsedContext": sanitized_context + } + + try: + from playwright.sync_api import sync_playwright + except ImportError: + message = "当前环境未安装playwright,无法执行页面侦察,请执行:pip install playwright && python -m playwright install chromium" + if required: + raise RuntimeError(message) + return { + "enabled": False, + "reason": message, + "parsedContext": sanitized_context + } + + result = { + "enabled": True, + "pageUrl": page_url, + "parsedContext": sanitized_context, + "headless": headless, + "loginAttempted": False, + "loginSucceededMaybe": False, + "finalUrl": None, + "title": None, + "screenshotPath": None, + "candidates": {}, + "errors": [] + } + + with sync_playwright() as p: + browser = p.chromium.launch(headless=headless) + page = browser.new_page() + try: + target_url = context.get("loginUrl") or page_url + page.goto(target_url, wait_until="networkidle", timeout=timeout) + + username = context.get("username") + password = context.get("password") + if username and password: + result["loginAttempted"] = True + before_login_url = page.url + selector_map = context.get("selectors") or {} + username_selector = selector_map.get("usernameInput") + password_selector = selector_map.get("passwordInput") + login_button_selector = selector_map.get("loginButton") + + if username_selector and password_selector: + page.locator(username_selector).fill(username, timeout=5000) + page.locator(password_selector).fill(password, timeout=5000) + else: + text_inputs = page.locator("input[type='text'], input:not([type]), input[type='email'], input[type='tel']") + password_inputs = page.locator("input[type='password']") + if text_inputs.count() > 0: + text_inputs.first.fill(username, timeout=5000) + if password_inputs.count() > 0: + password_inputs.first.fill(password, timeout=5000) + + if login_button_selector: + page.locator(login_button_selector).click(timeout=5000) + else: + login_buttons = page.locator("button:has-text('登录'), button:has-text('登陆'), input[type='submit'], [role='button']:has-text('登录')") + if login_buttons.count() > 0: + login_buttons.first.click(timeout=5000) + + try: + page.wait_for_load_state("networkidle", timeout=timeout) + except Exception as error: + result["errors"].append("登录后等待networkidle失败:{0}".format(str(error))) + result["loginSucceededMaybe"] = page.url != before_login_url or "login" not in page.url.lower() + + if page.url != page_url and page_url not in page.url: + try: + page.goto(page_url, wait_until="networkidle", timeout=timeout) + except Exception as error: + result["errors"].append("跳转被测页面失败:{0}".format(str(error))) + + result["finalUrl"] = page.url + result["title"] = page.title() + result["candidates"] = _build_candidate_summary(page) + + screenshot_dir = resolve_project_screenshot_dir( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + os.makedirs(screenshot_dir, exist_ok=True) + safe_case_key = re.sub(r"[^0-9A-Za-z_.-]+", "_", str(case_key or "ui_reconnaissance")) + screenshot_path = os.path.join(screenshot_dir, "{0}.png".format(safe_case_key)) + page.screenshot(path=screenshot_path, full_page=True) + result["screenshotPath"] = screenshot_path + except Exception as error: + result["errors"].append(str(error)) + if required: + raise RuntimeError("页面侦察失败:{0}".format(str(error))) + finally: + browser.close() + + return result + + +def build_ui_testcase_prompt(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + case_name=None, + skill_prompt=None, + reconnaissance_result=None): + if skill_prompt is None: + skill_prompt = load_skill_prompt() + + screenshot_dir = resolve_project_screenshot_dir( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + case_info = { + "projectId": project_id, + "caseId": case_id, + "automationType": automation_type, + "caseKey": case_key, + "moduleName": module_name, + "productName": product_name, + "projectName": project_name, + "screenshotDir": screenshot_dir, + "steps": steps, + "expectedResults": expected_results, + "caseName": case_name, + "userPrompt": sanitize_prompt(prompt), + "reconnaissanceResult": reconnaissance_result + } + + return """你需要严格遵循以下 UI 自动化测试 Skill 规则生成用例: + +{skill_prompt} + +下面是测试平台传入的测试用例信息与页面侦察结果: + +{case_info} + +生成要求: +1. 根据测试步骤、预期结果、用户prompt和页面侦察结果生成 Python Playwright UI 自动化测试用例。 +2. 优先使用页面侦察结果中的真实元素候选、URL、标题、输入框、按钮、表格信息生成 selector。 +3. 优先遵循 Skill 中的 Reconnaissance-Then-Action、Page Object、pytest、Allure 和稳定定位原则。 +4. 如果页面侦察失败或缺少关键 selector,不要编造,使用清晰的变量、占位符或 TODO 标识。 +5. 如用例需要截图,必须使用 case_info.screenshotDir 作为截图目录,并在代码中 os.makedirs(screenshot_dir, exist_ok=True) 后保存到该目录。 +6. 必须按分层结构输出代码:测试用例文件只包含 pytest 用例、fixture/测试数据和对页面对象的调用;BasePage、PageObject、业务操作方法、元素定位方法必须放到独立页面对象/公共方法文件中。 +7. 测试用例文件禁止定义 class BasePage、PageObject 类、非测试业务方法;只能 import 并调用外部类或方法。 +8. 测试用例必须使用 Allure 装饰器标记,至少包含 @allure.story(case_info.moduleName) 和 @allure.title(case_info.caseName),必要时可补充 @allure.feature(case_info.projectName);测试用例文件必须 import allure。 +9. 请优先使用以下 fenced code block 标记分文件输出,方便平台落盘: + ```python file=base_page.py + # 公共页面基类 + ``` + ```python file=<页面对象文件名>.py + # 页面对象和业务操作方法 + ``` + ```python file=test_case.py + # pytest 测试用例,只包含用例 + ``` +10. 不要输出无关解释。 +""".format( + skill_prompt=skill_prompt, + case_info=json.dumps(case_info, ensure_ascii=False, indent=2) + ) + + +def build_generate_automation_payload(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results): + return { + "projectId": project_id, + "caseId": case_id, + "automationType": automation_type, + "prompt": prompt, + "caseKey": case_key, + "moduleName": module_name, + "productName": product_name, + "projectName": project_name, + "steps": steps, + "expectedResults": expected_results + } + + +def extract_streaming_response_text(stream_text): + deltas = [] + done_text = "" + + for line in stream_text.splitlines(): + line = line.strip() + if not line.startswith("data:"): + continue + + payload_text = line.replace("data:", "", 1).strip() + if not payload_text or payload_text == "[DONE]": + continue + + try: + payload = json.loads(payload_text) + except ValueError: + continue + + event_type = payload.get("type") + if event_type == "response.output_text.delta": + delta = payload.get("delta") + if isinstance(delta, str): + deltas.append(delta) + elif event_type == "response.output_text.done": + text = payload.get("text") + if isinstance(text, str) and text.strip(): + done_text = text + + generated_text = "".join(deltas).strip() + if generated_text: + return generated_text + + return done_text.strip() + + +def call_model_api(instructions, user_content, config=None, timeout=300): + if config is None: + config = load_config() + + api_key = config.get("api_key") + base_url = config.get("base_url") + model = config.get("model") + + if not api_key: + raise ValueError("api_key不能为空,请先在config.json中配置或通过config参数传入") + if not base_url: + raise ValueError("base_url不能为空,请先在config.json中配置或通过config参数传入") + if not model: + raise ValueError("model不能为空,请先在config.json中配置或通过config参数传入") + + headers = MODEL_HEADERS.copy() + headers["Authorization"] = "Bearer {0}".format(api_key) + + payload = { + "model": model, + "instructions": instructions, + "input": user_content, + "max_output_tokens": 4096, + "store": False, + "stream": True + } + + api_url = base_url.rstrip("/") + if not api_url.endswith("/responses"): + api_url = api_url + "/responses" + + response = requests.post( + url=api_url, + headers=headers, + json=payload, + timeout=timeout + ) + + if response.status_code >= 400: + raise RuntimeError( + "大模型Responses接口调用失败,url={0},status_code={1},response={2}".format( + api_url, + response.status_code, + response.text[:2000] + ) + ) + + generated_text = extract_streaming_response_text(response.text) + if generated_text: + return generated_text + + try: + return response.json() + except ValueError: + raise RuntimeError("大模型流式响应未解析到正文,响应预览:{0}".format(response.text[:2000])) + + +def _sanitize_path_part(value, default_value="unknown"): + text = str(value or "").strip() + if not text: + text = default_value + return re.sub(r'[\\/:*?"<>|\s]+', "_", text).strip("_") or default_value + + +def _sanitize_python_file_name(value, default_value="generated_ui_case"): + text = _sanitize_path_part(value, default_value=default_value) + text = re.sub(r"[^0-9A-Za-z_\u4e00-\u9fa5]+", "_", text).strip("_") + if not text: + text = default_value + if not text.startswith("test_"): + text = "test_{0}".format(text) + return "{0}.py".format(text) + + +def _resolve_ui_testcase_dir(product_name, project_name, case_key, module_name): + project_code = _get_project_code( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + if project_code: + return os.path.join(PROJECT_ROOT, project_code, "test_case", "TestCase", "UI") + + return os.path.join( + GENERATED_CASES_DIR, + _sanitize_path_part(project_name, default_value="unknown_project") + ) + + +def extract_python_code(generated_content): + if not isinstance(generated_content, str): + return json.dumps(generated_content, ensure_ascii=False, indent=2) + + matches = re.findall(r"```(?:python|py)?\s*([\s\S]*?)```", generated_content, re.IGNORECASE) + python_blocks = [] + for block in matches: + block_text = block.strip() + if "import " in block_text or "def test_" in block_text or "class Test" in block_text or "class " in block_text: + python_blocks.append(block_text) + + if python_blocks: + return "\n\n".join(python_blocks).strip() + "\n" + + return generated_content.strip() + "\n" + + +def extract_python_file_blocks(generated_content): + if not isinstance(generated_content, str): + return [] + + pattern = r"```(?:python|py)\s+file=([^\s`]+)\s*\n([\s\S]*?)```" + blocks = [] + for file_name, code in re.findall(pattern, generated_content, re.IGNORECASE): + safe_file_name = _sanitize_python_module_file_name(file_name) + blocks.append({ + "fileName": safe_file_name, + "code": code.strip() + "\n" + }) + + if blocks: + return blocks + + code = extract_python_code(generated_content) + return extract_commented_python_file_blocks(code) + + +def _sanitize_python_module_file_name(file_name, default_value="generated.py"): + normalized = str(file_name or "").strip().replace("\\", "/") + base_name = os.path.basename(normalized) + safe_file_name = _sanitize_path_part(base_name, default_value=default_value) + if not safe_file_name.endswith(".py"): + safe_file_name = "{0}.py".format(safe_file_name) + return safe_file_name + + +def extract_commented_python_file_blocks(code): + if not isinstance(code, str): + return [] + + marker_pattern = re.compile(r"^#\s*(?:[\w.-]+/)*([A-Za-z_][\w.-]*\.py)\s*$") + blocks = [] + current_file_name = None + current_lines = [] + + for line in code.splitlines(): + marker_match = marker_pattern.match(line.strip()) + if marker_match: + if current_file_name and current_lines: + blocks.append({ + "fileName": _sanitize_python_module_file_name(current_file_name), + "code": "\n".join(current_lines).strip() + "\n" + }) + current_file_name = marker_match.group(1) + current_lines = [] + continue + + if current_file_name: + current_lines.append(line) + + if current_file_name and current_lines: + blocks.append({ + "fileName": _sanitize_python_module_file_name(current_file_name), + "code": "\n".join(current_lines).strip() + "\n" + }) + + if len(blocks) < 2: + return [] + + has_test_block = any("def test_" in block["code"] for block in blocks) + has_resource_block = any("class " in block["code"] for block in blocks) + if not has_test_block or not has_resource_block: + return [] + + return blocks + + +def _resolve_project_code_or_default(product_name, project_name, case_key): + return _get_project_code( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) or _sanitize_path_part(project_name, default_value="generated_cases") + + +def _resolve_ui_resource_dir(product_name, project_name, case_key): + project_code = _get_project_code( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + if project_code: + return os.path.join(PROJECT_ROOT, project_code, "test_case", "Resource", "UI") + + return os.path.join( + GENERATED_CASES_DIR, + _sanitize_path_part(project_name, default_value="unknown_project"), + "Resource", + "UI" + ) + + +def _ensure_python_package_dirs(path, stop_dir): + current = os.path.abspath(path) + stop_dir = os.path.abspath(stop_dir) + while current.startswith(stop_dir): + init_path = os.path.join(current, "__init__.py") + if not os.path.exists(init_path): + with open(init_path, "w", encoding="utf-8"): + pass + if current == stop_dir: + break + parent = os.path.dirname(current) + if parent == current: + break + current = parent + + +def _strip_class_blocks(code): + lines = code.splitlines() + kept_lines = [] + class_blocks = [] + index = 0 + + while index < len(lines): + line = lines[index] + if re.match(r"^class\s+\w+", line): + block = [line] + index += 1 + while index < len(lines): + next_line = lines[index] + if next_line and not next_line.startswith((" ", "\t")): + break + block.append(next_line) + index += 1 + class_blocks.append("\n".join(block).rstrip()) + continue + kept_lines.append(line) + index += 1 + + return "\n".join(kept_lines).strip() + "\n", class_blocks + + +def _extract_defined_class_names(code): + return re.findall(r"^class\s+(\w+)", code, re.MULTILINE) + + +def _normalize_networkidle_waits(code): + lines = code.splitlines() + normalized_lines = [] + for line in lines: + stripped = line.strip() + previous_stripped = normalized_lines[-1].strip() if normalized_lines else "" + if stripped == 'self.page.wait_for_load_state("networkidle")' and previous_stripped != "try:": + indent = line[:len(line) - len(line.lstrip())] + normalized_lines.append(indent + "try:") + normalized_lines.append(indent + ' self.page.wait_for_load_state("networkidle", timeout=5000)') + normalized_lines.append(indent + "except Exception:") + normalized_lines.append(indent + " pass") + else: + normalized_lines.append(line) + return "\n".join(normalized_lines).strip() + "\n" + + +def _build_import_lines(project_code, page_file_name, class_names): + module_name = os.path.splitext(page_file_name)[0] + import_lines = [] + if "BasePage" in class_names: + import_lines.append("from {0}.test_case.Resource.UI.base_page import BasePage".format(project_code)) + page_classes = [name for name in class_names if name != "BasePage"] + if page_classes: + import_lines.append("from {0}.test_case.Resource.UI.{1} import {2}".format( + project_code, + module_name, + ", ".join(page_classes) + )) + return import_lines + + +def _python_string_literal(value): + return json.dumps(str(value or ""), ensure_ascii=False) + + +def _ensure_allure_markers(testcase_code, module_name=None, case_name=None, project_name=None): + code = testcase_code.strip() + "\n" + if "import allure" not in code: + import_match = list(re.finditer(r"^(?:import\s+[^\n]+|from\s+[^\n]+\s+import\s+[^\n]+)$", code, re.MULTILINE)) + if import_match: + insert_at = import_match[-1].end() + code = code[:insert_at] + "\nimport allure" + code[insert_at:] + else: + code = "import allure\n" + code + + decorators = [] + if project_name and "@allure.feature" not in code: + decorators.append("@allure.feature({0})".format(_python_string_literal(project_name))) + if module_name and "@allure.story" not in code: + decorators.append("@allure.story({0})".format(_python_string_literal(module_name))) + title_text = case_name or "" + if title_text and "@allure.title" not in code: + decorators.append("@allure.title({0})".format(_python_string_literal(title_text))) + + if decorators: + code = re.sub( + r"(\n)(def\s+test_\w+\s*\()", + "\n" + "\n".join(decorators) + "\n" + r"\2", + code, + count=1 + ) + return code.strip() + "\n" + + +def split_ui_generated_code(generated_content, product_name, project_name, case_key, module_name=None): + explicit_blocks = extract_python_file_blocks(generated_content) + project_code = _resolve_project_code_or_default(product_name, project_name, case_key) + file_identity = "{0}_{1}".format( + _sanitize_path_part(case_key, default_value="generated_ui"), + _sanitize_path_part(module_name, default_value="ui") + ) + page_file_name = "{0}_page.py".format(file_identity) + + if explicit_blocks: + test_code_parts = [] + resource_files = [] + import_lines = [] + for block in explicit_blocks: + file_name = block["fileName"] + code = block["code"] + if file_name.startswith("test_") or file_name == "test_case.py" or "def test_" in code: + test_code_parts.append(code) + else: + resource_files.append({"fileName": file_name, "code": code}) + class_names = _extract_defined_class_names(code) + module_name = os.path.splitext(file_name)[0] + for class_name in class_names: + import_lines.append("from {0}.test_case.Resource.UI.{1} import {2}".format( + project_code, + module_name, + class_name + )) + test_code = "\n\n".join(test_code_parts).strip() + "\n" + test_code = re.sub(r"^from\s+pages\.[^\n]+\n", "", test_code, flags=re.MULTILINE) + if import_lines: + test_code = "\n".join(dict.fromkeys(import_lines)) + "\n" + test_code + return test_code, resource_files + + full_code = extract_python_code(generated_content) + test_code, class_blocks = _strip_class_blocks(full_code) + resource_files = [] + + base_blocks = [block for block in class_blocks if re.match(r"^class\s+BasePage", block)] + page_blocks = [block for block in class_blocks if not re.match(r"^class\s+BasePage", block)] + + if base_blocks: + base_code = "import os\nfrom playwright.sync_api import Page, expect\n\n\n" + "\n\n".join(base_blocks).strip() + "\n" + resource_files.append({"fileName": "base_page.py", "code": base_code}) + + if page_blocks: + page_imports = "import os\nimport re\nfrom playwright.sync_api import Page, expect\n" + if base_blocks: + page_imports += "from {0}.test_case.Resource.UI.base_page import BasePage\n".format(project_code) + page_code = page_imports + "\n\n" + "\n\n".join(page_blocks).strip() + "\n" + resource_files.append({"fileName": page_file_name, "code": page_code}) + + class_names = [] + for block in class_blocks: + class_names.extend(_extract_defined_class_names(block)) + import_lines = _build_import_lines(project_code, page_file_name, class_names) + if import_lines: + test_code = "\n".join(import_lines) + "\n" + test_code + + test_code = re.sub(r"^from\s+pages\.[^\n]+\n", "", test_code, flags=re.MULTILINE) + return test_code.strip() + "\n", resource_files + + +def save_generated_ui_testcase(generated_content, + product_name, + project_name, + module_name, + case_key, + case_name=None): + target_dir = _resolve_ui_testcase_dir( + product_name=product_name, + project_name=project_name, + case_key=case_key, + module_name=module_name + ) + resource_dir = _resolve_ui_resource_dir( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + os.makedirs(target_dir, exist_ok=True) + os.makedirs(resource_dir, exist_ok=True) + + project_dir = os.path.dirname(os.path.dirname(os.path.dirname(resource_dir))) + _ensure_python_package_dirs(resource_dir, project_dir) + + file_identity = "{0}_{1}".format( + _sanitize_path_part(case_key, default_value="generated_ui_case"), + _sanitize_path_part(module_name, default_value="ui") + ) + file_name = _sanitize_python_file_name(file_identity, default_value="generated_ui_case") + file_path = os.path.join(target_dir, file_name) + testcase_code, resource_files = split_ui_generated_code( + generated_content=generated_content, + product_name=product_name, + project_name=project_name, + case_key=case_key, + module_name=module_name + ) + + for resource_file in resource_files: + resource_file_path = os.path.join(resource_dir, resource_file["fileName"]) + with open(resource_file_path, "w", encoding="utf-8") as file: + file.write(_normalize_networkidle_waits(resource_file["code"])) + + testcase_code = _ensure_allure_markers( + testcase_code, + module_name=module_name, + case_name=case_name or case_key, + project_name=project_name + ) + + with open(file_path, "w", encoding="utf-8") as file: + file.write(testcase_code) + + return file_path + + +def run_generated_ui_testcase(testcase_path, timeout=180): + command = [ + sys.executable, + "-m", + "pytest", + testcase_path, + "-q", + "--tb=short" + ] + try: + process = subprocess.run( + command, + cwd=PROJECT_ROOT, + capture_output=True, + text=True, + timeout=timeout + ) + stdout = process.stdout or "" + stderr = process.stderr or "" + output = (stdout + "\n" + stderr).strip() + return { + "success": process.returncode == 0, + "returnCode": process.returncode, + "stdout": stdout, + "stderr": stderr, + "failureReason": summarize_test_failure(output) + } + except subprocess.TimeoutExpired as error: + output = ((error.stdout or "") + "\n" + (error.stderr or "")).strip() + failure_reason = "pytest执行超时,timeout={0}s".format(timeout) + if output: + failure_reason += "\n" + summarize_test_failure(output) + return { + "success": False, + "returnCode": -1, + "stdout": error.stdout or "", + "stderr": error.stderr or "", + "failureReason": failure_reason + } + except Exception as error: + return { + "success": False, + "returnCode": -1, + "stdout": "", + "stderr": str(error), + "failureReason": "pytest执行异常:{0}".format(str(error)) + } + + +def summarize_test_failure(output, max_length=3000): + text = str(output or "").strip() + if not text: + return "用例执行失败,但 pytest 未输出错误信息" + + failure_lines = [] + capture = False + for line in text.splitlines(): + stripped = line.strip() + if stripped.startswith(("E ", "FAILED ", "ERROR ", "ImportError", "ModuleNotFoundError", "NameError", "AttributeError", "AssertionError", "playwright._impl")): + failure_lines.append(line) + capture = True + continue + if capture and stripped: + failure_lines.append(line) + if len("\n".join(failure_lines)) >= max_length: + break + + summary = "\n".join(failure_lines).strip() or text[-max_length:] + return summary[-max_length:] + + +def read_generated_case_files(testcase_path, product_name, project_name, case_key): + files = {} + candidate_paths = [testcase_path] + resource_dir = _resolve_ui_resource_dir( + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + if os.path.isdir(resource_dir): + for file_name in os.listdir(resource_dir): + if file_name.endswith(".py"): + candidate_paths.append(os.path.join(resource_dir, file_name)) + + for path in candidate_paths: + if os.path.exists(path): + with open(path, "r", encoding="utf-8") as file: + files[os.path.relpath(path, PROJECT_ROOT)] = file.read() + return files + + +def build_ui_testcase_repair_prompt(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + case_name, + testcase_path, + failure_reason, + attempt_index): + current_files = read_generated_case_files( + testcase_path=testcase_path, + product_name=product_name, + project_name=project_name, + case_key=case_key + ) + case_info = { + "projectId": project_id, + "caseId": case_id, + "automationType": automation_type, + "caseKey": case_key, + "moduleName": module_name, + "productName": product_name, + "projectName": project_name, + "steps": steps, + "expectedResults": expected_results, + "caseName": case_name, + "userPrompt": sanitize_prompt(prompt), + "attemptIndex": attempt_index, + "failureReason": failure_reason, + "currentFiles": current_files + } + return """当前 UI 自动化用例执行失败,请根据失败原因修复并重新输出完整分文件代码。 + +要求: +1. 只修复失败原因相关问题,保持原始测试目标不变。 +2. 测试用例文件只包含 pytest 用例、fixture/测试数据和对页面对象的调用。 +3. BasePage、PageObject、业务操作方法、元素定位方法必须放到独立页面对象/公共方法文件中。 +4. 测试用例必须 import allure,并至少包含 @allure.story 与 @allure.title。 +5. 继续使用 fenced code block 标记分文件输出:```python file=base_page.py```、```python file=<页面对象文件名>.py```、```python file=test_case.py```。 +6. 不要输出解释,只输出可落盘代码。 + +失败上下文: +{case_info} +""".format(case_info=json.dumps(case_info, ensure_ascii=False, indent=2)) + + +def repair_ui_automation_testcase(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + case_name, + testcase_path, + failure_reason, + attempt_index, + config=None, + timeout=120): + repair_prompt = build_ui_testcase_repair_prompt( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results, + case_name=case_name, + testcase_path=testcase_path, + failure_reason=failure_reason, + attempt_index=attempt_index + ) + instructions = "你是资深UI自动化测试修复专家,负责根据pytest失败日志修复Python Playwright自动化测试用例。" + return call_model_api( + instructions=instructions, + user_content=repair_prompt, + config=config, + timeout=timeout + ) + + +def generate_save_and_verify_ui_testcase(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + case_name=None, + config=None, + skill_prompt=None, + timeout=120, + enable_reconnaissance=True, + headless=True, + max_attempts=3): + generated_content = generate_ui_automation_testcase( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results, + case_name=case_name, + config=config, + skill_prompt=skill_prompt, + timeout=timeout, + enable_reconnaissance=enable_reconnaissance, + headless=headless + ) + testcase_path = save_generated_ui_testcase( + generated_content=generated_content, + product_name=product_name, + project_name=project_name, + module_name=module_name, + case_key=case_key, + case_name=case_name + ) + + attempts = [] + last_failure_reason = None + for attempt_index in range(1, max_attempts + 1): + run_result = run_generated_ui_testcase(testcase_path) + attempts.append({ + "attempt": attempt_index, + "success": run_result["success"], + "returnCode": run_result["returnCode"], + "failureReason": run_result["failureReason"] + }) + if run_result["success"]: + return { + "success": True, + "testcasePath": testcase_path, + "attempts": attempts, + "failureReason": None + } + + last_failure_reason = run_result["failureReason"] + if attempt_index >= max_attempts: + break + + repaired_content = repair_ui_automation_testcase( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results, + case_name=case_name, + testcase_path=testcase_path, + failure_reason=last_failure_reason, + attempt_index=attempt_index + 1, + config=config, + timeout=timeout + ) + testcase_path = save_generated_ui_testcase( + generated_content=repaired_content, + product_name=product_name, + project_name=project_name, + module_name=module_name, + case_key=case_key, + case_name=case_name + ) + + return { + "success": False, + "testcasePath": testcase_path, + "attempts": attempts, + "failureReason": "用例生成后执行并自动修复 {0} 次仍失败:\n{1}".format(max_attempts, last_failure_reason) + } + + +def generate_ui_automation_testcase(project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + case_name=None, + config=None, + skill_prompt=None, + timeout=120, + enable_reconnaissance=True, + headless=True): + if automation_type != "ui": + raise ValueError("automation_type必须为ui") + + reconnaissance_result = None + if enable_reconnaissance: + reconnaissance_result = reconnaissance_web_page( + prompt=prompt, + case_key=case_key, + product_name=product_name, + project_name=project_name, + headless=headless, + required=True + ) + + final_prompt = build_ui_testcase_prompt( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results, + case_name=case_name, + skill_prompt=skill_prompt, + reconnaissance_result=reconnaissance_result + ) + + instructions = "你是资深UI自动化测试专家,负责生成稳定、可维护、可落地的Python Playwright自动化测试用例。" + + return call_model_api( + instructions=instructions, + user_content=final_prompt, + config=config, + timeout=timeout + ) + + +def generate_automation_case(url, + project_id, + case_id, + automation_type, + prompt, + case_key, + module_name, + product_name, + project_name, + steps, + expected_results, + access_token, + cookie=None, + timeout=60): + if not url: + raise ValueError("url不能为空,请由调用方传入生成自动化用例接口地址") + + headers = DEFAULT_HEADERS.copy() + headers["accessToken"] = access_token + + if cookie: + headers["Cookie"] = cookie + + payload = build_generate_automation_payload( + project_id=project_id, + case_id=case_id, + automation_type=automation_type, + prompt=prompt, + case_key=case_key, + module_name=module_name, + product_name=product_name, + project_name=project_name, + steps=steps, + expected_results=expected_results + ) + + response = requests.post( + url=url, + headers=headers, + data=json.dumps(payload, ensure_ascii=False).encode("utf-8"), + timeout=timeout + ) + response.raise_for_status() + + try: + return response.json() + except ValueError: + return response.text diff --git a/base_framework/platform_tools/Create_ui_testcase/routin_chat_docs.md b/base_framework/platform_tools/Create_ui_testcase/routin_chat_docs.md new file mode 100644 index 0000000..d0c6897 --- /dev/null +++ b/base_framework/platform_tools/Create_ui_testcase/routin_chat_docs.md @@ -0,0 +1,600 @@ +# Chat Completions API + +**Source**: https://docs.routin.ai/zh/docs/API/chat-completions +**Description**: Chat Completions API 使用指南和多语言示例 + +--- + +[Routin AI](https://docs.routin.ai/zh/docs/API/) + +[Routin AI](https://docs.routin.ai/zh/docs/API/) + +搜索 + +`⌘``K` + +🚀 欢迎使用 Routin AI 文档! + +[前往控制台](https://docs.routin.ai/zh/docs/API/)[欢迎使用 Routin AI](https://docs.routin.ai/zh/docs/API/) + +API 文档 + +[Chat Completions API](https://docs.routin.ai/zh/docs/API/)[Embeddings API](https://docs.routin.ai/zh/docs/API/)[Images API](https://docs.routin.ai/zh/docs/API/)[Audio API](https://docs.routin.ai/zh/docs/API/)[Messages API](https://docs.routin.ai/zh/docs/API/)[Gemini API](https://docs.routin.ai/zh/docs/API/)[Videos API](https://docs.routin.ai/zh/docs/API/)[Web Research API](https://docs.routin.ai/zh/docs/API/) + +设计方案 + +[套餐订阅制设计](https://docs.routin.ai/zh/docs/API/) + +开发者工具 + +[接入 Claude Code 使用](https://docs.routin.ai/zh/docs/API/)[接入 Codex 使用](https://docs.routin.ai/zh/docs/API/)[接入 Kilo Code 使用](https://docs.routin.ai/zh/docs/API/)[接入 Cherry Studio 使用](https://docs.routin.ai/zh/docs/API/)[接入 Gemini CLI 使用](https://docs.routin.ai/zh/docs/API/)[Claude Code 完整使用教程](https://docs.routin.ai/zh/docs/API/)[接入 OpenCode 使用](https://docs.routin.ai/zh/docs/API/) + +Organization + +© 2026 Routin AI + +Chat Completions API + +API 文档 + +# Chat Completions API + +Chat Completions API 使用指南和多语言示例 + +# Chat Completions API + +MeteorAI 提供完全兼容 OpenAI 的对话接口,您可以使用 OpenAI SDK 直接调用我们的服务。 + +## 基本信息 + +**API 端点** +[code] + https://api.routin.ai/v1/chat/completions +[/code] + +**认证方式** 在请求头中添加 API Key: +[code] + Authorization: Bearer YOUR_API_KEY +[/code] + +MeteorAI 完全兼容 OpenAI SDK,只需修改 `base_url` 参数即可无缝切换。 + +## 请求参数 + +### 必需参数 + +参数| 类型| 说明 +---|---|--- +`model`| string| 模型名称,如 `gpt-4o`、`claude-3-5-sonnet-20241022` 等 +`messages`| array| 对话消息数组 + +### 可选参数 + +参数| 类型| 默认值| 说明 +---|---|---|--- +`temperature`| number| 1| 采样温度 (0-2) +`top_p`| number| 1| 核采样参数 (0-1) +`max_tokens`| integer| -| 生成的最大 token 数 +`stream`| boolean| false| 是否使用流式输出 +`presence_penalty`| number| 0| 存在惩罚 (-2.0 到 2.0) +`frequency_penalty`| number| 0| 频率惩罚 (-2.0 到 2.0) +`user`| string| -| 用户标识符 + +### Messages 格式 +[code] + { + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "Hello!" + } + ] + } +[/code] + +支持的 `role` 值: + + * `system`: 系统消息,定义助手行为 + * `user`: 用户消息 + * `assistant`: 助手回复 + +## 响应格式 + +### 普通响应 +[code] + { + "id": "chatcmpl-123", + "object": "chat.completion", + "created": 1677652288, + "model": "gpt-4o", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "Hello! How can I help you today?" + }, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": 9, + "completion_tokens": 12, + "total_tokens": 21 + } + } +[/code] + +### 流式响应 +[code] + data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"},"finish_reason":null}]} + + data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]} + + data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + + data: [DONE] +[/code] + +## 代码示例 + +### 基本调用 + +PythonTypeScriptJavaScriptC#cURL +[code] + from openai import OpenAI + + client = OpenAI( + api_key="YOUR_API_KEY", + base_url="https://api.routin.ai/v1" + ) + + response = client.chat.completions.create( + model="gpt-4o", + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "你好,介绍一下你自己"} + ], + temperature=0.7, + max_tokens=150 + ) + + print(response.choices[0].message.content) +[/code] +[code] + import OpenAI from 'openai'; + + const client = new OpenAI({ + apiKey: 'YOUR_API_KEY', + baseURL: 'https://api.routin.ai/v1', + }); + + async function main() { + const response = await client.chat.completions.create({ + model: 'gpt-4o', + messages: [ + { role: 'system', content: 'You are a helpful assistant.' }, + { role: 'user', content: '你好,介绍一下你自己' }, + ], + temperature: 0.7, + max_tokens: 150, + }); + + console.log(response.choices[0].message.content); + } + + main(); +[/code] +[code] + const OpenAI = require('openai'); + + const client = new OpenAI({ + apiKey: 'YOUR_API_KEY', + baseURL: 'https://api.routin.ai/v1', + }); + + client.chat.completions.create({ + model: 'gpt-4o', + messages: [ + { role: 'system', content: 'You are a helpful assistant.' }, + { role: 'user', content: '你好,介绍一下你自己' }, + ], + temperature: 0.7, + max_tokens: 150, + }).then(response => { + console.log(response.choices[0].message.content); + }); +[/code] +[code] + using OpenAI.Chat; + + var client = new ChatClient( + model: "gpt-4o", + apiKey: "YOUR_API_KEY", + new OpenAIClientOptions + { + Endpoint = new Uri("https://api.routin.ai/v1") + } + ); + + var messages = new List + { + new SystemChatMessage("You are a helpful assistant."), + new UserChatMessage("你好,介绍一下你自己") + }; + + var response = await client.CompleteChatAsync( + messages, + new ChatCompletionOptions + { + Temperature = 0.7f, + MaxOutputTokenCount = 150 + } + ); + + Console.WriteLine(response.Value.Content[0].Text); +[/code] +[code] + curl https://api.routin.ai/v1/chat/completions \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{ + "model": "gpt-4o", + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "你好,介绍一下你自己" + } + ], + "temperature": 0.7, + "max_tokens": 150 + }' +[/code] + +### 流式输出 + +流式输出可以实时获取模型的生成内容,提供更好的用户体验。 + +PythonTypeScriptJavaScriptC#cURL +[code] + from openai import OpenAI + + client = OpenAI( + api_key="YOUR_API_KEY", + base_url="https://api.routin.ai/v1" + ) + + stream = client.chat.completions.create( + model="gpt-4o", + messages=[ + {"role": "user", "content": "给我讲一个有趣的故事"} + ], + stream=True + ) + + for chunk in stream: + if chunk.choices[0].delta.content is not None: + print(chunk.choices[0].delta.content, end="") +[/code] +[code] + import OpenAI from 'openai'; + + const client = new OpenAI({ + apiKey: 'YOUR_API_KEY', + baseURL: 'https://api.routin.ai/v1', + }); + + async function main() { + const stream = await client.chat.completions.create({ + model: 'gpt-4o', + messages: [{ role: 'user', content: '给我讲一个有趣的故事' }], + stream: true, + }); + + for await (const chunk of stream) { + const content = chunk.choices[0]?.delta?.content; + if (content) { + process.stdout.write(content); + } + } + } + + main(); +[/code] +[code] + const OpenAI = require('openai'); + + const client = new OpenAI({ + apiKey: 'YOUR_API_KEY', + baseURL: 'https://api.routin.ai/v1', + }); + + async function main() { + const stream = await client.chat.completions.create({ + model: 'gpt-4o', + messages: [{ role: 'user', content: '给我讲一个有趣的故事' }], + stream: true, + }); + + for await (const chunk of stream) { + const content = chunk.choices[0]?.delta?.content; + if (content) { + process.stdout.write(content); + } + } + } + + main(); +[/code] +[code] + using OpenAI.Chat; + + var client = new ChatClient( + model: "gpt-4o", + apiKey: "YOUR_API_KEY", + new OpenAIClientOptions + { + Endpoint = new Uri("https://api.routin.ai/v1") + } + ); + + var messages = new List + { + new UserChatMessage("给我讲一个有趣的故事") + }; + + await foreach (var update in client.CompleteChatStreamingAsync(messages)) + { + foreach (var contentPart in update.ContentUpdate) + { + Console.Write(contentPart.Text); + } + } +[/code] +[code] + curl https://api.routin.ai/v1/chat/completions \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -d '{ + "model": "gpt-4o", + "messages": [ + { + "role": "user", + "content": "给我讲一个有趣的故事" + } + ], + "stream": true + }' +[/code] + +### 多轮对话 + +PythonTypeScriptC# +[code] + from openai import OpenAI + + client = OpenAI( + api_key="YOUR_API_KEY", + base_url="https://api.routin.ai/v1" + ) + + messages = [ + {"role": "system", "content": "You are a helpful assistant."} + ] + + # 第一轮对话 + messages.append({"role": "user", "content": "我叫张三"}) + response = client.chat.completions.create( + model="gpt-4o", + messages=messages + ) + assistant_message = response.choices[0].message.content + messages.append({"role": "assistant", "content": assistant_message}) + print(f"助手: {assistant_message}") + + # 第二轮对话 + messages.append({"role": "user", "content": "我叫什么名字?"}) + response = client.chat.completions.create( + model="gpt-4o", + messages=messages + ) + assistant_message = response.choices[0].message.content + print(f"助手: {assistant_message}") +[/code] +[code] + import OpenAI from 'openai'; + + const client = new OpenAI({ + apiKey: 'YOUR_API_KEY', + baseURL: 'https://api.routin.ai/v1', + }); + + async function main() { + const messages: OpenAI.Chat.ChatCompletionMessageParam[] = [ + { role: 'system', content: 'You are a helpful assistant.' }, + ]; + + // 第一轮对话 + messages.push({ role: 'user', content: '我叫张三' }); + let response = await client.chat.completions.create({ + model: 'gpt-4o', + messages, + }); + let assistantMessage = response.choices[0].message.content; + messages.push({ role: 'assistant', content: assistantMessage! }); + console.log(`助手: ${assistantMessage}`); + + // 第二轮对话 + messages.push({ role: 'user', content: '我叫什么名字?' }); + response = await client.chat.completions.create({ + model: 'gpt-4o', + messages, + }); + assistantMessage = response.choices[0].message.content; + console.log(`助手: ${assistantMessage}`); + } + + main(); +[/code] +[code] + using OpenAI.Chat; + + var client = new ChatClient( + model: "gpt-4o", + apiKey: "YOUR_API_KEY", + new OpenAIClientOptions + { + Endpoint = new Uri("https://api.routin.ai/v1") + } + ); + + var messages = new List + { + new SystemChatMessage("You are a helpful assistant.") + }; + + // 第一轮对话 + messages.Add(new UserChatMessage("我叫张三")); + var response = await client.CompleteChatAsync(messages); + var assistantMessage = response.Value.Content[0].Text; + messages.Add(new AssistantChatMessage(assistantMessage)); + Console.WriteLine($"助手: {assistantMessage}"); + + // 第二轮对话 + messages.Add(new UserChatMessage("我叫什么名字?")); + response = await client.CompleteChatAsync(messages); + assistantMessage = response.Value.Content[0].Text; + Console.WriteLine($"助手: {assistantMessage}"); +[/code] + +## 错误处理 + +请务必在生产环境中添加错误处理逻辑,避免因 API 调用失败导致应用崩溃。 + +PythonTypeScriptC# +[code] + from openai import OpenAI, APIError, RateLimitError, APIConnectionError + + client = OpenAI( + api_key="YOUR_API_KEY", + base_url="https://api.routin.ai/v1" + ) + + try: + response = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": "Hello!"}] + ) + print(response.choices[0].message.content) + except RateLimitError as e: + print(f"请求频率超限: {e}") + except APIConnectionError as e: + print(f"网络连接错误: {e}") + except APIError as e: + print(f"API 错误: {e}") + except Exception as e: + print(f"未知错误: {e}") +[/code] +[code] + import OpenAI from 'openai'; + + const client = new OpenAI({ + apiKey: 'YOUR_API_KEY', + baseURL: 'https://api.routin.ai/v1', + }); + + async function main() { + try { + const response = await client.chat.completions.create({ + model: 'gpt-4o', + messages: [{ role: 'user', content: 'Hello!' }], + }); + console.log(response.choices[0].message.content); + } catch (error) { + if (error instanceof OpenAI.APIError) { + console.error(`API 错误 [${error.status}]: ${error.message}`); + } else if (error instanceof OpenAI.RateLimitError) { + console.error('请求频率超限'); + } else { + console.error('未知错误:', error); + } + } + } + + main(); +[/code] +[code] + using OpenAI.Chat; + using OpenAI; + + var client = new ChatClient( + model: "gpt-4o", + apiKey: "YOUR_API_KEY", + new OpenAIClientOptions + { + Endpoint = new Uri("https://api.routin.ai/v1") + } + ); + + try + { + var response = await client.CompleteChatAsync( + new List + { + new UserChatMessage("Hello!") + } + ); + Console.WriteLine(response.Value.Content[0].Text); + } + catch (ClientResultException ex) when (ex.Status == 429) + { + Console.WriteLine($"请求频率超限: {ex.Message}"); + } + catch (ClientResultException ex) + { + Console.WriteLine($"API 错误 [{ex.Status}]: {ex.Message}"); + } + catch (Exception ex) + { + Console.WriteLine($"未知错误: {ex.Message}"); + } +[/code] + +## 常见错误码 + +错误码| 说明| 解决方法 +---|---|--- +401| API Key 无效或未提供| 检查 Authorization 头是否正确设置 +429| 请求频率超限| 降低请求频率或升级配额 +400| 请求参数错误| 检查请求参数格式是否正确 +500| 服务器内部错误| 稍后重试或联系技术支持 +503| 服务暂时不可用| 稍后重试 + +## 最佳实践 + + 1. **使用系统消息** : 通过 `system` 角色定义助手的行为和特性 + 2. **控制 token 数量** : 使用 `max_tokens` 参数控制生成长度,避免不必要的费用 + 3. **错误重试** : 实现指数退避的重试机制,处理临时性错误 + 4. **流式输出** : 对于长文本生成,使用 `stream=true` 提供更好的用户体验 + 5. **保存对话历史** : 多轮对话需要在 `messages` 数组中包含完整的对话历史 + 6. **监控使用情况** : 定期查看管理后台的统计信息,优化 API 使用 + +## 更多资源 + + * [Embeddings API](https://docs.routin.ai/zh/docs/API/) \- 文本向量化接口 + * [Images API](https://docs.routin.ai/zh/docs/API/) \- 图像生成接口 + * [Audio API](https://docs.routin.ai/zh/docs/API/) \- 语音识别和合成接口 + +[欢迎使用 Routin AI统一的大模型 API 聚合平台,提供企业级服务和管理能力](https://docs.routin.ai/zh/docs/API/)[Embeddings API文本向量化 API 使用指南和多语言示例](https://docs.routin.ai/zh/docs/API/) + +### On this page + +Chat Completions API基本信息请求参数必需参数可选参数Messages 格式响应格式普通响应流式响应代码示例基本调用流式输出多轮对话错误处理常见错误码最佳实践更多资源 \ No newline at end of file diff --git a/base_framework/platform_tools/Create_ui_testcase/server.py b/base_framework/platform_tools/Create_ui_testcase/server.py new file mode 100644 index 0000000..0075a0e --- /dev/null +++ b/base_framework/platform_tools/Create_ui_testcase/server.py @@ -0,0 +1,177 @@ +# -*- coding:utf-8 -*- +import argparse +import json +import traceback +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from urllib.parse import urlparse + +from base_framework.platform_tools.Create_ui_testcase.generate_automation_api import ( + generate_save_and_verify_ui_testcase +) + + +API_PATH = "/it/api/case/generate-automation" + + +class GenerateAutomationHandler(BaseHTTPRequestHandler): + + def do_OPTIONS(self): + self._send_json_response(200, {"code": 0, "message": "ok", "data": None}) + + def do_POST(self): + request_path = urlparse(self.path).path + if request_path != API_PATH: + self._send_json_response(404, { + "code": 404, + "message": "接口不存在:{0}".format(request_path), + "data": None + }) + return + + try: + request_data = self._read_json_body() + self._validate_required_fields(request_data) + + if request_data.get("automationType") != "ui": + self._send_json_response(200, { + "code": 1, + "message": "automationType不是ui,不调用UI自动化用例生成接口", + "data": { + "projectId": request_data.get("projectId"), + "caseId": request_data.get("caseId"), + "automationType": request_data.get("automationType"), + "caseKey": request_data.get("caseKey"), + "content": self._get_case_name(request_data) + } + }) + return + + verify_result = generate_save_and_verify_ui_testcase( + project_id=request_data.get("projectId"), + case_id=request_data.get("caseId"), + automation_type=request_data.get("automationType"), + prompt=request_data.get("prompt"), + case_key=request_data.get("caseKey"), + module_name=request_data.get("moduleName"), + product_name=request_data.get("productName"), + project_name=request_data.get("projectName"), + steps=request_data.get("steps"), + expected_results=request_data.get("expectedResults"), + case_name=self._get_case_name(request_data), + enable_reconnaissance=request_data.get("enableReconnaissance", True), + headless=request_data.get("headless", True), + max_attempts=request_data.get("maxAttempts", 3) + ) + + if not verify_result.get("success"): + self._send_json_response(200, { + "code": 1, + "message": "用例生成后执行失败,自动修复{0}次仍未通过".format(request_data.get("maxAttempts", 3)), + "data": { + "projectId": request_data.get("projectId"), + "caseId": request_data.get("caseId"), + "automationType": request_data.get("automationType"), + "caseKey": request_data.get("caseKey"), + "content": self._get_case_name(request_data), + "testcasePath": verify_result.get("testcasePath"), + "attempts": verify_result.get("attempts"), + "failureReason": verify_result.get("failureReason") + } + }) + return + + self._send_json_response(200, { + "code": 0, + "message": "success", + "data": { + "projectId": request_data.get("projectId"), + "caseId": request_data.get("caseId"), + "automationType": request_data.get("automationType"), + "caseKey": request_data.get("caseKey"), + "content": self._get_case_name(request_data), + "testcasePath": verify_result.get("testcasePath"), + "attempts": verify_result.get("attempts") + } + }) + except ValueError as error: + self._send_json_response(400, { + "code": 400, + "message": str(error), + "data": None + }) + except Exception as error: + self._send_json_response(500, { + "code": 500, + "message": str(error), + "data": None, + "trace": traceback.format_exc() + }) + + def _read_json_body(self): + content_length = int(self.headers.get("Content-Length", 0)) + if content_length <= 0: + raise ValueError("请求体不能为空") + + body = self.rfile.read(content_length).decode("utf-8") + try: + return json.loads(body) + except ValueError: + raise ValueError("请求体必须是合法JSON") + + def _validate_required_fields(self, request_data): + required_fields = [ + "projectId", + "caseId", + "automationType", + "prompt", + "caseKey", + "moduleName", + "productName", + "projectName", + "steps", + "expectedResults" + ] + missing_fields = [] + for field in required_fields: + if field == "productName": + if field not in request_data or request_data.get(field) is None: + missing_fields.append(field) + elif request_data.get(field) in (None, ""): + missing_fields.append(field) + if missing_fields: + raise ValueError("缺少必填参数:{0}".format(", ".join(missing_fields))) + + def _get_case_name(self, request_data): + return request_data.get("caseName") or request_data.get("title") or request_data.get("name") or "{0}-{1}".format( + request_data.get("caseKey"), + request_data.get("moduleName") + ) + + def _send_json_response(self, status_code, response_data): + response_body = json.dumps(response_data, ensure_ascii=False).encode("utf-8") + self.send_response(status_code) + self.send_header("Content-Type", "application/json;charset=UTF-8") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type, accessToken, Authorization") + self.send_header("Content-Length", str(len(response_body))) + self.end_headers() + self.wfile.write(response_body) + + def log_message(self, format, *args): + print("[{0}] {1}".format(self.log_date_time_string(), format % args)) + + +def run_server(host="0.0.0.0", port=8081): + server = ThreadingHTTPServer((host, port), GenerateAutomationHandler) + print("Create_ui_testcase HTTP服务已启动:http://{0}:{1}".format(host, port)) + print("接口地址:POST {0}".format(API_PATH)) + server.serve_forever() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="UI自动化用例生成HTTP服务") + parser.add_argument("--host", default="0.0.0.0", help="服务监听地址,默认0.0.0.0") + parser.add_argument("--port", default=8081, type=int, help="服务监听端口,默认8081") + args = parser.parse_args() + run_server(host=args.host, port=args.port) diff --git a/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001.png b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001.png new file mode 100644 index 0000000..c812e5e Binary files /dev/null and b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001.png differ diff --git a/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_1.png b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_1.png new file mode 100644 index 0000000..35db343 Binary files /dev/null and b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_1.png differ diff --git a/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_blog_content.png b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_blog_content.png new file mode 100644 index 0000000..068d9f4 Binary files /dev/null and b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_blog_content.png differ diff --git a/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_news_content.png b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_news_content.png new file mode 100644 index 0000000..94e5d63 Binary files /dev/null and b/dulizhan/screenshots/TC_dulizhan_ui_api_verify_001_news_content.png differ diff --git a/dulizhan/test_case/Resource/UI/TC_dulizhan_ui_api_verify_001_page.py b/dulizhan/test_case/Resource/UI/TC_dulizhan_ui_api_verify_001_page.py new file mode 100644 index 0000000..52300c9 --- /dev/null +++ b/dulizhan/test_case/Resource/UI/TC_dulizhan_ui_api_verify_001_page.py @@ -0,0 +1,159 @@ +import os +import re +from playwright.sync_api import Page, expect +from dulizhan.test_case.Resource.UI.base_page import BasePage + + +class DownloadAppPage(BasePage): + """ + Download the App 页面对象 + + 已知侦察结果: + - 首页 URL: https://joyhub-website-frontend-test.best-envision.com/ + - 页面标题: Joyhub | Explore Sexual Health, Wellness, and Connection + - 导航链接: Download the App + - Cookie 按钮文案: Accepet + """ + + DOWNLOAD_APP_LINK_TEXT = re.compile(r"^Download the App$", re.I) + + def accept_cookie_if_present(self): + # 页面侦察结果中按钮文本为 Accepet,疑似拼写如此,按真实页面处理 + accept_button = self.page.get_by_role("button", name=re.compile(r"Accepet|Accept", re.I)) + self.click_if_visible(accept_button, timeout=3000) + + def open_download_app_page(self): + self.accept_cookie_if_present() + + download_link = self.page.get_by_role("link", name=self.DOWNLOAD_APP_LINK_TEXT) + expect(download_link).to_be_visible(timeout=10000) + + download_link.click() + try: + self.page.wait_for_load_state("networkidle", timeout=5000) + except Exception: + pass + + def get_google_play_locator(self): + """ + 优先使用稳定定位: + 1. href 包含 play.google.com + 2. 链接文本包含 Google Play + 3. 图片 alt 包含 Google,向上找父级 a 标签 + + 如果后续页面提供 data-testid,建议替换为: + self.page.get_by_test_id("google-play-download") + """ + candidates = [ + self.page.locator("a[href*='play.google.com']").first, + self.page.get_by_role("link", name=re.compile(r"Google\s*Play", re.I)).first, + self.page.locator("a").filter(has_text=re.compile(r"Google\s*Play", re.I)).first, + self.page.locator("img[alt*='Google' i]").locator("xpath=ancestor::a[1]").first, + ] + + for locator in candidates: + try: + if locator.count() > 0: + return locator + except Exception: + continue + + return None + + def discover_google_play_href(self): + """ + DOM 兜底侦察: + 当 Google Play 是图片按钮、无文本链接时,用 JS 从渲染后的 DOM 中提取跳转地址。 + 只用于兜底发现,不作为首选点击方式。 + """ + return self.page.evaluate( + """ + () => { + const keywords = ['google play', 'play.google.com']; + const nodes = Array.from(document.querySelectorAll('a, button, [role="button"], img')); + + for (const node of nodes) { + const text = [ + node.innerText, + node.textContent, + node.getAttribute('aria-label'), + node.getAttribute('title'), + node.getAttribute('alt'), + node.getAttribute('href'), + node.getAttribute('src') + ].filter(Boolean).join(' ').toLowerCase(); + + if (keywords.some(k => text.includes(k))) { + const link = node.closest('a'); + if (link && link.href) { + return link.href; + } + + if (node.href) { + return node.href; + } + } + } + + return null; + } + """ + ) + + def click_google_play_and_get_redirect_url(self) -> str: + """ + 点击 Google Play 下载入口,并返回跳转地址。 + + 成功判定: + - 新开页面 URL + - 当前页面跳转后的 URL + - 或点击前已获取到 Google Play href + """ + try: + self.page.wait_for_load_state("networkidle", timeout=5000) + except Exception: + pass + + google_play_href = self.discover_google_play_href() + google_play_locator = self.get_google_play_locator() + + if google_play_locator is None and not google_play_href: + raise AssertionError( + "未找到 Google Play 下载入口。" + "请检查 Download the App 页面是否存在 Google Play 链接," + "或补充稳定 selector,例如 data-testid。" + ) + + context = self.page.context + before_pages = list(context.pages) + old_url = self.page.url + + if google_play_locator is not None: + google_play_locator.scroll_into_view_if_needed() + google_play_locator.click() + else: + # 没有稳定可点击 locator 时,使用已发现 href 直接跳转。 + # TODO: 页面增加稳定 selector 后,替换为 locator.click() + self.page.goto(google_play_href, wait_until="domcontentloaded") + + self.page.wait_for_timeout(3000) + + after_pages = list(context.pages) + new_pages = [p for p in after_pages if p not in before_pages] + + if new_pages: + new_page = new_pages[-1] + new_page.wait_for_load_state("domcontentloaded") + try: + new_page.wait_for_load_state("networkidle", timeout=10000) + except Exception: + pass + return new_page.url + + if self.page.url != old_url: + return self.page.url + + if google_play_href: + return google_play_href + + raise AssertionError("点击 Google Play 后未获取到跳转地址") diff --git a/dulizhan/test_case/Resource/UI/__init__.py b/dulizhan/test_case/Resource/UI/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dulizhan/test_case/Resource/UI/base_page.py b/dulizhan/test_case/Resource/UI/base_page.py new file mode 100644 index 0000000..cec6076 --- /dev/null +++ b/dulizhan/test_case/Resource/UI/base_page.py @@ -0,0 +1,57 @@ +import os + +from playwright.sync_api import Locator, Page, TimeoutError as PlaywrightTimeoutError + + +class BasePage: + """Playwright 页面基类:封装通用等待、点击、截图等稳定操作。""" + + def __init__(self, page: Page): + self.page = page + + def goto(self, url: str, timeout: int = 60000) -> None: + self.page.goto(url, wait_until="domcontentloaded", timeout=timeout) + self.wait_for_network_idle() + + def wait_for_network_idle(self, timeout: int = 30000) -> None: + try: + self.page.wait_for_load_state("networkidle", timeout=timeout) + except PlaywrightTimeoutError: + self.page.wait_for_load_state("domcontentloaded", timeout=timeout) + + def wait_for_visible(self, locator: Locator, timeout: int = 10000) -> Locator: + locator.wait_for(state="visible", timeout=timeout) + return locator + + def click_if_visible(self, locator: Locator, timeout: int = 3000) -> bool: + try: + if locator.first.is_visible(timeout=timeout): + locator.first.click() + self.wait_for_network_idle() + return True + except Exception: + return False + return False + + def safe_click(self, locator: Locator, timeout: int = 10000) -> None: + self.wait_for_visible(locator, timeout=timeout) + locator.click() + self.wait_for_network_idle() + + def screenshot(self, screenshot_dir: str, file_name: str, full_page: bool = True) -> str: + os.makedirs(screenshot_dir, exist_ok=True) + screenshot_path = os.path.join(screenshot_dir, file_name) + self.page.screenshot(path=screenshot_path, full_page=full_page) + return screenshot_path + + def current_url(self) -> str: + return self.page.url + + def title(self) -> str: + return self.page.title() + + def visible_text_contains(self, keyword: str) -> bool: + try: + return self.page.get_by_text(keyword, exact=False).first.is_visible(timeout=3000) + except Exception: + return False diff --git a/dulizhan/test_case/Resource/UI/blog_page.py b/dulizhan/test_case/Resource/UI/blog_page.py new file mode 100644 index 0000000..fa0f5f6 --- /dev/null +++ b/dulizhan/test_case/Resource/UI/blog_page.py @@ -0,0 +1,196 @@ +import random +from typing import Dict, List + +from playwright.sync_api import TimeoutError as PlaywrightTimeoutError, expect + +from dulizhan.test_case.Resource.UI.base_page import BasePage + + +class BlogPage(BasePage): + """ + Joyhub Blog/内容页页面对象。 + + 说明: + - 页面侦察结果未发现明确的 Blog 导航入口; + - 已发现真实导航候选包含 Discover、Following; + - 因此进入 blog/内容列表时优先尝试 Blog,失败后使用 Discover 作为内容入口兜底。 + """ + + HOME_URL = "https://joyhub-website-frontend-test.best-envision.com/" + EXPECTED_HOME_TITLE = "Joyhub | Explore Sexual Health, Wellness, and Connection" + + NAV_DISCOVER_LINK = "Discover" + + def open_home_page(self) -> None: + self.goto(self.HOME_URL) + self.accept_cookie_if_present() + expect(self.page).to_have_title(self.EXPECTED_HOME_TITLE) + + def enter_blog_page(self) -> str: + """ + 进入 blog/内容页。 + 优先级: + 1. Blog 链接:如果页面后续版本新增 Blog 导航,可直接命中; + 2. Discover 链接:侦察结果中存在,作为当前可用内容入口; + 3. /blog 直达:仅作为最后兜底,并在无法点击入口时使用。 + """ + before_url = self.page.url + + blog_locators = [ + self.page.get_by_role("link", name="Blog"), + self.page.locator("a", has_text="Blog"), + self.page.get_by_role("link", name=self.NAV_DISCOVER_LINK), + self.page.locator("a", has_text=self.NAV_DISCOVER_LINK), + ] + + for locator in blog_locators: + try: + if locator.count() > 0 and locator.first.is_visible(): + locator.first.click() + self.wait_for_page_ready() + return self.page.url + except Exception: + continue + + fallback_blog_url = self.HOME_URL.rstrip("/") + "/blog" + self.goto(fallback_blog_url) + + assert self.page.url != before_url or "/blog" in self.page.url.lower(), ( + "未能进入 blog/内容页:页面未发现 Blog 入口,Discover 入口也不可点击。" + ) + return self.page.url + + def _get_candidate_blog_links(self) -> List[Dict[str, str]]: + """ + 从渲染后的页面中提取可点击的 blog/content 候选链接。 + + 不硬编码未知 selector,优先根据 href 语义筛选: + blog/article/post/discover/detail/content。 + 若无命中,则退化为 main 区域内可见链接,排除导航类链接。 + """ + return self.page.evaluate( + """ + () => { + const navTexts = new Set([ + 'Home', + 'Download the App', + 'Rewards', + 'Support', + 'About Us', + 'Discover', + 'Following', + 'Partnerships', + 'FAQs', + 'Login' + ]); + + const normalize = value => (value || '').replace(/\\s+/g, ' ').trim(); + + const links = Array.from(document.querySelectorAll('a[href]')) + .filter(a => { + const rect = a.getBoundingClientRect(); + const style = window.getComputedStyle(a); + return rect.width > 0 + && rect.height > 0 + && style.visibility !== 'hidden' + && style.display !== 'none'; + }) + .map((a, index) => { + const href = a.href || ''; + const text = normalize(a.innerText || a.textContent || a.getAttribute('aria-label') || ''); + const imgAlt = normalize( + Array.from(a.querySelectorAll('img')) + .map(img => img.alt) + .filter(Boolean) + .join(' ') + ); + + return { + index, + href, + text, + imgAlt, + pathname: (() => { + try { return new URL(href).pathname.toLowerCase(); } + catch(e) { return ''; } + })() + }; + }) + .filter(item => item.href && !item.href.startsWith('javascript:')); + + const semanticLinks = links.filter(item => + /blog|article|post|discover|detail|content|story/i.test(item.href) + && !navTexts.has(item.text) + ); + + if (semanticLinks.length > 0) { + return semanticLinks; + } + + return links.filter(item => + !navTexts.has(item.text) + && item.href !== window.location.href + && !/#$/.test(item.href) + ); + } + """ + ) + + def click_random_blog(self) -> Dict[str, str]: + """ + 随机点击一个 blog/content 候选项。 + 点击后等待 URL 或页面内容变化。 + """ + self.wait_for_page_ready() + candidates = self._get_candidate_blog_links() + + assert candidates, ( + "未找到可点击的 blog/content 候选链接。" + "页面侦察结果缺少明确 blog 卡片 selector,请补充稳定定位,如 data-testid='blog-card'。" + ) + + candidate = random.choice(candidates) + before_url = self.page.url + + locator = self.page.locator("a[href]").nth(candidate["index"]) + locator.scroll_into_view_if_needed(timeout=5000) + + try: + with self.page.expect_navigation(wait_until="domcontentloaded", timeout=8000): + locator.click() + except PlaywrightTimeoutError: + locator.click() + + self.wait_for_page_ready() + + after_url = self.page.url + assert after_url != before_url or self.page.locator("main, article, body").first.is_visible(), ( + f"点击 blog/content 候选项后页面未出现有效跳转或内容区域。候选链接:{candidate['href']}" + ) + + return { + "clicked_href": candidate.get("href", ""), + "clicked_text": candidate.get("text") or candidate.get("imgAlt") or "", + "before_url": before_url, + "after_url": after_url, + } + + def assert_blog_content_page_loaded(self) -> None: + """ + 断言已进入 blog/content 内容页。 + 因页面缺少稳定详情页 selector,这里采用内容区域可见的稳健断言。 + """ + self.wait_for_page_ready() + + content_locator = self.page.locator("article, main, [role='main'], body").first + expect(content_locator).to_be_visible(timeout=10000) + + body_text = self.page.locator("body").inner_text(timeout=10000).strip() + assert len(body_text) > 0, "blog/content 页面 body 内容为空,疑似跳转失败或页面未渲染完成。" + + def capture_blog_content_screenshot(self, screenshot_dir: str, case_key: str) -> str: + return self.screenshot( + screenshot_dir=screenshot_dir, + file_name=f"{case_key}_blog_content.png", + full_page=True, + ) diff --git a/dulizhan/test_case/Resource/UI/news_page.py b/dulizhan/test_case/Resource/UI/news_page.py new file mode 100644 index 0000000..15e566e --- /dev/null +++ b/dulizhan/test_case/Resource/UI/news_page.py @@ -0,0 +1,200 @@ +import random +import re +from typing import Dict, List + +from playwright.sync_api import Page, TimeoutError as PlaywrightTimeoutError + +from dulizhan.test_case.Resource.UI.base_page import BasePage + + +class NewsPage(BasePage): + """Joyhub News 相关页面对象。""" + + COOKIE_ACCEPT_TEXT_PATTERN = re.compile(r"^(Accepet|Accept|I Accept|Agree|Got it)$", re.I) + NEWS_LINK_TEXT_PATTERN = re.compile(r"^News$", re.I) + NEWS_ROUTE = "/news" + + def __init__(self, page: Page, base_url: str): + super().__init__(page) + self.base_url = base_url.rstrip("/") + + def open_home(self) -> None: + self.goto(self.base_url) + + def accept_cookie_if_present(self) -> None: + accept_button = self.page.get_by_role("button", name=self.COOKIE_ACCEPT_TEXT_PATTERN) + self.click_if_visible(accept_button, timeout=3000) + + def enter_news_page(self) -> None: + news_link = self.page.get_by_role("link", name=self.NEWS_LINK_TEXT_PATTERN) + + try: + if news_link.first.is_visible(timeout=5000): + news_link.first.click() + self.wait_for_network_idle() + return + except Exception: + pass + + self.goto(f"{self.base_url}{self.NEWS_ROUTE}") + + def is_news_context(self) -> bool: + url = self.page.url.lower() + title = self.page.title().lower() + + if "news" in url: + return True + if "news" in title: + return True + if self.visible_text_contains("News"): + return True + + return False + + def _collect_clickable_news_candidates(self) -> List[Dict[str, str]]: + script = """ + () => { + const navTexts = new Set([ + 'home', + 'download the app', + 'rewards', + 'support', + 'about us', + 'discover', + 'following', + 'partnerships', + 'faqs', + 'login' + ]); + + const currentUrl = new URL(window.location.href); + const anchors = Array.from(document.querySelectorAll('a[href]')); + + function isVisible(el) { + const style = window.getComputedStyle(el); + const rect = el.getBoundingClientRect(); + return style && + style.visibility !== 'hidden' && + style.display !== 'none' && + rect.width > 0 && + rect.height > 0; + } + + function hasContentContainer(el) { + return !!el.closest( + 'article, [class*="article" i], [class*="card" i], [class*="news" i], [class*="post" i], [class*="blog" i], [class*="item" i]' + ); + } + + function isBadProtocol(url) { + return ['javascript:', 'mailto:', 'tel:'].includes(url.protocol); + } + + function isLikelySocial(url) { + const host = url.hostname.toLowerCase(); + return [ + 'facebook.com', + 'instagram.com', + 'twitter.com', + 'x.com', + 'youtube.com', + 'tiktok.com', + 'linkedin.com' + ].some(domain => host.includes(domain)); + } + + const candidates = []; + + for (const a of anchors) { + if (!isVisible(a)) continue; + + let url; + try { + url = new URL(a.href, window.location.origin); + } catch (e) { + continue; + } + + if (isBadProtocol(url)) continue; + if (isLikelySocial(url)) continue; + if (url.href === currentUrl.href) continue; + + const text = (a.innerText || a.textContent || '').trim().replace(/\\s+/g, ' '); + const textLower = text.toLowerCase(); + const pathLower = url.pathname.toLowerCase(); + + if (navTexts.has(textLower)) continue; + if (url.pathname === '/' || url.pathname === '') continue; + + const hrefLooksLikeContent = + /news|article|blog|post|detail|story/i.test(pathLower) && + pathLower !== '/news'; + + const containerLooksLikeContent = hasContentContainer(a); + + if (hrefLooksLikeContent || containerLooksLikeContent) { + candidates.push({ + href: url.href, + text: text, + path: url.pathname, + reason: hrefLooksLikeContent ? 'href_content_pattern' : 'content_container' + }); + } + } + + const seen = new Set(); + return candidates.filter(item => { + if (seen.has(item.href)) return false; + seen.add(item.href); + return true; + }); + } + """ + return self.page.evaluate(script) + + def click_random_news_item(self) -> Dict[str, str]: + self.wait_for_network_idle() + candidates = self._collect_clickable_news_candidates() + + if not candidates: + raise AssertionError( + "未发现可点击的 news 内容候选。" + "请检查 News 页面是否加载成功,或为 news 卡片补充稳定 selector/data-testid。" + ) + + selected = random.choice(candidates) + href = selected["href"] + old_url = self.page.url + + click_script = """ + (targetHref) => { + const anchors = Array.from(document.querySelectorAll('a[href]')); + const target = anchors.find(a => { + try { + return new URL(a.href, window.location.origin).href === targetHref; + } catch (e) { + return false; + } + }); + + if (!target) { + throw new Error('Target news link not found: ' + targetHref); + } + + target.scrollIntoView({block: 'center', inline: 'center'}); + target.click(); + } + """ + + self.page.evaluate(click_script, href) + + try: + self.page.wait_for_url(lambda url: str(url) != old_url, timeout=15000) + except PlaywrightTimeoutError: + pass + + self.wait_for_network_idle() + return selected + + def screenshot_news_content(self, screenshot_dir: str, file_name: str) -> str: + return self.screenshot(screenshot_dir=screenshot_dir, file_name=file_name, full_page=True) diff --git a/dulizhan/test_case/Resource/__init__.py b/dulizhan/test_case/Resource/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001.py b/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001.py new file mode 100644 index 0000000..0f816f6 --- /dev/null +++ b/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001.py @@ -0,0 +1,91 @@ +from dulizhan.test_case.Resource.UI.base_page import BasePage +from dulizhan.test_case.Resource.UI.news_page import NewsPage +import os + +import allure +import pytest +from playwright.sync_api import sync_playwright + +from dulizhan.test_case.Resource.UI.news_page import NewsPage + + +CASE_INFO = { + "projectId": 1001, + "caseId": 2001, + "automationType": "ui", + "caseKey": "TC_dulizhan_ui_api_verify_001", + "moduleName": "news", + "productName": "", + "projectName": "dulizhan", + "caseName": "进入news页面,随机点击news,跳转到news内容后截图,就认为用例执行成功", + "pageUrl": "https://joyhub-website-frontend-test.best-envision.com/", + "screenshotDir": r"C:\Users\a\smart-management-auto-test\dulizhan\screenshots", +} + + +@pytest.fixture(scope="session") +def browser(): + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + yield browser + browser.close() + + +@pytest.fixture() +def page(browser): + context = browser.new_context( + viewport={"width": 1440, "height": 900}, + ignore_https_errors=True, + ) + page = context.new_page() + yield page + context.close() + + +@allure.feature(CASE_INFO["projectName"]) +@allure.story(CASE_INFO["moduleName"]) +@allure.title(CASE_INFO["caseName"]) +def test_random_click_news_and_capture_content(page): + screenshot_dir = CASE_INFO["screenshotDir"] + os.makedirs(screenshot_dir, exist_ok=True) + + news_page = NewsPage(page) + + with allure.step("打开被测页面"): + news_page.open_home_page() + news_page.accept_cookie_if_present() + allure.attach( + news_page.current_url(), + name="首页 URL", + attachment_type=allure.attachment_type.TEXT, + ) + + with allure.step("进入 news 页面"): + news_page.enter_news_page() + allure.attach( + news_page.current_url(), + name="News 页面 URL", + attachment_type=allure.attachment_type.TEXT, + ) + + with allure.step("随机点击 news"): + selected_news = news_page.click_random_news() + allure.attach( + str(selected_news), + name="随机点击的 news 候选信息", + attachment_type=allure.attachment_type.TEXT, + ) + + with allure.step("截图跳转的 news 内容"): + news_page.assert_news_content_loaded() + screenshot_path = news_page.capture_news_content_screenshot( + screenshot_dir=screenshot_dir, + case_key=CASE_INFO["caseKey"], + ) + allure.attach.file( + screenshot_path, + name="跳转后的 news 内容截图", + attachment_type=allure.attachment_type.PNG, + ) + + assert os.path.exists(screenshot_path), f"news 内容截图未生成: {screenshot_path}" diff --git a/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001_blog.py b/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001_blog.py new file mode 100644 index 0000000..ad00afe --- /dev/null +++ b/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001_blog.py @@ -0,0 +1,80 @@ +from dulizhan.test_case.Resource.UI.base_page import BasePage +from dulizhan.test_case.Resource.UI.blog_page import BlogPage +from types import SimpleNamespace + +import allure +import pytest +from playwright.sync_api import sync_playwright + +from dulizhan.test_case.Resource.UI.blog_page import BlogPage + + +case_info = SimpleNamespace( + projectId=1001, + caseId=2001, + automationType="ui", + caseKey="TC_dulizhan_ui_api_verify_001", + moduleName="blog", + productName="", + projectName="dulizhan", + caseName="进入blog页面,随机点击blog,跳转到blog内容后截图,就认为用例执行成功", + screenshotDir=r"C:\Users\a\smart-management-auto-test\dulizhan\screenshots", + pageUrl="https://joyhub-website-frontend-test.best-envision.com/", +) + + +@pytest.fixture(scope="function") +def page(): + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + context = browser.new_context( + viewport={"width": 1440, "height": 900}, + ignore_https_errors=True, + ) + page = context.new_page() + yield page + context.close() + browser.close() + + +@allure.feature(case_info.projectName) +@allure.story(case_info.moduleName) +@allure.title(case_info.caseName) +def test_enter_blog_random_click_and_capture_content(page): + blog_page = BlogPage(page) + + with allure.step("打开被测页面"): + blog_page.open_home_page() + assert page.url.startswith(case_info.pageUrl) + + with allure.step("进入blog页面"): + entered_url = blog_page.enter_blog_page() + allure.attach( + entered_url, + name="进入blog页面后的URL", + attachment_type=allure.attachment_type.TEXT, + ) + + with allure.step("随机点击blog"): + click_result = blog_page.click_random_blog() + allure.attach( + str(click_result), + name="随机点击blog结果", + attachment_type=allure.attachment_type.TEXT, + ) + + with allure.step("截图跳转的blog内容"): + blog_page.assert_blog_content_page_loaded() + screenshot_path = blog_page.capture_blog_content_screenshot( + screenshot_dir=case_info.screenshotDir, + case_key=case_info.caseKey, + ) + + with open(screenshot_path, "rb") as image_file: + allure.attach( + image_file.read(), + name="blog内容页截图", + attachment_type=allure.attachment_type.PNG, + ) + + assert screenshot_path, "blog 内容页截图路径为空" diff --git a/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001_news.py b/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001_news.py new file mode 100644 index 0000000..fa01fe8 --- /dev/null +++ b/dulizhan/test_case/TestCase/UI/test_TC_dulizhan_ui_api_verify_001_news.py @@ -0,0 +1,90 @@ +from dulizhan.test_case.Resource.UI.base_page import BasePage +from dulizhan.test_case.Resource.UI.news_page import NewsPage +from types import SimpleNamespace + +import allure +import pytest +from playwright.sync_api import sync_playwright + +from dulizhan.test_case.Resource.UI.news_page import NewsPage + + +CASE_INFO = SimpleNamespace( + projectId=1001, + caseId=2001, + automationType="ui", + caseKey="TC_dulizhan_ui_api_verify_001", + moduleName="news", + productName="", + projectName="dulizhan", + caseName="进入news页面,随机点击news,跳转到news内容后截图,就认为用例执行成功", + pageUrl="https://joyhub-website-frontend-test.best-envision.com/", + screenshotDir=r"C:\Users\a\smart-management-auto-test\dulizhan\screenshots", +) + + +@pytest.fixture(scope="session") +def browser(): + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + yield browser + browser.close() + + +@pytest.fixture() +def page(browser): + page = browser.new_page( + viewport={"width": 1440, "height": 1200}, + ignore_https_errors=True, + ) + yield page + page.close() + + +@pytest.fixture() +def case_info(): + return CASE_INFO + + +@allure.feature(CASE_INFO.projectName) +@allure.story(CASE_INFO.moduleName) +@allure.title(CASE_INFO.caseName) +@pytest.mark.ui +def test_enter_news_random_click_and_screenshot(page, case_info): + news_page = NewsPage(page, case_info.pageUrl) + + with allure.step("打开被测页面"): + news_page.open_home() + news_page.accept_cookie_if_present() + assert "Joyhub" in news_page.title(), f"首页标题不符合预期,当前标题:{news_page.title()}" + + with allure.step("进入news页面"): + news_page.enter_news_page() + assert news_page.is_news_context(), ( + f"未确认进入 news 页面上下文,当前URL:{page.url},当前标题:{page.title()}。" + "侦察结果未提供明确 News 导航 selector,如实际路由不是 /news,请调整 NewsPage.NEWS_ROUTE。" + ) + + with allure.step("随机点击news"): + before_click_url = page.url + selected_news = news_page.click_random_news_item() + allure.attach( + str(selected_news), + name="随机点击的 news 候选", + attachment_type=allure.attachment_type.TEXT, + ) + assert page.url != before_click_url or selected_news.get("href"), ( + f"点击 news 后页面未发生有效跳转,点击前URL:{before_click_url},点击后URL:{page.url}" + ) + + with allure.step("截图跳转的news内容"): + screenshot_path = news_page.screenshot_news_content( + screenshot_dir=case_info.screenshotDir, + file_name=f"{case_info.caseKey}_news_content.png", + ) + allure.attach.file( + screenshot_path, + name="跳转后的news内容截图", + attachment_type=allure.attachment_type.PNG, + ) + assert screenshot_path, "news 内容截图保存失败" diff --git a/skills/webapp-testing/LICENSE.txt b/skills/webapp-testing/LICENSE.txt new file mode 100644 index 0000000..4f881c5 --- /dev/null +++ b/skills/webapp-testing/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2026 Anthropic, PBC. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/skills/webapp-testing/SKILL.md b/skills/webapp-testing/SKILL.md new file mode 100644 index 0000000..5079fed --- /dev/null +++ b/skills/webapp-testing/SKILL.md @@ -0,0 +1,181 @@ +*** + +name: webapp-testing +description: Toolkit for interacting with and testing local web applications using Playwright. Supports verifying frontend functionality, debugging UI behavior, capturing browser screenshots, and viewing browser logs. +license: Complete terms in LICENSE.txt +-------------------------------------- + +# Web Application Testing + +To test local web applications, write native Python Playwright scripts. + +**Helper Scripts Available**: + +- `scripts/with_server.py` - Manages server lifecycle (supports multiple servers) + +**Always run scripts with** **`--help`** **first** to see usage. DO NOT read the source until you try running the script first and find that a customized solution is abslutely necessary. These scripts can be very large and thus pollute your context window. They exist to be called directly as black-box scripts rather than ingested into your context window. + +## Decision Tree: Choosing Your Approach + +``` +User task → Is it static HTML? + ├─ Yes → Read HTML file directly to identify selectors + │ ├─ Success → Write Playwright script using selectors + │ └─ Fails/Incomplete → Treat as dynamic (below) + │ + └─ No (dynamic webapp) → Is the server already running? + ├─ No → Run: python scripts/with_server.py --help + │ Then use the helper + write simplified Playwright script + │ + └─ Yes → Reconnaissance-then-action: + 1. Navigate and wait for networkidle + 2. Take screenshot or inspect DOM + 3. Identify selectors from rendered state + 4. Execute actions with discovered selectors +``` + +## Example: Using with\_server.py + +To start a server, run `--help` first, then use the helper: + +**Single server:** + +```bash +python scripts/with_server.py --server "npm run dev" --port 5173 -- python your_automation.py +``` + +**Multiple servers (e.g., backend + frontend):** + +```bash +python scripts/with_server.py \ + --server "cd backend && python server.py" --port 3000 \ + --server "cd frontend && npm run dev" --port 5173 \ + -- python your_automation.py +``` + +To create an automation script, include only Playwright logic (servers are managed automatically): + +```python +from playwright.sync_api import sync_playwright + +with sync_playwright() as p: + browser = p.chromium.launch(headless=True) # Always launch chromium in headless mode + page = browser.new_page() + page.goto('http://localhost:5173') # Server already running and ready + page.wait_for_load_state('networkidle') # CRITICAL: Wait for JS to execute + # ... your automation logic + browser.close() +``` + +## Reconnaissance-Then-Action Pattern + +1. **Inspect rendered DOM**: + ```python + page.screenshot(path='/tmp/inspect.png', full_page=True) + content = page.content() + page.locator('button').all() + ``` +2. **Identify selectors** from inspection results +3. **Execute actions** using discovered selectors + +## Common Pitfall + +❌ **Don't** inspect the DOM before waiting for `networkidle` on dynamic apps +✅ **Do** wait for `page.wait_for_load_state('networkidle')` before inspection + +## Best Practices + +- **Use bundled scripts as black boxes** - To accomplish a task, consider whether one of the scripts available in `scripts/` can help. These scripts handle common, complex workflows reliably without cluttering the context window. Use `--help` to see usage, then invoke directly. +- Use `sync_playwright()` for synchronous scripts +- Always close the browser when done +- Use descriptive selectors: `text=`, `role=`, CSS selectors, or IDs +- Add appropriate waits: `page.wait_for_selector()` or `page.wait_for_timeout()` + +## Reference Files + +- **examples/** - Examples showing common patterns: + - `element_discovery.py` - Discovering buttons, links, and inputs on a page + - `static_html_automation.py` - Using file:// URLs for local HTML + - `console_logging.py` - Capturing console logs during automation + +# UI Automation Testing Skill + +你是一个资深 UI 自动化测试专家,擅长基于 Selenium、Playwright、pytest、unittest、Robot Framework、Allure 等技术体系设计和实现稳定、可维护、可扩展的 UI 自动化测试方案。 + +## 适用场景 + +当用户需要以下能力时,使用本 Skill: + +- 编写 Web UI 自动化测试用例 +- 设计 Page Object / Page Object Model 框架 +- 封装页面元素、页面行为、业务流程 +- 优化 Selenium / Playwright 自动化脚本稳定性 +- 处理元素定位、等待、iframe、弹窗、上传下载、验证码等问题 +- 设计 pytest + Allure UI 自动化测试框架 +- 编写 UI 自动化断言、测试数据、公共方法 +- 分析 UI 自动化失败原因 +- 提升自动化用例可维护性和执行效率 +- 将手工测试场景转换为自动化测试用例 + +## 角色定位 + +你不是简单的代码生成器,而是 UI 自动化测试架构师和落地专家。 + +你需要: + +1. 理解用户当前项目框架和代码风格; +2. 优先复用已有封装,不重复造轮子; +3. 保持用例稳定性、可读性和可维护性; +4. 按照自动化测试最佳实践设计代码; +5. 明确区分页面层、业务层、测试层; +6. 对不稳定写法主动给出风险提示; +7. 生成代码前先确认当前项目使用的技术栈和目录结构。 + +## 工作原则 + +### 1. 先理解项目 + +在编写代码前,优先查看以下内容: + +- 项目目录结构 +- requirements.txt / pyproject.toml / package.json +- conftest.py +- pytest.ini / setup.cfg / tox.ini +- 已有 Page Object 文件 +- 已有测试用例 +- 公共 driver / browser 封装 +- Allure 封装 +- 日志封装 +- 配置文件 +- 测试数据文件 + +不要在不了解项目结构的情况下直接生成孤立代码。 + +### 2. 分层设计 + +推荐使用以下结构: + +```text +tests/ + test_xxx.py 测试用例层,只做流程编排和断言 + +pages/ + xxx_page.py 页面对象层,封装元素和页面操作 + +flows/ + xxx_flow.py 业务流程层,封装跨页面业务流程 + +common/ + browser.py 浏览器/driver 管理 + base_page.py 基础页面封装 + wait.py 显式等待封装 + logger.py 日志封装 + assertions.py 断言封装 + +data/ + xxx_data.py / xxx.yaml 测试数据 + +config/ + config.py / env.yaml 环境配置 +``` + diff --git a/skills/webapp-testing/examples/console_logging.py b/skills/webapp-testing/examples/console_logging.py new file mode 100644 index 0000000..9329b5e --- /dev/null +++ b/skills/webapp-testing/examples/console_logging.py @@ -0,0 +1,35 @@ +from playwright.sync_api import sync_playwright + +# Example: Capturing console logs during browser automation + +url = 'http://localhost:5173' # Replace with your URL + +console_logs = [] + +with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page(viewport={'width': 1920, 'height': 1080}) + + # Set up console log capture + def handle_console_message(msg): + console_logs.append(f"[{msg.type}] {msg.text}") + print(f"Console: [{msg.type}] {msg.text}") + + page.on("console", handle_console_message) + + # Navigate to page + page.goto(url) + page.wait_for_load_state('networkidle') + + # Interact with the page (triggers console logs) + page.click('text=Dashboard') + page.wait_for_timeout(1000) + + browser.close() + +# Save console logs to file +with open('/mnt/user-data/outputs/console.log', 'w') as f: + f.write('\n'.join(console_logs)) + +print(f"\nCaptured {len(console_logs)} console messages") +print(f"Logs saved to: /mnt/user-data/outputs/console.log") \ No newline at end of file diff --git a/skills/webapp-testing/examples/element_discovery.py b/skills/webapp-testing/examples/element_discovery.py new file mode 100644 index 0000000..917ba72 --- /dev/null +++ b/skills/webapp-testing/examples/element_discovery.py @@ -0,0 +1,40 @@ +from playwright.sync_api import sync_playwright + +# Example: Discovering buttons and other elements on a page + +with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page() + + # Navigate to page and wait for it to fully load + page.goto('http://localhost:5173') + page.wait_for_load_state('networkidle') + + # Discover all buttons on the page + buttons = page.locator('button').all() + print(f"Found {len(buttons)} buttons:") + for i, button in enumerate(buttons): + text = button.inner_text() if button.is_visible() else "[hidden]" + print(f" [{i}] {text}") + + # Discover links + links = page.locator('a[href]').all() + print(f"\nFound {len(links)} links:") + for link in links[:5]: # Show first 5 + text = link.inner_text().strip() + href = link.get_attribute('href') + print(f" - {text} -> {href}") + + # Discover input fields + inputs = page.locator('input, textarea, select').all() + print(f"\nFound {len(inputs)} input fields:") + for input_elem in inputs: + name = input_elem.get_attribute('name') or input_elem.get_attribute('id') or "[unnamed]" + input_type = input_elem.get_attribute('type') or 'text' + print(f" - {name} ({input_type})") + + # Take screenshot for visual reference + page.screenshot(path='/tmp/page_discovery.png', full_page=True) + print("\nScreenshot saved to /tmp/page_discovery.png") + + browser.close() \ No newline at end of file diff --git a/skills/webapp-testing/examples/static_html_automation.py b/skills/webapp-testing/examples/static_html_automation.py new file mode 100644 index 0000000..90bbedc --- /dev/null +++ b/skills/webapp-testing/examples/static_html_automation.py @@ -0,0 +1,33 @@ +from playwright.sync_api import sync_playwright +import os + +# Example: Automating interaction with static HTML files using file:// URLs + +html_file_path = os.path.abspath('path/to/your/file.html') +file_url = f'file://{html_file_path}' + +with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page(viewport={'width': 1920, 'height': 1080}) + + # Navigate to local HTML file + page.goto(file_url) + + # Take screenshot + page.screenshot(path='/mnt/user-data/outputs/static_page.png', full_page=True) + + # Interact with elements + page.click('text=Click Me') + page.fill('#name', 'John Doe') + page.fill('#email', 'john@example.com') + + # Submit form + page.click('button[type="submit"]') + page.wait_for_timeout(500) + + # Take final screenshot + page.screenshot(path='/mnt/user-data/outputs/after_submit.png', full_page=True) + + browser.close() + +print("Static HTML automation completed!") \ No newline at end of file diff --git a/skills/webapp-testing/scripts/with_server.py b/skills/webapp-testing/scripts/with_server.py new file mode 100644 index 0000000..431f2eb --- /dev/null +++ b/skills/webapp-testing/scripts/with_server.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +""" +Start one or more servers, wait for them to be ready, run a command, then clean up. + +Usage: + # Single server + python scripts/with_server.py --server "npm run dev" --port 5173 -- python automation.py + python scripts/with_server.py --server "npm start" --port 3000 -- python test.py + + # Multiple servers + python scripts/with_server.py \ + --server "cd backend && python server.py" --port 3000 \ + --server "cd frontend && npm run dev" --port 5173 \ + -- python test.py +""" + +import subprocess +import socket +import time +import sys +import argparse + +def is_server_ready(port, timeout=30): + """Wait for server to be ready by polling the port.""" + start_time = time.time() + while time.time() - start_time < timeout: + try: + with socket.create_connection(('localhost', port), timeout=1): + return True + except (socket.error, ConnectionRefusedError): + time.sleep(0.5) + return False + + +def main(): + parser = argparse.ArgumentParser(description='Run command with one or more servers') + parser.add_argument('--server', action='append', dest='servers', required=True, help='Server command (can be repeated)') + parser.add_argument('--port', action='append', dest='ports', type=int, required=True, help='Port for each server (must match --server count)') + parser.add_argument('--timeout', type=int, default=30, help='Timeout in seconds per server (default: 30)') + parser.add_argument('command', nargs=argparse.REMAINDER, help='Command to run after server(s) ready') + + args = parser.parse_args() + + # Remove the '--' separator if present + if args.command and args.command[0] == '--': + args.command = args.command[1:] + + if not args.command: + print("Error: No command specified to run") + sys.exit(1) + + # Parse server configurations + if len(args.servers) != len(args.ports): + print("Error: Number of --server and --port arguments must match") + sys.exit(1) + + servers = [] + for cmd, port in zip(args.servers, args.ports): + servers.append({'cmd': cmd, 'port': port}) + + server_processes = [] + + try: + # Start all servers + for i, server in enumerate(servers): + print(f"Starting server {i+1}/{len(servers)}: {server['cmd']}") + + # Use shell=True to support commands with cd and && + process = subprocess.Popen( + server['cmd'], + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + server_processes.append(process) + + # Wait for this server to be ready + print(f"Waiting for server on port {server['port']}...") + if not is_server_ready(server['port'], timeout=args.timeout): + raise RuntimeError(f"Server failed to start on port {server['port']} within {args.timeout}s") + + print(f"Server ready on port {server['port']}") + + print(f"\nAll {len(servers)} server(s) ready") + + # Run the command + print(f"Running: {' '.join(args.command)}\n") + result = subprocess.run(args.command) + sys.exit(result.returncode) + + finally: + # Clean up all servers + print(f"\nStopping {len(server_processes)} server(s)...") + for i, process in enumerate(server_processes): + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + print(f"Server {i+1} stopped") + print("All servers stopped") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/tmp_generate_automation_response.json b/tmp_generate_automation_response.json new file mode 100644 index 0000000..5f28270 --- /dev/null +++ b/tmp_generate_automation_response.json @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/zhyy/test_case/TestCase/UI/test_TC_zhyy_ui_api_verify_001.py b/zhyy/test_case/TestCase/UI/test_TC_zhyy_ui_api_verify_001.py new file mode 100644 index 0000000..9dba19a --- /dev/null +++ b/zhyy/test_case/TestCase/UI/test_TC_zhyy_ui_api_verify_001.py @@ -0,0 +1,6 @@ +# -*- coding:utf-8 -*- +import pytest + + +def test_tc_zhyy_ui_api_verify_001(): + assert "智慧运营"