Files
smart-management-auto-test/base_framework/public_tools/get_token.py
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

912 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from urllib import parse
import requests
from retrying import retry
from base_framework.public_tools.read_config import InitConfig
from base_framework.public_tools.read_config import ReadConfig, get_current_config, get_current_env
from base_framework.base_config.current_pth import *
from requests.packages.urllib3.connectionpool import InsecureRequestWarning
from base_framework.public_tools import log
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
import base64
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import time
import re
# get_cfg = ReadConfig(env_choose_path)
token_dict = dict()
student_token_cache = {}
obj_log = log.get_logger()
class LazyProperty:
def __init__(self, fun):
self.fun = fun
def __get__(self, instance, owner):
if instance is None:
return self
value = self.fun(instance)
setattr(instance, self.fun.__name__, value)
return value
class LoginSys(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
try:
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
except Exception as e:
print(e)
self.host = host
self.curt_user = self.current_user
self.env = get_current_env()
@LazyProperty
def get_session(self):
"""
获取SSO session
:return:
"""
sys_type = self.host.split(".")
if 'visparklearning' in sys_type and ('cc-manage' or 'cti-manage' in sys_type):
api_url = self.curt_sso_url
elif 'hulk-content-audit-server' in sys_type or 'hulk-content-manage-gateway' in sys_type or 'hulk-class-help-manager' in sys_type or 'hulk-ark-gateway' in sys_type or 'manage-api' in sys_type :
api_url = self.all_school_sso_url
else:
api_url = self.curt_sso_url
post_data = dict()
post_data['showUsername'] = self.show_username
post_data['username'] = self.username
post_data['password'] = self.password
req_session = requests.session()
resp = req_session.post(
url=api_url,
data=post_data,
allow_redirects=False,
verify=False)
return req_session
@LazyProperty
def get_code(self):
"""
获取code
:return:
"""
post_data = dict()
sys_type = self.host.split(".")
if 'visparklearning' in sys_type and ('cc-manage' or 'cti-manage' in sys_type):
post_data['client_id'] = "vispark-crm"
post_data['response_type'] = 'code'
post_data['redirect_uri'] = self.crm_vispark_url
authorization = self.cc_auth
# elif sys_type == "smm":
# post_data['client_id'] = "sms-manage"
# post_data['response_type'] = 'code'
# post_data['redirect_uri'] = self.sms_url
if 'visparklearning' in sys_type and ('cc-manage' or 'cti-manage' in sys_type):
api_url = self.get_code_url
elif 'hulk-content-audit-server' in sys_type or 'hulk-content-manage-gateway' in sys_type or 'hulk-class-help-manager' in sys_type or 'hulk-ark-gateway' in sys_type or 'manage-api' in sys_type:
api_url = self.get_all_school_code_url
else:
api_url = self.get_code_url
req_session = self.get_session
if 'mq-console' in sys_type:
allow_redirects = True
else:
allow_redirects = False
obj_log.info(api_url)
obj_log.info(post_data)
resp = req_session.get(
url=api_url,
params=post_data,
allow_redirects=allow_redirects,
verify=False)
if 'mq-console' in sys_type:
return [req_session, 'MQ']
code_temp = resp.headers
obj_log.info(code_temp)
code = code_temp.get('location')
obj_log.info(code)
if 'client_id=gmp' in code:
return code
try:
code = code.split("=")[-1]
except Exception:
raise IndexError("获取token失败请检查环境或者登录信息是否正确")
resp_list = [code, post_data['redirect_uri'], authorization]
return resp_list
# @LazyProperty
def get_token(self, type_name=None):
"""
获取token
:return:
"""
try:
token = token_dict[self.host].get(self.curt_user, None)
except KeyError as e:
token = False
if token:
return token
else:
token_dict_temp = dict()
sys_type = self.host.split(".")
post_data = dict()
temp_code = self.get_code
if isinstance(temp_code, str):
session = self.get_session
resp = session.get(url=temp_code)
for key, value in session.cookies.items():
if key == 'peppa_sso_token':
token = value
break
token_header = {"accesstoken": token}
session.headers.update(token_header)
if self.jira_id:
podenv = {"huohua-podenv": self.jira_id}
session.headers.update(podenv)
token_dict_temp[self.curt_user] = session
token_dict[self.host] = token_dict_temp
return session
if temp_code[1] == 'MQ': # MQ特殊处理
return temp_code[0]
post_data['code'] = temp_code[0]
post_data['redirect_uri'] = temp_code[1]
post_data['grant_type'] = 'authorization_code'
req_session = self.get_session
authorization = temp_code[2]
if authorization:
header = {'authorization': authorization}
if 'visparklearning' in sys_type and ('cc-manage' or 'cti-manage' in sys_type):
resp = req_session.get(
url=api_url+'&code='+temp_code[0], data=post_data, headers=header, allow_redirects=False, verify=False)
elif 'visparklearning' in sys_type and 'manage-gw' in sys_type:
resp = req_session.get(url=api_url + '&code=' + temp_code[0], data=post_data, headers=header,
allow_redirects=False, verify=False)
else:
resp = req_session.post(
url=api_url, data=post_data, headers=header, allow_redirects=False, verify=False)
if 'visparklearning' in sys_type and ('cc-manage' or 'cti-manage' in sys_type):
token_temp = parse.parse_qs(parse.urlparse(resp.headers['location']).query)
elif 'visparklearning' in sys_type and 'manage-gw' in sys_type:
token_temp = parse.parse_qs(parse.urlparse(resp.headers['location']).query)
else:
token_temp = resp.json()
if type_name == 'BFF': # bff架构使用
token = {"token": token_temp['access_token']}
elif 'visparklearning' in sys_type and ('cc-manage' or 'cti-manage' in sys_type):
token = {"accesstoken": token_temp['peppa_sso_token'][0]}
elif 'visparklearning' in sys_type and 'manage-gw' in sys_type:
token = {"accesstoken": token_temp['peppa_sso_token'][0]}
else:
if "access_token" not in token_temp:
obj_log.warning("---------获取token失败-------")
obj_log.warning("type_name:{}\ntoken_temp:{}".format(type_name, token_temp))
obj_log.warning("-----------------------------")
token = {"accesstoken": token_temp['access_token']}
req_session.headers.update(token)
if self.jira_id:
podenv = {"huohua-podenv": self.jira_id}
req_session.headers.update(podenv)
token_dict_temp[self.curt_user] = req_session
token_dict[self.host] = token_dict_temp
return req_session
else:
if 'kunpeng' in post_data['redirect_uri']: # 鲲鹏/mp系统登录
req_session.get(url=post_data['redirect_uri'] + '&code=' + temp_code[0], verify=False)
return req_session
elif 'allschool-mp' in post_data['redirect_uri']:
if self.jira_id:
podenv = {"huohua-podenv": self.jira_id}
req_session.headers.update(podenv)
req_session.get(url=post_data['redirect_uri'] + '&code=' + temp_code[0], verify=False)
access_token = req_session.cookies.get("peppa_sso_token")
req_session.headers.update({"accesstoken": access_token})
return req_session
run_session = self.sso_oauth_login(session=req_session, code=temp_code[0])
if self.jira_id:
podenv = {"huohua-podenv": self.jira_id}
run_session.headers.update(podenv)
token_dict_temp[self.curt_user] = run_session
token_dict[self.host] = token_dict_temp
return run_session
def sso_oauth_login(self, type='manage', session=None, code=None):
"""
| 功能说明: | |
| 输入参数: | | |
| 返回参数: | XXX |
| 作者信息: | 作者 | 修改时间 |
举例说明:
| sso_oauth_login | |
"""
if type == 'manage':
url = self.manage_url + '?code=' + code
elif type == 'opengalaxy': # 独立环境发布系统
url = self.opengalaxy_url + '?code=' + code
session.get(url=url, allow_redirects=True, verify=False)
return session
def ug_hhi_sso_login_by_session_id(self):
"""
ug的manage后台登录获取sessionId
:return:
"""
req_session = requests.session()
headers = {"Content-Type": "application/json"}
# sso登录
url_form = self.curt_sso_url
obj_log.info(url_form)
post_data = dict()
post_data['showUsername'] = self.show_username
post_data['username'] = self.username
post_data['password'] = self.password
res_form = req_session.post(url=url_form, data=post_data, allow_redirects=False, verify=False)
obj_log.info("ug_hhi_sso_login_by_session_id_url_form:{}".format(url_form))
obj_log.info("ug_hhi_sso_login_by_session_id_res_form:{}".format(res_form.headers))
# 进行sso.qa.sparkedu.com/oauth/authorize
url_authorize = "{}?client_id=manage&response_type=code&redirect_uri={}".format(
self.get_code_url, self.manage_url)
res_authorize = req_session.get(url_authorize, allow_redirects=False)
obj_log.info("ug_hhi_sso_login_by_session_id_url_authorize:{}".format(url_authorize))
obj_log.info("ug_hhi_sso_login_by_session_id_res_authorize:{}".format(res_authorize.headers))
# 获取到res_authorize的location地址(主要是用到code)
url_sso_oauth_login = res_authorize.headers.get('location')
obj_log.info("ug_hhi_sso_login_by_session_id_url_sso_oauth_login:{}".format(url_sso_oauth_login))
res_url_sso_oauth_login = req_session.get(url_sso_oauth_login, allow_redirects=False)
obj_log.info("ug_hhi_sso_login_by_session_id_res_url_sso_oauth_login:{}".format(res_url_sso_oauth_login.headers))
# 因为环境问题所以多请求一次域名
res_manage_host = req_session.get(self.manage_host)
obj_log.info("ug_hhi_sso_login_by_session_id_url_manage:{}".format(self.manage_host))
obj_log.info(
"ug_hhi_sso_login_by_session_id_res_manage_host:{}".format(res_manage_host.headers))
token_dict_temp = dict()
token_dict_temp[self.curt_user] = req_session
token_dict[self.host] = token_dict_temp
return req_session
def ug_sso_login_by_starlight(self):
"""
ug的starlight.qa.huohua.cn登录
:return:
"""
req_session = requests.session()
headers = {"Content-Type": "application/json"}
# 访问starlight.qa.huohua.cn/生成JSESSION_STARLIGHT
starlight_url = self.starlight_url
obj_log.info(starlight_url)
req_session.get(starlight_url, allow_redirects=False)
# sso登录
url_form = self.curt_sso_url
obj_log.info(url_form)
post_data = dict()
post_data['showUsername'] = self.show_username
post_data['username'] = self.username
post_data['password'] = self.password
res_form = req_session.post(url=url_form, data=post_data, allow_redirects=False, verify=False)
obj_log.info(f'res_form的header{res_form.headers}')
# 进行sso.qa.sparkedu.com/oauth/authorize
url_authorize = "{}?client_id=starlight&response_type=code&redirect_uri={}".format(
self.get_code_url, self.suyang_manage_sso_login_url)
res_authorize = req_session.get(url_authorize, allow_redirects=False)
obj_log.info(f"res_authorize的header{res_authorize.headers.get('location')}")
# 获取到res_authorize的location地址(主要是用到code)
req_session.get(res_authorize.headers.get('location'), allow_redirects=False)
# 访问https://starlight.qa.huohua.cn/
req_session.get(self.starlight_url, allow_redirects=False)
token_dict_temp = dict()
token_dict_temp[self.curt_user] = req_session
token_dict[self.host] = token_dict_temp
return req_session
class SparkleLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=None, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=None, current_evn=current_evn)
self.pc_token = None
self.session = requests.session()
# self.jira_id = get_cfg.get_value(sections='run_jira_id', options='huohua-podenv')
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
# def sparkle_pc_login(self, phone, passwd=123456):
def sparkle_pc_login(self, phone, passwd='Mima@123'):
if token_dict.get(str(phone)):
return token_dict.get(str(phone))
url = self.sparkle_pc_token_url
data = {"username": phone, "userType": 2, "password": str(passwd)}
self.session.headers.update({"Authorization": self.spark_pc_auth})
resp = json.loads(self.session.post(url, params=data).content)
header = {"account-token": resp.get("data").get("accessToken")}
if self.jira_id:
podenv = {"huohua-podenv": self.jira_id}
self.session.headers.update(podenv)
self.session.headers.update(header)
token_dict[str(phone)] = self.session
return self.session
class AgentApiLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=None, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=None, current_evn=current_evn)
self.agent_api_token = None
self.session = requests.session()
# self.jira_id = get_cfg.get_value(sections='run_jira_id', options='huohua-podenv')
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
def agent_api_login(self, phone, authCode, countryCode=86):
if token_dict.get(str(phone)):
return token_dict.get(str(phone))
url = '{}/login/loginByAuthCode'.format(self.hhr_api_host)
data = {"phone": str(phone), "authCode": str(authCode), "countryCode": str(countryCode)}
resp = json.loads(self.session.post(url, json=data).content)
header = {"user-token": resp.get("data").get("token")}
if self.jira_id:
podenv = {"huohua-podenv": self.jira_id}
self.session.headers.update(podenv)
self.session.headers.update(header)
token_dict[str(phone)] = self.session
return self.session
class MarketApiLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
InitConfig.__init__(self, run_user_name=None, current_evn=current_evn)
self.session = requests.session()
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
def market_api_login(self, user=None):
login_url = self.market_url
result = parse.urlparse(url=login_url ,allow_fragments=True)
host = result.hostname
choose_user = ReadConfig(env_choose_path).get_options('run_user_name')
if host in choose_user:
default_user = ReadConfig(env_choose_path).get_value(
sections='run_user_name' ,options=host)
self.current_user = default_user
if user:
self.current_user = user
try:
login_user = ReadConfig().get_value(sections=self.current_user ,options='username')
login_password = ReadConfig().get_value(sections=self.current_user ,options='password')
except Exception as e:
login_user = ReadConfig(astwb_config).get_value(sections=self.current_user ,options='username')
login_password = ReadConfig(astwb_config).get_value(sections=self.current_user ,options='password')
post_data = {'phone': login_user ,'password': login_password}
resp = self.session.post(url=login_url ,json=post_data , verify=False,
headers={'huohua-podenv': self.jira_id})
if resp.status_code != 200:
raise KeyError('获取 市场管理投放系统 token失败')
set_cookie = resp.headers['set-cookie'].split(";")[-1].split(",")[1]
headers = {"Cookie":"gray_id=3077c7810744b47fc06ec513cf07801e; gray_tag=stable; "+set_cookie}
self.session.headers.update(headers)
return self.session
class AllSchool(InitConfig):
def __init__(self, curt_user=None, current_evn=None):
# super().__init__(run_user_name=curt_user, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
def all_school_token(self, user=None, login_type=1):
"""
:param user: 登录用户
:param login_type: 登录系统 1教师工作台
2机构工作台
3选课平台
:return: token
"""
if login_type == 1:
login_url = self.all_school_url
elif login_type == 2:
login_url = self.all_school_org_url
elif login_type == 3:
login_url = self.find_classes_url
result = parse.urlparse(url=login_url, allow_fragments=True)
host = result.hostname
choose_user = ReadConfig(env_choose_path).get_options('run_user_name')
if host in choose_user:
if login_type == 2:
host = 'org-' + host
elif login_type == 3:
host = 'find-' + host
default_user = ReadConfig(env_choose_path).get_value(
sections='run_user_name', options=host)
self.current_user = default_user
if user:
self.current_user = user
try:
login_user = ReadConfig().get_value(sections=self.current_user, options='username')
login_password = ReadConfig().get_value(sections=self.current_user, options='password')
except Exception as e:
login_user = ReadConfig(astwb_config).get_value(sections=self.current_user, options='username')
login_password = ReadConfig(astwb_config).get_value(sections=self.current_user, options='password')
if int(login_type) == 3:
post_data = {"data": {"email": login_user, "password": login_password, "countryCode": "86",
"loginType": "EMAIL_PSW"}}
else:
post_data = {'email': login_user, 'password': login_password}
as_token = requests.post(url=login_url, json=post_data, verify=False,
headers={'huohua-podenv': self.jira_id})
rtn_temp = as_token.json()
if as_token.status_code != 200:
raise KeyError('获取 allSchool token失败')
user_token = rtn_temp['data']['token']
return user_token
class ParentLogin(InitConfig):
def __init__(self,host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=curt_user, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
@retry(stop_max_attempt_number=5, wait_fixed=3000)
def parent_send_msg(self, phone, authType=2, countryCode=None,
sessionId=None, **kwargs):
"""
| 功能说明: | M站学生端发送短信 |
| 输入参数: | phone | 手机号 |
| 返回参数: | XXX |
| 作者信息: | 作者 | 修改时间 |
举例说明:
| m_send_msg | |
"""
url = self.m_host + '/passport/auth_code/send'
post_data = {
"phone": phone,
"authType": authType,
"countryCode": countryCode,
"isLogin": False,
"sessionId": sessionId}
resp = requests.post(
url=url,
json=post_data,
verify=False)
time.sleep(2)
return resp
def get_parent_sms_code(self, phone):
"""
M站获取手机验证码
:param phone:电话号码
:return: token
"""
auth_url = self.curt_sso_url
get_code_url = self.get_code_url
token_url = self.get_token_url
sms_url = "https://smm.qa.huohua.cn/sms/log/page/all?WHERE.%5BEQ%5Dphone={}&WHERE.%5BEQ%5DtypeName=&WHERE.%5BEQ%5Dchannel=&WHERE.%5BEQ%5DgatewayType=&WHERE.%5BGT%5DcreateTime=&WHERE.%5BLT%5DcreateTime=&WHERE.%5BGT%5DsubmitTime=&WHERE.%5BLT%5DsubmitTime=&pageNum=1&pageSize=20&orderBy=id%20desc".format(
phone)
session = requests.session()
session.post(url=auth_url, verify=False,
data={'showUsername': 'liuruiquan', 'username': 'liuruiquan@huohua.cn', 'password': 'lrq5823LRQ'})
get_code = session.get(url=get_code_url, verify=False,
params={'client_id': 'crmnew', 'response_type': 'code',
'redirect_uri': 'https://crmv2.qa.huohua.cn'}, allow_redirects=False)
code_temp = get_code.headers
code = code_temp['location']
code = code.split("=")[-1]
token_get = session.post(url=token_url, verify=False, data={'code': code, 'redirect_uri': 'https://crmv2.qa.huohua.cn',
'grant_type': 'authorization_code'},
headers={'authorization': 'Basic Y3JtbmV3OmNybW5ldw=='})
token_temp = token_get.json()
token = {"accesstoken": token_temp['access_token']}
sms_temp = session.post(url=sms_url, verify=False, headers=token).json()
smm_code = sms_temp['list'][0]['msg']
reg = re.compile(r"(?<=手机短信验证码:)\d+")
match = reg.search(smm_code)
smm_code = match.group(0)
return smm_code
def get_parent_token(self, phone=None):
"""
M站token获取
:param phone:电话号码
:return: token
"""
login_url = "http://m.qa.huohua.cn/passport/login"
result = parse.urlparse(url=login_url, allow_fragments=True)
host = 'parent-' + result.hostname
choose_user = ReadConfig(env_choose_path).get_options('run_user_name')
if host in choose_user:
default_user = ReadConfig(env_choose_path).get_value(
sections='run_user_name', options=host)
if phone:
user = ReadConfig().get_value(sections=phone, options='username')
self.current_user = user
else:
phone = ReadConfig().get_value(
sections=default_user, options='username')
self.current_user = phone
if student_token_cache.get(self.current_user):
return student_token_cache[self.current_user]
send_code = self.parent_send_msg(phone=self.current_user).json()
# if not send_code['success']:
# raise ValueError(send_code)
verify_code = self.get_parent_sms_code(phone=self.current_user)
post_data = {"phone": self.current_user, "authCode": verify_code, "countryCode": None, "loginType": 1,
"userType": 1,
"subjectType": 0}
m_token = requests.post(url=login_url, json=post_data, verify=False, headers={'huohua-podenv': self.jira_id})
if m_token.status_code == 200:
token = m_token.json()
token = token['data']['token']
student_token_cache[self.current_user] = token
return token
else:
raise KeyError('获取 parent token失败')
def get_parent_token_by_pwd(self, phone=None):
env_name = get_current_config(section='run_evn_name', key='current_business')
obj_log.info("env_name:{}".format(env_name))
if env_name and env_name=='hhi':
login_url = "https://hhi-user-auth-api.qa.visparklearning.com/login"
elif 'hhi' in phone.split('-'):
login_url = "https://pst-gw.qa.huohua.cn/passport/login"
else:
login_url = "http://m.qa.huohua.cn/passport/login"
result = parse.urlparse(url=login_url, allow_fragments=True)
host = 'parent-' + result.hostname
choose_user = ReadConfig(env_choose_path).get_options('run_user_name')
obj_log.info("host:{},choose_user:{}".format(host, choose_user))
if host in choose_user:
default_user = ReadConfig(env_choose_path).get_value(
sections='run_user_name', options=host)
if phone.split('-')[1]:
self.current_user = phone.split('-')[1]
else:
phone = ReadConfig(config_file_path).get_value(
sections=default_user, options='username')
self.current_user = phone
else:
phone = ReadConfig(config_file_path).get_value(
sections='lzp', options='username')
self.current_user = phone
if student_token_cache.get(self.current_user):
return student_token_cache[self.current_user]
# 支持传入海外区号用户
if '+' in self.current_user :
phone_list =self.current_user.split("+")
countryCode = phone_list[1]
self.current_user = phone_list[0]
else:
countryCode = 86
post_data = {"phone": self.current_user, "password": "A123456", "countryCode": countryCode, "loginType": 2,
"userType": 1,
"subjectType": 0}
try:
m_token = requests.post(url=login_url, json=post_data, verify=False,
headers={'huohua-podenv': self.jira_id})
if m_token.status_code != 200:
raise KeyError('获取 parent token失败')
token = m_token.json()
token = token['data']['token']
student_token_cache[self.current_user] = token
return token
except :
raise TypeError('获取 parent token失败')
def get_xdu_parent_token_by_pwd(self, phone=None, host=None):
env_name = get_current_config(section='run_evn_name', key='current_business')
obj_log.info("env_name:{}".format(env_name))
login_url = 'http://'+host+"/app/sign/phone"
result = parse.urlparse(url=login_url, allow_fragments=True)
host = result.hostname
choose_user = ReadConfig(env_choose_path).get_options('run_user_name')
obj_log.info("host:{},choose_user:{}".format(host, choose_user))
if host in choose_user:
default_user = ReadConfig(env_choose_path).get_value(
sections='run_user_name', options=host)
post_data = {"phone": phone, "verifyCode": 1111, "countryCode": 86, "type": 1}
try:
m_token = requests.post(url=login_url, json=post_data, verify=False,
headers={'huohua-podenv': self.jira_id})
if m_token.status_code != 200:
raise KeyError('获取 parent token失败')
token = m_token.json()
token = token['data']['token']
student_token_cache[self.current_user] = token
return token
except :
raise TypeError('获取 parent token失败')
class StudentLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=curt_user, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
def get_student_token_by_pwd(self, phone=None):
env_name = get_current_config(section='run_evn_name', key='current_business')
if env_name and env_name=='hhi':
login_url = "https://hhi-core-api.qa.visparklearning.com/token"
else:
login_url = "https://core-api.qa.huohua.cn/token"
result = parse.urlparse(url=login_url, allow_fragments=True)
host = result.hostname
choose_user = ReadConfig(env_choose_path).get_options('run_user_name')
if host in choose_user:
default_user = ReadConfig(env_choose_path).get_value(
sections='run_user_name', options=host)
if phone:
self.current_user = phone
else:
phone = ReadConfig(config_file_path).get_value(
sections=default_user, options='username')
self.current_user = phone
else:
phone = ReadConfig(config_file_path).get_value(
sections='lzp', options='username')
self.current_user = phone
if student_token_cache.get(self.current_user):
return student_token_cache[self.current_user]
# 支持传入海外区号用户
if '+' in self.current_user :
phone_list =self.current_user.split("+")
countryCode = phone_list[1]
self.current_user = phone_list[0]
else:
countryCode = 86
post_data = {"phone": self.current_user, "password": "A123456", "countryCode": countryCode, "authCodeType":2,"loginType": 2}
try:
m_token = requests.post(url=login_url, json=post_data, verify=False, headers={'huohua-podenv': self.jira_id})
if m_token.status_code != 200:
raise KeyError('获取 student token失败')
token = m_token.json()
token = token['data']['token']
student_token_cache[self.current_user] = token
return token
except :
raise TypeError('获取 student token失败')
class SparkEduLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=curt_user, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
team_name = ReadConfig(env_choose_path).get_value(sections='run_evn_name', options="current_team")
def get_spark_edu_graphic_code(self):
"""
功能:获取图形验证码
"""
code_url = "https://api.qa.sparkedu.com/third/passport/captcha"
post_data = {"width":240,"height":90}
resp = requests.post(url=code_url, json=post_data, verify=False, headers={'huohua-podenv': self.jira_id})
resp = resp.json()
return resp['data']['captchaId']
def check_spark_edu_graphic_code(self, captcha_id, captcha='1234',):
"""
功能:校验图形验证码
captcha_id图形验证码id由get_spark_edu_graphic_code返回
captcha图形验证码默认1234需事先配置好
"""
check_url = "https://api.qa.sparkedu.com/third/passport/captcha/check"
post_data = {"captchaId":captcha_id,"captcha":"1234"}
resp = requests.post(url=check_url, json=post_data, verify=False, headers={'huohua-podenv': self.jira_id})
resp = resp.json()
print(resp)
def spark_edu_login_by_code(self, phone=None, auth_code=None,
auth_code_type=17, country_code=86, language="zh-HK", platform="1",
login_type=1, user_type=1):
"""
功能:验证码登录
phone: 登录用户手机号
auth_code验证码这里默认验证码为1234需提前配置
其余参数:默认
"""
login_url = "https://api.qa.sparkedu.com/third/passport/login"
if not phone:
# 如果没有传入phone则从小组配置文件中读取
team_config = os.path.abspath(os.path.join(ROOT_PATH, 'UBJ/library/Config/team_config.ini'))
phone = ReadConfig(team_config).get_value(sections='default_spark_edu_user', options='phone')
if not auth_code:
# 如果没有传入验证码,则从小组配置文件中读取
team_config = os.path.abspath(os.path.join(ROOT_PATH, 'UBJ/library/Config/team_config.ini'))
auth_code = ReadConfig(team_config).get_value(sections='default_spark_edu_user', options='auth_code')
post_data = {"phone":phone,
"authCode":auth_code,
"countryCode":country_code,
"authCodeType":auth_code_type,
"language":language,
"platform":platform,
"loginType":login_type,
"userType":user_type}
m_token = requests.post(url=login_url, json=post_data, verify=False, headers={'huohua-podenv': self.jira_id})
if m_token.status_code == 200:
token = m_token.json()
return token['data']['token']
else:
print(m_token.content)
raise KeyError('获取 student token失败')
class SparkSaasLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=curt_user, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
self.team_config = os.path.abspath(os.path.join(ROOT_PATH, 'BIZ/library/Config/team_config.ini'))
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
def login_biz_saas(self, phone="", email="", password="", country_code="", auth_code="9999"):
"""
Args:
phone:
email:
password: 密码,
country_code: 86, config.country_code - 目前有问题暂时写到86
auth_code: 验证码
Returns:
"""
url = "https://sc-saas-api.qa.huohua.cn/api/session/integration/login"
team_config = os.path.abspath(os.path.join(ROOT_PATH, 'BIZ/library/Config/team_config.ini'))
a = ReadConfig(team_config).get_value(sections='saas_org_info', options='SAAS_BELONG_SCHOOL')
country_code = ReadConfig(team_config).get_value(sections='saas_org_info', options='COUNTRY_CODE')
data = {
}
if phone and password == "":
data["authCode"] = auth_code
data["loginType"] = 2 # 登录类型
data["phone"] = str(phone)
data["countryCode"] = country_code
elif phone and password:
data["password"] = password
data["loginType"] = 1 # 登录类型
data["phone"] = str(phone)
data["countryCode"] = country_code
elif email and password:
data["password"] = password
data["loginType"] = 3 # 登录类型
data["email"] = email
elif phone == "" and password == "":
phone = ReadConfig(team_config).get_value(sections='saas_org_info', options='SAAS_NAME')
data["authCode"] = auth_code
data["loginType"] = 2 # 登录类型
data["phone"] = str(phone)
data["countryCode"] = country_code
print(str(country_code),str(phone),str(auth_code))
else:
print("登录方式错误...")
return False
print("开始登录 saas 系统..., 打印登录信息: {}".format(phone))
response = requests.post(url=url, json=data, verify=False, headers={'huohua-podenv': self.jira_id, "Content-Type": "application/json;charset=UTF-8"})
if response:
pass
else:
print("登录 SaaS 系统失败...{}".format(response))
return False
print(response.text)
token = response.json()['data']['token']
status = response.status_code
if status:
if status == 200 and token:
print("登录 SaaS 系统成功..., Token: {}".format(token[0]))
print("写入到config配置文件...")
return token
else:
print("登录 SaaS 系统失败...{}".format(response))
return False
else:
print("登录 SaaS 系统失败...{}".format(response))
return False
class SparkSaasTeacherLogin(InitConfig):
def __init__(self, host=None, curt_user=None, current_evn=None):
# super().__init__(run_user_name=curt_user, current_evn=current_evn)
InitConfig.__init__(self, run_user_name=curt_user, current_evn=current_evn)
self.team_config = os.path.abspath(os.path.join(ROOT_PATH, 'BIZ/library/Config/team_config.ini'))
self.jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
def rsa_encrypt(self, text):
"""
Args:
text:
Returns:
"""
public_key = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDGsNh8Pu1zY9SrblsJDuITHQ+ObAyjGbQARgh3TQtaz5vYQ+avTZJJsJLxiZ5hpf1/5OCb3QZRzbWebrk9cgU892GGc8MDBtH7l/I1GrZq2mY9LO4wWjA0gC7qK0o3yqAwwNyh3uXkSsN6KlpXnac1G1o+Sjlr1P1IXoB4I5MlAQIDAQAB"
public_key_str = "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----".format(public_key)
# 字符串指定编码转为bytes
text = text.encode('utf-8')
# 构建公钥对象
cipher_public = PKCS1_v1_5.new(RSA.importKey(public_key_str))
# 加密bytes
text_encrypted = cipher_public.encrypt(text)
# base64编码并转为字符串
text_encrypted_base64 = base64.b64encode(text_encrypted).decode()
return text_encrypted_base64
def spark_login_teacher(self, phone, auth_code='9999', auth_code_type=2, country_code="", pass_word="abc123", login_type="验证码"):
"""
Args:
phone:
auth_code:
auth_code_type:
country_code:
login_type: 验证码,密码
pass_word: 密码
Returns: token
"""
team_config = os.path.abspath(os.path.join(ROOT_PATH, 'BIZ/library/Config/team_config.ini'))
country_code = ReadConfig(team_config).get_value(sections='saas_org_info', options='TEACHER_COUNTRY_CODE')
data = {
"authCodeType": auth_code_type,
"countryCode": country_code
}
phone_encrypt = SparkSaasTeacherLogin().rsa_encrypt(str(phone))
data["phone"] = phone_encrypt
if login_type == "验证码":
print("教师端验证码登录")
url = "https://sc-mce-teacher-api.qa.huohua.cn/api/session"
data["authCode"] = auth_code
print(data)
else:
print("教师端密码登录")
url = "https://sc-mce-teacher-api.qa.huohua.cn/api/session/phone_pass"
pass_word_encrypt = SparkSaasTeacherLogin().rsa_encrypt(pass_word)
data["password"] = pass_word
print("开始登录 教师端 系统..., 打印登录手机号码, {}".format(phone))
# response = self.request.http_request(url, method='post', json=data)
response = requests.post(url=url, json=data, verify=False, headers={'huohua-podenv': self.jira_id, "Content-Type": "application/json;charset=UTF-8"})
if response:
pass
else:
print("登录 教师端 系统失败...{}".format(response))
return False
token = response.json()['data']['token']
status = response.status_code
if status:
if status and token:
print("登录 教师端 系统成功..., Token: {}".format(token))
print("写入到config配置文件...")
return token
else:
print("登录 教师端 系统失败...{}".format(response))
return False
else:
print("登录 教师端 系统失败...{}".format(response))
return False
if __name__ == '__main__':
test = LoginSys()
id = test.ug_sso_login_by_starlight()
print(id)
headers = {"Content-Type": "application/json;charset=UTF-8"}
kwargs = {"lessonQueryModel":{},"pageModel":{"pageNum":1,"pageSize":10}}
url = "https://starlight.qa.huohua.cn/api/lesson/queryList"
resp = id.post(url=url, json=kwargs, headers=headers, allow_redirects=False, verify=False)
print(resp)