Commit 121f2e20 authored by Guido van Rossum's avatar Guido van Rossum

This checkin contains changes (by me) that will allow multiple

parallel outstanding calls.  However it also contains code (by Jeremy,
with one notifyAll() call added by me) that enforces the old rule of a
single outstanding call.  This is hopefully unnecessessary, but we
haven't reviewed the server side yet to make sure that that's really
the case (the server was until now getting serialized calls per
connection).
parent f89b404b
......@@ -74,14 +74,12 @@ class Connection(smac.SizedMessageAsyncConnection):
and asynchronous calls, which do not.
It uses the Marshaller class to handle encoding and decoding of
method calls are arguments. Marshaller uses pickle to encode
method calls and arguments. Marshaller uses pickle to encode
arbitrary Python objects. The code here doesn't ever see the wire
format.
A Connection is designed for use in a multithreaded application,
where a synchronous call must block until a response is ready.
The current design only allows a single synchronous call to be
outstanding.
A socket connection between a client and a server allows either
side to invoke methods on the other side. The processes on each
......@@ -137,13 +135,10 @@ class Connection(smac.SizedMessageAsyncConnection):
self._map = {self._fileno: self}
# __msgid_lock guards access to msgid
self.__msgid_lock = threading.Lock()
# __call_lock prevents more than one synchronous call from
# being issued at one time.
self.__call_lock = threading.Lock()
# __reply_lock is used to block when a synchronous call is
# __replies_cond is used to block when a synchronous call is
# waiting for a response
self.__reply_lock = threading.Lock()
self.__reply_lock.acquire()
self.__replies_cond = threading.Condition()
self.__replies = {}
self.register_object(obj)
self.handshake()
......@@ -153,6 +148,7 @@ class Connection(smac.SizedMessageAsyncConnection):
def close(self):
if self.closed:
return
self._map.clear()
self.closed = 1
self.close_trigger()
self.__super_close()
......@@ -183,6 +179,9 @@ class Connection(smac.SizedMessageAsyncConnection):
def recv_handshake(self, message):
if message == self.protocol_version:
self.message_input = self._message_input
else:
log("recv_handshake: bad handshake %s" % repr(message),
level=zLOG.ERROR)
# otherwise do something else...
def message_input(self, message):
......@@ -208,8 +207,12 @@ class Connection(smac.SizedMessageAsyncConnection):
if __debug__:
log("recv reply: %s, %s, %s" % (msgid, flags, short_repr(args)),
level=zLOG.DEBUG)
self.__reply = msgid, flags, args
self.__reply_lock.release() # will fail if lock is unlocked
self.__replies_cond.acquire()
try:
self.__replies[msgid] = flags, args
self.__replies_cond.notifyAll()
finally:
self.__replies_cond.release()
def handle_request(self, msgid, flags, name, args):
if not self.check_method(name):
......@@ -225,6 +228,10 @@ class Connection(smac.SizedMessageAsyncConnection):
raise
except Exception, msg:
error = sys.exc_info()
# XXX Since we're just passing this on to the caller, and
# there are several cases where this happens during the
# normal course of action, shouldn't this be logged at the
# INFO level?
log("%s() raised exception: %s" % (name, msg), zLOG.ERROR,
error=error)
error = error[:2]
......@@ -298,22 +305,19 @@ class Connection(smac.SizedMessageAsyncConnection):
return msgid
def call(self, method, *args):
self.__call_lock.acquire()
self.__replies_cond.acquire()
try:
return self._call(method, args)
finally:
self.__call_lock.release()
def _call(self, method, args):
while self.__replies and not self.closed:
log("waiting for previous call to finish %s" %
repr(self.__replies.values()[0]))
self.__replies_cond.wait(30)
if self.closed:
raise DisconnectedError()
msgid = self.send_call(method, args, 0)
self.__reply = None
self.wait() # will release reply lock before returning
r_msgid, r_flags, r_args = self.__reply
self.__reply_lock.acquire()
assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
self.__replies[msgid] = None
finally:
self.__replies_cond.release()
r_flags, r_args = self.wait(msgid)
if (isinstance(r_args, types.TupleType)
and type(r_args[0]) == types.ClassType
and issubclass(r_args[0], Exception)):
......@@ -347,27 +351,39 @@ class Connection(smac.SizedMessageAsyncConnection):
else:
return 0
def wait(self):
def wait(self, msgid):
"""Invoke asyncore mainloop and wait for reply."""
if __debug__:
log("wait() async=%d" % self.is_async(), level=zLOG.TRACE)
if self.is_async():
self.trigger.pull_trigger()
self.__reply_lock.acquire()
# wait until reply...
self.__replies_cond.acquire()
try:
while 1:
if self.closed:
raise DisconnectedError()
reply = self.__replies.get(msgid)
if reply is not None:
del self.__replies[msgid]
assert len(self.__replies) == 0
self.__replies_cond.notifyAll()
return reply
if self.is_async():
self.__replies_cond.wait(10.0)
else:
# Do loop until asyncore handler unlocks the lock.
assert not self.__reply_lock.acquire(0)
while not self.__reply_lock.acquire(0):
self.__replies_cond.release()
try:
try:
asyncore.poll(10.0, self._map)
except select.error, err:
log("Closing. asyncore.poll() raised %s." % err,
level=zLOG.BLATHER)
self.close()
if self.closed:
raise DisconnectedError()
self.__reply_lock.release()
finally:
self.__replies_cond.acquire()
finally:
self.__replies_cond.release()
def poll(self, wait_for_reply=0):
"""Invoke asyncore mainloop to get pending message out."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment