Commit 32e5274a authored by Julien Muchembled

Fix initialization of storage in backup mode leading to data corruption

parent 9dd5a721
...@@ -25,7 +25,7 @@ except ImportError: ...@@ -25,7 +25,7 @@ except ImportError:
pass pass
# The protocol version (major, minor). # The protocol version (major, minor).
PROTOCOL_VERSION = (14, 1) PROTOCOL_VERSION = (15, 1)
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -769,9 +769,8 @@ class ReelectPrimary(Packet): ...@@ -769,9 +769,8 @@ class ReelectPrimary(Packet):
class LastIDs(Packet): class LastIDs(Packet):
""" """
Ask the last OID, the last TID and the last Partition Table ID that Ask the last OID, the last TID and the last Partition Table ID so that
a storage node stores. Used to recover information. PM -> S, S -> PM. a master can recover. PM -> S, S -> PM.
Reply to Ask Last IDs. S -> PM, PM -> S.
""" """
_answer = PStruct('answer_last_ids', _answer = PStruct('answer_last_ids',
POID('last_oid'), POID('last_oid'),
...@@ -820,6 +819,9 @@ class StartOperation(Packet): ...@@ -820,6 +819,9 @@ class StartOperation(Packet):
Tell a storage node to start an operation. Until a storage node receives Tell a storage node to start an operation. Until a storage node receives
this message, it must not serve client nodes. PM -> S. this message, it must not serve client nodes. PM -> S.
""" """
_fmt = PStruct('start_operation',
PBoolean('backup'),
)
class StopOperation(Packet): class StopOperation(Packet):
""" """
......
...@@ -135,7 +135,7 @@ class AdministrationHandler(MasterHandler): ...@@ -135,7 +135,7 @@ class AdministrationHandler(MasterHandler):
for node in app.nm.getStorageList() for node in app.nm.getStorageList()
if node.isPending() and node.getUUID() in uuid_list)) if node.isPending() and node.getUUID() in uuid_list))
if node_list: if node_list:
p = Packets.StartOperation() p = Packets.StartOperation(bool(app.backup_tid))
for node in node_list: for node in node_list:
node.setRunning() node.setRunning()
node.notify(p) node.notify(p)
......
...@@ -32,7 +32,7 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -32,7 +32,7 @@ class StorageServiceHandler(BaseServiceHandler):
app.setStorageNotReady(uuid) app.setStorageNotReady(uuid)
# XXX: what other values could happen ? # XXX: what other values could happen ?
if node.isRunning(): if node.isRunning():
conn.notify(Packets.StartOperation()) conn.notify(Packets.StartOperation(bool(app.backup_tid)))
def nodeLost(self, conn, node): def nodeLost(self, conn, node):
logging.info('storage node lost') logging.info('storage node lost')
......
...@@ -277,7 +277,6 @@ class Application(object): ...@@ -277,7 +277,6 @@ class Application(object):
# ask node list and partition table # ask node list and partition table
self.pt.clear() self.pt.clear()
self.master_conn.ask(Packets.AskLastIDs())
self.master_conn.ask(Packets.AskNodeInformation()) self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable()) self.master_conn.ask(Packets.AskPartitionTable())
while self.master_conn.isPending(): while self.master_conn.isPending():
......
...@@ -46,9 +46,6 @@ class InitializationHandler(BaseMasterHandler): ...@@ -46,9 +46,6 @@ class InitializationHandler(BaseMasterHandler):
app.dm.setPartitionTable(ptid, cell_list) app.dm.setPartitionTable(ptid, cell_list)
def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
self.app.dm.setBackupTID(backup_tid)
def notifyPartitionChanges(self, conn, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
# XXX: This is safe to ignore those notifications because all of the # XXX: This is safe to ignore those notifications because all of the
# following applies: # following applies:
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from . import BaseMasterHandler from . import BaseMasterHandler
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import Packets, Errors, INVALID_TID from neo.lib.protocol import Packets, Errors, INVALID_TID, ZERO_TID
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.exception import OperationFailure from neo.lib.exception import OperationFailure
...@@ -48,8 +48,16 @@ class VerificationHandler(BaseMasterHandler): ...@@ -48,8 +48,16 @@ class VerificationHandler(BaseMasterHandler):
app.pt.update(ptid, cell_list, app.nm) app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
def startOperation(self, conn): def startOperation(self, conn, backup):
self.app.operational = True self.app.operational = True
dm = self.app.dm
if backup:
if dm.getBackupTID():
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm.setBackupTID(tid)
def stopOperation(self, conn): def stopOperation(self, conn):
raise OperationFailure('operation stopped') raise OperationFailure('operation stopped')
......
...@@ -21,7 +21,7 @@ from neo.lib.pt import PartitionTable ...@@ -21,7 +21,7 @@ from neo.lib.pt import PartitionTable
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.handlers.verification import VerificationHandler from neo.storage.handlers.verification import VerificationHandler
from neo.lib.protocol import CellStates, ErrorCodes from neo.lib.protocol import CellStates, ErrorCodes
from neo.lib.exception import PrimaryFailure, OperationFailure from neo.lib.exception import PrimaryFailure
from neo.lib.util import p64, u64 from neo.lib.util import p64, u64
class StorageVerificationHandlerTests(NeoUnitTestBase): class StorageVerificationHandlerTests(NeoUnitTestBase):
...@@ -95,16 +95,6 @@ class StorageVerificationHandlerTests(NeoUnitTestBase): ...@@ -95,16 +95,6 @@ class StorageVerificationHandlerTests(NeoUnitTestBase):
self.assertEqual(calls[0].getParam(0), ptid) self.assertEqual(calls[0].getParam(0), ptid)
self.assertEqual(calls[0].getParam(1), (cell, )) self.assertEqual(calls[0].getParam(1), (cell, ))
def test_11_startOperation(self):
conn = self.getMasterConnection()
self.assertFalse(self.app.operational)
self.verification.startOperation(conn)
self.assertTrue(self.app.operational)
def test_12_stopOperation(self):
conn = self.getMasterConnection()
self.assertRaises(OperationFailure, self.verification.stopOperation, conn)
def test_13_askUnfinishedTransactions(self): def test_13_askUnfinishedTransactions(self):
# client connection with no data # client connection with no data
self.app.dm = Mock({ self.app.dm = Mock({
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment