Commit fc514154 authored by Vincent Pelletier's avatar Vincent Pelletier

Allow multiple conflict resolutions for a single object and transaction.

As we don't (and must not, otherwise it would deadlock) take a write lock
on a storage node when detecting a conflict, it is possible that multiple
conflicts get successively reported for a single object in a single
transaction. To solve this, client must tolerate multiple conflict for a
single object. The last resolved conflict being sent to all storage nodes,
transaction data will be consistent at tpc_finish.

- conflict_serial_dict and resolved_conflict_serial_dict become
  multivalued (sets instead of scalar)
- each resolution only resolve conflict with highest serial if greater
  than previous resolutions for the same object
  NOTE: this assumes that ("r" being conflict resolution method, "Tn"
  being the TID on which object is based, and "Cn" being a conflict with
  transaction n) when r(T0, C1) generates T1 but triggers C2,
    r(T0, C1) + r(T1, C2) == r(T0, C2)

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2108 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent a31e23d7
...@@ -609,7 +609,14 @@ class Application(object): ...@@ -609,7 +609,14 @@ class Application(object):
object_serial_dict = local_var.object_serial_dict object_serial_dict = local_var.object_serial_dict
conflict_serial_dict = local_var.conflict_serial_dict conflict_serial_dict = local_var.conflict_serial_dict
resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
for oid, conflict_serial in conflict_serial_dict.items(): for oid, conflict_serial_set in conflict_serial_dict.items():
resolved_serial_set = resolved_conflict_serial_dict.setdefault(
oid, set())
conflict_serial = max(conflict_serial_set)
if resolved_serial_set and conflict_serial <= max(resolved_serial_set):
# A later serial has already been resolved, skip.
resolved_serial_set.update(conflict_serial_dict.pop(oid))
continue
serial, version = object_serial_dict[oid] serial, version = object_serial_dict[oid]
data = data_dict[oid] data = data_dict[oid]
tid = local_var.tid tid = local_var.tid
...@@ -621,8 +628,7 @@ class Application(object): ...@@ -621,8 +628,7 @@ class Application(object):
logging.info('Conflict resolution succeed for %r:%r with %r', logging.info('Conflict resolution succeed for %r:%r with %r',
dump(oid), dump(serial), dump(conflict_serial)) dump(oid), dump(serial), dump(conflict_serial))
# Mark this conflict as resolved # Mark this conflict as resolved
resolved_conflict_serial_dict[oid] = \ resolved_serial_set.update(conflict_serial_dict.pop(oid))
conflict_serial_dict.pop(oid)
# Try to store again # Try to store again
self.store(oid, conflict_serial, new_data, version, self.store(oid, conflict_serial, new_data, version,
local_var.txn) local_var.txn)
......
...@@ -75,24 +75,14 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -75,24 +75,14 @@ class StorageAnswersHandler(AnswerBaseHandler):
logging.info('%r report a conflict for %r with %r', conn, logging.info('%r report a conflict for %r with %r', conn,
dump(oid), dump(serial)) dump(oid), dump(serial))
conflict_serial_dict = local_var.conflict_serial_dict conflict_serial_dict = local_var.conflict_serial_dict
pending_serial = conflict_serial_dict.get(oid)
resolved_serial = local_var.resolved_conflict_serial_dict.get(oid)
if pending_serial not in (None, serial) or \
resolved_serial not in (None, serial):
raise NEOStorageError, 'Multiple conflicts for a single ' \
'object (%s) in a single store: %s, %s, %s' % (
dump(oid), dump(pending_serial),
dump(resolved_serial), dump(serial))
if serial in object_stored_counter_dict: if serial in object_stored_counter_dict:
raise NEOStorageError, 'A storage accepted object for ' \ raise NEOStorageError, 'A storage accepted object for ' \
'serial %s but another reports a conflict for it.' % ( 'serial %s but another reports a conflict for it.' % (
dump(serial), ) dump(serial), )
# If this conflict is not already resolved, mark it for # If this conflict is not already resolved, mark it for
# resolution. # resolution.
if resolved_serial is None: if serial not in local_var.resolved_conflict_serial_dict.get(oid, ()):
# Note: we might overwrite an entry, but above test protects conflict_serial_dict.setdefault(oid, set()).add(serial)
# against overwriting a different value.
conflict_serial_dict[oid] = serial
else: else:
object_stored_counter_dict[serial] = \ object_stored_counter_dict[serial] = \
object_stored_counter_dict.get(serial, 0) + 1 object_stored_counter_dict.get(serial, 0) + 1
......
...@@ -462,7 +462,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -462,7 +462,7 @@ class ClientApplicationTests(NeoTestBase):
self.assertRaises(ConflictError, app.waitStoreResponses, self.assertRaises(ConflictError, app.waitStoreResponses,
failing_tryToResolveConflict) failing_tryToResolveConflict)
self.assertTrue(oid not in app.local_var.data_dict) self.assertTrue(oid not in app.local_var.data_dict)
self.assertEquals(app.local_var.conflict_serial_dict[oid], tid) self.assertEquals(app.local_var.conflict_serial_dict[oid], set([tid, ]))
self.assertEquals(app.local_var.object_stored_counter_dict[oid], {}) self.assertEquals(app.local_var.object_stored_counter_dict[oid], {})
self.checkAskStoreObject(conn) self.checkAskStoreObject(conn)
......
...@@ -97,7 +97,7 @@ class StorageAnswerHandlerTests(NeoTestBase): ...@@ -97,7 +97,7 @@ class StorageAnswerHandlerTests(NeoTestBase):
local_var.conflict_serial_dict = {} local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {} local_var.resolved_conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 1, oid, tid) self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertEqual(local_var.conflict_serial_dict[oid], tid) self.assertEqual(local_var.conflict_serial_dict[oid], set([tid, ]))
self.assertEqual(local_var.object_stored_counter_dict[oid], {}) self.assertEqual(local_var.object_stored_counter_dict[oid], {})
self.assertFalse(oid in local_var.resolved_conflict_serial_dict) self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
# object was already accepted by another storage, raise # object was already accepted by another storage, raise
...@@ -115,24 +115,23 @@ class StorageAnswerHandlerTests(NeoTestBase): ...@@ -115,24 +115,23 @@ class StorageAnswerHandlerTests(NeoTestBase):
# resolution-pending conflict # resolution-pending conflict
local_var = self.app.local_var local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: {}} local_var.object_stored_counter_dict = {oid: {}}
local_var.conflict_serial_dict = {oid: tid} local_var.conflict_serial_dict = {oid: set([tid, ])}
local_var.resolved_conflict_serial_dict = {} local_var.resolved_conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 1, oid, tid) self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertEqual(local_var.conflict_serial_dict[oid], tid) self.assertEqual(local_var.conflict_serial_dict[oid], set([tid, ]))
self.assertFalse(oid in local_var.resolved_conflict_serial_dict) self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
self.assertEqual(local_var.object_stored_counter_dict[oid], {}) self.assertEqual(local_var.object_stored_counter_dict[oid], {})
# object was already accepted by another storage, raise # object was already accepted by another storage, raise
local_var.object_stored_counter_dict = {oid: {tid: 1}} local_var.object_stored_counter_dict = {oid: {tid: 1}}
local_var.conflict_serial_dict = {oid: tid} local_var.conflict_serial_dict = {oid: set([tid, ])}
local_var.resolved_conflict_serial_dict = {} local_var.resolved_conflict_serial_dict = {}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject, self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
conn, 1, oid, tid) conn, 1, oid, tid)
# detected conflict is different, raise # detected conflict is different, don't raise
local_var.object_stored_counter_dict = {oid: {}} local_var.object_stored_counter_dict = {oid: {}}
local_var.conflict_serial_dict = {oid: tid} local_var.conflict_serial_dict = {oid: set([tid, ])}
local_var.resolved_conflict_serial_dict = {} local_var.resolved_conflict_serial_dict = {}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject, self.handler.answerStoreObject(conn, 1, oid, tid_2)
conn, 1, oid, tid_2)
def test_answerStoreObject_3(self): def test_answerStoreObject_3(self):
conn = self.getConnection() conn = self.getConnection()
...@@ -146,17 +145,17 @@ class StorageAnswerHandlerTests(NeoTestBase): ...@@ -146,17 +145,17 @@ class StorageAnswerHandlerTests(NeoTestBase):
local_var = self.app.local_var local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: {tid_2: 1}} local_var.object_stored_counter_dict = {oid: {tid_2: 1}}
local_var.conflict_serial_dict = {} local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {oid: tid} local_var.resolved_conflict_serial_dict = {oid: set([tid, ])}
self.handler.answerStoreObject(conn, 1, oid, tid) self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertFalse(oid in local_var.conflict_serial_dict) self.assertFalse(oid in local_var.conflict_serial_dict)
self.assertEqual(local_var.resolved_conflict_serial_dict[oid], tid) self.assertEqual(local_var.resolved_conflict_serial_dict[oid],
set([tid, ]))
self.assertEqual(local_var.object_stored_counter_dict[oid], {tid_2: 1}) self.assertEqual(local_var.object_stored_counter_dict[oid], {tid_2: 1})
# detected conflict is different, raise # detected conflict is different, don't raise
local_var.object_stored_counter_dict = {oid: {tid: 1}} local_var.object_stored_counter_dict = {oid: {tid: 1}}
local_var.conflict_serial_dict = {} local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {oid: tid} local_var.resolved_conflict_serial_dict = {oid: set([tid, ])}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject, self.handler.answerStoreObject(conn, 1, oid, tid_2)
conn, 1, oid, tid_2)
def test_answerStoreObject_4(self): def test_answerStoreObject_4(self):
conn = self.getConnection() conn = self.getConnection()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment