Files

352 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
"""
Author: Auto Generated
Create Date: 2026/04/30
"""
import logging
import os
import sys
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../../../'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
import time
from base_framework.public_tools import log
from base_framework.public_tools.my_faker import MyFaker
from base_framework.public_tools.runner import Runner
from base_framework.public_tools import utils
from dulizhan.library.Dlizhan_interface import DlzhanInterface
obj_get_log = log.get_logger()
obj_my_faker = MyFaker()
obj_runner = Runner()
obj_get_way = utils.Tools()
class RoleManage(DlzhanInterface):
def __init__(self):
super().__init__()
joyhub_token = self._read_robot_variable("joyhub_login_token")
if joyhub_token:
self.set_joyhub_token(joyhub_token)
obj_get_log.info("从配置文件读取JoyHub Token成功: {}".format(joyhub_token))
else:
obj_get_log.warning("未从配置文件读取到JoyHub Token")
def _read_robot_variable(self, var_name):
import re
robot_file_path = r'C:\Users\a\PyCharmMiscProject\smart-management-auto-test\dulizhan\test_case\Resource\AdapterKws\hh-qa.robot'
obj_get_log.info("尝试读取配置文件: {}".format(robot_file_path))
try:
with open(robot_file_path, 'r', encoding='utf-8') as f:
content = f.read()
pattern = r'\$\{' + re.escape(var_name) + r'\}\s+(\S+)'
match = re.search(pattern, content)
if match:
obj_get_log.info("匹配到变量 {} = {}".format(var_name, match.group(1)))
return match.group(1)
else:
obj_get_log.warning("未匹配到变量 {}".format(var_name))
except Exception as e:
obj_get_log.error("读取robot配置文件失败: {}".format(str(e)))
return None
def kw_joyhub_role_create_post(self, note, **kwargs):
"""
| 功能说明: | 创建角色 |
| 输入参数: | note | 注释 |
| name | 角色名称 | 必填 |
| code | 角色标志 | 必填 |
| sort | 显示顺序 | 必填 |
| status | 状态 | 必填 |
| remark | 备注 | 非必填 |
| 返回参数: | {"code":0,"msg":"","data":role_id} |
"""
logging.info("==========={0}===========".format(note))
name = kwargs.get("name")
code = kwargs.get("code")
sort = kwargs.get("sort")
status = kwargs.get("status")
remark = kwargs.get("remark")
if not name or not code or sort is None or status is None:
raise Exception("角色名称、角色标志、显示顺序和状态不能为空")
request_params = {
"name": name,
"code": code,
"sort": sort,
"status": status
}
if remark is not None and remark != "":
request_params["remark"] = remark
resp = self.kw_in_joyhub_role_create_post(**request_params)
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_role_delete_post(self, note, **kwargs):
"""
| 功能说明: | 删除角色 |
| 输入参数: | note | 注释 |
| id | 角色编号 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
role_id = kwargs.get("id")
if not role_id:
raise Exception("角色编号不能为空")
resp = self.kw_in_joyhub_role_delete_post(id=role_id)
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_role_delete_list_post(self, note, **kwargs):
"""
| 功能说明: | 批量删除角色 |
| 输入参数: | note | 注释 |
| ids | 角色编号数组 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
ids = kwargs.get("ids")
if not ids or not isinstance(ids, list):
raise Exception("角色编号数组不能为空")
resp = self.kw_in_joyhub_role_delete_list_post(ids=ids)
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_role_get_get(self, note, **kwargs):
"""
| 功能说明: | 获得角色信息 |
| 输入参数: | note | 注释 |
| id | 角色编号 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":{...}} |
"""
logging.info("==========={0}===========".format(note))
role_id = kwargs.get("id")
if not role_id:
raise Exception("角色编号不能为空")
resp = self.kw_in_joyhub_role_get_get(id=role_id)
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_role_page_get(self, note, **kwargs):
"""
| 功能说明: | 获得角色分页 |
| 输入参数: | note | 注释 |
| pageNo | 页码 | 必填 |
| pageSize | 每页条数 | 必填 |
| name | 角色名称 | 非必填 |
| code | 角色标识 | 非必填 |
| status | 状态 | 非必填 |
| 返回参数: | {"code":0,"msg":"","data":{...}} |
"""
logging.info("==========={0}===========".format(note))
page_no = kwargs.get("pageNo")
page_size = kwargs.get("pageSize")
name = kwargs.get("name")
code = kwargs.get("code")
status = kwargs.get("status")
if not page_no or not page_size:
raise Exception("页码和每页条数不能为空")
request_params = {
"pageNo": page_no,
"pageSize": page_size
}
if name is not None and name != "":
request_params["name"] = name
if code is not None and code != "":
request_params["code"] = code
if status is not None:
request_params["status"] = status
resp = self.kw_in_joyhub_role_page_get(**request_params)
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_role_simple_list_get(self, note, **kwargs):
"""
| 功能说明: | 获取角色精简信息列表 |
| 输入参数: | note | 注释 |
| 返回参数: | {"code":0,"msg":"","data":[...]} |
"""
logging.info("==========={0}===========".format(note))
resp = self.kw_in_joyhub_role_simple_list_get()
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_role_list_all_simple_get(self, note, **kwargs):
"""
| 功能说明: | 获取角色精简信息列表 |
| 输入参数: | note | 注释 |
| 返回参数: | {"code":0,"msg":"","data":[...]} |
"""
logging.info("==========={0}===========".format(note))
resp = self.kw_in_joyhub_role_list_all_simple_get()
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_role_update_put(self, note, **kwargs):
"""
| 功能说明: | 修改角色 |
| 输入参数: | note | 注释 |
| id | 角色编号 | 必填 |
| name | 角色名称 | 必填 |
| code | 角色标志 | 必填 |
| sort | 显示顺序 | 必填 |
| status | 状态 | 必填 |
| remark | 备注 | 非必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
role_id = kwargs.get("id")
name = kwargs.get("name")
code = kwargs.get("code")
sort = kwargs.get("sort")
status = kwargs.get("status")
remark = kwargs.get("remark")
if not role_id or not name or not code or sort is None or status is None:
raise Exception("角色编号、角色名称、角色标志、显示顺序和状态不能为空")
request_params = {
"id": role_id,
"name": name,
"code": code,
"sort": sort,
"status": status
}
if remark is not None and remark != "":
request_params["remark"] = remark
resp = self.kw_in_joyhub_role_update_put(**request_params)
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_auth_login_post(self, note, username=None, password=None, max_retries=3, retry_delay=2):
"""
| 功能说明: | 使用账号密码登录(带异常重试机制) |
| 输入参数: | note | 注释 |
| username | 账号 | 必填默认joytest |
| password | 密码 | 必填默认123456 |
| max_retries | 最大重试次数 | 非必填默认3 |
| retry_delay | 重试间隔(秒) | 非必填默认2 |
| 返回参数: | {"code":200,"msg":"success","data":{...}} |
"""
logging.info("==========={0}===========".format(note))
login_username = username if username else "joytest"
login_password = password if password else "123456"
last_exception = None
for attempt in range(max_retries):
try:
logging.info("登录尝试 {}/{}".format(attempt + 1, max_retries))
request_params = {
"username": login_username,
"password": login_password
}
resp = self.kw_in_joyhub_auth_login_post(**request_params)
if resp is not None:
if isinstance(resp, dict) and resp.get('code') == 200:
logging.info("✓ 登录成功")
print("登录成功:", resp)
return resp
elif isinstance(resp, dict) and resp.get('code') == 401:
logging.warning("登录失败账号未登录或token过期重试中...")
else:
logging.warning("登录失败,响应: {}".format(resp))
else:
logging.warning("登录失败,响应为空")
last_exception = Exception("登录失败")
except Exception as e:
last_exception = e
logging.error("登录异常(第{}次尝试): {}".format(attempt + 1, str(e)))
if attempt < max_retries - 1:
logging.info("等待 {} 秒后重试...".format(retry_delay))
time.sleep(retry_delay)
logging.error("登录失败,已达到最大重试次数 {}".format(max_retries))
raise last_exception if last_exception else Exception("登录失败,已达到最大重试次数")
if __name__ == '__main__':
test = RoleManage()
a = test.kw_joyhub_role_page_get(note="测试角色分页", pageNo=1, pageSize=10)
print(a)