Commit d9200f28 authored by Julien Muchembled's avatar Julien Muchembled

Prevent client connecting in BACKINGUP state unless configured with read_only=true

Packing now requires the DB to be open RW.
parent 9dc3e7b4
......@@ -14,8 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.POSException import ConflictError, UndoError
from ZODB import BaseStorage, ConflictResolution
from ZODB.POSException import (
ConflictError, POSKeyError, ReadOnlyError, UndoError)
from zope.interface import implementer
import ZODB.interfaces
......@@ -25,7 +26,7 @@ from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
def raiseReadOnlyError(*args, **kw):
raise POSException.ReadOnlyError()
raise ReadOnlyError
@implementer(
ZODB.interfaces.IStorage,
......@@ -39,7 +40,7 @@ class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient."""
def __init__(self, master_nodes, name, read_only=False,
def __init__(self, master_nodes, name,
compress=None, logfile=None, _app=None, **kw):
"""
Do not pass those parameters (used internally):
......@@ -50,30 +51,28 @@ class Storage(BaseStorage.BaseStorage,
if logfile:
logging.setup(logfile)
BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self._is_read_only = read_only
if read_only:
for method_id in (
'new_oid',
'tpc_begin',
'tpc_vote',
'tpc_abort',
'store',
'deleteObject',
'undo',
'undoLog',
):
setattr(self, method_id, raiseReadOnlyError)
if _app is None:
ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')]
_app = Application(master_nodes, name, compress=compress,
ssl=ssl if any(ssl) else None, **kw)
self.app = _app
if __debug__ and self._is_read_only:
# For ZODB checkWriteMethods:
self.store = self.undo = raiseReadOnlyError
# For tpc_begin, it's checked in Application because it's used
# internally (e.g. pack) and the caller does not want to clean up
# with tpc_abort.
# For other methods, either the master rejects with
# READ_ONLY_ACCESS or the call is outside of a transaction.
@property
def _cache(self):
return self.app._cache
@property
def _is_read_only(self): # used in BaseStorage, do not rename it
return self.app.read_only
def load(self, oid, version=''):
# XXX: interface definition states that version parameter is
# mandatory, while some ZODB tests do not provide it. For now, make
......@@ -82,7 +81,7 @@ class Storage(BaseStorage.BaseStorage,
try:
return self.app.load(oid)[:2]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
raise POSKeyError(oid)
except Exception:
logging.exception('oid=%r', oid)
raise
......@@ -151,7 +150,7 @@ class Storage(BaseStorage.BaseStorage,
try:
return self.app.load(oid, serial)[0]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
raise POSKeyError(oid)
except Exception:
logging.exception('oid=%r, serial=%r', oid, serial)
raise
......@@ -160,7 +159,7 @@ class Storage(BaseStorage.BaseStorage,
try:
return self.app.load(oid, None, tid)
except NEOStorageDoesNotExistError:
raise POSException.POSKeyError(oid)
raise POSKeyError(oid)
except NEOStorageNotFoundError:
return None
except Exception:
......@@ -195,7 +194,7 @@ class Storage(BaseStorage.BaseStorage,
try:
data, serial, _ = self.app.load(oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
raise POSKeyError(oid)
except Exception:
logging.exception('oid=%r', oid)
raise
......@@ -215,7 +214,7 @@ class Storage(BaseStorage.BaseStorage,
try:
return self.app.history(oid, *args, **kw)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
raise POSKeyError(oid)
except Exception:
logging.exception('oid=%r', oid)
raise
......
......@@ -25,7 +25,8 @@ try:
except ImportError:
from cPickle import dumps, loads
_protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from ZODB.POSException import (
ConflictError, ReadConflictError, ReadOnlyError, UndoError)
from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress
......@@ -72,7 +73,7 @@ class Application(ThreadedApplication):
wait_for_pack = False
def __init__(self, master_nodes, name, compress=True, cache_size=None,
ignore_wrong_checksum=False, **kw):
read_only=False, ignore_wrong_checksum=False, **kw):
super(Application, self).__init__(parseMasterList(master_nodes),
name, **kw)
# Internal Attributes common to all thread
......@@ -108,6 +109,7 @@ class Application(ThreadedApplication):
self._connecting_to_storage_node = Lock()
self._node_failure_dict = {}
self.compress = getCompress(compress)
self.read_only = read_only
self.ignore_wrong_checksum = ignore_wrong_checksum
def __getattr__(self, attr):
......@@ -228,7 +230,8 @@ class Application(ThreadedApplication):
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, {})
self.uuid, None, self.name, None,
{'read_only': True} if self.read_only else {})
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......@@ -505,6 +508,8 @@ class Application(ThreadedApplication):
def tpc_begin(self, storage, transaction, tid=None, status=' '):
"""Begin a new transaction."""
if self.read_only:
raise ReadOnlyError
# First get a transaction, only one is allowed at a time
txn_context = self._txn_container.new(transaction)
# use the given TID or request a new one to the master
......
......@@ -26,9 +26,9 @@
</key>
<key name="read-only" datatype="boolean">
<description>
If true, only reads may be executed against the storage. Note
that the "pack" operation is not considered a write operation
and is still allowed on a read-only neostorage.
If true, only reads may be executed against the storage.
If false when cluster is backing up, POSException.ReadOnlyError
is raised.
</description>
</key>
<key name="logfile" datatype="existing-dirpath">
......
......@@ -150,7 +150,10 @@ class Application(BaseApplication):
self.election_handler = master.ElectionHandler(self)
self.secondary_handler = master.SecondaryHandler(self)
self.client_service_handler = client.ClientServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(
self)
self.client_backup_service_handler = client.ClientBackupServiceHandler(
self)
self.storage_service_handler = storage.StorageServiceHandler(self)
registerLiveDebugger(on_log=self.log)
......
......@@ -131,12 +131,13 @@ class ClientServiceHandler(MasterHandler):
else:
pack.waitForPack(conn.delayedAnswer(Packets.WaitedForPack))
# like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyServiceHandler(ClientServiceHandler):
_read_only_message = 'read-only access as requested by the client'
def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess(
'read-only access because cluster is in backuping mode'))
conn.answer(Errors.ReadOnlyAccess(self._read_only_message))
askBeginTransaction = _readOnly
askNewOIDs = _readOnly
......@@ -145,9 +146,15 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler):
askPack = _readOnly
abortTransaction = _readOnly
# like ClientReadOnlyServiceHandler but only for tid <= backup_tid
class ClientBackupServiceHandler(ClientReadOnlyServiceHandler):
_read_only_message = 'read-only access because cluster is in backuping mode'
# XXX LastIDs is not used by client at all, and it requires work to determine
# last_oid up to backup_tid, so just make it non-functional for client.
askLastIDs = _readOnly
askLastIDs = ClientReadOnlyServiceHandler._readOnly.__func__ # Py3
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
......
......@@ -17,7 +17,7 @@
from neo.lib import logging
from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError
from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
from neo.lib.protocol import CellStates, ClusterStates, Errors, NodeStates, \
NodeTypes, Packets, uuid_str
from ..app import monotonic_time
......@@ -63,10 +63,17 @@ class IdentificationHandler(EventHandler):
new_nid = extra.pop('new_nid', None)
state = NodeStates.RUNNING
if node_type == NodeTypes.CLIENT:
read_only = extra.pop('read_only', 'backup' in extra)
if app.cluster_state == ClusterStates.RUNNING:
handler = app.client_service_handler
handler = (app.client_ro_service_handler if read_only else
app.client_service_handler)
elif app.cluster_state == ClusterStates.BACKINGUP:
handler = app.client_ro_service_handler
if not read_only:
conn.answer(Errors.ReadOnlyAccess(
"read-write access requested"
" but cluster is backing up"))
return
handler = app.client_backup_service_handler
else:
raise NotReadyError
human_readable_node_type = ' client '
......
......@@ -1027,8 +1027,12 @@ class NEOCluster(object):
if not cell.isReadable()]
def getZODBStorage(self, **kw):
kw['_app'] = kw.pop('client', self.client)
return Storage.Storage(None, self.name, **kw)
try:
app = kw.pop('client')
assert not kw, kw
except KeyError:
app = self._newClient(**kw) if kw else self.client
return Storage.Storage(None, self.name, _app=app)
def importZODB(self, dummy_zodb=None, random=random):
if dummy_zodb is None:
......
......@@ -17,13 +17,13 @@
import random, sys, threading, time
import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError
from ZODB.POSException import (
POSKeyError, ReadOnlyError, StorageTransactionError)
import unittest
from collections import defaultdict
from functools import wraps
from itertools import product
from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT
from neo.storage.database.manager import DatabaseManager
......@@ -1112,7 +1112,9 @@ class ReplicationTests(NEOThreadedTest):
B = backup
U = B.upstream
Z = U.getZODBStorage()
#Zb = B.getZODBStorage() # XXX see below about invalidations
with B.newClient() as client, self.assertRaises(ReadOnlyError):
client.last_tid
#Zb = B.getZODBStorage(read_only=True) # XXX see below about invalidations
oid_list = []
tid_list = []
......@@ -1157,7 +1159,7 @@ class ReplicationTests(NEOThreadedTest):
# read data from B and verify it is what it should be
# XXX we open new ZODB storage every time because invalidations
# are not yet implemented in read-only mode.
Zb = B.getZODBStorage()
Zb = B.getZODBStorage(read_only=True)
for j, oid in enumerate(oid_list):
if cutoff <= i < recover and j >= cutoff:
self.assertRaises(POSKeyError, Zb.load, oid, '')
......@@ -1170,7 +1172,6 @@ class ReplicationTests(NEOThreadedTest):
# not-yet-fully fetched backup state (transactions committed at
# [cutoff, recover) should not be there; otherwise transactions
# should be fully there)
Zb = B.getZODBStorage()
Btxn_list = list(Zb.iterator())
self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
else i+1)
......@@ -1185,15 +1186,12 @@ class ReplicationTests(NEOThreadedTest):
# try to commit something to backup storage and make sure it is
# really read-only
Zb._cache.max_size = 0 # make store() do work in sync way
txn = transaction.Transaction()
self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
self.assertRaises(ReadOnlyError, Zb.new_oid)
self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1],
tid_list[-1], 'somedata', '', txn)
# tpc_vote first checks whether there were store replies -
# tpc_vote first checks whether the transaction has begun -
# thus not ReadOnlyError
self.assertRaises(NEOStorageError, Zb.tpc_vote, txn)
self.assertRaises(StorageTransactionError, Zb.tpc_vote, txn)
if i == loop // 2:
# Check that we survive a disconnection from upstream
......@@ -1203,8 +1201,6 @@ class ReplicationTests(NEOThreadedTest):
conn.close()
self.tic()
# close storage because client app is otherwise shared in
# threaded tests and we need to refresh last_tid on next run
# (XXX see above about invalidations not working)
Zb.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment