Commit 2e98a411 authored by Kirill Smelkov's avatar Kirill Smelkov

Sync with NEO/py v1.11

parents b3e24d58 e7663c22
......@@ -6,5 +6,6 @@
/build/
/dist/
/htmlcov/
/neo/tests/ConflictFree.py
/neo/tests/mock.py
/neoppod.egg-info/
Change History
==============
1.11 (2019-03-11)
-----------------
This release continues the work in v1.8 to stabilize NEO. A new 'stress'
tool was added: it kills storage nodes and resets TCP connections randomly,
while causing high concurrency activity. It revealed many bugs of all kinds,
including crashes and corruptions. Most of them happened after network
disconnection. In order to fix them all, several improvements have also been
done to logging:
- New neoctl command to flush the logs of all nodes in the cluster.
- In logs, dump the partition table in a more compact and readable way.
- client: log_flush most exceptions raised from Application to ZODB
- More RTMIN+2 (log) information for clients and connections.
- New log format to show node id (and optionally cluster name) in node column.
- neolog: add support for zstd-compressed logs.
- neolog: do not die when a table is corrupted.
Other changes:
- sqlite: optimize storage of metadata (the speed up in v1.9 about indexing
'obj' primarily by 'oid' was only effective for MySQL).
- Fix error handling when setting up a listening connector.
- The command line parsing of all executables has been completely rewritten,
fixing a few minor bugs.
1.10 (2018-07-16)
-----------------
......
......@@ -211,8 +211,10 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption
if opt.Preload != "" {
cmd := exec.Command("python", "-c",
"from neo.scripts.neomigrate import main; main()",
"-s", opt.Preload,
"-d", npy.MasterAddr(), "-c", npy.ClusterName())
"-q",
"-c", npy.ClusterName(),
opt.Preload,
npy.MasterAddr())
cmd.Stdin = nil
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
......
......@@ -81,7 +81,7 @@ const (
// The protocol version must be increased whenever upgrading a node may require
// to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
// the high order byte 0 is different from TLS Handshake (0x16).
Version = 4
Version = 5
// length of packet header
PktHeaderLen = 10 // = unsafe.Sizeof(PktHeader{}), but latter gives typed constant (uintptr)
......@@ -1175,6 +1175,12 @@ type Truncate struct {
// answer = Error
}
// Request all nodes to flush their logs.
//
//neo:nodes ctl -> A -> M -> *
type FlushLog struct {}
// ---- runtime support for protogen and custom codecs ----
// customCodec is the interface that is implemented by types with custom encodings.
......
......@@ -4092,6 +4092,23 @@ overflow:
return 0, ErrDecodeOverflow
}
// 65. FlushLog
func (*FlushLog) NEOMsgCode() uint16 {
return 65
}
func (p *FlushLog) NEOMsgEncodedLen() int {
return 0
}
func (p *FlushLog) NEOMsgEncode(data []byte) {
}
func (p *FlushLog) NEOMsgDecode(data []byte) (int, error) {
return 0, nil
}
// registry of message types
var msgTypeRegistry = map[uint16]reflect.Type{
0 | answerBit: reflect.TypeOf(Error{}),
......@@ -4193,4 +4210,5 @@ var msgTypeRegistry = map[uint16]reflect.Type{
62: reflect.TypeOf(AddTransaction{}),
63: reflect.TypeOf(AddObject{}),
64: reflect.TypeOf(Truncate{}),
65: reflect.TypeOf(FlushLog{}),
}
......@@ -66,6 +66,7 @@ var pyMsgRegistry = map[uint16]string{
62: "AddTransaction",
63: "AddObject",
64: "Truncate",
65: "FlushLog",
32768: "Error",
32769: "AcceptIdentification",
32770: "Pong",
......
......@@ -99,7 +99,7 @@ const trans = `
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid)
`
` // TODO create "WITHOUT ROWID"
// table "obj" stores committed object metadata.
const obj = `
......@@ -111,7 +111,7 @@ const obj = `
-- XXX ^^^ can be NOT NULL with 0 serving instead
PRIMARY KEY (partition, oid, tid)
`
` // TODO create "WITHOUT ROWID"
// `(partition, tid, oid)`
// `(data_id)`
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \
......@@ -25,24 +25,36 @@ from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger
@buildOptionParser
class Application(BaseApplication):
"""The storage node application."""
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Admin node"
cls.addCommonServerOptions('admin', '127.0.0.1:9999')
_ = _.group('admin')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
def __init__(self, config):
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
for address in config.getMasters():
config.get('ssl'), config.get('dynamic_master_list'))
for address in config['masters']:
self.nm.createMaster(address=address)
self.name = config.getCluster()
self.server = config.getBind()
self.name = config['cluster']
self.server = config['bind']
logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
self.uuid = config.getUUID()
self.uuid = config.get('uuid')
logging.node(self.name, self.uuid)
self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self)
self.cluster_state = None
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -62,6 +62,11 @@ class AdminEventHandler(EventHandler):
master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
@check_primary_master
def flushLog(self, conn):
self.app.master_conn.send(Packets.FlushLog())
super(AdminEventHandler, self).flushLog(conn)
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.POSException import ConflictError, UndoError
from zope.interface import implementer
import ZODB.interfaces
......@@ -81,31 +82,68 @@ class Storage(BaseStorage.BaseStorage,
return self.app.load(oid)[:2]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
except Exception:
logging.exception('oid=%r', oid)
raise
def new_oid(self):
try:
return self.app.new_oid()
except Exception:
logging.exception('')
raise
def tpc_begin(self, transaction, tid=None, status=' '):
"""
Note: never blocks in NEO.
"""
try:
return self.app.tpc_begin(self, transaction, tid, status)
except Exception:
logging.exception('transaction=%r, tid=%r', transaction, tid)
raise
def tpc_vote(self, transaction):
try:
return self.app.tpc_vote(transaction)
except ConflictError:
raise
except Exception:
logging.exception('transaction=%r', transaction)
raise
def tpc_abort(self, transaction):
try:
return self.app.tpc_abort(transaction)
except Exception:
logging.exception('transaction=%r', transaction)
raise
def tpc_finish(self, transaction, f=None):
try:
return self.app.tpc_finish(transaction, f)
except Exception:
logging.exception('transaction=%r', transaction)
raise
def store(self, oid, serial, data, version, transaction):
assert version == '', 'Versions are not supported'
try:
return self.app.store(oid, serial, data, version, transaction)
except ConflictError:
raise
except Exception:
logging.exception('oid=%r, serial=%r, transaction=%r',
oid, serial, transaction)
raise
def deleteObject(self, oid, serial, transaction):
try:
self.app.store(oid, serial, None, None, transaction)
except Exception:
logging.exception('oid=%r, serial=%r, transaction=%r',
oid, serial, transaction)
raise
# multiple revisions
def loadSerial(self, oid, serial):
......@@ -113,6 +151,9 @@ class Storage(BaseStorage.BaseStorage,
return self.app.load(oid, serial)[0]
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
except Exception:
logging.exception('oid=%r, serial=%r', oid, serial)
raise
def loadBefore(self, oid, tid):
try:
......@@ -121,6 +162,9 @@ class Storage(BaseStorage.BaseStorage,
raise POSException.POSKeyError(oid)
except NEOStorageNotFoundError:
return None
except Exception:
logging.exception('oid=%r, tid=%r', oid, tid)
raise
@property
def iterator(self):
......@@ -128,10 +172,20 @@ class Storage(BaseStorage.BaseStorage,
# undo
def undo(self, transaction_id, txn):
try:
return self.app.undo(transaction_id, txn)
except (ConflictError, UndoError):
raise
except Exception:
logging.exception('transaction_id=%r, txn=%r', transaction_id, txn)
raise
def undoLog(self, first=0, last=-20, filter=None):
try:
return self.app.undoLog(first, last, filter)
except Exception:
logging.exception('first=%r, last=%r', first, last)
raise
def supportsUndo(self):
return True
......@@ -141,10 +195,17 @@ class Storage(BaseStorage.BaseStorage,
data, serial, _ = self.app.load(oid)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
except Exception:
logging.exception('oid=%r', oid)
raise
return data, serial, ''
def __len__(self):
try:
return self.app.getObjectCount()
except Exception:
logging.exception('')
raise
def registerDB(self, db, limit=None):
self.app.registerDB(db, limit)
......@@ -154,19 +215,30 @@ class Storage(BaseStorage.BaseStorage,
return self.app.history(oid, *args, **kw)
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
except Exception:
logging.exception('oid=%r', oid)
raise
def sync(self):
return self.app.sync()
def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """
try:
return self.app.importFrom(self, source)
except Exception:
logging.exception('source=%r', source)
raise
def pack(self, t, referencesf, gc=False):
if gc:
logging.warning('Garbage Collection is not available in NEO,'
' please use an external tool. Packing without GC.')
try:
self.app.pack(t)
except Exception:
logging.exception('pack_time=%r', t)
raise
def lastSerial(self):
# seems unused
......@@ -198,6 +270,14 @@ class Storage(BaseStorage.BaseStorage,
return self.app.getLastTID(oid)
except NEOStorageNotFoundError:
raise KeyError
except Exception:
logging.exception('oid=%r', oid)
raise
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
try:
self.app.checkCurrentSerialInTransaction(oid, serial, transaction)
except Exception:
logging.exception('oid=%r, serial=%r, transaction=%r',
oid, serial, transaction)
raise
This diff is collapsed.
#
# Copyright (C) 2011-2017 Nexedi SA
# Copyright (C) 2011-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -64,7 +64,7 @@ class ClientCache(object):
- The history queue only contains items with counter > 0
"""
__slots__ = ('_life_time', '_max_history_size', '_max_size',
__slots__ = ('max_size', '_life_time', '_max_history_size',
'_queue_list', '_oid_dict', '_time', '_size', '_history_size',
'_nhit', '_nmiss')
......@@ -72,7 +72,7 @@ class ClientCache(object):
max_size=20*1024*1024):
self._life_time = life_time
self._max_history_size = max_history_size
self._max_size = max_size
self.max_size = max_size
self.clear()
def clear(self):
......@@ -94,7 +94,7 @@ class ClientCache(object):
[self._history_size] + [
sum(1 for _ in self._iterQueue(level))
for level in xrange(1, len(self._queue_list))],
self._life_time, self._max_history_size, self._max_size)
self._life_time, self._max_history_size, self.max_size)
def _iterQueue(self, level):
"""for debugging purpose"""
......@@ -168,7 +168,7 @@ class ClientCache(object):
# XXX It might be better to adjust the level according to the object
# size. See commented factor for example.
item.level = 1 + int(_log(counter, 2)
# * (1.01 - len(item.data) / self._max_size)
# * (1.01 - len(item.data) / self.max_size)
)
self._add(item)
......@@ -212,7 +212,7 @@ class ClientCache(object):
def store(self, oid, data, tid, next_tid):
"""Store a new data record in the cache"""
size = len(data)
max_size = self._max_size
max_size = self.max_size
if size < max_size:
item = self._load(oid, next_tid)
if item:
......@@ -331,7 +331,7 @@ def test(self):
# Test late invalidations.
cache.clear()
cache.store(1, '10*', 10, None)
cache._max_size = cache._size
cache.max_size = cache._size
cache.store(2, '10', 10, 15)
self.assertEqual(cache._queue_list[0].oid, 1)
cache.store(2, '15', 15, None)
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -127,8 +127,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
for oid in oid_list:
invalidate(oid, tid)
if oid == loading:
app._loading_oid = None
app._loading_invalidated = tid
app._loading_invalidated.append(tid)
db = app.getDB()
if db is not None:
db.invalidate(tid, oid_list)
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,7 +18,8 @@ from ZODB.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.compress import decompress_list
from neo.lib.protocol import Packets, uuid_str
from neo.lib.connection import ConnectionClosed
from neo.lib.protocol import Packets, uuid_str, ZERO_TID
from neo.lib.util import dump, makeChecksum
from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler
......@@ -29,19 +30,6 @@ from ..exception import NEOStorageReadRetry, NEOStorageDoesNotExistError
class StorageEventHandler(MTEventHandler):
def connectionLost(self, conn, new_state):
node = self.app.nm.getByAddress(conn.getAddress())
assert node is not None
self.app.cp.removeConnection(node)
super(StorageEventHandler, self).connectionLost(conn, new_state)
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getByAddress(conn.getAddress())
assert node is not None
self.app.cp.removeConnection(node)
super(StorageEventHandler, self).connectionFailed(conn)
def _acceptIdentification(*args):
pass
......@@ -58,9 +46,12 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerObject(self, conn, oid, *args):
self.app.setHandlerData(args)
def answerStoreObject(self, conn, conflict, oid):
def answerStoreObject(self, conn, conflict, oid, serial):
txn_context = self.app.getHandlerData()
if conflict:
if conflict == ZERO_TID:
txn_context.written(self.app, conn.getUUID(), oid, serial)
return
# Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in
# 'conflict_dict') to avoid any deadlock with another thread that
......@@ -94,7 +85,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
conn.ask(Packets.AskRebaseObject(ttid, oid),
queue=queue, oid=oid)
except ConnectionClosed:
txn_context.involved_nodes[conn.getUUID()] = 2
txn_context.conn_dict[conn.getUUID()] = None
def answerRebaseObject(self, conn, conflict, oid):
if conflict:
......@@ -106,8 +97,10 @@ class StorageAnswersHandler(AnswerBaseHandler):
cached = txn_context.cache_dict.pop(oid)
except KeyError:
if resolved:
# We should still be waiting for an answer from this node.
assert conn.uuid in txn_context.data_dict[oid][2]
# We should still be waiting for an answer from this node,
# unless we lost connection.
assert conn.uuid in txn_context.data_dict[oid][2] or \
txn_context.conn_dict[conn.uuid] is None
return
assert oid in txn_context.data_dict
if serial <= txn_context.conflict_dict.get(oid, ''):
......@@ -135,7 +128,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
if cached:
assert cached == data
txn_context.cache_size -= size
txn_context.data_dict[oid] = data, serial, None
txn_context.data_dict[oid] = data, serial, []
txn_context.conflict_dict[oid] = conflict
def answerStoreTransaction(self, conn):
......@@ -144,6 +137,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
answerVoteTransaction = answerStoreTransaction
def connectionClosed(self, conn):
# only called if we were waiting for an answer
txn_context = self.app.getHandlerData()
if type(txn_context) is Transaction:
txn_context.nodeLost(self.app, conn.getUUID())
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random, time
from neo.lib import logging
from neo.lib.locking import Lock
from neo.lib.protocol import NodeTypes, Packets
from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady
from .exception import NEOPrimaryMasterLost
# How long before we might retry a connection to a node to which connection
# failed in the past.
MAX_FAILURE_AGE = 600
class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes."""
def __init__(self, app):
self.app = app
self.connection_dict = {}
# Define a lock in order to create one connection to
# a storage node at a time to avoid multiple connections
# to the same node.
self._lock = Lock()
self.node_failure_dict = {}
def _initNodeConnection(self, node):
"""Init a connection to a given storage node."""
app = self.app
if app.master_conn is None:
raise NEOPrimaryMasterLost
conn = MTClientConnection(app, app.storage_event_handler, node,
dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name, (), app.id_timestamp)
try:
app._ask(conn, p, handler=app.storage_bootstrap_handler)
except ConnectionClosed:
logging.error('Connection to %r failed', node)
except NodeNotReady:
logging.info('%r not ready', node)
else:
logging.info('Connected %r', node)
# Make sure this node will be considered for the next reads
# even if there was a previous recent failure.
self.node_failure_dict.pop(node.getUUID(), None)
return conn
self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
def getCellSortKey(self, cell, random=random.random):
# Prefer a node that didn't fail recently.
failure = self.node_failure_dict.get(cell.getUUID())
if failure:
if time.time() < failure:
# Or order by date of connection failure.
return failure
# Do not use 'del' statement: we didn't lock, so another
# thread might have removed uuid from node_failure_dict.
self.node_failure_dict.pop(cell.getUUID(), None)
# A random one, connected or not, is a trivial and quite efficient way
# to distribute the load evenly. On write accesses, a client connects
# to all nodes of touched cells, but before that, or if a client is
# specialized to only do read-only accesses, it should not limit
# itself to only use the first connected nodes.
return random()
def getConnForNode(self, node):
"""Return a locked connection object to a given node
If no connection exists, create a new one"""
if node.isRunning():
uuid = node.getUUID()
try:
# Already connected to node
return self.connection_dict[uuid]
except KeyError:
with self._lock:
# Second lookup, if another thread initiated connection
# while we were waiting for connection lock.
try:
return self.connection_dict[uuid]
except KeyError:
# Create new connection to node
conn = self._initNodeConnection(node)
if conn is not None:
self.connection_dict[uuid] = conn
return conn
def removeConnection(self, node):
self.connection_dict.pop(node.getUUID(), None)
def closeAll(self):
with self._lock:
while 1:
try:
conn = self.connection_dict.popitem()[1]
except KeyError:
break
conn.setReconnectionNoDelay()
conn.close()
#
# Copyright (C) 2017 Nexedi SA
# Copyright (C) 2017-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,10 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import defaultdict
from ZODB.POSException import StorageTransactionError
from neo.lib.connection import ConnectionClosed
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import Packets
from neo.lib.util import dump
from .exception import NEOStorageError
@apply
......@@ -35,6 +37,7 @@ class Transaction(object):
locking_tid = None
voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess
lockless_dict = None # {partition: {uuid}}
def __init__(self, txn):
self.queue = SimpleQueue()
......@@ -47,72 +50,85 @@ class Transaction(object):
self.conflict_dict = {} # {oid: serial}
# resolved conflicts
self.resolved_dict = {} # {oid: serial}
# Keys are node ids instead of Node objects because a node may
# disappear from the cluster. In any case, we always have to check
# if the id is still known by the NodeManager.
# status: 0 -> check only, 1 -> store, 2 -> failed
self.involved_nodes = {} # {node_id: status}
# involved storage nodes; connection is None is connection was lost
self.conn_dict = {} # {node_id: connection}
def __repr__(self):
error = self.error
return ("<%s ttid=%s locking_tid=%s voted=%u"
" #queue=%s #writing=%s #written=%s%s>") % (
self.__class__.__name__,
dump(self.ttid), dump(self.locking_tid), self.voted,
len(self.queue._queue), len(self.data_dict), len(self.cache_dict),
' error=%r' % error if error else '')
def wakeup(self, conn):
self.queue.put((conn, _WakeupPacket, {}))
def write(self, app, packet, object_id, store=1, **kw):
def write(self, app, packet, object_id, **kw):
uuid_list = []
pt = app.pt
involved = self.involved_nodes
conn_dict = self.conn_dict
object_id = pt.getPartition(object_id)
for cell in pt.getCellList(object_id):
node = cell.getNode()
uuid = node.getUUID()
status = involved.get(uuid, -1)
if status < store:
involved[uuid] = store
elif status > 1:
continue
conn = app.cp.getConnForNode(node)
if conn is not None:
try:
if status < 0 and self.locking_tid and 'oid' in kw:
try:
conn = conn_dict[uuid]
except KeyError:
conn = conn_dict[uuid] = app.getStorageConnection(node)
if self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is because
# we don't need that for transaction metadata.
# for the other nodes. The condition on kw is to
# distinguish whether we're writing an oid or
# transaction metadata.
conn.ask(Packets.AskRebaseTransaction(
self.ttid, self.locking_tid), queue=self.queue)
conn.ask(packet, queue=self.queue, **kw)
uuid_list.append(uuid)
continue
except AttributeError:
if conn is not None:
raise
except ConnectionClosed:
pass
involved[uuid] = 2
conn_dict[uuid] = None
if uuid_list:
return uuid_list
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
def written(self, app, uuid, oid):
# When a node that is being disconnected by the master because it was
def written(self, app, uuid, oid, lockless=None):
# When a node is being disconnected by the master because it was
# not part of the transaction that caused a conflict, we may receive a
# positive answer (not to be confused with lockless stores) before the
# conflict. Because we have no way to identify such case, we must keep
# the data in self.data_dict until all nodes have answered so we remain
# able to resolve conflicts.
try:
data, serial, uuid_list = self.data_dict[oid]
try:
uuid_list.remove(uuid)
except KeyError:
# 1. store to S1 and S2
# 2. S2 reports a conflict
# 3. store to S1 and S2 # conflict resolution
# 4. S1 does not report a conflict (lockless)
# 5. S2 answers before S1 for the second store
return
except ValueError:
# The most common case for this exception is because nodeLost()
# tries all oids blindly. Other possible cases:
# - like above (KeyError), but with S2 answering last
# - answer to resolved conflict before the first answer from a
# node that was being disconnected by the master
# tries all oids blindly.
# Another possible case is when we receive several positive answers
# from a node that is being disconnected by the master, whereas the
# first one (at least) should actually be conflict answer.
return
if lockless:
if lockless != serial: # late lockless write
assert lockless < serial, (lockless, serial)
uuid_list.append(uuid)
return
# It's safe to do this after the above excepts: either the cell is
# already marked as lockless or the node will be reported as failed.
lockless = self.lockless_dict
if not lockless:
lockless = self.lockless_dict = defaultdict(set)
lockless[app.pt.getPartition(oid)].add(uuid)
if oid in self.conflict_dict:
# In the case of a rebase, uuid_list may not contain the id
# of the node reporting a conflict.
return
if uuid_list:
return
......@@ -121,7 +137,7 @@ class Transaction(object):
size = len(data)
self.data_size -= size
size += self.cache_size
if size < app._cache._max_size:
if size < app._cache.max_size:
self.cache_size = size
else:
# Do not cache data past cache max size, as it
......@@ -131,8 +147,13 @@ class Transaction(object):
self.cache_dict[oid] = data
def nodeLost(self, app, uuid):
self.involved_nodes[uuid] = 2
# The following line is sometimes redundant
# with the one in `except ConnectionClosed:` clauses.
self.conn_dict[uuid] = None
for oid in list(self.data_dict):
# Exclude case of 1 conflict error immediately followed by a
# connection loss, possibly with lockless writes to replicas.
if oid not in self.conflict_dict:
self.written(app, uuid, oid)
......
#
# Copyright (C) 2017 Nexedi SA
# Copyright (C) 2017-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
......@@ -45,50 +45,12 @@ if IF == 'pdb':
#('ZPublisher.Publish', 'publish_module_standard'),
)
import errno, socket, threading, weakref
# Unfortunately, IPython does not always print to given stdout.
#from neo.lib.debug import getPdb
import socket, threading, weakref
from neo.lib.debug import PdbSocket
# We don't use the one from neo.lib.debug because unfortunately,
# IPython does not always print to given stdout.
from pdb import Pdb as getPdb
class Socket(object):
def __init__(self, socket):
# In case that the default timeout is not None.
socket.settimeout(None)
self._socket = socket
self._buf = ''
def write(self, data):
self._socket.send(data)
def readline(self):
recv = self._socket.recv
data = self._buf
while True:
i = 1 + data.find('\n')
if i:
self._buf = data[i:]
return data[:i]
d = recv(4096)
data += d
if not d:
self._buf = ''
return data
def flush(self):
pass
def closed(self):
self._socket.setblocking(0)
try:
self._socket.recv(0)
return True
except socket.error, (err, _):
if err != errno.EAGAIN:
raise
self._socket.setblocking(1)
return False
def pdb():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
......@@ -98,7 +60,7 @@ if IF == 'pdb':
s.listen(0)
print 'Listening to %u' % s.getsockname()[1]
sys.stdout.flush() # BBB: On Python 3, print() takes a 'flush' arg.
_socket = Socket(s.accept()[0])
_socket = PdbSocket(s.accept()[0])
finally:
s.close()
try:
......@@ -155,9 +117,12 @@ if IF == 'pdb':
if BP:
setupBreakPoints(BP)
else:
threading.Thread(target=pdb).start()
threading.Thread(target=pdb, name='pdb').start()
elif IF == 'frames':
# WARNING: Because of https://bugs.python.org/issue17094, the output is
# usually incorrect for subprocesses started by the functional
# test framework.
import traceback
write = sys.stderr.write
for thread_id, frame in sys._current_frames().iteritems():
......@@ -178,3 +143,68 @@ elif IF == 'profile':
prof = cProfile.Profile()
threading.Timer(DURATION, stop, (prof, path)).start()
prof.enable()
elif IF == 'trace-cache':
from struct import Struct
from .client.cache import ClientCache
from .lib.protocol import uuid_str, ZERO_TID as z64
class CacheTracer(object):
_pack2 = Struct('!B8s8s').pack
_pack4 = Struct('!B8s8s8sL').pack
def __init__(self, cache, path):
self._cache = cache
self._trace_file = open(path, 'a')
def close(self):
self._trace_file.close()
return self._cache
def _trace(self, op, x, y=z64, z=z64):
self._trace_file.write(self._pack(op, x, y, z))
def __repr__(self):
return repr(self._cache)
@property
def max_size(self):
return self._cache.max_size
def clear(self):
self._trace_file.write('\0')
self._cache.clear()
def clear_current(self):
self._trace_file.write('\1')
self._cache.clear_current()
def load(self, oid, before_tid=None):
r = self._cache.load(oid, before_tid)
self._trace_file.write(self._pack2(
3 if r else 2, oid, before_tid or z64))
return r
def store(self, oid, data, tid, next_tid):
self._trace_file.write(self._pack4(
4, oid, tid, next_tid or z64, len(data)))
self._cache.store(oid, data, tid, next_tid)
def invalidate(self, oid, tid):
self._trace_file.write(self._pack2(5, oid, tid))
self._cache.invalidate(oid, tid)
@defer
def profile(app):
app._cache_lock_acquire()
try:
cache = app._cache
if type(cache) is ClientCache:
app._cache = CacheTracer(cache, '%s-%s.neo-cache-trace' %
(app.name, uuid_str(app.uuid)))
app._cache.clear()
else:
app._cache = cache.close()
finally:
app._cache_lock_release()
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2015-2017 Nexedi SA
# Copyright (C) 2015-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,16 +14,57 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging
from . import logging, util
from .config import OptionList
from .event import EventManager
from .node import NodeManager
def buildOptionParser(cls):
parser = cls.option_parser = cls.OptionList()
_ = parser.path
_('l', 'logfile',
help="log debugging information to specified SQLite DB")
_('ca', help="(SSL) certificate authority in PEM format")
_('cert', help="(SSL) certificate in PEM format")
_('key', help="(SSL) private key in PEM format")
cls._buildOptionParser()
return cls
class BaseApplication(object):
class OptionList(OptionList):
def parse(self, argv=None):
config = OptionList.parse(self, argv)
ssl = (
config.pop('ca', None),
config.pop('cert', None),
config.pop('key', None),
)
if any(ssl):
config['ssl'] = ssl
return config
server = None
ssl = None
@classmethod
def addCommonServerOptions(cls, section, bind, masters='127.0.0.1:10000'):
_ = cls.option_parser.group('server node')
_.path('f', 'file', help='specify a configuration file')
_('s', 'section', default=section,
help='specify a configuration section')
_('c', 'cluster', required=True, help='the cluster name')
_('m', 'masters', default=masters, parse=util.parseMasterList,
help='master node list')
_('b', 'bind', default=bind,
parse=lambda x: util.parseNodeAddress(x, 0),
help='the local address to bind to')
_.path('D', 'dynamic-master-list',
help='path of the file containing dynamic master node list')
def __init__(self, ssl=None, dynamic_master_list=None):
if ssl:
if not all(ssl):
......@@ -60,3 +101,8 @@ class BaseApplication(object):
self.nm.close()
self.em.close()
self.__dict__.clear()
def setUUID(self, uuid):
if self.uuid != uuid:
self.uuid = uuid
logging.node(self.name, uuid)
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2018 Nexedi SA
# Copyright (C) 2018-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
This diff is collapsed.
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -335,6 +335,7 @@ class Connection(BaseConnection):
return r, flags
def setOnClose(self, callback):
assert not self.isClosed(), self
self._on_close = callback
def isClient(self):
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -34,6 +34,7 @@ class SocketConnector(object):
is_closed = is_server = None
connect_limit = {}
CONNECT_LIMIT = 1 # XXX actually this is (RE-)CONNECT_THROTTLE
KEEPALIVE = 60, 3, 10
SOMAXCONN = 5 # for threaded tests
def __new__(cls, addr, s=None):
......@@ -66,9 +67,10 @@ class SocketConnector(object):
# The following 3 lines are specific to Linux. It seems that OSX
# has similar options (TCP_KEEPALIVE/TCP_KEEPINTVL/TCP_KEEPCNT),
# and Windows has SIO_KEEPALIVE_VALS (fixed count of 10).
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
idle, cnt, intvl = self.KEEPALIVE
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, cnt)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, intvl)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
......@@ -127,6 +129,7 @@ class SocketConnector(object):
self._bind(self.addr)
self.socket.listen(self.SOMAXCONN)
except socket.error, e:
self.is_closed = True
self.socket.close()
self._error('listen', e)
......@@ -174,7 +177,11 @@ class SocketConnector(object):
self._error('recv')
def send(self):
# XXX: unefficient for big packets
# XXX: Inefficient for big packets. In any case, we should make sure
# that 'msg' does not exceed 2GB with SSL (OverflowError).
# Before commit 1a064725b81a702a124d672dba2bcae498980c76,
# this happened when many big AddObject packets were sent
# for a single replication chunk.
msg = ''.join(self.queued)
if msg:
try:
......@@ -216,12 +223,13 @@ class SocketConnector(object):
def __repr__(self):
if self.is_closed is None:
state = 'never opened'
state = ', never opened'
else:
if self.is_closed:
state = 'closed '
state = ', closed '
else:
state = 'opened '
state = ' fileno %s %s, opened ' % (
self.socket_fd, self.getAddress())
if self.is_server is None:
state += 'listening'
else:
......@@ -230,9 +238,7 @@ class SocketConnector(object):
else:
state += 'to '
state += str(self.addr)
return '<%s at 0x%x fileno %s %s, %s>' % (self.__class__.__name__,
id(self), '?' if self.is_closed else self.socket_fd,
self.getAddress(), state)
return '<%s at 0x%x%s>' % (self.__class__.__name__, id(self), state)
class SocketConnectorIPv4(SocketConnector):
" Wrapper for IPv4 sockets"
......
#
# Copyright (C) 2010-2017 Nexedi SA
# Copyright (C) 2010-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,11 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
import signal
import imp
import os
import sys
import errno, imp, os, signal, socket, sys, traceback
from functools import wraps
import neo
......@@ -82,9 +78,55 @@ def winpdb(depth=0):
os.abort()
def register(on_log=None):
try:
if on_log is not None:
@safe_handler
def on_log_signal(signum, signal):
on_log()
signal.signal(signal.SIGRTMIN+2, on_log_signal)
signal.signal(signal.SIGRTMIN+3, debugHandler)
except ValueError: # signal only works in main thread
pass
class PdbSocket(object):
def __init__(self, socket):
# In case that the default timeout is not None.
socket.settimeout(None)
self._socket = socket
self._buf = ''
def close(self):
self._socket.close()
def write(self, data):
self._socket.send(data)
def readline(self):
recv = self._socket.recv
data = self._buf
while True:
i = 1 + data.find('\n')
if i:
self._buf = data[i:]
return data[:i]
d = recv(4096)
data += d
if not d:
self._buf = ''
return data
def flush(self):
pass
def closed(self):
self._socket.setblocking(0)
try:
self._socket.recv(0)
return True
except socket.error, (err, _):
if err != errno.EAGAIN:
raise
self._socket.setblocking(1)
return False
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -87,13 +87,11 @@ class Dispatcher:
def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock
threads expecting responses from that connection """
notified_set = set()
_decrefQueue = self._decrefQueue
self.lock_acquire()
try:
message_table = self.message_table.pop(id(conn), EMPTY)
finally:
self.lock_release()
notified_set = set()
_decrefQueue = self._decrefQueue
for queue in message_table.itervalues():
if queue is NOBODY:
continue
......@@ -102,6 +100,8 @@ class Dispatcher:
queue.put((conn, _ConnectionClosed, EMPTY))
notified_set.add(queue_id)
_decrefQueue(queue)
finally:
self.lock_release()
@giant_lock
def forget_queue(self, queue, flush_queue=True):
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -314,6 +314,11 @@ class EpollEventManager(object):
for fd, conn in self.connection_dict.items():
logging.info(' %r: %r (pending=%r)', fd, conn,
conn in pending_set)
for request_dict, handler in conn._handlers._pending:
handler = handler.__class__.__name__
for msg_id, (klass, kw) in sorted(request_dict.items()):
logging.info(' #0x%04x %s (%s)', msg_id,
klass.__name__, handler)
# Default to EpollEventManager.
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -175,9 +175,7 @@ class EventHandler(object):
if your_uuid is None:
raise ProtocolError('No UUID supplied')
logging.info('connected to a primary master node')
if app.uuid != your_uuid:
app.uuid = your_uuid
logging.info('Got a new UUID: %s', uuid_str(your_uuid))
app.setUUID(your_uuid)
app.id_timestamp = None
elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids')
......@@ -201,6 +199,9 @@ class EventHandler(object):
if not conn.client:
conn.close()
def flushLog(self, conn):
logging.flush()
# Error packet handlers.
def error(self, conn, code, message, **kw):
......
#
# Copyright (C) 2015-2017 Nexedi SA
# Copyright (C) 2015-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
......@@ -26,7 +26,7 @@ from Queue import Empty
class LockUser(object):
def __init__(self, message, level=0):
def __init__(self, message=None, level=0):
t = threading.currentThread()
ident = getattr(t, 'node_name', t.name)
# This class is instantiated from a place desiring to known what
......@@ -42,6 +42,7 @@ class LockUser(object):
# current Neo directory structure.
path = os.path.join('...', *path.split(os.path.sep)[-3:])
self.time = time()
if message is not None:
self.ident = "%s@%r %s:%s %s" % (
ident, self.time, path, line_number, line)
self.note(message)
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -22,6 +22,9 @@ from time import time
from traceback import format_exception
import bz2, inspect, neo, os, signal, sqlite3, sys, threading
from .util import nextafter
INF = float('inf')
# Stats for storage node of matrix test (py2.7:SQLite)
RECORD_SIZE = ( 234360832 # extra memory used
- 16777264 # sum of raw data ('msg' attribute)
......@@ -61,9 +64,8 @@ class NEOLogger(Logger):
self.parent = root = getLogger()
if not root.handlers:
root.addHandler(self.default_root_handler)
self._db = None
self._record_queue = deque()
self._record_size = 0
self.__reset()
self._nid_dict = {}
self._async = set()
l = threading.Lock()
self._acquire = l.acquire
......@@ -77,6 +79,12 @@ class NEOLogger(Logger):
self._release = _release
self.backlog()
def __reset(self):
self._db = None
self._node = {}
self._record_queue = deque()
self._record_size = 0
def __enter__(self):
self._acquire()
return self._db
......@@ -96,7 +104,7 @@ class NEOLogger(Logger):
if self._db is None:
return
q = self._db.execute
if not q("SELECT id FROM packet LIMIT 1").fetchone():
if not q("SELECT 1 FROM packet LIMIT 1").fetchone():
q("DROP TABLE protocol")
# DROP TABLE already replaced previous data with zeros,
# so VACUUM is not really useful. But here, it should be free.
......@@ -151,9 +159,7 @@ class NEOLogger(Logger):
if self._db is not None:
self._db.close()
if not filename:
self._db = None
self._record_queue.clear()
self._record_size = 0
self.__reset()
return
if filename:
self._db = sqlite3.connect(filename, check_same_thread=False)
......@@ -163,45 +169,52 @@ class NEOLogger(Logger):
if 1: # Not only when logging everything,
# but also for interoperability with logrotate.
q("PRAGMA journal_mode = MEMORY")
for t, columns in (('log', (
"level INTEGER NOT NULL",
"pathname TEXT",
"lineno INTEGER",
"msg TEXT",
)),
('packet', (
"msg_id INTEGER NOT NULL",
"code INTEGER NOT NULL",
"peer TEXT NOT NULL",
"body BLOB",
))):
if reset:
for t in 'log', 'packet':
q('DROP TABLE IF EXISTS ' + t)
q("""CREATE TABLE IF NOT EXISTS log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date REAL NOT NULL,
name TEXT,
level INTEGER NOT NULL,
pathname TEXT,
lineno INTEGER,
msg TEXT)
q('DROP TABLE IF EXISTS %s1' % t)
elif (2, 'name', 'TEXT', 0, None, 0) in q(
"PRAGMA table_info(%s)" % t):
q("ALTER TABLE %s RENAME TO %s1" % (t, t))
columns = (
"date REAL PRIMARY KEY",
"node INTEGER",
) + columns
q("CREATE TABLE IF NOT EXISTS %s (\n %s) WITHOUT ROWID"
% (t, ',\n '.join(columns)))
q("""CREATE TABLE IF NOT EXISTS protocol (
date REAL PRIMARY KEY,
text BLOB NOT NULL) WITHOUT ROWID
""")
q("""CREATE INDEX IF NOT EXISTS _log_i1 ON log(date)""")
q("""CREATE TABLE IF NOT EXISTS packet (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date REAL NOT NULL,
q("""CREATE TABLE IF NOT EXISTS node (
id INTEGER PRIMARY KEY,
name TEXT,
msg_id INTEGER NOT NULL,
code INTEGER NOT NULL,
peer TEXT NOT NULL,
body BLOB)
""")
q("""CREATE INDEX IF NOT EXISTS _packet_i1 ON packet(date)""")
q("""CREATE TABLE IF NOT EXISTS protocol (
date REAL PRIMARY KEY NOT NULL,
text BLOB NOT NULL)
cluster TEXT,
nid INTEGER)
""")
with open(inspect.getsourcefile(p)) as p:
p = buffer(bz2.compress(p.read()))
for t, in q("SELECT text FROM protocol ORDER BY date DESC"):
if p == t:
break
else:
try:
t = self._record_queue[0].created
except IndexError:
t = time()
with self._db:
q("INSERT INTO protocol VALUES (?,?)", (t, p))
x = q("SELECT text FROM protocol ORDER BY date DESC LIMIT 1"
).fetchone()
if (x and x[0]) != p:
# In case of multithreading, we can have locally unsorted
# records so we can't find the oldest one (it may not be
# pushed to queue): let's use 0 on log rotation.
x = time() if x else 0
q("INSERT INTO protocol VALUES (?,?)", (x, p))
self._db.commit()
self._node = {x[1:]: x[0] for x in q("SELECT * FROM node")}
def setup(self, filename=None, reset=False):
with self:
......@@ -219,6 +232,20 @@ class NEOLogger(Logger):
return True
def _emit(self, r):
try:
nid = self._node[r._node]
except KeyError:
if r._node == (None, None, None):
nid = None
else:
try:
nid = 1 + max(x for x in self._node.itervalues()
if x is not None)
except ValueError:
nid = 0
self._db.execute("INSERT INTO node VALUES (?,?,?,?)",
(nid,) + r._node)
self._node[r._node] = nid
if type(r) is PacketRecord:
ip, port = r.addr
peer = ('%s %s ([%s]:%s)' if ':' in ip else '%s %s (%s:%s)') % (
......@@ -231,15 +258,22 @@ class NEOLogger(Logger):
"""
if msg is not None:
msg = buffer(msg)
self._db.execute("INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)",
(r.created, r._name, r.msg_id, r.code, peer, msg))
q = "INSERT INTO packet VALUES (?,?,?,?,?,?)"
x = [r.created, nid, r.msg_id, r.code, peer, msg]
else:
pathname = os.path.relpath(r.pathname, *neo.__path__)
self._db.execute("INSERT INTO log VALUES (NULL,?,?,?,?,?,?)",
(r.created, r._name, r.levelno, pathname, r.lineno, r.msg))
q = "INSERT INTO log VALUES (?,?,?,?,?,?)"
x = [r.created, nid, r.levelno, pathname, r.lineno, r.msg]
while 1:
try:
self._db.execute(q, x)
break
except sqlite3.IntegrityError:
x[0] = nextafter(x[0], INF)
def _queue(self, record):
record._name = self.name and str(self.name)
name = self.name and str(self.name)
record._node = (name,) + self._nid_dict.get(name, (None, None))
self._acquire()
try:
if self._max_size is None:
......@@ -286,6 +320,18 @@ class NEOLogger(Logger):
addr=connection.getAddress(),
msg=body))
def node(self, *cluster_nid):
name = self.name and str(self.name)
prev = self._nid_dict.get(name)
if prev != cluster_nid:
from .protocol import uuid_str
self.info('Node ID: %s', uuid_str(cluster_nid[1]))
self._nid_dict[name] = cluster_nid
@property
def resetNids(self):
return self._nid_dict.clear
logging = NEOLogger()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -118,8 +118,8 @@ class Node(object):
if connection.isServer():
self.setIdentified()
else:
assert force is not None, \
attributeTracker.whoSet(self, '_connection')
assert force is not None, (conn,
attributeTracker.whoSet(self, '_connection'))
# The test on peer_id is there to protect against buggy nodes.
# XXX: handler comparison does not cover all cases: there may
# be a pending handler change, which won't be detected, or a future
......@@ -130,6 +130,10 @@ class Node(object):
# the full-fledged functionality, and it is simpler this way.
if not force or conn.getPeerId() is not None or \
type(conn.getHandler()) is not type(connection.getHandler()):
# It may also happen in case of a network failure that is only
# noticed by the peer. We'd like to accept the new connection
# immediately but it's quite complicated. At worst (keepalive
# packets dropped), 'conn' will be closed in ~ 1 minute.
raise ProtocolError("already connected")
def on_closed():
self._connection = connection
......@@ -137,7 +141,6 @@ class Node(object):
self.setIdentified()
conn.setOnClose(on_closed)
conn.close()
assert not connection.isClosed(), connection
connection.setOnClose(self.onConnectionClosed)
def getConnection(self):
......
#
# Copyright (C) 2015-2017 Nexedi SA
# Copyright (C) 2015-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION = 4
PROTOCOL_VERSION = 5
ENCODED_VERSION = Struct('!L').pack(PROTOCOL_VERSION)
# Avoid memory errors on corrupted data.
......@@ -1631,6 +1631,13 @@ class Truncate(Packet):
_answer = Error
class FlushLog(Packet):
"""
Request all nodes to flush their logs.
:nodes: ctl -> A -> M -> *
"""
_next_code = 0
def register(request, ignore_when_closed=None):
......@@ -1806,6 +1813,8 @@ class Packets(dict):
AddObject)
Truncate = register(
Truncate)
FlushLog = register(
FlushLog)
def Errors():
registry_dict = {}
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -15,6 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import math
from collections import defaultdict
from functools import partial
from . import logging, protocol
from .locking import Lock
from .protocol import uuid_str, CellStates
......@@ -256,39 +258,32 @@ class PartitionTable(object):
def _format(self):
"""Help debugging partition table management.
Output sample:
pt: node 0: S1, R
pt: node 1: S2, R
pt: node 2: S3, R
pt: node 3: S4, R
pt: 00: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
pt: 11: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U
Here, there are 4 nodes in RUNNING state.
The first partition has 2 replicas in UP_TO_DATE state, on nodes 1 and
2 (nodes 0 and 3 are displayed as unused for that partition by
displaying a dot).
The first number on the left represents the number of the first
partition on the line (here, line length is 11 to keep the docstring
width under 80 column).
Output sample (np=48, nr=0, just after a 3rd node is added):
pt: 10v 20v 30v 40v
pt: S1 R U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.
pt: S2 R .U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.U.FOF.U.FOF
pt: S3 R ..O..O..O..O..O..O..O..O..O..O..O..O..O..O..O..O
The first line helps to locate a nth partition ('v' is an bottom arrow)
and it is omitted when the table has less than 10 partitions.
"""
node_list = sorted(self.count_dict)
result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
protocol.node_state_prefix_dict[node.getState()])
for i, node in enumerate(node_list)]
append = result.append
line = []
max_line_len = 20 # XXX: hardcoded number of partitions per line
prefix = 0
prefix_len = int(math.ceil(math.log10(self.np)))
for offset, row in enumerate(self._formatRows(node_list)):
if len(line) == max_line_len:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
line = []
prefix = offset
line.append(row)
if line:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
if not node_list:
return ()
cell_state_dict = protocol.cell_state_prefix_dict
node_dict = defaultdict(partial(bytearray, '.' * self.np))
for offset, row in enumerate(self.partition_list):
for cell in row:
node_dict[cell.getNode()][offset] = \
cell_state_dict[cell.getState()]
n = len(uuid_str(node_list[-1].getUUID()))
result = [''.join('%9sv' % x if x else 'pt:' + ' ' * (5 + n)
for x in xrange(0, self.np, 10))
] if self.np > 10 else []
result.extend('pt: %-*s %s %s' % (n, uuid_str(node.getUUID()),
protocol.node_state_prefix_dict[node.getState()],
node_dict[node])
for node in node_list)
return result
def _formatRows(self, node_list):
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -15,9 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import thread, threading, weakref
from . import logging
from . import debug, logging
from .app import BaseApplication
from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher
from .locking import SimpleQueue
......@@ -28,7 +27,10 @@ class app_set(weakref.WeakSet):
app.log()
app_set = app_set()
registerLiveDebugger(app_set.on_log)
def registerLiveDebugger():
debug.register(app_set.on_log)
registerLiveDebugger()
class ThreadContainer(threading.local):
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -23,6 +23,20 @@ from Queue import deque
from struct import pack, unpack, Struct
from time import gmtime
# https://stackoverflow.com/a/6163157
def nextafter():
global nextafter
from ctypes import CDLL, util as ctypes_util, c_double
from time import time
_libm = CDLL(ctypes_util.find_library('m'))
nextafter = _libm.nextafter
nextafter.restype = c_double
nextafter.argtypes = c_double, c_double
x = time()
y = nextafter(x, float('inf'))
assert x < y and (x+y)/2 in (x,y), (x, y)
nextafter()
TID_LOW_OVERFLOW = 2**32
TID_LOW_MAX = TID_LOW_OVERFLOW - 1
SECOND_PER_TID_LOW = 60.0 / TID_LOW_OVERFLOW
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,10 +18,10 @@ import sys
from collections import defaultdict
from time import time
from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib import logging, util
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection
......@@ -47,6 +47,7 @@ from .transactions import TransactionManager
from .verification import VerificationManager
@buildOptionParser
class Application(BaseApplication):
"""The master node application."""
packing = None
......@@ -57,7 +58,7 @@ class Application(BaseApplication):
backup_app = None
truncate_tid = None
def uuid(self, uuid):
def setUUID(self, uuid):
node = self.nm.getByUUID(uuid)
if node is not self._node:
if node:
......@@ -65,33 +66,56 @@ class Application(BaseApplication):
if node.isConnected(True):
node.getConnection().close()
self._node.setUUID(uuid)
uuid = property(lambda self: self._node.getUUID(), uuid)
logging.node(self.name, uuid)
uuid = property(lambda self: self._node.getUUID(), setUUID)
@property
def election(self):
if self.primary and self.cluster_state == ClusterStates.RECOVERING:
return self.primary
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Master node"
cls.addCommonServerOptions('master', '127.0.0.1:10000', '')
_ = _.group('master')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
_.int('A', 'autostart',
help="minimum number of pending storage nodes to automatically"
" start new cluster (to avoid unwanted recreation of the"
" cluster, this should be the total number of storage nodes)")
_('C', 'upstream-cluster',
help='the name of cluster to backup')
_('M', 'upstream-masters', parse=util.parseMasterList,
help='list of master nodes in the cluster to backup')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
def __init__(self, config):
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
config.get('ssl'), config.get('dynamic_master_list'))
self.tm = TransactionManager(self.onTransactionCommitted)
self.name = config.getCluster()
self.server = config.getBind()
self.autostart = config.getAutostart()
self.name = config['cluster']
self.server = config['bind']
self.autostart = config.get('autostart')
self.storage_ready_dict = {}
self.storage_starting_set = set()
for master_address in config.getMasters():
for master_address in config['masters']:
self.nm.createMaster(address=master_address)
self._node = self.nm.createMaster(address=self.server,
uuid=config.getUUID())
uuid=config.get('uuid'))
logging.node(self.name, self.uuid)
logging.debug('IP address is %s, port is %d', *self.server)
# Partition table
replicas, partitions = config.getReplicas(), config.getPartitions()
replicas = config['replicas']
partitions = config['partitions']
if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer'
if partitions <= 0:
......@@ -107,13 +131,13 @@ class Application(BaseApplication):
self._current_manager = None
# backup
upstream_cluster = config.getUpstreamCluster()
upstream_cluster = config.get('upstream_cluster')
if upstream_cluster:
if upstream_cluster == self.name:
raise ValueError("upstream cluster name must be"
" different from cluster name")
self.backup_app = BackupApplication(self, upstream_cluster,
config.getUpstreamMasters())
config['upstream_masters'])
self.administration_handler = administration.AdministrationHandler(
self)
......@@ -242,7 +266,6 @@ class Application(BaseApplication):
if self.uuid is None:
self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER)
logging.info('My UUID: ' + uuid_str(self.uuid))
self._node.setRunning()
self._node.id_timestamp = None
self.primary = monotonic_time()
......@@ -575,3 +598,12 @@ class Application(BaseApplication):
def getStorageReadySet(self, readiness=float('inf')):
return {k for k, v in self.storage_ready_dict.iteritems()
if v <= readiness}
def notifyTransactionAborted(self, ttid, uuids):
uuid_set = self.getStorageReadySet()
uuid_set.intersection_update(uuids)
if uuid_set:
p = Packets.AbortTransaction(ttid, ())
getByUUID = self.nm.getByUUID
for uuid in uuid_set:
getByUUID(uuid).send(p)
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2017 Nexedi SA
# Copyright (C) 2012-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -82,6 +82,11 @@ class BackupApplication(object):
self.nm.close()
del self.__dict__
def setUUID(self, uuid):
if self.uuid != uuid:
self.uuid = uuid
logging.info('Upstream Node ID: %s', uuid_str(uuid))
def log(self):
self.nm.log()
if self.pt is not None:
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -46,6 +46,13 @@ class AdministrationHandler(MasterHandler):
if node is not None:
self.app.nm.remove(node)
def flushLog(self, conn):
p = Packets.FlushLog()
for node in self.app.nm.getConnectedList():
c = node.getConnection()
c is conn or c.send(p)
super(AdministrationHandler, self).flushLog(conn)
def setClusterState(self, conn, state):
app = self.app
# check request
......
#
# Copyright (C) 2012-2017 Nexedi SA
# Copyright (C) 2012-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -27,7 +27,8 @@ class ClientServiceHandler(MasterHandler):
app = self.app
node = app.nm.getByUUID(conn.getUUID())
assert node is not None, conn
app.tm.clientLost(node)
for x in app.tm.clientLost(node):
app.notifyTransactionAborted(*x)
node.setUnknown()
app.broadcastNodesInformation([node])
......@@ -37,7 +38,7 @@ class ClientServiceHandler(MasterHandler):
"""
app = self.app
# Delay new transaction as long as we are waiting for NotifyReady
# answers, otherwise we can know if the client is expected to commit
# answers, otherwise we can't know if the client is expected to commit
# the transaction in full to all these storage nodes.
if app.storage_starting_set:
raise DelayEvent
......@@ -121,12 +122,7 @@ class ClientServiceHandler(MasterHandler):
app = self.app
involved = app.tm.abort(tid, conn.getUUID())
involved.update(uuid_list)
involved.intersection_update(app.getStorageReadySet())
if involved:
p = Packets.AbortTransaction(tid, ())
getByUUID = app.nm.getByUUID
for involved in involved:
getByUUID(involved).send(p)
app.notifyTransactionAborted(tid, involved)
# like ClientServiceHandler but read-only & only for tid <= backup_tid
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -444,7 +444,9 @@ class TransactionManager(EventQueue):
def clientLost(self, node):
for txn in self._ttid_dict.values():
if txn.clientLost(node):
del self[txn.getTTID()]
tid = txn.getTTID()
del self[tid]
yield tid, txn.getNotificationUUIDList()
def log(self):
logging.info('Transactions:')
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, formatNodeList, \
......@@ -38,6 +39,7 @@ action_dict = {
'kill': 'killNode',
'prune_orphan': 'pruneOrphan',
'truncate': 'truncate',
'flush_log': 'flushLog',
}
# ex 'S2' -> u32
......@@ -253,6 +255,15 @@ class TerminalNeoCTL(object):
partition_dict = dict.fromkeys(xrange(np), source)
self.neoctl.checkReplicas(partition_dict, min_tid, max_tid)
def flushLog(self, params):
"""
Ask all nodes in the cluster to flush their logs.
If there are backup clusters, only their primary masters flush.
"""
assert not params
self.neoctl.flushLog()
class Application(object):
"""The storage node application."""
......@@ -267,18 +278,18 @@ class Application(object):
# state (RUNNING, DOWN...) and modify the partition if asked
# set cluster name [shutdown|operational] : either shutdown the
# cluster or mark it as operational
if not args:
return self.usage()
current_action = action_dict
level = 0
while current_action is not None and \
level < len(args) and \
try:
while level < len(args) and \
isinstance(current_action, dict):
current_action = current_action.get(args[level])
current_action = current_action[args[level]]
level += 1
action = None
if isinstance(current_action, basestring):
action = getattr(self.neoctl, current_action, None)
if action is None:
return self.usage('unknown command')
except KeyError:
sys.exit('invalid command: ' + ' '.join(args))
action = getattr(self.neoctl, current_action)
try:
return action(args[level:])
except NotReadyException, message:
......@@ -313,8 +324,8 @@ class Application(object):
for x in docstring_line_list])
return '\n'.join(result)
def usage(self, message):
output_list = (message, 'Available commands:', self._usage(action_dict),
def usage(self):
output_list = ('Available commands:', self._usage(action_dict),
"TID arguments can be either integers or timestamps as floats,"
" e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'"
" for 2012-01-01 12:34:56 UTC")
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.app import BaseApplication
import argparse
from neo.lib import util
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from .handler import CommandEventHandler
......@@ -22,11 +24,24 @@ from .handler import CommandEventHandler
class NotReadyException(Exception):
pass
@buildOptionParser
class NeoCTL(BaseApplication):
connection = None
connected = False
@classmethod
def _buildOptionParser(cls):
# XXX: Use argparse sub-commands.
parser = cls.option_parser
parser.description = "NEO Control node"
parser('a', 'address', default='127.0.0.1:9999',
parse=lambda x: util.parseNodeAddress(x, 9999),
help="address of an admin node")
parser.argument('cmd', nargs=argparse.REMAINDER,
help="command to execute; if not supplied,"
" the list of available commands is displayed")
def __init__(self, address, **kw):
super(NeoCTL, self).__init__(**kw)
self.server = self.nm.createAdmin(address=address)
......@@ -189,3 +204,9 @@ class NeoCTL(BaseApplication):
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def flushLog(self):
conn = self.__getConnection()
conn.send(Packets.FlushLog())
while conn.pending():
self.em.poll(1)
......@@ -2,7 +2,7 @@
#
# neoadmin - run an administrator node of NEO
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,27 +18,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process')
defaults = dict(
bind = '127.0.0.1:9999',
masters = '127.0.0.1:10000',
)
def main(args=None):
# build configuration dict from command line options
(options, args) = parser.parse_args(args=args)
config = ConfigurationManager(defaults, options, 'admin')
from neo.admin.app import Application
config = Application.option_parser.parse(args)
# setup custom logging
logging.setup(config.getLogfile())
logging.setup(config.get('logfile'))
# and then, load and run the application
from neo.admin.app import Application
app = Application(config)
app.run()
#!/usr/bin/env python
#
# neoadmin - run an administrator node of NEO
# neoctl - command-line interface to an administrator node of NEO
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,30 +18,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.config import getOptionParser
from neo.lib.util import parseNodeAddress
parser = getOptionParser()
parser.add_option('-a', '--address', help = 'specify the address (ip:port) ' \
'of an admin node', default = '127.0.0.1:9999')
def main(args=None):
(options, args) = parser.parse_args(args=args)
if options.address is not None:
address = parseNodeAddress(options.address, 9999)
else:
address = ('127.0.0.1', 9999)
from neo.neoctl.neoctl import NeoCTL
config = NeoCTL.option_parser.parse(args)
if options.logfile:
logfile = config.get('logfile')
if logfile:
# Contrary to daemons, we log everything to disk automatically
# because a user using -l option here:
# - is certainly debugging an issue and wants everything,
# - would not have to time to send SIGRTMIN before neoctl exits.
logging.backlog(None)
logging.setup(options.logfile)
from neo.neoctl.app import Application
logging.setup(logfile)
ssl = options.ca, options.cert, options.key
r = Application(address, ssl=ssl if any(ssl) else None).execute(args)
from neo.neoctl.app import Application
app = Application(config['address'], ssl=config.get('ssl'))
r = app.execute(config['cmd'])
if r is not None:
print r
This diff is collapsed.
......@@ -2,7 +2,7 @@
#
# neomaster - run a master node of NEO
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,39 +18,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='the node UUID (testing purpose)')
parser.add_option('-r', '--replicas', help = 'replicas number')
parser.add_option('-p', '--partitions', help = 'partitions number')
parser.add_option('-A', '--autostart',
help='minimum number of pending storage nodes to automatically start'
' new cluster (to avoid unwanted recreation of the cluster,'
' this should be the total number of storage nodes)')
parser.add_option('-C', '--upstream-cluster',
help='the name of cluster to backup')
parser.add_option('-M', '--upstream-masters',
help='list of master nodes in cluster to backup')
defaults = dict(
bind = '127.0.0.1:10000',
masters = '',
replicas = 0,
partitions = 100,
)
def main(args=None):
# build configuration dict from command line options
(options, args) = parser.parse_args(args=args)
config = ConfigurationManager(defaults, options, 'master')
from neo.master.app import Application
config = Application.option_parser.parse(args)
# setup custom logging
logging.backlog(max_size=None) # log without delay
logging.setup(config.getLogfile())
logging.setup(config.get('logfile'))
# and then, load and run the application
from neo.master.app import Application
app = Application(config)
app.run()
#!/usr/bin/env python
#
# neomaster - run a master node of NEO
# neomigrate - import/export data between NEO and a FileStorage
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -17,53 +17,62 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.config import getOptionParser
from __future__ import print_function
import time
import os
from neo.lib.app import buildOptionParser
# register options
parser = getOptionParser()
parser.add_option('-s', '--source', help='the source database')
parser.add_option('-d', '--destination', help='the destination database')
parser.add_option('-c', '--cluster', help='the NEO cluster name')
import_warning = (
"WARNING: This is not the recommended way to import data to NEO:"
" you should use the Importer backend instead.\n"
"NEO also does not implement IStorageRestoreable interface, which"
" means that undo information is not preserved when using this tool:"
" conflict resolution could happen when undoing an old transaction."
)
def main(args=None):
# parse options
(options, args) = parser.parse_args(args=args)
source = options.source or None
destination = options.destination or None
cluster = options.cluster or None
@buildOptionParser
class NEOMigrate(object):
from neo.lib.config import OptionList
@classmethod
def _buildOptionParser(cls):
parser = cls.option_parser
parser.description = "NEO <-> FileStorage conversion tool"
parser('c', 'cluster', required=True, help='the NEO cluster name')
parser.bool('q', 'quiet', help='print nothing to standard output')
parser.argument('source', help='the source database')
parser.argument('destination', help='the destination database')
# check options
if source is None or destination is None:
raise RuntimeError('Source and destination databases must be supplied')
if cluster is None:
raise RuntimeError('The NEO cluster name must be supplied')
def __init__(self, config):
self.name = config.pop('cluster')
self.source = config.pop('source')
self.destination = config.pop('destination')
self.quiet = config.pop('quiet', False)
# open storages
from ZODB.FileStorage import FileStorage
from neo.client.Storage import Storage as NEOStorage
if os.path.exists(source):
""" (remove vvv warning from neo/go test output)
print("WARNING: This is not the recommended way to import data to NEO:"
" you should use the Importer backend instead.\n"
"NEO also does not implement IStorageRestoreable interface,"
" which means that undo information is not preserved when using"
" this tool: conflict resolution could happen when undoing an"
" old transaction.")
"""
src = FileStorage(file_name=source, read_only=True)
dst = NEOStorage(master_nodes=destination, name=cluster,
logfile=options.logfile)
if os.path.exists(self.source):
if not self.quiet:
print(import_warning)
self.src = FileStorage(file_name=self.source, read_only=True)
self.dst = NEOStorage(master_nodes=self.destination, name=self.name,
**config)
else:
src = NEOStorage(master_nodes=source, name=cluster,
logfile=options.logfile, read_only=True)
dst = FileStorage(file_name=destination)
self.src = NEOStorage(master_nodes=self.source, name=self.name,
read_only=True, **config)
self.dst = FileStorage(file_name=self.destination)
# do the job
print "Migrating from %s to %s" % (source, destination)
def run(self):
if not self.quiet:
print("Migrating from %s to %s" % (self.source, self.destination))
start = time.time()
dst.copyTransactionsFrom(src)
self.dst.copyTransactionsFrom(self.src)
if not self.quiet:
elapsed = time.time() - start
print "Migration done in %3.5f" % (elapsed, )
print("Migration done in %3.5f" % elapsed)
def main(args=None):
config = NEOMigrate.option_parser.parse(args)
NEOMigrate(config).run()
......@@ -2,7 +2,7 @@
#
# neostorage - run a storage node of NEO
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,49 +18,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process. Previously assigned UUID takes precedence (ie ' \
'you should always use --reset with this switch)')
parser.add_option('-a', '--adapter', help = 'database adapter to use')
parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--dedup', action='store_true',
help = 'enable deduplication of data when setting'
' up a new storage node (for RocksDB, check'
' https://github.com/facebook/mysql-5.6/issues/702)')
parser.add_option('--disable-drop-partitions', action='store_true',
help = 'do not delete data of discarded cells, which is'
' useful for big databases because the current'
' implementation is inefficient (this option should'
' disappear in the future)')
parser.add_option('--reset', action='store_true',
help='remove an existing database if any, and exit')
defaults = dict(
bind = '127.0.0.1',
masters = '127.0.0.1:10000',
adapter = 'MySQL',
)
def main(args=None):
# TODO: Forbid using "reset" along with any unneeded argument.
# "reset" is too dangerous to let user a chance of accidentally
# letting it slip through in a long option list.
# We should drop support configuration files to make such check useful.
(options, args) = parser.parse_args(args=args)
config = ConfigurationManager(defaults, options, 'storage')
from neo.storage.app import Application
config = Application.option_parser.parse(args)
# setup custom logging
logging.setup(config.getLogfile())
logging.setup(config.get('logfile'))
# and then, load and run the application
from neo.storage.app import Application
app = Application(config)
if not config.getReset():
if not config.get('reset'):
app.run()
#!/usr/bin/env python
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -15,6 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import traceback
import unittest
import time
......@@ -275,41 +276,43 @@ class NeoTestRunner(unittest.TextTestResult):
class TestRunner(BenchmarkRunner):
def add_options(self, parser):
parser.add_option('-c', '--coverage', action='store_true',
x = parser.add_mutually_exclusive_group().add_argument
x('-c', '--coverage', action='store_true',
help='Enable coverage')
parser.add_option('-C', '--cov-unit', action='store_true',
x('-C', '--cov-unit', action='store_true',
help='Same as -c but output 1 file per test,'
' in the temporary test directory')
parser.add_option('-L', '--log', action='store_true',
_ = parser.add_argument
_('-L', '--log', action='store_true',
help='Force all logs to be emitted immediately and keep'
' packet body in logs of successful threaded tests')
parser.add_option('-l', '--loop', type='int', default=1,
_('-l', '--loop', type=int, default=1,
help='Repeat tests several times')
parser.add_option('-f', '--functional', action='store_true',
_('-f', '--functional', action='store_true',
help='Functional tests')
parser.add_option('-s', '--stop-on-error', action='store_false',
dest='stop_on_success',
x = parser.add_mutually_exclusive_group().add_argument
x('-s', '--stop-on-error', action='store_false',
dest='stop_on_success', default=None,
help='Continue as long as tests pass successfully.'
' It is usually combined with --loop, to check that tests'
' do not fail randomly.')
parser.add_option('-S', '--stop-on-success', action='store_true',
x('-S', '--stop-on-success', action='store_true', default=None,
help='Opposite of --stop-on-error: stop as soon as a test'
' passes. Details about errors are not printed at exit.')
parser.add_option('-r', '--readable-tid', action='store_true',
_('-r', '--readable-tid', action='store_true',
help='Change master behaviour to generate readable TIDs for easier'
' debugging (rather than from current time).')
parser.add_option('-u', '--unit', action='store_true',
_('-u', '--unit', action='store_true',
help='Unit & threaded tests')
parser.add_option('-z', '--zodb', action='store_true',
_('-z', '--zodb', action='store_true',
help='ZODB test suite running on a NEO')
parser.add_option('-v', '--verbose', action='store_true',
_('-v', '--verbose', action='store_true',
help='Verbose output')
parser.usage += " [[!] module [test...]]"
parser.format_epilog = lambda _: """
Positional:
Filter by given module/test. These arguments are shell patterns.
This implies -ufz if none of this option is passed.
_('only', nargs=argparse.REMAINDER, metavar='[[!] module [test...]]',
help="Filter by given module/test. These arguments are shell"
" patterns. This implies -ufz if none of this option is"
" passed.")
parser.epilog = """
Environment Variables:
NEO_TESTS_ADAPTER Default is SQLite for threaded clusters,
MySQL otherwise.
......@@ -330,25 +333,23 @@ Environment Variables:
NEO_TEST_ZODB_STORAGES default: 1
""" % neo_tests__dict__
def load_options(self, options, args):
if options.coverage and options.cov_unit:
sys.exit('-c conflicts with -C')
if not (options.unit or options.functional or options.zodb):
if not args:
def load_options(self, args):
if not (args.unit or args.functional or args.zodb):
if not args.only:
sys.exit('Nothing to run, please give one of -f, -u, -z')
options.unit = options.functional = options.zodb = True
args.unit = args.functional = args.zodb = True
return dict(
log = options.log,
loop = options.loop,
unit = options.unit,
functional = options.functional,
zodb = options.zodb,
verbosity = 2 if options.verbose else 1,
coverage = options.coverage,
cov_unit = options.cov_unit,
only = args,
stop_on_success = options.stop_on_success,
readable_tid = options.readable_tid,
log = args.log,
loop = args.loop,
unit = args.unit,
functional = args.functional,
zodb = args.zodb,
verbosity = 2 if args.verbose else 1,
coverage = args.coverage,
cov_unit = args.cov_unit,
only = args.only,
stop_on_success = args.stop_on_success,
readable_tid = args.readable_tid,
)
def start(self):
......
#!/usr/bin/env python
#
# Copyright (C) 2011-2017 Nexedi SA
# Copyright (C) 2011-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -15,9 +15,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect, random
import argparse, inspect, random
from logging import getLogger, INFO, DEBUG
from optparse import OptionParser
from neo.lib import logging
from neo.tests import functional
#logging.backlog()
......@@ -27,9 +26,9 @@ del logging.default_root_handler.handle
def main():
args, _, _, defaults = inspect.getargspec(functional.NEOCluster.__init__)
option_list = zip(args[-len(defaults):], defaults)
parser = OptionParser(usage="%prog [options] [db...]",
parser = argparse.ArgumentParser(
description="Quickly setup a simple NEO cluster for testing purpose.")
parser.add_option('--seed', help="settings like node ports/uuids and"
parser.add_argument('--seed', help="settings like node ports/uuids and"
" cluster name are random: pass any string to initialize the RNG")
defaults = {}
for option, default in sorted(option_list):
......@@ -40,14 +39,15 @@ def main():
elif default is not None:
defaults[option] = default
if isinstance(default, int):
kw['type'] = "int"
parser.add_option('--' + option, **kw)
kw['type'] = int
parser.add_argument('--' + option, **kw)
parser.set_defaults(**defaults)
options, args = parser.parse_args()
if options.seed:
functional.random = random.Random(options.seed)
parser.add_argument('db', nargs='+')
args = parser.parse_args()
if args.seed:
functional.random = random.Random(args.seed)
getLogger().setLevel(DEBUG)
cluster = functional.NEOCluster(args, **{x: getattr(options, x)
cluster = functional.NEOCluster(args.db, **{x: getattr(args, x)
for x, _ in option_list})
try:
cluster.run()
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,44 +18,88 @@ import sys
from collections import deque
from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.connection import ListeningConnection
from neo.lib.exception import StoppedOperation, PrimaryFailure
from neo.lib.pt import PartitionTable
from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
from .checker import Checker
from .database import buildDatabaseManager
from .database import buildDatabaseManager, DATABASE_MANAGER_DICT
from .handlers import identification, initialization, master
from .replicator import Replicator
from .transactions import TransactionManager
from neo.lib.debug import register as registerLiveDebugger
option_defaults = {
'adapter': 'MySQL',
'wait': 0,
}
assert option_defaults['adapter'] in DATABASE_MANAGER_DICT
@buildOptionParser
class Application(BaseApplication):
"""The storage node application."""
checker = replicator = tm = None
@classmethod
def _buildOptionParser(cls):
parser = cls.option_parser
parser.description = "NEO Storage node"
cls.addCommonServerOptions('storage', '127.0.0.1')
_ = parser.group('storage')
_('a', 'adapter', choices=sorted(DATABASE_MANAGER_DICT),
help="database adapter to use")
_('d', 'database', required=True,
help="database connections string")
_.float('w', 'wait',
help="seconds to wait for backend to be available,"
" before erroring-out (-1 = infinite)")
_.bool('disable-drop-partitions',
help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is"
" inefficient (this option should disappear in the future)")
_ = parser.group('database creation')
_.int('u', 'uuid',
help="specify an UUID to use for this process. Previously"
" assigned UUID takes precedence (i.e. you should"
" always use reset with this switch)")
_('e', 'engine', help="database engine (MySQL only)")
_.bool('dedup',
help="enable deduplication of data"
" when setting up a new storage node")
# TODO: Forbid using "reset" along with any unneeded argument.
# "reset" is too dangerous to let user a chance of accidentally
# letting it slip through in a long option list.
# It should even be forbidden in configuration files.
_.bool('reset',
help="remove an existing database if any, and exit")
parser.set_defaults(**option_defaults)
def __init__(self, config):
super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList())
config.get('ssl'), config.get('dynamic_master_list'))
# set the cluster name
self.name = config.getCluster()
self.name = config['cluster']
self.dm = buildDatabaseManager(config.getAdapter(),
(config.getDatabase(), config.getEngine(), config.getWait()),
self.dm = buildDatabaseManager(config['adapter'],
(config['database'], config.get('engine'), config['wait']),
)
self.disable_drop_partitions = config.getDisableDropPartitions()
self.disable_drop_partitions = config.get('disable_drop_partitions',
False)
# load master nodes
for master_address in config.getMasters():
for master_address in config['masters']:
self.nm.createMaster(address=master_address)
# set the bind address
self.server = config.getBind()
self.server = config['bind']
logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of
......@@ -69,13 +113,15 @@ class Application(BaseApplication):
# operation related data
self.operational = False
self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
self.dm.setup(reset=config.get('reset', False),
dedup=config.get('dedup', False))
self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only
if config.getUUID() is not None:
self.uuid = config.getUUID()
if 'uuid' in config:
self.uuid = config['uuid']
logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log)
......@@ -111,6 +157,7 @@ class Application(BaseApplication):
# load configuration
self.uuid = dm.getUUID()
logging.node(self.name, self.uuid)
num_partitions = dm.getNumPartitions()
num_replicas = dm.getNumReplicas()
ptid = dm.getPTID()
......@@ -123,7 +170,6 @@ class Application(BaseApplication):
self.pt = PartitionTable(num_partitions, num_replicas)
logging.info('Configuration loaded:')
logging.info('UUID : %s', uuid_str(self.uuid))
logging.info('PTID : %s', dump(ptid))
logging.info('Name : %s', self.name)
logging.info('Partitions: %s', num_partitions)
......@@ -208,9 +254,7 @@ class Application(BaseApplication):
self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
uuid = self.uuid
logging.info('I am %s', uuid_str(uuid))
self.dm.setUUID(uuid)
self.dm.setUUID(self.uuid)
# Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition
......
#
# Copyright (C) 2012-2017 Nexedi SA
# Copyright (C) 2012-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2014-2017 Nexedi SA
# Copyright (C) 2014-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -31,6 +31,7 @@ except ImportError:
_protocol = 1
from ZODB.FileStorage import FileStorage
from ..app import option_defaults
from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager
from neo.lib import compress, logging, patch, util
......@@ -359,8 +360,7 @@ class ImporterDatabaseManager(DatabaseManager):
config = SafeConfigParser()
config.read(os.path.expanduser(database))
sections = config.sections()
# XXX: defaults copy & pasted from elsewhere - refactoring needed
main = self._conf = {'adapter': 'MySQL', 'wait': 0}
main = self._conf = option_defaults.copy()
main.update(config.items(sections.pop(0)))
self.zodb = [(x, dict(config.items(x))) for x in sections]
x = main.get('compress', 'true')
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2012-2017 Nexedi SA
# Copyright (C) 2012-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -179,7 +179,8 @@ class SQLiteDatabaseManager(DatabaseManager):
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid))
PRIMARY KEY (partition, tid)
) WITHOUT ROWID
"""
# The table "obj" stores committed object metadata.
......@@ -189,7 +190,8 @@ class SQLiteDatabaseManager(DatabaseManager):
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (partition, oid, tid))
PRIMARY KEY (partition, oid, tid)
) WITHOUT ROWID
"""
index_dict['obj'] = (
"CREATE INDEX %s ON %s(partition, tid, oid)",
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -62,7 +62,8 @@ class BaseMasterHandler(BaseHandler):
elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING:
logging.info('Notified of non-running client, abort (%s)',
uuid_str(uuid))
self.app.tm.abortFor(uuid)
# See comment in ClientOperationHandler.connectionClosed
self.app.tm.abortFor(uuid, even_if_voted=True)
def notifyPartitionChanges(self, conn, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -18,7 +18,7 @@ from neo.lib import logging
from neo.lib.handler import DelayEvent
from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, NonReadableCell, ProtocolError, \
ZERO_HASH, INVALID_PARTITION
ZERO_HASH, ZERO_TID, INVALID_PARTITION
from ..transactions import ConflictError, NotRegisteredError
from . import BaseHandler
import time
......@@ -29,6 +29,24 @@ SLOW_STORE = 2
class ClientOperationHandler(BaseHandler):
def connectionClosed(self, conn):
logging.debug('connection closed for %r', conn)
app = self.app
if app.operational:
# Even if in most cases, abortFor is called from both this method
# and BaseMasterHandler.notifyPartitionChanges (especially since
# storage nodes disconnects unknown clients on their own), these 2
# handlers also cover distinct scenarios, so neither of them is
# redundant:
# - A client node may have network issues with this particular
# storage node and remain RUNNING: we may still be involved in
# the second phase so we only abort non-voted transactions here.
# By not taking part to any further deadlock avoidance,
# not releasing write-locks now would lead to a deadlock.
# - A client node may be disconnected from the master, whereas
# there are still voted (and not locked) transactions to abort.
app.tm.abortFor(conn.getUUID())
def askTransactionInformation(self, conn, tid):
t = self.app.dm.getTransaction(tid)
if t is None:
......@@ -72,26 +90,27 @@ class ClientOperationHandler(BaseHandler):
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, ttid, request_time):
try:
self.app.tm.storeObject(ttid, serial, oid, compression,
locked = self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerStoreObject(err.tid))
return
locked = err.tid
except NonReadableCell:
logging.info('Ignore store of %s:%s by %s: unassigned partition',
dump(oid), dump(serial), dump(ttid))
locked = ZERO_TID
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
locked = ZERO_TID
else:
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('StoreObject delay: %.02fs', duration)
conn.answer(Packets.AnswerStoreObject(None))
conn.answer(Packets.AnswerStoreObject(locked))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, ttid):
......@@ -198,25 +217,26 @@ class ClientOperationHandler(BaseHandler):
def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time):
try:
self.app.tm.checkCurrentSerial(ttid, oid, serial)
locked = self.app.tm.checkCurrentSerial(ttid, oid, serial)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(err.tid))
return
locked = err.tid
except NonReadableCell:
logging.info('Ignore check of %s:%s by %s: unassigned partition',
dump(oid), dump(serial), dump(ttid))
locked = ZERO_TID
except NotRegisteredError:
# transaction was aborted, cancel this event
logging.info('Forget serial check of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
locked = ZERO_TID
else:
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(None))
conn.answer(Packets.AnswerCheckCurrentSerial(locked))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -53,16 +53,17 @@ class IdentificationHandler(EventHandler):
handler = ClientReadOnlyOperationHandler
else:
handler = ClientOperationHandler
assert not node.isConnected(), node
assert node.isRunning(), node
force = False
elif node_type == NodeTypes.STORAGE:
handler = StorageOperationHandler
force = app.uuid < uuid
else:
raise ProtocolError('reject non-client-or-storage node')
# apply the handler and set up the connection
handler = handler(self.app)
conn.setHandler(handler)
node.setConnection(conn, app.uuid < uuid)
node.setConnection(conn, force)
# accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid))
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2006-2017 Nexedi SA
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2018 Nexedi SA
# Copyright (C) 2018-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
This diff is collapsed.
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -28,6 +28,7 @@ import weakref
import MySQLdb
import transaction
from contextlib import contextmanager
from ConfigParser import SafeConfigParser
from cStringIO import StringIO
try:
......@@ -85,8 +86,6 @@ SSL = SSL + "ca.crt", SSL + "node.crt", SSL + "node.key"
logging.default_root_handler.handle = lambda record: None
debug.register()
# prevent "signal only works in main thread" errors in subprocesses
debug.register = lambda on_log=None: None
def mockDefaultValue(name, function):
def method(self, *args, **kw):
......@@ -215,6 +214,14 @@ class NeoTestBase(unittest.TestCase):
expected if isinstance(expected, str) else '|'.join(expected),
'|'.join(pt._formatRows(sorted(pt.count_dict, key=key))))
@contextmanager
def expectedFailure(self, exception=AssertionError, regex=None):
with self.assertRaisesRegexp(exception, regex) as cm:
yield
raise _UnexpectedSuccess
# XXX: passing sys.exc_info() causes deadlocks
raise _ExpectedFailure((type(cm.exception), None, None))
class NeoUnitTestBase(NeoTestBase):
""" Base class for neo tests, implements common checks """
......@@ -255,14 +262,14 @@ class NeoUnitTestBase(NeoTestBase):
assert master_number >= 1 and master_number <= 10
masters = ([(self.local_ip, 10010 + i)
for i in xrange(master_number)])
return Mock({
'getCluster': cluster,
'getBind': masters[0],
'getMasters': masters,
'getReplicas': replicas,
'getPartitions': partitions,
'getUUID': uuid,
})
return {
'cluster': cluster,
'bind': masters[0],
'masters': masters,
'replicas': replicas,
'partitions': partitions,
'uuid': uuid,
}
def getStorageConfiguration(self, cluster='main', master_number=2,
index=0, prefix=DB_PREFIX, uuid=None):
......@@ -277,15 +284,15 @@ class NeoUnitTestBase(NeoTestBase):
db = os.path.join(getTempDirectory(), 'test_neo%s.sqlite' % index)
else:
assert False, adapter
return Mock({
'getCluster': cluster,
'getBind': (masters[0], 10020 + index),
'getMasters': masters,
'getDatabase': db,
'getUUID': uuid,
'getReset': False,
'getAdapter': adapter,
})
return {
'cluster': cluster,
'bind': (masters[0], 10020 + index),
'masters': masters,
'database': db,
'uuid': uuid,
'adapter': adapter,
'wait': 0,
}
def getNewUUID(self, node_type):
"""
......
from __future__ import print_function
import argparse
import sys
import smtplib
import optparse
import platform
import datetime
from email.mime.multipart import MIMEMultipart
......@@ -22,28 +22,28 @@ class BenchmarkRunner(object):
def __init__(self):
self._successful = True
self._status = []
parser = optparse.OptionParser()
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter)
# register common options
parser.add_option('', '--title')
parser.add_option('', '--mail-to', action='append')
parser.add_option('', '--mail-from')
parser.add_option('', '--mail-server')
parser.add_option('', '--repeat', type='int', default=1)
_ = parser.add_argument
_('--title')
_('--mail-to', action='append')
_('--mail-from')
_('--mail-server')
self.add_options(parser)
# check common arguments
options, args = parser.parse_args()
if bool(options.mail_to) ^ bool(options.mail_from):
args = parser.parse_args()
if bool(args.mail_to) ^ bool(args.mail_from):
sys.exit('Need a sender and recipients to mail report')
mail_server = options.mail_server or MAIL_SERVER
mail_server = args.mail_server or MAIL_SERVER
# check specifics arguments
self._config = AttributeDict()
self._config.update(self.load_options(options, args))
self._config.update(self.load_options(args))
self._config.update(
title = options.title or self.__class__.__name__,
mail_from = options.mail_from,
mail_to = options.mail_to,
title = args.title or self.__class__.__name__,
mail_from = args.mail_from,
mail_to = args.mail_to,
mail_server = mail_server.split(':'),
repeat = options.repeat,
)
def add_status(self, key, value):
......@@ -104,7 +104,7 @@ class BenchmarkRunner(object):
""" Append options to command line parser """
raise NotImplementedError
def load_options(self, options, args):
def load_options(self, args):
""" Check options and return a configuration dict """
raise NotImplementedError
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2017 Nexedi SA
# Copyright (C) 2017-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2011-2017 Nexedi SA
# Copyright (C) 2011-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -118,13 +118,15 @@ class PortAllocator(object):
class Process(object):
_coverage_fd = None
_coverage_prefix = os.path.join(getTempDirectory(), 'coverage-')
_coverage_prefix = None
_coverage_index = 0
on_fork = [logging.resetNids]
pid = 0
def __init__(self, command, arg_dict={}):
def __init__(self, command, *args, **kw):
self.command = command
self.arg_dict = arg_dict
self.args = args
self.arg_dict = kw
def _args(self):
args = []
......@@ -132,6 +134,7 @@ class Process(object):
args.append('--' + arg)
if param is not None:
args.append(str(param))
args += self.args
return args
def start(self):
......@@ -144,6 +147,9 @@ class Process(object):
if coverage:
cls = self.__class__
cls._coverage_index += 1
if not cls._coverage_prefix:
cls._coverage_prefix = os.path.join(
getTempDirectory(), 'coverage-')
coverage_data_path = cls._coverage_prefix + str(cls._coverage_index)
self._coverage_fd, w = os.pipe()
def save_coverage(*args):
......@@ -169,11 +175,21 @@ class Process(object):
from coverage import Coverage
coverage = Coverage(coverage_data_path)
coverage.start()
# XXX: Sometimes, the handler is not called immediately.
# The process is stuck at an unknown place and the test
# never ends. strace unlocks:
# strace: Process 5520 attached
# close(25) = 0
# getpid() = 5520
# kill(5520, SIGSTOP) = 0
# ...
signal.signal(signal.SIGUSR2, save_coverage)
os.close(self._coverage_fd)
os.write(w, '\0')
sys.argv = [command] + args
setproctitle(self.command)
for on_fork in self.on_fork:
on_fork()
self.run()
status = 0
except SystemExit, e:
......@@ -239,8 +255,8 @@ class Process(object):
self.pid = 0
self.child_coverage()
if result:
raise NodeProcessError('%r %r exited with status %r' % (
self.command, self.arg_dict, result))
raise NodeProcessError('%r %r %r exited with status %r' % (
self.command, self.args, self.arg_dict, result))
return result
def stop(self):
......@@ -255,18 +271,18 @@ class Process(object):
class NEOProcess(Process):
def __init__(self, command, uuid, arg_dict):
def __init__(self, command, *args, **kw):
try:
__import__('neo.scripts.' + command, level=0)
except ImportError:
raise NotFound(command + ' not found')
super(NEOProcess, self).__init__(command, arg_dict)
self.setUUID(uuid)
self.setUUID(kw.pop('uuid', None))
super(NEOProcess, self).__init__(command, *args, **kw)
def _args(self):
args = super(NEOProcess, self)._args()
if self.uuid:
args += '--uuid', str(self.uuid)
args[:0] = '--uuid', str(self.uuid)
return args
def run(self):
......@@ -281,6 +297,10 @@ class NEOProcess(Process):
"""
self.uuid = uuid
@property
def logfile(self):
return self.arg_dict['logfile']
class NEOCluster(object):
SSL = None
......@@ -368,7 +388,7 @@ class NEOCluster(object):
if self.SSL:
kw['ca'], kw['cert'], kw['key'] = self.SSL
self.process_dict.setdefault(node_type, []).append(
NEOProcess(command_dict[node_type], uuid, kw))
NEOProcess(command_dict[node_type], uuid=uuid, **kw))
def setupDB(self, clear_databases=True):
if self.adapter == 'MySQL':
......@@ -480,14 +500,15 @@ class NEOCluster(object):
except (AlreadyStopped, NodeProcessError):
pass
def getZODBStorage(self, **kw):
master_nodes = self.master_nodes.replace('/', ' ')
def getClientConfig(self, **kw):
kw['name'] = self.cluster_name
kw['master_nodes'] = self.master_nodes.replace('/', ' ')
if self.SSL:
kw['ca'], kw['cert'], kw['key'] = self.SSL
result = Storage(
master_nodes=master_nodes,
name=self.cluster_name,
**kw)
return kw
def getZODBStorage(self, **kw):
result = Storage(**self.getClientConfig(**kw))
result.app.max_reconnection_to_master = 10
self.zodb_storage_list.append(result)
return result
......@@ -718,6 +739,7 @@ class NEOFunctionalTest(NeoTestBase):
def setupLog(self):
logging.setup(os.path.join(self.getTempDirectory(), 'test.log'))
logging.resetNids()
def getTempDirectory(self):
# build the full path based on test case and current test method
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import os
import unittest
import transaction
......@@ -25,7 +26,7 @@ from neo.lib.util import makeChecksum, u64
from ZODB.FileStorage import FileStorage
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest
from . import NEOCluster, NEOFunctionalTest, NEOProcess
TREE_SIZE = 6
......@@ -120,13 +121,13 @@ class ClientTests(NEOFunctionalTest):
self.__checkTree(tree.right, depth)
self.__checkTree(tree.left, depth)
def __getDataFS(self, reset=False):
def __getDataFS(self):
name = os.path.join(self.getTempDirectory(), 'data.fs')
if reset and os.path.exists(name):
if os.path.exists(name):
os.remove(name)
return FileStorage(file_name=name)
def __populate(self, db, tree_size=TREE_SIZE):
def __populate(self, db, tree_size=TREE_SIZE, with_undo=True):
if isinstance(db.storage, FileStorage):
from base64 import b64encode as undo_tid
else:
......@@ -146,6 +147,7 @@ class ClientTests(NEOFunctionalTest):
t2 = db.lastTransaction()
ob.left = left
transaction.commit()
if with_undo:
undo()
t4 = db.lastTransaction()
undo(t2)
......@@ -174,10 +176,37 @@ class ClientTests(NEOFunctionalTest):
(neo_db, neo_conn) = self.neo.getZODBConnection()
self.__checkTree(neo_conn.root()['trees'])
def __dump(self, storage):
return {u64(t.tid): [(u64(o.oid), o.data_txn and u64(o.data_txn),
def testMigrationTool(self):
dfs_storage = self.__getDataFS()
dfs_db = ZODB.DB(dfs_storage)
self.__populate(dfs_db, with_undo=False)
dump = self.__dump(dfs_storage)
fs_path = dfs_storage.__name__
dfs_db.close()
neo = self.neo
neo.start()
kw = {'cluster': neo.cluster_name, 'quiet': None}
master_nodes = neo.master_nodes.replace('/', ' ')
if neo.SSL:
kw['ca'], kw['cert'], kw['key'] = neo.SSL
p = NEOProcess('neomigrate', fs_path, master_nodes, **kw)
p.start()
p.wait()
os.remove(fs_path)
p = NEOProcess('neomigrate', master_nodes, fs_path, **kw)
p.start()
p.wait()
self.assertEqual(dump, self.__dump(FileStorage(fs_path)))
def __dump(self, storage, sorted=sorted):
return {u64(t.tid): sorted((u64(o.oid), o.data_txn and u64(o.data_txn),
None if o.data is None else makeChecksum(o.data))
for o in t]
for o in t)
for t in storage.iterator()}
def testExport(self):
......@@ -186,10 +215,10 @@ class ClientTests(NEOFunctionalTest):
self.neo.start()
(neo_db, neo_conn) = self.neo.getZODBConnection()
self.__populate(neo_db)
dump = self.__dump(neo_db.storage)
dump = self.__dump(neo_db.storage, list)
# copy neo to data fs
dfs_storage = self.__getDataFS(reset=True)
dfs_storage = self.__getDataFS()
neo_storage = self.neo.getZODBStorage()
dfs_storage.copyTransactionsFrom(neo_storage)
......@@ -209,7 +238,10 @@ class ClientTests(NEOFunctionalTest):
self.neo.start()
neo_db, neo_conn = self.neo.getZODBConnection()
self.__checkTree(neo_conn.root()['trees'])
self.assertEqual(dump, self.__dump(neo_db.storage))
# BUG: The following check is sometimes done whereas the import is not
# finished, resulting in a failure because getReplicationTIDList
# is not implemented by the Importer backend.
self.assertEqual(dump, self.__dump(neo_db.storage, list))
def testIPv6Client(self):
""" Test the connectivity of an IPv6 connection for neo client """
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
#
# Copyright (C) 2009-2017 Nexedi SA
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment