# -*- coding:utf-8 -*-
"""
Author: qiaoxinjiu
Create Data: 2020/9/24 11:39
"""
import base64
import datetime
import hashlib
import calendar
import math
import re
import time
import pytz
from os import listdir
from dateutil.relativedelta import relativedelta
from chinese_calendar import is_workday, is_holiday
import jsonpath
import requests
from bs4 import BeautifulSoup
from base_framework.base_config.current_pth import *
from base_framework.public_tools import log
from base_framework.public_tools.read_config import InitConfig
from base_framework.public_tools.read_config import ReadConfig
from base_framework.public_tools.runner import Runner
from base_framework.public_tools.sqlhelper import MySqLHelper
from base_framework.public_tools.custom_error import BusinessError
# Module-level helpers shared by the Tools methods in this file.
obj_runner = Runner()  # REST runner; the MQ helper methods call its call_rest_api / mq_host
obj_log = log.get_logger()  # logger used for info/warning/error output below
obj_evn_cfg = ReadConfig(env_choose_path)  # reader for the config file located at env_choose_path
class Tools(InitConfig):
def __init__(self):
    """Run the InitConfig initialiser and attach a MySQL helper.

    InitConfig is initialised with ``run_user_name=None`` and
    ``current_evn=None``; the helper is stored on ``self.db_con``.
    """
    InitConfig.__init__(self, run_user_name=None, current_evn=None)
    self.db_con = MySqLHelper()
def get_jira_id_from_ini(self):
    """Read option ``huohua-podenv`` of section ``run_jira_id`` from the env config.

    Returns:
        The configured value, or ``None`` when the lookup raises; in that
        case the exception is printed and not re-raised.
    """
    try:
        return obj_evn_cfg.get_value(sections='run_jira_id', options='huohua-podenv')
    except Exception as err:
        print(err)
        return None
def get_md5(self, str):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of ``str``."""
    return hashlib.md5(str.encode("utf8")).hexdigest()
def get_format_date(self, r_week=None, r_time=None, t_time=None, r_schedule=None,
                    r_type=1,
                    add_months=0, add_weeks=0, add_days=0, add_hours=0, add_minutes=0, add_seconds=0, minutes=60):
    """
    | Purpose: | Return a date/time rendered in the format selected by r_type |
    | Inputs:  | r_schedule  | when 1, immediately return the schedule tuple for now + ``minutes``: (datetime str, date str, time str, schedule week, "HH:M5") |
    |          | r_week      | weekday number (1 = Monday ... 7 = Sunday); the base date is the closest such weekday from today (today itself if it matches) |
    |          | r_time      | base date as a "%Y-%m-%d %H:%M:%S" string |
    |          | t_time      | base date as a timestamp in seconds; only the first ten characters are used when longer |
    |          | r_type      | output format, see the list below |
    |          | add_months  | month offset (applied to the date result only, not to the timestamp types 11-14) |
    |          | add_weeks   | week offset |
    |          | add_days    | day offset |
    |          | add_hours   | hour offset |
    |          | add_minutes | minute offset |
    |          | add_seconds | second offset |
    |          | minutes     | minutes added to now; only used together with r_schedule |
    | Returns: | value in the shape selected by r_type |
    | Raises:  | ValueError for an unparsable r_time / t_time, Exception for an unknown r_schedule or r_type |

    Base date priority: r_week, then r_time, then t_time; when none of them is
    usable the current system time is the base.

    r_type list:
    | 1  | %Y-%m-%d                      | 2019-05-23 |
    | 2  | year/month/day with Chinese unit characters, no zero padding |
    | 3  | type 2 followed by the Chinese weekday name |
    | 4  | %Y-%m-%d %H:%M:%S             | 2021-06-28 21:54:46 |
    | 5  | %Y-%m-%d %H:%M:%S.%f          | 2021-06-28 21:54:46.205213 |
    | 6  | %Y-%m                         | 2019-05 |
    | 7  | %Y%m%d                        | 20190505 |
    | 8  | %Y%m%d%H%M%S                  | 20190505102030 |
    | 11 | raw float timestamp           | 1499825149.2578925 |
    | 12 | timestamp in seconds          | 1499825149 |
    | 13 | timestamp in milliseconds     | 1499825149257 |
    | 14 | timestamp in microseconds     | 1499825149257892 |
    | 15 | %Y-%m-%d 00:00:00             | 2021-06-28 00:00:00 |
    | 16 | %Y-%m-%d 23:59:59             | 2021-06-28 23:59:59 |
    | 17 | %Y_%m_%d                      | 2021_06_28 |
    | 18 | schedule tuple: ("%Y-%m-%d %H:%M:%S", "HH:M5", schedule week (Sunday = 1 ... Saturday = 7), "%Y-%m-%d %H:%M", ISO weekday) |
    | 19 | millisecond timestamp of type 15, returned as a string |
    | 20 | millisecond timestamp with minutes and seconds zeroed |
    | 21 | millisecond timestamp with seconds zeroed |
    | 22 | dict: year, month, day, 'time_24' and 'time_12h' (each with short_time / full_time) |
    | 23 | %Y%m                          | 202303 |
    | 24 | next Monday at the current time (based on now, ignores the base date) |
    | 25 | last day of the month, 23:59:59 |
    | 26 | first day of the month, 00:00:00 |
    | 27 | tuple: (UTC ISO string ending in "Z", local "%Y-%m-%d %H:%M:%S") |
    | 28 | Monday of the current week, 00:00:00 (based on now) |
    | 29 | Sunday of the current week, 23:59:59 (based on now) |
    | 91 | ISO calendar (year, week, weekday) |
    | 92 | ISO year followed by ISO week, e.g. 202235 |
    """
    r_type = int(r_type)
    add_months = int(add_months)
    add_weeks = int(add_weeks)
    add_days = int(add_days)
    add_hours = int(add_hours)
    add_minutes = int(add_minutes)
    add_seconds = int(add_seconds)

    if r_schedule is not None:
        if r_schedule != 1:
            raise Exception("r_schedule参数输入错误,请重新输入")
        update_time = datetime.datetime.now() + datetime.timedelta(minutes=minutes)
        return (update_time.strftime("%Y-%m-%d %H:%M:%S"),
                update_time.strftime("%Y-%m-%d"),
                update_time.strftime("%H:%M:%S"),
                update_time.isoweekday() % 7 + 1,  # schedule week: Sunday = 1 ... Saturday = 7
                update_time.strftime("%H:%M")[0:4] + "5")  # last minute digit forced to 5

    offset = datetime.timedelta(weeks=add_weeks, days=add_days, hours=add_hours,
                                minutes=add_minutes, seconds=add_seconds)
    offset_seconds = ((add_weeks * 7 + add_days) * 24 + add_hours) * 3600 + add_minutes * 60 + add_seconds

    # Resolve the base date; None means "use the current system time".
    base_date = None
    if r_week:
        today = datetime.datetime.now()
        days_ahead = int(r_week) - 1 - today.weekday()  # weekday(): 0 is Monday, 6 is Sunday
        if days_ahead < 0:
            days_ahead += 7
        base_date = today + datetime.timedelta(days=days_ahead)
    elif r_time:
        try:
            base_date = datetime.datetime.strptime(r_time, "%Y-%m-%d %H:%M:%S")
        except ValueError as err:
            raise ValueError("输入时间格式错误,请输入格式为:%Y-%m-%d %H:%M:%S 的时间字符串") from err
    elif t_time:
        if len(str(t_time)) > 10:
            t_time = int(str(t_time)[0:10])
        try:
            base_date = datetime.datetime.fromtimestamp(t_time)
        except ValueError as e:
            raise ValueError("输入时间格式错误,错误原因:{}".format(e))

    if base_date is None:
        r_date = datetime.datetime.now() + offset
        t_date = time.time() + offset_seconds
    else:
        r_date = base_date + offset
        t_date = time.mktime(base_date.timetuple()) + offset_seconds

    # A zero month offset leaves the date untouched, so relativedelta is only needed otherwise.
    if add_months:
        r_date = r_date + relativedelta(months=add_months)

    def _local_millis(moment):
        """Millisecond timestamp of a naive local datetime, truncated to whole seconds."""
        return int(time.mktime(moment.timetuple())) * 1000

    plain_formats = {
        1: "%Y-%m-%d",
        4: "%Y-%m-%d %H:%M:%S",
        5: "%Y-%m-%d %H:%M:%S.%f",
        6: "%Y-%m",
        7: "%Y%m%d",
        8: "%Y%m%d%H%M%S",
        15: "%Y-%m-%d 00:00:00",
        16: "%Y-%m-%d 23:59:59",
        17: "%Y_%m_%d",
        23: "%Y%m",
        26: "%Y-%m-01 00:00:00",  # zero-padded month, as documented
    }
    if r_type in plain_formats:
        return r_date.strftime(plain_formats[r_type])

    if r_type in (2, 3):
        f_date = str(r_date.year) + "年" + str(r_date.month) + "月" + str(r_date.day) + "日"
        if r_type == 2:
            return f_date
        return f_date + self.get_week_by_date(format_date=r_date.strftime("%Y-%m-%d"), week_type='Chinese')
    if r_type == 11:
        return t_date
    if r_type == 12:
        return int(t_date)
    if r_type == 13:
        return int(round(t_date * 1000))
    if r_type == 14:
        return int(round(t_date * 1000000))
    if r_type == 18:
        origin_week = r_date.isoweekday()
        return (r_date.strftime("%Y-%m-%d %H:%M:%S"),
                r_date.strftime("%H:%M")[0:4] + "5",
                origin_week % 7 + 1,
                r_date.strftime("%Y-%m-%d %H:%M"),
                origin_week)
    if r_type == 19:
        return str(_local_millis(r_date.replace(hour=0, minute=0, second=0, microsecond=0)))
    if r_type == 20:
        return _local_millis(r_date.replace(minute=0, second=0, microsecond=0))
    if r_type == 21:
        return _local_millis(r_date.replace(second=0, microsecond=0))
    if r_type == 22:
        return {'year': r_date.year, 'month': r_date.month, 'day': r_date.day,
                # %H is the 24-hour clock; %I (12-hour) is only correct for the time_12h entries
                'time_24': {'short_time': r_date.strftime('%H:%M'),
                            'full_time': r_date.strftime('%H:%M:%S')},
                'time_12h': {'short_time': r_date.strftime('%I:%M %p').lower(),
                             'full_time': r_date.strftime('%I:%M:%S %p').lower()}}
    if r_type == 24:
        today = datetime.datetime.today()
        return (today + datetime.timedelta(days=7 - today.weekday())).strftime("%Y-%m-%d %H:%M:%S")
    if r_type == 25:
        last_day = calendar.monthrange(year=r_date.year, month=r_date.month)[1]
        return "{}-{} 23:59:59".format(r_date.strftime("%Y-%m"), last_day)
    if r_type == 27:
        # Convert to UTC and render as an ISO string truncated to seconds.
        now_utc_time = r_date.astimezone(pytz.utc)
        return now_utc_time.isoformat()[:19] + "Z", r_date.strftime("%Y-%m-%d %H:%M:%S")
    if r_type == 28:
        now = datetime.datetime.now()
        return (now - datetime.timedelta(days=now.weekday())).strftime("%Y-%m-%d 00:00:00")
    if r_type == 29:
        now = datetime.datetime.now()
        return (now + datetime.timedelta(days=6 - now.weekday())).strftime("%Y-%m-%d 23:59:59")
    if r_type == 91:
        return r_date.isocalendar()
    if r_type == 92:
        t_week = r_date.isocalendar()
        return "{}{}".format(t_week[0], t_week[1])
    raise Exception(
        "调用get_format_date函数的r_type参数不正确....")
@staticmethod
def get_week_by_date(format_date, week_type='Arabic'):
"""
| 功能说明: | 返回format_date对应的星期 |
| 输入参数: | format_date | 要计算的日期格式:2021-06-28 |
| | week_type | 返回星期的格式:Arabic-阿拉伯数字,Chinese-中文 |
| 返回参数: | 星期几,0-6, 0-星期日 |
| 作者信息: | 吴勇刚 | 2021-06-08 |
函数路径:src/Public/Common/utils.py
| format_date | week_type | 返回 |
| 2021-06-27 | Arabic | 0 |
| 2021-06-28 | Arabic | 1 |
| 2021-06-27 | Chinese | 星期日 |
| 2021-06-28 | Chinese | 星期一 |
"""
input_date = format_date.split('-')
if len(input_date) != 3:
raise Exception("输入值:{} 不是2021-06-08格式,请检查".format(format_date))
year = int(input_date[0])
month = int(input_date[1])
day = int(input_date[2])
week = int(datetime.datetime(year, month, day).strftime("%w"))
if week_type == 'Arabic':
return week
elif week_type == 'Chinese':
weeks = ['星期日', '星期一', '星期二', '星期三', '星期四', '星期五', '星期六']
return weeks[week]
@staticmethod
def check_the_date_is_a_working_day(format_date=None):
    """
    Tell whether a date counts as a working day.

    | Inputs:  | format_date | date string such as 2021-09-01; today is used when omitted or empty |
    | Returns: | True when is_workday() holds and is_holiday() does not, otherwise False |
    """
    if format_date:
        target_day = datetime.datetime.strptime(format_date, "%Y-%m-%d").date()
    else:
        # No date supplied: evaluate today.
        target_day = datetime.date.today()
    return bool(is_workday(target_day) and not is_holiday(target_day))
@staticmethod
def get_img_bas64(path):
img = open(path, 'rb')
img_bas64 = base64.b64encode(img.read())
img.close()
'''b 表示 byte的意思,将byte转换回去'''
return str(img_bas64, 'utf-8')
@staticmethod
def get_files_path():
path = (os.path.split(os.path.realpath(__file__)))[0] + '\\' + "img"
return path
@staticmethod
def listdir(self): # 传入存储的list
list_name = []
path = self.get_files_path()
for file in os.listdir(path):
file_path = os.path.join(path, file)
if os.path.isdir(file_path):
listdir(file_path, list_name)
else:
list_name.append(file_path)
return list_name
@staticmethod
def del_file(file_name):
size = os.path.getsize(file_name)
file_size = 44 * 1024 # 100K
if size > file_size:
print('remove', size, file_name)
try:
os.remove(file_name)
except Exception as e:
print(e)
def modify_db_info(self, dbName=None, dbUsername=None,
dbPassword=None, dbHost=None, dbPort=None):
"""
Example usage:
| # explicitly specifies all db property values |
| modify_db_info | psycopg2 | my_db | postgres | s3cr3t | tiger.foobar.com | 5432 |
"""
if dbName:
self.db_rtn.set_section(
section='Mysql',
option='db_test_dbname',
value=dbName)
if dbUsername:
self.db_rtn.set_section(
section='Mysql',
option='db_test_user',
value=dbUsername)
if dbPassword:
self.db_rtn.set_section(
section='Mysql',
option='db_test_password',
value=dbName)
if dbHost:
self.db_rtn.set_section(
section='Mysql',
option='db_test_host',
value=dbHost)
if dbPort:
self.db_rtn.set_section(
section='Mysql',
option='db_test_port',
value=dbPort)
def __get_eureka_url_from_db(self, team, server):
"""
| 功能 | 从DB中获取服务对应的eureka_url,供get_container_ip_from_eureka函数使用 |
"""
sql_str = "select eureka_url as hh, eureka_url_hhi as hhi from sparkatp.swagger_info where team='{0}' " \
"and server_name='{1}';".format(team, server)
res = self.db_con.select_one(sql=sql_str)
current_business = self.evn_cfg['run_evn_name']['current_business']
return res[current_business]
def get_container_ip_from_eureka(self, server_name, env=None, need_jira_id=False, jira_id_dev=None,
is_contain_qa=True, team_name=None, eureka_url=None):
"""
| Purpose: | Look up the IP of a service instance on the Eureka page |
| Inputs:  | server_name | service name (upper-cased before the lookup) |
|          | env | "QA" or "SIM"; read from the env config (run_evn_name/current_evn) when None |
|          | need_jira_id | not referenced by the active code of this function |
|          | jira_id_dev | explicit isolated-environment id; when None the id is read from run_jira_id/huohua-podenv |
|          | is_contain_qa | when False, an empty container_ip is returned instead of falling back to the qa/groot/sim entries |
|          | team_name | team used to resolve eureka_url; read from run_evn_name/current_team when None |
|          | eureka_url | Eureka address; looked up in the database when None |
| Returns: | dict with key 'container_ip', or False when server_name is not listed on the page |
Example:
| get_container_ip_from_eureka(server_name='PEPPA-QI-API') |
"""
# Resolve the configuration values that were not passed in
if team_name is None: # business team
team_name = obj_evn_cfg.get_value(sections='run_evn_name', options='current_team')
if env is None: # build environment
env = obj_evn_cfg.get_value(sections='run_evn_name', options='current_evn')
if eureka_url is None: # eureka address
eureka_url = self.__get_eureka_url_from_db(team=team_name, server=server_name)
# if env.upper() == "QA" and 'sim' in eureka_url.lower():
# eureka_url = eureka_url.replace('.sim.', '.qa.')
# elif env.upper() == "SIM" and 'qa' in eureka_url.lower():
# eureka_url = eureka_url.replace('.qa.', '.sim.')
jira_id = None
if jira_id_dev is None:
# if need_jira_id is False:
# jira_id = 'qa'
# else:
jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
# if jira_id == "":
# jira_id = 'qa'
else:
jira_id = jira_id_dev
# Download the Eureka page and collect every element whose href matches "actuator/info"
temp_text = requests.get(url=eureka_url).content
soup = BeautifulSoup(temp_text.decode('utf-8'), "html.parser")
all_container_ip = soup.find_all(href=re.compile("actuator/info"))
# tree_dict maps the text taken from the element's grandparent to the list of the element's children
tree_dict = dict()
for temp in all_container_ip:
server = list(temp.parent.parent)[1].get_text()
server_ip = list(temp)
if server in tree_dict.keys():
tree_dict[server].extend(server_ip)
else:
tree_dict[server] = server_ip
server_name = server_name.upper()
return_dict = dict()
if server_name in tree_dict.keys():
if jira_id:
if jira_id in str(tree_dict[server_name]):
# NOTE(review): a conditional expression binds looser than `==`, so the lambda below
# evaluates to `x.split(':')[2]` when the entry has more than two ':' segments and to
# `"" == jira_id` otherwise; it never compares the third segment with jira_id. The actual
# jira_id filtering is done by the trailing `if jira_id in temp`. Confirm the intent.
container_ip = [
temp.split(':')[0] for temp in
filter(lambda x: x.split(':')[2] if len(x.split(':')) > 2 else "" == jira_id,
tree_dict[server_name])
if jira_id in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else: # a locally started service can add extra entries; skip IPs that do not start with '10'
obj_log.warning('跳过获取到的IP:{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
if 'allschool' in eureka_url:
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name,
value=container_ip)
return return_dict
elif env.upper() == "SIM":
if len(tree_dict[server_name]) == 1: # SIM with a single entry: return that IP directly
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name]][0]
return_dict['container_ip'] = container_ip
return return_dict
else:
if not is_contain_qa:
return_dict['container_ip'] = ''
return return_dict
# No matching isolated environment: fall back to the qa entries
if 'qa' in str(tree_dict[server_name]):
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name] if 'qa' in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.error('跳过获取到的IP:{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
return return_dict
elif 'groot' in str(tree_dict[server_name]):
# Some allschool services register their baseline environment as "groot" instead of "qa"
"""解决allschool部分服务极限环境为groot而非qa的场景"""
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name] if 'groot' in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.error('跳过获取到的IP:{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
# if 'allschool' in eureka_url:
# ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name, value=container_ip)
return return_dict
elif '${server.port}' in str(tree_dict[server_name]) or 'sim' in str(tree_dict[server_name]):
container_ip = [temp.split(':')[0]
for temp in tree_dict[server_name] if '${server.port}' in temp or 'sim' in temp][0]
return_dict['container_ip'] = container_ip
return return_dict
else:
return_dict['container_ip'] = ''
return return_dict
else:
# raise Exception('从{}中未获取到{}的IP,请检查服务名是否正确!!!'.format(eureka_url, server_name))
obj_log.error('从{}中未获取到{}的IP,请检查服务名是否正确!!!'.format(eureka_url, server_name))
return False
# def get_url_by_eureka(self, server_name, jira_id=False):
# """
# | 功能说明: | 获取Eureka指定服务的IP后,组装成需要的url前缀 |
# | 输入参数: | server_name | 服务名 |
# | 输入参数: | jira_id | 独立环境编号 |
# """
# svr_ip = self.get_container_ip_from_eureka(server_name=server_name, need_jira_id=jira_id)
# if svr_ip:
# return "http://" + svr_ip["container_ip"] + ":8080"
# else:
# return False
def get_container_ip_from_eureka_old(self, server_name, env=None, need_jira_id=False,
jira_id_dev=None, is_contain_qa=True):
"""
| Purpose: | Look up the IP of a service instance on a hard-coded Eureka page (older variant) |
| Inputs:  | server_name | service name |
|          | env | "QA" selects the qa Eureka hosts, any other value the sim hosts; read from run_evn_name/current_evn when None |
|          | need_jira_id | True: read the isolated-environment id from run_jira_id/huohua-podenv; False: no id |
|          | jira_id_dev | explicit isolated-environment id; overrides the configured one when truthy |
|          | is_contain_qa | when False, an empty container_ip is returned instead of falling back to the qa/sim entries |
| Returns: | dict with key 'container_ip' ('' when the service is not listed) |
Example:
| get_container_ip_from_eureka(server_name='PEPPA-QI-API') |
"""
return_dict = dict()
tree_dict = dict()
# Services in this whitelist use the allschool Eureka host instead of the huohua one
pz_eureka_server = ["hulk-teaching-server", "hulk-teacher-api", "hulk-teacher-server", "hulk-teaching-listener",
"hulk-teaching-scheduler", "hulk-operation-api-server", "hulk-operation-listener",
"hulk-operation-server", "hulk-account-server",
'hulk-org-server', 'hulk-org-api', 'hulk-teacher-api', 'hulk-class-help-server',
'hulk-content-audit-server', "hulk-teach-backend-api", "hulk-teach-supply-cli-api",
"hulk-ark-gateway", "hulk-class-help-manager", "hulk-content-manage-gateway",
"hulk-teach-supply-server", "hulk-teach-classes-server", "classpod-platform-api",
"classpod-user-api",
"hulk-teach-content-server", "hulk-teach-content-cli-api", "hulk-teach-course-server"
]
if env is None:
env = obj_evn_cfg.get_value(
sections='run_evn_name', options='current_evn')
# Pick the Eureka URL from the environment and the whitelist membership
if env == "QA":
if server_name.lower() in pz_eureka_server:
# url = "http://eureka-asc.qa.huohua.cn/"
url = "http://eureka.qa.allschool.com/"
else:
url = "http://eureka.qa.huohua.cn/"
else:
if server_name.lower() in pz_eureka_server:
# url = "http://eureka-asc.sim.huohua.cn/"
url = "http://eureka.sim.allschool.com/"
else:
url = 'http://eureka.sim.huohua.cn/'
# Download the page and collect every element whose href matches "actuator/info"
temp_text = requests.get(url=url).content
soup = BeautifulSoup(temp_text.decode('utf-8'), "html.parser")
all_container_ip = soup.find_all(href=re.compile("actuator/info"))
# tree_dict maps the text taken from the element's grandparent to the list of the element's children
for temp in all_container_ip:
server = list(temp.parent.parent)[1].get_text()
server_ip = list(temp)
if server in tree_dict.keys():
tree_dict[server].extend(server_ip)
else:
tree_dict[server] = server_ip
if need_jira_id:
jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
if jira_id == "":
jira_id = None
else:
jira_id = None
if jira_id_dev:
jira_id = jira_id_dev
server_name = server_name.upper()
if server_name in tree_dict.keys():
if jira_id:
if jira_id in str(tree_dict[server_name]):
# Keep entries whose third ':' segment equals jira_id; an entry with fewer than
# three segments raises IndexError here
container_ip = [
temp.split(':')[0] for temp in
filter(lambda x: x.split(':')[2] == jira_id, tree_dict[server_name])
if jira_id in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.warning('跳过获取到的IP:{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
if server_name.lower() in pz_eureka_server:
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name,
value=container_ip)
return return_dict
else:
if not is_contain_qa:
return_dict['container_ip'] = ''
return return_dict
if 'qa' in str(tree_dict[server_name]):
container_ip = [
temp.split(':')[0] for temp in tree_dict[server_name] if 'qa' in temp]
for ip_temp in container_ip:
if ip_temp.startswith('10'):
container_ip = ip_temp
else:
obj_log.error('跳过获取到的IP:{}'.format(ip_temp))
continue
return_dict['container_ip'] = container_ip
if server_name.lower() in pz_eureka_server:
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name, value=container_ip)
return return_dict
elif '${server.port}' in str(tree_dict[server_name]) or 'sim' in str(tree_dict[server_name]):
container_ip = [temp.split(':')[
0] for temp in tree_dict[server_name] if '${server.port}' in temp or 'sim' in temp][
0]
return_dict['container_ip'] = container_ip
return return_dict
else:
return_dict['container_ip'] = ''
return return_dict
else:
obj_log.error('未获取到IP,请检查server_name:{} 是否正确!'.format(server_name))
return_dict['container_ip'] = ''
return return_dict
def com_get_value_from_json(self, json_object, json_path):
    """Evaluate ``json_path`` against ``json_object`` with ``jsonpath.jsonpath`` and return its result unchanged."""
    matches = jsonpath.jsonpath(json_object, json_path)
    return matches
@staticmethod
def should_be_equal_as_list(sub_list, obj_list, ignore_nodes=[], check_len=False):
"""
功能:判断两个list是否相等,不区分排序
| 输入参数: | sub_list | 列表1,子列表 |
| | obj_list | 列表2,目标列表 |
| | ignore_nodes | sub_list中要忽略的节点,填需要,从0开始 |
| | check_len | 是否检查长度,默认不检查,传入True时,则检查两个list完全相同 |
| 作者: | 吴勇刚 | 2021.05. 16 |
函数位置:src/Public/Common/utils.py
| 完全相等 | should_be_equal_as_list([1,'a',2,'aa'], [1,2,'b','bb']) | failed |
| 子集相等 | should_be_equal_as_list([1,'a',2,'aa'], [1,2,'b','bb'], [1,3]) | success |
"""
if isinstance(sub_list, str):
try:
sub_list = eval(sub_list)
except Exception as e:
print("输入参数类型错误:{}".format(e))
if isinstance(obj_list, str):
try:
obj_list = eval(obj_list)
except Exception as e:
print("输入参数类型错误:{}".format(e))
if not isinstance(sub_list, list) or not isinstance(obj_list, list):
raise RuntimeError("【ERROR】请求参数必现都是list类型,sub_list:{0}, "
"obj_list:{1}".format(type(sub_list), type(obj_list)))
return
if check_len:
if len(obj_list) != len(sub_list):
raise RuntimeError("【ERROR】两个list的长度不同:{}!={},请检查...".format(len(sub_list), len(obj_list)))
return
equal = True
for index in range(len(sub_list)):
if index not in ignore_nodes:
if sub_list[index] not in obj_list:
print("【ERROR】{0} not in obj_list:{1}".format(sub_list[index], obj_list))
equal = False
if not equal:
raise RuntimeError("【ERROR】sub_list != obj_list,请检查......")
@staticmethod
def should_be_equal_as_dict(sub_dict, obj_dict, ignore_keys=[]):
"""
功能:判断两个list是否相等,不区分排序
| 输入参数: | sub_dict | 列表1,子列表 |
| | obj_dict | 列表2,目标列表 |
| | ignore_keys | sub_dict中要忽略的keys |
| 作者: | 吴勇刚 | 2021.05. 16 |
函数位置:src/Public/Common/utils.py
| 完全相等 | should_be_equal_as_dict({'a':1,'b':2,'c':True}, {'a':1,'b':2,'c':False}) | not equal |
| 子集相等 | should_be_equal_as_dict({'a':1,'b':2,'c':True}, {'a':1,'b':2,'c':False}, ['c']) | equal |
"""
if not isinstance(sub_dict, dict) or not isinstance(obj_dict, dict):
raise RuntimeError("【ERROR】请求参数必现都是dict类型,sub_dict:{0}, obj_dict:{1}".format(type(sub_dict),
type(obj_dict)))
return
if len(ignore_keys) == 0 and len(sub_dict) != len(obj_dict):
raise RuntimeError(
"【ERROR】两个dict的长度不相等,sub={0},obj={1}".format(
len(sub_dict), len(obj_dict)))
return
equal = True
for key in sub_dict:
if key not in ignore_keys:
if key not in obj_dict:
print(
"【ERROR】{0} not in obj_list:{1}".format(
sub_dict[key], obj_dict))
equal = False
continue
if sub_dict[key] != obj_dict[key]:
print("【ERROR】'节点{key}':{sub} != {obj}".format(key=key,
sub=sub_dict[key],
obj=obj_dict[key]))
equal = False
if not equal:
raise RuntimeError("【ERROR】sub_dict != obj_dict,请检查......")
def check_result(self, my_result, result_one=None,
result_two=None, position=None):
if result_one is None:
result_one = []
if result_two is None:
result_two = []
if position is None:
position = []
for key, value in my_result.items():
position.append(key)
if isinstance(value, dict):
Tools().check_result(value, result_one, result_two, position)
elif isinstance(value, list):
for index, it in enumerate(value):
position.append(index)
if isinstance(it, dict):
Tools().check_result(it, result_one, result_two, position)
elif isinstance(it, str):
data_in_file = Tools().generate(position)
result_one.append(it)
result_two.append(data_in_file)
elif isinstance(it, int):
data_in_file = Tools().generate(position)
result_one.append(it)
result_two.append(data_in_file)
position.pop()
else:
if isinstance(value, str):
data_in_file = Tools().generate(position)
result_one.append(value)
result_two.append(data_in_file)
elif isinstance(value, int):
data_in_file = Tools().generate(position)
result_one.append(value)
result_two.append(data_in_file)
position.pop()
return result_one, result_two
def generate(self, list_all):
    """Render a key/index path as an ``obj_json[...]`` subscript expression.

    String steps are wrapped in double quotes, every other step is
    inserted as is.
    """
    segments = [
        '["{}"]'.format(step) if isinstance(step, str) else '[{}]'.format(step)
        for step in list_all
    ]
    return "obj_json" + "".join(segments)
def mq_send(self, topic, message_body, tag="tag",
            key="key", pro="pro", **kwargs):
    """
    Deprecated: use the functions in base_framework/public_tools/rocket_mq.py.

    Post a topic message to ``<mq_host>/topic/sendTopicMessage.do``.

    | Inputs:  | topic / message_body / tag / key | message fields sent in the request body |
    |          | pro    | when left at 'pro', the configured run_jira_id/huohua-podenv value is sent, or 'qa' if that is empty; any other value is sent as is |
    |          | kwargs | optional 'user' forwarded to the REST runner |
    | Returns: | True when the response reports sendStatus SEND_OK, otherwise False |
    """
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    ini_path = project_root + '/base_framework/base_config/env_choose.ini'
    jira_id = ReadConfig(ini_path).get_value(
        sections='run_jira_id', options='huohua-podenv')
    if pro != 'pro':
        target_pro = pro
    elif jira_id:
        target_pro = jira_id
    else:
        target_pro = 'qa'
    post_data = {
        'topic': topic,
        'messageBody': '{}'.format(message_body),
        'tag': tag,
        'key': key,
        'pro': target_pro,
    }
    resp = obj_runner.call_rest_api(
        user=kwargs.get('user', None),
        API_URL=obj_runner.mq_host + '/topic/sendTopicMessage.do',
        req_type='POST',
        json=post_data)
    if resp['data']['sendStatus'] != 'SEND_OK':
        obj_log.error('发送MQ消息失败!')
        return False
    obj_log.info('发送MQ消息成功!{}:{}'.format(topic, message_body))
    return True
def mq_send_by_sim(self, topic, message_body, tag="tag",
                   key="key", **kwargs):
    """
    Deprecated: use the functions in base_framework/public_tools/rocket_mq.py.

    Post a topic message to the SIM MQ console.

    | Inputs:  | topic / message_body / tag / key | message fields sent in the request body |
    |          | kwargs | accepted but not used |
    | Returns: | True when the response reports sendStatus SEND_OK, otherwise False |
    """
    post_data = {
        'topic': topic,
        'messageBody': '{}'.format(message_body),
        'tag': tag,
        'key': key,
    }
    resp = obj_runner.call_rest_api(
        user=None,
        API_URL='https://mq-console.sim.huohua.cn/topic/sendTopicMessage.do',
        req_type='POST',
        json=post_data,
        current_evn='sim')
    if resp['data']['sendStatus'] != 'SEND_OK':
        obj_log.error('发送MQ消息失败!')
        return False
    obj_log.info('发送MQ消息成功!{}:{}'.format(topic, message_body))
    return True
def get_mq_msg(self, topic, begin_time, end_time, **kwargs):
    """
    Deprecated: use the functions in base_framework/public_tools/rocket_mq.py.

    Query the messages of a topic from ``<mq_host>/message/queryMessageByTopic.query``.

    | Inputs:  | topic      | topic to query |
    |          | begin_time | window start, format "%Y-%m-%d %H:%M" |
    |          | end_time   | window end, format "%Y-%m-%d %H:%M" |
    |          | kwargs     | optional 'user' forwarded to the REST runner |
    | Returns: | resp['data'] when resp['status'] is 0, otherwise False |
    """

    def _to_millis_text(minute_text):
        """Local "%Y-%m-%d %H:%M" text -> whole-second timestamp text with '000' appended."""
        return str(time.mktime(time.strptime(minute_text, '%Y-%m-%d %H:%M'))).split('.')[0] + '000'

    query = {
        'topic': topic,
        'begin': _to_millis_text(begin_time),
        'end': _to_millis_text(end_time),
    }
    resp = obj_runner.call_rest_api(
        user=kwargs.get('user', None),
        API_URL=obj_runner.mq_host + '/message/queryMessageByTopic.query',
        req_type='GET',
        params=query)
    if resp['status'] != 0:
        obj_log.error('查询MQ消息失败!')
        return False
    obj_log.info('查询MQ消息成功!')
    return resp['data']
@staticmethod
def delete_redis(key, db):
    """Delete ``key`` from database ``db`` of the QA course redis instance."""
    import redis
    # NOTE(review): host and password are hard-coded here; consider moving them to configuration.
    client = redis.Redis(host='rediscourse.qa.huohua.cn', port=6379, password='Bkl6LvqfzFCzYPAh', db=db)
    client.delete(key)
@staticmethod
def should_be_exist(sql, number=1, raise_error=True, retry_time=0):
"""
| Purpose: | Check that the query returns exactly `number` rows |
| Inputs:  | sql | query to run through MySqLHelper().select_all |
|          | number | expected row count |
|          | raise_error | True: raise when the count does not match; False: return False instead |
|          | retry_time | number of re-checks, each preceded by a 1-second sleep |
| Returns: | True when the row count matches, False when it does not and no exception is raised |
| Raises:  | Exception when the count still differs after the retries and raise_error is truthy |
Note: when number is 0 and retry_time > 0, the flag computed during the re-check
loop is returned and raise_error is not consulted on that path.
"""
res = MySqLHelper().select_all(sql=sql)
# Expecting zero rows with retries requested: re-query after sleeping and report the match flag
if int(number) == 0 and int(retry_time) > 0:
rs_flag = False
for index in range(int(retry_time)):
time.sleep(1)
res = MySqLHelper().select_all(sql=sql)
if len(res) == int(number):
rs_flag = True
else:
rs_flag = False
return rs_flag
# Count mismatch: re-query up to retry_time times and succeed as soon as the count matches
elif len(res) != int(number):
for index in range(int(retry_time)):
time.sleep(1)
res = MySqLHelper().select_all(sql=sql)
if len(res) != int(number):
continue
else:
return True
print("exec sql: {}".format(sql))
if raise_error:
raise Exception("check db failed:expect exist {0}, but act is {1}...".format(number, len(res)))
else:
return False
else:
return True
def should_be_not_contain_sub_json(self, obj_json, sub_json, sort_json=True):
"""
功能:判断obj_json未包含sub_json
| 输入参数: | obj_json | 目标串 |
| | sub_json | 子串 |
| | sort_json | 是否对json进行排序,默认-False,不排序 |
| 作者: | 吴勇刚 | 2022.05.10 |
"""
tag_equal = True
# 优先替换不规则的null,true和false
obj_json = eval(str(obj_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
sub_json = eval(str(sub_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
# 根据类型走不同的递归函数
if isinstance(obj_json, dict) and isinstance(sub_json, dict):
tag_equal = self._check_dict(obj_json, sub_json, tag_equal, sort_json)
elif isinstance(obj_json, list) and isinstance(sub_json, list):
tag_equal = self._check_list(obj_json, sub_json, tag_equal, sort_json)
else:
tag_equal = False
print("入参需为json串,请检查输入参数....")
if tag_equal:
print("obj_json:{}".format(obj_json))
print("sub_json:{}".format(sub_json))
raise Exception("obj_json中包含了sub_json串,校验不通过.....")
else:
print("obj_json中不包含sub_json,校验通过")
def should_be_equal_as_json(self, obj_json, sub_json, sort_json=False, sort_key=None, sort_type='str'):
"""
功能:判断sub_json是否被obj_json完全包含
| 输入参数: | obj_json | 目标串 |
| | sub_json | 子串 |
| | sort_json | 是否对json进行排序,默认-False,不排序 |
| | sort_key | [{},{}]结构排序时,将sort_key对应value按升序排列 |
| | sort_type | 按sort_key排序时,按int还是str类型对比,默认str,非int时会按str对比 |
| 作者: | 吴勇刚 | 2021.07. 29 |
函数位置:src/Public/Common/utils.py, 用法举例如下:
| obj_json | sub_json | 调用方式 | 对比结果 | 备注说明 |
| {"a":11} | {"a":11} | should_be_equal_as_json(obj_json,sub_json) | 相等 | |
| {"a":11,"b":12} | {"a":11} | 同上 | 相等 | 对比原理是obj_json完整包含sub_json即可 |
| {"a":{"b":[1,2,3]}} | {"a":{"b":[1,2]} | 同上 | 相等 | 支持任一格式的嵌套 |
| {"a":11,"b":12} | {"a":11,"b":"not_check"} | 同上 | 相等 | value=not_check的节点不做检查 |
| [{"a":11},{"b":12}] | [{"a":11},{"b":12}] | 同上 | 相等 | 支持list格式对比 |
| [{"a":11},{"b":12}] | [{"b":12},{"a":11}] | 同上 | 不相等 | 因为list顺序不一样 |
| [{"a":11},{"b":12}] | [{"b":12},{"a":11}] | should_be_equal_as_json(obj_json,sub_json,sort_json=True) | 相等 | 对比前会自动排期 |
"""
tag_equal = True
# 优先替换不规则的null,true和false
obj_json = eval(str(obj_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
sub_json = eval(str(sub_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
# 根据类型走不同的递归函数
if sort_type.lower() not in ('str', 'int'):
sort_type = 'str'
if isinstance(obj_json, dict) and isinstance(sub_json, dict):
# tag_equal = self._check_dict(obj_json, sub_json, tag_equal, sort_json, sort_key, sort_type)
r_equal = self._check_dict(obj_json, sub_json, sort_json, sort_key, sort_type)
if tag_equal: # 当未检查到不等项时才重新赋值,预防最后一项相等时覆盖前面不等项的结果
tag_equal = r_equal
elif isinstance(obj_json, list) and isinstance(sub_json, list):
# tag_equal = self._check_list(obj_json, sub_json, tag_equal, sort_json, sort_key, sort_type)
r_equal = self._check_list(obj_json, sub_json, sort_json, sort_key, sort_type)
if tag_equal: # 当未检查到不等项时才重新赋值,预防最后一项相等时覆盖前面不等项的结果
tag_equal = r_equal
else:
raise BusinessError("入参仅支持json格式,请检查输入参数")
if tag_equal:
print("obj_json中完全包含sub_json....")
else:
raise Exception("预期和实际结果不同,请检查前几步中的逻辑:\nobj_json:{}\nsub_json:{}".format(obj_json, sub_json))
# def _check_list(self, obj_list, sub_list, should_equal, sort_json=False, sort_key=None, sort_type='str'):
def _check_list(self, obj_list, sub_list, sort_json=False, sort_key=None, sort_type='str'):
"""should_be_equal_as_json的内置递归函数"""
should_equal = True
if len(obj_list) == len(sub_list) == 0: # 都为空,则直接返回
return True
elif len(obj_list) < len(sub_list): # 目标列表长度小于子列表
print("obj_json的长度是{}<{}sub_json的长度,请检查...".format(len(obj_list), len(sub_list)))
return False
elif len(sub_list) == 0: # 子列表为空
return True
if sort_json:
if sort_key and sort_key in obj_list[0] and sort_key in sub_list[0]:
if sort_type.lower() == 'str':
obj_list.sort(key=lambda x: str(x[sort_key]))
sub_list.sort(key=lambda x: str(x[sort_key]))
elif sort_type.lower() == 'int':
obj_list.sort(key=lambda x: int(x[sort_key]))
sub_list.sort(key=lambda x: int(x[sort_key]))
else:
obj_list.sort(key=lambda x: str(x))
sub_list.sort(key=lambda x: str(x))
for index in range(len(sub_list)):
if index < len(obj_list):
if isinstance(sub_list[index], list) and isinstance(obj_list[index], list):
# should_equal = self._check_list(obj_list[index], sub_list[index], should_equal, sort_json, sort_key,
# sort_type)
r_equal = self._check_list(obj_list[index], sub_list[index], sort_json, sort_key, sort_type)
if should_equal:
should_equal = r_equal
elif isinstance(sub_list[index], dict) and isinstance(obj_list[index], dict):
# should_equal = self._check_dict(obj_list[index], sub_list[index], should_equal, sort_json, sort_key,
# sort_type)
r_equal = self._check_dict(obj_list[index], sub_list[index], sort_json, sort_key, sort_type)
if should_equal:
should_equal = r_equal
elif not isinstance(sub_list[index], list) and not isinstance(obj_list[index], list) and not isinstance(
sub_list[index], dict) and not isinstance(obj_list[index], dict):
if isinstance(sub_list[index], str) and sub_list[index] == "not_check":
continue
else:
if sub_list[index] in obj_list:
pass
else:
should_equal = False
print("{}中的第{}个元素:{}不存在于{}中,请检查..."
.format(sub_list, index + 1, sub_list[index], obj_list))
elif isinstance(sub_list[index], str) and sub_list[index] == "not_check":
continue
else:
should_equal = False
print("类型不同:{}的类型是{},但{}的类型是{}".format(sub_list[index],
type(sub_list[index]),
obj_list[index],
type(obj_list[index])))
else:
should_equal = False
print("sub_list的长度是{}!={}obj_list的长度,请检查...".format(len(sub_list), len(obj_list)))
return should_equal
# def _check_dict(self, obj_dict, sub_dict, should_equal, sort_json=False, sort_key=None, sort_type='str'):
def _check_dict(self, obj_dict, sub_dict, sort_json=False, sort_key=None, sort_type='str'):
"""should_be_equal_as_json的内置递归函数"""
should_equal = True
for key in sub_dict:
if key in obj_dict:
if isinstance(sub_dict[key], list) and isinstance(obj_dict[key], list):
# should_equal = self._check_list(obj_dict[key], sub_dict[key], should_equal, sort_json, sort_key,
# sort_type)
r_equal = self._check_list(obj_dict[key], sub_dict[key], sort_json, sort_key, sort_type)
if should_equal:
should_equal = r_equal
elif isinstance(sub_dict[key], dict) and isinstance(obj_dict[key], dict):
# should_equal = self._check_dict(obj_dict[key], sub_dict[key], should_equal, sort_json, sort_key,
# sort_type)
r_equal = self._check_dict(obj_dict[key], sub_dict[key], sort_json, sort_key, sort_type)
if should_equal:
should_equal = r_equal
elif not isinstance(sub_dict[key], list) and not isinstance(sub_dict[key], dict) and not isinstance(
obj_dict[key], list) and not isinstance(obj_dict[key], dict):
if isinstance(sub_dict[key], str) and sub_dict[key] == "not_check":
continue
else:
if sub_dict[key] == obj_dict[key]:
pass
else:
should_equal = False
print("两个对比值中{0}对应的值不相同:{1} != {2},请确认...".format(key, sub_dict[key],
obj_dict[key]))
elif isinstance(sub_dict[key], str) and sub_dict[key] == "not_check":
continue
else:
should_equal = False
print("类型不同:{}的类型是{},但{}的类型是{}".format(sub_dict[key],
type(sub_dict[key]),
obj_dict[key],
type(obj_dict[key])))
else:
should_equal = False
print("key值{}不存在于目标串{}中,请检查...".format(key, obj_dict))
return should_equal
def remove_duplication(self, old_list):
    """Return a new list with duplicates dropped, keeping first occurrences in order.

    Membership is tested with ==, so unhashable elements such as dicts are
    supported.
    """
    unique_items = []
    for candidate in old_list:
        if candidate in unique_items:
            continue
        unique_items.append(candidate)
    return unique_items
def list_calculations(self, list_a, list_b, opt_type, removed_duplication=False):
    """Combine two lists with == based (unhashable-friendly) set-like operations.

    Args:
        list_a: First list.
        list_b: Second list.
        opt_type: 1 - intersection (elements of list_a that occur in list_b),
            2 - union (list_a followed by list_b; elements present in both
            lists are kept from both),
            3 - difference in both directions.
        removed_duplication: When truthy, each input is de-duplicated with
            remove_duplication before the operation.

    Returns:
        {'intersection': [...]}, {'union': [...]} or
        {'a_has_only': [...], 'b_has_only': [...]} depending on opt_type;
        None for any other opt_type. The input lists are never modified.
    """
    if removed_duplication:
        list_a = self.remove_duplication(list_a)
        list_b = self.remove_duplication(list_b)
    if opt_type == 1:
        return {'intersection': [x for x in list_a if x in list_b]}
    if opt_type == 2:
        # Build a new list: extending list_a in place used to modify the
        # caller's own list whenever removed_duplication was False.
        return {'union': list(list_a) + list(list_b)}
    if opt_type == 3:
        return {'a_has_only': [x for x in list_a if x not in list_b],
                'b_has_only': [x for x in list_b if x not in list_a]}
    return None
def compute_page_info(self, total_numbers=None, data_list=None, page_size=10, page_num=1):
    """Compute the pagination metadata for one page of a result set.

    Args:
        total_numbers: Total number of rows. Either this or data_list is enough.
        data_list: The full data list; its length is used when total_numbers
            is falsy.
        page_size: Rows per page.
        page_num: Current page number, starting at 1.

    Returns:
        dict with the keys:
            total              total number of rows
            pageNum            current page number
            pageSize           rows per page
            size               rows actually shown on the current page
            pages              total number of pages
            startRow / endRow  first / last row number on the current page
                               (both 0 when the page is beyond the data)
            prePage / nextPage previous / next page number (nextPage is 0
                               when there is no following page)
            isFirstPage / isLastPage
            hasPreviousPage / hasNextPage
            navigatePages      number of page links in the pager, fixed to 8
            navigateLastPage / navigateFirstPage
            navigatepageNums   list of page numbers shown in the pager

    Raises:
        ZeroDivisionError: if page_size is 0.
    """
    if total_numbers:
        total = int(total_numbers)
    elif data_list:
        total = len(data_list)
    else:
        # total_numbers=0, an empty data_list or no input at all means an
        # empty result set; previously 'total' stayed unset and the first
        # lookup below raised KeyError.
        total = 0
    current = int(page_num)
    per_page = int(page_size)

    offset = (current - 1) * per_page   # rows before the current page
    start_row = offset + 1
    end_row = current * per_page

    # Rows actually shown on the current page.
    if end_row <= total:
        size = per_page
    elif offset < total:
        size = total - offset
    else:
        size = 0

    pages = math.ceil(total / per_page)

    if start_row > total:
        shown_start, shown_end = 0, 0
    else:
        shown_start, shown_end = start_row, min(end_row, total)

    is_first = current == 1
    is_last = start_row <= total <= end_row

    # Pager window: at most `window` links, trying to show three pages after
    # the current one.
    window = 8
    if window < current + 3 <= pages:
        navigate_last = current + 3
    elif current + 3 <= window < pages:
        navigate_last = window
    else:
        navigate_last = pages
    navigate_first = max(navigate_last - window + 1, 1)

    return {
        'total': total,
        'pageNum': current,
        'pageSize': per_page,
        'size': size,
        'pages': pages,
        'startRow': shown_start,
        'endRow': shown_end,
        'prePage': current - 1,
        'nextPage': 0 if end_row >= total else current + 1,
        'isFirstPage': is_first,
        'isLastPage': is_last,
        'hasPreviousPage': not is_first,
        'hasNextPage': current < pages and not is_last,
        'navigatePages': window,
        'navigateLastPage': navigate_last,
        'navigateFirstPage': navigate_first,
        'navigatepageNums': list(range(navigate_first, navigate_last + 1)),
    }
@staticmethod
def should_be_str_contain_str(src_str, *sub_str, raise_flag=True):
"""
功能:判断src_str包含任意一个sub_str
| 输入参数: | src_str | 原始数据 |
| | *sub_str | 子数据 |
| | raise_flag | 不包含时是否抛出异常 |
| 作者: | 华学敏 | 2021.10. 08 |
函数位置:src/Public/Common/utils.py
"""
for s_str in sub_str:
if str(s_str) in str(src_str):
return True
if raise_flag:
raise RuntimeError("{} not contain {}".format(str(src_str), str(sub_str)))
else:
return False
@staticmethod
def compare_data_equal_as_list(sub_list, obj_value):
"""
功能:判断list中是否含某个值
| 输入参数: | sub_list | 列表,返回列表 |
| | obj_value | 目标值 |
| 作者: | qxj | 2021.06. 04 |
函数位置:
| 包含 | compare_data_equal_as_list([1,'a',2,'aa'], 1) | success |
"""
if not isinstance(sub_list, list):
raise RuntimeError("【ERROR】返回参数data必须是list类型,sub_list:{0}".format(type(sub_list)))
equal = False
for value in sub_list:
try:
sub_str = eval(obj_value)
except Exception as e:
sub_str = obj_value
if str(value) == str(sub_str):
equal = True
break
if equal:
return equal
else:
raise RuntimeError("【ERROR】返回参数data中没有找到值,sub_list:{0},valueP{1}".format(sub_list, obj_value))
def copy_account_to_hhi(self, email):
    """Copy the account and employee rows of one e-mail address into the hhi databases.

    The address is first looked up in hhi_account.account (choose_db='hhi');
    if a row exists only a notice is printed. Otherwise the matching rows are
    read with choose_db='huohua' from account.account, account.account_email,
    emp.employee and emp.employee_email, and re-inserted with choose_db='hhi'
    into the hhi_account / hhi_emp tables of the same name.

    Args:
        email: E-mail address used as the lookup value in every query. It is
            formatted into the SQL text without escaping.

    Returns:
        None. Notices and the generated INSERT statements are printed.
    """
    query_hhi_account = "SELECT id FROM hhi_account.account WHERE account_email='{}' limit 1;".format(email)
    resp_hhi_account = self.db_con.select_one(query_hhi_account, choose_db='hhi')
    if resp_hhi_account:
        print("账号已在hhi存在,请检查!")
    else:
        query_account = "SELECT * FROM account.account WHERE account_email='{}' limit 1;".format(email)
        resp_account = self.db_con.select_one(query_account, choose_db='huohua')
        if not resp_account:
            print("账号在火花库不存在,请检查!")
        else:
            # account.account
            # Row values are inlined as quoted literals into an
            # "INSERT ... SELECT <values> FROM <target table> LIMIT 1" statement.
            # NOTE(review): SELECT * is paired with a fixed column list, so this
            # presumably relies on the source column order matching it - confirm.
            # NOTE(review): with this statement form nothing is inserted when the
            # target table is empty - confirm this is intended.
            value_account_str = self._get_sql_value_by_dict(resp_account)
            insert_hhi_account = "INSERT INTO `hhi_account`.`account`(`id`, `account_unique`, `account_name`, `account_email`, `area_code`, `phone_code`, `mask_phone`, `pwd`, `account_type`, `account_status`, `apply_resource`, `start_date`, `end_date`, `account_owner_id`, `account_dept_id`, `work_order_no`, `data_level`, `employee_id`, `fei_shu_id`, `dict_item_id`, `account_source`, `created_time`, `creator_id`, `creator_name`, `modified_time`, `modifier_id`, `modifier_name`, `pwd_modified`) SELECT {} FROM hhi_account.account LIMIT 1;".format(
                value_account_str)
            print(insert_hhi_account)
            self.db_con.insert_one(insert_hhi_account, choose_db='hhi')
            # Overwrite end_date and account_status of the copied row with fixed values.
            update_end_date = "update `hhi_account`.`account` set end_date='2022-04-28',account_status=0 where account_email='{}'".format(
                email)
            self.db_con.execute(update_end_date, choose_db='hhi')
            # account.account_email
            query_account_email = "SELECT * FROM account.account_email WHERE account_email='{}' limit 1;".format(
                email)
            resp_account_email = self.db_con.select_one(query_account_email, choose_db='huohua')
            if resp_account_email:
                value_account_email_str = self._get_sql_value_by_dict(resp_account_email)
                insert_hhi_account_email = "INSERT INTO `hhi_account`.`account_email`(`id`, `account_id`, `account_unique`, `account_email`, `first_choice`, `email_type_value`, `created_time`, `creator_id`, `creator_name`, `modified_time`, `modifier_id`, `modifier_name`) SELECT {} FROM `hhi_account`.`account_email` LIMIT 1;".format(
                    value_account_email_str)
                print(insert_hhi_account_email)
                self.db_con.insert_one(insert_hhi_account_email, choose_db='hhi')
            # emp.employee
            # Columns are listed explicitly here, in the same order as the INSERT below.
            query_employee = "SELECT `id`, `name`, `employee_no`, `pwd`, `phone`, `email`, `leader_id`, `leader_name`, `position`, `position_no`, `position_level`, `data_level`, `department_id`, `department_no`, `department_name`, `corp_code`, `status`, `cid`, `cname`, `mid`, `mname`, `ctime`, `mtime`, `deleted`, `wechat_code`, `wechat_qrcode_image`, `wechat_qrcode_string`, `avatar`, `cc_hot_line`, `cc_cno`, `cc_pwd`, `cc_bind_tel`, `cc_bind_tel_tmp`, `work_phone`, `frozen`, `qywx_user_id`, `work_tel`, `work_mobile`, `bind_tel_type`, `city_id`, `city_name`, `city_level`, `nick_name`, `sex`, `birthday`, `birthplace`, `education`, `graduated_school`, `intro`, `nature_of_work`, `position_time`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`, `change_sign_time`, `account_status`, `dingding_id`, `department_post_id`, `department_post_name`, `dimission_time`, `cc_work_tel`, `cc_work_mobile`, `mask_phone`, `phone_code`, `contract_company`, `pre_prob_date`, `show_name`, `name_pinyin`, `feishu_zhs`, `feishu_eng`, `business_unit`, `international_code`, `first_name_py`, `last_name_py`, `name_ac`, `name_format`, `first_name`, `middle_name`, `last_name`, `reg_region` FROM `emp`.`employee` WHERE email='{}' limit 1;".format(
                email)
            resp_employee = self.db_con.select_one(query_employee, choose_db='huohua')
            if resp_employee:
                value_employee_str = self._get_sql_value_by_dict(resp_employee)
                insert_hhi_employee = "INSERT INTO `hhi_emp`.`employee`(`id`, `name`, `employee_no`, `pwd`, `phone`, `email`, `leader_id`, `leader_name`, `position`, `position_no`, `position_level`, `data_level`, `department_id`, `department_no`, `department_name`, `corp_code`, `status`, `cid`, `cname`, `mid`, `mname`, `ctime`, `mtime`, `deleted`, `wechat_code`, `wechat_qrcode_image`, `wechat_qrcode_string`, `avatar`, `cc_hot_line`, `cc_cno`, `cc_pwd`, `cc_bind_tel`, `cc_bind_tel_tmp`, `work_phone`, `frozen`, `qywx_user_id`, `work_tel`, `work_mobile`, `bind_tel_type`, `city_id`, `city_name`, `city_level`, `nick_name`, `sex`, `birthday`, `birthplace`, `education`, `graduated_school`, `intro`, `nature_of_work`, `position_time`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`, `change_sign_time`, `account_status`, `dingding_id`, `department_post_id`, `department_post_name`, `dimission_time`, `cc_work_tel`, `cc_work_mobile`, `mask_phone`, `phone_code`, `contract_company`, `pre_prob_date`, `show_name`, `name_pinyin`, `feishu_zhs`, `feishu_eng`, `business_unit`, `international_code`, `first_name_py`, `last_name_py`, `name_ac`, `name_format`, `first_name`, `middle_name`, `last_name`, `reg_region`) SELECT {} FROM `hhi_emp`.`employee` LIMIT 1;".format(
                    value_employee_str)
                print(insert_hhi_employee)
                self.db_con.insert_one(insert_hhi_employee, choose_db='hhi')
                # Overwrite four date columns of the copied employee with fixed values.
                # NOTE(review): presumably needed because _get_sql_value_by_dict
                # renders None as the text 'None' - confirm.
                update_employee_info = "update `hhi_emp`.`employee` set birthday='2022-04-28 10:00:00',pre_prob_date='2022-05-16',dimission_time='2023-04-28 10:00:00',change_sign_time='2022-04-28 10:00:00' where email='{}'".format(
                    email)
                self.db_con.execute(update_employee_info, choose_db='hhi')
            # emp.employee_email
            query_employee_email = "SELECT * FROM emp.employee_email where employee_email_value='{}' limit 1;".format(
                email)
            resp_employee_email = self.db_con.select_one(query_employee_email, choose_db='huohua')
            if resp_employee_email:
                value_employee_email_str = self._get_sql_value_by_dict(resp_employee_email)
                insert_hhi_employee_email = "INSERT INTO `hhi_emp`.`employee_email`(`id`, `employee_id`, `employee_no`, `employee_email_type`, `employee_email_value`, `primary_flag`, `status`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`) SELECT {} FROM `hhi_emp`.`employee_email` LIMIT 1;".format(
                    value_employee_email_str)
                print(insert_hhi_employee_email)
                self.db_con.insert_one(insert_hhi_employee_email, choose_db='hhi')
@staticmethod
def _get_sql_value_by_dict(value_dict):
value_str = ""
for v in value_dict.values():
value_str = value_str + "'" + str(v) + "'" + ","
value_str = value_str[:-1]
# print(value_str)
return value_str
def wait_and_query_data(self, sql, wait_time=20):
    """Poll the database until a query returns a row, for data that is written with a delay.

    The query is run through a fresh MySqLHelper().select_one(sql=sql) call
    every 5 seconds while the accumulated waiting time is below wait_time.

    Args:
        sql: Query to run; must not be empty.
        wait_time: Maximum waiting time in seconds (converted with int()).

    Returns:
        The first truthy result of select_one.

    Raises:
        Exception: sql is empty, or no row showed up within wait_time.
    """
    if not sql:
        raise Exception('查询sql不能为空......')

    waited = 0
    limit = int(wait_time)
    while waited < limit:
        row = MySqLHelper().select_one(sql=sql)
        obj_log.info('try_time={}'.format(waited))
        if row:
            return row
        time.sleep(5)
        waited += 5
    raise Exception('数据落库超时{}s,请检查'.format(waited))
def to_lower_camel(self, name):
    """Convert a snake_case name to lowerCamelCase.

    The name is lower-cased first; then every underscore that is directly
    followed by a letter is dropped and that letter is upper-cased.
    Underscores followed by anything else are left in place.
    """
    def upper_first_letter(match):
        return match.group(1).upper()

    lowered = name.lower()
    return re.sub('_([a-zA-Z])', upper_first_letter, lowered)
def to_snake(self, name):
    """Convert a camelCase name to snake_case.

    An underscore is inserted between a lower-case letter and the upper-case
    letter that directly follows it, then the whole name is lower-cased.
    Upper-case letters after a digit or another upper-case letter get no
    underscore.
    """
    return re.sub(r'([a-z])([A-Z])', r'\1_\2', name).lower()
def change_dict_key_format(self, obj_dict, c_type=1):
"""
将dict或[dict]中的所有key,从下划线模式转为驼峰模式,或者反向转换
Args:
obj_dict: 原始dict或list
c_type: 类型:1-下划线转驼峰,2-驼峰转下划线
Returns:
转化后的dict,或list
"""
if isinstance(obj_dict, list):
new_dict = []
for item in obj_dict:
new_dict.append(self.__change_dict_key(c_dict=item, c_type=c_type))
return new_dict
elif isinstance(obj_dict, dict):
return self.__change_dict_key(c_dict=obj_dict, c_type=c_type)
def __change_dict_key(self, c_dict, c_type=1):
"""
内置函数,供change_dict_key_format使用
Args:
c_dict: 原始dict
c_type: 类型:1-下划线转驼峰,2-驼峰转下划线
Returns:
转化后的dict
"""
new_dict = {}
if c_type == 1:
for k in c_dict.keys():
new_key = self.to_lower_camel(name=k)
new_dict[new_key] = c_dict[k]
elif c_type == 2:
for k in c_dict.keys():
new_key = self.to_snake(name=k)
new_dict[new_key] = c_dict[k]
return new_dict
def dict_to_sql_condition(self, dicts):
"""
| 功能说明: | 字典转sql条件;默认会把驼峰key转为下划线| |
| 输入参数: | {}
| 返回参数: | WHERE key='' AND key=''
| 作者信息: | 陈江 | 2022.12.09 |
"""
try:
dicts = eval(dicts)
except:
dicts = dicts
if not dicts:
raise RuntimeError("参数不能为空")
condition = "WHERE "
count = 0
for k, v in dicts.items():
condition_key = self.to_snake(k)
if count == 0:
condition += condition_key + "='{}'".format(v)
count = count + 1
else:
condition += " AND " + condition_key + "='{}'".format(v)
return condition
def check_http_return_info(self, rsp, exp_code=200, exp_msg="", exp_success=True):
    """Check the code / success / message fields of an HTTP interface response.

    Args:
        rsp: Response to check.
        exp_code: Expected value of "code".
        exp_msg: Expected value of "message".
        exp_success: Expected value of "success".

    Returns:
        None. The comparison is delegated to should_be_equal_as_json, which
        raises when the response does not contain the expected fields.
    """
    expected_fields = dict(code=exp_code, success=exp_success, message=exp_msg)
    self.should_be_equal_as_json(obj_json=rsp, sub_json=expected_fields)
@staticmethod
def update_default_parameters_by_input(input_parameters, default_parameters):
"""
功能:用接口输入的参数去更新默认参数:有则更新,没有则在默认参数中新增,要求两个参数都的是dict
Args:
input_parameters: 接口输入参数
default_parameters: 默认参数
Returns: 更新后的default_parameters
"""
for p_key in input_parameters:
default_parameters[p_key] = input_parameters[p_key]
return default_parameters
@staticmethod
def get_value_from_post_data_input(post_data_input, key, default_value=None):
"""
功能:从用户入参post_data_input中找到对应key的value并返回,没有则赋予default_value值并返回
Args:
post_data_input: 用户输入的入参
key: 期望从入参中找的key
default_value: 当入参为空,或没有对应的key时,默认赋予的值
Returns:
对应key中的value或default_value
"""
# post_data_input.get(key) == 'default' 是用于兼容GPT的调用
if post_data_input is None or post_data_input.get(key) is None \
or post_data_input.get(key) in ["", "0", "default"]:
return default_value
else:
return post_data_input.get(key)
@staticmethod
def get_value_from_kwargs(kwargs, key, required=0, default_value=None):
"""
功能:从入参kwargs中获取指定key的参数值
Args:
kwargs: 参数dict
key: 参数key
required: 是否必填:0-非必现,1-必填,若必须填不在kwargs里,则抛出对应异常
default_value: 非必填项参数不在kwargs里时,赋予的默认值
Returns: key在kwargs中的具体值,或缺省时的默认值
"""
if required == 1 and key not in kwargs:
raise BusinessError("参数:{}是必填项,请出入....".format(key))
if key in kwargs:
return kwargs[key]
else:
return default_value
@staticmethod
def get_day_of_month():
"""
功能:获取当天的日期号数,比如今天是11月12号,return 12
Returns:
"""
today = datetime.datetime.now()
return today.day
@staticmethod
def formate_msg_for_aita(r_msg):
"""
格式化输出用于aita页面展示的消息,要求return_data
Args:
r_msg: 函数返回值,要求中含以下三个字段
for_gpt: 给gpt的内容,默认第一个为标题,原数据用\n字符分割,描述与值用:分割
for_user: 给用户在aita页面展示的内容,原数据用\n字符分割,描述与值用:分割
for_link: 跳转链接,格式:{"link_url":"链接地址","link_str":“链接名称,没有则默认为:[点击查询详情...]“}
Returns:
标题(加粗,改色)
描述1 值1
**** ***
描述2 值2
跳转链接
"""
for_gpt = r_msg["for_gpt"] if "for_gpt" in r_msg else None
for_user = r_msg["for_user"] if "for_user" in r_msg else None
for_link = r_msg["for_link"] if "for_link" in r_msg else None
if for_gpt:
del r_msg["for_gpt"]
gpt_list = for_gpt.split('\n')
for_gpt = "{}
".format(gpt_list[0])
for item in gpt_list[1:]:
value_list = item.replace(':', ':').split(':')
# 参数display:inline-block让两个div标签在一行内显示
key_value = "