diff --git a/neo/admin/handler.py b/neo/admin/handler.py index 063d9d7c17711344ce1fa7f9d60965970b0b2b59..99f4893f03e3cd8edfe5b08b6f03b3892bb16782 100644 --- a/neo/admin/handler.py +++ b/neo/admin/handler.py @@ -27,12 +27,14 @@ class AdminEventHandler(EventHandler): """This class deals with events for administrating cluster.""" def askPartitionList(self, conn, packet, min_offset, max_offset, uuid): - logging.info("ask partition list from %s to %s for %s" %(min_offset, max_offset, dump(uuid))) + logging.info("ask partition list from %s to %s for %s" % + (min_offset, max_offset, dump(uuid))) app = self.app # check we have one pt otherwise ask it to PMN if app.pt is None: if self.app.master_conn is None: - raise protocol.NotReadyError('Not connected to a primary master.') + raise protocol.NotReadyError('Not connected to a primary ' \ + 'master.') p = Packets.AskPartitionTable([]) msg_id = self.app.master_conn.ask(p) app.dispatcher.register(msg_id, conn, @@ -41,7 +43,8 @@ class AdminEventHandler(EventHandler): 'uuid' : uuid, 'msg_id' : packet.getId()}) else: - app.sendPartitionTable(conn, min_offset, max_offset, uuid, packet.getId()) + app.sendPartitionTable(conn, min_offset, max_offset, uuid, + packet.getId()) def askNodeList(self, conn, packet, node_type): @@ -89,10 +92,12 @@ class AdminEventHandler(EventHandler): def askClusterState(self, conn, packet): if self.app.cluster_state is None: if self.app.master_conn is None: - raise protocol.NotReadyError('Not connected to a primary master.') + raise protocol.NotReadyError('Not connected to a primary ' \ + 'master.') # required it from PMN first msg_id = self.app.master_conn.ask(Packets.AskClusterState()) - self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) + self.app.dispatcher.register(msg_id, conn, + {'msg_id' : packet.getId()}) else: conn.answer(Packets.AnswerClusterState(self.app.cluster_state), packet.getId()) diff --git a/neo/client/Storage.py b/neo/client/Storage.py index dd7597ad77cd3ff8e1bc155dc4234629022043e2..45a3fd0015001844098cdea4cde0b4e2c389f6c5 100644 --- a/neo/client/Storage.py +++ b/neo/client/Storage.py @@ -18,7 +18,8 @@ from ZODB import BaseStorage, ConflictResolution, POSException from neo.client.app import Application -from neo.client.exception import NEOStorageConflictError, NEOStorageNotFoundError +from neo.client.exception import NEOStorageConflictError, \ + NEOStorageNotFoundError class Storage(BaseStorage.BaseStorage, ConflictResolution.ConflictResolvingStorage): @@ -62,7 +63,8 @@ class Storage(BaseStorage.BaseStorage, def tpc_begin(self, transaction, tid=None, status=' '): if self._is_read_only: raise POSException.ReadOnlyError() - return self.app.tpc_begin(transaction=transaction, tid=tid, status=status) + return self.app.tpc_begin(transaction=transaction, tid=tid, + status=status) def tpc_vote(self, transaction): if self._is_read_only: diff --git a/neo/client/app.py b/neo/client/app.py index 998a65c69727f0889b889b402f94b53ad8cd5c72..2726931f07056cce8503006cfeddaf222dd471f9 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -94,7 +94,8 @@ class ConnectionPool(object): conn.unlock() try: - app._waitMessage(conn, msg_id, handler=app.storage_bootstrap_handler) + app._waitMessage(conn, msg_id, + handler=app.storage_bootstrap_handler) except ConnectionClosed: logging.error('Connection to storage node %s failed', node) return None @@ -116,8 +117,8 @@ class ConnectionPool(object): not self.app.dispatcher.registered(conn): del self.connection_dict[conn.getUUID()] conn.close() - logging.debug('_dropConnections : connection to storage node %s:%d closed', - *(conn.getAddress())) + logging.debug('_dropConnections : connection to storage ' \ + 'node %s:%d closed', *(conn.getAddress())) if len(self.connection_dict) <= self.max_pool_size: break finally: @@ -263,7 +264,8 @@ class Application(object): self.mq_cache = MQ() self.new_oid_list = [] self.last_oid = '\0' * 8 - self.storage_event_handler = storage.StorageEventHandler(self, self.dispatcher) + self.storage_event_handler = storage.StorageEventHandler(self, self. + dispatcher) self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self) self.storage_handler = storage.StorageAnswersHandler(self) self.primary_handler = master.PrimaryAnswersHandler(self) @@ -406,28 +408,30 @@ class Application(object): self.trying_master_node = master_list[0] index += 1 # Connect to master - conn = MTClientConnection(self.em, self.notifications_handler, - addr=self.trying_master_node.getAddress(), - connector_handler=self.connector_handler, - dispatcher=self.dispatcher) + conn = MTClientConnection(self.em, self.notifications_handler, + addr=self.trying_master_node.getAddress(), + connector_handler=self.connector_handler, + dispatcher=self.dispatcher) # Query for primary master node conn.lock() try: if conn.getConnector() is None: - # This happens, if a connection could not be established. + # This happens if a connection could not be established. logging.error('Connection to master node %s failed', self.trying_master_node) continue - msg_id = conn.ask(self.local_var.queue, Packets.AskPrimary()) + msg_id = conn.ask(self.local_var.queue, + Packets.AskPrimary()) finally: conn.unlock() try: - self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) + self._waitMessage(conn, msg_id, + handler=self.primary_bootstrap_handler) except ConnectionClosed: continue # If we reached the primary master node, mark as connected - connected = self.primary_master_node is not None \ - and self.primary_master_node is self.trying_master_node + connected = self.primary_master_node is not None and \ + self.primary_master_node is self.trying_master_node logging.info('connected to a primary master node') # Identify to primary master and request initial data @@ -445,7 +449,8 @@ class Application(object): finally: conn.unlock() try: - self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) + self._waitMessage(conn, msg_id, + handler=self.primary_bootstrap_handler) except ConnectionClosed: self.primary_master_node = None break @@ -468,17 +473,20 @@ class Application(object): Packets.AskNodeInformation()) finally: conn.unlock() - self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) + self._waitMessage(conn, msg_id, + handler=self.primary_bootstrap_handler) conn.lock() try: msg_id = conn.ask(self.local_var.queue, Packets.AskPartitionTable([])) finally: conn.unlock() - self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) + self._waitMessage(conn, msg_id, + handler=self.primary_bootstrap_handler) ready = self.uuid is not None and self.pt is not None \ and self.pt.operational() - logging.info("connected to primary master node %s" % self.primary_master_node) + logging.info("connected to primary master node %s" % + self.primary_master_node) return conn def registerDB(self, db, limit): @@ -530,7 +538,8 @@ class Application(object): cell_list = self._getCellListForOID(oid, readable=True) if len(cell_list) == 0: # No cells available, so why are we running ? - logging.error('oid %s not found because no storage is available for it', dump(oid)) + logging.error('oid %s not found because no storage is ' \ + 'available for it', dump(oid)) raise NEOStorageNotFoundError() shuffle(cell_list) @@ -571,8 +580,10 @@ class Application(object): break if self.local_var.asked_object == 0: - # We didn't got any object from all storage node because of connection error - logging.warning('oid %s not found because of connection failure', dump(oid)) + # We didn't got any object from all storage node because of + # connection error + logging.warning('oid %s not found because of connection failure', + dump(oid)) raise NEOStorageNotFoundError() if self.local_var.asked_object == -1: @@ -685,12 +696,14 @@ class Application(object): if self.local_var.data_dict.has_key(oid): # One storage already accept the object, is it normal ?? # remove from dict and raise ConflictError, don't care of - # previous node which already store data as it would be resent - # again if conflict is resolved or txn will be aborted + # previous node which already store data as it would be + # resent again if conflict is resolved or txn will be + # aborted del self.local_var.data_dict[oid] self.local_var.conflict_serial = self.local_var.object_stored[1] raise NEOStorageConflictError - # increase counter so that we know if a node has stored the object or not + # increase counter so that we know if a node has stored the object + # or not self.local_var.object_stored_counter += 1 if self.local_var.object_stored_counter == 0: @@ -716,7 +729,8 @@ class Application(object): cell_list = self._getCellListForTID(self.local_var.tid, writable=True) self.local_var.voted_counter = 0 for cell in cell_list: - logging.debug("voting object %s %s" %(cell.getAddress(), cell.getState())) + logging.debug("voting object %s %s" %(cell.getAddress(), + cell.getState())) conn = self.cp.getConnForCell(cell) if conn is None: continue @@ -747,7 +761,8 @@ class Application(object): for oid in self.local_var.data_dict.iterkeys(): cell_set |= set(self._getCellListForOID(oid, writable=True)) # select nodes where transaction was stored - cell_set |= set(self._getCellListForTID(self.local_var.tid, writable=True)) + cell_set |= set(self._getCellListForTID(self.local_var.tid, + writable=True)) # cancel transaction one all those nodes for cell in cell_set: @@ -814,7 +829,8 @@ class Application(object): self.local_var.txn_info = 0 try: - self._askStorage(conn, Packets.AskTransactionInformation(transaction_id)) + self._askStorage(conn, Packets.AskTransactionInformation( + transaction_id)) except ConnectionClosed: continue @@ -853,12 +869,13 @@ class Application(object): self.store(oid, transaction_id, data, None, txn) except NEOStorageConflictError, serial: if serial <= self.local_var.tid: - new_data = wrapper.tryToResolveConflict(oid, self.local_var.tid, - serial, data) + new_data = wrapper.tryToResolveConflict(oid, + self.local_var.tid, serial, data) if new_data is not None: self.store(oid, self.local_var.tid, new_data, None, txn) continue - raise ConflictError(oid = oid, serials = (self.local_var.tid, serial), + raise ConflictError(oid = oid, serials = (self.local_var.tid, + serial), data = data) return self.local_var.tid, oid_list @@ -880,8 +897,8 @@ class Application(object): continue try: - conn.ask(self.local_var.queue, - Packets.AskTIDs(first, last, protocol.INVALID_PARTITION)) + conn.ask(self.local_var.queue, Packets.AskTIDs(first, last, + protocol.INVALID_PARTITION)) finally: conn.unlock() @@ -912,7 +929,8 @@ class Application(object): if conn is not None: self.local_var.txn_info = 0 try: - self._askStorage(conn, Packets.AskTransactionInformation(tid)) + self._askStorage(conn, + Packets.AskTransactionInformation(tid)) except ConnectionClosed: continue if isinstance(self.local_var.txn_info, dict): @@ -933,7 +951,8 @@ class Application(object): # Check we return at least one element, otherwise call # again but extend offset if len(undo_info) == 0 and not block: - undo_info = self.undoLog(first=first, last=last*5, filter=filter, block=1) + undo_info = self.undoLog(first=first, last=last*5, filter=filter, + block=1) return undo_info def undoLog(self, first, last, filter=None, block=0): @@ -987,7 +1006,8 @@ class Application(object): # ask transaction information self.local_var.txn_info = None try: - self._askStorage(conn, Packets.AskTransactionInformation(serial)) + self._askStorage(conn, + Packets.AskTransactionInformation(serial)) except ConnectionClosed: continue diff --git a/neo/client/config.py b/neo/client/config.py index d91d9c22fdfe3267bdf224025bb4c7cce217a525..997de173efe5a5c09aedc6e6373d70872d312d27 100644 --- a/neo/client/config.py +++ b/neo/client/config.py @@ -21,7 +21,7 @@ class NeoStorage(BaseConfig): def open(self): from Storage import Storage - return Storage(master_nodes = self.config.master_nodes, name = self.config.name, - connector = self.config.connector) + return Storage(master_nodes=self.config.master_nodes, + name=self.config.name, connector = self.config.connector) diff --git a/neo/client/handlers/master.py b/neo/client/handlers/master.py index a99cc22d02f6a23b38a9fe4e89838c1c417064b6..104f4d995dd014e8bb1a17928bbda7b9813d7437 100644 --- a/neo/client/handlers/master.py +++ b/neo/client/handlers/master.py @@ -109,14 +109,16 @@ class PrimaryNotificationsHandler(BaseHandler): app.master_conn = None app.primary_master_node = None else: - logging.warn('app.master_conn is %s, but we are closing %s', app.master_conn, conn) + logging.warn('app.master_conn is %s, but we are closing %s', + app.master_conn, conn) super(PrimaryNotificationsHandler, self).connectionClosed(conn) def timeoutExpired(self, conn): app = self.app if app.master_conn is not None: assert conn is app.master_conn - logging.critical("connection timeout to primary master node expired") + logging.critical("connection timeout to primary master node ' \ + 'expired") BaseHandler.timeoutExpired(self, conn) def peerBroken(self, conn): diff --git a/neo/client/handlers/storage.py b/neo/client/handlers/storage.py index 9bb2de7438dbd212c863ea48f24001ed3b385900..042227061d24e158cbac7a4ab80a477ff7ae6684 100644 --- a/neo/client/handlers/storage.py +++ b/neo/client/handlers/storage.py @@ -78,8 +78,8 @@ class StorageAnswersHandler(AnswerBaseHandler): def answerObject(self, conn, packet, oid, start_serial, end_serial, compression, checksum, data): app = self.app - app.local_var.asked_object = (oid, start_serial, end_serial, compression, - checksum, data) + app.local_var.asked_object = (oid, start_serial, end_serial, + compression, checksum, data) def answerStoreObject(self, conn, packet, conflicting, oid, serial): app = self.app diff --git a/neo/client/mq.py b/neo/client/mq.py index 229eddb8fef6bd62d29e86d9a9832fef3e4c61e5..429293e2b57bf7fc7a2330ead98ca7a333a89aed 100644 --- a/neo/client/mq.py +++ b/neo/client/mq.py @@ -131,7 +131,8 @@ class MQ(object): - The size calculation is not accurate. """ - def __init__(self, life_time=10000, buffer_levels=9, max_history_size=100000, max_size=20*1024*1024): + def __init__(self, life_time=10000, buffer_levels=9, + max_history_size=100000, max_size=20*1024*1024): self._history_buffer = FIFO() self._cache_buffers = [] for level in range(buffer_levels): @@ -203,7 +204,8 @@ class MQ(object): except KeyError: counter = 1 - # XXX It might be better to adjust the level according to the object size. + # XXX It might be better to adjust the level according to the object + # size. level = min(int(log(counter, 2)), self._buffer_levels - 1) element = cache_buffers[level].append() data = Data() diff --git a/neo/connection.py b/neo/connection.py index 879fc5e1f8580d584af5894c88f3c5f0b573c276..de36e3820bba3698a481a403739fcbc6313a46b2 100644 --- a/neo/connection.py +++ b/neo/connection.py @@ -50,7 +50,9 @@ def lockCheckWrapper(func): def wrapper(self, *args, **kw): if not self._lock._is_owned(): import traceback - logging.warning('%s called on %s instance without being locked. Stack:\n%s', func.func_code.co_name, self.__class__.__name__, ''.join(traceback.format_stack())) + logging.warning('%s called on %s instance without being locked.' \ + ' Stack:\n%s', func.func_code.co_name, self.__class__.__name__, + ''.join(traceback.format_stack())) # Call anyway return func(self, *args, **kw) return wrapper @@ -366,7 +368,7 @@ class Connection(BaseConnection): if self.write_buf: self.em.addWriter(self) - def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30): + def expectMessage(self, msg_id=None, timeout=5, additional_timeout=30): """Expect a message for a reply to a given message ID or any message. The purpose of this method is to define how much amount of time is @@ -403,7 +405,9 @@ class Connection(BaseConnection): @not_closed def ask(self, packet, timeout=5, additional_timeout=30): - """ Send a packet with a new ID and register the expectation of an answer """ + """ + Send a packet with a new ID and register the expectation of an answer + """ msg_id = self._getNextId() packet.setId(msg_id) self.expectMessage(msg_id) diff --git a/neo/connector.py b/neo/connector.py index 1517cb194bac0e949e4421d11bb376f0abd6d5cc..b1530de0b2e7624b33109ed69901a937d7551f16 100644 --- a/neo/connector.py +++ b/neo/connector.py @@ -70,8 +70,8 @@ class SocketConnector: raise ConnectorInProgressException if err == errno.ECONNREFUSED: raise ConnectorConnectionRefusedException - raise ConnectorException, 'makeClientConnection to %s failed: ' \ - '%s:%s' % (addr, err, errmsg) + raise ConnectorException, 'makeClientConnection to %s failed:' \ + ' %s:%s' % (addr, err, errmsg) finally: logging.debug('%r connecting to %r', self.socket.getsockname(), addr) @@ -85,15 +85,16 @@ class SocketConnector: self.socket.listen(5) except socket.error, (err, errmsg): self.socket.close() - raise ConnectorException, 'makeListeningConnection on %s failed: %s:%s' % \ - (addr, err, errmsg) + raise ConnectorException, 'makeListeningConnection on %s failed:' \ + ' %s:%s' % (addr, err, errmsg) def getError(self): return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) def getDescriptor(self): - # this descriptor must only be used by the event manager, where it guarantee - # unicity only while the connector is opened and registered in epoll + # this descriptor must only be used by the event manager, where it + # guarantee unicity only while the connector is opened and registered + # in epoll return self.socket.fileno() def getNewConnection(self): diff --git a/neo/event.py b/neo/event.py index ec5e2668e260bb8ca89c8cf0fae464c22028eff6..0a47cd786a841fb3985c850676bb2e52543c8439 100644 --- a/neo/event.py +++ b/neo/event.py @@ -22,8 +22,10 @@ from time import time from neo.epoll import Epoll class IdleEvent(object): - """This class represents an event called when a connection is waiting for - a message too long.""" + """ + This class represents an event called when a connection is waiting for + a message too long. + """ def __init__(self, conn, msg_id, timeout, additional_timeout): self._conn = conn @@ -141,8 +143,8 @@ class SelectEventManager(object): self._addPendingConnection(to_process) def _poll(self, timeout = 1): - rlist, wlist, xlist = select(self.reader_set, self.writer_set, self.exc_list, - timeout) + rlist, wlist, xlist = select(self.reader_set, self.writer_set, + self.exc_list, timeout) for s in rlist: conn = self.connection_dict[s] conn.lock() diff --git a/neo/handler.py b/neo/handler.py index cb6ccd473eb69bf951d23f39b69dcd8e69841fa8..a967f15e624f4a35312271dd0bb2d6100f0b9bb8 100644 --- a/neo/handler.py +++ b/neo/handler.py @@ -37,7 +37,8 @@ class EventHandler(object): # if decoding fail, there's no packet instance logging.error('malformed packet from %s:%d: %s', *args) else: - logging.error('malformed packet %s from %s:%d: %s', packet.getType(), *args) + logging.error('malformed packet %s from %s:%d: %s', + packet.getType(), *args) response = protocol.protocolError(message) if packet is not None: conn.answer(response, packet.getId()) @@ -386,7 +387,8 @@ class EventHandler(object): d[Packets.StartOperation] = self.startOperation d[Packets.StopOperation] = self.stopOperation d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions - d[Packets.AnswerUnfinishedTransactions] = self.answerUnfinishedTransactions + d[Packets.AnswerUnfinishedTransactions] = \ + self.answerUnfinishedTransactions d[Packets.AskObjectPresent] = self.askObjectPresent d[Packets.AnswerObjectPresent] = self.answerObjectPresent d[Packets.DeleteTransaction] = self.deleteTransaction @@ -411,7 +413,8 @@ class EventHandler(object): d[Packets.AskTIDs] = self.askTIDs d[Packets.AnswerTIDs] = self.answerTIDs d[Packets.AskTransactionInformation] = self.askTransactionInformation - d[Packets.AnswerTransactionInformation] = self.answerTransactionInformation + d[Packets.AnswerTransactionInformation] = \ + self.answerTransactionInformation d[Packets.AskObjectHistory] = self.askObjectHistory d[Packets.AnswerObjectHistory] = self.answerObjectHistory d[Packets.AskOIDs] = self.askOIDs diff --git a/neo/locking.py b/neo/locking.py index f0ab86ee6bd596165f26d8634286369688c58480..ffb719468d0f1f2e3342a7f7699b246f8ff5ee9b 100644 --- a/neo/locking.py +++ b/neo/locking.py @@ -48,7 +48,8 @@ class LockUser(object): return isinstance(other, self.__class__) and self.ident == other.ident def __repr__(self): - return '%s@%s:%s %s' % (self.ident, self.caller[0], self.caller[1], self.caller[3]) + return '%s@%s:%s %s' % (self.ident, self.caller[0], self.caller[1], + self.caller[3]) def formatStack(self): return ''.join(traceback.format_list(self.stack)) @@ -59,7 +60,8 @@ class VerboseLockBase(object): self.debug_lock = debug_lock self.owner = None self.waiting = [] - self._note('%s@%X created by %r', self.__class__.__name__, id(self), LockUser(1)) + self._note('%s@%X created by %r', self.__class__.__name__, id(self), + LockUser(1)) def _note(self, fmt, *args): sys.stderr.write(fmt % args + '\n') @@ -75,12 +77,16 @@ class VerboseLockBase(object): def acquire(self, blocking=1): me = LockUser() owner = self._getOwner() - self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r Waiting:%r', me, self, blocking, owner, self.waiting) - if (self.debug_lock and owner is not None) or (not self.reentrant and blocking and me == owner): + self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r ' \ + 'Waiting:%r', me, self, blocking, owner, self.waiting) + if (self.debug_lock and owner is not None) or \ + (not self.reentrant and blocking and me == owner): if me == owner: - self._note('[%r]%s.acquire(%s): Deadlock detected: I already own this lock:%r', me, self, blocking, owner) + self._note('[%r]%s.acquire(%s): Deadlock detected: ' \ + ' I already own this lock:%r', me, self, blocking, owner) else: - self._note('[%r]%s.acquire(%s): debug lock triggered: %r', me, self, blocking, owner) + self._note('[%r]%s.acquire(%s): debug lock triggered: %r', + me, self, blocking, owner) self._note('Owner traceback:\n%s', owner.formatStack()) self._note('My traceback:\n%s', me.formatStack()) self.waiting.append(me) @@ -89,7 +95,8 @@ class VerboseLockBase(object): finally: self.owner = me self.waiting.remove(me) - self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r', me, self, blocking, self.waiting) + self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r', + me, self, blocking, self.waiting) def release(self): me = LockUser() @@ -104,7 +111,8 @@ class VerboseLockBase(object): class VerboseRLock(VerboseLockBase): def __init__(self, verbose=None, debug_lock=False): - super(VerboseRLock, self).__init__(reentrant=True, debug_lock=debug_lock) + super(VerboseRLock, self).__init__(reentrant=True, + debug_lock=debug_lock) self.lock = threading_RLock() def _locked(self): diff --git a/neo/master/app.py b/neo/master/app.py index c29cb93f00f0ddacb6891e5da15ed8e19b22baef..96b51f3ed3fabcc987a8d3b24ad7eb3ea608bd25 100644 --- a/neo/master/app.py +++ b/neo/master/app.py @@ -26,8 +26,8 @@ from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets from neo.node import NodeManager from neo.event import EventManager from neo.connection import ListeningConnection, ClientConnection -from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \ - OperationFailure +from neo.exception import ElectionFailure, PrimaryFailure, \ + VerificationFailure, OperationFailure from neo.master.handlers import election, identification, secondary, recovery from neo.master.handlers import verification, storage, client, shutdown from neo.master.handlers import administration @@ -63,8 +63,9 @@ class Application(object): if partitions <= 0: raise RuntimeError, 'partitions must be more than zero' self.pt = PartitionTable(partitions, replicas) - logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s', - replicas, partitions, self.name) + logging.debug('the number of replicas is %d, the number of ' \ + 'partitions is %d, the name is %s', + replicas, partitions, self.name) self.listening_conn = None self.primary = None @@ -173,16 +174,18 @@ class Application(object): t = current_time for node in nm.getMasterList(): if node.isTemporarilyDown() \ - and node.getLastStateChange() + expiration < current_time: + and node.getLastStateChange() + \ + expiration < current_time: logging.info('%s is down' % (node, )) node.setDown() - self.unconnected_master_node_set.discard(node.getAddress()) + self.unconnected_master_node_set.discard( + node.getAddress()) # Try to connect to master nodes. if self.unconnected_master_node_set: for addr in list(self.unconnected_master_node_set): - ClientConnection(em, client_handler, addr = addr, - connector_handler = self.connector_handler) + ClientConnection(em, client_handler, addr=addr, + connector_handler=self.connector_handler) em.poll(1) if len(self.unconnected_master_node_set) == 0 \ and len(self.negotiating_master_node_set) == 0: @@ -195,7 +198,7 @@ class Application(object): if self.primary is None: # I am the primary. self.primary = True - logging.debug('I am the primary, so sending an announcement') + logging.debug('I am the primary, sending an announcement') for conn in em.getClientList(): conn.notify(Packets.AnnouncePrimary()) conn.abort() @@ -224,12 +227,14 @@ class Application(object): if conn.getAddress() != addr: conn.close() - # But if there is no such connection, something wrong happened. + # But if there is no such connection, something wrong + # happened. for conn in em.getClientList(): if conn.getAddress() == addr: break else: - raise ElectionFailure, 'no connection remains to the primary' + raise ElectionFailure, 'no connection remains to ' \ + 'the primary' return except ElectionFailure, m: @@ -321,7 +326,8 @@ class Application(object): row_list.append((offset, self.pt.getRow(offset))) # Split the packet if too huge. if len(row_list) == 1000: - conn.notify(Packets.SendPartitionTable( self.pt.getID(), row_list)) + conn.notify(Packets.SendPartitionTable(self.pt.getID(), + row_list)) del row_list[:] if row_list: conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list)) @@ -366,9 +372,12 @@ class Application(object): pt.make(node_list) def recoverStatus(self): - """Recover the status about the cluster. Obtain the last OID, the last TID, - and the last Partition Table ID from storage nodes, then get back the latest - partition table or make a new table from scratch, if this is the first time.""" + """ + Recover the status about the cluster. Obtain the last OID, the last + TID, and the last Partition Table ID from storage nodes, then get + back the latest partition table or make a new table from scratch, + if this is the first time. + """ logging.info('begin the recovery of the status') self.changeClusterState(ClusterStates.RECOVERING) @@ -545,16 +554,19 @@ class Application(object): self.broadcastPartitionChanges(self.pt.setNextID(), cell_list) def provideService(self): - """This is the normal mode for a primary master node. Handle transactions + """ + This is the normal mode for a primary master node. Handle transactions and stop the service only if a catastrophy happens or the user commits - a shutdown.""" + a shutdown. + """ logging.info('provide service') em = self.em nm = self.nm self.changeClusterState(ClusterStates.RUNNING) - # This dictionary is used to hold information on transactions being finished. + # This dictionary is used to hold information on transactions being + # finished. self.finishing_transaction_dict = {} # Now everything is passive. @@ -562,12 +574,13 @@ class Application(object): try: em.poll(1) except OperationFailure: - # If not operational, send Stop Operation packets to storage nodes - # and client nodes. Abort connections to client nodes. - logging.critical('No longer operational, so stopping the service') + # If not operational, send Stop Operation packets to storage + # nodes and client nodes. Abort connections to client nodes. + logging.critical('No longer operational, stopping the service') for conn in em.getConnectionList(): node = nm.getByUUID(conn.getUUID()) - if node is not None and (node.isStorage() or node.isClient()): + if node is not None and (node.isStorage() + or node.isClient()): conn.notify(Packets.StopOperation()) if node.isClient(): conn.abort() @@ -580,7 +593,8 @@ class Application(object): dump(self.uuid), *(self.server)) # all incoming connections identify through this handler - self.listening_conn.setHandler(identification.IdentificationHandler(self)) + self.listening_conn.setHandler( + identification.IdentificationHandler(self)) handler = secondary.SecondaryMasterHandler(self) em = self.em @@ -596,8 +610,8 @@ class Application(object): conn.setHandler(handler) - # If I know any storage node, make sure that they are not in the running state, - # because they are not connected at this stage. + # If I know any storage node, make sure that they are not in the + # running state, because they are not connected at this stage. for node in nm.getStorageList(): if node.isRunning(): node.setTemporarilyDown() @@ -613,7 +627,9 @@ class Application(object): self.provideService() def playSecondaryRole(self): - """I play a secondary role, thus only wait for a primary master to fail.""" + """ + I play a secondary role, thus only wait for a primary master to fail. + """ logging.info('play the secondary role with %s (%s:%d)', dump(self.uuid), *(self.server)) @@ -631,7 +647,9 @@ class Application(object): self.em.poll(1) def changeClusterState(self, state): - """ Change the cluster state and apply right handler on each connections """ + """ + Change the cluster state and apply right handler on each connections + """ if self.cluster_state == state: return nm, em = self.nm, self.em diff --git a/neo/master/handlers/__init__.py b/neo/master/handlers/__init__.py index 51d19a8666f74b2400dc1b2a1f062f1319f769c9..d406ce268f5a547b890c164cc49e7422a9ef0266 100644 --- a/neo/master/handlers/__init__.py +++ b/neo/master/handlers/__init__.py @@ -86,7 +86,8 @@ class BaseServiceHandler(MasterHandler): node = self.app.nm.getByUUID(conn.getUUID()) assert node is not None if new_state != NodeStates.BROKEN: - new_state = DISCONNECTED_STATE_DICT.get(node.getType(), NodeStates.DOWN) + new_state = DISCONNECTED_STATE_DICT.get(node.getType(), + NodeStates.DOWN) if node.getState() == new_state: return if new_state != NodeStates.BROKEN and node.isPending(): diff --git a/neo/master/handlers/administration.py b/neo/master/handlers/administration.py index a168a1d6c9edbeda48feef7a18751dde1ad8149b..00298bcade7b6ebc0da4003be2cfaded82fc15ac 100644 --- a/neo/master/handlers/administration.py +++ b/neo/master/handlers/administration.py @@ -43,7 +43,8 @@ class AdministrationHandler(MasterHandler): self.app.shutdown() def setNodeState(self, conn, packet, uuid, state, modify_partition_table): - logging.info("set node state for %s-%s : %s" % (dump(uuid), state, modify_partition_table)) + logging.info("set node state for %s-%s : %s" % + (dump(uuid), state, modify_partition_table)) app = self.app node = app.nm.getByUUID(uuid) if node is None: diff --git a/neo/master/handlers/client.py b/neo/master/handlers/client.py index a157fa67dedae048a2570186359226717d2796fe..3adfbfe409081c411cf7761f01cc02b384c24071 100644 --- a/neo/master/handlers/client.py +++ b/neo/master/handlers/client.py @@ -111,7 +111,7 @@ class ClientServiceHandler(BaseServiceHandler): # Collect the UUIDs of nodes related to this transaction. uuid_set = set() for part in partition_set: - uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part) \ + uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part) if cell.getNodeState() != NodeStates.HIDDEN)) # Request locking data. diff --git a/neo/master/handlers/identification.py b/neo/master/handlers/identification.py index 03b012c534825bb646b6bdc7aab1a28748f84942..a943a076b35528b4fcec14a336dc84eeac80b82d 100644 --- a/neo/master/handlers/identification.py +++ b/neo/master/handlers/identification.py @@ -57,7 +57,8 @@ class IdentificationHandler(MasterHandler): node.setAddress(address) node.setRunning() - # ask the app the node identification, if refused, an exception is raised + # ask the app the node identification, if refused, an exception is + # raised result = self.app.identifyNode(node_type, uuid, node) (uuid, node, state, handler, node_ctor) = result if uuid is None: diff --git a/neo/master/handlers/recovery.py b/neo/master/handlers/recovery.py index 2a6e459b726e0fe33941120b1f36f50762739138..f3c65e5e9913c3693ef52af85e90d8428548a01c 100644 --- a/neo/master/handlers/recovery.py +++ b/neo/master/handlers/recovery.py @@ -46,8 +46,8 @@ class RecoveryHandler(MasterHandler): app = self.app if uuid != app.target_uuid: # If this is not from a target node, ignore it. - logging.warn('got answer partition table from %s while waiting for %s', - dump(uuid), dump(app.target_uuid)) + logging.warn('got answer partition table from %s while waiting ' \ + 'for %s', dump(uuid), dump(app.target_uuid)) return # load unknown storage nodes for offset, row in row_list: diff --git a/neo/master/handlers/storage.py b/neo/master/handlers/storage.py index 71988406bbbcb6458505a60e7b9d6edbfb8ea857..bbd6c36abc798d05c06f60386dffeb852694b286 100644 --- a/neo/master/handlers/storage.py +++ b/neo/master/handlers/storage.py @@ -44,11 +44,13 @@ class StorageServiceHandler(BaseServiceHandler): def askLastIDs(self, conn, packet): app = self.app - conn.answer(Packets.AnswerLastIDs(app.loid, app.ltid, app.pt.getID()), packet.getId()) + conn.answer(Packets.AnswerLastIDs(app.loid, app.ltid, app.pt.getID()), + packet.getId()) def askUnfinishedTransactions(self, conn, packet): app = self.app - p = Packets.AnswerUnfinishedTransactions(app.finishing_transaction_dict.keys()) + p = Packets.AnswerUnfinishedTransactions( + app.finishing_transaction_dict.keys()) conn.answer(p, packet.getId()) def notifyInformationLocked(self, conn, packet, tid): @@ -78,7 +80,8 @@ class StorageServiceHandler(BaseServiceHandler): p = Packets.NotifyTransactionFinished(tid) c.answer(p, t.getMessageId()) else: - p = Packets.InvalidateObjects(t.getOIDList(), tid) + p = Packets.InvalidateObjects(t.getOIDList(), + tid) c.notify(p) elif node.isStorage(): if uuid in t.getUUIDSet(): @@ -107,16 +110,18 @@ class StorageServiceHandler(BaseServiceHandler): continue offset = cell[0] - logging.debug("node %s is up for offset %s" %(dump(node.getUUID()), offset)) + logging.debug("node %s is up for offset %s" % + (dump(node.getUUID()), offset)) - # check the storage said it is up to date for a partition it was assigne to + # check the storage said it is up to date for a partition it was + # assigne to for xcell in app.pt.getCellList(offset): if xcell.getNode().getUUID() == node.getUUID() and \ xcell.getState() not in (CellStates.OUT_OF_DATE, CellStates.UP_TO_DATE): msg = "node %s telling that it is UP TO DATE for offset \ - %s but where %s for that offset" % (dump(node.getUUID()), offset, - xcell.getState()) + %s but where %s for that offset" % (dump(node.getUUID()), + offset, xcell.getState()) raise ProtocolError(msg) diff --git a/neo/master/pt.py b/neo/master/pt.py index f1086a51e01353224a4168f4eeec4df796be83ef..1df160ec198c5c273b6cd7a408ecd194d5f82899 100644 --- a/neo/master/pt.py +++ b/neo/master/pt.py @@ -44,11 +44,11 @@ class PartitionTable(neo.pt.PartitionTable): and n.getUUID() is not None] if len(node_list) == 0: # Impossible. - raise RuntimeError, \ - 'cannot make a partition table with an empty storage node list' + raise RuntimeError, 'cannot make a partition table with an ' \ + 'empty storage node list' - # Take it into account that the number of storage nodes may be less than the - # number of replicas. + # Take it into account that the number of storage nodes may be less + # than the number of replicas. repeats = min(self.nr + 1, len(node_list)) index = 0 for offset in xrange(self.np): @@ -79,22 +79,23 @@ class PartitionTable(neo.pt.PartitionTable): cell_list = [] uuid = node.getUUID() for offset, row in enumerate(self.partition_list): - if row is not None: - for cell in row: - if cell.getNode() is node: - if not cell.isFeeding(): - # If this cell is not feeding, find another node - # to be added. - node_list = [c.getNode() for c in row] - n = self.findLeastUsedNode(node_list) - if n is not None: - row.append(neo.pt.Cell(n, CellStates.OUT_OF_DATE)) - self.count_dict[n] += 1 - cell_list.append((offset, n.getUUID(), - CellStates.OUT_OF_DATE)) - row.remove(cell) - cell_list.append((offset, uuid, CellStates.DISCARDED)) - break + if row is None: + continue + for cell in row: + if cell.getNode() is node: + if not cell.isFeeding(): + # If this cell is not feeding, find another node + # to be added. + node_list = [c.getNode() for c in row] + n = self.findLeastUsedNode(node_list) + if n is not None: + row.append(neo.pt.Cell(n, CellStates.OUT_OF_DATE)) + self.count_dict[n] += 1 + cell_list.append((offset, n.getUUID(), + CellStates.OUT_OF_DATE)) + row.remove(cell) + cell_list.append((offset, uuid, CellStates.DISCARDED)) + break try: del self.count_dict[node] @@ -135,7 +136,8 @@ class PartitionTable(neo.pt.PartitionTable): if num_cells <= self.nr: row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE)) - cell_list.append((offset, node.getUUID(), CellStates.OUT_OF_DATE)) + cell_list.append((offset, node.getUUID(), + CellStates.OUT_OF_DATE)) node_count += 1 else: if max_count - node_count > 1: @@ -192,7 +194,8 @@ class PartitionTable(neo.pt.PartitionTable): removed_cell_list.append(feeding_cell) ideal_num = self.nr + 1 - while len(out_of_date_cell_list) + len(up_to_date_cell_list) > ideal_num: + while len(out_of_date_cell_list) + len(up_to_date_cell_list) > \ + ideal_num: # This row contains too many cells. if len(up_to_date_cell_list) > 1: # There are multiple up-to-date cells, so choose whatever @@ -220,7 +223,8 @@ class PartitionTable(neo.pt.PartitionTable): row.remove(cell) if not cell.isFeeding(): self.count_dict[cell.getNode()] -= 1 - changed_cell_list.append((offset, cell.getUUID(), CellStates.DISCARDED)) + changed_cell_list.append((offset, cell.getUUID(), + CellStates.DISCARDED)) # Add cells, if a row contains less than the number of replicas. for offset, row in enumerate(self.partition_list): @@ -233,7 +237,8 @@ class PartitionTable(neo.pt.PartitionTable): if node is None: break row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE)) - changed_cell_list.append((offset, node.getUUID(), CellStates.OUT_OF_DATE)) + changed_cell_list.append((offset, node.getUUID(), + CellStates.OUT_OF_DATE)) self.count_dict[node] += 1 num_cells += 1 @@ -251,6 +256,7 @@ class PartitionTable(neo.pt.PartitionTable): for cell in row: if not cell.getNode().isRunning() and not cell.isOutOfDate(): cell.setState(CellStates.OUT_OF_DATE) - cell_list.append((offset, cell.getUUID(), CellStates.OUT_OF_DATE)) + cell_list.append((offset, cell.getUUID(), + CellStates.OUT_OF_DATE)) return cell_list diff --git a/neo/neoctl/app.py b/neo/neoctl/app.py index 24681a1b437464660f9cdf0eadbb97ea1fe034af..a9f9b24565045e5a16d2bcfea43d03880748d416 100644 --- a/neo/neoctl/app.py +++ b/neo/neoctl/app.py @@ -193,10 +193,12 @@ class Application(object): def execute(self, args): """Execute the command given.""" - # print node type : print list of node of the given type (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...) - # set node uuid state [1|0] : set the node for the given uuid to the state (RUNNING, DOWN...) - # and modify the partition if asked - # set cluster name [shutdown|operational] : either shutdown the cluster or mark it as operational + # print node type : print list of node of the given type + # (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...) + # set node uuid state [1|0] : set the node for the given uuid to the + # state (RUNNING, DOWN...) and modify the partition if asked + # set cluster name [shutdown|operational] : either shutdown the + # cluster or mark it as operational current_action = action_dict level = 0 while current_action is not None and \ diff --git a/neo/protocol.py b/neo/protocol.py index 984afc74ab2d10925bf369077598769c7b3972c2..2f98e4ed8e9b9047b159b82d744cca9af735fcbe 100644 --- a/neo/protocol.py +++ b/neo/protocol.py @@ -147,7 +147,8 @@ def _decodeNodeType(original_node_type): def _decodeErrorCode(original_error_code): error_code = ErrorCodes.get(original_error_code) if error_code is None: - raise PacketMalformedError('invalid error code %d' % original_error_code) + raise PacketMalformedError('invalid error code %d' % + original_error_code) return error_code def _decodeAddress(address): @@ -337,7 +338,8 @@ class AcceptIdentification(Packet): node_type = _decodeNodeType(node_type) uuid = _decodeUUID(uuid) your_uuid == _decodeUUID(uuid) - return (node_type, uuid, address, num_partitions, num_replicas, your_uuid) + return (node_type, uuid, address, num_partitions, num_replicas, + your_uuid) class AskPrimary(Packet): """ @@ -395,8 +397,9 @@ class AnswerLastIDs(Packet): Reply to Ask Last IDs. S -> PM, PM -> S. """ def _encode(self, loid, ltid, lptid): - # in this case, loid is a valid OID but considered as invalid. This is not - # an issue because the OID 0 is hard coded and will never be generated + # in this case, loid is a valid OID but considered as invalid. This is + # not an issue because the OID 0 is hard coded and will never be + # generated if loid is None: loid = INVALID_OID ltid = _encodeTID(ltid) @@ -882,7 +885,8 @@ class AnswerTransactionInformation(Packet): Answer information (user, description) about a transaction. S -> Any. """ def _encode(self, tid, user, desc, ext, oid_list): - body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), len(oid_list))] + body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), + len(oid_list))] body.append(user) body.append(desc) body.append(ext) @@ -1268,7 +1272,8 @@ class PacketRegistry(dict): StartOperation = register(0x000B, StartOperation) StopOperation = register(0x000C, StopOperation) AskUnfinishedTransactions = register(0x000D, AskUnfinishedTransactions) - AnswerUnfinishedTransactions = register(0x800d, AnswerUnfinishedTransactions) + AnswerUnfinishedTransactions = register(0x800d, + AnswerUnfinishedTransactions) AskObjectPresent = register(0x000f, AskObjectPresent) AnswerObjectPresent = register(0x800f, AnswerObjectPresent) DeleteTransaction = register(0x0010, DeleteTransaction) @@ -1293,7 +1298,8 @@ class PacketRegistry(dict): AskTIDs = register(0x001C, AskTIDs) AnswerTIDs = register(0x801D, AnswerTIDs) AskTransactionInformation = register(0x001E, AskTransactionInformation) - AnswerTransactionInformation = register(0x801E, AnswerTransactionInformation) + AnswerTransactionInformation = register(0x801E, + AnswerTransactionInformation) AskObjectHistory = register(0x001F, AskObjectHistory) AnswerObjectHistory = register(0x801F, AnswerObjectHistory) AskOIDs = register(0x0020, AskOIDs) diff --git a/neo/pt.py b/neo/pt.py index bd1e9da768f95ffbedf72d9b7584ee71b6412020..db0353f0190b407b16721c03704df5718bea79ab 100644 --- a/neo/pt.py +++ b/neo/pt.py @@ -249,7 +249,8 @@ class PartitionTable(object): line.append('X' * len(node_list)) else: cell = [] - cell_dict = dict([(node_dict.get(x.getUUID(), None), x) for x in row]) + cell_dict = dict([(node_dict.get(x.getUUID(), None), x) + for x in row]) for node in xrange(len(node_list)): if node in cell_dict: cell.append(cell_state_dict[cell_dict[node].getState()]) diff --git a/neo/storage/handlers/__init__.py b/neo/storage/handlers/__init__.py index cb2c7b70e1f8417cce7386e8ef50d5daea72ce2a..3103c3da6448f6d173c224b1d77520cab1b52b3c 100644 --- a/neo/storage/handlers/__init__.py +++ b/neo/storage/handlers/__init__.py @@ -37,7 +37,8 @@ class BaseMasterHandler(BaseStorageHandler): raise PrimaryFailure('re-election occurs') def notifyClusterInformation(self, conn, packet, state): - logging.error('ignoring notify cluster information in %s' % self.__class__.__name__) + logging.error('ignoring notify cluster information in %s' % + self.__class__.__name__) def notifyLastOID(self, conn, packet, oid): self.app.loid = oid @@ -104,7 +105,8 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): if t is None: p = protocol.tidNotFound('%s does not exist' % dump(tid)) else: - p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], t[0]) + p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], + t[0]) conn.answer(p, packet.getId()) def askObject(self, conn, packet, oid, serial, tid): diff --git a/neo/storage/handlers/verification.py b/neo/storage/handlers/verification.py index 12d0f2285b02c30b2f88469e6baac0f2f2e5145b..f090d338982a62040bc2d44dbd19525ee45b4bc8 100644 --- a/neo/storage/handlers/verification.py +++ b/neo/storage/handlers/verification.py @@ -83,7 +83,8 @@ class VerificationHandler(BaseMasterHandler): if t is None: p = protocol.tidNotFound('%s does not exist' % dump(tid)) else: - p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], t[0]) + p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], + t[0]) conn.answer(p, packet.getId()) def askObjectPresent(self, conn, packet, oid, tid): diff --git a/neo/storage/mysqldb.py b/neo/storage/mysqldb.py index d2f78b9a30abf1c2e137c569331b203e27b5d1c8..ec1b2f36c5810335c366406a835648bcae825ad8 100644 --- a/neo/storage/mysqldb.py +++ b/neo/storage/mysqldb.py @@ -444,7 +444,7 @@ class MySQLDatabaseManager(DatabaseManager): q("""INSERT INTO obj SELECT * FROM tobj WHERE tobj.serial = %d""" \ % tid) q("""DELETE FROM tobj WHERE serial = %d""" % tid) - q("""INSERT INTO trans SELECT * FROM ttrans WHERE ttrans.tid = %d""" \ + q("""INSERT INTO trans SELECT * FROM ttrans WHERE ttrans.tid = %d""" % tid) q("""DELETE FROM ttrans WHERE tid = %d""" % tid) except: diff --git a/neo/storage/replicator.py b/neo/storage/replicator.py index 7375c065b363ad092255a67291b0abe8b50d5126..9acae862ec18ccb63faf9fe0684d3f8d7455bf85 100644 --- a/neo/storage/replicator.py +++ b/neo/storage/replicator.py @@ -126,7 +126,8 @@ class Replicator(object): partition.setCriticalTID(tid) del self.critical_tid_dict[msg_id] except KeyError: - logging.debug("setCriticalTID raised KeyError for msg_id %s" %(msg_id,)) + logging.debug("setCriticalTID raised KeyError for msg_id %s" % + (msg_id, )) def _askCriticalTID(self): conn = self.primary_master_connection @@ -164,7 +165,8 @@ class Replicator(object): addr = node.getAddress() if addr is None: - logging.error("no address known for the selected node %s" %(dump(node.getUUID()))) + logging.error("no address known for the selected node %s" % + (dump(node.getUUID()), )) return if self.current_connection is not None: if self.current_connection.getAddress() == addr: @@ -177,8 +179,7 @@ class Replicator(object): if self.current_connection is None: handler = replication.ReplicationHandler(app) self.current_connection = ClientConnection(app.em, handler, - addr = addr, - connector_handler = app.connector_handler) + addr = addr, connector_handler = app.connector_handler) p = Packets.RequestIdentification(NodeTypes.STORAGE, app.uuid, app.server, app.name) self.current_connection.ask(p) @@ -196,7 +197,8 @@ class Replicator(object): # Notify to a primary master node that my cell is now up-to-date. conn = self.primary_master_connection p = Packets.NotifyPartitionChanges(app.pt.getID(), - [(self.current_partition.getRID(), app.uuid, CellStates.UP_TO_DATE)]) + [(self.current_partition.getRID(), app.uuid, + CellStates.UP_TO_DATE)]) conn.notify(p) except KeyError: pass @@ -234,7 +236,8 @@ class Replicator(object): self._askUnfinishedTIDs() else: if self.replication_done: - logging.info('replication is done for %s' %(self.current_partition.getRID(),)) + logging.info('replication is done for %s' % + (self.current_partition.getRID(), )) self._finishReplication() def removePartition(self, rid):