Commit e3cd5c5b authored by Julien Muchembled's avatar Julien Muchembled

qa: add testrunner options to dump/check the format of network packets

With the switch to msgpack, there was no schema anymore whereas it was
sometimes used for both automatic conversion (e.g. the last argument of
AskStoreTransaction must now be explicitly cast to list) and type checking.

This somewhat reintroduces a kind of schema that:
- is used by the test suite for type checking
- can be generated automatically from the test suite
  when one change the procotol
parent 9d0bf97a
......@@ -263,13 +263,11 @@ class Packet(object):
_answer = None
_code = None
_id = None
allow_dict = False
nodelay = True
poll_thread = False
def __init__(self, *args):
assert self._code is not None, "Packet class not registered"
assert self.allow_dict or dict not in map(type, args), args
self._args = args
def setId(self, value):
......@@ -471,7 +469,7 @@ class Packets(dict):
that haven't been unlocked.
:nodes: M -> S
""", allow_dict=True)
""")
AskFinalTID, AnswerFinalTID = request("""
Return final tid if ttid has been committed, to recover from certain
......@@ -692,7 +690,7 @@ class Packets(dict):
If current_serial's data is current on storage.
:nodes: C -> S
""", allow_dict=True)
""")
AskTIDsFrom, AnswerTIDsFrom = request("""
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
......@@ -713,7 +711,7 @@ class Packets(dict):
specified.
:nodes: ctl -> A -> M
""", error=True, allow_dict=True)
""", error=True)
CheckPartition = notify("""
Ask a storage node to compare a partition with all other nodes.
......@@ -781,7 +779,7 @@ class Packets(dict):
no new data up to 'tid' for the given partition
:nodes: M -> S
""", allow_dict=True)
""")
NotifyReplicationDone = notify("""
Notify the master node that a partition has been successfully
......@@ -802,7 +800,7 @@ class Packets(dict):
and reply with the list of records we should not have.
:nodes: S -> S
""", allow_dict=True)
""")
AddTransaction = notify("""
Send metadata of a transaction to a node that does not have them.
......
......@@ -296,6 +296,13 @@ class TestRunner(BenchmarkRunner):
x('-S', '--stop-on-success', action='store_true', default=None,
help='Opposite of --stop-on-error: stop as soon as a test'
' passes. Details about errors are not printed at exit.')
x = parser.add_mutually_exclusive_group().add_argument
x('-p', '--dump-protocol', const=True,
dest='protocol', action='store_const',
help='Dump schema of protocol instead of checking it.')
x('-P', '--no-check-protocol', const=False,
dest='protocol', action='store_const',
help='Do not check schema of protocol.')
_('-r', '--readable-tid', action='store_true',
help='Change master behaviour to generate readable TIDs for easier'
' debugging (rather than from current time).')
......@@ -345,6 +352,7 @@ Environment Variables:
coverage = args.coverage,
cov_unit = args.cov_unit,
only = args.only,
protocol = args.protocol,
stop_on_success = args.stop_on_success,
readable_tid = args.readable_tid,
)
......@@ -372,6 +380,13 @@ Environment Variables:
self.__coverage.save()
del self.__coverage
orig(self, success)
if config.protocol is False:
from contextlib import nested
protocol_checker = nested()
else:
from neo.tests.protocol_checker import protocolChecker
protocol_checker = protocolChecker(config.protocol)
with protocol_checker:
try:
for _ in xrange(config.loop):
if config.unit:
......
# generated by running the whole test suite with -p
AbortTransaction(p64,[int])
AcceptIdentification(NodeTypes,?int,?int)
AddObject(p64,p64,int,bin,bin,?p64)
AddPendingNodes([int])
AddTransaction(p64,bin,bin,bin,bool,p64,[p64])
AnswerBeginTransaction(p64)
AnswerCheckCurrentSerial(?p64)
AnswerCheckSerialRange(int,bin,p64,bin,p64)
AnswerCheckTIDRange(int,bin,p64)
AnswerClusterState(?ClusterStates)
AnswerFetchObjects(?,?p64,?p64,{:})
AnswerFetchTransactions(?,?p64,[])
AnswerFinalTID(p64)
AnswerInformationLocked(p64)
AnswerLastIDs(?p64,?p64)
AnswerLastTransaction(p64)
AnswerLockedTransactions({p64:?p64})
AnswerNewOIDs([p64])
AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)])
AnswerObject(p64,p64,?p64,?int,bin,bin,?p64)
AnswerObjectHistory(p64,[(p64,int)])
AnswerObjectUndoSerial({p64:(p64,?p64,bool)})
AnswerPack(bool)
AnswerPartitionList(int,int,[[(int,CellStates)]])
AnswerPartitionTable(int,int,[[(int,CellStates)]])
AnswerPrimary(int)
AnswerRebaseObject(?(p64,p64,?(int,bin,bin)))
AnswerRebaseTransaction([p64])
AnswerRecovery(?int,?p64,?p64)
AnswerStoreObject(?p64)
AnswerStoreTransaction()
AnswerTIDs([p64])
AnswerTIDsFrom([p64])
AnswerTransactionFinished(p64,p64)
AnswerTransactionInformation(p64,bin,bin,bin,bool,[p64])
AnswerTweakPartitionTable(bool,[[(int,CellStates)]])
AnswerUnfinishedTransactions(p64,[p64])
AnswerVoteTransaction()
AskBeginTransaction(?p64)
AskCheckCurrentSerial(p64,p64,p64)
AskCheckSerialRange(int,int,p64,p64,p64)
AskCheckTIDRange(int,int,p64,p64)
AskClusterState()
AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]})
AskFetchTransactions(int,int,p64,p64,[p64])
AskFinalTID(p64)
AskFinishTransaction(p64,[p64],[p64])
AskLastIDs()
AskLastTransaction()
AskLockInformation(p64,p64)
AskLockedTransactions()
AskNewOIDs(int)
AskNodeList(NodeTypes)
AskObject(p64,?p64,?p64)
AskObjectHistory(p64,int,int)
AskObjectUndoSerial(p64,p64,p64,[p64])
AskPack(p64)
AskPartitionList(int,int,?)
AskPartitionTable()
AskPrimary()
AskRebaseObject(p64,p64)
AskRebaseTransaction(p64,p64)
AskRecovery()
AskStoreObject(p64,p64,int,bin,bin,?p64,?p64)
AskStoreTransaction(p64,bin,bin,bin,[p64])
AskTIDs(int,int,int)
AskTIDsFrom(p64,p64,int,int)
AskTransactionInformation(p64)
AskUnfinishedTransactions([int])
AskVoteTransaction(p64)
CheckPartition(int,(bin,?(bin,int)),p64,p64)
CheckReplicas({int:?int},p64,?)
Error(int,bin)
FailedVote(p64,[int])
InvalidateObjects(p64,[p64])
NotPrimaryMaster(?int,[(bin,int)])
NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64)
NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)])
NotifyPartitionChanges(int,int,[(int,int,CellStates)])
NotifyPartitionCorrupted(int,[int])
NotifyReady()
NotifyRepair(int)
NotifyReplicationDone(int,p64)
NotifyTransactionFinished(p64,p64)
NotifyUnlockInformation(p64)
Ping()
Pong()
Repair([int],int)
Replicate(p64,bin,{int:?(bin,int)})
RequestIdentification(NodeTypes,?int,?(bin,int),bin,?float,any,[int])
SendPartitionTable(?int,int,[[(int,CellStates)]])
SetClusterState(ClusterStates)
SetNodeState(int,NodeStates)
SetNumReplicas(int)
StartOperation(bool)
StopOperation()
Truncate(p64)
TweakPartitionTable(bool,[int])
ValidateTransaction(p64,p64)
# The use of ast is convoluted, and the result quite verbose,
# but that remains simpler than writing a parser from scratch.
import ast, os
from contextlib import contextmanager
from neo.lib.protocol import Packet, Enum
array = list, set, tuple
item = Enum.Item
class _ast(object):
def __getattr__(self, k):
v = lambda *args: getattr(ast, k)(lineno=0, col_offset=0, *args)
setattr(self, k, v)
return v
_ast = _ast()
class parseArgument(ast.NodeTransformer):
def visit_UnaryOp(self, node):
assert isinstance(node.op, ast.USub)
return _ast.Call(_ast.Name('option', ast.Load()),
[self.visit(node.operand)], [], None, None)
def visit_Name(self, node):
return _ast.Str(node.id.replace('_', '?'))
parseArgument = parseArgument().visit
class Argument(object):
merge = True
type = ''
option = False
@classmethod
def load(cls, arg):
arg = ast.parse(arg.rstrip()
.replace('?(', '-(').replace('?[', '-[').replace('?{', '-{')
.replace('?', '_').replace('[]', '[""]')
.replace('{:', '{"":').replace(':}', ':""}'),
mode="eval")
x = arg.body
name = x.func.id
arg.body = parseArgument(_ast.Tuple(x.args, ast.Load()))
return name, cls._load(eval(compile(arg, '', mode="eval"),
{'option': cls._option}))
@classmethod
def _load(cls, arg):
t = type(arg)
if t is cls:
return arg
x = object.__new__(cls)
if t is tuple:
x.type = map(cls._load, arg)
elif t is list:
x.type = cls._load(*arg),
elif t is dict:
(k, v), = arg.iteritems()
x.type = cls._load(k), cls._load(v)
else:
if arg.startswith('?'):
arg = arg[1:]
x.option = True
x.type = arg
return x
@classmethod
def _option(cls, arg):
arg = cls._load(arg)
arg.option = True
return arg
@classmethod
def _merge(cls, args):
if args:
x, = {cls(x) for x in args}
return x
return object.__new__(cls)
def __init__(self, arg, root=False):
if arg is None:
self.option = True
elif isinstance(arg, tuple) and (root or len(arg) > 1):
self.type = map(self.__class__, arg)
elif isinstance(arg, array):
self.type = self._merge(arg),
elif isinstance(arg, dict):
self.type = self._merge(arg), self._merge(arg.values())
else:
self.type = (('p64' if len(arg) == 8 else
'bin') if isinstance(arg, bytes) else
arg._enum._name if isinstance(arg, item) else
'str' if isinstance(arg, unicode) else
'int' if isinstance(arg, long) else
type(arg).__name__)
def __repr__(self):
x = self.type
if type(x) is tuple:
x = ('[%s]' if len(x) == 1 else '{%s:%s}') % x
elif type(x) is list:
x = '(%s)' % ','.join(map(repr, x))
return '?' + x if self.option else x
def __hash__(self):
return 0
def __eq__(self, other):
x = self.type
y = other.type
if x and y and x != 'any':
# Since we don't know whether an array is fixed-size record of
# heterogeneous values or a collection of homogeneous values,
# we end up with the following complicated heuristic.
t = type(x)
if t is tuple:
if len(x) == 1 and type(y) is list:
z = set(x)
z.update(y)
if len(z) == 1:
x = y = tuple(z)
if self.merge:
self.type = x
elif t is list:
if type(y) is tuple and len(y) == 1:
z = set(y)
z.update(x)
if len(z) == 1:
x = y = tuple(z)
if self.merge:
self.type = x
t = tuple
elif t is str is type(y) and {x, y}.issuperset(('bin', 'p64')):
x = y = 'bin'
if self.merge:
self.type = x
if not (t is type(y) and (t is not tuple or
len(x) == len(y)) and x == y):
if not self.merge:
return False
self.type = 'any'
if self.merge:
if not x:
self.type = y
if not self.option:
self.option = other.option
elif y and not x or other.option and not self.option:
return False
return True
class FrozenArgument(Argument):
merge = False
@contextmanager
def protocolChecker(dump):
x = 'Packet(p64,?[(bin,{int:})],{:?(?,[])},?{?:float})'
assert x == '%s%r' % Argument.load(x)
assert not (FrozenArgument([]) == Argument([0]))
path = os.path.join(os.path.dirname(__file__), 'protocol')
if dump:
import threading
from multiprocessing import Lock
lock = Lock()
schema = {}
pid = os.getpid()
r, w = os.pipe()
def _check(name, arg):
try:
schema[name] == arg
except KeyError:
schema[name] = arg
def check(name, args):
arg = Argument(args, True)
if pid == os.getpid():
_check(name, arg)
else:
with lock:
os.write(w, '%s%r\n' % (name, arg))
def check_thread(r):
for x in os.fdopen(r):
_check(*Argument.load(x))
check_thread = threading.Thread(target=check_thread, args=(r,))
check_thread.daemon = True
check_thread.start()
else:
with open(path) as p:
x = p.readline()
assert x[0] == '#', x
schema = dict(map(FrozenArgument.load, p))
def check(name, args):
arg = Argument(args, True)
if not (None is not schema.get(name) == arg):
raise Exception('invalid packet: %s%r' % (name, arg))
w = None
Packet_encode = Packet.__dict__['encode']
def encode(packet):
check(type(packet).__name__, packet._args)
return Packet_encode(packet)
Packet.encode = encode
try:
yield
finally:
Packet.encode = Packet_encode
if w:
os.close(w)
check_thread.join()
if dump:
with open(path, 'w') as p:
p.write('# generated by running the whole test suite with -p\n')
for x in sorted(schema.iteritems()):
p.write('%s%r\n' % x)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment