Commit 854a4920 authored by Julien Muchembled

Fix data corruption due to undetected conflicts after storage failures

Without this new mechanism to detect oids that aren't write-locked,
a transaction could be committed successfully without detecting conflicts.
In the added test, the resulting value was 2, whereas it would be 5 if there
were no node failure.
parent 64826794
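Editorial note (not part of the commit): the core of the fix is the new check in the client's vote phase (first hunk below): for every partition that received lockless stores, at least one assigned storage node must have really write-locked the oids, otherwise the commit is aborted. Below is a minimal standalone sketch of that check, with plain dicts and sets standing in for the real partition table and node manager; the names are illustrative, not NEO's API.

def not_fully_locked(cells_by_partition, lockless_dict, failed):
    # cells_by_partition: {partition: [node uuid, ...]}  assigned storage nodes
    # lockless_dict:      {partition: {node uuid}}       nodes that only answered lockless stores
    # failed:             {node uuid}                    nodes whose writes failed
    # A partition counts as fully write-locked as soon as one of its nodes
    # neither failed nor answered locklessly; otherwise conflicts may have
    # gone undetected and the transaction must not be committed.
    return [offset for offset, lockless in lockless_dict.items()
            if all(uuid in failed or uuid in lockless
                   for uuid in cells_by_partition[offset])]

# Partition 0 is replicated on nodes 1 and 2: node 1 failed and node 2 only
# answered lockless stores, so the commit has to fail.
assert not_fully_locked({0: [1, 2]}, {0: {2}}, {1}) == [0]
# If node 2 had taken a real write lock (no lockless answer), the check passes.
assert not_fully_locked({0: [1, 2]}, {}, {1}) == []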
@@ -560,12 +560,30 @@ class Application(ThreadedApplication):
             if status == 1 and uuid not in trans_nodes:
                 self._askStorageForWrite(txn_context, uuid, packet)
         self.waitStoreResponses(txn_context)
-        # If there are failed nodes, ask the master whether they can be
-        # disconnected while keeping the cluster operational. If possible,
-        # this will happen during tpc_finish.
-        failed = [node.getUUID()
-                  for node in self.nm.getStorageList()
-                  if node.isRunning() and involved_nodes.get(node.getUUID()) == 2]
+        if 2 in involved_nodes.itervalues(): # unlikely
+            # If some writes failed, we must first check whether
+            # all oids have been locked by at least one node.
+            failed = {node.getUUID(): node.isRunning()
+                      for node in self.nm.getStorageList()
+                      if involved_nodes.get(node.getUUID()) == 2}
+            if txn_context.lockless_dict:
+                getCellList = self.pt.getCellList
+                for offset, uuid_set in txn_context.lockless_dict.iteritems():
+                    for cell in getCellList(offset):
+                        uuid = cell.getUUID()
+                        if not (uuid in failed or uuid in uuid_set):
+                            break
+                    else:
+                        # Very unlikely. Instead of raising, we could recover
+                        # the transaction by doing something similar to
+                        # deadlock avoidance; that would be done before voting.
+                        # But it's not worth the complexity.
+                        raise NEOStorageError(
+                            'partition %s not fully write-locked' % offset)
+            failed = [uuid for uuid, running in failed.iteritems() if running]
+            # If there are running nodes for which some writes failed, ask the
+            # master whether they can be disconnected while keeping the cluster
+            # operational. If possible, this will happen during tpc_finish.
             if failed:
                 try:
                     self._askPrimary(Packets.FailedVote(ttid, failed))
@@ -19,7 +19,7 @@ from ZODB.TimeStamp import TimeStamp
 from neo.lib import logging
 from neo.lib.compress import decompress_list
 from neo.lib.connection import ConnectionClosed
-from neo.lib.protocol import Packets, uuid_str
+from neo.lib.protocol import Packets, uuid_str, ZERO_TID
 from neo.lib.util import dump, makeChecksum
 from neo.lib.exception import NodeNotReady
 from neo.lib.handler import MTEventHandler
@@ -63,6 +63,9 @@ class StorageAnswersHandler(AnswerBaseHandler):
     def answerStoreObject(self, conn, conflict, oid):
         txn_context = self.app.getHandlerData()
         if conflict:
+            if conflict == ZERO_TID:
+                txn_context.written(self.app, conn.getUUID(), oid, True)
+                return
             # Conflicts can not be resolved now because 'conn' is locked.
             # We must postpone the resolution (by queuing the conflict in
             # 'conflict_dict') to avoid any deadlock with another thread that
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

+from collections import defaultdict
 from ZODB.POSException import StorageTransactionError
 from neo.lib.connection import ConnectionClosed
 from neo.lib.locking import SimpleQueue
@@ -35,6 +36,7 @@ class Transaction(object):
     locking_tid = None
     voted = False
     ttid = None # XXX: useless, except for testBackupReadOnlyAccess
+    lockless_dict = None # {partition: {uuid}}

     def __init__(self, txn):
         self.queue = SimpleQueue()
@@ -79,8 +81,9 @@ class Transaction(object):
             if status < 0 and self.locking_tid and 'oid' in kw:
                 # A deadlock happened but this node is not aware of it.
                 # Tell it to write-lock with the same locking tid as
-                # for the other nodes. The condition on kw is because
-                # we don't need that for transaction metadata.
+                # for the other nodes. The condition on kw is to
+                # distinguish whether we're writing an oid or
+                # transaction metadata.
                 conn.ask(Packets.AskRebaseTransaction(
                     self.ttid, self.locking_tid), queue=self.queue)
             conn.ask(packet, queue=self.queue, **kw)
@@ -95,7 +98,7 @@ class Transaction(object):
             raise NEOStorageError(
                 'no storage available for write to partition %s' % object_id)

-    def written(self, app, uuid, oid):
+    def written(self, app, uuid, oid, lockless=False):
         # When a node is being disconnected by the master because it was
         # not part of the transaction that caused a conflict, we may receive a
         # positive answer (not to be confused with lockless stores) before the
@@ -119,6 +122,13 @@ class Transaction(object):
             # - answer to resolved conflict before the first answer from a
             #   node that was being disconnected by the master
             return
+        if lockless:
+            # It's safe to do this after the above excepts: either the cell is
+            # already marked as lockless or the node will be reported as failed.
+            lockless = self.lockless_dict
+            if not lockless:
+                lockless = self.lockless_dict = defaultdict(set)
+            lockless[app.pt.getPartition(oid)].add(uuid)
         if uuid_list:
             return
         del self.data_dict[oid]
@@ -18,7 +18,7 @@ from neo.lib import logging
 from neo.lib.handler import DelayEvent
 from neo.lib.util import dump, makeChecksum, add64
 from neo.lib.protocol import Packets, Errors, NonReadableCell, ProtocolError, \
-    ZERO_HASH, INVALID_PARTITION
+    ZERO_HASH, ZERO_TID, INVALID_PARTITION
 from ..transactions import ConflictError, NotRegisteredError
 from . import BaseHandler
 import time
@@ -90,26 +90,27 @@ class ClientOperationHandler(BaseHandler):
     def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
             data_serial, ttid, request_time):
         try:
-            self.app.tm.storeObject(ttid, serial, oid, compression,
+            locked = self.app.tm.storeObject(ttid, serial, oid, compression,
                 checksum, data, data_serial)
         except ConflictError, err:
             # resolvable or not
-            conn.answer(Packets.AnswerStoreObject(err.tid))
-            return
+            locked = err.tid
         except NonReadableCell:
             logging.info('Ignore store of %s:%s by %s: unassigned partition',
                          dump(oid), dump(serial), dump(ttid))
+            locked = ZERO_TID
         except NotRegisteredError:
             # transaction was aborted, cancel this event
             logging.info('Forget store of %s:%s by %s delayed by %s',
                          dump(oid), dump(serial), dump(ttid),
                          dump(self.app.tm.getLockingTID(oid)))
+            locked = ZERO_TID
         else:
             if request_time and SLOW_STORE is not None:
                 duration = time.time() - request_time
                 if duration > SLOW_STORE:
                     logging.info('StoreObject delay: %.02fs', duration)
-        conn.answer(Packets.AnswerStoreObject(None))
+        conn.answer(Packets.AnswerStoreObject(locked))

     def askStoreObject(self, conn, oid, serial,
             compression, checksum, data, data_serial, ttid):
@@ -216,25 +217,26 @@ class ClientOperationHandler(BaseHandler):

     def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time):
         try:
-            self.app.tm.checkCurrentSerial(ttid, oid, serial)
+            locked = self.app.tm.checkCurrentSerial(ttid, oid, serial)
         except ConflictError, err:
             # resolvable or not
-            conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
-            return
+            locked = err.tid
         except NonReadableCell:
             logging.info('Ignore check of %s:%s by %s: unassigned partition',
                          dump(oid), dump(serial), dump(ttid))
+            locked = ZERO_TID
         except NotRegisteredError:
             # transaction was aborted, cancel this event
             logging.info('Forget serial check of %s:%s by %s delayed by %s',
                          dump(oid), dump(serial), dump(ttid),
                          dump(self.app.tm.getLockingTID(oid)))
+            locked = ZERO_TID
         else:
             if request_time and SLOW_STORE is not None:
                 duration = time.time() - request_time
                 if duration > SLOW_STORE:
                     logging.info('CheckCurrentSerial delay: %.02fs', duration)
-        conn.answer(Packets.AnswerCheckCurrentSerial(None))
+        conn.answer(Packets.AnswerCheckCurrentSerial(locked))


 # like ClientOperationHandler but read-only & only for tid <= backup_tid
@@ -19,7 +19,7 @@ from neo.lib import logging
 from neo.lib.handler import DelayEvent, EventQueue
 from neo.lib.util import dump
 from neo.lib.protocol import Packets, ProtocolError, NonReadableCell, \
-    uuid_str, MAX_TID
+    uuid_str, MAX_TID, ZERO_TID

 class ConflictError(Exception):
     """
@@ -407,7 +407,7 @@ class TransactionManager(EventQueue):
             assert oid in transaction.serial_dict, transaction
             logging.info('Transaction %s checking %s more than once',
                          dump(ttid), dump(oid))
-            return
+            return True
         if previous_serial is None:
             # 2 valid cases:
             # - the previous undo resulted in a resolved conflict
@@ -420,7 +420,7 @@ class TransactionManager(EventQueue):
             #   we are down, and the client would stop writing to us.
             logging.info('Transaction %s storing %s more than once',
                          dump(ttid), dump(oid))
-            return
+            return True
         elif transaction.locking_tid == MAX_TID:
             # Deadlock avoidance. Still no new locking_tid from the client.
             raise DelayEvent(transaction)
@@ -452,14 +452,17 @@ class TransactionManager(EventQueue):
             raise ConflictError(previous_serial)
         logging.debug('Transaction %s locking %s', dump(ttid), dump(oid))
         self._store_lock_dict[oid] = ttid
+        return True

     def checkCurrentSerial(self, ttid, oid, serial):
         try:
             transaction = self._transaction_dict[ttid]
         except KeyError:
             raise NotRegisteredError
-        self.lockObject(ttid, serial, oid)
+        locked = self.lockObject(ttid, serial, oid)
         transaction.serial_dict[oid] = serial
+        if not locked:
+            return ZERO_TID

     def storeObject(self, ttid, serial, oid, compression, checksum, data,
             value_serial):
@@ -470,7 +473,7 @@ class TransactionManager(EventQueue):
             transaction = self._transaction_dict[ttid]
         except KeyError:
             raise NotRegisteredError
-        self.lockObject(ttid, serial, oid)
+        locked = self.lockObject(ttid, serial, oid)
         transaction.serial_dict[oid] = serial
         # store object
         if data is None:
@@ -478,6 +481,8 @@ class TransactionManager(EventQueue):
         else:
             data_id = self._app.dm.holdData(checksum, oid, data, compression)
         transaction.store(oid, data_id, value_serial)
+        if not locked:
+            return ZERO_TID

     def rebaseObject(self, ttid, oid):
         try:
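Editorial note (not part of the commit): the storage-side hunks above establish a small convention for the value carried by AnswerStoreObject / AnswerCheckCurrentSerial: None means the object was stored and write-locked, a serial means a (possibly resolvable) conflict, and ZERO_TID now means the store was ignored or only lockless, so no write lock was taken; the client handler shown earlier records that last case in lockless_dict. A rough sketch of this mapping follows, assuming ZERO_TID is the all-zero 8-byte tid as in neo.lib.protocol; it is a simplified stand-in, not the actual handler code.

ZERO_TID = b'\x00' * 8  # all-zero tid, assumed to match neo.lib.protocol

def interpret_store_answer(locked):
    # Simplified dispatch on the answer value sent back by a storage node.
    if locked is None:
        return 'stored and write-locked'          # nothing more to do
    if locked == ZERO_TID:
        return 'lockless or ignored store'        # client records it in lockless_dict
    return 'conflict with serial %r' % (locked,)  # client queues it for resolution

assert interpret_store_answer(None) == 'stored and write-locked'
assert interpret_store_answer(ZERO_TID) == 'lockless or ignored store'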
@@ -1678,6 +1678,48 @@ class Test(NEOThreadedTest):
         self.assertEqual(cluster.neoctl.getClusterState(),
                          ClusterStates.RUNNING)

+    @with_cluster(storage_count=2, replicas=1)
+    def testPartitionNotFullyWriteLocked(self, cluster):
+        """
+        Make sure all oids are write-locked at least once, which is not
+        guaranteed by just the storage/master nodes when a readable cell
+        becomes OUT_OF_DATE during a commit. This scenario is special in that
+        the other readable cell was only writable at the beginning of the
+        transaction and the replication finished just before the node failure.
+        The test uses a conflict to detect lockless writes.
+        """
+        s0, s1 = cluster.storage_list
+        t, c = cluster.getTransaction()
+        r = c.root()
+        x = r[''] = PCounterWithResolution()
+        t.commit()
+        s1c, = s1.getConnectionList(cluster.client)
+        s0.stop()
+        cluster.join((s0,))
+        s0.resetNode()
+        x.value += 2
+        def vote(_):
+            f.remove(delay)
+            self.tic()
+            s1.stop()
+            cluster.join((s1,))
+        TransactionalResource(t, 0, tpc_vote=vote)
+        with ConnectionFilter() as f, cluster.newClient(1) as db:
+            t2, c2 = cluster.getTransaction(db)
+            c2.root()[''].value += 3
+            t2.commit()
+            f.delayAnswerStoreObject(lambda conn: conn is s1c)
+            delay = f.delayAskFetchTransactions()
+            s0.start()
+            self.tic()
+            self.assertRaisesRegexp(NEOStorageError,
+                '^partition 0 not fully write-locked$',
+                t.commit)
+        cluster.client._cache.clear()
+        t.begin()
+        x._p_deactivate()
+        self.assertEqual(x.value, 3)
+
     @with_cluster()
     def testAbortTransaction(self, cluster):
         t, c = cluster.getTransaction()