oracle_helper.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @File : oracle_helper.py
  5. @Time : 2025/01/09 11:31:32
  6. @Author : dulip3ng
  7. @Version : 1.0
  8. @Desc : None
  9. '''
  10. import os.path
  11. import oracledb
  12. from typing import List, Dict, Any, Optional
  13. class OracleHelper:
  14. def __init__(self, username: str, password: str, dsn: str):
  15. """
  16. 初始化数据库连接
  17. :param username: 数据库用户名
  18. :param password: 数据库密码
  19. :param dsn: 数据库连接字符串(格式:hostname:port/service_name)
  20. """
  21. current_dir = os.path.dirname(__file__)
  22. client_dir = os.path.join(current_dir, "instantclient_23_6")
  23. print(client_dir)
  24. oracledb.init_oracle_client(lib_dir= client_dir)
  25. self.username = username
  26. self.password = password
  27. self.dsn = dsn
  28. self.connection = None
  29. def connect(self):
  30. """连接到数据库"""
  31. try:
  32. self.connection = oracledb.connect(
  33. user=self.username,
  34. password=self.password,
  35. dsn=self.dsn
  36. )
  37. except oracledb.Error as e:
  38. print(f"数据库连接失败: {e}")
  39. def disconnect(self):
  40. """关闭数据库连接"""
  41. if self.connection:
  42. self.connection.close()
  43. def execute_query(self, sql: str, params: Optional[Dict] = None) -> List[Dict[str, Any]]:
  44. """
  45. 执行查询语句
  46. :param sql: SQL 查询语句
  47. :param params: 查询参数(可选)
  48. :return: 查询结果列表,每行是一个字典
  49. """
  50. results = []
  51. try:
  52. cursor = self.connection.cursor()
  53. cursor.execute(sql, params or {})
  54. columns = [col[0] for col in cursor.description] # 获取列名
  55. for row in cursor:
  56. results.append(dict(zip(columns, row))) # 将每行数据转换为字典
  57. cursor.close()
  58. except oracledb.Error as e:
  59. print(f"查询执行失败: {e}")
  60. return results
  61. def execute_update(self, sql: str, params: Optional[Dict] = None) -> int:
  62. """
  63. 执行更新语句(INSERT/UPDATE/DELETE)
  64. :param sql: SQL 更新语句
  65. :param params: 更新参数(可选)
  66. :return: 受影响的行数
  67. """
  68. try:
  69. cursor = self.connection.cursor()
  70. cursor.execute(sql, params or {})
  71. self.connection.commit()
  72. row_count = cursor.rowcount
  73. cursor.close()
  74. return row_count
  75. except oracledb.Error as e:
  76. self.connection.rollback()
  77. print(f"更新执行失败: {e}")
  78. return -1
  79. def execute_many(self, sql: str, data: List[Dict]) -> int:
  80. """
  81. 批量执行更新语句
  82. :param sql: SQL 更新语句
  83. :param data: 批量数据(列表中的每个字典对应一行数据)
  84. :return: 受影响的总行数
  85. """
  86. try:
  87. cursor = self.connection.cursor()
  88. cursor.executemany(sql, data)
  89. self.connection.commit()
  90. row_count = cursor.rowcount
  91. cursor.close()
  92. return row_count
  93. except oracledb.Error as e:
  94. self.connection.rollback()
  95. print(f"批量更新执行失败: {e}")
  96. return 0
  97. def call_procedure(self, procedure_name: str, params: Optional[List] = None):
  98. """
  99. 调用存储过程
  100. :param procedure_name: 存储过程名称
  101. :param params: 存储过程参数(可选)
  102. """
  103. try:
  104. cursor = self.connection.cursor()
  105. cursor.callproc(procedure_name, params or [])
  106. self.connection.commit()
  107. cursor.close()
  108. except oracledb.Error as e:
  109. self.connection.rollback()
  110. print(f"存储过程调用失败: {e}")