Commit 08742377 authored by Julien Muchembled's avatar Julien Muchembled

More bugfixes to backup mode

- catch OperationFailure
- reset transaction manager when leaving backup mode
- send appropriate target tid to a storage that updates an outdated cell
- clean up partition table when leaving BACKINGUP state unexpectedly
- make sure all readable cells of a partition have the same 'backup_tid'
  if they have the same data, so that we know when internal replication is
  finished when leaving backup mode
- fix storage not having finished internal replication when leaving backup mode
parent 92a828dc
...@@ -23,6 +23,9 @@ RC - Review output of pylint (CODE) ...@@ -23,6 +23,9 @@ RC - Review output of pylint (CODE)
Consider the need to implement a keep-alive system (packets sent Consider the need to implement a keep-alive system (packets sent
automatically when there is no activity on the connection for a period automatically when there is no activity on the connection for a period
of time). of time).
- When all cells are OUT_OF_DATE in backup mode, the one with most data
could become UP_TO_DATE with appropriate backup_tid, so that the cluster
stays operational (FEATURE).
- Finish renaming UUID into NID everywhere (CODE) - Finish renaming UUID into NID everywhere (CODE)
- Consider using multicast for cluster-wide notifications. (BANDWITH) - Consider using multicast for cluster-wide notifications. (BANDWITH)
Currently, multi-receivers notifications are sent in unicast to each Currently, multi-receivers notifications are sent in unicast to each
......
...@@ -26,7 +26,7 @@ except ImportError: ...@@ -26,7 +26,7 @@ except ImportError:
pass pass
# The protocol version (major, minor). # The protocol version (major, minor).
PROTOCOL_VERSION = (13, 1) PROTOCOL_VERSION = (14, 1)
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -1497,12 +1497,12 @@ class ReplicationDone(Packet): ...@@ -1497,12 +1497,12 @@ class ReplicationDone(Packet):
class Truncate(Packet): class Truncate(Packet):
""" """
XXX: Used for both make storage consistent and leave backup mode
M -> S M -> S
""" """
_fmt = PStruct('ask_truncate', _fmt = PStruct('truncate',
PTID('tid'), PTID('tid'),
) )
_answer = PFEmpty
StaticRegistry = {} StaticRegistry = {}
...@@ -1723,7 +1723,7 @@ class Packets(dict): ...@@ -1723,7 +1723,7 @@ class Packets(dict):
AddTransaction) AddTransaction)
AddObject = register( AddObject = register(
AddObject) AddObject)
AskTruncate, AnswerTruncate = register( Truncate = register(
Truncate) Truncate)
def Errors(): def Errors():
......
...@@ -279,10 +279,6 @@ class Application(object): ...@@ -279,10 +279,6 @@ class Application(object):
try: try:
while True: while True:
poll(1) poll(1)
except OperationFailure:
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational')
except StateChangedException, e: except StateChangedException, e:
if e.args[0] != ClusterStates.STARTING_BACKUP: if e.args[0] != ClusterStates.STARTING_BACKUP:
raise raise
...@@ -337,13 +333,20 @@ class Application(object): ...@@ -337,13 +333,20 @@ class Application(object):
self.runManager(RecoveryManager) self.runManager(RecoveryManager)
while True: while True:
self.runManager(VerificationManager) self.runManager(VerificationManager)
try:
if self.backup_tid: if self.backup_tid:
if self.backup_app is None: if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup" raise RuntimeError("No upstream cluster to backup"
" defined in configuration") " defined in configuration")
self.backup_app.provideService() self.backup_app.provideService()
else: # Reset connection with storages (and go through a
# recovery phase) when leaving backup mode in order
# to get correct last oid/tid.
self.runManager(RecoveryManager)
continue
self.provideService() self.provideService()
except OperationFailure:
logging.critical('No longer operational')
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient(): if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation()) node.notify(Packets.StopOperation())
...@@ -463,8 +466,6 @@ class Application(object): ...@@ -463,8 +466,6 @@ class Application(object):
while self.tm.hasPending(): while self.tm.hasPending():
self.em.poll(1) self.em.poll(1)
except OperationFailure: except OperationFailure:
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational') logging.critical('No longer operational')
logging.info("asking remaining nodes to shutdown") logging.info("asking remaining nodes to shutdown")
......
...@@ -21,9 +21,10 @@ from neo.lib import logging ...@@ -21,9 +21,10 @@ from neo.lib import logging
from neo.lib.bootstrap import BootstrapManager from neo.lib.bootstrap import BootstrapManager
from neo.lib.connector import getConnectorHandler from neo.lib.connector import getConnectorHandler
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.node import NodeManager from neo.lib.node import NodeManager
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets from neo.lib.protocol import CellStates, ClusterStates, \
from neo.lib.protocol import uuid_str, INVALID_TID, ZERO_TID NodeStates, NodeTypes, Packets, uuid_str, INVALID_TID, ZERO_TID
from neo.lib.util import add64, dump from neo.lib.util import add64, dump
from .app import StateChangedException from .app import StateChangedException
from .pt import PartitionTable from .pt import PartitionTable
...@@ -144,26 +145,32 @@ class BackupApplication(object): ...@@ -144,26 +145,32 @@ class BackupApplication(object):
while pt.getCheckTid(xrange(pt.getPartitions())) < tid: while pt.getCheckTid(xrange(pt.getPartitions())) < tid:
poll(1) poll(1)
last_tid = app.getLastTransaction() last_tid = app.getLastTransaction()
handler = EventHandler(app)
if tid < last_tid: if tid < last_tid:
assert tid != ZERO_TID assert tid != ZERO_TID
logging.warning("Truncating at %s (last_tid was %s)", logging.warning("Truncating at %s (last_tid was %s)",
dump(app.backup_tid), dump(last_tid)) dump(app.backup_tid), dump(last_tid))
p = Packets.AskTruncate(tid) # XXX: We want to go through a recovery phase in order to
connection_list = [] # initialize the transaction manager, but this is only
# possible if storages already know that we left backup
# mode. To that purpose, we always send a Truncate packet,
# even if there's nothing to truncate.
p = Packets.Truncate(tid)
for node in app.nm.getStorageList(only_identified=True): for node in app.nm.getStorageList(only_identified=True):
conn = node.getConnection() conn = node.getConnection()
conn.ask(p) conn.setHandler(handler)
connection_list.append(conn) node.setState(NodeStates.TEMPORARILY_DOWN)
for conn in connection_list: # Packets will be sent at the beginning of the recovery
while conn.isPending(): # phase.
poll(1) conn.notify(p)
app.setLastTransaction(tid) conn.abort()
# If any error happened before reaching this line, we'd go back # If any error happened before reaching this line, we'd go back
# to backup mode, which is the right mode to recover. # to backup mode, which is the right mode to recover.
del app.backup_tid del app.backup_tid
break break
finally: finally:
del self.primary_partition_dict, self.tid_list del self.primary_partition_dict, self.tid_list
pt.clearReplicating()
def nodeLost(self, node): def nodeLost(self, node):
getCellList = self.app.pt.getCellList getCellList = self.app.pt.getCellList
...@@ -205,9 +212,25 @@ class BackupApplication(object): ...@@ -205,9 +212,25 @@ class BackupApplication(object):
node_list = [] node_list = []
for cell in pt.getCellList(offset, readable=True): for cell in pt.getCellList(offset, readable=True):
node = cell.getNode() node = cell.getNode()
assert node.isConnected() assert node.isConnected(), node
assert cell.backup_tid < last_max_tid or \ if cell.backup_tid == prev_tid:
cell.backup_tid == prev_tid # Let's given 4 TID t0,t1,t2,t3: if a cell is only
# modified by t0 & t3 and has all data for t0, 4 values
# are possible for its 'backup_tid' until it replicates
# up to t3: t0, t1, t2 or t3 - 1
# Choosing the smallest one (t0) is easier to implement
# but when leaving backup mode, we would always lose
# data if the last full transaction does not modify
# all partitions. t1 is wrong for the same reason.
# So we have chosen the highest one (t3 - 1).
# t2 should also work but maybe harder to implement.
cell.backup_tid = add64(tid, -1)
logging.debug(
"partition %u: updating backup_tid of %r to %s",
offset, cell, dump(cell.backup_tid))
else:
assert cell.backup_tid < last_max_tid, (
cell.backup_tid, last_max_tid, prev_tid, tid)
if app.isStorageReady(node.getUUID()): if app.isStorageReady(node.getUUID()):
node_list.append(node) node_list.append(node)
assert node_list assert node_list
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates from neo.lib.protocol import CellStates, ZERO_TID
class BackupHandler(EventHandler): class BackupHandler(EventHandler):
"""Handler dedicated to upstream master during BACKINGUP state""" """Handler dedicated to upstream master during BACKINGUP state"""
...@@ -39,7 +39,10 @@ class BackupHandler(EventHandler): ...@@ -39,7 +39,10 @@ class BackupHandler(EventHandler):
def answerLastTransaction(self, conn, tid): def answerLastTransaction(self, conn, tid):
app = self.app app = self.app
if tid != ZERO_TID:
app.invalidatePartitions(tid, set(xrange(app.pt.getPartitions()))) app.invalidatePartitions(tid, set(xrange(app.pt.getPartitions())))
else: # upstream DB is empty
assert app.app.getLastTransaction() == tid
def invalidateObjects(self, conn, tid, oid_list): def invalidateObjects(self, conn, tid, oid_list):
app = self.app app = self.app
......
...@@ -48,6 +48,8 @@ class IdentificationHandler(MasterHandler): ...@@ -48,6 +48,8 @@ class IdentificationHandler(MasterHandler):
handler = app.client_service_handler handler = app.client_service_handler
human_readable_node_type = ' client ' human_readable_node_type = ' client '
elif node_type == NodeTypes.STORAGE: elif node_type == NodeTypes.STORAGE:
if app.cluster_state == ClusterStates.STOPPING_BACKUP:
raise NotReadyError
node_ctor = app.nm.createStorage node_ctor = app.nm.createStorage
manager = app._current_manager manager = app._current_manager
if manager is None: if manager is None:
......
...@@ -58,9 +58,13 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -58,9 +58,13 @@ class StorageServiceHandler(BaseServiceHandler):
app.backup_tid)) app.backup_tid))
def askUnfinishedTransactions(self, conn): def askUnfinishedTransactions(self, conn):
tm = self.app.tm app = self.app
pending_list = tm.registerForNotification(conn.getUUID()) if app.backup_tid:
last_tid = tm.getLastTID() last_tid = app.pt.getBackupTid()
pending_list = ()
else:
last_tid = app.tm.getLastTID()
pending_list = app.tm.registerForNotification(conn.getUUID())
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list) p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
conn.answer(p) conn.answer(p)
...@@ -99,9 +103,6 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -99,9 +103,6 @@ class StorageServiceHandler(BaseServiceHandler):
logging.debug("%s is up for offset %s", node, offset) logging.debug("%s is up for offset %s", node, offset)
self.app.broadcastPartitionChanges(cell_list) self.app.broadcastPartitionChanges(cell_list)
def answerTruncate(self, conn):
pass
def answerPack(self, conn, status): def answerPack(self, conn, status):
app = self.app app = self.app
if app.packing is not None: if app.packing is not None:
......
...@@ -311,9 +311,18 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -311,9 +311,18 @@ class PartitionTable(neo.lib.pt.PartitionTable):
for cell in row for cell in row
if cell.isReadable()) if cell.isReadable())
def clearReplicating(self):
for row in self.partition_list:
for cell in row:
try:
del cell.replicating
except AttributeError:
pass
def setBackupTidDict(self, backup_tid_dict): def setBackupTidDict(self, backup_tid_dict):
for row in self.partition_list: for row in self.partition_list:
for cell in row: for cell in row:
if cell.isReadable():
cell.backup_tid = backup_tid_dict.get(cell.getUUID(), cell.backup_tid = backup_tid_dict.get(cell.getUUID(),
ZERO_TID) ZERO_TID)
......
...@@ -312,6 +312,9 @@ class Application(object): ...@@ -312,6 +312,9 @@ class Application(object):
_poll() _poll()
finally: finally:
del self.task_queue del self.task_queue
# XXX: Although no handled exception should happen between
# replicator.populate() and the beginning of this 'try'
# clause, the replicator should be reset in a safer place.
self.replicator = Replicator(self) self.replicator = Replicator(self)
# Abort any replication, whether we are feeding or out-of-date. # Abort any replication, whether we are feeding or out-of-date.
for node in self.nm.getStorageList(only_identified=True): for node in self.nm.getStorageList(only_identified=True):
......
...@@ -436,7 +436,7 @@ class DatabaseManager(object): ...@@ -436,7 +436,7 @@ class DatabaseManager(object):
def truncate(self, tid): def truncate(self, tid):
assert tid not in (None, ZERO_TID), tid assert tid not in (None, ZERO_TID), tid
assert self.getBackupTID() assert self.getBackupTID()
self.setBackupTID(tid) self.setBackupTID(None) # XXX
for partition in xrange(self.getNumPartitions()): for partition in xrange(self.getNumPartitions()):
self._deleteRange(partition, tid) self._deleteRange(partition, tid)
self.commit() self.commit()
......
...@@ -68,10 +68,10 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -68,10 +68,10 @@ class MasterOperationHandler(BaseMasterHandler):
dict((p, a and (a, upstream_name)) dict((p, a and (a, upstream_name))
for p, a in source_dict.iteritems())) for p, a in source_dict.iteritems()))
def askTruncate(self, conn, tid): def truncate(self, conn, tid):
self.app.replicator.cancel() self.app.replicator.cancel()
self.app.dm.truncate(tid) self.app.dm.truncate(tid)
conn.answer(Packets.AnswerTruncate()) conn.close()
def checkPartition(self, conn, *args): def checkPartition(self, conn, *args):
self.app.checker(*args) self.app.checker(*args)
...@@ -354,16 +354,17 @@ class Replicator(object): ...@@ -354,16 +354,17 @@ class Replicator(object):
self.getCurrentConnection().close() self.getCurrentConnection().close()
def stop(self): def stop(self):
d = None, None # Close any open connection to an upstream storage,
# possibly aborting current replication.
node = self.current_node
if node is not None is node.getUUID():
self.cancel()
# Cancel all replication orders from upstream cluster. # Cancel all replication orders from upstream cluster.
for offset in self.replicate_dict.keys(): for offset in self.replicate_dict.keys():
addr, name = self.source_dict.get(offset, d) addr, name = self.source_dict.get(offset, (None, None))
if name: if name:
tid = self.replicate_dict.pop(offset) tid = self.replicate_dict.pop(offset)
logging.info('cancel replication of partition %u from %r' logging.info('cancel replication of partition %u from %r'
' up to %s', offset, addr, dump(tid)) ' up to %s', offset, addr, dump(tid))
# Close any open connection to an upstream storage, # Make UP_TO_DATE cells really UP_TO_DATE
# possibly aborting current replication. self._nextPartition()
node = self.current_node
if node is not None is node.getUUID():
self.cancel()
...@@ -20,6 +20,7 @@ import time ...@@ -20,6 +20,7 @@ import time
import threading import threading
import transaction import transaction
import unittest import unittest
from collections import defaultdict
from functools import wraps from functools import wraps
from neo.lib import logging from neo.lib import logging
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
...@@ -200,6 +201,10 @@ class ReplicationTests(NEOThreadedTest): ...@@ -200,6 +201,10 @@ class ReplicationTests(NEOThreadedTest):
try: try:
upstream.start() upstream.start()
importZODB = upstream.importZODB(random=random) importZODB = upstream.importZODB(random=random)
# Do not start with an empty DB so that 'primary_dict' below is not
# empty on the first iteration.
importZODB(1)
upstream.client.setPoll(0)
backup = NEOCluster(partitions=np, replicas=2, storage_count=4, backup = NEOCluster(partitions=np, replicas=2, storage_count=4,
upstream=upstream) upstream=upstream)
try: try:
...@@ -214,11 +219,10 @@ class ReplicationTests(NEOThreadedTest): ...@@ -214,11 +219,10 @@ class ReplicationTests(NEOThreadedTest):
p = Patch(upstream.master.tm, p = Patch(upstream.master.tm,
_on_commit=onTransactionCommitted) _on_commit=onTransactionCommitted)
else: else:
primary_dict = {} primary_dict = defaultdict(list)
for k, v in sorted(backup.master.backup_app for k, v in sorted(backup.master.backup_app
.primary_partition_dict.iteritems()): .primary_partition_dict.iteritems()):
primary_dict.setdefault(storage_list.index(v._uuid), primary_dict[storage_list.index(v._uuid)].append(k)
[]).append(k)
if event % 2: if event % 2:
storage = slave(primary_dict).pop() storage = slave(primary_dict).pop()
else: else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment