Commit 97129eb2 authored by Jim Fulton's avatar Jim Fulton

Simplified the tracing code (blather).

Got rid of the no-longer-used (synchronous) store method.

Added a wait argument to pack.  If the wait argument is provided,
then the pack method doesn't return a value till packing is done.

Simplified the way errors are handled in normal calls and
packs by putting the error marshalling logic into a separate
method.

No longer log UndoError, VersionCommitError, and TransactionError
exceptions on the server, since these are user-level errors.

Do log pack errors.
parent 9a11b856
##############################################################################
#############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
......@@ -83,11 +83,12 @@
#
##############################################################################
__version__ = "$Revision: 1.18 $"[11:-2]
__version__ = "$Revision: 1.19 $"[11:-2]
import asyncore, socket, string, sys, cPickle, os
from smac import SizedMessageAsyncConnection
from ZODB import POSException
from ZODB.POSException import TransactionError, UndoError, VersionCommitError
from ZODB.Transaction import Transaction
import traceback
from zLOG import LOG, INFO, ERROR, TRACE
......@@ -98,8 +99,11 @@ from cStringIO import StringIO
class StorageServerError(POSException.StorageError): pass
max_blather=120
def blather(*args):
LOG('ZEO Server', TRACE, string.join(map(str,args)))
m=string.join(map(str,args))
if len(m) > max_blather: m=m[:max_blather]+' ...'
LOG('ZEO Server', TRACE, m)
# We create a special fast pickler! This allows us
......@@ -249,10 +253,7 @@ class Connection(SizedMessageAsyncConnection):
def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None):
if __debug__:
m=`message`
if len(m) > 60: m=m[:60]+' ...'
blather('message_input', m, id(self))
if __debug__: blather('message_input', id(self), `message`)
if self.__storage is None:
# This is the first communication from the client
......@@ -262,7 +263,6 @@ class Connection(SizedMessageAsyncConnection):
self.message_output('S'+dump(self.get_info(), 1))
return
rt='R'
try:
# Unpickle carefully.
......@@ -271,10 +271,7 @@ class Connection(SizedMessageAsyncConnection):
args=unpickler.load()
name, args = args[0], args[1:]
if __debug__:
m=`tuple(args)`
if len(m) > 90: m=m[:90]+' ...'
blather('call: %s%s' % (name, m), id(self))
if __debug__: blather('call %s: %s%s' % (id(self), name, `args`))
if not storage_method(name):
raise 'Invalid Method Name', name
......@@ -283,26 +280,35 @@ class Connection(SizedMessageAsyncConnection):
else:
r=apply(getattr(self.__storage, name), args)
if r is _noreturn: return
except (UndoError, VersionCommitError):
# These are normal usage errors. No need to leg them
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
except:
LOG('ZEO Server', ERROR, 'error', error=sys.exc_info())
t, r = sys.exc_info()[:2]
if type(r) is not type(self): r=t,r
rt='E'
if __debug__:
m=`r`
if len(m) > 60: m=m[:60]+' ...'
blather('%s: %s' % (rt, m), id(self))
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
if __debug__: blather("%s R: %s" % (id(self), `r`))
try: r=dump(r,1)
r=dump(r,1)
self.message_output('R'+r)
def return_error(self, err_type, err_value, type=type, dump=dump):
if type(err_value) is not type(self):
err_value = err_type, err_value
if __debug__: blather("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1)
except:
# Ugh, must be an unpicklable exception
r=StorageServerError("Couldn't pickle result %s" % `r`)
r=StorageServerError("Couldn't pickle error %s" % `r`)
dump('',1) # clear pickler
r=dump(r,1)
rt='E'
self.message_output(rt+r)
self.message_output('E'+r)
def get_info(self):
storage=self.__storage
......@@ -364,15 +370,25 @@ class Connection(SizedMessageAsyncConnection):
for i in r: r[i]=new_oid()
return r
def pack(self, t):
start_new_thread(self._pack, (t,))
def pack(self, t, wait=0):
start_new_thread(self._pack, (t,wait))
if wait: return _noreturn
def _pack(self, t):
self.__storage.pack(t, referencesf)
# Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (),
self.get_size_info())
def _pack(self, t, wait=0):
try:
self.__storage.pack(t, referencesf)
except:
LOG('ZEO Server', ERROR,
'Pack failed for %s' % self.__storage_id,
error=sys.exc_info())
if wait: self.return_error(sys.exc_info()[0], sys.exc_info()[1])
else:
if wait:
self.message_output('RN.')
else:
# Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (),
self.get_size_info())
def abortVersion(self, src, id):
t=self._transaction
......@@ -394,15 +410,6 @@ class Connection(SizedMessageAsyncConnection):
if dest: a((oid,src))
return oids
def store(self, oid, serial, data, version, id):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, serial, data, version, t)
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append((oid, version))
return newserial
def storea(self, oid, serial, data, version, id,
dump=dump):
try:
......@@ -411,10 +418,16 @@ class Connection(SizedMessageAsyncConnection):
raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, serial, data, version, t)
except TransactionError, v:
# This is a normal transaction errorm such as a conflict error
# or a version lock or conflict error. It doen't need to be
# logged.
newserial=v
except:
# all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling.
# IOW,
# IOW, Anything that ends up here is evil enough to be logged.
LOG('ZEO Server', ERROR, 'store error', error=sys.exc_info())
newserial=sys.exc_info()[1]
else:
if serial != '\0\0\0\0\0\0\0\0':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment