You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
2.8 KiB
Python

import contextlib
10 months ago
import itertools
from contextlib import asynccontextmanager
from typing import List, Any, Optional
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
10 months ago
from sqlalchemy.orm import sessionmaker
from website import settings
10 months ago
from website.handler import Row
10 months ago
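# Earlier local definition of Row, kept commented out for reference; the name
# is now imported from website.handler.
# class Row(dict):
#     """A dict that allows for object-like property access syntax."""
#
#     def __getattr__(self, name):
#         try:
#             return self[name]
#         except KeyError:
#             raise AttributeError(name)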
11 months ago
def dict_to_obj(d):
    """Return a throwaway object whose attributes are the items of *d*.

    Each call creates a fresh, empty class named ``GenericClass`` and stores
    the dictionary entries on the *instance*. Keeping the data out of the
    class namespace means values are returned exactly as stored: a function
    value is not turned into a bound method, and keys such as ``__init__``
    cannot alter how the object is constructed.

    Args:
        d: Mapping of attribute names to values.

    Returns:
        A ``GenericClass`` instance exposing every key of *d* as an attribute.

    Raises:
        TypeError: If *d* is not a mapping / iterable of pairs (e.g. ``None``).
    """
    obj = type('GenericClass', (object,), {})()
    # Instance-level storage: attribute lookup returns the raw value untouched.
    vars(obj).update(d)
    return obj
10 months ago
def to_json_list(cursor: Any) -> Optional[List[Row]]:
    """Turn all rows remaining on *cursor* into a list of ``Row`` objects.

    Column names are read from ``cursor.keys()`` before the rows are fetched.
    Each ``Row`` is built from ``(column name, value)`` pairs; ``zip_longest``
    pads the shorter side with ``None`` if the lengths ever differ.

    Returns:
        One ``Row`` per fetched row, or an empty list when nothing was fetched.
    """
    columns = list(cursor.keys())
    records = cursor.fetchall()
    if records:
        return [Row(itertools.zip_longest(columns, record)) for record in records]
    return []
def to_json(cursor):
    """Turn the next row on *cursor* into a single ``Row``.

    Column names are read from ``cursor.keys()`` before the row is fetched.
    The ``Row`` is built from ``(column name, value)`` pairs; ``zip_longest``
    pads the shorter side with ``None`` if the lengths ever differ.

    Returns:
        A ``Row`` for the fetched row, or a plain empty ``dict`` when
        ``fetchone()`` yields nothing.
    """
    columns = list(cursor.keys())
    record = cursor.fetchone()
    if record:
        return Row(itertools.zip_longest(columns, record))
    return {}
# Synchronous engine for the application database (MySQL via the PyMySQL driver).
# NOTE(review): the credentials are interpolated into the URL verbatim; a
# password containing URL-reserved characters (e.g. '@', '/') would presumably
# need escaping first — confirm against the values in settings.
app_engine = create_engine(
    # SQLAlchemy connection string: driver://user:password@host/database
    "mysql+pymysql://{}:{}@{}/{}?charset=utf8mb4".format(
        *(settings.mysql_app[key] for key in ("user", "password", "host", "database"))
    ),
    # Whether to print every executed SQL statement (normally only for debugging).
    echo=bool(settings.SQLALCHEMY_ECHO),
    pool_pre_ping=True,
    # Connection pool size.
    pool_size=int(settings.SQLALCHEMY_POOL_SIZE),
    # Connections allowed beyond pool_size.
    max_overflow=int(settings.SQLALCHEMY_POOL_MAX_SIZE),
    # How long before a pooled connection is recycled.
    pool_recycle=int(settings.SQLALCHEMY_POOL_RECYCLE),
)
10 months ago
# Session factory bound to the synchronous engine. expire_on_commit=False
# leaves already-loaded attribute values on instances after a commit.
Session = sessionmaker(
    bind=app_engine,
    expire_on_commit=False,
)
# Base = declarative_base(app_engine)
@contextlib.contextmanager
def get_session():
    """Yield a new ``Session`` inside a commit-or-rollback scope.

    The session is committed once the ``with`` body finishes normally. If the
    body or the commit raises an ``Exception``, the session is rolled back and
    the same exception propagates. The session is closed in every case.
    """
    session = Session()
    try:
        yield session
        # Commit stays inside the try so a failing commit is rolled back too.
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Asynchronous engine for the same application database (MySQL via aiomysql),
# configured from the same settings as the synchronous engine.
async_app_engine = create_async_engine(
    # SQLAlchemy connection string: driver://user:password@host/database
    "mysql+aiomysql://{}:{}@{}/{}?charset=utf8mb4".format(
        *(settings.mysql_app[key] for key in ("user", "password", "host", "database"))
    ),
    # Whether to print every executed SQL statement (normally only for debugging).
    echo=bool(settings.SQLALCHEMY_ECHO),
    pool_pre_ping=True,
    # Connection pool size.
    pool_size=int(settings.SQLALCHEMY_POOL_SIZE),
    # Connections allowed beyond pool_size.
    max_overflow=int(settings.SQLALCHEMY_POOL_MAX_SIZE),
    # How long before a pooled connection is recycled.
    pool_recycle=int(settings.SQLALCHEMY_POOL_RECYCLE),
)
# Factory producing async sessions bound to the async engine.
# NOTE: this assignment rebinds the module-level name ``AsyncSession`` (imported
# from sqlalchemy.ext.asyncio) to the factory; the imported class is only used
# here, as ``class_``.
# expire_on_commit=False matches the synchronous ``Session`` factory. Under
# asyncio an attribute expired by commit cannot be lazily reloaded on access,
# so objects must keep their loaded values to stay usable after the commit
# performed by get_async_session().
AsyncSession = sessionmaker(
    bind=async_app_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
@asynccontextmanager
async def get_async_session():
    """Yield a new async session inside a commit-or-rollback scope.

    The session is committed once the ``async with`` body finishes normally.
    If the body or the commit raises an ``Exception``, the session is rolled
    back and the same exception propagates. The session is closed in every
    case.
    """
    async with AsyncSession() as session:
        try:
            yield session
            # Commit stays inside the try so a failing commit is rolled back too.
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()