Commit 346c9d00 authored by Julien Muchembled's avatar Julien Muchembled

client: fix partial import from a source storage

The correct way to specify a start/stop tid is when constructing the 'source'
object, hence the remove of start/stop args. In fact, source.iterator()
does not always take such args.

On the other hand, when resuming import, Application.importFrom must manage
with incomplete preindex.
parent b648904b
...@@ -160,11 +160,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -160,11 +160,7 @@ class Storage(BaseStorage.BaseStorage,
def copyTransactionsFrom(self, source, verbose=False): def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """ """ Zope compliant API """
return self.importFrom(source) return self.app.importFrom(self, source)
def importFrom(self, source, start=None, stop=None, preindex=None):
""" Allow import only a part of the source storage """
return self.app.importFrom(self, source, start, stop, preindex)
def pack(self, t, referencesf, gc=False): def pack(self, t, referencesf, gc=False):
if gc: if gc:
......
...@@ -868,20 +868,25 @@ class Application(ThreadedApplication): ...@@ -868,20 +868,25 @@ class Application(ThreadedApplication):
self._insertMetadata(txn_info, txn_ext) self._insertMetadata(txn_info, txn_ext)
return result return result
def importFrom(self, storage, source, start, stop, preindex=None): def importFrom(self, storage, source):
# TODO: The main difference with BaseStorage implementation is that # TODO: The main difference with BaseStorage implementation is that
# preindex can't be filled with the result 'store' (tid only # preindex can't be filled with the result 'store' (tid only
# known after 'tpc_finish'. This method could be dropped if we # known after 'tpc_finish'. This method could be dropped if we
# implemented IStorageRestoreable (a wrapper around source would # implemented IStorageRestoreable (a wrapper around source would
# still be required for partial import). # still be required for partial import).
if preindex is None: preindex = {}
preindex = {} for transaction in source.iterator():
for transaction in source.iterator(start, stop):
tid = transaction.tid tid = transaction.tid
self.tpc_begin(storage, transaction, tid, transaction.status) self.tpc_begin(storage, transaction, tid, transaction.status)
for r in transaction: for r in transaction:
oid = r.oid oid = r.oid
pre = preindex.get(oid) try:
pre = preindex[oid]
except KeyError:
try:
pre = self.load(oid)[1]
except NEOStorageNotFoundError:
pre = ZERO_TID
self.store(oid, pre, r.data, r.version, transaction) self.store(oid, pre, r.data, r.version, transaction)
preindex[oid] = tid preindex[oid] = tid
conflicted = self.tpc_vote(transaction) conflicted = self.tpc_vote(transaction)
......
...@@ -891,10 +891,9 @@ class NEOCluster(object): ...@@ -891,10 +891,9 @@ class NEOCluster(object):
if dummy_zodb is None: if dummy_zodb is None:
from ..stat_zodb import PROD1 from ..stat_zodb import PROD1
dummy_zodb = PROD1(random) dummy_zodb = PROD1(random)
preindex = {}
as_storage = dummy_zodb.as_storage as_storage = dummy_zodb.as_storage
return lambda count: self.getZODBStorage().importFrom( return lambda count: self.getZODBStorage().copyTransactionsFrom(
as_storage(count), preindex=preindex) as_storage(count))
def populate(self, transaction_list, tid=lambda i: p64(i+1), def populate(self, transaction_list, tid=lambda i: p64(i+1),
oid=lambda i: p64(i+1)): oid=lambda i: p64(i+1)):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment