import psycopg2
from psycopg2 import OperationalError, extras
from loguru import logger
import os
import traceback


# Create the connection (create it once, then reuse it across the service).
def create_connection():
    """Open a psycopg2 connection configured from POSTGRESQL_* env vars.

    The connection uses manual transactions (``autocommit = False``), points
    ``search_path`` at ``POSTGRESQL_SCHEMA`` and sets the session time zone
    to Asia/Shanghai.

    :return: an open psycopg2 connection object
    :raises OperationalError: if the database cannot be reached
    :raises KeyError: if a required POSTGRESQL_* environment variable is unset
    """
    try:
        conn = psycopg2.connect(
            dbname=os.environ['POSTGRESQL_DATABASE'],
            user=os.environ['POSTGRESQL_USERNAME'],
            password=os.environ['POSTGRESQL_PASSWORD'],
            host=os.environ['POSTGRESQL_HOST'],
            port=os.environ['POSTGRESQL_PORT'],
            options=f'-c search_path={os.environ["POSTGRESQL_SCHEMA"]}'
        )
        conn.autocommit = False
        with conn.cursor() as cur:
            cur.execute("SET TIME ZONE 'Asia/Shanghai';")
        conn.commit()
        return conn
    except OperationalError as e:
        logger.error(f"连接数据库失败: {e}")
        # Bare `raise` preserves the original traceback (raise e would reset it).
        raise


def insert_data(conn, table, data_dict, pk_name='id'):
    """Insert one row into *table* and return its generated primary key.

    Does NOT commit — the caller owns the transaction.

    :param conn: psycopg2 connection object
    :param table: table name (string). NOTE(review): interpolated directly
        into the SQL along with the column names and *pk_name*; these must
        come from trusted code only. Consider psycopg2.sql.Identifier if
        they could ever be user-controlled. Row VALUES are parameterized.
    :param data_dict: mapping of column name -> value,
        e.g. ``{"column1": value1, "column2": value2}``
    :param pk_name: primary-key column returned via RETURNING (default 'id')
    :return: the new row's primary-key value, or None if *conn* is None
    :raises Exception: re-raises any database error after logging it
    """
    if conn is None:
        logger.error("数据库连接无效")
        return
    try:
        with conn.cursor() as cur:
            columns = list(data_dict.keys())
            values = [data_dict[col] for col in columns]
            insert_query = f"""
                INSERT INTO {table} ({', '.join(columns)})
                VALUES ({', '.join(['%s'] * len(values))})
                RETURNING {pk_name}
            """
            cur.execute(insert_query, values)
            inserted_id = cur.fetchone()[0]
            return inserted_id
    except Exception as e:
        logger.error(f"插入数据失败: {e}")
        raise


def insert_multiple_data(conn, table, data_list, batch_size=100):
    """Bulk-insert rows into *table* in batches via ``execute_values``.

    Does NOT commit — the caller owns the transaction. Column order is taken
    from the first dict; all dicts are assumed to share the same key order.

    :param conn: psycopg2 connection object
    :param table: table name (string); see the injection note on insert_data
    :param data_list: list of dicts, one per row,
        e.g. ``[{"column1": v1, "column2": v2}, ...]``
    :param batch_size: number of rows sent per execute_values call
    :raises Exception: re-raises any database error after logging it
    """
    if conn is None:
        logger.error("数据库连接无效")
        return
    # Fix: an empty list previously crashed with IndexError on data_list[0];
    # treat it as a no-op instead.
    if not data_list:
        return
    try:
        with conn.cursor() as cur:
            columns = data_list[0].keys()
            insert_query = f"""
                INSERT INTO {table} ({', '.join(columns)})
                VALUES %s
            """
            for i in range(0, len(data_list), batch_size):
                batch = data_list[i:i + batch_size]
                values = [tuple(d.values()) for d in batch]
                extras.execute_values(cur, insert_query, values)
    except Exception as e:
        logger.error(f"批量插入数据失败: {e}")
        raise


# Module-level shared connection — opened once at import time and reused.
# NOTE(review): this is an import-time side effect; importing this module
# requires a reachable database.
conn = create_connection()


def insert_pdf2md_table(pdf_path, pdf_name, process_status, start_time, end_time, rec_results):
    """Record one PDF analysis run: a pdf_info row plus its per-box output.

    Inserts the pdf_info row, and — when *process_status* == 2 (apparently
    "analysis succeeded"; TODO confirm the status codes) — inserts one
    pdf_analysis_output row per recognized box, then commits. Rolls back the
    whole transaction on any failure.

    :param pdf_path: filesystem path of the PDF
    :param pdf_name: file name of the PDF
    :param process_status: integer status code of the analysis
    :param start_time: analysis start timestamp
    :param end_time: analysis end timestamp
    :param rec_results: per-page list of box objects; each box exposes
        .content, .clsid and .table_title (project type — not validated here)
    :return: primary key of the inserted pdf_info row
    :raises Exception: re-raises any database error after rollback
    """
    pdf_row = {
        'path': pdf_path,
        'filename': pdf_name,
        'process_status': process_status,
        'analysis_start_time': start_time,
        'analysis_end_time': end_time
    }
    try:
        inserted_id = insert_data(conn, 'pdf_info', pdf_row)
        if process_status == 2:
            data_list = []
            for page_idx, page_boxes in enumerate(rec_results):      # one entry per page
                page_no = page_idx + 1                               # pages are 1-based
                for order, box in enumerate(page_boxes):             # one entry per box
                    data_list.append({
                        'layout_type': box.clsid,
                        'content': box.content,
                        'page_no': page_no,
                        'pdf_id': inserted_id,
                        'table_title': box.table_title,
                        'display_order': order                       # box order within the page
                    })
            insert_multiple_data(conn, 'pdf_analysis_output', data_list)
        conn.commit()
        return inserted_id
    except Exception as e:
        conn.rollback()
        logger.error(f'operate database error!\n{traceback.format_exc()}')
        raise