Commit 711bd878 authored by Julien Muchembled's avatar Julien Muchembled Committed by Levin Zimmermann

protocol: switch to msgpack for packet serialization

Not only for performance reasons (at least 3% faster) but also because of
several ugly things in the way packets were defined:
- packet field names, which are only documentary; for roots fields,
  they even just duplicate the packet names
- a lot of repetitions for packet names, and even confusion between the name
  of the packet definition and the name of the actual notify/request packet
- the need to implement field types for anything, like PByte to support new
  compression formats, since PBoolean is not enough

neo/lib/protocol.py is now much smaller.
parent b89b447c
......@@ -13,6 +13,13 @@
##############################################################################
def patch():
# For msgpack & Py2/ZODB5.
try:
from zodbpickle import binary
binary._pack = bytes.__str__
except ImportError:
pass
from hashlib import md5
from ZODB.Connection import Connection
......
......@@ -181,7 +181,7 @@ class Application(ThreadedApplication):
with self._connecting_to_master_node:
result = self.master_conn
if result is None:
self.new_oid_list = ()
self.new_oids = ()
result = self.master_conn = self._connectToPrimaryNode()
return result
......@@ -305,15 +305,19 @@ class Application(ThreadedApplication):
"""Get a new OID."""
self._oid_lock_acquire()
try:
if not self.new_oid_list:
for oid in self.new_oids:
break
else:
# Get new oid list from master node
# we manage a list of oid here to prevent
# from asking too many time new oid one by one
# from master node
self._askPrimary(Packets.AskNewOIDs(100))
if not self.new_oid_list:
for oid in self.new_oids:
break
else:
raise NEOStorageError('new_oid failed')
self.last_oid = oid = self.new_oid_list.pop()
self.last_oid = oid
return oid
finally:
self._oid_lock_release()
......@@ -612,7 +616,7 @@ class Application(ThreadedApplication):
# user and description are cast to str in case they're unicode.
# BBB: This is not required anymore with recent ZODB.
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), ext, txn_context.cache_dict)
str(transaction.description), ext, list(txn_context.cache_dict))
queue = txn_context.queue
conn_dict = txn_context.conn_dict
# Ask in parallel all involved storage nodes to commit object metadata.
......@@ -697,7 +701,7 @@ class Application(ThreadedApplication):
else:
try:
notify(Packets.AbortTransaction(txn_context.ttid,
txn_context.conn_dict))
list(txn_context.conn_dict)))
except ConnectionClosed:
pass
# No need to flush queue, as it will be destroyed on return,
......@@ -731,7 +735,8 @@ class Application(ThreadedApplication):
for oid in checked_list:
del cache_dict[oid]
ttid = txn_context.ttid
p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
p = Packets.AskFinishTransaction(ttid, list(cache_dict),
checked_list)
try:
tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
assert tid
......
......@@ -163,8 +163,7 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
self.app.setHandlerData(ttid)
def answerNewOIDs(self, conn, oid_list):
oid_list.reverse()
self.app.new_oid_list = oid_list
self.app.new_oids = iter(oid_list)
def incompleteTransaction(self, conn, message):
raise NEOStorageError("storage nodes for which vote failed can not be"
......
......@@ -26,7 +26,7 @@ from .exception import NEOStorageError
class _WakeupPacket(object):
handler_method_name = 'pong'
decode = tuple
_args = ()
getId = int
class Transaction(object):
......
......@@ -16,12 +16,19 @@
from functools import wraps
from time import time
import msgpack
from msgpack.exceptions import UnpackValueError
from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorDelayedConnection
from .locking import RLock
from .protocol import uuid_str, Errors, PacketMalformedError, Packets
from .util import dummy_read_buffer, ReadBuffer
from .protocol import uuid_str, Errors, PacketMalformedError, Packets, \
Unpacker
@apply
class dummy_read_buffer(msgpack.Unpacker):
def feed(self, _):
pass
class ConnectionClosed(Exception):
pass
......@@ -292,7 +299,7 @@ class ListeningConnection(BaseConnection):
# message.
else:
conn._connected()
self.em.addWriter(conn) # for ENCODED_VERSION
self.em.addWriter(conn) # for HANDSHAKE_PACKET
def getAddress(self):
return self.connector.getAddress()
......@@ -311,12 +318,12 @@ class Connection(BaseConnection):
client = False
server = False
peer_id = None
_parser_state = None
_total_unpacked = 0
_timeout = None
def __init__(self, event_manager, *args, **kw):
BaseConnection.__init__(self, event_manager, *args, **kw)
self.read_buf = ReadBuffer()
self.read_buf = Unpacker()
# NOTE cur_id will be set in Server|Client to maintain `cur_id % 2 == const` invariant
#self.cur_id = 0
self.aborted = False
......@@ -429,42 +436,39 @@ class Connection(BaseConnection):
self._closure()
def _parse(self):
read = self.read_buf.read
version = read(4)
if version is None:
return
from .protocol import (ENCODED_VERSION, MAX_PACKET_SIZE,
PACKET_HEADER_FORMAT, Packets)
if version != ENCODED_VERSION:
logging.warning('Protocol version mismatch with %r', self)
from .protocol import HANDSHAKE_PACKET, MAGIC_SIZE, Packets
read_buf = self.read_buf
handshake = read_buf.read_bytes(len(HANDSHAKE_PACKET))
if handshake != HANDSHAKE_PACKET:
if HANDSHAKE_PACKET.startswith(handshake): # unlikely so tested last
# Not enough data and there's no API to know it in advance.
# Put it back.
read_buf.feed(handshake)
return
if HANDSHAKE_PACKET.startswith(handshake[:MAGIC_SIZE]):
logging.warning('Protocol version mismatch with %r', self)
else:
logging.debug('Rejecting non-NEO %r', self)
raise ConnectorException
header_size = PACKET_HEADER_FORMAT.size
unpack = PACKET_HEADER_FORMAT.unpack
read_next = read_buf.next
read_pos = read_buf.tell
def parse():
state = self._parser_state
if state is None:
header = read(header_size)
if header is None:
return
msg_id, msg_type, msg_len = unpack(header)
try:
packet_klass = Packets[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % msg_len)
else:
msg_id, packet_klass, msg_len = state
data = read(msg_len)
if data is None:
# Not enough.
if state is None:
self._parser_state = msg_id, packet_klass, msg_len
else:
self._parser_state = None
packet = packet_klass()
packet.setContent(msg_id, data)
return packet
try:
msg_id, msg_type, args = read_next()
except StopIteration:
return
except UnpackValueError as e:
raise PacketMalformedError(str(e))
try:
packet_klass = Packets[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
pos = read_pos()
packet = packet_klass(*args)
packet.setId(msg_id)
packet.size = pos - self._total_unpacked
self._total_unpacked = pos
return packet
self._parse = parse
return parse()
......@@ -517,7 +521,7 @@ class Connection(BaseConnection):
def close(self):
if self.connector is None:
assert self._on_close is None
assert not self.read_buf
assert not self.read_buf.read_bytes(1)
assert not self.isPending()
return
# process the network events with the last registered handler to
......@@ -528,7 +532,7 @@ class Connection(BaseConnection):
if self._on_close is not None:
self._on_close()
self._on_close = None
self.read_buf.clear()
self.read_buf = dummy_read_buffer
try:
if self.connecting:
handler.connectionFailed(self)
......
......@@ -19,7 +19,7 @@ import ssl
import errno
from time import time
from . import logging
from .protocol import ENCODED_VERSION
from .protocol import HANDSHAKE_PACKET
# Global connector registry.
# Fill by calling registerConnectorHandler.
......@@ -74,15 +74,14 @@ class SocketConnector(object):
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.queued = [ENCODED_VERSION]
self.queue_size = len(ENCODED_VERSION)
self.queued = [HANDSHAKE_PACKET]
self.queue_size = len(HANDSHAKE_PACKET)
return self
def queue(self, data):
was_empty = not self.queued
self.queued += data
for data in data:
self.queue_size += len(data)
self.queued.append(data)
self.queue_size += len(data)
return was_empty
def _error(self, op, exc=None):
......@@ -172,7 +171,7 @@ class SocketConnector(object):
except socket.error, e:
self._error('recv', e)
if data:
read_buf.append(data)
read_buf.feed(data)
return
self._error('recv')
......@@ -283,7 +282,7 @@ class _SSL:
# non-ragged EOF (peer properly closed its side of connection)
self._error('recv', None)
return
read_buf.append(data)
read_buf.feed(data)
except ssl.SSLWantReadError:
pass
except socket.error, e:
......
......@@ -23,7 +23,7 @@ NOBODY = []
class _ConnectionClosed(object):
handler_method_name = 'connectionClosed'
decode = tuple
_args = ()
class getId(object):
def __eq__(self, other):
......
......@@ -71,7 +71,7 @@ class EventHandler(object):
method = getattr(self, packet.handler_method_name)
except AttributeError:
raise UnexpectedPacketError('no handler found')
args = packet.decode() or ()
args = packet._args
method(conn, *args, **kw)
except DelayEvent, e:
assert not kw, kw
......@@ -79,9 +79,6 @@ class EventHandler(object):
except UnexpectedPacketError, e:
if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args)
except PacketMalformedError, e:
logging.error('malformed packet from %r: %s', conn, e)
conn.close()
except NotReadyError, message:
if not conn.isClosed():
if not message.args:
......
......@@ -154,7 +154,8 @@ class NEOLogger(Logger):
def _setup(self, filename=None, reset=False):
from . import protocol as p
global uuid_str
global packb, uuid_str
packb = p.packb
uuid_str = p.uuid_str
if self._db is not None:
self._db.close()
......@@ -257,7 +258,7 @@ class NEOLogger(Logger):
pktcls.__name__, peer, r.pkt.decode())
"""
if msg is not None:
msg = buffer(msg)
msg = buffer(msg if type(msg) is bytes else packb(msg))
q = "INSERT INTO packet VALUES (?,?,?,?,?,?)"
x = [r.created, nid, r.msg_id, r.code, peer, msg]
else:
......@@ -307,9 +308,14 @@ class NEOLogger(Logger):
def packet(self, connection, packet, outgoing):
#if True or self._db is not None:
if self._db is not None:
body = packet._body
if self._max_packet and self._max_packet < len(body):
body = None
if self._max_packet and self._max_packet < packet.size:
args = None
else:
args = packet._args
try:
hash(args)
except TypeError:
args = packb(args)
self._queue(PacketRecord(
pkt=packet,
created=time(),
......@@ -318,7 +324,7 @@ class NEOLogger(Logger):
outgoing=outgoing,
uuid=connection.getUUID(),
addr=connection.getAddress(),
msg=body))
msg=args))
def node(self, *cluster_nid):
name = self.name and str(self.name)
......
......@@ -14,27 +14,63 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import traceback
from cStringIO import StringIO
from struct import Struct
import threading
from functools import partial
from msgpack import packb
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 6
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# to upgrade other nodes.
PROTOCOL_VERSION = 0
# By encoding the handshake packet with msgpack, the whole NEO stream can be
# decoded with msgpack. The first byte is 0x92, which is different from TLS
# Handshake (0x16).
HANDSHAKE_PACKET = packb(('NEO', PROTOCOL_VERSION))
# Used to distinguish non-NEO stream from version mismatch.
MAGIC_SIZE = len(HANDSHAKE_PACKET) - len(packb(PROTOCOL_VERSION))
# Avoid memory errors on corrupted data.
MAX_PACKET_SIZE = 0x4000000
PACKET_HEADER_FORMAT = Struct('!LHL')
RESPONSE_MASK = 0x8000
# Avoid some memory errors on corrupted data.
# Before we use msgpack, we limited the size of a whole packet. That's not
# possible anymore because the size is not known in advance. Packets bigger
# than the buffer size are possible (e.g. a huge list of small items) and for
# that we could compare the stream position (Unpacker.tell); it's not worth it.
UNPACK_BUFFER_SIZE = 0x4000000
@apply
def Unpacker():
global registerExtType, packb
from msgpack import ExtType, unpackb, Packer, Unpacker
ext_type_dict = []
kw = dict(use_bin_type=True)
pack_ext = Packer(**kw).pack
def registerExtType(getstate, make):
code = len(ext_type_dict)
ext_type_dict.append(lambda data: make(unpackb(data, use_list=False)))
return lambda obj: ExtType(code, pack_ext(getstate(obj)))
iterable_types = set, tuple
def default(obj):
try:
pack = obj._pack
except AttributeError:
assert type(obj) in iterable_types, type(obj)
return list(obj)
return pack()
lock = threading.Lock()
pack = Packer(default, strict_types=True, **kw).pack
def packb(obj):
with lock: # in case that 'default' is called
return pack(obj)
return partial(Unpacker, use_list=False, max_buffer_size=UNPACK_BUFFER_SIZE,
ext_hook=lambda code, data: ext_type_dict[code](data))
class Enum(tuple):
class Item(int):
__slots__ = '_name', '_enum'
__slots__ = '_name', '_enum', '_pack'
def __str__(self):
return self._name
def __repr__(self):
......@@ -49,31 +85,38 @@ class Enum(tuple):
names = func.func_code.co_names
self = tuple.__new__(cls, map(cls.Item, xrange(len(names))))
self._name = func.__name__
pack = registerExtType(int, self.__getitem__)
for item, name in zip(self, names):
setattr(self, name, item)
item._name = name
item._enum = self
item._pack = (lambda x: lambda: x)(pack(item))
return self
def __repr__(self):
return "<Enum %s>" % self._name
# The order of extension type is important.
# Enum types first, sorted alphabetically.
@Enum
def ErrorCodes():
ACK
DENIED
NOT_READY
OID_NOT_FOUND
TID_NOT_FOUND
OID_DOES_NOT_EXIST
PROTOCOL_ERROR
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
NON_READABLE_CELL
READ_ONLY_ACCESS
INCOMPLETE_TRANSACTION
def CellStates():
# Write-only cell. Last transactions are missing because storage is/was down
# for a while, or because it is new for the partition. It usually becomes
# UP_TO_DATE when replication is done.
OUT_OF_DATE
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Same as UP_TO_DATE, except that it will be discarded as soon as another
# node finishes to replicate it. It means a partition is moved from 1 node
# to another. It is also discarded immediately if out-of-date.
FEEDING
# A check revealed that data differs from other replicas. Cell is neither
# readable nor writable.
CORRUPTED
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
@Enum
def ClusterStates():
......@@ -108,11 +151,20 @@ def ClusterStates():
STOPPING_BACKUP
@Enum
def NodeTypes():
MASTER
STORAGE
CLIENT
ADMIN
def ErrorCodes():
ACK
DENIED
NOT_READY
OID_NOT_FOUND
TID_NOT_FOUND
OID_DOES_NOT_EXIST
PROTOCOL_ERROR
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
NON_READABLE_CELL
READ_ONLY_ACCESS
INCOMPLETE_TRANSACTION
@Enum
def NodeStates():
......@@ -122,23 +174,11 @@ def NodeStates():
PENDING
@Enum
def CellStates():
# Write-only cell. Last transactions are missing because storage is/was down
# for a while, or because it is new for the partition. It usually becomes
# UP_TO_DATE when replication is done.
OUT_OF_DATE
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Same as UP_TO_DATE, except that it will be discarded as soon as another
# node finishes to replicate it. It means a partition is moved from 1 node
# to another. It is also discarded immediately if out-of-date.
FEEDING
# A check revealed that data differs from other replicas. Cell is neither
# readable nor writable.
CORRUPTED
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
def NodeTypes():
MASTER
STORAGE
CLIENT
ADMIN
# used for logging
node_state_prefix_dict = {
......@@ -214,45 +254,24 @@ class NonReadableCell(Exception):
On such event, the client must retry, preferably another cell.
"""
class Packet(object):
"""
Base class for any packet definition. The _fmt class attribute must be
defined for any non-empty packet.
Base class for any packet definition.
"""
_ignore_when_closed = False
_request = None
_answer = None
_body = None
_code = None
_fmt = None
_id = None
allow_dict = False
nodelay = True
poll_thread = False
def __init__(self, *args):
assert self._code is not None, "Packet class not registered"
if args:
buf = StringIO()
self._fmt.encode(buf.write, args)
self._body = buf.getvalue()
else:
self._body = ''
def decode(self):
assert self._body is not None
if self._fmt is None:
return ()
buf = StringIO(self._body)
try:
return self._fmt.decode(buf.read)
except ParseError, msg:
name = self.__class__.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
def setContent(self, msg_id, body):
""" Register the packet content for future decoding """
self._id = msg_id
self._body = body
assert self.allow_dict or dict not in map(type, args), args
self._args = args
def setId(self, value):
self._id = value
......@@ -261,14 +280,11 @@ class Packet(object):
assert self._id is not None, "No identifier applied on the packet"
return self._id
def encode(self):
def encode(self, packb=packb):
""" Encode a packet as a string to send it over the network """
content = self._body
return (PACKET_HEADER_FORMAT.pack(self._id, self._code, len(content)),
content)
def __len__(self):
return PACKET_HEADER_FORMAT.size + len(self._body)
r = packb((self._id, self._code, self._args))
self.size = len(r)
return r
def __repr__(self):
return '%s[%r]' % (self.__class__.__name__, self._id)
......@@ -281,10 +297,10 @@ class Packet(object):
return self._code == other._code
def isError(self):
return isinstance(self, Error)
return self._code == RESPONSE_MASK
def isResponse(self):
return self._code & RESPONSE_MASK == RESPONSE_MASK
return self._code & RESPONSE_MASK
def getAnswerClass(self):
return self._answer
......@@ -296,1548 +312,530 @@ class Packet(object):
"""
return self._ignore_when_closed
class ParseError(Exception):
"""
An exception that encapsulate another and build the 'path' of the
packet item that generate the error.
"""
def __init__(self, item, trace):
Exception.__init__(self)
self._trace = trace
self._items = [item]
def append(self, item):
self._items.append(item)
def __repr__(self):
chain = '/'.join([item.getName() for item in reversed(self._items)])
return 'at %s:\n%s' % (chain, self._trace)
__str__ = __repr__
# packet parsers
class PItem(object):
"""
Base class for any packet item, _encode and _decode must be overridden
by subclasses.
"""
def __init__(self, name):
self._name = name
def __repr__(self):
return self.__class__.__name__
def getName(self):
return self._name
def _trace(self, method, *args):
try:
return method(*args)
except ParseError, e:
# trace and forward exception
e.append(self)
raise
except Exception:
# original exception, encapsulate it
trace = ''.join(traceback.format_exception(*sys.exc_info())[2:])
raise ParseError(self, trace)
def encode(self, writer, items):
return self._trace(self._encode, writer, items)
def decode(self, reader):
return self._trace(self._decode, reader)
def _encode(self, writer, items):
raise NotImplementedError, self.__class__.__name__
def _decode(self, reader):
raise NotImplementedError, self.__class__.__name__
class PStruct(PItem):
"""
Aggregate other items
"""
def __init__(self, name, *items):
PItem.__init__(self, name)
self._items = items
def _encode(self, writer, items):
assert len(self._items) == len(items), (items, self._items)
for item, value in zip(self._items, items):
item.encode(writer, value)
def _decode(self, reader):
return tuple([item.decode(reader) for item in self._items])
class PStructItem(PItem):
"""
A single value encoded with struct
"""
def __init__(self, name):
PItem.__init__(self, name)
struct = Struct(self._fmt)
self.pack = struct.pack
self.unpack = struct.unpack
self.size = struct.size
def _encode(self, writer, value):
writer(self.pack(value))
def _decode(self, reader):
return self.unpack(reader(self.size))[0]
class PStructItemOrNone(PStructItem):
def _encode(self, writer, value):
return writer(self._None if value is None else self.pack(value))
def _decode(self, reader):
value = reader(self.size)
return None if value == self._None else self.unpack(value)[0]
class POption(PStruct):
def _encode(self, writer, value):
if value is None:
writer('\0')
else:
writer('\1')
PStruct._encode(self, writer, value)
def _decode(self, reader):
if '\0\1'.index(reader(1)):
return PStruct._decode(self, reader)
class PList(PStructItem):
"""
A list of homogeneous items
"""
_fmt = '!L'
def __init__(self, name, item):
PStructItem.__init__(self, name)
self._item = item
def _encode(self, writer, items):
writer(self.pack(len(items)))
item = self._item
for value in items:
item.encode(writer, value)
def _decode(self, reader):
length = self.unpack(reader(self.size))[0]
item = self._item
return [item.decode(reader) for _ in xrange(length)]
class PDict(PStructItem):
"""
A dictionary with custom key and value formats
"""
_fmt = '!L'
def __init__(self, name, key, value):
PStructItem.__init__(self, name)
self._key = key
self._value = value
def _encode(self, writer, item):
assert isinstance(item , dict), (type(item), item)
writer(self.pack(len(item)))
key, value = self._key, self._value
for k, v in item.iteritems():
key.encode(writer, k)
value.encode(writer, v)
def _decode(self, reader):
length = self.unpack(reader(self.size))[0]
key, value = self._key, self._value
new_dict = {}
for _ in xrange(length):
k = key.decode(reader)
v = value.decode(reader)
new_dict[k] = v
return new_dict
class PEnum(PStructItem):
"""
Encapsulate an enumeration value
"""
_fmt = 'b'
def __init__(self, name, enum):
PStructItem.__init__(self, name)
self._enum = enum
class PacketRegistryFactory(dict):
def _encode(self, writer, item):
if item is None:
item = -1
writer(self.pack(item))
def _decode(self, reader):
code = self.unpack(reader(self.size))[0]
if code == -1:
return None
try:
return self._enum[code]
except KeyError:
enum = self._enum.__class__.__name__
raise ValueError, 'Invalid code for %s enum: %r' % (enum, code)
class PString(PStructItem):
"""
A variable-length string
"""
_fmt = '!L'
def _encode(self, writer, value):
writer(self.pack(len(value)))
writer(value)
def _decode(self, reader):
length = self.unpack(reader(self.size))[0]
return reader(length)
class PAddress(PString):
"""
An host address (IPv4/IPv6)
"""
def __init__(self, name):
PString.__init__(self, name)
self._port = Struct('!H')
def _encode(self, writer, address):
if address:
host, port = address
PString._encode(self, writer, host)
writer(self._port.pack(port))
def __call__(self, name, base, d):
for k, v in d.items():
if isinstance(v, type) and issubclass(v, Packet):
v.__name__ = k
v.handler_method_name = k[0].lower() + k[1:]
# this builds a "singleton"
return type('PacketRegistry', base, d)(self)
def register(self, doc, ignore_when_closed=None, request=False, error=False,
_base=(Packet,), **kw):
""" Register a packet in the packet registry """
code = len(self)
if doc is None:
self[code] = None
return # None registered only to skip a code number (for compatibility)
if error and not request:
assert not code
code = RESPONSE_MASK
kw.update(__doc__=doc, _code=code)
packet = type('', _base, kw)
# register the request
self[code] = packet
if request:
if ignore_when_closed is None:
# By default, on a closed connection:
# - request: ignore
# - answer: keep
# - notification: keep
packet._ignore_when_closed = True
else:
assert ignore_when_closed is False
if error:
packet._answer = self[RESPONSE_MASK]
else:
# build a class for the answer
code |= RESPONSE_MASK
kw['_code'] = code
answer = packet._answer = self[code] = type('', _base, kw)
return packet, answer
else:
PString._encode(self, writer, '')
assert ignore_when_closed is None
return packet
def _decode(self, reader):
host = PString._decode(self, reader)
if host:
p = self._port
return host, p.unpack(reader(p.size))[0]
class PBoolean(PStructItem):
"""
A boolean value, encoded as a single byte
"""
_fmt = '!?'
class PNumber(PStructItem):
"""
A integer number (4-bytes length)
"""
_fmt = '!L'
class PIndex(PStructItem):
class Packets(dict):
"""
A big integer to defined indexes in a huge list.
Packet registry that checks packet code uniqueness and provides an index
"""
_fmt = '!Q'
__metaclass__ = PacketRegistryFactory()
notify = __metaclass__.register
request = partial(notify, request=True)
class PPTID(PStructItemOrNone):
"""
A None value means an invalid PTID
"""
_fmt = '!Q'
_None = Struct(_fmt).pack(0)
Error = notify("""
Error is a special type of message, because this can be sent against
any other message, even if such a message does not expect a reply
usually.
class PChecksum(PItem):
"""
A hash (SHA1)
"""
def _encode(self, writer, checksum):
assert len(checksum) == 20, (len(checksum), checksum)
writer(checksum)
:nodes: * -> *
""", error=True)
def _decode(self, reader):
return reader(20)
RequestIdentification, AcceptIdentification = request("""
Request a node identification. This must be the first packet for any
connection.
class PSignedNull(PStructItemOrNone):
_fmt = '!l'
_None = Struct(_fmt).pack(0)
:nodes: * -> *
""", poll_thread=True)
class PUUID(PSignedNull):
"""
An UUID (node identifier, 4-bytes signed integer)
"""
Ping, Pong = request("""
Empty request used as network barrier.
class PTID(PItem):
"""
A transaction identifier
"""
def _encode(self, writer, tid):
if tid is None:
tid = INVALID_TID
assert len(tid) == 8, (len(tid), tid)
writer(tid)
def _decode(self, reader):
tid = reader(8)
if tid == INVALID_TID:
tid = None
return tid
# same definition, for now
POID = PTID
class PFloat(PStructItemOrNone):
"""
A float number (8-bytes length)
"""
_fmt = '!d'
_None = '\xff' * 8
# common definitions
PFEmpty = PStruct('no_content')
PFNodeType = PEnum('type', NodeTypes)
PFNodeState = PEnum('state', NodeStates)
PFCellState = PEnum('state', CellStates)
PFNodeList = PList('node_list',
PStruct('node',
PFNodeType,
PAddress('address'),
PUUID('uuid'),
PFNodeState,
PFloat('id_timestamp'),
),
)
PFCellList = PList('cell_list',
PStruct('cell',
PUUID('uuid'),
PFCellState,
),
)
PFRowList = PList('row_list',
PFCellList,
)
PFHistoryList = PList('history_list',
PStruct('history_entry',
PTID('serial'),
PNumber('size'),
),
)
PFUUIDList = PList('uuid_list',
PUUID('uuid'),
)
PFTidList = PList('tid_list',
PTID('tid'),
)
PFOidList = PList('oid_list',
POID('oid'),
)
# packets definition
class Error(Packet):
"""
Error is a special type of message, because this can be sent against
any other message, even if such a message does not expect a reply
usually.
:nodes: * -> *
""")
:nodes: * -> *
"""
_fmt = PStruct('error',
PNumber('code'),
PString('message'),
)
CloseClient = notify("""
Tell peer that it can close the connection if it has finished with us.
class Ping(Packet):
"""
Empty request used as network barrier.
:nodes: * -> *
""")
:nodes: * -> *
"""
_answer = PFEmpty
AskPrimary, AnswerPrimary = request("""
Ask node identier of the current primary master.
class CloseClient(Packet):
"""
Tell peer that it can close the connection if it has finished with us.
:nodes: ctl -> A
""")
:nodes: * -> *
"""
NotPrimaryMaster = notify("""
Notify peer that I'm not the primary master. Attach any extra
information to help the peer joining the cluster.
class RequestIdentification(Packet):
"""
Request a node identification. This must be the first packet for any
connection.
:nodes: SM -> *
""")
:nodes: * -> *
"""
poll_thread = True
_fmt = PStruct('request_identification',
PFNodeType,
PUUID('uuid'),
PAddress('address'),
PString('name'),
PFloat('id_timestamp'),
# storage:
PList('devpath', PString('devid')),
PList('new_nid', PNumber('offset')),
)
_answer = PStruct('accept_identification',
PFNodeType,
PUUID('my_uuid'),
PUUID('your_uuid'),
)
class PrimaryMaster(Packet):
"""
Ask node identier of the current primary master.
NotifyNodeInformation = notify("""
Notify information about one or more nodes.
:nodes: ctl -> A
"""
_answer = PStruct('answer_primary',
PUUID('primary_uuid'),
)
:nodes: M -> *
""")
class NotPrimaryMaster(Packet):
"""
Notify peer that I'm not the primary master. Attach any extra information
to help the peer joining the cluster.
AskRecovery, AnswerRecovery = request("""
Ask storage nodes data needed by master to recover.
Reused by `neoctl print ids`.
:nodes: SM -> *
"""
_fmt = PStruct('not_primary_master',
PSignedNull('primary'),
PList('known_master_list',
PAddress('address'),
),
)
class Recovery(Packet):
"""
Ask storage nodes data needed by master to recover.
Reused by `neoctl print ids`.
:nodes: M -> S; ctl -> A -> M
""")
:nodes: M -> S; ctl -> A -> M
"""
_answer = PStruct('answer_recovery',
PPTID('ptid'),
PTID('backup_tid'),
PTID('truncate_tid'),
)
AskLastIDs, AnswerLastIDs = request("""
Ask the last OID/TID so that a master can initialize its
TransactionManager. Reused by `neoctl print ids`.
class LastIDs(Packet):
"""
Ask the last OID/TID so that a master can initialize its TransactionManager.
Reused by `neoctl print ids`.
:nodes: M -> S; ctl -> A -> M
""")
:nodes: M -> S; ctl -> A -> M
"""
_answer = PStruct('answer_last_ids',
POID('last_oid'),
PTID('last_tid'),
)
AskPartitionTable, AnswerPartitionTable = request("""
Ask storage node the remaining data needed by master to recover.
class PartitionTable(Packet):
"""
Ask storage node the remaining data needed by master to recover.
:nodes: M -> S
""")
:nodes: M -> S
"""
_answer = PStruct('answer_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
SendPartitionTable = notify("""
Send the full partition table to admin/client/storage nodes on
connection.
class NotifyPartitionTable(Packet):
"""
Send the full partition table to admin/client/storage nodes on connection.
:nodes: M -> A, C, S
""")
:nodes: M -> A, C, S
"""
_fmt = PStruct('send_partition_table',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
NotifyPartitionChanges = notify("""
Notify about changes in the partition table.
class PartitionChanges(Packet):
"""
Notify about changes in the partition table.
:nodes: M -> *
""")
:nodes: M -> *
"""
_fmt = PStruct('notify_partition_changes',
PPTID('ptid'),
PNumber('num_replicas'),
PList('cell_list',
PStruct('cell',
PNumber('offset'),
PUUID('uuid'),
PFCellState,
),
),
)
class StartOperation(Packet):
"""
Tell a storage node to start operation. Before this message, it must only
communicate with the primary master.
StartOperation = notify("""
Tell a storage node to start operation. Before this message,
it must only communicate with the primary master.
:nodes: M -> S
"""
_fmt = PStruct('start_operation',
# XXX: Is this boolean needed ? Maybe this
# can be deduced from cluster state.
PBoolean('backup'),
)
:nodes: M -> S
""")
class StopOperation(Packet):
"""
Notify that the cluster is not operational anymore. Any operation between
nodes must be aborted.
StopOperation = notify("""
Notify that the cluster is not operational anymore.
Any operation between nodes must be aborted.
:nodes: M -> S, C
"""
:nodes: M -> S, C
""")
class UnfinishedTransactions(Packet):
"""
Ask unfinished transactions, which will be replicated when they're finished.
AskUnfinishedTransactions, AnswerUnfinishedTransactions = request("""
Ask unfinished transactions, which will be replicated
when they're finished.
:nodes: S -> M
"""
_fmt = PStruct('ask_unfinished_transactions',
PList('row_list',
PNumber('offset'),
),
)
_answer = PStruct('answer_unfinished_transactions',
PTID('max_tid'),
PList('tid_list',
PTID('unfinished_tid'),
),
)
class LockedTransactions(Packet):
"""
Ask locked transactions to replay committed transactions that haven't been
unlocked.
:nodes: S -> M
""")
:nodes: M -> S
"""
_answer = PStruct('answer_locked_transactions',
PDict('tid_dict',
PTID('ttid'),
PTID('tid'),
),
)
class FinalTID(Packet):
"""
Return final tid if ttid has been committed, to recover from certain
failures during tpc_finish.
AskLockedTransactions, AnswerLockedTransactions = request("""
Ask locked transactions to replay committed transactions
that haven't been unlocked.
:nodes: M -> S; C -> M, S
"""
_fmt = PStruct('final_tid',
PTID('ttid'),
)
:nodes: M -> S
""", allow_dict=True)
_answer = PStruct('final_tid',
PTID('tid'),
)
AskFinalTID, AnswerFinalTID = request("""
Return final tid if ttid has been committed, to recover from certain
failures during tpc_finish.
class ValidateTransaction(Packet):
"""
Do replay a committed transaction that was not unlocked.
:nodes: M -> S; C -> M, S
""")
:nodes: M -> S
"""
_fmt = PStruct('validate_transaction',
PTID('ttid'),
PTID('tid'),
)
ValidateTransaction = notify("""
Do replay a committed transaction that was not unlocked.
class BeginTransaction(Packet):
"""
Ask to begin a new transaction. This maps to `tpc_begin`.
:nodes: M -> S
""")
:nodes: C -> M
"""
_fmt = PStruct('ask_begin_transaction',
PTID('tid'),
)
AskBeginTransaction, AnswerBeginTransaction = request("""
Ask to begin a new transaction. This maps to `tpc_begin`.
_answer = PStruct('answer_begin_transaction',
PTID('tid'),
)
:nodes: C -> M
""")
class FailedVote(Packet):
"""
Report storage nodes for which vote failed.
True is returned if it's still possible to finish the transaction.
FailedVote = request("""
Report storage nodes for which vote failed.
True is returned if it's still possible to finish the transaction.
:nodes: C -> M
"""
_fmt = PStruct('failed_vote',
PTID('tid'),
PFUUIDList,
)
:nodes: C -> M
""", error=True)
_answer = Error
AskFinishTransaction, AnswerTransactionFinished = request("""
Finish a transaction. Return the TID of the committed transaction.
This maps to `tpc_finish`.
class FinishTransaction(Packet):
"""
Finish a transaction. Return the TID of the committed transaction.
This maps to `tpc_finish`.
:nodes: C -> M
""", ignore_when_closed=False, poll_thread=True)
:nodes: C -> M
"""
poll_thread = True
_fmt = PStruct('ask_finish_transaction',
PTID('tid'),
PFOidList,
PList('checked_list',
POID('oid'),
),
)
_answer = PStruct('answer_information_locked',
PTID('ttid'),
PTID('tid'),
)
class NotifyTransactionFinished(Packet):
"""
Notify that a transaction blocking a replication is now finished.
AskLockInformation, AnswerInformationLocked = request("""
Commit a transaction. The new data is read-locked.
:nodes: M -> S
"""
_fmt = PStruct('notify_transaction_finished',
PTID('ttid'),
PTID('max_tid'),
)
:nodes: M -> S
""", ignore_when_closed=False)
class LockInformation(Packet):
"""
Commit a transaction. The new data is read-locked.
InvalidateObjects = notify("""
Notify about a new transaction modifying objects,
invalidating client caches.
:nodes: M -> S
"""
_fmt = PStruct('ask_lock_informations',
PTID('ttid'),
PTID('tid'),
)
:nodes: M -> C
""")
_answer = PStruct('answer_information_locked',
PTID('ttid'),
)
NotifyUnlockInformation = notify("""
Notify about a successfully committed transaction. The new data can be
unlocked.
class InvalidateObjects(Packet):
"""
Notify about a new transaction modifying objects,
invalidating client caches.
:nodes: M -> S
""")
:nodes: M -> C
"""
_fmt = PStruct('ask_finish_transaction',
PTID('tid'),
PFOidList,
)
AskNewOIDs, AnswerNewOIDs = request("""
Ask new OIDs to create objects.
class UnlockInformation(Packet):
"""
Notify about a successfully committed transaction. The new data can be
unlocked.
:nodes: C -> M
""")
:nodes: M -> S
"""
_fmt = PStruct('notify_unlock_information',
PTID('ttid'),
)
NotifyDeadlock = notify("""
Ask master to generate a new TTID that will be used by the client to
solve a deadlock by rebasing the transaction on top of concurrent
changes.
class GenerateOIDs(Packet):
"""
Ask new OIDs to create objects.
:nodes: S -> M -> C
""")
:nodes: C -> M
"""
_fmt = PStruct('ask_new_oids',
PNumber('num_oids'),
)
AskRebaseTransaction, AnswerRebaseTransaction = request("""
Rebase a transaction to solve a deadlock.
_answer = PStruct('answer_new_oids',
PFOidList,
)
:nodes: C -> S
""")
class Deadlock(Packet):
"""
Ask master to generate a new TTID that will be used by the client to solve
a deadlock by rebasing the transaction on top of concurrent changes.
AskRebaseObject, AnswerRebaseObject = request("""
Rebase an object change to solve a deadlock.
:nodes: S -> M -> C
"""
_fmt = PStruct('notify_deadlock',
PTID('ttid'),
PTID('locking_tid'),
)
:nodes: C -> S
class RebaseTransaction(Packet):
"""
Rebase a transaction to solve a deadlock.
XXX: It is a request packet to simplify the implementation. For more
efficiency, this should be turned into a notification, and the
RebaseTransaction should answered once all objects are rebased
(so that the client can still wait on something).
""", data_path=(1, 0, 2, 0))
:nodes: C -> S
"""
_fmt = PStruct('ask_rebase_transaction',
PTID('ttid'),
PTID('locking_tid'),
)
AskStoreObject, AnswerStoreObject = request("""
Ask to create/modify an object. This maps to `store`.
_answer = PStruct('answer_rebase_transaction',
PFOidList,
)
As for IStorage, 'serial' is ZERO_TID for new objects.
class RebaseObject(Packet):
"""
Rebase an object change to solve a deadlock.
:nodes: C -> S
""", data_path=(0, 2))
:nodes: C -> S
AbortTransaction = notify("""
Abort a transaction. This maps to `tpc_abort`.
XXX: It is a request packet to simplify the implementation. For more
efficiency, this should be turned into a notification, and the
RebaseTransaction should answered once all objects are rebased
(so that the client can still wait on something).
"""
_fmt = PStruct('ask_rebase_object',
PTID('ttid'),
PTID('oid'),
)
_answer = PStruct('answer_rebase_object',
POption('conflict',
PTID('serial'),
PTID('conflict_serial'),
POption('data',
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
),
)
)
class StoreObject(Packet):
"""
Ask to create/modify an object. This maps to `store`.
:nodes: C -> S; C -> M -> S
""")
As for IStorage, 'serial' is ZERO_TID for new objects.
AskStoreTransaction, AnswerStoreTransaction = request("""
Ask to store a transaction. Implies vote.
:nodes: C -> S
"""
_fmt = PStruct('ask_store_object',
POID('oid'),
PTID('serial'),
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
PTID('data_serial'),
PTID('tid'),
)
_answer = PStruct('answer_store_object',
PTID('conflict'),
)
class AbortTransaction(Packet):
"""
Abort a transaction. This maps to `tpc_abort`.
:nodes: C -> S
""")
:nodes: C -> S; C -> M -> S
"""
_fmt = PStruct('abort_transaction',
PTID('tid'),
PFUUIDList, # unused for * -> S
)
AskVoteTransaction, AnswerVoteTransaction = request("""
Ask to vote a transaction.
class StoreTransaction(Packet):
"""
Ask to store a transaction. Implies vote.
:nodes: C -> S
""")
:nodes: C -> S
"""
_fmt = PStruct('ask_store_transaction',
PTID('tid'),
PString('user'),
PString('description'),
PString('extension'),
PFOidList,
)
_answer = PFEmpty
class VoteTransaction(Packet):
"""
Ask to vote a transaction.
AskObject, AnswerObject = request("""
Ask a stored object by its OID, optionally at/before a specific tid.
This maps to `load/loadBefore/loadSerial`.
:nodes: C -> S
"""
_fmt = PStruct('ask_vote_transaction',
PTID('tid'),
)
_answer = PFEmpty
:nodes: C -> S
""", data_path=(1, 3))
class GetObject(Packet):
"""
Ask a stored object by its OID, optionally at/before a specific tid.
This maps to `load/loadBefore/loadSerial`.
AskTIDs, AnswerTIDs = request("""
Ask for TIDs between a range of offsets. The order of TIDs is
descending, and the range is [first, last). This maps to `undoLog`.
:nodes: C -> S
"""
_fmt = PStruct('ask_object',
POID('oid'),
PTID('at'),
PTID('before'),
)
_answer = PStruct('answer_object',
POID('oid'),
PTID('serial_start'),
PTID('serial_end'),
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
PTID('data_serial'),
)
class TIDList(Packet):
"""
Ask for TIDs between a range of offsets. The order of TIDs is descending,
and the range is [first, last). This maps to `undoLog`.
:nodes: C -> S
""")
:nodes: C -> S
"""
_fmt = PStruct('ask_tids',
PIndex('first'),
PIndex('last'),
PNumber('partition'),
)
AskTransactionInformation, AnswerTransactionInformation = request("""
Ask for transaction metadata.
_answer = PStruct('answer_tids',
PFTidList,
)
:nodes: C -> S
""")
class TIDListFrom(Packet):
"""
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
Used by `iterator`.
AskObjectHistory, AnswerObjectHistory = request("""
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. This maps to `history`.
:nodes: C -> S
"""
_fmt = PStruct('tid_list_from',
PTID('min_tid'),
PTID('max_tid'),
PNumber('length'),
PNumber('partition'),
)
_answer = PStruct('answer_tids',
PFTidList,
)
class TransactionInformation(Packet):
"""
Ask for transaction metadata.
:nodes: C -> S
""")
:nodes: C -> S
"""
_fmt = PStruct('ask_transaction_information',
PTID('tid'),
)
_answer = PStruct('answer_transaction_information',
PTID('tid'),
PString('user'),
PString('description'),
PString('extension'),
PBoolean('packed'),
PFOidList,
)
class ObjectHistory(Packet):
"""
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. This maps to `history`.
AskPartitionList, AnswerPartitionList = request("""
Ask information about partitions.
:nodes: C -> S
"""
_fmt = PStruct('ask_object_history',
POID('oid'),
PIndex('first'),
PIndex('last'),
)
_answer = PStruct('answer_object_history',
POID('oid'),
PFHistoryList,
)
class PartitionList(Packet):
"""
Ask information about partitions.
:nodes: ctl -> A
""")
:nodes: ctl -> A
"""
_fmt = PStruct('ask_partition_list',
PNumber('min_offset'),
PNumber('max_offset'),
PUUID('uuid'),
)
_answer = PStruct('answer_partition_list',
PPTID('ptid'),
PNumber('num_replicas'),
PFRowList,
)
class NodeList(Packet):
"""
Ask information about nodes.
AskNodeList, AnswerNodeList = request("""
Ask information about nodes.
:nodes: ctl -> A
"""
_fmt = PStruct('ask_node_list',
PFNodeType,
)
:nodes: ctl -> A
""")
_answer = PStruct('answer_node_list',
PFNodeList,
)
SetNodeState = request("""
Change the state of a node.
class SetNodeState(Packet):
"""
Change the state of a node.
:nodes: ctl -> A -> M
""", error=True, ignore_when_closed=False)
:nodes: ctl -> A -> M
"""
_fmt = PStruct('set_node_state',
PUUID('uuid'),
PFNodeState,
)
AddPendingNodes = request("""
Mark given pending nodes as running, for future inclusion when tweaking
the partition table.
_answer = Error
:nodes: ctl -> A -> M
""", error=True, ignore_when_closed=False)
class AddPendingNodes(Packet):
"""
Mark given pending nodes as running, for future inclusion when tweaking
the partition table.
TweakPartitionTable, AnswerTweakPartitionTable = request("""
Ask the master to balance the partition table, optionally excluding
specific nodes in anticipation of removing them.
:nodes: ctl -> A -> M
"""
_fmt = PStruct('add_pending_nodes',
PFUUIDList,
)
:nodes: ctl -> A -> M
""")
_answer = Error
SetNumReplicas = request("""
Set the number of replicas.
class TweakPartitionTable(Packet):
"""
Ask the master to balance the partition table, optionally excluding
specific nodes in anticipation of removing them.
:nodes: ctl -> A -> M
""", error=True, ignore_when_closed=False)
:nodes: ctl -> A -> M
"""
_fmt = PStruct('tweak_partition_table',
PBoolean('dry_run'),
PFUUIDList,
)
SetClusterState = request("""
Set the cluster state.
_answer = PStruct('answer_tweak_partition_table',
PBoolean('changed'),
PFRowList,
)
:nodes: ctl -> A -> M
""", error=True, ignore_when_closed=False)
class NotifyNodeInformation(Packet):
"""
Notify information about one or more nodes.
Repair = request("""
Ask storage nodes to repair their databases.
:nodes: M -> *
"""
_fmt = PStruct('notify_node_informations',
PFloat('id_timestamp'),
PFNodeList,
)
:nodes: ctl -> A -> M
""", error=True)
class SetNumReplicas(Packet):
"""
Set the number of replicas.
NotifyRepair = notify("""
Repair is translated to this message, asking a specific storage node to
repair its database.
:nodes: ctl -> A -> M
"""
_fmt = PStruct('set_num_replicas',
PNumber('num_replicas'),
)
:nodes: M -> S
""")
_answer = Error
NotifyClusterInformation = notify("""
Notify about a cluster state change.
class SetClusterState(Packet):
"""
Set the cluster state.
:nodes: M -> *
""")
:nodes: ctl -> A -> M
"""
_fmt = PStruct('set_cluster_state',
PEnum('state', ClusterStates),
)
AskClusterState, AnswerClusterState = request("""
Ask the state of the cluster
_answer = Error
:nodes: ctl -> A; A -> M
""")
class Repair(Packet):
"""
Ask storage nodes to repair their databases.
AskObjectUndoSerial, AnswerObjectUndoSerial = request("""
Ask storage the serial where object data is when undoing given
transaction, for a list of OIDs.
:nodes: ctl -> A -> M
"""
_flags = map(PBoolean, ('dry_run',
# 'prune_orphan' (commented because it's the only option for the moment)
))
_fmt = PStruct('repair',
PFUUIDList,
*_flags)
Answer a dict mapping oids to 3-tuples:
current_serial (TID)
The latest serial visible to the undoing transaction.
undo_serial (TID)
Where undone data is (tid at which data is before given undo).
is_current (bool)
If current_serial's data is current on storage.
_answer = Error
:nodes: C -> S
""", allow_dict=True)
class RepairOne(Packet):
"""
Repair is translated to this message, asking a specific storage node to
repair its database.
AskTIDsFrom, AnswerTIDsFrom = request("""
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
Used by `iterator`.
:nodes: M -> S
"""
_fmt = PStruct('repair', *Repair._flags)
:nodes: C -> S
""")
class ClusterInformation(Packet):
"""
Notify about a cluster state change.
AskPack, AnswerPack = request("""
Request a pack at given TID.
:nodes: M -> *
"""
_fmt = PStruct('notify_cluster_information',
PEnum('state', ClusterStates),
)
:nodes: C -> M -> S
""", ignore_when_closed=False)
class ClusterState(Packet):
"""
Ask the state of the cluster
CheckReplicas = request("""
Ask the cluster to search for mismatches between replicas, metadata
only, and optionally within a specific range. Reference nodes can be
specified.
:nodes: ctl -> A; A -> M
"""
:nodes: ctl -> A -> M
""", error=True, allow_dict=True)
_answer = PStruct('answer_cluster_state',
PEnum('state', ClusterStates),
)
CheckPartition = notify("""
Ask a storage node to compare a partition with all other nodes.
Like for CheckReplicas, only metadata are checked, optionally within a
specific range. A reference node can be specified.
class ObjectUndoSerial(Packet):
"""
Ask storage the serial where object data is when undoing given transaction,
for a list of OIDs.
:nodes: M -> S
""")
object_tid_dict has the following format:
key: oid
value: 3-tuple
current_serial (TID)
The latest serial visible to the undoing transaction.
undo_serial (TID)
Where undone data is (tid at which data is before given undo).
is_current (bool)
If current_serial's data is current on storage.
AskCheckTIDRange, AnswerCheckTIDRange = request("""
Ask some stats about a range of transactions.
Used to know if there are differences between a replicating node and
reference node.
:nodes: C -> S
"""
_fmt = PStruct('ask_undo_transaction',
PTID('tid'),
PTID('ltid'),
PTID('undone_tid'),
PFOidList,
)
_answer = PStruct('answer_undo_transaction',
PDict('object_tid_dict',
POID('oid'),
PStruct('object_tid_value',
PTID('current_serial'),
PTID('undo_serial'),
PBoolean('is_current'),
),
),
)
class CheckCurrentSerial(Packet):
"""
Check if given serial is current for the given oid, and lock it so that
this state is not altered until transaction ends.
This maps to `checkCurrentSerialInTransaction`.
:nodes: S -> S
""")
:nodes: C -> S
"""
_fmt = PStruct('ask_check_current_serial',
PTID('tid'),
POID('oid'),
PTID('serial'),
)
AskCheckSerialRange, AnswerCheckSerialRange = request("""
Ask some stats about a range of object history.
Used to know if there are differences between a replicating node and
reference node.
_answer = StoreObject._answer
:nodes: S -> S
""")
class Pack(Packet):
"""
Request a pack at given TID.
NotifyPartitionCorrupted = notify("""
Notify that mismatches were found while check replicas for a partition.
:nodes: C -> M -> S
"""
_fmt = PStruct('ask_pack',
PTID('tid'),
)
:nodes: S -> M
""")
_answer = PStruct('answer_pack',
PBoolean('status'),
)
NotifyReady = notify("""
Notify that we're ready to serve requests.
class CheckReplicas(Packet):
"""
Ask the cluster to search for mismatches between replicas, metadata only,
and optionally within a specific range. Reference nodes can be specified.
:nodes: S -> M
""")
:nodes: ctl -> A -> M
"""
_fmt = PStruct('check_replicas',
PDict('partition_dict',
PNumber('partition'),
PUUID('source'),
),
PTID('min_tid'),
PTID('max_tid'),
)
_answer = Error
class CheckPartition(Packet):
"""
Ask a storage node to compare a partition with all other nodes.
Like for CheckReplicas, only metadata are checked, optionally within a
specific range. A reference node can be specified.
AskLastTransaction, AnswerLastTransaction = request("""
Ask last committed TID.
:nodes: M -> S
"""
_fmt = PStruct('check_partition',
PNumber('partition'),
PStruct('source',
PString('upstream_name'),
PAddress('address'),
),
PTID('min_tid'),
PTID('max_tid'),
)
class CheckTIDRange(Packet):
"""
Ask some stats about a range of transactions.
Used to know if there are differences between a replicating node and
reference node.
:nodes: C -> M; ctl -> A -> M
""", poll_thread=True)
:nodes: S -> S
"""
_fmt = PStruct('ask_check_tid_range',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
)
_answer = PStruct('answer_check_tid_range',
PNumber('count'),
PChecksum('checksum'),
PTID('max_tid'),
)
class CheckSerialRange(Packet):
"""
Ask some stats about a range of object history.
Used to know if there are differences between a replicating node and
reference node.
AskCheckCurrentSerial, AnswerCheckCurrentSerial = request("""
Check if given serial is current for the given oid, and lock it so that
this state is not altered until transaction ends.
This maps to `checkCurrentSerialInTransaction`.
:nodes: S -> S
"""
_fmt = PStruct('ask_check_serial_range',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
POID('min_oid'),
)
_answer = PStruct('answer_check_serial_range',
PNumber('count'),
PChecksum('tid_checksum'),
PTID('max_tid'),
PChecksum('oid_checksum'),
POID('max_oid'),
)
class PartitionCorrupted(Packet):
"""
Notify that mismatches were found while check replicas for a partition.
:nodes: C -> S
""")
:nodes: S -> M
"""
_fmt = PStruct('partition_corrupted',
PNumber('partition'),
PList('cell_list',
PUUID('uuid'),
),
)
class LastTransaction(Packet):
"""
Ask last committed TID.
NotifyTransactionFinished = notify("""
Notify that a transaction blocking a replication is now finished.
:nodes: C -> M; ctl -> A -> M
"""
poll_thread = True
:nodes: M -> S
""")
_answer = PStruct('answer_last_transaction',
PTID('tid'),
)
Replicate = notify("""
Notify a storage node to replicate partitions up to given 'tid'
and from given sources.
class NotifyReady(Packet):
"""
Notify that we're ready to serve requests.
args: tid, upstream_name, {partition: address}
- upstream_name: replicate from an upstream cluster
- address: address of the source storage node, or None if there's
no new data up to 'tid' for the given partition
:nodes: S -> M
"""
:nodes: M -> S
""", allow_dict=True)
class FetchTransactions(Packet):
"""
Ask a storage node to send all transaction data we don't have,
and reply with the list of transactions we should not have.
NotifyReplicationDone = notify("""
Notify the master node that a partition has been successfully
replicated from a storage to another.
:nodes: S -> S
"""
_fmt = PStruct('ask_transaction_list',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
PFTidList, # already known transactions
)
_answer = PStruct('answer_transaction_list',
PTID('pack_tid'),
PTID('next_tid'),
PFTidList, # transactions to delete
)
class AddTransaction(Packet):
"""
Send metadata of a transaction to a node that do not have them.
:nodes: S -> M
""")
:nodes: S -> S
"""
nodelay = False
_fmt = PStruct('add_transaction',
PTID('tid'),
PString('user'),
PString('description'),
PString('extension'),
PBoolean('packed'),
PTID('ttid'),
PFOidList,
)
class FetchObjects(Packet):
"""
Ask a storage node to send object records we don't have,
and reply with the list of records we should not have.
AskFetchTransactions, AnswerFetchTransactions = request("""
Ask a storage node to send all transaction data we don't have,
and reply with the list of transactions we should not have.
:nodes: S -> S
"""
_fmt = PStruct('ask_object_list',
PNumber('partition'),
PNumber('length'),
PTID('min_tid'),
PTID('max_tid'),
POID('min_oid'),
PDict('object_dict', # already known objects
PTID('serial'),
PFOidList,
),
)
_answer = PStruct('answer_object_list',
PTID('pack_tid'),
PTID('next_tid'),
POID('next_oid'),
PDict('object_dict', # objects to delete
PTID('serial'),
PFOidList,
),
)
class AddObject(Packet):
"""
Send an object record to a node that do not have it.
:nodes: S -> S
""")
:nodes: S -> S
"""
nodelay = False
_fmt = PStruct('add_object',
POID('oid'),
PTID('serial'),
PBoolean('compression'),
PChecksum('checksum'),
PString('data'),
PTID('data_serial'),
)
class Replicate(Packet):
"""
Notify a storage node to replicate partitions up to given 'tid'
and from given sources.
AskFetchObjects, AnswerFetchObjects = request("""
Ask a storage node to send object records we don't have,
and reply with the list of records we should not have.
- upstream_name: replicate from an upstream cluster
- address: address of the source storage node, or None if there's no new
data up to 'tid' for the given partition
:nodes: S -> S
""", allow_dict=True)
:nodes: M -> S
"""
_fmt = PStruct('replicate',
PTID('tid'),
PString('upstream_name'),
PDict('source_dict',
PNumber('partition'),
PAddress('address'),
)
)
class ReplicationDone(Packet):
"""
Notify the master node that a partition has been successfully replicated
from a storage to another.
AddTransaction = notify("""
Send metadata of a transaction to a node that does not have them.
:nodes: S -> M
"""
_fmt = PStruct('notify_replication_done',
PNumber('offset'),
PTID('tid'),
)
:nodes: S -> S
""", nodelay=False)
class Truncate(Packet):
"""
Request DB to be truncated. Also used to leave backup mode.
AddObject = notify("""
Send an object record to a node that does not have it.
:nodes: ctl -> A -> M; M -> S
"""
_fmt = PStruct('truncate',
PTID('tid'),
)
:nodes: S -> S
""", nodelay=False, data_path=(0, 2))
_answer = Error
Truncate = request("""
Request DB to be truncated. Also used to leave backup mode.
class FlushLog(Packet):
"""
Request all nodes to flush their logs.
:nodes: ctl -> A -> M; M -> S
""", error=True)
:nodes: ctl -> A -> M -> *
"""
FlushLog = notify("""
Request all nodes to flush their logs.
:nodes: ctl -> A -> M -> *
""")
_next_code = 0
def register(request, ignore_when_closed=None):
""" Register a packet in the packet registry """
global _next_code
code = _next_code
assert code < RESPONSE_MASK
_next_code = code + 1
if request is Error:
code |= RESPONSE_MASK
# register the request
request._code = code
answer = request._answer
if ignore_when_closed is None:
# By default, on a closed connection:
# - request: ignore
# - answer: keep
# - notification: keep
ignore_when_closed = answer is not None
request._ignore_when_closed = ignore_when_closed
if answer in (Error, None):
return request
# build a class for the answer
answer = type('Answer' + request.__name__, (Packet, ), {})
answer._fmt = request._answer
answer.poll_thread = request.poll_thread
answer._request = request
assert answer._code is None, "Answer of %s is already used" % (request, )
answer._code = code | RESPONSE_MASK
request._answer = answer
return request, answer
del notify, request
class Packets(dict):
"""
Packet registry that checks packet code uniqueness and provides an index
"""
def __metaclass__(name, base, d):
# this builds a "singleton"
cls = type('PacketRegistry', base, d)()
for k, v in d.iteritems():
if isinstance(v, type) and issubclass(v, Packet):
v.handler_method_name = k[0].lower() + k[1:]
cls[v._code] = v
return cls
Error = register(
Error)
RequestIdentification, AcceptIdentification = register(
RequestIdentification, ignore_when_closed=True)
Ping, Pong = register(
Ping)
CloseClient = register(
CloseClient)
AskPrimary, AnswerPrimary = register(
PrimaryMaster)
NotPrimaryMaster = register(
NotPrimaryMaster)
NotifyNodeInformation = register(
NotifyNodeInformation)
AskRecovery, AnswerRecovery = register(
Recovery)
AskLastIDs, AnswerLastIDs = register(
LastIDs)
AskPartitionTable, AnswerPartitionTable = register(
PartitionTable)
SendPartitionTable = register(
NotifyPartitionTable)
NotifyPartitionChanges = register(
PartitionChanges)
StartOperation = register(
StartOperation)
StopOperation = register(
StopOperation)
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
UnfinishedTransactions)
AskLockedTransactions, AnswerLockedTransactions = register(
LockedTransactions)
AskFinalTID, AnswerFinalTID = register(
FinalTID)
ValidateTransaction = register(
ValidateTransaction)
AskBeginTransaction, AnswerBeginTransaction = register(
BeginTransaction)
FailedVote = register(
FailedVote)
AskFinishTransaction, AnswerTransactionFinished = register(
FinishTransaction, ignore_when_closed=False)
AskLockInformation, AnswerInformationLocked = register(
LockInformation, ignore_when_closed=False)
InvalidateObjects = register(
InvalidateObjects)
NotifyUnlockInformation = register(
UnlockInformation)
AskNewOIDs, AnswerNewOIDs = register(
GenerateOIDs)
NotifyDeadlock = register(
Deadlock)
AskRebaseTransaction, AnswerRebaseTransaction = register(
RebaseTransaction)
AskRebaseObject, AnswerRebaseObject = register(
RebaseObject)
AskStoreObject, AnswerStoreObject = register(
StoreObject)
AbortTransaction = register(
AbortTransaction)
AskStoreTransaction, AnswerStoreTransaction = register(
StoreTransaction)
AskVoteTransaction, AnswerVoteTransaction = register(
VoteTransaction)
AskObject, AnswerObject = register(
GetObject)
AskTIDs, AnswerTIDs = register(
TIDList)
AskTransactionInformation, AnswerTransactionInformation = register(
TransactionInformation)
AskObjectHistory, AnswerObjectHistory = register(
ObjectHistory)
AskPartitionList, AnswerPartitionList = register(
PartitionList)
AskNodeList, AnswerNodeList = register(
NodeList)
SetNodeState = register(
SetNodeState, ignore_when_closed=False)
AddPendingNodes = register(
AddPendingNodes, ignore_when_closed=False)
TweakPartitionTable, AnswerTweakPartitionTable = register(
TweakPartitionTable)
SetNumReplicas = register(
SetNumReplicas, ignore_when_closed=False)
SetClusterState = register(
SetClusterState, ignore_when_closed=False)
Repair = register(
Repair)
NotifyRepair = register(
RepairOne)
NotifyClusterInformation = register(
ClusterInformation)
AskClusterState, AnswerClusterState = register(
ClusterState)
AskObjectUndoSerial, AnswerObjectUndoSerial = register(
ObjectUndoSerial)
AskTIDsFrom, AnswerTIDsFrom = register(
TIDListFrom)
AskPack, AnswerPack = register(
Pack, ignore_when_closed=False)
CheckReplicas = register(
CheckReplicas)
CheckPartition = register(
CheckPartition)
AskCheckTIDRange, AnswerCheckTIDRange = register(
CheckTIDRange)
AskCheckSerialRange, AnswerCheckSerialRange = register(
CheckSerialRange)
NotifyPartitionCorrupted = register(
PartitionCorrupted)
NotifyReady = register(
NotifyReady)
AskLastTransaction, AnswerLastTransaction = register(
LastTransaction)
AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
CheckCurrentSerial)
NotifyTransactionFinished = register(
NotifyTransactionFinished)
Replicate = register(
Replicate)
NotifyReplicationDone = register(
ReplicationDone)
AskFetchTransactions, AnswerFetchTransactions = register(
FetchTransactions)
AskFetchObjects, AnswerFetchObjects = register(
FetchObjects)
AddTransaction = register(
AddTransaction)
AddObject = register(
AddObject)
Truncate = register(
Truncate)
FlushLog = register(
FlushLog)
def Errors():
registry_dict = {}
handler_method_name_dict = {}
Error = Packets.Error
def register_error(code):
return lambda self, message='': Error(code, message)
for error in ErrorCodes:
......@@ -1856,19 +854,20 @@ from operator import itemgetter
def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)):
if node_list:
node_list.sort(key=_sort_key)
node_list = [(
uuid_str(uuid), str(node_type),
('[%s]:%s' if ':' in addr[0] else '%s:%s')
% addr if addr else '', str(state),
str(id_timestamp and datetime.utcfromtimestamp(id_timestamp)))
for node_type, addr, uuid, state, id_timestamp in node_list]
for node_type, addr, uuid, state, id_timestamp
in sorted(node_list, key=_sort_key)]
t = ''.join('%%-%us | ' % max(len(x[i]) for x in node_list)
for i in xrange(len(node_list[0]) - 1))
return map((prefix + t + '%s').__mod__, node_list)
return ()
NotifyNodeInformation._neolog = staticmethod(lambda timestamp, node_list:
Packets.NotifyNodeInformation._neolog = staticmethod(
lambda timestamp, node_list:
((timestamp,), formatNodeList(node_list, ' ! ')))
Error._neolog = staticmethod(lambda *args: ((), ("%s (%s)" % args,)))
Packets.Error._neolog = staticmethod(lambda *args: ((), ("%s (%s)" % args,)))
......@@ -166,65 +166,6 @@ def parseMasterList(masters):
return map(parseNodeAddress, masters.split())
class ReadBuffer(object):
"""
Implementation of a lazy buffer. Main purpose if to reduce useless
copies of data by storing chunks and join them only when the requested
size is available.
TODO: For better performance, use:
- socket.recv_into (64kiB blocks)
- struct.unpack_from
- and a circular buffer of dynamic size (initial size:
twice the length passed to socket.recv_into ?)
"""
def __init__(self):
self.size = 0
self.content = deque()
def append(self, data):
""" Append some data and compute the new buffer size """
self.size += len(data)
self.content.append(data)
def __len__(self):
""" Return the current buffer size """
return self.size
def read(self, size):
""" Read and consume size bytes """
if self.size < size:
return None
self.size -= size
chunk_list = []
pop_chunk = self.content.popleft
append_data = chunk_list.append
to_read = size
# select required chunks
while to_read > 0:
chunk_data = pop_chunk()
to_read -= len(chunk_data)
append_data(chunk_data)
if to_read < 0:
# too many bytes consumed, cut the last chunk
last_chunk = chunk_list[-1]
keep, let = last_chunk[:to_read], last_chunk[to_read:]
self.content.appendleft(let)
chunk_list[-1] = keep
# join all chunks (one copy)
data = ''.join(chunk_list)
assert len(data) == size
return data
def clear(self):
""" Erase all buffer content """
self.size = 0
self.content.clear()
dummy_read_buffer = ReadBuffer()
dummy_read_buffer.append = lambda _: None
class cached_property(object):
"""
A property that is only computed once per instance and then replaces itself
......
......@@ -585,7 +585,9 @@ class Application(BaseApplication):
self.tm.executeQueuedEvents()
def startStorage(self, node):
node.send(Packets.StartOperation(self.backup_tid))
# XXX: Is this boolean 'backup' field needed ?
# Maybe this can be deduced from cluster state.
node.send(Packets.StartOperation(bool(self.backup_tid)))
uuid = node.getUUID()
assert uuid not in self.storage_starting_set
assert uuid not in self.storage_ready_dict
......
......@@ -157,27 +157,49 @@ class Log(object):
for x in 'uuid_str', 'Packets', 'PacketMalformedError':
setattr(self, x, g[x])
x = {}
try:
Unpacker = g['Unpacker']
except KeyError:
unpackb = None
else:
from msgpack import ExtraData, UnpackException
def unpackb(data):
u = Unpacker()
u.feed(data)
data = u.unpack()
if u.read_bytes(1):
raise ExtraData
return data
self.PacketMalformedError = UnpackException
self.unpackb = unpackb
if self._decode > 1:
PStruct = g['PStruct']
PBoolean = g['PBoolean']
def hasData(item):
items = item._items
for i, item in enumerate(items):
if isinstance(item, PStruct):
j = hasData(item)
if j:
return (i,) + j
elif (isinstance(item, PBoolean)
and item._name == 'compression'
and i + 2 < len(items)
and items[i+2]._name == 'data'):
return i,
for p in self.Packets.itervalues():
if p._fmt is not None:
path = hasData(p._fmt)
if path:
assert not hasattr(p, '_neolog'), p
x[p._code] = path
try:
PStruct = g['PStruct']
except KeyError:
for p in self.Packets.itervalues():
data_path = getattr(p, 'data_path', (None,))
if p._code >> 15 == data_path[0]:
x[p._code] = data_path[1:]
else:
PBoolean = g['PBoolean']
def hasData(item):
items = item._items
for i, item in enumerate(items):
if isinstance(item, PStruct):
j = hasData(item)
if j:
return (i,) + j
elif (isinstance(item, PBoolean)
and item._name == 'compression'
and i + 2 < len(items)
and items[i+2]._name == 'data'):
return i,
for p in self.Packets.itervalues():
if p._fmt is not None:
path = hasData(p._fmt)
if path:
assert not hasattr(p, '_neolog'), p
x[p._code] = path
self._getDataPath = x.get
try:
......@@ -215,11 +237,13 @@ class Log(object):
if body is not None:
log = getattr(p, '_neolog', None)
if log or self._decode:
p = p()
p._id = msg_id
p._body = body
try:
args = p.decode()
if self.unpackb:
args = self.unpackb(body)
else:
p = p()
p._body = body
args = p.decode()
except self.PacketMalformedError:
msg.append("Can't decode packet")
else:
......
......@@ -461,8 +461,12 @@ class SQLiteDatabaseManager(DatabaseManager):
return r
def loadData(self, data_id):
return self.query("SELECT compression, hash, value"
" FROM data WHERE id=?", (data_id,)).fetchone()
compression, checksum, data = self.query(
"SELECT compression, hash, value FROM data WHERE id=?",
(data_id,)).fetchone()
if checksum:
return compression, str(checksum), str(data)
return compression, checksum, data
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getReadablePartition(oid)
......
......@@ -53,7 +53,7 @@ class ClientOperationHandler(BaseHandler):
p = Errors.TidNotFound('%s does not exist' % dump(tid))
else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
t[4], t[0])
bool(t[4]), t[0])
conn.answer(p)
def getEventQueue(self):
......
......@@ -212,7 +212,7 @@ class StorageOperationHandler(EventHandler):
# Sending such packet does not mark the connection
# for writing if there's too little data in the buffer.
conn.send(Packets.AddTransaction(tid, user,
desc, ext, packed, ttid, oid_list), msg_id)
desc, ext, bool(packed), ttid, oid_list), msg_id)
# To avoid delaying several connections simultaneously,
# and also prevent the backend from scanning different
# parts of the DB at the same time, we ask the
......@@ -248,7 +248,7 @@ class StorageOperationHandler(EventHandler):
for serial, oid in object_list:
oid_set = object_dict.get(serial)
if oid_set:
if type(oid_set) is list:
if type(oid_set) is tuple:
object_dict[serial] = oid_set = set(oid_set)
if oid in oid_set:
oid_set.remove(oid)
......
......@@ -71,7 +71,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid)
self.checkNoPacketSent(conn)
ptid = self.checkAskPacket(storage_conn, Packets.AskPack).decode()[0]
ptid = self.checkAskPacket(storage_conn, Packets.AskPack)._args[0]
self.assertEqual(ptid, tid)
self.assertTrue(self.app.packing[0] is conn)
self.assertEqual(self.app.packing[1], peer_id)
......@@ -83,7 +83,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid)
self.checkNoPacketSent(storage_conn)
status = self.checkAnswerPacket(conn, Packets.AnswerPack).decode()[0]
status = self.checkAnswerPacket(conn, Packets.AnswerPack)._args[0]
self.assertFalse(status)
if __name__ == '__main__':
......
......@@ -72,7 +72,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
self.service.answerPack(conn2, False)
packet = self.checkNotifyPacket(client_conn, Packets.AnswerPack)
# TODO: verify packet peer id
self.assertTrue(packet.decode()[0])
self.assertTrue(packet._args[0])
self.assertEqual(self.app.packing, None)
if __name__ == '__main__':
......
......@@ -33,9 +33,9 @@ class HandlerTests(NeoUnitTestBase):
def getFakePacket(self):
p = Mock({
'decode': (),
'__repr__': 'Fake Packet',
})
p._args = ()
p.handler_method_name = 'fake_method'
return p
......@@ -53,13 +53,6 @@ class HandlerTests(NeoUnitTestBase):
self.handler.dispatch(conn, packet)
self.checkErrorPacket(conn)
self.checkAborted(conn)
# raise PacketMalformedError
conn.mockCalledMethods = {}
def fake(c):
raise PacketMalformedError('message')
self.setFakeMethod(fake)
self.handler.dispatch(conn, packet)
self.checkClosed(conn)
# raise NotReadyError
conn.mockCalledMethods = {}
def fake(c):
......
......@@ -17,7 +17,7 @@
import unittest
import socket
from . import NeoUnitTestBase
from neo.lib.util import ReadBuffer, parseNodeAddress
from neo.lib.util import parseNodeAddress
class UtilTests(NeoUnitTestBase):
......@@ -40,24 +40,6 @@ class UtilTests(NeoUnitTestBase):
self.assertIn(parseNodeAddress('localhost'), local_address(0))
self.assertIn(parseNodeAddress('localhost:10'), local_address(10))
def testReadBufferRead(self):
""" Append some chunk then consume the data """
buf = ReadBuffer()
self.assertEqual(len(buf), 0)
buf.append('abc')
self.assertEqual(len(buf), 3)
# no enough data
self.assertEqual(buf.read(4), None)
self.assertEqual(len(buf), 3)
buf.append('def')
# consume a part
self.assertEqual(len(buf), 6)
self.assertEqual(buf.read(4), 'abcd')
self.assertEqual(len(buf), 2)
# consume the rest
self.assertEqual(buf.read(3), None)
self.assertEqual(buf.read(2), 'ef')
if __name__ == "__main__":
unittest.main()
......@@ -1340,7 +1340,7 @@ class Test(NEOThreadedTest):
# Also check that the master reset the last oid to a correct value.
t.begin()
self.assertEqual(1, u64(c.root()['x']._p_oid))
self.assertFalse(cluster.client.new_oid_list)
self.assertFalse(cluster.client.new_oids)
self.assertEqual(2, u64(cluster.client.new_oid()))
@with_cluster()
......@@ -2106,7 +2106,7 @@ class Test(NEOThreadedTest):
except threading.ThreadError:
l[j].acquire()
threads[j-1].start()
if x != 'StoreTransaction':
if x != 'AskStoreTransaction':
try:
l[i].acquire()
except IndexError:
......@@ -2183,15 +2183,16 @@ class Test(NEOThreadedTest):
x = self._testComplexDeadlockAvoidanceWithOneStorage(changes,
(1, 1, 0, 1, 2, 2, 2, 2, 0, 1, 2, 1, 0, 0, 1, 0, 0, 1),
('tpc_begin', 'tpc_begin', 1, 2, 3, 'tpc_begin', 1, 2, 4, 3, 4,
'StoreTransaction', 'RebaseTransaction', 'RebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
'RebaseTransaction', 'AnswerRebaseTransaction'),
'AskStoreTransaction', 'AskRebaseTransaction',
'AskRebaseTransaction', 'AnswerRebaseTransaction',
'AnswerRebaseTransaction', 'AskRebaseTransaction',
'AnswerRebaseTransaction'),
[4, 6, 2, 6])
try:
x[1].remove(1)
except ValueError:
pass
self.assertEqual(x, {0: [2, 'StoreTransaction'], 1: ['tpc_abort']})
self.assertEqual(x, {0: [2, 'AskStoreTransaction'], 1: ['tpc_abort']})
def testCascadedDeadlockAvoidanceWithOneStorage2(self):
def changes(r1, r2, r3):
......@@ -2214,8 +2215,8 @@ class Test(NEOThreadedTest):
(0, 1, 1, 0, 1, 2, 2, 2, 2, 0, 1, 2, 1,
0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1),
('tpc_begin', 1, 'tpc_begin', 1, 2, 3, 'tpc_begin',
2, 3, 4, 3, 4, 'StoreTransaction', 'RebaseTransaction',
'RebaseTransaction', 'AnswerRebaseTransaction'),
2, 3, 4, 3, 4, 'AskStoreTransaction', 'AskRebaseTransaction',
'AskRebaseTransaction', 'AnswerRebaseTransaction'),
[1, 7, 9, 0])
x[0].sort(key=str)
try:
......@@ -2224,8 +2225,8 @@ class Test(NEOThreadedTest):
pass
self.assertEqual(x, {
0: [2, 3, 'AnswerRebaseTransaction',
'RebaseTransaction', 'StoreTransaction'],
1: ['AnswerRebaseTransaction','RebaseTransaction',
'AskRebaseTransaction', 'AskStoreTransaction'],
1: ['AnswerRebaseTransaction','AskRebaseTransaction',
'AnswerRebaseTransaction', 'tpc_abort'],
})
......@@ -2258,7 +2259,7 @@ class Test(NEOThreadedTest):
end = self._testComplexDeadlockAvoidanceWithOneStorage(changes,
(0, 1, 1, 0, 1, 1, 0, 0, 2, 2, 2, 2, 1, vote_t2, tic_t1),
('tpc_begin', 1) * 2, [3, 0, 0, 0], None)
self.assertLessEqual(2, end[0].count('RebaseTransaction'))
self.assertLessEqual(2, end[0].count('AskRebaseTransaction'))
def testFailedConflictOnBigValueDuringDeadlockAvoidance(self):
def changes(r1, r2, r3):
......@@ -2274,10 +2275,10 @@ class Test(NEOThreadedTest):
x = self._testComplexDeadlockAvoidanceWithOneStorage(changes,
(1, 1, 1, 2, 2, 2, 1, 2, 2, 0, 0, 1, 1, 1, 0),
('tpc_begin', 'tpc_begin', 1, 2, 'tpc_begin', 1, 3, 3, 4,
'StoreTransaction', 2, 4, 'RebaseTransaction',
'AskStoreTransaction', 2, 4, 'AskRebaseTransaction',
'AnswerRebaseTransaction', 'tpc_abort'),
[5, 1, 0, 2], POSException.ConflictError)
self.assertEqual(x, {0: ['StoreTransaction']})
self.assertEqual(x, {0: ['AskStoreTransaction']})
@with_cluster(replicas=1, partitions=4)
def testNotifyReplicated(self, cluster):
......@@ -2364,7 +2365,7 @@ class Test(NEOThreadedTest):
def delayConflict(conn, packet):
app = self.getConnectionApp(conn)
if (isinstance(packet, Packets.AnswerStoreObject)
and packet.decode()[0]):
and packet._args[0]):
conn, = cluster.client.getConnectionList(app)
kw = conn._handlers._pending[0][0][packet._id][1]
return 1 == u64(kw['oid']) and delay_conflict[app.uuid].pop()
......@@ -2382,8 +2383,9 @@ class Test(NEOThreadedTest):
self.thread_switcher(threads,
(1, 2, 3, 0, 1, 0, 2, t3_c, 1, 3, 2, t3_resolve, 0, 0, 0,
t1_rebase, 2, t3_b, 3, t4_d, 0, 2, 2),
('tpc_begin', 'tpc_begin', 'tpc_begin', 'tpc_begin', 2, 1, 1,
3, 3, 4, 4, 3, 1, 'RebaseTransaction', 'RebaseTransaction',
('tpc_begin', 'tpc_begin', 'tpc_begin', 'tpc_begin',
2, 1, 1, 3, 3, 4, 4, 3, 1,
'AskRebaseTransaction', 'AskRebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction', 2
)) as end:
delay = f.delayAskFetchTransactions()
......@@ -2395,11 +2397,11 @@ class Test(NEOThreadedTest):
t4.begin()
self.assertEqual([15, 11, 13, 16], [r[x].value for x in 'abcd'])
self.assertEqual([2, 2], map(end.pop(2).count,
['RebaseTransaction', 'AnswerRebaseTransaction']))
['AskRebaseTransaction', 'AnswerRebaseTransaction']))
self.assertEqual(end, {
0: [1, 'StoreTransaction'],
1: ['StoreTransaction'],
3: [4, 'StoreTransaction'],
0: [1, 'AskStoreTransaction'],
1: ['AskStoreTransaction'],
3: [4, 'AskStoreTransaction'],
})
self.assertFalse(s1.dm.getOrphanList())
......@@ -2435,7 +2437,8 @@ class Test(NEOThreadedTest):
self.thread_switcher((thread,),
(1, 0, 1, 1, t2_b, 0, 0, 1, t2_vote, 0, 0),
('tpc_begin', 'tpc_begin', 1, 1, 2, 2,
'RebaseTransaction', 'RebaseTransaction', 'StoreTransaction',
'AskRebaseTransaction', 'AskRebaseTransaction',
'AskStoreTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
)) as end:
delay = f.delayAskFetchTransactions()
......@@ -2498,9 +2501,10 @@ class Test(NEOThreadedTest):
with self.thread_switcher((commit23,),
(1, 1, 0, 0, t1_rebase, 0, 0, 0, 1, 1, 1, 1, 0),
('tpc_begin', 'tpc_begin', 0, 1, 0,
'RebaseTransaction', 'RebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
'StoreTransaction', 'tpc_begin', 1, 'tpc_abort')) as end:
'AskRebaseTransaction', 'AskRebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
'AskStoreTransaction', 'tpc_begin', 1, 'tpc_abort',
)) as end:
self.assertRaises(POSException.ConflictError, t1.commit)
commit23.join()
self.assertEqual(end, {0: ['tpc_abort']})
......@@ -2587,9 +2591,9 @@ class Test(NEOThreadedTest):
self.thread_switcher((commit2,),
(1, 1, 0, 0, t1_b, t1_resolve, 0, 0, 0, 0, 1, t2_vote, t1_end),
('tpc_begin', 'tpc_begin', 2, 1, 2, 1, 1,
'RebaseTransaction', 'RebaseTransaction',
'AskRebaseTransaction', 'AskRebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
'StoreTransaction')) as end:
'AskStoreTransaction')) as end:
t1.commit()
commit2.join()
t1.begin()
......@@ -2597,7 +2601,7 @@ class Test(NEOThreadedTest):
self.assertEqual(r['a'].value, 9)
self.assertEqual(r['b'].value, 6)
t1 = end.pop(0)
self.assertEqual(t1.pop(), 'StoreTransaction')
self.assertEqual(t1.pop(), 'AskStoreTransaction')
self.assertEqual(sorted(t1), [1, 2])
self.assertFalse(end)
self.assertPartitionTable(cluster, 'UU|UU')
......@@ -2699,9 +2703,9 @@ class Test(NEOThreadedTest):
with Patch(cluster.client, _loadFromStorage=load) as p, \
self.thread_switcher((commit2,),
(1, 0, tic1, 0, t1_resolve, 1, t2_begin, 0, 1, 1, 0),
('tpc_begin', 'tpc_begin', 1, 1, 1, 'StoreTransaction',
'tpc_begin', 'RebaseTransaction', 'RebaseTransaction', 1,
'StoreTransaction')) as end:
('tpc_begin', 'tpc_begin', 1, 1, 1, 'AskStoreTransaction',
'tpc_begin', 'AskRebaseTransaction', 'AskRebaseTransaction',
1, 'AskStoreTransaction')) as end:
self.assertRaisesRegexp(NEOStorageError,
'^partition 0 not fully write-locked$',
t1.commit)
......@@ -2754,13 +2758,14 @@ class Test(NEOThreadedTest):
f.remove(delayFinish)
with self.thread_switcher((commit2,),
(1, 0, 0, 1, t2_b, 0, t1_resolve),
('tpc_begin', 'tpc_begin', 0, 2, 2, 'StoreTransaction')) as end:
('tpc_begin', 'tpc_begin', 0, 2, 2, 'AskStoreTransaction')
) as end:
t1.commit()
commit2.join()
t1.begin()
self.assertEqual(c1.root()['b'].value, 6)
self.assertPartitionTable(cluster, 'UU|UU')
self.assertEqual(end, {0: [2, 2, 'StoreTransaction']})
self.assertEqual(end, {0: [2, 2, 'AskStoreTransaction']})
self.assertFalse(s1.dm.getOrphanList())
@with_cluster(storage_count=2, partitions=2)
......@@ -2783,19 +2788,19 @@ class Test(NEOThreadedTest):
yield 1
self.tic()
with self.thread_switcher((t,), (1, 0, 1, 0, t1_b, 0, 0, 0, 1),
('tpc_begin', 'tpc_begin', 1, 3, 3, 1, 'RebaseTransaction',
('tpc_begin', 'tpc_begin', 1, 3, 3, 1, 'AskRebaseTransaction',
2, 'AnswerRebaseTransaction')) as end:
t1.commit()
t.join()
t2.begin()
self.assertEqual([6, 9, 6], [r[x].value for x in 'abc'])
self.assertEqual([2, 2], map(end.pop(1).count,
['RebaseTransaction', 'AnswerRebaseTransaction']))
['AskRebaseTransaction', 'AnswerRebaseTransaction']))
# Rarely, there's an extra deadlock for t1:
# 0: ['AnswerRebaseTransaction', 'RebaseTransaction',
# 'RebaseTransaction', 'AnswerRebaseTransaction',
# 0: ['AnswerRebaseTransaction', 'AskRebaseTransaction',
# 'AskRebaseTransaction', 'AnswerRebaseTransaction',
# 'AnswerRebaseTransaction', 2, 3, 1,
# 'StoreTransaction', 'VoteTransaction']
# 'AskStoreTransaction', 'VoteTransaction']
self.assertEqual(end.pop(0)[0], 'AnswerRebaseTransaction')
self.assertFalse(end)
......@@ -2825,13 +2830,13 @@ class Test(NEOThreadedTest):
threads = map(self.newPausedThread, (t2.commit, t3.commit))
with self.thread_switcher(threads, (1, 2, 0, 1, 2, 1, 0, 2, 0, 1, 2),
('tpc_begin', 'tpc_begin', 'tpc_begin', 1, 2, 3, 4, 4, 4,
'RebaseTransaction', 'StoreTransaction')) as end:
'AskRebaseTransaction', 'AskStoreTransaction')) as end:
t1.commit()
for t in threads:
t.join()
self.assertEqual(end, {
0: ['AnswerRebaseTransaction', 'StoreTransaction'],
2: ['StoreTransaction']})
0: ['AnswerRebaseTransaction', 'AskStoreTransaction'],
2: ['AskStoreTransaction']})
@with_cluster(replicas=1)
def testConflictAfterDeadlockWithSlowReplica1(self, cluster,
......@@ -2874,16 +2879,16 @@ class Test(NEOThreadedTest):
order[-1] = t1_resolve
delay = f.delayAskStoreObject()
with self.thread_switcher((t,), order,
('tpc_begin', 'tpc_begin', 1, 1, 2, 2, 'RebaseTransaction',
'RebaseTransaction', 'AnswerRebaseTransaction',
'StoreTransaction')) as end:
('tpc_begin', 'tpc_begin', 1, 1, 2, 2, 'AskRebaseTransaction',
'AskRebaseTransaction', 'AnswerRebaseTransaction',
'AskStoreTransaction')) as end:
t1.commit()
t.join()
self.assertNotIn(delay, f)
t2.begin()
end[0].sort(key=str)
self.assertEqual(end, {0: [1, 'AnswerRebaseTransaction',
'StoreTransaction']})
'AskStoreTransaction']})
self.assertEqual([4, 2], [r[x].value for x in 'ab'])
def testConflictAfterDeadlockWithSlowReplica2(self):
......@@ -2934,7 +2939,7 @@ class Test(NEOThreadedTest):
with ConnectionFilter() as f:
f.add(lambda conn, packet:
isinstance(packet, Packets.RequestIdentification)
and packet.decode()[0] == NodeTypes.STORAGE)
and packet._args[0] == NodeTypes.STORAGE)
self.tic()
m2.start()
self.tic()
......@@ -2974,7 +2979,7 @@ class Test(NEOThreadedTest):
with ConnectionFilter() as f:
f.add(lambda conn, packet:
isinstance(packet, Packets.RequestIdentification)
and packet.decode()[0] == NodeTypes.MASTER)
and packet._args[0] == NodeTypes.MASTER)
cluster.start(recovering=True)
neoctl = cluster.neoctl
getClusterState = neoctl.getClusterState
......
......@@ -113,7 +113,7 @@ class ReplicationTests(NEOThreadedTest):
importZODB(3)
def delaySecondary(conn, packet):
if isinstance(packet, Packets.Replicate):
tid, upstream_name, source_dict = packet.decode()
tid, upstream_name, source_dict = packet._args
return not upstream_name and all(source_dict.itervalues())
# U -> B propagation
with NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
......@@ -513,7 +513,7 @@ class ReplicationTests(NEOThreadedTest):
"""
def delayAskFetch(conn, packet):
return isinstance(packet, delayed) and \
packet.decode()[0] == offset and \
packet._args[0] == offset and \
conn in s1.getConnectionList(s0)
def changePartitionTable(orig, ptid, num_replicas, cell_list):
if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
......@@ -768,7 +768,7 @@ class ReplicationTests(NEOThreadedTest):
def logReplication(conn, packet):
if isinstance(packet, (Packets.AskFetchTransactions,
Packets.AskFetchObjects)):
ask.append(packet.decode()[2:])
ask.append(packet._args[2:])
def getTIDList():
return [t.tid for t in c.db().storage.iterator()]
s0, s1 = cluster.storage_list
......@@ -869,7 +869,7 @@ class ReplicationTests(NEOThreadedTest):
return True
elif not isinstance(packet, Packets.AskFetchTransactions):
return
ask.append(packet.decode())
ask.append(packet._args)
conn, = upstream.master.getConnectionList(backup.master)
with ConnectionFilter() as f, Patch(replicator.Replicator,
_nextPartitionSortKey=lambda orig, self, offset: offset):
......@@ -930,11 +930,11 @@ class ReplicationTests(NEOThreadedTest):
@f.add
def delayReplicate(conn, packet):
if isinstance(packet, Packets.AskFetchTransactions):
trans.append(packet.decode()[2])
trans.append(packet._args[2])
elif isinstance(packet, Packets.AskFetchObjects):
if obj:
return True
obj.append(packet.decode()[2])
obj.append(packet._args[2])
s2.start()
self.tic()
cluster.neoctl.enableStorageList([s2.uuid])
......@@ -1021,7 +1021,7 @@ class ReplicationTests(NEOThreadedTest):
def expected(changed):
s0 = 1, CellStates.UP_TO_DATE
s = CellStates.OUT_OF_DATE if changed else CellStates.UP_TO_DATE
return changed, 3 * [[s0, (2, s)], [s0, (3, s)]]
return changed, 3 * ((s0, (2, s)), (s0, (3, s)))
for dry_run in True, False:
self.assertEqual(expected(True),
cluster.neoctl.tweakPartitionTable(drop_list, dry_run))
......
......@@ -53,7 +53,7 @@ extras_require = {
'master': [],
'storage-sqlite': [],
'storage-mysqldb': ['mysqlclient'],
'storage-importer': zodb_require + ['msgpack>=0.5.6', 'setproctitle'],
'storage-importer': zodb_require + ['setproctitle'],
}
extras_require['tests'] = ['coverage', 'zope.testing', 'psutil>=2',
'neoppod[%s]' % ', '.join(extras_require)]
......@@ -109,6 +109,7 @@ setup(
],
},
install_requires = [
'msgpack>=0.5.6',
'python-dateutil', # neolog --from
],
extras_require = extras_require,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment