Files
effekt-interface/common/sqlSession.py
2026-04-13 16:34:14 +08:00

99 lines
2.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 创建连接相关
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from urllib.parse import quote_plus as urlquote
from const import sparkatp_sql_uri
from logger import logger
_ENGINE_CACHE = {}
"""
sql操作
排序order_by(ChartsName.column.desc()/asc())
limit: .offset(n)过滤前面n条数据 .limit(n)
count: .count()计数
是否存在is_exist = session.query(exists().where(Book.id > 10)).scalar()
or: .filter(or_(Chart.column == x, Chart.column > y)).all()
one: .one()只获取一条,如不存在或存在多条都会报错
first: 通过主键获取记录 filter(**).first()
"""
class SqlSession:
def __init__(self, sql_uri=sparkatp_sql_uri):
self.sql_uri = sql_uri
self._session = self.get_session()
@staticmethod
def build_postgres_uri(host, port, user, password, database):
return f"postgresql+psycopg2://{user}:{urlquote(str(password))}@{host}:{port}/{database}"
def get_session(self):
engine = _ENGINE_CACHE.get(self.sql_uri)
if engine is None:
engine = create_engine(
self.sql_uri,
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=1800,
pool_timeout=30,
connect_args={
'connect_timeout': 20,
'options': '-c timezone=Asia/Shanghai'
}
)
_ENGINE_CACHE[self.sql_uri] = engine
Session = sessionmaker(bind=engine)
session = Session()
return session
def query(self, obj):
return self._session.query(obj)
def add(self, added):
self._session.add(added)
def add_all(self, added_list):
if isinstance(added_list, list):
self._session.add_all(added_list)
else:
logger.warning('只能传递list')
def flush(self):
self._session.flush()
def commit(self):
self._session.commit()
def close(self):
self._session.close()
def execute(self, sql):
return self._session.execute(text(sql))
def done(self, close=True):
"""
执行完插入、删除、修改等操作后执行done如报错回滚本次事务的sql操作
:return:
"""
try:
self.commit()
if close:
self.close()
except Exception as e:
logger.warning(e)
self._session.rollback()
return e
@property
def session(self):
return self._session
def to_dict(self):
return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}