增加数据库造数的接口

This commit is contained in:
qiaoxinjiu
2026-04-13 16:34:14 +08:00
commit 9183b8b0ff
29 changed files with 1263 additions and 0 deletions

18
app/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
from flask import Flask
from app.api.views import api
from flask_docs import ApiDoc
from logger import logger
def create_app():
app = Flask(__name__)
app.register_blueprint(api, url_prefix='/it/api')
ApiDoc(
app,
title="Effekt Interface App",
version="1.0.0",
description="Effekt Interface app API",
)
app.config["API_DOC_MEMBER"] = ["api", "platform"]
logger.info("app start-------")
return app

0
app/api/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,184 @@
# encoding: UTF-8
from datetime import date, datetime
from decimal import Decimal
from common.sqlSession import SqlSession
from common.getUserInfo import UserInfo
from common.cronRequest import CronRequest
from ..service.updateSqlProjectService import UpdateSqlProjectService
from ..model.updateSqlProjectModel import UpdateSqlProject
from const import EXECUTE_DB_CONFIG, QE_DOMAIN
from logger import logger
"""
创建和更新场景
"""
class UpdateSqlProjectController(object):
def __init__(self, req_json):
self.session = SqlSession()
self.run_env = req_json.get('runEnv')
self.sql = req_json.get('sql')
self.sql_id = req_json.get('sqlId')
self.project = req_json.get('project')
self.page_num = req_json.get('pageNo')
self.page_size = req_json.get('pageSize')
self.remark = req_json.get('remark')
self.run_group = req_json.get('runGroup')
self.creator = req_json.get('creator')
self.qe_domain = QE_DOMAIN
def create_sql_project(self):
project = self.project.strip().strip('"') if isinstance(self.project, str) else self.project
run_env = self.run_env.strip().strip('"') if isinstance(self.run_env, str) else self.run_env
sql = self.sql.strip().strip('"') if isinstance(self.sql, str) else self.sql
if not project or not run_env or not sql:
return 0, 'project、runEnv、sql 为必传参数'
sql_id = self.sql_id
if isinstance(sql_id, str):
sql_id = sql_id.strip().strip('"')
remark = self.remark.strip() if isinstance(self.remark, str) else self.remark
remark = remark if remark not in ('', None) else None
run_group = self.run_group.strip() if isinstance(self.run_group, str) else self.run_group
run_group = run_group if run_group not in ('', None) else ''
creator = self.creator.strip() if isinstance(self.creator, str) else self.creator
creator = creator if creator not in ('', None) else 'admin'
save_info = {
'project': project,
'run_env': run_env,
'sql': sql,
'remark': remark,
'run_group': run_group,
'creator': creator,
'is_delete': 0
}
if sql_id not in (None, ''):
update_res, err_msg = UpdateSqlProjectService.update_sql_project(
self.session, sql_id, save_info
)
return update_res, err_msg
create_id, err_msg = UpdateSqlProjectService.create_sql_project(self.session, save_info)
return create_id, err_msg
@staticmethod
def _format_value(value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
if isinstance(value, date):
return value.strftime('%Y-%m-%d')
if isinstance(value, Decimal):
return float(value)
return value
@classmethod
def _serialize_item(cls, item):
item_dict = item.to_dict()
for key, value in item_dict.items():
item_dict[key] = cls._format_value(value)
return item_dict
def query_smart_manage_sql_data(self):
"""
查询对应填写的sql语句列表数据
"""
page_num = self.page_num or 1
page_size = self.page_size or 20
project = self.project
creator = self.creator
run_group = self.run_group
run_env = self.run_env
if isinstance(project, str):
project = project.strip().strip('"')
if isinstance(creator, str):
creator = creator.strip().strip('"')
if isinstance(run_group, str):
run_group = run_group.replace('\u3000', ' ').strip().strip('"')
filter_list = list()
if project:
filter_list.append(UpdateSqlProject.project == project)
if creator:
filter_list.append(UpdateSqlProject.creator == creator)
if run_env:
filter_list.append(UpdateSqlProject.run_env == run_env)
if run_group:
filter_list.append(UpdateSqlProject.run_group.isnot(None))
filter_list.append(UpdateSqlProject.run_group != '')
filter_list.append(UpdateSqlProject.run_group == run_group)
test_info, count_num = UpdateSqlProjectService.get_sql_list_by_filters(
session=self.session,
filter_list=filter_list,
page_num=page_num,
page_size=page_size
)
result_list = [self._serialize_item(item) for item in test_info]
return {'list': result_list, 'total': count_num}
def get_sql_project_detail(self):
sql_id = self.sql_id
if isinstance(sql_id, str):
sql_id = sql_id.strip().strip('"')
if not sql_id:
return {}, 'sqlId 为必传参数'
sql_project = UpdateSqlProjectService.get_sql_project_by_id(self.session, sql_id)
if not sql_project:
return {}, '未查询到对应记录!'
detail = self._serialize_item(sql_project)
detail.pop('is_delete', None)
return detail, ''
def delete_sql_project(self):
sql_id = self.sql_id
if isinstance(sql_id, str):
sql_id = sql_id.strip().strip('"')
if not sql_id:
return 0, 'sqlId 为必传参数'
return UpdateSqlProjectService.delete_sql_project_by_id(self.session, sql_id)
def execute_sql_project(self):
sql_id = self.sql_id
if isinstance(sql_id, str):
sql_id = sql_id.strip().strip('"')
if not sql_id:
return {}, 'sqlId 为必传参数'
sql_project = UpdateSqlProjectService.get_sql_project_by_id(self.session, sql_id)
if not sql_project:
return {}, '未查询到对应SQL记录'
project = (sql_project.project or '').strip()
run_env = (sql_project.run_env or '').strip().lower()
project_config = EXECUTE_DB_CONFIG.get(project) or EXECUTE_DB_CONFIG.get(project.upper()) or EXECUTE_DB_CONFIG.get(project.lower())
target_config = (project_config or {}).get(run_env)
if not target_config:
return {}, '未配置对应项目环境的数据库连接信息!'
execute_session = SqlSession(SqlSession.build_postgres_uri(
target_config['host'],
target_config['port'],
target_config['user'],
target_config['password'],
target_config['database']
))
try:
result = execute_session.execute(sql_project.sql)
if result.returns_rows:
rows = []
for row in result.fetchall():
row_dict = {key: self._format_value(value) for key, value in dict(row._mapping).items()}
rows.append(row_dict)
execute_session.session.rollback()
execute_session.close()
return {'sqlId': int(sql_id), 'rows': rows, 'rowCount': len(rows)}, ''
err = execute_session.done(close=False)
if err:
execute_session.close()
return {}, f'执行SQL失败{err}'
row_count = result.rowcount
execute_session.close()
return {'sqlId': int(sql_id), 'rowCount': row_count}, ''
except Exception as e:
execute_session.session.rollback()
execute_session.close()
logger.warning(f'execute_sql_project执行失败sql_id: {sql_id}, err: {e}')
return {}, f'执行SQL失败{e}'

0
app/api/dao/__init__.py Normal file
View File

View File

@@ -0,0 +1,68 @@
# encoding: UTF-8
from ..model.updateSqlProjectModel import UpdateSqlProject
from logger import logger
class UpdateSqlProjectDao(object):
@staticmethod
def get_sql_project_by_id(session, sql_id):
return session.query(UpdateSqlProject).filter(
UpdateSqlProject.id == int(sql_id), UpdateSqlProject.is_delete == 0
).first()
@staticmethod
def delete_sql_project_by_id(session, sql_id):
delete_res = session.query(UpdateSqlProject).filter(
UpdateSqlProject.id == int(sql_id), UpdateSqlProject.is_delete == 0
).update({'is_delete': 1})
err = session.done(close=False)
if err:
logger.error('delete update_sql_project db失败sql_id: {}, err: {}'.format(sql_id, err))
return 0, f'删除记录失败!{err}'
if not delete_res:
return 0, '未查询到对应记录!'
return int(sql_id), ''
@staticmethod
def create_sql_project(session, add_info):
if not isinstance(add_info, dict):
logger.error('create_sql_project不支持其他类型。')
return 0, '入参类型错误!'
sql_project_obj = UpdateSqlProject(**add_info)
session.add(sql_project_obj)
err = session.done(close=False)
create_id = sql_project_obj.id
if err:
logger.warning(f'create_sql_project新增记录失败{err}')
return 0, f'新增记录失败!{err}'
if not create_id:
logger.warning('获取update_sql_project记录id失败')
return 0, f'{add_info}获取update_sql_project记录id失败'
return create_id, ''
@staticmethod
def get_sql_by_filters(session, filter_list, page=1, limit=20):
rets = session.query(UpdateSqlProject)\
.filter(*filter_list) \
.filter(UpdateSqlProject.is_delete == 0) \
.order_by(UpdateSqlProject.created_time.desc()) \
.offset((int(page) - 1) * int(limit)) \
.limit(limit) \
.all()
total = session.query(UpdateSqlProject).filter(*filter_list).filter(
UpdateSqlProject.is_delete == 0).count()
return rets, total
@staticmethod
def update_sql_project_by_id(session, sql_id, update_info):
update_res = session.query(UpdateSqlProject).filter(
UpdateSqlProject.id == int(sql_id), UpdateSqlProject.is_delete == 0
).update(update_info)
err = session.done(close=False)
if err:
logger.error('update update_sql_project db失败sql_id: {}, update_info:{}, err: {}'.format(sql_id, update_info, err))
return 0, f'更新记录失败!{err}'
if not update_res:
return 0, '未查询到对应记录!'
return int(sql_id), ''

View File

View File

@@ -0,0 +1,30 @@
from sqlalchemy import Column, Integer, String, TIMESTAMP, text
from sqlalchemy.ext.declarative import declarative_base
from common.sqlSession import to_dict
Base = declarative_base()
Base.to_dict = to_dict
class UpdateSqlProject(Base):
__tablename__ = 'update_sql_project'
id = Column(Integer, primary_key=True, autoincrement=True, comment='id')
sql = Column(String(500), comment='sql语句')
run_env = Column(String(120), comment='运行环境')
project = Column(String(120), comment='项目')
run_group = Column(String(120), comment='对sql进行分组')
remark = Column(String(300), comment='备注')
creator = Column(String(300), comment='创建人')
is_delete = Column(Integer, default=0, comment='0未删除1已删除')
created_time = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'), nullable=True, comment='创建时间')
modified_time = Column(
TIMESTAMP,
server_default=text('CURRENT_TIMESTAMP'),
server_onupdate=text('CURRENT_TIMESTAMP'),
nullable=True,
comment='修改时间'
)
def __repr__(self):
return '<update_sql_project %r>' % self.id

View File

View File

@@ -0,0 +1,37 @@
# encoding: UTF-8
from ..dao.updateSqlProjectDao import UpdateSqlProjectDao
from logger import logger
class UpdateSqlProjectService(object):
def __init__(self):
pass
@staticmethod
def create_sql_project(session, add_info):
bf_dao = UpdateSqlProjectDao()
sql_id, err_msg = bf_dao.create_sql_project(session, add_info)
return sql_id, err_msg
@staticmethod
def update_sql_project(session, sql_id, update_info):
bf_dao = UpdateSqlProjectDao()
update_res, err_msg = bf_dao.update_sql_project_by_id(session, sql_id, update_info)
return update_res, err_msg
@staticmethod
def get_sql_list_by_filters(session, page_num=1, page_size=20, filter_list=None):
bf_dao = UpdateSqlProjectDao()
filter_list = filter_list or []
bf_obj, count_num = bf_dao.get_sql_by_filters(session, filter_list, int(page_num), int(page_size))
return bf_obj, count_num
@staticmethod
def get_sql_project_by_id(session, sql_id):
bf_dao = UpdateSqlProjectDao()
return bf_dao.get_sql_project_by_id(session, sql_id)
@staticmethod
def delete_sql_project_by_id(session, sql_id):
bf_dao = UpdateSqlProjectDao()
return bf_dao.delete_sql_project_by_id(session, sql_id)

View File

43
app/api/utils/apiAuth.py Normal file
View File

@@ -0,0 +1,43 @@
# encoding: UTF-8
from logger import logger
import requests
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from urllib import parse
class apiAuth(object):
def __init__(self):
self.ops_uri = ""
self.showUsername = ""
self.username = ""
self.password = ""
self.sso_login_url = ""
self.redirect_url = ""
def getSsoToken(self):
session = requests.session()
post_data = dict()
post_data['showUsername'] = self.showUsername
post_data['username'] = self.username
post_data['password'] = self.password
session.post(url=self.sso_login_url,data=post_data,allow_redirects=True,verify=False)
resp = session.get(
url=self.redirect_url ,
allow_redirects=False ,
verify=False)
resp1 = session.get(
url=resp.headers['Location'] ,
allow_redirects=False ,
verify=False)
ssoToken = resp1.headers["Set-Cookie"].split("=")[1]
return ssoToken
if __name__ == '__main__':
test = apiAuth()
print(test.getSsoToken())

56
app/api/views.py Normal file
View File

@@ -0,0 +1,56 @@
# encoding: UTF-8
from flask import Blueprint, request
from common.apiResponse import ApiResponse
from .controller.updateSqlProjectController import UpdateSqlProjectController
api = Blueprint('api', __name__)
@api.route('/list', methods=['GET'])
def get_list():
request_args = request.args
controller = UpdateSqlProjectController(request_args)
ret = controller.query_smart_manage_sql_data()
return ApiResponse.build_success(20000, data=ret)
@api.route('/create', methods=['POST'])
def create_sql_project():
req_json = request.get_json() or {}
controller = UpdateSqlProjectController(req_json)
create_id, err_msg = controller.create_sql_project()
if err_msg:
return ApiResponse.build_failure(40009, msg=err_msg)
return ApiResponse.build_success(20000, data={'sqlId': create_id})
@api.route('/detail', methods=['GET'])
def get_sql_project_detail():
request_args = request.args
controller = UpdateSqlProjectController(request_args)
ret, err_msg = controller.get_sql_project_detail()
if err_msg:
return ApiResponse.build_failure(40011, msg=err_msg)
return ApiResponse.build_success(20000, data=ret)
@api.route('/delete', methods=['POST'])
def delete_sql_project():
req_json = request.get_json() or {}
controller = UpdateSqlProjectController(req_json)
delete_id, err_msg = controller.delete_sql_project()
if err_msg:
return ApiResponse.build_failure(40012, msg=err_msg)
return ApiResponse.build_success(20000, data={'sqlId': delete_id})
@api.route('/execute', methods=['POST'])
def execute_sql_project():
req_json = request.get_json() or {}
controller = UpdateSqlProjectController(req_json)
ret, err_msg = controller.execute_sql_project()
if err_msg:
return ApiResponse.build_failure(40009, msg=err_msg)
return ApiResponse.build_success(20000, data=ret)