Commit 38e98a12 authored by Julien Muchembled

qa: new tool to stress-test NEO

Example output:

    stress: yes (toggle with F1)
    cluster state: RUNNING
    last oid: 0x44c0
    last tid: 0x3cdee272ef19355 (2019-02-26 15:35:11.002419)
    clients: 2308, 2311, 2302, 2173, 2226, 2215, 2306, 2255, 2314, 2356 (+48)
            8m53.988s (42.633861/s)
    pt id: 4107
        RRRDDRRR
     0: OU......
     1: ..UO....
     2: ....OU..
     3: ......UU
     4: OU......
     5: ..UO....
     6: ....OU..
     7: ......UU
     8: OU......
     9: ..UO....
    10: ....OU..
    11: ......UU
    12: OU......
    13: ..UO....
    14: ....OU..
    15: ......UU
    16: OU......
    17: ..UO....
    18: ....OU..
    19: ......UU
    20: OU......
    21: ..UO....
    22: ....OU..
    23: ......UU
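
A hypothetical session matching the output above (the defaults defined by
the argument parser below are 1 master, 8 storages, 24 partitions, 1 replica
and 10 client processes; F1 toggles fault injection, 'q' quits, and the
nft/NFQUEUE firewalling requires root):

    tools/stress run

Ingested data can be verified afterwards with the 'check' and 'bisect'
subcommands.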
parent ce25e429
@@ -6,5 +6,6 @@
 /build/
 /dist/
 /htmlcov/
+/neo/tests/ConflictFree.py
 /neo/tests/mock.py
 /neoppod.egg-info/
@@ -45,50 +45,12 @@ if IF == 'pdb':
         #('ZPublisher.Publish', 'publish_module_standard'),
     )
-    import errno, socket, threading, weakref
-    # Unfortunately, IPython does not always print to given stdout.
-    #from neo.lib.debug import getPdb
+    import socket, threading, weakref
+    from neo.lib.debug import PdbSocket
+    # We don't use the one from neo.lib.debug because unfortunately,
+    # IPython does not always print to given stdout.
     from pdb import Pdb as getPdb
-    class Socket(object):
-        def __init__(self, socket):
-            # In case that the default timeout is not None.
-            socket.settimeout(None)
-            self._socket = socket
-            self._buf = ''
-        def write(self, data):
-            self._socket.send(data)
-        def readline(self):
-            recv = self._socket.recv
-            data = self._buf
-            while True:
-                i = 1 + data.find('\n')
-                if i:
-                    self._buf = data[i:]
-                    return data[:i]
-                d = recv(4096)
-                data += d
-                if not d:
-                    self._buf = ''
-                    return data
-        def flush(self):
-            pass
-        def closed(self):
-            self._socket.setblocking(0)
-            try:
-                self._socket.recv(0)
-                return True
-            except socket.error, (err, _):
-                if err != errno.EAGAIN:
-                    raise
-            self._socket.setblocking(1)
-            return False
     def pdb():
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         try:
@@ -98,7 +60,7 @@ if IF == 'pdb':
             s.listen(0)
             print 'Listening to %u' % s.getsockname()[1]
             sys.stdout.flush() # BBB: On Python 3, print() takes a 'flush' arg.
-            _socket = Socket(s.accept()[0])
+            _socket = PdbSocket(s.accept()[0])
         finally:
             s.close()
         try:
@@ -155,9 +117,12 @@ if IF == 'pdb':
     if BP:
         setupBreakPoints(BP)
     else:
-        threading.Thread(target=pdb).start()
+        threading.Thread(target=pdb, name='pdb').start()
 elif IF == 'frames':
+    # WARNING: Because of https://bugs.python.org/issue17094, the output is
+    # usually incorrect for subprocesses started by the functional
+    # test framework.
     import traceback
     write = sys.stderr.write
     for thread_id, frame in sys._current_frames().iteritems():
...
@@ -34,6 +34,7 @@ class SocketConnector(object):
     is_closed = is_server = None
     connect_limit = {}
     CONNECT_LIMIT = 1
+    KEEPALIVE = 60, 3, 10
     SOMAXCONN = 5 # for threaded tests
     def __new__(cls, addr, s=None):
@@ -66,9 +67,10 @@ class SocketConnector(object):
         # The following 3 lines are specific to Linux. It seems that OSX
         # has similar options (TCP_KEEPALIVE/TCP_KEEPINTVL/TCP_KEEPCNT),
         # and Windows has SIO_KEEPALIVE_VALS (fixed count of 10).
-        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
-        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
-        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
+        idle, cnt, intvl = self.KEEPALIVE
+        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
+        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, cnt)
+        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, intvl)
         s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
         # disable Nagle algorithm to reduce latency
         s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
...
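
Turning these values into a class attribute makes them overridable per
process; tools/stress (added below) relies on this to detect dead storage
connections within seconds. A minimal sketch of such an override:

    from neo.lib.connector import SocketConnector
    # (idle, count, interval): first probe after 5s of silence, connection
    # dropped after 1 failed probe, probes sent at 1s intervals
    SocketConnector.KEEPALIVE = 5, 1, 1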
@@ -14,11 +14,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
-import traceback
-import signal
-import imp
-import os
-import sys
+import errno, imp, os, signal, socket, sys, traceback
 from functools import wraps
 import neo
@@ -82,9 +78,55 @@ def winpdb(depth=0):
     os.abort()
 def register(on_log=None):
-    if on_log is not None:
-        @safe_handler
-        def on_log_signal(signum, signal):
-            on_log()
-        signal.signal(signal.SIGRTMIN+2, on_log_signal)
-    signal.signal(signal.SIGRTMIN+3, debugHandler)
+    try:
+        if on_log is not None:
+            @safe_handler
+            def on_log_signal(signum, signal):
+                on_log()
+            signal.signal(signal.SIGRTMIN+2, on_log_signal)
+        signal.signal(signal.SIGRTMIN+3, debugHandler)
+    except ValueError: # signal only works in main thread
+        pass
+class PdbSocket(object):
+    def __init__(self, socket):
+        # In case that the default timeout is not None.
+        socket.settimeout(None)
+        self._socket = socket
+        self._buf = ''
+    def close(self):
+        self._socket.close()
+    def write(self, data):
+        self._socket.send(data)
+    def readline(self):
+        recv = self._socket.recv
+        data = self._buf
+        while True:
+            i = 1 + data.find('\n')
+            if i:
+                self._buf = data[i:]
+                return data[:i]
+            d = recv(4096)
+            data += d
+            if not d:
+                self._buf = ''
+                return data
+    def flush(self):
+        pass
+    def closed(self):
+        self._socket.setblocking(0)
+        try:
+            self._socket.recv(0)
+            return True
+        except socket.error, (err, _):
+            if err != errno.EAGAIN:
+                raise
+        self._socket.setblocking(1)
+        return False
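
With PdbSocket moved here, a process that printed 'Listening to <port>' can
be debugged remotely from any raw TCP client, for example (assuming socat is
available; substitute the printed port):

    socat - TCP:localhost:<port>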
@@ -15,9 +15,8 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 import thread, threading, weakref
-from . import logging
+from . import debug, logging
 from .app import BaseApplication
-from .debug import register as registerLiveDebugger
 from .dispatcher import Dispatcher
 from .locking import SimpleQueue
@@ -28,7 +27,10 @@ class app_set(weakref.WeakSet):
         app.log()
 app_set = app_set()
-registerLiveDebugger(app_set.on_log)
+def registerLiveDebugger():
+    debug.register(app_set.on_log)
+registerLiveDebugger()
 class ThreadContainer(threading.local):
...
@@ -86,8 +86,6 @@ SSL = SSL + "ca.crt", SSL + "node.crt", SSL + "node.key"
 logging.default_root_handler.handle = lambda record: None
 debug.register()
-# prevent "signal only works in main thread" errors in subprocesses
-debug.register = lambda on_log=None: None
 def mockDefaultValue(name, function):
     def method(self, *args, **kw):
...
@@ -118,7 +118,7 @@ class PortAllocator(object):
 class Process(object):
     _coverage_fd = None
-    _coverage_prefix = os.path.join(getTempDirectory(), 'coverage-')
+    _coverage_prefix = None
     _coverage_index = 0
     on_fork = [logging.resetNids]
     pid = 0
@@ -147,6 +147,9 @@ class Process(object):
         if coverage:
             cls = self.__class__
             cls._coverage_index += 1
+            if not cls._coverage_prefix:
+                cls._coverage_prefix = os.path.join(
+                    getTempDirectory(), 'coverage-')
             coverage_data_path = cls._coverage_prefix + str(cls._coverage_index)
             self._coverage_fd, w = os.pipe()
             def save_coverage(*args):
@@ -294,6 +297,10 @@ class NEOProcess(Process):
         """
         self.uuid = uuid
+    @property
+    def logfile(self):
+        return self.arg_dict['logfile']
 class NEOCluster(object):
     SSL = None
@@ -485,14 +492,15 @@ class NEOCluster(object):
         except (AlreadyStopped, NodeProcessError):
             pass
-    def getZODBStorage(self, **kw):
-        master_nodes = self.master_nodes.replace('/', ' ')
+    def getClientConfig(self, **kw):
+        kw['name'] = self.cluster_name
+        kw['master_nodes'] = self.master_nodes.replace('/', ' ')
         if self.SSL:
             kw['ca'], kw['cert'], kw['key'] = self.SSL
-        result = Storage(
-            master_nodes=master_nodes,
-            name=self.cluster_name,
-            **kw)
+        return kw
+    def getZODBStorage(self, **kw):
+        result = Storage(**self.getClientConfig(**kw))
         result.app.max_reconnection_to_master = 10
         self.zodb_storage_list.append(result)
         return result
...
# tools/stress is split in such a way that this file can be reused to
# implement another tool to stress an existing cluster, which would be filled
# by a real application.

import curses, os, random, re, select, threading, time
from collections import deque
from neo.lib import logging, protocol
from neo.lib.app import BaseApplication
from neo.lib.debug import register as registerLiveDebugger
from neo.lib.exception import PrimaryFailure
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.admin.app import Application as AdminApplication
from neo.admin.handler import MasterEventHandler


class Handler(MasterEventHandler):

    def answerClusterState(self, conn, state):
        super(Handler, self).answerClusterState(conn, state)
        self.app.refresh('state')

    def answerPartitionTable(self, *args):
        super(Handler, self).answerPartitionTable(*args)
        self.app.refresh('pt')

    def sendPartitionTable(self, *args):
        raise AssertionError

    def notifyPartitionChanges(self, *args):
        super(Handler, self).notifyPartitionChanges(*args)
        self.app.refresh('pt')

    def answerLastIDs(self, conn, *args):
        self.app.answerLastIDs(*args)

    def notifyNodeInformation(self, conn, timestamp, node_list):
        for node_type, addr, uuid, state, id_timestamp in node_list:
            if node_type == NodeTypes.CLIENT and state == NodeStates.UNKNOWN:
                self.app.clientDown()
                break
        getStorageList = self.app.nm.getStorageList
        before = [node for node in getStorageList() if node.isRunning()]
        super(Handler, self).notifyNodeInformation(conn, timestamp, node_list)
        self.app.notifyNodeInformation(
            {node for node in getStorageList() if node.isRunning()}
            .difference(before))


class StressApplication(AdminApplication):

    cluster_state = server = uuid = None
    listening_conn = True
    restart_ratio = float('inf') # no firewall support
    _stress = False

    def __init__(self, ssl, master_nodes):
        BaseApplication.__init__(self, ssl)
        for address in master_nodes:
            self.nm.createMaster(address=address)
        self.pt = None
        self.master_event_handler = Handler(self)
        self.reset()
        registerLiveDebugger(on_log=self.log)
        self.failing = set()
        self.restart_lock = threading.Lock()

    def close(self):
        BaseApplication.close(self)

    def run(self):
        visibility = None
        from logging import disable, ERROR
        disable(ERROR)
        self.stdscr = curses.initscr()
        try:
            curses.noecho()
            curses.cbreak()
            self.stdscr.keypad(1)
            visibility = curses.curs_set(0)
            self._run()
        finally:
            if visibility:
                curses.curs_set(visibility)
            self.stdscr.keypad(0)
            curses.echo()
            curses.nocbreak()
            curses.endwin()

    def _run(self):
        stdscr = self.stdscr
        r, w = os.pipe()
        l = threading.Lock()
        stdscr.nodelay(1)
        input_queue = deque()
        def input_read():
            x = []
            while 1:
                c = stdscr.getch()
                if c < 0:
                    if x:
                        input_queue.append(x)
                    return input_queue
                x.append(c)
        def input_thread():
            try:
                poll = select.poll()
                poll.register(0, select.POLLIN)
                poll.register(r, select.POLLIN)
                while 1:
                    for fd, _ in poll.poll():
                        if fd:
                            return
                    with l:
                        empty = not input_queue
                        if input_read() and empty:
                            self.em.wakeup()
            finally:
                os.close(r)
        t = threading.Thread(target=input_thread)
        t.daemon = True
        wait = None
        try:
            t.start()
            self.startCluster()
            self.refresh('stress', False)
            while 1:
                self.failing.clear()
                try:
                    self.connectToPrimary()
                    self.askLastIDs()
                    while 1:
                        self.em.poll(1)
                        with l:
                            if input_read():
                                for x in input_queue:
                                    try:
                                        x, = x
                                    except ValueError:
                                        continue
                                    if x == curses.KEY_RESIZE:
                                        self.refresh()
                                    elif x == curses.KEY_F1:
                                        self.stress()
                                    else:
                                        try:
                                            x = chr(x)
                                        except ValueError:
                                            continue
                                        if x == 'q':
                                            return
                                input_queue.clear()
                except PrimaryFailure:
                    logging.error('primary master is down')
                    if self.cluster_state == ClusterStates.STOPPING:
                        break
                    self.primaryFailure()
                finally:
                    if self._stress:
                        self.stress()
                    wait = time.time()
        finally:
            os.write(w, '\0')
            os.close(w)
            t.join()
            self.stopCluster(wait)

    def primaryFailure(self):
        raise

    def startCluster(self):
        raise NotImplementedError

    def stopCluster(self, wait):
        raise NotImplementedError

    def clientDown(self):
        send = self.master_conn.send
        send(Packets.FlushLog())
        send(Packets.SetClusterState(ClusterStates.STOPPING))

    def notifyNodeInformation(self, node_list):
        for node in node_list:
            self.failing.discard(node.getUUID())

    def askLastIDs(self):
        conn = self.master_conn
        if conn:
            conn.ask(Packets.AskLastIDs())

    def answerLastIDs(self, loid, ltid):
        self.loid = loid
        self.ltid = ltid
        self.em.setTimeout(int(time.time() + 1), self.askLastIDs)
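        # Once per second while stressing, pick random storage nodes to
        # disturb, never more than _fault_count at once and only while the
        # partition table remains operational: a chosen running node is
        # killed and restarted with probability restart_ratio, otherwise
        # its connections are reset by the firewall.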
        if self._stress:
            node_list = self.nm.getStorageList()
            random.shuffle(node_list)
            fw = []
            kill = []
            restart_ratio = self.restart_ratio
            for node in node_list:
                nid = node.getUUID()
                if nid in self.failing:
                    if restart_ratio <= 1:
                        fw.append(nid)
                    continue
                running = node.isRunning()
                if running or restart_ratio <= 1:
                    self.failing.add(nid)
                    if self.pt.operational(self.failing):
                        (kill if running and random.random() < restart_ratio
                              else fw).append(nid)
                        if len(self.failing) == self._fault_count:
                            break
                    else:
                        self.failing.remove(nid)
            if fw or kill:
                for nid in fw:
                    self.tcpReset(nid)
                if kill:
                    t = threading.Thread(target=self._restart, args=kill)
                    t.daemon = 1
                    t.start()
                self.refresh('pt', False)
        self.refresh('ids')

    def _restart(self, *nids):
        with self.restart_lock:
            self.restartStorages(nids)

    def tcpReset(self, nid):
        raise NotImplementedError

    def restartStorages(self, nids):
        raise NotImplementedError

    def refresh(self, what=None, do=True):
        stdscr = self.stdscr
        try:
            y = 0
            if what in (None, 'stress'):
                stdscr.addstr(y, 0, 'stress: %s (toggle with F1)\n'
                    % ('yes' if self._stress else 'no'))
            y += 1
            if what in (None, 'state'):
                stdscr.addstr(y, 0, 'cluster state: %s\n' % self.cluster_state)
            y += 1
            if what in (None, 'ids'):
                self.refresh_ids(y)
                h = stdscr.getyx()[0] - y
                clear = self._ids_height - h
                if clear:
                    self._ids_height = h
                    what = None
            else:
                clear = None
            y += self._ids_height
            if what in (None, 'pt'):
                pt = self.pt
                n = len(str(pt.np-1))
                node_list = sorted(pt.count_dict)
                attr = curses.A_NORMAL, curses.A_BOLD
                stdscr.addstr(y, 0, 'pt id: %s\n %s' % (pt.getID(), ' ' * n))
                for node in node_list:
                    stdscr.addstr(
                        protocol.node_state_prefix_dict[node.getState()],
                        attr[node.getUUID() in self.failing])
                stdscr.addstr('\n')
                x = '%{}s'.format(n)
                n = pt.nr + 1
                split = re.compile('[^OC]+|[OC]+').findall
                for i, r in enumerate(pt._formatRows(node_list)):
                    stdscr.addstr(x % i, attr[r.count('U') != n])
                    for i, r in enumerate(split(': %s\n' % r)):
                        stdscr.addstr(r, attr[i & 1])
                if clear:
                    stdscr.addstr('\n' * clear)
        except curses.error:
            pass
        if do:
            stdscr.refresh()
    # Subclasses must also define an _ids_height attribute: the number of
    # lines that refresh_ids writes.
    def refresh_ids(self, y):
        raise NotImplementedError

    def stress(self):
        self._stress = not self._stress
        self.refresh('stress')
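
As the header comment of this file says, StressApplication is meant to be
reusable against an existing cluster filled by a real application. A minimal
sketch of such a reuse, overriding only the hooks left unimplemented above
(the subclass and its behaviour are hypothetical):

    from neo.lib.util import u64
    from neo.tests.stress import StressApplication

    class ExistingClusterStress(StressApplication):

        _ids_height = 2   # number of lines written by refresh_ids
        _fault_count = 1  # disturb at most 1 storage node at a time

        def startCluster(self):
            pass  # the cluster is already running

        def stopCluster(self, wait):
            pass  # and it shall keep running

        def tcpReset(self, nid):
            # e.g. drive an external firewall to reset connections of <nid>
            raise NotImplementedError

        def restartStorages(self, nids):
            # e.g. restart the storage services over SSH
            raise NotImplementedError

        def refresh_ids(self, y):
            self.stdscr.addstr(y, 0, 'last oid: 0x%x\nlast tid: 0x%x\n'
                % (u64(self.loid), u64(self.ltid)))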
@@ -14,20 +14,35 @@ Topic :: Database
 Topic :: Software Development :: Libraries :: Python Modules
 """
-mock = 'neo/tests/mock.py'
-if not os.path.exists(mock):
-    import cStringIO, hashlib, subprocess, urllib, zipfile
-    x = 'pythonmock-0.1.0.zip'
+def get3rdParty(name, tag, url, h, extract=lambda content, name: content):
+    path = 'neo/tests/' + name
+    if os.path.exists(path):
+        return
+    import hashlib, subprocess, urllib
     try:
-        x = subprocess.check_output(('git', 'cat-file', 'blob', x))
+        x = subprocess.check_output(('git', 'cat-file', 'blob', tag))
     except (OSError, subprocess.CalledProcessError):
-        x = urllib.urlopen(
-            'http://downloads.sf.net/sourceforge/python-mock/' + x).read()
-    mock_py = zipfile.ZipFile(cStringIO.StringIO(x)).read('mock.py')
-    if (hashlib.sha256(mock_py).hexdigest() !=
-            'c6ed26e4312ed82160016637a9b6f8baa71cf31a67c555d44045a1ef1d60d1bc'):
-        raise EnvironmentError("SHA checksum mismatch downloading 'mock.py'")
-    open(mock, 'w').write(mock_py)
+        x = urllib.urlopen(url).read()
+    x = extract(x, name)
+    if hashlib.sha256(x).hexdigest() != h:
+        raise EnvironmentError("SHA checksum mismatch downloading '%s'" % name)
+    with open(path, 'wb') as f:
+        f.write(x)
+def unzip(content, name):
+    import io, zipfile
+    return zipfile.ZipFile(io.BytesIO(content)).read(name)
+x = 'pythonmock-0.1.0.zip'
+get3rdParty('mock.py', x,
+    'http://downloads.sf.net/sourceforge/python-mock/' + x,
+    'c6ed26e4312ed82160016637a9b6f8baa71cf31a67c555d44045a1ef1d60d1bc',
+    unzip)
+x = 'ConflictFree.py'
+get3rdParty(x, '3rdparty/' + x, 'https://lab.nexedi.com/nexedi/erp5'
+    '/raw/14b0fcdcc31c5791646f9590678ca028f5d221f5/product/ERP5Type/' + x,
+    'abb7970856540fd02150edd1fa9a3a3e8d0074ec526ab189684ef7ea9b41825f')
 zodb_require = ['ZODB3>=3.10dev']
@@ -42,6 +57,9 @@ extras_require = {
 }
 extras_require['tests'] = ['coverage', 'zope.testing', 'psutil>=2',
     'neoppod[%s]' % ', '.join(extras_require)]
+extras_require['stress'] = ['NetfilterQueue', 'gevent', 'neoppod[tests]',
+    'cython-zstd', # recommended (log rotation)
+]
 try:
     from docutils.core import publish_string
...
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, print_function

import argparse, curses, errno, os, random, select
import signal, socket, subprocess, sys, threading, time
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from multiprocessing import Lock, RawArray
from struct import Struct
from netfilterqueue import NetfilterQueue
import gevent.socket # preload for subprocesses

from neo.client.Storage import Storage
from neo.lib import logging, util
from neo.lib.connector import SocketConnector
from neo.lib.debug import PdbSocket
from neo.lib.node import Node
from neo.lib.protocol import NodeTypes
from neo.lib.util import timeStringFromTID, p64, u64
from neo.storage.app import DATABASE_MANAGER_DICT, \
    Application as StorageApplication
from neo.tests import getTempDirectory
from neo.tests.ConflictFree import ConflictFreeLog
from neo.tests.functional import AlreadyStopped, NEOCluster, Process
from neo.tests.stress import StressApplication
from transaction import begin as transaction_begin
from ZODB import DB, POSException

INET = {
    socket.AF_INET: ('ip', socket.IPPROTO_IP, socket.IP_TOS),
    socket.AF_INET6: ('ip6', socket.IPPROTO_IPV6, socket.IPV6_TCLASS),
}

NFT_TEMPLATE = """\
table %s %s {
  chain mangle {
    type filter hook input priority -150
    policy accept
    %s dscp 1 tcp flags & (fin|syn|rst|ack) != syn jump nfqueue
  }
  chain nfqueue {
    %s
  }
  chain filter {
    type filter hook input priority 0
    policy accept
    meta l4proto tcp %s dscp 1 mark 1 counter reject with tcp reset
  }
}"""
SocketConnector.KEEPALIVE = 5, 1, 1

def child_coverage(self):
    # XXX: The dance to collect coverage results just before killing
    #      subprocesses does not work for processes that may run code that
    #      is not interruptible with Python code (e.g. Lock.acquire).
    #      For nodes with a single epoll loop, this is usually fine.
    #      On the other side, coverage support is broken for clients,
    #      like here: we just do some cleanup for the assertion in __del__
    r = self._coverage_fd
    if r is not None:
        os.close(r)
        del self._coverage_fd
Process.child_coverage = child_coverage

def setDSCP(connection, dscp):
    connector = connection.getConnector()
    _, sol, opt = INET[connector.af_type]
    connector.socket.setsockopt(sol, opt, dscp << 2)
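
# DSCP 1 is what the nft mangle chain matches: storage nodes tag their
# connections permanently (dscpPatch(1), patched into StorageApplication
# below), whereas clients tag theirs only around txn.commit(), so that
# only commit traffic is exposed to the induced faults.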
def dscpPatch(dscp):
    Node_setConnection = Node.setConnection
    Node.dscp = dscp
    def setConnection(self, connection, force=None):
        if self.dscp and self.getType() == NodeTypes.STORAGE:
            setDSCP(connection, 1)
        return Node_setConnection(self, connection, force)
    Node.setConnection = setConnection


class Client(Process):

    _fmt = '!I200s'
    prev_count = 0

    def __init__(self, command, thread_count, **kw):
        super(Client, self).__init__(command)
        self.config = kw
        self.count = RawArray('I', thread_count)
        self.thread_count = thread_count

    def run(self):
        from neo.lib.threaded_app import registerLiveDebugger
        registerLiveDebugger() # for on_log
        dscpPatch(0)
        self._dscp_lock = threading.Lock()
        storage = Storage(**self.config)
        db = DB(storage=storage)
        try:
            if self.thread_count == 1:
                self.worker(db)
            else:
                r, w = os.pipe()
                try:
                    for i in xrange(self.thread_count):
                        t = threading.Thread(target=self.worker,
                            args=(db, i, w), name='worker-%s' % i)
                        t.daemon = 1
                        t.start()
                    while 1:
                        try:
                            os.read(r, 1)
                            break
                        except OSError, e:
                            if e.errno != errno.EINTR:
                                raise
                finally:
                    os.close(r)
        finally:
            db.close()

    def worker(self, db, i=0, stop=None):
        try:
            nm = db.storage.app.nm
            conn = db.open()
            r = conn.root()
            count = self.count
            name = self.command
            if self.thread_count > 1:
                name += ':%s' % i
            j = 0
            k = None
            logs = r.values()
            pack = Struct(self._fmt).pack
            while 1:
                txn = transaction_begin()
                try:
                    data = pack(j, name)
                    for log in random.sample(logs, 2):
                        log.append(data)
                    txn.note(name)
                    self.setDSCP(nm, 1)
                    try:
                        txn.commit()
                    finally:
                        self.setDSCP(nm, -1)
                except (
                        POSException.StorageError,  # XXX: 'already connected' error
                        POSException.ConflictError, # XXX: same but during conflict resolution
                        ), e:
                    if 'unexpected packet:' in str(e):
                        raise
                    if j != k:
                        logging.exception('j = %s', j)
                        k = j
                    txn.abort()
                    continue
                j += 1
                count[i] = j
        finally:
            if stop is not None:
                try:
                    os.write(stop, '\0')
                except OSError:
                    pass

    def setDSCP(self, nm, dscp):
        with self._dscp_lock:
            prev = Node.dscp
            dscp += prev
            Node.dscp = dscp
            if dscp and prev:
                return
            for node in nm.getStorageList():
                try:
                    setDSCP(node.getConnection(), dscp)
                except (AttributeError, AssertionError,
                        # XXX: EBADF due to race condition
                        socket.error):
                    pass

    @classmethod
    def check(cls, r):
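        # Walk each log's ring of buckets and decode the stored
        # (counter, client-name) items: sorted, the counters of each client
        # must form the exact sequence 0, 0, 1, 1, 2, 2, ... since every
        # transaction appends the same item to 2 random logs; any mismatch
        # aborts with details about the first lost or duplicated item.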
        nodes = {}
        hosts = []
        buckets = [0, 0]
        item_list = []
        unpack = Struct(cls._fmt).unpack
        def decode(item):
            i, host = unpack(item)
            return i, host.rstrip('\0')
        for log in r.values():
            bucket = log._next
            if bucket is None:
                bucket = log
            buckets[:] = bucket._p_estimated_size, 1
            while 1:
                for item in bucket._log:
                    i, host = decode(item)
                    try:
                        node = nodes[host]
                    except KeyError:
                        node = nodes[host] = len(nodes)
                        hosts.append(host)
                    item_list.append((i, node))
                if bucket is log:
                    break
                buckets[0] += bucket._p_estimated_size
                buckets[1] += 1
                bucket = bucket._next
        item_list.sort()
        nodes = [0] * len(nodes)
        for i, node in item_list:
            j = nodes[node] // 2
            if i != j:
                #import code; code.interact(banner="", local=locals())
                sys.exit('node: %s, expected: %s, stored: %s'
                         % (hosts[node], j, i))
            nodes[node] += 1
        for node, host in sorted(enumerate(hosts), key=lambda x: x[1]):
            print('%s\t%s' % (nodes[node], host))
        print('average bucket size: %f' % (buckets[0] / buckets[1]))
        print('target bucket size:', log._bucket_size)
        print('number of full buckets:', buckets[1])

    @property
    def logfile(self):
        return self.config['logfile']


class NFQueue(Process):

    def __init__(self, queue):
        super(NFQueue, self).__init__('nfqueue_%i' % queue)
        self.lock = l = Lock(); l.acquire()
        self.queue = queue

    def run(self):
        acquire = self.lock.acquire
        delay = self.delay
        nfqueue = NetfilterQueue()
        if delay:
            from gevent import sleep, socket, spawn
            from random import random
            def callback(packet):
                if acquire(0): packet.set_mark(1)
                else: sleep(random() * delay)
                packet.accept()
            callback = partial(spawn, callback)
        else:
            def callback(packet):
                if acquire(0): packet.set_mark(1)
                packet.accept()
        nfqueue.bind(self.queue, callback)
        try:
            if delay:
                s = socket.fromfd(nfqueue.get_fd(),
                    socket.AF_UNIX, socket.SOCK_STREAM)
                try:
                    nfqueue.run_socket(s)
                finally:
                    s.close()
            else:
                while 1:
                    nfqueue.run() # returns on signal (e.g. SIGWINCH)
        finally:
            nfqueue.unbind()


class Alarm(threading.Thread):
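    # Raise a private BaseException instance in the main thread if the
    # `with` block does not finish within `timeout` seconds; __exit__
    # writes to a pipe to cancel the pending alarm and swallows the
    # exception only when it is this sentinel.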
    __interrupt = BaseException()

    def __init__(self, signal, timeout):
        super(Alarm, self).__init__()
        self.__signal = signal
        self.__timeout = timeout

    def __enter__(self):
        self.__r, self.__w = os.pipe()
        self.__prev = signal.signal(self.__signal, self.__raise)
        self.start()

    def __exit__(self, t, v, tb):
        try:
            try:
                os.close(self.__w)
                self.join()
            finally:
                os.close(self.__r)
                signal.signal(self.__signal, self.__prev)
            return v is self.__interrupt
        except BaseException as e:
            if e is not self.__interrupt:
                raise

    def __raise(self, sig, frame):
        raise self.__interrupt

    def run(self):
        if not select.select((self.__r,), (), (), self.__timeout)[0]:
            os.kill(os.getpid(), self.__signal)


class NEOCluster(NEOCluster):

    def _newProcess(self, node_type, logfile=None, port=None, **kw):
        super(NEOCluster, self)._newProcess(node_type, logfile,
            port or self.port_allocator.allocate(
                self.address_type, self.local_ip),
            **kw)


class Application(StressApplication):

    _blocking = None

    def __init__(self, client_count, thread_count, restart_ratio, logrotate,
                 *args, **kw):
        self.client_count = client_count
        self.thread_count = thread_count
        self.logrotate = logrotate
        self.restart_ratio = restart_ratio
        self.cluster = cluster = NEOCluster(*args, **kw)
        # Make the firewall also affect connections between storage nodes.
        StorageApplication__init__ = StorageApplication.__init__
        def __init__(self, config):
            dscpPatch(1)
            StorageApplication__init__(self, config)
        StorageApplication.__init__ = __init__
        super(Application, self).__init__(cluster.SSL,
            util.parseMasterList(cluster.master_nodes))
        self._nft_family = INET[cluster.address_type][0]
        self._nft_table = 'stress_%s' % os.getpid()
        self._blocked = []
        n = kw['replicas']
        self._fault_count = len(kw['db_list']) * n // (1 + n)

    @property
    def name(self):
        return self.cluster.cluster_name

    def run(self):
        super(Application, self).run()
        try:
            with self.db() as r:
                Client.check(r)
        finally:
            self.cluster.stop()

    @contextmanager
    def db(self):
        cluster = self.cluster
        cluster.start()
        db, conn = cluster.getZODBConnection()
        try:
            yield conn.root()
        finally:
            db.close()

    def startCluster(self):
        with self.db() as r:
            txn = transaction_begin()
            for i in xrange(2 * self.client_count * self.thread_count):
                r[i] = ConflictFreeLog()
            txn.commit()
        cluster = self.cluster
        process_list = cluster.process_dict[NFQueue] = []
        nft_family = self._nft_family
        queue = []
        for _, (ip, port), nid, _, _ in sorted(cluster.getStorageList(),
                                               key=lambda x: x[2]):
            queue.append(
                "%s daddr %s tcp dport %s counter queue num %s bypass"
                % (nft_family, ip, port, nid))
            p = NFQueue(nid)
            process_list.append(p)
            p.start()
        ruleset = NFT_TEMPLATE % (nft_family, self._nft_table,
            nft_family, '\n '.join(queue), nft_family)
        p = subprocess.Popen(('nft', '-f', '-'), stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        err = p.communicate(ruleset)[0].rstrip()
        if p.poll():
            sys.exit("Failed to apply the following ruleset:\n%s\n%s"
                     % (ruleset, err))
        process_list = cluster.process_dict[Client] = []
        config = cluster.getClientConfig()
        self.started = time.time()
        for i in xrange(self.client_count):
            name = 'client_%i' % i
            p = Client(name, self.thread_count,
                logfile=os.path.join(cluster.temp_dir, name + '.log'),
                **config)
            process_list.append(p)
            p.start()
        if self.logrotate:
            t = threading.Thread(target=self._logrotate_thread)
            t.daemon = 1
            t.start()

    def stopCluster(self, wait=None):
        self.restart_lock.acquire()
        self._cleanFirewall()
        process_dict = self.cluster.process_dict
        if wait:
            # Give time to flush logs before SIGKILL.
            wait += 5 - time.time()
            if wait > 0:
                with Alarm(signal.SIGUSR1, wait):
                    for x in Client, NodeTypes.STORAGE:
                        for x in process_dict[x]:
                            x.wait()
        self.cluster.stop()
        try:
            del process_dict[NFQueue], process_dict[Client]
        except KeyError:
            pass

    def _logrotate_thread(self):
        try:
            import zstd
        except ImportError:
            import gzip, shutil
            zstd = None
        compress = []
        rotated = {}
        t = time.time()
        while 1:
            t += self.logrotate
            x = t - time.time()
            if x > 0:
                time.sleep(x)
            x = datetime.utcnow().strftime('-%Y%m%d%H%M%S.log')
            for p, process_list in self.cluster.process_dict.iteritems():
                if p is not NFQueue:
                    for p in process_list:
                        log = p.logfile
                        if os.path.exists(log):
                            y = rotated.get(log)
                            if y:
                                compress.append(y)
                            y = log[:-4] + x
                            os.rename(log, y)
                            rotated[log] = y
                            try:
                                p.kill(signal.SIGRTMIN+1)
                            except AlreadyStopped:
                                pass
            for log in compress:
                if zstd:
                    with open(log, 'rb') as src:
                        x = zstd.compress(src.read())
                    y = log + '.zst'
                    with open(y, 'wb') as dst:
                        dst.write(x)
                else:
                    y = log + '.gz'
                    with open(log, 'rb') as src, gzip.open(y, 'wb') as dst:
                        shutil.copyfileobj(src, dst, 1<<20)
                x = os.stat(log)
                os.utime(y, (x.st_atime, x.st_mtime))
                os.remove(log)
            del compress[:]

    def tcpReset(self, nid):
        p = self.cluster.process_dict[NFQueue][nid-1]
        assert p.queue == nid, (p.queue, nid)
        try:
            p.lock.release()
        except ValueError:
            pass

    def restartStorages(self, nids):
        processes = [p for p in self.cluster.getStorageProcessList()
                       if p.uuid in nids]
        for p in processes: p.kill(signal.SIGKILL)
        time.sleep(1)
        for p in processes: p.wait()
        for p in processes: p.start()

    def _cleanFirewall(self):
        with open(os.devnull, "wb") as f:
            subprocess.call(('nft', 'delete', 'table',
                self._nft_family, self._nft_table), stderr=f)

    _ids_height = 4

    def refresh_ids(self, y):
        attr = curses.A_NORMAL, curses.A_BOLD
        stdscr = self.stdscr
        ltid = self.ltid
        stdscr.addstr(y, 0,
            'last oid: 0x%x\nlast tid: 0x%x (%s)\nclients: '
            % (u64(self.loid), u64(ltid), timeStringFromTID(ltid)))
        before = after = 0
        for i, p in enumerate(self.cluster.process_dict[Client]):
            if i:
                stdscr.addstr(', ')
            count = sum(p.count)
            before += p.prev_count
            after += count
            stdscr.addstr(str(count), attr[p.prev_count==count])
            p.prev_count = count
        elapsed = time.time() - self.started
        s, ms = divmod(int(elapsed * 1000), 1000)
        m, s = divmod(s, 60)
        stdscr.addstr(' (+%s)\n\t%sm%02u.%03us (%f/s)\n' % (
            after - before, m, s, ms, after / elapsed))


def console(port, app):
    from pdb import Pdb
    cluster = app.cluster
    def console(socket):
        Pdb(stdin=socket, stdout=socket).set_trace()
        app # this is Application instance
    s = socket.socket(cluster.address_type, socket.SOCK_STREAM)
    # XXX: The following commented line would only work with Python 3, which
    #      fixes refcounting of sockets (e.g. when there's a call to .accept()).
    #Process.on_fork.append(s.close)
    s.bind((cluster.local_ip, port))
    s.listen(0)
    while 1:
        t = threading.Thread(target=console, args=(PdbSocket(s.accept()[0]),))
        t.daemon = 1
        t.start()


class ArgumentDefaultsHelpFormatter(argparse.HelpFormatter):

    def _format_action(self, action):
        if not (action.help or action.default in (None, argparse.SUPPRESS)):
            action.help = '(default: %(default)s)'
        return super(ArgumentDefaultsHelpFormatter, self)._format_action(action)


def main():
    adapters = sorted(DATABASE_MANAGER_DICT)
    adapters.remove('Importer')
    default_adapter = 'SQLite'
    assert default_adapter in adapters
    kw = dict(formatter_class=ArgumentDefaultsHelpFormatter)
    parser = argparse.ArgumentParser(**kw)
    _ = parser.add_argument
    _('-6', '--ipv6', dest='address_type', action='store_const',
        default=socket.AF_INET, const=socket.AF_INET6, help='(default: IPv4)')
    _('-a', '--adapter', choices=adapters, default=default_adapter)
    _('-d', '--datadir', help="(default: same as unit tests)")
    _('-l', '--logdir', help="(default: same as --datadir)")
    _('-m', '--masters', type=int, default=1)
    _('-s', '--storages', type=int, default=8)
    _('-p', '--partitions', type=int, default=24)
    _('-r', '--replicas', type=int, default=1)
    parsers = parser.add_subparsers(dest='command')
    def ratio(value):
        value = float(value)
        if 0 <= value <= 1:
            return value
        raise argparse.ArgumentTypeError("ratio ∉ [0,1]")
    _ = parsers.add_parser('run',
        help='Start a new DB and fill it in a way that triggers many conflict'
             ' resolutions and deadlock avoidances. Stressing the cluster will'
             ' cause external faults every second, to check that NEO can'
             ' recover. The ingested data is checked at exit.',
        **kw).add_argument
    _('-c', '--clients', type=int, default=10,
        help='number of client processes')
    _('-t', '--threads', type=int, default=1,
        help='number of thread workers per client process')
    _('-r', '--restart-ratio', type=ratio, default=.5, metavar='RATIO',
        help='probability to kill/restart a storage node, rather than just'
             ' RSTing a TCP connection with this node')
    _('-C', '--console', type=int, default=0,
        help='console port (localhost) (default: any)')
    _('-D', '--delay', type=float, default=.01,
        help='randomly delay packets to storage nodes'
             ' by a duration between 0 and DELAY seconds')
    _('-L', '--logrotate', type=float, default=1, metavar='HOUR')
    _ = parsers.add_parser('check',
        help='Check ingested data.',
        **kw).add_argument
    _('tid', nargs='?')
    _ = parsers.add_parser('bisect',
        help='Search for the first TID that contains corrupted data.',
        **kw).add_argument
    args = parser.parse_args()
    db_list = ['stress_neo%s' % x for x in xrange(args.storages)]
    if args.datadir:
        if args.adapter != 'SQLite':
            parser.error('--datadir is only for SQLite adapter')
        db_list = [os.path.join(args.datadir, x + '.sqlite') for x in db_list]
    kw = dict(db_list=db_list, name='stress',
        partitions=args.partitions, replicas=args.replicas,
        adapter=args.adapter, address_type=args.address_type,
        temp_dir=args.logdir or args.datadir or getTempDirectory())
    if args.command == 'run':
        NFQueue.delay = args.delay
        app = Application(args.clients, args.threads, args.restart_ratio,
            int(round(args.logrotate * 3600, 0)), **kw)
        t = threading.Thread(target=console, args=(args.console, app))
        t.daemon = 1
        t.start()
        app.run()
        return
    cluster = NEOCluster(clear_databases=False, **kw)
    try:
        cluster.start()
        storage = cluster.getZODBStorage()
        db = DB(storage=storage)
        try:
            if args.command == 'check':
                tid = args.tid
                conn = db.open(at=tid and p64(int(tid, 0)))
                Client.check(conn.root())
            else:
                assert args.command == 'bisect'
                conn = db.open()
                try:
                    r = conn.root()
                    r._p_activate()
                    ok = r._p_serial
                finally:
                    conn.close()
                bad = storage.lastTransaction()
                while 1:
                    print('ok: 0x%x, bad: 0x%x' % (u64(ok), u64(bad)))
                    tid = p64((u64(ok)+u64(bad)) // 2)
                    if ok == tid:
                        break
                    conn = db.open(at=tid)
                    try:
                        Client.check(conn.root())
                    except SystemExit, e:
                        print(e)
                        bad = tid
                    else:
                        ok = tid
                    finally:
                        conn.close()
                print('bad: 0x%x (%s)' % (u64(bad), timeStringFromTID(bad)))
        finally:
            db.close()
    finally:
        cluster.stop()


if __name__ == '__main__':
    sys.exit(main())