Commit 6b0ee461 authored by Jim Fulton's avatar Jim Fulton

Merged the shane-poll-invalidations branch, which adds RelStorage

support.
parent d2a26bcc
......@@ -30,6 +30,7 @@ from persistent import PickleCache
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
......@@ -94,8 +95,16 @@ class Connection(ExportImport, object):
# Multi-database support
self.connections = {self._db.database_name: self}
self._normal_storage = self._storage = db.storage
self.new_oid = db.storage.new_oid
storage = db.storage
if IMVCCStorage.providedBy(storage):
# Use a connection-specific storage instance.
self._mvcc_storage = True
storage = storage.new_instance()
else:
self._mvcc_storage = False
self._normal_storage = self._storage = storage
self.new_oid = storage.new_oid
self._savepoint_storage = None
# Do we need to join a txn manager?
......@@ -148,7 +157,6 @@ class Connection(ExportImport, object):
# in the cache on abort and in other connections on finish.
self._modified = []
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
# another is processing invalidations. All the invalidations
......@@ -179,10 +187,10 @@ class Connection(ExportImport, object):
# _conflicts).
self._conflicts = {}
# If MVCC is enabled, then _mvcc is True and _txn_time stores
# the upper bound on transactions visible to this connection.
# That is, all object revisions must be written before _txn_time.
# If it is None, then the current revisions are acceptable.
# _txn_time stores the upper bound on transactions visible to
# this connection. That is, all object revisions must be
# written before _txn_time. If it is None, then the current
# revisions are acceptable.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
......@@ -295,6 +303,9 @@ class Connection(ExportImport, object):
if self.opened:
self.transaction_manager.unregisterSynch(self)
if self._mvcc_storage:
self._storage.sync(force=False)
if primary:
for connection in self.connections.values():
if connection is not self:
......@@ -323,6 +334,10 @@ class Connection(ExportImport, object):
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
if self.before is not None:
# this is an historical connection. Invalidations are irrelevant.
return
......@@ -462,6 +477,16 @@ class Connection(ExportImport, object):
# Process pending invalidations.
def _flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._cache.invalidate(self._cache.cache_data.keys())
elif invalidated:
self._cache.invalidate(invalidated)
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
......@@ -1048,6 +1073,11 @@ class Connection(ExportImport, object):
if getattr(self, '_reader', None) is not None:
self._reader._cache = cache
def _releaseStorage(self):
"""Tell the storage to release resources it's using"""
if self._mvcc_storage:
self._storage.release()
##########################################################################
# Python protocol
......
......@@ -34,6 +34,7 @@ import transaction.weakset
from zope.interface import implements
from ZODB.interfaces import IDatabase
from ZODB.interfaces import IMVCCStorage
import BTrees.OOBTree
import transaction
......@@ -198,6 +199,7 @@ class ConnectionPool(AbstractConnectionPool):
# reclaim `c` now, and `c` would be left in a user-visible
# crazy state.
c._resetCache()
c._releaseStorage()
def reduce_size(self):
self._reduce_size()
......@@ -452,24 +454,32 @@ class DB(object):
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None
if IMVCCStorage.providedBy(storage):
temp_storage = storage.new_instance()
else:
temp_storage = storage
try:
storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
from persistent.mapping import PersistentMapping
root = PersistentMapping()
# Manually create a pickle for the root to put in the storage.
# The pickle must be in the special ZODB format.
file = cStringIO.StringIO()
p = cPickle.Pickler(file, 1)
p.dump((root.__class__, None))
p.dump(root.__getstate__())
t = transaction.Transaction()
t.description = 'initial database creation'
storage.tpc_begin(t)
storage.store(z64, None, file.getvalue(), '', t)
storage.tpc_vote(t)
storage.tpc_finish(t)
try:
temp_storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
from persistent.mapping import PersistentMapping
root = PersistentMapping()
# Manually create a pickle for the root to put in the storage.
# The pickle must be in the special ZODB format.
file = cStringIO.StringIO()
p = cPickle.Pickler(file, 1)
p.dump((root.__class__, None))
p.dump(root.__getstate__())
t = transaction.Transaction()
t.description = 'initial database creation'
temp_storage.tpc_begin(t)
temp_storage.store(z64, None, file.getvalue(), '', t)
temp_storage.tpc_vote(t)
temp_storage.tpc_finish(t)
finally:
if IMVCCStorage.providedBy(temp_storage):
temp_storage.release()
# Multi-database setup.
if databases is None:
......
......@@ -37,7 +37,7 @@ class MappingStorage(object):
def __init__(self, name='MappingStorage'):
self.__name__ = name
self._data = {} # {oid->{tid->pickle}}
self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
self._ltid = None
self._last_pack = None
_lock = threading.RLock()
......
......@@ -953,6 +953,111 @@ class IStorageUndoable(IStorage):
# DB pass-through
class IMVCCStorage(IStorage):
"""A storage that provides MVCC semantics internally.
MVCC (multi-version concurrency control) means each user of a
database has a snapshot view of the database. The snapshot view
does not change, even if concurrent connections commit
transactions, until a transaction boundary. Relational databases
that support serializable transaction isolation provide MVCC.
Storages that implement IMVCCStorage, such as RelStorage, provide
MVCC semantics at the ZODB storage layer. When ZODB.Connection uses
a storage that implements IMVCCStorage, each connection uses a
connection-specific storage instance, and that storage instance
provides a snapshot of the database.
By contrast, storages that do not implement IMVCCStorage, such as
FileStorage, rely on ZODB.Connection to provide MVCC semantics, so
in that case, one storage instance is shared by many
ZODB.Connections. Applications that use ZODB.Connection always have
a snapshot view of the database; IMVCCStorage only modifies which
layer of ZODB provides MVCC.
Furthermore, IMVCCStorage changes the way object invalidation
works. An essential feature of ZODB is the propagation of object
invalidation messages to keep in-memory caches up to date. Storages
like FileStorage and ZEO.ClientStorage send invalidation messages
to all other Connection instances at transaction commit time.
Storages that implement IMVCCStorage, on the other hand, expect the
ZODB.Connection to poll for a list of invalidated objects.
Certain methods of IMVCCStorage implementations open persistent
back end database sessions and retain the sessions even after the
method call finishes::
load
loadEx
loadSerial
loadBefore
store
restore
new_oid
history
tpc_begin
tpc_vote
tpc_abort
tpc_finish
If you know that the storage instance will no longer be used after
calling any of these methods, you should call the release method to
release the persistent sessions. The persistent sessions will be
reopened as necessary if you call one of those methods again.
Other storage methods open short lived back end sessions and close
the back end sessions before returning. These include::
__len__
getSize
undoLog
undo
pack
iterator
These methods do not provide MVCC semantics, so these methods
operate on the most current view of the database, rather than the
snapshot view that the other methods use.
"""
def new_instance():
"""Creates and returns another storage instance.
The returned instance provides IMVCCStorage and connects to the
same back-end database. The database state visible by the
instance will be a snapshot that varies independently of other
storage instances.
"""
def release():
"""Release all persistent sessions used by this storage instance.
After this call, the storage instance can still be used;
calling methods that use persistent sessions will cause the
persistent sessions to be reopened.
"""
def poll_invalidations():
"""Poll the storage for external changes.
Returns either a sequence of OIDs that have changed, or None. When a
sequence is returned, the corresponding objects should be removed
from the ZODB in-memory cache. When None is returned, the storage is
indicating that so much time has elapsed since the last poll that it
is no longer possible to enumerate all of the changed OIDs, since the
previous transaction seen by the connection has already been packed.
In that case, the ZODB in-memory cache should be cleared.
"""
def sync(force=True):
"""Updates the internal snapshot to the current state of the database.
If the force parameter is False, the storage may choose to
ignore this call. By ignoring this call, a storage can reduce
the frequency of database polls, thus reducing database load.
"""
class IStorageCurrentRecordIteration(IStorage):
def record_iternext(next=None):
......
##############################################################################
#
# Copyright (c) Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""An extension of MappingStorage that depends on polling.
Each Connection has its own view of the database. Polling updates each
connection's view.
"""
import time
import BTrees
from ZODB.interfaces import IMVCCStorage
from ZODB.MappingStorage import MappingStorage
from ZODB.TimeStamp import TimeStamp
from zope.interface import implements
class MVCCMappingStorage(MappingStorage):
implements(IMVCCStorage)
def __init__(self, name="MVCC Mapping Storage"):
MappingStorage.__init__(self, name=name)
# _polled_tid contains the transaction ID at the last poll.
self._polled_tid = ''
def new_instance(self):
"""Returns a storage instance that is a view of the same data.
"""
res = MVCCMappingStorage(name=self.__name__)
res._transactions = self._transactions
return res
def sync(self, force=False):
pass
def release(self):
pass
def poll_invalidations(self):
"""Poll the storage for changes by other connections.
"""
new_tid = self._transactions.maxKey()
if self._polled_tid:
if not self._transactions.has_key(self._polled_tid):
# This connection is so old that we can no longer enumerate
# all the changes.
self._polled_tid = new_tid
return None
changed_oids = set()
for tid, txn in self._transactions.items(
self._polled_tid, new_tid, excludemin=True, excludemax=False):
if txn.status == 'p':
# This transaction has been packed, so it is no longer
# possible to enumerate all changed oids.
self._polled_tid = new_tid
return None
if tid == self._ltid:
# ignore the transaction committed by this connection
continue
changes = txn.data
# pull in changes from the transaction log
for oid, value in changes.iteritems():
tid_data = self._data.get(oid)
if tid_data is None:
tid_data = BTrees.OOBTree.OOBucket()
self._data[oid] = tid_data
tid_data[tid] = changes[oid]
changed_oids.update(changes.keys())
self._polled_tid = new_tid
return list(changed_oids)
##############################################################################
#
# Copyright (c) Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest
from persistent.mapping import PersistentMapping
import transaction
from ZODB.DB import DB
from ZODB.tests.MVCCMappingStorage import MVCCMappingStorage
from ZODB.tests import (
BasicStorage,
HistoryStorage,
IteratorStorage,
MTStorage,
PackableStorage,
RevisionStorage,
StorageTestBase,
Synchronization,
)
class MVCCTests:
def checkCrossConnectionInvalidation(self):
# Verify connections see updated state at txn boundaries.
# This will fail if the Connection doesn't poll for changes.
db = DB(self._storage)
try:
c1 = db.open()
r1 = c1.root()
r1['myobj'] = 'yes'
c2 = db.open()
r2 = c2.root()
self.assert_('myobj' not in r2)
storage = c1._storage
t = transaction.Transaction()
t.description = 'invalidation test'
storage.tpc_begin(t)
c1.commit(t)
storage.tpc_vote(t)
storage.tpc_finish(t)
self.assert_('myobj' not in r2)
c2.sync()
self.assert_('myobj' in r2)
self.assert_(r2['myobj'] == 'yes')
finally:
db.close()
def checkCrossConnectionIsolation(self):
# Verify MVCC isolates connections.
# This will fail if Connection doesn't poll for changes.
db = DB(self._storage)
try:
c1 = db.open()
r1 = c1.root()
r1['alpha'] = PersistentMapping()
r1['gamma'] = PersistentMapping()
transaction.commit()
# Open a second connection but don't load root['alpha'] yet
c2 = db.open()
r2 = c2.root()
r1['alpha']['beta'] = 'yes'
storage = c1._storage
t = transaction.Transaction()
t.description = 'isolation test 1'
storage.tpc_begin(t)
c1.commit(t)
storage.tpc_vote(t)
storage.tpc_finish(t)
# The second connection will now load root['alpha'], but due to
# MVCC, it should continue to see the old state.
self.assert_(r2['alpha']._p_changed is None) # A ghost
self.assert_(not r2['alpha'])
self.assert_(r2['alpha']._p_changed == 0)
# make root['alpha'] visible to the second connection
c2.sync()
# Now it should be in sync
self.assert_(r2['alpha']._p_changed is None) # A ghost
self.assert_(r2['alpha'])
self.assert_(r2['alpha']._p_changed == 0)
self.assert_(r2['alpha']['beta'] == 'yes')
# Repeat the test with root['gamma']
r1['gamma']['delta'] = 'yes'
storage = c1._storage
t = transaction.Transaction()
t.description = 'isolation test 2'
storage.tpc_begin(t)
c1.commit(t)
storage.tpc_vote(t)
storage.tpc_finish(t)
# The second connection will now load root[3], but due to MVCC,
# it should continue to see the old state.
self.assert_(r2['gamma']._p_changed is None) # A ghost
self.assert_(not r2['gamma'])
self.assert_(r2['gamma']._p_changed == 0)
# make root[3] visible to the second connection
c2.sync()
# Now it should be in sync
self.assert_(r2['gamma']._p_changed is None) # A ghost
self.assert_(r2['gamma'])
self.assert_(r2['gamma']._p_changed == 0)
self.assert_(r2['gamma']['delta'] == 'yes')
finally:
db.close()
class MVCCMappingStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
HistoryStorage.HistoryStorage,
IteratorStorage.ExtendedIteratorStorage,
IteratorStorage.IteratorStorage,
MTStorage.MTStorage,
PackableStorage.PackableStorageWithOptionalGC,
RevisionStorage.RevisionStorage,
Synchronization.SynchronizedStorage,
MVCCTests
):
def setUp(self):
self._storage = MVCCMappingStorage()
def tearDown(self):
self._storage.close()
def checkLoadBeforeUndo(self):
pass # we don't support undo yet
checkUndoZombie = checkLoadBeforeUndo
def test_suite():
suite = unittest.makeSuite(MVCCMappingStorageTests, 'check')
return suite
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment