Commit bef7c0fe authored by Jeremy Hylton's avatar Jeremy Hylton

Add provisional monitor server that reports server statistics

Also, remove unused reuse_addr arg to ZEO.zrpc.server.  The server was
always calling set_reuse_addr().

No tests yet, that's the next step.  Simple functional tests work.
parent 873eb117
......@@ -31,13 +31,15 @@ import time
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.utils import u64
......@@ -65,7 +67,9 @@ class ZEOStorage:
def __init__(self, server, read_only=0):
self.server = server
# timeout and stats will be initialized in register()
self.timeout = None
self.stats = None
self.connection = None
self.client = None
self.storage = None
......@@ -73,6 +77,7 @@ class ZEOStorage:
self.transaction = None
self.read_only = read_only
self.locked = 0
self.verifying = 0
self.log_label = _label
def notifyConnected(self, conn):
......@@ -94,6 +99,8 @@ class ZEOStorage:
self._abort()
else:
self.log("disconnected")
if self.stats is not None:
self.stats.clients -= 1
def __repr__(self):
tid = self.transaction and repr(self.transaction.id)
......@@ -130,7 +137,7 @@ class ZEOStorage:
setattr(self, name, getattr(self.storage, name))
self.lastTransaction = self.storage.lastTransaction
def check_tid(self, tid, exc=None):
def _check_tid(self, tid, exc=None):
if self.read_only:
raise ReadOnlyError()
caller = sys._getframe().f_back.f_code.co_name
......@@ -150,6 +157,18 @@ class ZEOStorage:
return 0
return 1
# _lock() and _unlock() control the locked flag
def _lock(self):
self.locked = 1
self.timeout.begin(self)
self.stats.lock_time = time.time()
def _unlock(self):
self.locked = 0
self.timeout.end(self)
self.stats.lock_time = None
def register(self, storage_id, read_only):
"""Select the storage that this client will use
......@@ -170,7 +189,8 @@ class ZEOStorage:
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
self.timeout = self.server.register_connection(storage_id, self)
self.timeout, self.stats = self.server.register_connection(storage_id,
self)
def get_info(self):
return {'length': len(self.storage),
......@@ -197,6 +217,7 @@ class ZEOStorage:
return e()
def zeoLoad(self, oid):
self.stats.loads += 1
v = self.storage.modifiedInVersion(oid)
if v:
pv, sv = self.storage.load(oid, v)
......@@ -221,6 +242,9 @@ class ZEOStorage:
return invtid, invlist
def zeoVerify(self, oid, s, sv):
if not self.verifying:
self.verifying = 1
self.stats.verifying_clients += 1
try:
os = self.storage.getSerial(oid)
except KeyError:
......@@ -251,6 +275,9 @@ class ZEOStorage:
self.client.invalidateVerify((oid, ''))
def endZeoVerify(self):
if self.verifying:
self.stats.verifying_clients -= 1
self.verifying = 0
self.client.endVerify()
def pack(self, time, wait=1):
......@@ -320,31 +347,34 @@ class ZEOStorage:
self.txnlog = CommitLog()
self.tid = tid
self.status = status
self.stats.active_txns += 1
def tpc_finish(self, id):
if not self.check_tid(id):
if not self._check_tid(id):
return
assert self.locked
self.stats.active_txns -= 1
self.stats.commits += 1
self.storage.tpc_finish(self.transaction)
tid = self.storage.lastTransaction()
if self.invalidated:
self.server.invalidate(self, self.storage_id, tid,
self.invalidated, self.get_size_info())
self.transaction = None
self.locked = 0
self.timeout.end(self)
self._unlock()
# Return the tid, for cache invalidation optimization
self._handle_waiting()
return tid
def tpc_abort(self, id):
if not self.check_tid(id):
if not self._check_tid(id):
return
self.stats.active_txns -= 1
self.stats.aborts += 1
if self.locked:
self.storage.tpc_abort(self.transaction)
self.transaction = None
self.locked = 0
self.timeout.end(self)
self._unlock()
self._handle_waiting()
def _abort(self):
......@@ -361,6 +391,8 @@ class ZEOStorage:
break
if self.transaction:
self.stats.active_txns -= 1
self.stats.aborts += 1
self.tpc_abort(self.transaction.id)
# The public methods of the ZEO client API do not do the real work.
......@@ -369,44 +401,44 @@ class ZEOStorage:
# an _.
def storea(self, oid, serial, data, version, id):
self.check_tid(id, exc=StorageTransactionError)
self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
def vote(self, id):
self.check_tid(id, exc=StorageTransactionError)
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._vote()
else:
return self._wait(lambda: self._vote())
def abortVersion(self, src, id):
self.check_tid(id, exc=StorageTransactionError)
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._abortVersion(src)
else:
return self._wait(lambda: self._abortVersion(src))
def commitVersion(self, src, dest, id):
self.check_tid(id, exc=StorageTransactionError)
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._commitVersion(src, dest)
else:
return self._wait(lambda: self._commitVersion(src, dest))
def transactionalUndo(self, trans_id, id):
self.check_tid(id, exc=StorageTransactionError)
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._transactionalUndo(trans_id)
else:
return self._wait(lambda: self._transactionalUndo(trans_id))
def _tpc_begin(self, txn, tid, status):
self.locked = 1
self._lock()
self.storage.tpc_begin(txn, tid, status)
self.timeout.begin(self)
def _store(self, oid, serial, data, version):
try:
......@@ -415,6 +447,8 @@ class ZEOStorage:
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
if isinstance(err, ConflictError):
self.stats.conflicts += 1
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
exc_info = sys.exc_info()
......@@ -436,6 +470,8 @@ class ZEOStorage:
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
if newserial == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
def _vote(self):
......@@ -543,7 +579,8 @@ class StorageServer:
def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100,
transaction_timeout=None):
transaction_timeout=None,
monitor_address=None):
"""StorageServer constructor.
This is typically invoked from the start.py script.
......@@ -580,6 +617,11 @@ class StorageServer:
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
will be closed and the transaction aborted.
monitor_address -- The address at which the monitor server
should listen. If specified, a monitor server is started.
The monitor server provides server statistics in a simple
text format.
"""
self.addr = addr
......@@ -599,10 +641,11 @@ class StorageServer:
self.invq_bound = invalidation_queue_size
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection,
reuse_addr=1)
factory=self.new_connection)
self.stats = {}
self.timeouts = {}
for name in self.storages.keys():
self.stats[name] = StorageStats()
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
......@@ -610,6 +653,10 @@ class StorageServer:
timeout = TimeoutThread(transaction_timeout)
timeout.start()
self.timeouts[name] = timeout
if monitor_address:
self.monitor = StatsServer(monitor_address, self.stats)
else:
self.monitor = None
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
......@@ -633,13 +680,15 @@ class StorageServer:
is needed to handle invalidation. This function updates this
dictionary.
Returns the timeout object for the appropriate storage.
Returns the timeout and stats objects for the appropriate storage.
"""
l = self.connections.get(storage_id)
if l is None:
l = self.connections[storage_id] = []
l.append(conn)
return self.timeouts[storage_id]
stats = self.stats[storage_id]
stats.clients += 1
return self.timeouts[storage_id], stats
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
......@@ -707,6 +756,8 @@ class StorageServer:
for timeout in self.timeouts.values():
timeout.stop()
self.dispatcher.close()
if self.monitor is not None:
self.monitor.close()
for storage in self.storages.values():
storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
......
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.
$id:$
"""
import asyncore
import socket
import time
import types
import ZEO
class StorageStats:
"""Per-storage usage statistics."""
def __init__(self):
self.loads = 0
self.stores = 0
self.commits = 0
self.aborts = 0
self.active_txns = 0
self.clients = 0
self.verifying_clients = 0
self.lock_time = None
self.conflicts = 0
self.conflicts_resolved = 0
def dump(self, f):
print >> f, "Clients:", self.clients
print >> f, "Clients verifying:", self.verifying_clients
print >> f, "Active transactions:", self.active_txns
if self.lock_time:
howlong = time.time() - self.lock_time
print >> f, "Commit lock held for:", int(howlong)
print >> f, "Commits:", self.commits
print >> f, "Aborts:", self.aborts
print >> f, "Loads:", self.loads
print >> f, "Stores:", self.stores
print >> f, "Conflicts:", self.conflicts
print >> f, "Conflicts resolved:", self.conflicts_resolved
class StatsClient(asyncore.dispatcher):
def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock)
self.buf = []
self.closed = 0
def close(self):
self.closed = 1
# The socket is closed after all the data is written.
# See handle_write().
def write(self, s):
self.buf.append(s)
def writable(self):
return len(self.buf)
def readable(self):
# XXX what goes here?
return 0
def handle_write(self):
s = "".join(self.buf)
self.buf = []
n = self.socket.send(s)
if n < len(s):
self.buf.append(s[:n])
if self.closed and not self.buf:
asyncore.dispatcher.close(self)
class StatsServer(asyncore.dispatcher):
StatsConnectionClass = StatsClient
def __init__(self, addr, stats):
asyncore.dispatcher.__init__(self)
self.addr = addr
self.stats = stats
if type(self.addr) == types.TupleType:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
return
f = self.StatsConnectionClass(sock, addr)
self.dump(f)
f.close()
def dump(self, f):
print >> f, "ZEO monitor server version %s" % ZEO.version
print >> f, time.ctime()
print >> f
L = self.stats.keys()
L.sort()
for k in L:
stats = self.stats[k]
print >> f, "Storage:", k
stats.dump(f)
print >> f
......@@ -22,6 +22,7 @@ Options:
(a PATH must contain at least one "/")
-f/--filename FILENAME -- filename for FileStorage
-h/--help -- print this usage message and exit
-m/--monitor ADDRESS -- address of monitor server
Unless -C is specified, -a and -f are required.
"""
......@@ -147,43 +148,62 @@ class Options:
sys.stderr.write("For help, use %s -h\n" % self.progname)
sys.exit(2)
def parse_address(arg):
if "/" in arg:
family = socket.AF_UNIX
address = arg
else:
family = socket.AF_INET
if ":" in arg:
host, port = arg.split(":", 1)
else:
host = ""
port = arg
try:
port = int(port)
except: # int() can raise all sorts of errors
raise ValueError("invalid port number: %r" % port)
address = host, port
return family, address
class ZEOOptions(Options):
read_only = None
transaction_timeout = None
invalidation_queue_size = None
monitor_address = None
family = None # set by -a; AF_UNIX or AF_INET
address = None # set by -a; string or (host, port)
storages = None # set by -f
_short_options = "a:C:f:h"
_short_options = "a:C:f:hm:"
_long_options = [
"address=",
"configuration=",
"filename=",
"help",
"monitor=",
]
def handle_option(self, opt, arg):
# Alphabetical order please!
if opt in ("-a", "--address"):
if "/" in arg:
self.family = socket.AF_UNIX
self.address = arg
else:
self.family = socket.AF_INET
if ":" in arg:
host, port = arg.split(":", 1)
try:
f, a = parse_address(arg)
except ValueError, err:
self.usage(str(err))
else:
host = ""
port = arg
self.family = f
self.address = a
elif opt in ("-m", "--monitor"):
try:
port = int(port)
except: # int() can raise all sorts of errors
self.usage("invalid port number: %r" % port)
self.address = (host, port)
f, a = parse_address(arg)
except ValueError, err:
self.usage(str(err))
else:
self.monitor_family = f
self.monitor_address = a
elif opt in ("-f", "--filename"):
from ZODB.config import FileStorage
class FSConfig:
......@@ -238,7 +258,7 @@ class ZEOOptions(Options):
self.read_only = self.rootconf.read_only
self.transaction_timeout = self.rootconf.transaction_timeout
self.invalidation_queue_size = self.rootconf.invalidation_queue_size
self.invalidation_queue_size = 100
def load_logconf(self):
# Get logging options from conf, unless overridden by environment
......@@ -349,7 +369,8 @@ class ZEOServer:
self.storages,
read_only=self.options.read_only,
invalidation_queue_size=self.options.invalidation_queue_size,
transaction_timeout=self.options.transaction_timeout)
transaction_timeout=self.options.transaction_timeout,
monitor_address=self.options.monitor_address)
def loop_forever(self):
import ThreadedAsync.LoopCallback
......
......@@ -58,6 +58,14 @@
</description>
</key>
<key name="monitor-address" datatype="socket-address" required="no">
<description>
The address at which the monitor server should listen. If
specified, a monitor server is started. The monitor server
provides server statistics in a simple text format.
</description>
</key>
<multisection name="+" type="storage"
attribute="storages"
required="yes">
......
......@@ -27,15 +27,11 @@ class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
reuse_addr = 1
def __init__(self, addr, factory=Connection, reuse_addr=None):
def __init__(self, addr, factory=Connection):
self.__super_init()
self.addr = addr
self.factory = factory
self.clients = []
if reuse_addr is not None:
self.reuse_addr = reuse_addr
self._open_socket()
def _open_socket(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment