Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

232 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
"""
Author: qiaoxinjiu
Create Date: 2020/10/15 17:35
"""
import time
from base_framework.public_tools.log import get_logger
from base_framework.public_tools.my_faker import MyFaker
from base_framework.public_tools.runner import Runner
import re
# Module-level singletons shared by every keyword in this file.
# Logger used by the keywords for informational output.
obj_log = get_logger()
# Fake-data helper; not referenced by any code visible in this file.
obj_faker = MyFaker()
# Runner that exposes the host attributes (manage_host, xxl_job_host,
# teach_host, scm_host) and call_rest_api used by the keywords.
obj_runner = Runner()
class ManageKeyWord:
    """Keyword collection for management-side helper APIs.

    Every public keyword is a static method. HTTP traffic goes through the
    module-level ``obj_runner`` and logging through ``obj_log``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _prefix_country_code(phone, country_code):
        """Return ``phone`` prefixed with ``<country_code>-`` for non-86 numbers.

        The phone is returned unchanged when no country code is given, when it
        already contains ``-``, when it starts with ``0``, or when the country
        code equals 86.
        """
        if country_code is None:
            return phone
        # Coerce once so integer phone numbers are handled like strings.
        phone_text = str(phone)
        if "-" in phone_text or phone_text.startswith("0"):
            return phone
        if int(country_code) == 86:
            return phone
        return "{}-{}".format(country_code, phone)

    @staticmethod
    def _extract_sms_code(message):
        """Return the digits that directly follow the verification-code label.

        Both the ASCII and the full-width colon are accepted after the label.
        Raises ``Exception`` when the message holds no such code.
        """
        # The character class keeps the look-behind fixed-width, as `re` requires.
        pattern = re.compile(r"(?<=验证码[::])\d+")
        found = pattern.search(message) if message else None
        if found is None:
            raise Exception("no verification code found in SMS message: %s" % message)
        return found.group(0)

    @staticmethod
    def kw_get_my_permissions(app_type, employee_id, **kwargs):
        """
        | Purpose: | Fetch the permissions of an employee |
        | Arguments: | app_type | value sent as ``appType`` |
        | | employee_id | value sent as ``employeeId`` |
        | | user (kwarg) | forwarded to the runner as ``user``; None when omitted |
        | Returns: | the value returned by ``obj_runner.call_rest_api`` |
        Example:
        | kw_get_my_permissions | crmnew | 586470 |
        """
        query = {
            "appType": app_type,
            "employeeId": employee_id,
        }
        return obj_runner.call_rest_api(
            user=kwargs.get('user'),
            API_URL=obj_runner.manage_host + '/permission/getMyPermissions',
            req_type='POST',
            params=query)

    @staticmethod
    def kw_get_sms_code(phone=None, **kwargs):
        """
        | Purpose: | Read the verification code from the newest SMS log entry |
        | Arguments: | phone | phone number used to filter the SMS log |
        | | user (kwarg) | forwarded to the runner as ``user`` |
        | | current_evn (kwarg) | forwarded to the runner as ``current_evn`` |
        | Returns: | the digits found after the verification-code label |
        | Raises: | Exception when no log entry or no code is found |
        Example:
        | kw_get_sms_code | 18782019436 |
        """
        # The host is hard-coded to the QA SMS service. The query requests
        # page 1 ordered by ``id desc``, so entry 0 is used as the newest one.
        url = "https://smm.qa.huohua.cn/sms/log/page/all?WHERE.%5BEQ%5Dphone={}&WHERE.%5BEQ%5DtypeName=&WHERE.%5BEQ%5Dchannel=&WHERE.%5BEQ%5DgatewayType=&WHERE.%5BGT%5DcreateTime=&WHERE.%5BLT%5DcreateTime=&WHERE.%5BGT%5DsubmitTime=&WHERE.%5BLT%5DsubmitTime=&pageNum=1&pageSize=20&orderBy=id%20desc".format(
            phone)
        resp = obj_runner.call_rest_api(
            user=kwargs.get('user'),
            API_URL=url,
            req_type='POST',
            current_evn=kwargs.get('current_evn'))
        try:
            newest_message = resp['list'][0]['msg']
        except (KeyError, IndexError, TypeError):
            # Fail with the response attached instead of an opaque lookup error.
            raise Exception(
                "no SMS log entry found for phone %s, response: %s" % (phone, resp))
        return ManageKeyWord._extract_sms_code(newest_message)

    @staticmethod
    def kw_get_authcode_without_message(phone, type=2, **kwargs):
        """
        | Purpose: | Obtain a login verification code through the ``sendLoginAuthCodeWithoutMessage`` endpoint |
        | Arguments: | phone | phone number |
        | | type | request type sent as ``type`` (default 2) |
        | | country_code (kwarg) | when given and not 86, prepended as ``<code>-`` unless the phone contains ``-`` or starts with ``0`` |
        | Returns: | the last four characters of the response ``data`` field |
        | Raises: | Exception when the response carries no usable ``data`` |
        Example:
        | kw_get_authcode_without_message | phone=XXX | type=XXX |
        """
        # ``type`` shadows the builtin but is part of the public signature.
        phone = ManageKeyWord._prefix_country_code(phone, kwargs.get("country_code"))
        url = "{}/user_profile/sendLoginAuthCodeWithoutMessage?phone={}&type={}".format(
            obj_runner.manage_host, phone, type)
        obj_log.info("your input:{0}".format(phone))
        resp = obj_runner.call_rest_api(API_URL=url, req_type="POST")
        try:
            auth_code = resp['data'][-4:]
        except (KeyError, TypeError):
            raise Exception("未获取到验证码,获取接口返回:%s" % resp)
        obj_log.info(auth_code)
        return auth_code

    @staticmethod
    def kw_execute_xxl_job(job_id, para="", **kwargs):
        """
        | Purpose: | Trigger an xxl-job task and optionally wait for it to succeed |
        | Arguments: | job_id | id of the job to trigger |
        | | para | executor parameter sent as ``executorParam`` (default empty) |
        | | jobGroup (kwarg) | when given, the job log is polled until an entry reports ``handleCode`` 200 |
        | Returns: | None |
        | Raises: | EnvironmentError when the trigger response code is not 200, or when polling ends without a successful entry |
        Example:
        | kw_execute_xxl_job | 2109 |
        """
        job_group = kwargs.pop("jobGroup", None)
        trigger_payload = {
            "id": job_id,
            'executorParam': para,
        }
        trigger_url = obj_runner.xxl_job_host + "/jobinfo/trigger"

        # Capture the log-filter window before triggering; the one-second pause
        # keeps the window start strictly earlier than the trigger itself.
        window_start = time.localtime()
        start_time = time.strftime("%Y-%m-%d %H:%M:%S", window_start)
        end_time = time.strftime("%Y-%m-%d 23:59:59", window_start)
        time.sleep(1)

        trigger_resp = obj_runner.call_rest_api(
            API_URL=trigger_url, req_type="POST", data=trigger_payload)
        if trigger_resp.get("code") != 200:
            raise EnvironmentError("%s job 触发失败!" % job_id)
        time.sleep(3)

        if not job_group:
            # Without a job group there is nothing to poll.
            return

        log_query = {
            "jobGroup": job_group,
            "jobId": job_id,
            "logStatus": -1,
            "filterTime": "%s - %s" % (start_time, end_time),
            "start": 0,
            "length": 10,
        }
        log_url = obj_runner.xxl_job_host + "/joblog/pageList"
        max_attempts = 300
        poll_interval_seconds = 5
        for _ in range(max_attempts):
            resp = obj_runner.call_rest_api(
                API_URL=log_url, params=log_query, req_type="POST")
            if 'data' in resp and len(resp['data']) > 0:
                first_entry = resp['data'][0]
                if first_entry.get("handleCode") == 200:
                    obj_log.info(first_entry)
                    obj_log.info("job%s执行成功" % job_id)
                    return
            time.sleep(poll_interval_seconds)
        raise EnvironmentError("%s job 执行失败!" % job_id)

    @staticmethod
    def kw_modify_apollo_configuration():
        """
        | Purpose: | Update one fixed Apollo configuration item |
        | Arguments: | none | |
        | Returns: | None |
        Example:
        | kw_modify_apollo_configuration | |
        """
        # Item id, key, value and target URL are all fixed: the call sets
        # ``sms.start.time`` to ``9:50`` in the QA ``peppa-cc-manage`` app.
        item = {
            "id": 28959,
            "key": "sms.start.time",
            "value": "9:50",
            "comment": "",
            "dataChangeCreatedBy": "apollo",
            "tableViewOperType": "update",
        }
        api_url = "http://apollo.qa.huohua.cn/apps/peppa-cc-manage/envs/QA/clusters/default/namespaces/application/item"
        obj_runner.call_rest_api(API_URL=api_url, req_type="PUT", json=item)

    @staticmethod
    def course_upload(**kwargs):
        """
        | Purpose: | Upload an image through the teach ``common/upload`` endpoint |
        | Arguments: | user (kwarg) | forwarded to the runner as ``user`` |
        | | file_path (kwarg) | image to upload; defaults to the original hard-coded Windows path |
        | Returns: | the value returned by ``obj_runner.call_rest_api`` |
        Example:
        | course_upload | user=xl |
        """
        file_path = kwargs.get('file_path') or r"C:\Users\HuoHua\Pictures\桌面图片\download.jpg"
        api_url = obj_runner.teach_host + "/peppa-teach-api/common/upload"
        # The context manager closes the file handle once the request is done.
        with open(file_path, "rb") as image:
            files = {
                "type": (None, "lesson"),
                "file": ("file", image, "image/jpeg"),
            }
            # No Content-Type header is passed on purpose: the original author
            # noted that setting it for a file upload causes an error.
            return obj_runner.call_rest_api(
                user=kwargs.get('user'),
                req_type="POST",
                API_URL=api_url,
                files=files)

    @staticmethod
    def sku_classfiy_get(**kwargs):
        """
        | Purpose: | Fetch the SKU classification enum values |
        | Arguments: | user (kwarg) | forwarded to the runner as ``user`` |
        | Returns: | the value returned by ``obj_runner.call_rest_api`` |
        Example:
        | sku_classfiy_get | |
        """
        return obj_runner.call_rest_api(
            user=kwargs.get('user'),
            API_URL=obj_runner.scm_host + '/smart/expressSku/skuClassifyEnum',
            req_type='GET')
if __name__ == '__main__':
    # Ad-hoc manual run; every call below goes to a live QA service.
    url = "https://teach-opt-api.qa.huohua.cn/api/quality-manage/template/items/385/1/"
    res = obj_runner.call_rest_api(url, "get")
    a = ManageKeyWord()
    # [a.kw_add_case(phone) for phone in ['1878201' +
    # str(random.randrange(1111, 9999, 2)) for _ in range(2)]]
    # a.kw_get_my_permissions('crmnew', 586470)
    # a.kw_get_sms_code('18782019436')
    # a.course_upload(user='xl')
    # The keyword is named kw_execute_xxl_job; the class defines no
    # execute_xxl_job attribute, so the old call raised AttributeError.
    a.kw_execute_xxl_job(2865, "[262]")
    # a.sku_classfiy_get()