Files

452 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
"""
Author: Auto Generated
Create Date: 2026/04/28
"""
import logging
import os
import sys
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../../../'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
import time
from base_framework.public_tools import log
from base_framework.public_tools.my_faker import MyFaker
from base_framework.public_tools.runner import Runner
from base_framework.public_tools import utils
from dulizhan.library.Dlizhan_interface import DlzhanInterface
obj_get_log = log.get_logger()
obj_my_faker = MyFaker()
obj_runner = Runner()
obj_get_way = utils.Tools()
class UserManage(DlzhanInterface):
def __init__(self):
super().__init__()
# 从robot配置文件读取 JoyHub Token
joyhub_token = self._read_robot_variable("joyhub_login_token")
if joyhub_token:
self.set_joyhub_token(joyhub_token)
obj_get_log.info("从配置文件读取JoyHub Token成功: {}".format(joyhub_token))
else:
obj_get_log.warning("未从配置文件读取到JoyHub Token")
def _read_robot_variable(self, var_name):
"""从robot配置文件读取变量"""
import re
# 使用相对路径定位配置文件
robot_file_path = os.path.join(os.path.dirname(__file__), '../../../../test_case/Resource/AdapterKws/hh-qa.robot')
obj_get_log.info("尝试读取配置文件: {}".format(robot_file_path))
try:
with open(robot_file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 匹配 ${var_name} value 格式
pattern = r'\$\{' + re.escape(var_name) + r'\}\s+(\S+)'
match = re.search(pattern, content)
if match:
obj_get_log.info("匹配到变量 {} = {}".format(var_name, match.group(1)))
return match.group(1)
else:
obj_get_log.warning("未匹配到变量 {}".format(var_name))
except Exception as e:
obj_get_log.error("读取robot配置文件失败: {}".format(str(e)))
return None
def kw_joyhub_user_create_post(self, note, **kwargs):
"""
| 功能说明: | 新增用户 |
| 输入参数: | note | 注释 |
| username | 用户账号 | 必填 |
| nickname | 用户昵称 | 必填 |
| password | 密码 | 必填 |
| email | 用户邮箱 | 非必填 |
| mobile | 手机号码 | 非必填 |
| sex | 用户性别 | 非必填 |
| avatar | 用户头像 | 非必填 |
| remark | 备注 | 非必填 |
| deptId | 部门编号 | 非必填 |
| postIds | 岗位编号数组 | 非必填 |
| 返回参数: | {"code":0,"msg":"","data":user_id} |
"""
logging.info("==========={0}===========".format(note))
username = kwargs.get("username")
nickname = kwargs.get("nickname")
password = kwargs.get("password")
email = kwargs.get("email")
mobile = kwargs.get("mobile")
sex = kwargs.get("sex")
avatar = kwargs.get("avatar")
remark = kwargs.get("remark")
deptId = kwargs.get("deptId")
postIds = kwargs.get("postIds")
if not username or not nickname or not password:
raise Exception("用户账号、用户昵称和密码不能为空")
request_params = {
"username": username,
"nickname": nickname,
"password": password
}
if email is not None and email != "":
request_params["email"] = email
if mobile is not None and mobile != "":
request_params["mobile"] = mobile
if sex is not None:
request_params["sex"] = sex
if avatar is not None and avatar != "":
request_params["avatar"] = avatar
if remark is not None and remark != "":
request_params["remark"] = remark
if deptId is not None:
request_params["deptId"] = deptId
if postIds is not None:
request_params["postIds"] = postIds
resp = self.kw_in_joyhub_user_create_post(**request_params)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_user_delete_post(self, note, **kwargs):
"""
| 功能说明: | 删除用户 |
| 输入参数: | note | 注释 |
| id | 用户编号 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
user_id = kwargs.get("id")
if not user_id:
raise Exception("用户编号不能为空")
resp = self.kw_in_joyhub_user_delete_post(id=user_id)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_user_delete_list_post(self, note, **kwargs):
"""
| 功能说明: | 批量删除用户 |
| 输入参数: | note | 注释 |
| ids | 用户编号数组 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
ids = kwargs.get("ids")
if not ids or not isinstance(ids, list):
raise Exception("用户编号数组不能为空")
resp = self.kw_in_joyhub_user_delete_list_post(ids=ids)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_user_get_get(self, note, **kwargs):
"""
| 功能说明: | 获得用户详情 |
| 输入参数: | note | 注释 |
| id | 用户编号 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":{...}} |
"""
logging.info("==========={0}===========".format(note))
user_id = kwargs.get("id")
if not user_id:
raise Exception("用户编号不能为空")
resp = self.kw_in_joyhub_user_get_get(id=user_id)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_user_page_get(self, note, **kwargs):
"""
| 功能说明: | 获得用户分页列表 |
| 输入参数: | note | 注释 |
| pageNo | 页码 | 必填 |
| pageSize | 每页条数 | 必填 |
| username | 用户账号 | 非必填 |
| nickname | 用户昵称 | 非必填 |
| mobile | 手机号码 | 非必填 |
| status | 用户状态 | 非必填 |
| deptId | 部门编号 | 非必填 |
| 返回参数: | {"code":0,"msg":"","data":{...}} |
"""
logging.info("==========={0}===========".format(note))
page_no = kwargs.get("pageNo")
page_size = kwargs.get("pageSize")
username = kwargs.get("username")
nickname = kwargs.get("nickname")
mobile = kwargs.get("mobile")
status = kwargs.get("status")
deptId = kwargs.get("deptId")
if not page_no or not page_size:
raise Exception("页码和每页条数不能为空")
request_params = {
"pageNo": page_no,
"pageSize": page_size
}
if username is not None and username != "":
request_params["username"] = username
if nickname is not None and nickname != "":
request_params["nickname"] = nickname
if mobile is not None and mobile != "":
request_params["mobile"] = mobile
if status is not None:
request_params["status"] = status
if deptId is not None:
request_params["deptId"] = deptId
resp = self.kw_in_joyhub_user_page_get(**request_params)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_user_update_put(self, note, **kwargs):
"""
| 功能说明: | 修改用户 |
| 输入参数: | note | 注释 |
| id | 用户编号 | 必填 |
| username | 用户账号 | 必填 |
| nickname | 用户昵称 | 必填 |
| password | 密码 | 非必填 |
| email | 用户邮箱 | 非必填 |
| mobile | 手机号码 | 非必填 |
| sex | 用户性别 | 非必填 |
| avatar | 用户头像 | 非必填 |
| remark | 备注 | 非必填 |
| deptId | 部门编号 | 非必填 |
| postIds | 岗位编号数组 | 非必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
user_id = kwargs.get("id")
username = kwargs.get("username")
nickname = kwargs.get("nickname")
password = kwargs.get("password")
email = kwargs.get("email")
mobile = kwargs.get("mobile")
sex = kwargs.get("sex")
avatar = kwargs.get("avatar")
remark = kwargs.get("remark")
deptId = kwargs.get("deptId")
postIds = kwargs.get("postIds")
if not user_id or not username or not nickname:
raise Exception("用户编号、用户账号和用户昵称不能为空")
request_params = {
"id": user_id,
"username": username,
"nickname": nickname
}
if password is not None and password != "":
request_params["password"] = password
if email is not None and email != "":
request_params["email"] = email
if mobile is not None and mobile != "":
request_params["mobile"] = mobile
if sex is not None:
request_params["sex"] = sex
if avatar is not None and avatar != "":
request_params["avatar"] = avatar
if remark is not None and remark != "":
request_params["remark"] = remark
if deptId is not None:
request_params["deptId"] = deptId
if postIds is not None:
request_params["postIds"] = postIds
resp = self.kw_in_joyhub_user_update_put(**request_params)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_user_update_password_put(self, note, **kwargs):
"""
| 功能说明: | 重置用户密码 |
| 输入参数: | note | 注释 |
| id | 用户编号 | 必填 |
| password | 新密码 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
user_id = kwargs.get("id")
password = kwargs.get("password")
if not user_id or not password:
raise Exception("用户编号和新密码不能为空")
resp = self.kw_in_joyhub_user_update_password_put(id=user_id, password=password)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_user_update_status_put(self, note, **kwargs):
"""
| 功能说明: | 修改用户状态 |
| 输入参数: | note | 注释 |
| id | 用户编号 | 必填 |
| status | 用户状态 | 必填 |
| 返回参数: | {"code":0,"msg":"","data":true} |
"""
logging.info("==========={0}===========".format(note))
user_id = kwargs.get("id")
status = kwargs.get("status")
if not user_id or status is None:
raise Exception("用户编号和用户状态不能为空")
resp = self.kw_in_joyhub_user_update_status_put(id=user_id, status=status)
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "操作失败")
return resp_json
def kw_joyhub_user_simple_list_get(self, note, **kwargs):
"""
| 功能说明: | 获取用户精简信息列表 |
| 输入参数: | note | 注释 |
| 返回参数: | {"code":0,"msg":"","data":[...]} |
"""
logging.info("==========={0}===========".format(note))
resp = self.kw_in_joyhub_user_simple_list_get()
# 解析响应
if hasattr(resp, 'json'):
resp_json = resp.json()
else:
resp_json = resp
print(resp_json if resp_json else "查询失败")
return resp_json
def kw_joyhub_auth_login_post(self, note, username=None, password=None, max_retries=3, retry_delay=2):
"""
| 功能说明: | 使用账号密码登录(带异常重试机制) |
| 输入参数: | note | 注释 |
| username | 账号 | 必填默认joytest |
| password | 密码 | 必填默认123456 |
| max_retries | 最大重试次数 | 非必填默认3 |
| retry_delay | 重试间隔(秒) | 非必填默认2 |
| 返回参数: | {"code":200,"msg":"success","data":{...}} |
"""
logging.info("==========={0}===========".format(note))
# 使用默认账号密码
login_username = username if username else "joytest"
login_password = password if password else "123456"
last_exception = None
for attempt in range(max_retries):
try:
logging.info("登录尝试 {}/{}".format(attempt + 1, max_retries))
request_params = {
"username": login_username,
"password": login_password
}
resp = self.kw_in_joyhub_auth_login_post(**request_params)
if resp is not None:
if isinstance(resp, dict) and resp.get('code') == 200:
logging.info("✓ 登录成功")
print("登录成功:", resp)
return resp
elif isinstance(resp, dict) and resp.get('code') == 401:
logging.warning("登录失败账号未登录或token过期重试中...")
else:
logging.warning("登录失败,响应: {}".format(resp))
else:
logging.warning("登录失败,响应为空")
last_exception = Exception("登录失败")
except Exception as e:
last_exception = e
logging.error("登录异常(第{}次尝试): {}".format(attempt + 1, str(e)))
# 如果不是最后一次尝试,等待后重试
if attempt < max_retries - 1:
logging.info("等待 {} 秒后重试...".format(retry_delay))
time.sleep(retry_delay)
logging.error("登录失败,已达到最大重试次数 {}".format(max_retries))
raise last_exception if last_exception else Exception("登录失败,已达到最大重试次数")
if __name__ == '__main__':
test = UserManage()
a = test.kw_joyhub_user_page_get(note="测试用户分页", user='admin', pageNo=1, pageSize=10)
print(a)