123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- '''
- @File : oracle_helper.py
- @Time : 2025/01/09 11:31:32
- @Author : dulip3ng
- @Version : 1.0
- @Desc : None
- '''
- import os.path
- import oracledb
- from typing import List, Dict, Any, Optional
- class OracleHelper:
- def __init__(self, username: str, password: str, dsn: str):
- """
- 初始化数据库连接
- :param username: 数据库用户名
- :param password: 数据库密码
- :param dsn: 数据库连接字符串(格式:hostname:port/service_name)
- """
- current_dir = os.path.dirname(__file__)
- client_dir = os.path.join(current_dir, "instantclient_23_6")
- print(client_dir)
- oracledb.init_oracle_client(lib_dir= client_dir)
- self.username = username
- self.password = password
- self.dsn = dsn
- self.connection = None
- def connect(self):
- """连接到数据库"""
- try:
- self.connection = oracledb.connect(
- user=self.username,
- password=self.password,
- dsn=self.dsn
- )
- except oracledb.Error as e:
- print(f"数据库连接失败: {e}")
- def disconnect(self):
- """关闭数据库连接"""
- if self.connection:
- self.connection.close()
- def execute_query(self, sql: str, params: Optional[Dict] = None) -> List[Dict[str, Any]]:
- """
- 执行查询语句
- :param sql: SQL 查询语句
- :param params: 查询参数(可选)
- :return: 查询结果列表,每行是一个字典
- """
- results = []
- try:
- cursor = self.connection.cursor()
- cursor.execute(sql, params or {})
- columns = [col[0] for col in cursor.description] # 获取列名
- for row in cursor:
- results.append(dict(zip(columns, row))) # 将每行数据转换为字典
- cursor.close()
- except oracledb.Error as e:
- print(f"查询执行失败: {e}")
- return results
- def execute_update(self, sql: str, params: Optional[Dict] = None) -> int:
- """
- 执行更新语句(INSERT/UPDATE/DELETE)
- :param sql: SQL 更新语句
- :param params: 更新参数(可选)
- :return: 受影响的行数
- """
- try:
- cursor = self.connection.cursor()
- cursor.execute(sql, params or {})
- self.connection.commit()
- row_count = cursor.rowcount
- cursor.close()
- return row_count
- except oracledb.Error as e:
- self.connection.rollback()
- print(f"更新执行失败: {e}")
- return -1
- def execute_many(self, sql: str, data: List[Dict]) -> int:
- """
- 批量执行更新语句
- :param sql: SQL 更新语句
- :param data: 批量数据(列表中的每个字典对应一行数据)
- :return: 受影响的总行数
- """
- try:
- cursor = self.connection.cursor()
- cursor.executemany(sql, data)
- self.connection.commit()
- row_count = cursor.rowcount
- cursor.close()
- return row_count
- except oracledb.Error as e:
- self.connection.rollback()
- print(f"批量更新执行失败: {e}")
- return 0
- def call_procedure(self, procedure_name: str, params: Optional[List] = None):
- """
- 调用存储过程
- :param procedure_name: 存储过程名称
- :param params: 存储过程参数(可选)
- """
- try:
- cursor = self.connection.cursor()
- cursor.callproc(procedure_name, params or [])
- self.connection.commit()
- cursor.close()
- except oracledb.Error as e:
- self.connection.rollback()
- print(f"存储过程调用失败: {e}")
|