394 lines
20 KiB
Python
394 lines
20 KiB
Python
# -*- coding:utf-8 -*-
|
||
import os
|
||
import sys
|
||
|
||
# Put the directory two levels above this file at the front of sys.path.
# NOTE(review): presumably this is what makes the ``base_framework`` imports
# below resolve when the file is run directly - confirm against the repo layout.
current_file_path = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(current_file_path), '../../'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
|
||
|
||
from base_framework.public_tools import log
|
||
from base_framework.public_tools.runner import Runner
|
||
from base_framework.public_tools.eureka_api import EurekaAPI
|
||
from base_framework.public_tools import utils
|
||
from base_framework.public_tools.pgsqlhelper import PgSqlHelper
|
||
import requests
|
||
import json
|
||
|
||
# Module-level singletons shared by every keyword in this file: the logger,
# the REST runner used for the ZHYY endpoints, and the Eureka client that
# supplies the ZHYY base URL.
obj_log = log.get_logger()
obj_runner = Runner()
eureka = EurekaAPI()
|
||
|
||
|
||
class DlzhanInterface:
    """Keyword-style HTTP client for the ZHYY ERP and JoyHub admin APIs.

    ``kw_in_zhyy_*`` keywords go through the module-level ``obj_runner``
    against ``self.domain_url``. ``kw_in_joyhub_*`` keywords call the JoyHub
    admin API with ``requests`` (see ``_joyhub_request``).

    Every keyword accepts ``is_check``. Passing the string ``'true'`` turns on
    response validation; any other value disables it.
    """

    # HTTP verbs ``_joyhub_request`` knows how to send.
    _JOYHUB_METHODS = ('GET', 'POST', 'PUT', 'DELETE')
    # Path of the login endpoint, relative to ``joyhub_domain``.
    _LOGIN_PATH = '/admin-api/system/auth/login-dev'
    # Response ``code`` values the login keyword treats as success.
    _LOGIN_OK_CODES = (0, 200)

    def __init__(self):
        """Resolve the ZHYY base URL, open the DB helper and reset the token."""
        self.domain_url = eureka.get_url_from_config()
        self.pg_db = PgSqlHelper()
        self.joyhub_domain = "https://joyhub-website-manager-api-test.best-envision.com"
        self.token = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _get_joyhub_headers(self):
        """Return the JoyHub request headers.

        The ``Authorization`` header is only included once a token has been
        stored with ``set_joyhub_token``.
        """
        headers = {
            'Content-Type': 'application/json',
            'tenant-id': '126',
        }
        if self.token:
            headers['Authorization'] = 'Bearer ' + self.token
        return headers

    def set_joyhub_token(self, token):
        """Store the bearer token used by subsequent JoyHub requests."""
        self.token = token

    @staticmethod
    def _join_ids(ids):
        """Render ``ids`` as the comma-separated string used in query strings.

        Accepts an iterable of ids, a single non-iterable id, an already
        joined string (returned unchanged) or ``None`` (empty string).
        """
        if ids is None:
            return ''
        if isinstance(ids, str):
            # Joining a string would split it into characters ("1,2" -> "1,,,2").
            return ids
        try:
            return ','.join(map(str, ids))
        except TypeError:
            # A single scalar id such as ``7``.
            return str(ids)

    @staticmethod
    def _with_page_defaults(params, page_no=1, page_size=10):
        """Return a copy of ``params`` with ``pageNo``/``pageSize`` filled in.

        Missing, ``None`` or empty-string values are replaced by the defaults
        so each paging parameter is sent exactly once.
        """
        merged = dict(params)
        for key, default in (('pageNo', page_no), ('pageSize', page_size)):
            if merged.get(key) in (None, ''):
                merged[key] = default
        return merged

    @staticmethod
    def _parse_json(resp):
        """Best-effort decode of a response body; returns ``None`` on failure.

        ``resp`` may be ``None`` or an ``Exception`` instance (the login code
        guards against both), in which case ``None`` is returned.
        """
        if resp is None or isinstance(resp, Exception):
            return None
        try:
            return resp.json()
        except Exception:
            # Fall back to decoding the raw text before giving up.
            try:
                return json.loads(resp.text)
            except Exception:
                return None

    def _joyhub_request(self, method, path, is_check='', note='', return_json=True, **kwargs):
        """Send one request to the JoyHub admin API.

        Args:
            method: HTTP verb, case-insensitive; one of GET/POST/PUT/DELETE.
            path: URL path (may already contain a query string) appended to
                ``self.joyhub_domain``.
            is_check: ``'true'`` to validate the response via ``_check_resp``.
            note: Label written to the log; falls back to ``path``.
            return_json: Return ``resp.json()`` when true, else the raw response.
            **kwargs: Request parameters. ``None`` and ``''`` values are
                dropped. Sent as the query string for GET and as the JSON body
                for POST/PUT; ignored for DELETE.

        Raises:
            ValueError: ``method`` is not a supported verb.
            AssertionError: validation was requested and failed.
        """
        verb = str(method).upper()
        if verb not in self._JOYHUB_METHODS:
            # Fail before sending anything instead of crashing later on a
            # ``None`` response.
            raise ValueError("unsupported HTTP method: {!r}".format(method))

        url = "{}{}".format(self.joyhub_domain, path)
        obj_log.info("=========== {} ===========".format(note or path))

        req_params = {
            key: value
            for key, value in kwargs.items()
            if value is not None and value != ''
        }

        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original - confirm this is intended for this host.
        request_args = {'headers': self._get_joyhub_headers(), 'verify': False}
        if verb == 'GET':
            request_args['params'] = req_params
        elif verb in ('POST', 'PUT'):
            request_args['json'] = req_params

        resp = requests.request(verb, url, **request_args)
        self._check_resp(is_check, resp)
        return resp.json() if return_json else resp

    def _clear_user_fingerprint(self, username):
        """Blank the ``fingerprint`` column of ``username`` in ``system_users``.

        Returns:
            ``True`` when the update ran (``count >= 0``), ``False`` when any
            step raised; the error is logged rather than propagated.
        """
        sql = "UPDATE system_users SET fingerprint = '' WHERE username = %s"
        try:
            cursor, conn, count = self.pg_db.execute(sql, (username,), choose_db='joyhub')
            try:
                obj_log.info(f"清除用户 {username} 的指纹锁,影响行数: {count}")
                return count >= 0
            finally:
                # Always hand the cursor/connection back, even on failure.
                self.pg_db.close(cursor, conn)
        except Exception as e:
            # Deliberately best-effort: callers only need success/failure.
            obj_log.error(f"清除指纹锁失败: {str(e)}")
            return False

    def _get_user(self, kwargs):
        """Split the ``user`` entry (default ``"purchase"``) out of ``kwargs``.

        ``kwargs`` is modified in place and returned alongside the user.
        """
        user = kwargs.pop("user", "purchase")
        return user, kwargs

    def _convert_json(self, kwargs):
        """Return ``kwargs`` unchanged (conversion hook, currently a no-op)."""
        return kwargs

    def _check_resp(self, is_check, resp):
        """Validate ``resp`` when ``is_check == 'true'`` and return it.

        Objects exposing ``json()`` must decode to a mapping whose ``code`` is
        ``0``. Objects without ``json()`` (e.g. plain dicts) are only checked
        for not being ``None``.

        Raises:
            AssertionError: the response is ``None`` or its ``code`` is not 0.
                Raised explicitly so the check survives ``python -O``.
        """
        if is_check == 'true':
            if resp is None:
                raise AssertionError("响应为空")
            if hasattr(resp, 'json'):
                code = resp.json().get('code')
                if code != 0:
                    raise AssertionError(f"请求失败,code={code}")
        return resp

    # ------------------------------------------------------------------
    # ZHYY purchase endpoints
    # ------------------------------------------------------------------
    def kw_in_zhyy_purchase_todo_get(self, is_check='', **kwargs):
        """GET the purchase workbench to-do list as ``user`` (default "purchase").

        Remaining keyword arguments are logged but not sent with the request.
        """
        user, kwargs = self._get_user(kwargs)
        kwargs = self._convert_json(kwargs)
        url = "%s/erp/purchase-workbench/get-todo" % self.domain_url
        obj_log.info("your input:{0}".format(kwargs))
        resp = obj_runner.call_rest_api(API_URL=url, req_type="GET", user=user)
        self._check_resp(is_check, resp)
        return resp

    def kw_in_zhyy_purchase_order_page_post(self, is_check='', **kwargs):
        """POST a purchase-order page query; ``kwargs`` (minus ``user``) is the body."""
        user, kwargs = self._get_user(kwargs)
        kwargs = self._convert_json(kwargs)
        url = "%s/erp/purchase-order/page" % self.domain_url
        obj_log.info("your input:{0}".format(kwargs))
        resp = obj_runner.call_rest_api(API_URL=url, req_type="POST", json=kwargs, user=user)
        self._check_resp(is_check, resp)
        return resp

    # ------------------------------------------------------------------
    # JoyHub authentication
    # ------------------------------------------------------------------
    def kw_in_joyhub_auth_login_post(self, is_check='', **kwargs):
        """Log in to JoyHub, retrying after clearing the user's fingerprint lock.

        Args:
            is_check: ``'true'`` to raise when the login does not succeed.
            **kwargs: Login payload. ``max_retries`` (default 3) is consumed
                here and not sent; every other entry is posted as JSON.

        Returns:
            The decoded response body, or ``None`` if it could not be decoded.

        Raises:
            AssertionError: ``is_check == 'true'`` and there was no decodable
                response or its ``code`` is not a success code (0 or 200).
        """
        obj_log.info("=========== JoyHub 登录接口 ===========")

        username = kwargs.get("username", "")
        try:
            max_retries = max(int(kwargs.get("max_retries", 3)), 0)
        except (TypeError, ValueError):
            max_retries = 3
        # ``max_retries`` is a client-side knob, not part of the login payload.
        payload = {key: value for key, value in kwargs.items() if key != "max_retries"}

        url = "{}{}".format(self.joyhub_domain, self._LOGIN_PATH)
        resp_json = None
        code = None

        # The context manager closes the session's connections on exit.
        with requests.session() as session:
            session.headers.update({
                'isencrypt': 'true',
                'x-api-encrypt': 'true',
                'tenant-id': '126',
            })
            for attempt in range(max_retries + 1):
                # NOTE(review): this reaches into Runner's name-mangled private
                # ``__call_api``; kept because no public equivalent is visible here.
                resp = obj_runner._Runner__call_api(session, url, req_type="POST", json=payload)
                resp_json = self._parse_json(resp)
                code = resp_json.get('code') if isinstance(resp_json, dict) else None

                if code in self._LOGIN_OK_CODES:
                    obj_log.info("登录成功")
                    return resp_json
                if code != 401 or attempt >= max_retries:
                    break

                obj_log.warning("登录失败,尝试清除指纹后重新登录")
                if not self._clear_user_fingerprint(username):
                    # Retrying is pointless when the lock could not be cleared.
                    break

        if is_check == 'true':
            if resp_json is None:
                raise AssertionError("响应为空")
            raise AssertionError(f"请求失败,code={code}")
        return resp_json

    # ============ Role management endpoints ============
    def kw_in_joyhub_role_create_post(self, is_check='', **kwargs):
        """Create a role; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/system/role/create', is_check, '创建角色', **kwargs)

    def kw_in_joyhub_role_delete_post(self, is_check='', **kwargs):
        """Delete the role identified by ``kwargs['id']`` (sent as HTTP DELETE)."""
        role_id = kwargs.get('id', '')
        return self._joyhub_request('DELETE', f'/admin-api/system/role/delete?id={role_id}', is_check, '删除角色')

    def kw_in_joyhub_role_delete_list_post(self, is_check='', **kwargs):
        """Delete the roles listed in ``kwargs['ids']`` (sent as HTTP DELETE)."""
        ids_str = self._join_ids(kwargs.get('ids', []))
        return self._joyhub_request('DELETE', f'/admin-api/system/role/delete-list?ids={ids_str}', is_check, '批量删除角色')

    def kw_in_joyhub_role_get_get(self, is_check='', **kwargs):
        """Fetch the role identified by ``kwargs['id']``."""
        role_id = kwargs.get('id', '')
        return self._joyhub_request('GET', f'/admin-api/system/role/get?id={role_id}', is_check, '获得角色信息')

    def kw_in_joyhub_role_page_get(self, is_check='', **kwargs):
        """Page through roles; ``pageNo``/``pageSize`` default to 1/10."""
        params = self._with_page_defaults(kwargs)
        return self._joyhub_request('GET', '/admin-api/system/role/page', is_check, '获得角色分页', **params)

    def kw_in_joyhub_role_simple_list_get(self, is_check='', **kwargs):
        """Fetch the simplified role list (``kwargs`` is ignored)."""
        return self._joyhub_request('GET', '/admin-api/system/role/simple-list', is_check, '获取角色精简信息列表')

    def kw_in_joyhub_role_list_all_simple_get(self, is_check='', **kwargs):
        """Fetch the simplified role list via ``list-all-simple`` (``kwargs`` is ignored)."""
        return self._joyhub_request('GET', '/admin-api/system/role/list-all-simple', is_check, '获取角色精简信息列表')

    def kw_in_joyhub_role_update_put(self, is_check='', **kwargs):
        """Update a role; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/role/update', is_check, '修改角色', **kwargs)

    # ============ Post (job position) management endpoints ============
    def kw_in_joyhub_post_create_post(self, is_check='', **kwargs):
        """Create a post; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/system/post/create', is_check, '创建岗位', **kwargs)

    def kw_in_joyhub_post_delete_post(self, is_check='', **kwargs):
        """Delete the post identified by ``kwargs['id']`` (sent as HTTP DELETE)."""
        post_id = kwargs.get('id', '')
        return self._joyhub_request('DELETE', f'/admin-api/system/post/delete?id={post_id}', is_check, '删除岗位')

    def kw_in_joyhub_post_delete_list_post(self, is_check='', **kwargs):
        """Delete the posts listed in ``kwargs['ids']`` (sent as HTTP DELETE)."""
        ids_str = self._join_ids(kwargs.get('ids', []))
        return self._joyhub_request('DELETE', f'/admin-api/system/post/delete-list?ids={ids_str}', is_check, '批量删除岗位')

    def kw_in_joyhub_post_get_get(self, is_check='', **kwargs):
        """Fetch the post identified by ``kwargs['id']``."""
        post_id = kwargs.get('id', '')
        return self._joyhub_request('GET', f'/admin-api/system/post/get?id={post_id}', is_check, '获得岗位信息')

    def kw_in_joyhub_post_list_all_simple_get(self, is_check='', **kwargs):
        """Fetch the full simplified post list (``kwargs`` is ignored)."""
        return self._joyhub_request('GET', '/admin-api/system/post/list-all-simple', is_check, '获取岗位全列表')

    def kw_in_joyhub_post_page_get(self, is_check='', **kwargs):
        """Page through posts; ``pageNo``/``pageSize`` default to 1/10."""
        params = self._with_page_defaults(kwargs)
        return self._joyhub_request('GET', '/admin-api/system/post/page', is_check, '获得岗位分页列表', **params)

    def kw_in_joyhub_post_simple_list_get(self, is_check='', **kwargs):
        """Fetch the simplified post list (``kwargs`` is ignored)."""
        return self._joyhub_request('GET', '/admin-api/system/post/simple-list', is_check, '获取岗位精简列表')

    def kw_in_joyhub_post_update_put(self, is_check='', **kwargs):
        """Update a post; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/post/update', is_check, '修改岗位', **kwargs)

    # ============ Shipping template endpoints ============
    def kw_in_joyhub_shipping_template_create_post(self, is_check='', **kwargs):
        """Create a shipping template; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/jh/shipping-template/create', is_check, '创建运费模板信息', **kwargs)

    def kw_in_joyhub_shipping_template_delete_post(self, shipping_template_id, is_check=''):
        """Delete one shipping template by id (sent as HTTP DELETE)."""
        return self._joyhub_request('DELETE', f'/admin-api/jh/shipping-template/delete?id={shipping_template_id}', is_check, '删除运费模板信息')

    def kw_in_joyhub_shipping_template_delete_list_post(self, ids, is_check=''):
        """Delete several shipping templates by id (sent as HTTP DELETE)."""
        ids_str = self._join_ids(ids)
        return self._joyhub_request('DELETE', f'/admin-api/jh/shipping-template/delete-list?ids={ids_str}', is_check, '批量删除运费模板信息')

    def kw_in_joyhub_shipping_template_get_detail_get(self, shipping_template_id, is_check=''):
        """Fetch the detail of one shipping template."""
        return self._joyhub_request('GET', f'/admin-api/jh/shipping-template/get-detail?id={shipping_template_id}', is_check, '获得运费模板详情')

    def kw_in_joyhub_shipping_template_page_get(self, is_check='', **kwargs):
        """Page through shipping templates; ``kwargs`` is the query string."""
        return self._joyhub_request('GET', '/admin-api/jh/shipping-template/page', is_check, '获得运费模板分页', **kwargs)

    def kw_in_joyhub_shipping_template_save_with_children_post(self, is_check='', **kwargs):
        """Save a shipping template together with its rules and child rows."""
        return self._joyhub_request('POST', '/admin-api/jh/shipping-template/save-with-children', is_check, '保存运费模板信息(含规则与子表)', **kwargs)

    def kw_in_joyhub_shipping_template_shipping_rule_list_get(self, shipping_template_id, is_check=''):
        """List the shipping rules that belong to one shipping template."""
        return self._joyhub_request('GET', f'/admin-api/jh/shipping-template/shipping-rule/list-by-shipping-template-id?shippingTemplateId={shipping_template_id}', is_check, '获得运费规则列表')

    def kw_in_joyhub_shipping_template_update_put(self, is_check='', **kwargs):
        """Update a shipping template; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/jh/shipping-template/update', is_check, '更新运费模板信息', **kwargs)

    # ============ Banner management endpoints ============
    def kw_in_joyhub_banner_create_post(self, is_check='', **kwargs):
        """Create a banner; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/jh/banner/create', is_check, '创建Banner管理', **kwargs)

    def kw_in_joyhub_banner_delete_delete(self, banner_id, is_check=''):
        """Delete one banner by id."""
        return self._joyhub_request('DELETE', f'/admin-api/jh/banner/delete?id={banner_id}', is_check, '删除Banner管理')

    def kw_in_joyhub_banner_delete_list_delete(self, ids, is_check=''):
        """Delete several banners by id."""
        ids_str = self._join_ids(ids)
        return self._joyhub_request('DELETE', f'/admin-api/jh/banner/delete-list?ids={ids_str}', is_check, '批量删除Banner管理')

    def kw_in_joyhub_banner_get_import_template_get(self, is_check=''):
        """Download the banner import template; returns the raw response."""
        return self._joyhub_request('GET', '/admin-api/jh/banner/get-import-template', is_check, '获得导入Banner管理模板', return_json=False)

    def kw_in_joyhub_banner_get_get(self, banner_id, is_check=''):
        """Fetch the detail of one banner."""
        return self._joyhub_request('GET', f'/admin-api/jh/banner/get?id={banner_id}', is_check, '获得Banner管理详情')

    def kw_in_joyhub_banner_page_get(self, is_check='', **kwargs):
        """Page through banners; ``kwargs`` is the query string."""
        return self._joyhub_request('GET', '/admin-api/jh/banner/page', is_check, '获得Banner管理分页', **kwargs)

    def kw_in_joyhub_banner_update_put(self, is_check='', **kwargs):
        """Update a banner; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/jh/banner/update', is_check, '更新Banner管理', **kwargs)

    # ============ App version management endpoints ============
    def kw_in_joyhub_appversion_create_post(self, is_check='', **kwargs):
        """Create an app version record; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/jh/appversion/create', is_check, '创建app版本号管理', **kwargs)

    def kw_in_joyhub_appversion_delete_delete(self, appversion_id, is_check=''):
        """Delete one app version record by id."""
        return self._joyhub_request('DELETE', f'/admin-api/jh/appversion/delete?id={appversion_id}', is_check, '删除app版本号管理')

    def kw_in_joyhub_appversion_delete_list_delete(self, ids, is_check=''):
        """Delete several app version records by id."""
        ids_str = self._join_ids(ids)
        return self._joyhub_request('DELETE', f'/admin-api/jh/appversion/delete-list?ids={ids_str}', is_check, '批量删除app版本号管理')

    def kw_in_joyhub_appversion_export_excel_get(self, is_check='', **kwargs):
        """Export app version records to Excel; returns the raw response."""
        return self._joyhub_request('GET', '/admin-api/jh/appversion/export-excel', is_check, '导出app版本号管理 Excel', return_json=False, **kwargs)

    def kw_in_joyhub_appversion_get_get(self, appversion_id, is_check=''):
        """Fetch one app version record."""
        return self._joyhub_request('GET', f'/admin-api/jh/appversion/get?id={appversion_id}', is_check, '获得app版本号管理')

    def kw_in_joyhub_appversion_get_import_template_get(self, is_check=''):
        """Download the app version import template; returns the raw response."""
        return self._joyhub_request('GET', '/admin-api/jh/appversion/get-import-template', is_check, '获得导入app版本号管理模板', return_json=False)

    def kw_in_joyhub_appversion_import_excel_post(self, is_check='', **kwargs):
        """Call the app version Excel import endpoint; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/jh/appversion/import-excel', is_check, '导入app版本号管理Excel', **kwargs)

    def kw_in_joyhub_appversion_page_get(self, is_check='', **kwargs):
        """Page through app version records; ``kwargs`` is the query string."""
        return self._joyhub_request('GET', '/admin-api/jh/appversion/page', is_check, '获得app版本号管理分页', **kwargs)

    def kw_in_joyhub_appversion_update_put(self, is_check='', **kwargs):
        """Update an app version record; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/jh/appversion/update', is_check, '更新app版本号管理', **kwargs)

    # ============ User management endpoints ============
    def kw_in_joyhub_user_create_post(self, is_check='', **kwargs):
        """Create a user; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/system/user/create', is_check, '创建用户', **kwargs)

    def kw_in_joyhub_user_delete_post(self, is_check='', **kwargs):
        """Delete the user identified by ``kwargs['id']`` (sent as HTTP DELETE)."""
        user_id = kwargs.get('id', '')
        return self._joyhub_request('DELETE', f'/admin-api/system/user/delete?id={user_id}', is_check, '删除用户')

    def kw_in_joyhub_user_delete_list_post(self, is_check='', **kwargs):
        """Delete the users listed in ``kwargs['ids']`` (sent as HTTP DELETE)."""
        ids_str = self._join_ids(kwargs.get('ids', []))
        return self._joyhub_request('DELETE', f'/admin-api/system/user/delete-list?ids={ids_str}', is_check, '批量删除用户')

    def kw_in_joyhub_user_get_get(self, is_check='', **kwargs):
        """Fetch the user identified by ``kwargs['id']``."""
        user_id = kwargs.get('id', '')
        return self._joyhub_request('GET', f'/admin-api/system/user/get?id={user_id}', is_check, '获得用户详情')

    def kw_in_joyhub_user_page_get(self, is_check='', **kwargs):
        """Page through users; ``pageNo``/``pageSize`` default to 1/10."""
        params = self._with_page_defaults(kwargs)
        return self._joyhub_request('GET', '/admin-api/system/user/page', is_check, '获得用户分页列表', **params)

    def kw_in_joyhub_user_update_put(self, is_check='', **kwargs):
        """Update a user; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/user/update', is_check, '修改用户', **kwargs)

    def kw_in_joyhub_user_update_password_put(self, is_check='', **kwargs):
        """Reset a user's password; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/user/update-password', is_check, '重置用户密码', **kwargs)

    def kw_in_joyhub_user_update_status_put(self, is_check='', **kwargs):
        """Change a user's status; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/user/update-status', is_check, '修改用户状态', **kwargs)

    def kw_in_joyhub_user_simple_list_get(self, is_check='', **kwargs):
        """Fetch the simplified user list (``kwargs`` is ignored)."""
        return self._joyhub_request('GET', '/admin-api/system/user/simple-list', is_check, '获取用户精简信息列表')

    def kw_in_joyhub_user_profile_get_get(self, is_check='', **kwargs):
        """Fetch the profile of the logged-in user (``kwargs`` is ignored)."""
        return self._joyhub_request('GET', '/admin-api/system/user/profile/get', is_check, '获得登录用户信息')

    def kw_in_joyhub_user_profile_update_put(self, is_check='', **kwargs):
        """Update the logged-in user's profile; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/user/profile/update', is_check, '修改用户个人信息', **kwargs)

    def kw_in_joyhub_user_profile_update_password_put(self, is_check='', **kwargs):
        """Change the logged-in user's password; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/user/profile/update-password', is_check, '修改用户个人密码', **kwargs)

    # ============ Department management endpoints ============
    def kw_in_joyhub_dept_create_post(self, is_check='', **kwargs):
        """Create a department; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/system/dept/create', is_check, '创建部门', **kwargs)

    def kw_in_joyhub_dept_delete_post(self, dept_id, is_check=''):
        """Delete one department by id (sent as HTTP DELETE)."""
        return self._joyhub_request('DELETE', f'/admin-api/system/dept/delete?id={dept_id}', is_check, '删除部门')

    def kw_in_joyhub_dept_delete_list_post(self, ids, is_check=''):
        """Delete several departments by id (sent as HTTP DELETE)."""
        ids_str = self._join_ids(ids)
        return self._joyhub_request('DELETE', f'/admin-api/system/dept/delete-list?ids={ids_str}', is_check, '批量删除部门')

    def kw_in_joyhub_dept_get_get(self, dept_id, is_check=''):
        """Fetch one department."""
        return self._joyhub_request('GET', f'/admin-api/system/dept/get?id={dept_id}', is_check, '获得部门信息')

    def kw_in_joyhub_dept_list_get(self, is_check='', **kwargs):
        """List departments; ``kwargs`` is the query string."""
        return self._joyhub_request('GET', '/admin-api/system/dept/list', is_check, '获取部门列表', **kwargs)

    def kw_in_joyhub_dept_list_all_simple_get(self, is_check=''):
        """Fetch the simplified department list via ``list-all-simple``."""
        return self._joyhub_request('GET', '/admin-api/system/dept/list-all-simple', is_check, '获取部门精简信息列表')

    def kw_in_joyhub_dept_simple_list_get(self, is_check=''):
        """Fetch the simplified department list via ``simple-list``."""
        return self._joyhub_request('GET', '/admin-api/system/dept/simple-list', is_check, '获取部门精简信息列表')

    def kw_in_joyhub_dept_update_put(self, is_check='', **kwargs):
        """Update a department; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/system/dept/update', is_check, '更新部门', **kwargs)

    # ============ Agreement management endpoints ============
    def kw_in_joyhub_agreement_create_post(self, is_check='', **kwargs):
        """Create an agreement; ``kwargs`` is the JSON body."""
        return self._joyhub_request('POST', '/admin-api/jh/agreement/create', is_check, '创建协议', **kwargs)

    def kw_in_joyhub_agreement_delete_delete(self, agreement_id, is_check=''):
        """Delete one agreement by id."""
        return self._joyhub_request('DELETE', f'/admin-api/jh/agreement/delete?id={agreement_id}', is_check, '删除协议')

    def kw_in_joyhub_agreement_delete_list_delete(self, ids, is_check=''):
        """Delete several agreements by id."""
        ids_str = self._join_ids(ids)
        return self._joyhub_request('DELETE', f'/admin-api/jh/agreement/delete-list?ids={ids_str}', is_check, '批量删除协议')

    def kw_in_joyhub_agreement_export_excel_get(self, is_check='', **kwargs):
        """Export agreements to Excel; returns the raw response."""
        return self._joyhub_request('GET', '/admin-api/jh/agreement/export-excel', is_check, '导出协议 Excel', return_json=False, **kwargs)

    def kw_in_joyhub_agreement_get_get(self, agreement_id, is_check=''):
        """Fetch one agreement."""
        return self._joyhub_request('GET', f'/admin-api/jh/agreement/get?id={agreement_id}', is_check, '获得协议')

    def kw_in_joyhub_agreement_page_get(self, is_check='', **kwargs):
        """Page through agreements; ``kwargs`` is the query string."""
        return self._joyhub_request('GET', '/admin-api/jh/agreement/page', is_check, '获得协议分页', **kwargs)

    def kw_in_joyhub_agreement_update_put(self, is_check='', **kwargs):
        """Update an agreement; ``kwargs`` is the JSON body."""
        return self._joyhub_request('PUT', '/admin-api/jh/agreement/update', is_check, '更新协议', **kwargs)
|
||
|
||
|
||
# Manual smoke check: fetch the purchase to-do list as the "purchase" user
# and print whatever the runner returns.
if __name__ == '__main__':
    test = DlzhanInterface()
    a = test.kw_in_zhyy_purchase_todo_get(user="purchase")
    print(a)
|