Commit 2e6ea3cb authored by Julien Muchembled's avatar Julien Muchembled

Prevent client connecting in BACKINGUP state unless configured with read_only=true

Packing now requires the DB to be open RW.
parent e0b638b5
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException from ZODB import BaseStorage, ConflictResolution
from ZODB.POSException import ConflictError, UndoError from ZODB.POSException import (
ConflictError, POSKeyError, ReadOnlyError, UndoError)
from zope.interface import implementer from zope.interface import implementer
import ZODB.interfaces import ZODB.interfaces
...@@ -25,7 +26,7 @@ from .app import Application ...@@ -25,7 +26,7 @@ from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
def raiseReadOnlyError(*args, **kw): def raiseReadOnlyError(*args, **kw):
raise POSException.ReadOnlyError() raise ReadOnlyError
@implementer( @implementer(
ZODB.interfaces.IStorage, ZODB.interfaces.IStorage,
...@@ -39,7 +40,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -39,7 +40,7 @@ class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage): ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient.""" """Wrapper class for neoclient."""
def __init__(self, master_nodes, name, read_only=False, def __init__(self, master_nodes, name,
compress=None, logfile=None, _app=None, **kw): compress=None, logfile=None, _app=None, **kw):
""" """
Do not pass those parameters (used internally): Do not pass those parameters (used internally):
...@@ -50,30 +51,28 @@ class Storage(BaseStorage.BaseStorage, ...@@ -50,30 +51,28 @@ class Storage(BaseStorage.BaseStorage,
if logfile: if logfile:
logging.setup(logfile) logging.setup(logfile)
BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, )) BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self._is_read_only = read_only
if read_only:
for method_id in (
'new_oid',
'tpc_begin',
'tpc_vote',
'tpc_abort',
'store',
'deleteObject',
'undo',
'undoLog',
):
setattr(self, method_id, raiseReadOnlyError)
if _app is None: if _app is None:
ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')] ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')]
_app = Application(master_nodes, name, compress=compress, _app = Application(master_nodes, name, compress=compress,
ssl=ssl if any(ssl) else None, **kw) ssl=ssl if any(ssl) else None, **kw)
self.app = _app self.app = _app
if __debug__ and self._is_read_only:
# For ZODB checkWriteMethods:
self.store = self.undo = raiseReadOnlyError
# For tpc_begin, it's checked in Application because it's used
# internally (e.g. pack) and the caller does not want to clean up
# with tpc_abort.
# For other methods, either the master rejects with
# READ_ONLY_ACCESS or the call is outside of a transaction.
@property @property
def _cache(self): def _cache(self):
return self.app._cache return self.app._cache
@property
def _is_read_only(self): # used in BaseStorage, do not rename it
return self.app.read_only
def load(self, oid, version=''): def load(self, oid, version=''):
# XXX: interface definition states that version parameter is # XXX: interface definition states that version parameter is
# mandatory, while some ZODB tests do not provide it. For now, make # mandatory, while some ZODB tests do not provide it. For now, make
...@@ -82,7 +81,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -82,7 +81,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.load(oid)[:2] return self.app.load(oid)[:2]
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r', oid) logging.exception('oid=%r', oid)
raise raise
...@@ -151,7 +150,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -151,7 +150,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.load(oid, serial)[0] return self.app.load(oid, serial)[0]
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r, serial=%r', oid, serial) logging.exception('oid=%r, serial=%r', oid, serial)
raise raise
...@@ -160,7 +159,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -160,7 +159,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.load(oid, None, tid) return self.app.load(oid, None, tid)
except NEOStorageDoesNotExistError: except NEOStorageDoesNotExistError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
return None return None
except Exception: except Exception:
...@@ -195,7 +194,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -195,7 +194,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
data, serial, _ = self.app.load(oid) data, serial, _ = self.app.load(oid)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r', oid) logging.exception('oid=%r', oid)
raise raise
...@@ -215,7 +214,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -215,7 +214,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.history(oid, *args, **kw) return self.app.history(oid, *args, **kw)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r', oid) logging.exception('oid=%r', oid)
raise raise
......
...@@ -25,7 +25,8 @@ try: ...@@ -25,7 +25,8 @@ try:
except ImportError: except ImportError:
from cPickle import dumps, loads from cPickle import dumps, loads
_protocol = 1 _protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError from ZODB.POSException import (
ConflictError, ReadConflictError, ReadOnlyError, UndoError)
from neo.lib import logging from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress from neo.lib.compress import decompress_list, getCompress
...@@ -72,7 +73,7 @@ class Application(ThreadedApplication): ...@@ -72,7 +73,7 @@ class Application(ThreadedApplication):
wait_for_pack = False wait_for_pack = False
def __init__(self, master_nodes, name, compress=True, cache_size=None, def __init__(self, master_nodes, name, compress=True, cache_size=None,
ignore_wrong_checksum=False, **kw): read_only=False, ignore_wrong_checksum=False, **kw):
super(Application, self).__init__(parseMasterList(master_nodes), super(Application, self).__init__(parseMasterList(master_nodes),
name, **kw) name, **kw)
# Internal Attributes common to all thread # Internal Attributes common to all thread
...@@ -108,6 +109,7 @@ class Application(ThreadedApplication): ...@@ -108,6 +109,7 @@ class Application(ThreadedApplication):
self._connecting_to_storage_node = Lock() self._connecting_to_storage_node = Lock()
self._node_failure_dict = {} self._node_failure_dict = {}
self.compress = getCompress(compress) self.compress = getCompress(compress)
self.read_only = read_only
self.ignore_wrong_checksum = ignore_wrong_checksum self.ignore_wrong_checksum = ignore_wrong_checksum
def __getattr__(self, attr): def __getattr__(self, attr):
...@@ -228,7 +230,8 @@ class Application(ThreadedApplication): ...@@ -228,7 +230,8 @@ class Application(ThreadedApplication):
node=node, node=node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, {}) self.uuid, None, self.name, None,
{'read_only': True} if self.read_only else {})
try: try:
ask(conn, p, handler=handler) ask(conn, p, handler=handler)
except ConnectionClosed: except ConnectionClosed:
...@@ -505,6 +508,8 @@ class Application(ThreadedApplication): ...@@ -505,6 +508,8 @@ class Application(ThreadedApplication):
def tpc_begin(self, storage, transaction, tid=None, status=' '): def tpc_begin(self, storage, transaction, tid=None, status=' '):
"""Begin a new transaction.""" """Begin a new transaction."""
if self.read_only:
raise ReadOnlyError
# First get a transaction, only one is allowed at a time # First get a transaction, only one is allowed at a time
txn_context = self._txn_container.new(transaction) txn_context = self._txn_container.new(transaction)
# use the given TID or request a new one to the master # use the given TID or request a new one to the master
......
...@@ -26,9 +26,9 @@ ...@@ -26,9 +26,9 @@
</key> </key>
<key name="read-only" datatype="boolean"> <key name="read-only" datatype="boolean">
<description> <description>
If true, only reads may be executed against the storage. Note If true, only reads may be executed against the storage.
that the "pack" operation is not considered a write operation If false when cluster is backing up, POSException.ReadOnlyError
and is still allowed on a read-only neostorage. is raised.
</description> </description>
</key> </key>
<key name="logfile" datatype="existing-dirpath"> <key name="logfile" datatype="existing-dirpath">
......
...@@ -150,7 +150,10 @@ class Application(BaseApplication): ...@@ -150,7 +150,10 @@ class Application(BaseApplication):
self.election_handler = master.ElectionHandler(self) self.election_handler = master.ElectionHandler(self)
self.secondary_handler = master.SecondaryHandler(self) self.secondary_handler = master.SecondaryHandler(self)
self.client_service_handler = client.ClientServiceHandler(self) self.client_service_handler = client.ClientServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self) self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(
self)
self.client_backup_service_handler = client.ClientBackupServiceHandler(
self)
self.storage_service_handler = storage.StorageServiceHandler(self) self.storage_service_handler = storage.StorageServiceHandler(self)
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
......
...@@ -131,12 +131,13 @@ class ClientServiceHandler(MasterHandler): ...@@ -131,12 +131,13 @@ class ClientServiceHandler(MasterHandler):
else: else:
pack.waitForPack(conn.delayedAnswer(Packets.WaitedForPack)) pack.waitForPack(conn.delayedAnswer(Packets.WaitedForPack))
# like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyServiceHandler(ClientServiceHandler): class ClientReadOnlyServiceHandler(ClientServiceHandler):
_read_only_message = 'read-only access as requested by the client'
def _readOnly(self, conn, *args, **kw): def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess( conn.answer(Errors.ReadOnlyAccess(self._read_only_message))
'read-only access because cluster is in backuping mode'))
askBeginTransaction = _readOnly askBeginTransaction = _readOnly
askNewOIDs = _readOnly askNewOIDs = _readOnly
...@@ -145,9 +146,15 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler): ...@@ -145,9 +146,15 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler):
askPack = _readOnly askPack = _readOnly
abortTransaction = _readOnly abortTransaction = _readOnly
# like ClientReadOnlyServiceHandler but only for tid <= backup_tid
class ClientBackupServiceHandler(ClientReadOnlyServiceHandler):
_read_only_message = 'read-only access because cluster is in backuping mode'
# XXX LastIDs is not used by client at all, and it requires work to determine # XXX LastIDs is not used by client at all, and it requires work to determine
# last_oid up to backup_tid, so just make it non-functional for client. # last_oid up to backup_tid, so just make it non-functional for client.
askLastIDs = _readOnly askLastIDs = ClientReadOnlyServiceHandler._readOnly.__func__ # Py3
# like in MasterHandler but returns backup_tid instead of last_tid # like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn): def askLastTransaction(self, conn):
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \ from neo.lib.protocol import CellStates, ClusterStates, Errors, NodeStates, \
NodeTypes, Packets, uuid_str NodeTypes, Packets, uuid_str
from ..app import monotonic_time from ..app import monotonic_time
...@@ -63,10 +63,17 @@ class IdentificationHandler(EventHandler): ...@@ -63,10 +63,17 @@ class IdentificationHandler(EventHandler):
new_nid = extra.pop('new_nid', None) new_nid = extra.pop('new_nid', None)
state = NodeStates.RUNNING state = NodeStates.RUNNING
if node_type == NodeTypes.CLIENT: if node_type == NodeTypes.CLIENT:
read_only = extra.pop('read_only', 'backup' in extra)
if app.cluster_state == ClusterStates.RUNNING: if app.cluster_state == ClusterStates.RUNNING:
handler = app.client_service_handler handler = (app.client_ro_service_handler if read_only else
app.client_service_handler)
elif app.cluster_state == ClusterStates.BACKINGUP: elif app.cluster_state == ClusterStates.BACKINGUP:
handler = app.client_ro_service_handler if not read_only:
conn.answer(Errors.ReadOnlyAccess(
"read-write access requested"
" but cluster is backing up"))
return
handler = app.client_backup_service_handler
else: else:
raise NotReadyError raise NotReadyError
human_readable_node_type = ' client ' human_readable_node_type = ' client '
......
...@@ -1027,8 +1027,12 @@ class NEOCluster(object): ...@@ -1027,8 +1027,12 @@ class NEOCluster(object):
if not cell.isReadable()] if not cell.isReadable()]
def getZODBStorage(self, **kw): def getZODBStorage(self, **kw):
kw['_app'] = kw.pop('client', self.client) try:
return Storage.Storage(None, self.name, **kw) app = kw.pop('client')
assert not kw, kw
except KeyError:
app = self._newClient(**kw) if kw else self.client
return Storage.Storage(None, self.name, _app=app)
def importZODB(self, dummy_zodb=None, random=random): def importZODB(self, dummy_zodb=None, random=random):
if dummy_zodb is None: if dummy_zodb is None:
......
...@@ -17,13 +17,13 @@ ...@@ -17,13 +17,13 @@
import random, sys, threading, time import random, sys, threading, time
import transaction import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError from ZODB.POSException import (
POSKeyError, ReadOnlyError, StorageTransactionError)
import unittest import unittest
from collections import defaultdict from collections import defaultdict
from functools import wraps from functools import wraps
from itertools import product from itertools import product
from neo.lib import logging from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.storage.database.manager import DatabaseManager from neo.storage.database.manager import DatabaseManager
...@@ -1112,7 +1112,9 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1112,7 +1112,9 @@ class ReplicationTests(NEOThreadedTest):
B = backup B = backup
U = B.upstream U = B.upstream
Z = U.getZODBStorage() Z = U.getZODBStorage()
#Zb = B.getZODBStorage() # XXX see below about invalidations with B.newClient() as client, self.assertRaises(ReadOnlyError):
client.last_tid
#Zb = B.getZODBStorage(read_only=True) # XXX see below about invalidations
oid_list = [] oid_list = []
tid_list = [] tid_list = []
...@@ -1157,7 +1159,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1157,7 +1159,7 @@ class ReplicationTests(NEOThreadedTest):
# read data from B and verify it is what it should be # read data from B and verify it is what it should be
# XXX we open new ZODB storage every time because invalidations # XXX we open new ZODB storage every time because invalidations
# are not yet implemented in read-only mode. # are not yet implemented in read-only mode.
Zb = B.getZODBStorage() Zb = B.getZODBStorage(read_only=True)
for j, oid in enumerate(oid_list): for j, oid in enumerate(oid_list):
if cutoff <= i < recover and j >= cutoff: if cutoff <= i < recover and j >= cutoff:
self.assertRaises(POSKeyError, Zb.load, oid, '') self.assertRaises(POSKeyError, Zb.load, oid, '')
...@@ -1170,7 +1172,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1170,7 +1172,6 @@ class ReplicationTests(NEOThreadedTest):
# not-yet-fully fetched backup state (transactions committed at # not-yet-fully fetched backup state (transactions committed at
# [cutoff, recover) should not be there; otherwise transactions # [cutoff, recover) should not be there; otherwise transactions
# should be fully there) # should be fully there)
Zb = B.getZODBStorage()
Btxn_list = list(Zb.iterator()) Btxn_list = list(Zb.iterator())
self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
else i+1) else i+1)
...@@ -1185,15 +1186,12 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1185,15 +1186,12 @@ class ReplicationTests(NEOThreadedTest):
# try to commit something to backup storage and make sure it is # try to commit something to backup storage and make sure it is
# really read-only # really read-only
Zb._cache.max_size = 0 # make store() do work in sync way
txn = transaction.Transaction() txn = transaction.Transaction()
self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn) self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
self.assertRaises(ReadOnlyError, Zb.new_oid) self.assertRaises(ReadOnlyError, Zb.new_oid)
self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1], # tpc_vote first checks whether the transaction has begun -
tid_list[-1], 'somedata', '', txn)
# tpc_vote first checks whether there were store replies -
# thus not ReadOnlyError # thus not ReadOnlyError
self.assertRaises(NEOStorageError, Zb.tpc_vote, txn) self.assertRaises(StorageTransactionError, Zb.tpc_vote, txn)
if i == loop // 2: if i == loop // 2:
# Check that we survive a disconnection from upstream # Check that we survive a disconnection from upstream
...@@ -1203,8 +1201,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1203,8 +1201,6 @@ class ReplicationTests(NEOThreadedTest):
conn.close() conn.close()
self.tic() self.tic()
# close storage because client app is otherwise shared in
# threaded tests and we need to refresh last_tid on next run
# (XXX see above about invalidations not working) # (XXX see above about invalidations not working)
Zb.close() Zb.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment