#!/usr/bin/env python # -*- coding: utf-8 -*- ''' @File : oracle_helper.py @Time : 2025/01/09 11:31:32 @Author : dulip3ng @Version : 1.0 @Desc : None ''' import os.path import oracledb from typing import List, Dict, Any, Optional class OracleHelper: def __init__(self, username: str, password: str, dsn: str): """ 初始化数据库连接 :param username: 数据库用户名 :param password: 数据库密码 :param dsn: 数据库连接字符串(格式:hostname:port/service_name) """ current_dir = os.path.dirname(__file__) client_dir = os.path.join(current_dir, "instantclient_23_6") print(client_dir) oracledb.init_oracle_client(lib_dir= client_dir) self.username = username self.password = password self.dsn = dsn self.connection = None def connect(self): """连接到数据库""" try: self.connection = oracledb.connect( user=self.username, password=self.password, dsn=self.dsn ) except oracledb.Error as e: print(f"数据库连接失败: {e}") def disconnect(self): """关闭数据库连接""" if self.connection: self.connection.close() def execute_query(self, sql: str, params: Optional[Dict] = None) -> List[Dict[str, Any]]: """ 执行查询语句 :param sql: SQL 查询语句 :param params: 查询参数(可选) :return: 查询结果列表,每行是一个字典 """ results = [] try: cursor = self.connection.cursor() cursor.execute(sql, params or {}) columns = [col[0] for col in cursor.description] # 获取列名 for row in cursor: results.append(dict(zip(columns, row))) # 将每行数据转换为字典 cursor.close() except oracledb.Error as e: print(f"查询执行失败: {e}") return results def execute_update(self, sql: str, params: Optional[Dict] = None) -> int: """ 执行更新语句(INSERT/UPDATE/DELETE) :param sql: SQL 更新语句 :param params: 更新参数(可选) :return: 受影响的行数 """ try: cursor = self.connection.cursor() cursor.execute(sql, params or {}) self.connection.commit() row_count = cursor.rowcount cursor.close() return row_count except oracledb.Error as e: self.connection.rollback() print(f"更新执行失败: {e}") return -1 def execute_many(self, sql: str, data: List[Dict]) -> int: """ 批量执行更新语句 :param sql: SQL 更新语句 :param data: 批量数据(列表中的每个字典对应一行数据) :return: 受影响的总行数 """ try: cursor = self.connection.cursor() cursor.executemany(sql, data) self.connection.commit() row_count = cursor.rowcount cursor.close() return row_count except oracledb.Error as e: self.connection.rollback() print(f"批量更新执行失败: {e}") return 0 def call_procedure(self, procedure_name: str, params: Optional[List] = None): """ 调用存储过程 :param procedure_name: 存储过程名称 :param params: 存储过程参数(可选) """ try: cursor = self.connection.cursor() cursor.callproc(procedure_name, params or []) self.connection.commit() cursor.close() except oracledb.Error as e: self.connection.rollback() print(f"存储过程调用失败: {e}")