Commit 112d91e3 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement an operation handler for storage node. This does not support replication yet.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@67 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f5b79552
......@@ -16,3 +16,5 @@ TODO
- Stopping packet processing by returning a boolean value from
a handler, otherwise too tricky to exchange a handler with another
- Replication.
......@@ -3,6 +3,7 @@ import MySQLdb
import os
from time import time
from struct import pack, unpack
from collections import deque
from neo.config import ConfigurationManager
from neo.protocol import Packet, ProtocolError, \
......@@ -240,8 +241,21 @@ class Application(object):
# for locking objects against load operations.
self.load_lock_dict = {}
# This is a queue of events used to delay operations due to locks.
self.event_queue = deque()
while 1:
em.poll(1)
def queueEvent(self, callable, *args, **kwargs):
self.event_queue.append((callable, args, kwargs))
def executeQueuedEvents(self):
l = len(self.event_queue)
p = self.event_queue.popleft
for i in xrange(l):
callable, args, kwargs = p()
callable(*args, **kwargs)
def getPartition(self, oid_or_tid):
return unpack('!Q', oid_or_tid)[0] % self.num_partitions
......@@ -74,13 +74,14 @@ class DatabaseManager(object):
raise NotImplementedError('this method must be overridden')
def getObject(self, oid, tid = None, before_tid = None):
"""Return a tuple of a serial, a compression
"""Return a tuple of a serial, next serial, a compression
specification, a checksum, and object data, if a given object
ID is present. Otherwise, return None. If tid is None and
before_tid is None, the latest revision is taken. If tid is
specified, the given revision is taken. If tid is not specified,
but before_tid is specified, the latest revision before the
given revision is taken."""
given revision is taken. The next serial is a serial right after
before_tid, if specified. Otherwise, it is None."""
raise NotImplementedError('this method must be overridden')
def changePartitionTable(self, ptid, cell_list):
......@@ -125,8 +126,14 @@ class DatabaseManager(object):
raise NotImplementedError('this method must be overridden')
def getObjectHistory(self, oid, length = 1):
"""Return a list of serials for a given object ID. The length
specifies the maximum size of such a list. The first serial
"""Return a list of serials and sizes for a given object ID.
The length specifies the maximum size of such a list. The first serial
must be the last serial, and the list must be sorted in descending
order. If there is no such object ID in a database, return None."""
raise NotImplementedError('this method must be overridden')
def getTIDList(self, offset, length, num_partitions, partition_list):
"""Return a list of TIDs in descending order from an offset,
at most the specified length. The list of partitions are passed
to filter out non-applicable TIDs."""
raise NotImplementedError('this method must be overridden')
......@@ -181,9 +181,20 @@ class StorageEventHandler(EventHandler):
def handleUnlockInformation(self, conn, packet, tid):
raise NotImplementedError('this method must be overridden')
def handleAskObject(self, conn, packet, oid, serial, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskTIDs(self, conn, packet, first, last):
self.handleUnexpectedPacket(conn, packet)
def handleAskObjectHistory(self, conn, packet, oid, length):
self.handleUnexpectedPacket(conn, packet)
def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list):
self.handleUnexpectedPacket(conn, packet)
def handleAskStoreObject(self, conn, packet, msg_id, oid, serial,
compression, data, checksum, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskObject(self, conn, packet, oid, serial, tid):
self.handleUnexpectedPacket(conn, packet)
......@@ -267,12 +267,29 @@ class MySQLDatabaseManager(DatabaseManager):
r = q("""SELECT serial, compression, checksum, data FROM obj
WHERE oid = '%s' AND serial = '%s'""" \
% (oid, tid))
try:
serial, compression, checksum, data = r[0]
next_serial = None
except IndexError:
return None
elif before_tid is not None:
before_tid = e(before_tid)
r = q("""SELECT serial, compression, checksum, data FROM obj
WHERE oid = '%s' AND serial < '%s'
ORDER BY serial DESC LIMIT 1""" \
% (oid, before_tid))
try:
serial, compression, checksum, data = r[0]
r = q("""SELECT serial FROM obj
WHERE oid = '%s' AND serial > '%s'
ORDER BY serial LIMIT 1""" \
% (oid, before_tid))
try:
next_serial = r[0][0]
except IndexError:
next_serial = None
except IndexError:
return None
else:
# XXX I want to express "HAVING serial = MAX(serial)", but
# MySQL does not use an index for a HAVING clause!
......@@ -280,10 +297,13 @@ class MySQLDatabaseManager(DatabaseManager):
WHERE oid = '%s' ORDER BY serial DESC LIMIT 1""" \
% oid)
try:
return r[0]
serial, compression, checksum, data = r[0]
next_serial = None
except IndexError:
return None
return serial, next_serial, compression, checksum, data
def doSetPartitionTable(self, ptid, cell_list, reset):
q = self.query
e = self.escape
......@@ -403,9 +423,17 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
e = self.escape
oid = e(oid)
r = q("""SELECT serial FROM obj WHERE oid = '%s'
r = q("""SELECT serial, LENGTH(data) FROM obj WHERE oid = '%s'
ORDER BY serial DESC LIMIT %d""" \
% (oid, length))
if r:
return [t[0] for t in r]
return r
return None
def getTIDList(self, offset, length, num_partitions, partition_list):
q = self.query
e = self.escape
r = q("""SELECT tid FROM trans WHERE MOD(tid,%d) in (%s)
ORDER BY tid DESC LIMIT %d""" \
% (offset, num_partitions, ','.join(partition_list), length))
return [t[0] for t in r]
import logging
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.utils import dump
from neo.node import MasterNode, StorageNode, ClientNode
from neo.connetion import ClientConnection
from neo.protocol import Packet
class TransactionInformation(object):
"""This class represents information on a transaction."""
def __init__(self, uuid):
self._uuid = uuid
self._object_dict = {}
self._transaction = None
def getUUID(self):
return self._uuid
def addObject(self, oid, compression, checksum, data):
self._object_dict[oid] = (oid, compression, checksum, data)
def addTransaction(self, oid_list, user, desc, ext):
self._transaction = (oid_list, user, desc, ext)
def getObjectList(self):
return self._object_dict.values()
def getTransaction(self):
return self._transaction
class OperationEventHandler(StorageEventHandler):
"""This class deals with events for a operation phase."""
def connectionCompleted(self, conn):
# FIXME this must be implemented for replications.
raise NotImplementedError
def connectionFailed(self, conn):
# FIXME this must be implemented for replications.
raise NotImplementedError
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
# Client nodes and other storage nodes may connect. Also,
# master nodes may connect, only if they misunderstand that
# I am a master node.
StorageEventHandler.connectionAccepted(self, conn, s, addr)
def dealWithClientFailure(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if isinstance(node, ClientNode):
for tid, t in app.transaction_dict.items():
if t.getUUID() == uuid:
for o in t.getObjectList():
oid = o[0]
try:
del app.store_lock_dict[oid]
del app.load_lock_dict[oid]
except KeyError:
pass
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
def timeoutExpired(self, conn):
if isinstance(conn, ClientConnection):
if conn.getUUID() == self.app.primary_master_node.getUUID():
# If a primary master node timeouts, I cannot continue.
logging.critical('the primary master node times out')
raise PrimaryFailure('the primary master node times out')
else:
# Otherwise, this connection is to another storage node.
raise NotImplemented
else:
self.dealWithClientFailure(conn)
StorageEventHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn):
if isinstance(conn, ClientConnection):
if conn.getUUID() == self.app.primary_master_node.getUUID():
# If a primary master node closes, I cannot continue.
logging.critical('the primary master node is dead')
raise PrimaryFailure('the primary master node is dead')
else:
# Otherwise, this connection is to another storage node.
raise NotImplemented
else:
self.dealWithClientFailure(conn)
StorageEventHandler.connectionClosed(self, conn)
def peerBroken(self, conn):
if isinstance(conn, ClientConnection):
if conn.getUUID() == self.app.primary_master_node.getUUID():
# If a primary master node gets broken, I cannot continue.
logging.critical('the primary master node is broken')
raise PrimaryFailure('the primary master node is broken')
else:
# Otherwise, this connection is to another storage node.
raise NotImplemented
else:
self.dealWithClientFailure(conn)
StorageEventHandler.peerBroken(self, conn)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
if name != app.name:
logging.error('reject an alien cluster')
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid cluster name'))
conn.abort()
return
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
if node_type == MASTER_NODE_TYPE:
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If I do not know such a node, and it is not even a master
# node, simply reject it.
logging.error('reject an unknown node')
conn.addPacket(Packet().notReady(packet.getId(),
'unknown node'))
conn.abort()
return
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
p = Packet()
p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p)
conn.abort()
return
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = Packet()
p.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
app.uuid, app.server[0], app.server[1])
conn.addPacket(p)
if node_type == MASTER_NODE_TYPE:
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
if isinstance(conn, ClientConnection):
raise NotImplementedError
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
self.handleUnexpectedPacket(conn, packet)
def handleAskLastIDs(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleAskPartitionTable(self, conn, packet, offset_list):
self.handleUnexpectedPacket(conn, packet)
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
self.handleUnexpectedPacket(conn, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
if isinstance(conn, ClientConnection):
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
return
# First, change the table on memory.
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != self.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
# Then, the database.
app.dm.changePartitionTable(ptid, cell_list)
else:
self.handleUnexpectedPacket(conn, packet)
def handleStartOperation(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleStopOperation(self, conn, packet):
if isinstance(conn, ClientConnection):
raise OperationFailure('operation stopped')
else:
self.handleUnexpectedPacket(conn, packet)
def handleAskUnfinishedTransactions(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleAskTransactionInformation(self, conn, packet, tid):
if isinstance(conn, ClientConnection):
self.handleUnexpectedPacket(conn, packet)
else:
app = self.app
t = app.dm.getTransaction(tid)
p = Packet()
if t is None:
p.tidNotFound(packet.getId(), '%s does not exist' % dump(tid))
else:
p.answerTransactionInformation(packet.getId(), tid,
t[1], t[2], t[0])
conn.addPacket(p)
def handleAskObjectPresent(self, conn, packet, oid, tid):
self.handleUnexpectedPacket(conn, packet)
def handleDeleteTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleCommitTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleLockInformation(self, conn, packet, tid):
if isinstance(conn, ClientConnection):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
app.load_lock_dict[o[0]] = tid
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.addPacket(Packet().notifyInformationLocked(conn.getId(), tid))
else:
self.handleUnexpectedPacket(conn, packet)
def handleUnlockInformation(self, conn, packet, tid):
if isinstance(conn, ClientConnection):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
del app.load_lock_dict[oid]
del app.store_lock_dict[oid]
app.dm.finishTransaction(tid)
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
except KeyError:
pass
else:
self.handleUnexpectedPacket(conn, packet)
def handleAskObject(self, conn, packet, oid, serial, tid):
app = self.app
if oid in app.load_lock_dict:
# Delay the response.
app.queueEvent(self.handleAskObject, conn, packet, oid,
serial, tid)
return
if serial == INVALID_SERIAL:
serial = None
if tid == INVALID_TID:
tid = None
o = app.dm.getObject(oid, serial, tid)
p = Packet()
if o is not None:
serial, next_serial, compression, checksum, data = o
if next_serial is None:
next_serial = INVALID_SERIAL
p.answerObject(packet.getId(), oid, serial, next_serial,
compression, checksum, data)
else:
p.oidNotFound(packet.getId(), '%s does not exist' % dump(oid))
conn.addPacket(p)
def handleAskTIDs(self, conn, packet, first, last):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid offsets'))
return
app = self.app
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list)
conn.addPacket(Packet().answerTIDs(packet.getId(), tid_list))
def handleAskObjectHistory(self, conn, packet, oid, length):
app = self.app
history_list = app.dm.getObjectHistory(oid, length)
conn.addPacket(Packet().answerObjectHistory(packet.getId(),
history_list))
def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list):
app = self.app
try:
t = app.transaction_dict[tid]
except KeyError:
p = Packet()
p.protocolError(packet.getId(), 'unknown tid %s' % dump(tid))
conn.addPacket(p)
return
t.addTransaction(oid_list, user, desc, ext)
conn.addPacket(Packet().answerStoreTransaction(packet.getId(), tid))
def handleAskStoreObject(self, conn, packet, msg_id, oid, serial,
compression, data, checksum, tid):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
# First, check for the locking state.
app = self.app
locking_tid = app.store_lock_dict.get(oid)
if locking_tid is not None:
if locking_tid < tid:
# Delay the response.
app.queueEvent(self.handleAskStoreObject, conn, packet,
msg_id, oid, serial, compression, data,
checksum, tid)
else:
# If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately.
conn.addPacket(Packet().answerStoreObject(packet.getId(), 1,
oid, locking_tid))
return
# Next, check if this is generated from the latest revision.
history_list = app.dm.getObjectHistory(oid)
if history_list:
last_serial = history_list[0][0]
if last_serial != serial:
conn.addPacket(Packet().answerStoreObject(packet.getId(), 1,
oid, last_serial))
return
# Now store the object.
t = app.transaction_dict.setdefault(tid, Transaction(uuid))
t.addObject(oid, compression, checksum, data)
conn.addPacket(Packet().answerStoreObject(packet.getId(), 0,
oid, serial))
app.store_lock_dict[oid] = tid
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment