Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

1546 lines
82 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
"""
Author: qiaoxinjiu
Create Date: 2020/9/24 11:39
"""
import base64
import datetime
import hashlib
import calendar
import math
import re
import time
import pytz
from os import listdir
from dateutil.relativedelta import relativedelta
from chinese_calendar import is_workday, is_holiday
import jsonpath
import requests
from bs4 import BeautifulSoup
from base_framework.base_config.current_pth import *
from base_framework.public_tools import log
from base_framework.public_tools.read_config import InitConfig
from base_framework.public_tools.read_config import ReadConfig
from base_framework.public_tools.runner import Runner
from base_framework.public_tools.sqlhelper import MySqLHelper
from base_framework.public_tools.custom_error import BusinessError
# Shared Runner instance; the MQ helpers in this module read `mq_host` from it and send requests via `call_rest_api`.
obj_runner = Runner()
# Module-wide logger obtained from the project's log helper.
obj_log = log.get_logger()
# Config reader bound to `env_choose_path` (presumably supplied by the star import from current_pth - TODO confirm).
obj_evn_cfg = ReadConfig(env_choose_path)
class Tools(InitConfig):
def __init__(self):
    """Initialise the InitConfig base without an explicit user or environment and create the DB helper."""
    InitConfig.__init__(self, run_user_name=None, current_evn=None)
    # Kept on the instance so that query helpers (e.g. the eureka URL lookup) can reuse it.
    self.db_con = MySqLHelper()
def get_jira_id_from_ini(self):
    """
    Read the ``huohua-podenv`` option of the ``run_jira_id`` section from the environment config.

    Returns:
        The configured value, or ``None`` when reading the option raises (the error is printed).
    """
    try:
        return obj_evn_cfg.get_value(sections='run_jira_id', options='huohua-podenv')
    except Exception as read_error:
        # Best effort: a missing/unreadable option must not break the caller.
        print(read_error)
        return None
def get_md5(self, str):
    """
    Return the hexadecimal MD5 digest of a text value.

    Args:
        str: Text to hash; it is encoded as UTF-8 before hashing.
            (The parameter name shadows the builtin but is part of the public signature.)

    Returns:
        The 32-character lowercase hex digest.
    """
    return hashlib.md5(str.encode("utf8")).hexdigest()
def get_format_date(self, r_week=None, r_time=None, t_time=None, r_schedule=None,
                    r_type=1,
                    add_months=0, add_weeks=0, add_days=0, add_hours=0, add_minutes=0, add_seconds=0, minutes=60):
    """
    Return a date/time rendered in the representation selected by ``r_type``.

    The base moment is chosen in this order: ``r_week`` (next occurrence of that weekday,
    counted from now), ``r_time`` (a ``%Y-%m-%d %H:%M:%S`` string), ``t_time`` (an epoch
    timestamp; only the first ten digits are used when it is longer), otherwise "now".
    The ``add_*`` offsets are then applied to the base.

    Args:
        r_week: Weekday number, 1 = Monday ... 7 = Sunday.
        r_time: Date string in ``%Y-%m-%d %H:%M:%S`` format.
        t_time: Epoch timestamp in seconds (longer values are cut to ten digits).
        r_schedule: When 1, ignore everything else except ``minutes`` and return the
            schedule tuple ``(datetime, date, time, week, "HH:M5")`` for now + ``minutes``,
            where ``week`` is Sunday=1 ... Saturday=7.
        r_type: Output selector, see the table below.
        add_months: Month offset (not reflected in the timestamp types 11-14).
        add_weeks, add_days, add_hours, add_minutes, add_seconds: Offsets, may be negative.
        minutes: Minute offset used only together with ``r_schedule``.

    r_type values:
        1   %Y-%m-%d                         2019-05-23
        2   year/month/day with CJK suffixes 2019年5月23日
        3   type 2 followed by the weekday   2019年5月23日星期四
        4   %Y-%m-%d %H:%M:%S                2021-06-28 21:54:46
        5   %Y-%m-%d %H:%M:%S.%f             2021-06-28 21:54:46.205213
        6   %Y-%m                            2019-05
        7   %Y%m%d                           20190505
        8   %Y%m%d%H%M%S                     20190505102030
        11  raw float timestamp              1499825149.2578925
        12  timestamp in seconds             1499825149
        13  timestamp in milliseconds        1499825149257
        14  timestamp in microseconds        1499825149257892
        15  %Y-%m-%d 00:00:00                2021-06-28 00:00:00
        16  %Y-%m-%d 23:59:59                2021-06-28 23:59:59
        17  %Y_%m_%d                         2021_06_28
        18  schedule tuple: (full datetime, "HH:M5", week Sunday=1..Saturday=7,
            "%Y-%m-%d %H:%M", ISO weekday)
        19  millisecond timestamp of type 15, returned as a string
        20  millisecond timestamp with minutes and seconds zeroed
        21  millisecond timestamp with seconds zeroed
        22  dict with year/month/day plus 24-hour and 12-hour time strings
        23  %Y%m                             202303
        24  next Monday at the current time (always relative to now)
        25  last day of the month, 23:59:59  2021-07-31 23:59:59
        26  first day of the month, 00:00:00 2021-07-01 00:00:00
        27  tuple (UTC ISO string ending in "Z", local "%Y-%m-%d %H:%M:%S")
        28  Monday of the current week, 00:00:00 (always relative to now)
        29  Sunday of the current week, 23:59:59 (always relative to now)
        91  ``isocalendar()`` of the date: (year, week, weekday)
        92  year and ISO week concatenated   202235

    Raises:
        ValueError: ``r_time`` or ``t_time`` cannot be converted.
        Exception: ``r_schedule`` or ``r_type`` is not a supported value.
    """
    r_type = int(r_type)
    add_months = int(add_months)
    add_weeks = int(add_weeks)
    add_days = int(add_days)
    add_hours = int(add_hours)
    add_minutes = int(add_minutes)
    add_seconds = int(add_seconds)

    if r_schedule is not None:
        if r_schedule != 1:
            raise Exception("r_schedule参数输入错误请重新输入")
        slot = datetime.datetime.now() + datetime.timedelta(minutes=minutes)
        return (slot.strftime("%Y-%m-%d %H:%M:%S"),
                slot.strftime("%Y-%m-%d"),
                slot.strftime("%H:%M:%S"),
                slot.isoweekday() % 7 + 1,  # schedule week: Sunday=1 ... Saturday=7
                slot.strftime("%H:%M")[0:4] + "5")  # last minute digit is forced to 5

    # Pick the base moment; ``None`` means "use the current time".
    if r_week:
        today = datetime.datetime.now()
        days_ahead = int(r_week) - 1 - today.weekday()  # weekday(): 0 is Monday, 6 is Sunday
        if days_ahead < 0:
            days_ahead += 7
        base = today + datetime.timedelta(days=days_ahead)
    elif r_time:
        try:
            base = datetime.datetime.strptime(r_time, "%Y-%m-%d %H:%M:%S")
        except ValueError as e:
            raise ValueError("输入时间格式错误,请输入格式为:%Y-%m-%d %H:%M:%S 的时间字符串") from e
    elif t_time:
        if len(str(t_time)) > 10:
            t_time = int(str(t_time)[0:10])
        try:
            base = datetime.datetime.fromtimestamp(t_time)
        except ValueError as e:
            raise ValueError("输入时间格式错误,错误原因:{}".format(e)) from e
    else:
        # Inputs that are falsy but not None (e.g. "" or 0) are treated as "not provided".
        base = None

    shift = datetime.timedelta(weeks=add_weeks, days=add_days, hours=add_hours,
                               minutes=add_minutes, seconds=add_seconds)
    shift_seconds = ((add_weeks * 7 + add_days) * 24 + add_hours) * 3600 + add_minutes * 60 + add_seconds
    if base is None:
        r_date = datetime.datetime.now() + shift
        t_date = time.time() + shift_seconds
    else:
        r_date = base + shift
        t_date = time.mktime(base.timetuple()) + shift_seconds
    if add_months:
        # relativedelta accepts negative months, so one call covers both directions.
        r_date = r_date + relativedelta(months=add_months)

    def _epoch_ms(truncating_format):
        """Millisecond timestamp of r_date after rendering it with a truncating format."""
        truncated = time.strptime(r_date.strftime(truncating_format), '%Y-%m-%d %H:%M:%S')
        return int(time.mktime(truncated)) * 1000

    # Types that are a single strftime call.
    plain_formats = {
        1: "%Y-%m-%d",
        4: "%Y-%m-%d %H:%M:%S",
        5: "%Y-%m-%d %H:%M:%S.%f",
        6: "%Y-%m",
        7: "%Y%m%d",
        8: "%Y%m%d%H%M%S",
        15: "%Y-%m-%d 00:00:00",
        16: "%Y-%m-%d 23:59:59",
        17: "%Y_%m_%d",
        23: "%Y%m",
        26: "%Y-%m-01 00:00:00",  # zero-padded month, as documented
    }
    if r_type in plain_formats:
        return r_date.strftime(plain_formats[r_type])

    if r_type in (2, 3):
        # Separators restored to match the documented output (month/day are not zero-padded).
        f_date = "{}年{}月{}日".format(r_date.year, r_date.month, r_date.day)
        if r_type == 2:
            return f_date
        return f_date + self.get_week_by_date(format_date=r_date.strftime("%Y-%m-%d"), week_type='Chinese')
    elif r_type == 11:
        return t_date
    elif r_type == 12:
        return int(t_date)
    elif r_type == 13:
        return int(round(t_date * 1000))
    elif r_type == 14:
        return int(round(t_date * 1000000))
    elif r_type == 18:
        origin_week = r_date.isoweekday()
        return (r_date.strftime("%Y-%m-%d %H:%M:%S"),
                r_date.strftime("%H:%M")[0:4] + "5",
                origin_week % 7 + 1,
                r_date.strftime("%Y-%m-%d %H:%M"),
                origin_week)
    elif r_type == 19:
        return str(_epoch_ms("%Y-%m-%d 00:00:00"))
    elif r_type == 20:
        return _epoch_ms("%Y-%m-%d %H:00:00")
    elif r_type == 21:
        return _epoch_ms("%Y-%m-%d %H:%M:00")
    elif r_type == 22:
        return {'year': r_date.year, 'month': r_date.month, 'day': r_date.day,
                # %H (not %I) so the 24-hour entries really are on the 24-hour clock.
                'time_24': {'short_time': r_date.strftime('%H:%M'),
                            'full_time': r_date.strftime('%H:%M:%S')},
                'time_12h': {'short_time': r_date.strftime('%I:%M %p').lower(),
                             'full_time': r_date.strftime('%I:%M:%S %p').lower()}}
    elif r_type == 24:
        today = datetime.datetime.today()
        return (today + datetime.timedelta(days=7 - today.weekday())).strftime("%Y-%m-%d %H:%M:%S")
    elif r_type == 25:
        last_day = calendar.monthrange(r_date.year, r_date.month)[1]
        return "{}-{} 23:59:59".format(r_date.strftime("%Y-%m"), last_day)
    elif r_type == 27:
        # Convert to UTC and render as an ISO string with a trailing "Z".
        now_utc_time = r_date.astimezone(pytz.utc)
        return now_utc_time.isoformat()[:19] + "Z", r_date.strftime("%Y-%m-%d %H:%M:%S")
    elif r_type == 28:
        now = datetime.datetime.now()
        monday = now - datetime.timedelta(days=now.weekday())
        return f"{monday.strftime('%Y-%m-%d')} 00:00:00"
    elif r_type == 29:
        now = datetime.datetime.now()
        sunday = now + datetime.timedelta(days=6 - now.weekday())
        return f"{sunday.strftime('%Y-%m-%d')} 23:59:59"
    elif r_type == 91:
        return r_date.isocalendar()
    elif r_type == 92:
        t_week = r_date.isocalendar()
        return "{}{}".format(t_week[0], t_week[1])
    raise Exception(
        "调用get_format_date函数的r_type参数不正确....")
@staticmethod
def get_week_by_date(format_date, week_type='Arabic'):
"""
| 功能说明: | 返回format_date对应的星期 |
| 输入参数: | format_date | 要计算的日期格式2021-06-28 |
| | week_type | 返回星期的格式Arabic-阿拉伯数字Chinese-中文 |
| 返回参数: | 星期几0-6, 0-星期日 |
| 作者信息: | 吴勇刚 | 2021-06-08 |
函数路径src/Public/Common/utils.py
| format_date | week_type | 返回 |
| 2021-06-27 | Arabic | 0 |
| 2021-06-28 | Arabic | 1 |
| 2021-06-27 | Chinese | 星期日 |
| 2021-06-28 | Chinese | 星期一 |
"""
input_date = format_date.split('-')
if len(input_date) != 3:
raise Exception("输入值:{} 不是2021-06-08格式请检查".format(format_date))
year = int(input_date[0])
month = int(input_date[1])
day = int(input_date[2])
week = int(datetime.datetime(year, month, day).strftime("%w"))
if week_type == 'Arabic':
return week
elif week_type == 'Chinese':
weeks = ['星期日', '星期一', '星期二', '星期三', '星期四', '星期五', '星期六']
return weeks[week]
@staticmethod
def check_the_date_is_a_working_day(format_date=None):
    """
    Tell whether a date is a working day according to ``chinese_calendar``.

    Args:
        format_date: Date string such as ``2021-09-01``; today is used when it is empty.

    Returns:
        ``True`` when ``is_workday`` holds and ``is_holiday`` does not, otherwise ``False``.
    """
    if format_date:
        target_day = datetime.datetime.strptime(format_date, "%Y-%m-%d").date()
    else:
        # No argument: evaluate the current day.
        target_day = datetime.date.today()
    return bool(is_workday(target_day) and not is_holiday(target_day))
@staticmethod
def get_img_bas64(path):
img = open(path, 'rb')
img_bas64 = base64.b64encode(img.read())
img.close()
'''b 表示 byte的意思将byte转换回去'''
return str(img_bas64, 'utf-8')
@staticmethod
def get_files_path():
path = (os.path.split(os.path.realpath(__file__)))[0] + '\\' + "img"
return path
@staticmethod
def listdir(self): # 传入存储的list
list_name = []
path = self.get_files_path()
for file in os.listdir(path):
file_path = os.path.join(path, file)
if os.path.isdir(file_path):
listdir(file_path, list_name)
else:
list_name.append(file_path)
return list_name
@staticmethod
def del_file(file_name):
size = os.path.getsize(file_name)
file_size = 44 * 1024 # 100K
if size > file_size:
print('remove', size, file_name)
try:
os.remove(file_name)
except Exception as e:
print(e)
def modify_db_info(self, dbName=None, dbUsername=None,
                   dbPassword=None, dbHost=None, dbPort=None):
    """
    Overwrite selected options of the ``Mysql`` config section via ``self.db_rtn``.

    Only arguments with a truthy value are written.

    Args:
        dbName: New value for ``db_test_dbname``.
        dbUsername: New value for ``db_test_user``.
        dbPassword: New value for ``db_test_password``.
        dbHost: New value for ``db_test_host``.
        dbPort: New value for ``db_test_port``.

    Example:
        modify_db_info(dbName='my_db', dbUsername='tester', dbPassword='s3cr3t',
                       dbHost='db.example.com', dbPort=3306)
    """
    overrides = (
        ('db_test_dbname', dbName),
        ('db_test_user', dbUsername),
        ('db_test_password', dbPassword),  # must store the password itself, not the database name
        ('db_test_host', dbHost),
        ('db_test_port', dbPort),
    )
    for option, value in overrides:
        if value:
            self.db_rtn.set_section(
                section='Mysql',
                option=option,
                value=value)
def __get_eureka_url_from_db(self, team, server):
    """
    Look up the eureka URL of a service; used by get_container_ip_from_eureka.

    Args:
        team: Value matched against the ``team`` column of ``sparkatp.swagger_info``.
        server: Value matched against the ``server_name`` column.

    Returns:
        The entry of the selected row whose key equals the configured
        ``run_evn_name/current_business`` (the query aliases the columns as ``hh`` and ``hhi``).
    """
    # NOTE(review): the statement is assembled by string formatting; only pass trusted team/server names.
    query = ("select eureka_url as hh, eureka_url_hhi as hhi from sparkatp.swagger_info "
             "where team='{team}' and server_name='{server}';").format(team=team, server=server)
    row = self.db_con.select_one(sql=query)
    business_key = self.evn_cfg['run_evn_name']['current_business']
    return row[business_key]
def get_container_ip_from_eureka(self, server_name, env=None, need_jira_id=False, jira_id_dev=None,
is_contain_qa=True, team_name=None, eureka_url=None):
"""
| Purpose: | Look up the container IP of a service on the Eureka page |
| Arguments: | server_name | service name (upper-cased before the lookup) |
| | env | QA or SIM; read from the run_evn_name/current_evn config option when None |
| | need_jira_id | not read by the active code (only referenced in commented-out lines) |
| | jira_id_dev | custom isolated-environment id; when None the run_jira_id/huohua-podenv config value is used |
| | is_contain_qa | when False, an empty container_ip is returned instead of falling back to the qa IP |
| | team_name | team name used to find the eureka url; read from run_evn_name/current_team when None |
| | eureka_url | Eureka address; looked up in the database when None |
| Returns: | dict | {'container_ip': ...}; False when server_name is not on the page |
| Author: | 吴勇刚 | 2022.03.27 |
Example:
| get_container_ip_from_eureka(server_name='PEPPA-QI-API') |
"""
# Resolve configuration defaults
if team_name is None: # business team
team_name = obj_evn_cfg.get_value(sections='run_evn_name', options='current_team')
if env is None: # build environment
env = obj_evn_cfg.get_value(sections='run_evn_name', options='current_evn')
if eureka_url is None: # eureka address
eureka_url = self.__get_eureka_url_from_db(team=team_name, server=server_name)
# if env.upper() == "QA" and 'sim' in eureka_url.lower():
# eureka_url = eureka_url.replace('.sim.', '.qa.')
# elif env.upper() == "SIM" and 'qa' in eureka_url.lower():
# eureka_url = eureka_url.replace('.qa.', '.sim.')
jira_id = None
if jira_id_dev is None:
# if need_jira_id is False:
# jira_id = 'qa'
# else:
jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
# if jira_id == "":
# jira_id = 'qa'
else:
jira_id = jira_id_dev
# Fetch the Eureka page and collect every link whose href contains "actuator/info".
# NOTE(review): the request has no timeout.
temp_text = requests.get(url=eureka_url).content
soup = BeautifulSoup(temp_text.decode('utf-8'), "html.parser")
all_container_ip = soup.find_all(href=re.compile("actuator/info"))
# Map of service name -> list of the link contents found for it.
tree_dict = dict()
for temp in all_container_ip:
server = list(temp.parent.parent)[1].get_text()
server_ip = list(temp)
if server in tree_dict.keys():
tree_dict[server].extend(server_ip)
else:
tree_dict[server] = server_ip
server_name = server_name.upper()
return_dict = dict()
if server_name in tree_dict.keys():
if jira_id:
if jira_id in str(tree_dict[server_name]):
# NOTE(review): in the lambda below the conditional expression binds looser than `==`, so it is
# evaluated as `x.split(':')[2] if len(...) > 2 else ("" == jira_id)`: the third ':' segment is only
# tested for truthiness, not compared with jira_id. get_container_ip_from_eureka_old compares
# `x.split(':')[2] == jira_id` - confirm which behaviour is intended.
container_ip = [
temp.split(':')[0] for temp in
filter(lambda x: x.split(':')[2] if len(x.split(':')) > 2 else "" == jira_id,
tree_dict[server_name])
if jira_id in temp]
# NOTE(review): container_ip is rebound to a single string only when an entry starts with '10';
# otherwise it appears to stay a list.
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else: # developers may have started the service locally, giving several entries; skip office-network IPs
obj_log.warning('跳过获取到的IP{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
# For allschool eureka addresses the IP is also written to the PZ section of the server-ip config.
if 'allschool' in eureka_url:
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name,
value=container_ip)
return return_dict
elif env.upper() == "SIM":
if len(tree_dict[server_name]) == 1: # on sim, when there is exactly one IP return it directly
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name]][0]
return_dict['container_ip'] = container_ip
return return_dict
else:
if not is_contain_qa:
return_dict['container_ip'] = ''
return return_dict
# When there is no matching isolated environment, return the qa environment IP
if 'qa' in str(tree_dict[server_name]):
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name] if 'qa' in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.error('跳过获取到的IP{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
return return_dict
elif 'groot' in str(tree_dict[server_name]):
# The bare string below is a no-op statement; it says this branch covers allschool services whose
# baseline environment is named groot rather than qa.
"""解决allschool部分服务极限环境为groot而非qa的场景"""
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name] if 'groot' in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.error('跳过获取到的IP{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
# if 'allschool' in eureka_url:
# ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name, value=container_ip)
return return_dict
elif '${server.port}' in str(tree_dict[server_name]) or 'sim' in str(tree_dict[server_name]):
container_ip = [temp.split(':')[0]
for temp in tree_dict[server_name] if '${server.port}' in temp or 'sim' in temp][0]
return_dict['container_ip'] = container_ip
return return_dict
else:
return_dict['container_ip'] = ''
return return_dict
else:
# raise Exception('从{}中未获取到{}的IP,请检查服务名是否正确!!!'.format(eureka_url, server_name))
# The service is not listed on the page: log it and return False instead of a dict.
obj_log.error('{}中未获取到{}的IP,请检查服务名是否正确!!!'.format(eureka_url, server_name))
return False
# def get_url_by_eureka(self, server_name, jira_id=False):
# """
# | 功能说明: | 获取Eureka指定服务的IP后组装成需要的url前缀 |
# | 输入参数: | server_name | 服务名 |
# | 输入参数: | jira_id | 独立环境编号 |
# """
# svr_ip = self.get_container_ip_from_eureka(server_name=server_name, need_jira_id=jira_id)
# if svr_ip:
# return "http://" + svr_ip["container_ip"] + ":8080"
# else:
# return False
def get_container_ip_from_eureka_old(self, server_name, env=None, need_jira_id=False,
jira_id_dev=None, is_contain_qa=True):
"""
| Purpose: | Look up the container IP of a service on the Eureka page (older variant with hard-coded eureka URLs) |
| Arguments: | server_name | service name |
| | env | QA or SIM; read from the run_evn_name/current_evn config option when None |
| | need_jira_id | True: read the id from the run_jira_id/huohua-podenv config option; False: no id |
| | jira_id_dev | custom isolated-environment id; overrides the configured id when truthy |
| | is_contain_qa | when False, an empty container_ip is returned instead of falling back to the qa IP |
| Returns: | dict | {'container_ip': ...}; container_ip is '' when nothing matches |
| Author: | 林于棚 | modification date not recorded |
Example:
| get_container_ip_from_eureka(server_name='PEPPA-QI-API') |
"""
return_dict = dict()
tree_dict = dict()
# Whitelist of pz-project services whose eureka address differs from the other projects
# NOTE(review): 'hulk-teacher-api' is listed twice; harmless for the membership tests below.
pz_eureka_server = ["hulk-teaching-server", "hulk-teacher-api", "hulk-teacher-server", "hulk-teaching-listener",
"hulk-teaching-scheduler", "hulk-operation-api-server", "hulk-operation-listener",
"hulk-operation-server", "hulk-account-server",
'hulk-org-server', 'hulk-org-api', 'hulk-teacher-api', 'hulk-class-help-server',
'hulk-content-audit-server', "hulk-teach-backend-api", "hulk-teach-supply-cli-api",
"hulk-ark-gateway", "hulk-class-help-manager", "hulk-content-manage-gateway",
"hulk-teach-supply-server", "hulk-teach-classes-server", "classpod-platform-api",
"classpod-user-api",
"hulk-teach-content-server", "hulk-teach-content-cli-api", "hulk-teach-course-server"
]
if env is None:
env = obj_evn_cfg.get_value(
sections='run_evn_name', options='current_evn')
# Choose the eureka address: "QA" uses the qa hosts, any other value uses the sim hosts.
if env == "QA":
if server_name.lower() in pz_eureka_server:
# url = "http://eureka-asc.qa.huohua.cn/"
url = "http://eureka.qa.allschool.com/"
else:
url = "http://eureka.qa.huohua.cn/"
else:
if server_name.lower() in pz_eureka_server:
# url = "http://eureka-asc.sim.huohua.cn/"
url = "http://eureka.sim.allschool.com/"
else:
url = 'http://eureka.sim.huohua.cn/'
# Fetch the page and collect every link whose href contains "actuator/info".
# NOTE(review): the request has no timeout.
temp_text = requests.get(url=url).content
soup = BeautifulSoup(temp_text.decode('utf-8'), "html.parser")
all_container_ip = soup.find_all(href=re.compile("actuator/info"))
# Build a map of service name -> list of the link contents found for it.
for temp in all_container_ip:
server = list(temp.parent.parent)[1].get_text()
server_ip = list(temp)
if server in tree_dict.keys():
tree_dict[server].extend(server_ip)
else:
tree_dict[server] = server_ip
if need_jira_id:
jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
if jira_id == "":
jira_id = None
else:
jira_id = None
if jira_id_dev:
jira_id = jira_id_dev
server_name = server_name.upper()
if server_name in tree_dict.keys():
if jira_id:
if jira_id in str(tree_dict[server_name]):
# Keep entries whose third ':' segment equals jira_id and take their first segment as the IP.
# NOTE(review): x.split(':')[2] raises IndexError for entries with fewer than three segments.
container_ip = [
temp.split(':')[0] for temp in
filter(lambda x: x.split(':')[2] == jira_id, tree_dict[server_name])
if jira_id in temp]
# NOTE(review): container_ip is rebound to a single string only when an entry starts with '10';
# otherwise it appears to stay a list.
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.warning('跳过获取到的IP{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
# For whitelisted pz services the IP is also written to the PZ section of the server-ip config.
if server_name.lower() in pz_eureka_server:
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name,
value=container_ip)
return return_dict
else:
if not is_contain_qa:
return_dict['container_ip'] = ''
return return_dict
# Fall back to an entry containing 'qa'.
if 'qa' in str(tree_dict[server_name]):
container_ip = [
temp.split(':')[0] for temp in tree_dict[server_name] if 'qa' in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.error('跳过获取到的IP{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
if server_name.lower() in pz_eureka_server:
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name, value=container_ip)
return return_dict
elif '${server.port}' in str(tree_dict[server_name]) or 'sim' in str(tree_dict[server_name]):
container_ip = [temp.split(':')[
0] for temp in tree_dict[server_name] if '${server.port}' in temp or 'sim' in temp][
0]
return_dict['container_ip'] = container_ip
return return_dict
else:
return_dict['container_ip'] = ''
return return_dict
else:
# Unknown service name: log it and return an empty container_ip.
obj_log.error('未获取到IP,请检查server_name:{} 是否正确!'.format(server_name))
return_dict['container_ip'] = ''
return return_dict
def com_get_value_from_json(self, json_object, json_path):
    """
    Evaluate a JSONPath expression against an object.

    Args:
        json_object: Parsed JSON structure to search.
        json_path: JSONPath expression.

    Returns:
        Whatever ``jsonpath.jsonpath`` returns for the two arguments.
    """
    matches = jsonpath.jsonpath(json_object, json_path)
    return matches
@staticmethod
def should_be_equal_as_list(sub_list, obj_list, ignore_nodes=[], check_len=False):
"""
功能判断两个list是否相等,不区分排序
| 输入参数: | sub_list | 列表1子列表 |
| | obj_list | 列表2目标列表 |
| | ignore_nodes | sub_list中要忽略的节点填需要从0开始 |
| | check_len | 是否检查长度默认不检查传入True时则检查两个list完全相同 |
| 作者: | 吴勇刚 | 2021.05. 16 |
函数位置src/Public/Common/utils.py
| 完全相等 | should_be_equal_as_list([1,'a',2,'aa'], [1,2,'b','bb']) | failed |
| 子集相等 | should_be_equal_as_list([1,'a',2,'aa'], [1,2,'b','bb'], [1,3]) | success |
"""
if isinstance(sub_list, str):
try:
sub_list = eval(sub_list)
except Exception as e:
print("输入参数类型错误:{}".format(e))
if isinstance(obj_list, str):
try:
obj_list = eval(obj_list)
except Exception as e:
print("输入参数类型错误:{}".format(e))
if not isinstance(sub_list, list) or not isinstance(obj_list, list):
raise RuntimeError("【ERROR】请求参数必现都是list类型sub_list{0} "
"obj_list:{1}".format(type(sub_list), type(obj_list)))
return
if check_len:
if len(obj_list) != len(sub_list):
raise RuntimeError("【ERROR】两个list的长度不同{}={},请检查...".format(len(sub_list), len(obj_list)))
return
equal = True
for index in range(len(sub_list)):
if index not in ignore_nodes:
if sub_list[index] not in obj_list:
print("【ERROR】{0} not in obj_list:{1}".format(sub_list[index], obj_list))
equal = False
if not equal:
raise RuntimeError("【ERROR】sub_list != obj_list请检查......")
@staticmethod
def should_be_equal_as_dict(sub_dict, obj_dict, ignore_keys=[]):
"""
功能判断两个list是否相等,不区分排序
| 输入参数: | sub_dict | 列表1子列表 |
| | obj_dict | 列表2目标列表 |
| | ignore_keys | sub_dict中要忽略的keys |
| 作者: | 吴勇刚 | 2021.05. 16 |
函数位置src/Public/Common/utils.py
| 完全相等 | should_be_equal_as_dict({'a':1,'b':2,'c':True}, {'a':1,'b':2,'c':False}) | not equal |
| 子集相等 | should_be_equal_as_dict({'a':1,'b':2,'c':True}, {'a':1,'b':2,'c':False}, ['c']) | equal |
"""
if not isinstance(sub_dict, dict) or not isinstance(obj_dict, dict):
raise RuntimeError("【ERROR】请求参数必现都是dict类型sub_dict{0} obj_dict:{1}".format(type(sub_dict),
type(obj_dict)))
return
if len(ignore_keys) == 0 and len(sub_dict) != len(obj_dict):
raise RuntimeError(
"【ERROR】两个dict的长度不相等sub={0},obj={1}".format(
len(sub_dict), len(obj_dict)))
return
equal = True
for key in sub_dict:
if key not in ignore_keys:
if key not in obj_dict:
print(
"【ERROR】{0} not in obj_list:{1}".format(
sub_dict[key], obj_dict))
equal = False
continue
if sub_dict[key] != obj_dict[key]:
print("【ERROR】'节点{key}':{sub} != {obj}".format(key=key,
sub=sub_dict[key],
obj=obj_dict[key]))
equal = False
if not equal:
raise RuntimeError("【ERROR】sub_dict != obj_dict请检查......")
def check_result(self, my_result, result_one=None,
                 result_two=None, position=None):
    """
    Flatten a nested dict into its str/int leaves and the access path of each leaf.

    Dict values are descended into recursively. List values are scanned one level:
    dict items are descended into, str/int items are recorded. Leaves of any other
    type (and lists nested directly inside lists) are skipped.

    Args:
        my_result: Dict to walk.
        result_one: Accumulator for the leaf values (created when None).
        result_two: Accumulator for the path expressions built by ``self.generate``
            (created when None).
        position: Current path of keys/indexes; used by the recursion (created when None).

    Returns:
        Tuple ``(result_one, result_two)``.
    """
    if result_one is None:
        result_one = []
    if result_two is None:
        result_two = []
    if position is None:
        position = []

    def _record(leaf):
        """Store a leaf value together with the expression for the current position."""
        data_in_file = self.generate(position)
        result_one.append(leaf)
        result_two.append(data_in_file)

    # Recursion goes through ``self`` instead of building a fresh Tools() (and with it a
    # new config/DB helper) for every nested node and leaf.
    for key, value in my_result.items():
        position.append(key)
        if isinstance(value, dict):
            self.check_result(value, result_one, result_two, position)
        elif isinstance(value, list):
            for index, it in enumerate(value):
                position.append(index)
                if isinstance(it, dict):
                    self.check_result(it, result_one, result_two, position)
                elif isinstance(it, (str, int)):
                    _record(it)
                position.pop()
        elif isinstance(value, (str, int)):
            _record(value)
        position.pop()
    return result_one, result_two
def generate(self, list_all):
    """
    Build an ``obj_json[...]`` access expression from a path of keys and indexes.

    Args:
        list_all: Sequence of path elements; str elements are rendered quoted
            (``["key"]``), everything else unquoted (``[0]``).

    Returns:
        The expression text, e.g. ``obj_json["a"][0]``.
    """
    segments = ['["{}"]'.format(node) if isinstance(node, str) else '[{}]'.format(node)
                for node in list_all]
    return "obj_json" + "".join(segments)
def mq_send(self, topic, message_body, tag="tag",
            key="key", pro="pro", **kwargs):
    """
    Deprecated: use the functions in base_framework/public_tools/rocket_mq.py instead.

    Send one message through the MQ console behind ``obj_runner.mq_host``.

    Args:
        topic: Topic name.
        message_body: Message content; rendered with ``'{}'.format``.
        tag: Message tag.
        key: Message key.
        pro: Target environment. The default ``'pro'`` is replaced by the configured
            ``run_jira_id/huohua-podenv`` value, or ``'qa'`` when that value is empty.
        **kwargs: ``user`` selects the user passed to ``call_rest_api``.

    Returns:
        True when the console reports ``SEND_OK``, otherwise False.
    """
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    env_ini = project_root + '/base_framework/base_config/env_choose.ini'
    pod_env = ReadConfig(env_ini).get_value(
        sections='run_jira_id', options='huohua-podenv')
    if pro == 'pro':
        target_pro = pod_env if pod_env else 'qa'
    else:
        target_pro = pro
    payload = {
        'topic': topic,
        'messageBody': '{}'.format(message_body),
        'tag': tag,
        'key': key,
        'pro': target_pro,
    }
    response = obj_runner.call_rest_api(
        user=kwargs.get('user', None),
        API_URL=obj_runner.mq_host + '/topic/sendTopicMessage.do',
        req_type='POST',
        json=payload)
    if response['data']['sendStatus'] != 'SEND_OK':
        obj_log.error('发送MQ消息失败')
        return False
    obj_log.info('发送MQ消息成功{}:{}'.format(topic, message_body))
    return True
def mq_send_by_sim(self, topic, message_body, tag="tag",
                   key="key", **kwargs):
    """
    Deprecated: use the functions in base_framework/public_tools/rocket_mq.py instead.

    Send one message through the SIM MQ console.

    Args:
        topic: Topic name.
        message_body: Message content; rendered with ``'{}'.format``.
        tag: Message tag.
        key: Message key.
        **kwargs: Accepted for signature compatibility; not read.

    Returns:
        True when the console reports ``SEND_OK``, otherwise False.
    """
    payload = {
        'topic': topic,
        'messageBody': '{}'.format(message_body),
        'tag': tag,
        'key': key,
    }
    response = obj_runner.call_rest_api(
        user=None,
        API_URL='https://mq-console.sim.huohua.cn/topic/sendTopicMessage.do',
        req_type='POST',
        json=payload,
        current_evn='sim')
    if response['data']['sendStatus'] != 'SEND_OK':
        obj_log.error('发送MQ消息失败')
        return False
    obj_log.info('发送MQ消息成功{}:{}'.format(topic, message_body))
    return True
def get_mq_msg(self, topic, begin_time, end_time, **kwargs):
    """
    Deprecated: use the functions in base_framework/public_tools/rocket_mq.py instead.

    Query the messages of a topic within a time window from the MQ console.

    Args:
        topic: Topic name.
        begin_time: Window start in ``%Y-%m-%d %H:%M`` format (local time).
        end_time: Window end in ``%Y-%m-%d %H:%M`` format (local time).
        **kwargs: ``user`` selects the user passed to ``call_rest_api``.

    Returns:
        The ``data`` member of the response when its ``status`` is 0, otherwise False.
    """
    def _millis_text(minute_stamp):
        """Render a minute-resolution time as a millisecond timestamp string."""
        seconds = time.mktime(time.strptime(minute_stamp, '%Y-%m-%d %H:%M'))
        return str(seconds).split('.')[0] + '000'

    query = {
        'topic': topic,
        'begin': _millis_text(begin_time),
        'end': _millis_text(end_time),
    }
    response = obj_runner.call_rest_api(
        user=kwargs.get('user', None),
        API_URL=obj_runner.mq_host + '/message/queryMessageByTopic.query',
        req_type='GET',
        params=query)
    if response['status'] != 0:
        obj_log.error('查询MQ消息失败')
        return False
    obj_log.info('查询MQ消息成功')
    return response['data']
@staticmethod
def delete_redis(key, db):
    """
    Delete one key from the QA course Redis instance.

    Args:
        key: Name of the key to delete.
        db: Index of the Redis database to connect to.
    """
    import redis
    # NOTE(review): host and password are hard-coded; consider moving them into configuration.
    client = redis.Redis(host='rediscourse.qa.huohua.cn', port=6379, password='Bkl6LvqfzFCzYPAh', db=db)
    client.delete(key)
@staticmethod
def should_be_exist(sql, number=1, raise_error=True, retry_time=0):
"""
| Purpose: | Check that the query returns exactly `number` rows |
| Arguments: | sql | SQL statement to run |
| | number | expected number of rows |
| | raise_error | True: raise when the row count does not match; False: return False instead |
| | retry_time | number of retries; each retry sleeps one second before querying again |
| Returns: | bool | True when the row count matches; False when it does not and raise_error is False |
| Author: | 吴勇刚 | 2021.07.05 |
Location: src/Public/Common/utils.py
"""
res = MySqLHelper().select_all(sql=sql)
# Special case "expect no rows, with retries": the query is repeated after a one-second sleep and
# the flag records whether the row count matched; this branch returns the flag and never raises.
if int(number) == 0 and int(retry_time) > 0:
rs_flag = False
for index in range(int(retry_time)):
time.sleep(1)
res = MySqLHelper().select_all(sql=sql)
if len(res) == int(number):
rs_flag = True
else:
rs_flag = False
return rs_flag
# Row count differs: poll again up to retry_time times and succeed as soon as it matches.
elif len(res) != int(number):
for index in range(int(retry_time)):
time.sleep(1)
res = MySqLHelper().select_all(sql=sql)
if len(res) != int(number):
continue
else:
return True
# Still not matching: print the statement, then raise or return False depending on raise_error.
print("exec sql: {}".format(sql))
if raise_error:
raise Exception("check db failedexpect exist {0}, but act is {1}...".format(number, len(res)))
else:
return False
else:
return True
def should_be_not_contain_sub_json(self, obj_json, sub_json, sort_json=True):
    """
    Assert that ``obj_json`` does NOT contain ``sub_json``.

    Both inputs are converted to Python objects first (``null``/``true``/``false`` in
    their text form are replaced by ``""``/``True``/``False``); two dicts are then
    compared with ``_check_dict`` and two lists with ``_check_list``.

    Args:
        obj_json: Target JSON (dict, list or their text form).
        sub_json: JSON fragment that must not be contained in ``obj_json``.
        sort_json: Whether lists are sorted before comparing (default True).

    Raises:
        Exception: ``obj_json`` contains ``sub_json``.
    """
    # NOTE(review): eval() executes arbitrary expressions and the textual replace also touches
    # 'null'/'true'/'false' inside string values - only pass trusted data.
    obj_json = eval(str(obj_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
    sub_json = eval(str(sub_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
    # The recursive helpers no longer take a running "should_equal" flag, so sort_json is
    # passed as their third argument (as should_be_equal_as_json does).
    if isinstance(obj_json, dict) and isinstance(sub_json, dict):
        tag_equal = self._check_dict(obj_json, sub_json, sort_json)
    elif isinstance(obj_json, list) and isinstance(sub_json, list):
        tag_equal = self._check_list(obj_json, sub_json, sort_json)
    else:
        tag_equal = False
        print("入参需为json串请检查输入参数....")
    if tag_equal:
        print("obj_json:{}".format(obj_json))
        print("sub_json:{}".format(sub_json))
        raise Exception("obj_json中包含了sub_json串校验不通过.....")
    else:
        print("obj_json中不包含sub_json校验通过")
def should_be_equal_as_json(self, obj_json, sub_json, sort_json=False, sort_key=None, sort_type='str'):
    """
    Assert that ``sub_json`` is completely contained in ``obj_json``.

    Both inputs are converted to Python objects first (``null``/``true``/``false`` in
    their text form are replaced by ``""``/``True``/``False``); two dicts are then
    compared with ``_check_dict`` and two lists with ``_check_list``.

    Args:
        obj_json: Target JSON (dict, list or their text form).
        sub_json: JSON fragment expected inside ``obj_json``.
        sort_json: Whether lists are sorted before comparing (default False).
        sort_key: For ``[{}, {}]`` structures, sort by the value of this key (ascending).
        sort_type: Compare ``sort_key`` values as ``'int'`` or ``'str'``; any other
            value falls back to ``'str'``.

    Raises:
        BusinessError: The inputs are not both dicts or both lists.
        Exception: ``obj_json`` does not contain ``sub_json``.

    Examples (obj_json | sub_json | result):
        {"a":11}            | {"a":11}                 | equal
        {"a":11,"b":12}     | {"a":11}                 | equal, obj_json only has to contain sub_json
        {"a":{"b":[1,2,3]}} | {"a":{"b":[1,2]}}        | equal, nesting is supported
        {"a":11,"b":12}     | {"a":11,"b":"not_check"} | equal, nodes valued not_check are skipped
        [{"a":11},{"b":12}] | [{"a":11},{"b":12}]      | equal
        [{"a":11},{"b":12}] | [{"b":12},{"a":11}]      | not equal (order differs) unless sort_json=True
    """
    # NOTE(review): eval() executes arbitrary expressions - only pass trusted data.
    obj_json = eval(str(obj_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
    sub_json = eval(str(sub_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
    if sort_type.lower() not in ('str', 'int'):
        sort_type = 'str'
    # Choose the recursive checker that matches the container type.
    if isinstance(obj_json, dict) and isinstance(sub_json, dict):
        checker = self._check_dict
    elif isinstance(obj_json, list) and isinstance(sub_json, list):
        checker = self._check_list
    else:
        raise BusinessError("入参仅支持json格式请检查输入参数")
    if not checker(obj_json, sub_json, sort_json, sort_key, sort_type):
        raise Exception("预期和实际结果不同,请检查前几步中的逻辑:\nobj_json:{}\nsub_json:{}".format(obj_json, sub_json))
    print("obj_json中完全包含sub_json....")
# def _check_list(self, obj_list, sub_list, should_equal, sort_json=False, sort_key=None, sort_type='str'):
def _check_list(self, obj_list, sub_list, sort_json=False, sort_key=None, sort_type='str'):
    """Recursive helper of should_be_equal_as_json: check that obj_list covers sub_list.

    Every element of ``sub_list`` is compared with the element of ``obj_list``
    at the same index:
      * an expected value equal to the string "not_check" is skipped;
      * list vs list recurses into ``self._check_list``;
      * dict vs dict recurses into ``self._check_dict``;
      * scalar vs scalar passes when the expected value occurs anywhere in
        ``obj_list`` (membership, not positional equality);
      * any other combination is reported as a type mismatch.
    All mismatches are printed; the walk does not stop at the first one.

    Args:
        obj_list: the actual list.
        sub_list: the expected list; it may be shorter than ``obj_list``.
        sort_json: when true, both lists are sorted IN PLACE before comparing.
        sort_key: dict key to sort by; only used when the first element of
            both lists is a dict containing it, otherwise elements are sorted
            by their ``str()`` form.
        sort_type: 'str' or 'int' - how the ``sort_key`` values are compared.

    Returns:
        bool: True when no mismatch was found, otherwise False.
    """
    if len(sub_list) == 0:  # nothing is expected, so any actual list matches
        return True
    if len(obj_list) < len(sub_list):
        print("obj_json的长度是{}<{}sub_json的长度请检查...".format(len(obj_list), len(sub_list)))
        return False

    if sort_json:
        first_obj, first_sub = obj_list[0], sub_list[0]
        # Key-based sorting only makes sense for dict elements. The previous
        # bare `sort_key in obj_list[0]` raised TypeError for scalar elements
        # (e.g. ints); those now fall back to the plain str() ordering.
        use_key = (bool(sort_key)
                   and isinstance(first_obj, dict) and isinstance(first_sub, dict)
                   and sort_key in first_obj and sort_key in first_sub)
        if use_key:
            kind = sort_type.lower()
            if kind == 'str':
                obj_list.sort(key=lambda x: str(x[sort_key]))
                sub_list.sort(key=lambda x: str(x[sort_key]))
            elif kind == 'int':
                obj_list.sort(key=lambda x: int(x[sort_key]))
                sub_list.sort(key=lambda x: int(x[sort_key]))
        else:
            obj_list.sort(key=lambda x: str(x))
            sub_list.sort(key=lambda x: str(x))

    should_equal = True
    # The length guard above guarantees obj_list[index] exists for every index.
    for index, expected in enumerate(sub_list):
        actual = obj_list[index]
        if isinstance(expected, str) and expected == "not_check":
            continue
        if isinstance(expected, list) and isinstance(actual, list):
            # Always recurse so nested diagnostics are printed, but never let a
            # later match overwrite an earlier mismatch.
            if not self._check_list(actual, expected, sort_json, sort_key, sort_type):
                should_equal = False
        elif isinstance(expected, dict) and isinstance(actual, dict):
            if not self._check_dict(actual, expected, sort_json, sort_key, sort_type):
                should_equal = False
        elif isinstance(expected, (list, dict)) or isinstance(actual, (list, dict)):
            should_equal = False
            print("类型不同:{}的类型是{},但{}的类型是{}".format(expected,
                                                   type(expected),
                                                   actual,
                                                   type(actual)))
        elif expected not in obj_list:
            should_equal = False
            print("{}中的第{}个元素:{}不存在于{}中,请检查..."
                  .format(sub_list, index + 1, expected, obj_list))
    return should_equal
# def _check_dict(self, obj_dict, sub_dict, should_equal, sort_json=False, sort_key=None, sort_type='str'):
def _check_dict(self, obj_dict, sub_dict, sort_json=False, sort_key=None, sort_type='str'):
    """Recursive helper of should_be_equal_as_json: check that obj_dict covers sub_dict.

    For every key of ``sub_dict``:
      * a key missing from ``obj_dict`` is a mismatch;
      * an expected value equal to the string "not_check" is skipped;
      * list vs list recurses into ``self._check_list``;
      * dict vs dict recurses into ``self._check_dict``;
      * scalar vs scalar must compare equal with ``==``;
      * any other combination is reported as a type mismatch.
    All mismatches are printed; the walk does not stop at the first one.

    Args:
        obj_dict: the actual dict.
        sub_dict: the expected dict (a subset of ``obj_dict`` when matching).
        sort_json, sort_key, sort_type: passed through unchanged to the
            recursive calls.

    Returns:
        bool: True when no mismatch was found, otherwise False.
    """
    all_match = True
    for name, expected in sub_dict.items():
        if name not in obj_dict:
            all_match = False
            print("key值{}不存在于目标串{}中,请检查...".format(name, obj_dict))
            continue
        if isinstance(expected, str) and expected == "not_check":
            continue

        actual = obj_dict[name]
        expected_is_container = isinstance(expected, (list, dict))
        actual_is_container = isinstance(actual, (list, dict))

        if isinstance(expected, list) and isinstance(actual, list):
            # Recurse unconditionally so nested diagnostics are still printed.
            nested_ok = self._check_list(actual, expected, sort_json, sort_key, sort_type)
            all_match = all_match and nested_ok
        elif isinstance(expected, dict) and isinstance(actual, dict):
            nested_ok = self._check_dict(actual, expected, sort_json, sort_key, sort_type)
            all_match = all_match and nested_ok
        elif not expected_is_container and not actual_is_container:
            if expected == actual:
                continue
            all_match = False
            print("两个对比值中{0}对应的值不相同:{1} != {2},请确认...".format(name, expected,
                                                               actual))
        else:
            all_match = False
            print("类型不同:{}的类型是{},但{}的类型是{}".format(expected,
                                                   type(expected),
                                                   actual,
                                                   type(actual)))
    return all_match
def remove_duplication(self, old_list):
    """Return a new list with duplicates removed, keeping first-seen order.

    Equality (``in``) is used rather than hashing, so unhashable elements
    such as dicts are supported.
    """
    unique_items = []
    for candidate in old_list:
        if candidate in unique_items:
            continue
        unique_items.append(candidate)
    return unique_items
def list_calculations(self, list_a, list_b, opt_type, removed_duplication=False):
    """Set-like operations on two lists, using equality so unhashable items work.

    Args:
        list_a: first list.
        list_b: second list.
        opt_type: 1 - intersection, 2 - union, 3 - difference.
        removed_duplication: when true, both inputs are de-duplicated first
            (via ``self.remove_duplication``) and the union result is
            de-duplicated as well.

    Returns:
        dict: ``{'intersection': [...]}`` for 1, ``{'union': [...]}`` for 2,
        ``{'a_has_only': [...], 'b_has_only': [...]}`` for 3.
        None for any other ``opt_type``.
    """
    if removed_duplication:
        list_a = self.remove_duplication(list_a)
        list_b = self.remove_duplication(list_b)
    # Intersection
    if opt_type == 1:
        return {'intersection': [x for x in list_a if x in list_b]}
    # Union
    if opt_type == 2:
        # Build a new list: the previous list_a.extend(list_b) modified the
        # caller's list_a in place.
        merged = list(list_a)
        merged.extend(list_b)
        if removed_duplication:
            # Elements shared by both inputs would otherwise appear twice even
            # though the caller explicitly asked for de-duplication.
            merged = self.remove_duplication(merged)
        return {'union': merged}
    # Difference
    if opt_type == 3:
        return {'a_has_only': [x for x in list_a if x not in list_b],
                'b_has_only': [x for x in list_b if x not in list_a]}
    return None
def compute_page_info(self, total_numbers=None, data_list=None, page_size=10, page_num=1):
    """Compute the pagination descriptor for one page of a result set.

    Args:
        total_numbers: total row count; used when truthy.
        data_list: the raw rows; its length is used when ``total_numbers``
            is falsy. When neither yields a count the total is 0.
        page_size: rows per page (converted with ``int()``).
        page_num: current page, 1-based (converted with ``int()``).

    Returns:
        dict with these keys:
            total              total row count
            pageNum            current page number
            pageSize           rows per page
            size               rows actually present on the current page
            pages              total number of pages
            startRow / endRow  first / last row number of the current page
                               (both 0 when the page lies beyond the data)
            prePage            previous page number
            nextPage           next page number, 0 when there is none
            isFirstPage / isLastPage
            hasPreviousPage / hasNextPage
            navigatePages      number of page links in the pager (fixed 8)
            navigateLastPage / navigateFirstPage
            navigatepageNums   list of page numbers shown by the pager
    """
    nav_width = 8  # the pager control always shows 8 page links

    if total_numbers:
        total = int(total_numbers)
    elif data_list:
        total = len(data_list)
    else:
        # Empty result set (total_numbers=0, empty or missing data_list).
        # Previously 'total' was never stored and the code below raised KeyError.
        total = 0
    page_num = int(page_num)
    page_size = int(page_size)

    offset = (page_num - 1) * page_size  # rows on the preceding pages
    start_row = offset + 1
    end_row = page_num * page_size

    # Rows actually present on this page
    if end_row <= total:
        size = page_size
    elif offset < total < end_row:
        size = total - offset
    else:
        size = 0

    pages = math.ceil(total / page_size)
    is_first_page = page_num == 1
    is_last_page = start_row <= total <= end_row

    # Last page link of the pager: current page + 3, clamped to [8, pages]
    if nav_width < page_num + 3 <= pages:
        nav_last = page_num + 3
    elif page_num + 3 <= nav_width < pages:
        nav_last = nav_width
    else:
        nav_last = pages
    nav_first = max(nav_last - nav_width + 1, 1)

    re_data = {}
    re_data['total'] = total
    re_data['pageNum'] = page_num
    re_data['pageSize'] = page_size
    re_data['size'] = size
    re_data['pages'] = pages
    if start_row > total:
        re_data['startRow'] = 0
        re_data['endRow'] = 0
    else:
        re_data['startRow'] = start_row
        re_data['endRow'] = min(end_row, total)
    re_data['prePage'] = page_num - 1
    re_data['nextPage'] = 0 if end_row >= total else page_num + 1
    re_data['isFirstPage'] = is_first_page
    re_data['isLastPage'] = is_last_page
    re_data['hasPreviousPage'] = not is_first_page
    re_data['hasNextPage'] = page_num < pages and not is_last_page
    re_data['navigatePages'] = nav_width
    re_data['navigateLastPage'] = nav_last
    re_data['navigateFirstPage'] = nav_first
    re_data['navigatepageNums'] = list(range(nav_first, nav_last + 1))
    return re_data
@staticmethod
def should_be_str_contain_str(src_str, *sub_str, raise_flag=True):
"""
功能判断src_str包含任意一个sub_str
| 输入参数: | src_str | 原始数据 |
| | *sub_str | 子数据 |
| | raise_flag | 不包含时是否抛出异常 |
| 作者: | 华学敏 | 2021.10. 08 |
函数位置src/Public/Common/utils.py
"""
for s_str in sub_str:
if str(s_str) in str(src_str):
return True
if raise_flag:
raise RuntimeError("{} not contain {}".format(str(src_str), str(sub_str)))
else:
return False
@staticmethod
def compare_data_equal_as_list(sub_list, obj_value):
"""
功能判断list中是否含某个值
| 输入参数: | sub_list | 列表,返回列表 |
| | obj_value | 目标值 |
| 作者: | qxj | 2021.06. 04 |
函数位置:
| 包含 | compare_data_equal_as_list([1,'a',2,'aa'], 1) | success |
"""
if not isinstance(sub_list, list):
raise RuntimeError("【ERROR】返回参数data必须是list类型sub_list{0}".format(type(sub_list)))
equal = False
for value in sub_list:
try:
sub_str = eval(obj_value)
except Exception as e:
sub_str = obj_value
if str(value) == str(sub_str):
equal = True
break
if equal:
return equal
else:
raise RuntimeError("【ERROR】返回参数data中没有找到值sub_list{0},valueP{1}".format(sub_list, obj_value))
def copy_account_to_hhi(self, email):
"""
Copy the rows belonging to one e-mail address from the 'huohua' database into the 'hhi' database.

Source tables (read with self.db_con.select_one, choose_db='huohua'):
account.account, account.account_email, emp.employee, emp.employee_email.
Target tables (written with self.db_con.insert_one / execute, choose_db='hhi'):
hhi_account.account, hhi_account.account_email, hhi_emp.employee, hhi_emp.employee_email.

Args:
    email: the account e-mail used as the lookup key in every query.
Returns:
    None. Status messages and the generated INSERT statements are printed.

NOTE(review): the indentation of this function is not visible in this view, so how the
later copy steps nest under the two existence checks must be confirmed against the file.
NOTE(review): every statement is built with str.format from `email` and from the selected
row values (quoted by _get_sql_value_by_dict); nothing is parameterized or escaped.
"""
# Skip the copy when the account already exists on the hhi side.
query_hhi_account = "SELECT id FROM hhi_account.account WHERE account_email='{}' limit 1;".format(email)
resp_hhi_account = self.db_con.select_one(query_hhi_account, choose_db='hhi')
if resp_hhi_account:
print("账号已在hhi存在请检查")
else:
query_account = "SELECT * FROM account.account WHERE account_email='{}' limit 1;".format(email)
resp_account = self.db_con.select_one(query_account, choose_db='huohua')
if not resp_account:
print("账号在火花库不存在,请检查!")
else:
# account.account
# The selected row is turned into a quoted literal list and inserted via
# "INSERT ... SELECT <literals> FROM <target table> LIMIT 1".
# NOTE(review): that SELECT yields a row only if the target table is non-empty - confirm intent.
value_account_str = self._get_sql_value_by_dict(resp_account)
insert_hhi_account = "INSERT INTO `hhi_account`.`account`(`id`, `account_unique`, `account_name`, `account_email`, `area_code`, `phone_code`, `mask_phone`, `pwd`, `account_type`, `account_status`, `apply_resource`, `start_date`, `end_date`, `account_owner_id`, `account_dept_id`, `work_order_no`, `data_level`, `employee_id`, `fei_shu_id`, `dict_item_id`, `account_source`, `created_time`, `creator_id`, `creator_name`, `modified_time`, `modifier_id`, `modifier_name`, `pwd_modified`) SELECT {} FROM hhi_account.account LIMIT 1;".format(
value_account_str)
print(insert_hhi_account)
self.db_con.insert_one(insert_hhi_account, choose_db='hhi')
# Overwrite end_date and account_status of the copied row with fixed values.
update_end_date = "update `hhi_account`.`account` set end_date='2022-04-28',account_status=0 where account_email='{}'".format(
email)
self.db_con.execute(update_end_date, choose_db='hhi')
# account.account_email
query_account_email = "SELECT * FROM account.account_email WHERE account_email='{}' limit 1;".format(
email)
resp_account_email = self.db_con.select_one(query_account_email, choose_db='huohua')
if resp_account_email:
value_account_email_str = self._get_sql_value_by_dict(resp_account_email)
insert_hhi_account_email = "INSERT INTO `hhi_account`.`account_email`(`id`, `account_id`, `account_unique`, `account_email`, `first_choice`, `email_type_value`, `created_time`, `creator_id`, `creator_name`, `modified_time`, `modifier_id`, `modifier_name`) SELECT {} FROM `hhi_account`.`account_email` LIMIT 1;".format(
value_account_email_str)
print(insert_hhi_account_email)
self.db_con.insert_one(insert_hhi_account_email, choose_db='hhi')
# emp.employee
# Columns are listed explicitly here (not SELECT *) and in the same order as the INSERT column list below.
query_employee = "SELECT `id`, `name`, `employee_no`, `pwd`, `phone`, `email`, `leader_id`, `leader_name`, `position`, `position_no`, `position_level`, `data_level`, `department_id`, `department_no`, `department_name`, `corp_code`, `status`, `cid`, `cname`, `mid`, `mname`, `ctime`, `mtime`, `deleted`, `wechat_code`, `wechat_qrcode_image`, `wechat_qrcode_string`, `avatar`, `cc_hot_line`, `cc_cno`, `cc_pwd`, `cc_bind_tel`, `cc_bind_tel_tmp`, `work_phone`, `frozen`, `qywx_user_id`, `work_tel`, `work_mobile`, `bind_tel_type`, `city_id`, `city_name`, `city_level`, `nick_name`, `sex`, `birthday`, `birthplace`, `education`, `graduated_school`, `intro`, `nature_of_work`, `position_time`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`, `change_sign_time`, `account_status`, `dingding_id`, `department_post_id`, `department_post_name`, `dimission_time`, `cc_work_tel`, `cc_work_mobile`, `mask_phone`, `phone_code`, `contract_company`, `pre_prob_date`, `show_name`, `name_pinyin`, `feishu_zhs`, `feishu_eng`, `business_unit`, `international_code`, `first_name_py`, `last_name_py`, `name_ac`, `name_format`, `first_name`, `middle_name`, `last_name`, `reg_region` FROM `emp`.`employee` WHERE email='{}' limit 1;".format(
email)
resp_employee = self.db_con.select_one(query_employee, choose_db='huohua')
if resp_employee:
value_employee_str = self._get_sql_value_by_dict(resp_employee)
insert_hhi_employee = "INSERT INTO `hhi_emp`.`employee`(`id`, `name`, `employee_no`, `pwd`, `phone`, `email`, `leader_id`, `leader_name`, `position`, `position_no`, `position_level`, `data_level`, `department_id`, `department_no`, `department_name`, `corp_code`, `status`, `cid`, `cname`, `mid`, `mname`, `ctime`, `mtime`, `deleted`, `wechat_code`, `wechat_qrcode_image`, `wechat_qrcode_string`, `avatar`, `cc_hot_line`, `cc_cno`, `cc_pwd`, `cc_bind_tel`, `cc_bind_tel_tmp`, `work_phone`, `frozen`, `qywx_user_id`, `work_tel`, `work_mobile`, `bind_tel_type`, `city_id`, `city_name`, `city_level`, `nick_name`, `sex`, `birthday`, `birthplace`, `education`, `graduated_school`, `intro`, `nature_of_work`, `position_time`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`, `change_sign_time`, `account_status`, `dingding_id`, `department_post_id`, `department_post_name`, `dimission_time`, `cc_work_tel`, `cc_work_mobile`, `mask_phone`, `phone_code`, `contract_company`, `pre_prob_date`, `show_name`, `name_pinyin`, `feishu_zhs`, `feishu_eng`, `business_unit`, `international_code`, `first_name_py`, `last_name_py`, `name_ac`, `name_format`, `first_name`, `middle_name`, `last_name`, `reg_region`) SELECT {} FROM `hhi_emp`.`employee` LIMIT 1;".format(
value_employee_str)
print(insert_hhi_employee)
self.db_con.insert_one(insert_hhi_employee, choose_db='hhi')
# Overwrite four date/time columns of the copied employee row with fixed values.
update_employee_info = "update `hhi_emp`.`employee` set birthday='2022-04-28 10:00:00',pre_prob_date='2022-05-16',dimission_time='2023-04-28 10:00:00',change_sign_time='2022-04-28 10:00:00' where email='{}'".format(
email)
self.db_con.execute(update_employee_info, choose_db='hhi')
# emp.employee_email
query_employee_email = "SELECT * FROM emp.employee_email where employee_email_value='{}' limit 1;".format(
email)
resp_employee_email = self.db_con.select_one(query_employee_email, choose_db='huohua')
if resp_employee_email:
value_employee_email_str = self._get_sql_value_by_dict(resp_employee_email)
insert_hhi_employee_email = "INSERT INTO `hhi_emp`.`employee_email`(`id`, `employee_id`, `employee_no`, `employee_email_type`, `employee_email_value`, `primary_flag`, `status`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`) SELECT {} FROM `hhi_emp`.`employee_email` LIMIT 1;".format(
value_employee_email_str)
print(insert_hhi_employee_email)
self.db_con.insert_one(insert_hhi_employee_email, choose_db='hhi')
@staticmethod
def _get_sql_value_by_dict(value_dict):
value_str = ""
for v in value_dict.values():
value_str = value_str + "'" + str(v) + "'" + ","
value_str = value_str[:-1]
# print(value_str)
return value_str
def wait_and_query_data(self, sql, wait_time=20):
    """Poll the database until a query returns a row or the wait budget is used up.

    The query is run through ``MySqLHelper().select_one``; after each empty
    result the function sleeps 5 seconds and adds 5 to the elapsed counter.

    Args:
        sql: the SELECT statement to run; must not be empty.
        wait_time: maximum number of seconds to keep polling (``int()``-converted).

    Returns:
        The first truthy result returned by ``select_one``.

    Raises:
        Exception: when ``sql`` is empty, or no row appeared within ``wait_time``.
    """
    if not sql:
        raise Exception('查询sql不能为空......')

    try_time = 0
    while try_time < int(wait_time):
        db_info = MySqLHelper().select_one(sql=sql)
        obj_log.info('try_time={}'.format(try_time))
        if db_info:
            return db_info
        time.sleep(5)
        try_time += 5
    raise Exception('数据落库超时{}s请检查'.format(try_time))
def to_lower_camel(self, name):
    """Convert an underscore-separated name to lowerCamelCase.

    The name is lower-cased first; each underscore followed by a letter is
    then replaced by that letter in upper case.
    """
    def _upper_first_letter(match):
        return match.group(1).upper()

    lowered = name.lower()
    return re.sub('_([a-zA-Z])', _upper_first_letter, lowered)
def to_snake(self, name):
    """Convert a camelCase name to snake_case.

    An underscore is inserted between a lower-case letter and the upper-case
    letter that directly follows it; the result is then lower-cased.
    """
    camel_boundary = re.compile(r'([a-z])([A-Z])')
    with_underscores = camel_boundary.sub(r'\1_\2', name)
    return with_underscores.lower()
def change_dict_key_format(self, obj_dict, c_type=1):
    """Convert the keys of a dict, or of every dict in a list, between naming styles.

    Args:
        obj_dict: a dict, or a list of dicts.
        c_type: 1 - underscore to camelCase, 2 - camelCase to underscore.

    Returns:
        The converted dict, or a new list of converted dicts. None when
        ``obj_dict`` is neither a list nor a dict.
    """
    if isinstance(obj_dict, dict):
        return self.__change_dict_key(c_dict=obj_dict, c_type=c_type)
    if isinstance(obj_dict, list):
        return [self.__change_dict_key(c_dict=entry, c_type=c_type) for entry in obj_dict]
    return None
def __change_dict_key(self, c_dict, c_type=1):
    """Helper of change_dict_key_format: rebuild one dict with converted keys.

    Args:
        c_dict: the source dict; it is not modified.
        c_type: 1 - keys go through ``self.to_lower_camel``,
            2 - keys go through ``self.to_snake``.

    Returns:
        A new dict with the converted keys and the original values; an empty
        dict for any other ``c_type``.
    """
    if c_type == 1:
        return {self.to_lower_camel(name=old_key): item for old_key, item in c_dict.items()}
    if c_type == 2:
        return {self.to_snake(name=old_key): item for old_key, item in c_dict.items()}
    return {}
def dict_to_sql_condition(self, dicts):
    """Turn a dict into a SQL WHERE clause; keys are converted from camelCase to snake_case.

    Example: ``{"userId": 1, "userName": "a"}`` gives
    ``WHERE user_id='1' AND user_name='a'``.

    Args:
        dicts: a dict, or the text of a Python dict literal.

    Returns:
        str: ``WHERE k1='v1' AND k2='v2' ...`` in the dict's iteration order.

    Raises:
        RuntimeError: when ``dicts`` is empty.
    """
    # A textual dict is evaluated into a real dict; anything eval() rejects
    # (including an actual dict, which raises TypeError) is used as given.
    # NOTE(review): eval() executes arbitrary expressions - only pass trusted text.
    try:
        dicts = eval(dicts)
    except Exception:  # was a bare except, which also swallowed SystemExit / KeyboardInterrupt
        pass
    if not dicts:
        raise RuntimeError("参数不能为空")
    # NOTE(review): values are interpolated without escaping, so the result is
    # only safe for trusted, test-controlled values.
    clauses = [self.to_snake(k) + "='{}'".format(v) for k, v in dicts.items()]
    return "WHERE " + " AND ".join(clauses)
def check_http_return_info(self, rsp, exp_code=200, exp_msg="", exp_success=True):
    """Check the code / success / message fields of an HTTP interface response.

    Args:
        rsp: the response to check.
        exp_code: expected value of ``code``.
        exp_msg: expected value of ``message``.
        exp_success: expected value of ``success``.

    Returns:
        None. The comparison is delegated to ``self.should_be_equal_as_json``
        with the expected fields as ``sub_json``.
    """
    expected_fields = dict(code=exp_code, success=exp_success, message=exp_msg)
    self.should_be_equal_as_json(obj_json=rsp, sub_json=expected_fields)
@staticmethod
def update_default_parameters_by_input(input_parameters, default_parameters):
"""
功能用接口输入的参数去更新默认参数有则更新没有则在默认参数中新增要求两个参数都的是dict
Args:
input_parameters: 接口输入参数
default_parameters: 默认参数
Returns: 更新后的default_parameters
"""
for p_key in input_parameters:
default_parameters[p_key] = input_parameters[p_key]
return default_parameters
@staticmethod
def get_value_from_post_data_input(post_data_input, key, default_value=None):
"""
功能从用户入参post_data_input中找到对应key的value并返回没有则赋予default_value值并返回
Args:
post_data_input: 用户输入的入参
key: 期望从入参中找的key
default_value: 当入参为空或没有对应的key时默认赋予的值
Returns:
对应key中的value或default_value
"""
# post_data_input.get(key) == 'default' 是用于兼容GPT的调用
if post_data_input is None or post_data_input.get(key) is None \
or post_data_input.get(key) in ["", "0", "default"]:
return default_value
else:
return post_data_input.get(key)
@staticmethod
def get_value_from_kwargs(kwargs, key, required=0, default_value=None):
"""
功能从入参kwargs中获取指定key的参数值
Args:
kwargs: 参数dict
key: 参数key
required: 是否必填0-非必现1-必填若必须填不在kwargs里则抛出对应异常
default_value: 非必填项参数不在kwargs里时赋予的默认值
Returns: key在kwargs中的具体值或缺省时的默认值
"""
if required == 1 and key not in kwargs:
raise BusinessError("参数:{}是必填项,请出入....".format(key))
if key in kwargs:
return kwargs[key]
else:
return default_value
@staticmethod
def get_day_of_month():
"""
功能获取当天的日期号数比如今天是11月12号return 12
Returns:
"""
today = datetime.datetime.now()
return today.day
@staticmethod
def formate_msg_for_aita(r_msg):
"""
格式化输出用于aita页面展示的消息要求return_data
Args:
r_msg: 函数返回值,要求中含以下三个字段
for_gpt: 给gpt的内容默认第一个为标题原数据用\n字符分割,描述与值用:分割
for_user: 给用户在aita页面展示的内容原数据用\n字符分割,描述与值用:分割
for_link: 跳转链接,格式:{"link_url":"链接地址","link_str":“链接名称,没有则默认为:[点击查询详情...]“}
Returns:
标题(加粗,改色)
描述1 值1
**** ***
描述2 值2
跳转链接
"""
for_gpt = r_msg["for_gpt"] if "for_gpt" in r_msg else None
for_user = r_msg["for_user"] if "for_user" in r_msg else None
for_link = r_msg["for_link"] if "for_link" in r_msg else None
if for_gpt:
del r_msg["for_gpt"]
gpt_list = for_gpt.split('\n')
for_gpt = "<font color=\"green\">{}</font><br>".format(gpt_list[0])
for item in gpt_list[1:]:
value_list = item.replace('', ':').split(':')
# 参数display:inline-block让两个div标签在一行内显示
key_value = "<div style=\"display:inline-block;width:100px;\">" + " " + value_list[0] + "</div>" \
+ "<div style=\"display:inline-block;width:1000px;\">" + value_list[1] + "</div><br>"
for_gpt = for_gpt + key_value
r_msg["for_gpt"] = for_gpt[:-4] # 删除最后一个换行标签<br>,因为aita的前端在for_gpt和for_user的显示上做了换行
if for_user:
del r_msg["for_user"]
for_user_list = for_user.split('\n')
for_user = ""
for item in for_user_list:
value_list = item.replace('', ':').split(':')
key_value = "<div style=\"display:inline-block;width:100px;\">" + " " + value_list[0] + "</div>"\
+ "<div style=\"display:inline-block;width:1000px;\">" + value_list[1] + "</div><br>"
for_user = for_user + key_value
r_msg["for_user"] = for_user
if for_link:
del r_msg["for_link"]
if "link_str" not in for_link:
for_link["link_str"] = "点击查询详情..."
link_string = "<a href=\"{}\" target=\"_blank\">[{}]</a>".format(for_link["link_url"], for_link["link_str"])
r_msg["for_user"] = r_msg["for_user"] + "<div>" + " " + link_string + "</div>"
return r_msg
# Manual smoke check, executed only when this file is run as a script.
# It builds a Tools instance and compares two sample response payloads with
# should_be_equal_as_json: `a` is the actual payload, `b` the expected subset
# (`b` omits the keys accountId, nationalityCountryId, onlineType and labelList).
# The commented-out lines are earlier ad-hoc samples for formate_msg_for_aita and
# should_be_equal_as_json, kept for reference.
if __name__ == '__main__':
tl = Tools()
# return_data = {"for_gpt": "班级创建成功!\n班级codeCZ24011200722017\n班级id为1200120727",
# "for_user": "教师id:168389\n手机号15564555315\n课程id841626\n用户列表[1987505, 1987506]",
# "for_link": {"link_url": "https://htmv2.qa.huohua.cn/classManage/classesDetail?id=1200121194",
# "link_str": "点击查看班级详情..."}}
# aa = tl.formate_msg_for_aita(r_msg=return_data)
# print(aa)
# oj = {'data': {'result': False, 'messages': ['课程id841618已有待执行超售模式导表操作失败请检查数据']}}
# sj = {'data': {'result': True, 'messages': []}}
# # oj = [1, 2, 3]
# # sj = [1, 3, 23]
# print(tl.should_be_equal_as_json(obj_json=oj, sub_json=sj))
a = {'success': True, 'message': '', 'code': 200, 'data': [{'id': 6854, 'userId': 976049, 'employeeId': 599677, 'accountId': 0, 'code': '78843427', 'name': '教务自动化0706165031', 'nickname': '新账号599677', 'sex': 0, 'phone': '180****6279', 'maskPhone': '180****6279', 'phoneCode': '290141', 'natureOfWork': 1, 'teacherTrainNums': [0], 'email': 'jiaowuzidonghua0706165031@huohua.cn', 'idNumber': '510704199205064230', 'birthday': '1970-01-01 00:00:00', 'seniority': 0, 'education': 1, 'entryTime': '2021-01-14 00:00:00', 'graduateInstitutions': '测试', 'nationalityCountryId': 0, 'nationality': 1, 'nativePlace': 1, 'avatar': 'https://stalegacy.huohua.cn/image/huohua/avatar/default/default_avatar1.png', 'introduce': '测试', 'status': 1, 'openModeTypes': '0', 'introduceVideoUrl': '', 'teacherCertificationUrl': '', 'probationStartDate': '', 'probationEndDate': '', 'gradeGroupType': 3, 'creatorId': 1, 'creatorName': 'EHR用户同步', 'createdTime': '2021-07-06 16:48:22', 'modifierId': 1, 'modifierName': 'sys', 'modifiedTime': '2021-07-06 16:49:17', 'classLocation': 1, 'jobType': 4, 'liveroomCenter': 1, 'professorRange': '1', 'qualityScore': 0.0, 'classQuota': 16, 'teacherRank': 0, 'lastRankTime': '2021-07-06 16:49:17', 'rankEffectTime': '2021-01-14 00:00:00', 'rankStatus': 1, 'departmentId': 17, 'departmentName': '教学部', 'leaderId': 0, 'leaderName': '', 'changeSignTime': '', 'liveroomCode': '', 'eyeProtection': 0, 'legoTeacher': 0, 'legoProbationStartDate': '', 'legoProbationEndDate': '', 'activationStatus': 1, 'dimissionTime': '', 'dimissionSetTime': '', 'teacherType': 1, 'liveroomAllocationStatus': 2, 'flexible': 0, 'classroomVideo': '', 'classroomImg': '', 'pipStart': '1970-01-01 00:00:00', 'pipEnd': '1970-01-01 00:00:00', 'teacherLabel': 1, 'longLeaveStart': '1970-01-01 00:00:00', 'longLeaveEnd': '1970-01-01 00:00:00', 'forecastResignDate': '1970-01-01 00:00:00', 'teacherLabelSubType': 1, 'schoolExcellentTeacher': 0, 'innovationAPlusTeacher': 0, 'businessLine': 101, 'onlineType': 0, 
'timeZoneName': '', 'timeZoneCode': '', 'teacherLabelList': [], 'labelList': ''}]}
b = {'success': True, 'message': '', 'code': 200, 'data': [{'id': 6854, 'userId': 976049, 'employeeId': 599677, 'code': '78843427', 'name': '教务自动化0706165031', 'nickname': '新账号599677', 'sex': 0, 'phone': '180****6279', 'maskPhone': '180****6279', 'phoneCode': '290141', 'natureOfWork': 1, 'teacherTrainNums': [0], 'email': 'jiaowuzidonghua0706165031@huohua.cn', 'idNumber': '510704199205064230', 'birthday': '1970-01-01 00:00:00', 'seniority': 0, 'education': 1, 'entryTime': '2021-01-14 00:00:00', 'graduateInstitutions': '测试', 'nationality': 1, 'nativePlace': 1, 'avatar': 'https://stalegacy.huohua.cn/image/huohua/avatar/default/default_avatar1.png', 'introduce': '测试', 'status': 1, 'openModeTypes': '0', 'introduceVideoUrl': '', 'teacherCertificationUrl': '', 'probationStartDate': '', 'probationEndDate': '', 'gradeGroupType': 3, 'creatorId': 1, 'creatorName': 'EHR用户同步', 'createdTime': '2021-07-06 16:48:22', 'modifierId': 1, 'modifierName': 'sys', 'modifiedTime': '2021-07-06 16:49:17', 'classLocation': 1, 'jobType': 4, 'liveroomCenter': 1, 'professorRange': '1', 'qualityScore': 0.0, 'classQuota': 16, 'teacherRank': 0, 'lastRankTime': '2021-07-06 16:49:17', 'rankEffectTime': '2021-01-14 00:00:00', 'rankStatus': 1, 'departmentId': 17, 'departmentName': '教学部', 'leaderId': 0, 'leaderName': '', 'changeSignTime': '', 'liveroomCode': '', 'eyeProtection': 0, 'legoTeacher': 0, 'legoProbationStartDate': '', 'legoProbationEndDate': '', 'activationStatus': 1, 'dimissionTime': '', 'dimissionSetTime': '', 'teacherType': 1, 'liveroomAllocationStatus': 2, 'flexible': 0, 'classroomVideo': '', 'classroomImg': '', 'pipStart': '1970-01-01 00:00:00', 'pipEnd': '1970-01-01 00:00:00', 'teacherLabel': 1, 'longLeaveStart': '1970-01-01 00:00:00', 'longLeaveEnd': '1970-01-01 00:00:00', 'forecastResignDate': '1970-01-01 00:00:00', 'teacherLabelSubType': 1, 'schoolExcellentTeacher': 0, 'innovationAPlusTeacher': 0, 'businessLine': 101, 'timeZoneName': '', 'timeZoneCode': '', 'teacherLabelList': []}]}
# NOTE(review): the visible tail of should_be_equal_as_json has no return statement,
# so this presumably prints None after the comparison succeeds - confirm.
print(tl.should_be_equal_as_json(obj_json=a, sub_json=b))