Commit 75a71514 authored by Kirill Smelkov

Merge remote-tracking branch 'origin/master' into t

* origin/master:
  debug: add helper to run code outside the signal handler
  Preserve 'packed' flag on import/iteration
  fixup! storage: speed up replication by not getting object next_serial for nothing
  client: fix accounting of cache size
  doc: comments, fixups
parents 64ede21c 875fc1b9
@@ -42,7 +42,7 @@ Other changes:
 - Proper handling of incoming packets for closed/aborted connections.
 - An exception while processing an answer could leave the handler switcher
   in the bad state.
-- In STOPPING cluster state, really wait for all transaction to be finished.
+- In STOPPING cluster state, really wait for all transactions to be finished.
 - Several issues when undoing transactions with conflict resolutions
   have been fixed.
 - Delayed connection acceptation when the storage node is ready.
......
@@ -791,7 +791,6 @@ class Application(ThreadedApplication):
             txn_info[k] = v
 
     def _getTransactionInformation(self, tid):
-        packet = Packets.AskTransactionInformation(tid)
-        return self._askStorageForRead(tid, packet)
+        return self._askStorageForRead(tid,
+            Packets.AskTransactionInformation(tid))
......
@@ -135,13 +135,16 @@ class ClientCache(object):
         if level:
             item.expire = self._time + self._life_time
         else:
-            self._size -= len(item.data)
-            item.data = None
+            self._empty(item)
             self._history_size += 1
             if self._max_history_size < self._history_size:
                 self._remove(head)
                 self._remove_from_oid_dict(head)
 
+    def _empty(self, item):
+        self._size -= len(item.data)
+        item.data = None
+
     def _remove(self, item):
         level = item.level
         if level is not None:
@@ -176,6 +179,7 @@ class ClientCache(object):
             if head.level or head.counter:
                 self._add(head)
             else:
+                self._empty(head)
                 self._remove_from_oid_dict(head)
             break
@@ -260,6 +264,7 @@ class ClientCache(object):
                 head.level = 0
                 self._add(head)
             else:
+                self._empty(head)
                 self._remove_from_oid_dict(head)
             if self._size <= max_size:
                 return
@@ -281,6 +286,8 @@ class ClientCache(object):
         for oid, item_list in self._oid_dict.items():
             item = item_list[-1]
             if item.next_tid is None:
+                if item.level:
+                    self._empty(item)
                 self._remove(item)
                 del item_list[-1]
                 # We don't preserve statistics of removed items. This could be
@@ -307,10 +314,13 @@ def test(self):
     cache.store(1, *data)
     self.assertEqual(cache.load(1, None), data)
     cache.clear_current()
+    self.assertEqual(cache._size, 1)
     self.assertEqual(cache.load(1, None), None)
     cache.store(1, *data)
     cache.invalidate(1, 20)
+    self.assertEqual(cache._size, 3)
     cache.clear_current()
+    self.assertEqual(cache._size, 3)
     self.assertEqual(cache.load(1, 20), ('15', 15, 20))
     cache.store(1, '10', 10, 15)
     cache.store(1, '20', 20, 21)
@@ -322,8 +332,10 @@ def test(self):
     cache.clear()
     cache.store(1, '10*', 10, None)
     cache._max_size = cache._size
-    cache.store(2, '10', 10, None)
+    cache.store(2, '10', 10, 15)
     self.assertEqual(cache._queue_list[0].oid, 1)
+    cache.store(2, '15', 15, None)
+    self.assertEqual(cache._queue_list[2].oid, 2)
     data = '10', 10, 15
     cache.store(1, *data)
     self.assertEqual(cache.load(1, 15), data)
......
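The cache hunks above ("client: fix accounting of cache size") route every place that drops an item's data through a single _empty helper, so _size is decremented exactly once per dropped data. A minimal sketch of that invariant, using a toy cache rather than NEO's real one:

# Sketch only: bytes counted into _size on store are subtracted exactly
# once, in one place (_empty), whatever code path drops the data.
class TinyCache(object):
    def __init__(self):
        self._size = 0
        self._items = {}            # oid -> data

    def store(self, oid, data):
        self._size += len(data)
        self._items[oid] = data

    def _empty(self, oid):
        # single point of size accounting on release
        self._size -= len(self._items.pop(oid))

c = TinyCache()
c.store(1, '15')
assert c._size == 2
c._empty(1)
assert c._size == 0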
@@ -35,7 +35,8 @@ class Transaction(BaseStorage.TransactionRecord):
     """ Transaction object yielded by the NEO iterator """
 
     def __init__(self, app, txn):
-        super(Transaction, self).__init__(txn['id'], ' ',
+        super(Transaction, self).__init__(txn['id'],
+            'p' if txn['packed'] else ' ',
             txn['user_name'], txn['description'], txn['ext'])
         self.app = app
         self.oid_list = txn['oids']
......
@@ -9,6 +9,32 @@ The prompt is accessible through network in case that the process is daemonized:
 <neo.master.app.Application object at 0x1fc9750>
 """
 
 import sys
 
+def app_set():
+    try:
+        return sys.modules['neo.lib.threaded_app'].app_set
+    except KeyError:
+        f = sys._getframe(4)
+        try:
+            while f.f_code.co_name != 'run' or \
+                  f.f_locals.get('self').__class__.__name__ != 'Application':
+                f = f.f_back
+            return f.f_locals['self'],
+        except AttributeError:
+            return ()
+
+def defer(task):
+    def wrapper():
+        from traceback import print_exc
+        try:
+            task(app)
+        except:
+            print_exc()
+    for app in app_set():
+        app.em.wakeup(wrapper)
+        break
+
 IF = 'pdb'
 if IF == 'pdb':
     # List of (module, callables) to break at.
@@ -19,7 +45,7 @@ if IF == 'pdb':
         #('ZPublisher.Publish', 'publish_module_standard'),
     )
-    import errno, socket, sys, threading, weakref
+    import errno, socket, threading, weakref
     # Unfortunately, IPython does not always print to given stdout.
     #from neo.lib.debug import getPdb
     from pdb import Pdb as getPdb
@@ -82,19 +108,7 @@ if IF == 'pdb':
             getPdb(stdin=_socket, stdout=_socket).set_trace()
         app # this is Application instance (see 'app_set' if there are several)
 
-    try:
-        app_set = sys.modules['neo.lib.threaded_app'].app_set
-    except KeyError:
-        f = sys._getframe(3)
-        try:
-            while f.f_code.co_name != 'run' or \
-                  f.f_locals.get('self').__class__.__name__ != 'Application':
-                f = f.f_back
-            app_set = f.f_locals['self'],
-        except AttributeError:
-            app_set = ()
-    finally:
-        del f
+    app_set = app_set()
 
     class setupBreakPoints(list):
@@ -144,7 +158,7 @@ if IF == 'pdb':
     threading.Thread(target=pdb).start()
 
 elif IF == 'frames':
-    import sys, traceback
+    import traceback
     write = sys.stderr.write
     for thread_id, frame in sys._current_frames().iteritems():
         write("Thread %s:\n" % thread_id)
......
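For reference, the new defer() helper above schedules a callback through app.em.wakeup, so the task runs later inside the event loop instead of inside the signal handler that loaded this script. A hypothetical use from the debug prompt (dump_app is illustrative, not part of NEO):

import sys

def dump_app(app):
    sys.stderr.write('%r\n' % app)   # runs outside the signal handler

defer(dump_app)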
@@ -31,8 +31,9 @@ parser.add_option('-e', '--engine', help = 'database engine')
 parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
     'available, before erroring-out (-1 = infinite)', type='float', default=0)
 parser.add_option('--dedup', action='store_true',
-    help = 'enable deduplication of data'
-    ' when setting up a new storage node')
+    help = 'enable deduplication of data when setting'
+    ' up a new storage node (for RocksDB, check'
+    ' https://github.com/facebook/mysql-5.6/issues/702)')
 parser.add_option('--disable-drop-partitions', action='store_true',
     help = 'do not delete data of discarded cells, which is'
     ' useful for big databases because the current'
......
@@ -314,6 +314,7 @@ class ImporterDatabaseManager(DatabaseManager):
     def commit(self):
         self.db.commit()
+        # XXX: This misses commits done internally by self.db (lockTransaction).
         self._last_commit = time.time()
 
     def close(self):
@@ -364,7 +365,9 @@ class ImporterDatabaseManager(DatabaseManager):
             self.storeTransaction(tid, object_list, (
                 (x[0] for x in object_list),
                 str(txn.user), str(txn.description),
-                cPickle.dumps(txn.extension), False, tid), False)
+                cPickle.dumps(txn.extension),
+                txn.status == 'p', tid),
+                False)
             self.releaseData(data_id_list)
             logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
                 util.dump(tid), txn.user, txn.description, len(object_list))
......
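Taken together, the iterator hunk and the importer hunk above preserve ZODB's "packed" flag in both directions: iteration encodes it as transaction status 'p', and the importer turns txn.status == 'p' back into the packed boolean. A minimal sketch of that round trip, with plain values standing in for NEO's record objects:

# Sketch of the packed <-> status round trip used above.
def status_from_packed(packed):
    return 'p' if packed else ' '    # iteration side (Transaction.__init__)

def packed_from_status(status):
    return status == 'p'             # import side (storeTransaction argument)

for packed in (True, False):
    assert packed_from_status(status_from_packed(packed)) == packed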
@@ -401,9 +401,20 @@ class DatabaseManager(object):
                 Identifier of object to retrieve.
             tid (int, None)
                 Exact serial to retrieve.
-            before_tid (packed, None)
+            before_tid (int, None)
                 Serial to retrieve is the highest existing one strictly below this
                 value.
+
+        Return value:
+            None: oid doesn't exist at requested tid/before_tid (getObject
+                  takes care of checking if the oid exists at other serial)
+            6-tuple: Record content.
+                - record serial (int)
+                - serial or next record modifying object (int, None)
+                - compression (boolean-ish, None)
+                - checksum (binary string, None)
+                - data (binary string, None)
+                - data_serial (int, None)
         """
 
     @requires(_getObject)
@@ -424,7 +435,7 @@ class DatabaseManager(object):
             - record serial (packed)
             - serial or next record modifying object (packed, None)
             - compression (boolean-ish, None)
-            - checksum (integer, None)
+            - checksum (binary string, None)
             - data (binary string, None)
             - data_serial (packed, None)
         """
@@ -443,11 +454,19 @@ class DatabaseManager(object):
     @fallback
     def _fetchObject(self, oid, tid):
         """Specialized version of _getObject, for replication"""
         r = self._getObject(oid, tid)
         if r:
-            return r[:1] + r[2:]
+            return r[:1] + r[2:] # remove next_serial
 
     def fetchObject(self, oid, tid):
+        """
+        Specialized version of getObject, for replication:
+        - the oid can only be at an exact serial (parameter 'tid')
+        - next_serial is not part of the result
+        - if there's no result for the requested serial,
+          no need check if oid exists at other serial
+        """
         u64 = util.u64
         r = self._fetchObject(u64(oid), u64(tid))
         if r:
......
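The r[:1] + r[2:] slice above removes the second element, next_serial, from the 6-tuple documented in _getObject. A quick illustration with placeholder values:

# Placeholder values for (serial, next_serial, compression, checksum,
# data, data_serial); the slice drops next_serial at index 1.
r = (10, 15, 0, 'hash', 'data', None)
assert r[:1] + r[2:] == (10, 0, 'hash', 'data', None)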
@@ -143,6 +143,10 @@ class MySQLDatabaseManager(DatabaseManager):
                 break
             except OperationalError as m:
                 code, m = m.args
+                # IDEA: Is it safe to retry in case of DISK_FULL ?
+                # XXX:  However, this would another case of failure that would
+                #       be unnoticed by other nodes (ADMIN & MASTER). When
+                #       there are replicas, it may be preferred to not retry.
                 if self._active or SERVER_GONE_ERROR != code != SERVER_LOST \
                     or not retry:
                     raise DatabaseFailure('MySQL error %d: %s\nQuery: %s'
@@ -705,7 +709,7 @@ class MySQLDatabaseManager(DatabaseManager):
     def _fetchObject(self, oid, tid):
         r = self.query(
             'SELECT tid, compression, data.hash, value, value_tid'
-            ' FROM obj FORCE INDEX(`partition`)'
+            ' FROM obj FORCE INDEX(PRIMARY)'
             ' LEFT JOIN data ON (obj.data_id = data.id)'
             ' WHERE `partition` = %d AND oid = %d AND tid = %d'
             % (self._getReadablePartition(oid), oid, tid))
......
@@ -140,6 +140,9 @@ class StorageOperationHandler(EventHandler):
     # Server (all methods must set connection as server so that it isn't closed
     # if client tasks are finished)
     #
+    # These are all low-priority packets, in that we don't want to delay
+    # answers to clients, so tasks are used to postpone work when we're idle.
+
     def getEventQueue(self):
         return self.app.tm.read_queue
@@ -157,6 +160,9 @@ class StorageOperationHandler(EventHandler):
                 conn.send(Packets.AnswerCheckTIDRange(*r), msg_id) # NOTE msg_id: out-of-order answer
             except (weakref.ReferenceError, ConnectionClosed):
                 pass
+            # Splitting this task would cause useless overhead. However, a
+            # generator function is expected, hence the following fake yield
+            # so that iteration stops immediately.
             return; yield
         app.newTask(check())
@@ -173,7 +179,7 @@ class StorageOperationHandler(EventHandler):
                 conn.send(Packets.AnswerCheckSerialRange(*r), msg_id) # NOTE msg_id: out-of-order answer
             except (weakref.ReferenceError, ConnectionClosed):
                 pass
-            return; yield
+            return; yield # same as in askCheckTIDRange
         app.newTask(check())
 
     @checkFeedingConnection(check=False)
......
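The "return; yield" idiom commented in the hunks above works because Python treats any function whose body contains a yield as a generator function, even when the yield is unreachable; calling it returns a generator that stops immediately:

# An unreachable yield still makes check() a generator function;
# the bare return before it means iteration ends at once.
def check():
    return; yield

g = check()
assert list(g) == []    # yields nothing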