405 lines
21 KiB
Python
405 lines
21 KiB
Python
# encoding: utf-8
|
||
# 维护人员:qiaoxinjiu
|
||
import re
|
||
import time
|
||
import requests
|
||
import json
|
||
import urllib3
|
||
from base_framework.public_tools.read_config import get_current_env, ReadConfig
|
||
|
||
urllib3.disable_warnings()
|
||
|
||
|
||
class HuoHuaDBS:
    """HTTP client for the DBS web service at dbs.qc.huohua.cn.

    The client logs in with a username/password, then uses the service's
    endpoints to run SELECT queries (``/query/``) and to check, submit and
    execute data-changing statements (``/simplecheck/``, ``/autoreview/``,
    ``/execute/``).

    All SQL passed to the auto-detecting helpers must reference tables as
    ``db_name.table_name`` because the database name (and, outside qa, the
    target instance) is derived from the statement text.
    """

    # Rows fetched per page by dbs_query_all_data_as_json; a page shorter than
    # this marks the end of the result set.
    _PAGE_SIZE = 1000

    # Captures the database part of "<keyword> db_name.table_name".
    _DB_NAME_PATTERN = re.compile(r'\b(?:FROM|INTO|UPDATE|JOIN)\s+([a-zA-Z0-9_]+)\.', re.IGNORECASE)

    # Static cookie prefix sent with every request; the csrf token is appended.
    _BASE_COOKIE = "experimentation_subject_id=IjdmMmE5MmYxLTk4N2QtNDU1YS1hMGRjLTZhZWIwZWY3YWEyMiI%3D--fc2a7fdd42c874af494e99f3f5fdf0c372d3b4d2; __UUID=93b21b91-675f-4019-fe4f-4442ec9f6e65-x; __DEVICE_ID=93b21b91-675f-4019-fe4f-4442ec9f6e65-x; csrftoken={}"

    def __init__(self, user_name=None, pwd=None):
        """Create the client and, in the sim environment, load the db -> instance map.

        Args:
            user_name: DBS login name.
            pwd: DBS login password.
        """
        self.conf = ReadConfig()
        # Fallback to credentials from the config file is currently disabled:
        # if not user_name:
        #     user_name = self.conf.get_value(sections="lrq", options="show_username")
        #     pwd = self.conf.get_value(sections="lrq", options="password")
        self.req = requests.session()
        self.user_name = user_name
        self.user_pwd = pwd
        self.headers = {}
        self.session_id = None
        self.authenticate_csrf_token = None
        self.query_url = "https://dbs.qc.huohua.cn/query/"
        self.check_url = "https://dbs.qc.huohua.cn/simplecheck/"
        self.commit_url = "https://dbs.qc.huohua.cn/autoreview/"
        self.execute_url = "https://dbs.qc.huohua.cn/execute/"
        self.get_status_url = "https://dbs.qc.huohua.cn/sqlworkflow/detail_content/?workflow_id="
        self.env = get_current_env().lower()
        # Always defined so that __get_instance_name never hits a missing
        # attribute in environments other than sim.
        # Final shape: {"teach_teacher": "sim126", "teach_classes": "sim128"}
        self.sim_db = {}
        if self.env == "sim":
            # Read the DB_LIST section once so later lookups need no config access.
            for instance in self.conf.get_options(section="DB_LIST"):
                db_list = self.conf.get_value(sections="DB_LIST", options=instance)
                for db in db_list.split(','):
                    db = db.strip()  # tolerate "a, b" style lists
                    if db:
                        self.sim_db[db] = instance

    @staticmethod
    def __statement_keyword(sql_content):
        """Return the first word of the statement in lower case ('' when empty).

        Splitting on any whitespace makes the result independent of leading
        blanks or newlines in front of the statement.
        """
        tokens = sql_content.split(None, 1)
        return tokens[0].lower() if tokens else ""

    def __get_instance_name(self, sql_content):
        """Pick the DBS instance a statement must run against.

        For env "qa" or an env name containing "-", SELECT statements go to
        "qadb-slave" and everything else to "qadb-master". Otherwise the
        databases referenced in the statement are looked up in ``self.sim_db``.

        Raises:
            Exception: when no database can be found in the statement, when a
                database has no configured instance, or when the referenced
                databases live on different instances.
        """
        if self.env.lower() == "qa" or "-" in self.env:
            if self.__statement_keyword(sql_content) == "select":
                return "qadb-slave"  # read-only queries
            return "qadb-master"  # data-changing statements

        db_names = self.__get_db_names_from_sql(sql_statements=sql_content)
        if not db_names:
            raise Exception("no db_name.table_name reference found in sql, "
                            "cannot determine the instance: {}".format(sql_content))
        instance_list = []
        no_config_list = []
        for db_name in db_names:
            if db_name not in self.sim_db:
                no_config_list.append(db_name)
                continue
            if self.sim_db[db_name] not in instance_list:
                instance_list.append(self.sim_db[db_name])
        if no_config_list:
            raise Exception("你本次查询的数据库:{} 未做sim配置,请添加....".format(no_config_list))
        if len(instance_list) > 1:
            raise Exception("你本次查询的数据库:{}\n位于不同的实例:{}\n请修改sql....".format(db_names, instance_list))
        return instance_list[0]

    @staticmethod
    def __check_sql_content(sql_content):
        """Reject insert/update/delete/select statements that contain no "."

        Raises:
            Exception: when such a statement does not use db_name.table_name.
        """
        keyword = HuoHuaDBS.__statement_keyword(sql_content)
        if keyword in ["insert", "update", "delete", "select"] and "." not in sql_content:
            raise Exception("sql语句必现是db_name.table_name格式,请修正:{}".format(sql_content))

    @staticmethod
    def __get_db_names_from_sql(sql_statements):
        """Extract the database names referenced in a SQL text.

        :param sql_statements: SQL text
        :return: de-duplicated database names in order of first appearance
        """
        cleaned = sql_statements.replace("`", "")
        cleaned = re.sub(r'--.*', '', cleaned)  # drop line comments
        cleaned = ' '.join(cleaned.split())  # collapse whitespace and newlines
        ordered = []
        for name in HuoHuaDBS._DB_NAME_PATTERN.findall(cleaned):
            # Keep first-appearance order so callers taking [0] are deterministic.
            if name not in ordered:
                ordered.append(name)
        return ordered

    @staticmethod
    def __guess_db_name(sql_content):
        """Return the database name to send as ``db_name`` for a statement.

        Uses the first ``db_name.table_name`` reference; when none is found it
        falls back to the word in front of the first "." (legacy behaviour).
        """
        names = HuoHuaDBS.__get_db_names_from_sql(sql_statements=sql_content)
        if names:
            return names[0]
        return sql_content.split(".")[0].split(" ")[-1].replace("`", "").replace(" ", "")

    @staticmethod
    def __build_page_sql(sql_content, sort_key, sort_type, off_set):
        """Build one page query for dbs_query_all_data_as_json.

        The statement is split at its first WHERE (case-insensitively, so
        literals keep their case). The caller's condition is wrapped in
        parentheses so an OR inside it cannot escape the paging bound, and an
        ORDER BY on ``sort_key`` is appended unless the statement has its own.

        Args:
            sql_content: original SELECT with a WHERE clause.
            sort_key: column used as paging cursor.
            sort_type: 'asc' or 'desc' (case-insensitive).
            off_set: last cursor value seen, or None for no bound.

        Raises:
            Exception: on an unknown sort_type or a missing WHERE clause.
        """
        direction = sort_type.lower()
        if direction not in ("asc", "desc"):
            raise Exception("sort_type must be 'asc' or 'desc', got: {}".format(sort_type))
        statement = sql_content.strip().rstrip(";").rstrip()
        parts = re.split(r"\bwhere\b", statement, maxsplit=1, flags=re.IGNORECASE)
        if len(parts) < 2:
            raise Exception("全量查询必须带有where条件,防止数量过多,请修改查询语句")
        head, tail = parts
        # With a capture group re.split yields [condition, keyword, rest].
        tail_parts = re.split(r"\b(group\s+by|order\s+by|having)\b", tail, maxsplit=1, flags=re.IGNORECASE)
        condition = tail_parts[0].strip()
        suffix = "".join(tail_parts[1:]).strip()

        clauses = []
        if off_set is not None:
            clauses.append("{}{}{}".format(sort_key, ">" if direction == "asc" else "<", off_set))
        clauses.append("({})".format(condition))
        page_sql = "{} where {}".format(head.rstrip(), " and ".join(clauses))
        if suffix:
            page_sql = "{} {}".format(page_sql, suffix)
        if not re.search(r"\border\s+by\b", suffix, re.IGNORECASE):
            page_sql = "{} order by {} {}".format(page_sql, sort_key, direction)
        return page_sql

    def dbs_login(self):
        """Log in and store the csrf token, session id and Cookie header."""
        login_index = 'http://dbs.qc.huohua.cn/login/'
        res_login = self.req.get(login_index)
        self.authenticate_csrf_token = res_login.cookies.get('csrftoken')

        authenticate_url = 'https://dbs.qc.huohua.cn/authenticate/'
        credentials = {"username": self.user_name, "password": self.user_pwd}
        self.headers.update({
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
            "Cookie": self._BASE_COOKIE.format(self.authenticate_csrf_token),
        })
        self.headers.update({"X-CSRFToken": "{}".format(self.authenticate_csrf_token),
                             "X-Requested-With": "XMLHttpRequest"})
        resp = self.req.post(authenticate_url, credentials, headers=self.headers)
        self.session_id = resp.cookies.get('sessionid')
        # From now on every request carries both the csrf token and the session id.
        self.headers.update({
            "Cookie": (self._BASE_COOKIE + ";sessionid={}").format(self.authenticate_csrf_token,
                                                                  self.session_id),
        })

    def __post_query(self, instance_name, db_name, sql_content, limit_num, workflow_name=None):
        """POST one query to the query endpoint and return the decoded JSON reply.

        Logs in first when there is no session id yet. ``workflow_name`` is
        only included in the form data when given.
        """
        if not self.session_id:
            self.dbs_login()
        data = {"instance_name": instance_name, "db_name": db_name, "schema_name": "", "tb_name": "",
                "sql_content": sql_content, "limit_num": limit_num}
        if workflow_name is not None:
            data["workflow_name"] = workflow_name
        self.headers.update({"Referer": "https://dbs.qc.huohua.cn/sqlquery/"})
        resp = self.req.post(self.query_url, data, headers=self.headers)
        return json.loads(resp.text)

    def __rows_as_dicts(self, result):
        """Turn a query reply into a list of row dicts.

        Raises:
            Exception: carrying the whole reply when its 'status' is not 0.
        """
        if result.get('status') != 0:
            raise Exception(result)
        return self.__make_format_data(key_list=result.get('data').get('column_list'),
                                       column_list=result.get('data').get('rows'))

    def dbs_query(self, instance_name, sql_content, limit_num="100"):
        """Run a query and return the raw rows.

        Args:
            instance_name: database instance, e.g. sim-my-allschool.
            sql_content: query SQL; the database name is taken from it.
            limit_num: maximum number of rows.

        Returns:
            The 'rows' value of the reply's 'data' object.
        """
        result = self.__post_query(instance_name, self.__guess_db_name(sql_content), sql_content, limit_num)
        return result.get('data').get('rows')

    def dbs_query_as_json(self, instance_name, sql_content, limit_num="1000"):
        """Run a query and return the rows as dicts.

        Args:
            instance_name: database instance, e.g. sim-my-allschool.
            sql_content: query SQL.
            limit_num: maximum number of rows.

        Returns:
            [{column: value}, ...]

        Raises:
            Exception: when the reply status is not 0.
        """
        result = self.__post_query(instance_name, self.__guess_db_name(sql_content), sql_content, limit_num,
                                   workflow_name="ODS线上数据监控")
        return self.__rows_as_dicts(result)

    def dbs_query_as_json_auto_identify_env(self, sql_content, instance_name=None, limit_num="1000"):
        """Run a query, choosing the instance automatically when none is given.

        Multi-database joins are discouraged: outside qa the instance is
        resolved per database and databases on different instances are rejected.

        Args:
            sql_content: query SQL in db_name.table_name form.
            instance_name: database instance; resolved from the SQL and the
                current environment when omitted.
            limit_num: maximum number of rows.

        Returns:
            [{column: value}, ...]

        Raises:
            Exception: on malformed SQL, an unresolvable instance, or a reply
                whose status is not 0.
        """
        self.__check_sql_content(sql_content)
        if not instance_name:
            instance_name = self.__get_instance_name(sql_content=sql_content)
        result = self.__post_query(instance_name, self.__guess_db_name(sql_content), sql_content, limit_num,
                                   workflow_name="Test_online_data_check")
        return self.__rows_as_dicts(result)

    def dbs_query_all_data_as_json(self, sql_content, instance_name=None, sort_key='id', sort_type='asc'):
        """Fetch every matching row (also beyond 1000) by paging on ``sort_key``.

        A statement containing " limit " is run as a single ordinary query.
        Otherwise pages of ``_PAGE_SIZE`` rows are fetched, each bounded by the
        last ``sort_key`` value of the previous page, until a short page arrives.

        Args:
            sql_content: single-table SELECT with a WHERE clause; its column
                list must include ``sort_key``.
            instance_name: database instance; resolved automatically when omitted.
            sort_key: cursor column, 'id' by default; values are inserted into
                the SQL unquoted.
            sort_type: 'asc' (first page starts at sort_key > 0) or 'desc'
                (first page is unbounded).

        Raises:
            Exception: for join/union/sub-select statements, a missing WHERE
                clause, or an unknown sort_type.
        """
        lowered = sql_content.lower()  # used for checks only, never sent
        if " limit " in lowered:
            return self.dbs_query_as_json_auto_identify_env(sql_content=sql_content, instance_name=instance_name)
        if " join " in lowered or " union " in lowered or lowered.count("from") > 1:
            raise Exception("链表查询不支持全量查询,请修改查询语句")
        if "where" not in lowered:
            raise Exception("全量查询必须带有where条件,防止数量过多,请修改查询语句")

        all_data = []
        # Descending paging has no meaningful numeric start, so it begins unbounded.
        off_set = 0 if sort_type.lower() == 'asc' else None
        while True:
            page_sql = self.__build_page_sql(sql_content, sort_key, sort_type, off_set)
            db_data = self.dbs_query_as_json_auto_identify_env(sql_content=page_sql,
                                                               instance_name=instance_name,
                                                               limit_num=str(self._PAGE_SIZE))
            all_data += db_data
            if len(db_data) < self._PAGE_SIZE:
                break
            off_set = db_data[-1][sort_key]
        return all_data

    def dbs_query_as_list(self, instance_name, sql_content, limit_num="100"):
        """Run a query and return the values without column names.

        Args:
            instance_name: database instance, e.g. sim-my-allschool.
            sql_content: query SQL.
            limit_num: maximum number of rows.

        Returns:
            A flat list of values for single-column results, otherwise a list
            with one list of values per row.
        """
        res = self.dbs_query_as_json(instance_name=instance_name, sql_content=sql_content, limit_num=limit_num)
        out_data = []
        for item in res:
            values = list(item.values())
            # A single column is flattened to the bare value.
            out_data.append(values[0] if len(values) == 1 else values)
        return out_data

    @staticmethod
    def __make_format_data(key_list, column_list):
        """Pair column names with each row's values.

        Args:
            key_list: column names, e.g. ['a', 'b'].
            column_list: rows, e.g. [[1, 2], [3, 4]].

        Returns:
            [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
        """
        return [dict(zip(key_list, row)) for row in column_list]

    def dbs_execute(self, instance_name, sql_content, time_out=60):
        """Check, submit and execute an insert/update/delete, then wait for success.

        Args:
            instance_name: target instance, e.g. sim126.
            sql_content: statement in db_name.table_name form.
            time_out: number of status polls, about one second apart (default 60).

        Returns:
            True once the workflow reports "Execute Successfully".

        Raises:
            Exception: when the check, submit or execute step fails, or when no
                success status is seen within ``time_out`` polls.
        """
        self.__check_sql_content(sql_content)
        db_name = self.__guess_db_name(sql_content)
        if not self.session_id:
            self.dbs_login()

        # Step 1: let the service check the statement.
        req_params = {"sql_content": sql_content, "instance_name": instance_name, "db_name": db_name,
                      "sql_type": "online"}
        resp_check = self.req.post(self.check_url, req_params, headers=self.headers)
        if resp_check.status_code != 200:
            raise Exception("dbs执行检查sql失败,错误状态码: {}, res:{}".format(resp_check.status_code, resp_check.text))
        result_check = json.loads(resp_check.text)
        if result_check.get("data").get("CheckErrorCount") != 0:
            raise Exception("sql检查未通过,请检查调整后,再提交,msg: {}".format(result_check))

        # Step 2: submit the statement as a workflow.
        req_params = {"csrfmiddlewaretoken": self.authenticate_csrf_token, "sql_content": sql_content,
                      "instance_name": instance_name, "db_name": db_name,
                      "sql_type": "online", "sql-upload": "", "workflow_id": "", "workflow_name": "test",
                      "demand_url": "",
                      "group_name": "测试组", "run_date_start": "", "run_date_end": "", "workflow_auditors": 1}
        resp_commit = self.req.post(self.commit_url, req_params, headers=self.headers)
        if resp_commit.status_code != 200:
            raise Exception("dbs执行提交sql失败,错误状态码: {}, res:{}".format(resp_commit.status_code, resp_commit.text))
        # The workflow id is read from the redirect that follows the submit.
        location = resp_commit.history[0].headers.get("Location") if resp_commit.history else None
        location_parts = (location or "").split("/")
        if len(location_parts) < 3:
            raise Exception("dbs submit did not redirect to a workflow page, "
                            "cannot read workflow_id, res: {}".format(resp_commit.text))
        workflow_id = location_parts[2]

        # Step 3: trigger execution.
        req_params = {"csrfmiddlewaretoken": self.authenticate_csrf_token, "workflow_id": workflow_id,
                      "mode": "auto"}
        resp_execute = self.req.post(self.execute_url, req_params, headers=self.headers)
        if resp_execute.status_code != 200:
            raise Exception("dbs执行sql失败,错误状态码: {}, res:{}".format(resp_execute.status_code, resp_execute.text))

        # Step 4: poll the workflow detail until it reports success.
        for _ in range(max(int(time_out), 0)):
            resp_get_status = self.req.get(self.get_status_url + str(workflow_id), headers=self.headers)
            if resp_get_status.status_code == 200:
                result_status = json.loads(resp_get_status.text)
                # The status of the submitted statement is read from the second row.
                if "Execute Successfully" in result_status.get("rows")[1].get("stagestatus"):
                    return True
            time.sleep(1)
        raise Exception("超时未查询到结果,请确认sql是不是需要执行很久,若有必要请调整超时参数")

    def dbs_query_by_db_name(self, instance_name, db_name, sql_content, limit_num="100"):
        """Run a query against an explicitly named database and return the raw rows.

        Args:
            instance_name: database instance, e.g. sim-my-allschool.
            db_name: database name, e.g. hulk_class_help.
            sql_content: query SQL.
            limit_num: maximum number of rows.

        Returns:
            The 'rows' value of the reply's 'data' object.
        """
        result = self.__post_query(instance_name, db_name, sql_content, limit_num)
        return result.get('data').get('rows')

    def dbs_select(self, sql_content, r_type='json', instance_name=None):
        """Run a query and return the result as dicts or as plain values.

        Args:
            sql_content: query SQL in db_name.table_name form.
            r_type: 'json' (list of dicts) or 'list' (values only).
            instance_name: database instance; resolved from the SQL and the
                current environment when omitted.

        Note:
            1. Add "limit 1" to the SQL yourself when a single row is wanted.
            2. Pass ``instance_name`` explicitly to query an online instance.

        Raises:
            Exception: on malformed SQL or an unknown ``r_type``.
        """
        self.__check_sql_content(sql_content)
        if not instance_name:
            instance_name = self.__get_instance_name(sql_content=sql_content)
        if r_type == 'json':
            return self.dbs_query_as_json(sql_content=sql_content, instance_name=instance_name)
        if r_type == 'list':
            return self.dbs_query_as_list(instance_name=instance_name, sql_content=sql_content)
        raise Exception("返回类型只能是json或list")

    def dbs_execute_sql(self, sql_content, instance_name=None, time_out=60):
        """Execute an insert/update/delete; SELECT ... concat(...) generators are expanded.

        A SELECT that contains "concat" is treated as a statement generator:
        it is queried first and every non-empty value it returns is executed
        as its own statement with the same ``instance_name`` and ``time_out``.

        Args:
            sql_content: statement to execute, or a generator SELECT.
            instance_name: database instance; resolved per statement from the
                SQL and the current environment when omitted.
            time_out: passed to dbs_execute for each statement (default 60).
        """
        is_generator = (self.__statement_keyword(sql_content) == "select"
                        and "concat" in sql_content.lower())
        if is_generator:
            generated_rows = self.dbs_query_as_json_auto_identify_env(sql_content=sql_content,
                                                                      instance_name=instance_name)
            for row in generated_rows:
                for statement in row.values():
                    if statement:  # concat() yields NULL when any part is NULL
                        self.dbs_execute_sql(sql_content=statement, instance_name=instance_name,
                                             time_out=time_out)
            return
        if not instance_name:
            instance_name = self.__get_instance_name(sql_content=sql_content)
        self.dbs_execute(instance_name=instance_name, sql_content=sql_content, time_out=time_out)
|
||
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke check: print the instance resolved for a multi-database query.
    hh_dbs = HuoHuaDBS()
    # hh_dbs.dbs_query_as_json("")
    sql = "SELECT ui.user_id,sp.id AS student_id,ui.user_name,ui.logo,ui.phone,ui.phone_code,ui.nick_name,ui.english_nickname,ui.share_code,ui.invite_code,ui.customer_id,ui.channel_id,ui.from_user_id,ui.email,ss.business_line FROM (SELECT up.id as user_id,up.user_name,up.logo,up.phone,up.phone_code,up.nick_name,up.english_nickname,up.share_code,up.invite_code,up.customer_id,up.channel_id,up.from_user_id,uc.contact_info as email FROM ucenter.user_profile up LEFT JOIN ucenter.user_contact uc ON up.id=uc.user_id WHERE up.phone_code='653666709542760459') ui LEFT JOIN peppa_channel.statistics_setting ss ON ui.channel_id = ss.channel_id LEFT JOIN ucenter.student_profile sp ON sp.user_id=ui.user_id;"

    # The method is private (double underscore), so outside the class it is
    # only reachable under its name-mangled attribute name.
    print(hh_dbs._HuoHuaDBS__get_instance_name(sql_content=sql))