Commit 153fb529 authored by Guido van Rossum's avatar Guido van Rossum

Tim saw Win2k crashes that made me realize that the input and output

buffer variables here are accessed from multiple threads without any
locking@  Add such locking: a separate lock for input and one for
output.  XXX Note: handle_read() keeps the lock for a potentially long
time.  But this is required to serialize incoming calls anyway.

Unrelated nicety: use short_repr() when logging message output, for
consistency with other places.
parent b3c871b2
...@@ -13,15 +13,16 @@ ...@@ -13,15 +13,16 @@
############################################################################## ##############################################################################
"""Sized message async connections """Sized message async connections
$Id: smac.py,v 1.30 2002/09/29 02:46:58 gvanrossum Exp $ $Id: smac.py,v 1.31 2002/09/29 03:22:28 gvanrossum Exp $
""" """
import asyncore, struct import asyncore, struct
import threading
from ZEO.Exceptions import Disconnected from ZEO.Exceptions import Disconnected
import zLOG import zLOG
from types import StringType from types import StringType
from ZEO.zrpc.log import log from ZEO.zrpc.log import log, short_repr
import socket, errno import socket, errno
...@@ -63,6 +64,8 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -63,6 +64,8 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
self._debug = debug self._debug = debug
elif not hasattr(self, '_debug'): elif not hasattr(self, '_debug'):
self._debug = __debug__ self._debug = __debug__
# __input_lock protects __inp, __input_len, __state, __msg_size
self.__input_lock = threading.Lock()
self.__inp = None # None, a single String, or a list self.__inp = None # None, a single String, or a list
self.__input_len = 0 self.__input_len = 0
# Instance variables __state and __msg_size work together: # Instance variables __state and __msg_size work together:
...@@ -74,6 +77,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -74,6 +77,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
# The state alternates between 0 and 1. # The state alternates between 0 and 1.
self.__state = 0 self.__state = 0
self.__msg_size = 4 self.__msg_size = 4
self.__output_lock = threading.Lock() # Protects __output
self.__output = [] self.__output = []
self.__closed = 0 self.__closed = 0
self.__super_init(sock, map) self.__super_init(sock, map)
...@@ -95,47 +99,61 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -95,47 +99,61 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
if not d: if not d:
return return
input_len = self.__input_len + len(d) self.__input_lock.acquire()
msg_size = self.__msg_size try:
state = self.__state input_len = self.__input_len + len(d)
msg_size = self.__msg_size
inp = self.__inp state = self.__state
if msg_size > input_len:
if inp is None: inp = self.__inp
self.__inp = d if msg_size > input_len:
elif type(self.__inp) is StringType: if inp is None:
self.__inp = [self.__inp, d] self.__inp = d
else: elif type(self.__inp) is StringType:
self.__inp.append(d) self.__inp = [self.__inp, d]
self.__input_len = input_len else:
return # keep waiting for more input self.__inp.append(d)
self.__input_len = input_len
# load all previous input and d into single string inp return # keep waiting for more input
if isinstance(inp, StringType):
inp = inp + d # load all previous input and d into single string inp
elif inp is None: if isinstance(inp, StringType):
inp = d inp = inp + d
else: elif inp is None:
inp.append(d) inp = d
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if not state:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else: else:
msg_size = 4 inp.append(d)
state = 0 inp = "".join(inp)
self.message_input(msg)
offset = 0
self.__state = state while (offset + msg_size) <= input_len:
self.__msg_size = msg_size msg = inp[offset:offset + msg_size]
self.__inp = inp[offset:] offset = offset + msg_size
self.__input_len = input_len - offset if not state:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
msg_size = 4
state = 0
# XXX We call message_input() with __input_lock
# held!!! And message_input() may end up calling
# message_output(), which has its own lock. But
# message_output() cannot call message_input(), so
# the locking order is always consistent, which
# prevents deadlock. Also, message_input() may
# take a long time, because it can cause an
# incoming call to be handled. During all this
# time, the __input_lock is held. That's a good
# thing, because it serializes incoming calls.
self.message_input(msg)
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
finally:
self.__input_lock.release()
def readable(self): def readable(self):
return 1 return 1
...@@ -147,36 +165,40 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -147,36 +165,40 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
return 1 return 1
def handle_write(self): def handle_write(self):
output = self.__output self.__output_lock.acquire()
while output: try:
# Accumulate output into a single string so that we avoid output = self.__output
# multiple send() calls, but avoid accumulating too much while output:
# data. If we send a very small string and have more data # Accumulate output into a single string so that we avoid
# to send, we will likely incur delays caused by the # multiple send() calls, but avoid accumulating too much
# unfortunate interaction between the Nagle algorithm and # data. If we send a very small string and have more data
# delayed acks. If we send a very large string, only a # to send, we will likely incur delays caused by the
# portion of it will actually be delivered at a time. # unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a
l = 0 # portion of it will actually be delivered at a time.
for i in range(len(output)):
l += len(output[i]) l = 0
if l > SEND_SIZE: for i in range(len(output)):
break l += len(output[i])
if l > SEND_SIZE:
i += 1 break
# It is very unlikely that i will be 1.
v = "".join(output[:i]) i += 1
del output[:i] # It is very unlikely that i will be 1.
v = "".join(output[:i])
try: del output[:i]
n = self.send(v)
except socket.error, err: try:
if err[0] in expected_socket_write_errors: n = self.send(v)
break # we couldn't write anything except socket.error, err:
raise if err[0] in expected_socket_write_errors:
if n < len(v): break # we couldn't write anything
output.insert(0, v[n:]) raise
break # we can't write any more if n < len(v):
output.insert(0, v[n:])
break # we can't write any more
finally:
self.__output_lock.release()
def handle_close(self): def handle_close(self):
self.close() self.close()
...@@ -184,11 +206,8 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -184,11 +206,8 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
def message_output(self, message): def message_output(self, message):
if __debug__: if __debug__:
if self._debug: if self._debug:
if len(message) > 40: log('message_output %d bytes: %s' %
m = message[:40]+' ...' (len(message), short_repr(message)),
else:
m = message
log('message_output %d bytes: %s' % (len(message), `m`),
level=zLOG.TRACE) level=zLOG.TRACE)
if self.__closed: if self.__closed:
...@@ -196,13 +215,17 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -196,13 +215,17 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
"This action is temporarily unavailable." "This action is temporarily unavailable."
"<p>" "<p>"
) )
# do two separate appends to avoid copying the message string self.__output_lock.acquire()
self.__output.append(struct.pack(">i", len(message))) try:
if len(message) <= SEND_SIZE: # do two separate appends to avoid copying the message string
self.__output.append(message) self.__output.append(struct.pack(">i", len(message)))
else: if len(message) <= SEND_SIZE:
for i in range(0, len(message), SEND_SIZE): self.__output.append(message)
self.__output.append(message[i:i+SEND_SIZE]) else:
for i in range(0, len(message), SEND_SIZE):
self.__output.append(message[i:i+SEND_SIZE])
finally:
self.__output_lock.release()
def close(self): def close(self):
if not self.__closed: if not self.__closed:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment