From 99b6202e6f8278f55c1e52fbcbfbaeba5e9c124d Mon Sep 17 00:00:00 2001 From: guojiabao Date: Mon, 18 May 2026 18:24:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E5=AE=8C=E6=95=B4=E7=9A=84jo?= =?UTF-8?q?yhub=5Fbackend=E7=9B=AE=E5=BD=95=EF=BC=8C=E5=8C=85=E5=90=ABlibr?= =?UTF-8?q?ary=E5=92=8Ctest=5Fcase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- joyhub_backend/__init__.py | 0 joyhub_backend/library/__init__.py | 11 + joyhub_backend/library/auth.py | 148 +++ joyhub_backend/library/business.py | 62 ++ .../library/functional_case_converter.py | 917 ++++++++++++++++++ joyhub_backend/library/hubops_parser.py | 189 ++++ joyhub_backend/library/joyhub_interface.py | 131 +++ joyhub_backend/test_case/__init__.py | 0 joyhub_backend/test_case/run_tests.py | 170 ++++ 9 files changed, 1628 insertions(+) create mode 100644 joyhub_backend/__init__.py create mode 100644 joyhub_backend/library/__init__.py create mode 100644 joyhub_backend/library/auth.py create mode 100644 joyhub_backend/library/business.py create mode 100644 joyhub_backend/library/functional_case_converter.py create mode 100644 joyhub_backend/library/hubops_parser.py create mode 100644 joyhub_backend/library/joyhub_interface.py create mode 100644 joyhub_backend/test_case/__init__.py create mode 100644 joyhub_backend/test_case/run_tests.py diff --git a/joyhub_backend/__init__.py b/joyhub_backend/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/joyhub_backend/library/__init__.py b/joyhub_backend/library/__init__.py new file mode 100644 index 0000000..adc8683 --- /dev/null +++ b/joyhub_backend/library/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +from joyhub_backend.library.functional_case_converter import ( + ApiContextItem, + ApiMatcher, + FunctionalCaseInput, + FunctionalCaseParser, + FunctionalCaseToApiAutomationService, + FunctionalStep, + LLMApiCaseConverter, + convert_functional_case, +) diff --git a/joyhub_backend/library/auth.py b/joyhub_backend/library/auth.py new file mode 100644 index 0000000..83e035d --- /dev/null +++ b/joyhub_backend/library/auth.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +import json +import logging +import os + +import allure +import requests + + +CAPTCHA_URL = os.getenv( + "JOYHUB_CAPTCHA_URL", + "http://test-manager-api.best-envision.com/admin/login/captcha", +) +LOGIN_BASE_URL = os.getenv( + "JOYHUB_LOGIN_BASE_URL", + "http://test-manager-api.best-envision.com", +) +LOGIN_PATH = os.getenv("JOYHUB_LOGIN_PATH", "/admin/login/login") +USERNAME = os.getenv("JOYHUB_USERNAME", "guojiabao") +PASSWORD = os.getenv("JOYHUB_PASSWORD", "gjb123456") +CAPTCHA = os.getenv("JOYHUB_CAPTCHA", "1111") +TIMEOUT = int(os.getenv("JOYHUB_TIMEOUT", "20")) +TENANT_ID = os.getenv("JOYHUB_TENANT_ID", "126") + + +class JoyhubAuth(object): + def __init__(self): + self.session = requests.Session() + self._token = None + + @property + def login_url(self): + return LOGIN_BASE_URL.rstrip("/") + LOGIN_PATH + + def get_captcha_key(self): + with allure.step("前置:获取登录验证码 key"): + logging.info("GET %s", CAPTCHA_URL) + response = self.session.get(CAPTCHA_URL, timeout=TIMEOUT) + self._attach_response("captcha", response) + response.raise_for_status() + data = response.json() + key = self._extract_key(data) + assert key, "验证码接口未返回 key,响应:{}".format(data) + return key + + def login(self): + if self._token: + return self._token + key = self.get_captcha_key() + body = { + "key": key, + "username": USERNAME, + "password": PASSWORD, + "captcha": CAPTCHA, + } + headers = {"Content-Type": "application/json", "tenant-id": TENANT_ID} + with allure.step("前置:登录并获取 token"): + logging.info("POST %s", self.login_url) + logging.info("request headers: %s", headers) + logging.info("request body: %s", self._safe_body(body)) + response = self.session.post(self.login_url, json=body, headers=headers, timeout=TIMEOUT) + self._attach_request(self.login_url, "POST", headers, self._safe_body(body)) + self._attach_response("login", response) + response.raise_for_status() + data = response.json() + token = self._extract_token(data) + assert token, "登录接口未返回 token,响应:{}".format(data) + self._token = token if token.startswith("Bearer ") else "Bearer " + token + return self._token + + def auth_headers(self): + return { + "Authorization": self.login(), + "Content-Type": "application/json", + "tenant-id": TENANT_ID, + } + + @staticmethod + def _extract_key(data): + if isinstance(data, dict): + for field in ("key", "captchaKey", "captcha_key", "uuid"): + if data.get(field): + return data.get(field) + nested = data.get("data") + if isinstance(nested, dict): + for field in ("key", "captchaKey", "captcha_key", "uuid"): + if nested.get(field): + return nested.get(field) + if isinstance(nested, str): + return nested + return None + + @staticmethod + def _extract_token(data): + token_fields = ( + "token", + "access_token", + "accessToken", + "Authorization", + "authorization", + "userToken", + "user_token", + "jwt", + ) + if isinstance(data, dict): + for field in token_fields: + token = data.get(field) + if JoyhubAuth._looks_like_token(token): + return token + nested = data.get("data") + if isinstance(nested, dict): + for field in token_fields: + token = nested.get(field) + if JoyhubAuth._looks_like_token(token): + return token + return None + + @staticmethod + def _looks_like_token(value): + if not isinstance(value, str): + return False + value = value.strip() + if not value: + return False + if any(ord(char) > 127 for char in value): + return False + return len(value) >= 16 or value.startswith("Bearer ") + + @staticmethod + def _safe_body(body): + safe = dict(body) + if "password" in safe: + safe["password"] = "******" + return safe + + @staticmethod + def _attach_request(url, method, headers, body): + allure.attach(str(url), "请求url", allure.attachment_type.TEXT) + allure.attach(str(method), "请求方式", allure.attachment_type.TEXT) + allure.attach(json.dumps(headers, ensure_ascii=False, indent=2), "请求头", allure.attachment_type.JSON) + allure.attach(json.dumps(body, ensure_ascii=False, indent=2), "请求体", allure.attachment_type.JSON) + + @staticmethod + def _attach_response(name, response): + logging.info("%s response status: %s", name, response.status_code) + logging.info("%s response body: %s", name, response.text) + allure.attach(str(response.status_code), "{} 响应状态码".format(name), allure.attachment_type.TEXT) + allure.attach(response.text, "{} 响应体".format(name), allure.attachment_type.JSON) diff --git a/joyhub_backend/library/business.py b/joyhub_backend/library/business.py new file mode 100644 index 0000000..76af10c --- /dev/null +++ b/joyhub_backend/library/business.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +import logging + +from joyhub_backend.library.joyhub_interface import JoyhubInterface + + +class JoyhubBusiness(JoyhubInterface): + def get_video_label_list(self): + logging.info("==========获取视频标签列表==========") + body = { + "page": 1, + "limit": 10, + "sort": "id", + "label_name": "", + "category_id": 0, + "video_type": 0, + "video_num": 0, + "created_at": [], + "order": "descending", + } + return self.request("获取视频标签列表", "POST", "admin/video/getVideoLabelList", body=body) + + def get_video_category_list(self): + logging.info("==========获取视频分类列表==========") + body = { + "page": 1, + "limit": 10, + "sort": "id", + "category_name": "", + "order": "descending", + } + return self.request("获取视频分类列表", "POST", "admin/video/getVideoCategoryList", body=body) + + def get_exchange_record_list(self): + logging.info("==========兑换记录列表==========") + body = { + "page": 1, + "limit": 10, + "status": "", + "goods_type": "", + "created_at": [], + } + return self.request("兑换记录列表", "POST", "admin/exchange/recordList", body=body) + + def get_app_feedback_list(self): + logging.info("==========app反馈意见列表==========") + body = { + "page": 1, + "limit": 10, + "status": "", + "keyword": "", + } + return self.request("app反馈意见列表", "POST", "admin/feedback/list", body=body) + + def get_sensitive_word_list(self): + logging.info("==========敏感词列表==========") + body = { + "page": 1, + "limit": 10, + "keyword": "", + } + return self.request("敏感词列表", "POST", "admin/sensitiveWord/list", body=body) diff --git a/joyhub_backend/library/functional_case_converter.py b/joyhub_backend/library/functional_case_converter.py new file mode 100644 index 0000000..bcd1390 --- /dev/null +++ b/joyhub_backend/library/functional_case_converter.py @@ -0,0 +1,917 @@ +# -*- coding: utf-8 -*- +import argparse +import copy +import json +import os +import re +from dataclasses import asdict, dataclass, field +from difflib import SequenceMatcher +from typing import Any, Callable, Dict, Iterable, List, Optional, Union + + +COMMON_CN_KEYWORDS = [ + "登录", + "查询", + "搜索", + "筛选", + "列表", + "详情", + "新增", + "创建", + "添加", + "修改", + "编辑", + "更新", + "删除", + "导出", + "导入", + "上传", + "下载", + "提交", + "审核", + "确认", + "保存", + "分页", + "分页查询", + "性能", + "响应时间", + "超时", + "返回", + "结果", + "成功", + "失败", + "校验", + "重置", + "启用", + "停用", + "开关", + "状态", + "排序", + "标签", + "分类", + "关键字", + "关键字搜索", + "模糊搜索", +] + +METHOD_INTENT_KEYWORDS = { + "GET": ["查询", "搜索", "筛选", "列表", "详情", "获取", "查看", "分页", "返回"], + "POST": ["新增", "创建", "添加", "登录", "提交", "导入", "上传", "确认", "审核", "保存"], + "PUT": ["修改", "编辑", "更新", "重置"], + "PATCH": ["修改", "编辑", "更新", "状态", "启用", "停用", "开关"], + "DELETE": ["删除", "移除"], +} + +DEFAULT_PLACEHOLDER_URL = "/api/need-confirm" +DEFAULT_PAGE_SIZE = 10 +DEFAULT_RESPONSE_TIME_MS = 1000 + + +@dataclass +class FunctionalStep: + stepNo: int + action: str + expectedResult: str = "" + + +@dataclass +class ApiContextItem: + apiName: str = "" + method: str = "" + url: str = "" + description: str = "" + headers: Dict[str, Any] = field(default_factory=dict) + queryParams: Dict[str, Any] = field(default_factory=dict) + requestBody: Any = None + responseExample: Any = None + tags: List[str] = field(default_factory=list) + + +@dataclass +class FunctionalCaseInput: + caseId: str = "" + caseKey: str = "" + caseName: str = "" + projectName: str = "" + moduleName: str = "" + priority: str = "" + preconditions: str = "" + steps: List[FunctionalStep] = field(default_factory=list) + expectedResults: List[str] = field(default_factory=list) + apiContext: Optional[List[ApiContextItem]] = None + generateOptions: Dict[str, Any] = field(default_factory=dict) + extra: Dict[str, Any] = field(default_factory=dict) + + +class FunctionalCaseParser: + @classmethod + def parse(cls, payload: Union[str, Dict[str, Any], FunctionalCaseInput]) -> FunctionalCaseInput: + if isinstance(payload, FunctionalCaseInput): + return payload + + if isinstance(payload, str): + try: + payload = json.loads(payload) + except Exception: + payload = { + "caseName": "手工输入功能用例", + "steps": [payload], + "expectedResults": [], + } + + if not isinstance(payload, dict): + raise TypeError("functional case payload must be dict, json string or FunctionalCaseInput") + + steps = cls._normalize_steps(payload.get("steps") or payload.get("step") or []) + expected_results = cls._normalize_text_list( + payload.get("expectedResults") + or payload.get("expectedResult") + or payload.get("expectations") + or [] + ) + api_context = cls._normalize_api_context(payload.get("apiContext") or payload.get("apis") or payload.get("apiInfo")) + generate_options = payload.get("generateOptions") or payload.get("options") or {} + + known_keys = { + "caseId", + "caseKey", + "caseName", + "projectName", + "moduleName", + "priority", + "preconditions", + "steps", + "step", + "expectedResults", + "expectedResult", + "expectations", + "apiContext", + "apis", + "apiInfo", + "generateOptions", + "options", + } + extra = {key: value for key, value in payload.items() if key not in known_keys} + + return FunctionalCaseInput( + caseId=str(payload.get("caseId", "") or ""), + caseKey=str(payload.get("caseKey", "") or ""), + caseName=str(payload.get("caseName", "") or payload.get("name", "") or ""), + projectName=str(payload.get("projectName", "") or ""), + moduleName=str(payload.get("moduleName", "") or ""), + priority=str(payload.get("priority", "") or ""), + preconditions=str(payload.get("preconditions", "") or payload.get("precondition", "") or ""), + steps=steps, + expectedResults=expected_results, + apiContext=api_context, + generateOptions=generate_options if isinstance(generate_options, dict) else {}, + extra=extra, + ) + + @staticmethod + def _normalize_steps(value: Any) -> List[FunctionalStep]: + if not value: + return [] + + if isinstance(value, str): + raw_items = [item.strip() for item in re.split(r"[\n\r]+", value) if item.strip()] + if len(raw_items) <= 1: + raw_items = [item.strip() for item in re.split(r"(?<=[。;;])", value) if item.strip()] + items = raw_items + elif isinstance(value, list): + items = value + else: + items = [value] + + normalized: List[FunctionalStep] = [] + for index, item in enumerate(items, start=1): + if isinstance(item, dict): + step_no = int(item.get("stepNo") or item.get("step_no") or item.get("no") or index) + action = str(item.get("action") or item.get("step") or item.get("content") or item.get("description") or "") + expected_result = str(item.get("expectedResult") or item.get("expected") or "") + else: + raw_text = str(item).strip() + step_no = index + action = re.sub(r"^\s*\d+[\.、\)]\s*", "", raw_text) + expected_result = "" + if action: + normalized.append(FunctionalStep(stepNo=step_no, action=action, expectedResult=expected_result)) + return normalized + + @staticmethod + def _normalize_text_list(value: Any) -> List[str]: + if not value: + return [] + if isinstance(value, str): + parts = [item.strip() for item in re.split(r"[\n\r]+", value) if item.strip()] + if len(parts) <= 1: + parts = [item.strip() for item in re.split(r"[。;;]\s*", value) if item.strip()] + return [item for item in parts if item] + if isinstance(value, list): + result = [] + for item in value: + if isinstance(item, dict): + text = str(item.get("text") or item.get("content") or item.get("expected") or "").strip() + else: + text = str(item).strip() + if text: + result.append(text) + return result + return [str(value).strip()] if str(value).strip() else [] + + @staticmethod + def _normalize_api_context(value: Any) -> Optional[List[ApiContextItem]]: + if not value: + return None + + if isinstance(value, dict): + if "apis" in value and isinstance(value["apis"], list): + value = value["apis"] + elif "items" in value and isinstance(value["items"], list): + value = value["items"] + else: + value = [value] + + if isinstance(value, str): + value = value.strip() + if not value: + return None + try: + parsed = json.loads(value) + return FunctionalCaseParser._normalize_api_context(parsed) + except Exception: + return FunctionalCaseParser._parse_markdown_api_context(value) + + if not isinstance(value, list): + return None + + apis: List[ApiContextItem] = [] + for item in value: + if isinstance(item, ApiContextItem): + apis.append(item) + continue + if not isinstance(item, dict): + continue + headers = item.get("headers") or item.get("header") or {} + query_params = item.get("queryParams") or item.get("query") or item.get("params") or {} + tags = item.get("tags") or [] + if isinstance(tags, str): + tags = [tag.strip() for tag in re.split(r"[\s,,/|]+", tags) if tag.strip()] + apis.append( + ApiContextItem( + apiName=str(item.get("apiName") or item.get("name") or ""), + method=str(item.get("method") or item.get("httpMethod") or "").upper(), + url=str(item.get("url") or item.get("path") or ""), + description=str(item.get("description") or item.get("desc") or ""), + headers=headers if isinstance(headers, dict) else {}, + queryParams=query_params if isinstance(query_params, dict) else {}, + requestBody=item.get("requestBody") if "requestBody" in item else item.get("body"), + responseExample=item.get("responseExample") or item.get("response") or item.get("example"), + tags=tags if isinstance(tags, list) else [], + ) + ) + return apis or None + + @staticmethod + def _parse_markdown_api_context(text: str) -> Optional[List[ApiContextItem]]: + lines = text.splitlines() + apis: List[ApiContextItem] = [] + current: Dict[str, Any] = {} + in_body = False + body_lines: List[str] = [] + + def flush_current(): + nonlocal current, body_lines, in_body + if not current: + return + body_text = "\n".join(body_lines).strip() + request_body = FunctionalCaseParser._safe_json_load(body_text) + apis.append( + ApiContextItem( + apiName=current.get("apiName", ""), + method=current.get("method", ""), + url=current.get("url", ""), + description=current.get("description", ""), + headers=current.get("headers", {}), + queryParams=current.get("queryParams", {}), + requestBody=request_body if request_body is not None else body_text, + responseExample=current.get("responseExample"), + tags=current.get("tags", []), + ) + ) + current = {} + body_lines = [] + in_body = False + + for line in lines: + stripped = line.strip() + if stripped.startswith("## "): + flush_current() + current["apiName"] = stripped[3:].strip() + continue + if stripped == "**接口URL**": + continue + if stripped.startswith("> ") and not current.get("url"): + value = stripped[2:].strip() + if value and value != "暂无参数": + current["url"] = value + continue + if stripped == "**请求方式**": + continue + if stripped == "**请求Body参数**": + in_body = True + body_lines = [] + continue + if stripped == "**响应示例**": + in_body = False + continue + if in_body: + if stripped.startswith("```"): + continue + body_lines.append(line) + + flush_current() + return apis or None + + @staticmethod + def _safe_json_load(text: str) -> Any: + if not text: + return None + cleaned = text.strip() + if cleaned.startswith("```"): + cleaned = re.sub(r"^```[a-zA-Z0-9_-]*\s*", "", cleaned) + cleaned = re.sub(r"\s*```$", "", cleaned) + try: + return json.loads(cleaned) + except Exception: + return None + + +class ApiMatcher: + def match(self, functional_case: FunctionalCaseInput, candidates: List[ApiContextItem], top_k: int = 3) -> List[Dict[str, Any]]: + matched: List[Dict[str, Any]] = [] + keywords = self.extract_keywords(functional_case) + intent = self.detect_intent(functional_case) + + for api in candidates: + score = self._score_api(api, keywords, intent, functional_case) + if score <= 0: + continue + matched.append( + { + "score": round(score, 4), + "api": api, + "reason": self._build_reason(api, keywords, intent), + } + ) + + matched.sort(key=lambda item: item["score"], reverse=True) + return matched[:top_k] + + def extract_keywords(self, functional_case: FunctionalCaseInput) -> List[str]: + texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions] + texts.extend(step.action for step in functional_case.steps) + texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult) + texts.extend(functional_case.expectedResults) + texts.extend(str(value) for value in functional_case.extra.values() if value is not None) + full_text = " \n ".join(texts) + keywords: List[str] = [] + + for keyword in COMMON_CN_KEYWORDS: + if keyword in full_text and keyword not in keywords: + keywords.append(keyword) + + english_tokens = re.findall(r"[A-Za-z_][A-Za-z0-9_]{1,}", full_text) + for token in english_tokens: + if token not in keywords: + keywords.append(token) + + generic_tokens = re.findall(r"[\u4e00-\u9fa5]{2,}", full_text) + for token in generic_tokens: + if len(token) <= 2: + if token not in keywords: + keywords.append(token) + continue + if token not in keywords: + keywords.append(token) + + return self._dedupe(keywords) + + def detect_intent(self, functional_case: FunctionalCaseInput) -> str: + text = self._all_text(functional_case) + for method, keywords in METHOD_INTENT_KEYWORDS.items(): + if any(keyword in text for keyword in keywords): + return method + return "GET" + + def _score_api(self, api: ApiContextItem, keywords: List[str], intent: str, functional_case: FunctionalCaseInput) -> float: + score = 0.0 + api_text = " ".join( + [ + api.apiName or "", + api.method or "", + api.url or "", + api.description or "", + " ".join(api.tags or []), + " ".join(api.headers.keys()), + " ".join(api.queryParams.keys()), + ] + ) + if api.method and api.method.upper() == intent: + score += 1.2 + elif api.method and api.method.upper() in ("GET", "POST", "PUT", "PATCH", "DELETE"): + score += 0.2 + + for keyword in keywords: + if keyword and keyword in api_text: + score += 0.8 + + for token in self._url_tokens(api.url): + if token and any(keyword in token or token in keyword for keyword in keywords): + score += 0.6 + + if functional_case.moduleName and functional_case.moduleName in api_text: + score += 0.8 + if functional_case.caseName and functional_case.caseName in api_text: + score += 0.5 + + ratio = SequenceMatcher(None, self._all_text(functional_case), api_text).ratio() + score += ratio * 0.8 + return score + + @staticmethod + def _build_reason(api: ApiContextItem, keywords: List[str], intent: str) -> str: + matched_keywords = [keyword for keyword in keywords if keyword and keyword in (api.apiName + " " + api.description + " " + api.url)] + parts = [] + if api.method and api.method.upper() == intent: + parts.append("方法匹配") + if matched_keywords: + parts.append("关键词命中:{}".format("、".join(matched_keywords[:5]))) + return "; ".join(parts) or "模糊匹配" + + @staticmethod + def _url_tokens(url: str) -> List[str]: + if not url: + return [] + return [token for token in re.split(r"[/?&=_\-\.]+", url) if token] + + @staticmethod + def _dedupe(items: Iterable[str]) -> List[str]: + seen = set() + result = [] + for item in items: + if item and item not in seen: + seen.add(item) + result.append(item) + return result + + @staticmethod + def _all_text(functional_case: FunctionalCaseInput) -> str: + texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions] + texts.extend(step.action for step in functional_case.steps) + texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult) + texts.extend(functional_case.expectedResults) + return " \n ".join(filter(None, texts)) + + +class LLMApiCaseConverter: + def convert(self, prompt: str, llm_client: Any = None) -> Optional[Dict[str, Any]]: + if llm_client is None: + return None + + raw = self._invoke_client(llm_client, prompt) + if raw is None: + return None + if isinstance(raw, dict): + return raw + if hasattr(raw, "content"): + raw = raw.content + elif hasattr(raw, "text"): + raw = raw.text + elif hasattr(raw, "data"): + raw = raw.data + if isinstance(raw, dict): + return raw + if not isinstance(raw, str): + raw = str(raw) + return self._extract_json(raw) + + @staticmethod + def _invoke_client(llm_client: Any, prompt: str) -> Any: + if callable(llm_client): + try: + return llm_client(prompt=prompt) + except TypeError: + try: + return llm_client(prompt) + except TypeError: + return llm_client({"prompt": prompt}) + if hasattr(llm_client, "generate") and callable(llm_client.generate): + return llm_client.generate(prompt) + if hasattr(llm_client, "chat") and callable(llm_client.chat): + return llm_client.chat(prompt) + raise TypeError("llm_client must be callable or expose generate/chat") + + @staticmethod + def _extract_json(text: str) -> Optional[Dict[str, Any]]: + if not text: + return None + cleaned = text.strip() + match = re.search(r"```json\s*(\{.*?\})\s*```", cleaned, flags=re.S) + if match: + cleaned = match.group(1) + else: + first = cleaned.find("{") + last = cleaned.rfind("}") + if first >= 0 and last > first: + cleaned = cleaned[first : last + 1] + try: + parsed = json.loads(cleaned) + return parsed if isinstance(parsed, dict) else {"data": parsed} + except Exception: + return None + + +class FunctionalCaseToApiAutomationService: + def __init__(self, matcher: Optional[ApiMatcher] = None): + self.matcher = matcher or ApiMatcher() + self.llm_converter = LLMApiCaseConverter() + + def convert(self, payload: Union[str, Dict[str, Any], FunctionalCaseInput], llm_client: Any = None) -> Dict[str, Any]: + functional_case = FunctionalCaseParser.parse(payload) + options = self._build_options(functional_case) + candidates = functional_case.apiContext or [] + matched = self.matcher.match(functional_case, candidates, top_k=int(options.get("topK", 3))) if candidates else [] + + llm_result = None + if options.get("useLLM", True) and llm_client is not None: + prompt = self.build_prompt(functional_case, matched, options) + llm_result = self.llm_converter.convert(prompt, llm_client) + + if llm_result: + return self._finalize_llm_result(functional_case, matched, llm_result, options) + + return self._build_rule_result(functional_case, matched, options) + + def build_prompt(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], options: Dict[str, Any]) -> str: + payload = { + "functionalCase": asdict(functional_case), + "matchedApis": [ + { + "score": item["score"], + "reason": item["reason"], + "api": asdict(item["api"]), + } + for item in matched + ], + "options": options, + } + return ( + "你是接口自动化测试用例生成专家。\n" + "请把功能测试用例转换成接口自动化测试用例。\n" + "要求:\n" + "1. 只输出严格 JSON。\n" + "2. 如果给了候选接口信息,优先使用候选接口。\n" + "3. 无法确认的接口信息写入 missingInfo。\n" + "4. 需要包含 method、url、headers、queryParams、body、assertions、performanceAssertions。\n" + "5. 如果涉及性能要求,生成 responseTime 断言。\n" + "6. 如果涉及登录态,使用 ${token}。\n\n" + "输入数据:\n" + f"{json.dumps(payload, ensure_ascii=False, indent=2)}\n\n" + "输出 JSON 结构:\n" + "{\n" + ' "caseId": "",\n' + ' "caseKey": "",\n' + ' "caseName": "",\n' + ' "automationType": "api",\n' + ' "convertStatus": "SUCCESS",\n' + ' "apiTestCases": [],\n' + ' "missingInfo": [],\n' + ' "warnings": []\n' + "}" + ) + + def _build_options(self, functional_case: FunctionalCaseInput) -> Dict[str, Any]: + options = copy.deepcopy(functional_case.generateOptions or {}) + options.setdefault("useLLM", True) + options.setdefault("outputFormat", "json") + options.setdefault("targetFramework", "pytest") + options.setdefault("allowMissingInfo", True) + options.setdefault("topK", 3) + return options + + def _build_rule_result(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], options: Dict[str, Any]) -> Dict[str, Any]: + api_test_cases: List[Dict[str, Any]] = [] + missing_info: List[str] = [] + warnings: List[str] = [] + + if matched: + for index, item in enumerate(matched, start=1): + api_test_cases.append(self._build_api_test_case(functional_case, item["api"], index, item["score"], options)) + else: + api_test_cases.append(self._build_fallback_test_case(functional_case, options)) + missing_info.extend(self._collect_missing_info(functional_case)) + warnings.append("未匹配到真实接口信息,已生成占位自动化用例") + + missing_info = self._dedupe_text(missing_info) + warnings = self._dedupe_text(warnings) + + convert_status = "SUCCESS" + if missing_info: + convert_status = "SUCCESS_WITH_MISSING_INFO" + if warnings and not matched: + convert_status = "DRAFT" + + return { + "caseId": functional_case.caseId, + "caseKey": functional_case.caseKey, + "caseName": functional_case.caseName, + "projectName": functional_case.projectName, + "moduleName": functional_case.moduleName, + "automationType": "api", + "convertStatus": convert_status, + "apiTestCases": api_test_cases, + "missingInfo": missing_info, + "warnings": warnings, + } + + def _finalize_llm_result(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], llm_result: Dict[str, Any], options: Dict[str, Any]) -> Dict[str, Any]: + result = copy.deepcopy(llm_result) + result.setdefault("caseId", functional_case.caseId) + result.setdefault("caseKey", functional_case.caseKey) + result.setdefault("caseName", functional_case.caseName) + result.setdefault("projectName", functional_case.projectName) + result.setdefault("moduleName", functional_case.moduleName) + result.setdefault("automationType", "api") + result.setdefault("convertStatus", "SUCCESS") + result.setdefault("apiTestCases", []) + result.setdefault("missingInfo", []) + result.setdefault("warnings", []) + + if not result.get("apiTestCases"): + rule_result = self._build_rule_result(functional_case, matched, options) + result["apiTestCases"] = rule_result["apiTestCases"] + if not result.get("missingInfo"): + result["missingInfo"] = rule_result["missingInfo"] + if not result.get("warnings"): + result["warnings"] = rule_result["warnings"] + result["convertStatus"] = rule_result["convertStatus"] + + result["missingInfo"] = self._dedupe_text(result.get("missingInfo", [])) + result["warnings"] = self._dedupe_text(result.get("warnings", [])) + return result + + def _build_api_test_case(self, functional_case: FunctionalCaseInput, api: ApiContextItem, index: int, score: float, options: Dict[str, Any]) -> Dict[str, Any]: + method = (api.method or self.matcher.detect_intent(functional_case)).upper() + query_params = copy.deepcopy(api.queryParams or {}) + body = copy.deepcopy(api.requestBody) + headers = copy.deepcopy(api.headers or {}) + if not headers: + headers = {"Content-Type": "application/json"} + if not any(key.lower() == "authorization" for key in headers): + headers["Authorization"] = "Bearer ${token}" + + variables = self._build_variables(functional_case, api, query_params, body) + query_params = self._fill_placeholders(query_params, variables) + body = self._fill_placeholders(body, variables) + + assertions = self._build_assertions(functional_case, api, method) + performance_assertions = self._build_performance_assertions(functional_case) + step_name = api.apiName or functional_case.caseName or "接口自动化步骤" + + return { + "stepNo": index, + "name": step_name, + "apiName": api.apiName or step_name, + "method": method, + "url": api.url or self._infer_placeholder_url(functional_case), + "headers": headers, + "queryParams": query_params, + "body": body, + "extractVariables": [], + "assertions": assertions, + "performanceAssertions": performance_assertions, + "variables": variables, + "matchedApi": { + "apiName": api.apiName, + "method": method, + "url": api.url, + "matchScore": round(score, 4), + }, + } + + def _build_fallback_test_case(self, functional_case: FunctionalCaseInput, options: Dict[str, Any]) -> Dict[str, Any]: + method = self.matcher.detect_intent(functional_case) + url = self._infer_placeholder_url(functional_case) + variables = self._build_variables(functional_case, None, {}, None) + return { + "stepNo": 1, + "name": functional_case.caseName or "功能用例转接口自动化", + "apiName": functional_case.caseName or "待确认接口", + "method": method, + "url": url, + "headers": { + "Content-Type": "application/json", + "Authorization": "Bearer ${token}", + }, + "queryParams": {}, + "body": None, + "extractVariables": [], + "assertions": self._build_assertions(functional_case, None, method), + "performanceAssertions": self._build_performance_assertions(functional_case), + "variables": variables, + "matchedApi": { + "apiName": "", + "method": method, + "url": url, + "matchScore": 0, + }, + } + + def _build_variables(self, functional_case: FunctionalCaseInput, api: Optional[ApiContextItem], query_params: Dict[str, Any], body: Any) -> Dict[str, Any]: + variables: Dict[str, Any] = {} + text = self.matcher._all_text(functional_case) + for keyword in self.matcher.extract_keywords(functional_case): + if keyword in ("搜索", "查询", "列表", "筛选", "关键字", "模糊搜索"): + variables.setdefault("keyword", "测试") + if keyword in ("分页", "分页查询"): + variables.setdefault("page", 1) + variables.setdefault("limit", DEFAULT_PAGE_SIZE) + if keyword in ("登录",): + variables.setdefault("token", "${token}") + if not variables and text: + variables["keyword"] = "测试" + if query_params: + for key in query_params.keys(): + if key not in variables and any(token in key.lower() for token in ["keyword", "name", "id", "page", "limit", "status", "type"]): + variables[key] = query_params[key] + if isinstance(body, dict): + for key in body.keys(): + if key not in variables and any(token in key.lower() for token in ["keyword", "name", "id", "page", "limit", "status", "type"]): + variables[key] = body[key] + return variables + + def _build_assertions(self, functional_case: FunctionalCaseInput, api: Optional[ApiContextItem], method: str) -> List[Dict[str, Any]]: + assertions = [ + { + "type": "statusCode", + "actual": "$.code" if api or method != "DELETE" else "$.status", + "operator": "==", + "expected": 200, + } + ] + text = self.matcher._all_text(functional_case) + if api and isinstance(api.responseExample, dict): + example = api.responseExample + if "code" in example: + assertions.append( + { + "type": "jsonPath", + "actual": "$.code", + "operator": "==", + "expected": example.get("code", 0), + } + ) + if "msg" in example: + assertions.append( + { + "type": "jsonPath", + "actual": "$.msg", + "operator": "notEmpty", + "expected": True, + } + ) + + if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选", "返回"]): + assertions.append( + { + "type": "jsonPath", + "actual": "$.data", + "operator": "notEmpty", + "expected": True, + } + ) + return assertions + + def _build_performance_assertions(self, functional_case: FunctionalCaseInput) -> List[Dict[str, Any]]: + text = self.matcher._all_text(functional_case) + expected_ms = self._extract_response_time_ms(text) + if expected_ms is None: + return [] + return [ + { + "type": "responseTime", + "operator": "<=", + "expected": expected_ms, + "unit": "ms", + } + ] + + def _collect_missing_info(self, functional_case: FunctionalCaseInput) -> List[str]: + missing = [] + if not functional_case.steps: + missing.append("缺少步骤信息") + if not functional_case.caseName: + missing.append("缺少用例名称") + if not functional_case.apiContext: + missing.append("未传入真实接口信息,已使用占位接口生成") + if self._extract_response_time_ms(self.matcher._all_text(functional_case)) is None and any(k in self.matcher._all_text(functional_case) for k in ["性能", "响应时间", "超时"]): + missing.append("性能阈值未明确,默认使用 1000ms") + return missing + + @staticmethod + def _extract_response_time_ms(text: str) -> Optional[int]: + if not text: + return None + patterns = [ + r"(\d+)\s*毫秒", + r"(\d+)\s*ms", + r"(\d+)\s*秒", + r"不超过\s*(\d+)\s*秒", + r"<=\s*(\d+)\s*秒", + r"小于等于\s*(\d+)\s*秒", + ] + for pattern in patterns: + match = re.search(pattern, text, flags=re.I) + if match: + value = int(match.group(1)) + if "秒" in pattern: + return value * 1000 + return value + if any(keyword in text for keyword in ["响应时间", "超时", "性能"]): + return DEFAULT_RESPONSE_TIME_MS + return None + + @staticmethod + def _infer_placeholder_url(functional_case: FunctionalCaseInput) -> str: + text = " ".join([functional_case.caseName, functional_case.moduleName] + [step.action for step in functional_case.steps]) + if any(keyword in text for keyword in ["登录"]): + return "/api/login" + if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选"]): + return "/api/list/query" + if any(keyword in text for keyword in ["新增", "创建", "添加"]): + return "/api/create" + if any(keyword in text for keyword in ["修改", "编辑", "更新"]): + return "/api/update" + if any(keyword in text for keyword in ["删除", "移除"]): + return "/api/delete" + return DEFAULT_PLACEHOLDER_URL + + @staticmethod + def _fill_placeholders(value: Any, variables: Dict[str, Any]) -> Any: + if isinstance(value, dict): + return {key: FunctionalCaseToApiAutomationService._fill_placeholders(item, variables) for key, item in value.items()} + if isinstance(value, list): + return [FunctionalCaseToApiAutomationService._fill_placeholders(item, variables) for item in value] + if isinstance(value, str): + result = value + for key, variable in variables.items(): + result = result.replace("${" + key + "}", str(variable)) + return result + return value + + @staticmethod + def _dedupe_text(items: Iterable[str]) -> List[str]: + seen = set() + result = [] + for item in items: + text = str(item).strip() + if not text or text in seen: + continue + seen.add(text) + result.append(text) + return result + + +def convert_functional_case(payload: Union[str, Dict[str, Any], FunctionalCaseInput], llm_client: Any = None) -> Dict[str, Any]: + return FunctionalCaseToApiAutomationService().convert(payload, llm_client=llm_client) + + +def load_payload_from_file(file_path: str) -> Dict[str, Any]: + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) + + +def main(): + parser = argparse.ArgumentParser(description="功能测试用例转接口自动化用例") + parser.add_argument("--input", type=str, help="输入JSON文件路径,未指定则从stdin读取") + parser.add_argument("--output", type=str, help="输出JSON文件路径,未指定则打印到stdout") + args = parser.parse_args() + + if args.input: + payload = load_payload_from_file(args.input) + else: + payload = json.load(os.sys.stdin) + + result = convert_functional_case(payload) + + output = json.dumps(result, ensure_ascii=False, indent=2) + if args.output: + with open(args.output, "w", encoding="utf-8") as file: + file.write(output) + else: + print(output) + + +if __name__ == "__main__": + main() diff --git a/joyhub_backend/library/hubops_parser.py b/joyhub_backend/library/hubops_parser.py new file mode 100644 index 0000000..04d1a82 --- /dev/null +++ b/joyhub_backend/library/hubops_parser.py @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +import ast +import json +import os +import re + + +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +DEFAULT_HUBOPS_PATH = os.path.join(PROJECT_ROOT, "HubOps.md") + + +class HubOpsParser(object): + def __init__(self, file_path=None): + self.file_path = file_path or os.getenv("JOYHUB_DOC_PATH", DEFAULT_HUBOPS_PATH) + + def parse(self): + with open(self.file_path, "r", encoding="utf-8") as file: + lines = file.read().splitlines() + + cases = [] + headings = [] + for index, line in enumerate(lines): + heading = self._parse_heading(line) + if heading: + level, title = heading + headings = [item for item in headings if item[0] < level] + headings.append((level, title)) + continue + + if line.strip() != "**接口URL**": + continue + + case = self._parse_case(lines, index, headings) + if case.get("url"): + case["case_id"] = "joyhub_{:04d}".format(len(cases) + 1) + cases.append(case) + return cases + + def _parse_case(self, lines, url_index, headings): + title = self._case_title(headings, url_index) + url = self._next_quote_value(lines, url_index) + method = self._find_section_quote(lines, url_index, "**请求方式**") or "POST" + content_type = self._find_section_quote(lines, url_index, "**Content-Type**") or "json" + body_text = self._find_code_block(lines, url_index, "**请求Body参数**") + query = self._find_param_table(lines, url_index, "**请求Query参数**") + headers = self._find_headers(lines, url_index) + body = self._parse_body(body_text) + return { + "name": title, + "method": method.upper(), + "url": url, + "content_type": content_type, + "headers": headers, + "query": query, + "body": body, + "raw_body": body_text, + } + + @staticmethod + def _parse_heading(line): + match = re.match(r"^(#{2,6})\s+(.+?)\s*$", line) + if not match: + return None + return len(match.group(1)), match.group(2).strip() + + @staticmethod + def _case_title(headings, url_index): + if headings: + return " / ".join(title for _, title in headings[-3:]) + return "HubOps接口{}".format(url_index + 1) + + @staticmethod + def _next_quote_value(lines, start): + for index in range(start + 1, min(start + 8, len(lines))): + line = lines[index].strip() + if line.startswith(">"): + value = line[1:].strip() + if value and value != "暂无参数": + return value + return "" + + @staticmethod + def _section_end(lines, start): + for index in range(start + 1, len(lines)): + line = lines[index].strip() + if line.startswith("## ") or line.startswith("### ") or line == "**接口URL**": + return index + return len(lines) + + def _find_section_quote(self, lines, url_index, section_name): + start = max(0, url_index - 80) + end = self._section_end(lines, url_index) + for index in range(start, min(end, len(lines))): + if lines[index].strip() == section_name: + return self._next_quote_value(lines, index) + return None + + def _find_code_block(self, lines, url_index, section_name): + end = self._section_end(lines, url_index) + for index in range(url_index, end): + if lines[index].strip() != section_name: + continue + for code_start in range(index + 1, end): + if lines[code_start].strip().startswith("```"): + block = [] + for code_end in range(code_start + 1, end): + if lines[code_end].strip().startswith("```"): + return "\n".join(block).strip() + block.append(lines[code_end]) + return "" + + def _find_param_table(self, lines, url_index, section_name): + end = self._section_end(lines, url_index) + for index in range(url_index, end): + if lines[index].strip() != section_name: + continue + params = {} + for row_index in range(index + 1, end): + row = lines[row_index].strip() + if not row.startswith("|"): + if params: + break + continue + if "---" in row or "参数名" in row or "暂无参数" in row: + continue + columns = [column.strip() for column in row.strip("|").split("|")] + if len(columns) >= 2 and columns[0]: + params[columns[0]] = self._coerce_value(columns[1]) + return params + return {} + + def _find_headers(self, lines, url_index): + headers = {} + end = self._section_end(lines, url_index) + for index in range(url_index, end): + if lines[index].strip() != "**请求Header参数**": + continue + table_started = False + for row_index in range(index + 1, end): + row = lines[row_index].strip() + if row.startswith("**") or row.startswith("#") or row.startswith("*"): + break + if not row.startswith("|"): + if table_started: + break + continue + table_started = True + if "---" in row or "参数名" in row or "暂无参数" in row: + continue + columns = [column.strip() for column in row.strip("|").split("|")] + if len(columns) >= 2 and columns[0] and columns[0] != "Authorization": + headers[columns[0]] = columns[1] + return headers + return headers + + @classmethod + def _parse_body(cls, body_text): + if not body_text or body_text == "暂无数据": + return {} + text = cls._clean_body(body_text) + for loader in (json.loads, ast.literal_eval): + try: + value = loader(text) + return value if isinstance(value, (dict, list)) else {} + except Exception: + pass + return {} + + @staticmethod + def _clean_body(text): + text = re.sub(r"//.*", "", text) + text = re.sub(r"/\*.*?\*/", "", text, flags=re.S) + text = text.replace("{{token}}", "") + text = re.sub(r",\s*([}\]])", r"\1", text) + return text.strip() + + @staticmethod + def _coerce_value(value): + if value in ("-", "暂无参数", ""): + return "" + if value.isdigit(): + return int(value) + if value.lower() in ("true", "false"): + return value.lower() == "true" + return value + + +def load_hubops_cases(): + return HubOpsParser().parse() diff --git a/joyhub_backend/library/joyhub_interface.py b/joyhub_backend/library/joyhub_interface.py new file mode 100644 index 0000000..3fea861 --- /dev/null +++ b/joyhub_backend/library/joyhub_interface.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +import json +import logging +import os +from urllib.parse import urljoin + +import allure +import requests + +from joyhub_backend.library.auth import JoyhubAuth, TIMEOUT + + +MANAGER_BASE_URL = os.getenv( + "JOYHUB_MANAGER_BASE_URL", + "http://test-manager-api.best-envision.com", +) + + +class JoyhubInterface(object): + def __init__(self): + self.auth = JoyhubAuth() + self.session = requests.Session() + self.base_url = MANAGER_BASE_URL.rstrip("/") + "/" + + def request(self, case_name, method, path, body=None, query=None, headers=None, expected_code=0): + url = path if path.startswith("http") else urljoin(self.base_url, path.lstrip("/")) + request_headers = self.auth.auth_headers() + if headers: + request_headers.update({key: value for key, value in headers.items() if key.lower() != "authorization"}) + + with allure.step("操作步骤:{}".format(case_name)): + self._attach_request(url, method, request_headers, body, query) + logging.info("case: %s", case_name) + logging.info("request url: %s", url) + logging.info("request method: %s", method) + logging.info("request headers: %s", request_headers) + logging.info("request body: %s", body) + logging.info("request query: %s", query) + + request_kwargs = { + "method": method, + "url": url, + "params": query, + "headers": request_headers, + "timeout": TIMEOUT, + } + if method.upper() in ("POST", "PUT", "PATCH"): + request_kwargs["json"] = body + elif body: + request_kwargs["params"] = dict(query or {}, **body) if isinstance(body, dict) else query + response = self.session.request(**request_kwargs) + self._attach_response(response) + self._attach_log(case_name, url, method, request_headers, body, query, response) + logging.info("response status: %s", response.status_code) + logging.info("response body: %s", response.text) + + is_business_api = self._is_business_api(url) + assertion_text = "HTTP状态码为200,且业务code为{}".format(expected_code) if is_business_api else "HTTP状态码为2xx,兼容非JSON/SSE响应" + allure.attach(assertion_text, "断言内容", allure.attachment_type.TEXT) + with allure.step("断言内容:{}".format(assertion_text)): + try: + if is_business_api: + assert response.status_code == 200, "HTTP状态码期望200,实际{},响应{}".format( + response.status_code, response.text + ) + data = self._safe_json(response) + assert isinstance(data, dict), "业务接口响应不是JSON对象,响应{}".format(response.text) + assert "code" in data, "响应缺少 code 字段,响应{}".format(data) + assert data.get("code") == expected_code, "业务code期望{},实际{},msg={},响应{}".format( + expected_code, data.get("code"), data.get("msg"), data + ) + else: + assert 200 <= response.status_code < 300, "HTTP状态码期望2xx,实际{},响应{}".format( + response.status_code, response.text + ) + data = self._non_json_result(response) + allure.attach("无", "断言失败原因", allure.attachment_type.TEXT) + return data + except AssertionError as error: + allure.attach(str(error), "断言失败原因", allure.attachment_type.TEXT) + raise AssertionError("断言失败原因:{}".format(error)) + + @staticmethod + def _attach_request(url, method, headers, body, query): + allure.attach(str(url), "请求url", allure.attachment_type.TEXT) + allure.attach(str(method), "请求方式", allure.attachment_type.TEXT) + allure.attach(json.dumps(headers, ensure_ascii=False, indent=2), "请求头", allure.attachment_type.JSON) + allure.attach(json.dumps(query or {}, ensure_ascii=False, indent=2), "请求query", allure.attachment_type.JSON) + allure.attach(json.dumps(body or {}, ensure_ascii=False, indent=2), "请求体", allure.attachment_type.JSON) + + @staticmethod + def _attach_response(response): + allure.attach(str(response.status_code), "响应状态码", allure.attachment_type.TEXT) + content_type = response.headers.get("Content-Type", "") + attachment_type = allure.attachment_type.JSON if "json" in content_type.lower() else allure.attachment_type.TEXT + allure.attach(response.text, "响应体/日志", attachment_type) + + def _is_business_api(self, url): + return url.startswith(self.base_url) + + @staticmethod + def _safe_json(response): + if not response.text.strip(): + return None + return response.json() + + @staticmethod + def _non_json_result(response): + try: + data = response.json() + except ValueError: + data = { + "status_code": response.status_code, + "content_type": response.headers.get("Content-Type", ""), + "text": response.text, + } + return data + + @staticmethod + def _attach_log(case_name, url, method, headers, body, query, response): + log_text = "\n".join([ + "case: {}".format(case_name), + "request url: {}".format(url), + "request method: {}".format(method), + "request headers: {}".format(headers), + "request query: {}".format(query or {}), + "request body: {}".format(body or {}), + "response status: {}".format(response.status_code), + "response body: {}".format(response.text), + ]) + allure.attach(log_text, "日志", allure.attachment_type.TEXT) diff --git a/joyhub_backend/test_case/__init__.py b/joyhub_backend/test_case/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/joyhub_backend/test_case/run_tests.py b/joyhub_backend/test_case/run_tests.py new file mode 100644 index 0000000..84f9916 --- /dev/null +++ b/joyhub_backend/test_case/run_tests.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +import argparse +import os +import shutil +import subprocess +import sys + +current_file_path = os.path.abspath(__file__) +project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../')) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +TEST_CASE_DIR = 'joyhub_backend/test_case/TestCase' +case_dir = os.path.join(project_root, TEST_CASE_DIR) +ALLURE_RESULTS_DIR = os.path.join(project_root, 'joyhub_backend', 'test_case', 'reports', 'allure-results') +ALLURE_REPORT_DIR = os.path.join(project_root, 'joyhub_backend', 'test_case', 'reports', 'allure-report') + + +def ensure_dirs(): + os.makedirs(ALLURE_RESULTS_DIR, exist_ok=True) + os.makedirs(ALLURE_REPORT_DIR, exist_ok=True) + + +def clean_allure_results(): + if os.path.exists(ALLURE_RESULTS_DIR): + shutil.rmtree(ALLURE_RESULTS_DIR) + os.makedirs(ALLURE_RESULTS_DIR, exist_ok=True) + + +def find_test_files(directory): + test_files = [] + for root, dirs, files in os.walk(directory): + for file in files: + if file.endswith('.py') and not file.startswith('__') and file != 'conftest.py': + test_files.append(os.path.join(root, file)) + return test_files + + +def run_pytest(args_list): + env = os.environ.copy() + env['PYTHONPATH'] = project_root + (os.pathsep + env['PYTHONPATH'] if 'PYTHONPATH' in env else '') + cmd = ['python', '-m', 'pytest'] + args_list + print("开始执行pytest...") + print("执行命令: {}".format(' '.join('"{}"'.format(item) if ' ' in item else item for item in cmd)), flush=True) + result = subprocess.run(cmd, cwd=project_root, env=env) + print("pytest执行结束,退出码: {}".format(result.returncode), flush=True) + return result.returncode + + +def run_tests(target=None, test_type='all'): + base_args = ['-v', '--tb=short', '--alluredir={}'.format(ALLURE_RESULTS_DIR)] + if test_type == 'all': + print("运行所有测试用例...") + test_files = find_test_files(case_dir) + if not test_files: + print("错误: 未找到测试文件") + return 1 + args = test_files + base_args + elif test_type == 'dir': + full_path = os.path.join(case_dir, target.replace('/', os.sep).replace('\\', os.sep)) + if not os.path.exists(full_path): + print("错误: 目录不存在: {}".format(full_path)) + return 1 + print("按目录运行: {}".format(target)) + test_files = find_test_files(full_path) + if not test_files: + print("错误: 未找到测试文件") + return 1 + args = test_files + base_args + elif test_type == 'file': + full_path = os.path.join(case_dir, target.replace('/', os.sep).replace('\\', os.sep)) + if not os.path.exists(full_path): + print("错误: 文件不存在: {}".format(full_path)) + return 1 + print("按文件运行: {}".format(target)) + args = [full_path] + base_args + elif test_type == 'keyword': + print("按关键字运行: {}".format(target)) + test_files = find_test_files(case_dir) + args = test_files + ['-k={}'.format(target)] + base_args + elif test_type == 'marker': + print("按pytest标记运行: {}".format(target)) + test_files = find_test_files(case_dir) + args = test_files + ['-m={}'.format(target)] + base_args + elif test_type == 'feature': + print("按Allure feature运行: {}".format(target)) + test_files = find_test_files(case_dir) + args = test_files + ['--allure-features={}'.format(target)] + base_args + elif test_type == 'story': + print("按Allure story运行: {}".format(target)) + test_files = find_test_files(case_dir) + args = test_files + ['--allure-stories={}'.format(target)] + base_args + else: + print("错误: 未知的测试类型: {}".format(test_type)) + return 1 + return run_pytest(args) + + +def generate_allure_report(): + print("开始生成Allure报告...", flush=True) + if os.path.exists(ALLURE_REPORT_DIR): + shutil.rmtree(ALLURE_REPORT_DIR) + cmd = 'allure generate "{}" --output "{}"'.format(ALLURE_RESULTS_DIR, ALLURE_REPORT_DIR) + print("执行命令: {}".format(cmd), flush=True) + try: + subprocess.run(cmd, check=True, shell=True) + print("Allure报告生成成功: {}".format(ALLURE_REPORT_DIR)) + print("打开报告命令: allure open \"{}\"".format(ALLURE_REPORT_DIR)) + return 0 + except (subprocess.CalledProcessError, FileNotFoundError, OSError) as error: + print("生成Allure报告失败: {}".format(error)) + print("手动执行: {}".format(cmd)) + return 1 + + +def open_allure_report(): + cmd = 'allure open "{}"'.format(ALLURE_REPORT_DIR) + try: + subprocess.Popen(cmd, shell=True) + print("Allure报告已打开: {}".format(ALLURE_REPORT_DIR)) + return 0 + except (FileNotFoundError, OSError) as error: + print("打开Allure报告失败: {}".format(error)) + return 1 + + +def main(): + parser = argparse.ArgumentParser(description='JoyHub Backend 接口自动化测试执行工具') + run_group = parser.add_mutually_exclusive_group(required=False) + run_group.add_argument('--feature', type=str, help='按Allure feature运行') + run_group.add_argument('--story', type=str, help='按Allure story运行') + run_group.add_argument('--dir', type=str, help='按目录运行(相对于TestCase目录)') + run_group.add_argument('--file', type=str, help='按文件运行(相对于TestCase目录)') + run_group.add_argument('--keyword', type=str, help='按关键字运行') + run_group.add_argument('--marker', type=str, help='按pytest标记运行') + parser.add_argument('--report', action='store_true', help='生成Allure报告') + parser.add_argument('--open', action='store_true', help='打开Allure报告') + parser.add_argument('--no-report', action='store_true', help='不生成Allure报告') + args = parser.parse_args() + + ensure_dirs() + clean_allure_results() + if args.feature: + exit_code = run_tests(args.feature, 'feature') + elif args.story: + exit_code = run_tests(args.story, 'story') + elif args.dir: + exit_code = run_tests(args.dir, 'dir') + elif args.file: + exit_code = run_tests(args.file, 'file') + elif args.keyword: + exit_code = run_tests(args.keyword, 'keyword') + elif args.marker: + exit_code = run_tests(args.marker, 'marker') + else: + exit_code = run_tests() + + if args.report or not args.no_report: + generate_allure_report() + if args.open: + open_allure_report() + + print("=" * 80) + print("测试执行完成" if exit_code == 0 else "测试执行失败,退出码: {}".format(exit_code)) + print("=" * 80) + sys.exit(exit_code) + + +if __name__ == '__main__': + main()