diff --git a/README b/README index fc156037de7127bbfdde3e4295be5a00dcc22d52..924140fa3331494aca1fed6cf4404ad8ca32925f 100644 --- a/README +++ b/README @@ -16,3 +16,5 @@ TODO - Stopping packet processing by returning a boolean value from a handler, otherwise too tricky to exchange a handler with another + + - Replication. diff --git a/neo/storage/app.py b/neo/storage/app.py index 45afa704e9724362a54ad215bf0d3f1827d7b122..3db9539f383c010c4b5f45f542d64f95a59c24d7 100644 --- a/neo/storage/app.py +++ b/neo/storage/app.py @@ -3,6 +3,7 @@ import MySQLdb import os from time import time from struct import pack, unpack +from collections import deque from neo.config import ConfigurationManager from neo.protocol import Packet, ProtocolError, \ @@ -240,8 +241,21 @@ class Application(object): # for locking objects against load operations. self.load_lock_dict = {} + # This is a queue of events used to delay operations due to locks. + self.event_queue = deque() + while 1: em.poll(1) + def queueEvent(self, callable, *args, **kwargs): + self.event_queue.append((callable, args, kwargs)) + + def executeQueuedEvents(self): + l = len(self.event_queue) + p = self.event_queue.popleft + for i in xrange(l): + callable, args, kwargs = p() + callable(*args, **kwargs) + def getPartition(self, oid_or_tid): return unpack('!Q', oid_or_tid)[0] % self.num_partitions diff --git a/neo/storage/database.py b/neo/storage/database.py index 1cdf8702cf355b093b4d89a34eb15e101a78c555..19993695e7e13cdcf7a1d316652c12596098c7a5 100644 --- a/neo/storage/database.py +++ b/neo/storage/database.py @@ -74,13 +74,14 @@ class DatabaseManager(object): raise NotImplementedError('this method must be overridden') def getObject(self, oid, tid = None, before_tid = None): - """Return a tuple of a serial, a compression + """Return a tuple of a serial, next serial, a compression specification, a checksum, and object data, if a given object ID is present. Otherwise, return None. If tid is None and before_tid is None, the latest revision is taken. If tid is specified, the given revision is taken. If tid is not specified, but before_tid is specified, the latest revision before the - given revision is taken.""" + given revision is taken. The next serial is a serial right after + before_tid, if specified. Otherwise, it is None.""" raise NotImplementedError('this method must be overridden') def changePartitionTable(self, ptid, cell_list): @@ -125,8 +126,14 @@ class DatabaseManager(object): raise NotImplementedError('this method must be overridden') def getObjectHistory(self, oid, length = 1): - """Return a list of serials for a given object ID. The length - specifies the maximum size of such a list. The first serial + """Return a list of serials and sizes for a given object ID. + The length specifies the maximum size of such a list. The first serial must be the last serial, and the list must be sorted in descending order. If there is no such object ID in a database, return None.""" raise NotImplementedError('this method must be overridden') + + def getTIDList(self, offset, length, num_partitions, partition_list): + """Return a list of TIDs in descending order from an offset, + at most the specified length. The list of partitions are passed + to filter out non-applicable TIDs.""" + raise NotImplementedError('this method must be overridden') diff --git a/neo/storage/handler.py b/neo/storage/handler.py index e7a833cde3fe727f5e12bcd1cdf3ef9d56c45c48..e995c6f589c4e4ffd6b447005911e717047dd4bc 100644 --- a/neo/storage/handler.py +++ b/neo/storage/handler.py @@ -181,9 +181,20 @@ class StorageEventHandler(EventHandler): def handleUnlockInformation(self, conn, packet, tid): raise NotImplementedError('this method must be overridden') + def handleAskObject(self, conn, packet, oid, serial, tid): + self.handleUnexpectedPacket(conn, packet) + + def handleAskTIDs(self, conn, packet, first, last): + self.handleUnexpectedPacket(conn, packet) + + def handleAskObjectHistory(self, conn, packet, oid, length): + self.handleUnexpectedPacket(conn, packet) + + def handleAskStoreTransaction(self, conn, packet, tid, user, desc, + ext, oid_list): + self.handleUnexpectedPacket(conn, packet) + def handleAskStoreObject(self, conn, packet, msg_id, oid, serial, compression, data, checksum, tid): self.handleUnexpectedPacket(conn, packet) - def handleAskObject(self, conn, packet, oid, serial, tid): - self.handleUnexpectedPacket(conn, packet) diff --git a/neo/storage/mysqldb.py b/neo/storage/mysqldb.py index bd408df20c2a16b1d3bc4b6de5484b94a8b4cf0d..1517c93cccd6053f2b010234aba10ac621a4cfee 100644 --- a/neo/storage/mysqldb.py +++ b/neo/storage/mysqldb.py @@ -267,22 +267,42 @@ class MySQLDatabaseManager(DatabaseManager): r = q("""SELECT serial, compression, checksum, data FROM obj WHERE oid = '%s' AND serial = '%s'""" \ % (oid, tid)) + try: + serial, compression, checksum, data = r[0] + next_serial = None + except IndexError: + return None elif before_tid is not None: before_tid = e(before_tid) r = q("""SELECT serial, compression, checksum, data FROM obj WHERE oid = '%s' AND serial < '%s' ORDER BY serial DESC LIMIT 1""" \ % (oid, before_tid)) + try: + serial, compression, checksum, data = r[0] + r = q("""SELECT serial FROM obj + WHERE oid = '%s' AND serial > '%s' + ORDER BY serial LIMIT 1""" \ + % (oid, before_tid)) + try: + next_serial = r[0][0] + except IndexError: + next_serial = None + except IndexError: + return None else: # XXX I want to express "HAVING serial = MAX(serial)", but # MySQL does not use an index for a HAVING clause! r = q("""SELECT serial, compression, checksum, data FROM obj WHERE oid = '%s' ORDER BY serial DESC LIMIT 1""" \ % oid) - try: - return r[0] - except IndexError: - return None + try: + serial, compression, checksum, data = r[0] + next_serial = None + except IndexError: + return None + + return serial, next_serial, compression, checksum, data def doSetPartitionTable(self, ptid, cell_list, reset): q = self.query @@ -403,9 +423,17 @@ class MySQLDatabaseManager(DatabaseManager): q = self.query e = self.escape oid = e(oid) - r = q("""SELECT serial FROM obj WHERE oid = '%s' + r = q("""SELECT serial, LENGTH(data) FROM obj WHERE oid = '%s' ORDER BY serial DESC LIMIT %d""" \ % (oid, length)) if r: - return [t[0] for t in r] + return r return None + + def getTIDList(self, offset, length, num_partitions, partition_list): + q = self.query + e = self.escape + r = q("""SELECT tid FROM trans WHERE MOD(tid,%d) in (%s) + ORDER BY tid DESC LIMIT %d""" \ + % (offset, num_partitions, ','.join(partition_list), length)) + return [t[0] for t in r] diff --git a/neo/storage/operation.py b/neo/storage/operation.py new file mode 100644 index 0000000000000000000000000000000000000000..5a63f090161d095f6ea7269df570992a36cf11c7 --- /dev/null +++ b/neo/storage/operation.py @@ -0,0 +1,391 @@ +import logging + +from neo.storage.handler import StorageEventHandler +from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ + MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE +from neo.utils import dump +from neo.node import MasterNode, StorageNode, ClientNode +from neo.connetion import ClientConnection +from neo.protocol import Packet + +class TransactionInformation(object): + """This class represents information on a transaction.""" + def __init__(self, uuid): + self._uuid = uuid + self._object_dict = {} + self._transaction = None + + def getUUID(self): + return self._uuid + + def addObject(self, oid, compression, checksum, data): + self._object_dict[oid] = (oid, compression, checksum, data) + + def addTransaction(self, oid_list, user, desc, ext): + self._transaction = (oid_list, user, desc, ext) + + def getObjectList(self): + return self._object_dict.values() + + def getTransaction(self): + return self._transaction + +class OperationEventHandler(StorageEventHandler): + """This class deals with events for a operation phase.""" + + def connectionCompleted(self, conn): + # FIXME this must be implemented for replications. + raise NotImplementedError + + def connectionFailed(self, conn): + # FIXME this must be implemented for replications. + raise NotImplementedError + + def connectionAccepted(self, conn, s, addr): + """Called when a connection is accepted.""" + # Client nodes and other storage nodes may connect. Also, + # master nodes may connect, only if they misunderstand that + # I am a master node. + StorageEventHandler.connectionAccepted(self, conn, s, addr) + + def dealWithClientFailure(self, conn): + uuid = conn.getUUID() + if uuid is not None: + app = self.app + node = app.nm.getNodeByUUID(uuid) + if isinstance(node, ClientNode): + for tid, t in app.transaction_dict.items(): + if t.getUUID() == uuid: + for o in t.getObjectList(): + oid = o[0] + try: + del app.store_lock_dict[oid] + del app.load_lock_dict[oid] + except KeyError: + pass + del app.transaction_dict[tid] + + # Now it may be possible to execute some events. + app.executeQueuedEvents() + + def timeoutExpired(self, conn): + if isinstance(conn, ClientConnection): + if conn.getUUID() == self.app.primary_master_node.getUUID(): + # If a primary master node timeouts, I cannot continue. + logging.critical('the primary master node times out') + raise PrimaryFailure('the primary master node times out') + else: + # Otherwise, this connection is to another storage node. + raise NotImplemented + else: + self.dealWithClientFailure(conn) + + StorageEventHandler.timeoutExpired(self, conn) + + def connectionClosed(self, conn): + if isinstance(conn, ClientConnection): + if conn.getUUID() == self.app.primary_master_node.getUUID(): + # If a primary master node closes, I cannot continue. + logging.critical('the primary master node is dead') + raise PrimaryFailure('the primary master node is dead') + else: + # Otherwise, this connection is to another storage node. + raise NotImplemented + else: + self.dealWithClientFailure(conn) + + StorageEventHandler.connectionClosed(self, conn) + + def peerBroken(self, conn): + if isinstance(conn, ClientConnection): + if conn.getUUID() == self.app.primary_master_node.getUUID(): + # If a primary master node gets broken, I cannot continue. + logging.critical('the primary master node is broken') + raise PrimaryFailure('the primary master node is broken') + else: + # Otherwise, this connection is to another storage node. + raise NotImplemented + else: + self.dealWithClientFailure(conn) + + StorageEventHandler.peerBroken(self, conn) + + def handleRequestNodeIdentification(self, conn, packet, node_type, + uuid, ip_address, port, name): + if isinstance(conn, ClientConnection): + self.handleUnexpectedPacket(conn, packet) + else: + if name != app.name: + logging.error('reject an alien cluster') + conn.addPacket(Packet().protocolError(packet.getId(), + 'invalid cluster name')) + conn.abort() + return + + addr = (ip_address, port) + node = app.nm.getNodeByUUID(uuid) + if node is None: + if node_type == MASTER_NODE_TYPE: + node = app.nm.getNodeByServer(addr) + if node is None: + node = MasterNode(server = addr, uuid = uuid) + app.nm.add(node) + else: + # If I do not know such a node, and it is not even a master + # node, simply reject it. + logging.error('reject an unknown node') + conn.addPacket(Packet().notReady(packet.getId(), + 'unknown node')) + conn.abort() + return + else: + # If this node is broken, reject it. + if node.getUUID() == uuid: + if node.getState() == BROKEN_STATE: + p = Packet() + p.brokenNodeDisallowedError(packet.getId(), 'go away') + conn.addPacket(p) + conn.abort() + return + + # Trust the UUID sent by the peer. + node.setUUID(uuid) + conn.setUUID(uuid) + + p = Packet() + p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE, + app.uuid, app.server[0], app.server[1]) + conn.addPacket(p) + + if node_type == MASTER_NODE_TYPE: + conn.abort() + + def handleAcceptNodeIdentification(self, conn, packet, node_type, + uuid, ip_address, port): + if isinstance(conn, ClientConnection): + raise NotImplementedError + else: + self.handleUnexpectedPacket(conn, packet) + + def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, + known_master_list): + self.handleUnexpectedPacket(conn, packet) + + def handleAskLastIDs(self, conn, packet): + self.handleUnexpectedPacket(conn, packet) + + def handleAskPartitionTable(self, conn, packet, offset_list): + self.handleUnexpectedPacket(conn, packet) + + def handleSendPartitionTable(self, conn, packet, ptid, row_list): + self.handleUnexpectedPacket(conn, packet) + + def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): + """This is very similar to Send Partition Table, except that + the information is only about changes from the previous.""" + if isinstance(conn, ClientConnection): + app = self.app + nm = app.nm + pt = app.pt + if app.ptid >= ptid: + # Ignore this packet. + return + + # First, change the table on memory. + app.ptid = ptid + for offset, uuid, state in cell_list: + node = nm.getNodeByUUID(uuid) + if node is None: + node = StorageNode(uuid = uuid) + if uuid != self.uuid: + node.setState(TEMPORARILY_DOWN_STATE) + nm.add(node) + + pt.setCell(offset, node, state) + + # Then, the database. + app.dm.changePartitionTable(ptid, cell_list) + else: + self.handleUnexpectedPacket(conn, packet) + + def handleStartOperation(self, conn, packet): + self.handleUnexpectedPacket(conn, packet) + + def handleStopOperation(self, conn, packet): + if isinstance(conn, ClientConnection): + raise OperationFailure('operation stopped') + else: + self.handleUnexpectedPacket(conn, packet) + + def handleAskUnfinishedTransactions(self, conn, packet): + self.handleUnexpectedPacket(conn, packet) + + def handleAskTransactionInformation(self, conn, packet, tid): + if isinstance(conn, ClientConnection): + self.handleUnexpectedPacket(conn, packet) + else: + app = self.app + t = app.dm.getTransaction(tid) + + p = Packet() + if t is None: + p.tidNotFound(packet.getId(), '%s does not exist' % dump(tid)) + else: + p.answerTransactionInformation(packet.getId(), tid, + t[1], t[2], t[0]) + conn.addPacket(p) + + def handleAskObjectPresent(self, conn, packet, oid, tid): + self.handleUnexpectedPacket(conn, packet) + + def handleDeleteTransaction(self, conn, packet, tid): + self.handleUnexpectedPacket(conn, packet) + + def handleCommitTransaction(self, conn, packet, tid): + self.handleUnexpectedPacket(conn, packet) + + def handleLockInformation(self, conn, packet, tid): + if isinstance(conn, ClientConnection): + app = self.app + try: + t = app.transaction_dict[tid] + object_list = t.getObjectList() + for o in object_list: + app.load_lock_dict[o[0]] = tid + + app.dm.storeTransaction(tid, object_list, t.getTransaction()) + except KeyError: + pass + + conn.addPacket(Packet().notifyInformationLocked(conn.getId(), tid)) + else: + self.handleUnexpectedPacket(conn, packet) + + def handleUnlockInformation(self, conn, packet, tid): + if isinstance(conn, ClientConnection): + app = self.app + try: + t = app.transaction_dict[tid] + object_list = t.getObjectList() + for o in object_list: + oid = o[0] + del app.load_lock_dict[oid] + del app.store_lock_dict[oid] + + app.dm.finishTransaction(tid) + del app.transaction_dict[tid] + + # Now it may be possible to execute some events. + app.executeQueuedEvents() + except KeyError: + pass + else: + self.handleUnexpectedPacket(conn, packet) + + def handleAskObject(self, conn, packet, oid, serial, tid): + app = self.app + if oid in app.load_lock_dict: + # Delay the response. + app.queueEvent(self.handleAskObject, conn, packet, oid, + serial, tid) + return + + if serial == INVALID_SERIAL: + serial = None + if tid == INVALID_TID: + tid = None + o = app.dm.getObject(oid, serial, tid) + p = Packet() + if o is not None: + serial, next_serial, compression, checksum, data = o + if next_serial is None: + next_serial = INVALID_SERIAL + p.answerObject(packet.getId(), oid, serial, next_serial, + compression, checksum, data) + else: + p.oidNotFound(packet.getId(), '%s does not exist' % dump(oid)) + conn.addPacket(p) + + def handleAskTIDs(self, conn, packet, first, last): + # This method is complicated, because I must return TIDs only + # about usable partitions assigned to me. + if first >= last: + conn.addPacket(Packet().protocolError(packet.getId(), + 'invalid offsets')) + return + + app = self.app + + # Collect all usable partitions for me. + getCellList = app.pt.getCellList + partition_list = [] + for offset in xrange(app.num_partitions): + for cell in getCellList(offset, True): + if cell.getUUID() == app.uuid: + partition_list.append(offset) + break + + tid_list = app.dm.getTIDList(first, last - first, + app.num_partitions, partition_list) + conn.addPacket(Packet().answerTIDs(packet.getId(), tid_list)) + + def handleAskObjectHistory(self, conn, packet, oid, length): + app = self.app + history_list = app.dm.getObjectHistory(oid, length) + conn.addPacket(Packet().answerObjectHistory(packet.getId(), + history_list)) + + def handleAskStoreTransaction(self, conn, packet, tid, user, desc, + ext, oid_list): + app = self.app + + try: + t = app.transaction_dict[tid] + except KeyError: + p = Packet() + p.protocolError(packet.getId(), 'unknown tid %s' % dump(tid)) + conn.addPacket(p) + return + + t.addTransaction(oid_list, user, desc, ext) + conn.addPacket(Packet().answerStoreTransaction(packet.getId(), tid)) + + def handleAskStoreObject(self, conn, packet, msg_id, oid, serial, + compression, data, checksum, tid): + uuid = conn.getUUID() + if uuid is None: + self.handleUnexpectedPacket(conn, packet) + return + + # First, check for the locking state. + app = self.app + locking_tid = app.store_lock_dict.get(oid) + if locking_tid is not None: + if locking_tid < tid: + # Delay the response. + app.queueEvent(self.handleAskStoreObject, conn, packet, + msg_id, oid, serial, compression, data, + checksum, tid) + else: + # If a newer transaction already locks this object, + # do not try to resolve a conflict, so return immediately. + conn.addPacket(Packet().answerStoreObject(packet.getId(), 1, + oid, locking_tid)) + return + + # Next, check if this is generated from the latest revision. + history_list = app.dm.getObjectHistory(oid) + if history_list: + last_serial = history_list[0][0] + if last_serial != serial: + conn.addPacket(Packet().answerStoreObject(packet.getId(), 1, + oid, last_serial)) + return + + # Now store the object. + t = app.transaction_dict.setdefault(tid, Transaction(uuid)) + t.addObject(oid, compression, checksum, data) + conn.addPacket(Packet().answerStoreObject(packet.getId(), 0, + oid, serial)) + app.store_lock_dict[oid] = tid +