Commit f6816a7c authored by Kirill Smelkov's avatar Kirill Smelkov

Merge remote-tracking branch 'origin/master' into t

* origin/master:
  client: account for cache hit/miss statistics
  client: remove redundant information from cache's __repr__
  cache: fix possible endless loop in __repr__/_iterQueue
  storage: speed up replication by not getting object next_serial for nothing
  storage: speed up replication by sending bigger network packets
  neoctl: remove ignored option
  client: bug found, add log to collect more information
  client: new 'cache-size' Storage option
  doc: mention HTTPS URLs when possible
  doc: update comment in neolog about Python issue 13773
  neolog: add support for xz-compressed logs, using external xzcat commands
  neolog: --from option now also tries to parse with dateutil
  importer: do not crash if a backup cluster tries to replicate
  storage: disable data deduplication by default
  Release version 1.8.1
parents 8bac3dba c76b3a0a
Change History Change History
============== ==============
1.8.1 (2017-11-07)
------------------
- Add support for OpenSSL >= 1.1.
- storage: fix possible crash when delaying replication requests.
- mysql: fix performance issues on read operations by using more index hints.
1.8 (2017-07-04) 1.8 (2017-07-04)
---------------- ----------------
......
...@@ -202,9 +202,9 @@ Developers interested in NEO may refer to ...@@ -202,9 +202,9 @@ Developers interested in NEO may refer to
`NEO Web site <https://neo.nexedi.com/>`_ and subscribe to following mailing `NEO Web site <https://neo.nexedi.com/>`_ and subscribe to following mailing
lists: lists:
- `neo-users <http://mail.tiolive.com/mailman/listinfo/neo-users>`_: - `neo-users <https://mail.tiolive.com/mailman/listinfo/neo-users>`_:
users discussion users discussion
- `neo-dev <http://mail.tiolive.com/mailman/listinfo/neo-dev>`_: - `neo-dev <https://mail.tiolive.com/mailman/listinfo/neo-dev>`_:
developers discussion developers discussion
Automated test results are published at Automated test results are published at
......
...@@ -59,7 +59,8 @@ class Application(ThreadedApplication): ...@@ -59,7 +59,8 @@ class Application(ThreadedApplication):
# is unreachable. # is unreachable.
max_reconnection_to_master = float('inf') max_reconnection_to_master = float('inf')
def __init__(self, master_nodes, name, compress=True, **kw): def __init__(self, master_nodes, name, compress=True, cache_size=None,
**kw):
super(Application, self).__init__(parseMasterList(master_nodes), super(Application, self).__init__(parseMasterList(master_nodes),
name, **kw) name, **kw)
# Internal Attributes common to all thread # Internal Attributes common to all thread
...@@ -69,7 +70,8 @@ class Application(ThreadedApplication): ...@@ -69,7 +70,8 @@ class Application(ThreadedApplication):
self.trying_master_node = None self.trying_master_node = None
# no self-assigned UUID, primary master will supply us one # no self-assigned UUID, primary master will supply us one
self._cache = ClientCache() self._cache = ClientCache() if cache_size is None else \
ClientCache(max_size=cache_size)
self._loading_oid = None self._loading_oid = None
self.new_oid_list = () self.new_oid_list = ()
self.last_oid = '\0' * 8 self.last_oid = '\0' * 8
...@@ -199,6 +201,12 @@ class Application(ThreadedApplication): ...@@ -199,6 +201,12 @@ class Application(ThreadedApplication):
else: else:
# Otherwise, check one by one. # Otherwise, check one by one.
master_list = self.nm.getMasterList() master_list = self.nm.getMasterList()
if not master_list:
# XXX: On shutdown, it already happened that this list
# is empty, leading to ZeroDivisionError. This
# looks a minor issue so let's wait to have more
# information.
logging.error('%r', self.__dict__)
index = (index + 1) % len(master_list) index = (index + 1) % len(master_list)
node = master_list[index] node = master_list[index]
# Connect to master # Connect to master
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import math import math
from bisect import insort from bisect import insort
...@@ -46,7 +47,7 @@ class ClientCache(object): ...@@ -46,7 +47,7 @@ class ClientCache(object):
"""In-memory pickle cache based on Multi-Queue cache algorithm """In-memory pickle cache based on Multi-Queue cache algorithm
Multi-Queue algorithm for Second Level Buffer Caches: Multi-Queue algorithm for Second Level Buffer Caches:
http://www.usenix.org/event/usenix01/full_papers/zhou/zhou_html/index.html https://www.usenix.org/event/usenix01/full_papers/zhou/zhou_html/index.html
Quick description: Quick description:
- There are multiple "regular" queues, plus a history queue - There are multiple "regular" queues, plus a history queue
...@@ -64,7 +65,8 @@ class ClientCache(object): ...@@ -64,7 +65,8 @@ class ClientCache(object):
""" """
__slots__ = ('_life_time', '_max_history_size', '_max_size', __slots__ = ('_life_time', '_max_history_size', '_max_size',
'_queue_list', '_oid_dict', '_time', '_size', '_history_size') '_queue_list', '_oid_dict', '_time', '_size', '_history_size',
'_nhit', '_nmiss')
def __init__(self, life_time=10000, max_history_size=100000, def __init__(self, life_time=10000, max_history_size=100000,
max_size=20*1024*1024): max_size=20*1024*1024):
...@@ -80,27 +82,33 @@ class ClientCache(object): ...@@ -80,27 +82,33 @@ class ClientCache(object):
self._time = 0 self._time = 0
self._size = 0 self._size = 0
self._history_size = 0 self._history_size = 0
self._nhit = self._nmiss = 0
def __repr__(self): def __repr__(self):
return ("<%s history_size=%s oid_count=%s size=%s time=%s" nload = self._nhit + self._nmiss
" queue_length=%r (life_time=%s max_history_size=%s" return ("<%s #loads=%s #oids=%s size=%s time=%s queue_length=%r"
" max_size=%s)>") % ( " (life_time=%s max_history_size=%s max_size=%s)>") % (
self.__class__.__name__, self._history_size, self.__class__.__name__,
nload and '%s (%.3g%% hit)' % (nload, 100 * self._nhit / nload),
len(self._oid_dict), self._size, self._time, len(self._oid_dict), self._size, self._time,
[sum(1 for _ in self._iterQueue(x)) [self._history_size] + [
for x in xrange(len(self._queue_list))], sum(1 for _ in self._iterQueue(level))
for level in xrange(1, len(self._queue_list))],
self._life_time, self._max_history_size, self._max_size) self._life_time, self._max_history_size, self._max_size)
def _iterQueue(self, level): def _iterQueue(self, level):
"""for debugging purpose""" """for debugging purpose"""
if level < len(self._queue_list): if level < len(self._queue_list):
item = head = self._queue_list[level] # Lockless iteration of the queue.
if item: # XXX: In case of race condition, the result is wrong but at least,
while 1: # it won't loop endlessly. If one want to collect accurate
# statistics, a lock should be used.
expire = 0
item = self._queue_list[level]
while item and item.level == level and expire < item.expire:
yield item yield item
expire = item.expire
item = item.next item = item.next
if item is head:
break
def _remove_from_oid_dict(self, item): def _remove_from_oid_dict(self, item):
item_list = self._oid_dict[item.oid] item_list = self._oid_dict[item.oid]
...@@ -157,7 +165,7 @@ class ClientCache(object): ...@@ -157,7 +165,7 @@ class ClientCache(object):
# XXX It might be better to adjust the level according to the object # XXX It might be better to adjust the level according to the object
# size. See commented factor for example. # size. See commented factor for example.
item.level = 1 + int(_log(counter, 2) item.level = 1 + int(_log(counter, 2)
# * (1.01 - float(len(item.data)) / self._max_size) # * (1.01 - len(item.data) / self._max_size)
) )
self._add(item) self._add(item)
...@@ -192,8 +200,10 @@ class ClientCache(object): ...@@ -192,8 +200,10 @@ class ClientCache(object):
if item: if item:
data = item.data data = item.data
if data is not None: if data is not None:
self._nhit += 1
self._fetched(item) self._fetched(item)
return data, item.tid, item.next_tid return data, item.tid, item.next_tid
self._nmiss += 1
def store(self, oid, data, tid, next_tid): def store(self, oid, data, tid, next_tid):
"""Store a new data record in the cache""" """Store a new data record in the cache"""
......
...@@ -32,6 +32,13 @@ ...@@ -32,6 +32,13 @@
Log debugging information to specified SQLite DB. Log debugging information to specified SQLite DB.
</description> </description>
</key> </key>
<key name="cache-size" datatype="byte-size" default="20MB">
<description>
Storage cache size in bytes. Records are cached uncompressed.
Optional ``KB``, ``MB`` or ``GB`` suffixes can (and usually are)
used to specify units other than bytes.
</description>
</key>
<key name="dynamic_master_list" datatype="existing-dirpath"> <key name="dynamic_master_list" datatype="existing-dirpath">
<description> <description>
The file designated by this option contains an updated list of master The file designated by this option contains an updated list of master
......
...@@ -147,3 +147,6 @@ class ConfigurationManager(object): ...@@ -147,3 +147,6 @@ class ConfigurationManager(object):
n = self.__get('autostart', True) n = self.__get('autostart', True)
if n: if n:
return int(n) return int(n)
def getDedup(self):
return self.__get('dedup', True)
...@@ -306,6 +306,7 @@ class Connection(BaseConnection): ...@@ -306,6 +306,7 @@ class Connection(BaseConnection):
# XXX: rename isPending, hasPendingMessages & pending methods # XXX: rename isPending, hasPendingMessages & pending methods
buffering = False
connecting = True connecting = True
client = False client = False
server = False server = False
...@@ -545,8 +546,16 @@ class Connection(BaseConnection): ...@@ -545,8 +546,16 @@ class Connection(BaseConnection):
def _addPacket(self, packet): def _addPacket(self, packet):
"""Add a packet into the write buffer.""" """Add a packet into the write buffer."""
if self.connector.queue(packet.encode()): if self.connector.queue(packet.encode()):
if packet.nodelay or 65536 < self.connector.queue_size:
assert not self.buffering
# enable polling for writing. # enable polling for writing.
self.em.addWriter(self) self.em.addWriter(self)
else:
self.buffering = True
elif self.buffering and (65536 < self.connector.queue_size
or packet.nodelay):
self.buffering = False
self.em.addWriter(self)
logging.packet(self, packet, True) logging.packet(self, packet, True)
def send(self, packet, msg_id=None): def send(self, packet, msg_id=None):
......
...@@ -72,11 +72,13 @@ class SocketConnector(object): ...@@ -72,11 +72,13 @@ class SocketConnector(object):
# disable Nagle algorithm to reduce latency # disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.queued = [ENCODED_VERSION] self.queued = [ENCODED_VERSION]
self.queue_size = len(ENCODED_VERSION)
return self return self
def queue(self, data): def queue(self, data):
was_empty = not self.queued was_empty = not self.queued
self.queued += data self.queued += data
self.queue_size += len(data)
return was_empty return was_empty
def _error(self, op, exc=None): def _error(self, op, exc=None):
...@@ -183,8 +185,10 @@ class SocketConnector(object): ...@@ -183,8 +185,10 @@ class SocketConnector(object):
# could be sent. # NOTE queue may grow up indefinitely in this case! # could be sent. # NOTE queue may grow up indefinitely in this case!
if n != len(msg): if n != len(msg):
self.queued[:] = msg[n:], self.queued[:] = msg[n:],
self.queue_size -= n
return False return False
del self.queued[:] del self.queued[:]
self.queue_size = 0
else: else:
assert not self.queued assert not self.queued
return True return True
......
...@@ -225,6 +225,7 @@ class Packet(object): ...@@ -225,6 +225,7 @@ class Packet(object):
_code = None _code = None
_fmt = None _fmt = None
_id = None _id = None
nodelay = True
poll_thread = False poll_thread = False
def __init__(self, *args): def __init__(self, *args):
...@@ -1444,6 +1445,8 @@ class AddTransaction(Packet): ...@@ -1444,6 +1445,8 @@ class AddTransaction(Packet):
""" """
S -> S S -> S
""" """
nodelay = False
_fmt = PStruct('add_transaction', _fmt = PStruct('add_transaction',
PTID('tid'), PTID('tid'),
PString('user'), PString('user'),
...@@ -1483,6 +1486,8 @@ class AddObject(Packet): ...@@ -1483,6 +1486,8 @@ class AddObject(Packet):
""" """
S -> S S -> S
""" """
nodelay = False
_fmt = PStruct('add_object', _fmt = PStruct('add_object',
POID('oid'), POID('oid'),
PTID('serial'), PTID('serial'),
......
...@@ -59,7 +59,7 @@ def packTID(higher, lower): ...@@ -59,7 +59,7 @@ def packTID(higher, lower):
# seconds (e.g. TZ=right/UTC), then the best we can do is to use # seconds (e.g. TZ=right/UTC), then the best we can do is to use
# TID_LOW_MAX, because TID format was not designed to support them. # TID_LOW_MAX, because TID format was not designed to support them.
# For more information about leap seconds on Unix, see: # For more information about leap seconds on Unix, see:
# http://en.wikipedia.org/wiki/Unix_time # https://en.wikipedia.org/wiki/Unix_time
# http://www.madore.org/~david/computers/unix-leap-seconds.html # http://www.madore.org/~david/computers/unix-leap-seconds.html
return pack('!LL', packed_higher, min(lower, TID_LOW_MAX)) return pack('!LL', packed_higher, min(lower, TID_LOW_MAX))
......
...@@ -24,7 +24,6 @@ from neo.lib.util import parseNodeAddress ...@@ -24,7 +24,6 @@ from neo.lib.util import parseNodeAddress
parser = getOptionParser() parser = getOptionParser()
parser.add_option('-a', '--address', help = 'specify the address (ip:port) ' \ parser.add_option('-a', '--address', help = 'specify the address (ip:port) ' \
'of an admin node', default = '127.0.0.1:9999') 'of an admin node', default = '127.0.0.1:9999')
parser.add_option('--handler', help = 'specify the connection handler')
def main(args=None): def main(args=None):
(options, args) = parser.parse_args(args=args) (options, args) = parser.parse_args(args=args)
......
...@@ -22,7 +22,7 @@ from bisect import insort ...@@ -22,7 +22,7 @@ from bisect import insort
from logging import getLevelName from logging import getLevelName
from zlib import decompress from zlib import decompress
comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile) comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile, xz='xzcat')
class Log(object): class Log(object):
...@@ -41,13 +41,16 @@ class Log(object): ...@@ -41,13 +41,16 @@ class Log(object):
name, ext = name.rsplit(os.extsep, 1) name, ext = name.rsplit(os.extsep, 1)
ZipFile = comp_dict[ext] ZipFile = comp_dict[ext]
except (KeyError, ValueError): except (KeyError, ValueError):
# WKRD: Python does not support URI so we can't open in read-only # BBB: Python 2 does not support URI so we can't open in read-only
# mode. See http://bugs.python.org/issue13773 # mode. See https://bugs.python.org/issue13773
os.stat(db_path) # do not create empty DB if file is missing os.stat(db_path) # do not create empty DB if file is missing
self._db = sqlite3.connect(db_path) self._db = sqlite3.connect(db_path)
else: else:
import shutil, tempfile import shutil, subprocess, tempfile
with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile() as f:
if type(ZipFile) is str:
subprocess.check_call((ZipFile, db_path), stdout=f)
else:
shutil.copyfileobj(ZipFile(db_path), f) shutil.copyfileobj(ZipFile(db_path), f)
self._db = sqlite3.connect(f.name) self._db = sqlite3.connect(f.name)
name = name.rsplit(os.extsep, 1)[0] name = name.rsplit(os.extsep, 1)[0]
...@@ -241,16 +244,28 @@ def main(): ...@@ -241,16 +244,28 @@ def main():
parser.add_option('-s', '--sleep-interval', type="float", default=1, parser.add_option('-s', '--sleep-interval', type="float", default=1,
help='with -f, sleep for approximately N seconds (default 1.0)' help='with -f, sleep for approximately N seconds (default 1.0)'
' between iterations', metavar='N') ' between iterations', metavar='N')
parser.add_option('--from', dest='filter_from', type="float", parser.add_option('--from', dest='filter_from',
help='show records more recent that timestamp N if N > 0,' help='show records more recent that timestamp N if N > 0,'
' or now+N if N < 0', metavar='N') ' or now+N if N < 0; N can also be a string that is'
' parseable by dateutil ', metavar='N')
options, args = parser.parse_args() options, args = parser.parse_args()
if options.sleep_interval <= 0: if options.sleep_interval <= 0:
parser.error("sleep_interval must be positive") parser.error("sleep_interval must be positive")
if not args: if not args:
parser.error("no log specified") parser.error("no log specified")
filter_from = options.filter_from filter_from = options.filter_from
if filter_from and filter_from < 0: if filter_from:
try:
filter_from = float(options.filter_from)
except ValueError:
from dateutil.parser import parse
x = parse(filter_from)
if x.tzinfo:
filter_from = (x - x.fromtimestamp(0, x.tzinfo)).total_seconds()
else:
filter_from = time.mktime(x.timetuple()) + x.microsecond * 1e-6
else:
if filter_from < 0:
filter_from += time.time() filter_from += time.time()
node_list = options.node or [] node_list = options.node or []
try: try:
......
...@@ -30,6 +30,9 @@ parser.add_option('-d', '--database', help = 'database connections string') ...@@ -30,6 +30,9 @@ parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-e', '--engine', help = 'database engine') parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be ' parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0) 'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--dedup', action='store_true',
help = 'enable deduplication of data'
' when setting up a new storage node')
parser.add_option('--disable-drop-partitions', action='store_true', parser.add_option('--disable-drop-partitions', action='store_true',
help = 'do not delete data of discarded cells, which is' help = 'do not delete data of discarded cells, which is'
' useful for big databases because the current' ' useful for big databases because the current'
......
...@@ -69,7 +69,7 @@ class Application(BaseApplication): ...@@ -69,7 +69,7 @@ class Application(BaseApplication):
# operation related data # operation related data
self.operational = False self.operational = False
self.dm.setup(reset=config.getReset()) self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
self.loadConfiguration() self.loadConfiguration()
# force node uuid from command line argument, for testing purpose only # force node uuid from command line argument, for testing purpose only
...@@ -258,9 +258,8 @@ class Application(BaseApplication): ...@@ -258,9 +258,8 @@ class Application(BaseApplication):
try: try:
# NOTE backup/importer processed under isIdle # NOTE backup/importer processed under isIdle
while isIdle(): while isIdle():
if task_queue[-1].next(): next(task_queue[-1]) or task_queue.rotate()
_poll(0) _poll(0)
task_queue.rotate()
break break
except StopIteration: except StopIteration:
task_queue.pop() task_queue.pop()
...@@ -274,10 +273,6 @@ class Application(BaseApplication): ...@@ -274,10 +273,6 @@ class Application(BaseApplication):
self.replicator.stop() self.replicator.stop()
def newTask(self, iterator): def newTask(self, iterator):
try:
iterator.next()
except StopIteration:
return
self.task_queue.appendleft(iterator) self.task_queue.appendleft(iterator)
def closeClient(self, connection): def closeClient(self, connection):
......
...@@ -322,8 +322,8 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -322,8 +322,8 @@ class ImporterDatabaseManager(DatabaseManager):
for zodb in self.zodb: for zodb in self.zodb:
zodb.close() zodb.close()
def setup(self, reset=0): def setup(self, reset=False, dedup=False):
self.db.setup(reset) self.db.setup(reset, dedup)
zodb_state = self.getConfiguration("zodb") zodb_state = self.getConfiguration("zodb")
if zodb_state: if zodb_state:
logging.warning("Ignoring configuration file for oid mapping." logging.warning("Ignoring configuration file for oid mapping."
...@@ -388,7 +388,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -388,7 +388,7 @@ class ImporterDatabaseManager(DatabaseManager):
finish() finish()
txn = z.transaction txn = z.transaction
tid = txn.tid tid = txn.tid
yield 1 yield
zodb = z.zodb zodb = z.zodb
for r in z.transaction: for r in z.transaction:
oid = p64(u64(r.oid) + zodb.shift_oid) oid = p64(u64(r.oid) + zodb.shift_oid)
...@@ -413,7 +413,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -413,7 +413,7 @@ class ImporterDatabaseManager(DatabaseManager):
# update 'obj' with 'object_list', some rows in 'data' may be # update 'obj' with 'object_list', some rows in 'data' may be
# unreferenced. This is not a problem because the leak is # unreferenced. This is not a problem because the leak is
# solved when resuming the migration. # solved when resuming the migration.
yield 1 yield
try: try:
z.next() z.next()
except StopIteration: except StopIteration:
...@@ -423,7 +423,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -423,7 +423,7 @@ class ImporterDatabaseManager(DatabaseManager):
logging.warning("All data are imported. You should change" logging.warning("All data are imported. You should change"
" your configuration to use the native backend and restart.") " your configuration to use the native backend and restart.")
self._import = None self._import = None
for x in """getObject getReplicationTIDList for x in """getObject getReplicationTIDList getReplicationObjectList
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
...@@ -514,6 +514,9 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -514,6 +514,9 @@ class ImporterDatabaseManager(DatabaseManager):
self.db._deleteRange(partition, min_tid, max_tid) self.db._deleteRange(partition, min_tid, max_tid)
def getReplicationTIDList(self, min_tid, max_tid, length, partition): def getReplicationTIDList(self, min_tid, max_tid, length, partition):
# This method is not tested and it is anyway
# useless without getReplicationObjectList.
raise BackendNotImplemented(self.getReplicationTIDList)
p64 = util.p64 p64 = util.p64
tid = p64(self.zodb_tid) tid = p64(self.zodb_tid)
if min_tid <= tid: if min_tid <= tid:
......
...@@ -133,7 +133,7 @@ class DatabaseManager(object): ...@@ -133,7 +133,7 @@ class DatabaseManager(object):
def erase(self): def erase(self):
"""""" """"""
def _setup(self): def _setup(self, dedup=False):
"""To be overridden by the backend to set up a database """To be overridden by the backend to set up a database
It must recover self._uncommitted_data from temporary object table. It must recover self._uncommitted_data from temporary object table.
...@@ -144,14 +144,14 @@ class DatabaseManager(object): ...@@ -144,14 +144,14 @@ class DatabaseManager(object):
""" """
@requires(_setup) @requires(_setup)
def setup(self, reset=0): def setup(self, reset=False, dedup=False):
"""Set up a database, discarding existing data first if reset is True """Set up a database, discarding existing data first if reset is True
""" """
if reset: if reset:
self.erase() self.erase()
self._readable_set = set() self._readable_set = set()
self._uncommitted_data = defaultdict(int) self._uncommitted_data = defaultdict(int)
self._setup() self._setup(dedup)
@abstract @abstract
def nonempty(self, table): def nonempty(self, table):
...@@ -441,6 +441,20 @@ class DatabaseManager(object): ...@@ -441,6 +441,20 @@ class DatabaseManager(object):
compression, checksum, data, compression, checksum, data,
None if data_serial is None else util.p64(data_serial)) None if data_serial is None else util.p64(data_serial))
@fallback
def _fetchObject(self, oid, tid):
r = self._getObject(oid, tid)
if r:
return r[:1] + r[2:]
def fetchObject(self, oid, tid):
u64 = util.u64
r = self._fetchObject(u64(oid), u64(tid))
if r:
serial, compression, checksum, data, data_serial = r
return (util.p64(serial), compression, checksum, data,
None if data_serial is None else util.p64(data_serial))
@contextmanager @contextmanager
def replicated(self, offset): def replicated(self, offset):
readable_set = self._readable_set readable_set = self._readable_set
......
...@@ -172,7 +172,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -172,7 +172,7 @@ class MySQLDatabaseManager(DatabaseManager):
if e.args[0] != NO_SUCH_TABLE: if e.args[0] != NO_SUCH_TABLE:
raise raise
def _setup(self): def _setup(self, dedup=False):
self._config.clear() self._config.clear()
q = self.query q = self.query
p = engine = self._engine p = engine = self._engine
...@@ -240,9 +240,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -240,9 +240,9 @@ class MySQLDatabaseManager(DatabaseManager):
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
hash BINARY(20) NOT NULL, hash BINARY(20) NOT NULL,
compression TINYINT UNSIGNED NULL, compression TINYINT UNSIGNED NULL,
value MEDIUMBLOB NOT NULL, value MEDIUMBLOB NOT NULL%s
UNIQUE (hash, compression) ) ENGINE=%s""" % (""",
) ENGINE=""" + engine) UNIQUE (hash, compression)""" if dedup else "", engine))
q("""CREATE TABLE IF NOT EXISTS bigdata ( q("""CREATE TABLE IF NOT EXISTS bigdata (
id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
...@@ -702,6 +702,21 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -702,6 +702,21 @@ class MySQLDatabaseManager(DatabaseManager):
if r: if r:
return [(p64(tid), length or 0) for tid, length in r] return [(p64(tid), length or 0) for tid, length in r]
def _fetchObject(self, oid, tid):
r = self.query(
'SELECT tid, compression, data.hash, value, value_tid'
' FROM obj FORCE INDEX(`partition`)'
' LEFT JOIN data ON (obj.data_id = data.id)'
' WHERE `partition` = %d AND oid = %d AND tid = %d'
% (self._getReadablePartition(oid), oid, tid))
if r:
r = r[0]
compression = r[1]
if compression and compression & 0x80:
return (r[0], compression & 0x7f, r[2],
''.join(self._bigData(data)), r[4])
return r
def getReplicationObjectList(self, min_tid, max_tid, length, partition, def getReplicationObjectList(self, min_tid, max_tid, length, partition,
min_oid): min_oid):
u64 = util.u64 u64 = util.u64
......
...@@ -62,7 +62,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -62,7 +62,7 @@ class SQLiteDatabaseManager(DatabaseManager):
"""This class manages a database on SQLite. """This class manages a database on SQLite.
CAUTION: Make sure we never use statement journal files, as explained at CAUTION: Make sure we never use statement journal files, as explained at
http://www.sqlite.org/tempfiles.html for more information. https://www.sqlite.org/tempfiles.html for more information.
In other words, temporary files (by default in /var/tmp !) must In other words, temporary files (by default in /var/tmp !) must
never be used for small requests. never be used for small requests.
""" """
...@@ -112,7 +112,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -112,7 +112,7 @@ class SQLiteDatabaseManager(DatabaseManager):
if not e.args[0].startswith("no such table:"): if not e.args[0].startswith("no such table:"):
raise raise
def _setup(self): def _setup(self, dedup=False):
# SQLite does support transactional Data Definition Language statements # SQLite does support transactional Data Definition Language statements
# but unfortunately, the built-in Python binding automatically commits # but unfortunately, the built-in Python binding automatically commits
# between such statements. This anti-feature causes this method to be # between such statements. This anti-feature causes this method to be
...@@ -179,6 +179,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -179,6 +179,7 @@ class SQLiteDatabaseManager(DatabaseManager):
compression INTEGER NOT NULL, compression INTEGER NOT NULL,
value BLOB NOT NULL) value BLOB NOT NULL)
""") """)
if dedup:
q("""CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON q("""CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON
data(hash, compression) data(hash, compression)
""") """)
...@@ -531,6 +532,17 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -531,6 +532,17 @@ class SQLiteDatabaseManager(DatabaseManager):
self._getPackTID(), offset, length)) self._getPackTID(), offset, length))
] or None ] or None
def _fetchObject(self, oid, tid):
for serial, compression, checksum, data, value_serial in self.query(
'SELECT tid, compression, data.hash, value, value_tid'
' FROM obj LEFT JOIN data ON obj.data_id = data.id'
' WHERE partition=? AND oid=? AND tid=?',
(self._getReadablePartition(oid), oid, tid)):
if checksum:
checksum = str(checksum)
data = str(data)
return serial, compression, checksum, data, value_serial
def getReplicationObjectList(self, min_tid, max_tid, length, partition, def getReplicationObjectList(self, min_tid, max_tid, length, partition,
min_oid): min_oid):
u64 = util.u64 u64 = util.u64
......
...@@ -157,7 +157,7 @@ class StorageOperationHandler(EventHandler): ...@@ -157,7 +157,7 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AnswerCheckTIDRange(*r), msg_id) # NOTE msg_id: out-of-order answer conn.send(Packets.AnswerCheckTIDRange(*r), msg_id) # NOTE msg_id: out-of-order answer
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield return; yield
app.newTask(check()) app.newTask(check())
@checkFeedingConnection(check=True) @checkFeedingConnection(check=True)
...@@ -173,7 +173,7 @@ class StorageOperationHandler(EventHandler): ...@@ -173,7 +173,7 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AnswerCheckSerialRange(*r), msg_id) # NOTE msg_id: out-of-order answer conn.send(Packets.AnswerCheckSerialRange(*r), msg_id) # NOTE msg_id: out-of-order answer
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield return; yield
app.newTask(check()) app.newTask(check())
@checkFeedingConnection(check=False) @checkFeedingConnection(check=False)
...@@ -210,9 +210,17 @@ class StorageOperationHandler(EventHandler): ...@@ -210,9 +210,17 @@ class StorageOperationHandler(EventHandler):
% partition), msg_id) % partition), msg_id)
return return
oid_list, user, desc, ext, packed, ttid = t oid_list, user, desc, ext, packed, ttid = t
# Sending such packet does not mark the connection
# for writing if there's too little data in the buffer.
conn.send(Packets.AddTransaction(tid, user, conn.send(Packets.AddTransaction(tid, user,
desc, ext, packed, ttid, oid_list), msg_id) desc, ext, packed, ttid, oid_list), msg_id)
yield # To avoid delaying several connections simultaneously,
# and also prevent the backend from scanning different
# parts of the DB at the same time, we ask the
# scheduler not to switch to another background task.
# Ideally, we are filling a buffer while the kernel
# is flushing another one for a concurrent connection.
yield conn.buffering
conn.send(Packets.AnswerFetchTransactions( conn.send(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id) # NOTE msg_id: out-of-order answer pack_tid, next_tid, peer_tid_set), msg_id) # NOTE msg_id: out-of-order answer
yield yield
...@@ -248,15 +256,15 @@ class StorageOperationHandler(EventHandler): ...@@ -248,15 +256,15 @@ class StorageOperationHandler(EventHandler):
if not oid_set: if not oid_set:
del object_dict[serial] del object_dict[serial]
continue continue
object = dm.getObject(oid, serial) object = dm.fetchObject(oid, serial)
if not object: if not object:
conn.send(Errors.ReplicationError( conn.send(Errors.ReplicationError(
"partition %u dropped or truncated" "partition %u dropped or truncated"
% partition), msg_id) % partition), msg_id)
return return
conn.send(Packets.AddObject(oid, serial, *object[2:]), # Same as in askFetchTransactions.
msg_id) conn.send(Packets.AddObject(oid, *object), msg_id)
yield yield conn.buffering
conn.send(Packets.AnswerFetchObjects( conn.send(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) # NOTE msg_id: out-of-order answer pack_tid, next_tid, next_oid, object_dict), msg_id) # NOTE msg_id: out-of-order answer
yield yield
......
...@@ -39,7 +39,7 @@ class StorageMySQLdbTests(StorageDBTests): ...@@ -39,7 +39,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.assertEqual(db.db, DB_PREFIX + '0') self.assertEqual(db.db, DB_PREFIX + '0')
self.assertEqual(db.user, DB_USER) self.assertEqual(db.user, DB_USER)
try: try:
db.setup(reset) db.setup(reset, True)
except NotSupportedError as m: except NotSupportedError as m:
code, m = m.args code, m = m.args
if code != UNKNOWN_STORAGE_ENGINE: if code != UNKNOWN_STORAGE_ENGINE:
......
...@@ -27,7 +27,7 @@ class StorageSQLiteTests(StorageDBTests): ...@@ -27,7 +27,7 @@ class StorageSQLiteTests(StorageDBTests):
def getDB(self, reset=0): def getDB(self, reset=0):
db = SQLiteDatabaseManager(':memory:') db = SQLiteDatabaseManager(':memory:')
db.setup(reset) db.setup(reset, True)
return db return db
def test_lockDatabase(self): def test_lockDatabase(self):
......
...@@ -675,7 +675,7 @@ class NEOCluster(object): ...@@ -675,7 +675,7 @@ class NEOCluster(object):
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'), adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True, storage_count=None, db_list=None, clear_databases=True,
db_user=DB_USER, db_password='', compress=True, db_user=DB_USER, db_password='', compress=True,
importer=None, autostart=None): importer=None, autostart=None, dedup=False):
self.name = 'neo_%s' % self._allocate('name', self.name = 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100)) lambda: random.randint(0, 100))
self.compress = compress self.compress = compress
...@@ -684,7 +684,7 @@ class NEOCluster(object): ...@@ -684,7 +684,7 @@ class NEOCluster(object):
for _ in xrange(master_count)] for _ in xrange(master_count)]
self.master_nodes = ' '.join('%s:%s' % x for x in master_list) self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
kw = Node.convertInitArgs(replicas=replicas, adapter=adapter, kw = Node.convertInitArgs(replicas=replicas, adapter=adapter,
partitions=partitions, reset=clear_databases) partitions=partitions, reset=clear_databases, dedup=dedup)
kw['cluster'] = weak_self = weakref.proxy(self) kw['cluster'] = weak_self = weakref.proxy(self)
kw['getSSL'] = self.SSL kw['getSSL'] = self.SSL
if upstream is not None: if upstream is not None:
......
...@@ -37,8 +37,8 @@ from neo.lib import logging ...@@ -37,8 +37,8 @@ from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes, from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID) Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID)
from .. import expectedFailure, unpickle_state, Patch, TransactionalResource from .. import expectedFailure, unpickle_state, Patch, TransactionalResource
from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \ from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \
RandomConflictDict, ThreadId, with_cluster NEOThreadedTest, RandomConflictDict, ThreadId, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.transactions import Transaction from neo.client.transactions import Transaction
...@@ -198,9 +198,9 @@ class Test(NEOThreadedTest): ...@@ -198,9 +198,9 @@ class Test(NEOThreadedTest):
def testUndoConflictDuringStore(self, cluster): def testUndoConflictDuringStore(self, cluster):
self._testUndoConflict(cluster, 1) self._testUndoConflict(cluster, 1)
@with_cluster() def testStorageDataLock(self, dedup=False):
def testStorageDataLock(self, cluster): with NEOCluster(dedup=dedup) as cluster:
if 1: cluster.start()
storage = cluster.getZODBStorage() storage = cluster.getZODBStorage()
data_info = {} data_info = {}
...@@ -212,8 +212,6 @@ class Test(NEOThreadedTest): ...@@ -212,8 +212,6 @@ class Test(NEOThreadedTest):
r1 = storage.store(oid, None, data, '', txn) r1 = storage.store(oid, None, data, '', txn)
r2 = storage.tpc_vote(txn) r2 = storage.tpc_vote(txn)
tid = storage.tpc_finish(txn) tid = storage.tpc_finish(txn)
data_info[key] = 0
storage.sync()
txn = [transaction.Transaction() for x in xrange(4)] txn = [transaction.Transaction() for x in xrange(4)]
for t in txn: for t in txn:
...@@ -221,20 +219,20 @@ class Test(NEOThreadedTest): ...@@ -221,20 +219,20 @@ class Test(NEOThreadedTest):
storage.store(oid if tid else storage.new_oid(), storage.store(oid if tid else storage.new_oid(),
tid, data, '', t) tid, data, '', t)
tid = None tid = None
data_info[key] = 4 data_info[key] = 4 if dedup else 1
storage.sync() self.tic()
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
storage.tpc_abort(txn.pop()) storage.tpc_abort(txn.pop())
for t in txn: for t in txn:
storage.tpc_vote(t) storage.tpc_vote(t)
storage.sync() self.tic()
data_info[key] -= 1 data_info[key] -= dedup
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
storage.tpc_abort(txn[1]) storage.tpc_abort(txn[1])
storage.sync() self.tic()
data_info[key] -= 1 data_info[key] -= dedup
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
tid1 = storage.tpc_finish(txn[2]) tid1 = storage.tpc_finish(txn[2])
...@@ -243,10 +241,13 @@ class Test(NEOThreadedTest): ...@@ -243,10 +241,13 @@ class Test(NEOThreadedTest):
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
storage.tpc_abort(txn[0]) storage.tpc_abort(txn[0])
storage.sync() self.tic()
data_info[key] -= 1 data_info[key] -= dedup
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
def testStorageDataLockWithDeduplication(self, dedup=False):
self.testStorageDataLock(True)
@with_cluster() @with_cluster()
def testStorageDataLock2(self, cluster): def testStorageDataLock2(self, cluster):
storage = cluster.getZODBStorage() storage = cluster.getZODBStorage()
......
...@@ -60,7 +60,7 @@ else: ...@@ -60,7 +60,7 @@ else:
setup( setup(
name = 'neoppod', name = 'neoppod',
version = '1.8', version = '1.8.1',
description = __doc__.strip(), description = __doc__.strip(),
author = 'Nexedi SA', author = 'Nexedi SA',
author_email = 'neo-dev@erp5.org', author_email = 'neo-dev@erp5.org',
...@@ -89,6 +89,9 @@ setup( ...@@ -89,6 +89,9 @@ setup(
'neo = neo.client.zodburi:resolve_uri [client]', 'neo = neo.client.zodburi:resolve_uri [client]',
], ],
}, },
install_requires = [
'python-dateutil', # neolog --from
],
extras_require = extras_require, extras_require = extras_require,
package_data = { package_data = {
'neo.client': [ 'neo.client': [
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment