new joyhub_backend
This commit is contained in:
917
joyhub_backend/library/functional_case_converter.py
Normal file
917
joyhub_backend/library/functional_case_converter.py
Normal file
@@ -0,0 +1,917 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import argparse
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
|
||||
|
||||
|
||||
COMMON_CN_KEYWORDS = [
|
||||
"登录",
|
||||
"查询",
|
||||
"搜索",
|
||||
"筛选",
|
||||
"列表",
|
||||
"详情",
|
||||
"新增",
|
||||
"创建",
|
||||
"添加",
|
||||
"修改",
|
||||
"编辑",
|
||||
"更新",
|
||||
"删除",
|
||||
"导出",
|
||||
"导入",
|
||||
"上传",
|
||||
"下载",
|
||||
"提交",
|
||||
"审核",
|
||||
"确认",
|
||||
"保存",
|
||||
"分页",
|
||||
"分页查询",
|
||||
"性能",
|
||||
"响应时间",
|
||||
"超时",
|
||||
"返回",
|
||||
"结果",
|
||||
"成功",
|
||||
"失败",
|
||||
"校验",
|
||||
"重置",
|
||||
"启用",
|
||||
"停用",
|
||||
"开关",
|
||||
"状态",
|
||||
"排序",
|
||||
"标签",
|
||||
"分类",
|
||||
"关键字",
|
||||
"关键字搜索",
|
||||
"模糊搜索",
|
||||
]
|
||||
|
||||
METHOD_INTENT_KEYWORDS = {
|
||||
"GET": ["查询", "搜索", "筛选", "列表", "详情", "获取", "查看", "分页", "返回"],
|
||||
"POST": ["新增", "创建", "添加", "登录", "提交", "导入", "上传", "确认", "审核", "保存"],
|
||||
"PUT": ["修改", "编辑", "更新", "重置"],
|
||||
"PATCH": ["修改", "编辑", "更新", "状态", "启用", "停用", "开关"],
|
||||
"DELETE": ["删除", "移除"],
|
||||
}
|
||||
|
||||
DEFAULT_PLACEHOLDER_URL = "/api/need-confirm"
|
||||
DEFAULT_PAGE_SIZE = 10
|
||||
DEFAULT_RESPONSE_TIME_MS = 1000
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionalStep:
|
||||
stepNo: int
|
||||
action: str
|
||||
expectedResult: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ApiContextItem:
|
||||
apiName: str = ""
|
||||
method: str = ""
|
||||
url: str = ""
|
||||
description: str = ""
|
||||
headers: Dict[str, Any] = field(default_factory=dict)
|
||||
queryParams: Dict[str, Any] = field(default_factory=dict)
|
||||
requestBody: Any = None
|
||||
responseExample: Any = None
|
||||
tags: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionalCaseInput:
|
||||
caseId: str = ""
|
||||
caseKey: str = ""
|
||||
caseName: str = ""
|
||||
projectName: str = ""
|
||||
moduleName: str = ""
|
||||
priority: str = ""
|
||||
preconditions: str = ""
|
||||
steps: List[FunctionalStep] = field(default_factory=list)
|
||||
expectedResults: List[str] = field(default_factory=list)
|
||||
apiContext: Optional[List[ApiContextItem]] = None
|
||||
generateOptions: Dict[str, Any] = field(default_factory=dict)
|
||||
extra: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
class FunctionalCaseParser:
|
||||
@classmethod
|
||||
def parse(cls, payload: Union[str, Dict[str, Any], FunctionalCaseInput]) -> FunctionalCaseInput:
|
||||
if isinstance(payload, FunctionalCaseInput):
|
||||
return payload
|
||||
|
||||
if isinstance(payload, str):
|
||||
try:
|
||||
payload = json.loads(payload)
|
||||
except Exception:
|
||||
payload = {
|
||||
"caseName": "手工输入功能用例",
|
||||
"steps": [payload],
|
||||
"expectedResults": [],
|
||||
}
|
||||
|
||||
if not isinstance(payload, dict):
|
||||
raise TypeError("functional case payload must be dict, json string or FunctionalCaseInput")
|
||||
|
||||
steps = cls._normalize_steps(payload.get("steps") or payload.get("step") or [])
|
||||
expected_results = cls._normalize_text_list(
|
||||
payload.get("expectedResults")
|
||||
or payload.get("expectedResult")
|
||||
or payload.get("expectations")
|
||||
or []
|
||||
)
|
||||
api_context = cls._normalize_api_context(payload.get("apiContext") or payload.get("apis") or payload.get("apiInfo"))
|
||||
generate_options = payload.get("generateOptions") or payload.get("options") or {}
|
||||
|
||||
known_keys = {
|
||||
"caseId",
|
||||
"caseKey",
|
||||
"caseName",
|
||||
"projectName",
|
||||
"moduleName",
|
||||
"priority",
|
||||
"preconditions",
|
||||
"steps",
|
||||
"step",
|
||||
"expectedResults",
|
||||
"expectedResult",
|
||||
"expectations",
|
||||
"apiContext",
|
||||
"apis",
|
||||
"apiInfo",
|
||||
"generateOptions",
|
||||
"options",
|
||||
}
|
||||
extra = {key: value for key, value in payload.items() if key not in known_keys}
|
||||
|
||||
return FunctionalCaseInput(
|
||||
caseId=str(payload.get("caseId", "") or ""),
|
||||
caseKey=str(payload.get("caseKey", "") or ""),
|
||||
caseName=str(payload.get("caseName", "") or payload.get("name", "") or ""),
|
||||
projectName=str(payload.get("projectName", "") or ""),
|
||||
moduleName=str(payload.get("moduleName", "") or ""),
|
||||
priority=str(payload.get("priority", "") or ""),
|
||||
preconditions=str(payload.get("preconditions", "") or payload.get("precondition", "") or ""),
|
||||
steps=steps,
|
||||
expectedResults=expected_results,
|
||||
apiContext=api_context,
|
||||
generateOptions=generate_options if isinstance(generate_options, dict) else {},
|
||||
extra=extra,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_steps(value: Any) -> List[FunctionalStep]:
|
||||
if not value:
|
||||
return []
|
||||
|
||||
if isinstance(value, str):
|
||||
raw_items = [item.strip() for item in re.split(r"[\n\r]+", value) if item.strip()]
|
||||
if len(raw_items) <= 1:
|
||||
raw_items = [item.strip() for item in re.split(r"(?<=[。;;])", value) if item.strip()]
|
||||
items = raw_items
|
||||
elif isinstance(value, list):
|
||||
items = value
|
||||
else:
|
||||
items = [value]
|
||||
|
||||
normalized: List[FunctionalStep] = []
|
||||
for index, item in enumerate(items, start=1):
|
||||
if isinstance(item, dict):
|
||||
step_no = int(item.get("stepNo") or item.get("step_no") or item.get("no") or index)
|
||||
action = str(item.get("action") or item.get("step") or item.get("content") or item.get("description") or "")
|
||||
expected_result = str(item.get("expectedResult") or item.get("expected") or "")
|
||||
else:
|
||||
raw_text = str(item).strip()
|
||||
step_no = index
|
||||
action = re.sub(r"^\s*\d+[\.、\)]\s*", "", raw_text)
|
||||
expected_result = ""
|
||||
if action:
|
||||
normalized.append(FunctionalStep(stepNo=step_no, action=action, expectedResult=expected_result))
|
||||
return normalized
|
||||
|
||||
@staticmethod
|
||||
def _normalize_text_list(value: Any) -> List[str]:
|
||||
if not value:
|
||||
return []
|
||||
if isinstance(value, str):
|
||||
parts = [item.strip() for item in re.split(r"[\n\r]+", value) if item.strip()]
|
||||
if len(parts) <= 1:
|
||||
parts = [item.strip() for item in re.split(r"[。;;]\s*", value) if item.strip()]
|
||||
return [item for item in parts if item]
|
||||
if isinstance(value, list):
|
||||
result = []
|
||||
for item in value:
|
||||
if isinstance(item, dict):
|
||||
text = str(item.get("text") or item.get("content") or item.get("expected") or "").strip()
|
||||
else:
|
||||
text = str(item).strip()
|
||||
if text:
|
||||
result.append(text)
|
||||
return result
|
||||
return [str(value).strip()] if str(value).strip() else []
|
||||
|
||||
@staticmethod
|
||||
def _normalize_api_context(value: Any) -> Optional[List[ApiContextItem]]:
|
||||
if not value:
|
||||
return None
|
||||
|
||||
if isinstance(value, dict):
|
||||
if "apis" in value and isinstance(value["apis"], list):
|
||||
value = value["apis"]
|
||||
elif "items" in value and isinstance(value["items"], list):
|
||||
value = value["items"]
|
||||
else:
|
||||
value = [value]
|
||||
|
||||
if isinstance(value, str):
|
||||
value = value.strip()
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
parsed = json.loads(value)
|
||||
return FunctionalCaseParser._normalize_api_context(parsed)
|
||||
except Exception:
|
||||
return FunctionalCaseParser._parse_markdown_api_context(value)
|
||||
|
||||
if not isinstance(value, list):
|
||||
return None
|
||||
|
||||
apis: List[ApiContextItem] = []
|
||||
for item in value:
|
||||
if isinstance(item, ApiContextItem):
|
||||
apis.append(item)
|
||||
continue
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
headers = item.get("headers") or item.get("header") or {}
|
||||
query_params = item.get("queryParams") or item.get("query") or item.get("params") or {}
|
||||
tags = item.get("tags") or []
|
||||
if isinstance(tags, str):
|
||||
tags = [tag.strip() for tag in re.split(r"[\s,,/|]+", tags) if tag.strip()]
|
||||
apis.append(
|
||||
ApiContextItem(
|
||||
apiName=str(item.get("apiName") or item.get("name") or ""),
|
||||
method=str(item.get("method") or item.get("httpMethod") or "").upper(),
|
||||
url=str(item.get("url") or item.get("path") or ""),
|
||||
description=str(item.get("description") or item.get("desc") or ""),
|
||||
headers=headers if isinstance(headers, dict) else {},
|
||||
queryParams=query_params if isinstance(query_params, dict) else {},
|
||||
requestBody=item.get("requestBody") if "requestBody" in item else item.get("body"),
|
||||
responseExample=item.get("responseExample") or item.get("response") or item.get("example"),
|
||||
tags=tags if isinstance(tags, list) else [],
|
||||
)
|
||||
)
|
||||
return apis or None
|
||||
|
||||
@staticmethod
|
||||
def _parse_markdown_api_context(text: str) -> Optional[List[ApiContextItem]]:
|
||||
lines = text.splitlines()
|
||||
apis: List[ApiContextItem] = []
|
||||
current: Dict[str, Any] = {}
|
||||
in_body = False
|
||||
body_lines: List[str] = []
|
||||
|
||||
def flush_current():
|
||||
nonlocal current, body_lines, in_body
|
||||
if not current:
|
||||
return
|
||||
body_text = "\n".join(body_lines).strip()
|
||||
request_body = FunctionalCaseParser._safe_json_load(body_text)
|
||||
apis.append(
|
||||
ApiContextItem(
|
||||
apiName=current.get("apiName", ""),
|
||||
method=current.get("method", ""),
|
||||
url=current.get("url", ""),
|
||||
description=current.get("description", ""),
|
||||
headers=current.get("headers", {}),
|
||||
queryParams=current.get("queryParams", {}),
|
||||
requestBody=request_body if request_body is not None else body_text,
|
||||
responseExample=current.get("responseExample"),
|
||||
tags=current.get("tags", []),
|
||||
)
|
||||
)
|
||||
current = {}
|
||||
body_lines = []
|
||||
in_body = False
|
||||
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("## "):
|
||||
flush_current()
|
||||
current["apiName"] = stripped[3:].strip()
|
||||
continue
|
||||
if stripped == "**接口URL**":
|
||||
continue
|
||||
if stripped.startswith("> ") and not current.get("url"):
|
||||
value = stripped[2:].strip()
|
||||
if value and value != "暂无参数":
|
||||
current["url"] = value
|
||||
continue
|
||||
if stripped == "**请求方式**":
|
||||
continue
|
||||
if stripped == "**请求Body参数**":
|
||||
in_body = True
|
||||
body_lines = []
|
||||
continue
|
||||
if stripped == "**响应示例**":
|
||||
in_body = False
|
||||
continue
|
||||
if in_body:
|
||||
if stripped.startswith("```"):
|
||||
continue
|
||||
body_lines.append(line)
|
||||
|
||||
flush_current()
|
||||
return apis or None
|
||||
|
||||
@staticmethod
|
||||
def _safe_json_load(text: str) -> Any:
|
||||
if not text:
|
||||
return None
|
||||
cleaned = text.strip()
|
||||
if cleaned.startswith("```"):
|
||||
cleaned = re.sub(r"^```[a-zA-Z0-9_-]*\s*", "", cleaned)
|
||||
cleaned = re.sub(r"\s*```$", "", cleaned)
|
||||
try:
|
||||
return json.loads(cleaned)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class ApiMatcher:
|
||||
def match(self, functional_case: FunctionalCaseInput, candidates: List[ApiContextItem], top_k: int = 3) -> List[Dict[str, Any]]:
|
||||
matched: List[Dict[str, Any]] = []
|
||||
keywords = self.extract_keywords(functional_case)
|
||||
intent = self.detect_intent(functional_case)
|
||||
|
||||
for api in candidates:
|
||||
score = self._score_api(api, keywords, intent, functional_case)
|
||||
if score <= 0:
|
||||
continue
|
||||
matched.append(
|
||||
{
|
||||
"score": round(score, 4),
|
||||
"api": api,
|
||||
"reason": self._build_reason(api, keywords, intent),
|
||||
}
|
||||
)
|
||||
|
||||
matched.sort(key=lambda item: item["score"], reverse=True)
|
||||
return matched[:top_k]
|
||||
|
||||
def extract_keywords(self, functional_case: FunctionalCaseInput) -> List[str]:
|
||||
texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions]
|
||||
texts.extend(step.action for step in functional_case.steps)
|
||||
texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult)
|
||||
texts.extend(functional_case.expectedResults)
|
||||
texts.extend(str(value) for value in functional_case.extra.values() if value is not None)
|
||||
full_text = " \n ".join(texts)
|
||||
keywords: List[str] = []
|
||||
|
||||
for keyword in COMMON_CN_KEYWORDS:
|
||||
if keyword in full_text and keyword not in keywords:
|
||||
keywords.append(keyword)
|
||||
|
||||
english_tokens = re.findall(r"[A-Za-z_][A-Za-z0-9_]{1,}", full_text)
|
||||
for token in english_tokens:
|
||||
if token not in keywords:
|
||||
keywords.append(token)
|
||||
|
||||
generic_tokens = re.findall(r"[\u4e00-\u9fa5]{2,}", full_text)
|
||||
for token in generic_tokens:
|
||||
if len(token) <= 2:
|
||||
if token not in keywords:
|
||||
keywords.append(token)
|
||||
continue
|
||||
if token not in keywords:
|
||||
keywords.append(token)
|
||||
|
||||
return self._dedupe(keywords)
|
||||
|
||||
def detect_intent(self, functional_case: FunctionalCaseInput) -> str:
|
||||
text = self._all_text(functional_case)
|
||||
for method, keywords in METHOD_INTENT_KEYWORDS.items():
|
||||
if any(keyword in text for keyword in keywords):
|
||||
return method
|
||||
return "GET"
|
||||
|
||||
def _score_api(self, api: ApiContextItem, keywords: List[str], intent: str, functional_case: FunctionalCaseInput) -> float:
|
||||
score = 0.0
|
||||
api_text = " ".join(
|
||||
[
|
||||
api.apiName or "",
|
||||
api.method or "",
|
||||
api.url or "",
|
||||
api.description or "",
|
||||
" ".join(api.tags or []),
|
||||
" ".join(api.headers.keys()),
|
||||
" ".join(api.queryParams.keys()),
|
||||
]
|
||||
)
|
||||
if api.method and api.method.upper() == intent:
|
||||
score += 1.2
|
||||
elif api.method and api.method.upper() in ("GET", "POST", "PUT", "PATCH", "DELETE"):
|
||||
score += 0.2
|
||||
|
||||
for keyword in keywords:
|
||||
if keyword and keyword in api_text:
|
||||
score += 0.8
|
||||
|
||||
for token in self._url_tokens(api.url):
|
||||
if token and any(keyword in token or token in keyword for keyword in keywords):
|
||||
score += 0.6
|
||||
|
||||
if functional_case.moduleName and functional_case.moduleName in api_text:
|
||||
score += 0.8
|
||||
if functional_case.caseName and functional_case.caseName in api_text:
|
||||
score += 0.5
|
||||
|
||||
ratio = SequenceMatcher(None, self._all_text(functional_case), api_text).ratio()
|
||||
score += ratio * 0.8
|
||||
return score
|
||||
|
||||
@staticmethod
|
||||
def _build_reason(api: ApiContextItem, keywords: List[str], intent: str) -> str:
|
||||
matched_keywords = [keyword for keyword in keywords if keyword and keyword in (api.apiName + " " + api.description + " " + api.url)]
|
||||
parts = []
|
||||
if api.method and api.method.upper() == intent:
|
||||
parts.append("方法匹配")
|
||||
if matched_keywords:
|
||||
parts.append("关键词命中:{}".format("、".join(matched_keywords[:5])))
|
||||
return "; ".join(parts) or "模糊匹配"
|
||||
|
||||
@staticmethod
|
||||
def _url_tokens(url: str) -> List[str]:
|
||||
if not url:
|
||||
return []
|
||||
return [token for token in re.split(r"[/?&=_\-\.]+", url) if token]
|
||||
|
||||
@staticmethod
|
||||
def _dedupe(items: Iterable[str]) -> List[str]:
|
||||
seen = set()
|
||||
result = []
|
||||
for item in items:
|
||||
if item and item not in seen:
|
||||
seen.add(item)
|
||||
result.append(item)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _all_text(functional_case: FunctionalCaseInput) -> str:
|
||||
texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions]
|
||||
texts.extend(step.action for step in functional_case.steps)
|
||||
texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult)
|
||||
texts.extend(functional_case.expectedResults)
|
||||
return " \n ".join(filter(None, texts))
|
||||
|
||||
|
||||
class LLMApiCaseConverter:
|
||||
def convert(self, prompt: str, llm_client: Any = None) -> Optional[Dict[str, Any]]:
|
||||
if llm_client is None:
|
||||
return None
|
||||
|
||||
raw = self._invoke_client(llm_client, prompt)
|
||||
if raw is None:
|
||||
return None
|
||||
if isinstance(raw, dict):
|
||||
return raw
|
||||
if hasattr(raw, "content"):
|
||||
raw = raw.content
|
||||
elif hasattr(raw, "text"):
|
||||
raw = raw.text
|
||||
elif hasattr(raw, "data"):
|
||||
raw = raw.data
|
||||
if isinstance(raw, dict):
|
||||
return raw
|
||||
if not isinstance(raw, str):
|
||||
raw = str(raw)
|
||||
return self._extract_json(raw)
|
||||
|
||||
@staticmethod
|
||||
def _invoke_client(llm_client: Any, prompt: str) -> Any:
|
||||
if callable(llm_client):
|
||||
try:
|
||||
return llm_client(prompt=prompt)
|
||||
except TypeError:
|
||||
try:
|
||||
return llm_client(prompt)
|
||||
except TypeError:
|
||||
return llm_client({"prompt": prompt})
|
||||
if hasattr(llm_client, "generate") and callable(llm_client.generate):
|
||||
return llm_client.generate(prompt)
|
||||
if hasattr(llm_client, "chat") and callable(llm_client.chat):
|
||||
return llm_client.chat(prompt)
|
||||
raise TypeError("llm_client must be callable or expose generate/chat")
|
||||
|
||||
@staticmethod
|
||||
def _extract_json(text: str) -> Optional[Dict[str, Any]]:
|
||||
if not text:
|
||||
return None
|
||||
cleaned = text.strip()
|
||||
match = re.search(r"```json\s*(\{.*?\})\s*```", cleaned, flags=re.S)
|
||||
if match:
|
||||
cleaned = match.group(1)
|
||||
else:
|
||||
first = cleaned.find("{")
|
||||
last = cleaned.rfind("}")
|
||||
if first >= 0 and last > first:
|
||||
cleaned = cleaned[first : last + 1]
|
||||
try:
|
||||
parsed = json.loads(cleaned)
|
||||
return parsed if isinstance(parsed, dict) else {"data": parsed}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class FunctionalCaseToApiAutomationService:
|
||||
def __init__(self, matcher: Optional[ApiMatcher] = None):
|
||||
self.matcher = matcher or ApiMatcher()
|
||||
self.llm_converter = LLMApiCaseConverter()
|
||||
|
||||
def convert(self, payload: Union[str, Dict[str, Any], FunctionalCaseInput], llm_client: Any = None) -> Dict[str, Any]:
|
||||
functional_case = FunctionalCaseParser.parse(payload)
|
||||
options = self._build_options(functional_case)
|
||||
candidates = functional_case.apiContext or []
|
||||
matched = self.matcher.match(functional_case, candidates, top_k=int(options.get("topK", 3))) if candidates else []
|
||||
|
||||
llm_result = None
|
||||
if options.get("useLLM", True) and llm_client is not None:
|
||||
prompt = self.build_prompt(functional_case, matched, options)
|
||||
llm_result = self.llm_converter.convert(prompt, llm_client)
|
||||
|
||||
if llm_result:
|
||||
return self._finalize_llm_result(functional_case, matched, llm_result, options)
|
||||
|
||||
return self._build_rule_result(functional_case, matched, options)
|
||||
|
||||
def build_prompt(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], options: Dict[str, Any]) -> str:
|
||||
payload = {
|
||||
"functionalCase": asdict(functional_case),
|
||||
"matchedApis": [
|
||||
{
|
||||
"score": item["score"],
|
||||
"reason": item["reason"],
|
||||
"api": asdict(item["api"]),
|
||||
}
|
||||
for item in matched
|
||||
],
|
||||
"options": options,
|
||||
}
|
||||
return (
|
||||
"你是接口自动化测试用例生成专家。\n"
|
||||
"请把功能测试用例转换成接口自动化测试用例。\n"
|
||||
"要求:\n"
|
||||
"1. 只输出严格 JSON。\n"
|
||||
"2. 如果给了候选接口信息,优先使用候选接口。\n"
|
||||
"3. 无法确认的接口信息写入 missingInfo。\n"
|
||||
"4. 需要包含 method、url、headers、queryParams、body、assertions、performanceAssertions。\n"
|
||||
"5. 如果涉及性能要求,生成 responseTime 断言。\n"
|
||||
"6. 如果涉及登录态,使用 ${token}。\n\n"
|
||||
"输入数据:\n"
|
||||
f"{json.dumps(payload, ensure_ascii=False, indent=2)}\n\n"
|
||||
"输出 JSON 结构:\n"
|
||||
"{\n"
|
||||
' "caseId": "",\n'
|
||||
' "caseKey": "",\n'
|
||||
' "caseName": "",\n'
|
||||
' "automationType": "api",\n'
|
||||
' "convertStatus": "SUCCESS",\n'
|
||||
' "apiTestCases": [],\n'
|
||||
' "missingInfo": [],\n'
|
||||
' "warnings": []\n'
|
||||
"}"
|
||||
)
|
||||
|
||||
def _build_options(self, functional_case: FunctionalCaseInput) -> Dict[str, Any]:
|
||||
options = copy.deepcopy(functional_case.generateOptions or {})
|
||||
options.setdefault("useLLM", True)
|
||||
options.setdefault("outputFormat", "json")
|
||||
options.setdefault("targetFramework", "pytest")
|
||||
options.setdefault("allowMissingInfo", True)
|
||||
options.setdefault("topK", 3)
|
||||
return options
|
||||
|
||||
def _build_rule_result(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], options: Dict[str, Any]) -> Dict[str, Any]:
|
||||
api_test_cases: List[Dict[str, Any]] = []
|
||||
missing_info: List[str] = []
|
||||
warnings: List[str] = []
|
||||
|
||||
if matched:
|
||||
for index, item in enumerate(matched, start=1):
|
||||
api_test_cases.append(self._build_api_test_case(functional_case, item["api"], index, item["score"], options))
|
||||
else:
|
||||
api_test_cases.append(self._build_fallback_test_case(functional_case, options))
|
||||
missing_info.extend(self._collect_missing_info(functional_case))
|
||||
warnings.append("未匹配到真实接口信息,已生成占位自动化用例")
|
||||
|
||||
missing_info = self._dedupe_text(missing_info)
|
||||
warnings = self._dedupe_text(warnings)
|
||||
|
||||
convert_status = "SUCCESS"
|
||||
if missing_info:
|
||||
convert_status = "SUCCESS_WITH_MISSING_INFO"
|
||||
if warnings and not matched:
|
||||
convert_status = "DRAFT"
|
||||
|
||||
return {
|
||||
"caseId": functional_case.caseId,
|
||||
"caseKey": functional_case.caseKey,
|
||||
"caseName": functional_case.caseName,
|
||||
"projectName": functional_case.projectName,
|
||||
"moduleName": functional_case.moduleName,
|
||||
"automationType": "api",
|
||||
"convertStatus": convert_status,
|
||||
"apiTestCases": api_test_cases,
|
||||
"missingInfo": missing_info,
|
||||
"warnings": warnings,
|
||||
}
|
||||
|
||||
def _finalize_llm_result(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], llm_result: Dict[str, Any], options: Dict[str, Any]) -> Dict[str, Any]:
|
||||
result = copy.deepcopy(llm_result)
|
||||
result.setdefault("caseId", functional_case.caseId)
|
||||
result.setdefault("caseKey", functional_case.caseKey)
|
||||
result.setdefault("caseName", functional_case.caseName)
|
||||
result.setdefault("projectName", functional_case.projectName)
|
||||
result.setdefault("moduleName", functional_case.moduleName)
|
||||
result.setdefault("automationType", "api")
|
||||
result.setdefault("convertStatus", "SUCCESS")
|
||||
result.setdefault("apiTestCases", [])
|
||||
result.setdefault("missingInfo", [])
|
||||
result.setdefault("warnings", [])
|
||||
|
||||
if not result.get("apiTestCases"):
|
||||
rule_result = self._build_rule_result(functional_case, matched, options)
|
||||
result["apiTestCases"] = rule_result["apiTestCases"]
|
||||
if not result.get("missingInfo"):
|
||||
result["missingInfo"] = rule_result["missingInfo"]
|
||||
if not result.get("warnings"):
|
||||
result["warnings"] = rule_result["warnings"]
|
||||
result["convertStatus"] = rule_result["convertStatus"]
|
||||
|
||||
result["missingInfo"] = self._dedupe_text(result.get("missingInfo", []))
|
||||
result["warnings"] = self._dedupe_text(result.get("warnings", []))
|
||||
return result
|
||||
|
||||
def _build_api_test_case(self, functional_case: FunctionalCaseInput, api: ApiContextItem, index: int, score: float, options: Dict[str, Any]) -> Dict[str, Any]:
|
||||
method = (api.method or self.matcher.detect_intent(functional_case)).upper()
|
||||
query_params = copy.deepcopy(api.queryParams or {})
|
||||
body = copy.deepcopy(api.requestBody)
|
||||
headers = copy.deepcopy(api.headers or {})
|
||||
if not headers:
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if not any(key.lower() == "authorization" for key in headers):
|
||||
headers["Authorization"] = "Bearer ${token}"
|
||||
|
||||
variables = self._build_variables(functional_case, api, query_params, body)
|
||||
query_params = self._fill_placeholders(query_params, variables)
|
||||
body = self._fill_placeholders(body, variables)
|
||||
|
||||
assertions = self._build_assertions(functional_case, api, method)
|
||||
performance_assertions = self._build_performance_assertions(functional_case)
|
||||
step_name = api.apiName or functional_case.caseName or "接口自动化步骤"
|
||||
|
||||
return {
|
||||
"stepNo": index,
|
||||
"name": step_name,
|
||||
"apiName": api.apiName or step_name,
|
||||
"method": method,
|
||||
"url": api.url or self._infer_placeholder_url(functional_case),
|
||||
"headers": headers,
|
||||
"queryParams": query_params,
|
||||
"body": body,
|
||||
"extractVariables": [],
|
||||
"assertions": assertions,
|
||||
"performanceAssertions": performance_assertions,
|
||||
"variables": variables,
|
||||
"matchedApi": {
|
||||
"apiName": api.apiName,
|
||||
"method": method,
|
||||
"url": api.url,
|
||||
"matchScore": round(score, 4),
|
||||
},
|
||||
}
|
||||
|
||||
def _build_fallback_test_case(self, functional_case: FunctionalCaseInput, options: Dict[str, Any]) -> Dict[str, Any]:
|
||||
method = self.matcher.detect_intent(functional_case)
|
||||
url = self._infer_placeholder_url(functional_case)
|
||||
variables = self._build_variables(functional_case, None, {}, None)
|
||||
return {
|
||||
"stepNo": 1,
|
||||
"name": functional_case.caseName or "功能用例转接口自动化",
|
||||
"apiName": functional_case.caseName or "待确认接口",
|
||||
"method": method,
|
||||
"url": url,
|
||||
"headers": {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": "Bearer ${token}",
|
||||
},
|
||||
"queryParams": {},
|
||||
"body": None,
|
||||
"extractVariables": [],
|
||||
"assertions": self._build_assertions(functional_case, None, method),
|
||||
"performanceAssertions": self._build_performance_assertions(functional_case),
|
||||
"variables": variables,
|
||||
"matchedApi": {
|
||||
"apiName": "",
|
||||
"method": method,
|
||||
"url": url,
|
||||
"matchScore": 0,
|
||||
},
|
||||
}
|
||||
|
||||
def _build_variables(self, functional_case: FunctionalCaseInput, api: Optional[ApiContextItem], query_params: Dict[str, Any], body: Any) -> Dict[str, Any]:
|
||||
variables: Dict[str, Any] = {}
|
||||
text = self.matcher._all_text(functional_case)
|
||||
for keyword in self.matcher.extract_keywords(functional_case):
|
||||
if keyword in ("搜索", "查询", "列表", "筛选", "关键字", "模糊搜索"):
|
||||
variables.setdefault("keyword", "测试")
|
||||
if keyword in ("分页", "分页查询"):
|
||||
variables.setdefault("page", 1)
|
||||
variables.setdefault("limit", DEFAULT_PAGE_SIZE)
|
||||
if keyword in ("登录",):
|
||||
variables.setdefault("token", "${token}")
|
||||
if not variables and text:
|
||||
variables["keyword"] = "测试"
|
||||
if query_params:
|
||||
for key in query_params.keys():
|
||||
if key not in variables and any(token in key.lower() for token in ["keyword", "name", "id", "page", "limit", "status", "type"]):
|
||||
variables[key] = query_params[key]
|
||||
if isinstance(body, dict):
|
||||
for key in body.keys():
|
||||
if key not in variables and any(token in key.lower() for token in ["keyword", "name", "id", "page", "limit", "status", "type"]):
|
||||
variables[key] = body[key]
|
||||
return variables
|
||||
|
||||
def _build_assertions(self, functional_case: FunctionalCaseInput, api: Optional[ApiContextItem], method: str) -> List[Dict[str, Any]]:
|
||||
assertions = [
|
||||
{
|
||||
"type": "statusCode",
|
||||
"actual": "$.code" if api or method != "DELETE" else "$.status",
|
||||
"operator": "==",
|
||||
"expected": 200,
|
||||
}
|
||||
]
|
||||
text = self.matcher._all_text(functional_case)
|
||||
if api and isinstance(api.responseExample, dict):
|
||||
example = api.responseExample
|
||||
if "code" in example:
|
||||
assertions.append(
|
||||
{
|
||||
"type": "jsonPath",
|
||||
"actual": "$.code",
|
||||
"operator": "==",
|
||||
"expected": example.get("code", 0),
|
||||
}
|
||||
)
|
||||
if "msg" in example:
|
||||
assertions.append(
|
||||
{
|
||||
"type": "jsonPath",
|
||||
"actual": "$.msg",
|
||||
"operator": "notEmpty",
|
||||
"expected": True,
|
||||
}
|
||||
)
|
||||
|
||||
if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选", "返回"]):
|
||||
assertions.append(
|
||||
{
|
||||
"type": "jsonPath",
|
||||
"actual": "$.data",
|
||||
"operator": "notEmpty",
|
||||
"expected": True,
|
||||
}
|
||||
)
|
||||
return assertions
|
||||
|
||||
def _build_performance_assertions(self, functional_case: FunctionalCaseInput) -> List[Dict[str, Any]]:
|
||||
text = self.matcher._all_text(functional_case)
|
||||
expected_ms = self._extract_response_time_ms(text)
|
||||
if expected_ms is None:
|
||||
return []
|
||||
return [
|
||||
{
|
||||
"type": "responseTime",
|
||||
"operator": "<=",
|
||||
"expected": expected_ms,
|
||||
"unit": "ms",
|
||||
}
|
||||
]
|
||||
|
||||
def _collect_missing_info(self, functional_case: FunctionalCaseInput) -> List[str]:
|
||||
missing = []
|
||||
if not functional_case.steps:
|
||||
missing.append("缺少步骤信息")
|
||||
if not functional_case.caseName:
|
||||
missing.append("缺少用例名称")
|
||||
if not functional_case.apiContext:
|
||||
missing.append("未传入真实接口信息,已使用占位接口生成")
|
||||
if self._extract_response_time_ms(self.matcher._all_text(functional_case)) is None and any(k in self.matcher._all_text(functional_case) for k in ["性能", "响应时间", "超时"]):
|
||||
missing.append("性能阈值未明确,默认使用 1000ms")
|
||||
return missing
|
||||
|
||||
@staticmethod
|
||||
def _extract_response_time_ms(text: str) -> Optional[int]:
|
||||
if not text:
|
||||
return None
|
||||
patterns = [
|
||||
r"(\d+)\s*毫秒",
|
||||
r"(\d+)\s*ms",
|
||||
r"(\d+)\s*秒",
|
||||
r"不超过\s*(\d+)\s*秒",
|
||||
r"<=\s*(\d+)\s*秒",
|
||||
r"小于等于\s*(\d+)\s*秒",
|
||||
]
|
||||
for pattern in patterns:
|
||||
match = re.search(pattern, text, flags=re.I)
|
||||
if match:
|
||||
value = int(match.group(1))
|
||||
if "秒" in pattern:
|
||||
return value * 1000
|
||||
return value
|
||||
if any(keyword in text for keyword in ["响应时间", "超时", "性能"]):
|
||||
return DEFAULT_RESPONSE_TIME_MS
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _infer_placeholder_url(functional_case: FunctionalCaseInput) -> str:
|
||||
text = " ".join([functional_case.caseName, functional_case.moduleName] + [step.action for step in functional_case.steps])
|
||||
if any(keyword in text for keyword in ["登录"]):
|
||||
return "/api/login"
|
||||
if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选"]):
|
||||
return "/api/list/query"
|
||||
if any(keyword in text for keyword in ["新增", "创建", "添加"]):
|
||||
return "/api/create"
|
||||
if any(keyword in text for keyword in ["修改", "编辑", "更新"]):
|
||||
return "/api/update"
|
||||
if any(keyword in text for keyword in ["删除", "移除"]):
|
||||
return "/api/delete"
|
||||
return DEFAULT_PLACEHOLDER_URL
|
||||
|
||||
@staticmethod
|
||||
def _fill_placeholders(value: Any, variables: Dict[str, Any]) -> Any:
|
||||
if isinstance(value, dict):
|
||||
return {key: FunctionalCaseToApiAutomationService._fill_placeholders(item, variables) for key, item in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [FunctionalCaseToApiAutomationService._fill_placeholders(item, variables) for item in value]
|
||||
if isinstance(value, str):
|
||||
result = value
|
||||
for key, variable in variables.items():
|
||||
result = result.replace("${" + key + "}", str(variable))
|
||||
return result
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def _dedupe_text(items: Iterable[str]) -> List[str]:
|
||||
seen = set()
|
||||
result = []
|
||||
for item in items:
|
||||
text = str(item).strip()
|
||||
if not text or text in seen:
|
||||
continue
|
||||
seen.add(text)
|
||||
result.append(text)
|
||||
return result
|
||||
|
||||
|
||||
def convert_functional_case(payload: Union[str, Dict[str, Any], FunctionalCaseInput], llm_client: Any = None) -> Dict[str, Any]:
|
||||
return FunctionalCaseToApiAutomationService().convert(payload, llm_client=llm_client)
|
||||
|
||||
|
||||
def load_payload_from_file(file_path: str) -> Dict[str, Any]:
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
return json.load(file)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="功能测试用例转接口自动化用例")
|
||||
parser.add_argument("--input", type=str, help="输入JSON文件路径,未指定则从stdin读取")
|
||||
parser.add_argument("--output", type=str, help="输出JSON文件路径,未指定则打印到stdout")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.input:
|
||||
payload = load_payload_from_file(args.input)
|
||||
else:
|
||||
payload = json.load(os.sys.stdin)
|
||||
|
||||
result = convert_functional_case(payload)
|
||||
|
||||
output = json.dumps(result, ensure_ascii=False, indent=2)
|
||||
if args.output:
|
||||
with open(args.output, "w", encoding="utf-8") as file:
|
||||
file.write(output)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user