Commit 111ff445 authored by Barry Warsaw

Added pack and autopack support. Packing is only necessary to garbage
collect cycles.

Also, some code updating and re-org to factor code into the base
class.

_dostore(): Support conflict resolution.
parent d3071e49
@@ -15,7 +15,10 @@
"""Berkeley storage without undo or versioning.
"""
-__version__ = '$Revision: 1.13 $'[-2:][0]
+__version__ = '$Revision: 1.14 $'[-2:][0]
import time
import threading
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net. It is compatible with release 3.4 of
@@ -29,15 +32,29 @@ from BerkeleyBase import BerkeleyBase
from ZODB import POSException
from ZODB.utils import U64, p64
from ZODB.referencesf import referencesf
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
import zLOG
ABORT = 'A'
COMMIT = 'C'
PRESENT = 'X'
ZERO = '\0'*8
# Number of seconds for the autopack thread to sleep before checking to see if
# it's time for another autopack run. Lower numbers mean more processing,
# higher numbers mean less responsiveness to shutdown requests. 10 seconds
# seems like a good compromise.
AUTOPACK_CHECK_SLEEP = 10
try:
True, False
except NameError:
True = 1
False = 0
-class Minimal(BerkeleyBase):
+class Minimal(BerkeleyBase, ConflictResolvingStorage):
def _setupDBs(self):
# Data Type Assumptions:
#
@@ -77,17 +94,35 @@ class Minimal(BerkeleyBase):
# no pending entry. It is a database invariant that if the
# pending table is empty, the oids table must also be empty.
#
# packmark -- [oid]
# Every object reachable from the root during a classic pack
# operation will have its oid present in this table.
#
# oidqueue -- [oid]
# This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done.
# It is also used during pack to list objects for which no more
# references exist, such that the objects can be completely packed
# away.
#
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
self._refcounts = self._setupDB('refcounts')
self._oids = self._setupDB('oids')
self._pending = self._setupDB('pending')
# Tables to support packing.
self._packmark = self._setupDB('packmark')
self._oidqueue = db.DB(self._env)
self._oidqueue.set_re_len(8)
# BAW: do we need to set the queue extent size?
self._oidqueue.open(self._prefix + 'oidqueue',
db.DB_QUEUE, db.DB_CREATE)
# Do recovery and consistency checks
pendings = self._pending.keys()
assert len(pendings) <= 1
if len(pendings) == 0:
assert len(self._oids) == 0
-return
+else:
# Do recovery
tid = pendings[0]
flag = self._pending.get(tid)
@@ -95,35 +130,38 @@ class Minimal(BerkeleyBase):
self._lock_acquire()
try:
if flag == ABORT:
-self._do(self._doabort, tid)
+self._withtxn(self._doabort, tid)
else:
-self._do(self._docommit, tid)
+self._withtxn(self._docommit, tid)
finally:
self._lock_release()
# Set up the autopacking thread
if self._config.frequency > 0:
config = self._config
self._autopacker = _Autopack(self, config.frequency)
self._autopacker.start()
def close(self):
# We must stop the autopacker before closing any tables. BAW:
# should we use a timeout on the join() call? I'm not sure. On the
# one hand we don't want to block forever, but on the other, killing
# the autopacker thread in the middle of real work could leave the
# databases in a corrupted state, requiring recovery. With an
# AUTOPACK_CHECK_SLEEP low enough, we shouldn't be blocking for long.
if self._autopacker:
zLOG.LOG('Minimal storage', zLOG.INFO, 'stopping autopack thread')
self._autopacker.stop()
self._autopacker.join()
self._serials.close()
self._pickles.close()
self._refcounts.close()
self._oids.close()
self._pending.close()
self._packmark.close()
self._oidqueue.close()
BerkeleyBase.close(self)
-def _do(self, meth, tid):
-txn = self._env.txn_begin()
-try:
-meth(tid, txn)
-self._oids.truncate(txn)
-self._pending.truncate(txn)
-except:
-txn.abort()
-self._docheckpoint()
-raise
-else:
-txn.commit()
-self._docheckpoint()
-def _doabort(self, tid, txn):
+def _doabort(self, txn, tid):
co = cs = None
try:
co = self._oids.cursor(txn=txn)
@@ -145,8 +183,14 @@ class Minimal(BerkeleyBase):
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# We're done with these tables
self._oids.truncate(txn)
self._pending.truncate()
def _abort(self):
self._withtxn(self._doabort, self._serial)
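The _withtxn() helper used above is not part of this diff; it was factored into the BerkeleyBase base class. Judging from the _do() helper it replaces, it presumably looks roughly like the following sketch (the argument order and checkpoint calls are inferred, not copied from the base class):

    # Sketch only -- not the actual BerkeleyBase code.  Inferred from the
    # removed _do() helper: begin a Berkeley transaction, call the worker
    # with the transaction as its first argument, then commit or abort and
    # checkpoint.  The oids/pending truncation that _do() used to perform
    # now happens inside _doabort()/_docommit() themselves.
    def _withtxn(self, meth, *args):
        txn = self._env.txn_begin()
        try:
            ret = meth(txn, *args)
        except:
            txn.abort()
            self._docheckpoint()
            raise
        else:
            txn.commit()
            self._docheckpoint()
            return ret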
-def _docommit(self, tid, txn):
+def _docommit(self, txn, tid):
deltas = {}
co = cs = None
try:
@@ -183,6 +227,9 @@ class Minimal(BerkeleyBase):
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# We're done with this table
self._oids.truncate(txn)
self._pending.truncate()
# Now, to finish up, we need to apply the refcount deltas to the
# refcounts table, and do recursive collection of all refcount == 0
# objects.
@@ -192,9 +239,9 @@ class Minimal(BerkeleyBase):
def _update_refcounts(self, deltas, txn):
newdeltas = {}
for oid, delta in deltas.items():
-rc = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
-assert rc >= 0
-if rc == 0:
+refcount = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
+assert refcount >= 0
+if refcount == 0:
# The reference count for this object has just gone to zero,
# so we can safely remove all traces of it from the serials,
# pickles and refcounts table. Note that before we remove its
@@ -209,7 +256,7 @@ class Minimal(BerkeleyBase):
self._refcounts.delete(oid, txn=txn)
self._pickles.delete(oid+current, txn=txn)
else:
-self._refcounts.put(oid, p64(rc), txn=txn)
+self._refcounts.put(oid, p64(refcount), txn=txn)
# Return the list of objects referenced by pickles just deleted in
# this round, for decref'ing on the next go 'round.
return newdeltas
@@ -220,52 +267,48 @@
# will be aborted.
self._pending[self._serial] = ABORT
-def store(self, oid, serial, data, version, transaction):
-if transaction is not self._transaction:
-raise POSException.StorageTransactionError(self, transaction)
-# We don't support versions
-if version <> '':
-raise POSException.Unsupported, 'versions are not supported'
-# All updates must be done with the application lock acquired
-self._lock_acquire()
-try:
+def _dostore(self, txn, oid, serial, data):
+conflictresolved = False
oserial = self._getCurrentSerial(oid)
if oserial is not None and serial <> oserial:
# The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial
# number. Raise a ConflictError.
-#
-# BAW: do application level conflict resolution
+data = self.tryToResolveConflict(oid, oserial, serial, data)
+if data:
+conflictresolved = True
+else:
raise POSException.ConflictError(serials=(oserial, serial))
# Optimistically write to the serials and pickles table. Be sure
# to also update the oids table for this object too.
newserial = self._serial
-txn = self._env.txn_begin()
-try:
self._serials.put(oid, newserial, txn=txn)
self._pickles.put(oid+newserial, data, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
-except:
-txn.abort()
-self._docheckpoint()
-raise
-else:
-txn.commit()
-self._docheckpoint()
-finally:
-self._lock_release()
# Return the new serial number for the object
+if conflictresolved:
+return ResolvedSerial
return newserial
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
# All updates must be done with the application lock acquired
self._lock_acquire()
try:
return self._withtxn(self._dostore, oid, serial, data)
finally:
self._lock_release()
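Conflict resolution in _dostore() is delegated to the ConflictResolvingStorage mixin: tryToResolveConflict() returns either a resolved pickle or a false value, and it ultimately relies on the stored class providing a _p_resolveConflict() hook. A minimal, hypothetical example of such a class (illustration only, not part of this commit):

# Hypothetical application class.  ZODB calls
# _p_resolveConflict(oldState, savedState, newState) with the three
# unpickled states and expects the merged state dictionary back.
from Persistence import Persistent   # spelled persistent.Persistent in later ZODB releases

class Counter(Persistent):
    def __init__(self):
        self.value = 0

    def inc(self, n=1):
        self.value = self.value + n

    def _p_resolveConflict(self, oldState, savedState, newState):
        # Merge two concurrent increments relative to the common ancestor:
        # resolved total = saved + new - old.
        resolved = newState.copy()
        resolved['value'] = (savedState['value'] + newState['value']
                             - oldState['value'])
        return resolved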
def _finish(self, tid, u, d, e):
# Twiddle the pending flag to COMMIT now since after the vote call, we
# promise that the changes will be committed, no matter what. The
# recovery process will check this.
self._pending[self._serial] = COMMIT
-self._do(self._docommit, self._serial)
+self._withtxn(self._docommit, self._serial)
-def _abort(self):
-self._do(self._doabort, self._serial)
#
# Accessor interface
@@ -326,3 +369,207 @@ class Minimal(BerkeleyBase):
# So BaseStorage.getSerial() just works. Note that this storage
# doesn't support versions.
return ''
#
# Packing. In Minimal storage, packing is only required to get rid of
# object cycles, since there are no old object revisions.
#
def pack(self, t, zreferencesf):
# For all intents and purposes, referencesf here is always going to be
# the same as ZODB.referencesf.referencesf. It's too much of a PITA
# to pass that around to the helper methods, so just assert they're
# the same.
assert zreferencesf == referencesf
zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time.
self._packlock.acquire()
try:
# We don't wrap this in _withtxn() because we're going to do the
# operation across several Berkeley transactions, which allows
# other work to happen (stores and reads) while packing is being
# done.
#
# Also, we don't care about the pack time, since we don't need to
# collect object revisions
self._dopack()
finally:
self._packlock.release()
zLOG.LOG('Minimal storage', zLOG.INFO, 'classic pack finished')
def _dopack(self):
# Do a mark and sweep for garbage collection. Calculate the set of
# objects reachable from the root. Anything else is a candidate for
# having all their revisions packed away. The set of reachable
# objects lives in the _packmark table.
self._lock_acquire()
try:
self._withtxn(self._mark)
finally:
self._lock_release()
# Now perform a sweep, using oidqueue to hold all object ids for
# objects which are not root reachable as of the pack time.
self._lock_acquire()
try:
self._withtxn(self._sweep)
finally:
self._lock_release()
# Once again, collect any objects with refcount zero due to the mark
# and sweep garbage collection pass.
self._lock_acquire()
try:
self._withtxn(self._collect_objs)
finally:
self._lock_release()
def _mark(self, txn):
# Find the oids for all the objects reachable from the root. To
# reduce the amount of in-core memory we need to do a pack operation,
# we'll save the mark data in the packmark table. The oidqueue is a
# BerkeleyDB Queue that holds the list of object ids to look at next,
# and by using this we don't need to keep an in-memory dictionary.
assert len(self._packmark) == 0
assert len(self._oidqueue) == 0
# Quick exit for empty storages
if not self._serials:
return
# The oid of the object we're looking at, starting at the root
oid = ZERO
# Start at the root, find all the objects the current revision of the
# root references, and then for each of those, find all the objects it
# references, and so on until we've traversed the entire object graph.
while oid:
if self._packmark.has_key(oid):
# We've already seen this object
continue
self._packmark.put(oid, PRESENT, txn=txn)
# Get the pickle data for this object
tid = self._getCurrentSerial(oid)
# Say there's no root object (as is the case in some of the unit
# tests), and we're looking up oid ZERO. Then tid will be None.
if tid is not None:
data = self._pickles[oid+tid]
# Now get the oids of all the objects referenced by this pickle
refdoids = []
referencesf(data, refdoids)
# And append them to the queue for later
for oid in refdoids:
self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume()
oid = rec and rec[1]
assert len(self._oidqueue) == 0
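For comparison, here is the same root-reachability computation written with plain in-memory structures; the packmark table plays the role of the 'seen' dictionary and the oidqueue plays the role of the work list, which is how the real _mark() avoids holding either in RAM. The get_pickle() callable is a stand-in for the serials/pickles lookups above (sketch only):

def reachable_oids(get_pickle, root_oid=ZERO):
    # Sketch of the mark phase with in-memory data structures.
    seen = {}                 # what the packmark table records on disk
    todo = [root_oid]         # what the oidqueue Queue records on disk
    while todo:
        oid = todo.pop(0)
        if seen.has_key(oid):
            continue
        seen[oid] = PRESENT
        data = get_pickle(oid)          # hypothetical helper
        if data is None:
            # e.g. no root object yet
            continue
        refs = []
        referencesf(data, refs)         # oids referenced by this pickle
        todo.extend(refs)
    return seen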
def _sweep(self, txn):
c = self._serials.cursor(txn=txn)
try:
rec = c.first()
while rec:
oid = rec[0]
rec = c.next()
# If packmark (which knows about all the root reachable
# objects) doesn't have a record for this guy, then we can zap
# it. Do so by appending to oidqueue.
if not self._packmark.has_key(oid):
self._oidqueue.append(oid, txn)
finally:
c.close()
# We're done with the mark table
self._packmark.truncate(txn=txn)
def _collect_objs(self, txn):
orec = self._oidqueue.consume()
while orec:
oid = orec[1]
# Delete the object from the serials table
c = self._serials.cursor(txn)
try:
rec = c.set(oid)
while rec and rec[0] == oid:
c.delete()
rec = c.next_dup()
# We don't need the refcounts any more, but note that if the
# object was never referenced from another object, there may
# not be a refcounts entry.
try:
self._refcounts.delete(oid, txn=txn)
except db.DBNotFoundError:
pass
finally:
c.close()
# Now collect the pickle data and do reference counting
c = self._pickles.cursor(txn)
try:
rec = c.set_range(oid)
while rec and rec[0][:8] == oid:
data = rec[1]
c.delete()
rec = c.next()
deltas = {}
self._update(deltas, data, -1)
for oid, delta in deltas.items():
refcount = U64(self._refcounts.get(oid, ZERO)) + delta
assert refcount >= 0
if refcount == 0:
self._oidqueue.append(oid, txn)
else:
self._refcounts.put(oid, p64(refcount), txn=txn)
finally:
c.close()
# We really do want this down here, since _decrefPickle() could
# add more items to the queue.
orec = self._oidqueue.consume()
assert len(self._oidqueue) == 0
#
# Stuff we don't support
#
def supportsTransactionalUndo(self):
return False
def supportsUndo(self):
return False
def supportsVersions(self):
return False
# Don't implement these
#
# versionEmpty(self, version)
# versions(self, max=None)
# loadSerial(self, oid, serial)
# getSerial(self, oid)
# transactionalUndo(self, tid, transaction)
# undoLog(self, first=0, last=-20, filter=None)
# history(self, oid, version=None, size=1, filter=None)
# iterator(self, start=None, stop=None)
class _Autopack(threading.Thread):
def __init__(self, storage, frequency):
threading.Thread.__init__(self)
self._storage = storage
self._frequency = frequency
# Bookkeeping
self._stop = False
self._nextpack = 0
def run(self):
zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread started')
while not self._stop:
now = time.time()
if now > self._nextpack:
# Run the autopack phase
self._storage.pack('ignored', referencesf)
self._nextpack = now + self._frequency
# Now we sleep for a little while before we check again. Sleep
# for the minimum of self._frequency and AUTOPACK_CHECK_SLEEP so as
# to be as responsive as possible to .stop() calls.
time.sleep(min(self._frequency, AUTOPACK_CHECK_SLEEP))
zLOG.LOG('Minimal storage', zLOG.INFO, 'autopack thread finished')
def stop(self):
self._stop = True