Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

366 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import requests
import json
import time
from urllib import parse
from base_framework.public_tools.get_token import LoginSys, SparkleLogin, AgentApiLogin, MarketApiLogin, token_dict, \
AllSchool, \
ParentLogin, student_token_cache, StudentLogin, SparkEduLogin, SparkSaasLogin, SparkSaasTeacherLogin
from base_framework.public_tools import log
from base_framework.public_tools.read_config import ReadConfig, get_current_env
from base_framework.base_config.current_pth import *
from base_framework.public_tools.read_config import get_current_config
from base_framework.public_tools.sqlhelper import MySqLHelper
obj_log = log.get_logger()
evn_cfg = ReadConfig(env_choose_path)
base_cfg = ReadConfig(config_file_path)
all_school_api_host = ['api.qa.allschool.com', 'api.sim.allschool.com', 'api.allschool.com']
class Runner(LoginSys):
def __init__(self):
super().__init__()
# self.env = get_current_config(section="run_evn_name", key="current_evn")
self.env = get_current_env()
self.is_need_sso = ["teach-api"]
self.team = get_current_config(section="run_evn_name", key="current_team")
self.db_con = MySqLHelper()
self.sql_data = [] # 存放接口响应时间
self.week = time.strftime("%w", time.localtime())
def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'instance'):
cls.instance = super().__new__(cls)
return cls.instance
def __del__(self):
if self.sql_data: # 如果有数据未插入
# sql_str = ("insert into sparkatp.interface_response_time(team, in_type, rp_time, in_url,in_id) "
# "values (%s,%s,%s,%s,%s);")
# self.db_con.insert_many(sql=sql_str, param=self.sql_data)
# self.sql_data.clear()
pass
def __call_api(self, session, api_url, req_type, **kwargs):
try:
if req_type == "GET":
req = session.get(api_url, verify=False, **kwargs)
elif req_type == "POST":
req = session.post(api_url, verify=False, **kwargs)
elif req_type == "PUT":
req = session.put(api_url, verify=False, **kwargs)
elif req_type == "DELETE":
req = session.delete(api_url, verify=False, **kwargs)
elif req_type == "PATCH":
req = session.patch(api_url, verify=False, **kwargs)
else:
obj_log.info(f'req_type,输入错误!不支持请求方法{req_type}')
raise Exception(f'req_type,输入错误!不支持请求方法{req_type}')
except Exception as e:
obj_log.info('返回数据:{}'.format(e))
return e
if self.env.lower() != "sim": # 非sim环境才记录接口响应时间
# 每周六,记录一次所有接口的响应时间
if self.week in ['6']:
current_time = int(time.strftime("%H", time.localtime()))
if current_time >= 5: # 5点之前的404校验脚本不统计因为都是非正常参数详见http://10.250.200.1:8080/jenkins/view/2-%E8%BE%85%E5%8A%A9%E7%A8%8B%E5%BA%8F/job/%E6%8E%A5%E5%8F%A3404%E6%A3%80%E6%9F%A5/
run_time = int(req.elapsed.total_seconds() * 1000)
if run_time > 200: # 超过200ms的接口记录到数据库
in_data = (self.team, req_type, run_time, api_url, 0)
self.sql_data.append(in_data)
if len(self.sql_data) >= 10: # 每10条数据插入一次, 最后一次不足100条时在__del__中插入
sql_str = (
"insert into sparkatp.interface_response_time(team, in_type, rp_time, in_url,in_id) "
"values (%s,%s,%s,%s,%s);")
self.db_con.insert_many(sql=sql_str, param=self.sql_data)
self.sql_data.clear()
return req
def call_rest_api(self, API_URL, req_type, user=None,
token=None, current_evn=None, is_file=False, **kwargs):
'''
功能:所有接口访问入口
'''
if self.env.lower() == "sim":
API_URL = API_URL.replace(".qa.", ".sim.")
return self._call_zhyy_api(API_URL, req_type, user,
token, current_evn, is_file, **kwargs)
def _call_zhyy_api(self, api_url, req_type, user=None, token=None, current_evn=None, is_file=False, **kwargs):
"""
功能模拟走ruoyi登录模式带token访问接口
:param API_URL:
:param req_type:
:param user: 使用的自定义用户登录
:param token: 从SSO获取的token此字段使用默认值None不是的情况使用False
:param current_evn: QA or SIM
:param is_file:
:param kwargs:
:return:
"""
req_type = req_type.upper()
retry_num = 4
cnt = 0
if 'as_login_type' in kwargs.keys():
kwargs.pop('as_login_type')
options_dict = dict(**kwargs)
result = parse.urlparse(url=api_url, allow_fragments=True)
host = result.hostname
if not user:
user = self.curt_user
obj_log.info('登录系统为{},用户名为手动输入:{}'.format(host, user))
obj_log.info('请求地址:{}'.format(api_url))
obj_log.info('请求数据:{}'.format(options_dict))
header = {"Authorization": "Basic c3BhcmtsZS13ZWI6c3BhcmtsZS13ZWI=", "tenant-id": "1"}
session = requests.session()
session.headers.update(header)
while cnt < retry_num:
if token is None:
if not user:
user = evn_cfg.get_value(sections='run_user_name', options='default_user')
tenant_name = base_cfg.get_value(sections=user, options='tenant')
usr_name = base_cfg.get_value(sections=user, options='username')
usr_pwd = base_cfg.get_value(sections=user, options='password')
# 检查 kwargs 中是否包含登录信息tenantName/username 和 password
# 如果包含,使用传入的参数;否则使用配置文件中的登录信息
json_data = kwargs.get('json', {})
is_login_params = isinstance(json_data, dict) and 'password' in json_data and (
'tenantName' in json_data or 'username' in json_data)
if is_login_params:
# 使用传入的登录参数
obj_log.info('检测到 kwargs 中包含登录信息,使用传入的参数进行登录')
login_req = self.__call_api(session, api_url, req_type='POST', **kwargs)
else:
# 使用配置文件中的登录信息
obj_log.info('使用配置文件中的登录信息进行登录')
request_body = {'json': {'tenantName': tenant_name, 'username': usr_name, 'password': usr_pwd,
'rememberMe': 'true'}}
api_login_url = self.zhyy_host
login_req = self.__call_api(session, api_login_url, req_type='POST', **request_body)
try:
req_json = login_req.json()
except Exception:
# 如果 json() 失败,尝试将 text 转成 JSON
try:
if isinstance(login_req, Exception):
obj_log.warning("登录请求失败: {}".format(login_req))
req_json = {}
else:
req_json = json.loads(login_req.text)
except Exception:
if isinstance(login_req, Exception):
obj_log.warning("登录请求失败: {}".format(login_req))
else:
obj_log.warning("登录响应无法解析为JSON: {}".format(login_req.text))
req_json = {}
# 判断登录是否成功code为0或200表示成功或者data中包含accessToken
login_code = req_json.get("code")
if login_code not in [0, 200] and not req_json.get("data", {}).get('accessToken'):
raise Exception("SSO登录失败: {}".format(req_json))
user_token = req_json.get("data", {}).get('accessToken')
# 如果 api_url 本身就是登录接口,登录成功后直接返回登录响应
if is_login_params:
obj_log.info('api_url 是登录接口,直接返回登录响应')
if not is_file:
return req_json
else:
return login_req.content
else:
user_token = token
if user_token:
session.headers.update({'ssotoken': user_token})
session.headers.update({'sso-token': user_token})
session.headers.update({'accesstoken': user_token})
session.headers.update({'Accesstoken': user_token})
session.headers.update({'access-token': user_token})
session.headers.update({'Authorization': user_token})
session.headers.update({'token': user_token})
obj_log.info(f'请求头headers{session.headers}')
req = self.__call_api(session, api_url, req_type, **kwargs)
if isinstance(req, Exception):
obj_log.error('请求失败:{}'.format(req))
return req
if not is_file:
try:
rtn_temp = req.json()
except Exception as e:
# 如果 json() 失败,尝试将 text 转成 JSON
try:
rtn_temp = json.loads(req.text)
except Exception:
# 如果都失败,保持为 text 字符串
rtn_temp = req.text
else:
rtn_temp = req.content
# SSO登录过期处理开始
status_code = str(req.status_code)
obj_log.info('------状态码:{} 返回信息:{}'.format(status_code, rtn_temp))
try:
# 如果 rtn_temp 是字典,才能使用 .get() 方法
if isinstance(rtn_temp, dict):
resp_code = str(rtn_temp.get('code', '200'))
else:
resp_code = '200'
except:
resp_code = '200'
if not is_file:
token_dict.clear() # 缓存session过期清理
student_token_cache.clear() # 缓存session过期清理
obj_log.warning("--缓存session过期清理缓存")
cnt += 1
if cnt <= 1:
continue
if status_code == "401" or resp_code == '401':
token_dict.clear() # 缓存session过期清理
student_token_cache.clear() # 缓存session过期清理
obj_log.warning("缓存session过期清理缓存")
cnt += 1
if cnt <= 1:
continue
if status_code == "200" and "<!DOCTYPE html>" in str(rtn_temp):
token_dict.clear() # 缓存session过期清理
student_token_cache.clear() # 缓存session过期清理
obj_log.warning("--缓存session过期清理缓存")
cnt += 1
if cnt <= 1:
continue
# SSO登录过期处理完成
req.close()
if not is_file:
obj_log.info('返回数据:{}'.format(rtn_temp))
else:
obj_log.info('返回数据:文件字节流')
return rtn_temp
return False
def _call_sso_api(self, api_url, req_type, user=None, token=None, current_evn=None, is_file=False, **kwargs):
"""
功能模拟走sso登录模式带token访问接口
:param API_URL:
:param req_type:
:param user: 使用的自定义用户登录
:param token: 从SSO获取的token此字段使用默认值None不是的情况使用False
:param current_evn: QA or SIM
:param is_file:
:param kwargs:
:return:
"""
req_type = req_type.upper()
retry_num = 4
cnt = 0
if 'as_login_type' in kwargs.keys():
kwargs.pop('as_login_type')
options_dict = dict(**kwargs)
result = parse.urlparse(url=api_url, allow_fragments=True)
host = result.hostname
if not user:
user = self.curt_user
obj_log.info('登录系统为{},用户名为手动输入:{}'.format(host, user))
obj_log.info('请求地址:{}'.format(api_url))
obj_log.info('请求数据:{}'.format(options_dict))
header = {"Authorization": "Basic c3BhcmtsZS13ZWI6c3BhcmtsZS13ZWI=", "tenant-id": "1"}
session = requests.session()
session.headers.update(header)
while cnt < retry_num:
if token is None:
if not user:
user = evn_cfg.get_value(sections='run_user_name', options='default_user')
usr_name = base_cfg.get_value(sections=user, options='show_username')
usr_pwd = base_cfg.get_value(sections=user, options='password')
sso_url = "{}?username={}&password={}".format(self.sparkle_pc_token_url, usr_name, usr_pwd)
sso_rsp = session.post(sso_url).json()
if sso_rsp.get("code") != 200:
raise Exception("SSO登录失败: {}".format(sso_rsp))
user_token = sso_rsp.get("data").get('accessToken')
else:
user_token = token
if user_token:
session.headers.update({'ssotoken': user_token})
session.headers.update({'sso-token': user_token})
session.headers.update({'accesstoken': user_token})
session.headers.update({'Accesstoken': user_token})
session.headers.update({'access-token': user_token})
session.headers.update({'token': user_token})
obj_log.info(f'请求头headers{session.headers}')
req = self.__call_api(session, api_url, req_type, **kwargs)
if not is_file:
try:
rtn_temp = req.json()
except Exception as e:
# 如果 json() 失败,尝试将 text 转成 JSON
try:
rtn_temp = json.loads(req.text)
except Exception:
# 如果都失败,保持为 text 字符串
rtn_temp = req.text
else:
rtn_temp = req.content
# SSO登录过期处理开始
status_code = str(req.status_code)
obj_log.info('------状态码:{} 返回信息:{}'.format(status_code, rtn_temp))
try:
# 如果 rtn_temp 是字典,才能使用 .get() 方法
if isinstance(rtn_temp, dict):
resp_code = str(rtn_temp.get('code', '200'))
else:
resp_code = '200'
except:
resp_code = '200'
if not is_file:
token_dict.clear() # 缓存session过期清理
student_token_cache.clear() # 缓存session过期清理
obj_log.warning("--缓存session过期清理缓存")
cnt += 1
if cnt <= 1:
continue
if status_code == "401" or resp_code == '401':
token_dict.clear() # 缓存session过期清理
student_token_cache.clear() # 缓存session过期清理
obj_log.warning("缓存session过期清理缓存")
cnt += 1
if cnt <= 1:
continue
if status_code == "200" and "<!DOCTYPE html>" in str(rtn_temp) and "https://sso.qa.huohua.cn" in str(
rtn_temp):
token_dict.clear() # 缓存session过期清理
student_token_cache.clear() # 缓存session过期清理
obj_log.warning("--缓存session过期清理缓存")
cnt += 1
if cnt <= 1:
continue
# SSO登录过期处理完成
req.close()
if not is_file:
obj_log.info('返回数据:{}'.format(rtn_temp))
else:
obj_log.info('返回数据:文件字节流')
return rtn_temp
return False
if __name__ == '__main__':
obj_runner = Runner()
# url = "https://api.qa.sparkedu.com/user_language/"
# url = "https://api.qa.sparkedu.com/third/student"
url = "https://api.qa.sparkedu.com/user/appoint/add"
phone = '13716640630'
# post_data = {"avatar":"https://stalegacy.huohua.cn/image/huohua/avatar/default/default_avatar1.png",
# "birthday":"2021-01-03 00:00:00",
# "id":21434807,
# "nickname":"hute222",
# "sex":0}
post_data = {"subjectType": 1}
# resp = obj_runner.call_rest_api(API_URL=url, phone=phone, req_type="POST", json=post_data)
resp = obj_runner.week
print(resp)