Commit 4b3ea677 authored by Jim Fulton's avatar Jim Fulton

Added support for tpc_vote protocol. This makes the two-phase commit

implementation a little more orthodox and, more importantly, allows
ZEO perform stores asynchronously (and then synchronize on the
vote).
parent 069da393
...@@ -84,8 +84,8 @@ ...@@ -84,8 +84,8 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.30 2000/04/20 15:30:48 chrism Exp $""" $Id: Connection.py,v 1.31 2000/05/05 19:53:58 jim Exp $"""
__version__='$Revision: 1.30 $'[11:-2] __version__='$Revision: 1.31 $'[11:-2]
from cPickleCache import PickleCache from cPickleCache import PickleCache
from POSException import ConflictError, ExportError from POSException import ConflictError, ExportError
...@@ -239,7 +239,7 @@ class Connection(ExportImport.ExportImport): ...@@ -239,7 +239,7 @@ class Connection(ExportImport.ExportImport):
self._debug_info=() self._debug_info=()
db._closeConnection(self) db._closeConnection(self)
def commit(self, object, transaction): def commit(self, object, transaction, _type=type, _st=type('')):
oid=object._p_oid oid=object._p_oid
invalid=self._invalid invalid=self._invalid
if oid is None or object._p_jar is not self: if oid is None or object._p_jar is not self:
...@@ -292,9 +292,11 @@ class Connection(ExportImport.ExportImport): ...@@ -292,9 +292,11 @@ class Connection(ExportImport.ExportImport):
dbstore=self._storage.store dbstore=self._storage.store
file=file.getvalue file=file.getvalue
cache=self._cache cache=self._cache
get=cache.get
dump=pickler.dump dump=pickler.dump
clear_memo=pickler.clear_memo clear_memo=pickler.clear_memo
version=self._version version=self._version
while stack: while stack:
...@@ -329,20 +331,34 @@ class Connection(ExportImport.ExportImport): ...@@ -329,20 +331,34 @@ class Connection(ExportImport.ExportImport):
dump((klass,args)) dump((klass,args))
dump(state) dump(state)
p=file(1) p=file(1)
object._p_serial=dbstore(oid,serial,p,version,transaction) s=dbstore(oid,serial,p,version,transaction)
if s:
# Note that if s is false, then the storage defered the return
if _type(s) is _st:
# normal case
object._p_serial=s
object._p_changed=0 object._p_changed=0
else:
# defered returns
for oi, s in s:
o=get(oi, oi)
if o is not oi:
o._p_serial=s
o._p_changed=0
try: cache[oid]=object try: cache[oid]=object
except: except:
# Dang, I bet its wrapped: # Dang, I bet its wrapped:
if hasattr(object, 'aq_base'): if hasattr(object, 'aq_base'):
cache[oid]=object.aq_base cache[oid]=object.aq_base
def commit_sub(self, t): def commit_sub(self, t,
_type=type, _st=type(''), _None=None):
tmp=self._tmp tmp=self._tmp
if tmp is None: return if tmp is _None: return
src=self._storage src=self._storage
self._storage=tmp self._storage=tmp
self._tmp=None self._tmp=_None
tmp.tpc_begin(t) tmp.tpc_begin(t)
...@@ -357,8 +373,15 @@ class Connection(ExportImport.ExportImport): ...@@ -357,8 +373,15 @@ class Connection(ExportImport.ExportImport):
for oid in oids: for oid in oids:
data, serial = load(oid, src) data, serial = load(oid, src)
s=store(oid, serial, data, dest, t) s=store(oid, serial, data, dest, t)
o=get(oid, None) if s:
if o is not None: o._p_serial=s if _type(s) is _st:
o=get(oid, _None)
if o is not _None: o._p_serial=s
else:
for oid, s in s:
o=get(oid, _None)
if o is not _None: o._p_serial=s
def abort_sub(self, t): def abort_sub(self, t):
tmp=self._tmp tmp=self._tmp
...@@ -468,6 +491,19 @@ class Connection(ExportImport.ExportImport): ...@@ -468,6 +491,19 @@ class Connection(ExportImport.ExportImport):
self._storage.tpc_begin(transaction) self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction):
try: vote=self._storage.tpc_vote
except: return
s=vote(transaction)
if s:
get=self._cache.get
for oid, s in s:
o=get(oid, oid)
if o is not oid:
o._p_serial=s
o._p_changed=0
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
self._storage.tpc_finish(transaction, self.tpc_finish_) self._storage.tpc_finish(transaction, self.tpc_finish_)
self._cache.invalidate(self._invalidated) self._cache.invalidate(self._invalidated)
......
...@@ -84,8 +84,8 @@ ...@@ -84,8 +84,8 @@
############################################################################## ##############################################################################
"""Transaction management """Transaction management
$Id: Transaction.py,v 1.17 1999/11/10 22:17:18 klm Exp $""" $Id: Transaction.py,v 1.18 2000/05/05 19:55:19 jim Exp $"""
__version__='$Revision: 1.17 $'[11:-2] __version__='$Revision: 1.18 $'[11:-2]
import time, sys, struct, POSException import time, sys, struct, POSException
from struct import pack from struct import pack
...@@ -245,6 +245,10 @@ class Transaction: ...@@ -245,6 +245,10 @@ class Transaction:
j.commit_sub(self) j.commit_sub(self)
for jar in jars.values():
if not subtransaction:
jar.tpc_vote(self) # last chance to bail
except: except:
t,v,tb=sys.exc_info() t,v,tb=sys.exc_info()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment