Commit 30e6b67a authored by Jeremy Hylton's avatar Jeremy Hylton

Fix deadlock problem reported by John D. Heintz.

If one thread was committing a transaction and another thread was
opening a new DB connection, deadlock could occur.  The cause of the
deadlock is that tpc_finish() acquires the storage and db locks in a
different order than DB.open().  As a result, if each starts at the
same time and gets one of the two locks it needs, the system will be
deadlocked.

The solution is to enforce a consistent locking order.  If a thread is
going to hold the DB lock and the storage lock, it MUST acquire the DB
lock first.  This patch implements that locking order for the
invalidation in tpc_finish().

The DB object gets methods called begin_invalidation() and
finish_invalidation() that acquire and release the DB lock
respectively.  Before the Connection calls tpc_finish() on the
storage, it calls begin_invalidation().  This guarantees that the DB
acquired before the storage lock.

When the invalidation phase is over, the Connection calls
end_invalidation() to release the DB lock.  This is an optimization.
It could wait until tpc_finish() returns, but we know that the DB will
not be used again for the rest of the tpc_finish() and tpc_finish()
could take a long time.

Specific changes:

DB.py
begin_invalidation(): Added.
finish_invalidation(): Added.
invalidate(): Remove locking.
invalidateMany(): Add comment about how it should be used.

Connection.py
tpc_finish(): Don't pass second argument to storage's tpc_finish()
    when committing a transaction with no data.  Add call to
    begin_invalidation() before calling storage's tpc_finish().
_invalidate_sub(): Remove empty, unnecessary method.
_invalidating_invalidating(): Add call to finish_invalidation() after
    last call to DB's invalidate().
parent 4fc9694d
...@@ -84,8 +84,8 @@ ...@@ -84,8 +84,8 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.53 2001/05/17 18:35:10 shane Exp $""" $Id: Connection.py,v 1.54 2001/05/21 22:45:38 jeremy Exp $"""
__version__='$Revision: 1.53 $'[11:-2] __version__='$Revision: 1.54 $'[11:-2]
from cPickleCache import PickleCache from cPickleCache import PickleCache
from POSException import ConflictError, ExportError from POSException import ConflictError, ExportError
...@@ -683,18 +683,19 @@ class Connection(ExportImport.ExportImport): ...@@ -683,18 +683,19 @@ class Connection(ExportImport.ExportImport):
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
# It's important that the storage call the function we pass # It's important that the storage call the function we pass
# (self.tpc_finish_) while it still has it's lock. We don't # (self._invalidate_invalidating) while it still has it's
# want another thread to be able to read any updated data # lock. We don't want another thread to be able to read any
# until we've had a chance to send an invalidation message to # updated data until we've had a chance to send an
# all of the other connections! # invalidation message to all of the other connections!
if self._tmp is not None: if self._tmp is not None:
# Commiting a subtransaction! # Commiting a subtransaction!
# There is no need to invalidate anything. # There is no need to invalidate anything.
self._storage.tpc_finish(transaction, self._invalidate_sub) self._storage.tpc_finish(transaction)
self._storage._creating[:0]=self._creating self._storage._creating[:0]=self._creating
del self._creating[:] del self._creating[:]
else: else:
self._db.begin_invalidation()
self._storage.tpc_finish(transaction, self._storage.tpc_finish(transaction,
self._invalidate_invalidating) self._invalidate_invalidating)
...@@ -703,13 +704,9 @@ class Connection(ExportImport.ExportImport): ...@@ -703,13 +704,9 @@ class Connection(ExportImport.ExportImport):
def _invalidate_invalidating(self): def _invalidate_invalidating(self):
invalidate=self._db.invalidate invalidate=self._db.invalidate
for oid in self._invalidating: invalidate(oid, self) for oid in self._invalidating:
invalidate(oid, self)
def _invalidate_sub(self): self._db.finish_invalidation()
# There's no point in invalidating any objects in a subtransaction
#
# Because we may ultimately abort the containing transaction.
pass
def sync(self): def sync(self):
get_transaction().abort() get_transaction().abort()
......
...@@ -84,8 +84,8 @@ ...@@ -84,8 +84,8 @@
############################################################################## ##############################################################################
"""Database objects """Database objects
$Id: DB.py,v 1.29 2001/05/17 18:35:10 shane Exp $""" $Id: DB.py,v 1.30 2001/05/21 22:45:38 jeremy Exp $"""
__version__='$Revision: 1.29 $'[11:-2] __version__='$Revision: 1.30 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection from Connection import Connection
...@@ -311,6 +311,16 @@ class DB(UndoLogCompatible.UndoLogCompatible): ...@@ -311,6 +311,16 @@ class DB(UndoLogCompatible.UndoLogCompatible):
def importFile(self, file): def importFile(self, file):
raise 'Not yet implemented' raise 'Not yet implemented'
def begin_invalidation(self):
# Must be called before first call to invalidate and before
# the storage lock is held.
self._a()
def finish_invalidation(self):
# Must be called after begin_invalidation() and after final
# invalidate() call.
self._r()
def invalidate(self, oid, connection=None, version='', def invalidate(self, oid, connection=None, version='',
rc=sys.getrefcount): rc=sys.getrefcount):
"""Invalidate references to a given oid. """Invalidate references to a given oid.
...@@ -320,40 +330,37 @@ class DB(UndoLogCompatible.UndoLogCompatible): ...@@ -320,40 +330,37 @@ class DB(UndoLogCompatible.UndoLogCompatible):
passed in to prevent useless (but harmless) messages to the passed in to prevent useless (but harmless) messages to the
connection. connection.
""" """
if connection is not None: version=connection._version if connection is not None:
self._a() version=connection._version
try: # Update modified in version cache
h=hash(oid)%131
# Update modified in version cache o=self._miv_cache.get(h, None)
h=hash(oid)%131 if o is not None and o[0]==oid: del self._miv_cache[h]
cache=self._miv_cache
o=cache.get(h, None) # Notify connections
if o and o[0]==oid: del cache[h] for pool, allocated in self._pools[1]:
for cc in allocated:
# Notify connections if (cc is not connection and
pools,pooll=self._pools (not version or cc._version==version)):
for pool, allocated in pooll: if rc(cc) <= 3:
for cc in allocated: cc.close()
cc.invalidate(oid)
temps=self._temps
if temps:
t=[]
for cc in temps:
if rc(cc) > 3:
if (cc is not connection and if (cc is not connection and
(not version or cc._version==version)): (not version or cc._version==version)):
if rc(cc) <= 3:
cc.close()
cc.invalidate(oid) cc.invalidate(oid)
t.append(cc)
temps=self._temps else: cc.close()
if temps: self._temps=t
t=[]
for cc in temps:
if rc(cc) > 3:
if (cc is not connection and
(not version or cc._version==version)):
cc.invalidate(oid)
t.append(cc)
else: cc.close()
self._temps=t
finally: self._r()
def invalidateMany(self, oids=None, version=''): def invalidateMany(self, oids=None, version=''):
# XXX Callers of this method need to call begin_invalidation()
# and finish_invalidation() to get the right locking
if oids is None: self.invalidate(None, version=version) if oids is None: self.invalidate(None, version=version)
else: else:
for oid in oids: self.invalidate(oid, version=version) for oid in oids: self.invalidate(oid, version=version)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment