Commit 00e1e290 authored by Jim Fulton's avatar Jim Fulton

Added support for message iterators. This allows one, for example, to

use an iterator to send a large file without loading it in memory.
parent b01f4057
......@@ -101,7 +101,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
self.__state = 0
self.__has_mac = 0
self.__msg_size = 4
self.__output_lock = threading.Lock() # Protects __output
self.__output_messages = []
self.__output = []
self.__closed = False
# Each side of the connection sends and receives messages. A
......@@ -129,9 +129,31 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
def setSessionKey(self, sesskey):
log("set session key %r" % sesskey)
self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
# Low-level construction is now delayed until data are sent.
# This is to allow use of iterators that generate messages
# only when we're ready to do I/O so that we can effeciently
# transmit large files. Because we delay messages, we also
# have to delay setting the session key to retain proper
# ordering.
# The low-level output queue supports strings, a special close
# marker, and iterators. It doesn't support callbacks. We
# can create a allback by providing an iterator that doesn't
# yield anything.
# The hack fucntion below is a callback in iterator's
# clothing. :) It never yields anything, but is a generator
# and thus iterator, because it contains a yield statement.
def hack():
self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
if False:
yield ''
self.message_output(hack())
def get_addr(self):
return self.addr
......@@ -232,86 +254,90 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
return True
def writable(self):
if len(self.__output) == 0:
return False
else:
return True
return bool(self.__output_messages or self.__output)
def should_close(self):
self.__output.append(_close_marker)
self.__output_messages.append(_close_marker)
def handle_write(self):
self.__output_lock.acquire()
try:
output = self.__output
while output:
# Accumulate output into a single string so that we avoid
# multiple send() calls, but avoid accumulating too much
# data. If we send a very small string and have more data
# to send, we will likely incur delays caused by the
# unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a
# portion of it will actually be delivered at a time.
l = 0
for i in range(len(output)):
output = self.__output
messages = self.__output_messages
while output or messages:
# Process queued messages until we have enough output
size = sum((len(s) for s in output))
while (size <= SEND_SIZE) and messages:
message = messages[0]
if message.__class__ is str:
size += self.__message_output(messages.pop(0), output)
elif message is _close_marker:
del messages[:]
del output[:]
return self.close()
else:
try:
l += len(output[i])
except TypeError:
# We had an output marker, close the connection
assert output[i] is _close_marker
return self.close()
if l > SEND_SIZE:
break
i += 1
# It is very unlikely that i will be 1.
v = "".join(output[:i])
del output[:i]
try:
n = self.send(v)
except socket.error, err:
if err[0] in expected_socket_write_errors:
break # we couldn't write anything
raise
if n < len(v):
output.insert(0, v[n:])
break # we can't write any more
finally:
self.__output_lock.release()
message = message.next()
except StopIteration:
messages.pop(0)
else:
size += self.__message_output(message, output)
# Accumulate output into a single string so that we avoid
# multiple send() calls, but avoid accumulating too much
# data. If we send a very small string and have more data
# to send, we will likely incur delays caused by the
# unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a
# portion of it will actually be delivered at a time.
l = 0
for i in range(len(output)):
l += len(output[i])
if l > SEND_SIZE:
break
i += 1
# It is very unlikely that i will be 1.
v = "".join(output[:i])
del output[:i]
try:
n = self.send(v)
except socket.error, err:
if err[0] in expected_socket_write_errors:
break # we couldn't write anything
raise
if n < l:
output.insert(0, v[n:])
break # we can't write any more
def handle_close(self):
self.close()
def message_output(self, message):
if __debug__:
if self._debug:
log("message_output %d bytes: %s hmac=%d" %
(len(message), short_repr(message),
self.__hmac_send and 1 or 0),
level=TRACE)
if self.__closed:
raise DisconnectedError(
"This action is temporarily unavailable.<p>")
self.__output_lock.acquire()
try:
# do two separate appends to avoid copying the message string
if self.__hmac_send:
self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
self.__hmac_send.update(message)
self.__output.append(self.__hmac_send.digest())
else:
self.__output.append(struct.pack(">I", len(message)))
if len(message) <= SEND_SIZE:
self.__output.append(message)
else:
for i in range(0, len(message), SEND_SIZE):
self.__output.append(message[i:i+SEND_SIZE])
finally:
self.__output_lock.release()
self.__output_messages.append(message)
def __message_output(self, message, output):
# do two separate appends to avoid copying the message string
size = 4
if self.__hmac_send:
output.append(struct.pack(">I", len(message) | MAC_BIT))
self.__hmac_send.update(message)
output.append(self.__hmac_send.digest())
size += 20
else:
output.append(struct.pack(">I", len(message)))
if len(message) <= SEND_SIZE:
output.append(message)
else:
for i in range(0, len(message), SEND_SIZE):
output.append(message[i:i+SEND_SIZE])
return size + len(message)
def close(self):
if not self.__closed:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment