Commit 1b722824 authored by Vincent Pelletier's avatar Vincent Pelletier

Fix big thread concurency mistake in ZMySQLDA/db.py: the object registerable...

Fix big thread concurency mistake in ZMySQLDA/db.py: the object registerable to transaction manager was the one chared by all threads, preventing multiple threads from effectively registering to transaction manager. This caused MySQL deadlocks, since commits were never issued in all concurent threads but one. This patch creates a new intermediate object between DB and DA which just handles pooling - and factorising connection string parsing and server capabilities probing.
Update DA so it uses the intermediate class.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13846 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent a7a6ad1f
......@@ -89,7 +89,7 @@ $Id: DA.py,v 1.4 2001/08/09 20:16:36 adustman Exp $''' % database_type
__version__='$Revision: 1.4 $'[11:-2]
import os
from db import DB
from db import ThreadedDB
import Shared.DC.ZRDB.Connection, sys, DABase
from App.Dialogs import MessageDialog
from Globals import HTMLFile
......@@ -120,7 +120,7 @@ class Connection(DABase.Connection):
manage_properties=HTMLFile('connectionEdit', globals())
def factory(self): return DB
def factory(self): return ThreadedDB
def connect(self, s):
try:
......@@ -128,13 +128,13 @@ class Connection(DABase.Connection):
self._v_connected = ''
pool_key = self.getPhysicalPath()
connection = database_connection_pool.get(pool_key)
if connection is not None and connection.connection == s:
if connection is not None and connection._connection == s:
self._v_database_connection = connection
else:
if connection is not None:
connection.closeConnection()
DB = self.factory()
database_connection_pool[pool_key] = DB(s)
ThreadedDB = self.factory()
database_connection_pool[pool_key] = ThreadedDB(s)
self._v_database_connection = database_connection_pool[pool_key]
# XXX If date is used as such, it can be wrong because an existing
# connection may be reused. But this is suposedly only used as a
......
......@@ -106,13 +106,21 @@ from zLOG import LOG, ERROR, INFO
import string, sys
from string import strip, split, find, upper, rfind
from time import time
from thread import get_ident
from thread import get_ident, allocate_lock
# Connection is unusable, reconnect if possible (if so, don't re-raise the
# exception), otherwise re-raise the exception.
hosed_connection = (
CR.SERVER_GONE_ERROR,
CR.SERVER_LOST
)
# Connection is unusable and some earlier query caused the trouble.
# Reconnect *and* re-raise the exception.
dead_connection = (
CR.COMMANDS_OUT_OF_SYNC,
)
key_types = {
"PRI": "PRIMARY KEY",
"MUL": "INDEX",
......@@ -156,22 +164,12 @@ def int_or_long(s):
try: return int(s)
except: return long(s)
class DB(TM):
Database_Connection=_mysql.connect
Database_Error=_mysql.Error
def Database_Connection(self, *args, **kwargs):
return MySQLdb.connect(*args, **kwargs)
defs={
FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
}
class ThreadedDB:
"""
This class is an interface to DB.
Its caracteristic is that an instance of this class interfaces multiple
instanes of DB class, each one being bound to a specific thread.
"""
conv=conversions.copy()
conv[FIELD_TYPE.LONG] = int_or_long
......@@ -180,15 +178,19 @@ class DB(TM):
conv[FIELD_TYPE.DECIMAL] = float
del conv[FIELD_TYPE.TIME]
_p_oid=_p_changed=_registered=None
def __init__(self,connection):
self.connection=connection
self.kwargs = self._parse_connection_string(connection)
self.db = {}
self._finished_or_aborted = {}
db = self._getConnection()
transactional = db.server_capabilities & CLIENT.TRANSACTIONS
"""
Parse the connection string.
Initiate a trial connection with the database to check
transactionality once instead of once per DB instance.
"""
self._connection = connection
self._kw_args = self._parse_connection_string(connection)
self._db_pool = {}
self._db_lock = allocate_lock()
connection = MySQLdb.connect(**self._kw_args)
transactional = connection.server_capabilities & CLIENT.TRANSACTIONS
connection.close()
if self._try_transactions == '-':
transactional = 0
elif not transactional and self._try_transactions == '+':
......@@ -197,38 +199,6 @@ class DB(TM):
if self._mysql_lock:
self._use_TM = 1
def __del__(self):
self._cleanupConnections()
def _getFinishedOrAborted(self):
return self._finished_or_aborted[get_ident()]
def _setFinishedOrAborted(self, value):
self._finished_or_aborted[get_ident()] = value
def _cleanupConnections(self):
for db in self.db.itervalues():
db.close()
def _forceReconnection(self):
db = apply(self.Database_Connection, (), self.kwargs)
self.db[get_ident()] = db
return db
def _getConnection(self):
ident = get_ident()
db = self.db.get(ident)
if db is None:
db = self._forceReconnection()
return db
def _closeConnection(self):
ident = get_ident()
db = self.db.get(ident)
if db is not None:
db.close()
del self.db[ident]
def _parse_connection_string(self, connection):
kwargs = {'conv': self.conv}
items = split(connection)
......@@ -266,6 +236,74 @@ class DB(TM):
kwargs['unix_socket'], items = items[0], items[1:]
return kwargs
def _pool_set(self, key, value):
self._db_lock.acquire()
try:
self._db_pool[key] = value
finally:
self._db_lock.release()
def _pool_get(self, key):
self._db_lock.acquire()
try:
return self._db_pool.get(key)
finally:
self._db_lock.release()
def _access_db(self, method_id, args, kw):
"""
Generic method to call pooled objects' methods.
When the current thread had never issued any call, create a DB
instance.
"""
ident = get_ident()
db = self._pool_get(ident)
if db is None:
db = DB(kw_args=self._kw_args, use_TM=self._use_TM,
mysql_lock=self._mysql_lock,
transactions=self._transactions)
self._pool_set(ident, db)
return getattr(db, method_id)(*args, **kw)
def tables(self, *args, **kw):
return self._access_db(method_id='tables', args=args, kw=kw)
def columns(self, *args, **kw):
return self._access_db(method_id='columns', args=args, kw=kw)
def query(self, *args, **kw):
return self._access_db(method_id='query', args=args, kw=kw)
def string_literal(self, *args, **kw):
return self._access_db(method_id='string_literal', args=args, kw=kw)
class DB(TM):
defs={
FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d",
FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n",
FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i",
FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l",
FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d",
FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i",
}
_p_oid=_p_changed=_registered=None
def __init__(self, kw_args, use_TM, mysql_lock, transactions):
self._kw_args = kw_args
self._mysql_lock = mysql_lock
self._use_TM = use_TM
self._transactions = transactions
self._forceReconnection()
def __del__(self):
self.db.close()
def _forceReconnection(self):
db = MySQLdb.connect(**self._kw_args)
self.db = db
def tables(self, rdb=0,
_care=('TABLE', 'VIEW')):
r=[]
......@@ -331,18 +369,19 @@ class DB(TM):
because they are bound to the connection. This check can be
overridden by passing force_reconnect with True value.
"""
db = self._getConnection()
try:
db.query(query)
self.db.query(query)
except OperationalError, m:
if ((not force_reconnect) and \
(self._mysql_lock or self._transactions)) or \
m[0] not in hosed_connection:
m[0] not in (hosed_connection + dead_connection):
raise
# Hm. maybe the db is hosed. Let's restart it.
self._forceReconnection()
db.query(query)
return db.store_result()
if m[0] in dead_connection:
raise
self.db.query(query)
return self.db.store_result()
def query(self,query_string, max_rows=1000):
self._use_TM and self._register()
......@@ -380,10 +419,11 @@ class DB(TM):
return items, result
def string_literal(self, s):
return self._getConnection().string_literal(s)
return self.db.string_literal(s)
def _begin(self, *ignored):
try:
self._transaction_begun = True
# Ping the database to reconnect if connection was closed.
self._query("SELECT 1", force_reconnect=True)
if self._transactions:
......@@ -394,21 +434,20 @@ class DB(TM):
LOG('ZMySQLDA', ERROR, "exception during _begin",
error=sys.exc_info())
raise
self._setFinishedOrAborted(False)
def _finish(self, *ignored):
if self._getFinishedOrAborted():
if not self._transaction_begun:
return
self._setFinishedOrAborted(True)
self._transaction_begun = False
if self._mysql_lock:
self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
if self._transactions:
self._query("COMMIT")
def _abort(self, *ignored):
if self._getFinishedOrAborted():
if not self._transaction_begun:
return
self._setFinishedOrAborted(True)
self._transaction_begun = False
if self._mysql_lock:
self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock)
if self._transactions:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment