3 Commits

19 changed files with 210914 additions and 3 deletions

208181
HubOps.md Normal file

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,153 @@
# -*- coding:utf-8 -*-
import os
import sys
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../../'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from dulizhan.library.Dlizhan_interface import DlzhanInterface
from base_framework.public_tools import log
import allure
obj_log = log.get_logger()
class AgreementManage(DlzhanInterface):
def __init__(self):
super().__init__()
@allure.step("创建协议")
def kw_joyhub_agreement_create_post(self, type, title, content, terminal="web", lang="en", rank_num=1, status=1, id=0):
"""
创建协议业务关键字
:param id: 主键新增为0
:param type: 类型1隐私协议 2用户协议
:param title: 标题
:param content: 内容
:param terminal: 终端(1web 2app)
:param lang: 语言 (en 英语 de 德语 ja 日语)
:param rank_num: 排序号
:param status: 状态 (1正常 2停用)
:return: 响应结果
"""
obj_log.info(f"创建协议 - type: {type}, title: {title}, content: {content[:50]}..., terminal: {terminal}, lang: {lang}, rank_num: {rank_num}, status: {status}")
params = {
"id": id,
"type": type,
"title": title,
"content": content,
"terminal": terminal,
"lang": lang,
"rankNum": rank_num,
"status": status
}
resp = self.kw_in_joyhub_agreement_create_post(**params)
obj_log.info(f"创建协议响应: {resp}")
return resp
@allure.step("删除协议")
def kw_joyhub_agreement_delete_delete(self, agreement_id):
"""
删除协议业务关键字
:param agreement_id: 协议ID
:return: 响应结果
"""
obj_log.info(f"删除协议 - agreement_id: {agreement_id}")
resp = self.kw_in_joyhub_agreement_delete_delete(agreement_id)
obj_log.info(f"删除协议响应: {resp}")
return resp
@allure.step("批量删除协议")
def kw_joyhub_agreement_delete_list_delete(self, agreement_ids):
"""
批量删除协议业务关键字
:param agreement_ids: 协议ID列表
:return: 响应结果
"""
obj_log.info(f"批量删除协议 - agreement_ids: {agreement_ids}")
resp = self.kw_in_joyhub_agreement_delete_list_delete(agreement_ids)
obj_log.info(f"批量删除协议响应: {resp}")
return resp
@allure.step("导出协议Excel")
def kw_joyhub_agreement_export_excel_get(self, **kwargs):
"""
导出协议Excel业务关键字
:param kwargs: 查询参数type, title, content, status
:return: 响应结果
"""
obj_log.info(f"导出协议Excel - 参数: {kwargs}")
resp = self.kw_in_joyhub_agreement_export_excel_get(**kwargs)
obj_log.info(f"导出协议Excel响应: {resp}")
return resp
@allure.step("获得协议详情")
def kw_joyhub_agreement_get_get(self, agreement_id):
"""
获得协议详情业务关键字
:param agreement_id: 协议ID
:return: 响应结果
"""
obj_log.info(f"获得协议详情 - agreement_id: {agreement_id}")
resp = self.kw_in_joyhub_agreement_get_get(agreement_id)
obj_log.info(f"获得协议详情响应: {resp}")
return resp
@allure.step("获得协议分页列表")
def kw_joyhub_agreement_page_get(self, **kwargs):
"""
获得协议分页列表业务关键字
:param kwargs: 查询参数type, title, content, status, page_no, page_size
:return: 响应结果
"""
obj_log.info(f"获得协议分页列表 - 参数: {kwargs}")
resp = self.kw_in_joyhub_agreement_page_get(**kwargs)
obj_log.info(f"获得协议分页列表响应: {resp}")
return resp
@allure.step("更新协议")
def kw_joyhub_agreement_update_put(self, agreement_id, type, title, content, terminal="web", lang="en", rank_num=1, status=1):
"""
更新协议业务关键字
:param agreement_id: 协议ID
:param type: 类型1隐私协议 2用户协议
:param title: 标题
:param content: 内容
:param terminal: 终端(1web 2app)
:param lang: 语言 (en 英语 de 德语 ja 日语)
:param rank_num: 排序号
:param status: 状态 (1正常 2停用)
:return: 响应结果
"""
obj_log.info(f"更新协议 - agreement_id: {agreement_id}, type: {type}, title: {title}, content: {content[:50]}..., terminal: {terminal}, lang: {lang}, rank_num: {rank_num}, status: {status}")
params = {
"id": agreement_id,
"type": type,
"title": title,
"content": content,
"terminal": terminal,
"lang": lang,
"rankNum": rank_num,
"status": status
}
resp = self.kw_in_joyhub_agreement_update_put(**params)
obj_log.info(f"更新协议响应: {resp}")
return resp

View File

@@ -0,0 +1,202 @@
import logging
import allure
from dulizhan.library.Dlizhan_interface import DlzhanInterface
obj_log = logging.getLogger("logger")
class AppVersionManage(DlzhanInterface):
def __init__(self):
super().__init__()
@allure.step("创建app版本号")
def kw_joyhub_appversion_create_post(self, store_name, version, download_url, is_on, status, id=0):
"""
创建app版本号管理业务关键字
:param id: 主键新增为0
:param store_name: 应用商店
:param version: 版本号
:param download_url: 商城下载地址
:param is_on: 是否上架1是2否
:param status: 状态 (1正常 2停用)
:return: 响应结果
"""
obj_log.info(f"创建app版本号 - storeName: {store_name}, version: {version}, downloadUrl: {download_url}, isOn: {is_on}, status: {status}")
params = {
"id": id,
"storeName": store_name,
"version": version,
"downloadUrl": download_url,
"isOn": is_on,
"status": status
}
resp = self.kw_in_joyhub_appversion_create_post(**params)
obj_log.info(f"创建app版本号响应: {resp}")
return resp
@allure.step("删除app版本号")
def kw_joyhub_appversion_delete_delete(self, appversion_id):
"""
删除app版本号管理业务关键字
:param appversion_id: app版本号编号
:return: 响应结果
"""
obj_log.info(f"删除app版本号 - id: {appversion_id}")
resp = self.kw_in_joyhub_appversion_delete_delete(appversion_id=appversion_id)
obj_log.info(f"删除app版本号响应: {resp}")
return resp
@allure.step("批量删除app版本号")
def kw_joyhub_appversion_delete_list_delete(self, appversion_ids):
"""
批量删除app版本号管理业务关键字
:param appversion_ids: app版本号编号列表
:return: 响应结果
"""
obj_log.info(f"批量删除app版本号 - ids: {appversion_ids}")
resp = self.kw_in_joyhub_appversion_delete_list_delete(ids=appversion_ids)
obj_log.info(f"批量删除app版本号响应: {resp}")
return resp
@allure.step("导出app版本号Excel")
def kw_joyhub_appversion_export_excel_get(self, page_no=1, page_size=10, **kwargs):
"""
导出app版本号管理 Excel业务关键字
:param page_no: 页码
:param page_size: 每页条数
:param store_name: 应用商店
:param version: 版本号
:param download_url: 商城下载地址
:param is_on: 是否上架
:param status: 状态
:return: 响应结果
"""
obj_log.info(f"导出app版本号Excel - pageNo: {page_no}, pageSize: {page_size}")
params = {
"pageNo": page_no,
"pageSize": page_size,
"storeName": kwargs.get("store_name", ""),
"version": kwargs.get("version", ""),
"downloadUrl": kwargs.get("download_url", ""),
"isOn": kwargs.get("is_on", ""),
"status": kwargs.get("status", "")
}
resp = self.kw_in_joyhub_appversion_export_excel_get(**params)
obj_log.info(f"导出app版本号Excel响应: {resp}")
return resp
@allure.step("获得app版本号详情")
def kw_joyhub_appversion_get_get(self, appversion_id):
"""
获得app版本号管理业务关键字
:param appversion_id: app版本号编号
:return: 响应结果
"""
obj_log.info(f"获得app版本号详情 - id: {appversion_id}")
resp = self.kw_in_joyhub_appversion_get_get(appversion_id=appversion_id)
obj_log.info(f"获得app版本号详情响应: {resp}")
return resp
@allure.step("获得导入app版本号模板")
def kw_joyhub_appversion_get_import_template_get(self):
"""
获得导入app版本号管理模板业务关键字
:return: 响应结果
"""
obj_log.info("获得导入app版本号模板")
resp = self.kw_in_joyhub_appversion_get_import_template_get()
obj_log.info(f"获得导入app版本号模板响应: {resp}")
return resp
@allure.step("导入app版本号Excel")
def kw_joyhub_appversion_import_excel_post(self, update_support, user_id=None):
"""
导入app版本号管理Excel业务关键字
:param update_support: 是否支持更新
:param user_id: 创建人-为空则取当前人
:return: 响应结果
"""
obj_log.info(f"导入app版本号Excel - updateSupport: {update_support}, userId: {user_id}")
params = {
"updateSupport": update_support
}
if user_id is not None:
params["userId"] = user_id
resp = self.kw_in_joyhub_appversion_import_excel_post(**params)
obj_log.info(f"导入app版本号Excel响应: {resp}")
return resp
@allure.step("获得app版本号分页列表")
def kw_joyhub_appversion_page_get(self, page_no=1, page_size=10, **kwargs):
"""
获得app版本号管理分页业务关键字
:param page_no: 页码
:param page_size: 每页条数
:param store_name: 应用商店
:param version: 版本号
:param download_url: 商城下载地址
:param is_on: 是否上架
:param status: 状态
:return: 响应结果
"""
obj_log.info(f"获得app版本号分页列表 - pageNo: {page_no}, pageSize: {page_size}")
params = {
"pageNo": page_no,
"pageSize": page_size,
"storeName": kwargs.get("store_name", ""),
"version": kwargs.get("version", ""),
"downloadUrl": kwargs.get("download_url", ""),
"isOn": kwargs.get("is_on", ""),
"status": kwargs.get("status", "")
}
resp = self.kw_in_joyhub_appversion_page_get(**params)
obj_log.info(f"获得app版本号分页列表响应: {resp}")
return resp
@allure.step("更新app版本号")
def kw_joyhub_appversion_update_put(self, appversion_id, store_name, version, download_url, is_on, status):
"""
更新app版本号管理业务关键字
:param appversion_id: 主键
:param store_name: 应用商店
:param version: 版本号
:param download_url: 商城下载地址
:param is_on: 是否上架1是2否
:param status: 状态 (1正常 2停用)
:return: 响应结果
"""
obj_log.info(f"更新app版本号 - id: {appversion_id}, storeName: {store_name}, version: {version}, downloadUrl: {download_url}, isOn: {is_on}, status: {status}")
params = {
"id": appversion_id,
"storeName": store_name,
"version": version,
"downloadUrl": download_url,
"isOn": is_on,
"status": status
}
resp = self.kw_in_joyhub_appversion_update_put(**params)
obj_log.info(f"更新app版本号响应: {resp}")
return resp

View File

@@ -39,7 +39,7 @@ class DlzhanInterface:
def set_joyhub_token(self, token): def set_joyhub_token(self, token):
self.token = token self.token = token
def _joyhub_request(self, method, path, is_check='', note='', **kwargs): def _joyhub_request(self, method, path, is_check='', note='', return_json=True, **kwargs):
url = "{}{}".format(self.joyhub_domain, path) url = "{}{}".format(self.joyhub_domain, path)
headers = self._get_joyhub_headers() headers = self._get_joyhub_headers()
obj_log.info("=========== {} ===========".format(note or path)) obj_log.info("=========== {} ===========".format(note or path))
@@ -58,7 +58,11 @@ class DlzhanInterface:
resp = req_map.get(method.upper(), lambda: None)() resp = req_map.get(method.upper(), lambda: None)()
self._check_resp(is_check, resp) self._check_resp(is_check, resp)
return resp.json()
if return_json:
return resp.json()
else:
return resp
def _clear_user_fingerprint(self, username): def _clear_user_fingerprint(self, username):
try: try:
@@ -251,7 +255,7 @@ class DlzhanInterface:
return self._joyhub_request('DELETE', f'/admin-api/jh/banner/delete-list?ids={ids_str}', is_check, '批量删除Banner管理') return self._joyhub_request('DELETE', f'/admin-api/jh/banner/delete-list?ids={ids_str}', is_check, '批量删除Banner管理')
def kw_in_joyhub_banner_get_import_template_get(self, is_check=''): def kw_in_joyhub_banner_get_import_template_get(self, is_check=''):
return self._joyhub_request('GET', '/admin-api/jh/banner/get-import-template', is_check, '获得导入Banner管理模板') return self._joyhub_request('GET', '/admin-api/jh/banner/get-import-template', is_check, '获得导入Banner管理模板', return_json=False)
def kw_in_joyhub_banner_get_get(self, banner_id, is_check=''): def kw_in_joyhub_banner_get_get(self, banner_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/jh/banner/get?id={banner_id}', is_check, '获得Banner管理详情') return self._joyhub_request('GET', f'/admin-api/jh/banner/get?id={banner_id}', is_check, '获得Banner管理详情')
@@ -262,6 +266,35 @@ class DlzhanInterface:
def kw_in_joyhub_banner_update_put(self, is_check='', **kwargs): def kw_in_joyhub_banner_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/jh/banner/update', is_check, '更新Banner管理', **kwargs) return self._joyhub_request('PUT', '/admin-api/jh/banner/update', is_check, '更新Banner管理', **kwargs)
# ============ app版本号管理接口 ============
def kw_in_joyhub_appversion_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/jh/appversion/create', is_check, '创建app版本号管理', **kwargs)
def kw_in_joyhub_appversion_delete_delete(self, appversion_id, is_check=''):
return self._joyhub_request('DELETE', f'/admin-api/jh/appversion/delete?id={appversion_id}', is_check, '删除app版本号管理')
def kw_in_joyhub_appversion_delete_list_delete(self, ids, is_check=''):
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/jh/appversion/delete-list?ids={ids_str}', is_check, '批量删除app版本号管理')
def kw_in_joyhub_appversion_export_excel_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/jh/appversion/export-excel', is_check, '导出app版本号管理 Excel', return_json=False, **kwargs)
def kw_in_joyhub_appversion_get_get(self, appversion_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/jh/appversion/get?id={appversion_id}', is_check, '获得app版本号管理')
def kw_in_joyhub_appversion_get_import_template_get(self, is_check=''):
return self._joyhub_request('GET', '/admin-api/jh/appversion/get-import-template', is_check, '获得导入app版本号管理模板', return_json=False)
def kw_in_joyhub_appversion_import_excel_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/jh/appversion/import-excel', is_check, '导入app版本号管理Excel', **kwargs)
def kw_in_joyhub_appversion_page_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/jh/appversion/page', is_check, '获得app版本号管理分页', **kwargs)
def kw_in_joyhub_appversion_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/jh/appversion/update', is_check, '更新app版本号管理', **kwargs)
def kw_in_joyhub_user_create_post(self, is_check='', **kwargs): def kw_in_joyhub_user_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/system/user/create', is_check, '创建用户', **kwargs) return self._joyhub_request('POST', '/admin-api/system/user/create', is_check, '创建用户', **kwargs)
@@ -330,6 +363,29 @@ class DlzhanInterface:
def kw_in_joyhub_dept_update_put(self, is_check='', **kwargs): def kw_in_joyhub_dept_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/dept/update', is_check, '更新部门', **kwargs) return self._joyhub_request('PUT', '/admin-api/system/dept/update', is_check, '更新部门', **kwargs)
# ============ 协议管理接口 ============
def kw_in_joyhub_agreement_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/jh/agreement/create', is_check, '创建协议', **kwargs)
def kw_in_joyhub_agreement_delete_delete(self, agreement_id, is_check=''):
return self._joyhub_request('DELETE', f'/admin-api/jh/agreement/delete?id={agreement_id}', is_check, '删除协议')
def kw_in_joyhub_agreement_delete_list_delete(self, ids, is_check=''):
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/jh/agreement/delete-list?ids={ids_str}', is_check, '批量删除协议')
def kw_in_joyhub_agreement_export_excel_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/jh/agreement/export-excel', is_check, '导出协议 Excel', return_json=False, **kwargs)
def kw_in_joyhub_agreement_get_get(self, agreement_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/jh/agreement/get?id={agreement_id}', is_check, '获得协议')
def kw_in_joyhub_agreement_page_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/jh/agreement/page', is_check, '获得协议分页', **kwargs)
def kw_in_joyhub_agreement_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/jh/agreement/update', is_check, '更新协议', **kwargs)
if __name__ == '__main__': if __name__ == '__main__':
test = DlzhanInterface() test = DlzhanInterface()

View File

@@ -0,0 +1,299 @@
import pytest
import allure
import logging
import requests
import json
import time
from dulizhan.library.BusinessKw.JoyHub.AgreementManage import AgreementManage
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@allure.feature("管理后台 - 协议管理模块")
class TestAgreementManage:
agreement_id = None
token_set = False
@classmethod
def setup_class(cls):
"""在整个测试类开始时登录一次所有测试用例共享token"""
logging.info("=============================================")
logging.info("=========== 开始登录获取Token ============")
logging.info("=============================================")
cls.test_case = AgreementManage()
username = "joytest"
password = "Zhou1599"
cls.test_case._clear_user_fingerprint(username)
url = "https://joyhub-website-manager-api-test.best-envision.com/admin-api/system/auth/login-dev"
payload = {"username": username, "password": password}
headers = {'Content-Type': 'application/json', 'tenant-id': '126'}
try:
response = requests.post(url, json=payload, headers=headers, verify=False, timeout=10)
login_response = response.json()
if login_response and login_response.get('code') == 0:
token = login_response.get('data', {}).get('accessToken', '')
if token:
cls.test_case.set_joyhub_token(token)
cls.token_set = True
logging.info("登录成功获取到Token: {}...".format(token[:20]))
else:
logging.warning("登录成功但未获取到Token")
else:
logging.error("登录失败: {}".format(login_response))
except Exception as e:
logging.error("登录异常: {}".format(str(e)))
@allure.story("验证登录")
@allure.title("测试登录接口")
def test_joyhub_login_post(self):
"""测试登录接口"""
assert self.token_set is True, "登录失败Token未设置"
logging.info("登录验证通过Token已设置")
@allure.story("验证获得协议分页")
@allure.title("测试获得协议分页接口")
def test_joyhub_agreement_page_get(self):
"""测试获得协议分页接口"""
with allure.step("1. 准备请求参数"):
params = {
"page_no": 1,
"page_size": 10,
"type": "",
"title": "",
"content": "",
"status": ""
}
allure.attach(json.dumps(params, ensure_ascii=False), name="请求参数", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用接口"):
resp = self.test_case.kw_joyhub_agreement_page_get(**params)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert "list" in resp["data"], "响应中缺少list字段"
assert "total" in resp["data"], "响应中缺少total字段"
assert isinstance(resp["data"]["list"], list), "list字段不是列表类型"
assert isinstance(resp["data"]["total"], int), "total字段不是整数类型"
logging.info("获得协议分页列表验证通过")
@allure.story("验证创建协议")
@allure.title("测试创建协议接口")
def test_joyhub_agreement_create_post(self):
"""测试创建协议接口"""
with allure.step("1. 准备请求参数"):
timestamp = int(time.time())
params = {
"type": 1,
"title": f"测试协议_{timestamp}",
"content": f"这是测试协议内容_{timestamp}",
"terminal": "web",
"lang": "de",
"rank_num": timestamp % 1000,
"status": 2
}
allure.attach(json.dumps(params, ensure_ascii=False), name="请求参数", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用接口"):
resp = self.test_case.kw_joyhub_agreement_create_post(**params)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert isinstance(resp["data"], int), "data字段不是整数类型"
TestAgreementManage.agreement_id = resp["data"]
logging.info(f"创建协议成功协议ID: {TestAgreementManage.agreement_id}")
@allure.story("验证获得协议详情")
@allure.title("测试获得协议详情接口")
def test_joyhub_agreement_get_get(self):
"""测试获得协议详情接口"""
with allure.step("1. 先创建一个协议"):
timestamp = int(time.time())
create_resp = self.test_case.kw_joyhub_agreement_create_post(
type=2,
title=f"详情测试协议_{timestamp}",
content=f"详情测试协议内容_{timestamp}",
terminal="app",
lang="ja",
rank_num=(timestamp % 1000) + 1000,
status=1
)
agreement_id = create_resp.get("data") if create_resp and create_resp.get("code") == 0 else None
if not agreement_id:
pytest.skip("创建测试协议失败,跳过详情测试")
allure.attach(json.dumps({"id": agreement_id}, ensure_ascii=False), name="协议ID", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用获得详情接口"):
resp = self.test_case.kw_joyhub_agreement_get_get(agreement_id=agreement_id)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert isinstance(resp["data"], dict), "data字段不是字典类型"
assert "id" in resp["data"], "响应中缺少id字段"
assert resp["data"]["id"] == agreement_id, "返回的ID与请求的不一致"
logging.info("获得协议详情验证通过")
@allure.story("验证更新协议")
@allure.title("测试更新协议接口")
def test_joyhub_agreement_update_put(self):
"""测试更新协议接口"""
with allure.step("1. 先创建一个协议"):
timestamp = int(time.time())
create_resp = self.test_case.kw_joyhub_agreement_create_post(
type=1,
title=f"待更新协议_{timestamp}",
content=f"待更新协议内容_{timestamp}",
terminal="app",
lang="de",
rank_num=(timestamp % 1000) + 2000,
status=1
)
agreement_id = create_resp.get("data") if create_resp and create_resp.get("code") == 0 else None
if not agreement_id:
pytest.skip("创建测试协议失败,跳过更新测试")
allure.attach(json.dumps({"id": agreement_id}, ensure_ascii=False), name="协议ID", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用更新接口"):
timestamp = int(time.time())
update_params = {
"agreement_id": agreement_id,
"type": 2,
"title": f"已更新协议_{timestamp}",
"content": f"已更新协议内容_{timestamp}",
"terminal": "web",
"lang": "ja",
"rank_num": (timestamp % 1000) + 3000,
"status": 2
}
resp = self.test_case.kw_joyhub_agreement_update_put(**update_params)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert resp["data"] is True, "更新协议失败"
logging.info("更新协议验证通过")
@allure.story("验证删除协议")
@allure.title("测试删除协议接口")
def test_joyhub_agreement_delete_delete(self):
"""测试删除协议接口"""
with allure.step("1. 先创建一个测试协议"):
timestamp = int(time.time())
create_resp = self.test_case.kw_joyhub_agreement_create_post(
type=1,
title=f"待删除协议_{timestamp}",
content=f"待删除协议内容_{timestamp}",
terminal="web",
lang="ja",
rank_num=(timestamp % 1000) + 4000,
status=2
)
agreement_id = create_resp.get("data") if create_resp and create_resp.get("code") == 0 else None
if not agreement_id:
pytest.skip("创建测试协议失败,跳过删除测试")
allure.attach(json.dumps({"id": agreement_id}, ensure_ascii=False), name="待删除协议ID", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用删除接口"):
resp = self.test_case.kw_joyhub_agreement_delete_delete(agreement_id=agreement_id)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert resp["data"] is True, "删除协议失败"
logging.info("删除协议验证通过")
@allure.story("验证批量删除协议")
@allure.title("测试批量删除协议接口")
def test_joyhub_agreement_delete_list_delete(self):
"""测试批量删除协议接口"""
with allure.step("1. 先创建两个测试协议"):
timestamp = int(time.time())
resp1 = self.test_case.kw_joyhub_agreement_create_post(
type=2,
title=f"批量删除协议1_{timestamp}",
content=f"批量删除协议内容1_{timestamp}",
terminal="app",
lang="de",
rank_num=(timestamp % 1000) + 5000,
status=2
)
resp2 = self.test_case.kw_joyhub_agreement_create_post(
type=1,
title=f"批量删除协议2_{timestamp}",
content=f"批量删除协议内容2_{timestamp}",
terminal="app",
lang="ja",
rank_num=(timestamp % 1000) + 6000,
status=2
)
agreement_id1 = resp1.get("data") if resp1 and resp1.get("code") == 0 else None
agreement_id2 = resp2.get("data") if resp2 and resp2.get("code") == 0 else None
if not agreement_id1 or not agreement_id2:
pytest.skip("创建测试协议失败,跳过批量删除测试")
agreement_ids = [agreement_id1, agreement_id2]
allure.attach(json.dumps({"ids": agreement_ids}, ensure_ascii=False), name="待删除协议IDs", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用批量删除接口"):
resp = self.test_case.kw_joyhub_agreement_delete_list_delete(agreement_ids=agreement_ids)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert resp["data"] is True, "批量删除协议失败"
logging.info("批量删除协议验证通过")
@allure.story("验证导出Excel")
@allure.title("测试导出协议Excel接口")
def test_joyhub_agreement_export_excel_get(self):
"""测试导出协议Excel接口"""
with allure.step("1. 准备请求参数"):
params = {
"type": "",
"title": "",
"content": "",
"status": ""
}
allure.attach(json.dumps(params, ensure_ascii=False), name="请求参数", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用导出Excel接口"):
resp = self.test_case.kw_joyhub_agreement_export_excel_get(**params)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert resp.status_code == 200, f"请求失败status_code={resp.status_code}"
assert len(resp.content) > 0, "响应内容为空"
logging.info("导出协议Excel验证通过")

View File

@@ -0,0 +1,302 @@
import pytest
import allure
import logging
import requests
import json
import time
from dulizhan.library.BusinessKw.JoyHub.AppVersionManage import AppVersionManage
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@allure.feature("管理后台 - app版本号管理模块")
class TestAppVersionManage:
appversion_id = None
token_set = False
@classmethod
def setup_class(cls):
"""在整个测试类开始时登录一次所有测试用例共享token"""
logging.info("=============================================")
logging.info("=========== 开始登录获取Token ============")
logging.info("=============================================")
cls.test_case = AppVersionManage()
username = "joytest"
password = "Zhou1599"
cls.test_case._clear_user_fingerprint(username)
url = "https://joyhub-website-manager-api-test.best-envision.com/admin-api/system/auth/login-dev"
payload = {"username": username, "password": password}
headers = {'Content-Type': 'application/json', 'tenant-id': '126'}
try:
response = requests.post(url, json=payload, headers=headers, verify=False, timeout=10)
login_response = response.json()
if login_response and login_response.get('code') == 0:
token = login_response.get('data', {}).get('accessToken', '')
if token:
cls.test_case.set_joyhub_token(token)
cls.token_set = True
logging.info("登录成功获取到Token: {}...".format(token[:20]))
else:
logging.warning("登录成功但未获取到Token")
else:
logging.error("登录失败: {}".format(login_response))
except Exception as e:
logging.error("登录异常: {}".format(str(e)))
@allure.story("验证登录")
@allure.title("测试登录接口")
def test_joyhub_login_post(self):
"""测试登录接口"""
assert self.token_set is True, "登录失败Token未设置"
logging.info("登录验证通过Token已设置")
@allure.story("验证获得app版本号分页")
@allure.title("测试获得app版本号管理分页接口")
def test_joyhub_appversion_page_get(self):
"""测试获得app版本号管理分页接口"""
with allure.step("1. 准备请求参数"):
params = {
"page_no": 1,
"page_size": 10,
"store_name": "",
"version": "",
"download_url": "",
"is_on": "",
"status": ""
}
allure.attach(json.dumps(params, ensure_ascii=False), name="请求参数", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用接口"):
resp = self.test_case.kw_joyhub_appversion_page_get(**params)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert "list" in resp["data"], "响应中缺少list字段"
assert "total" in resp["data"], "响应中缺少total字段"
assert isinstance(resp["data"]["list"], list), "list字段不是列表类型"
assert isinstance(resp["data"]["total"], int), "total字段不是整数类型"
logging.info("获得app版本号分页列表验证通过")
@allure.story("验证创建app版本号")
@allure.title("测试创建app版本号管理接口")
def test_joyhub_appversion_create_post(self):
"""测试创建app版本号管理接口"""
with allure.step("1. 准备请求参数"):
timestamp = int(time.time())
params = {
"store_name": "App Store",
"version": f"1.0.{timestamp % 1000}",
"download_url": "https://www.example.com/download",
"is_on": 1,
"status": 1
}
allure.attach(json.dumps(params, ensure_ascii=False), name="请求参数", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用接口"):
resp = self.test_case.kw_joyhub_appversion_create_post(**params)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert isinstance(resp["data"], int), "data字段不是整数类型"
TestAppVersionManage.appversion_id = resp["data"]
logging.info(f"创建app版本号成功app版本号ID: {TestAppVersionManage.appversion_id}")
@allure.story("验证获得app版本号详情")
@allure.title("测试获得app版本号详情接口")
def test_joyhub_appversion_get_get(self):
"""测试获得app版本号详情接口"""
with allure.step("1. 先创建一个app版本号"):
timestamp = int(time.time())
create_resp = self.test_case.kw_joyhub_appversion_create_post(
store_name="Google Play",
version=f"2.0.{timestamp % 1000}",
download_url="https://www.example.com/download",
is_on=1,
status=1
)
appversion_id = create_resp.get("data") if create_resp and create_resp.get("code") == 0 else None
if not appversion_id:
pytest.skip("创建测试app版本号失败跳过详情测试")
allure.attach(json.dumps({"id": appversion_id}, ensure_ascii=False), name="app版本号 ID", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用获得详情接口"):
resp = self.test_case.kw_joyhub_appversion_get_get(appversion_id=appversion_id)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert isinstance(resp["data"], dict), "data字段不是字典类型"
assert "id" in resp["data"], "响应中缺少id字段"
assert resp["data"]["id"] == appversion_id, "返回的ID与请求的不一致"
logging.info("获得app版本号详情验证通过")
@allure.story("验证更新app版本号")
@allure.title("测试更新app版本号管理接口")
def test_joyhub_appversion_update_put(self):
"""测试更新app版本号管理接口"""
with allure.step("1. 先创建一个app版本号"):
timestamp = int(time.time())
create_resp = self.test_case.kw_joyhub_appversion_create_post(
store_name="Huawei Store",
version=f"3.0.{timestamp % 1000}",
download_url="https://www.example.com/download",
is_on=1,
status=1
)
appversion_id = create_resp.get("data") if create_resp and create_resp.get("code") == 0 else None
if not appversion_id:
pytest.skip("创建测试app版本号失败跳过更新测试")
allure.attach(json.dumps({"id": appversion_id}, ensure_ascii=False), name="app版本号 ID", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用更新接口"):
timestamp = int(time.time())
update_params = {
"appversion_id": appversion_id,
"store_name": "Updated Store",
"version": f"4.0.{timestamp % 1000}",
"download_url": "https://www.updated-example.com/download",
"is_on": 2,
"status": 2
}
resp = self.test_case.kw_joyhub_appversion_update_put(**update_params)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert resp["data"] is True, "更新app版本号失败"
logging.info("更新app版本号验证通过")
@allure.story("验证删除app版本号")
@allure.title("测试删除app版本号管理接口")
def test_joyhub_appversion_delete_delete(self):
"""测试删除app版本号管理接口"""
with allure.step("1. 先创建一个测试app版本号"):
timestamp = int(time.time())
create_resp = self.test_case.kw_joyhub_appversion_create_post(
store_name="Test Store",
version=f"5.0.{timestamp % 1000}",
download_url="https://www.example.com/download",
is_on=1,
status=2
)
appversion_id = create_resp.get("data") if create_resp and create_resp.get("code") == 0 else None
if not appversion_id:
pytest.skip("创建测试app版本号失败跳过删除测试")
allure.attach(json.dumps({"id": appversion_id}, ensure_ascii=False), name="待删除app版本号 ID", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用删除接口"):
resp = self.test_case.kw_joyhub_appversion_delete_delete(appversion_id=appversion_id)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert resp["data"] is True, "删除app版本号失败"
logging.info("删除app版本号验证通过")
@allure.story("验证批量删除app版本号")
@allure.title("测试批量删除app版本号管理接口")
def test_joyhub_appversion_delete_list_delete(self):
"""测试批量删除app版本号管理接口"""
with allure.step("1. 先创建两个测试app版本号"):
timestamp = int(time.time())
resp1 = self.test_case.kw_joyhub_appversion_create_post(
store_name="Batch Store 1",
version=f"6.0.{timestamp % 1000}",
download_url="https://www.example.com/download",
is_on=1,
status=2
)
resp2 = self.test_case.kw_joyhub_appversion_create_post(
store_name="Batch Store 2",
version=f"7.0.{timestamp % 1000 + 1}",
download_url="https://www.example.com/download",
is_on=1,
status=2
)
appversion_id1 = resp1.get("data") if resp1 and resp1.get("code") == 0 else None
appversion_id2 = resp2.get("data") if resp2 and resp2.get("code") == 0 else None
if not appversion_id1 or not appversion_id2:
pytest.skip("创建测试app版本号失败跳过批量删除测试")
appversion_ids = [appversion_id1, appversion_id2]
allure.attach(json.dumps({"ids": appversion_ids}, ensure_ascii=False), name="待删除app版本号 IDs", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用批量删除接口"):
resp = self.test_case.kw_joyhub_appversion_delete_list_delete(appversion_ids=appversion_ids)
allure.attach(json.dumps(resp, ensure_ascii=False, indent=2), name="响应数据", attachment_type=allure.attachment_type.JSON)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert "code" in resp, "响应中缺少code字段"
assert resp["code"] == 0, f"请求失败code={resp.get('code')}"
assert "data" in resp, "响应中缺少data字段"
assert resp["data"] is True, "批量删除app版本号失败"
logging.info("批量删除app版本号验证通过")
@allure.story("验证获得导入模板")
@allure.title("测试获得导入app版本号管理模板接口")
def test_joyhub_appversion_get_import_template_get(self):
"""测试获得导入app版本号管理模板接口"""
with allure.step("1. 调用获得导入模板接口"):
resp = self.test_case.kw_joyhub_appversion_get_import_template_get()
with allure.step("2. 验证响应"):
assert resp is not None, "响应为空"
assert resp.status_code == 200, f"请求失败status_code={resp.status_code}"
assert len(resp.content) > 0, "响应内容为空"
logging.info("获得导入app版本号模板验证通过")
@allure.story("验证导出Excel")
@allure.title("测试导出app版本号管理Excel接口")
def test_joyhub_appversion_export_excel_get(self):
"""测试导出app版本号管理Excel接口"""
with allure.step("1. 准备请求参数"):
params = {
"page_no": 1,
"page_size": 10,
"store_name": "",
"version": "",
"download_url": "",
"is_on": "",
"status": ""
}
allure.attach(json.dumps(params, ensure_ascii=False), name="请求参数", attachment_type=allure.attachment_type.TEXT)
with allure.step("2. 调用导出Excel接口"):
resp = self.test_case.kw_joyhub_appversion_export_excel_get(**params)
with allure.step("3. 验证响应"):
assert resp is not None, "响应为空"
assert resp.status_code == 200, f"请求失败status_code={resp.status_code}"
assert len(resp.content) > 0, "响应内容为空"
logging.info("导出app版本号Excel验证通过")

View File

View File

@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
from joyhub_backend.library.functional_case_converter import (
ApiContextItem,
ApiMatcher,
FunctionalCaseInput,
FunctionalCaseParser,
FunctionalCaseToApiAutomationService,
FunctionalStep,
LLMApiCaseConverter,
convert_functional_case,
)

View File

@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
import json
import logging
import os
import allure
import requests
CAPTCHA_URL = os.getenv(
"JOYHUB_CAPTCHA_URL",
"http://test-manager-api.best-envision.com/admin/login/captcha",
)
LOGIN_BASE_URL = os.getenv(
"JOYHUB_LOGIN_BASE_URL",
"http://test-manager-api.best-envision.com",
)
LOGIN_PATH = os.getenv("JOYHUB_LOGIN_PATH", "/admin/login/login")
USERNAME = os.getenv("JOYHUB_USERNAME", "guojiabao")
PASSWORD = os.getenv("JOYHUB_PASSWORD", "gjb123456")
CAPTCHA = os.getenv("JOYHUB_CAPTCHA", "1111")
TIMEOUT = int(os.getenv("JOYHUB_TIMEOUT", "20"))
TENANT_ID = os.getenv("JOYHUB_TENANT_ID", "126")
class JoyhubAuth(object):
def __init__(self):
self.session = requests.Session()
self._token = None
@property
def login_url(self):
return LOGIN_BASE_URL.rstrip("/") + LOGIN_PATH
def get_captcha_key(self):
with allure.step("前置:获取登录验证码 key"):
logging.info("GET %s", CAPTCHA_URL)
response = self.session.get(CAPTCHA_URL, timeout=TIMEOUT)
self._attach_response("captcha", response)
response.raise_for_status()
data = response.json()
key = self._extract_key(data)
assert key, "验证码接口未返回 key响应{}".format(data)
return key
def login(self):
if self._token:
return self._token
key = self.get_captcha_key()
body = {
"key": key,
"username": USERNAME,
"password": PASSWORD,
"captcha": CAPTCHA,
}
headers = {"Content-Type": "application/json", "tenant-id": TENANT_ID}
with allure.step("前置:登录并获取 token"):
logging.info("POST %s", self.login_url)
logging.info("request headers: %s", headers)
logging.info("request body: %s", self._safe_body(body))
response = self.session.post(self.login_url, json=body, headers=headers, timeout=TIMEOUT)
self._attach_request(self.login_url, "POST", headers, self._safe_body(body))
self._attach_response("login", response)
response.raise_for_status()
data = response.json()
token = self._extract_token(data)
assert token, "登录接口未返回 token响应{}".format(data)
self._token = token if token.startswith("Bearer ") else "Bearer " + token
return self._token
def auth_headers(self):
return {
"Authorization": self.login(),
"Content-Type": "application/json",
"tenant-id": TENANT_ID,
}
@staticmethod
def _extract_key(data):
if isinstance(data, dict):
for field in ("key", "captchaKey", "captcha_key", "uuid"):
if data.get(field):
return data.get(field)
nested = data.get("data")
if isinstance(nested, dict):
for field in ("key", "captchaKey", "captcha_key", "uuid"):
if nested.get(field):
return nested.get(field)
if isinstance(nested, str):
return nested
return None
@staticmethod
def _extract_token(data):
token_fields = (
"token",
"access_token",
"accessToken",
"Authorization",
"authorization",
"userToken",
"user_token",
"jwt",
)
if isinstance(data, dict):
for field in token_fields:
token = data.get(field)
if JoyhubAuth._looks_like_token(token):
return token
nested = data.get("data")
if isinstance(nested, dict):
for field in token_fields:
token = nested.get(field)
if JoyhubAuth._looks_like_token(token):
return token
return None
@staticmethod
def _looks_like_token(value):
if not isinstance(value, str):
return False
value = value.strip()
if not value:
return False
if any(ord(char) > 127 for char in value):
return False
return len(value) >= 16 or value.startswith("Bearer ")
@staticmethod
def _safe_body(body):
safe = dict(body)
if "password" in safe:
safe["password"] = "******"
return safe
@staticmethod
def _attach_request(url, method, headers, body):
allure.attach(str(url), "请求url", allure.attachment_type.TEXT)
allure.attach(str(method), "请求方式", allure.attachment_type.TEXT)
allure.attach(json.dumps(headers, ensure_ascii=False, indent=2), "请求头", allure.attachment_type.JSON)
allure.attach(json.dumps(body, ensure_ascii=False, indent=2), "请求体", allure.attachment_type.JSON)
@staticmethod
def _attach_response(name, response):
logging.info("%s response status: %s", name, response.status_code)
logging.info("%s response body: %s", name, response.text)
allure.attach(str(response.status_code), "{} 响应状态码".format(name), allure.attachment_type.TEXT)
allure.attach(response.text, "{} 响应体".format(name), allure.attachment_type.JSON)

View File

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
import logging
from joyhub_backend.library.joyhub_interface import JoyhubInterface
class JoyhubBusiness(JoyhubInterface):
def get_video_label_list(self):
logging.info("==========获取视频标签列表==========")
body = {
"page": 1,
"limit": 10,
"sort": "id",
"label_name": "",
"category_id": 0,
"video_type": 0,
"video_num": 0,
"created_at": [],
"order": "descending",
}
return self.request("获取视频标签列表", "POST", "admin/video/getVideoLabelList", body=body)
def get_video_category_list(self):
logging.info("==========获取视频分类列表==========")
body = {
"page": 1,
"limit": 10,
"sort": "id",
"category_name": "",
"order": "descending",
}
return self.request("获取视频分类列表", "POST", "admin/video/getVideoCategoryList", body=body)
def get_exchange_record_list(self):
logging.info("==========兑换记录列表==========")
body = {
"page": 1,
"limit": 10,
"status": "",
"goods_type": "",
"created_at": [],
}
return self.request("兑换记录列表", "POST", "admin/exchange/recordList", body=body)
def get_app_feedback_list(self):
logging.info("==========app反馈意见列表==========")
body = {
"page": 1,
"limit": 10,
"status": "",
"keyword": "",
}
return self.request("app反馈意见列表", "POST", "admin/feedback/list", body=body)
def get_sensitive_word_list(self):
logging.info("==========敏感词列表==========")
body = {
"page": 1,
"limit": 10,
"keyword": "",
}
return self.request("敏感词列表", "POST", "admin/sensitiveWord/list", body=body)

View File

@@ -0,0 +1,917 @@
# -*- coding: utf-8 -*-
import argparse
import copy
import json
import os
import re
from dataclasses import asdict, dataclass, field
from difflib import SequenceMatcher
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
COMMON_CN_KEYWORDS = [
"登录",
"查询",
"搜索",
"筛选",
"列表",
"详情",
"新增",
"创建",
"添加",
"修改",
"编辑",
"更新",
"删除",
"导出",
"导入",
"上传",
"下载",
"提交",
"审核",
"确认",
"保存",
"分页",
"分页查询",
"性能",
"响应时间",
"超时",
"返回",
"结果",
"成功",
"失败",
"校验",
"重置",
"启用",
"停用",
"开关",
"状态",
"排序",
"标签",
"分类",
"关键字",
"关键字搜索",
"模糊搜索",
]
METHOD_INTENT_KEYWORDS = {
"GET": ["查询", "搜索", "筛选", "列表", "详情", "获取", "查看", "分页", "返回"],
"POST": ["新增", "创建", "添加", "登录", "提交", "导入", "上传", "确认", "审核", "保存"],
"PUT": ["修改", "编辑", "更新", "重置"],
"PATCH": ["修改", "编辑", "更新", "状态", "启用", "停用", "开关"],
"DELETE": ["删除", "移除"],
}
DEFAULT_PLACEHOLDER_URL = "/api/need-confirm"
DEFAULT_PAGE_SIZE = 10
DEFAULT_RESPONSE_TIME_MS = 1000
@dataclass
class FunctionalStep:
stepNo: int
action: str
expectedResult: str = ""
@dataclass
class ApiContextItem:
apiName: str = ""
method: str = ""
url: str = ""
description: str = ""
headers: Dict[str, Any] = field(default_factory=dict)
queryParams: Dict[str, Any] = field(default_factory=dict)
requestBody: Any = None
responseExample: Any = None
tags: List[str] = field(default_factory=list)
@dataclass
class FunctionalCaseInput:
caseId: str = ""
caseKey: str = ""
caseName: str = ""
projectName: str = ""
moduleName: str = ""
priority: str = ""
preconditions: str = ""
steps: List[FunctionalStep] = field(default_factory=list)
expectedResults: List[str] = field(default_factory=list)
apiContext: Optional[List[ApiContextItem]] = None
generateOptions: Dict[str, Any] = field(default_factory=dict)
extra: Dict[str, Any] = field(default_factory=dict)
class FunctionalCaseParser:
@classmethod
def parse(cls, payload: Union[str, Dict[str, Any], FunctionalCaseInput]) -> FunctionalCaseInput:
if isinstance(payload, FunctionalCaseInput):
return payload
if isinstance(payload, str):
try:
payload = json.loads(payload)
except Exception:
payload = {
"caseName": "手工输入功能用例",
"steps": [payload],
"expectedResults": [],
}
if not isinstance(payload, dict):
raise TypeError("functional case payload must be dict, json string or FunctionalCaseInput")
steps = cls._normalize_steps(payload.get("steps") or payload.get("step") or [])
expected_results = cls._normalize_text_list(
payload.get("expectedResults")
or payload.get("expectedResult")
or payload.get("expectations")
or []
)
api_context = cls._normalize_api_context(payload.get("apiContext") or payload.get("apis") or payload.get("apiInfo"))
generate_options = payload.get("generateOptions") or payload.get("options") or {}
known_keys = {
"caseId",
"caseKey",
"caseName",
"projectName",
"moduleName",
"priority",
"preconditions",
"steps",
"step",
"expectedResults",
"expectedResult",
"expectations",
"apiContext",
"apis",
"apiInfo",
"generateOptions",
"options",
}
extra = {key: value for key, value in payload.items() if key not in known_keys}
return FunctionalCaseInput(
caseId=str(payload.get("caseId", "") or ""),
caseKey=str(payload.get("caseKey", "") or ""),
caseName=str(payload.get("caseName", "") or payload.get("name", "") or ""),
projectName=str(payload.get("projectName", "") or ""),
moduleName=str(payload.get("moduleName", "") or ""),
priority=str(payload.get("priority", "") or ""),
preconditions=str(payload.get("preconditions", "") or payload.get("precondition", "") or ""),
steps=steps,
expectedResults=expected_results,
apiContext=api_context,
generateOptions=generate_options if isinstance(generate_options, dict) else {},
extra=extra,
)
@staticmethod
def _normalize_steps(value: Any) -> List[FunctionalStep]:
if not value:
return []
if isinstance(value, str):
raw_items = [item.strip() for item in re.split(r"[\n\r]+", value) if item.strip()]
if len(raw_items) <= 1:
raw_items = [item.strip() for item in re.split(r"(?<=[。;;])", value) if item.strip()]
items = raw_items
elif isinstance(value, list):
items = value
else:
items = [value]
normalized: List[FunctionalStep] = []
for index, item in enumerate(items, start=1):
if isinstance(item, dict):
step_no = int(item.get("stepNo") or item.get("step_no") or item.get("no") or index)
action = str(item.get("action") or item.get("step") or item.get("content") or item.get("description") or "")
expected_result = str(item.get("expectedResult") or item.get("expected") or "")
else:
raw_text = str(item).strip()
step_no = index
action = re.sub(r"^\s*\d+[\.、\)]\s*", "", raw_text)
expected_result = ""
if action:
normalized.append(FunctionalStep(stepNo=step_no, action=action, expectedResult=expected_result))
return normalized
@staticmethod
def _normalize_text_list(value: Any) -> List[str]:
if not value:
return []
if isinstance(value, str):
parts = [item.strip() for item in re.split(r"[\n\r]+", value) if item.strip()]
if len(parts) <= 1:
parts = [item.strip() for item in re.split(r"[。;;]\s*", value) if item.strip()]
return [item for item in parts if item]
if isinstance(value, list):
result = []
for item in value:
if isinstance(item, dict):
text = str(item.get("text") or item.get("content") or item.get("expected") or "").strip()
else:
text = str(item).strip()
if text:
result.append(text)
return result
return [str(value).strip()] if str(value).strip() else []
@staticmethod
def _normalize_api_context(value: Any) -> Optional[List[ApiContextItem]]:
if not value:
return None
if isinstance(value, dict):
if "apis" in value and isinstance(value["apis"], list):
value = value["apis"]
elif "items" in value and isinstance(value["items"], list):
value = value["items"]
else:
value = [value]
if isinstance(value, str):
value = value.strip()
if not value:
return None
try:
parsed = json.loads(value)
return FunctionalCaseParser._normalize_api_context(parsed)
except Exception:
return FunctionalCaseParser._parse_markdown_api_context(value)
if not isinstance(value, list):
return None
apis: List[ApiContextItem] = []
for item in value:
if isinstance(item, ApiContextItem):
apis.append(item)
continue
if not isinstance(item, dict):
continue
headers = item.get("headers") or item.get("header") or {}
query_params = item.get("queryParams") or item.get("query") or item.get("params") or {}
tags = item.get("tags") or []
if isinstance(tags, str):
tags = [tag.strip() for tag in re.split(r"[\s,/|]+", tags) if tag.strip()]
apis.append(
ApiContextItem(
apiName=str(item.get("apiName") or item.get("name") or ""),
method=str(item.get("method") or item.get("httpMethod") or "").upper(),
url=str(item.get("url") or item.get("path") or ""),
description=str(item.get("description") or item.get("desc") or ""),
headers=headers if isinstance(headers, dict) else {},
queryParams=query_params if isinstance(query_params, dict) else {},
requestBody=item.get("requestBody") if "requestBody" in item else item.get("body"),
responseExample=item.get("responseExample") or item.get("response") or item.get("example"),
tags=tags if isinstance(tags, list) else [],
)
)
return apis or None
@staticmethod
def _parse_markdown_api_context(text: str) -> Optional[List[ApiContextItem]]:
lines = text.splitlines()
apis: List[ApiContextItem] = []
current: Dict[str, Any] = {}
in_body = False
body_lines: List[str] = []
def flush_current():
nonlocal current, body_lines, in_body
if not current:
return
body_text = "\n".join(body_lines).strip()
request_body = FunctionalCaseParser._safe_json_load(body_text)
apis.append(
ApiContextItem(
apiName=current.get("apiName", ""),
method=current.get("method", ""),
url=current.get("url", ""),
description=current.get("description", ""),
headers=current.get("headers", {}),
queryParams=current.get("queryParams", {}),
requestBody=request_body if request_body is not None else body_text,
responseExample=current.get("responseExample"),
tags=current.get("tags", []),
)
)
current = {}
body_lines = []
in_body = False
for line in lines:
stripped = line.strip()
if stripped.startswith("## "):
flush_current()
current["apiName"] = stripped[3:].strip()
continue
if stripped == "**接口URL**":
continue
if stripped.startswith("> ") and not current.get("url"):
value = stripped[2:].strip()
if value and value != "暂无参数":
current["url"] = value
continue
if stripped == "**请求方式**":
continue
if stripped == "**请求Body参数**":
in_body = True
body_lines = []
continue
if stripped == "**响应示例**":
in_body = False
continue
if in_body:
if stripped.startswith("```"):
continue
body_lines.append(line)
flush_current()
return apis or None
@staticmethod
def _safe_json_load(text: str) -> Any:
if not text:
return None
cleaned = text.strip()
if cleaned.startswith("```"):
cleaned = re.sub(r"^```[a-zA-Z0-9_-]*\s*", "", cleaned)
cleaned = re.sub(r"\s*```$", "", cleaned)
try:
return json.loads(cleaned)
except Exception:
return None
class ApiMatcher:
def match(self, functional_case: FunctionalCaseInput, candidates: List[ApiContextItem], top_k: int = 3) -> List[Dict[str, Any]]:
matched: List[Dict[str, Any]] = []
keywords = self.extract_keywords(functional_case)
intent = self.detect_intent(functional_case)
for api in candidates:
score = self._score_api(api, keywords, intent, functional_case)
if score <= 0:
continue
matched.append(
{
"score": round(score, 4),
"api": api,
"reason": self._build_reason(api, keywords, intent),
}
)
matched.sort(key=lambda item: item["score"], reverse=True)
return matched[:top_k]
def extract_keywords(self, functional_case: FunctionalCaseInput) -> List[str]:
texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions]
texts.extend(step.action for step in functional_case.steps)
texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult)
texts.extend(functional_case.expectedResults)
texts.extend(str(value) for value in functional_case.extra.values() if value is not None)
full_text = " \n ".join(texts)
keywords: List[str] = []
for keyword in COMMON_CN_KEYWORDS:
if keyword in full_text and keyword not in keywords:
keywords.append(keyword)
english_tokens = re.findall(r"[A-Za-z_][A-Za-z0-9_]{1,}", full_text)
for token in english_tokens:
if token not in keywords:
keywords.append(token)
generic_tokens = re.findall(r"[\u4e00-\u9fa5]{2,}", full_text)
for token in generic_tokens:
if len(token) <= 2:
if token not in keywords:
keywords.append(token)
continue
if token not in keywords:
keywords.append(token)
return self._dedupe(keywords)
def detect_intent(self, functional_case: FunctionalCaseInput) -> str:
text = self._all_text(functional_case)
for method, keywords in METHOD_INTENT_KEYWORDS.items():
if any(keyword in text for keyword in keywords):
return method
return "GET"
def _score_api(self, api: ApiContextItem, keywords: List[str], intent: str, functional_case: FunctionalCaseInput) -> float:
score = 0.0
api_text = " ".join(
[
api.apiName or "",
api.method or "",
api.url or "",
api.description or "",
" ".join(api.tags or []),
" ".join(api.headers.keys()),
" ".join(api.queryParams.keys()),
]
)
if api.method and api.method.upper() == intent:
score += 1.2
elif api.method and api.method.upper() in ("GET", "POST", "PUT", "PATCH", "DELETE"):
score += 0.2
for keyword in keywords:
if keyword and keyword in api_text:
score += 0.8
for token in self._url_tokens(api.url):
if token and any(keyword in token or token in keyword for keyword in keywords):
score += 0.6
if functional_case.moduleName and functional_case.moduleName in api_text:
score += 0.8
if functional_case.caseName and functional_case.caseName in api_text:
score += 0.5
ratio = SequenceMatcher(None, self._all_text(functional_case), api_text).ratio()
score += ratio * 0.8
return score
@staticmethod
def _build_reason(api: ApiContextItem, keywords: List[str], intent: str) -> str:
matched_keywords = [keyword for keyword in keywords if keyword and keyword in (api.apiName + " " + api.description + " " + api.url)]
parts = []
if api.method and api.method.upper() == intent:
parts.append("方法匹配")
if matched_keywords:
parts.append("关键词命中:{}".format("".join(matched_keywords[:5])))
return "; ".join(parts) or "模糊匹配"
@staticmethod
def _url_tokens(url: str) -> List[str]:
if not url:
return []
return [token for token in re.split(r"[/?&=_\-\.]+", url) if token]
@staticmethod
def _dedupe(items: Iterable[str]) -> List[str]:
seen = set()
result = []
for item in items:
if item and item not in seen:
seen.add(item)
result.append(item)
return result
@staticmethod
def _all_text(functional_case: FunctionalCaseInput) -> str:
texts = [functional_case.caseName, functional_case.moduleName, functional_case.preconditions]
texts.extend(step.action for step in functional_case.steps)
texts.extend(step.expectedResult for step in functional_case.steps if step.expectedResult)
texts.extend(functional_case.expectedResults)
return " \n ".join(filter(None, texts))
class LLMApiCaseConverter:
def convert(self, prompt: str, llm_client: Any = None) -> Optional[Dict[str, Any]]:
if llm_client is None:
return None
raw = self._invoke_client(llm_client, prompt)
if raw is None:
return None
if isinstance(raw, dict):
return raw
if hasattr(raw, "content"):
raw = raw.content
elif hasattr(raw, "text"):
raw = raw.text
elif hasattr(raw, "data"):
raw = raw.data
if isinstance(raw, dict):
return raw
if not isinstance(raw, str):
raw = str(raw)
return self._extract_json(raw)
@staticmethod
def _invoke_client(llm_client: Any, prompt: str) -> Any:
if callable(llm_client):
try:
return llm_client(prompt=prompt)
except TypeError:
try:
return llm_client(prompt)
except TypeError:
return llm_client({"prompt": prompt})
if hasattr(llm_client, "generate") and callable(llm_client.generate):
return llm_client.generate(prompt)
if hasattr(llm_client, "chat") and callable(llm_client.chat):
return llm_client.chat(prompt)
raise TypeError("llm_client must be callable or expose generate/chat")
@staticmethod
def _extract_json(text: str) -> Optional[Dict[str, Any]]:
if not text:
return None
cleaned = text.strip()
match = re.search(r"```json\s*(\{.*?\})\s*```", cleaned, flags=re.S)
if match:
cleaned = match.group(1)
else:
first = cleaned.find("{")
last = cleaned.rfind("}")
if first >= 0 and last > first:
cleaned = cleaned[first : last + 1]
try:
parsed = json.loads(cleaned)
return parsed if isinstance(parsed, dict) else {"data": parsed}
except Exception:
return None
class FunctionalCaseToApiAutomationService:
def __init__(self, matcher: Optional[ApiMatcher] = None):
self.matcher = matcher or ApiMatcher()
self.llm_converter = LLMApiCaseConverter()
def convert(self, payload: Union[str, Dict[str, Any], FunctionalCaseInput], llm_client: Any = None) -> Dict[str, Any]:
functional_case = FunctionalCaseParser.parse(payload)
options = self._build_options(functional_case)
candidates = functional_case.apiContext or []
matched = self.matcher.match(functional_case, candidates, top_k=int(options.get("topK", 3))) if candidates else []
llm_result = None
if options.get("useLLM", True) and llm_client is not None:
prompt = self.build_prompt(functional_case, matched, options)
llm_result = self.llm_converter.convert(prompt, llm_client)
if llm_result:
return self._finalize_llm_result(functional_case, matched, llm_result, options)
return self._build_rule_result(functional_case, matched, options)
def build_prompt(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], options: Dict[str, Any]) -> str:
payload = {
"functionalCase": asdict(functional_case),
"matchedApis": [
{
"score": item["score"],
"reason": item["reason"],
"api": asdict(item["api"]),
}
for item in matched
],
"options": options,
}
return (
"你是接口自动化测试用例生成专家。\n"
"请把功能测试用例转换成接口自动化测试用例。\n"
"要求:\n"
"1. 只输出严格 JSON。\n"
"2. 如果给了候选接口信息,优先使用候选接口。\n"
"3. 无法确认的接口信息写入 missingInfo。\n"
"4. 需要包含 method、url、headers、queryParams、body、assertions、performanceAssertions。\n"
"5. 如果涉及性能要求,生成 responseTime 断言。\n"
"6. 如果涉及登录态,使用 ${token}\n\n"
"输入数据:\n"
f"{json.dumps(payload, ensure_ascii=False, indent=2)}\n\n"
"输出 JSON 结构:\n"
"{\n"
' "caseId": "",\n'
' "caseKey": "",\n'
' "caseName": "",\n'
' "automationType": "api",\n'
' "convertStatus": "SUCCESS",\n'
' "apiTestCases": [],\n'
' "missingInfo": [],\n'
' "warnings": []\n'
"}"
)
def _build_options(self, functional_case: FunctionalCaseInput) -> Dict[str, Any]:
options = copy.deepcopy(functional_case.generateOptions or {})
options.setdefault("useLLM", True)
options.setdefault("outputFormat", "json")
options.setdefault("targetFramework", "pytest")
options.setdefault("allowMissingInfo", True)
options.setdefault("topK", 3)
return options
def _build_rule_result(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], options: Dict[str, Any]) -> Dict[str, Any]:
api_test_cases: List[Dict[str, Any]] = []
missing_info: List[str] = []
warnings: List[str] = []
if matched:
for index, item in enumerate(matched, start=1):
api_test_cases.append(self._build_api_test_case(functional_case, item["api"], index, item["score"], options))
else:
api_test_cases.append(self._build_fallback_test_case(functional_case, options))
missing_info.extend(self._collect_missing_info(functional_case))
warnings.append("未匹配到真实接口信息,已生成占位自动化用例")
missing_info = self._dedupe_text(missing_info)
warnings = self._dedupe_text(warnings)
convert_status = "SUCCESS"
if missing_info:
convert_status = "SUCCESS_WITH_MISSING_INFO"
if warnings and not matched:
convert_status = "DRAFT"
return {
"caseId": functional_case.caseId,
"caseKey": functional_case.caseKey,
"caseName": functional_case.caseName,
"projectName": functional_case.projectName,
"moduleName": functional_case.moduleName,
"automationType": "api",
"convertStatus": convert_status,
"apiTestCases": api_test_cases,
"missingInfo": missing_info,
"warnings": warnings,
}
def _finalize_llm_result(self, functional_case: FunctionalCaseInput, matched: List[Dict[str, Any]], llm_result: Dict[str, Any], options: Dict[str, Any]) -> Dict[str, Any]:
result = copy.deepcopy(llm_result)
result.setdefault("caseId", functional_case.caseId)
result.setdefault("caseKey", functional_case.caseKey)
result.setdefault("caseName", functional_case.caseName)
result.setdefault("projectName", functional_case.projectName)
result.setdefault("moduleName", functional_case.moduleName)
result.setdefault("automationType", "api")
result.setdefault("convertStatus", "SUCCESS")
result.setdefault("apiTestCases", [])
result.setdefault("missingInfo", [])
result.setdefault("warnings", [])
if not result.get("apiTestCases"):
rule_result = self._build_rule_result(functional_case, matched, options)
result["apiTestCases"] = rule_result["apiTestCases"]
if not result.get("missingInfo"):
result["missingInfo"] = rule_result["missingInfo"]
if not result.get("warnings"):
result["warnings"] = rule_result["warnings"]
result["convertStatus"] = rule_result["convertStatus"]
result["missingInfo"] = self._dedupe_text(result.get("missingInfo", []))
result["warnings"] = self._dedupe_text(result.get("warnings", []))
return result
def _build_api_test_case(self, functional_case: FunctionalCaseInput, api: ApiContextItem, index: int, score: float, options: Dict[str, Any]) -> Dict[str, Any]:
method = (api.method or self.matcher.detect_intent(functional_case)).upper()
query_params = copy.deepcopy(api.queryParams or {})
body = copy.deepcopy(api.requestBody)
headers = copy.deepcopy(api.headers or {})
if not headers:
headers = {"Content-Type": "application/json"}
if not any(key.lower() == "authorization" for key in headers):
headers["Authorization"] = "Bearer ${token}"
variables = self._build_variables(functional_case, api, query_params, body)
query_params = self._fill_placeholders(query_params, variables)
body = self._fill_placeholders(body, variables)
assertions = self._build_assertions(functional_case, api, method)
performance_assertions = self._build_performance_assertions(functional_case)
step_name = api.apiName or functional_case.caseName or "接口自动化步骤"
return {
"stepNo": index,
"name": step_name,
"apiName": api.apiName or step_name,
"method": method,
"url": api.url or self._infer_placeholder_url(functional_case),
"headers": headers,
"queryParams": query_params,
"body": body,
"extractVariables": [],
"assertions": assertions,
"performanceAssertions": performance_assertions,
"variables": variables,
"matchedApi": {
"apiName": api.apiName,
"method": method,
"url": api.url,
"matchScore": round(score, 4),
},
}
def _build_fallback_test_case(self, functional_case: FunctionalCaseInput, options: Dict[str, Any]) -> Dict[str, Any]:
method = self.matcher.detect_intent(functional_case)
url = self._infer_placeholder_url(functional_case)
variables = self._build_variables(functional_case, None, {}, None)
return {
"stepNo": 1,
"name": functional_case.caseName or "功能用例转接口自动化",
"apiName": functional_case.caseName or "待确认接口",
"method": method,
"url": url,
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer ${token}",
},
"queryParams": {},
"body": None,
"extractVariables": [],
"assertions": self._build_assertions(functional_case, None, method),
"performanceAssertions": self._build_performance_assertions(functional_case),
"variables": variables,
"matchedApi": {
"apiName": "",
"method": method,
"url": url,
"matchScore": 0,
},
}
def _build_variables(self, functional_case: FunctionalCaseInput, api: Optional[ApiContextItem], query_params: Dict[str, Any], body: Any) -> Dict[str, Any]:
variables: Dict[str, Any] = {}
text = self.matcher._all_text(functional_case)
for keyword in self.matcher.extract_keywords(functional_case):
if keyword in ("搜索", "查询", "列表", "筛选", "关键字", "模糊搜索"):
variables.setdefault("keyword", "测试")
if keyword in ("分页", "分页查询"):
variables.setdefault("page", 1)
variables.setdefault("limit", DEFAULT_PAGE_SIZE)
if keyword in ("登录",):
variables.setdefault("token", "${token}")
if not variables and text:
variables["keyword"] = "测试"
if query_params:
for key in query_params.keys():
if key not in variables and any(token in key.lower() for token in ["keyword", "name", "id", "page", "limit", "status", "type"]):
variables[key] = query_params[key]
if isinstance(body, dict):
for key in body.keys():
if key not in variables and any(token in key.lower() for token in ["keyword", "name", "id", "page", "limit", "status", "type"]):
variables[key] = body[key]
return variables
def _build_assertions(self, functional_case: FunctionalCaseInput, api: Optional[ApiContextItem], method: str) -> List[Dict[str, Any]]:
assertions = [
{
"type": "statusCode",
"actual": "$.code" if api or method != "DELETE" else "$.status",
"operator": "==",
"expected": 200,
}
]
text = self.matcher._all_text(functional_case)
if api and isinstance(api.responseExample, dict):
example = api.responseExample
if "code" in example:
assertions.append(
{
"type": "jsonPath",
"actual": "$.code",
"operator": "==",
"expected": example.get("code", 0),
}
)
if "msg" in example:
assertions.append(
{
"type": "jsonPath",
"actual": "$.msg",
"operator": "notEmpty",
"expected": True,
}
)
if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选", "返回"]):
assertions.append(
{
"type": "jsonPath",
"actual": "$.data",
"operator": "notEmpty",
"expected": True,
}
)
return assertions
def _build_performance_assertions(self, functional_case: FunctionalCaseInput) -> List[Dict[str, Any]]:
text = self.matcher._all_text(functional_case)
expected_ms = self._extract_response_time_ms(text)
if expected_ms is None:
return []
return [
{
"type": "responseTime",
"operator": "<=",
"expected": expected_ms,
"unit": "ms",
}
]
def _collect_missing_info(self, functional_case: FunctionalCaseInput) -> List[str]:
missing = []
if not functional_case.steps:
missing.append("缺少步骤信息")
if not functional_case.caseName:
missing.append("缺少用例名称")
if not functional_case.apiContext:
missing.append("未传入真实接口信息,已使用占位接口生成")
if self._extract_response_time_ms(self.matcher._all_text(functional_case)) is None and any(k in self.matcher._all_text(functional_case) for k in ["性能", "响应时间", "超时"]):
missing.append("性能阈值未明确,默认使用 1000ms")
return missing
@staticmethod
def _extract_response_time_ms(text: str) -> Optional[int]:
if not text:
return None
patterns = [
r"(\d+)\s*毫秒",
r"(\d+)\s*ms",
r"(\d+)\s*秒",
r"不超过\s*(\d+)\s*秒",
r"<=\s*(\d+)\s*秒",
r"小于等于\s*(\d+)\s*秒",
]
for pattern in patterns:
match = re.search(pattern, text, flags=re.I)
if match:
value = int(match.group(1))
if "" in pattern:
return value * 1000
return value
if any(keyword in text for keyword in ["响应时间", "超时", "性能"]):
return DEFAULT_RESPONSE_TIME_MS
return None
@staticmethod
def _infer_placeholder_url(functional_case: FunctionalCaseInput) -> str:
text = " ".join([functional_case.caseName, functional_case.moduleName] + [step.action for step in functional_case.steps])
if any(keyword in text for keyword in ["登录"]):
return "/api/login"
if any(keyword in text for keyword in ["搜索", "查询", "列表", "筛选"]):
return "/api/list/query"
if any(keyword in text for keyword in ["新增", "创建", "添加"]):
return "/api/create"
if any(keyword in text for keyword in ["修改", "编辑", "更新"]):
return "/api/update"
if any(keyword in text for keyword in ["删除", "移除"]):
return "/api/delete"
return DEFAULT_PLACEHOLDER_URL
@staticmethod
def _fill_placeholders(value: Any, variables: Dict[str, Any]) -> Any:
if isinstance(value, dict):
return {key: FunctionalCaseToApiAutomationService._fill_placeholders(item, variables) for key, item in value.items()}
if isinstance(value, list):
return [FunctionalCaseToApiAutomationService._fill_placeholders(item, variables) for item in value]
if isinstance(value, str):
result = value
for key, variable in variables.items():
result = result.replace("${" + key + "}", str(variable))
return result
return value
@staticmethod
def _dedupe_text(items: Iterable[str]) -> List[str]:
seen = set()
result = []
for item in items:
text = str(item).strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def convert_functional_case(payload: Union[str, Dict[str, Any], FunctionalCaseInput], llm_client: Any = None) -> Dict[str, Any]:
return FunctionalCaseToApiAutomationService().convert(payload, llm_client=llm_client)
def load_payload_from_file(file_path: str) -> Dict[str, Any]:
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
def main():
parser = argparse.ArgumentParser(description="功能测试用例转接口自动化用例")
parser.add_argument("--input", type=str, help="输入JSON文件路径未指定则从stdin读取")
parser.add_argument("--output", type=str, help="输出JSON文件路径未指定则打印到stdout")
args = parser.parse_args()
if args.input:
payload = load_payload_from_file(args.input)
else:
payload = json.load(os.sys.stdin)
result = convert_functional_case(payload)
output = json.dumps(result, ensure_ascii=False, indent=2)
if args.output:
with open(args.output, "w", encoding="utf-8") as file:
file.write(output)
else:
print(output)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,189 @@
# -*- coding: utf-8 -*-
import ast
import json
import os
import re
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
DEFAULT_HUBOPS_PATH = os.path.join(PROJECT_ROOT, "HubOps.md")
class HubOpsParser(object):
def __init__(self, file_path=None):
self.file_path = file_path or os.getenv("JOYHUB_DOC_PATH", DEFAULT_HUBOPS_PATH)
def parse(self):
with open(self.file_path, "r", encoding="utf-8") as file:
lines = file.read().splitlines()
cases = []
headings = []
for index, line in enumerate(lines):
heading = self._parse_heading(line)
if heading:
level, title = heading
headings = [item for item in headings if item[0] < level]
headings.append((level, title))
continue
if line.strip() != "**接口URL**":
continue
case = self._parse_case(lines, index, headings)
if case.get("url"):
case["case_id"] = "joyhub_{:04d}".format(len(cases) + 1)
cases.append(case)
return cases
def _parse_case(self, lines, url_index, headings):
title = self._case_title(headings, url_index)
url = self._next_quote_value(lines, url_index)
method = self._find_section_quote(lines, url_index, "**请求方式**") or "POST"
content_type = self._find_section_quote(lines, url_index, "**Content-Type**") or "json"
body_text = self._find_code_block(lines, url_index, "**请求Body参数**")
query = self._find_param_table(lines, url_index, "**请求Query参数**")
headers = self._find_headers(lines, url_index)
body = self._parse_body(body_text)
return {
"name": title,
"method": method.upper(),
"url": url,
"content_type": content_type,
"headers": headers,
"query": query,
"body": body,
"raw_body": body_text,
}
@staticmethod
def _parse_heading(line):
match = re.match(r"^(#{2,6})\s+(.+?)\s*$", line)
if not match:
return None
return len(match.group(1)), match.group(2).strip()
@staticmethod
def _case_title(headings, url_index):
if headings:
return " / ".join(title for _, title in headings[-3:])
return "HubOps接口{}".format(url_index + 1)
@staticmethod
def _next_quote_value(lines, start):
for index in range(start + 1, min(start + 8, len(lines))):
line = lines[index].strip()
if line.startswith(">"):
value = line[1:].strip()
if value and value != "暂无参数":
return value
return ""
@staticmethod
def _section_end(lines, start):
for index in range(start + 1, len(lines)):
line = lines[index].strip()
if line.startswith("## ") or line.startswith("### ") or line == "**接口URL**":
return index
return len(lines)
def _find_section_quote(self, lines, url_index, section_name):
start = max(0, url_index - 80)
end = self._section_end(lines, url_index)
for index in range(start, min(end, len(lines))):
if lines[index].strip() == section_name:
return self._next_quote_value(lines, index)
return None
def _find_code_block(self, lines, url_index, section_name):
end = self._section_end(lines, url_index)
for index in range(url_index, end):
if lines[index].strip() != section_name:
continue
for code_start in range(index + 1, end):
if lines[code_start].strip().startswith("```"):
block = []
for code_end in range(code_start + 1, end):
if lines[code_end].strip().startswith("```"):
return "\n".join(block).strip()
block.append(lines[code_end])
return ""
def _find_param_table(self, lines, url_index, section_name):
end = self._section_end(lines, url_index)
for index in range(url_index, end):
if lines[index].strip() != section_name:
continue
params = {}
for row_index in range(index + 1, end):
row = lines[row_index].strip()
if not row.startswith("|"):
if params:
break
continue
if "---" in row or "参数名" in row or "暂无参数" in row:
continue
columns = [column.strip() for column in row.strip("|").split("|")]
if len(columns) >= 2 and columns[0]:
params[columns[0]] = self._coerce_value(columns[1])
return params
return {}
def _find_headers(self, lines, url_index):
headers = {}
end = self._section_end(lines, url_index)
for index in range(url_index, end):
if lines[index].strip() != "**请求Header参数**":
continue
table_started = False
for row_index in range(index + 1, end):
row = lines[row_index].strip()
if row.startswith("**") or row.startswith("#") or row.startswith("*"):
break
if not row.startswith("|"):
if table_started:
break
continue
table_started = True
if "---" in row or "参数名" in row or "暂无参数" in row:
continue
columns = [column.strip() for column in row.strip("|").split("|")]
if len(columns) >= 2 and columns[0] and columns[0] != "Authorization":
headers[columns[0]] = columns[1]
return headers
return headers
@classmethod
def _parse_body(cls, body_text):
if not body_text or body_text == "暂无数据":
return {}
text = cls._clean_body(body_text)
for loader in (json.loads, ast.literal_eval):
try:
value = loader(text)
return value if isinstance(value, (dict, list)) else {}
except Exception:
pass
return {}
@staticmethod
def _clean_body(text):
text = re.sub(r"//.*", "", text)
text = re.sub(r"/\*.*?\*/", "", text, flags=re.S)
text = text.replace("{{token}}", "")
text = re.sub(r",\s*([}\]])", r"\1", text)
return text.strip()
@staticmethod
def _coerce_value(value):
if value in ("-", "暂无参数", ""):
return ""
if value.isdigit():
return int(value)
if value.lower() in ("true", "false"):
return value.lower() == "true"
return value
def load_hubops_cases():
return HubOpsParser().parse()

View File

@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
import json
import logging
import os
from urllib.parse import urljoin
import allure
import requests
from joyhub_backend.library.auth import JoyhubAuth, TIMEOUT
MANAGER_BASE_URL = os.getenv(
"JOYHUB_MANAGER_BASE_URL",
"http://test-manager-api.best-envision.com",
)
class JoyhubInterface(object):
def __init__(self):
self.auth = JoyhubAuth()
self.session = requests.Session()
self.base_url = MANAGER_BASE_URL.rstrip("/") + "/"
def request(self, case_name, method, path, body=None, query=None, headers=None, expected_code=0):
url = path if path.startswith("http") else urljoin(self.base_url, path.lstrip("/"))
request_headers = self.auth.auth_headers()
if headers:
request_headers.update({key: value for key, value in headers.items() if key.lower() != "authorization"})
with allure.step("操作步骤:{}".format(case_name)):
self._attach_request(url, method, request_headers, body, query)
logging.info("case: %s", case_name)
logging.info("request url: %s", url)
logging.info("request method: %s", method)
logging.info("request headers: %s", request_headers)
logging.info("request body: %s", body)
logging.info("request query: %s", query)
request_kwargs = {
"method": method,
"url": url,
"params": query,
"headers": request_headers,
"timeout": TIMEOUT,
}
if method.upper() in ("POST", "PUT", "PATCH"):
request_kwargs["json"] = body
elif body:
request_kwargs["params"] = dict(query or {}, **body) if isinstance(body, dict) else query
response = self.session.request(**request_kwargs)
self._attach_response(response)
self._attach_log(case_name, url, method, request_headers, body, query, response)
logging.info("response status: %s", response.status_code)
logging.info("response body: %s", response.text)
is_business_api = self._is_business_api(url)
assertion_text = "HTTP状态码为200且业务code为{}".format(expected_code) if is_business_api else "HTTP状态码为2xx兼容非JSON/SSE响应"
allure.attach(assertion_text, "断言内容", allure.attachment_type.TEXT)
with allure.step("断言内容:{}".format(assertion_text)):
try:
if is_business_api:
assert response.status_code == 200, "HTTP状态码期望200实际{},响应{}".format(
response.status_code, response.text
)
data = self._safe_json(response)
assert isinstance(data, dict), "业务接口响应不是JSON对象响应{}".format(response.text)
assert "code" in data, "响应缺少 code 字段,响应{}".format(data)
assert data.get("code") == expected_code, "业务code期望{},实际{}msg={},响应{}".format(
expected_code, data.get("code"), data.get("msg"), data
)
else:
assert 200 <= response.status_code < 300, "HTTP状态码期望2xx实际{},响应{}".format(
response.status_code, response.text
)
data = self._non_json_result(response)
allure.attach("", "断言失败原因", allure.attachment_type.TEXT)
return data
except AssertionError as error:
allure.attach(str(error), "断言失败原因", allure.attachment_type.TEXT)
raise AssertionError("断言失败原因:{}".format(error))
@staticmethod
def _attach_request(url, method, headers, body, query):
allure.attach(str(url), "请求url", allure.attachment_type.TEXT)
allure.attach(str(method), "请求方式", allure.attachment_type.TEXT)
allure.attach(json.dumps(headers, ensure_ascii=False, indent=2), "请求头", allure.attachment_type.JSON)
allure.attach(json.dumps(query or {}, ensure_ascii=False, indent=2), "请求query", allure.attachment_type.JSON)
allure.attach(json.dumps(body or {}, ensure_ascii=False, indent=2), "请求体", allure.attachment_type.JSON)
@staticmethod
def _attach_response(response):
allure.attach(str(response.status_code), "响应状态码", allure.attachment_type.TEXT)
content_type = response.headers.get("Content-Type", "")
attachment_type = allure.attachment_type.JSON if "json" in content_type.lower() else allure.attachment_type.TEXT
allure.attach(response.text, "响应体/日志", attachment_type)
def _is_business_api(self, url):
return url.startswith(self.base_url)
@staticmethod
def _safe_json(response):
if not response.text.strip():
return None
return response.json()
@staticmethod
def _non_json_result(response):
try:
data = response.json()
except ValueError:
data = {
"status_code": response.status_code,
"content_type": response.headers.get("Content-Type", ""),
"text": response.text,
}
return data
@staticmethod
def _attach_log(case_name, url, method, headers, body, query, response):
log_text = "\n".join([
"case: {}".format(case_name),
"request url: {}".format(url),
"request method: {}".format(method),
"request headers: {}".format(headers),
"request query: {}".format(query or {}),
"request body: {}".format(body or {}),
"response status: {}".format(response.status_code),
"response body: {}".format(response.text),
])
allure.attach(log_text, "日志", allure.attachment_type.TEXT)

View File

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
import os
import sys
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../../..'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
try:
import allure_pytest.listener
except ImportError:
allure_pytest = None
def _allure_test_fixtures_compatible(item):
fixturemanager = item.session._fixturemanager
fixturedefs = []
if hasattr(item, "_request") and hasattr(item._request, "fixturenames"):
for name in item._request.fixturenames:
try:
fixturedefs_pytest = fixturemanager.getfixturedefs(name, item)
except AttributeError:
fixturedefs_pytest = fixturemanager.getfixturedefs(name, item.nodeid)
if fixturedefs_pytest:
fixturedefs.extend(fixturedefs_pytest)
return fixturedefs
if allure_pytest is not None:
allure_pytest.listener._test_fixtures = _allure_test_fixtures_compatible

View File

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
import logging
import re
from urllib.parse import urljoin
import allure
import pytest
from joyhub_backend.library.hubops_parser import load_hubops_cases
from joyhub_backend.library.joyhub_interface import JoyhubInterface
ALL_CASES = load_hubops_cases()
def case_id(case):
raw_id = "{}-{}".format(case.get("case_id"), case.get("name"))
safe_id = re.sub(r"[^0-9A-Za-z_\u4e00-\u9fa5-]+", "_", raw_id)
return safe_id[:120]
@allure.feature("JoyHub Backend 接口自动化")
class TestHubOps(object):
test_case = JoyhubInterface()
def teardown_method(self):
with allure.step("后置:记录用例结束日志"):
logging.info("-----------------------------End-------------------------------")
@pytest.mark.parametrize("case", ALL_CASES, ids=case_id)
def test_hub_ops_api(self, case):
allure.dynamic.title("{} {}".format(case.get("case_id"), case.get("name")))
allure.dynamic.story(case.get("name"))
with allure.step("前置:初始化接口客户端并准备鉴权 token"):
headers = case.get("headers") or {}
query = case.get("query") or {}
body = case.get("body") or {}
response_data = self.test_case.request(
case.get("name"),
case.get("method"),
case.get("url"),
body=body,
query=query,
headers=headers,
)
with allure.step("断言内容:校验响应基础字段"):
request_url = case.get("url") or ""
full_url = request_url if request_url.startswith("http") else urljoin(self.test_case.base_url, request_url.lstrip("/"))
if full_url.startswith(self.test_case.base_url):
assert isinstance(response_data, dict), "断言失败原因响应不是JSON对象响应{}".format(response_data)
assert "code" in response_data, "断言失败原因响应缺少code字段响应{}".format(response_data)
assert response_data.get("msg") is not None, "断言失败原因msg字段不能为空响应{}".format(response_data)
else:
assert response_data is not None, "断言失败原因:非业务接口响应为空"
logging.info("断言通过:%s", case.get("name"))

View File

View File

@@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
import argparse
import os
import shutil
import subprocess
import sys
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
TEST_CASE_DIR = 'joyhub_backend/test_case/TestCase'
case_dir = os.path.join(project_root, TEST_CASE_DIR)
ALLURE_RESULTS_DIR = os.path.join(project_root, 'joyhub_backend', 'test_case', 'reports', 'allure-results')
ALLURE_REPORT_DIR = os.path.join(project_root, 'joyhub_backend', 'test_case', 'reports', 'allure-report')
def ensure_dirs():
os.makedirs(ALLURE_RESULTS_DIR, exist_ok=True)
os.makedirs(ALLURE_REPORT_DIR, exist_ok=True)
def clean_allure_results():
if os.path.exists(ALLURE_RESULTS_DIR):
shutil.rmtree(ALLURE_RESULTS_DIR)
os.makedirs(ALLURE_RESULTS_DIR, exist_ok=True)
def find_test_files(directory):
test_files = []
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith('.py') and not file.startswith('__') and file != 'conftest.py':
test_files.append(os.path.join(root, file))
return test_files
def run_pytest(args_list):
env = os.environ.copy()
env['PYTHONPATH'] = project_root + (os.pathsep + env['PYTHONPATH'] if 'PYTHONPATH' in env else '')
cmd = ['python', '-m', 'pytest'] + args_list
print("开始执行pytest...")
print("执行命令: {}".format(' '.join('"{}"'.format(item) if ' ' in item else item for item in cmd)), flush=True)
result = subprocess.run(cmd, cwd=project_root, env=env)
print("pytest执行结束退出码: {}".format(result.returncode), flush=True)
return result.returncode
def run_tests(target=None, test_type='all'):
base_args = ['-v', '--tb=short', '--alluredir={}'.format(ALLURE_RESULTS_DIR)]
if test_type == 'all':
print("运行所有测试用例...")
test_files = find_test_files(case_dir)
if not test_files:
print("错误: 未找到测试文件")
return 1
args = test_files + base_args
elif test_type == 'dir':
full_path = os.path.join(case_dir, target.replace('/', os.sep).replace('\\', os.sep))
if not os.path.exists(full_path):
print("错误: 目录不存在: {}".format(full_path))
return 1
print("按目录运行: {}".format(target))
test_files = find_test_files(full_path)
if not test_files:
print("错误: 未找到测试文件")
return 1
args = test_files + base_args
elif test_type == 'file':
full_path = os.path.join(case_dir, target.replace('/', os.sep).replace('\\', os.sep))
if not os.path.exists(full_path):
print("错误: 文件不存在: {}".format(full_path))
return 1
print("按文件运行: {}".format(target))
args = [full_path] + base_args
elif test_type == 'keyword':
print("按关键字运行: {}".format(target))
test_files = find_test_files(case_dir)
args = test_files + ['-k={}'.format(target)] + base_args
elif test_type == 'marker':
print("按pytest标记运行: {}".format(target))
test_files = find_test_files(case_dir)
args = test_files + ['-m={}'.format(target)] + base_args
elif test_type == 'feature':
print("按Allure feature运行: {}".format(target))
test_files = find_test_files(case_dir)
args = test_files + ['--allure-features={}'.format(target)] + base_args
elif test_type == 'story':
print("按Allure story运行: {}".format(target))
test_files = find_test_files(case_dir)
args = test_files + ['--allure-stories={}'.format(target)] + base_args
else:
print("错误: 未知的测试类型: {}".format(test_type))
return 1
return run_pytest(args)
def generate_allure_report():
print("开始生成Allure报告...", flush=True)
if os.path.exists(ALLURE_REPORT_DIR):
shutil.rmtree(ALLURE_REPORT_DIR)
cmd = 'allure generate "{}" --output "{}"'.format(ALLURE_RESULTS_DIR, ALLURE_REPORT_DIR)
print("执行命令: {}".format(cmd), flush=True)
try:
subprocess.run(cmd, check=True, shell=True)
print("Allure报告生成成功: {}".format(ALLURE_REPORT_DIR))
print("打开报告命令: allure open \"{}\"".format(ALLURE_REPORT_DIR))
return 0
except (subprocess.CalledProcessError, FileNotFoundError, OSError) as error:
print("生成Allure报告失败: {}".format(error))
print("手动执行: {}".format(cmd))
return 1
def open_allure_report():
cmd = 'allure open "{}"'.format(ALLURE_REPORT_DIR)
try:
subprocess.Popen(cmd, shell=True)
print("Allure报告已打开: {}".format(ALLURE_REPORT_DIR))
return 0
except (FileNotFoundError, OSError) as error:
print("打开Allure报告失败: {}".format(error))
return 1
def main():
parser = argparse.ArgumentParser(description='JoyHub Backend 接口自动化测试执行工具')
run_group = parser.add_mutually_exclusive_group(required=False)
run_group.add_argument('--feature', type=str, help='按Allure feature运行')
run_group.add_argument('--story', type=str, help='按Allure story运行')
run_group.add_argument('--dir', type=str, help='按目录运行相对于TestCase目录')
run_group.add_argument('--file', type=str, help='按文件运行相对于TestCase目录')
run_group.add_argument('--keyword', type=str, help='按关键字运行')
run_group.add_argument('--marker', type=str, help='按pytest标记运行')
parser.add_argument('--report', action='store_true', help='生成Allure报告')
parser.add_argument('--open', action='store_true', help='打开Allure报告')
parser.add_argument('--no-report', action='store_true', help='不生成Allure报告')
args = parser.parse_args()
ensure_dirs()
clean_allure_results()
if args.feature:
exit_code = run_tests(args.feature, 'feature')
elif args.story:
exit_code = run_tests(args.story, 'story')
elif args.dir:
exit_code = run_tests(args.dir, 'dir')
elif args.file:
exit_code = run_tests(args.file, 'file')
elif args.keyword:
exit_code = run_tests(args.keyword, 'keyword')
elif args.marker:
exit_code = run_tests(args.marker, 'marker')
else:
exit_code = run_tests()
if args.report or not args.no_report:
generate_allure_report()
if args.open:
open_allure_report()
print("=" * 80)
print("测试执行完成" if exit_code == 0 else "测试执行失败,退出码: {}".format(exit_code))
print("=" * 80)
sys.exit(exit_code)
if __name__ == '__main__':
main()