Commit 51e177d7 authored by Jeremy Hylton's avatar Jeremy Hylton

Commit the zeo-1_0-debug-branch to the trunk.

I expect this code will become ZEO 1.1.
parent f8e578b8
...@@ -13,22 +13,27 @@ ...@@ -13,22 +13,27 @@
############################################################################## ##############################################################################
"""Network ZODB storage client """Network ZODB storage client
""" """
__version__='$Revision: 1.39 $'[11:-2] __version__='$Revision: 1.40 $'[11:-2]
import struct, time, os, socket, string, Sync, zrpc, ClientCache import struct, time, os, socket, string
import tempfile, Invalidator, ExtensionClass, thread import tempfile, thread
import ThreadedAsync
now=time.time
from struct import pack, unpack from struct import pack, unpack
from types import TupleType
import Invalidator, ExtensionClass
import ThreadedAsync, Sync, zrpc, ClientCache
from ZODB import POSException, BaseStorage from ZODB import POSException, BaseStorage
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from zLOG import LOG, PROBLEM, INFO
try: from ZODB.ConflictResolution import ResolvedSerial from ZEO.logger import zLogger
except: ResolvedSerial='rs'
log = zLogger("ZEO Client")
TupleType=type(()) try:
from ZODB.ConflictResolution import ResolvedSerial
except:
ResolvedSerial='rs'
class ClientStorageError(POSException.StorageError): class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage""" """An error occured in the ZEO Client Storage"""
...@@ -62,8 +67,12 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -62,8 +67,12 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
self._info={'length': 0, 'size': 0, 'name': 'ZEO Client', self._info={'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0, 'supportsUndo':0, 'supportsVersions': 0,
} }
self._call=zrpc.asyncRPC(connection, debug=debug, if debug:
debug_log = log
else:
debug_log = None
self._call=zrpc.asyncRPC(connection, debug=debug_log,
tmin=min_disconnect_poll, tmin=min_disconnect_poll,
tmax=max_disconnect_poll) tmax=max_disconnect_poll)
...@@ -132,7 +141,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -132,7 +141,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
# If we can't connect right away, go ahead and open the cache # If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect. # and start a separate thread to try and reconnect.
LOG("ClientStorage", PROBLEM, "Failed to connect to storage") log.problem("Failed to connect to storage")
self._cache.open() self._cache.open()
thread.start_new_thread(self._call.connect,(0,)) thread.start_new_thread(self._call.connect,(0,))
...@@ -140,7 +149,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -140,7 +149,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
# notifyConnected # notifyConnected
def notifyConnected(self, s): def notifyConnected(self, s):
LOG("ClientStorage", INFO, "Connected to storage") log.info("Connected to storage")
self._lock_acquire() self._lock_acquire()
try: try:
...@@ -197,7 +206,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -197,7 +206,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
### responsible for starting the thread that makes the connection. ### responsible for starting the thread that makes the connection.
def notifyDisconnected(self, ignored): def notifyDisconnected(self, ignored):
LOG("ClientStorage", PROBLEM, "Disconnected from storage") log.problem("Disconnected from storage")
self._connected=0 self._connected=0
self._transaction=None self._transaction=None
thread.start_new_thread(self._call.connect,(0,)) thread.start_new_thread(self._call.connect,(0,))
...@@ -233,7 +242,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -233,7 +242,7 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
def close(self): def close(self):
self._lock_acquire() self._lock_acquire()
try: try:
LOG("ClientStorage", INFO, "close") log.info("close")
self._call.closeIntensionally() self._call.closeIntensionally()
try: try:
self._tfile.close() self._tfile.close()
...@@ -550,6 +559,9 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -550,6 +559,9 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
def sync(self): self._call.sync() def sync(self): self._call.sync()
def status(self):
self._call.sendMessage('status')
def getWakeup(_w=[]): def getWakeup(_w=[]):
if _w: return _w[0] if _w: return _w[0]
import trigger import trigger
......
...@@ -12,42 +12,29 @@ ...@@ -12,42 +12,29 @@
# #
############################################################################## ##############################################################################
__version__ = "$Revision: 1.35 $"[11:-2] __version__ = "$Revision: 1.36 $"[11:-2]
import asyncore, socket, string, sys, os import asyncore, socket, string, sys, os
from smac import SizedMessageAsyncConnection
from ZODB import POSException
import cPickle import cPickle
from cPickle import Unpickler from cPickle import Unpickler
from cStringIO import StringIO
from thread import start_new_thread
import time
from types import StringType
from ZODB import POSException
from ZODB.POSException import TransactionError, UndoError, VersionCommitError from ZODB.POSException import TransactionError, UndoError, VersionCommitError
from ZODB.Transaction import Transaction from ZODB.Transaction import Transaction
import traceback
from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from thread import start_new_thread from ZODB.utils import U64
from cStringIO import StringIO
from ZEO import trigger from ZEO import trigger
from ZEO import asyncwrap from ZEO import asyncwrap
from ZEO.smac import Disconnected from ZEO.smac import Disconnected, SizedMessageAsyncConnection
from types import StringType from ZEO.logger import zLogger, format_msg
class StorageServerError(POSException.StorageError): pass
max_blather=120
def blather(*args):
accum = []
total_len = 0
for arg in args:
if not isinstance(arg, StringType):
arg = str(arg)
accum.append(arg)
total_len = total_len + len(arg)
if total_len >= max_blather:
break
m = string.join(accum)
if len(m) > max_blather: m = m[:max_blather] + ' ...'
LOG('ZEO Server', TRACE, m)
class StorageServerError(POSException.StorageError):
pass
# We create a special fast pickler! This allows us # We create a special fast pickler! This allows us
# to create slightly more efficient pickles and # to create slightly more efficient pickles and
...@@ -56,6 +43,8 @@ pickler=cPickle.Pickler() ...@@ -56,6 +43,8 @@ pickler=cPickle.Pickler()
pickler.fast=1 # Don't use the memo pickler.fast=1 # Don't use the memo
dump=pickler.dump dump=pickler.dump
log = zLogger("ZEO Server")
class StorageServer(asyncore.dispatcher): class StorageServer(asyncore.dispatcher):
def __init__(self, connection, storages): def __init__(self, connection, storages):
...@@ -80,14 +69,14 @@ class StorageServer(asyncore.dispatcher): ...@@ -80,14 +69,14 @@ class StorageServer(asyncore.dispatcher):
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr() self.set_reuse_addr()
LOG('ZEO Server', INFO, 'Listening on %s' % repr(connection)) log.info('Listening on %s' % repr(connection))
self.bind(connection) self.bind(connection)
self.listen(5) self.listen(5)
def register_connection(self, connection, storage_id): def register_connection(self, connection, storage_id):
storage=self.__storages.get(storage_id, None) storage=self.__storages.get(storage_id, None)
if storage is None: if storage is None:
LOG('ZEO Server', ERROR, "Unknown storage_id: %s" % storage_id) log.error("Unknown storage_id: %s" % storage_id)
connection.close() connection.close()
return None, None return None, None
...@@ -126,18 +115,29 @@ class StorageServer(asyncore.dispatcher): ...@@ -126,18 +115,29 @@ class StorageServer(asyncore.dispatcher):
def handle_accept(self): def handle_accept(self):
try: try:
sock, addr = self.accept() r = self.accept()
except socket.error: if r is None:
sys.stderr.write('warning: accept failed\n') return
sock, addr = r
except socket.error, err:
log.warning("accept() failed: %s" % err)
else: else:
ZEOConnection(self, sock, addr) ZEOConnection(self, sock, addr)
def log_info(self, message, type='info'): def status(self):
if type=='error': type=ERROR """Log status information about connections and storages"""
else: type=INFO
LOG('ZEO Server', type, message)
log=log_info lines = []
for storage_id, connections in self.__connections.items():
s = "Storage %s has %d connections" % (storage_id,
len(connections))
lines.append(s)
for c in connections:
lines.append("%s readable=%s writeable=%s" % (
c, c.readable(), c.writable()))
lines.append("\t" + c.stats())
log.info(string.join(lines, "\n"))
return _noreturn
storage_methods={} storage_methods={}
for n in ( for n in (
...@@ -148,6 +148,7 @@ for n in ( ...@@ -148,6 +148,7 @@ for n in (
'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions', 'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions',
'transactionalUndo', 'transactionalUndo',
'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify', 'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
'status'
): ):
storage_methods[n]=1 storage_methods[n]=1
storage_method=storage_methods.has_key storage_method=storage_methods.has_key
...@@ -159,7 +160,8 @@ def find_global(module, name, ...@@ -159,7 +160,8 @@ def find_global(module, name,
raise StorageServerError, ( raise StorageServerError, (
"Couldn\'t import global module %s" % module) "Couldn\'t import global module %s" % module)
try: r=getattr(m, name) try:
r=getattr(m, name)
except: except:
raise StorageServerError, ( raise StorageServerError, (
"Couldn\'t find global %s in module %s" % (name, module)) "Couldn\'t find global %s in module %s" % (name, module))
...@@ -177,12 +179,52 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -177,12 +179,52 @@ class ZEOConnection(SizedMessageAsyncConnection):
def __init__(self, server, sock, addr): def __init__(self, server, sock, addr):
self.__server=server self.__server=server
self.status = server.status
self.__invalidated=[] self.__invalidated=[]
self.__closed=None self.__closed=None
if __debug__: debug='ZEO Server' if __debug__:
else: debug=0 debug = log
else:
debug = None
if __debug__:
# store some detailed statistics about method calls
self._last_method = None
self._t_begin = None
self._t_end = None
self._ncalls = 0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug) SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
LOG('ZEO Server', INFO, 'Connect %s %s' % (id(self), `addr`)) self.logaddr = repr(addr) # form of addr suitable for logging
log.info('Connect %s %s' % (id(self), self.logaddr))
def stats(self):
# This method is called via the status() command. The stats
# are of limited use for the current command, because the
# actual invocation of status() will clobber the previous
# method's statistics.
#
# When there are multiple connections active, a new connection
# can always get detailed statistics about other connections.
if __debug__:
if self._last_method == "status":
return "method=status begin=%s end=... ncalls=%d" % (
self._t_begin, self._ncalls)
if self._t_end is not None and self._t_begin is not None:
delta = self._t_end - self._t_begin
else:
delta = -1
return "method=%s begin=%s end=%s delta=%.3f ncalls=%d" % (
self._last_method, self._t_begin, self._t_end, delta,
self._ncalls)
else:
return ""
def __repr__(self):
return "<ZEOConnection %s%s" % (`self.addr`,
# sort of messy way to add tag 'closed' to
# connections that are closed
(self.__closed is None and '>' or ' closed>'))
def close(self): def close(self):
t=self._transaction t=self._transaction
...@@ -196,19 +238,26 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -196,19 +238,26 @@ class ZEOConnection(SizedMessageAsyncConnection):
self.__server.unregister_connection(self, self.__storage_id) self.__server.unregister_connection(self, self.__storage_id)
self.__closed=1 self.__closed=1
SizedMessageAsyncConnection.close(self) SizedMessageAsyncConnection.close(self)
LOG('ZEO Server', INFO, 'Close %s' % id(self)) log.info('Close %s' % id(self))
def message_input(self, message, def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO, dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None): None=None):
if __debug__: if __debug__:
if len(message) > max_blather:
tmp = `message[:max_blather]` self._t_begin = time.time()
self._t_end = None
if len(message) > 120: # XXX need constant from logger
tmp = `message[:120]`
else: else:
tmp = `message` tmp = `message`
blather('message_input', id(self), tmp) log.trace("message_input %s" % tmp)
if self.__storage is None: if self.__storage is None:
if __debug__:
log.blather("register connection to %s from %s" % (message,
self.logaddr))
# This is the first communication from the client # This is the first communication from the client
self.__storage, self.__storage_id = ( self.__storage, self.__storage_id = (
self.__server.register_connection(self, message)) self.__server.register_connection(self, message))
...@@ -226,27 +275,42 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -226,27 +275,42 @@ class ZEOConnection(SizedMessageAsyncConnection):
name, args = args[0], args[1:] name, args = args[0], args[1:]
if __debug__: if __debug__:
apply(blather, self._last_method = name
("call", id(self), ":", name,) + args) self._ncalls = self._ncalls + 1
log.debug("call %s%s from %s" % (name, format_msg(args),
self.logaddr))
if not storage_method(name): if not storage_method(name):
log.warning("Invalid method name: %s" % name)
if __debug__:
self._t_end = time.time()
raise 'Invalid Method Name', name raise 'Invalid Method Name', name
if hasattr(self, name): if hasattr(self, name):
r=apply(getattr(self, name), args) r=apply(getattr(self, name), args)
else: else:
r=apply(getattr(self.__storage, name), args) r=apply(getattr(self.__storage, name), args)
if r is _noreturn: return if r is _noreturn:
except (UndoError, VersionCommitError): if __debug__:
# These are normal usage errors. No need to leg them log.debug("no return to %s" % self.logaddr)
self._t_end = time.time()
return
except (UndoError, VersionCommitError), err:
if __debug__:
log.debug("return error %s to %s" % (err, self.logaddr))
self._t_end = time.time()
# These are normal usage errors. No need to log them.
self.return_error(sys.exc_info()[0], sys.exc_info()[1]) self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return return
except: except:
LOG('ZEO Server', ERROR, 'error', error=sys.exc_info()) if __debug__:
self._t_end = time.time()
log.error("error", error=sys.exc_info())
self.return_error(sys.exc_info()[0], sys.exc_info()[1]) self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return return
if __debug__: if __debug__:
blather("%s R: %s" % (id(self), `r`)) log.debug("return %s to %s" % (format_msg(r), self.logaddr))
self._t_end = time.time()
r=dump(r,1) r=dump(r,1)
self.message_output('R'+r) self.message_output('R'+r)
...@@ -256,7 +320,7 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -256,7 +320,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
err_value = err_type, err_value err_value = err_type, err_value
if __debug__: if __debug__:
blather("%s E: %s" % (id(self), `err_value`)) log.trace("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1) try: r=dump(err_value, 1)
except: except:
...@@ -292,6 +356,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -292,6 +356,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
} }
def zeoLoad(self, oid): def zeoLoad(self, oid):
if __debug__:
log.blather("zeoLoad(%s) %s" % (U64(oid), self.logaddr))
storage=self.__storage storage=self.__storage
v=storage.modifiedInVersion(oid) v=storage.modifiedInVersion(oid)
if v: pv, sv = storage.load(oid, v) if v: pv, sv = storage.load(oid, v)
...@@ -308,6 +374,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -308,6 +374,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
def beginZeoVerify(self): def beginZeoVerify(self):
if __debug__:
log.blather("beginZeoVerify() %s" % self.logaddr)
self.message_output('bN.') self.message_output('bN.')
return _noreturn return _noreturn
...@@ -324,6 +392,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -324,6 +392,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
return _noreturn return _noreturn
def endZeoVerify(self): def endZeoVerify(self):
if __debug__:
log.blather("endZeoVerify() %s" % self.logaddr)
self.message_output('eN.') self.message_output('eN.')
return _noreturn return _noreturn
...@@ -340,11 +410,11 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -340,11 +410,11 @@ class ZEOConnection(SizedMessageAsyncConnection):
def _pack(self, t, wait=0): def _pack(self, t, wait=0):
try: try:
LOG('ZEO Server', BLATHER, 'pack begin') log.blather('pack begin')
self.__storage.pack(t, referencesf) self.__storage.pack(t, referencesf)
LOG('ZEO Server', BLATHER, 'pack end') log.blather('pack end')
except: except:
LOG('ZEO Server', ERROR, log.error(
'Pack failed for %s' % self.__storage_id, 'Pack failed for %s' % self.__storage_id,
error=sys.exc_info()) error=sys.exc_info())
if wait: if wait:
...@@ -381,6 +451,9 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -381,6 +451,9 @@ class ZEOConnection(SizedMessageAsyncConnection):
def storea(self, oid, serial, data, version, id, def storea(self, oid, serial, data, version, id,
dump=dump): dump=dump):
if __debug__:
log.blather("storea(%s, [%d], %s) %s" % (U64(oid), len(data),
U64(id), self.logaddr))
try: try:
t=self._transaction t=self._transaction
if t is None or id != t.id: if t is None or id != t.id:
...@@ -396,7 +469,7 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -396,7 +469,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
# all errors need to be serialized to prevent unexpected # all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling. # returns, which would screw up the return handling.
# IOW, Anything that ends up here is evil enough to be logged. # IOW, Anything that ends up here is evil enough to be logged.
LOG('ZEO Server', ERROR, 'store error', error=sys.exc_info()) log.error('store error', error=sys.exc_info())
newserial=sys.exc_info()[1] newserial=sys.exc_info()[1]
else: else:
if serial != '\0\0\0\0\0\0\0\0': if serial != '\0\0\0\0\0\0\0\0':
...@@ -420,12 +493,17 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -420,12 +493,17 @@ class ZEOConnection(SizedMessageAsyncConnection):
return self.__storage.tpc_vote(t) return self.__storage.tpc_vote(t)
def transactionalUndo(self, trans_id, id): def transactionalUndo(self, trans_id, id):
if __debug__:
log.blather("transactionalUndo(%s, %s) %s" % (trans_id,
U64(id), self.logaddr))
t=self._transaction t=self._transaction
if t is None or id != t.id: if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id) raise POSException.StorageTransactionError(self, id)
return self.__storage.transactionalUndo(trans_id, self._transaction) return self.__storage.transactionalUndo(trans_id, self._transaction)
def undo(self, transaction_id): def undo(self, transaction_id):
if __debug__:
log.blather("undo(%s) %s" % (transaction_id, self.logaddr))
oids=self.__storage.undo(transaction_id) oids=self.__storage.undo(transaction_id)
if oids: if oids:
self.__server.invalidate( self.__server.invalidate(
...@@ -457,11 +535,15 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -457,11 +535,15 @@ class ZEOConnection(SizedMessageAsyncConnection):
def commitlock_suspend(self, resume, args, onerror): def commitlock_suspend(self, resume, args, onerror):
self.__storage._waiting.append((resume, args, onerror)) self.__storage._waiting.append((resume, args, onerror))
log.blather("suspend %s. %d queued clients" % (resume.im_self,
len(self.__storage._waiting)))
def commitlock_resume(self): def commitlock_resume(self):
waiting = self.__storage._waiting waiting = self.__storage._waiting
while waiting: while waiting:
resume, args, onerror = waiting.pop(0) resume, args, onerror = waiting.pop(0)
log.blather("resuming queued client %s, %d still queued" % (
resume.im_self, len(waiting)))
try: try:
if apply(resume, args): if apply(resume, args):
break break
...@@ -471,12 +553,18 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -471,12 +553,18 @@ class ZEOConnection(SizedMessageAsyncConnection):
# disconnect will have generated its own log event. # disconnect will have generated its own log event.
onerror() onerror()
except: except:
LOG('ZEO Server', ERROR, log.error(
"Unexpected error handling queued tpc_begin()", "Unexpected error handling queued tpc_begin()",
error=sys.exc_info()) error=sys.exc_info())
onerror() onerror()
def tpc_abort(self, id): def tpc_abort(self, id):
if __debug__:
try:
log.blather("tpc_abort(%s) %s" % (U64(id), self.logaddr))
except:
print repr(id)
raise
t = self._transaction t = self._transaction
if t is None or id != t.id: if t is None or id != t.id:
return return
...@@ -492,6 +580,10 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -492,6 +580,10 @@ class ZEOConnection(SizedMessageAsyncConnection):
self.message_output('UN.') self.message_output('UN.')
def tpc_begin(self, id, user, description, ext): def tpc_begin(self, id, user, description, ext):
if __debug__:
log.blather("tpc_begin(%s, %s, %s) %s" % (U64(id), `user`,
`description`,
self.logaddr))
t = self._transaction t = self._transaction
if t is not None: if t is not None:
if id == t.id: if id == t.id:
...@@ -505,7 +597,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -505,7 +597,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
if storage._transaction is not None: if storage._transaction is not None:
self.commitlock_suspend(self.unlock, (), self.close) self.commitlock_suspend(self.unlock, (), self.close)
return 1 # Return a flag indicating a lock condition. return 1 # Return a flag indicating a lock condition.
assert id != 't'
self._transaction=t=Transaction() self._transaction=t=Transaction()
t.id=id t.id=id
t.user=user t.user=user
...@@ -542,6 +635,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -542,6 +635,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
return 1 return 1
def tpc_finish(self, id, user, description, ext): def tpc_finish(self, id, user, description, ext):
if __debug__:
log.blather("tpc_finish(%s) %s" % (U64(id), self.logaddr))
t = self._transaction t = self._transaction
if id != t.id: if id != t.id:
return return
...@@ -564,7 +659,7 @@ def init_storage(storage): ...@@ -564,7 +659,7 @@ def init_storage(storage):
if __name__=='__main__': if __name__=='__main__':
import ZODB.FileStorage import ZODB.FileStorage
name, port = sys.argv[1:3] name, port = sys.argv[1:3]
blather(name, port) log.trace(format_msg(name, port))
try: try:
port='', int(port) port='', int(port)
except: except:
......
import string
from types import StringType
from zLOG import *
__all__ = ["zLogger", "format_msg"]
_MAX_MSG_SIZE = 120
def format_msg(*args):
accum = []
total_len = 0
for arg in args:
if not isinstance(arg, StringType):
arg = str(arg)
accum.append(arg)
total_len = total_len + len(arg)
if total_len >= _MAX_MSG_SIZE:
break
m = string.join(accum)
if len(m) > _MAX_MSG_SIZE:
m = m[:_MAX_MSG_SIZE] + ' ...'
return m
class zLogger:
def __init__(self, channel):
self.channel = channel
def __str__(self):
raise RuntimeError, "don't print me"
def trace(self, msg):
LOG(self.channel, TRACE, msg)
def debug(self, msg):
LOG(self.channel, DEBUG, msg)
def blather(self, msg):
LOG(self.channel, BLATHER, msg)
def info(self, msg):
LOG(self.channel, INFO, msg)
def problem(self, msg):
LOG(self.channel, PROBLEM, msg)
def warning(self, msg):
LOG(self.channel, WARNING, msg)
def error(self, msg, error=None):
LOG(self.channel, ERROR, msg, error=error)
def panic(self, msg):
LOG(self.channel, PANIC, msg)
...@@ -14,11 +14,11 @@ ...@@ -14,11 +14,11 @@
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.15 $"[11:-2] __version__ = "$Revision: 1.16 $"[11:-2]
import asyncore, string, struct, zLOG, sys, Acquisition import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno import socket, errno
from zLOG import LOG, TRACE, ERROR, INFO from logger import zLogger
# Use the dictionary to make sure we get the minimum number of errno # Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems -- # entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
...@@ -48,10 +48,10 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -48,10 +48,10 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
SizedMessageAsyncConnection.inheritedAttribute( SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map) '__init__')(self, sock, map)
self.addr=addr self.addr=addr
if debug is not None: if debug is None and __debug__:
self._debug=debug self._debug = zLogger("smac")
elif not hasattr(self, '_debug'): else:
self._debug=__debug__ and 'smac' self._debug = debug
self.__state=None self.__state=None
self.__inp=None self.__inp=None
self.__inpl=0 self.__inpl=0
...@@ -132,10 +132,12 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -132,10 +132,12 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
def message_output(self, message, def message_output(self, message,
pack=struct.pack, len=len): pack=struct.pack, len=len):
if self._debug: if self._debug is not None:
if len(message) > 40: m=message[:40]+' ...' if len(message) > 40:
else: m=message m = message[:40]+' ...'
LOG(self._debug, TRACE, 'message_output %s' % `m`) else:
m = message
self._debug.trace('message_output %s' % `m`)
append=self.__append append=self.__append
if append is None: if append is None:
...@@ -143,13 +145,6 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -143,13 +145,6 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
append(pack(">i",len(message))+message) append(pack(">i",len(message))+message)
def log_info(self, message, type='info'):
if type=='error': type=ERROR
else: type=INFO
LOG('ZEO', type, message)
log=log_info
def close(self): def close(self):
if self.__append is not None: if self.__append is not None:
self.__append=None self.__append=None
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"""Start the server storage. """Start the server storage.
""" """
__version__ = "$Revision: 1.30 $"[11:-2] __version__ = "$Revision: 1.31 $"[11:-2]
import sys, os, getopt, string import sys, os, getopt, string
...@@ -75,11 +75,9 @@ def main(argv): ...@@ -75,11 +75,9 @@ def main(argv):
os.path.join(var, 'ZEO_SERVER.pid') os.path.join(var, 'ZEO_SERVER.pid')
) )
opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:') fs = os.path.join(var, 'Data.fs')
fs=os.path.join(var, 'Data.fs') usage = """%s [options] [filename]
usage="""%s [options] [filename]
where options are: where options are:
...@@ -121,6 +119,13 @@ def main(argv): ...@@ -121,6 +119,13 @@ def main(argv):
if no file name is specified, then %s is used. if no file name is specified, then %s is used.
""" % (me, fs) """ % (me, fs)
try:
opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
except getopt.error, err:
print err
print usage
sys.exit(1)
port=None port=None
debug=detailed=0 debug=detailed=0
host='' host=''
...@@ -217,15 +222,15 @@ def main(argv): ...@@ -217,15 +222,15 @@ def main(argv):
import signal import signal
signal.signal(signal.SIGTERM, signal.signal(signal.SIGTERM,
lambda sig, frame, s=storages: shutdown(s) lambda sig, frame, s=storages: shutdown(s))
)
signal.signal(signal.SIGINT, signal.signal(signal.SIGINT,
lambda sig, frame, s=storages: shutdown(s, 0) lambda sig, frame, s=storages: shutdown(s, 0))
) try:
try: signal.signal(signal.SIGHUP, rotate_logs_handler) signal.signal(signal.SIGHUP, rotate_logs_handler)
except: pass except:
pass
except: pass except:
pass
items=storages.items() items=storages.items()
items.sort() items.sort()
...@@ -236,13 +241,16 @@ def main(argv): ...@@ -236,13 +241,16 @@ def main(argv):
ZEO.StorageServer.StorageServer(unix, storages) ZEO.StorageServer.StorageServer(unix, storages)
try: ppid, pid = os.getppid(), os.getpid() try:
except: pass # getpid not supported ppid, pid = os.getppid(), os.getpid()
else: open(zeo_pid,'w').write("%s %s" % (ppid, pid)) except:
pass # getpid not supported
else:
open(zeo_pid,'w').write("%s %s" % (ppid, pid))
except: except:
# Log startup exception and tell zdaemon not to restart us. # Log startup exception and tell zdaemon not to restart us.
info=sys.exc_info() info = sys.exc_info()
try: try:
import zLOG import zLOG
zLOG.LOG("z2", zLOG.PANIC, "Startup exception", zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
...@@ -280,21 +288,29 @@ def shutdown(storages, die=1): ...@@ -280,21 +288,29 @@ def shutdown(storages, die=1):
# unnecessary, since we now use so_reuseaddr. # unnecessary, since we now use so_reuseaddr.
for ignored in 1,2: for ignored in 1,2:
for socket in asyncore.socket_map.values(): for socket in asyncore.socket_map.values():
try: socket.close() try:
except: pass socket.close()
except:
pass
for storage in storages.values(): for storage in storages.values():
try: storage.close() try:
except: pass storage.close()
except:
pass
try: try:
from zLOG import LOG, INFO from zLOG import LOG, INFO
LOG('ZEO Server', INFO, LOG('ZEO Server', INFO,
"Shutting down (%s)" % (die and "shutdown" or "restart") "Shutting down (%s)" % (die and "shutdown" or "restart")
) )
except: pass except:
pass
if die: sys.exit(0) if die:
else: sys.exit(1) sys.exit(0)
else:
sys.exit(1)
if __name__=='__main__': main(sys.argv) if __name__ == '__main__':
main(sys.argv)
...@@ -172,6 +172,7 @@ class GenericTests(ZEOTestBase, ...@@ -172,6 +172,7 @@ class GenericTests(ZEOTestBase,
def tearDown(self): def tearDown(self):
"""Try to cause the tests to halt""" """Try to cause the tests to halt"""
self.running = 0 self.running = 0
self._storage.status()
self._storage.close() self._storage.close()
self._server.close() self._server.close()
os.waitpid(self._pid, 0) os.waitpid(self._pid, 0)
...@@ -218,6 +219,7 @@ class GenericTests(ZEOTestBase, ...@@ -218,6 +219,7 @@ class GenericTests(ZEOTestBase,
oid = self._storage.new_oid() oid = self._storage.new_oid()
self._storage.store(oid, None, '', '', t) self._storage.store(oid, None, '', '', t)
self._storage.tpc_vote(t) self._storage.tpc_vote(t)
self._storage.status()
self._storage.tpc_finish(t) self._storage.tpc_finish(t)
for store, trans in self._storages: for store, trans in self._storages:
...@@ -237,7 +239,7 @@ class GenericTests(ZEOTestBase, ...@@ -237,7 +239,7 @@ class GenericTests(ZEOTestBase,
def _get_timestamp(self): def _get_timestamp(self):
t = time.time() t = time.time()
t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,))) t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
return 't' return `t`
class ZEOFileStorageTests(GenericTests): class ZEOFileStorageTests(GenericTests):
__super_setUp = GenericTests.setUp __super_setUp = GenericTests.setUp
......
...@@ -15,10 +15,8 @@ ...@@ -15,10 +15,8 @@
# This module is a simplified version of the select_trigger module # This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server. # from Sam Rushing's Medusa server.
import asyncore import asyncore
#import asynchat import errno
import os import os
import socket import socket
import string import string
...@@ -26,7 +24,7 @@ import thread ...@@ -26,7 +24,7 @@ import thread
if os.name == 'posix': if os.name == 'posix':
class trigger (asyncore.file_dispatcher): class trigger(asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread" "Wake up a call to select() running in the main thread"
...@@ -58,10 +56,10 @@ if os.name == 'posix': ...@@ -58,10 +56,10 @@ if os.name == 'posix':
# new data onto a channel's outgoing data queue at the same time that # new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some] # the main thread is trying to remove some]
def __init__ (self): def __init__(self):
r, w = self._fds = os.pipe() r, w = self._fds = os.pipe()
self.trigger = w self.trigger = w
asyncore.file_dispatcher.__init__ (self, r) asyncore.file_dispatcher.__init__(self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
...@@ -69,30 +67,35 @@ if os.name == 'posix': ...@@ -69,30 +67,35 @@ if os.name == 'posix':
os.close(self._fds[0]) os.close(self._fds[0])
os.close(self._fds[1]) os.close(self._fds[1])
def __repr__ (self): def __repr__(self):
return '<select-trigger (pipe) at %x>' % id(self) return '<select-trigger(pipe) at %x>' % id(self)
def readable (self): def readable(self):
return 1 return 1
def writable (self): def writable(self):
return 0 return 0
def handle_connect (self): def handle_connect(self):
pass pass
def pull_trigger (self, thunk=None): def pull_trigger(self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks) # print 'PULL_TRIGGER: ', len(self.thunks)
if thunk: if thunk:
try: try:
self.lock.acquire() self.lock.acquire()
self.thunks.append (thunk) self.thunks.append(thunk)
finally: finally:
self.lock.release() self.lock.release()
os.write (self.trigger, 'x') os.write(self.trigger, 'x')
def handle_read (self): def handle_read(self):
self.recv (8192) try:
self.recv(8192)
except os.error, err:
if err[0] == errno.EAGAIN: # resource temporarily unavailable
return
raise
try: try:
self.lock.acquire() self.lock.acquire()
for thunk in self.thunks: for thunk in self.thunks:
...@@ -101,7 +104,7 @@ if os.name == 'posix': ...@@ -101,7 +104,7 @@ if os.name == 'posix':
except: except:
nil, t, v, tbinfo = asyncore.compact_traceback() nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:' print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo)) '(%s:%s %s)' % (t, v, tbinfo))
self.thunks = [] self.thunks = []
finally: finally:
self.lock.release() self.lock.release()
...@@ -113,13 +116,13 @@ else: ...@@ -113,13 +116,13 @@ else:
# win32-safe version # win32-safe version
class trigger (asyncore.dispatcher): class trigger(asyncore.dispatcher):
address = ('127.9.9.9', 19999) address = ('127.9.9.9', 19999)
def __init__ (self): def __init__(self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM) a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM) w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering # set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1) w.setsockopt(socket.IPPROTO_TCP, 1, 1)
...@@ -137,45 +140,50 @@ else: ...@@ -137,45 +140,50 @@ else:
raise 'Bind Error', 'Cannot bind trigger!' raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1 port=port - 1
a.listen (1) a.listen(1)
w.setblocking (0) w.setblocking(0)
try: try:
w.connect (self.address) w.connect(self.address)
except: except:
pass pass
r, addr = a.accept() r, addr = a.accept()
a.close() a.close()
w.setblocking (1) w.setblocking(1)
self.trigger = w self.trigger = w
asyncore.dispatcher.__init__ (self, r) asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
self._trigger_connected = 0 self._trigger_connected = 0
def __repr__ (self): def __repr__(self):
return '<select-trigger (loopback) at %x>' % id(self) return '<select-trigger (loopback) at %x>' % id(self)
def readable (self): def readable(self):
return 1 return 1
def writable (self): def writable(self):
return 0 return 0
def handle_connect (self): def handle_connect(self):
pass pass
def pull_trigger (self, thunk=None): def pull_trigger(self, thunk=None):
if thunk: if thunk:
try: try:
self.lock.acquire() self.lock.acquire()
self.thunks.append (thunk) self.thunks.append(thunk)
finally: finally:
self.lock.release() self.lock.release()
self.trigger.send ('x') self.trigger.send('x')
def handle_read (self): def handle_read(self):
self.recv (8192) try:
self.recv(8192)
except os.error, err:
if err[0] == errno.EAGAIN: # resource temporarily unavailable
return
raise
try: try:
self.lock.acquire() self.lock.acquire()
for thunk in self.thunks: for thunk in self.thunks:
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
"""Simple rpc mechanisms """Simple rpc mechanisms
""" """
__version__ = "$Revision: 1.22 $"[11:-2] __version__ = "$Revision: 1.23 $"[11:-2]
from cPickle import loads from cPickle import loads
import cPickle import cPickle
...@@ -61,13 +61,13 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -61,13 +61,13 @@ class asyncRPC(SizedMessageAsyncConnection):
self.__call_la=l.acquire self.__call_la=l.acquire
self.__call_lr=l.release self.__call_lr=l.release
def connect(self, tryonce=1, log_type='client'): def connect(self, tryonce=1):
t=self._tmin t=self._tmin
connection = self._connection connection = self._connection
debug=self._debug debug=self._debug
while self.__closed == 0: while self.__closed == 0:
if log_type: LOG(log_type, INFO, LOG("client", INFO,
'Trying to connect to server: %s' % `connection`) 'Trying to connect to server: %s' % `connection`)
try: try:
if type(connection) is type(''): if type(connection) is type(''):
s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
...@@ -75,15 +75,15 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -75,15 +75,15 @@ class asyncRPC(SizedMessageAsyncConnection):
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(connection) s.connect(connection)
except Exception, err: except Exception, err:
if debug: if debug is not None:
LOG(debug, DEBUG, "Failed to connect to server: %s" % err) debug.blather("Failed to connect to server: %s" % err)
if tryonce: return 0 if tryonce: return 0
time.sleep(t) time.sleep(t)
t=t*2 t=t*2
if t > self._tmax: t=self._tmax if t > self._tmax: t=self._tmax
else: else:
if debug: if debug is not None:
LOG(debug, DEBUG, "Connected to server") debug.blather("Connected to server")
# Make sure the result lock is set, se we don't # Make sure the result lock is set, se we don't
# get an old result (e.g. the exception that # get an old result (e.g. the exception that
...@@ -199,12 +199,12 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -199,12 +199,12 @@ class asyncRPC(SizedMessageAsyncConnection):
self._outOfBand=f self._outOfBand=f
def message_input(self, m): def message_input(self, m):
if self._debug: if self._debug is not None:
if len(m) > 60: if len(m) > 60:
md = repr(m[:60]) + ' ...' md = repr(m[:60]) + ' ...'
else: else:
md = repr(m) md = repr(m)
LOG(self._debug, TRACE, 'message_input %s' % md) self._debug.trace('message_input %s' % md)
c=m[:1] c=m[:1]
if c in 'RE': if c in 'RE':
......
...@@ -14,11 +14,11 @@ ...@@ -14,11 +14,11 @@
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.15 $"[11:-2] __version__ = "$Revision: 1.16 $"[11:-2]
import asyncore, string, struct, zLOG, sys, Acquisition import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno import socket, errno
from zLOG import LOG, TRACE, ERROR, INFO from logger import zLogger
# Use the dictionary to make sure we get the minimum number of errno # Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems -- # entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
...@@ -48,10 +48,10 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -48,10 +48,10 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
SizedMessageAsyncConnection.inheritedAttribute( SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map) '__init__')(self, sock, map)
self.addr=addr self.addr=addr
if debug is not None: if debug is None and __debug__:
self._debug=debug self._debug = zLogger("smac")
elif not hasattr(self, '_debug'): else:
self._debug=__debug__ and 'smac' self._debug = debug
self.__state=None self.__state=None
self.__inp=None self.__inp=None
self.__inpl=0 self.__inpl=0
...@@ -132,10 +132,12 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -132,10 +132,12 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
def message_output(self, message, def message_output(self, message,
pack=struct.pack, len=len): pack=struct.pack, len=len):
if self._debug: if self._debug is not None:
if len(message) > 40: m=message[:40]+' ...' if len(message) > 40:
else: m=message m = message[:40]+' ...'
LOG(self._debug, TRACE, 'message_output %s' % `m`) else:
m = message
self._debug.trace('message_output %s' % `m`)
append=self.__append append=self.__append
if append is None: if append is None:
...@@ -143,13 +145,6 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -143,13 +145,6 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
append(pack(">i",len(message))+message) append(pack(">i",len(message))+message)
def log_info(self, message, type='info'):
if type=='error': type=ERROR
else: type=INFO
LOG('ZEO', type, message)
log=log_info
def close(self): def close(self):
if self.__append is not None: if self.__append is not None:
self.__append=None self.__append=None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment