Commit b09c876e authored by Aurel's avatar Aurel

redefine handler

merge loadBefore with other load in client part
first version for undo method


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@62 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 06b23421
......@@ -3,15 +3,15 @@ import os
from time import time
from threading import Lock, Condition
from cPickle import dumps, loads
from zlib import compress, alder32, decompress
from zlib import compress, adler32, decompress
from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
from neo.protocol import Packet, INVALID_UUID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE
from neo.client.master import MasterEventHandler
from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageKeyError
from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageNotFoundError
class ConnectionManager(object):
"""This class manage a pool of connection to storage node."""
......@@ -27,15 +27,15 @@ class ConnectionManager(object):
l = Lock()
self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release
def _initNodeConnection(self, addr):
"""Init a connection to a given storage node."""
handler = StorageEventHandler(self.storage)
"""Init a connection to a given storage node."""
handler = ClientEventHandler(self.storage)
conn = ClientConnection(self.storage.em, handler, addr)
msg_id = conn.getNextId()
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid, addr[0],
addr[1], 'main')
addr[1], self.storage.name)
conn.expectMessage(msg_id)
while 1:
self.em.poll(1)
......@@ -43,7 +43,7 @@ class ConnectionManager(object):
break
logging.debug('connected to a storage node %s' %(addr,))
return conn
def _dropConnection(self,):
"""Drop a connection."""
pass
......@@ -78,19 +78,20 @@ class ConnectionManager(object):
else:
# Create new connection to node
return self._createNodeConnection(node)
class Application(object):
"""The client node application."""
def __init__(self, master_addr, master_port, **kw):
def __init__(self, master_addr, master_port, name, **kw):
logging.basicConfig(level = logging.DEBUG)
logging.debug('master node address is %s, port is %d' %(master_addr, master_port))
# Internal Attributes
self.name = name
self.em = EventManager()
self.nm = NodeManager()
self.cm = ConnectionManager(self)
self.cm = ConnectionManager(self)
self.pt = None
self.primary_master_node = None
self.master_conn = None
......@@ -107,8 +108,8 @@ class Application(object):
# object_stored is used to know if storage node
# accepted the object or raised a conflict
# 0 : no answer yet
# 1 : ok
# 2 : conflict
# -1 : conflict
# oid, serial : ok
self.object_stored = 0
# Lock definition :
# _oid_lock is used in order to not call multiple oid
......@@ -121,7 +122,7 @@ class Application(object):
# _info_lock is used when retrieving information for object or transaction
lock = Lock()
self._oid_lock_acquire = lock.acquire
self._oid_lock_release = lock.release
self._oid_lock_release = lock.release
lock = Lock()
self._txn_lock_acquire = lock.acquire
self._txn_lock_release = lock.release
......@@ -147,7 +148,7 @@ class Application(object):
defined_master_addr = (master_addr, master_port)
while 1:
self.node_not_ready = 0
logging.debug("trying to connect to primary master...")
logging.debug("trying to connect to primary master...")
self.connectToPrimaryMasterNode(defined_master_addr)
if not self.node_not_ready and self.pt.filled():
# got a connection and partition table
......@@ -161,7 +162,7 @@ class Application(object):
def connectToPrimaryMasterNode(self, defined_master_addr):
"""Connect to the primary master node."""
handler = MasterEventHandler(self)
handler = ClientEventHandler(self)
n = MasterNode(server = defined_master_addr)
self.nm.add(n)
......@@ -172,7 +173,7 @@ class Application(object):
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
defined_master_addr[0],
defined_master_addr[1], 'main')
defined_master_addr[1], self.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
while 1:
......@@ -195,7 +196,7 @@ class Application(object):
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
self.primary_master_node.server[0],
self.primary_master_node.server[1] , 'main')
self.primary_master_node.server[1] , self.name)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.master_conn = conn
......@@ -242,9 +243,9 @@ class Application(object):
if len(hist) == 0:
raise NEOStorageNotFoundError()
return hist[0][0]
def _load(self, oid, serial="", cache=0):
"""Internal method which manage load and loadSerial."""
"""Internal method which manage load ,loadSerial and loadBefore."""
partition_id = oid % self.num_paritions
# Only used up to date node for retrieving object
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
......@@ -259,7 +260,7 @@ class Application(object):
p.askObjectByOID(msg_id, oid, serial)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
# Wait for answer
self.loaded_object = None
try:
while 1:
......@@ -270,7 +271,7 @@ class Application(object):
# OID not found
continue
# Copy object data here to release lock as soon as possible
noid, serial, compression, checksum, data = self.loaded_object
noid, start_serial, end_serial, compression, checksum, data = self.loaded_object
finally:
self._load_lock_release()
# Check data here
......@@ -281,7 +282,7 @@ class Application(object):
# Reacquire lock and try again
self._load_lock_acquire()
continue
elif compression and checksum != alder32(data):
elif compression and checksum != adler32(data):
# Check checksum if we use compression
logging.error('wrong checksum from node %s for oid %s' \
%(storage_node.getServer(), oid))
......@@ -300,10 +301,10 @@ class Application(object):
if cache:
self.cache_lock_acquire()
try:
self.cache[oid] = serial, data
self.cache[oid] = start_serial, data
finally:
self.cache_lock_release()
return loads(data), serial
return loads(data), start_serial, end_serial
def load(self, oid, version=None):
"""Load an object for a given oid."""
......@@ -315,13 +316,18 @@ class Application(object):
finally:
self._cache_lock_release()
# Otherwise get it from storage node
return self._load(oid, cache=1)
return self._load(oid, cache=1)[:2]
def loadSerial(self, oid, serial):
"""Load an object for a given oid and serial."""
# Do not try in cache as it managed only up-to-date object
return self._load(oid, serial), None
return self._load(oid, serial)[:2], None
def loadBefore(oid, tid):
"""Load an object for a given oid before tid committed."""
# Do not try in cache as it managed only up-to-date object
return self._load(oid, tid)
def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction."""
# First get a transaction, only one is allowed at a time
......@@ -341,7 +347,7 @@ class Application(object):
p.askNewTID(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
# Wait for answer
while 1:
self.em.poll(1)
if self.tid is not None:
......@@ -359,7 +365,7 @@ class Application(object):
# Store data on each node
ddata = dumps(data)
compressed_data = compress(ddata)
crc = alder32(compressed_data)
crc = adler32(compressed_data)
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
......@@ -390,7 +396,7 @@ class Application(object):
# Store object in tmp cache
self.txn_data_dict[oid] = ddata
break
def tpc_vote(self, transaction):
"""Store current transaction."""
if transaction is not self.txn:
......@@ -418,7 +424,7 @@ class Application(object):
"""Clear some transaction parameter and release lock."""
self.txn = None
self._txn_lock_release()
def tpc_abort(self, transaction):
"""Abort current transaction."""
if transaction is not self.txn:
......@@ -426,7 +432,7 @@ class Application(object):
try:
# Abort transaction on each node used for it
# In node where objects were stored
aborted_node = {}
aborted_node = {}
for oid in self.txn_oid_list:
partition_id = oid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
......@@ -450,7 +456,7 @@ class Application(object):
conn.addPacket(p)
finally:
self._clear_txn()
def tpc_finish(self, transaction, f=None):
"""Finish current transaction."""
if self.txn is not transaction:
......@@ -478,7 +484,7 @@ class Application(object):
for oid in self.txn_data_dict.keys:
ddata = self.txn_data_dict[oid]
# Now serial is same as tid
self.cache[oid] = self.tid, ddata
self.cache[oid] = self.tid, ddata
finally:
self.cache_lock_release()
# Release transaction
......@@ -486,66 +492,43 @@ class Application(object):
finally:
self._clear_txn()
def loadBefore(self, oid, tid):
partition_id = oid % self.num_paritions
# Only used up to date node for retrieving object
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE]
self._load_before_lock_acquire()
data = None
# Store data on each node
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askObjectByTID(msg_id, oid, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.loaded_object_by_tid = None
try:
def undo(self, transaction_id, txn):
if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction)
# First get transaction information from master node
self._info_lock_acquire()
try:
partition_id = transaction_id % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.txn_info = None
while 1:
self.em.poll(1)
if self.loaded_object_by_tid is not None:
if self.txn_info is not None:
break
if self.loaded_object_by_tid == -1:
# OID not found
continue
# Copy object data here to release lock as soon as possible
noid, start, end, compression, checksum, data = self.loaded_object
finally:
self._load_before_lock_release()
# Check data here
if noid != oid:
# Oops, try with next node
logging.error('got wrong oid %s instead of %s from node %s' \
%(noid, oid, storage_node.getServer()))
# Reacquire lock and try again
self._load_before_lock_acquire()
continue
elif compression and checksum != alder32(data):
# Check checksum if we use compression
logging.error('wrong checksum from node %s for oid %s' \
%(storage_node.getServer(), oid))
# Reacquire lock and try again
self._load_before_lock_acquire()
continue
else:
break
if data is None:
# We didn't got any object from storage node
raise NEOStorageNotFoundError()
# Uncompress data
if compression:
data = decompressed(data)
return loads(data), start, end
oid_list = self.txn_info['oids']
finally:
self._info_lock_releas()
# Second get object data from storage node using loadBefore
data_dict = {}
for oid in oid_list:
data, start, end = self.loadBefore(oid, transaction_id)
data_dict[oid] = data
# Third do transaction with old data
self.tpc_begin(txn)
for oid in data_dict.keys():
data = data_dict[oid]
self.store(oid, self.tid, data, None, txn)
self.tpc_vote(txn)
self.tpc_finish(txn)
def undo(self, transaction_id, txn):
if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction)
raise NotImplementedError
def undoInfo(self, first, last, specification=None):
# First get list of transaction from master node
self._info_lock_acquire()
......@@ -553,7 +536,7 @@ class Application(object):
conn = self.master_conn
msg_id = conn.getNextId()
p = Packet()
p.getTIDList(msg_id, first, last, specification)
p.askTIDs(msg_id, first, last, specification)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
......@@ -576,16 +559,17 @@ class Application(object):
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.undo_txn_info = None
self.txn_info = None
while 1:
self.em.poll(1)
if self.undo_txn_info is not None:
if self.txn_info is not None:
break
undo_txn_list.append(self.undo_txn_info)
self.txn_info.pop("oids")
undo_txn_list.append(self.txn_info)
return undo_txn_dict
finally:
self._info_lock_release()
self._info_lock_release()
def history(self, oid, version, length=1, filter=None, object_only=0):
self._info_lock_acquire()
history_list = []
......@@ -634,6 +618,7 @@ class Application(object):
break
# create history dict
self.txn_info.remove('id')
self.txn_info.remove('oids')
self.txn_info['serial'] = serial
self.txn_info['version'] = None
self.txn_info['size'] = size
......
......@@ -7,7 +7,10 @@ from neo.protocol import Packet, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NOD
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
class MasterEventHandler(EventHandler):
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
class ClientEventHandler(EventHandler):
"""This class deals with events for a master."""
def __init__(self, app):
......@@ -27,17 +30,16 @@ class MasterEventHandler(EventHandler):
if isinstance(conn, ClientConnection):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
if node_type != MASTER_NODE_TYPE:
# The peer is not a master node!
logging.error('%s:%d is not a master node', ip_address, port)
app.nm.remove(node)
# It can be eiter a master node or a storage node
if node_type == CLIENT_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
......@@ -45,20 +47,32 @@ class MasterEventHandler(EventHandler):
conn.setUUID(uuid)
node.setUUID(uuid)
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
# Ask a primary master.
msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
if node_type == MASTER_NODE_TYPE:
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
# Ask a primary master.
msg_id = conn.getNextId()
conn.addPacket(Packet().askPrimaryMaster(msg_id))
conn.expectMessage(msg_id)
else:
self.handleUnexpectedPacket(conn, packet)
# Master node handler
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
if isinstance(conn, ClientConnection):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if not isinstance(node, MasterNode):
return
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
......@@ -73,7 +87,7 @@ class MasterEventHandler(EventHandler):
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
......@@ -91,44 +105,91 @@ class MasterEventHandler(EventHandler):
else:
self.handleUnexpectedPacket(conn, packet)
def handleSendPartitionTable(self, conn, packet, row_list):
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
if isinstance(conn, ClientConnection):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if not isinstance(node, MasterNode) \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
for offset, node in row_list:
app.pt.setRow(offset, row)
app.pt.setRow(offset, row)
else:
self.handleUnexpectedPacket(conn, packet)
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
else:
n = app.app.nm.getNodeByServer(addr)
if n is None:
if node_type == MASTER_NODE:
n = MasterNode(server = addr)
elif node_typ == STORAGE_NODE:
n = StorageNode(server = addr)
elif node_typ == CLIENT_NODE:
n = ClientNode(server = addr)
else:
continue
app.app.nm.add(n)
if isinstance(conn, ClientConnection):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
def handleNotifyPartitionChanges(self, conn, packet, cell_list):
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if not isinstance(node, MasterNode) \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
else:
n = app.app.nm.getNodeByServer(addr)
if n is None:
if node_type == MASTER_NODE:
n = MasterNode(server = addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_typ == STORAGE_NODE:
if uuid == INVALID_UUID:
# No interest.
continue
n = StorageNode(server = addr)
elif node_typ == CLIENT_NODE:
if uuid == INVALID_UUID:
# No interest.
continue
n = ClientNode(server = addr)
else:
continue
app.app.nm.add(n)
n.setState(state)
else:
self.handleUnexpectedPacket(conn, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
if isinstance(conn, ClientConnection):
app = self.app
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if not isinstance(node, MasterNode) \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
for cell in cell_list:
app.pt.addNode(cell)
else:
......@@ -144,9 +205,9 @@ class MasterEventHandler(EventHandler):
def handleNotifyTransactionFinished(self, conn, packet, tid):
if isinstance(conn, ClientConnection):
app = self.app
if tid != app.tid:
# what's this ?
raise
if tid != app.tid:
# What's this ?
raise NEOStorageError
else:
app.txn_finished = 1
else:
......@@ -155,10 +216,60 @@ class MasterEventHandler(EventHandler):
def handleInvalidateObjects(self, conn, packet, oid_list):
raise NotImplementedError('this method must be overridden')
def handleAnswerNewOIDList(self, conn, packet, oid_list):
def handleAnswerNewOIDs(self, conn, packet, oid_list):
if isinstance(conn, ClientConnection):
app = self.app
app.new_oid_list = oid_list
app.new_oid_list.reverse()
else:
self.handleUnexpectedPacket(conn, packet)
# Storage node handler
def handleAnwserObjectByOID(self, oid, start_serial, end_serial, compression,
checksum, data):
if isinstance(conn, ClientConnection):
app = self.app
app.loaded_object = (oid, start_serial, end_serial, compression,
checksum, data)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreObject(self, conflicting, oid, serial):
if isinstance(conn, ClientConnection):
app = self.app
if conflicting == '1':
app.object_stored = -1
else:
app.object_stored = oid, serial
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreTransaction(self, tid):
if isinstance(conn, ClientConnection):
app = self.app
app.txn_stored = 1
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerTransactionInformation(self, tid, user, desc, oid_list):
if isinstance(conn, ClientConnection):
app = self.app
# transaction information are returned as a dict
info = {}
info['time'] = TimeStamp(p64(long(tid))).timeTime()
info['user_name'] = user
info['description'] = desc
info['id'] = p64(long(tid))
info['oids'] = oid_list
app.txn_info = info
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerObjectHistory(self, oid, history_list):
if isinstance(conn, ClientConnection):
app = self.app
# history_list is a list of tuple (serial, size)
self.history = oid, history_list
else:
self.handleUnexpectedPacket(conn, packet)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment