Commit f184a1d9 authored by Kirill Smelkov's avatar Kirill Smelkov

X trace IO in between client and server

parent fc0729b3
...@@ -31,10 +31,14 @@ The ZEO protocol sits on top of a sized message protocol. ...@@ -31,10 +31,14 @@ The ZEO protocol sits on top of a sized message protocol.
The ZEO protocol has client and server variants. The ZEO protocol has client and server variants.
""" """
from __future__ import print_function
import logging import logging
import socket import socket
from struct import unpack from struct import unpack
import sys import sys
import tempfile
from .compat import asyncio from .compat import asyncio
...@@ -43,6 +47,8 @@ logger = logging.getLogger(__name__) ...@@ -43,6 +47,8 @@ logger = logging.getLogger(__name__)
INET_FAMILIES = socket.AF_INET, socket.AF_INET6 INET_FAMILIES = socket.AF_INET, socket.AF_INET6
traceio = not ('runzeo' in sys.argv[0]) # trace on client side
print('%s traceio=%s' % (sys.argv[0], traceio))
class Protocol(asyncio.Protocol): class Protocol(asyncio.Protocol):
"""asyncio low-level ZEO base interface """asyncio low-level ZEO base interface
...@@ -66,6 +72,18 @@ class Protocol(asyncio.Protocol): ...@@ -66,6 +72,18 @@ class Protocol(asyncio.Protocol):
# Handle the first message, the protocol handshake, differently # Handle the first message, the protocol handshake, differently
self.message_received = self.first_message_received self.message_received = self.first_message_received
self.tracefile = tempfile.NamedTemporaryFile(bufsize=1*1024*1024, prefix='ZEO', suffix='.iotrace', delete=False, dir='/tmp')
def _traceio(self, txrx, message):
if traceio:
x = message
decode = getattr(self, 'decode', None)
if decode is not None:
try:
x = decode(x)
except:
pass
print('%s %r' % (txrx, x), file=self.tracefile)
def __repr__(self): def __repr__(self):
return self.name return self.name
...@@ -98,6 +116,7 @@ class Protocol(asyncio.Protocol): ...@@ -98,6 +116,7 @@ class Protocol(asyncio.Protocol):
if paused: if paused:
append(message) append(message)
else: else:
self._traceio('tx', message)
writelines((pack(">I", len(message)), message)) writelines((pack(">I", len(message)), message))
self.write_message = write_message self.write_message = write_message
...@@ -111,6 +130,7 @@ class Protocol(asyncio.Protocol): ...@@ -111,6 +130,7 @@ class Protocol(asyncio.Protocol):
append(data) append(data)
return return
for message in data: for message in data:
self._traceio('tx', message)
writelines((pack(">I", len(message)), message)) writelines((pack(">I", len(message)), message))
if paused: if paused:
append(data) append(data)
...@@ -159,6 +179,7 @@ class Protocol(asyncio.Protocol): ...@@ -159,6 +179,7 @@ class Protocol(asyncio.Protocol):
else: else:
self.want = 4 self.want = 4
self.getting_size = True self.getting_size = True
self._traceio('rx', collected)
self.message_received(collected) self.message_received(collected)
except Exception: except Exception:
logger.exception("data_received %s %s %s", logger.exception("data_received %s %s %s",
...@@ -189,10 +210,12 @@ class Protocol(asyncio.Protocol): ...@@ -189,10 +210,12 @@ class Protocol(asyncio.Protocol):
while output and not paused: while output and not paused:
message = output.pop(0) message = output.pop(0)
if isinstance(message, bytes): if isinstance(message, bytes):
self._traceio('tx', message)
writelines((pack(">I", len(message)), message)) writelines((pack(">I", len(message)), message))
else: else:
data = message data = message
for message in data: for message in data:
self._traceio('tx', message)
writelines((pack(">I", len(message)), message)) writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back. if paused: # paused again. Put iter back.
output.insert(0, data) output.insert(0, data)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment