Commit 2272593b authored by Aurel's avatar Aurel

add handler for invalidateObject

add conflict and non-undoable transaction management in undo
fix some typo, bugs


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@73 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e7639d7a
...@@ -100,17 +100,19 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -100,17 +100,19 @@ class NEOStorage(BaseStorage.BaseStorage,
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
try: try:
return self.app.process_method('store', oid=oid, serial=serial, data=data, return self.app.process_method('store', oid=oid, serial=serial, data=data,
version=version, transaction=transaction) version=version, transaction=transaction)
except NEOStorageConflictError: except NEOStorageConflictError, conflict_serial:
new_data = self.tryToResolveConflict(oid, self.app.tid, if conflict_serial <= self.app.tid:
serial, data) # Try to resolve conflict only if conflicting serial is older
if new_data is not None: # than the current transaction ID
# try again after conflict resolution new_data = self.tryToResolveConflict(oid, self.app.tid,
self.store(oid, serial, new_data, version, transaction) serial, data)
else: if new_data is not None:
raise POSException.ConflictError(oid=oid, # Try again after conflict resolution
serials=(self.app.tid, return self.store(oid, serial, new_data, version, transaction)
serial),data=data) raise POSException.ConflictError(oid=oid,
serials=(self.app.tid,
serial),data=data)
def _clear_temp(self): def _clear_temp(self):
raise NotImplementedError raise NotImplementedError
...@@ -143,7 +145,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -143,7 +145,7 @@ class NEOStorage(BaseStorage.BaseStorage,
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
self._txn_lock_acquire() self._txn_lock_acquire()
try: try:
return self.app.process_method('undo', transaction_id=transaction_id, txn=txn) return self.app.process_method('undo', transaction_id=transaction_id, txn=txn, wrapper=self)
except: except:
self._txn_lock_release() self._txn_lock_release()
......
...@@ -10,11 +10,14 @@ from neo.client.mq import MQ ...@@ -10,11 +10,14 @@ from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode from neo.node import NodeManager, MasterNode
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, CLIENT_NODE_TYPE, \ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, CLIENT_NODE_TYPE, \
UP_TO_DATE_STATE, FEEDING_STATE UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
from neo.client.handler import ClientEventHandler from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageNotFoundError from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn from neo.client.multithreading import ThreadingMixIn
from ZODB.POSException import UndoError
class ConnectionManager(object): class ConnectionManager(object):
"""This class manage a pool of connection to storage node.""" """This class manage a pool of connection to storage node."""
...@@ -343,6 +346,8 @@ class Application(ThreadingMixIn, object): ...@@ -343,6 +346,8 @@ class Application(ThreadingMixIn, object):
self.cache[oid] = start_serial, data self.cache[oid] = start_serial, data
finally: finally:
self.cache_lock_release() self.cache_lock_release()
if end_serial == INVALID_SERIAL:
end_serial = None
return loads(data), start_serial, end_serial return loads(data), start_serial, end_serial
...@@ -420,14 +425,14 @@ class Application(ThreadingMixIn, object): ...@@ -420,14 +425,14 @@ class Application(ThreadingMixIn, object):
# Check we don't get any conflict # Check we don't get any conflict
self.txn_object_stored = 0 self.txn_object_stored = 0
self._waitMessage() self._waitMessage()
if self.object_stored == -1: if self.object_stored[0] == -1:
if self.txn_data_dict.has_key(oid): if self.txn_data_dict.has_key(oid):
# One storage already accept the object, is it normal ?? # One storage already accept the object, is it normal ??
# remove from dict and raise ConflictError, don't care of # remove from dict and raise ConflictError, don't care of
# previous node which already store data as it would be resent # previous node which already store data as it would be resent
# again if conflict is resolved or txn will be aborted # again if conflict is resolved or txn will be aborted
self.txn_data_dict.pop(oid) self.txn_data_dict.pop(oid)
raise NEOStorageConflictError() raise NEOStorageConflictError(self.object_stored[1])
# Store object in tmp cache # Store object in tmp cache
noid, nserial = self.object_stored noid, nserial = self.object_stored
...@@ -472,25 +477,11 @@ class Application(ThreadingMixIn, object): ...@@ -472,25 +477,11 @@ class Application(ThreadingMixIn, object):
"""Abort current transaction.""" """Abort current transaction."""
if transaction is not self.txn: if transaction is not self.txn:
return return
try:
# Abort txn in node where objects were stored # Abort txn in node where objects were stored
aborted_node = {} aborted_node = {}
for oid in self.txn_oid_list: for oid in self.txn_oid_list:
partition_id = oid % self.num_paritions partition_id = oid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node):
conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId()
p = Packet()
p.abortTransaction(msg_id, self.tid)
self.queue.put((None, msg_id, conn, p), True)
aborted_node[storage_node] = 1
# Abort in nodes where transaction was stored
partition_id = self.tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
...@@ -501,46 +492,57 @@ class Application(ThreadingMixIn, object): ...@@ -501,46 +492,57 @@ class Application(ThreadingMixIn, object):
p = Packet() p = Packet()
p.abortTransaction(msg_id, self.tid) p.abortTransaction(msg_id, self.tid)
self.queue.put((None, msg_id, conn, p), True) self.queue.put((None, msg_id, conn, p), True)
finally: aborted_node[storage_node] = 1
self._clear_txn()
# Abort in nodes where transaction was stored
partition_id = self.tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node):
conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId()
p = Packet()
p.abortTransaction(msg_id, self.tid)
self.queue.put((None, msg_id, conn, p), True)
self._clear_txn()
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
"""Finish current transaction.""" """Finish current transaction."""
if self.txn is not transaction: if self.txn is not transaction:
return return
# Call function given by ZODB
if f is not None:
f()
# Call finish on master
conn = self.master_conn
msg_id = conn.getNextId()
p = Packet()
p.finishTransaction(msg_id, self.oid_list, self.tid)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self._waitMessage()
if self.txn_finished != 1:
raise NEOStorateError('tpc_finish failed')
# Update cache
self.cache_lock_acquire()
try: try:
# Call function given by ZODB for oid in self.txn_data_dict.keys:
if f is not None: ddata = self.txn_data_dict[oid]
f() # Now serial is same as tid
# Call finish on master self.cache[oid] = self.tid, ddata
conn = self.master_conn
msg_id = conn.getNextId()
p = Packet()
p.finishTransaction(msg_id, self.oid_list, self.tid)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
self._waitMessage()
if self.txn_finished != 1:
raise NEOStorateError('tpc_finish failed')
# Update cache
self.cache_lock_acquire()
try:
for oid in self.txn_data_dict.keys:
ddata = self.txn_data_dict[oid]
# Now serial is same as tid
self.cache[oid] = self.tid, ddata
finally:
self.cache_lock_release()
return self.tid
finally: finally:
self._clear_txn() self.cache_lock_release()
self._clear_txn()
return self.tid
def undo(self, transaction_id, txn): def undo(self, transaction_id, txn, wrapper):
# XXX conflict and non-undoable txn management is missing
if transaction is not self.txn: if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
...@@ -574,13 +576,35 @@ class Application(ThreadingMixIn, object): ...@@ -574,13 +576,35 @@ class Application(ThreadingMixIn, object):
# Second get object data from storage node using loadBefore # Second get object data from storage node using loadBefore
data_dict = {} data_dict = {}
for oid in oid_list: for oid in oid_list:
data, start, end = self.loadBefore(oid, transaction_id) try:
data, start, end = self.loadBefore(oid, transaction_id)
except NEOStorageNotFoundError:
# Object created by transaction, so no previous record
data_dict[oid] = None
continue
# end must be TID we are going to undone otherwise it means
# a later transaction modify the object
if end != transaction_id:
raise UndoError("non-undoable transaction")
data_dict[oid] = data data_dict[oid] = data
# Third do transaction with old data # Third do transaction with old data
self.tpc_begin(txn) self.tpc_begin(txn)
for oid in data_dict.keys(): for oid in data_dict.keys():
data = data_dict[oid] data = data_dict[oid]
self.store(oid, self.tid, data, None, txn) try:
self.store(oid, self.tid, data, None, txn)
except NEOStorageConflictError, serial:
if serial <= self.tid:
new_data = wrapper.tryToResolveConflict(oid, self.tid, serial
data)
if new_data is not None:
self.store(oid, self.tid, new_data, None, txn)
continue
raise POSException.ConflictError(oid=oid,
serials=(self.tid,
serial),data=data)
self.tpc_vote(txn) self.tpc_vote(txn)
self.tpc_finish(txn) self.tpc_finish(txn)
......
...@@ -3,9 +3,10 @@ import logging ...@@ -3,9 +3,10 @@ import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.protocol import Packet, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ from neo.protocol import Packet, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID INVALID_UUID, TEMPORARILY_DOWN_STATE, BROKEN_STATE
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.client.NEOStorage import NEOStorageError
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64 from ZODB.utils import p64
...@@ -242,19 +243,19 @@ class ClientEventHandler(EventHandler): ...@@ -242,19 +243,19 @@ class ClientEventHandler(EventHandler):
else: else:
n = app.app.nm.getNodeByServer(addr) n = app.app.nm.getNodeByServer(addr)
if n is None: if n is None:
if node_type == MASTER_NODE: if node_type == MASTER_NODE_TYPE:
n = MasterNode(server = addr) n = MasterNode(server = addr)
if uuid != INVALID_UUID: if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer # If I don't know the UUID yet, believe what the peer
# told me at the moment. # told me at the moment.
if n.getUUID() is None: if n.getUUID() is None:
n.setUUID(uuid) n.setUUID(uuid)
elif node_typ == STORAGE_NODE: elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID: if uuid == INVALID_UUID:
# No interest. # No interest.
continue continue
n = StorageNode(server = addr) n = StorageNode(server = addr)
elif node_typ == CLIENT_NODE: elif node_type == CLIENT_NODE_TYPE:
if uuid == INVALID_UUID: if uuid == INVALID_UUID:
# No interest. # No interest.
continue continue
...@@ -306,7 +307,17 @@ class ClientEventHandler(EventHandler): ...@@ -306,7 +307,17 @@ class ClientEventHandler(EventHandler):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleInvalidateObjects(self, conn, packet, oid_list): def handleInvalidateObjects(self, conn, packet, oid_list):
raise NotImplementedError('this method must be overridden') if isinstance(conn, ClientConnection):
app = self.app
app._cache_lock_acquire()
try:
for oid in oid_list:
if app.cache.has_key(oid):
del app.cache[oid]
finally:
app._cache_lock_release()
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNewOIDs(self, conn, packet, oid_list): def handleAnswerNewOIDs(self, conn, packet, oid_list):
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
...@@ -337,7 +348,7 @@ class ClientEventHandler(EventHandler): ...@@ -337,7 +348,7 @@ class ClientEventHandler(EventHandler):
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
app = self.app app = self.app
if conflicting == '1': if conflicting == '1':
app.object_stored = -1 app.object_stored = -1, serial
else: else:
app.object_stored = oid, serial app.object_stored = oid, serial
else: else:
......
...@@ -102,7 +102,7 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -102,7 +102,7 @@ class SecondaryEventHandler(MasterEventHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
if node_type != MASTER_NODE: if node_type != MASTER_NODE_TYPE:
# No interest. # No interest.
continue continue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment