# -*- coding: utf-8 -*-
"""Convert functional (manual) test cases into API automation test cases.

The module parses a loosely structured functional test case, matches it
against optional candidate API descriptions, and produces a JSON-serialisable
API test case. An LLM client can be supplied; when it is absent or returns
nothing usable, a keyword-based rule engine builds the result instead.
"""
import argparse
import copy
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from difflib import SequenceMatcher
from typing import Any, Callable, Dict, Iterable, List, Optional, Union

# Domain keywords looked up verbatim in the functional-case text.
COMMON_CN_KEYWORDS = [
    "登录", "查询", "搜索", "筛选", "列表", "详情", "新增", "创建", "添加",
    "修改", "编辑", "更新", "删除", "导出", "导入", "上传", "下载", "提交",
    "审核", "确认", "保存", "分页", "分页查询", "性能", "响应时间", "超时",
    "返回", "结果", "成功", "失败", "校验", "重置", "启用", "停用", "开关",
    "状态", "排序", "标签", "分类", "关键字", "关键字搜索", "模糊搜索",
]

# HTTP method -> keywords that suggest it. Dict order is the priority order
# used by ApiMatcher.detect_intent (first method with any hit wins).
METHOD_INTENT_KEYWORDS = {
    "GET": ["查询", "搜索", "筛选", "列表", "详情", "获取", "查看", "分页", "返回"],
    "POST": ["新增", "创建", "添加", "登录", "提交", "导入", "上传", "确认", "审核", "保存"],
    "PUT": ["修改", "编辑", "更新", "重置"],
    "PATCH": ["修改", "编辑", "更新", "状态", "启用", "停用", "开关"],
    "DELETE": ["删除", "移除"],
}

DEFAULT_PLACEHOLDER_URL = "/api/need-confirm"
DEFAULT_PAGE_SIZE = 10
DEFAULT_RESPONSE_TIME_MS = 1000

# Keywords that signal a performance requirement without giving a number.
_PERFORMANCE_KEYWORDS = ("响应时间", "超时", "性能")

# (compiled pattern, multiplier to milliseconds), tried in this order.
# Milliseconds come first so "500毫秒" is never read as seconds.
_RESPONSE_TIME_PATTERNS = (
    (re.compile(r"(\d+(?:\.\d+)?)\s*毫秒"), 1),
    (re.compile(r"(\d+(?:\.\d+)?)\s*ms(?![A-Za-z])", re.I), 1),
    (re.compile(r"(\d+(?:\.\d+)?)\s*秒"), 1000),
)

# Key fragments that mark a query/body field as worth exposing as a variable.
_VARIABLE_KEY_HINTS = ("keyword", "name", "id", "page", "limit", "status", "type")


@dataclass
class FunctionalStep:
    """One step of a functional test case."""

    stepNo: int
    action: str
    expectedResult: str = ""


@dataclass
class ApiContextItem:
    """Description of one candidate API supplied alongside a functional case."""

    apiName: str = ""
    method: str = ""
    url: str = ""
    description: str = ""
    headers: Dict[str, Any] = field(default_factory=dict)
    queryParams: Dict[str, Any] = field(default_factory=dict)
    requestBody: Any = None
    responseExample: Any = None
    tags: List[str] = field(default_factory=list)


@dataclass
class FunctionalCaseInput:
    """Normalised functional test case consumed by the converter."""

    caseId: str = ""
    caseKey: str = ""
    caseName: str = ""
    projectName: str = ""
    moduleName: str = ""
    priority: str = ""
    preconditions: str = ""
    steps: List[FunctionalStep] = field(default_factory=list)
    expectedResults: List[str] = field(default_factory=list)
    apiContext: Optional[List[ApiContextItem]] = None
    generateOptions: Dict[str, Any] = field(default_factory=dict)
    extra: Dict[str, Any] = field(default_factory=dict)


class FunctionalCaseParser:
    """Normalise raw payloads (dict / JSON string / free text) into dataclasses."""

    # Bold markdown headings recognised by _parse_markdown_api_context.
    _MARKDOWN_SECTIONS = {
        "**接口URL**": "url",
        "**请求方式**": "method",
        "**请求Body参数**": "body",
        "**响应示例**": "response",
    }
    # Placeholder text meaning "no parameters"; never treated as a value.
    _NO_PARAMS_PLACEHOLDER = "暂无参数"

    @classmethod
    def parse(cls, payload: Union[str, Dict[str, Any], FunctionalCaseInput]) -> FunctionalCaseInput:
        """Build a FunctionalCaseInput from *payload*.

        A FunctionalCaseInput is returned unchanged. A string is parsed as
        JSON; if it is not valid JSON it is wrapped as a single manual step.

        Raises:
            TypeError: if the (possibly JSON-decoded) payload is not a dict.
        """
        if isinstance(payload, FunctionalCaseInput):
            return payload
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except ValueError:  # json.JSONDecodeError is a ValueError
                # Free text: keep it as the only step of a synthetic case.
                payload = {
                    "caseName": "手工输入功能用例",
                    "steps": [payload],
                    "expectedResults": [],
                }
        if not isinstance(payload, dict):
            raise TypeError("functional case payload must be dict, json string or FunctionalCaseInput")

        steps = cls._normalize_steps(payload.get("steps") or payload.get("step") or [])
        expected_results = cls._normalize_text_list(
            payload.get("expectedResults")
            or payload.get("expectedResult")
            or payload.get("expectations")
            or []
        )
        api_context = cls._normalize_api_context(
            payload.get("apiContext") or payload.get("apis") or payload.get("apiInfo")
        )
        generate_options = payload.get("generateOptions") or payload.get("options") or {}

        # Every key consumed below (including aliases) is "known"; anything
        # else is preserved in `extra`.
        known_keys = {
            "caseId", "caseKey", "caseName", "name", "projectName", "moduleName",
            "priority", "preconditions", "precondition", "steps", "step",
            "expectedResults", "expectedResult", "expectations", "apiContext",
            "apis", "apiInfo", "generateOptions", "options",
        }
        extra = {key: value for key, value in payload.items() if key not in known_keys}

        return FunctionalCaseInput(
            caseId=str(payload.get("caseId", "") or ""),
            caseKey=str(payload.get("caseKey", "") or ""),
            caseName=str(payload.get("caseName", "") or payload.get("name", "") or ""),
            projectName=str(payload.get("projectName", "") or ""),
            moduleName=str(payload.get("moduleName", "") or ""),
            priority=str(payload.get("priority", "") or ""),
            preconditions=str(payload.get("preconditions", "") or payload.get("precondition", "") or ""),
            steps=steps,
            expectedResults=expected_results,
            apiContext=api_context,
            generateOptions=generate_options if isinstance(generate_options, dict) else {},
            extra=extra,
        )

    @staticmethod
    def _normalize_steps(value: Any) -> List[FunctionalStep]:
        """Convert a string, list, or single object into FunctionalStep items.

        Strings are split on line breaks; a single-line string is split after
        sentence terminators instead. Steps with an empty action are dropped.
        """
        if not value:
            return []
        if isinstance(value, str):
            items = [part.strip() for part in re.split(r"[\n\r]+", value) if part.strip()]
            if len(items) <= 1:
                items = [part.strip() for part in re.split(r"(?<=[。;;])", value) if part.strip()]
        elif isinstance(value, list):
            items = value
        else:
            items = [value]

        normalized: List[FunctionalStep] = []
        for index, item in enumerate(items, start=1):
            if isinstance(item, dict):
                raw_no = item.get("stepNo") or item.get("step_no") or item.get("no") or index
                try:
                    step_no = int(raw_no)
                except (TypeError, ValueError):
                    # Non-numeric labels such as "S1" fall back to position.
                    step_no = index
                action = str(
                    item.get("action") or item.get("step") or item.get("content") or item.get("description") or ""
                )
                expected_result = str(item.get("expectedResult") or item.get("expected") or "")
            else:
                step_no = index
                # Strip a leading "1." / "1、" / "1)" style ordinal.
                action = re.sub(r"^\s*\d+[\.、\)]\s*", "", str(item).strip())
                expected_result = ""
            if action:
                normalized.append(FunctionalStep(stepNo=step_no, action=action, expectedResult=expected_result))
        return normalized

    @staticmethod
    def _normalize_text_list(value: Any) -> List[str]:
        """Convert a string, list, or scalar into a list of non-empty strings."""
        if not value:
            return []
        if isinstance(value, str):
            parts = [part.strip() for part in re.split(r"[\n\r]+", value) if part.strip()]
            if len(parts) <= 1:
                parts = [part.strip() for part in re.split(r"[。;;]\s*", value) if part.strip()]
            return parts
        if isinstance(value, list):
            result = []
            for item in value:
                if isinstance(item, dict):
                    text = str(item.get("text") or item.get("content") or item.get("expected") or "").strip()
                else:
                    text = str(item).strip()
                if text:
                    result.append(text)
            return result
        text = str(value).strip()
        return [text] if text else []

    @staticmethod
    def _normalize_api_context(value: Any) -> Optional[List[ApiContextItem]]:
        """Convert API context in any accepted shape into ApiContextItem items.

        Accepts a list of dicts / ApiContextItem, a dict (optionally wrapping
        the list under "apis" or "items"), a JSON string, or markdown text.
        Returns None when nothing usable is found.
        """
        if not value:
            return None
        if isinstance(value, dict):
            if isinstance(value.get("apis"), list):
                value = value["apis"]
            elif isinstance(value.get("items"), list):
                value = value["items"]
            else:
                value = [value]
        if isinstance(value, str):
            text = value.strip()
            if not text:
                return None
            try:
                parsed = json.loads(text)
            except ValueError:
                # Not JSON: treat the text as markdown API documentation.
                return FunctionalCaseParser._parse_markdown_api_context(text)
            return FunctionalCaseParser._normalize_api_context(parsed)
        if not isinstance(value, list):
            return None

        apis: List[ApiContextItem] = []
        for item in value:
            if isinstance(item, ApiContextItem):
                apis.append(item)
                continue
            if not isinstance(item, dict):
                continue
            headers = item.get("headers") or item.get("header") or {}
            query_params = item.get("queryParams") or item.get("query") or item.get("params") or {}
            tags = item.get("tags") or []
            if isinstance(tags, str):
                tags = [tag.strip() for tag in re.split(r"[\s,,/|]+", tags) if tag.strip()]
            apis.append(
                ApiContextItem(
                    apiName=str(item.get("apiName") or item.get("name") or ""),
                    method=str(item.get("method") or item.get("httpMethod") or "").upper(),
                    url=str(item.get("url") or item.get("path") or ""),
                    description=str(item.get("description") or item.get("desc") or ""),
                    headers=headers if isinstance(headers, dict) else {},
                    queryParams=query_params if isinstance(query_params, dict) else {},
                    # An explicit "requestBody" key wins even when its value is falsy.
                    requestBody=item.get("requestBody") if "requestBody" in item else item.get("body"),
                    responseExample=item.get("responseExample") or item.get("response") or item.get("example"),
                    tags=tags if isinstance(tags, list) else [],
                )
            )
        return apis or None

    @staticmethod
    def _parse_markdown_api_context(text: str) -> Optional[List[ApiContextItem]]:
        """Parse markdown API documentation into ApiContextItem items.

        Recognised structure: a "## <name>" line starts a new API; the bold
        headings in _MARKDOWN_SECTIONS select what the following lines mean.
        "> value" lines supply the URL or method; body and response sections
        collect their lines (code fences removed) and are JSON-decoded when
        possible. Any other bold heading closes the current section.
        """
        parser = FunctionalCaseParser
        placeholder = parser._NO_PARAMS_PLACEHOLDER
        apis: List[ApiContextItem] = []
        current: Dict[str, Any] = {}
        section: Optional[str] = None
        body_lines: List[str] = []
        response_lines: List[str] = []

        def flush_current() -> None:
            """Emit the API collected so far (if any) and reset all state."""
            nonlocal current, section, body_lines, response_lines
            if current:
                body_text = "\n".join(body_lines).strip()
                request_body = parser._safe_json_load(body_text)
                if request_body is None:
                    # Keep unparseable bodies as raw text; empty means no body.
                    request_body = body_text or None
                response_text = "\n".join(response_lines).strip()
                response_example = parser._load_json_fragment(response_text)
                if response_example is None:
                    response_example = response_text or None
                apis.append(
                    ApiContextItem(
                        apiName=current.get("apiName", ""),
                        method=current.get("method", ""),
                        url=current.get("url", ""),
                        description=current.get("description", ""),
                        headers=current.get("headers", {}),
                        queryParams=current.get("queryParams", {}),
                        requestBody=request_body,
                        responseExample=response_example,
                        tags=current.get("tags", []),
                    )
                )
            current = {}
            section = None
            body_lines = []
            response_lines = []

        for line in text.splitlines():
            stripped = line.strip()
            if stripped.startswith("## "):
                flush_current()
                current["apiName"] = stripped[3:].strip()
                continue
            known_section = parser._MARKDOWN_SECTIONS.get(stripped)
            if known_section is not None:
                section = known_section
                continue
            if len(stripped) > 4 and stripped.startswith("**") and stripped.endswith("**"):
                # Unrecognised heading: its "> value" lines must not be
                # mistaken for the URL, nor appended to a body.
                section = "other"
                continue
            if section in ("body", "response"):
                if stripped.startswith("```"):
                    continue
                if stripped.lstrip("> ").strip() == placeholder:
                    continue
                (body_lines if section == "body" else response_lines).append(line)
                continue
            if stripped.startswith("> "):
                value = stripped[2:].strip()
                if not value or value == placeholder:
                    continue
                if section == "method":
                    if not current.get("method"):
                        current["method"] = value.upper()
                elif section in (None, "url"):
                    # With no heading seen yet, the first quoted line is
                    # taken as the URL.
                    if not current.get("url"):
                        current["url"] = value
        flush_current()
        return apis or None

    @staticmethod
    def _safe_json_load(text: str) -> Any:
        """Decode *text* as JSON, tolerating a surrounding code fence.

        Returns None for empty or undecodable text.
        """
        if not text:
            return None
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r"^```[a-zA-Z0-9_-]*\s*", "", cleaned)
            cleaned = re.sub(r"\s*```$", "", cleaned)
        try:
            return json.loads(cleaned)
        except ValueError:
            return None

    @staticmethod
    def _load_json_fragment(text: str) -> Any:
        """Decode *text* as JSON, retrying on its outermost ``{...}`` span.

        The retry lets an example be recovered when it is surrounded by
        non-JSON caption lines. Returns None when nothing decodes.
        """
        parsed = FunctionalCaseParser._safe_json_load(text)
        if parsed is not None or not text:
            return parsed
        first = text.find("{")
        last = text.rfind("}")
        if 0 <= first < last:
            return FunctionalCaseParser._safe_json_load(text[first : last + 1])
        return None


