Commit fd09575f authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add a transaction manager for storage node.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1647 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c99bfcc8
......@@ -28,6 +28,7 @@ from neo.storage.handlers import identification, verification, initialization
from neo.storage.handlers import master, hidden
from neo.storage.replicator import Replicator
from neo.storage.database import buildDatabaseManager
from neo.storage.transactions import TransactionManager
from neo.connector import getConnectorHandler
from neo.pt import PartitionTable
from neo.util import dump
......@@ -46,6 +47,7 @@ class Application(object):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.tm = TransactionManager(self)
self.dm = buildDatabaseManager(config.getAdapter(), config.getDatabase())
# load master nodes
......@@ -67,9 +69,6 @@ class Application(object):
self.master_node = None
# operation related data
self.transaction_dict = {}
self.store_lock_dict = {}
self.load_lock_dict = {}
self.event_queue = None
self.operational = False
......@@ -242,19 +241,7 @@ class Application(object):
# Forget all unfinished data.
self.dm.dropUnfinishedData()
# This is a mapping between transaction IDs and information on
# UUIDs of client nodes which issued transactions and objects
# which were stored.
self.transaction_dict = {}
# This is a mapping between object IDs and transaction IDs. Used
# for locking objects against store operations.
self.store_lock_dict = {}
# This is a mapping between object IDs and transactions IDs. Used
# for locking objects against load operations.
self.load_lock_dict = {}
self.tm.reset()
# This is a queue of events used to delay operations due to locks.
self.event_queue = deque()
......
......@@ -97,10 +97,9 @@ class BaseClientAndStorageOperationHandler(EventHandler):
def askObject(self, conn, oid, serial, tid):
app = self.app
if oid in app.load_lock_dict:
if self.app.tm.loadLocked(oid):
# Delay the response.
app.queueEvent(self.askObject, conn, oid,
serial, tid)
app.queueEvent(self.askObject, conn, oid, serial, tid)
return
o = app.dm.getObject(oid, serial, tid)
if o is not None:
......
......@@ -15,159 +15,46 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from neo import protocol
from neo.protocol import Packets
from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.util import dump
class TransactionInformation(object):
"""This class represents information on a transaction."""
def __init__(self, uuid):
self._uuid = uuid
self._object_dict = {}
self._transaction = None
self._last_oid_changed = False
self._locked = False
def isLocked(self):
return self._locked
def setLocked(self):
self._locked = True
def lastOIDLchange(self):
self._last_oid_changed = True
def isLastOIDChanged(self):
return self._last_oid_changed
def getUUID(self):
return self._uuid
def addObject(self, oid, compression, checksum, data):
self._object_dict[oid] = (oid, compression, checksum, data)
def addTransaction(self, oid_list, user, desc, ext):
self._transaction = (oid_list, user, desc, ext)
def getObjectList(self):
return self._object_dict.values()
def getTransaction(self):
return self._transaction
from neo.storage.transactions import ConflictError, DelayedError
class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def dealWithClientFailure(self, uuid):
app = self.app
for tid, t in app.transaction_dict.items():
if t.getUUID() == uuid:
if t.isLocked():
logging.warning('Node lost while finishing transaction')
break
for o in t.getObjectList():
oid = o[0]
# TODO: remove try..except: pass
# XXX: we release locks without checking if tid owns them
try:
del app.store_lock_dict[oid]
del app.load_lock_dict[oid]
except KeyError:
pass
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
def timeoutExpired(self, conn):
self.dealWithClientFailure(conn.getUUID())
self.app.tm.abortFor(conn.getUUID())
BaseClientAndStorageOperationHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn):
self.dealWithClientFailure(conn.getUUID())
self.app.tm.abortFor(conn.getUUID())
BaseClientAndStorageOperationHandler.connectionClosed(self, conn)
def peerBroken(self, conn):
self.dealWithClientFailure(conn.getUUID())
self.app.tm.abortFor(conn.getUUID())
BaseClientAndStorageOperationHandler.peerBroken(self, conn)
def abortTransaction(self, conn, tid):
app = self.app
# TODO: remove try..except: pass
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
# TODO: remove try..except: pass
# XXX: we release locks without checking if tid owns them
try:
del app.load_lock_dict[oid]
except KeyError:
pass
del app.store_lock_dict[oid]
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
except KeyError:
pass
self.app.tm.abort(tid)
def askStoreTransaction(self, conn, tid, user, desc,
ext, oid_list):
uuid = conn.getUUID()
t = self.app.transaction_dict.get(tid, None)
if t is None:
t = TransactionInformation(uuid)
self.app.transaction_dict[tid] = t
if t.isLastOIDChanged():
self.app.dm.setLastOID(self.app.loid)
t.addTransaction(oid_list, user, desc, ext)
self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext)
conn.answer(Packets.AnswerStoreTransaction(tid))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, tid):
uuid = conn.getUUID()
# First, check for the locking state.
app = self.app
locking_tid = app.store_lock_dict.get(oid)
if locking_tid is not None:
if locking_tid < tid:
# Delay the response.
app.queueEvent(self.askStoreObject, conn, oid, serial,
compression, checksum, data, tid)
else:
# If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid))
conn.answer(Packets.AnswerStoreObject(1, oid, locking_tid))
return
# Next, check if this is generated from the latest revision.
history_list = app.dm.getObjectHistory(oid)
if history_list:
last_serial = history_list[0][0]
if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid))
conn.answer(Packets.AnswerStoreObject(1, oid, last_serial))
return
# Now store the object.
t = self.app.transaction_dict.get(tid, None)
if t is None:
t = TransactionInformation(uuid)
self.app.transaction_dict[tid] = t
t.addObject(oid, compression, checksum, data)
try:
self.app.tm.storeObject(uuid, tid, serial, oid, compression,
checksum, data)
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
app.store_lock_dict[oid] = tid
# check if a greater OID last the last generated was used
if oid != protocol.INVALID_OID and oid > self.app.loid:
args = dump(oid), dump(self.app.loid)
logging.warning('Greater OID used in StoreObject : %s > %s', *args)
self.app.loid = oid
t.lastOIDLchange()
except ConflictError, err:
# resolvable or not
tid_or_serial = err.getTID()
conn.answer(Packets.AnswerStoreObject(1, oid, tid_or_serial))
except DelayedError:
# locked by a previous transaction, retry later
self.app.queueEvent(self.askStoreObject, conn, oid, serial,
compression, checksum, data, tid)
......@@ -57,30 +57,13 @@ class MasterOperationHandler(BaseMasterHandler):
app.replicator.addPartition(offset)
def lockInformation(self, conn, tid):
t = self.app.transaction_dict.get(tid, None)
if t is None:
if not tid in self.app.tm:
raise ProtocolError('Unknown transaction')
t.setLocked()
object_list = t.getObjectList()
for o in object_list:
self.app.load_lock_dict[o[0]] = tid
self.app.dm.storeTransaction(tid, object_list, t.getTransaction())
self.app.tm.lock(tid)
conn.answer(Packets.AnswerInformationLocked(tid))
def notifyUnlockInformation(self, conn, tid):
t = self.app.transaction_dict.get(tid, None)
if t is None:
if not tid in self.app.tm:
raise ProtocolError('Unknown transaction')
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
del self.app.load_lock_dict[oid]
del self.app.store_lock_dict[oid]
self.app.dm.finishTransaction(tid)
del self.app.transaction_dict[tid]
# Now it may be possible to execute some events.
self.app.executeQueuedEvents()
# TODO: send an answer
self.app.tm.unlock(tid)
#
# Copyright (C) 2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from neo.util import dump
class ConflictError(Exception):
"""
Raised when a resolvable conflict occurs
Argument: tid of locking transaction or latest revision
"""
def __init__(self, tid):
self._tid = tid
def getTID(self):
return self._tid
class DelayedError(Exception):
"""
Raised when an object is locked by a previous transaction
"""
class Transaction(object):
"""
Container for a pending transaction
"""
def __init__(self, uuid, tid):
self._uuid = uuid
self._tid = tid
self._object_dict = {}
self._transaction = None
self._locked = False
def getTID(self):
return self._tid
def getUUID(self):
return self._uuid
def lock(self):
assert not self._locked
self._locked = True
def isLocked(self):
return self._locked
def prepare(self, oid_list, user, desc, ext):
"""
Set the transaction informations
"""
# assert self._transaction is not None
self._transaction = (oid_list, user, desc, ext)
def addObject(self, oid, compression, checksum, data):
"""
Add an object to the transaction
"""
self._object_dict[oid] = (oid, compression, checksum, data)
def getObjectList(self):
return self._object_dict.values()
def getOIDList(self):
return self._object_dict.keys()
def getTransactionInformations(self):
assert self._transaction is not None
return self._transaction
class TransactionManager(object):
"""
Manage pending transaction and locks
"""
def __init__(self, app):
self._app = app
self._transaction_dict = {}
self._store_lock_dict = {}
self._load_lock_dict = {}
self._uuid_dict = {}
# TODO: replace app.loid with this one:
self._loid = None
def __contains__(self, tid):
"""
Returns True if the TID is known by the manager
"""
return tid in self._transaction_dict
def _getTransaction(self, tid, uuid):
"""
Get or create the transaction object for this tid
"""
transaction = self._transaction_dict.get(tid, None)
if transaction is None:
transaction = Transaction(uuid, tid)
self._uuid_dict.setdefault(uuid, set()).add(transaction)
self._transaction_dict[tid] = transaction
return transaction
def reset(self):
"""
Reset the transaction manager
"""
self._transaction_dict.clear()
self._store_lock_dict.clear()
self._load_lock_dict.clear()
self._uuid_dict.clear()
def lock(self, tid):
"""
Lock a transaction
"""
transaction = self._transaction_dict[tid]
# remember that the transaction has been locked
transaction.lock()
for oid in transaction.getOIDList():
self._load_lock_dict[oid] = tid
object_list = transaction.getObjectList()
txn_info = transaction.getTransactionInformations()
# store data from memory to temporary table
self._app.dm.storeTransaction(tid, object_list, txn_info)
def unlock(self, tid):
"""
Unlock transaction
"""
self._app.dm.finishTransaction(tid)
self.abort(tid, even_if_locked=True)
# update loid if needed
if self._loid != self._app.loid:
args = dump(self._loid), dump(self._app.loid)
logging.warning('Greater OID used in StoreObject : %s > %s', *args)
self._app.loid = self._loid
self._app.dm.setLastOID(self._app.loid)
def storeTransaction(self, uuid, tid, oid_list, user, desc, ext):
"""
Store transaction information received from client node
"""
transaction = self._getTransaction(tid, uuid)
transaction.prepare(oid_list, user, desc, ext)
def storeObject(self, uuid, tid, serial, oid, compression, checksum, data):
"""
Store an object received from client node
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid, None)
if locking_tid is not None:
if locking_tid < tid:
# a previous transaction lock this object, retry later
raise DelayedError
# If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid))
raise ConflictError(locking_tid)
# check if this is generated from the latest revision.
history_list = self._app.dm.getObjectHistory(oid)
if history_list and history_list[0][0] != serial:
logging.info('resolvable conflict in %s', dump(oid))
raise ConflictError(history_list[0][0])
# store object
transaction = self._getTransaction(tid, uuid)
transaction.addObject(oid, compression, checksum, data)
self._store_lock_dict[oid] = tid
# update loid
self._loid = max(oid, self._app.loid)
def abort(self, tid, even_if_locked=True):
"""
Abort a transaction
"""
if tid not in self._transaction_dict:
# XXX: this happen sometimes, explain or fix
return
transaction = self._transaction_dict[tid]
# if the transaction is locked, ensure we can drop it
if not even_if_locked and transaction.isLocked():
return
# unlock any object
for oid in transaction.getOIDList():
try:
del self._load_lock_dict[oid]
except KeyError:
# XXX: explain why
pass
del self._store_lock_dict[oid]
# _uuid_dict entry will be deleted at node disconnection
self._uuid_dict[transaction.getUUID()].discard(transaction)
del self._transaction_dict[tid]
self._app.executeQueuedEvents()
def abortFor(self, uuid):
"""
Abort any non-locked transaction of a node
"""
# abort any non-locked transaction of this node
for tid in [x.getTID() for x in self._uuid_dict.get(uuid, [])]:
self.abort(tid, even_if_locked=False)
# cleanup _uuid_dict if no transaction remains for this node
if not self._uuid_dict.get(uuid):
del self._uuid_dict[uuid]
def loadLocked(self, oid):
return oid in self._load_lock_dict
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment