# 创建连接相关 from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker from urllib.parse import quote_plus as urlquote from const import sparkatp_sql_uri from logger import logger _ENGINE_CACHE = {} """ sql操作: 排序:order_by(ChartsName.column.desc()/asc()) limit: .offset(n)过滤前面n条数据 .limit(n) count: .count()计数 是否存在:is_exist = session.query(exists().where(Book.id > 10)).scalar() or: .filter(or_(Chart.column == x, Chart.column > y)).all() one: .one()只获取一条,如不存在或存在多条都会报错 first: 通过主键获取记录 filter(**).first() """ class SqlSession: def __init__(self, sql_uri=sparkatp_sql_uri): self.sql_uri = sql_uri self._session = self.get_session() @staticmethod def build_postgres_uri(host, port, user, password, database): return f"postgresql+psycopg2://{user}:{urlquote(str(password))}@{host}:{port}/{database}" def get_session(self): engine = _ENGINE_CACHE.get(self.sql_uri) if engine is None: engine = create_engine( self.sql_uri, pool_size=5, max_overflow=10, pool_pre_ping=True, pool_recycle=1800, pool_timeout=30, connect_args={ 'connect_timeout': 20, 'options': '-c timezone=Asia/Shanghai' } ) _ENGINE_CACHE[self.sql_uri] = engine Session = sessionmaker(bind=engine) session = Session() return session def query(self, obj): return self._session.query(obj) def add(self, added): self._session.add(added) def add_all(self, added_list): if isinstance(added_list, list): self._session.add_all(added_list) else: logger.warning('只能传递list') def flush(self): self._session.flush() def commit(self): self._session.commit() def close(self): self._session.close() def execute(self, sql): return self._session.execute(text(sql)) def done(self, close=True): """ 执行完插入、删除、修改等操作后执行done,如报错回滚本次事务的sql操作 :return: """ try: self.commit() if close: self.close() except Exception as e: logger.warning(e) self._session.rollback() return e @property def session(self): return self._session def to_dict(self): return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}