1546 lines
82 KiB
Python
1546 lines
82 KiB
Python
# -*- coding:utf-8 -*-
|
||
|
||
"""
|
||
Author: qiaoxinjiu
|
||
Create Data: 2020/9/24 11:39
|
||
"""
|
||
import base64
|
||
import datetime
|
||
import hashlib
|
||
import calendar
|
||
import math
|
||
import re
|
||
import time
|
||
import pytz
|
||
from os import listdir
|
||
from dateutil.relativedelta import relativedelta
|
||
from chinese_calendar import is_workday, is_holiday
|
||
|
||
import jsonpath
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
|
||
from base_framework.base_config.current_pth import *
|
||
from base_framework.public_tools import log
|
||
from base_framework.public_tools.read_config import InitConfig
|
||
from base_framework.public_tools.read_config import ReadConfig
|
||
from base_framework.public_tools.runner import Runner
|
||
from base_framework.public_tools.sqlhelper import MySqLHelper
|
||
from base_framework.public_tools.custom_error import BusinessError
|
||
|
||
obj_runner = Runner()
|
||
obj_log = log.get_logger()
|
||
obj_evn_cfg = ReadConfig(env_choose_path)
|
||
|
||
|
||
class Tools(InitConfig):
|
||
|
||
def __init__(self):
|
||
# super().__init__()
|
||
InitConfig.__init__(self, run_user_name=None, current_evn=None)
|
||
# self.tool_jira_id = tool_cfg.get_value(sections='run_jira_id', options='huohua-podenv')
|
||
self.db_con = MySqLHelper()
|
||
|
||
def get_jira_id_from_ini(self):
|
||
# cfg_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
|
||
# ini_path = cfg_path + '/ConfigFiles/env_choose.ini'
|
||
tool_jira_id = None
|
||
try:
|
||
tool_jira_id = obj_evn_cfg.get_value(sections='run_jira_id', options='huohua-podenv')
|
||
except Exception as e:
|
||
print(e)
|
||
return tool_jira_id
|
||
|
||
def get_md5(self, str):
|
||
m = hashlib.md5()
|
||
m.update(str.encode("utf8"))
|
||
md5 = m.hexdigest()
|
||
return md5
|
||
|
||
def get_format_date(self, r_week=None, r_time=None, t_time=None, r_schedule=None,
|
||
r_type=1,
|
||
add_months=0, add_weeks=0, add_days=0, add_hours=0, add_minutes=0, add_seconds=0, minutes=60):
|
||
"""
|
||
| 功能说明: | 返回指定格式的日期类型 | |
|
||
| 输入参数: | | |
|
||
| | r_schedule | 传入1的时候,直接返回('2023-08-18 15:44:57', '2023-08-18', '15:44:57', 6, '15:45')格式数据,用于教务的档期|
|
||
| | r_week | 传入周几,返回下个周几的日期 |
|
||
| | r_time | 需要转换的日期(格式):%Y-%m-%d %H:%M:%S, 如果不传则默认取系统当前日期, 时间戳格式不支持此参数 |
|
||
| | t_time | 需要转换的日期(格式):时间戳,秒级,默认取前十位数,当 r_time=None 时生效 |
|
||
| | r_type | 返回格式:详见下面的明细 |
|
||
| | add_month | 月 偏移量:0-当前,1-下月,-1上月,时间戳格式不支持此参数 |
|
||
| | add_weeks | 周 偏移量:0-当前,1-下周,-1上周 |
|
||
| | add_days | 日 偏移量:0-当前,1-明天,-1昨天 |
|
||
| | add_hours | 小时偏移量:0-当前,1-后一小时,-1前一小时 |
|
||
| | add_minutes | 分钟偏移量:0-当前,1-后一分钟,-1前一分钟 |
|
||
| | add_seconds | 秒级偏移量:0-当前,1-下一秒,-1前一秒 |
|
||
| | minutes | 分钟数,仅用于r_schedule不为空的时候 |
|
||
| 返回参数: | 按r_type指定的类型返回 |
|
||
| 作者信息: | 吴勇刚 | 2019.05.24 |
|
||
r_type类型列表:
|
||
| 类型编号 | 类型样式 | 实例 |
|
||
| 1 | %Y-%m-%d | 2019-05-23 |
|
||
| 2 | %Y年%m月%d日 | 2019年5月23日 |
|
||
| 3 | %Y年%m月%d日周X | 2019年5月23日星期四 |
|
||
| 4 | %Y-%m-%d %H:%M:%S | 2021-06-28 21:54:46 |
|
||
| 5 | %Y-%m-%d %H:%M:%S.%f | 2021-06-28 21:54:46.205213 |
|
||
| 6 | %Y-%m | 2019-05 |
|
||
| 7 | %Y%m%d | 20190505 |
|
||
| 8 | %Y%m%d%H%M%S | 20190505102030 |
|
||
| 11 | 原始时间戳 | 1499825149.2578925 |
|
||
| 12 | 秒级时间戳 | 1499825149 |
|
||
| 13 | 毫秒级时间戳 | 1499825149257 |
|
||
| 14 | 微秒级时间戳 | 1499825149257892 |
|
||
| 15 | %Y-%m-%d 00:00:00 | 2021-06-28 00:00:00 |
|
||
| 16 | %Y-%m-%d 23:59:59 | 2021-06-28 23:59:59 |
|
||
| 17 | %Y_%m_%d | 2021_06_28 |
|
||
| 18 | 返回档期所需的相关格式 | 0:%Y-%m-%d %H:%M:%S; 1:"%H:%M"; 2:档期格式专用week(原week+1); 3:"%Y-%m-%d %H:%M"; 4:原week |
|
||
| 19 | 类型15对应的时间戳 | 1640620800000 |
|
||
| 20 | 毫秒级时间戳,分秒均为0 | 1499825000000 |
|
||
| 21 | 毫秒级时间戳,秒毫秒均为0 | 1499825000000 |
|
||
| 22 | 时间拆分为年 月 日 时间 24H制 | {'year':'','month':'','day':'','time_24':{'short_time':'','full_time':''},'time_12h':{'short_time':'','full_time':''}} |
|
||
| 23 | %Y%m | 202303 |
|
||
| 24 | 下周一的当前时间 | 2021-07-05 21:54:00 |
|
||
| 25 | 本月的最后一天 | 2021-07-31 23:59:59 |
|
||
| 26 | 本月的第一天 | 2021-07-01 00:00:00 |
|
||
| 28 | 当前时间所在周周一 | 2021-07-01 00:00:00 |
|
||
| 29 | 当前时间所在周周日 | 2021-07-01 23:59:59 |
|
||
| 91 | 对应年份的第几周 | (2022,35,1):2022年第35周第1天 |
|
||
| 92 | 对应年份的第几周 | 202235:2022年第35周 |
|
||
函数路径:src/Public/Common/utils.py
|
||
"""
|
||
r_type = int(r_type)
|
||
add_months = int(add_months)
|
||
add_weeks = int(add_weeks)
|
||
add_days = int(add_days)
|
||
add_hours = int(add_hours)
|
||
add_minutes = int(add_minutes)
|
||
add_seconds = int(add_seconds)
|
||
if r_schedule is not None:
|
||
if r_schedule == 1:
|
||
weeks = {"1": 2, "2": 3, "3": 4, "4": 5, "5": 6, "6": 7, "7": 1}
|
||
update_time = (datetime.datetime.now() + datetime.timedelta(minutes=minutes))
|
||
weeks = weeks[str(update_time.isoweekday())] # 获取时间周几
|
||
return update_time.strftime("%Y-%m-%d %H:%M:%S"), update_time.strftime(
|
||
"%Y-%m-%d"), update_time.strftime(
|
||
"%H:%M:%S"), weeks, update_time.strftime("%H:%M")[0:4] + "5"
|
||
else:
|
||
raise Exception("r_schedule参数输入错误,请重新输入")
|
||
if r_time is None and t_time is None and r_week is None:
|
||
r_date = datetime.datetime.now() + datetime.timedelta(weeks=add_weeks,
|
||
days=add_days,
|
||
hours=add_hours,
|
||
minutes=add_minutes,
|
||
seconds=add_seconds)
|
||
t_date = time.time() + ((add_weeks * 7 + add_days) * 24 + add_hours) * 3600 + add_minutes * 60 + add_seconds
|
||
else:
|
||
if r_week:
|
||
today = datetime.datetime.now()
|
||
days_ahead = r_week - 1 - today.weekday() # 4 corresponds to Friday (0 is Monday, 6 is Sunday)
|
||
if days_ahead < 0:
|
||
days_ahead += 7
|
||
date_time = today + datetime.timedelta(days=days_ahead)
|
||
elif r_time:
|
||
try:
|
||
date_time = datetime.datetime.strptime(r_time, "%Y-%m-%d %H:%M:%S")
|
||
except ValueError:
|
||
raise ValueError("输入时间格式错误,请输入格式为:%Y-%m-%d %H:%M:%S 的时间字符串")
|
||
elif t_time:
|
||
if len(str(t_time)) > 10:
|
||
t_time = int(str(t_time)[0:10])
|
||
try:
|
||
tmp_time = datetime.datetime.fromtimestamp(t_time)
|
||
# date_time = tmp_time.strftime("%Y-%m-%d %H:%M:%S")
|
||
date_time = datetime.datetime.fromtimestamp(t_time)
|
||
except ValueError as e:
|
||
raise ValueError("输入时间格式错误,错误原因:{}".format(e))
|
||
r_date = date_time + datetime.timedelta(weeks=add_weeks,
|
||
days=add_days,
|
||
hours=add_hours,
|
||
minutes=add_minutes,
|
||
seconds=add_seconds)
|
||
t_date = time.mktime(date_time.timetuple()) + (
|
||
(add_weeks * 7 + add_days) * 24 + add_hours) * 3600 + add_minutes * 60 + add_seconds
|
||
|
||
if add_months >= 0:
|
||
r_date = r_date + relativedelta(months=add_months)
|
||
else:
|
||
r_date = r_date - relativedelta(months=abs(add_months))
|
||
|
||
if r_type == 1:
|
||
return r_date.strftime("%Y-%m-%d")
|
||
elif r_type == 2:
|
||
return str(r_date.year) + "年" + str(r_date.month) + "月" + str(r_date.day) + "日"
|
||
elif r_type == 3:
|
||
f_date = str(r_date.year) + "年" + str(r_date.month) + "月" + str(r_date.day) + "日"
|
||
return f_date + self.get_week_by_date(format_date=r_date.strftime("%Y-%m-%d"), week_type='Chinese')
|
||
elif r_type == 4:
|
||
return r_date.strftime("%Y-%m-%d %H:%M:%S")
|
||
elif r_type == 5:
|
||
return r_date.strftime("%Y-%m-%d %H:%M:%S.%f")
|
||
elif r_type == 6:
|
||
return r_date.strftime("%Y-%m")
|
||
elif r_type == 7:
|
||
return r_date.strftime("%Y%m%d")
|
||
elif r_type == 8:
|
||
return r_date.strftime("%Y%m%d%H%M%S")
|
||
elif r_type == 11:
|
||
return t_date
|
||
elif r_type == 12:
|
||
return int(t_date)
|
||
elif r_type == 13:
|
||
return int(round(t_date * 1000))
|
||
elif r_type == 14:
|
||
return int(round(t_date * 1000000))
|
||
elif r_type == 15:
|
||
return "{} 00:00:00".format(r_date.strftime("%Y-%m-%d"))
|
||
elif r_type == 16:
|
||
return "{} 23:59:59".format(r_date.strftime("%Y-%m-%d"))
|
||
elif r_type == 17:
|
||
return r_date.strftime("%Y_%m_%d")
|
||
elif r_type == 18:
|
||
weeks = {"1": 2, "2": 3, "3": 4, "4": 5, "5": 6, "6": 7, "7": 1}
|
||
weeks = weeks[str(r_date.isoweekday())] # 获取时间周几
|
||
origin_week = int(str(r_date.isoweekday()))
|
||
return r_date.strftime("%Y-%m-%d %H:%M:%S"), r_date.strftime("%H:%M")[0:4] + "5", weeks, r_date.strftime(
|
||
"%Y-%m-%d %H:%M"), origin_week
|
||
elif r_type == 19:
|
||
today = "{} 00:00:00".format(r_date.strftime("%Y-%m-%d"))
|
||
time_array = time.strptime(today, "%Y-%m-%d %H:%M:%S")
|
||
timestamp = int(time.mktime(time_array))
|
||
return str(timestamp * 1000)
|
||
elif r_type == 20:
|
||
return int(time.mktime(time.strptime(r_date.strftime("%Y-%m-%d %H:00:00"), '%Y-%m-%d %H:%M:%S'))) * 1000
|
||
elif r_type == 21:
|
||
return int(time.mktime(time.strptime(r_date.strftime("%Y-%m-%d %H:%M:00"), '%Y-%m-%d %H:%M:%S'))) * 1000
|
||
elif r_type == 23:
|
||
return r_date.strftime("%Y%m")
|
||
elif r_type == 24: # 返回下周一的当前时间
|
||
today = datetime.datetime.today()
|
||
return (today + datetime.timedelta(days=7 - today.weekday())).strftime("%Y-%m-%d %H:%M:%S")
|
||
elif r_type == 25:
|
||
last_date = calendar.monthrange(year=r_date.year, month=r_date.month)
|
||
return "{}-{} 23:59:59".format(r_date.strftime("%Y-%m"), last_date[1])
|
||
elif r_type == 26:
|
||
return "{}-{}-01 00:00:00".format(r_date.year, r_date.month)
|
||
elif r_type == 27:
|
||
# 时区转换
|
||
# 将日期和时间转换为UTC时区
|
||
now_utc_time = r_date.astimezone(pytz.utc)
|
||
obj_time = now_utc_time.isoformat()[:19]+"Z"
|
||
return obj_time, r_date.strftime("%Y-%m-%d %H:%M:%S")
|
||
elif r_type == 28:
|
||
now = datetime.datetime.now()
|
||
last_monday = now - datetime.timedelta(days=now.weekday())
|
||
return f"{last_monday.strftime('%Y-%m-%d')} 00:00:00"
|
||
elif r_type == 29:
|
||
now = datetime.datetime.now()
|
||
last_sunday = now + datetime.timedelta(days=6 - now.weekday())
|
||
return f"{last_sunday.strftime('%Y-%m-%d')} 23:59:59"
|
||
elif r_type == 91:
|
||
return r_date.isocalendar()
|
||
elif r_type == 92:
|
||
t_week = r_date.isocalendar()
|
||
return "{}{}".format(t_week[0], t_week[1])
|
||
elif r_type == 22:
|
||
date_time_str = r_date.strftime("%Y-%m-%d %H:%M:%S")
|
||
date_obj = datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S')
|
||
|
||
year = date_obj.year
|
||
month = date_obj.month
|
||
day = date_obj.day
|
||
short_time_12h = date_obj.strftime('%I:%M %p').lower()
|
||
full_time_12h = date_obj.strftime('%I:%M:%S %p').lower()
|
||
short_time_24h = date_obj.strftime('%I:%M')
|
||
full_time_24h = date_obj.strftime('%I:%M:%S')
|
||
return {'year': year, 'month': month, 'day': day,
|
||
'time_24': {'short_time': f'{short_time_24h}', 'full_time': f'{full_time_24h}'},
|
||
'time_12h': {'short_time': f'{short_time_12h}', 'full_time': f'{full_time_12h}'}}
|
||
else:
|
||
raise Exception(
|
||
"调用get_format_date函数的r_type参数不正确....")
|
||
|
||
@staticmethod
|
||
def get_week_by_date(format_date, week_type='Arabic'):
|
||
"""
|
||
| 功能说明: | 返回format_date对应的星期 |
|
||
| 输入参数: | format_date | 要计算的日期格式:2021-06-28 |
|
||
| | week_type | 返回星期的格式:Arabic-阿拉伯数字,Chinese-中文 |
|
||
| 返回参数: | 星期几,0-6, 0-星期日 |
|
||
| 作者信息: | 吴勇刚 | 2021-06-08 |
|
||
函数路径:src/Public/Common/utils.py
|
||
| format_date | week_type | 返回 |
|
||
| 2021-06-27 | Arabic | 0 |
|
||
| 2021-06-28 | Arabic | 1 |
|
||
| 2021-06-27 | Chinese | 星期日 |
|
||
| 2021-06-28 | Chinese | 星期一 |
|
||
"""
|
||
input_date = format_date.split('-')
|
||
if len(input_date) != 3:
|
||
raise Exception("输入值:{} 不是2021-06-08格式,请检查".format(format_date))
|
||
year = int(input_date[0])
|
||
month = int(input_date[1])
|
||
day = int(input_date[2])
|
||
week = int(datetime.datetime(year, month, day).strftime("%w"))
|
||
if week_type == 'Arabic':
|
||
return week
|
||
elif week_type == 'Chinese':
|
||
weeks = ['星期日', '星期一', '星期二', '星期三', '星期四', '星期五', '星期六']
|
||
return weeks[week]
|
||
|
||
@staticmethod
|
||
def check_the_date_is_a_working_day(format_date=None):
|
||
"""
|
||
功能:判断给定日期是否为工作日
|
||
| 输入 | format_date | 日期(格式如:2021-09-01),不传则计算今天|
|
||
| 返回 | True,工作日 | False,非工作日,即周末或法定节假日 |
|
||
| 作者 | 吴勇刚 | 2021.08.31 |
|
||
函数路径:src/Public/Common/utils.py
|
||
"""
|
||
if not format_date: # 没有传入参数则默认计算当天是否为工作日
|
||
format_date = datetime.date.today()
|
||
else:
|
||
format_date = datetime.datetime.strptime(format_date, "%Y-%m-%d").date()
|
||
if is_workday(format_date) and not is_holiday(format_date):
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
@staticmethod
|
||
def get_img_bas64(path):
|
||
img = open(path, 'rb')
|
||
img_bas64 = base64.b64encode(img.read())
|
||
img.close()
|
||
'''b 表示 byte的意思,将byte转换回去'''
|
||
return str(img_bas64, 'utf-8')
|
||
|
||
@staticmethod
|
||
def get_files_path():
|
||
path = (os.path.split(os.path.realpath(__file__)))[0] + '\\' + "img"
|
||
return path
|
||
|
||
@staticmethod
|
||
def listdir(self): # 传入存储的list
|
||
list_name = []
|
||
path = self.get_files_path()
|
||
for file in os.listdir(path):
|
||
file_path = os.path.join(path, file)
|
||
if os.path.isdir(file_path):
|
||
listdir(file_path, list_name)
|
||
else:
|
||
list_name.append(file_path)
|
||
return list_name
|
||
|
||
@staticmethod
|
||
def del_file(file_name):
|
||
size = os.path.getsize(file_name)
|
||
file_size = 44 * 1024 # 100K
|
||
if size > file_size:
|
||
print('remove', size, file_name)
|
||
try:
|
||
os.remove(file_name)
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
def modify_db_info(self, dbName=None, dbUsername=None,
|
||
dbPassword=None, dbHost=None, dbPort=None):
|
||
"""
|
||
Example usage:
|
||
| # explicitly specifies all db property values |
|
||
| modify_db_info | psycopg2 | my_db | postgres | s3cr3t | tiger.foobar.com | 5432 |
|
||
"""
|
||
if dbName:
|
||
self.db_rtn.set_section(
|
||
section='Mysql',
|
||
option='db_test_dbname',
|
||
value=dbName)
|
||
if dbUsername:
|
||
self.db_rtn.set_section(
|
||
section='Mysql',
|
||
option='db_test_user',
|
||
value=dbUsername)
|
||
if dbPassword:
|
||
self.db_rtn.set_section(
|
||
section='Mysql',
|
||
option='db_test_password',
|
||
value=dbName)
|
||
if dbHost:
|
||
self.db_rtn.set_section(
|
||
section='Mysql',
|
||
option='db_test_host',
|
||
value=dbHost)
|
||
if dbPort:
|
||
self.db_rtn.set_section(
|
||
section='Mysql',
|
||
option='db_test_port',
|
||
value=dbPort)
|
||
|
||
def __get_eureka_url_from_db(self, team, server):
|
||
"""
|
||
| 功能 | 从DB中获取服务对应的eureka_url,供get_container_ip_from_eureka函数使用 |
|
||
"""
|
||
sql_str = "select eureka_url as hh, eureka_url_hhi as hhi from sparkatp.swagger_info where team='{0}' " \
|
||
"and server_name='{1}';".format(team, server)
|
||
res = self.db_con.select_one(sql=sql_str)
|
||
current_business = self.evn_cfg['run_evn_name']['current_business']
|
||
return res[current_business]
|
||
|
||
def get_container_ip_from_eureka(self, server_name, env=None, need_jira_id=False, jira_id_dev=None,
|
||
is_contain_qa=True, team_name=None, eureka_url=None):
|
||
"""
|
||
| 功能说明: | 获取Eureka指定服务的IP |
|
||
| 输入参数: | server_name | 服务名 |
|
||
| | env | QA or SIM |
|
||
| | need_jira_id | True:从config.ini文件获取, False:取QA环境的IP |
|
||
| | jira_id_dev | 传入自定义独立环境编号 |
|
||
| | is_contain_qa | 当没有对应的独立环境时,是否返回qa环境IP,默认True,返回 |
|
||
| | team_name | 小组名,当eureka_url=None时,用于确认eureka_url |
|
||
| | eureka_url | eureka地址,没有传入则读取数据库中的值 |
|
||
| 返回参数: | dict | container_ip |
|
||
| 作者信息: | 吴勇刚 | 2022.03.27 |
|
||
举例说明:
|
||
| get_container_ip_from_eureka(server_name='PEPPA-QI-API') |
|
||
"""
|
||
# 各项配置获取
|
||
if team_name is None: # 业务组
|
||
team_name = obj_evn_cfg.get_value(sections='run_evn_name', options='current_team')
|
||
if env is None: # 构建环境
|
||
env = obj_evn_cfg.get_value(sections='run_evn_name', options='current_evn')
|
||
if eureka_url is None: # eureka地址
|
||
eureka_url = self.__get_eureka_url_from_db(team=team_name, server=server_name)
|
||
# if env.upper() == "QA" and 'sim' in eureka_url.lower():
|
||
# eureka_url = eureka_url.replace('.sim.', '.qa.')
|
||
# elif env.upper() == "SIM" and 'qa' in eureka_url.lower():
|
||
# eureka_url = eureka_url.replace('.qa.', '.sim.')
|
||
jira_id = None
|
||
if jira_id_dev is None:
|
||
# if need_jira_id is False:
|
||
# jira_id = 'qa'
|
||
# else:
|
||
jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
|
||
# if jira_id == "":
|
||
# jira_id = 'qa'
|
||
else:
|
||
jira_id = jira_id_dev
|
||
temp_text = requests.get(url=eureka_url).content
|
||
soup = BeautifulSoup(temp_text.decode('utf-8'), "html.parser")
|
||
all_container_ip = soup.find_all(href=re.compile("actuator/info"))
|
||
tree_dict = dict()
|
||
for temp in all_container_ip:
|
||
server = list(temp.parent.parent)[1].get_text()
|
||
server_ip = list(temp)
|
||
if server in tree_dict.keys():
|
||
tree_dict[server].extend(server_ip)
|
||
else:
|
||
tree_dict[server] = server_ip
|
||
server_name = server_name.upper()
|
||
return_dict = dict()
|
||
if server_name in tree_dict.keys():
|
||
if jira_id:
|
||
if jira_id in str(tree_dict[server_name]):
|
||
container_ip = [
|
||
temp.split(':')[0] for temp in
|
||
filter(lambda x: x.split(':')[2] if len(x.split(':')) > 2 else "" == jira_id,
|
||
tree_dict[server_name])
|
||
if jira_id in temp]
|
||
for ip_temp in container_ip:
|
||
if ip_temp.startswith('10'):
|
||
container_ip = ip_temp
|
||
else: # 防止研发在本地启动了服务导致有多条数据 ,跳过办公网段的ip
|
||
obj_log.warning('跳过获取到的IP:{}'.format(ip_temp))
|
||
continue
|
||
return_dict['container_ip'] = container_ip
|
||
if 'allschool' in eureka_url:
|
||
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name,
|
||
value=container_ip)
|
||
return return_dict
|
||
elif env.upper() == "SIM":
|
||
if len(tree_dict[server_name]) == 1: # sim环境如果只有一个ip,就直接返回此ip
|
||
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name]][0]
|
||
return_dict['container_ip'] = container_ip
|
||
return return_dict
|
||
else:
|
||
if not is_contain_qa:
|
||
return_dict['container_ip'] = ''
|
||
return return_dict
|
||
# 如果没有对应的独立环境,则返回qa环境的ip
|
||
if 'qa' in str(tree_dict[server_name]):
|
||
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name] if 'qa' in temp]
|
||
for ip_temp in container_ip:
|
||
if ip_temp.startswith('10'):
|
||
container_ip = ip_temp
|
||
else:
|
||
obj_log.error('跳过获取到的IP:{}'.format(ip_temp))
|
||
continue
|
||
return_dict['container_ip'] = container_ip
|
||
return return_dict
|
||
elif 'groot' in str(tree_dict[server_name]):
|
||
"""解决allschool部分服务极限环境为groot而非qa的场景"""
|
||
container_ip = [temp.split(':')[0] for temp in tree_dict[server_name] if 'groot' in temp]
|
||
for ip_temp in container_ip:
|
||
if ip_temp.startswith('10'):
|
||
container_ip = ip_temp
|
||
else:
|
||
obj_log.error('跳过获取到的IP:{}'.format(ip_temp))
|
||
continue
|
||
return_dict['container_ip'] = container_ip
|
||
# if 'allschool' in eureka_url:
|
||
# ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name, value=container_ip)
|
||
return return_dict
|
||
elif '${server.port}' in str(tree_dict[server_name]) or 'sim' in str(tree_dict[server_name]):
|
||
container_ip = [temp.split(':')[0]
|
||
for temp in tree_dict[server_name] if '${server.port}' in temp or 'sim' in temp][0]
|
||
return_dict['container_ip'] = container_ip
|
||
return return_dict
|
||
else:
|
||
return_dict['container_ip'] = ''
|
||
return return_dict
|
||
else:
|
||
# raise Exception('从{}中未获取到{}的IP,请检查服务名是否正确!!!'.format(eureka_url, server_name))
|
||
obj_log.error('从{}中未获取到{}的IP,请检查服务名是否正确!!!'.format(eureka_url, server_name))
|
||
return False
|
||
|
||
# def get_url_by_eureka(self, server_name, jira_id=False):
|
||
# """
|
||
# | 功能说明: | 获取Eureka指定服务的IP后,组装成需要的url前缀 |
|
||
# | 输入参数: | server_name | 服务名 |
|
||
# | 输入参数: | jira_id | 独立环境编号 |
|
||
# """
|
||
# svr_ip = self.get_container_ip_from_eureka(server_name=server_name, need_jira_id=jira_id)
|
||
# if svr_ip:
|
||
# return "http://" + svr_ip["container_ip"] + ":8080"
|
||
# else:
|
||
# return False
|
||
|
||
def get_container_ip_from_eureka_old(self, server_name, env=None, need_jira_id=False,
|
||
jira_id_dev=None, is_contain_qa=True):
|
||
"""
|
||
| 功能说明: | 获取Eureka指定服务的IP |
|
||
| 输入参数: | server_name | 服务名 |
|
||
| | env | QA or SIM |
|
||
| | need_jira_id | True:从config.ini文件获取, False:取QA环境的IP |
|
||
| | jira_id_dev | 传入自定义独立环境编号 |
|
||
| 返回参数: | dict | container_ip |
|
||
| 作者信息: | 作者 林于棚 | 修改时间 |
|
||
举例说明:
|
||
| get_container_ip_from_eureka(server_name='PEPPA-QI-API') |
|
||
"""
|
||
return_dict = dict()
|
||
tree_dict = dict()
|
||
# 针对于pz项目的白名单,与其他项目的eureka地址不一致
|
||
pz_eureka_server = ["hulk-teaching-server", "hulk-teacher-api", "hulk-teacher-server", "hulk-teaching-listener",
|
||
"hulk-teaching-scheduler", "hulk-operation-api-server", "hulk-operation-listener",
|
||
"hulk-operation-server", "hulk-account-server",
|
||
'hulk-org-server', 'hulk-org-api', 'hulk-teacher-api', 'hulk-class-help-server',
|
||
'hulk-content-audit-server', "hulk-teach-backend-api", "hulk-teach-supply-cli-api",
|
||
"hulk-ark-gateway", "hulk-class-help-manager", "hulk-content-manage-gateway",
|
||
"hulk-teach-supply-server", "hulk-teach-classes-server", "classpod-platform-api",
|
||
"classpod-user-api",
|
||
"hulk-teach-content-server", "hulk-teach-content-cli-api", "hulk-teach-course-server"
|
||
]
|
||
if env is None:
|
||
env = obj_evn_cfg.get_value(
|
||
sections='run_evn_name', options='current_evn')
|
||
if env == "QA":
|
||
if server_name.lower() in pz_eureka_server:
|
||
# url = "http://eureka-asc.qa.huohua.cn/"
|
||
url = "http://eureka.qa.allschool.com/"
|
||
else:
|
||
url = "http://eureka.qa.huohua.cn/"
|
||
else:
|
||
if server_name.lower() in pz_eureka_server:
|
||
# url = "http://eureka-asc.sim.huohua.cn/"
|
||
url = "http://eureka.sim.allschool.com/"
|
||
else:
|
||
url = 'http://eureka.sim.huohua.cn/'
|
||
temp_text = requests.get(url=url).content
|
||
soup = BeautifulSoup(temp_text.decode('utf-8'), "html.parser")
|
||
all_container_ip = soup.find_all(href=re.compile("actuator/info"))
|
||
for temp in all_container_ip:
|
||
server = list(temp.parent.parent)[1].get_text()
|
||
server_ip = list(temp)
|
||
if server in tree_dict.keys():
|
||
tree_dict[server].extend(server_ip)
|
||
else:
|
||
tree_dict[server] = server_ip
|
||
if need_jira_id:
|
||
jira_id = ReadConfig(env_choose_path).get_value(sections='run_jira_id', options='huohua-podenv')
|
||
if jira_id == "":
|
||
jira_id = None
|
||
else:
|
||
jira_id = None
|
||
if jira_id_dev:
|
||
jira_id = jira_id_dev
|
||
server_name = server_name.upper()
|
||
if server_name in tree_dict.keys():
|
||
if jira_id:
|
||
if jira_id in str(tree_dict[server_name]):
|
||
container_ip = [
|
||
temp.split(':')[0] for temp in
|
||
filter(lambda x: x.split(':')[2] == jira_id, tree_dict[server_name])
|
||
if jira_id in temp]
|
||
for ip_temp in container_ip:
|
||
if ip_temp.startswith('10'):
|
||
container_ip = ip_temp
|
||
else:
|
||
obj_log.warning('跳过获取到的IP:{}'.format(ip_temp))
|
||
continue
|
||
return_dict['container_ip'] = container_ip
|
||
if server_name.lower() in pz_eureka_server:
|
||
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name,
|
||
value=container_ip)
|
||
return return_dict
|
||
else:
|
||
if not is_contain_qa:
|
||
return_dict['container_ip'] = ''
|
||
return return_dict
|
||
if 'qa' in str(tree_dict[server_name]):
|
||
container_ip = [
|
||
temp.split(':')[0] for temp in tree_dict[server_name] if 'qa' in temp]
|
||
for ip_temp in container_ip:
|
||
if ip_temp.startswith('10'):
|
||
container_ip = ip_temp
|
||
else:
|
||
obj_log.error('跳过获取到的IP:{}'.format(ip_temp))
|
||
continue
|
||
return_dict['container_ip'] = container_ip
|
||
if server_name.lower() in pz_eureka_server:
|
||
ReadConfig(pz_all_server_ip_path).set_section(section='PZ', option=server_name, value=container_ip)
|
||
return return_dict
|
||
elif '${server.port}' in str(tree_dict[server_name]) or 'sim' in str(tree_dict[server_name]):
|
||
container_ip = [temp.split(':')[
|
||
0] for temp in tree_dict[server_name] if '${server.port}' in temp or 'sim' in temp][
|
||
0]
|
||
return_dict['container_ip'] = container_ip
|
||
return return_dict
|
||
else:
|
||
return_dict['container_ip'] = ''
|
||
return return_dict
|
||
else:
|
||
obj_log.error('未获取到IP,请检查server_name:{} 是否正确!'.format(server_name))
|
||
return_dict['container_ip'] = ''
|
||
return return_dict
|
||
|
||
def com_get_value_from_json(self, json_object, json_path):
|
||
return jsonpath.jsonpath(json_object, json_path)
|
||
|
||
@staticmethod
|
||
def should_be_equal_as_list(sub_list, obj_list, ignore_nodes=[], check_len=False):
|
||
"""
|
||
功能:判断两个list是否相等,不区分排序
|
||
| 输入参数: | sub_list | 列表1,子列表 |
|
||
| | obj_list | 列表2,目标列表 |
|
||
| | ignore_nodes | sub_list中要忽略的节点,填需要,从0开始 |
|
||
| | check_len | 是否检查长度,默认不检查,传入True时,则检查两个list完全相同 |
|
||
| 作者: | 吴勇刚 | 2021.05. 16 |
|
||
函数位置:src/Public/Common/utils.py
|
||
| 完全相等 | should_be_equal_as_list([1,'a',2,'aa'], [1,2,'b','bb']) | failed |
|
||
| 子集相等 | should_be_equal_as_list([1,'a',2,'aa'], [1,2,'b','bb'], [1,3]) | success |
|
||
"""
|
||
if isinstance(sub_list, str):
|
||
try:
|
||
sub_list = eval(sub_list)
|
||
except Exception as e:
|
||
print("输入参数类型错误:{}".format(e))
|
||
if isinstance(obj_list, str):
|
||
try:
|
||
obj_list = eval(obj_list)
|
||
except Exception as e:
|
||
print("输入参数类型错误:{}".format(e))
|
||
if not isinstance(sub_list, list) or not isinstance(obj_list, list):
|
||
raise RuntimeError("【ERROR】请求参数必现都是list类型,sub_list:{0}, "
|
||
"obj_list:{1}".format(type(sub_list), type(obj_list)))
|
||
return
|
||
if check_len:
|
||
if len(obj_list) != len(sub_list):
|
||
raise RuntimeError("【ERROR】两个list的长度不同:{}!={},请检查...".format(len(sub_list), len(obj_list)))
|
||
return
|
||
equal = True
|
||
for index in range(len(sub_list)):
|
||
if index not in ignore_nodes:
|
||
if sub_list[index] not in obj_list:
|
||
print("【ERROR】{0} not in obj_list:{1}".format(sub_list[index], obj_list))
|
||
equal = False
|
||
if not equal:
|
||
raise RuntimeError("【ERROR】sub_list != obj_list,请检查......")
|
||
|
||
@staticmethod
|
||
def should_be_equal_as_dict(sub_dict, obj_dict, ignore_keys=[]):
|
||
"""
|
||
功能:判断两个list是否相等,不区分排序
|
||
| 输入参数: | sub_dict | 列表1,子列表 |
|
||
| | obj_dict | 列表2,目标列表 |
|
||
| | ignore_keys | sub_dict中要忽略的keys |
|
||
| 作者: | 吴勇刚 | 2021.05. 16 |
|
||
函数位置:src/Public/Common/utils.py
|
||
| 完全相等 | should_be_equal_as_dict({'a':1,'b':2,'c':True}, {'a':1,'b':2,'c':False}) | not equal |
|
||
| 子集相等 | should_be_equal_as_dict({'a':1,'b':2,'c':True}, {'a':1,'b':2,'c':False}, ['c']) | equal |
|
||
"""
|
||
if not isinstance(sub_dict, dict) or not isinstance(obj_dict, dict):
|
||
raise RuntimeError("【ERROR】请求参数必现都是dict类型,sub_dict:{0}, obj_dict:{1}".format(type(sub_dict),
|
||
type(obj_dict)))
|
||
return
|
||
if len(ignore_keys) == 0 and len(sub_dict) != len(obj_dict):
|
||
raise RuntimeError(
|
||
"【ERROR】两个dict的长度不相等,sub={0},obj={1}".format(
|
||
len(sub_dict), len(obj_dict)))
|
||
return
|
||
equal = True
|
||
for key in sub_dict:
|
||
if key not in ignore_keys:
|
||
if key not in obj_dict:
|
||
print(
|
||
"【ERROR】{0} not in obj_list:{1}".format(
|
||
sub_dict[key], obj_dict))
|
||
equal = False
|
||
continue
|
||
if sub_dict[key] != obj_dict[key]:
|
||
print("【ERROR】'节点{key}':{sub} != {obj}".format(key=key,
|
||
sub=sub_dict[key],
|
||
obj=obj_dict[key]))
|
||
equal = False
|
||
if not equal:
|
||
raise RuntimeError("【ERROR】sub_dict != obj_dict,请检查......")
|
||
|
||
def check_result(self, my_result, result_one=None,
|
||
result_two=None, position=None):
|
||
if result_one is None:
|
||
result_one = []
|
||
|
||
if result_two is None:
|
||
result_two = []
|
||
|
||
if position is None:
|
||
position = []
|
||
|
||
for key, value in my_result.items():
|
||
position.append(key)
|
||
if isinstance(value, dict):
|
||
Tools().check_result(value, result_one, result_two, position)
|
||
elif isinstance(value, list):
|
||
for index, it in enumerate(value):
|
||
position.append(index)
|
||
if isinstance(it, dict):
|
||
Tools().check_result(it, result_one, result_two, position)
|
||
elif isinstance(it, str):
|
||
data_in_file = Tools().generate(position)
|
||
result_one.append(it)
|
||
result_two.append(data_in_file)
|
||
elif isinstance(it, int):
|
||
data_in_file = Tools().generate(position)
|
||
result_one.append(it)
|
||
result_two.append(data_in_file)
|
||
position.pop()
|
||
else:
|
||
if isinstance(value, str):
|
||
data_in_file = Tools().generate(position)
|
||
result_one.append(value)
|
||
result_two.append(data_in_file)
|
||
elif isinstance(value, int):
|
||
data_in_file = Tools().generate(position)
|
||
result_one.append(value)
|
||
result_two.append(data_in_file)
|
||
position.pop()
|
||
return result_one, result_two
|
||
|
||
def generate(self, list_all):
|
||
y = ""
|
||
for i in list_all:
|
||
if isinstance(i, str):
|
||
y += '["{}"]'.format(i)
|
||
else:
|
||
y += '[{}]'.format(i)
|
||
return "obj_json" + y
|
||
|
||
def mq_send(self, topic, message_body, tag="tag",
|
||
key="key", pro="pro", **kwargs):
|
||
"""
|
||
已废弃,请使用base_framework/public_tools/rocket_mq.py下的函数
|
||
"""
|
||
post_data = dict()
|
||
return_data = dict()
|
||
cfg_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
|
||
ini_path = cfg_path + '/base_framework/base_config/env_choose.ini'
|
||
jira_id = ReadConfig(ini_path).get_value(
|
||
sections='run_jira_id', options='huohua-podenv')
|
||
post_data['topic'] = topic
|
||
post_data['messageBody'] = '{}'.format(message_body)
|
||
post_data['tag'] = tag
|
||
post_data['key'] = key
|
||
if pro == 'pro':
|
||
if jira_id:
|
||
post_data['pro'] = jira_id
|
||
else:
|
||
post_data['pro'] = 'qa'
|
||
else:
|
||
post_data['pro'] = pro
|
||
change_user = kwargs.get('user', None)
|
||
url = obj_runner.mq_host + '/topic/sendTopicMessage.do'
|
||
req_type = 'POST'
|
||
resp = obj_runner.call_rest_api(
|
||
user=change_user,
|
||
API_URL=url,
|
||
req_type=req_type,
|
||
json=post_data)
|
||
if resp['data']['sendStatus'] == 'SEND_OK':
|
||
obj_log.info('发送MQ消息成功!{}:{}'.format(topic, message_body))
|
||
return True
|
||
else:
|
||
obj_log.error('发送MQ消息失败!')
|
||
return False
|
||
|
||
def mq_send_by_sim(self, topic, message_body, tag="tag",
|
||
key="key", **kwargs):
|
||
"""
|
||
已废弃,请使用base_framework/public_tools/rocket_mq.py下的函数
|
||
"""
|
||
post_data = dict()
|
||
post_data['topic'] = topic
|
||
post_data['messageBody'] = '{}'.format(message_body)
|
||
post_data['tag'] = tag
|
||
post_data['key'] = key
|
||
url = 'https://mq-console.sim.huohua.cn/topic/sendTopicMessage.do'
|
||
req_type = 'POST'
|
||
resp = obj_runner.call_rest_api(
|
||
user=None,
|
||
API_URL=url,
|
||
req_type=req_type,
|
||
json=post_data,
|
||
current_evn='sim')
|
||
if resp['data']['sendStatus'] == 'SEND_OK':
|
||
obj_log.info('发送MQ消息成功!{}:{}'.format(topic, message_body))
|
||
return True
|
||
else:
|
||
obj_log.error('发送MQ消息失败!')
|
||
return False
|
||
|
||
def get_mq_msg(self, topic, begin_time, end_time, **kwargs):
|
||
"""
|
||
已废弃,请使用base_framework/public_tools/rocket_mq.py下的函数
|
||
"""
|
||
post_data = dict()
|
||
return_data = dict()
|
||
post_data['topic'] = topic
|
||
post_data['begin'] = str(time.mktime(time.strptime(begin_time, '%Y-%m-%d %H:%M'))).split('.')[0] + '000'
|
||
post_data['end'] = str(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M'))).split('.')[0] + '000'
|
||
change_user = kwargs.get('user', None)
|
||
url = obj_runner.mq_host + '/message/queryMessageByTopic.query'
|
||
req_type = 'GET'
|
||
resp = obj_runner.call_rest_api(
|
||
user=change_user,
|
||
API_URL=url,
|
||
req_type=req_type,
|
||
params=post_data)
|
||
if resp['status'] == 0:
|
||
obj_log.info('查询MQ消息成功!')
|
||
return resp['data']
|
||
else:
|
||
obj_log.error('查询MQ消息失败!')
|
||
return False
|
||
|
||
@staticmethod
|
||
def delete_redis(key, db):
|
||
import redis
|
||
pool = redis.Redis(host='rediscourse.qa.huohua.cn', port=6379, password='Bkl6LvqfzFCzYPAh', db=db)
|
||
pool.delete(key)
|
||
|
||
@staticmethod
|
||
def should_be_exist(sql, number=1, raise_error=True, retry_time=0):
|
||
"""
|
||
| 功能说明: | 检查数据库中sql语句对应的结果应该存在number条数据 |
|
||
| 输入参数: | sql | 查询数据库对应的sql语句 |
|
||
| | number | 数据库应该存在多少条这样的数据 |
|
||
| | raise_error | True-结果不存在时抛异常阻塞后续流程,False-结果不存在时返回false但不抛异常可继续流程 |
|
||
| | retry_time | 重试次数,每次间隔1秒 |
|
||
| 返回参数: | 无 | sql条数匹配则通过,否则抛出异常 |
|
||
| 作者信息: | 吴勇刚 | 2021.07.05 |
|
||
函数位置:src/Public/Common/utils.py
|
||
"""
|
||
res = MySqLHelper().select_all(sql=sql)
|
||
if int(number) == 0 and int(retry_time) > 0:
|
||
rs_flag = False
|
||
for index in range(int(retry_time)):
|
||
time.sleep(1)
|
||
res = MySqLHelper().select_all(sql=sql)
|
||
if len(res) == int(number):
|
||
rs_flag = True
|
||
else:
|
||
rs_flag = False
|
||
return rs_flag
|
||
elif len(res) != int(number):
|
||
for index in range(int(retry_time)):
|
||
time.sleep(1)
|
||
res = MySqLHelper().select_all(sql=sql)
|
||
if len(res) != int(number):
|
||
continue
|
||
else:
|
||
return True
|
||
print("exec sql: {}".format(sql))
|
||
if raise_error:
|
||
raise Exception("check db failed:expect exist {0}, but act is {1}...".format(number, len(res)))
|
||
else:
|
||
return False
|
||
else:
|
||
return True
|
||
|
||
def should_be_not_contain_sub_json(self, obj_json, sub_json, sort_json=True):
|
||
"""
|
||
功能:判断obj_json未包含sub_json
|
||
| 输入参数: | obj_json | 目标串 |
|
||
| | sub_json | 子串 |
|
||
| | sort_json | 是否对json进行排序,默认-False,不排序 |
|
||
| 作者: | 吴勇刚 | 2022.05.10 |
|
||
"""
|
||
tag_equal = True
|
||
# 优先替换不规则的null,true和false
|
||
obj_json = eval(str(obj_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
|
||
sub_json = eval(str(sub_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
|
||
# 根据类型走不同的递归函数
|
||
if isinstance(obj_json, dict) and isinstance(sub_json, dict):
|
||
tag_equal = self._check_dict(obj_json, sub_json, tag_equal, sort_json)
|
||
elif isinstance(obj_json, list) and isinstance(sub_json, list):
|
||
tag_equal = self._check_list(obj_json, sub_json, tag_equal, sort_json)
|
||
else:
|
||
tag_equal = False
|
||
print("入参需为json串,请检查输入参数....")
|
||
if tag_equal:
|
||
print("obj_json:{}".format(obj_json))
|
||
print("sub_json:{}".format(sub_json))
|
||
raise Exception("obj_json中包含了sub_json串,校验不通过.....")
|
||
else:
|
||
print("obj_json中不包含sub_json,校验通过")
|
||
|
||
def should_be_equal_as_json(self, obj_json, sub_json, sort_json=False, sort_key=None, sort_type='str'):
|
||
"""
|
||
功能:判断sub_json是否被obj_json完全包含
|
||
| 输入参数: | obj_json | 目标串 |
|
||
| | sub_json | 子串 |
|
||
| | sort_json | 是否对json进行排序,默认-False,不排序 |
|
||
| | sort_key | [{},{}]结构排序时,将sort_key对应value按升序排列 |
|
||
| | sort_type | 按sort_key排序时,按int还是str类型对比,默认str,非int时会按str对比 |
|
||
| 作者: | 吴勇刚 | 2021.07. 29 |
|
||
函数位置:src/Public/Common/utils.py, 用法举例如下:
|
||
| obj_json | sub_json | 调用方式 | 对比结果 | 备注说明 |
|
||
| {"a":11} | {"a":11} | should_be_equal_as_json(obj_json,sub_json) | 相等 | |
|
||
| {"a":11,"b":12} | {"a":11} | 同上 | 相等 | 对比原理是obj_json完整包含sub_json即可 |
|
||
| {"a":{"b":[1,2,3]}} | {"a":{"b":[1,2]} | 同上 | 相等 | 支持任一格式的嵌套 |
|
||
| {"a":11,"b":12} | {"a":11,"b":"not_check"} | 同上 | 相等 | value=not_check的节点不做检查 |
|
||
| [{"a":11},{"b":12}] | [{"a":11},{"b":12}] | 同上 | 相等 | 支持list格式对比 |
|
||
| [{"a":11},{"b":12}] | [{"b":12},{"a":11}] | 同上 | 不相等 | 因为list顺序不一样 |
|
||
| [{"a":11},{"b":12}] | [{"b":12},{"a":11}] | should_be_equal_as_json(obj_json,sub_json,sort_json=True) | 相等 | 对比前会自动排期 |
|
||
"""
|
||
tag_equal = True
|
||
# 优先替换不规则的null,true和false
|
||
obj_json = eval(str(obj_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
|
||
sub_json = eval(str(sub_json).replace('null', '""').replace('true', 'True').replace('false', 'False'))
|
||
# 根据类型走不同的递归函数
|
||
if sort_type.lower() not in ('str', 'int'):
|
||
sort_type = 'str'
|
||
if isinstance(obj_json, dict) and isinstance(sub_json, dict):
|
||
# tag_equal = self._check_dict(obj_json, sub_json, tag_equal, sort_json, sort_key, sort_type)
|
||
r_equal = self._check_dict(obj_json, sub_json, sort_json, sort_key, sort_type)
|
||
if tag_equal: # 当未检查到不等项时才重新赋值,预防最后一项相等时覆盖前面不等项的结果
|
||
tag_equal = r_equal
|
||
elif isinstance(obj_json, list) and isinstance(sub_json, list):
|
||
# tag_equal = self._check_list(obj_json, sub_json, tag_equal, sort_json, sort_key, sort_type)
|
||
r_equal = self._check_list(obj_json, sub_json, sort_json, sort_key, sort_type)
|
||
if tag_equal: # 当未检查到不等项时才重新赋值,预防最后一项相等时覆盖前面不等项的结果
|
||
tag_equal = r_equal
|
||
else:
|
||
raise BusinessError("入参仅支持json格式,请检查输入参数")
|
||
if tag_equal:
|
||
print("obj_json中完全包含sub_json....")
|
||
else:
|
||
raise Exception("预期和实际结果不同,请检查前几步中的逻辑:\nobj_json:{}\nsub_json:{}".format(obj_json, sub_json))
|
||
|
||
# def _check_list(self, obj_list, sub_list, should_equal, sort_json=False, sort_key=None, sort_type='str'):
|
||
def _check_list(self, obj_list, sub_list, sort_json=False, sort_key=None, sort_type='str'):
|
||
"""should_be_equal_as_json的内置递归函数"""
|
||
should_equal = True
|
||
if len(obj_list) == len(sub_list) == 0: # 都为空,则直接返回
|
||
return True
|
||
elif len(obj_list) < len(sub_list): # 目标列表长度小于子列表
|
||
print("obj_json的长度是{}<{}sub_json的长度,请检查...".format(len(obj_list), len(sub_list)))
|
||
return False
|
||
elif len(sub_list) == 0: # 子列表为空
|
||
return True
|
||
if sort_json:
|
||
if sort_key and sort_key in obj_list[0] and sort_key in sub_list[0]:
|
||
if sort_type.lower() == 'str':
|
||
obj_list.sort(key=lambda x: str(x[sort_key]))
|
||
sub_list.sort(key=lambda x: str(x[sort_key]))
|
||
elif sort_type.lower() == 'int':
|
||
obj_list.sort(key=lambda x: int(x[sort_key]))
|
||
sub_list.sort(key=lambda x: int(x[sort_key]))
|
||
else:
|
||
obj_list.sort(key=lambda x: str(x))
|
||
sub_list.sort(key=lambda x: str(x))
|
||
for index in range(len(sub_list)):
|
||
if index < len(obj_list):
|
||
if isinstance(sub_list[index], list) and isinstance(obj_list[index], list):
|
||
# should_equal = self._check_list(obj_list[index], sub_list[index], should_equal, sort_json, sort_key,
|
||
# sort_type)
|
||
r_equal = self._check_list(obj_list[index], sub_list[index], sort_json, sort_key, sort_type)
|
||
if should_equal:
|
||
should_equal = r_equal
|
||
elif isinstance(sub_list[index], dict) and isinstance(obj_list[index], dict):
|
||
# should_equal = self._check_dict(obj_list[index], sub_list[index], should_equal, sort_json, sort_key,
|
||
# sort_type)
|
||
r_equal = self._check_dict(obj_list[index], sub_list[index], sort_json, sort_key, sort_type)
|
||
if should_equal:
|
||
should_equal = r_equal
|
||
elif not isinstance(sub_list[index], list) and not isinstance(obj_list[index], list) and not isinstance(
|
||
sub_list[index], dict) and not isinstance(obj_list[index], dict):
|
||
if isinstance(sub_list[index], str) and sub_list[index] == "not_check":
|
||
continue
|
||
else:
|
||
if sub_list[index] in obj_list:
|
||
pass
|
||
else:
|
||
should_equal = False
|
||
print("{}中的第{}个元素:{}不存在于{}中,请检查..."
|
||
.format(sub_list, index + 1, sub_list[index], obj_list))
|
||
elif isinstance(sub_list[index], str) and sub_list[index] == "not_check":
|
||
continue
|
||
else:
|
||
should_equal = False
|
||
print("类型不同:{}的类型是{},但{}的类型是{}".format(sub_list[index],
|
||
type(sub_list[index]),
|
||
obj_list[index],
|
||
type(obj_list[index])))
|
||
else:
|
||
should_equal = False
|
||
print("sub_list的长度是{}!={}obj_list的长度,请检查...".format(len(sub_list), len(obj_list)))
|
||
return should_equal
|
||
|
||
# def _check_dict(self, obj_dict, sub_dict, should_equal, sort_json=False, sort_key=None, sort_type='str'):
|
||
def _check_dict(self, obj_dict, sub_dict, sort_json=False, sort_key=None, sort_type='str'):
|
||
"""should_be_equal_as_json的内置递归函数"""
|
||
should_equal = True
|
||
for key in sub_dict:
|
||
if key in obj_dict:
|
||
if isinstance(sub_dict[key], list) and isinstance(obj_dict[key], list):
|
||
# should_equal = self._check_list(obj_dict[key], sub_dict[key], should_equal, sort_json, sort_key,
|
||
# sort_type)
|
||
r_equal = self._check_list(obj_dict[key], sub_dict[key], sort_json, sort_key, sort_type)
|
||
if should_equal:
|
||
should_equal = r_equal
|
||
elif isinstance(sub_dict[key], dict) and isinstance(obj_dict[key], dict):
|
||
# should_equal = self._check_dict(obj_dict[key], sub_dict[key], should_equal, sort_json, sort_key,
|
||
# sort_type)
|
||
r_equal = self._check_dict(obj_dict[key], sub_dict[key], sort_json, sort_key, sort_type)
|
||
if should_equal:
|
||
should_equal = r_equal
|
||
elif not isinstance(sub_dict[key], list) and not isinstance(sub_dict[key], dict) and not isinstance(
|
||
obj_dict[key], list) and not isinstance(obj_dict[key], dict):
|
||
if isinstance(sub_dict[key], str) and sub_dict[key] == "not_check":
|
||
continue
|
||
else:
|
||
if sub_dict[key] == obj_dict[key]:
|
||
pass
|
||
else:
|
||
should_equal = False
|
||
print("两个对比值中{0}对应的值不相同:{1} != {2},请确认...".format(key, sub_dict[key],
|
||
obj_dict[key]))
|
||
elif isinstance(sub_dict[key], str) and sub_dict[key] == "not_check":
|
||
continue
|
||
else:
|
||
should_equal = False
|
||
print("类型不同:{}的类型是{},但{}的类型是{}".format(sub_dict[key],
|
||
type(sub_dict[key]),
|
||
obj_dict[key],
|
||
type(obj_dict[key])))
|
||
else:
|
||
should_equal = False
|
||
print("key值{}不存在于目标串{}中,请检查...".format(key, obj_dict))
|
||
return should_equal
|
||
|
||
def remove_duplication(self, old_list):
|
||
"""
|
||
字典去重
|
||
"""
|
||
new_list = []
|
||
for i in old_list:
|
||
if i not in new_list:
|
||
new_list.append(i)
|
||
return new_list
|
||
|
||
def list_calculations(self, list_a, list_b, opt_type, removed_duplication=False):
|
||
"""
|
||
list运算
|
||
:return:
|
||
"""
|
||
# 交集
|
||
if removed_duplication:
|
||
list_a = self.remove_duplication(list_a)
|
||
list_b = self.remove_duplication(list_b)
|
||
if opt_type == 1:
|
||
# return {'intersection': list(set(list_a).intersection(set(list_b)))}
|
||
return {'intersection': [x for x in list_a if x in list_b]}
|
||
# 并集
|
||
elif opt_type == 2:
|
||
# return {'union': list(set(list_a).union(set(list_b)))}
|
||
list_a.extend(list_b)
|
||
return {'union': list_a}
|
||
# 差集
|
||
elif opt_type == 3:
|
||
# return {'a_has': list(set(list_a).difference(set(list_b))),
|
||
# 'b_has': list(set(list_b).difference(set(list_a)))}
|
||
return {'a_has_only': [x for x in list_a if x not in list_b],
|
||
'b_has_only': [x for x in list_b if x not in list_a]}
|
||
|
||
def compute_page_info(self, total_numbers=None, data_list=None, page_size=10, page_num=1):
|
||
"""
|
||
功能:根据总数,页码,每页数据条数计算查询分页信息
|
||
| 输入参数: | total_numbers | 总数据行数,与data_list二传一即可 |
|
||
| | data_list | 原始数据list,与total_numbers二传一即可 |
|
||
| | page_size | 每页数据条数 |
|
||
| | page_num | 当前页码 |
|
||
| 作者: | 吴勇刚 | 2021.09.06 |
|
||
函数位置:src/Public/Common/utils.py
|
||
| 返回参数 | 格式{dict} |
|
||
| | total | 总数据量 |
|
||
| | pageNum | 当前页码 |
|
||
| | pageSize | 每页展示数据条数 |
|
||
| | size | 当前页实际展示条数 |
|
||
| | startRow | 当前页面展示的开始行数 |
|
||
| | endRow | 当前页面展示的结束行数 |
|
||
| | pages | 总页数 |
|
||
| | prePage | 前一个展示页面页码 |
|
||
| | nextPage | 后一个展示页面页码 |
|
||
| | isFirstPage | 是否为首页 |
|
||
| | isLastPage | 是否为尾页 |
|
||
| | hasPreviousPage | 是否有上一页 |
|
||
| | hasNextPage | 是否有下一页 |
|
||
| | navigatePages | 控件展示页码数,一般固定为8 |
|
||
| | navigatepageNums | 控件展示的页码列表 |
|
||
| | navigateFirstPage | 控件开始页码 |
|
||
| | navigateLastPage | 控件结束页码 |
|
||
"""
|
||
re_data = {}
|
||
# 总数据量
|
||
if total_numbers:
|
||
re_data['total'] = int(total_numbers)
|
||
elif data_list:
|
||
re_data['total'] = len(data_list)
|
||
# 当前页码
|
||
re_data['pageNum'] = int(page_num)
|
||
# 每页数据条数
|
||
re_data['pageSize'] = int(page_size)
|
||
# 每页数据条数
|
||
if re_data['pageNum'] * re_data['pageSize'] <= re_data['total']:
|
||
re_data['size'] = re_data['pageSize']
|
||
elif (re_data['pageNum'] - 1) * re_data['pageSize'] < re_data['total'] < re_data['pageNum'] * re_data[
|
||
'pageSize']:
|
||
re_data['size'] = re_data['total'] - (re_data['pageNum'] - 1) * re_data['pageSize']
|
||
else:
|
||
re_data['size'] = 0
|
||
# 总页数
|
||
re_data['pages'] = math.ceil(re_data['total'] / re_data['pageSize'])
|
||
# 当前页面展示的开始行数,当前页面展示的结束行数
|
||
start_row = (re_data['pageNum'] - 1) * re_data['pageSize'] + 1
|
||
end_row = re_data['pageNum'] * re_data['pageSize']
|
||
if start_row > re_data['total']:
|
||
re_data['startRow'] = 0
|
||
re_data['endRow'] = 0
|
||
else:
|
||
re_data['startRow'] = start_row
|
||
if end_row > re_data['total']:
|
||
re_data['endRow'] = re_data['total']
|
||
else:
|
||
re_data['endRow'] = end_row
|
||
# 前一个展示页面页码
|
||
re_data['prePage'] = re_data['pageNum'] - 1
|
||
# 后一个展示页面页码
|
||
if end_row >= re_data['total']:
|
||
re_data['nextPage'] = 0
|
||
else:
|
||
re_data['nextPage'] = re_data['pageNum'] + 1
|
||
# 是否为首页,是否为尾页
|
||
if re_data['pageNum'] == 1:
|
||
re_data['isFirstPage'] = True
|
||
else:
|
||
re_data['isFirstPage'] = False
|
||
# 是否为尾页
|
||
if start_row > re_data['total'] or end_row < re_data['total']:
|
||
re_data['isLastPage'] = False
|
||
elif start_row <= re_data['total'] <= end_row:
|
||
re_data['isLastPage'] = True
|
||
# 是否有上一页
|
||
if not re_data['isFirstPage']:
|
||
re_data['hasPreviousPage'] = True
|
||
else:
|
||
re_data['hasPreviousPage'] = False
|
||
# 是否有下一页
|
||
if re_data['pageNum'] >= re_data['pages']:
|
||
re_data['hasNextPage'] = False
|
||
else:
|
||
if not re_data['isLastPage']:
|
||
re_data['hasNextPage'] = True
|
||
else:
|
||
re_data['hasNextPage'] = False
|
||
# 控件展示页码数,一般固定为8
|
||
re_data['navigatePages'] = 8
|
||
# 控件结束页码
|
||
if 8 < re_data['pageNum'] + 3 <= re_data['pages']:
|
||
re_data['navigateLastPage'] = re_data['pageNum'] + 3
|
||
elif re_data['pageNum'] + 3 <= 8 < re_data['pages']:
|
||
re_data['navigateLastPage'] = 8
|
||
else:
|
||
re_data['navigateLastPage'] = re_data['pages']
|
||
if re_data['navigateLastPage'] - 8 + 1 > 0:
|
||
re_data['navigateFirstPage'] = re_data['navigateLastPage'] - 8 + 1
|
||
else:
|
||
re_data['navigateFirstPage'] = 1
|
||
# 控件展示的页码列表
|
||
re_data['navigatepageNums'] = [i for i in range(re_data['navigateFirstPage'], re_data['navigateLastPage'] + 1)]
|
||
return re_data
|
||
|
||
@staticmethod
|
||
def should_be_str_contain_str(src_str, *sub_str, raise_flag=True):
|
||
"""
|
||
功能:判断src_str包含任意一个sub_str
|
||
| 输入参数: | src_str | 原始数据 |
|
||
| | *sub_str | 子数据 |
|
||
| | raise_flag | 不包含时是否抛出异常 |
|
||
| 作者: | 华学敏 | 2021.10. 08 |
|
||
函数位置:src/Public/Common/utils.py
|
||
"""
|
||
for s_str in sub_str:
|
||
if str(s_str) in str(src_str):
|
||
return True
|
||
if raise_flag:
|
||
raise RuntimeError("{} not contain {}".format(str(src_str), str(sub_str)))
|
||
else:
|
||
return False
|
||
|
||
@staticmethod
|
||
def compare_data_equal_as_list(sub_list, obj_value):
|
||
"""
|
||
功能:判断list中是否含某个值
|
||
| 输入参数: | sub_list | 列表,返回列表 |
|
||
| | obj_value | 目标值 |
|
||
| 作者: | qxj | 2021.06. 04 |
|
||
函数位置:
|
||
| 包含 | compare_data_equal_as_list([1,'a',2,'aa'], 1) | success |
|
||
"""
|
||
if not isinstance(sub_list, list):
|
||
raise RuntimeError("【ERROR】返回参数data必须是list类型,sub_list:{0}".format(type(sub_list)))
|
||
equal = False
|
||
for value in sub_list:
|
||
try:
|
||
sub_str = eval(obj_value)
|
||
except Exception as e:
|
||
sub_str = obj_value
|
||
|
||
if str(value) == str(sub_str):
|
||
equal = True
|
||
break
|
||
if equal:
|
||
return equal
|
||
else:
|
||
raise RuntimeError("【ERROR】返回参数data中没有找到值,sub_list:{0},valueP{1}".format(sub_list, obj_value))
|
||
|
||
def copy_account_to_hhi(self, email):
|
||
query_hhi_account = "SELECT id FROM hhi_account.account WHERE account_email='{}' limit 1;".format(email)
|
||
resp_hhi_account = self.db_con.select_one(query_hhi_account, choose_db='hhi')
|
||
if resp_hhi_account:
|
||
print("账号已在hhi存在,请检查!")
|
||
else:
|
||
query_account = "SELECT * FROM account.account WHERE account_email='{}' limit 1;".format(email)
|
||
resp_account = self.db_con.select_one(query_account, choose_db='huohua')
|
||
if not resp_account:
|
||
print("账号在火花库不存在,请检查!")
|
||
else:
|
||
# account.account
|
||
value_account_str = self._get_sql_value_by_dict(resp_account)
|
||
insert_hhi_account = "INSERT INTO `hhi_account`.`account`(`id`, `account_unique`, `account_name`, `account_email`, `area_code`, `phone_code`, `mask_phone`, `pwd`, `account_type`, `account_status`, `apply_resource`, `start_date`, `end_date`, `account_owner_id`, `account_dept_id`, `work_order_no`, `data_level`, `employee_id`, `fei_shu_id`, `dict_item_id`, `account_source`, `created_time`, `creator_id`, `creator_name`, `modified_time`, `modifier_id`, `modifier_name`, `pwd_modified`) SELECT {} FROM hhi_account.account LIMIT 1;".format(
|
||
value_account_str)
|
||
print(insert_hhi_account)
|
||
self.db_con.insert_one(insert_hhi_account, choose_db='hhi')
|
||
|
||
update_end_date = "update `hhi_account`.`account` set end_date='2022-04-28',account_status=0 where account_email='{}'".format(
|
||
email)
|
||
self.db_con.execute(update_end_date, choose_db='hhi')
|
||
|
||
# account.account_email
|
||
query_account_email = "SELECT * FROM account.account_email WHERE account_email='{}' limit 1;".format(
|
||
email)
|
||
resp_account_email = self.db_con.select_one(query_account_email, choose_db='huohua')
|
||
if resp_account_email:
|
||
value_account_email_str = self._get_sql_value_by_dict(resp_account_email)
|
||
insert_hhi_account_email = "INSERT INTO `hhi_account`.`account_email`(`id`, `account_id`, `account_unique`, `account_email`, `first_choice`, `email_type_value`, `created_time`, `creator_id`, `creator_name`, `modified_time`, `modifier_id`, `modifier_name`) SELECT {} FROM `hhi_account`.`account_email` LIMIT 1;".format(
|
||
value_account_email_str)
|
||
print(insert_hhi_account_email)
|
||
self.db_con.insert_one(insert_hhi_account_email, choose_db='hhi')
|
||
|
||
# emp.employee
|
||
query_employee = "SELECT `id`, `name`, `employee_no`, `pwd`, `phone`, `email`, `leader_id`, `leader_name`, `position`, `position_no`, `position_level`, `data_level`, `department_id`, `department_no`, `department_name`, `corp_code`, `status`, `cid`, `cname`, `mid`, `mname`, `ctime`, `mtime`, `deleted`, `wechat_code`, `wechat_qrcode_image`, `wechat_qrcode_string`, `avatar`, `cc_hot_line`, `cc_cno`, `cc_pwd`, `cc_bind_tel`, `cc_bind_tel_tmp`, `work_phone`, `frozen`, `qywx_user_id`, `work_tel`, `work_mobile`, `bind_tel_type`, `city_id`, `city_name`, `city_level`, `nick_name`, `sex`, `birthday`, `birthplace`, `education`, `graduated_school`, `intro`, `nature_of_work`, `position_time`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`, `change_sign_time`, `account_status`, `dingding_id`, `department_post_id`, `department_post_name`, `dimission_time`, `cc_work_tel`, `cc_work_mobile`, `mask_phone`, `phone_code`, `contract_company`, `pre_prob_date`, `show_name`, `name_pinyin`, `feishu_zhs`, `feishu_eng`, `business_unit`, `international_code`, `first_name_py`, `last_name_py`, `name_ac`, `name_format`, `first_name`, `middle_name`, `last_name`, `reg_region` FROM `emp`.`employee` WHERE email='{}' limit 1;".format(
|
||
email)
|
||
resp_employee = self.db_con.select_one(query_employee, choose_db='huohua')
|
||
if resp_employee:
|
||
value_employee_str = self._get_sql_value_by_dict(resp_employee)
|
||
insert_hhi_employee = "INSERT INTO `hhi_emp`.`employee`(`id`, `name`, `employee_no`, `pwd`, `phone`, `email`, `leader_id`, `leader_name`, `position`, `position_no`, `position_level`, `data_level`, `department_id`, `department_no`, `department_name`, `corp_code`, `status`, `cid`, `cname`, `mid`, `mname`, `ctime`, `mtime`, `deleted`, `wechat_code`, `wechat_qrcode_image`, `wechat_qrcode_string`, `avatar`, `cc_hot_line`, `cc_cno`, `cc_pwd`, `cc_bind_tel`, `cc_bind_tel_tmp`, `work_phone`, `frozen`, `qywx_user_id`, `work_tel`, `work_mobile`, `bind_tel_type`, `city_id`, `city_name`, `city_level`, `nick_name`, `sex`, `birthday`, `birthplace`, `education`, `graduated_school`, `intro`, `nature_of_work`, `position_time`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`, `change_sign_time`, `account_status`, `dingding_id`, `department_post_id`, `department_post_name`, `dimission_time`, `cc_work_tel`, `cc_work_mobile`, `mask_phone`, `phone_code`, `contract_company`, `pre_prob_date`, `show_name`, `name_pinyin`, `feishu_zhs`, `feishu_eng`, `business_unit`, `international_code`, `first_name_py`, `last_name_py`, `name_ac`, `name_format`, `first_name`, `middle_name`, `last_name`, `reg_region`) SELECT {} FROM `hhi_emp`.`employee` LIMIT 1;".format(
|
||
value_employee_str)
|
||
print(insert_hhi_employee)
|
||
self.db_con.insert_one(insert_hhi_employee, choose_db='hhi')
|
||
|
||
update_employee_info = "update `hhi_emp`.`employee` set birthday='2022-04-28 10:00:00',pre_prob_date='2022-05-16',dimission_time='2023-04-28 10:00:00',change_sign_time='2022-04-28 10:00:00' where email='{}'".format(
|
||
email)
|
||
self.db_con.execute(update_employee_info, choose_db='hhi')
|
||
|
||
# emp.employee_email
|
||
query_employee_email = "SELECT * FROM emp.employee_email where employee_email_value='{}' limit 1;".format(
|
||
email)
|
||
resp_employee_email = self.db_con.select_one(query_employee_email, choose_db='huohua')
|
||
if resp_employee_email:
|
||
value_employee_email_str = self._get_sql_value_by_dict(resp_employee_email)
|
||
insert_hhi_employee_email = "INSERT INTO `hhi_emp`.`employee_email`(`id`, `employee_id`, `employee_no`, `employee_email_type`, `employee_email_value`, `primary_flag`, `status`, `creator_id`, `creator_name`, `created_time`, `modifier_id`, `modifier_name`, `modified_time`) SELECT {} FROM `hhi_emp`.`employee_email` LIMIT 1;".format(
|
||
value_employee_email_str)
|
||
print(insert_hhi_employee_email)
|
||
self.db_con.insert_one(insert_hhi_employee_email, choose_db='hhi')
|
||
|
||
@staticmethod
|
||
def _get_sql_value_by_dict(value_dict):
|
||
value_str = ""
|
||
for v in value_dict.values():
|
||
value_str = value_str + "'" + str(v) + "'" + ","
|
||
value_str = value_str[:-1]
|
||
# print(value_str)
|
||
return value_str
|
||
|
||
def wait_and_query_data(self, sql, wait_time=20):
|
||
"""
|
||
| 功能说明: | 针对数据入库有延迟的场景,根据sql和循环的时间查询并返回数据库查询结果 |
|
||
| 输入参数: | sql | 查询数据库对应的sql语句 |
|
||
| | wait_time | 等待的最长时间 |
|
||
| 返回参数: | db_info | 数据库查询结果 |
|
||
函数位置:src/Public/Common/utils.py
|
||
"""
|
||
try_time = 0
|
||
if sql:
|
||
while try_time < int(wait_time):
|
||
db_info = MySqLHelper().select_one(sql=sql)
|
||
if db_info:
|
||
obj_log.info('try_time={}'.format(try_time))
|
||
return db_info
|
||
else:
|
||
obj_log.info('try_time={}'.format(try_time))
|
||
time.sleep(5)
|
||
try_time += 5
|
||
else:
|
||
raise Exception('数据落库超时{}s,请检查'.format(try_time))
|
||
else:
|
||
raise Exception('查询sql不能为空......')
|
||
|
||
def to_lower_camel(self, name, ):
|
||
"""下划线转小驼峰法命名"""
|
||
return re.sub('_([a-zA-Z])', lambda m: (m.group(1).upper()), name.lower())
|
||
|
||
def to_snake(self, name):
|
||
"""驼峰转下划线"""
|
||
name = re.sub(r'([a-z])([A-Z])', r'\1_\2', name)
|
||
return name.lower()
|
||
|
||
def change_dict_key_format(self, obj_dict, c_type=1):
|
||
"""
|
||
将dict或[dict]中的所有key,从下划线模式转为驼峰模式,或者反向转换
|
||
Args:
|
||
obj_dict: 原始dict或list
|
||
c_type: 类型:1-下划线转驼峰,2-驼峰转下划线
|
||
Returns:
|
||
转化后的dict,或list
|
||
"""
|
||
if isinstance(obj_dict, list):
|
||
new_dict = []
|
||
for item in obj_dict:
|
||
new_dict.append(self.__change_dict_key(c_dict=item, c_type=c_type))
|
||
return new_dict
|
||
elif isinstance(obj_dict, dict):
|
||
return self.__change_dict_key(c_dict=obj_dict, c_type=c_type)
|
||
|
||
def __change_dict_key(self, c_dict, c_type=1):
|
||
"""
|
||
内置函数,供change_dict_key_format使用
|
||
Args:
|
||
c_dict: 原始dict
|
||
c_type: 类型:1-下划线转驼峰,2-驼峰转下划线
|
||
Returns:
|
||
转化后的dict
|
||
"""
|
||
new_dict = {}
|
||
if c_type == 1:
|
||
for k in c_dict.keys():
|
||
new_key = self.to_lower_camel(name=k)
|
||
new_dict[new_key] = c_dict[k]
|
||
elif c_type == 2:
|
||
for k in c_dict.keys():
|
||
new_key = self.to_snake(name=k)
|
||
new_dict[new_key] = c_dict[k]
|
||
return new_dict
|
||
|
||
def dict_to_sql_condition(self, dicts):
|
||
"""
|
||
| 功能说明: | 字典转sql条件;默认会把驼峰key转为下划线| |
|
||
| 输入参数: | {}
|
||
| 返回参数: | WHERE key='' AND key=''
|
||
| 作者信息: | 陈江 | 2022.12.09 |
|
||
"""
|
||
try:
|
||
dicts = eval(dicts)
|
||
except:
|
||
dicts = dicts
|
||
if not dicts:
|
||
raise RuntimeError("参数不能为空")
|
||
condition = "WHERE "
|
||
count = 0
|
||
for k, v in dicts.items():
|
||
condition_key = self.to_snake(k)
|
||
if count == 0:
|
||
condition += condition_key + "='{}'".format(v)
|
||
count = count + 1
|
||
else:
|
||
condition += " AND " + condition_key + "='{}'".format(v)
|
||
return condition
|
||
|
||
def check_http_return_info(self, rsp, exp_code=200, exp_msg="", exp_success=True):
|
||
"""
|
||
功能说明:检查http接口返回字段
|
||
入参:
|
||
| rsp | 要检查的接口返回值 |
|
||
| exp_code | 期望返回的code |
|
||
| exp_msg | 期望返回的message |
|
||
| exp_success | 期望返回的success |
|
||
返回值:无,检验通过则继续,否则抛出对应异常
|
||
"""
|
||
exp_info = {"code": exp_code,
|
||
"success": exp_success,
|
||
"message": exp_msg}
|
||
self.should_be_equal_as_json(obj_json=rsp, sub_json=exp_info)
|
||
|
||
@staticmethod
|
||
def update_default_parameters_by_input(input_parameters, default_parameters):
|
||
"""
|
||
功能:用接口输入的参数去更新默认参数:有则更新,没有则在默认参数中新增,要求两个参数都的是dict
|
||
Args:
|
||
input_parameters: 接口输入参数
|
||
default_parameters: 默认参数
|
||
Returns: 更新后的default_parameters
|
||
"""
|
||
for p_key in input_parameters:
|
||
default_parameters[p_key] = input_parameters[p_key]
|
||
return default_parameters
|
||
|
||
@staticmethod
|
||
def get_value_from_post_data_input(post_data_input, key, default_value=None):
|
||
"""
|
||
功能:从用户入参post_data_input中找到对应key的value并返回,没有则赋予default_value值并返回
|
||
Args:
|
||
post_data_input: 用户输入的入参
|
||
key: 期望从入参中找的key
|
||
default_value: 当入参为空,或没有对应的key时,默认赋予的值
|
||
Returns:
|
||
对应key中的value或default_value
|
||
"""
|
||
# post_data_input.get(key) == 'default' 是用于兼容GPT的调用
|
||
if post_data_input is None or post_data_input.get(key) is None \
|
||
or post_data_input.get(key) in ["", "0", "default"]:
|
||
return default_value
|
||
else:
|
||
return post_data_input.get(key)
|
||
|
||
@staticmethod
|
||
def get_value_from_kwargs(kwargs, key, required=0, default_value=None):
|
||
"""
|
||
功能:从入参kwargs中获取指定key的参数值
|
||
Args:
|
||
kwargs: 参数dict
|
||
key: 参数key
|
||
required: 是否必填:0-非必现,1-必填,若必须填不在kwargs里,则抛出对应异常
|
||
default_value: 非必填项参数不在kwargs里时,赋予的默认值
|
||
Returns: key在kwargs中的具体值,或缺省时的默认值
|
||
"""
|
||
if required == 1 and key not in kwargs:
|
||
raise BusinessError("参数:{}是必填项,请出入....".format(key))
|
||
if key in kwargs:
|
||
return kwargs[key]
|
||
else:
|
||
return default_value
|
||
|
||
@staticmethod
|
||
def get_day_of_month():
|
||
"""
|
||
功能:获取当天的日期号数,比如今天是11月12号,return 12
|
||
Returns:
|
||
"""
|
||
today = datetime.datetime.now()
|
||
return today.day
|
||
|
||
@staticmethod
|
||
def formate_msg_for_aita(r_msg):
|
||
"""
|
||
格式化输出用于aita页面展示的消息,要求return_data
|
||
Args:
|
||
r_msg: 函数返回值,要求中含以下三个字段
|
||
for_gpt: 给gpt的内容,默认第一个为标题,原数据用\n字符分割,描述与值用:分割
|
||
for_user: 给用户在aita页面展示的内容,原数据用\n字符分割,描述与值用:分割
|
||
for_link: 跳转链接,格式:{"link_url":"链接地址","link_str":“链接名称,没有则默认为:[点击查询详情...]“}
|
||
Returns:
|
||
标题(加粗,改色)
|
||
描述1 值1
|
||
**** ***
|
||
描述2 值2
|
||
跳转链接
|
||
"""
|
||
for_gpt = r_msg["for_gpt"] if "for_gpt" in r_msg else None
|
||
for_user = r_msg["for_user"] if "for_user" in r_msg else None
|
||
for_link = r_msg["for_link"] if "for_link" in r_msg else None
|
||
if for_gpt:
|
||
del r_msg["for_gpt"]
|
||
gpt_list = for_gpt.split('\n')
|
||
for_gpt = "<font color=\"green\">{}</font><br>".format(gpt_list[0])
|
||
for item in gpt_list[1:]:
|
||
value_list = item.replace(':', ':').split(':')
|
||
# 参数display:inline-block让两个div标签在一行内显示
|
||
key_value = "<div style=\"display:inline-block;width:100px;\">" + " " + value_list[0] + "</div>" \
|
||
+ "<div style=\"display:inline-block;width:1000px;\">" + value_list[1] + "</div><br>"
|
||
for_gpt = for_gpt + key_value
|
||
r_msg["for_gpt"] = for_gpt[:-4] # 删除最后一个换行标签<br>,因为aita的前端在for_gpt和for_user的显示上做了换行
|
||
if for_user:
|
||
del r_msg["for_user"]
|
||
for_user_list = for_user.split('\n')
|
||
for_user = ""
|
||
for item in for_user_list:
|
||
value_list = item.replace(':', ':').split(':')
|
||
key_value = "<div style=\"display:inline-block;width:100px;\">" + " " + value_list[0] + "</div>"\
|
||
+ "<div style=\"display:inline-block;width:1000px;\">" + value_list[1] + "</div><br>"
|
||
for_user = for_user + key_value
|
||
r_msg["for_user"] = for_user
|
||
if for_link:
|
||
del r_msg["for_link"]
|
||
if "link_str" not in for_link:
|
||
for_link["link_str"] = "点击查询详情..."
|
||
link_string = "<a href=\"{}\" target=\"_blank\">[{}]</a>".format(for_link["link_url"], for_link["link_str"])
|
||
r_msg["for_user"] = r_msg["for_user"] + "<div>" + " " + link_string + "</div>"
|
||
return r_msg
|
||
|
||
|
||
if __name__ == '__main__':
|
||
tl = Tools()
|
||
# return_data = {"for_gpt": "班级创建成功!\n班级code:CZ24011200722017\n班级id为:1200120727",
|
||
# "for_user": "教师id:168389\n手机号:15564555315\n课程id:841626\n用户列表:[1987505, 1987506]",
|
||
# "for_link": {"link_url": "https://htmv2.qa.huohua.cn/classManage/classesDetail?id=1200121194",
|
||
# "link_str": "点击查看班级详情..."}}
|
||
# aa = tl.formate_msg_for_aita(r_msg=return_data)
|
||
# print(aa)
|
||
# oj = {'data': {'result': False, 'messages': ['课程id:841618已有待执行超售模式,导表操作失败,请检查数据']}}
|
||
# sj = {'data': {'result': True, 'messages': []}}
|
||
# # oj = [1, 2, 3]
|
||
# # sj = [1, 3, 23]
|
||
# print(tl.should_be_equal_as_json(obj_json=oj, sub_json=sj))
|
||
|
||
a = {'success': True, 'message': '', 'code': 200, 'data': [{'id': 6854, 'userId': 976049, 'employeeId': 599677, 'accountId': 0, 'code': '78843427', 'name': '教务自动化0706165031', 'nickname': '新账号599677', 'sex': 0, 'phone': '180****6279', 'maskPhone': '180****6279', 'phoneCode': '290141', 'natureOfWork': 1, 'teacherTrainNums': [0], 'email': 'jiaowuzidonghua0706165031@huohua.cn', 'idNumber': '510704199205064230', 'birthday': '1970-01-01 00:00:00', 'seniority': 0, 'education': 1, 'entryTime': '2021-01-14 00:00:00', 'graduateInstitutions': '测试', 'nationalityCountryId': 0, 'nationality': 1, 'nativePlace': 1, 'avatar': 'https://stalegacy.huohua.cn/image/huohua/avatar/default/default_avatar1.png', 'introduce': '测试', 'status': 1, 'openModeTypes': '0', 'introduceVideoUrl': '', 'teacherCertificationUrl': '', 'probationStartDate': '', 'probationEndDate': '', 'gradeGroupType': 3, 'creatorId': 1, 'creatorName': 'EHR用户同步', 'createdTime': '2021-07-06 16:48:22', 'modifierId': 1, 'modifierName': 'sys', 'modifiedTime': '2021-07-06 16:49:17', 'classLocation': 1, 'jobType': 4, 'liveroomCenter': 1, 'professorRange': '1', 'qualityScore': 0.0, 'classQuota': 16, 'teacherRank': 0, 'lastRankTime': '2021-07-06 16:49:17', 'rankEffectTime': '2021-01-14 00:00:00', 'rankStatus': 1, 'departmentId': 17, 'departmentName': '教学部', 'leaderId': 0, 'leaderName': '‘’', 'changeSignTime': '', 'liveroomCode': '', 'eyeProtection': 0, 'legoTeacher': 0, 'legoProbationStartDate': '', 'legoProbationEndDate': '', 'activationStatus': 1, 'dimissionTime': '', 'dimissionSetTime': '', 'teacherType': 1, 'liveroomAllocationStatus': 2, 'flexible': 0, 'classroomVideo': '', 'classroomImg': '', 'pipStart': '1970-01-01 00:00:00', 'pipEnd': '1970-01-01 00:00:00', 'teacherLabel': 1, 'longLeaveStart': '1970-01-01 00:00:00', 'longLeaveEnd': '1970-01-01 00:00:00', 'forecastResignDate': '1970-01-01 00:00:00', 'teacherLabelSubType': 1, 'schoolExcellentTeacher': 0, 'innovationAPlusTeacher': 0, 'businessLine': 101, 'onlineType': 0, 'timeZoneName': '', 'timeZoneCode': '', 'teacherLabelList': [], 'labelList': ''}]}
|
||
b = {'success': True, 'message': '', 'code': 200, 'data': [{'id': 6854, 'userId': 976049, 'employeeId': 599677, 'code': '78843427', 'name': '教务自动化0706165031', 'nickname': '新账号599677', 'sex': 0, 'phone': '180****6279', 'maskPhone': '180****6279', 'phoneCode': '290141', 'natureOfWork': 1, 'teacherTrainNums': [0], 'email': 'jiaowuzidonghua0706165031@huohua.cn', 'idNumber': '510704199205064230', 'birthday': '1970-01-01 00:00:00', 'seniority': 0, 'education': 1, 'entryTime': '2021-01-14 00:00:00', 'graduateInstitutions': '测试', 'nationality': 1, 'nativePlace': 1, 'avatar': 'https://stalegacy.huohua.cn/image/huohua/avatar/default/default_avatar1.png', 'introduce': '测试', 'status': 1, 'openModeTypes': '0', 'introduceVideoUrl': '', 'teacherCertificationUrl': '', 'probationStartDate': '', 'probationEndDate': '', 'gradeGroupType': 3, 'creatorId': 1, 'creatorName': 'EHR用户同步', 'createdTime': '2021-07-06 16:48:22', 'modifierId': 1, 'modifierName': 'sys', 'modifiedTime': '2021-07-06 16:49:17', 'classLocation': 1, 'jobType': 4, 'liveroomCenter': 1, 'professorRange': '1', 'qualityScore': 0.0, 'classQuota': 16, 'teacherRank': 0, 'lastRankTime': '2021-07-06 16:49:17', 'rankEffectTime': '2021-01-14 00:00:00', 'rankStatus': 1, 'departmentId': 17, 'departmentName': '教学部', 'leaderId': 0, 'leaderName': '‘’', 'changeSignTime': '', 'liveroomCode': '', 'eyeProtection': 0, 'legoTeacher': 0, 'legoProbationStartDate': '', 'legoProbationEndDate': '', 'activationStatus': 1, 'dimissionTime': '', 'dimissionSetTime': '', 'teacherType': 1, 'liveroomAllocationStatus': 2, 'flexible': 0, 'classroomVideo': '', 'classroomImg': '', 'pipStart': '1970-01-01 00:00:00', 'pipEnd': '1970-01-01 00:00:00', 'teacherLabel': 1, 'longLeaveStart': '1970-01-01 00:00:00', 'longLeaveEnd': '1970-01-01 00:00:00', 'forecastResignDate': '1970-01-01 00:00:00', 'teacherLabelSubType': 1, 'schoolExcellentTeacher': 0, 'innovationAPlusTeacher': 0, 'businessLine': 101, 'timeZoneName': '', 'timeZoneCode': '', 'teacherLabelList': []}]}
|
||
|
||
print(tl.should_be_equal_as_json(obj_json=a, sub_json=b))
|