Commit e465d2c1 authored by Jim Fulton

Made locking logic more robust.

- Improved ZEO server commit lock logging.  Now, locking activity is
  logged at the debug level until the number of waiting lock requests
  exceeds 3, at the warning level until it exceeds 9, and at the
  critical level above that (see the sketch below).

- ZEO servers no longer log their pids in every log message. It's just
  not interesting. :)
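
For illustration, here is a minimal sketch of the escalation thresholds. It mirrors the _level_for_waiting helper added in this commit; the getLevelName calls at the end are only a demonstration of the mapping.

    import logging

    def _level_for_waiting(waiting):
        # waiting: the queue of (zeostore, delay) pairs waiting for the
        # commit lock. More than 9 waiters is critical, more than 3 is a
        # warning, anything else stays at debug.
        if len(waiting) > 9:
            return logging.CRITICAL
        if len(waiting) > 3:
            return logging.WARNING
        return logging.DEBUG

    logging.getLevelName(_level_for_waiting(range(3)))   # 'DEBUG'
    logging.getLevelName(_level_for_waiting(range(4)))   # 'WARNING'
    logging.getLevelName(_level_for_waiting(range(10)))  # 'CRITICAL'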
parent b16a8ba1
@@ -17,12 +17,25 @@ New Features
help in situations where object ids are used as BTree keys and the
sequential allocation of object ids leads to conflict errors.
- ZEO servers now support a server_status method for getting
  information about the number of clients, lock requests and general
  statistics (see the usage sketch after this list).
- The mkzeoinst script has been moved to a separate project:
  http://pypi.python.org/pypi/zope.mkzeoinstance
  and is no longer included with ZODB.
- Improved ZEO server commit lock logging.  Now, locking activity is
  logged at the debug level until the number of waiting lock requests
  exceeds 3, at the warning level until it exceeds 9, and at the
  critical level above that.
- ZEO servers no longer log their pids in every log message. It's just
  not interesting. :)
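
For reference, a rough usage sketch of the new server_status method, using the in-memory ZEO.tests.servertesting harness exercised by the doctests later in this diff; the keys noted in the comment are the ones those doctests display, so treat this as illustrative rather than a documented client API.

    import pprint
    import ZEO.tests.servertesting

    # Build a throwaway server and a connected server-side ZEOStorage stub,
    # as the locking doctests below do.
    server = ZEO.tests.servertesting.StorageServer()
    zs = ZEO.tests.servertesting.client(server, '1')

    pprint.pprint(zs.server_status())
    # A dict of per-storage statistics, including 'commits', 'aborts',
    # 'conflicts', 'conflicts_resolved', 'loads', 'stores', 'active_txns',
    # 'verifying_clients', 'connections', 'lock_time', 'start', and
    # 'waiting' (the number of transactions queued for the commit lock).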
Bugs Fixed
----------
@@ -47,6 +60,8 @@ Bugs Fixed
https://bugs.launchpad.net/zodb/+bug/435547
- Fixed some problems in ZEO server commit lock management.
3.10.0a1 (2010-02-08)
=====================
@@ -57,20 +57,8 @@ from ZODB.loglevels import BLATHER
logger = logging.getLogger('ZEO.StorageServer')
# TODO: This used to say "ZSS", which is now implied in the logger name.
# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
_label = "" # default label used for logging.
def set_label():
"""Internal helper to reset the logging label (e.g. after fork())."""
global _label
_label = "%s" % os.getpid()
def log(message, level=logging.INFO, label=None, exc_info=False):
def log(message, level=logging.INFO, label='', exc_info=False):
"""Internal helper to log a message."""
label = label or _label
if label:
message = "(%s) %s" % (label, message)
logger.log(level, message, exc_info=exc_info)
@@ -97,10 +85,10 @@ class ZEOStorage:
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
self.log_label = 'unconnected'
self.locked = False # Don't have storage lock
self.verifying = 0
self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_tempfile = None
@@ -131,13 +119,7 @@ class ZEOStorage:
conn.register_object(ZEOStorage308Adapter(self))
else:
self.client = ClientStub(conn)
addr = conn.addr
if isinstance(addr, type("")):
label = addr
else:
host, port = addr
label = str(host) + ":" + str(port)
self.log_label = _label + "/" + label
self.log_label = _addr_label(conn.addr)
def notifyDisconnected(self):
self.connection = None
@@ -146,7 +128,7 @@ class ZEOStorage:
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during %s transaction"
% self.locked and 'locked' or 'unlocked')
% (self.locked and 'locked' or 'unlocked'))
self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
@@ -451,7 +433,9 @@ class ZEOStorage:
if self.locked:
self.server.unlock_storage(self)
self.locked = 0
self.transaction = None
if self.transaction is not None:
self.server.stop_waiting(self)
self.transaction = None
self.stats.active_txns -= 1
if self.txnlog is not None:
self.txnlog.close()
@@ -462,12 +446,22 @@ class ZEOStorage:
def vote(self, tid):
self._check_tid(tid, exc=StorageTransactionError)
if self.locked or self.server.already_waiting(self):
raise StorageTransactionError(
'Already voting (%s)' % (self.locked and 'locked' or 'waiting')
)
return self._try_to_vote()
def _try_to_vote(self, delay=None):
if self.connection is None:
return # We're disconnected
self.locked = self.server.lock_storage(self)
if delay is not None and delay.sent:
# as a consequence of the unlocking strategy, _try_to_vote
# may be called multiple times for delayed
# transactions. The first call will mark the delay as
# sent. We should skip if the delay was already sent.
return
self.locked, delay = self.server.lock_storage(self, delay)
if self.locked:
try:
self.log(
@@ -511,17 +505,17 @@ class ZEOStorage:
else:
if delay is not None:
delay.reply(None)
else:
return None
else:
if delay == None:
self.log("(%r) queue lock: transactions waiting: %s"
% (self.storage_id, self.server.waiting(self)+1))
delay = Delay()
self.server.unlock_callback(self, delay)
return delay
def _unlock_callback(self, delay):
connection = self.connection
if connection is not None:
if connection is None:
self.server.stop_waiting(self)
else:
connection.call_from_thread(self._try_to_vote, delay)
# The public methods of the ZEO client API do not do the real work.
@@ -764,6 +758,8 @@ class ZEOStorage:
for iid in iids:
self._iterators.pop(iid, None)
def server_status(self):
return self.server.server_status(self)
class StorageServerDB:
@@ -873,7 +869,6 @@ class StorageServer:
self.addr = addr
self.storages = storages
set_label()
msg = ", ".join(
["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
storage.getName())
@@ -884,7 +879,7 @@ class StorageServer:
self._lock = threading.Lock()
self._commit_locks = {}
self._unlock_callbacks = dict((name, []) for name in storages)
self._waiting = dict((name, []) for name in storages)
self.read_only = read_only
self.auth_protocol = auth_protocol
@@ -1171,29 +1166,59 @@ class StorageServer:
if conn.obj in cl:
cl.remove(conn.obj)
def lock_storage(self, zeostore):
def lock_storage(self, zeostore, delay):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
if storage_id in self._commit_locks:
return False
self._commit_locks[storage_id] = zeostore
self.timeouts[storage_id].begin(zeostore)
self.stats[storage_id].lock_time = time.time()
return True
# The lock is held by another zeostore
assert self._commit_locks[storage_id] is not zeostore, (
storage_id, delay)
if delay is None:
# New request, queue it
assert not [i for i in waiting if i[0] is zeostore
], "already waiting"
delay = Delay()
waiting.append((zeostore, delay))
zeostore.log("(%r) queue lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
return False, delay
else:
self._commit_locks[storage_id] = zeostore
self.timeouts[storage_id].begin(zeostore)
self.stats[storage_id].lock_time = time.time()
if delay is not None:
# we were waiting, stop
waiting[:] = [i for i in waiting if i[0] is not zeostore]
zeostore.log("(%r) lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
return True, delay
def unlock_storage(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
assert self._commit_locks[storage_id] is zeostore
del self._commit_locks[storage_id]
self.timeouts[storage_id].end(zeostore)
self.stats[storage_id].lock_time = None
callbacks = self._unlock_callbacks[storage_id][:]
del self._unlock_callbacks[storage_id][:]
callbacks = waiting[:]
if callbacks:
assert not [i for i in waiting if i[0] is zeostore
], "waiting while unlocking"
zeostore.log("(%r) unlock: transactions waiting: %s"
% (storage_id, len(callbacks)-1))
% (storage_id, len(callbacks)),
_level_for_waiting(callbacks)
)
for zeostore, delay in callbacks:
try:
@@ -1203,13 +1228,41 @@ class StorageServer:
except Exception:
logger.exception("Calling unlock callback")
def unlock_callback(self, zeostore, delay):
def stop_waiting(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
self._unlock_callbacks[storage_id].append((zeostore, delay))
new_waiting = [i for i in waiting if i[0] is not zeostore]
if len(new_waiting) == len(waiting):
return
waiting[:] = new_waiting
def waiting(self, zeostore):
return len(self._unlock_callbacks[zeostore.storage_id])
zeostore.log("(%r) dequeue lock: transactions waiting: %s"
% (storage_id, len(waiting)),
_level_for_waiting(waiting)
)
def already_waiting(self, zeostore):
storage_id = zeostore.storage_id
waiting = self._waiting[storage_id]
with self._lock:
return bool([i for i in waiting if i[0] is zeostore])
def server_status(self, zeostore):
storage_id = zeostore.storage_id
status = self.stats[storage_id].__dict__.copy()
status['connections'] = len(status['connections'])
status['waiting'] = len(self._waiting[storage_id])
return status
def _level_for_waiting(waiting):
if len(waiting) > 9:
return logging.CRITICAL
if len(waiting) > 3:
return logging.WARNING
else:
return logging.DEBUG
class StubTimeoutThread:
@@ -1454,4 +1507,10 @@ class ZEOStorage308Adapter:
def __getattr__(self, name):
return getattr(self.storage, name)
def _addr_label(addr):
if isinstance(addr, type("")):
return addr
else:
host, port = addr
return str(host) + ":" + str(port)
@@ -1219,7 +1219,7 @@ def runzeo_without_configfile():
------
--T INFO ZEO.runzeo () opening storage '1' using FileStorage
------
--T INFO ZEO.StorageServer () StorageServer created RW with storages 1RWt
--T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt
------
--T INFO ZEO.zrpc () listening on ...
------
@@ -13,6 +13,7 @@
##############################################################################
from zope.testing import doctest, setupstack, renormalizing
import logging
import pprint
import re
import sys
import transaction
@@ -93,7 +94,6 @@ client will be restarted. It will get a conflict error, which is
handled correctly:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
(511/test-addr) ('1') unlock: transactions waiting: 0
2 callAsync serialnos ...
reply 1 None
@@ -200,12 +200,210 @@ Of course, if vote succeeds, the lock will be held:
"""
def some_basic_locking_tests():
r"""
>>> itid = 0
>>> def start_trans(zs):
... global itid
... itid += 1
... tid = str(itid)
... zs.tpc_begin(tid, '', '', {})
... zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', tid)
... return tid
>>> server = ZEO.tests.servertesting.StorageServer()
>>> handler = logging.StreamHandler(sys.stdout)
>>> handler.setFormatter(logging.Formatter(
... '%(name)s %(levelname)s\n%(message)s'))
>>> logging.getLogger('ZEO').addHandler(handler)
>>> logging.getLogger('ZEO').setLevel(logging.DEBUG)
We start a transaction and vote; this leads to getting the lock.
>>> zs1 = ZEO.tests.servertesting.client(server, '1')
>>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
1 callAsync serialnos ...
If another client tries to vote, its lock request will be queued and
a delay will be returned:
>>> zs2 = ZEO.tests.servertesting.client(server, '2')
>>> tid2 = start_trans(zs2)
>>> delay = zs2.vote(tid2)
ZEO.StorageServer DEBUG
(test-addr-2) ('1') queue lock: transactions waiting: 1
>>> delay.set_sender(0, zs2.connection)
When we end the first transaction, the queued vote gets the lock.
>>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') unlock: transactions waiting: 1
ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
(test-addr-2) Preparing to commit transaction: 1 objects, 36 bytes
2 callAsync serialnos ...
Let's try again with the first client. The vote will be queued:
>>> tid1 = start_trans(zs1)
>>> delay = zs1.vote(tid1)
ZEO.StorageServer DEBUG
(test-addr-1) ('1') queue lock: transactions waiting: 1
If the queued transaction is aborted, it will be dequeued:
>>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') dequeue lock: transactions waiting: 0
BTW, voting multiple times will error:
>>> zs2.vote(tid2)
Traceback (most recent call last):
...
StorageTransactionError: Already voting (locked)
>>> tid1 = start_trans(zs1)
>>> delay = zs1.vote(tid1)
ZEO.StorageServer DEBUG
(test-addr-1) ('1') queue lock: transactions waiting: 1
>>> delay.set_sender(0, zs1.connection)
>>> zs1.vote(tid1)
Traceback (most recent call last):
...
StorageTransactionError: Already voting (waiting)
Note that the locking activity is logged at the debug level to avoid
cluttering log files; however, as the number of waiting votes
increases, so does the logging level:
>>> clients = []
>>> for i in range(9):
... client = ZEO.tests.servertesting.client(server, str(i+10))
... tid = start_trans(client)
... delay = client.vote(tid)
... clients.append(client)
ZEO.StorageServer DEBUG
(test-addr-10) ('1') queue lock: transactions waiting: 2
ZEO.StorageServer DEBUG
(test-addr-11) ('1') queue lock: transactions waiting: 3
ZEO.StorageServer WARNING
(test-addr-12) ('1') queue lock: transactions waiting: 4
ZEO.StorageServer WARNING
(test-addr-13) ('1') queue lock: transactions waiting: 5
ZEO.StorageServer WARNING
(test-addr-14) ('1') queue lock: transactions waiting: 6
ZEO.StorageServer WARNING
(test-addr-15) ('1') queue lock: transactions waiting: 7
ZEO.StorageServer WARNING
(test-addr-16) ('1') queue lock: transactions waiting: 8
ZEO.StorageServer WARNING
(test-addr-17) ('1') queue lock: transactions waiting: 9
ZEO.StorageServer CRITICAL
(test-addr-18) ('1') queue lock: transactions waiting: 10
If a client holding the transaction lock disconnects, its transaction
will be aborted, the lock released, and one of the waiting clients
will get the lock.
>>> zs2.notifyDisconnected() # doctest: +ELLIPSIS
ZEO.StorageServer INFO
(test-addr-2) disconnected during locked transaction
ZEO.StorageServer CRITICAL
(test-addr-2) ('1') unlock: transactions waiting: 10
ZEO.StorageServer WARNING
(test-addr-1) ('1') lock: transactions waiting: 9
ZEO.StorageServer BLATHER
(test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
1 callAsync serialnos ...
(In practice, waiting clients won't necessarily get the lock in order.)
We can find out about the current lock state, and get other server
statistics using the server_status method:
>>> pprint.pprint(zs1.server_status(), width=1)
{'aborts': 3,
'active_txns': 10,
'commits': 0,
'conflicts': 0,
'conflicts_resolved': 0,
'connections': 11,
'loads': 0,
'lock_time': 1272653598.693882,
'start': 'Fri Apr 30 14:53:18 2010',
'stores': 13,
'verifying_clients': 0,
'waiting': 9}
(Note that the connections count above is off by 1 due to the way the
test infrastructure works.)
If clients disconnect while waiting, they will be dequeued:
>>> for client in clients:
... client.notifyDisconnected()
ZEO.StorageServer INFO
(test-addr-10) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-10) ('1') dequeue lock: transactions waiting: 8
ZEO.StorageServer INFO
(test-addr-11) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-11) ('1') dequeue lock: transactions waiting: 7
ZEO.StorageServer INFO
(test-addr-12) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-12) ('1') dequeue lock: transactions waiting: 6
ZEO.StorageServer INFO
(test-addr-13) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-13) ('1') dequeue lock: transactions waiting: 5
ZEO.StorageServer INFO
(test-addr-14) disconnected during unlocked transaction
ZEO.StorageServer WARNING
(test-addr-14) ('1') dequeue lock: transactions waiting: 4
ZEO.StorageServer INFO
(test-addr-15) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-15) ('1') dequeue lock: transactions waiting: 3
ZEO.StorageServer INFO
(test-addr-16) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-16) ('1') dequeue lock: transactions waiting: 2
ZEO.StorageServer INFO
(test-addr-17) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-17) ('1') dequeue lock: transactions waiting: 1
ZEO.StorageServer INFO
(test-addr-18) disconnected during unlocked transaction
ZEO.StorageServer DEBUG
(test-addr-18) ('1') dequeue lock: transactions waiting: 0
>>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
>>> logging.getLogger('ZEO').removeHandler(handler)
"""
def test_suite():
return unittest.TestSuite((
doctest.DocTestSuite(
setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
checker=renormalizing.RENormalizing([
(re.compile('\d+/test-addr'), ''),
(re.compile("'lock_time': \d+.\d+"), 'lock_time'),
(re.compile(r"'start': '[^\n]+'"), 'start'),
]),
),
))