November 6, 2022

Python multithreaded concurrent database access & database connection pools: how they work, with code

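Background

Databases come up in almost every job, and a connection pool is indispensable for accessing them efficiently and concurrently. Copy-and-paste code is rampant on this site, so connection pools are often used incorrectly. When an error appears, people tend to settle for whatever makes the code run.

This article walks through a real usage scenario for a Python database connection pool module to show how to use a connection pool correctly and sensibly.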

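Business scenario

We deploy machine learning models with the Flask framework. The prediction itself is fast. However, many features have to be fetched through APIs (or from OTS, MySQL and so on), and this drags down response time.

To get good concurrency and a higher QPS, we run the app under gunicorn with multiple worker processes and concurrent (asynchronous) request handling.

Without a pool, each process holds only a single database connection, so the threads running concurrently in that process end up sharing it. pymysql connections are not thread-safe: the driver declares a DB-API threadsafety level of 1, which means threads may share the module but not connections. This sharing therefore produces errors, and a connection pool becomes a necessity.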
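For reference, a gunicorn configuration matching this setup might look like the sketch below. It assumes threaded workers (worker_class = "gthread"), and the values are placeholders rather than the settings used in production here. All threads of a worker process share that process's objects, which is exactly why a single per-process connection ends up shared.

```python
# gunicorn.conf.py (hypothetical values, for illustration only)
bind = "0.0.0.0:8000"
workers = 4               # separate processes; each should build its own connection pool
worker_class = "gthread"  # each worker serves requests from a pool of threads
threads = 8               # threads per worker that may query the database concurrently
```

As a rule of thumb, if a worker's pool allows fewer connections than it has threads (for example maxconnections=5 with threads = 8), the extra threads wait for a free connection when blocking=True. Creating the pool inside each worker, rather than before gunicorn forks (for example with preload_app = True), keeps the worker processes from inheriting the same database sockets.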

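How a database connection pool works

The pool opens connections in advance and keeps them in a list. When a connection is needed, one is taken from the list, and it is returned to the list once the work is done. If there are too many threads and the pool has no free connection left, the pool can either block until a connection becomes available or return an error immediately.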
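The idea can be sketched with only the standard library. The sketch uses sqlite3 as a stand-in driver, and the class SimplePool and its methods are hypothetical, not part of DBUtils. A queue.Queue plays the role of the list and already does its own locking. get(block=True) waits for a returned connection, and get(block=False) fails at once, which corresponds to "return an error".

```python
import queue
import sqlite3


class SimplePool:
    """Hypothetical fixed-size pool: connections are opened up front and kept in a queue."""

    def __init__(self, factory, size):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(factory())  # pre-open every connection

    def acquire(self, blocking=True, timeout=None):
        """Take a connection out of the pool.

        blocking=True waits (up to timeout seconds) for a returned connection;
        blocking=False raises RuntimeError at once if none is idle.
        """
        try:
            return self._idle.get(block=blocking, timeout=timeout)
        except queue.Empty:
            raise RuntimeError("no idle connection in the pool") from None

    def release(self, conn):
        """Give the connection back so another thread can use it."""
        self._idle.put(conn)


# check_same_thread=False lets a sqlite3 connection opened in one thread be used in another
pool = SimplePool(lambda: sqlite3.connect(":memory:", check_same_thread=False), size=2)
c1 = pool.acquire()
c2 = pool.acquire()
try:
    pool.acquire(blocking=False)  # the pool is exhausted
except RuntimeError as exc:
    print(exc)  # no idle connection in the pool
pool.release(c1)
print(pool.acquire(blocking=False) is c1)  # True: the same connection is reused
```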

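DBUtils already implements two kinds of pool:

PooledDB: its connections can be handed out to any thread. This suits asynchronous workloads in which new threads keep arriving and asking the pool for connections. This article uses this option.
PersistentDB: each thread keeps its own dedicated connection, so this pool expects a stable set of long-lived threads rather than a stream of newly created ones. Its documentation puts it this way: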

Measures are taken to make the database connections thread-affine.
This means the same thread always uses the same cached connection,
and no other thread will use it.  So even if the underlying DB-API module
is not thread-safe at the connection level this will be no problem here.

For best performance, the application server should keep threads persistent.
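The thread-affine idea behind PersistentDB can be sketched with threading.local. Each thread lazily opens its own connection and keeps reusing it, and no other thread ever sees that connection. This is a simplified illustration that uses sqlite3 as the driver. It is not DBUtils' actual code, and the class name ThreadLocalConnections is hypothetical. The sketch also shows why constantly spawning new threads defeats the purpose: every new thread opens a new connection.

```python
import sqlite3
import threading


class ThreadLocalConnections:
    """Hypothetical helper: one cached connection per thread (thread-affine)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()  # attributes stored here are visible only to the current thread

    def connection(self):
        conn = getattr(self._local, "conn", None)
        if conn is None:  # first call in this thread: open and cache a connection
            conn = self._factory()
            self._local.conn = conn
        return conn


conns = ThreadLocalConnections(lambda: sqlite3.connect(":memory:"))
main_conn = conns.connection()
print(conns.connection() is main_conn)  # True: same thread, same connection

seen = []
worker = threading.Thread(target=lambda: seen.append(conns.connection()))
worker.start()
worker.join()
print(seen[0] is main_conn)  # False: the other thread got its own connection
```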

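The dbutils package is organized as follows:

Modules ending in _db (steady_db, pooled_db, persistent_db, simple_pooled_db) work with any DB-API 2 compliant driver, such as pymysql for MySQL.

Modules ending in _pg (steady_pg, pooled_pg, persistent_pg, simple_pooled_pg) are specific to PostgreSQL and use the classic PyGreSQL interface.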

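With that background, you should now have a fairly complete picture of database connection pools. Here is the actual implementation.

Main structure of the code:

1. Initialize the connection pool.

2. Get a connection from the pool.

3. Query the database.

4. Close the connection. This returns it to the pool; the underlying database connection is not actually closed.

5. Adapt the query and parsing logic to your own business. The sql_fetch_json function provided here returns the rows as a list of JSON-style dicts.

6. test1 is a multithreaded test. Run it several times and observe the query results yourself.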
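Steps 2 to 4 form a borrow, use, return cycle. The stand-alone sketch below expresses that cycle as a context manager, so that step 4 happens even if the query raises. To keep it runnable anywhere, it uses a queue of sqlite3 connections in place of PooledDB and pymysql. The borrowed() helper is hypothetical. With the real pool, its finally block would call cursor.close() and conn.close() instead of putting the connection back by hand.

```python
import queue
import sqlite3
from contextlib import contextmanager

# Step 1 (stand-in): a queue of pre-opened sqlite3 connections instead of PooledDB
pool = queue.Queue()
for _ in range(2):
    pool.put(sqlite3.connect(":memory:", check_same_thread=False))


@contextmanager
def borrowed(pool):
    """Hypothetical helper covering steps 2 and 4."""
    conn = pool.get()          # step 2: borrow; blocks until a connection is free
    cursor = conn.cursor()
    try:
        yield cursor           # step 3 runs inside the with-block
    finally:
        cursor.close()
        pool.put(conn)         # step 4: give it back; the connection itself stays open


with borrowed(pool) as cursor:
    cursor.execute("select ? + 1", (41,))
    print(cursor.fetchone())   # (42,)

try:
    with borrowed(pool) as cursor:
        cursor.execute("select * from no_such_table")
except sqlite3.OperationalError as exc:
    print("query failed:", exc)
print(pool.qsize())            # 2: both connections are back in the pool
```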

```python
# coding=utf-8
import random
import threading
import time

import pymysql
from dbutils.pooled_db import PooledDB

# Project-specific: a logger and a dict with user/password/database/host/port/charset
from configuration.config import system_logger, db_config


class MysqlHelper(object):
    def __init__(self, db_config):
        # 1. Initialize the pool once per process
        self.__pool = PooledDB(creator=pymysql,
                               mincached=1,       # idle connections opened at startup
                               maxcached=5,       # max idle connections kept in the pool
                               maxshared=5,       # no effect with pymysql (threadsafety is 1)
                               maxconnections=5,  # upper limit on connections handed out at once
                               maxusage=5,        # reopen a connection after 5 uses
                               blocking=True,     # wait instead of raising when the pool is exhausted
                               user=db_config.get('user'),
                               password=db_config.get('password'),
                               database=db_config.get('database'),
                               host=db_config.get('host'),
                               port=db_config.get('port'),
                               charset=db_config.get('charset'),
                               )

    def getConn(self):
        # 2. Borrow a connection; conn and cursor are returned to the caller, never stored on self
        conn = self.__pool.connection()
        cursor = conn.cursor()
        return conn, cursor

    @staticmethod
    def dispose(cursor, conn):
        # 4. close() on a pooled connection returns it to the pool instead of closing it
        cursor.close()
        conn.close()

    def getOne(self, sql, args=None):
        conn, cursor = self.getConn()
        try:
            th_name = threading.current_thread().name
            # 3. Query; args are passed separately so pymysql escapes them
            cursor.execute(sql, args)
            rows = cursor.fetchall()
            print(f"{th_name} {conn} {cursor} {time.time():.4f} {rows}")
            return rows
        finally:
            self.dispose(cursor, conn)  # always give the connection back

    def queryOne(self, sql, args=None):
        system_logger.info("----------------------sql start ----------------------")
        system_logger.info(sql)
        try:
            conn, cursor = self.getConn()
            try:
                result = cursor.execute(sql, args)
                json_data = self.sql_fetch_json(cursor)
            finally:
                # Return the connection even if the query fails
                self.dispose(cursor, conn)
            system_logger.info(f"-----------------------queryByKey result:{result} " + str(json_data))
            if len(json_data) == 1:
                return json_data[0]
            return None
        except Exception as e:
            system_logger.info("-----------predict exception line: " + str(e.__traceback__.tb_lineno) + " of " +
                               e.__traceback__.tb_frame.f_globals["__file__"])
            system_logger.info(e)
            return None

    @staticmethod
    def sql_fetch_json(cursor: pymysql.cursors.Cursor):
        """Convert a pymysql SELECT result into a list of dicts, one per row."""
        # 5. Column names come from cursor.description
        keys = [column[0] for column in cursor.description]
        return [dict(zip(keys, row)) for row in cursor.fetchall()]


def test1(pool):
    # 6. Every thread queries through the same MysqlHelper, each call with its own connection
    phone_no = f"1390709000{random.randint(6, 7)}"
    strsql = ("select * from zy_phone where policy_holder_phone_no=%s "
              "order by insure_date desc, kafka_etl_time asc limit 1")
    while True:
        time.sleep(1)
        pool.getOne(strsql, (phone_no,))


def main(pool):
    ths = [threading.Thread(target=test1, args=(pool,)) for _ in range(5)]
    for th in ths:
        th.start()
    for th in ths:
        th.join()  # test1 loops forever, so this blocks until the process is stopped


if __name__ == "__main__":
    mysqlhelper = MysqlHelper(db_config)
    main(mysqlhelper)
```
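sql_fetch_json relies only on cursor.description (the first item of each entry is the column name) and fetchall(). Both are part of DB-API 2, so the same conversion can be tried out with the standard library's sqlite3 driver. The sketch below uses a zy_phone-like table with made-up rows, and its schema is a hypothetical illustration. With pymysql specifically, passing cursorclass=pymysql.cursors.DictCursor to PooledDB should return dict rows directly, because PooledDB forwards extra keyword arguments to pymysql.connect. Check this against the versions you use.

```python
import sqlite3


def rows_to_dicts(cursor):
    """Same idea as sql_fetch_json: pair each column name with the row's values."""
    keys = [column[0] for column in cursor.description]
    return [dict(zip(keys, row)) for row in cursor.fetchall()]


# In-memory demo table with made-up data (hypothetical schema, for illustration only)
conn = sqlite3.connect(":memory:")
conn.execute("create table zy_phone (policy_holder_phone_no text, insure_date text)")
conn.executemany("insert into zy_phone values (?, ?)",
                 [("13907090006", "2022-01-01"), ("13907090006", "2022-06-01")])

cursor = conn.cursor()
cursor.execute("select * from zy_phone where policy_holder_phone_no = ? "
               "order by insure_date desc limit 1", ("13907090006",))
print(rows_to_dicts(cursor))
# [{'policy_holder_phone_no': '13907090006', 'insure_date': '2022-06-01'}]
cursor.close()
conn.close()
```

Note that sqlite3 uses ? placeholders, while pymysql uses %s.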

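Common mistake 1:

```python
def getConn(self):
    self.conn = self.__pool.connection()
    self.cursor = self.conn.cursor()
```

The connection and cursor must not be stored on self. The helper object is shared by all threads, so every thread overwrites the same attributes, and threads end up using one another's connections. This leads to errors such as:

AttributeError: 'NoneType' object has no attribute 'read'

or:

AttributeError: 'NoneType' object has no attribute 'settimeout'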
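The reason this breaks can be made visible with a small demonstration. It uses hypothetical stand-ins: the class SharedStateHelper and plain object() instances in place of pymysql connections. A threading.Barrier forces the race deterministically. Both threads store their own "connection" on self, wait for each other, and then read the attribute back.

```python
import threading


class SharedStateHelper:
    """Hypothetical helper that makes the mistake: per-call state stored on self."""

    def __init__(self):
        self.barrier = threading.Barrier(2)

    def query(self, results):
        mine = object()        # stands in for self.__pool.connection()
        self.conn = mine       # WRONG: every thread writes the same attribute
        self.barrier.wait()    # both threads have written before either reads
        results.append(self.conn is mine)


helper = SharedStateHelper()
results = []
threads = [threading.Thread(target=helper.query, args=(results,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))  # [False, True]: one thread is holding the other thread's object
```

In the real helper, the thread that picked up the wrong object then issues queries on a pymysql connection that another thread is also using, and may already have returned to the pool. This concurrent use of one connection typically produces errors like the ones above. Keeping conn and cursor in local variables avoids the problem.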

Common mistake 2:

Taking a lock around getting the connection and running the query:

```python
lock.acquire()
pool.getConn()
pool.getOne(strsql)
lock.release()
time.sleep(1)
```

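PooledDB already does its own locking (see the source below). Adding your own lock around everything from borrowing the connection to fetching the rows is redundant. It also means only one thread can query at a time, so the pool degenerates into a single database connection. The extra pool.getConn() call is unnecessary as well, since getOne() borrows its own connection.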
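A rough way to see the effect uses sleeps as a stand-in for query latency and a queue of dummy connections in place of PooledDB (all names here are hypothetical). Five workers share a pool of five connections. With a global lock held around "borrow + query", they take roughly five times as long as without it.

```python
import queue
import threading
import time

POOL_SIZE = 5
QUERY_SECONDS = 0.1  # simulated query latency

pool = queue.Queue()
for i in range(POOL_SIZE):
    pool.put(f"conn-{i}")  # dummy connections
global_lock = threading.Lock()


def query(use_global_lock):
    def work():
        conn = pool.get()               # the pool's own locking already makes this thread-safe
        try:
            time.sleep(QUERY_SECONDS)   # pretend to run SQL on conn
        finally:
            pool.put(conn)

    if use_global_lock:
        with global_lock:               # mistake 2: only one thread can query at a time
            work()
    else:
        work()


def run(use_global_lock):
    threads = [threading.Thread(target=query, args=(use_global_lock,)) for _ in range(POOL_SIZE)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


without_lock = run(False)
with_lock = run(True)
print(f"without extra lock: {without_lock:.2f}s, with extra lock: {with_lock:.2f}s")
```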

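In DBUtils, self.__pool.connection() is implemented as follows. Note that PooledDB only enables shared connections (self._maxshared) when the driver's threadsafety level is greater than 1. pymysql's level is 1, so with pymysql maxshared is ignored and every call takes the dedicated-connection else branch.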

```python
def connection(self, shareable=True):
    """Get a steady, cached DB-API 2 connection from the pool.

    If shareable is set and the underlying DB-API 2 allows it,
    then the connection may be shared with other threads.
    """
    if shareable and self._maxshared:
        with self._lock:
            while (not self._shared_cache and self._maxconnections
                    and self._connections >= self._maxconnections):
                self._wait_lock()
            if len(self._shared_cache) < self._maxshared:
                # shared cache is not full, get a dedicated connection
                try:  # first try to get it from the idle cache
                    con = self._idle_cache.pop(0)
                except IndexError:  # else get a fresh connection
                    con = self.steady_connection()
                else:
                    con._ping_check()  # check this connection
                con = SharedDBConnection(con)
                self._connections += 1
            else:  # shared cache full or no more connections allowed
                self._shared_cache.sort()  # least shared connection first
                con = self._shared_cache.pop(0)  # get it
                while con.con._transaction:
                    # do not share connections which are in a transaction
                    self._shared_cache.insert(0, con)
                    self._wait_lock()
                    self._shared_cache.sort()
                    con = self._shared_cache.pop(0)
                con.con._ping_check()  # check the underlying connection
                con.share()  # increase share of this connection
            # put the connection (back) into the shared cache
            self._shared_cache.append(con)
            self._lock.notify()
        con = PooledSharedDBConnection(self, con)
    else:  # try to get a dedicated connection
        with self._lock:
            while (self._maxconnections
                    and self._connections >= self._maxconnections):
                self._wait_lock()
            # connection limit not reached, get a dedicated connection
            try:  # first try to get it from the idle cache
                con = self._idle_cache.pop(0)
            except IndexError:  # else get a fresh connection
                con = self.steady_connection()
            else:
                con._ping_check()  # check connection
            con = PooledDedicatedDBConnection(self, con)
            self._connections += 1
    return con
```
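The dedicated-connection branch is a classic condition-variable pattern. Under self._lock, the caller waits while the number of checked-out connections has reached maxconnections, then reuses an idle connection or opens a new one. Returning a connection decrements the counter and notifies one waiter. In DBUtils, _wait_lock either waits on the condition or raises TooManyConnections when blocking=False. The code below is a simplified sketch of that pattern, not DBUtils' code. The class BoundedPool is hypothetical, idle connections are not ping-checked, and RuntimeError stands in for TooManyConnections.

```python
import threading


class BoundedPool:
    """Hypothetical simplified pool modeled on PooledDB's dedicated-connection branch."""

    def __init__(self, factory, maxconnections, maxcached, blocking=True):
        self._factory = factory
        self._maxconnections = maxconnections
        self._maxcached = maxcached
        self._blocking = blocking
        self._idle_cache = []            # connections waiting to be reused
        self._connections = 0            # connections currently checked out
        self._lock = threading.Condition()

    def _wait_lock(self):
        if not self._blocking:
            raise RuntimeError("too many connections")
        self._lock.wait()                # releases the lock while waiting

    def connection(self):
        with self._lock:
            while self._connections >= self._maxconnections:
                self._wait_lock()
            try:
                con = self._idle_cache.pop(0)  # reuse an idle connection
            except IndexError:
                con = self._factory()          # or open a new one
            self._connections += 1
            return con

    def cache(self, con):
        """Take a connection back (roughly what closing a pooled connection triggers)."""
        with self._lock:
            if len(self._idle_cache) < self._maxcached:
                self._idle_cache.append(con)
            # else: a real pool would close the surplus connection here
            self._connections -= 1
            self._lock.notify()          # wake one thread blocked in connection()


pool = BoundedPool(factory=object, maxconnections=2, maxcached=2, blocking=False)
a, b = pool.connection(), pool.connection()
try:
    pool.connection()
except RuntimeError as exc:
    print(exc)                           # too many connections
pool.cache(a)
print(pool.connection() is a)           # True: the idle connection is reused
```

The lock is held only while a connection is checked out or returned, never while a query runs. This is why it costs little, and why wrapping each whole query in a second lock (mistake 2) serializes the threads.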

That's all for this article. If you found it useful, please give it a like.