Commit f3a1b261 authored by Jeremy Hylton's avatar Jeremy Hylton

Add _handle_serial() helper that handles serialno responses from a

storage in all their many flavors.

This fixes a conflict resolution bug with subtransactions.
XXX Commit same change of zope-2_3-branch.

A related change in store() is to call _handle_serial() *after*
putting new objects in the cache.  This simplifies the logic of
_handle_serial() a bit, because every object should be in the cache.
parent 9b9fed54
...@@ -84,8 +84,8 @@ ...@@ -84,8 +84,8 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.50 2001/04/14 23:16:44 shane Exp $""" $Id: Connection.py,v 1.51 2001/05/10 23:00:14 jeremy Exp $"""
__version__='$Revision: 1.50 $'[11:-2] __version__='$Revision: 1.51 $'[11:-2]
from cPickleCache import PickleCache from cPickleCache import PickleCache
from POSException import ConflictError, ExportError from POSException import ConflictError, ExportError
...@@ -97,6 +97,7 @@ import Transaction, string, ExportImport, sys, traceback, TmpStore ...@@ -97,6 +97,7 @@ import Transaction, string, ExportImport, sys, traceback, TmpStore
from zLOG import LOG, ERROR, BLATHER from zLOG import LOG, ERROR, BLATHER
from coptimizations import new_persistent_id from coptimizations import new_persistent_id
from ConflictResolution import ResolvedSerial from ConflictResolution import ResolvedSerial
from types import StringType
ExtensionKlass=Base.__class__ ExtensionKlass=Base.__class__
...@@ -386,34 +387,9 @@ class Connection(ExportImport.ExportImport): ...@@ -386,34 +387,9 @@ class Connection(ExportImport.ExportImport):
dump(state) dump(state)
p=file(1) p=file(1)
s=dbstore(oid,serial,p,version,transaction) s=dbstore(oid,serial,p,version,transaction)
if s: # Put the object in the cache before handling the
# Note that if s is false, then the storage defered the return # response, just in case the response contains the
if _type(s) is _st: # serial number for a newly created object
# normal case
if s == ResolvedSerial:
# resolved conflict
object._p_changed=None
else:
object._p_serial=s
object._p_changed=0
else:
# defered returns
for oi, s in s:
if _type(s) is not _st: raise s
o=get(oi, oi)
if o is not oi:
if s == ResolvedSerial:
o._p_changed=None
else:
o._p_serial=s
o._p_changed=0
elif oi == oid:
if s == ResolvedSerial:
object._p_changed=None
else:
object._p_serial=s
object._p_changed=0
try: cache[oid]=object try: cache[oid]=object
except: except:
# Dang, I bet its wrapped: # Dang, I bet its wrapped:
...@@ -422,6 +398,8 @@ class Connection(ExportImport.ExportImport): ...@@ -422,6 +398,8 @@ class Connection(ExportImport.ExportImport):
else: else:
raise raise
self._handle_serial(s, oid)
def commit_sub(self, t, def commit_sub(self, t,
_type=type, _st=type(''), _None=None): _type=type, _st=type(''), _None=None):
"""Commit all work done in subtransactions""" """Commit all work done in subtransactions"""
...@@ -452,16 +430,7 @@ class Connection(ExportImport.ExportImport): ...@@ -452,16 +430,7 @@ class Connection(ExportImport.ExportImport):
for oid in oids: for oid in oids:
data, serial = load(oid, src) data, serial = load(oid, src)
s=store(oid, serial, data, dest, t) s=store(oid, serial, data, dest, t)
if s: self._handle_serial(s, oid, change=0)
if _type(s) is _st:
o=get(oid, _None)
if o is not _None: o._p_serial=s
else:
for oid, s in s:
if _type(s) is not _st: raise s
o=get(oid, _None)
if o is not _None: o._p_serial=s
def abort_sub(self, t): def abort_sub(self, t):
"""Abort work done in subtransactions""" """Abort work done in subtransactions"""
...@@ -639,17 +608,47 @@ class Connection(ExportImport.ExportImport): ...@@ -639,17 +608,47 @@ class Connection(ExportImport.ExportImport):
try: vote=self._storage.tpc_vote try: vote=self._storage.tpc_vote
except: return except: return
s=vote(transaction) s=vote(transaction)
if s: self._handle_serial(s)
get=self._cache.get
for oid, s in s: def _handle_serial(self, store_return, oid=None, change=1):
o=get(oid, oid) """Handle the returns from store() and tpc_vote() calls."""
if o is not oid:
if _type(s) is not _st: raise s # These calls can return different types depending on whether
if s == ResolvedSerial: # ZEO is used. ZEO uses asynchronous returns that may be
o._p_changed=None # returned in batches by the ClientStorage. ZEO1 can also
# return an exception object and expect that the Connection
# will raise the exception.
# When commit_sub() exceutes a store, there is no need to
# update the _p_changed flag, because the subtransaction
# tpc_voteh() calls already did this. The change=1 argument
# exists to allow commit_sub() to avoid setting the flag
# again.
if not store_return:
return
if isinstance(store_return, StringType):
assert oid is not None
serial = store_return
obj = self._cache.get(oid, None)
if serial == ResolvedSerial:
obj._p_changed = None
else:
if change:
obj._p_changed = 0
obj._p_serial = serial
else:
for oid, serial in store_return:
if not isinstance(serial, StringType):
raise serial
obj = self._cache.get(oid, None)
if obj is None:
continue
if serial == ResolvedSerial:
obj._p_changed = None
else: else:
o._p_serial=s if change:
o._p_changed=0 obj._p_changed = 0
obj._p_serial = serial
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment