Commit 0d4d91f1 authored by Barry Warsaw's avatar Barry Warsaw

Ported to pybsddb 4.1 (experimental), these changes are actually the

right thing to do anyway, and should work just find for earlier
versions of the wrapper.  Specifically,

_dorecovery(), _collect_objs(), _mark(): transactionally protect
.truncate() and .consume()

_docommit(): Move the setting of the _pending flag to here from
_finish() so we can transactionally protect it more conveniently.
This does't change the semantics and the recovery code in _setupDBs()
doesn't care since the flag will already be set to COMMIT.
parent 86e7e1c5
......@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support.
"""
__version__ = '$Revision: 1.55 $'.split()[-2:][0]
__version__ = '$Revision: 1.56 $'.split()[-2:][0]
import sys
import time
......@@ -252,8 +252,15 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# If these tables are non-empty, it means we crashed during a pack
# operation. I think we can safely throw out this data since the next
# pack operation will reproduce it faithfully.
self._oidqueue.truncate()
self._packmark.truncate()
txn = self._env.txn_begin()
try:
self._oidqueue.truncate(txn)
self._packmark.truncate(txn)
except:
txn.abort()
raise
else:
txn.commit()
# The pendings table may have entries if we crashed before we could
# abort or commit the outstanding ZODB transaction.
pendings = self._pending.keys()
......@@ -381,6 +388,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._withtxn(self._doabort, tid)
def _docommit(self, txn, tid):
self._pending.put(self._serial, COMMIT, txn)
# Almost all the data's already written by now so we don't need to do
# much more than update reference counts. Even there, our work is
# easy because we're not going to decref anything here.
......@@ -456,7 +464,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._withtxn(self._dobegin, self._serial, u, d, e)
def _finish(self, tid, u, d, e):
self._pending[self._serial] = COMMIT
self._withtxn(self._docommit, self._serial)
self.__ltid = tid
......@@ -1494,7 +1501,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._delqueue.append(oid, txn)
def _collect_objs(self, txn):
orec = self._delqueue.consume()
orec = self._delqueue.consume(txn)
while orec:
if self._stop:
raise PackStop, 'stopped in _collect_objs()'
......@@ -1558,7 +1565,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
c.close()
# We really do want this down here, since _decrefPickle() could
# add more items to the queue.
orec = self._delqueue.consume()
orec = self._delqueue.consume(txn)
assert len(self._delqueue) == 0
def _findrev(self, oid, packtid, txn):
......@@ -1623,7 +1630,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
for oid in refdoids:
self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume()
rec = self._oidqueue.consume(txn)
oid = rec and rec[1]
assert len(self._oidqueue) == 0
......
......@@ -15,7 +15,7 @@
"""Berkeley storage with full undo and versioning support.
"""
__version__ = '$Revision: 1.55 $'.split()[-2:][0]
__version__ = '$Revision: 1.56 $'.split()[-2:][0]
import sys
import time
......@@ -252,8 +252,15 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
# If these tables are non-empty, it means we crashed during a pack
# operation. I think we can safely throw out this data since the next
# pack operation will reproduce it faithfully.
self._oidqueue.truncate()
self._packmark.truncate()
txn = self._env.txn_begin()
try:
self._oidqueue.truncate(txn)
self._packmark.truncate(txn)
except:
txn.abort()
raise
else:
txn.commit()
# The pendings table may have entries if we crashed before we could
# abort or commit the outstanding ZODB transaction.
pendings = self._pending.keys()
......@@ -381,6 +388,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._withtxn(self._doabort, tid)
def _docommit(self, txn, tid):
self._pending.put(self._serial, COMMIT, txn)
# Almost all the data's already written by now so we don't need to do
# much more than update reference counts. Even there, our work is
# easy because we're not going to decref anything here.
......@@ -456,7 +464,6 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._withtxn(self._dobegin, self._serial, u, d, e)
def _finish(self, tid, u, d, e):
self._pending[self._serial] = COMMIT
self._withtxn(self._docommit, self._serial)
self.__ltid = tid
......@@ -1494,7 +1501,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
self._delqueue.append(oid, txn)
def _collect_objs(self, txn):
orec = self._delqueue.consume()
orec = self._delqueue.consume(txn)
while orec:
if self._stop:
raise PackStop, 'stopped in _collect_objs()'
......@@ -1558,7 +1565,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
c.close()
# We really do want this down here, since _decrefPickle() could
# add more items to the queue.
orec = self._delqueue.consume()
orec = self._delqueue.consume(txn)
assert len(self._delqueue) == 0
def _findrev(self, oid, packtid, txn):
......@@ -1623,7 +1630,7 @@ class Full(BerkeleyBase, ConflictResolvingStorage):
for oid in refdoids:
self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume()
rec = self._oidqueue.consume(txn)
oid = rec and rec[1]
assert len(self._oidqueue) == 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment