Files
smart-management-auto-test/dulizhan/library/Dlizhan_interface.py

338 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
import os
import sys
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from base_framework.public_tools import log
from base_framework.public_tools.runner import Runner
from base_framework.public_tools.eureka_api import EurekaAPI
from base_framework.public_tools import utils
from base_framework.public_tools.pgsqlhelper import PgSqlHelper
import requests
import json
obj_log = log.get_logger()
obj_runner = Runner()
eureka = EurekaAPI()
class DlzhanInterface:
def __init__(self):
self.domain_url = eureka.get_url_from_config()
self.pg_db = PgSqlHelper()
self.joyhub_domain = "https://joyhub-website-manager-api-test.best-envision.com"
self.token = None
def _get_joyhub_headers(self):
headers = {
'Content-Type': 'application/json',
'tenant-id': '126'
}
if self.token:
headers['Authorization'] = 'Bearer ' + self.token
return headers
def set_joyhub_token(self, token):
self.token = token
def _joyhub_request(self, method, path, is_check='', note='', **kwargs):
url = "{}{}".format(self.joyhub_domain, path)
headers = self._get_joyhub_headers()
obj_log.info("=========== {} ===========".format(note or path))
req_params = {}
for key, value in kwargs.items():
if value is not None and value != '':
req_params[key] = value
req_map = {
'GET': lambda: requests.get(url, headers=headers, params=req_params, verify=False),
'POST': lambda: requests.post(url, headers=headers, json=req_params, verify=False),
'PUT': lambda: requests.put(url, headers=headers, json=req_params, verify=False),
'DELETE': lambda: requests.delete(url, headers=headers, verify=False)
}
resp = req_map.get(method.upper(), lambda: None)()
self._check_resp(is_check, resp)
return resp.json()
def _clear_user_fingerprint(self, username):
try:
sql = "UPDATE system_users SET fingerprint = '' WHERE username = %s"
cursor, conn, count = self.pg_db.execute(sql, (username,), choose_db='joyhub')
obj_log.info(f"清除用户 {username} 的指纹锁,影响行数: {count}")
self.pg_db.close(cursor, conn)
return count >= 0
except Exception as e:
obj_log.error(f"清除指纹锁失败: {str(e)}")
return False
def kw_in_zhyy_purchase_todo_get(self, is_check='', **kwargs):
user, kwargs = self._get_user(kwargs)
kwargs = self._convert_json(kwargs)
url = "%s/erp/purchase-workbench/get-todo" % self.domain_url
obj_log.info("your input:{0}".format(kwargs))
resp = obj_runner.call_rest_api(API_URL=url, req_type="GET", user=user)
self._check_resp(is_check, resp)
return resp
def kw_in_zhyy_purchase_order_page_post(self, is_check='', **kwargs):
user, kwargs = self._get_user(kwargs)
kwargs = self._convert_json(kwargs)
url = "%s/erp/purchase-order/page" % self.domain_url
obj_log.info("your input:{0}".format(kwargs))
resp = obj_runner.call_rest_api(API_URL=url, req_type="POST", json=kwargs, user=user)
self._check_resp(is_check, resp)
return resp
def _get_user(self, kwargs):
user = kwargs.get("user", "purchase")
if "user" in kwargs:
del kwargs["user"]
return user, kwargs
def _convert_json(self, kwargs):
return kwargs
def _check_resp(self, is_check, resp):
if is_check == 'true':
assert resp is not None, "响应为空"
if hasattr(resp, 'json'):
resp_json = resp.json()
assert resp_json.get('code') == 0, f"请求失败code={resp_json.get('code')}"
return resp
def kw_in_joyhub_auth_login_post(self, is_check='', **kwargs):
obj_log.info("=========== JoyHub 登录接口 ===========")
username = kwargs.get("username", "")
password = kwargs.get("password", "")
max_retries = kwargs.get("max_retries", 3)
token = kwargs.get("token", "")
url = "https://joyhub-website-manager-api-test.best-envision.com/admin-api/system/auth/login-dev"
session = requests.session()
session.headers.update({
'isencrypt': 'true',
'x-api-encrypt': 'true',
'tenant-id': '126'
})
resp = obj_runner._Runner__call_api(session, url, req_type="POST", json=kwargs)
if resp is not None and not isinstance(resp, Exception):
try:
resp_json = resp.json()
except Exception:
try:
resp_json = json.loads(resp.text)
except Exception:
resp_json = None
if resp_json is not None:
code = resp_json.get('code')
if code == 0 or code == 200:
obj_log.info("登录成功")
self._check_resp(is_check, resp_json)
return resp_json
elif code == 401:
obj_log.warning("登录失败,尝试清除指纹后重新登录")
if resp is not None and not isinstance(resp, Exception):
try:
resp_json = resp.json()
except Exception:
resp_json = None
else:
resp_json = None
self._check_resp(is_check, resp_json)
return resp_json
def kw_in_joyhub_role_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/system/role/create', is_check, '创建角色', **kwargs)
def kw_in_joyhub_role_delete_post(self, is_check='', **kwargs):
role_id = kwargs.get('id', '')
return self._joyhub_request('DELETE', f'/admin-api/system/role/delete?id={role_id}', is_check, '删除角色')
def kw_in_joyhub_role_delete_list_post(self, is_check='', **kwargs):
ids = kwargs.get('ids', [])
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/system/role/delete-list?ids={ids_str}', is_check, '批量删除角色')
def kw_in_joyhub_role_get_get(self, is_check='', **kwargs):
role_id = kwargs.get('id', '')
return self._joyhub_request('GET', f'/admin-api/system/role/get?id={role_id}', is_check, '获得角色信息')
def kw_in_joyhub_role_page_get(self, is_check='', **kwargs):
page_no = kwargs.get('pageNo', 1)
page_size = kwargs.get('pageSize', 10)
return self._joyhub_request('GET', f'/admin-api/system/role/page?pageNo={page_no}&pageSize={page_size}', is_check, '获得角色分页', **kwargs)
def kw_in_joyhub_role_simple_list_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/role/simple-list', is_check, '获取角色精简信息列表')
def kw_in_joyhub_role_list_all_simple_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/role/list-all-simple', is_check, '获取角色精简信息列表')
def kw_in_joyhub_role_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/role/update', is_check, '修改角色', **kwargs)
def kw_in_joyhub_post_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/system/post/create', is_check, '创建岗位', **kwargs)
def kw_in_joyhub_post_delete_post(self, is_check='', **kwargs):
post_id = kwargs.get('id', '')
return self._joyhub_request('DELETE', f'/admin-api/system/post/delete?id={post_id}', is_check, '删除岗位')
def kw_in_joyhub_post_delete_list_post(self, is_check='', **kwargs):
ids = kwargs.get('ids', [])
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/system/post/delete-list?ids={ids_str}', is_check, '批量删除岗位')
def kw_in_joyhub_post_get_get(self, is_check='', **kwargs):
post_id = kwargs.get('id', '')
return self._joyhub_request('GET', f'/admin-api/system/post/get?id={post_id}', is_check, '获得岗位信息')
def kw_in_joyhub_post_list_all_simple_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/post/list-all-simple', is_check, '获取岗位全列表')
def kw_in_joyhub_post_page_get(self, is_check='', **kwargs):
page_no = kwargs.get('pageNo', 1)
page_size = kwargs.get('pageSize', 10)
return self._joyhub_request('GET', f'/admin-api/system/post/page?pageNo={page_no}&pageSize={page_size}', is_check, '获得岗位分页列表', **kwargs)
def kw_in_joyhub_post_simple_list_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/post/simple-list', is_check, '获取岗位精简列表')
def kw_in_joyhub_post_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/post/update', is_check, '修改岗位', **kwargs)
# ============ 运费模板信息接口 ============
def kw_in_joyhub_shipping_template_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/jh/shipping-template/create', is_check, '创建运费模板信息', **kwargs)
def kw_in_joyhub_shipping_template_delete_post(self, shipping_template_id, is_check=''):
return self._joyhub_request('DELETE', f'/admin-api/jh/shipping-template/delete?id={shipping_template_id}', is_check, '删除运费模板信息')
def kw_in_joyhub_shipping_template_delete_list_post(self, ids, is_check=''):
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/jh/shipping-template/delete-list?ids={ids_str}', is_check, '批量删除运费模板信息')
def kw_in_joyhub_shipping_template_get_detail_get(self, shipping_template_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/jh/shipping-template/get-detail?id={shipping_template_id}', is_check, '获得运费模板详情')
def kw_in_joyhub_shipping_template_page_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/jh/shipping-template/page', is_check, '获得运费模板分页', **kwargs)
def kw_in_joyhub_shipping_template_save_with_children_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/jh/shipping-template/save-with-children', is_check, '保存运费模板信息(含规则与子表)', **kwargs)
def kw_in_joyhub_shipping_template_shipping_rule_list_get(self, shipping_template_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/jh/shipping-template/shipping-rule/list-by-shipping-template-id?shippingTemplateId={shipping_template_id}', is_check, '获得运费规则列表')
def kw_in_joyhub_shipping_template_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/jh/shipping-template/update', is_check, '更新运费模板信息', **kwargs)
def kw_in_joyhub_banner_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/jh/banner/create', is_check, '创建Banner管理', **kwargs)
def kw_in_joyhub_banner_delete_delete(self, banner_id, is_check=''):
return self._joyhub_request('DELETE', f'/admin-api/jh/banner/delete?id={banner_id}', is_check, '删除Banner管理')
def kw_in_joyhub_banner_delete_list_delete(self, ids, is_check=''):
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/jh/banner/delete-list?ids={ids_str}', is_check, '批量删除Banner管理')
def kw_in_joyhub_banner_get_import_template_get(self, is_check=''):
return self._joyhub_request('GET', '/admin-api/jh/banner/get-import-template', is_check, '获得导入Banner管理模板')
def kw_in_joyhub_banner_get_get(self, banner_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/jh/banner/get?id={banner_id}', is_check, '获得Banner管理详情')
def kw_in_joyhub_banner_page_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/jh/banner/page', is_check, '获得Banner管理分页', **kwargs)
def kw_in_joyhub_banner_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/jh/banner/update', is_check, '更新Banner管理', **kwargs)
def kw_in_joyhub_user_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/system/user/create', is_check, '创建用户', **kwargs)
def kw_in_joyhub_user_delete_post(self, is_check='', **kwargs):
user_id = kwargs.get('id', '')
return self._joyhub_request('DELETE', f'/admin-api/system/user/delete?id={user_id}', is_check, '删除用户')
def kw_in_joyhub_user_delete_list_post(self, is_check='', **kwargs):
ids = kwargs.get('ids', [])
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/system/user/delete-list?ids={ids_str}', is_check, '批量删除用户')
def kw_in_joyhub_user_get_get(self, is_check='', **kwargs):
user_id = kwargs.get('id', '')
return self._joyhub_request('GET', f'/admin-api/system/user/get?id={user_id}', is_check, '获得用户详情')
def kw_in_joyhub_user_page_get(self, is_check='', **kwargs):
page_no = kwargs.get('pageNo', 1)
page_size = kwargs.get('pageSize', 10)
return self._joyhub_request('GET', f'/admin-api/system/user/page?pageNo={page_no}&pageSize={page_size}', is_check, '获得用户分页列表', **kwargs)
def kw_in_joyhub_user_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/user/update', is_check, '修改用户', **kwargs)
def kw_in_joyhub_user_update_password_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/user/update-password', is_check, '重置用户密码', **kwargs)
def kw_in_joyhub_user_update_status_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/user/update-status', is_check, '修改用户状态', **kwargs)
def kw_in_joyhub_user_simple_list_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/user/simple-list', is_check, '获取用户精简信息列表')
def kw_in_joyhub_user_profile_get_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/user/profile/get', is_check, '获得登录用户信息')
def kw_in_joyhub_user_profile_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/user/profile/update', is_check, '修改用户个人信息', **kwargs)
def kw_in_joyhub_user_profile_update_password_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/user/profile/update-password', is_check, '修改用户个人密码', **kwargs)
# ============ 部门管理接口 ============
def kw_in_joyhub_dept_create_post(self, is_check='', **kwargs):
return self._joyhub_request('POST', '/admin-api/system/dept/create', is_check, '创建部门', **kwargs)
def kw_in_joyhub_dept_delete_post(self, dept_id, is_check=''):
return self._joyhub_request('DELETE', f'/admin-api/system/dept/delete?id={dept_id}', is_check, '删除部门')
def kw_in_joyhub_dept_delete_list_post(self, ids, is_check=''):
ids_str = ','.join(map(str, ids))
return self._joyhub_request('DELETE', f'/admin-api/system/dept/delete-list?ids={ids_str}', is_check, '批量删除部门')
def kw_in_joyhub_dept_get_get(self, dept_id, is_check=''):
return self._joyhub_request('GET', f'/admin-api/system/dept/get?id={dept_id}', is_check, '获得部门信息')
def kw_in_joyhub_dept_list_get(self, is_check='', **kwargs):
return self._joyhub_request('GET', '/admin-api/system/dept/list', is_check, '获取部门列表', **kwargs)
def kw_in_joyhub_dept_list_all_simple_get(self, is_check=''):
return self._joyhub_request('GET', '/admin-api/system/dept/list-all-simple', is_check, '获取部门精简信息列表')
def kw_in_joyhub_dept_simple_list_get(self, is_check=''):
return self._joyhub_request('GET', '/admin-api/system/dept/simple-list', is_check, '获取部门精简信息列表')
def kw_in_joyhub_dept_update_put(self, is_check='', **kwargs):
return self._joyhub_request('PUT', '/admin-api/system/dept/update', is_check, '更新部门', **kwargs)
if __name__ == '__main__':
test = DlzhanInterface()
a = test.kw_in_zhyy_purchase_todo_get(user="purchase")
print(a)