Commit 05bf48de authored by Julien Muchembled

importer: fetch and process the data to import in a separate process

A new subprocess is used to:
- fetch data from the source DB
- repickle to change oids (when merging several DBs)
- compress
- checksum

This is mostly useful for the second step (repickling), which is much slower
than any other step and does not release the GIL.

By using a second CPU core, it is also often possible to use a better
compression algorithm for free (e.g. zlib=9). In fact, the smaller data can
even speed up the writing process.

In addition to greatly speeding up the import by parallelizing fetch+process
with write, this also makes the main process more responsive to queries from
client nodes.
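
As a rough sketch (not the actual importer API: iter_source() and the write
step are hypothetical placeholders), the resulting pipeline looks like this,
using the new neo.storage.shared_queue module:

    from multiprocessing import Process
    from neo.storage.shared_queue import Queue

    def iter_source():
        # Stand-in for the importer generator that fetches, repickles,
        # compresses and checksums records in the child process.
        for i in xrange(1000):
            yield 'record-%i' % i
        yield None                      # end-of-stream marker

    queue = Queue(1 << 24)              # shared-memory buffer (16 MiB)
    # Requires os.fork (see the FORK guard below).
    child = Process(target=lambda: queue(iter_source()))
    child.daemon = True
    child.start()

    for item in queue:                  # the main process only writes,
        if item is None:                # so it stays responsive to clients
            break
        pass                            # write 'item' to the backend here
    child.join()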
parent 30a02bdc
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import cPickle, pickle, time
import cPickle, pickle, sys, time
from bisect import bisect, insort
from collections import deque
from cStringIO import StringIO
@@ -33,6 +33,9 @@ from neo.lib.protocol import BackendNotImplemented, MAX_TID
patch.speedupFileStorageTxnLookup()
FORK = sys.platform != 'win32'
class Reference(object):
__slots__ = "value",
@@ -396,6 +399,7 @@ class ImporterDatabaseManager(DatabaseManager):
zodb = self.zodb[-1]
self.zodb_loid = zodb.shift_oid + zodb.next_oid - 1
self.zodb_tid = self.db.getLastTID(self.zodb_ltid) or 0
if callable(self._import):
self._import = self._import()
def doOperation(self, app):
@@ -403,6 +407,62 @@ class ImporterDatabaseManager(DatabaseManager):
app.newTask(self._import)
def _import(self):
u64 = util.u64
if FORK:
from multiprocessing import Process
from ..shared_queue import Queue
queue = Queue(1<<24)
process = self._import_process = Process(
target=lambda: queue(self._iter_zodb()))
process.daemon = True
process.start()
else:
queue = self._iter_zodb()
process = None
object_list = []
data_id_list = []
for txn in queue:
if txn is None:
break
if len(txn) == 3:
oid, data_id, data_tid = txn
if data_id is not None:
checksum, data, compression = data_id
data_id = self.holdData(checksum, oid, data, compression)
data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid))
# Give the main loop the opportunity to process requests
# from other nodes. In particular, clients may commit. If the
# storage node exits after such commit, and before we actually
# update 'obj' with 'object_list', some rows in 'data' may be
# unreferenced. This is not a problem because the leak is
# solved when resuming the migration.
# XXX: The leak was solved by the deduplication,
# but it was disabled by default.
else:
tid = txn[-1]
self.storeTransaction(tid, object_list,
((x[0] for x in object_list),) + txn,
False)
self.releaseData(data_id_list)
logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
util.dump(tid), txn[0], txn[1], len(object_list))
del object_list[:], data_id_list[:]
if self._last_commit + 1 < time.time():
self.commit()
self.zodb_tid = u64(tid)
yield
if process:
process.join()
self.commit()
logging.warning("All data are imported. You should change"
" your configuration to use the native backend and restart.")
self._import = None
for x in """getObject getReplicationTIDList getReplicationObjectList
""".split():
setattr(self, x, getattr(self.db, x))
def _iter_zodb(self):
p64 = util.p64
u64 = util.u64
tid = p64(self.zodb_tid + 1) if self.zodb_tid else None
@@ -412,66 +472,40 @@ class ImporterDatabaseManager(DatabaseManager):
zodb_list.append(ZODBIterator(zodb, tid, p64(self.zodb_ltid)))
except StopIteration:
pass
if zodb_list:
tid = None
def finish():
if tid:
self.storeTransaction(tid, object_list, (
(x[0] for x in object_list),
str(txn.user), str(txn.description),
cPickle.dumps(txn.extension),
txn.status == 'p', tid),
False)
self.releaseData(data_id_list)
logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
util.dump(tid), txn.user, txn.description, len(object_list))
del object_list[:], data_id_list[:]
if self._last_commit + 1 < time.time():
self.commit()
self.zodb_tid = u64(tid)
_compress = compress.getCompress(self.compress)
object_list = []
data_id_list = []
while zodb_list:
while 1:
zodb_list.sort()
z = zodb_list[0]
# Merge transactions with same tid. Only
# user/desc/ext from first ZODB are kept.
if tid != z.tid:
finish()
if tid:
yield txn
txn = z.transaction
tid = txn.tid
yield
txn = (str(txn.user), str(txn.description),
cPickle.dumps(txn.extension),
txn.status == 'p', tid)
zodb = z.zodb
for r in z.transaction:
oid = p64(u64(r.oid) + zodb.shift_oid)
data_tid = r.data_txn
if data_tid or r.data is None:
data_id = None
data = None
else:
_, compression, data = _compress(zodb.repickle(r.data))
data_id = self.holdData(util.makeChecksum(data), oid, data,
compression)
data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid))
# Give the main loop the opportunity to process requests
# from other nodes. In particular, clients may commit. If the
# storage node exits after such commit, and before we actually
# update 'obj' with 'object_list', some rows in 'data' may be
# unreferenced. This is not a problem because the leak is
# solved when resuming the migration.
yield
data = util.makeChecksum(data), data, compression
yield oid, data, data_tid
try:
z.next()
except StopIteration:
del zodb_list[0]
self._last_commit = 0
finish()
logging.warning("All data are imported. You should change"
" your configuration to use the native backend and restart.")
self._import = None
for x in """getObject getReplicationTIDList getReplicationObjectList
""".split():
setattr(self, x, getattr(self.db, x))
if not zodb_list:
break
yield txn
yield
def inZodb(self, oid, tid=None, before_tid=None):
return oid <= self.zodb_loid and (
@@ -610,7 +644,6 @@ class WriteBack(object):
_changed = False
_process = None
threading = False
def __init__(self, db, storage):
self._db = db
@@ -631,10 +664,10 @@ class WriteBack(object):
if self._process:
self._event.set()
else:
if self.threading:
from threading import Thread as Process, Event
else:
if FORK:
from multiprocessing import Process, Event
else:
from threading import Thread as Process, Event
self._event = Event()
self._idle = Event()
self._stop = Event()
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from msgpack import Packer, Unpacker
class Queue(object):
"""Unidirectional pipe for asynchronous and fast exchange of big amounts
of data between 2 processes.
It is implemented using shared memory, a few locks and msgpack
serialization. While the latter is faster than C pickle, it was mainly
chosen for its streaming API while deserializing, which greatly reduces
the locking overhead for the consumer process.
There is no mechanism to end a communication, so this information must be
in the exchanged data, for example by choosing a marker object like None:
- the last object sent by the producer is this marker
- the consumer stops iterating when it gets this marker
As long as data is being exchanged, the 2 processes cannot swap
roles (producer/consumer).
"""
def __init__(self, max_size):
from multiprocessing import Lock, RawArray, RawValue
self._max_size = max_size
self._array = RawArray('c', max_size)
self._pos = RawValue('L')
self._size = RawValue('L')
self._locks = Lock(), Lock(), Lock()
def __repr__(self):
return "<%s pos=%s size=%s max_size=%s>" % (self.__class__.__name__,
self._pos.value, self._size.value, self._max_size)
def __iter__(self):
"""Iterate endlessly over all objects sent by the producer
Internally, this method uses a receiving buffer that is lost if
interrupted (GeneratorExit). If this buffer was not empty, the queue
is left in an inconsistent state and this method can't be called again.
So the correct way to split a loop is to first get an iterator
explicitly:
iq = iter(queue)
for x in iq:
if ...:
break
for x in iq:
...
"""
unpacker = Unpacker(use_list=False, raw=True)
feed = unpacker.feed
max_size = self._max_size
array = self._array
pos = self._pos
size = self._size
lock, get_lock, put_lock = self._locks
left = 0
while 1:
for data in unpacker:
yield data
while 1:
with lock:
p = pos.value
s = size.value
if s:
break
get_lock.acquire()
e = p + s
if e < max_size:
feed(array[p:e])
else:
feed(array[p:])
e -= max_size
feed(array[:e])
with lock:
pos.value = e
n = size.value
size.value = n - s
if n == max_size:
put_lock.acquire(0)
put_lock.release()
def __call__(self, iterable):
"""Fill the queue with given objects
Hoping that msgpack.Packer gets a streaming API, 'iterable' should not
be split (i.e. this method should be called only once, like __iter__).
"""
pack = Packer(use_bin_type=True).pack
max_size = self._max_size
array = self._array
pos = self._pos
size = self._size
lock, get_lock, put_lock = self._locks
left = 0
for data in iterable:
data = pack(data)
n = len(data)
i = 0
while 1:
if not left:
while 1:
with lock:
p = pos.value
j = size.value
left = max_size - j
if left:
break
put_lock.acquire()
p += j
if p >= max_size:
p -= max_size
e = min(p + min(n, left), max_size)
j = e - p
array[p:e] = data[i:i+j]
n -= j
i += j
with lock:
p = pos.value
s = size.value
j += s
size.value = j
if not s:
get_lock.acquire(0)
get_lock.release()
p += j
if p >= max_size:
p -= max_size
left = max_size - j
if not n:
break
def test(self):
import multiprocessing, random, sys, threading
from traceback import print_tb
r = range(50)
random.shuffle(r)
for P in threading.Thread, multiprocessing.Process:
q = Queue(23)
def t():
for n in xrange(len(r)):
yield '.' * n
yield
for n in r:
yield '.' * n
i = j = 0
p = P(target=q, args=(t(),))
p.daemon = 1
p.start()
try:
q = iter(q)
for i, x in enumerate(q):
if x is None:
break
self.assertEqual(x, '.' * i)
self.assertEqual(i, len(r))
for j in r:
self.assertEqual(next(q), '.' * j)
except KeyboardInterrupt:
print_tb(sys.exc_info()[2])
self.fail((i, j))
p.join()
if __name__ == '__main__':
import unittest
unittest.TextTestRunner().run(type('', (unittest.TestCase,), {
'runTest': test})())
@@ -21,6 +21,8 @@ from neo.lib.util import ReadBuffer, parseNodeAddress
class UtilTests(NeoUnitTestBase):
from neo.storage.shared_queue import test as testSharedQueue
def test_parseNodeAddress(self):
""" Parsing of addresses """
def test(parsed, *args):
@@ -22,15 +22,13 @@ import transaction, ZODB
from neo.client.exception import NEOPrimaryMasterLost
from neo.lib import logging
from neo.lib.util import u64
from neo.storage.database import getAdapterKlass, manager
from neo.storage.database.importer import \
Repickler, TransactionRecord, WriteBack
from neo.storage.database import getAdapterKlass, importer, manager
from neo.storage.database.importer import Repickler, TransactionRecord
from .. import expectedFailure, getTempDirectory, random_tree, Patch
from . import NEOCluster, NEOThreadedTest
from ZODB import serialize
from ZODB.FileStorage import FileStorage
class Equal:
_recurse = {}
@@ -244,6 +242,7 @@ class ImporterTests(NEOThreadedTest):
finalCheck(db.open().root()['tree'])
db.close()
@unittest.skipUnless(importer.FORK, 'no os.fork')
def test1(self):
self._importFromFileStorage()
@@ -267,7 +266,7 @@ class ImporterTests(NEOThreadedTest):
def sleep(orig, seconds):
self.assertEqual(len(tid_list), 5)
p.revert()
with Patch(WriteBack, threading=True), \
with Patch(importer, FORK=False), \
Patch(TransactionRecord, __init__=__init__), \
Patch(manager.DatabaseManager, fetchObject=fetchObject), \
Patch(time, sleep=sleep) as p:
@@ -38,7 +38,7 @@ extras_require = {
'master': [],
'storage-sqlite': [],
'storage-mysqldb': ['mysqlclient'],
'storage-importer': zodb_require,
'storage-importer': zodb_require + ['msgpack>=0.5.6'],
}
extras_require['tests'] = ['coverage', 'zope.testing', 'psutil>=2',
'neoppod[%s]' % ', '.join(extras_require)]