Commit af86eebd authored by Gintautas Miliauskas's avatar Gintautas Miliauskas

Converted zRPC to use 'logging' instead of zLOG.

This probably broke the log analyzers... :(
parent 0580425d
......@@ -18,11 +18,12 @@ import sys
import threading
import time
import types
import logging
import ThreadedAsync
import zLOG
from ZODB.POSException import ReadOnlyError
from ZODB.loglevels import BLATHER
from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
......@@ -99,7 +100,7 @@ class ConnectionManager(object):
t.join(30)
if t.isAlive():
log("CM.close(): self.thread.join() timed out",
level=zLOG.WARNING)
level=logging.WARNING)
if conn is not None:
# This will call close_conn() below which clears self.connection
conn.close()
......@@ -121,7 +122,7 @@ class ConnectionManager(object):
# XXX need each connection started with async==0 to have a
# callback
log("CM.set_async(%s)" % repr(map), level=zLOG.DEBUG)
log("CM.set_async(%s)" % repr(map), level=logging.DEBUG)
if not self.closed and self.trigger is None:
log("CM.set_async(): first call")
self.trigger = trigger()
......@@ -204,7 +205,7 @@ class ConnectionManager(object):
try:
if conn is not self.connection:
# Closing a non-current connection
log("CM.close_conn() non-current", level=zLOG.BLATHER)
log("CM.close_conn() non-current", level=BLATHER)
return
log("CM.close_conn()")
self.connection = None
......@@ -298,7 +299,7 @@ class ConnectThread(threading.Thread):
time.sleep(delay)
if self.mgr.is_connected():
log("CT: still trying to replace fallback connection",
level=zLOG.INFO)
level=logging.INFO)
delay = min(delay*2, self.tmax)
log("CT: exiting thread: %s" % self.getName())
......@@ -372,7 +373,7 @@ class ConnectThread(threading.Thread):
log("CT: select() %d, %d, %d" % tuple(map(len, (r,w,x))))
except select.error, msg:
log("CT: select failed; msg=%s" % str(msg),
level=zLOG.WARNING) # XXX Is this the right level?
level=logging.WARNING) # XXX Is this the right level?
continue
# Exceptable wrappers are in trouble; close these suckers
for wrap in x:
......@@ -438,7 +439,7 @@ class ConnectWrapper:
self.sock = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("CW: can't create socket, domain=%s: %s" % (domain, err),
level=zLOG.ERROR)
level=logging.ERROR)
self.close()
return
self.sock.setblocking(0)
......@@ -451,7 +452,7 @@ class ConnectWrapper:
err = self.sock.connect_ex(self.addr)
except socket.error, msg:
log("CW: connect_ex(%r) failed: %s" % (self.addr, msg),
level=zLOG.ERROR)
level=logging.ERROR)
self.close()
return
log("CW: connect_ex(%s) returned %s" %
......@@ -462,7 +463,7 @@ class ConnectWrapper:
if err not in _CONNECT_OK:
log("CW: error connecting to %s: %s" %
(self.addr, errno.errorcode.get(err) or str(err)),
level=zLOG.WARNING)
level=logging.WARNING)
self.close()
return
self.state = "connected"
......@@ -487,7 +488,7 @@ class ConnectWrapper:
return
except:
log("CW: error in testConnection (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
level=logging.ERROR, exc_info=True)
self.close()
return
if self.preferred:
......@@ -506,7 +507,7 @@ class ConnectWrapper:
self.client.notifyConnected(self.conn)
except:
log("CW: error in notifyConnected (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
level=logging.ERROR, exc_info=True)
self.close()
return
self.state = "notified"
......
......@@ -17,20 +17,21 @@ import select
import sys
import threading
import types
import logging
import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.log import short_repr, log
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
from ZEO.zrpc.log import short_repr, log
from ZODB.loglevels import BLATHER, TRACE
REPLY = ".reply" # message name used for replies
ASYNC = 1
class Delay:
"""Used to delay response to client for synchronous calls
"""Used to delay response to client for synchronous calls.
When a synchronous call is made and the original handler returns
without handling the call, it returns a Delay object that prevents
......@@ -46,7 +47,7 @@ class Delay:
self.send_reply(self.msgid, obj)
def error(self, exc_info):
log("Error raised in delayed method", zLOG.ERROR, error=exc_info)
log("Error raised in delayed method", logging.ERROR, exc_info=True)
self.return_error(self.msgid, 0, *exc_info[:2])
class MTDelay(Delay):
......@@ -141,10 +142,11 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.closed = False
self.msgid = 0
self.peer_protocol_version = None # Set in recv_handshake()
self.logger = logging.getLogger('ZEO.zrpc.Connection')
if isinstance(addr, types.TupleType):
self.log_label = "zrpc-conn:%s:%d" % addr
self.log_label = "(%s:%d) " % addr
else:
self.log_label = "zrpc-conn:%s" % addr
self.log_label = "(%s) " % addr
self.__super_init(sock, addr)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
......@@ -178,8 +180,8 @@ class Connection(smac.SizedMessageAsyncConnection, object):
__str__ = __repr__ # Defeat asyncore's dreaded __getattr__
def log(self, message, level=zLOG.BLATHER, error=None):
zLOG.LOG(self.log_label, level, message, error=error)
def log(self, message, level=BLATHER, exc_info=False):
self.logger.log(level, self.log_label + message, exc_info=exc_info)
def close(self):
if self.closed:
......@@ -195,7 +197,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.trigger.close()
def register_object(self, obj):
"""Register obj as the true object to invoke methods on"""
"""Register obj as the true object to invoke methods on."""
self.obj = obj
def handshake(self, proto=None):
......@@ -218,9 +220,10 @@ class Connection(smac.SizedMessageAsyncConnection, object):
del self.message_input
self.peer_protocol_version = proto
if self.oldest_protocol_version <= proto <= self.protocol_version:
self.log("received handshake %r" % proto, level=zLOG.INFO)
self.log("received handshake %r" % proto, level=logging.INFO)
else:
self.log("bad handshake %s" % short_repr(proto), level=zLOG.ERROR)
self.log("bad handshake %s" % short_repr(proto),
level=logging.ERROR)
raise ZRPCError("bad handshake %r" % proto)
def message_input(self, message):
......@@ -234,7 +237,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
if __debug__:
self.log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
short_repr(args)),
level=zLOG.TRACE)
level=TRACE)
if name == REPLY:
self.handle_reply(msgid, flags, args)
else:
......@@ -243,7 +246,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def handle_reply(self, msgid, flags, args):
if __debug__:
self.log("recv reply: %s, %s, %s"
% (msgid, flags, short_repr(args)), level=zLOG.TRACE)
% (msgid, flags, short_repr(args)), level=TRACE)
self.replies_cond.acquire()
try:
self.replies[msgid] = flags, args
......@@ -257,7 +260,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
raise ZRPCError(msg)
if __debug__:
self.log("calling %s%s" % (name, short_repr(args)),
level=zLOG.DEBUG)
level=logging.DEBUG)
meth = getattr(self.obj, name)
try:
......@@ -269,10 +272,9 @@ class Connection(smac.SizedMessageAsyncConnection, object):
except (SystemExit, KeyboardInterrupt):
raise
except Exception, msg:
error = sys.exc_info()
self.log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
error=error)
error = error[:2]
self.log("%s() raised exception: %s" % (name, msg), logging.INFO,
exc_info=True)
error = sys.exc_info()[:2]
return self.return_error(msgid, flags, *error)
if flags & ASYNC:
......@@ -281,7 +283,8 @@ class Connection(smac.SizedMessageAsyncConnection, object):
(name, short_repr(ret)))
else:
if __debug__:
self.log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
self.log("%s returns %s" % (name, short_repr(ret)),
logging.DEBUG)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply, self.return_error)
else:
......@@ -294,12 +297,10 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def handle_error(self):
if sys.exc_info()[0] == SystemExit:
raise sys.exc_info()
self.log_error("Error caught in asyncore")
self.log("Error caught in asyncore",
level=logging.ERROR, exc_info=True)
self.close()
def log_error(self, msg="No error message supplied"):
self.log(msg, zLOG.ERROR, error=sys.exc_info())
def check_method(self, name):
# XXX Is this sufficient "security" for now?
if name.startswith('_'):
......@@ -321,7 +322,8 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def return_error(self, msgid, flags, err_type, err_value):
if flags & ASYNC:
self.log_error("Asynchronous call raised exception: %s" % self)
self.log("Asynchronous call raised exception: %s" % self,
level=logging.ERROR, exc_info=True)
return
if type(err_value) is not types.InstanceType:
err_value = err_type, err_value
......@@ -357,7 +359,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.msgid_lock.release()
if __debug__:
self.log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
zLOG.TRACE)
level=TRACE)
buf = self.marshal.encode(msgid, flags, method, args)
self.message_output(buf)
return msgid
......@@ -444,7 +446,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
"""Invoke asyncore mainloop and wait for reply."""
if __debug__:
self.log("wait(%d), async=%d" % (msgid, self.is_async()),
level=zLOG.TRACE)
level=TRACE)
if self.is_async():
self._pull_trigger()
......@@ -462,7 +464,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
del self.replies[msgid]
if __debug__:
self.log("wait(%d): reply=%s" %
(msgid, short_repr(reply)), level=zLOG.TRACE)
(msgid, short_repr(reply)), level=TRACE)
return reply
if self.is_async():
self.replies_cond.wait(10.0)
......@@ -472,13 +474,13 @@ class Connection(smac.SizedMessageAsyncConnection, object):
try:
if __debug__:
self.log("wait(%d): asyncore.poll(%s)" %
(msgid, delay), level=zLOG.TRACE)
(msgid, delay), level=TRACE)
asyncore.poll(delay, self._singleton)
if delay < 1.0:
delay += delay
except select.error, err:
self.log("Closing. asyncore.poll() raised %s."
% err, level=zLOG.BLATHER)
% err, level=BLATHER)
self.close()
finally:
self.replies_cond.acquire()
......@@ -495,7 +497,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def poll(self):
"""Invoke asyncore mainloop to get pending message out."""
if __debug__:
self.log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
self.log("poll(), async=%d" % self.is_async(), level=TRACE)
if self.is_async():
self._pull_trigger()
else:
......@@ -504,7 +506,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
def pending(self, timeout=0):
"""Invoke mainloop until any pending messages are handled."""
if __debug__:
self.log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
self.log("pending(), async=%d" % self.is_async(), level=TRACE)
if self.is_async():
return
# Inline the asyncore poll() function to know whether any input
......
......@@ -12,31 +12,35 @@
#
##############################################################################
import os
import zLOG
import threading
import logging
from ZODB.loglevels import BLATHER
LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
_label = "zrpc:%s" % os.getpid()
logger = logging.getLogger('ZEO.zrpc')
_label = "%s" % os.getpid()
def new_label():
global _label
_label = "zrpc:%s" % os.getpid()
_label = str(os.getpid())
def log(message, level=zLOG.BLATHER, label=None, error=None):
def log(message, level=BLATHER, label=None, exc_info=False):
label = label or _label
if LOG_THREAD_ID:
label = "%s:%s" % (label, threading.currentThread().getName())
zLOG.LOG(label, level, message, error=error)
label = label + ':' + threading.currentThread().getName()
logger.log(level, '(%s) %s' % (label, message), exc_info=exc_info)
REPR_LIMIT = 60
def short_repr(obj):
"Return an object repr limited to REPR_LIMIT bytes."
# Some of the objects being repr'd are large strings. It's wastes
# a lot of memory to repr them and then truncate, so special case
# them in this function.
# Some of the objects being repr'd are large strings. A lot of memory
# would be wasted to repr them and then truncate, so they are treated
# specially in this function.
# Also handle short repr of a tuple containing a long string.
# This strategy works well for arguments to StorageServer methods.
......
......@@ -14,8 +14,7 @@
import cPickle
from cStringIO import StringIO
import types
import zLOG
import logging
from ZEO.zrpc.error import ZRPCError
from ZEO.zrpc.log import log, short_repr
......@@ -38,7 +37,8 @@ class Marshaller:
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg), level=zLOG.ERROR)
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
_globals = globals()
......
......@@ -17,7 +17,7 @@ import types
from ZEO.zrpc.connection import Connection
from ZEO.zrpc.log import log
import zLOG
import logging
import ThreadedAsync.LoopCallback
# Export the main asyncore loop
......@@ -39,7 +39,7 @@ class Dispatcher(asyncore.dispatcher):
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
log("listening on %s" % str(self.addr), zLOG.INFO)
log("listening on %s" % str(self.addr), logging.INFO)
self.bind(self.addr)
self.listen(5)
......
......@@ -35,11 +35,13 @@ import sha
import socket
import struct
import threading
import logging
from types import StringType
from ZODB.loglevels import TRACE
from ZEO.zrpc.log import log, short_repr
from ZEO.zrpc.error import DisconnectedError
import zLOG
# Use the dictionary to make sure we get the minimum number of errno
......@@ -278,7 +280,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
log("message_output %d bytes: %s hmac=%d" %
(len(message), short_repr(message),
self.__hmac_send and 1 or 0),
level=zLOG.TRACE)
level=TRACE)
if self.__closed:
raise DisconnectedError(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment