Files
qiaoxinjiu 6994b185a3 addproject
2026-01-22 19:10:37 +08:00

301 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
"""
Author: qiaoxinjiu
Create Date: 2020/11/6 17:34
"""
import pymysql
import pymongo
# from DBUtils.PooledDB import PooledDB
from dbutils.pooled_db import PooledDB
import psycopg2
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from base_framework.public_tools.read_config import InitConfig
from base_framework.public_tools.read_config import ReadConfig, get_current_config
# from base_framework.public_tools import db_config as config
from base_framework.base_config.current_pth import *
"""
@功能:创建数据库连接池
"""
# Upper-case team names whose default MySQL server alias is 'as'; compared
# against ``current_team.upper()`` in ``MyConnectionPool.__getconn``.
as_db = ['ZZYY']
class PgConnectionPool(InitConfig):
    """Manage PostgreSQL connection pools (``PooledDB`` over ``psycopg2``).

    One pool is created per ``choose_db`` alias and stored in the class-level
    ``pool_cache``, so all instances reuse the same pools. The object can be
    used as a context manager: entering opens a connection and a cursor on
    the default database, leaving closes both.
    """

    pool = None
    # alias -> PooledDB; a class attribute, hence shared by every instance.
    pool_cache = dict()

    def __init__(self):
        try:
            super().__init__()
        except Exception as e:
            # Best effort, as before: a failing base initialiser is reported
            # but does not abort construction.
            print(e)
        # Assigned outside the ``try`` so the attribute always exists; pool
        # creation reads it to choose the sslmode.
        self.DB_SSL = False

    def __enter__(self):
        """Open a connection and cursor on the default database.

        Returns:
            This instance, so ``with PgConnectionPool() as p`` binds a usable
            object (``p.conn`` / ``p.cursor``).
        """
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()
        return self

    @staticmethod
    def _mask(password):
        """Return ``password`` replaced by asterisks, or ``'None'`` when empty."""
        return '*' * len(password) if password else 'None'

    @staticmethod
    def _read_pool_settings(choose_db):
        """Read the ``PostgreSQL`` config options for ``choose_db`` into a dict."""
        rc = ReadConfig(config_file_path)

        def read(option):
            return rc.get_value(sections='PostgreSQL', options=option)

        if choose_db == 'default':
            # The default database uses the ``db_test_*`` connection options
            # and the un-prefixed pool-sizing options.
            conn_prefix, name_option, pool_prefix = 'db_test_', 'db_test_dbname', 'db_'
        else:
            conn_prefix = pool_prefix = f'db_{choose_db}_'
            name_option = f'db_{choose_db}_name'
        return {
            'host': read(conn_prefix + 'host'),
            'port': read(conn_prefix + 'port'),
            'name': read(name_option),
            'user': read(conn_prefix + 'user'),
            'password': read(conn_prefix + 'password'),
            'min_cached': read(pool_prefix + 'min_cached'),
            'max_cached': read(pool_prefix + 'max_cached'),
            'max_shared': read(pool_prefix + 'max_shared'),
            # The option key is spelled 'connecyions' in the lookups; it must
            # stay in sync with the config file.
            'max_connections': read(pool_prefix + 'max_connecyions'),
            'max_usage': read(pool_prefix + 'max_usage'),
        }

    def _create_pool(self, choose_db):
        """Create the pool for ``choose_db``, store it in ``pool_cache`` and return it.

        Raises:
            Exception: when the pool cannot be created; the message lists the
                connection settings with the password masked.
        """
        cfg = self._read_pool_settings(choose_db)
        ssl_mode = 'require' if self.DB_SSL else 'disable'
        try:
            print("=" * 80)
            print("PostgreSQL连接池配置信息:")
            print(" 主机(Host): {}".format(cfg['host']))
            print(" 端口(Port): {}".format(cfg['port']))
            print(" 数据库名(Database): {}".format(cfg['name']))
            print(" 用户名(User): {}".format(cfg['user']))
            print(" 密码(Password): {} (已隐藏)".format(self._mask(cfg['password'])))
            print(" 最小缓存连接数(MinCached): {}".format(cfg['min_cached']))
            print(" 最大缓存连接数(MaxCached): {}".format(cfg['max_cached']))
            print(" 最大共享连接数(MaxShared): {}".format(cfg['max_shared']))
            print(" 最大连接数(MaxConnections): {}".format(cfg['max_connections']))
            print(" 最大使用次数(MaxUsage): {}".format(cfg['max_usage']))
            print(" SSL模式(SSLMode): {}".format(ssl_mode))
            print(" 连接超时(ConnectTimeout): 30秒")
            print("=" * 80)
            new_pool = PooledDB(
                creator=psycopg2,
                host=cfg['host'],
                port=int(cfg['port']),
                user=cfg['user'],
                password=cfg['password'],
                database=cfg['name'],
                mincached=int(cfg['min_cached']),
                maxcached=int(cfg['max_cached']),
                maxshared=int(cfg['max_shared']),
                maxconnections=int(cfg['max_connections']),
                blocking=True,
                maxusage=int(cfg['max_usage']),
                setsession=None,
                # PostgreSQL-specific connection parameters
                sslmode=ssl_mode,
                connect_timeout=30,
                keepalives=1,
                keepalives_idle=30,
                keepalives_interval=10,
                keepalives_count=5
            )
            self.pool_cache[choose_db] = new_pool
            print("PostgreSQL连接池创建成功")
        except Exception as e:
            error_msg = (
                "\n"
                "PostgreSQL连接池创建失败\n"
                "连接配置信息:\n"
                "主机(Host): {}\n"
                "端口(Port): {}\n"
                "数据库名(Database): {}\n"
                "用户名(User): {}\n"
                "密码(Password): {} (已隐藏)\n"
                "SSL模式(SSLMode): {}\n"
                "连接超时(ConnectTimeout): 30秒\n"
                "错误详情: {}\n"
            ).format(
                cfg['host'], cfg['port'], cfg['name'], cfg['user'],
                self._mask(cfg['password']),
                ssl_mode,
                str(e)
            )
            print(error_msg)
            raise Exception(error_msg) from e
        return new_pool

    def __getconn(self, choose_db=None):
        """Return a pooled connection for ``choose_db`` (``'default'`` when falsy).

        The pool is taken from ``pool_cache`` or created on first use.

        Raises:
            Exception: when the pool cannot be created or no connection can
                be obtained from it.
        """
        # Fall back to the default configuration when no database is named.
        if not choose_db:
            choose_db = 'default'
        try:
            self.pool = self.pool_cache[choose_db]
        except KeyError:
            self.pool = self._create_pool(choose_db)
        try:
            return self.pool.connection()
        except Exception as e:
            # Re-read the settings of the database actually requested so the
            # diagnostic does not point at the wrong server.
            try:
                cfg = self._read_pool_settings(choose_db)
                db_host, db_port = cfg['host'], cfg['port']
                db_name, db_user = cfg['name'], cfg['user']
            except Exception:
                db_host = db_port = db_name = db_user = 'unknown'
            error_msg = (
                "\n"
                "PostgreSQL连接获取失败\n"
                "连接配置信息:\n"
                "主机(Host): {}\n"
                "端口(Port): {}\n"
                "数据库名(Database): {}\n"
                "用户名(User): {}\n"
                "错误详情: {}\n"
            ).format(db_host, db_port, db_name, db_user, str(e))
            print(error_msg)
            raise Exception(error_msg) from e

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor and hand the connection back to the pool."""
        try:
            if getattr(self, 'cursor', None):
                self.cursor.close()
        finally:
            # Runs even if closing the cursor raised, so the connection is
            # not leaked.
            if getattr(self, 'conn', None):
                self.conn.close()

    def getconn(self, choose_db=None):
        """Return ``(cursor, conn)`` for ``choose_db``.

        The cursor is created with ``RealDictCursor``. The caller is
        responsible for closing both objects.
        """
        conn = self.__getconn(choose_db=choose_db)
        try:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
        except Exception:
            # Do not keep the connection checked out when no cursor is returned.
            conn.close()
            raise
        return cursor, conn
class MyConnectionPool(InitConfig):
    """Manage MySQL connection pools (``PooledDB`` over ``pymysql``).

    One pool is created per server alias and stored in the class-level
    ``pool_cache``, so all instances reuse the same pools. The object can be
    used as a context manager: entering opens a connection and a cursor on
    the default server, leaving closes both.
    """

    pool = None
    # alias -> PooledDB; a class attribute, hence shared by every instance.
    pool_cache = dict()

    # Server alias -> option in the ``Mysql`` config section holding its host.
    _HOST_OPTIONS = {
        'as': 'db_as_svr',
        'se': 'db_se_svr',
        'xdu': 'db_xdu_svr',
        'hh': 'db_hh_svr',
        'huohua': 'db_hh_svr',
        'hhi': 'db_hhi_svr',
    }
    # Alias and host used when nothing else applies.
    _FALLBACK_ALIAS = 'hh.qa'
    _FALLBACK_HOST = 'mysql.qa.huohua.cn'

    def __init__(self):
        try:
            super().__init__()
        except Exception as e:
            # Best effort, as before: a failing base initialiser is reported
            # but does not abort construction.
            print(e)
        self.current_business = get_current_config(section='run_evn_name', key='current_business')

    def __enter__(self):
        """Open a connection and cursor on the default server.

        Returns:
            This instance, so ``with MyConnectionPool() as p`` binds a usable
            object (``p.conn`` / ``p.cursor``).
        """
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()
        return self

    def _default_alias(self):
        """Return the server alias implied by the configured team/business, or None."""
        current_team = ReadConfig(env_choose_path).get_value(sections='run_evn_name', options='current_team')
        team = current_team.upper()
        if team in as_db:
            return 'as'
        if team == "SE":
            return 'se'
        if team == "XUEDAU":
            return 'xdu'
        if self.current_business == 'hh':
            return 'hh'
        if self.current_business == 'hhi':
            return 'hhi'
        return None

    def _create_pool(self, choose_db):
        """Create the pool for alias ``choose_db``, cache it and return it.

        Raises:
            Exception: when ``choose_db`` is not a known server alias.
        """
        rc = ReadConfig(config_file_path)
        if choose_db == self._FALLBACK_ALIAS:
            db_host = self._FALLBACK_HOST
        else:
            try:
                host_option = self._HOST_OPTIONS[choose_db]
            except (KeyError, TypeError):
                raise Exception("当前仅支持hh,hhi,as,se四个数据库服务器而你选择的是{}".format(choose_db)) from None
            db_host = rc.get_value(sections='Mysql', options=host_option)
        if choose_db == 'xdu':
            # 'xdu' has its own credentials. They are kept in locals so they
            # do not overwrite the instance-wide test credentials used for
            # every other server.
            db_user = rc.get_value(sections='Mysql', options='db_xdu_user')
            db_password = rc.get_value(sections='Mysql', options='db_xdu_password')
        else:
            db_user = self.DB_TEST_USER
            db_password = self.DB_TEST_PASSWORD
        default_db = 'sys'
        new_pool = PooledDB(
            creator=pymysql,
            host=db_host,
            port=int(self.DB_TEST_PORT),
            user=db_user,
            passwd=db_password,
            db=default_db,
            mincached=int(self.DB_MIN_CACHED),
            maxcached=int(self.DB_MAX_CACHED),
            maxshared=int(self.DB_MAX_SHARED),
            maxconnections=int(self.DB_MAX_CONNECYIONS),
            blocking=True,
            maxusage=int(self.DB_MAX_USAGE),
            setsession=None,
            use_unicode=True,
            charset=self.DB_CHARSET
        )
        self.pool_cache[choose_db] = new_pool
        return new_pool

    def __getconn(self, choose_db=None):
        """Return a pooled connection for the server alias ``choose_db``.

        When ``choose_db`` is None the alias is derived from the configured
        team/business; if nothing applies (or ``choose_db`` is otherwise
        falsy) the ``'hh.qa'`` server is used.

        Raises:
            Exception: when ``choose_db`` is not a known server alias.
        """
        if choose_db is None:
            # Not specified: use the team's default setting.
            choose_db = self._default_alias()
        if not choose_db:
            # Normalise BEFORE the cache lookup so the fallback pool is found
            # again instead of being rebuilt on every call.
            choose_db = self._FALLBACK_ALIAS
        try:
            self.pool = self.pool_cache[choose_db]
        except (KeyError, TypeError):
            # TypeError: an unhashable alias; _create_pool rejects it with
            # the "unsupported server" error.
            self.pool = self._create_pool(choose_db)
        return self.pool.connection()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor and hand the connection back to the pool."""
        try:
            self.cursor.close()
        finally:
            # Runs even if closing the cursor raised, so the connection is
            # not leaked.
            self.conn.close()

    def getconn(self, choose_db=None):
        """Take a connection from the pool and return ``(cursor, conn)``.

        The cursor is a ``pymysql.cursors.DictCursor``. The caller is
        responsible for closing both objects.
        """
        conn = self.__getconn(choose_db=choose_db)
        try:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
        except Exception:
            # Do not keep the connection checked out when no cursor is returned.
            conn.close()
            raise
        return cursor, conn
# Factory: instantiate the MySQL connection-pool manager.
def get_my_connection():
    """Return a newly constructed ``MyConnectionPool`` instance."""
    mysql_manager = MyConnectionPool()
    return mysql_manager
def get_pg_connection():
    """Return a newly constructed ``PgConnectionPool`` instance."""
    pg_manager = PgConnectionPool()
    return pg_manager
class MongoConnectionPool(InitConfig):
    """Build ``pymongo.MongoClient`` objects from the ``MONGO_*`` attributes.

    The ``MONGO_*`` attributes are presumably populated by ``InitConfig`` —
    confirm against that class.
    """

    def __init__(self):
        try:
            super(MongoConnectionPool, self).__init__()
        except Exception as init_error:
            # A failing base initialiser is only reported, not re-raised.
            print(init_error)

    def mongo_connect(self):
        """Create a ``MongoClient``, store it on ``self.connect_`` and return it.

        Raises:
            Exception: when building the client fails for any reason.
        """
        try:
            client_options = dict(
                host=self.MONGO_HOST,
                port=int(self.MONGO_PORT),
                username=self.MONGO_USER,
                password=self.MONGO_PASSWORD,
                authSource="hulk_teach_marketing",
            )
            self.connect_ = pymongo.MongoClient(**client_options)
        except Exception as connect_error:
            raise Exception("mongdb连接失败{}".format(connect_error))
        return self.connect_
def get_my_mongo_connection():
    """Return a newly constructed ``MongoConnectionPool`` instance."""
    mongo_manager = MongoConnectionPool()
    return mongo_manager