class ApiMatcher:
    """Score candidate APIs against a functional case using keyword heuristics."""

    def match(
        self,
        functional_case: FunctionalCaseInput,
        candidates: List[ApiContextItem],
        top_k: int = 3,
    ) -> List[Dict[str, Any]]:
        """Return up to *top_k* candidates with a positive score, best first.

        Each entry is ``{"score": float, "api": ApiContextItem, "reason": str}``.
        """
        keywords = self.extract_keywords(functional_case)
        intent = self.detect_intent(functional_case)
        matched: List[Dict[str, Any]] = []
        for api in candidates:
            score = self._score_api(api, keywords, intent, functional_case)
            if score <= 0:
                continue
            matched.append(
                {
                    "score": round(score, 4),
                    "api": api,
                    "reason": self._build_reason(api, keywords, intent),
                }
            )
        matched.sort(key=lambda item: item["score"], reverse=True)
        # A negative top_k would slice from the wrong end; clamp to zero.
        return matched[: max(top_k, 0)]

    def extract_keywords(self, functional_case: FunctionalCaseInput) -> List[str]:
        """Collect de-duplicated keywords from the case text and its extras.

        Order: known domain keywords first, then ASCII identifiers, then runs
        of two or more CJK characters, each in order of appearance.
        """
        texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions]
        texts.extend(step.action for step in functional_case.steps)
        texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult)
        texts.extend(functional_case.expectedResults)
        texts.extend(str(value) for value in functional_case.extra.values() if value is not None)
        full_text = " \n ".join(filter(None, texts))

        keywords = [keyword for keyword in COMMON_CN_KEYWORDS if keyword in full_text]
        keywords.extend(re.findall(r"[A-Za-z_][A-Za-z0-9_]{1,}", full_text))
        keywords.extend(re.findall(r"[\u4e00-\u9fa5]{2,}", full_text))
        return self._dedupe(keywords)

    def detect_intent(self, functional_case: FunctionalCaseInput) -> str:
        """Guess the HTTP method the case describes; defaults to "GET".

        Methods are tested in METHOD_INTENT_KEYWORDS order and the first one
        with any keyword present in the case text wins.
        """
        # NOTE(review): because GET is tested first and lists generic words
        # such as "返回", a case that also mentions e.g. "删除" still yields
        # GET. Confirm whether this priority is intended.
        text = self._all_text(functional_case)
        for method, keywords in METHOD_INTENT_KEYWORDS.items():
            if any(keyword in text for keyword in keywords):
                return method
        return "GET"

    def _score_api(
        self,
        api: ApiContextItem,
        keywords: List[str],
        intent: str,
        functional_case: FunctionalCaseInput,
    ) -> float:
        """Compute a heuristic relevance score for *api* (higher is better)."""
        score = 0.0
        api_text = " ".join(
            [
                api.apiName or "",
                api.method or "",
                api.url or "",
                api.description or "",
                " ".join(str(tag) for tag in (api.tags or [])),
                " ".join(str(key) for key in (api.headers or {})),
                " ".join(str(key) for key in (api.queryParams or {})),
            ]
        )
        method = (api.method or "").upper()
        if method and method == intent:
            score += 1.2
        elif method in ("GET", "POST", "PUT", "PATCH", "DELETE"):
            score += 0.2
        for keyword in keywords:
            if keyword and keyword in api_text:
                score += 0.8
        for token in self._url_tokens(api.url):
            if token and any(keyword in token or token in keyword for keyword in keywords):
                score += 0.6
        if functional_case.moduleName and functional_case.moduleName in api_text:
            score += 0.8
        if functional_case.caseName and functional_case.caseName in api_text:
            score += 0.5
        ratio = SequenceMatcher(None, self._all_text(functional_case), api_text).ratio()
        score += ratio * 0.8
        return score

    @staticmethod
    def _build_reason(api: ApiContextItem, keywords: List[str], intent: str) -> str:
        """Return a short human-readable explanation of why *api* matched."""
        haystack = " ".join([api.apiName or "", api.description or "", api.url or ""])
        matched_keywords = [keyword for keyword in keywords if keyword and keyword in haystack]
        parts = []
        if api.method and api.method.upper() == intent:
            parts.append("方法匹配")
        if matched_keywords:
            parts.append("关键词命中:{}".format("、".join(matched_keywords[:5])))
        return "; ".join(parts) or "模糊匹配"

    @staticmethod
    def _url_tokens(url: str) -> List[str]:
        """Split *url* on path/query punctuation into non-empty tokens."""
        if not url:
            return []
        return [token for token in re.split(r"[/?&=_\-\.]+", url) if token]

    @staticmethod
    def _dedupe(items: Iterable[str]) -> List[str]:
        """Drop empty and repeated items, keeping first-seen order."""
        seen = set()
        result = []
        for item in items:
            if item and item not in seen:
                seen.add(item)
                result.append(item)
        return result

    @staticmethod
    def _all_text(functional_case: FunctionalCaseInput) -> str:
        """Join name, module, preconditions, steps and expectations into one string."""
        texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions]
        texts.extend(step.action for step in functional_case.steps)
        texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult)
        texts.extend(functional_case.expectedResults)
        return " \n ".join(filter(None, texts))


class LLMApiCaseConverter:
    """Invoke an LLM client and extract a JSON object from its reply."""

    def convert(self, prompt: str, llm_client: Any = None) -> Optional[Dict[str, Any]]:
        """Send *prompt* to *llm_client* and return the decoded JSON dict.

        Returns None when no client is given, the client returns None, or no
        JSON can be extracted. Replies exposing ``content``, ``text`` or
        ``data`` are unwrapped first.
        """
        if llm_client is None:
            return None
        raw = self._invoke_client(llm_client, prompt)
        if raw is None:
            return None
        if isinstance(raw, dict):
            return raw
        for attribute in ("content", "text", "data"):
            if hasattr(raw, attribute):
                raw = getattr(raw, attribute)
                break
        if isinstance(raw, dict):
            return raw
        if not isinstance(raw, str):
            raw = str(raw)
        return self._extract_json(raw)

    @staticmethod
    def _invoke_client(llm_client: Any, prompt: str) -> Any:
        """Call *llm_client* using whichever calling convention it accepts.

        Callables are tried as ``f(prompt=...)``, ``f(prompt)`` and
        ``f({"prompt": ...})``; otherwise ``generate`` or ``chat`` is used.

        Raises:
            TypeError: if the client is neither callable nor exposes
                ``generate`` / ``chat``.
        """
        if callable(llm_client):
            # NOTE: a TypeError raised *inside* the client is indistinguishable
            # from a signature mismatch here, so it also triggers a retry.
            try:
                return llm_client(prompt=prompt)
            except TypeError:
                try:
                    return llm_client(prompt)
                except TypeError:
                    return llm_client({"prompt": prompt})
        if hasattr(llm_client, "generate") and callable(llm_client.generate):
            return llm_client.generate(prompt)
        if hasattr(llm_client, "chat") and callable(llm_client.chat):
            return llm_client.chat(prompt)
        raise TypeError("llm_client must be callable or expose generate/chat")

    @staticmethod
    def _extract_json(text: str) -> Optional[Dict[str, Any]]:
        """Extract a JSON value from free-form LLM output.

        Prefers a fenced ```json block, else the outermost ``{...}`` span.
        Non-dict JSON is wrapped as ``{"data": value}``. Returns None when
        nothing decodes.
        """
        if not text:
            return None
        cleaned = text.strip()
        match = re.search(r"```json\s*(\{.*?\})\s*```", cleaned, flags=re.S)
        if match:
            cleaned = match.group(1)
        else:
            first = cleaned.find("{")
            last = cleaned.rfind("}")
            if first >= 0 and last > first:
                cleaned = cleaned[first : last + 1]
        try:
            parsed = json.loads(cleaned)
        except ValueError:
            return None
        return parsed if isinstance(parsed, dict) else {"data": parsed}


class FunctionalCaseToApiAutomationService:
    """Orchestrate parsing, API matching, and LLM or rule-based generation."""

    def __init__(self, matcher: Optional[ApiMatcher] = None):
        self.matcher = matcher or ApiMatcher()
        self.llm_converter = LLMApiCaseConverter()

    def convert(
        self,
        payload: Union[str, Dict[str, Any], FunctionalCaseInput],
        llm_client: Any = None,
    ) -> Dict[str, Any]:
        """Convert *payload* into an API automation result dict.

        The LLM path is used when option ``useLLM`` is truthy, a client is
        given, and it yields a non-empty result; otherwise rules are used.
        """
        functional_case = FunctionalCaseParser.parse(payload)
        options = self._build_options(functional_case)
        candidates = functional_case.apiContext or []
        matched = []
        if candidates:
            top_k = self._coerce_top_k(options.get("topK", 3))
            matched = self.matcher.match(functional_case, candidates, top_k=top_k)

        llm_result = None
        if options.get("useLLM", True) and llm_client is not None:
            prompt = self.build_prompt(functional_case, matched, options)
            llm_result = self.llm_converter.convert(prompt, llm_client)
        if llm_result:
            return self._finalize_llm_result(functional_case, matched, llm_result, options)
        return self._build_rule_result(functional_case, matched, options)

    @staticmethod
    def _coerce_top_k(raw: Any, default: int = 3) -> int:
        """Return *raw* as a positive int, or *default* when it is not one."""
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return default
        return value if value > 0 else default

    def build_prompt(
        self,
        functional_case: FunctionalCaseInput,
        matched: List[Dict[str, Any]],
        options: Dict[str, Any],
    ) -> str:
        """Render the LLM prompt embedding the case, matched APIs and options."""
        payload = {
            "functionalCase": asdict(functional_case),
            "matchedApis": [
                {
                    "score": item["score"],
                    "reason": item["reason"],
                    "api": asdict(item["api"]),
                }
                for item in matched
            ],
            "options": options,
        }
        return (
            "你是接口自动化测试用例生成专家。\n"
            "请把功能测试用例转换成接口自动化测试用例。\n"
            "要求:\n"
            "1. 只输出严格 JSON。\n"
            "2. 如果给了候选接口信息,优先使用候选接口。\n"
            "3. 无法确认的接口信息写入 missingInfo。\n"
            "4. 需要包含 method、url、headers、queryParams、body、assertions、performanceAssertions。\n"
            "5. 如果涉及性能要求,生成 responseTime 断言。\n"
            "6. 如果涉及登录态,使用 ${token}。\n\n"
            "输入数据:\n"
            f"{json.dumps(payload, ensure_ascii=False, indent=2)}\n\n"
            "输出 JSON 结构:\n"
            "{\n"
            ' "caseId": "",\n'
            ' "caseKey": "",\n'
            ' "caseName": "",\n'
            ' "automationType": "api",\n'
            ' "convertStatus": "SUCCESS",\n'
            ' "apiTestCases": [],\n'
            ' "missingInfo": [],\n'
            ' "warnings": []\n'
            "}"
        )

    def _build_options(self, functional_case: FunctionalCaseInput) -> Dict[str, Any]:
        """Return a copy of the case's generateOptions with defaults filled in."""
        options = copy.deepcopy(functional_case.generateOptions or {})
        options.setdefault("useLLM", True)
        options.setdefault("outputFormat", "json")
        options.setdefault("targetFramework", "pytest")
        options.setdefault("allowMissingInfo", True)
        options.setdefault("topK", 3)
        return options

    def _build_rule_result(
        self,
        functional_case: FunctionalCaseInput,
        matched: List[Dict[str, Any]],
        options: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build the result with the rule engine only (no LLM).

        convertStatus is "DRAFT" when no API matched, otherwise
        "SUCCESS_WITH_MISSING_INFO" if anything is missing, else "SUCCESS".
        """
        api_test_cases: List[Dict[str, Any]] = []
        warnings: List[str] = []
        if matched:
            for index, item in enumerate(matched, start=1):
                api_test_cases.append(
                    self._build_api_test_case(functional_case, item["api"], index, item["score"], options)
                )
        else:
            api_test_cases.append(self._build_fallback_test_case(functional_case, options))
            warnings.append("未匹配到真实接口信息,已生成占位自动化用例")
        # Collected on both paths so a matched case can still report gaps.
        missing_info = self._dedupe_text(self._collect_missing_info(functional_case))
        warnings = self._dedupe_text(warnings)

        convert_status = "SUCCESS"
        if missing_info:
            convert_status = "SUCCESS_WITH_MISSING_INFO"
        if warnings and not matched:
            convert_status = "DRAFT"
        return {
            "caseId": functional_case.caseId,
            "caseKey": functional_case.caseKey,
            "caseName": functional_case.caseName,
            "projectName": functional_case.projectName,
            "moduleName": functional_case.moduleName,
            "automationType": "api",
            "convertStatus": convert_status,
            "apiTestCases": api_test_cases,
            "missingInfo": missing_info,
            "warnings": warnings,
        }

    def _finalize_llm_result(
        self,
        functional_case: FunctionalCaseInput,
        matched: List[Dict[str, Any]],
        llm_result: Dict[str, Any],
        options: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Fill gaps in an LLM result from the case and the rule engine.

        Identity fields default to the case's values. If the LLM produced no
        apiTestCases, the rule-based cases, status, and (when absent)
        missingInfo / warnings are used.
        """
        result = copy.deepcopy(llm_result)
        result.setdefault("caseId", functional_case.caseId)
        result.setdefault("caseKey", functional_case.caseKey)
        result.setdefault("caseName", functional_case.caseName)
        result.setdefault("projectName", functional_case.projectName)
        result.setdefault("moduleName", functional_case.moduleName)
        result.setdefault("automationType", "api")
        result.setdefault("convertStatus", "SUCCESS")
        result.setdefault("apiTestCases", [])
        result.setdefault("missingInfo", [])
        result.setdefault("warnings", [])
        if not result.get("apiTestCases"):
            rule_result = self._build_rule_result(functional_case, matched, options)
            result["apiTestCases"] = rule_result["apiTestCases"]
            if not result.get("missingInfo"):
                result["missingInfo"] = rule_result["missingInfo"]
            if not result.get("warnings"):
                result["warnings"] = rule_result["warnings"]
            result["convertStatus"] = rule_result["convertStatus"]
        result["missingInfo"] = self._dedupe_text(result.get("missingInfo", []))
        result["warnings"] = self._dedupe_text(result.get("warnings", []))
        return result

    def _build_api_test_case(
        self,
        functional_case: FunctionalCaseInput,
        api: ApiContextItem,
        index: int,
        score: float,
        options: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build one API test step from a matched API.

        *options* is accepted for interface stability and is not read here.
        """
        method = (api.method or self.matcher.detect_intent(functional_case)).upper()
        query_params = copy.deepcopy(api.queryParams or {})
        body = copy.deepcopy(api.requestBody)
        headers = copy.deepcopy(api.headers or {})
        if not headers:
            headers = {"Content-Type": "application/json"}
        if not any(str(key).lower() == "authorization" for key in headers):
            headers["Authorization"] = "Bearer ${token}"

        variables = self._build_variables(functional_case, api, query_params, body)
        query_params = self._fill_placeholders(query_params, variables)
        body = self._fill_placeholders(body, variables)
        step_name = api.apiName or functional_case.caseName or "接口自动化步骤"
        return {
            "stepNo": index,
            "name": step_name,
            "apiName": api.apiName or step_name,
            "method": method,
            "url": api.url or self._infer_placeholder_url(functional_case),
            "headers": headers,
            "queryParams": query_params,
            "body": body,
            "extractVariables": [],
            "assertions": self._build_assertions(functional_case, api, method),
            "performanceAssertions": self._build_performance_assertions(functional_case),
            "variables": variables,
            "matchedApi": {
                "apiName": api.apiName,
                "method": method,
                "url": api.url,
                "matchScore": round(score, 4),
            },
        }

    def _build_fallback_test_case(
        self, functional_case: FunctionalCaseInput, options: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build a placeholder test step when no real API matched.

        *options* is accepted for interface stability and is not read here.
        """
        method = self.matcher.detect_intent(functional_case)
        url = self._infer_placeholder_url(functional_case)
        variables = self._build_variables(functional_case, None, {}, None)
        return {
            "stepNo": 1,
            "name": functional_case.caseName or "功能用例转接口自动化",
            "apiName": functional_case.caseName or "待确认接口",
            "method": method,
            "url": url,
            "headers": {
                "Content-Type": "application/json",
                "Authorization": "Bearer ${token}",
            },
            "queryParams": {},
            "body": None,
            "extractVariables": [],
            "assertions": self._build_assertions(functional_case, None, method),
            "performanceAssertions": self._build_performance_assertions(functional_case),
            "variables": variables,
            "matchedApi": {
                "apiName": "",
                "method": method,
                "url": url,
                "matchScore": 0,
            },
        }

    def _build_variables(
        self,
        functional_case: FunctionalCaseInput,
        api: Optional[ApiContextItem],
        query_params: Dict[str, Any],
        body: Any,
    ) -> Dict[str, Any]:
        """Derive test variables from case keywords and request fields.

        Keyword-derived defaults come first; query/body fields whose key
        contains one of _VARIABLE_KEY_HINTS are then added without
        overwriting. *api* is accepted for interface stability and unused.
        """
        variables: Dict[str, Any] = {}
        text = self.matcher._all_text(functional_case)
        for keyword in self.matcher.extract_keywords(functional_case):
            if keyword in ("搜索", "查询", "列表", "筛选", "关键字", "模糊搜索"):
                variables.setdefault("keyword", "测试")
            if keyword in ("分页", "分页查询"):
                variables.setdefault("page", 1)
                variables.setdefault("limit", DEFAULT_PAGE_SIZE)
            if keyword == "登录":
                variables.setdefault("token", "${token}")
        if not variables and text:
            variables["keyword"] = "测试"

        sources = [query_params or {}]
        if isinstance(body, dict):
            sources.append(body)
        for source in sources:
            for key, value in source.items():
                if key in variables:
                    continue
                lowered = str(key).lower()
                if any(hint in lowered for hint in _VARIABLE_KEY_HINTS):
                    variables[key] = value
        return variables

    def _build_assertions(
        self,
        functional_case: FunctionalCaseInput,
        api: Optional[ApiContextItem],
        method: str,
    ) -> List[Dict[str, Any]]:
        """Build response assertions from the API example and case text."""
        # NOTE(review): this default asserts `$.code == 200`, while the block
        # below may add `$.code == <example code>`; the two conflict when the
        # example's code is not 200. Confirm which one the runner expects.
        assertions: List[Dict[str, Any]] = [
            {
                "type": "statusCode",
                "actual": "$.code" if api or method != "DELETE" else "$.status",
                "operator": "==",
                "expected": 200,
            }
        ]
        text = self.matcher._all_text(functional_case)
        if api and isinstance(api.responseExample, dict):
            example = api.responseExample
            if "code" in example:
                assertions.append(
                    {
                        "type": "jsonPath",
                        "actual": "$.code",
                        "operator": "==",
                        "expected": example.get("code", 0),
                    }
                )
            if "msg" in example:
                assertions.append(
                    {
                        "type": "jsonPath",
                        "actual": "$.msg",
                        "operator": "notEmpty",
                        "expected": True,
                    }
                )
        if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选", "返回"]):
            assertions.append(
                {
                    "type": "jsonPath",
                    "actual": "$.data",
                    "operator": "notEmpty",
                    "expected": True,
                }
            )
        return assertions

    def _build_performance_assertions(self, functional_case: FunctionalCaseInput) -> List[Dict[str, Any]]:
        """Return a responseTime assertion when the case implies one, else []."""
        expected_ms = self._extract_response_time_ms(self.matcher._all_text(functional_case))
        if expected_ms is None:
            return []
        return [
            {
                "type": "responseTime",
                "operator": "<=",
                "expected": expected_ms,
                "unit": "ms",
            }
        ]

    def _collect_missing_info(self, functional_case: FunctionalCaseInput) -> List[str]:
        """List information the case lacks for a fully specified API test."""
        missing = []
        if not functional_case.steps:
            missing.append("缺少步骤信息")
        if not functional_case.caseName:
            missing.append("缺少用例名称")
        if not functional_case.apiContext:
            missing.append("未传入真实接口信息,已使用占位接口生成")
        text = self.matcher._all_text(functional_case)
        mentions_performance = any(keyword in text for keyword in _PERFORMANCE_KEYWORDS)
        if mentions_performance and self._extract_explicit_response_time_ms(text) is None:
            # Performance is mentioned without a number, so the default applies.
            missing.append(f"性能阈值未明确,默认使用 {DEFAULT_RESPONSE_TIME_MS}ms")
        return missing

    @staticmethod
    def _extract_explicit_response_time_ms(text: str) -> Optional[int]:
        """Return the first numeric time threshold in *text* as milliseconds.

        Recognises "<n>毫秒", "<n>ms" and "<n>秒" (decimals allowed). Returns
        None when the text states no number.
        """
        if not text:
            return None
        for pattern, multiplier in _RESPONSE_TIME_PATTERNS:
            match = pattern.search(text)
            if match:
                return int(round(float(match.group(1)) * multiplier))
        return None

    @staticmethod
    def _extract_response_time_ms(text: str) -> Optional[int]:
        """Return the response-time threshold implied by *text*, in ms.

        Uses the explicit number if present; otherwise
        DEFAULT_RESPONSE_TIME_MS when a performance keyword appears; else None.
        """
        if not text:
            return None
        explicit = FunctionalCaseToApiAutomationService._extract_explicit_response_time_ms(text)
        if explicit is not None:
            return explicit
        if any(keyword in text for keyword in _PERFORMANCE_KEYWORDS):
            return DEFAULT_RESPONSE_TIME_MS
        return None

    @staticmethod
    def _infer_placeholder_url(functional_case: FunctionalCaseInput) -> str:
        """Pick a placeholder URL from keywords in the case name, module and steps."""
        text = " ".join(
            [functional_case.caseName, functional_case.moduleName]
            + [step.action for step in functional_case.steps]
        )
        # Checked in order; the first keyword group present decides the URL.
        rules = (
            (("登录",), "/api/login"),
            (("搜索", "查询", "列表", "筛选"), "/api/list/query"),
            (("新增", "创建", "添加"), "/api/create"),
            (("修改", "编辑", "更新"), "/api/update"),
            (("删除", "移除"), "/api/delete"),
        )
        for keywords, url in rules:
            if any(keyword in text for keyword in keywords):
                return url
        return DEFAULT_PLACEHOLDER_URL

    @staticmethod
    def _fill_placeholders(value: Any, variables: Dict[str, Any]) -> Any:
        """Recursively replace ``${name}`` in strings with variable values.

        Dicts and lists are rebuilt; other non-string values pass through.
        """
        fill = FunctionalCaseToApiAutomationService._fill_placeholders
        if isinstance(value, dict):
            return {key: fill(item, variables) for key, item in value.items()}
        if isinstance(value, list):
            return [fill(item, variables) for item in value]
        if isinstance(value, str):
            result = value
            for key, variable in variables.items():
                result = result.replace("${" + key + "}", str(variable))
            return result
        return value

    @staticmethod
    def _dedupe_text(items: Iterable[str]) -> List[str]:
        """Strip, drop empties and de-duplicate *items*, keeping order.

        None yields []; a bare string or other non-iterable is treated as a
        single item rather than being iterated element by element.
        """
        if items is None:
            return []
        if isinstance(items, (str, bytes)) or not hasattr(items, "__iter__"):
            items = [items]
        seen = set()
        result = []
        for item in items:
            text = str(item).strip()
            if not text or text in seen:
                continue
            seen.add(text)
            result.append(text)
        return result


def convert_functional_case(
    payload: Union[str, Dict[str, Any], FunctionalCaseInput],
    llm_client: Any = None,
) -> Dict[str, Any]:
    """Convert *payload* with a default-configured service instance."""
    return FunctionalCaseToApiAutomationService().convert(payload, llm_client=llm_client)


def load_payload_from_file(file_path: str) -> Dict[str, Any]:
    """Read and JSON-decode the UTF-8 file at *file_path*."""
    with open(file_path, "r", encoding="utf-8") as file:
        return json.load(file)


def main():
    """CLI entry point: read a JSON case, convert it, write or print JSON."""
    parser = argparse.ArgumentParser(description="功能测试用例转接口自动化用例")
    parser.add_argument("--input", type=str, help="输入JSON文件路径,未指定则从stdin读取")
    parser.add_argument("--output", type=str, help="输出JSON文件路径,未指定则打印到stdout")
    args = parser.parse_args()

    if args.input:
        payload = load_payload_from_file(args.input)
    else:
        payload = json.load(sys.stdin)

    result = convert_functional_case(payload)
    output = json.dumps(result, ensure_ascii=False, indent=2)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as file:
            file.write(output)
    else:
        print(output)


if __name__ == "__main__":
    main()