Commit 48aa3a91 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2d5b3dff
...@@ -15,9 +15,12 @@ ...@@ -15,9 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import handler from neo.lib import handler
from ZODB.POSException import StorageError from ZODB.POSException import StorageError, ReadOnlyError
class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX
def protocolError(self, conn, message): def protocolError(self, conn, message):
raise StorageError("protocol error: %s" % message) raise StorageError("protocol error: %s" % message)
def readOnlyAccess(self, conn, message):
raise ReadOnlyError(message)
...@@ -273,6 +273,7 @@ class NEOLogger(Logger): ...@@ -273,6 +273,7 @@ class NEOLogger(Logger):
self.parent.callHandlers(record) self.parent.callHandlers(record)
def packet(self, connection, packet, outgoing): def packet(self, connection, packet, outgoing):
return
if True or self._db is not None: if True or self._db is not None:
body = packet._body body = packet._body
if self._max_packet and self._max_packet < len(body): if self._max_packet and self._max_packet < len(body):
......
...@@ -75,6 +75,7 @@ def ErrorCodes(): ...@@ -75,6 +75,7 @@ def ErrorCodes():
REPLICATION_ERROR REPLICATION_ERROR
CHECKING_ERROR CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED BACKEND_NOT_IMPLEMENTED
READ_ONLY_ACCESS # TODO use it
@Enum @Enum
def ClusterStates(): def ClusterStates():
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, NotReadyError from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors
from . import MasterHandler from . import MasterHandler
class ClientServiceHandler(MasterHandler): class ClientServiceHandler(MasterHandler):
...@@ -122,8 +122,10 @@ class ClientServiceHandler(MasterHandler): ...@@ -122,8 +122,10 @@ class ClientServiceHandler(MasterHandler):
# like ClientServiceHandler but read-only & only for tid <= backup_tid # like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientROServiceHandler(ClientServiceHandler): class ClientROServiceHandler(ClientServiceHandler):
# XXX somehow make sure to propagate this to raiseReadOnlyError() on client ? def _readOnly(self, conn, *args, **kw):
def _readOnly(self, *args, **kw): raise NotReadyError('read-only access') p = Errors.ReadOnlyAccess('read-only access because cluster is in backuping mode')
conn.answer(p)
askBeginTransaction = _readOnly askBeginTransaction = _readOnly
askNewOIDs = _readOnly askNewOIDs = _readOnly
...@@ -138,9 +140,7 @@ class ClientROServiceHandler(ClientServiceHandler): ...@@ -138,9 +140,7 @@ class ClientROServiceHandler(ClientServiceHandler):
# like in MasterHandler but returns backup_tid instead of last_tid # like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn): def askLastTransaction(self, conn):
# XXX wrong vvv assert self.app.backup_tid is not None # we are in BACKUPING mode
backup_tid = self.app.backup_tid # FIXME this is not regularly updated - only on request from outside ? backup_tid = self.app.pt.getBackupTid()
# XXX yes -> see askRecovery in master/handlers/
assert backup_tid is not None # in BACKUPING mode it is always set
print '\n\n\nASK LAST_TID -> %r\n' % backup_tid print '\n\n\nASK LAST_TID -> %r\n' % backup_tid
conn.answer(Packets.AnswerLastTransaction(backup_tid)) conn.answer(Packets.AnswerLastTransaction(backup_tid))
...@@ -329,9 +329,9 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -329,9 +329,9 @@ class PartitionTable(neo.lib.pt.PartitionTable):
cell.backup_tid = backup_tid_dict.get(cell.getUUID(), cell.backup_tid = backup_tid_dict.get(cell.getUUID(),
ZERO_TID) ZERO_TID)
def getBackupTid(self, mean=max): # XXX sometimes called with mean=min def getBackupTid(self, mean=max): # XXX naming: sometimes called with mean=min
try: try:
return min(mean(x.backup_tid for x in row if x.isReadable()) # XXX min(mean(...)) - correct? return min(mean(x.backup_tid for x in row if x.isReadable())
for row in self.partition_list) for row in self.partition_list)
except ValueError: except ValueError:
return ZERO_TID return ZERO_TID
......
...@@ -111,7 +111,6 @@ class RecoveryManager(MasterHandler): ...@@ -111,7 +111,6 @@ class RecoveryManager(MasterHandler):
if cell_list: if cell_list:
self._notifyAdmins(Packets.NotifyPartitionChanges( self._notifyAdmins(Packets.NotifyPartitionChanges(
pt.setNextID(), cell_list)) pt.setNextID(), cell_list))
# NOTE
if app.backup_tid: if app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict) pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid() app.backup_tid = pt.getBackupTid()
...@@ -148,7 +147,6 @@ class RecoveryManager(MasterHandler): ...@@ -148,7 +147,6 @@ class RecoveryManager(MasterHandler):
# ask the last IDs to perform the recovery # ask the last IDs to perform the recovery
conn.ask(Packets.AskRecovery()) conn.ask(Packets.AskRecovery())
# NOTE
def answerRecovery(self, conn, ptid, backup_tid, truncate_tid): def answerRecovery(self, conn, ptid, backup_tid, truncate_tid):
uuid = conn.getUUID() uuid = conn.getUUID()
if self.target_ptid <= ptid: if self.target_ptid <= ptid:
......
...@@ -18,7 +18,7 @@ from neo.lib import logging ...@@ -18,7 +18,7 @@ from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum, add64 from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \ from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION Errors, ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError from ..transactions import ConflictError, DelayedError, NotRegisteredError
from ..exception import AlreadyPendingError from ..exception import AlreadyPendingError
import time import time
...@@ -47,7 +47,7 @@ class ClientOperationHandler(EventHandler): ...@@ -47,7 +47,7 @@ class ClientOperationHandler(EventHandler):
app.queueEvent(self.askObject, conn, (oid, serial, tid)) app.queueEvent(self.askObject, conn, (oid, serial, tid))
return return
o = app.dm.getObject(oid, serial, tid) o = app.dm.getObject(oid, serial, tid)
print 'AAA %r' % o print 'askObject -> %r' % (o,)
try: try:
serial, next_serial, compression, checksum, data, data_serial = o serial, next_serial, compression, checksum, data, data_serial = o
except TypeError: except TypeError:
...@@ -233,7 +233,9 @@ class ClientOperationHandler(EventHandler): ...@@ -233,7 +233,9 @@ class ClientOperationHandler(EventHandler):
# like ClientOperationHandler but read-only & only for tid <= backup_tid # like ClientOperationHandler but read-only & only for tid <= backup_tid
class ClientROOperationHandler(ClientOperationHandler): class ClientROOperationHandler(ClientOperationHandler):
def _readOnly(self, *args, **kw): raise NotReadyError('read-only access') def _readOnly(self, conn, *args, **kw):
p = Errors.ReadOnlyAccess('read-only access because cluster is in backuping mode')
conn.answer(p)
abortTransaction = _readOnly abortTransaction = _readOnly
askStoreTransaction = _readOnly askStoreTransaction = _readOnly
...@@ -255,7 +257,7 @@ class ClientROOperationHandler(ClientOperationHandler): ...@@ -255,7 +257,7 @@ class ClientROOperationHandler(ClientOperationHandler):
def askObject(self, conn, oid, serial, tid): def askObject(self, conn, oid, serial, tid):
backup_tid = self.app.dm.getBackupTID() backup_tid = self.app.dm.getBackupTID()
print '\n\n\nASK OBJECT %r, %r, %r (backup_tid: %r)\n\n\n' % (oid, serial, tid, backup_tid) print '\n\n\nASK OBJECT %r, %r, %r (backup_tid: %r)' % (oid, serial, tid, backup_tid)
if serial and serial > backup_tid: if serial and serial > backup_tid:
# obj lookup will find nothing, but return properly either # obj lookup will find nothing, but return properly either
# OidDoesNotExist or OidNotFound # OidDoesNotExist or OidNotFound
...@@ -267,9 +269,9 @@ class ClientROOperationHandler(ClientOperationHandler): ...@@ -267,9 +269,9 @@ class ClientROOperationHandler(ClientOperationHandler):
if not serial and not tid: if not serial and not tid:
tid = add64(backup_tid, 1) tid = add64(backup_tid, 1)
print '-> %r %r %r' % (oid, serial, tid) print '-> asking as oid: %r serial: %r tid: %r' % (oid, serial, tid)
super(ClientROOperationHandler, self).askObject(conn, oid, serial, tid) super(ClientROOperationHandler, self).askObject(conn, oid, serial, tid)
print 'XXX' print '(ask object done)'
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition): def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
backup_tid = self.app.dm.getBackupTID() backup_tid = self.app.dm.getBackupTID()
......
...@@ -70,6 +70,7 @@ class IdentificationHandler(EventHandler): ...@@ -70,6 +70,7 @@ class IdentificationHandler(EventHandler):
else: else:
raise ProtocolError('reject non-client-or-storage node') raise ProtocolError('reject non-client-or-storage node')
# apply the handler and set up the connection # apply the handler and set up the connection
print 'S: -> handler: %s' % handler
handler = handler(self.app) handler = handler(self.app)
conn.setHandler(handler) conn.setHandler(handler)
node.setConnection(conn, app.uuid < uuid) node.setConnection(conn, app.uuid < uuid)
......
...@@ -698,6 +698,7 @@ class NEOCluster(object): ...@@ -698,6 +698,7 @@ class NEOCluster(object):
@property @property
def backup_tid(self): def backup_tid(self):
# TODO -> self.master.pt.getBackupTid() ?
return self.neoctl.getRecovery()[1] return self.neoctl.getRecovery()[1]
@property @property
......
...@@ -34,10 +34,13 @@ from neo.lib.util import p64 ...@@ -34,10 +34,13 @@ from neo.lib.util import p64
from .. import Patch from .. import Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random
from ZODB.POSException import ReadOnlyError
from neo.client.exception import NEOStorageError
# dump log to stderr # dump log to stderr
logging.backlog(max_size=None) logging.backlog(max_size=None)
del logging.default_root_handler.handle del logging.default_root_handler.handle
getLogger().setLevel(DEBUG) getLogger().setLevel(INFO)
def backup_test(partitions=1, upstream_kw={}, backup_kw={}): def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped): def decorator(wrapped):
...@@ -59,7 +62,6 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}): ...@@ -59,7 +62,6 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
return decorator return decorator
class ReplicationTests(NEOThreadedTest): class ReplicationTests(NEOThreadedTest):
def checksumPartition(self, storage, partition, max_tid=MAX_TID): def checksumPartition(self, storage, partition, max_tid=MAX_TID):
...@@ -522,17 +524,17 @@ class ReplicationTests(NEOThreadedTest): ...@@ -522,17 +524,17 @@ class ReplicationTests(NEOThreadedTest):
@backup_test() @backup_test()
def testBackupReadAccess(self, backup): def testBackupReadAccess(self, backup):
"""Check data can be read from backup cluster by clients""" """Check backup cluster can be used in read-only mode by ZODB clients"""
B = backup B = backup
U = B.upstream U = B.upstream
Z = U.getZODBStorage() Z = U.getZODBStorage()
#Zb = B.getZODBStorage() #Zb = B.getZODBStorage() # XXX see below about invalidations
oid_list = [] oid_list = []
tid_list = [] tid_list = []
for i in xrange(10): for i in xrange(10):
# store new data to U # commit new data to U
txn = transaction.Transaction() txn = transaction.Transaction()
Z.tpc_begin(txn) Z.tpc_begin(txn)
oid = Z.new_oid() oid = Z.new_oid()
...@@ -548,20 +550,32 @@ class ReplicationTests(NEOThreadedTest): ...@@ -548,20 +550,32 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(B.last_tid, U.last_tid) self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(1, self.checkBackup(B)) self.assertEqual(1, self.checkBackup(B))
print '\n\n111 tid: %r, last_tid: %r, backup_tid: %r' % (tid, B.backup_tid, U.last_tid) # read data from B and verify it is what it should be
# XXX we open new storage every time because invalidations are not
# try to read data from B # yet implemented in read-only mode.
# XXX we open new storage every time becasue invalidations are not yet implemented in read-only mode.
Zb = B.getZODBStorage() Zb = B.getZODBStorage()
#Zb.sync()
#Zb._cache.clear()
for j, oid in enumerate(oid_list): for j, oid in enumerate(oid_list):
data = Zb.load(oid, '') data, serial = Zb.load(oid, '')
self.assertEqual(data, '%s-%s' % (oid, j)) self.assertEqual(data, '%s-%s' % (oid, j))
#Zb.loadSerial(oid, tid) self.assertEqual(serial, tid_list[j])
#Zb.loadBefore(oid, tid)
# close storage because client app is otherwise shared in threaded
# tests and we need to refresh last_tid on next run
# (see above about invalidations not working)
Zb.close()
# try to commit something to backup storage and make sure it is really read-only
Zb = B.getZODBStorage()
Zb._cache._max_size = 0 # make stores do work in sync way
txn = transaction.Transaction()
self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
self.assertRaises(ReadOnlyError, Zb.new_oid)
self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1], tid_list[-1], 'somedata', '', txn)
# tpc_vote first checks whether there were store replies - thus not ReadOnlyError
self.assertRaises(NEOStorageError, Zb.tpc_vote, txn)
Zb.close()
# TODO close Zb / client
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment