Commit c223ab34 authored by Jim Fulton's avatar Jim Fulton

Fixed a bug that could cause database consistency problems

in cases where a transaction is aborted due to a storage error
(e.g. ConflictError) during two-phase commit and new objects
have been stored. The new objects were left in a state where they
incorrectly appeared to have been saved and be up to date.

Also removed unnecessary invalidation of objects in other connections
when commiting sub-transactions.
parent 13d7eb5e
...@@ -84,8 +84,8 @@ ...@@ -84,8 +84,8 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.43 2001/01/15 18:49:49 jim Exp $""" $Id: Connection.py,v 1.44 2001/01/18 18:21:19 jim Exp $"""
__version__='$Revision: 1.43 $'[11:-2] __version__='$Revision: 1.44 $'[11:-2]
from cPickleCache import PickleCache from cPickleCache import PickleCache
from POSException import ConflictError, ExportError from POSException import ConflictError, ExportError
...@@ -260,9 +260,11 @@ class Connection(ExportImport.ExportImport): ...@@ -260,9 +260,11 @@ class Connection(ExportImport.ExportImport):
oid=object._p_oid oid=object._p_oid
invalid=self._invalid invalid=self._invalid
if oid is None or object._p_jar is not self: if oid is None or object._p_jar is not self:
# new object
oid = self.new_oid() oid = self.new_oid()
object._p_jar=self object._p_jar=self
object._p_oid=oid object._p_oid=oid
self._creating.append(oid)
elif object._p_changed: elif object._p_changed:
if invalid(oid) or invalid(None): raise ConflictError, `oid` if invalid(oid) or invalid(None): raise ConflictError, `oid`
...@@ -321,7 +323,14 @@ class Connection(ExportImport.ExportImport): ...@@ -321,7 +323,14 @@ class Connection(ExportImport.ExportImport):
del stack[-1] del stack[-1]
oid=object._p_oid oid=object._p_oid
serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0') serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
if invalid(oid): raise ConflictError, `oid` if serial == '\0\0\0\0\0\0\0\0':
# new object
self._creating.append(oid)
else:
#XXX We should never get here
if invalid(oid) or invalid(None): raise ConflictError, `oid`
self._invalidating.append(oid)
klass = object.__class__ klass = object.__class__
if klass is ExtensionKlass: if klass is ExtensionKlass:
...@@ -390,8 +399,12 @@ class Connection(ExportImport.ExportImport): ...@@ -390,8 +399,12 @@ class Connection(ExportImport.ExportImport):
dest=self._version dest=self._version
get=self._cache.get get=self._cache.get
oids=src._index.keys() oids=src._index.keys()
# Copy invalidating and creating info from temporary storage:
invalidating=self._invalidating invalidating=self._invalidating
invalidating[len(invalidating):]=oids invalidating[len(invalidating):]=oids
creating=self._creating
creating[len(creating):]=src._creating
for oid in oids: for oid in oids:
data, serial = load(oid, src) data, serial = load(oid, src)
...@@ -408,12 +421,34 @@ class Connection(ExportImport.ExportImport): ...@@ -408,12 +421,34 @@ class Connection(ExportImport.ExportImport):
def abort_sub(self, t): def abort_sub(self, t):
"""Abort work done in subtransactions"""
tmp=self._tmp tmp=self._tmp
if tmp is None: return if tmp is None: return
src=self._storage src=self._storage
self._tmp=None self._tmp=None
self._storage=tmp self._storage=tmp
self._cache.invalidate(src._index.keys()) self._cache.invalidate(src._index.keys())
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction.
"""
if creating is None:
creating=self._creating
self._creating=[]
cache=self._cache
cache_get=cache.get
for oid in creating:
o=cache_get(oid, None)
if o is not None:
del o._p_jar
del o._p_oid
del cache[oid]
#XXX
def db(self): return self._db def db(self): return self._db
...@@ -520,11 +555,13 @@ class Connection(ExportImport.ExportImport): ...@@ -520,11 +555,13 @@ class Connection(ExportImport.ExportImport):
cache=self._cache cache=self._cache
cache.invalidate(self._invalidated) cache.invalidate(self._invalidated)
cache.invalidate(self._invalidating) cache.invalidate(self._invalidating)
self._invalidate_creating()
def tpc_begin(self, transaction, sub=None): def tpc_begin(self, transaction, sub=None):
if self._invalid(None): # Some nitwit invalidated everything! if self._invalid(None): # Some nitwit invalidated everything!
raise ConflictError, "transaction already invalidated" raise ConflictError, "transaction already invalidated"
self._invalidating=[] self._invalidating=[]
self._creating=[]
if sub: if sub:
# Sub-transaction! # Sub-transaction!
...@@ -559,15 +596,30 @@ class Connection(ExportImport.ExportImport): ...@@ -559,15 +596,30 @@ class Connection(ExportImport.ExportImport):
# want another thread to be able to read any updated data # want another thread to be able to read any updated data
# until we've had a chance to send an invalidation message to # until we've had a chance to send an invalidation message to
# all of the other connections! # all of the other connections!
self._storage.tpc_finish(transaction, self.tpc_finish_) if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
self._storage.tpc_finish(transaction, self._invalidate_sub)
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
self._storage.tpc_finish(transaction,
self._invalidate_invalidating)
self._cache.invalidate(self._invalidated) self._cache.invalidate(self._invalidated)
self._incrgc() # This is a good time to do some GC self._incrgc() # This is a good time to do some GC
def tpc_finish_(self): def _invalidate_invalidating(self):
invalidate=self._db.invalidate invalidate=self._db.invalidate
for oid in self._invalidating: invalidate(oid, self) for oid in self._invalidating: invalidate(oid, self)
def _invalidate_sub(self):
# There's no point in invalidating any objects in a subtransaction
#
# Because we may ultimately abort the containing transaction.
pass
def sync(self): def sync(self):
get_transaction().abort() get_transaction().abort()
sync=getattr(self._storage, 'sync', 0) sync=getattr(self._storage, 'sync', 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment