Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

169 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# encoding: utf-8
# @Time : 2022/4/18 上午10:36
# @Author : chenjiang
# @Site :
# @File : edu_user_helper.py
import requests
# import py_eureka_client.eureka_client as eureka_client
from base_framework.public_tools.sqlhelper import MySqLHelper
from base_framework.public_tools.my_faker import MyFaker
from base_framework.public_tools import log
from base_framework.public_tools.utils import Tools
obj_log = log.get_logger()
obj_my_faker = MyFaker()
obj_my_sql_helper = MySqLHelper()
obj_tools = Tools()
# def get_ip_by_server_name(env_name, service_name, type):
# """
# 根据服务名称环境as/hh
# :param env_name: 环境信息
# :param service_name: 服务名称
# :param type: as/hh
# :return:
# """
# all_school_eureka_server= 'http://eureka.qa.allschool.com/eureka/'
# hh_eureka_server = 'http://eureka.qa.huohua.cn/eureka/'
# try:
# if type.lower() == 'as':
# eureka_server = all_school_eureka_server
# else:
# eureka_server = hh_eureka_server
#
# eureka_client.init(
# eureka_server=eureka_server,
# app_name="ASC--",
# instance_ip="127.0.0.1",
# instance_port=8080)
# client = eureka_client.get_client()
# app = client.applications.get_application(service_name)
# ip_list = []
# for app_in in app.up_instances:
# if app_in.metadata.get('ver').lower() == env_name.lower() and app_in.ipAddr:
# ip_list.append(app_in.ipAddr)
# else:
# pass
# if ip_list:
# return ip_list
# else:
# obj_log.info("未获取到ip")
# return False
# except Exception as e:
# obj_log.info(e)
# return False
class EDUUserHelper:
"""
用户中心用户相关操作
"""
def __init__(self):
pass
def get_unregistered_phone(self):
"""
| 功能说明: | 获取未注册的手机号 |
| 输入参数: | |
| 返回参数: | phone |
| 作者信息: | 陈江 | 2022/4/18 |
"""
phone = obj_my_faker.gen_phone_number()
if phone:
is_exit = obj_my_sql_helper.select_one(
'SELECT id FROM `ucenter`.`user_profile` WHERE `phone` = \'{}\''.format(phone[0]))
if is_exit:
obj_log.info('{}手机号码已经存在,正在重新获取'.format(phone))
self.get_unregistered_phone()
else:
return phone[0]
else:
obj_log.error('生成手机号码失败')
return False
def get_unused_email(self):
"""
| 功能说明: | 获取未使用的邮箱 |
| 输入参数: | |
| 返回参数: | email|
| 作者信息: | 陈江 | 2022/4/18 |
"""
email = obj_my_faker.gen_email()
if email:
is_exit = obj_my_sql_helper.select_one(
'SELECT id FROM `ucenter`.`user_contact` WHERE `contact_info` = \'{}\''.format(email))
if is_exit:
obj_log.info('{}邮箱已经存在,正在重新获取'.format(email))
self.get_unused_email()
else:
return email
else:
obj_log.error('生成邮箱失败')
return False
def get_sms_code_by_phone(self, phone):
"""
| 功能说明: | 根据手机号码获取短信验证码 |
| 输入参数: | phone | 手机号|
| 返回参数: | 验证码 |
| 作者信息: | 陈江 | 2022/4/18 |
"""
# 根据phone获取phone_code
phone_server_ip = obj_tools.get_container_ip_from_eureka('PHONE-SERVER', need_jira_id='qa',
eureka_url='http://eureka.qa.huohua.cn')
if phone_server_ip.get('container_ip'):
url = 'http://{}:8080/encrypt/regdata?biztype=phone&uid=123456&sourceData={}'.format(
phone_server_ip.get('container_ip'), phone)
response = requests.post(url=url) # 三个参数
response_json = response.json()
else:
obj_log.info('未获取到phone-server的ip')
return False
if response_json.get('data'):
msg = obj_my_sql_helper.select_all(
'SELECT msg FROM `push_service`.`sms` WHERE `phone_code` = \'{}\' ORDER BY id DESC LIMIT 10 '.format(
response_json.get('data')))
if msg:
for m in msg:
try:
if ',' in m.get('msg'):
msg_split = m.get('msg').split(',')[0]
elif '' in m.get('msg'):
msg_split = m.get('msg').split('')[0]
else:
return False
if ':' in msg_split:
code = msg_split.split(':')[1].strip(' ')
elif '' in msg_split:
code = msg_split.split('')[1].strip(' ')
else:
import re
pattern = r'\b(\d{4,6})\b.*?verification code'
match = re.search(pattern, msg_split)
if match:
verification_code = match.group(1)
return verification_code
return False
return code
except Exception as e:
return e
else:
obj_log.info('未找到发送的短信记录')
return False
else:
obj_log.info('获取{}该手机号的phonecode失败'.format(phone))
return False
if __name__ == '__main__':
a = EDUUserHelper()
print(a.get_sms_code_by_phone('13563963497'))
# c = {'msg': 'SMS verification code: 6378, valid within 10 minutes, please ignore if you are not operating by yourself.'}
#
# print(c.get('msg').split(',')[0].split(':')[1].strip(' '))