Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Stefane Fermigier
neo
Commits
5743cdce
Commit
5743cdce
authored
Mar 20, 2012
by
Julien Muchembled
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
neo.lib.logging.* -> logging.*
parent
1fce5cc4
Changes
40
Show whitespace changes
Inline
Side-by-side
Showing
40 changed files
with
319 additions
and
379 deletions
+319
-379
neo/admin/app.py
neo/admin/app.py
+4
-5
neo/admin/handler.py
neo/admin/handler.py
+6
-8
neo/client/Storage.py
neo/client/Storage.py
+4
-5
neo/client/app.py
neo/client/app.py
+22
-29
neo/client/handlers/master.py
neo/client/handlers/master.py
+6
-6
neo/client/handlers/storage.py
neo/client/handlers/storage.py
+5
-6
neo/client/poll.py
neo/client/poll.py
+3
-3
neo/client/pool.py
neo/client/pool.py
+7
-8
neo/lib/bootstrap.py
neo/lib/bootstrap.py
+4
-4
neo/lib/connection.py
neo/lib/connection.py
+21
-30
neo/lib/event.py
neo/lib/event.py
+8
-8
neo/lib/handler.py
neo/lib/handler.py
+15
-15
neo/lib/node.py
neo/lib/node.py
+11
-15
neo/lib/pt.py
neo/lib/pt.py
+5
-7
neo/master/app.py
neo/master/app.py
+22
-26
neo/master/backup_app.py
neo/master/backup_app.py
+13
-15
neo/master/handlers/__init__.py
neo/master/handlers/__init__.py
+2
-3
neo/master/handlers/administration.py
neo/master/handlers/administration.py
+8
-8
neo/master/handlers/election.py
neo/master/handlers/election.py
+5
-8
neo/master/handlers/storage.py
neo/master/handlers/storage.py
+3
-3
neo/master/recovery.py
neo/master/recovery.py
+7
-8
neo/master/transactions.py
neo/master/transactions.py
+9
-9
neo/master/verification.py
neo/master/verification.py
+8
-8
neo/storage/app.py
neo/storage/app.py
+19
-19
neo/storage/checker.py
neo/storage/checker.py
+10
-12
neo/storage/database/manager.py
neo/storage/database/manager.py
+4
-5
neo/storage/database/mysqldb.py
neo/storage/database/mysqldb.py
+10
-13
neo/storage/database/sqlite.py
neo/storage/database/sqlite.py
+6
-8
neo/storage/handlers/__init__.py
neo/storage/handlers/__init__.py
+4
-6
neo/storage/handlers/client.py
neo/storage/handlers/client.py
+10
-13
neo/storage/handlers/hidden.py
neo/storage/handlers/hidden.py
+2
-3
neo/storage/handlers/identification.py
neo/storage/handlers/identification.py
+3
-4
neo/storage/handlers/initialization.py
neo/storage/handlers/initialization.py
+4
-7
neo/storage/handlers/master.py
neo/storage/handlers/master.py
+4
-4
neo/storage/handlers/verification.py
neo/storage/handlers/verification.py
+2
-3
neo/storage/replicator.py
neo/storage/replicator.py
+6
-6
neo/storage/transactions.py
neo/storage/transactions.py
+19
-21
neo/tests/functional/__init__.py
neo/tests/functional/__init__.py
+8
-7
neo/tests/threaded/__init__.py
neo/tests/threaded/__init__.py
+8
-8
neo/tests/threaded/testReplication.py
neo/tests/threaded/testReplication.py
+2
-3
No files found.
neo/admin/app.py
View file @
5743cdce
...
...
@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.node
import
NodeManager
from
neo.lib.event
import
EventManager
from
neo.lib.connection
import
ListeningConnection
...
...
@@ -41,7 +40,7 @@ class Application(object):
self
.
master_addresses
,
connector_name
=
config
.
getMasters
()
self
.
connector_handler
=
getConnectorHandler
(
connector_name
)
neo
.
lib
.
logging
.
debug
(
'IP address is %s, port is %d'
,
*
(
self
.
server
)
)
logging
.
debug
(
'IP address is %s, port is %d'
,
*
self
.
server
)
# The partition table is initialized after getting the number of
# partitions.
...
...
@@ -75,7 +74,7 @@ class Application(object):
try
:
self
.
_run
()
except
:
neo
.
lib
.
logging
.
exception
(
'Pre-mortem data:'
)
logging
.
exception
(
'Pre-mortem data:'
)
self
.
log
()
raise
...
...
@@ -95,7 +94,7 @@ class Application(object):
while
True
:
self
.
em
.
poll
(
1
)
except
PrimaryFailure
:
neo
.
lib
.
logging
.
error
(
'primary master is down'
)
logging
.
error
(
'primary master is down'
)
def
connectToPrimary
(
self
):
"""Find a primary master node, and connect to it.
...
...
neo/admin/handler.py
View file @
5743cdce
...
...
@@ -14,10 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
from
neo.lib
import
logging
,
protocol
from
neo.lib.handler
import
EventHandler
from
neo.lib
import
protocol
from
neo.lib.protocol
import
Packets
,
Errors
from
neo.lib.exception
import
PrimaryFailure
from
neo.lib.util
import
dump
...
...
@@ -39,7 +37,7 @@ class AdminEventHandler(EventHandler):
@
check_primary_master
def
askPartitionList
(
self
,
conn
,
min_offset
,
max_offset
,
uuid
):
neo
.
lib
.
logging
.
info
(
"ask partition list from %s to %s for %s"
,
logging
.
info
(
"ask partition list from %s to %s for %s"
,
min_offset
,
max_offset
,
dump
(
uuid
))
self
.
app
.
sendPartitionTable
(
conn
,
min_offset
,
max_offset
,
uuid
)
...
...
@@ -50,7 +48,7 @@ class AdminEventHandler(EventHandler):
node_filter
=
None
else
:
node_filter
=
lambda
n
:
n
.
getType
()
is
node_type
neo
.
lib
.
logging
.
info
(
"ask list of %s nodes"
,
node_type
)
logging
.
info
(
"ask list of %s nodes"
,
node_type
)
node_list
=
self
.
app
.
nm
.
getList
(
node_filter
)
node_information_list
=
[
node
.
asTuple
()
for
node
in
node_list
]
p
=
Packets
.
AnswerNodeList
(
node_information_list
)
...
...
@@ -58,7 +56,7 @@ class AdminEventHandler(EventHandler):
@
check_primary_master
def
setNodeState
(
self
,
conn
,
uuid
,
state
,
modify_partition_table
):
neo
.
lib
.
logging
.
info
(
"set node state for %s-%s"
,
dump
(
uuid
),
state
)
logging
.
info
(
"set node state for %s-%s"
,
dump
(
uuid
),
state
)
node
=
self
.
app
.
nm
.
getByUUID
(
uuid
)
if
node
is
None
:
raise
protocol
.
ProtocolError
(
'invalid uuid'
)
...
...
@@ -121,7 +119,7 @@ class MasterEventHandler(EventHandler):
def
answerNodeInformation
(
self
,
conn
):
# XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap)
neo
.
lib
.
logging
.
debug
(
"answerNodeInformation"
)
logging
.
debug
(
"answerNodeInformation"
)
def
notifyPartitionChanges
(
self
,
conn
,
ptid
,
cell_list
):
self
.
app
.
pt
.
update
(
ptid
,
cell_list
,
self
.
app
.
nm
)
...
...
neo/client/Storage.py
View file @
5743cdce
...
...
@@ -18,8 +18,8 @@ from ZODB import BaseStorage, ConflictResolution, POSException
from
zope.interface
import
implements
import
ZODB.interfaces
import
neo.lib
from
functools
import
wraps
from
neo.lib
import
logging
from
neo.lib.util
import
add64
from
neo.lib.protocol
import
ZERO_TID
from
.app
import
Application
...
...
@@ -71,7 +71,7 @@ class Storage(BaseStorage.BaseStorage,
if
compress
is
None
:
compress
=
True
if
logfile
:
neo
.
lib
.
logging
.
setup
(
logfile
)
logging
.
setup
(
logfile
)
BaseStorage
.
BaseStorage
.
__init__
(
self
,
'NEOStorage(%s)'
%
(
name
,
))
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self
.
_is_read_only
=
read_only
...
...
@@ -238,9 +238,8 @@ class Storage(BaseStorage.BaseStorage,
def
pack
(
self
,
t
,
referencesf
,
gc
=
False
):
if
gc
:
neo
.
lib
.
logging
.
warning
(
'Garbage Collection is not available in NEO, '
'please use an external tool. Packing without GC.'
)
logging
.
warning
(
'Garbage Collection is not available in NEO,'
' please use an external tool. Packing without GC.'
)
self
.
app
.
pack
(
t
)
def
lastSerial
(
self
):
...
...
neo/client/app.py
View file @
5743cdce
...
...
@@ -27,7 +27,7 @@ from ZODB.POSException import ReadConflictError
from
ZODB.ConflictResolution
import
ResolvedSerial
from
persistent.TimeStamp
import
TimeStamp
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.protocol
import
NodeTypes
,
Packets
,
\
INVALID_PARTITION
,
ZERO_HASH
,
ZERO_TID
from
neo.lib.event
import
EventManager
...
...
@@ -278,7 +278,7 @@ class Application(object):
"""
Lookup for the current primary master node
"""
neo
.
lib
.
logging
.
debug
(
'connecting to primary master...'
)
logging
.
debug
(
'connecting to primary master...'
)
ready
=
False
nm
=
self
.
nm
while
not
ready
:
...
...
@@ -309,8 +309,7 @@ class Application(object):
# Query for primary master node
if
conn
.
getConnector
()
is
None
:
# This happens if a connection could not be established.
neo
.
lib
.
logging
.
error
(
'Connection to master node %s failed'
,
logging
.
error
(
'Connection to master node %s failed'
,
self
.
trying_master_node
)
continue
try
:
...
...
@@ -322,15 +321,13 @@ class Application(object):
# If we reached the primary master node, mark as connected
connected
=
self
.
primary_master_node
is
not
None
and
\
self
.
primary_master_node
is
self
.
trying_master_node
neo
.
lib
.
logging
.
info
(
'Connected to %s'
%
(
self
.
primary_master_node
,
))
logging
.
info
(
'Connected to %s'
,
self
.
primary_master_node
)
try
:
ready
=
self
.
identifyToPrimaryNode
(
conn
)
except
ConnectionClosed
:
neo
.
lib
.
logging
.
error
(
'Connection to %s lost'
,
self
.
trying_master_node
)
logging
.
error
(
'Connection to %s lost'
,
self
.
trying_master_node
)
self
.
primary_master_node
=
None
neo
.
lib
.
logging
.
info
(
"Connected and ready"
)
logging
.
info
(
"Connected and ready"
)
return
conn
def
identifyToPrimaryNode
(
self
,
conn
):
...
...
@@ -339,7 +336,7 @@ class Application(object):
Might raise ConnectionClosed so that the new primary can be
looked-up again.
"""
neo
.
lib
.
logging
.
info
(
'Initializing from master'
)
logging
.
info
(
'Initializing from master'
)
ask
=
self
.
_ask
handler
=
self
.
primary_bootstrap_handler
ask
(
conn
,
Packets
.
AskNodeInformation
(),
handler
=
handler
)
...
...
@@ -437,7 +434,7 @@ class Application(object):
if
data
or
checksum
!=
ZERO_HASH
:
if
checksum
!=
makeChecksum
(
data
):
neo
.
lib
.
logging
.
error
(
'wrong checksum from %s for oid %s'
,
logging
.
error
(
'wrong checksum from %s for oid %s'
,
conn
,
dump
(
oid
))
continue
if
compression
:
...
...
@@ -486,8 +483,7 @@ class Application(object):
txn_context
=
self
.
_txn_container
.
get
(
transaction
)
if
txn_context
is
None
:
raise
StorageTransactionError
(
self
,
transaction
)
neo
.
lib
.
logging
.
debug
(
'storing oid %s serial %s'
,
dump
(
oid
),
dump
(
serial
))
logging
.
debug
(
'storing oid %s serial %s'
,
dump
(
oid
),
dump
(
serial
))
self
.
_store
(
txn_context
,
oid
,
serial
,
data
)
return
None
...
...
@@ -570,7 +566,7 @@ class Application(object):
if
ZERO_TID
in
conflict_serial_set
:
if
1
:
# XXX: disable deadlock avoidance code until it is fixed
neo
.
lib
.
logging
.
info
(
'Deadlock avoidance on %r:%r'
,
logging
.
info
(
'Deadlock avoidance on %r:%r'
,
dump
(
oid
),
dump
(
serial
))
# 'data' parameter of ConflictError is only used to report the
# class of the object. It doesn't matter if 'data' is None
...
...
@@ -591,7 +587,7 @@ class Application(object):
# XXX: currently, brute-force is implemented: we send
# object data again.
# WARNING: not maintained code
neo
.
lib
.
logging
.
info
(
'Deadlock avoidance triggered on %r:%r'
,
logging
.
info
(
'Deadlock avoidance triggered on %r:%r'
,
dump
(
oid
),
dump
(
serial
))
for
store_oid
,
store_data
in
data_dict
.
iteritems
():
store_serial
=
object_serial_dict
[
store_oid
]
...
...
@@ -601,9 +597,8 @@ class Application(object):
else
:
if
store_data
is
None
:
# Some undo
neo
.
lib
.
logging
.
warning
(
'Deadlock avoidance cannot'
' reliably work with undo, this must be '
'implemented.'
)
logging
.
warning
(
'Deadlock avoidance cannot reliably'
' work with undo, this must be implemented.'
)
conflict_serial
=
ZERO_TID
break
self
.
_store
(
txn_context
,
store_oid
,
store_serial
,
...
...
@@ -627,7 +622,7 @@ class Application(object):
new_data
=
tryToResolveConflict
(
oid
,
conflict_serial
,
serial
,
data
)
if
new_data
is
not
None
:
neo
.
lib
.
logging
.
info
(
'Conflict resolution succeed for '
\
logging
.
info
(
'Conflict resolution succeed for '
'%r:%r with %r'
,
dump
(
oid
),
dump
(
serial
),
dump
(
conflict_serial
))
# Mark this conflict as resolved
...
...
@@ -639,7 +634,7 @@ class Application(object):
append
(
oid
)
continue
else
:
neo
.
lib
.
logging
.
info
(
'Conflict resolution failed for '
\
logging
.
info
(
'Conflict resolution failed for '
'%r:%r with %r'
,
dump
(
oid
),
dump
(
serial
),
dump
(
conflict_serial
))
raise
ConflictError
(
oid
=
oid
,
serials
=
(
txn_context
[
'ttid'
],
...
...
@@ -681,7 +676,7 @@ class Application(object):
for
oid
,
store_dict
in
\
txn_context
[
'object_stored_counter_dict'
].
iteritems
():
if
not
store_dict
:
neo
.
lib
.
logging
.
error
(
'tpc_store failed'
)
logging
.
error
(
'tpc_store failed'
)
raise
NEOStorageError
(
'tpc_store failed'
)
elif
oid
in
resolved_oid_set
:
append
((
oid
,
ResolvedSerial
))
...
...
@@ -705,7 +700,7 @@ class Application(object):
txn_context
[
'cache_dict'
])
add_involved_nodes
=
txn_context
[
'involved_nodes'
].
add
for
node
,
conn
in
self
.
cp
.
iterateForObject
(
ttid
):
neo
.
lib
.
logging
.
debug
(
"voting object %s on %s"
,
dump
(
ttid
),
logging
.
debug
(
"voting object %s on %s"
,
dump
(
ttid
),
dump
(
conn
.
getUUID
()))
try
:
self
.
_askStorage
(
conn
,
packet
)
...
...
@@ -716,7 +711,7 @@ class Application(object):
# check at least one storage node accepted
if
txn_stored_counter
==
0
:
neo
.
lib
.
logging
.
error
(
'tpc_vote failed'
)
logging
.
error
(
'tpc_vote failed'
)
raise
NEOStorageError
(
'tpc_vote failed'
)
# Check if master connection is still alive.
# This is just here to lower the probability of detecting a problem
...
...
@@ -746,10 +741,8 @@ class Application(object):
try
:
conn
.
notify
(
p
)
except
:
neo
.
lib
.
logging
.
error
(
'Exception in tpc_abort while notifying'
\
'storage node %r of abortion, ignoring.'
,
conn
,
exc_info
=
1
)
logging
.
exception
(
'Exception in tpc_abort while notifying'
'storage node %r of abortion, ignoring.'
,
conn
)
self
.
_getMasterConnection
().
notify
(
p
)
queue
=
txn_context
[
'queue'
]
# We don't need to flush queue, as it won't be reused by future
...
...
@@ -924,7 +917,7 @@ class Application(object):
# Reorder tids
ordered_tids
=
sorted
(
tid_set
,
reverse
=
True
)
neo
.
lib
.
logging
.
debug
(
"UndoLog tids %s"
,
map
(
dump
,
ordered_tids
))
logging
.
debug
(
"UndoLog tids %s"
,
map
(
dump
,
ordered_tids
))
# For each transaction, get info
undo_info
=
[]
append
=
undo_info
.
append
...
...
@@ -1045,7 +1038,7 @@ class Application(object):
self
.
cp
.
flush
()
self
.
master_conn
=
None
# Stop polling thread
neo
.
lib
.
logging
.
debug
(
'Stopping %s'
,
self
.
poll_thread
)
logging
.
debug
(
'Stopping %s'
,
self
.
poll_thread
)
self
.
poll_thread
.
stop
()
psThreadedPoll
()
close
=
__del__
...
...
neo/client/handlers/master.py
View file @
5743cdce
...
...
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.pt
import
MTPartitionTable
as
PartitionTable
from
neo.lib.protocol
import
NodeTypes
,
NodeStates
,
ProtocolError
from
neo.lib.util
import
dump
...
...
@@ -52,8 +52,8 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
if
primary_node
is
None
:
# I don't know such a node. Probably this information
# is old. So ignore it.
neo
.
lib
.
logging
.
warning
(
'Unknown primary master UUID: %s. '
\
'Ignoring.'
%
dump
(
primary_uuid
))
logging
.
warning
(
'Unknown primary master UUID: %s. Ignoring.'
,
dump
(
primary_uuid
))
return
else
:
if
app
.
trying_master_node
is
not
primary_node
:
...
...
@@ -74,7 +74,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
if
your_uuid
is
None
:
raise
ProtocolError
(
'No UUID supplied'
)
app
.
uuid
=
your_uuid
neo
.
lib
.
logging
.
info
(
'Got an UUID: %s'
,
dump
(
app
.
uuid
))
logging
.
info
(
'Got an UUID: %s'
,
dump
(
app
.
uuid
))
# Always create partition table
app
.
pt
=
PartitionTable
(
num_partitions
,
num_replicas
)
...
...
@@ -93,13 +93,13 @@ class PrimaryNotificationsHandler(BaseHandler):
def
connectionClosed
(
self
,
conn
):
app
=
self
.
app
if
app
.
master_conn
is
not
None
:
neo
.
lib
.
logging
.
critical
(
"connection to primary master node closed"
)
logging
.
critical
(
"connection to primary master node closed"
)
app
.
master_conn
=
None
app
.
primary_master_node
=
None
super
(
PrimaryNotificationsHandler
,
self
).
connectionClosed
(
conn
)
def
stopOperation
(
self
,
conn
):
neo
.
lib
.
logging
.
critical
(
"master node ask to stop operation"
)
logging
.
critical
(
"master node ask to stop operation"
)
def
invalidateObjects
(
self
,
conn
,
tid
,
oid_list
):
app
=
self
.
app
...
...
neo/client/handlers/storage.py
View file @
5743cdce
...
...
@@ -17,7 +17,7 @@
from
ZODB.TimeStamp
import
TimeStamp
from
ZODB.POSException
import
ConflictError
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.protocol
import
NodeTypes
,
ProtocolError
,
LockState
,
ZERO_TID
from
neo.lib.util
import
dump
from
neo.lib.exception
import
NodeNotReady
...
...
@@ -71,8 +71,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
# we may process entirely a conflict with S1 (i.e. we received the
# answer to the store of the resolved object on S1) before we
# receive the conflict answer from the first store on S2.
neo
.
lib
.
logging
.
info
(
'%r report a conflict for %r with %r'
,
conn
,
dump
(
oid
),
dump
(
serial
))
logging
.
info
(
'%r report a conflict for %r with %r'
,
conn
,
dump
(
oid
),
dump
(
serial
))
# If this conflict is not already resolved, mark it for
# resolution.
if
serial
not
in
txn_context
[
...
...
@@ -115,7 +115,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
pass
def
answerTIDsFrom
(
self
,
conn
,
tid_list
):
neo
.
lib
.
logging
.
debug
(
'Get %d
TIDs from %r'
,
len
(
tid_list
),
conn
)
logging
.
debug
(
'Get %u
TIDs from %r'
,
len
(
tid_list
),
conn
)
self
.
app
.
setHandlerData
(
tid_list
)
def
answerTransactionInformation
(
self
,
conn
,
tid
,
...
...
@@ -178,8 +178,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
# Anyway, it's not clear that HasLock requests are useful.
# Are store requests potentially long to process ? If not,
# we should simply raise a ConflictError on store timeout.
neo
.
lib
.
logging
.
info
(
'Store of oid %s delayed (storage overload ?)'
,
dump
(
oid
))
logging
.
info
(
'Store of oid %s delayed (storage overload ?)'
,
dump
(
oid
))
def
alreadyPendingError
(
self
,
conn
,
message
):
pass
...
...
neo/client/poll.py
View file @
5743cdce
...
...
@@ -16,8 +16,8 @@
from
logging
import
DEBUG
,
ERROR
from
threading
import
Thread
,
Event
,
enumerate
as
thread_enum
from
neo.lib
import
logging
from
neo.lib.locking
import
Lock
import
neo.lib
class
_ThreadedPoll
(
Thread
):
"""Polling thread."""
...
...
@@ -29,7 +29,7 @@ class _ThreadedPoll(Thread):
self
.
_stop
=
Event
()
def
run
(
self
):
_log
=
neo
.
lib
.
logging
.
log
_log
=
logging
.
log
def
log
(
*
args
,
**
kw
):
# Ignore errors due to garbage collection on exit
try
:
...
...
@@ -112,7 +112,7 @@ def psThreadedPoll(log=None):
Logs alive ThreadedPoll threads.
"""
if
log
is
None
:
log
=
neo
.
lib
.
logging
.
debug
log
=
logging
.
debug
for
thread
in
thread_enum
():
if
not
isinstance
(
thread
,
ThreadedPoll
):
continue
...
...
neo/client/pool.py
View file @
5743cdce
...
...
@@ -17,7 +17,7 @@
import
time
from
random
import
shuffle
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.locking
import
RLock
from
neo.lib.protocol
import
NodeTypes
,
Packets
from
neo.lib.connection
import
MTClientConnection
,
ConnectionClosed
...
...
@@ -56,8 +56,7 @@ class ConnectionPool(object):
def
_initNodeConnection
(
self
,
node
):
"""Init a connection to a given storage node."""
app
=
self
.
app
neo
.
lib
.
logging
.
debug
(
'trying to connect to %s - %s'
,
node
,
node
.
getState
())
logging
.
debug
(
'trying to connect to %s - %s'
,
node
,
node
.
getState
())
conn
=
MTClientConnection
(
app
.
em
,
app
.
storage_event_handler
,
node
,
connector
=
app
.
connector_handler
(),
dispatcher
=
app
.
dispatcher
)
p
=
Packets
.
RequestIdentification
(
NodeTypes
.
CLIENT
,
...
...
@@ -65,15 +64,15 @@ class ConnectionPool(object):
try
:
app
.
_ask
(
conn
,
p
,
handler
=
app
.
storage_bootstrap_handler
)
except
ConnectionClosed
:
neo
.
lib
.
logging
.
error
(
'Connection to %r failed'
,
node
)
logging
.
error
(
'Connection to %r failed'
,
node
)
self
.
notifyFailure
(
node
)
conn
=
None
except
NodeNotReady
:
neo
.
lib
.
logging
.
info
(
'%r not ready'
,
node
)
logging
.
info
(
'%r not ready'
,
node
)
self
.
notifyFailure
(
node
)
conn
=
None
else
:
neo
.
lib
.
logging
.
info
(
'Connected %r'
,
node
)
logging
.
info
(
'Connected %r'
,
node
)
return
conn
@
profiler_decorator
...
...
@@ -87,8 +86,8 @@ class ConnectionPool(object):
not
self
.
app
.
dispatcher
.
registered
(
conn
):
del
self
.
connection_dict
[
conn
.
getUUID
()]
conn
.
close
()
neo
.
lib
.
logging
.
debug
(
'_dropConnections : connection to '
\
'storage node %s:%d closed'
,
*
(
conn
.
getAddress
()
))
logging
.
debug
(
'_dropConnections: connection to '
'storage node %s:%d closed'
,
*
conn
.
getAddress
(
))
if
len
(
self
.
connection_dict
)
<=
self
.
max_pool_size
:
break
finally
:
...
...
neo/lib/bootstrap.py
View file @
5743cdce
...
...
@@ -14,9 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
from
time
import
sleep
from
.
import
logging
from
.handler
import
EventHandler
from
.protocol
import
Packets
from
.util
import
dump
...
...
@@ -109,13 +109,13 @@ class BootstrapManager(EventHandler):
node
.
getConnection
().
close
()
return
neo
.
lib
.
logging
.
info
(
'connected to a primary master node'
)
logging
.
info
(
'connected to a primary master node'
)
self
.
num_partitions
=
num_partitions
self
.
num_replicas
=
num_replicas
if
self
.
uuid
!=
your_uuid
:
# got an uuid from the primary master
self
.
uuid
=
your_uuid
neo
.
lib
.
logging
.
info
(
'Got a new UUID : %s'
%
dump
(
self
.
uuid
))
logging
.
info
(
'Got a new UUID: %s'
,
dump
(
self
.
uuid
))
self
.
accepted
=
True
def
getPrimaryConnection
(
self
,
connector_handler
):
...
...
@@ -123,7 +123,7 @@ class BootstrapManager(EventHandler):
Primary lookup/connection process.
Returns when the connection is made.
"""
neo
.
lib
.
logging
.
info
(
'connecting to a primary master node'
)
logging
.
info
(
'connecting to a primary master node'
)
em
,
nm
=
self
.
app
.
em
,
self
.
app
.
nm
index
=
0
self
.
current
=
None
...
...
neo/lib/connection.py
View file @
5743cdce
...
...
@@ -17,8 +17,7 @@
from
functools
import
wraps
from
time
import
time
import
neo.lib
from
.
import
attributeTracker
from
.
import
attributeTracker
,
logging
from
.connector
import
ConnectorException
,
ConnectorTryAgainException
,
\
ConnectorInProgressException
,
ConnectorConnectionRefusedException
,
\
ConnectorConnectionClosedException
...
...
@@ -55,8 +54,8 @@ def lockCheckWrapper(func):
def
wrapper
(
self
,
*
args
,
**
kw
):
if
not
self
.
_lock
.
_is_owned
():
import
traceback
neo
.
lib
.
logging
.
warning
(
'%s called on %s instance without being '
\
'
locked.
Stack:
\
n
%s'
,
func
.
func_code
.
co_name
,
logging
.
warning
(
'%s called on %s instance without being locked.'
' Stack:
\
n
%s'
,
func
.
func_code
.
co_name
,
self
.
__class__
.
__name__
,
''
.
join
(
traceback
.
format_stack
()))
# Call anyway
return
func
(
self
,
*
args
,
**
kw
)
...
...
@@ -165,9 +164,9 @@ class HandlerSwitcher(object):
@
profiler_decorator
def
_handle
(
self
,
connection
,
packet
):
assert
len
(
self
.
_pending
)
==
1
or
self
.
_pending
[
0
][
0
]
neo
.
lib
.
logging
.
packet
(
connection
,
packet
,
False
)
logging
.
packet
(
connection
,
packet
,
False
)
if
connection
.
isClosed
()
and
packet
.
ignoreOnClosedConnection
():
neo
.
lib
.
logging
.
debug
(
'Ignoring packet %r on closed connection %r'
,
logging
.
debug
(
'Ignoring packet %r on closed connection %r'
,
packet
,
connection
)
return
msg_id
=
packet
.
getId
()
...
...
@@ -185,8 +184,7 @@ class HandlerSwitcher(object):
if
klass
and
isinstance
(
packet
,
klass
)
or
packet
.
isError
():
handler
.
packetReceived
(
connection
,
packet
,
kw
)
else
:
neo
.
lib
.
logging
.
error
(
'Unexpected answer %r in %r'
,
packet
,
connection
)
logging
.
error
(
'Unexpected answer %r in %r'
,
packet
,
connection
)
if
not
connection
.
isClosed
():
notification
=
Packets
.
Notify
(
'Unexpected answer: %r'
%
packet
)
connection
.
notify
(
notification
)
...
...
@@ -195,8 +193,7 @@ class HandlerSwitcher(object):
# apply a pending handler if no more answers are pending
while
len
(
self
.
_pending
)
>
1
and
not
self
.
_pending
[
0
][
0
]:
del
self
.
_pending
[
0
]
neo
.
lib
.
logging
.
debug
(
'Apply handler %r on %r'
,
self
.
_pending
[
0
][
1
],
logging
.
debug
(
'Apply handler %r on %r'
,
self
.
_pending
[
0
][
1
],
connection
)
if
msg_id
==
self
.
_next_timeout_msg_id
:
self
.
_updateNextTimeout
()
...
...
@@ -281,7 +278,7 @@ class BaseConnection(object):
if
msg_id
is
None
:
self
.
_base_timeout
=
t
else
:
neo
.
lib
.
logging
.
info
(
'timeout for #0x%08x with %r'
,
logging
.
info
(
'timeout for #0x%08x with %r'
,
msg_id
,
self
)
self
.
close
()
else
:
...
...
@@ -332,9 +329,9 @@ class BaseConnection(object):
def
setHandler
(
self
,
handler
):
if
self
.
_handlers
.
setHandler
(
handler
):
neo
.
lib
.
logging
.
debug
(
'Set handler %r on %r'
,
handler
,
self
)
logging
.
debug
(
'Set handler %r on %r'
,
handler
,
self
)
else
:
neo
.
lib
.
logging
.
debug
(
'Delay handler %r on %r'
,
handler
,
self
)
logging
.
debug
(
'Delay handler %r on %r'
,
handler
,
self
)
def
getEventManager
(
self
):
return
self
.
em
...
...
@@ -377,7 +374,7 @@ class ListeningConnection(BaseConnection):
"""A listen connection."""
def
__init__
(
self
,
event_manager
,
handler
,
addr
,
connector
,
**
kw
):
neo
.
lib
.
logging
.
debug
(
'listening to %s:%d'
,
*
addr
)
logging
.
debug
(
'listening to %s:%d'
,
*
addr
)
BaseConnection
.
__init__
(
self
,
event_manager
,
handler
,
addr
=
addr
,
connector
=
connector
)
self
.
connector
.
makeListeningConnection
(
addr
)
...
...
@@ -385,7 +382,7 @@ class ListeningConnection(BaseConnection):
def
readable
(
self
):
try
:
new_s
,
addr
=
self
.
connector
.
getNewConnection
()
neo
.
lib
.
logging
.
debug
(
'accepted a connection from %s:%d'
,
*
addr
)
logging
.
debug
(
'accepted a connection from %s:%d'
,
*
addr
)
handler
=
self
.
getHandler
()
new_conn
=
ServerConnection
(
self
.
getEventManager
(),
handler
,
connector
=
new_s
,
addr
=
addr
)
...
...
@@ -477,7 +474,7 @@ class Connection(BaseConnection):
def
abort
(
self
):
"""Abort dealing with this connection."""
neo
.
lib
.
logging
.
debug
(
'aborting a connector for %r'
,
self
)
logging
.
debug
(
'aborting a connector for %r'
,
self
)
self
.
aborted
=
True
assert
self
.
write_buf
if
self
.
_on_close
is
not
None
:
...
...
@@ -574,19 +571,16 @@ class Connection(BaseConnection):
except
ConnectorConnectionClosedException
:
# connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false
neo
.
lib
.
logging
.
debug
(
'Connection reset by peer: %r'
,
self
.
connector
)
logging
.
debug
(
'Connection reset by peer: %r'
,
self
.
connector
)
self
.
_closure
()
except
:
neo
.
lib
.
logging
.
debug
(
'Unknown connection error: %r'
,
self
.
connector
)
logging
.
debug
(
'Unknown connection error: %r'
,
self
.
connector
)
self
.
_closure
()
# unhandled connector exception
raise
else
:
if
not
data
:
neo
.
lib
.
logging
.
debug
(
'Connection %r closed in recv'
,
self
.
connector
)
logging
.
debug
(
'Connection %r closed in recv'
,
self
.
connector
)
self
.
_closure
()
return
self
.
_base_timeout
=
time
()
# last known remote activity
...
...
@@ -604,19 +598,16 @@ class Connection(BaseConnection):
pass
except
ConnectorConnectionClosedException
:
# connection resetted by peer
neo
.
lib
.
logging
.
debug
(
'Connection reset by peer: %r'
,
self
.
connector
)
logging
.
debug
(
'Connection reset by peer: %r'
,
self
.
connector
)
self
.
_closure
()
except
:
neo
.
lib
.
logging
.
debug
(
'Unknown connection error: %r'
,
self
.
connector
)
logging
.
debug
(
'Unknown connection error: %r'
,
self
.
connector
)
# unhandled connector exception
self
.
_closure
()
raise
else
:
if
not
n
:
neo
.
lib
.
logging
.
debug
(
'Connection %r closed in send'
,
self
.
connector
)
logging
.
debug
(
'Connection %r closed in send'
,
self
.
connector
)
self
.
_closure
()
return
if
n
==
len
(
msg
):
...
...
@@ -637,7 +628,7 @@ class Connection(BaseConnection):
if
was_empty
:
# enable polling for writing.
self
.
em
.
addWriter
(
self
)
neo
.
lib
.
logging
.
packet
(
self
,
packet
,
True
)
logging
.
packet
(
self
,
packet
,
True
)
@
not_closed
def
notify
(
self
,
packet
):
...
...
neo/lib/event.py
View file @
5743cdce
...
...
@@ -15,9 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from
time
import
time
import
neo.lib
from
select
import
epoll
,
EPOLLIN
,
EPOLLOUT
,
EPOLLERR
,
EPOLLHUP
from
errno
import
EINTR
,
EAGAIN
from
.
import
logging
from
.profiling
import
profiler_decorator
class
EpollEventManager
(
object
):
...
...
@@ -113,8 +113,8 @@ class EpollEventManager(object):
event_list
=
self
.
epoll
.
poll
(
timeout
)
except
IOError
,
exc
:
if
exc
.
errno
in
(
0
,
EAGAIN
):
neo
.
lib
.
logging
.
info
(
'epoll.poll triggered undocumented '
'error %r'
,
exc
.
errno
)
logging
.
info
(
'epoll.poll triggered undocumented error %r'
,
exc
.
errno
)
elif
exc
.
errno
!=
EINTR
:
raise
event_list
=
()
...
...
@@ -206,13 +206,13 @@ class EpollEventManager(object):
self
.
epoll
.
modify
(
fd
,
fd
in
self
.
reader_set
and
EPOLLIN
)
def
log
(
self
):
neo
.
lib
.
logging
.
info
(
'Event Manager:'
)
neo
.
lib
.
logging
.
info
(
' Readers: %r'
,
[
x
for
x
in
self
.
reader_set
]
)
neo
.
lib
.
logging
.
info
(
' Writers: %r'
,
[
x
for
x
in
self
.
writer_set
]
)
neo
.
lib
.
logging
.
info
(
' Connections:'
)
logging
.
info
(
'Event Manager:'
)
logging
.
info
(
' Readers: %r'
,
list
(
self
.
reader_set
)
)
logging
.
info
(
' Writers: %r'
,
list
(
self
.
writer_set
)
)
logging
.
info
(
' Connections:'
)
pending_set
=
set
(
self
.
_pending_processing
)
for
fd
,
conn
in
self
.
connection_dict
.
items
():
neo
.
lib
.
logging
.
info
(
' %r: %r (pending=%r)'
,
fd
,
conn
,
logging
.
info
(
' %r: %r (pending=%r)'
,
fd
,
conn
,
conn
in
pending_set
)
...
...
neo/lib/handler.py
View file @
5743cdce
...
...
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from
functools
import
wraps
import
neo.lib
from
.
import
logging
from
.protocol
import
(
NodeStates
,
Packets
,
ErrorCodes
,
Errors
,
BrokenNodeDisallowedError
,
NotReadyError
,
PacketMalformedError
,
ProtocolError
,
UnexpectedPacketError
)
...
...
@@ -38,7 +38,7 @@ class EventHandler(object):
else
:
message
=
'unexpected packet: %s in %s'
%
(
message
,
self
.
__class__
.
__name__
)
neo
.
lib
.
logging
.
error
(
message
)
logging
.
error
(
message
)
conn
.
answer
(
Errors
.
ProtocolError
(
message
))
conn
.
abort
()
# self.peerBroken(conn)
...
...
@@ -58,7 +58,7 @@ class EventHandler(object):
self
.
__unexpectedPacket
(
conn
,
packet
,
*
e
.
args
)
except
PacketMalformedError
:
if
not
conn
.
isClosed
():
neo
.
lib
.
logging
.
error
(
'malformed packet from %r'
,
conn
)
logging
.
error
(
'malformed packet from %r'
,
conn
)
conn
.
notify
(
Packets
.
Notify
(
'Malformed packet: %r'
%
(
packet
,
)))
conn
.
abort
()
# self.peerBroken(conn)
...
...
@@ -82,7 +82,7 @@ class EventHandler(object):
def
checkClusterName
(
self
,
name
):
# raise an exception if the given name mismatch the current cluster name
if
self
.
app
.
name
!=
name
:
neo
.
lib
.
logging
.
error
(
'reject an alien cluster'
)
logging
.
error
(
'reject an alien cluster'
)
raise
ProtocolError
(
'invalid cluster name'
)
...
...
@@ -94,28 +94,28 @@ class EventHandler(object):
def
connectionStarted
(
self
,
conn
):
"""Called when a connection is started."""
neo
.
lib
.
logging
.
debug
(
'connection started for %r'
,
conn
)
logging
.
debug
(
'connection started for %r'
,
conn
)
def
connectionCompleted
(
self
,
conn
):
"""Called when a connection is completed."""
neo
.
lib
.
logging
.
debug
(
'connection completed for %r (from %s:%u)'
,
logging
.
debug
(
'connection completed for %r (from %s:%u)'
,
conn
,
*
conn
.
getConnector
().
getAddress
())
def
connectionFailed
(
self
,
conn
):
"""Called when a connection failed."""
neo
.
lib
.
logging
.
debug
(
'connection failed for %r'
,
conn
)
logging
.
debug
(
'connection failed for %r'
,
conn
)
def
connectionAccepted
(
self
,
conn
):
"""Called when a connection is accepted."""
def
connectionClosed
(
self
,
conn
):
"""Called when a connection is closed by the peer."""
neo
.
lib
.
logging
.
debug
(
'connection closed for %r'
,
conn
)
logging
.
debug
(
'connection closed for %r'
,
conn
)
self
.
connectionLost
(
conn
,
NodeStates
.
TEMPORARILY_DOWN
)
#def peerBroken(self, conn):
# """Called when a peer is broken."""
#
neo.lib.
logging.error('%r is broken', conn)
# logging.error('%r is broken', conn)
# # NodeStates.BROKEN
def
connectionLost
(
self
,
conn
,
new_state
):
...
...
@@ -153,7 +153,7 @@ class EventHandler(object):
pass
def
notify
(
self
,
conn
,
message
):
neo
.
lib
.
logging
.
info
(
'notification from %r: %s'
,
conn
,
message
)
logging
.
info
(
'notification from %r: %s'
,
conn
,
message
)
def
closeClient
(
self
,
conn
):
conn
.
server
=
False
...
...
@@ -170,16 +170,16 @@ class EventHandler(object):
def
protocolError
(
self
,
conn
,
message
):
# the connection should have been closed by the remote peer
neo
.
lib
.
logging
.
error
(
'protocol error: %s'
%
(
message
,)
)
logging
.
error
(
'protocol error: %s'
,
message
)
def
timeoutError
(
self
,
conn
,
message
):
neo
.
lib
.
logging
.
error
(
'timeout error: %s'
%
(
message
,)
)
logging
.
error
(
'timeout error: %s'
,
message
)
def
brokenNodeDisallowedError
(
self
,
conn
,
message
):
raise
RuntimeError
,
'broken node disallowed error: %s'
%
(
message
,)
def
alreadyPendingError
(
self
,
conn
,
message
):
neo
.
lib
.
logging
.
error
(
'already pending error: %s'
%
(
message
,
)
)
logging
.
error
(
'already pending error: %s'
,
message
)
def
ack
(
self
,
conn
,
message
):
neo
.
lib
.
logging
.
debug
(
"no error message : %s"
%
(
message
)
)
logging
.
debug
(
"no error message: %s"
,
message
)
neo/lib/node.py
View file @
5743cdce
...
...
@@ -18,11 +18,10 @@ from time import time
from
os.path
import
exists
,
getsize
import
json
import
neo.lib
from
.
import
attributeTracker
,
logging
from
.util
import
dump
from
.protocol
import
NodeTypes
,
NodeStates
,
ProtocolError
from
.
import
attributeTracker
class
Node
(
object
):
"""This class represents a node."""
...
...
@@ -311,7 +310,7 @@ class MasterDB(object):
try
:
db
=
open
(
self
.
_path
,
'w'
)
except
IOError
:
neo
.
lib
.
logging
.
warning
(
'failed opening master database at %r '
logging
.
warning
(
'failed opening master database at %r '
'for writing, update skipped'
,
self
.
_path
)
else
:
json
.
dump
(
list
(
self
.
_set
),
db
)
...
...
@@ -357,7 +356,7 @@ class NodeManager(object):
def
add
(
self
,
node
):
if
node
in
self
.
_node_set
:
neo
.
lib
.
logging
.
warning
(
'adding a known node %r, ignoring'
,
node
)
logging
.
warning
(
'adding a known node %r, ignoring'
,
node
)
return
assert
not
node
.
isDown
(),
node
self
.
_node_set
.
add
(
node
)
...
...
@@ -371,7 +370,7 @@ class NodeManager(object):
def
remove
(
self
,
node
):
if
node
not
in
self
.
_node_set
:
neo
.
lib
.
logging
.
warning
(
'removing unknown node %r, ignoring'
,
node
)
logging
.
warning
(
'removing unknown node %r, ignoring'
,
node
)
return
self
.
_node_set
.
remove
(
node
)
self
.
__drop
(
self
.
_address_dict
,
node
.
getAddress
())
...
...
@@ -578,12 +577,11 @@ class NodeManager(object):
log_args
=
(
node_type
,
dump
(
uuid
),
addr
,
state
)
if
node
is
None
:
if
state
==
NodeStates
.
DOWN
:
neo
.
lib
.
logging
.
debug
(
'NOT creating node %s %s %s %s'
,
*
log_args
)
logging
.
debug
(
'NOT creating node %s %s %s %s'
,
*
log_args
)
else
:
node
=
self
.
_createNode
(
klass
,
address
=
addr
,
uuid
=
uuid
,
state
=
state
)
neo
.
lib
.
logging
.
debug
(
'creating node %r'
,
node
)
logging
.
debug
(
'creating node %r'
,
node
)
else
:
assert
isinstance
(
node
,
klass
),
'node %r is not '
\
'of expected type: %r'
%
(
node
,
klass
)
...
...
@@ -592,15 +590,14 @@ class NodeManager(object):
'Discrepancy between node_by_uuid (%r) and '
\
'node_by_addr (%r)'
%
(
node_by_uuid
,
node_by_addr
)
if
state
==
NodeStates
.
DOWN
:
neo
.
lib
.
logging
.
debug
(
'droping node %r (%r), found with %s '
\
logging
.
debug
(
'droping node %r (%r), found with %s '
'%s %s %s'
,
node
,
node
.
isConnected
(),
*
log_args
)
if
node
.
isConnected
():
# cut this connection, node removed by handler
node
.
getConnection
().
close
()
self
.
remove
(
node
)
else
:
neo
.
lib
.
logging
.
debug
(
'updating node %r to %s %s %s %s'
,
logging
.
debug
(
'updating node %r to %s %s %s %s'
,
node
,
*
log_args
)
node
.
setUUID
(
uuid
)
node
.
setAddress
(
addr
)
...
...
@@ -608,12 +605,11 @@ class NodeManager(object):
self
.
log
()
def
log
(
self
):
neo
.
lib
.
logging
.
info
(
'Node manager : %d nodes'
%
len
(
self
.
_node_set
))
logging
.
info
(
'Node manager : %u nodes'
,
len
(
self
.
_node_set
))
for
node
in
sorted
(
list
(
self
.
_node_set
)):
uuid
=
dump
(
node
.
getUUID
())
or
'-'
*
32
address
=
node
.
getAddress
()
or
''
if
address
:
address
=
'%s:%d'
%
address
neo
.
lib
.
logging
.
info
(
' * %32s | %8s | %22s | %s'
%
(
uuid
,
node
.
getType
(),
address
,
node
.
getState
()))
logging
.
info
(
' * %32s | %8s | %22s | %s'
,
uuid
,
node
.
getType
(),
address
,
node
.
getState
())
neo/lib/pt.py
View file @
5743cdce
...
...
@@ -16,9 +16,8 @@
import
math
from
functools
import
wraps
import
neo
from
.
import
protocol
from
.
import
logging
,
protocol
from
.protocol
import
CellStates
from
.util
import
dump
,
u64
from
.locking
import
RLock
...
...
@@ -212,7 +211,7 @@ class PartitionTable(object):
# the node must be known by the node manager
assert
node
is
not
None
self
.
setCell
(
offset
,
node
,
state
)
neo
.
lib
.
logging
.
debug
(
'partition table loaded (ptid=%s)'
,
ptid
)
logging
.
debug
(
'partition table loaded (ptid=%s)'
,
ptid
)
self
.
log
()
def
update
(
self
,
ptid
,
cell_list
,
nm
):
...
...
@@ -222,22 +221,21 @@ class PartitionTable(object):
is not known, it is created in the node manager and set as unavailable
"""
if
ptid
<=
self
.
_id
:
neo
.
lib
.
logging
.
warning
(
'ignoring older partition changes'
)
logging
.
warning
(
'ignoring older partition changes'
)
return
self
.
_id
=
ptid
for
offset
,
uuid
,
state
in
cell_list
:
node
=
nm
.
getByUUID
(
uuid
)
assert
node
is
not
None
,
'No node found for uuid %r'
%
(
dump
(
uuid
),
)
self
.
setCell
(
offset
,
node
,
state
)
neo
.
lib
.
logging
.
debug
(
'partition table updated (ptid=%s)'
,
ptid
)
logging
.
debug
(
'partition table updated (ptid=%s)'
,
ptid
)
self
.
log
()
def
filled
(
self
):
return
self
.
num_filled_rows
==
self
.
np
def
log
(
self
):
for
line
in
self
.
_format
():
neo
.
lib
.
logging
.
debug
(
line
)
logging
.
debug
(
self
.
format
())
def
format
(
self
):
return
'
\
n
'
.
join
(
self
.
_format
())
...
...
neo/master/app.py
View file @
5743cdce
...
...
@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
import
os
,
sys
from
time
import
time
from
neo.lib
import
logging
from
neo.lib.connector
import
getConnectorHandler
from
neo.lib.debug
import
register
as
registerLiveDebugger
from
neo.lib.protocol
import
UUID_NAMESPACES
,
ZERO_TID
,
NotReadyError
...
...
@@ -62,7 +62,7 @@ class Application(object):
for
master_address
in
master_addresses
:
self
.
nm
.
createMaster
(
address
=
master_address
)
neo
.
lib
.
logging
.
debug
(
'IP address is %s, port is %d'
,
*
(
self
.
server
)
)
logging
.
debug
(
'IP address is %s, port is %d'
,
*
self
.
server
)
# Partition table
replicas
,
partitions
=
config
.
getReplicas
(),
config
.
getPartitions
()
...
...
@@ -71,10 +71,10 @@ class Application(object):
if
partitions
<=
0
:
raise
RuntimeError
,
'partitions must be more than zero'
self
.
pt
=
PartitionTable
(
partitions
,
replicas
)
neo
.
lib
.
logging
.
info
(
'Configuration:'
)
neo
.
lib
.
logging
.
info
(
'Partitions: %d'
,
partitions
)
neo
.
lib
.
logging
.
info
(
'Replicas : %d'
,
replicas
)
neo
.
lib
.
logging
.
info
(
'Name : %s'
,
self
.
name
)
logging
.
info
(
'Configuration:'
)
logging
.
info
(
'Partitions: %d'
,
partitions
)
logging
.
info
(
'Replicas : %d'
,
replicas
)
logging
.
info
(
'Name : %s'
,
self
.
name
)
self
.
listening_conn
=
None
self
.
primary
=
None
...
...
@@ -87,7 +87,7 @@ class Application(object):
if
uuid
is
None
or
uuid
==
''
:
uuid
=
self
.
getNewUUID
(
NodeTypes
.
MASTER
)
self
.
uuid
=
uuid
neo
.
lib
.
logging
.
info
(
'UUID : %s'
,
dump
(
uuid
))
logging
.
info
(
'UUID : %s'
,
dump
(
uuid
))
# election related data
self
.
unconnected_master_node_set
=
set
()
...
...
@@ -127,7 +127,7 @@ class Application(object):
try
:
self
.
_run
()
except
:
neo
.
lib
.
logging
.
exception
(
'Pre-mortem data:'
)
logging
.
exception
(
'Pre-mortem data:'
)
self
.
log
()
raise
...
...
@@ -162,7 +162,7 @@ class Application(object):
others while attempting to connect to other master nodes at the
same time. Note that storage nodes and client nodes may connect
to self as well as master nodes."""
neo
.
lib
.
logging
.
info
(
'begin the election of a primary master'
)
logging
.
info
(
'begin the election of a primary master'
)
client_handler
=
election
.
ClientElectionHandler
(
self
)
self
.
unconnected_master_node_set
.
clear
()
...
...
@@ -194,7 +194,7 @@ class Application(object):
self
.
em
.
poll
(
1
)
except
ElectionFailure
,
m
:
# something goes wrong, clean then restart
neo
.
lib
.
logging
.
error
(
'election failed: %s'
,
(
m
,
)
)
logging
.
error
(
'election failed: %s'
,
m
)
# Ask all connected nodes to reelect a single primary master.
for
conn
in
self
.
em
.
getClientList
():
...
...
@@ -246,7 +246,7 @@ class Application(object):
def
broadcastPartitionChanges
(
self
,
cell_list
,
selector
=
None
):
"""Broadcast a Notify Partition Changes packet."""
neo
.
lib
.
logging
.
debug
(
'broadcastPartitionChanges'
)
logging
.
debug
(
'broadcastPartitionChanges'
)
if
not
cell_list
:
return
if
not
selector
:
...
...
@@ -262,8 +262,7 @@ class Application(object):
def
broadcastLastOID
(
self
):
oid
=
self
.
tm
.
getLastOID
()
neo
.
lib
.
logging
.
debug
(
'Broadcast last OID to storages : %s'
%
dump
(
oid
))
logging
.
debug
(
'Broadcast last OID to storages : %s'
,
dump
(
oid
))
packet
=
Packets
.
NotifyLastOID
(
oid
)
for
node
in
self
.
nm
.
getStorageList
(
only_identified
=
True
):
node
.
notify
(
packet
)
...
...
@@ -274,7 +273,7 @@ class Application(object):
and stop the service only if a catastrophy happens or the user commits
a shutdown.
"""
neo
.
lib
.
logging
.
info
(
'provide service'
)
logging
.
info
(
'provide service'
)
poll
=
self
.
em
.
poll
self
.
tm
.
reset
()
...
...
@@ -287,7 +286,7 @@ class Application(object):
except
OperationFailure
:
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
neo
.
lib
.
logging
.
critical
(
'No longer operational'
)
logging
.
critical
(
'No longer operational'
)
except
StateChangedException
,
e
:
assert
e
.
args
[
0
]
==
ClusterStates
.
STARTING_BACKUP
self
.
backup_tid
=
tid
=
self
.
getLastTransaction
()
...
...
@@ -295,8 +294,7 @@ class Application(object):
for
node
in
self
.
nm
.
getStorageList
(
only_identified
=
True
)))
def
playPrimaryRole
(
self
):
neo
.
lib
.
logging
.
info
(
'play the primary role with %r'
,
self
.
listening_conn
)
logging
.
info
(
'play the primary role with %r'
,
self
.
listening_conn
)
em
=
self
.
em
packet
=
Packets
.
AnnouncePrimary
()
for
conn
in
em
.
getConnectionList
():
...
...
@@ -347,8 +345,7 @@ class Application(object):
"""
I play a secondary role, thus only wait for a primary master to fail.
"""
neo
.
lib
.
logging
.
info
(
'play the secondary role with %r'
,
self
.
listening_conn
)
logging
.
info
(
'play the secondary role with %r'
,
self
.
listening_conn
)
# Wait for an announcement. If this is too long, probably
# the primary master is down.
...
...
@@ -451,7 +448,7 @@ class Application(object):
self
.
em
.
poll
(
1
)
if
self
.
cluster_state
!=
ClusterStates
.
RUNNING
:
neo
.
lib
.
logging
.
info
(
"asking all nodes to shutdown"
)
logging
.
info
(
"asking all nodes to shutdown"
)
# This code sends packets but never polls, so they never reach
# network.
for
node
in
self
.
nm
.
getIdentifiedList
():
...
...
@@ -481,28 +478,27 @@ class Application(object):
# always accept admin nodes
node_ctor
=
self
.
nm
.
createAdmin
handler
=
administration
.
AdministrationHandler
(
self
)
neo
.
lib
.
logging
.
info
(
'Accept an admin %s'
%
(
dump
(
uuid
),
))
logging
.
info
(
'Accept an admin %s'
,
dump
(
uuid
))
elif
node_type
==
NodeTypes
.
MASTER
:
# always put other master in waiting state
node_ctor
=
self
.
nm
.
createMaster
handler
=
secondary
.
SecondaryMasterHandler
(
self
)
neo
.
lib
.
logging
.
info
(
'Accept a master %s'
%
(
dump
(
uuid
),
))
logging
.
info
(
'Accept a master %s'
,
dump
(
uuid
))
elif
node_type
==
NodeTypes
.
CLIENT
:
# refuse any client before running
if
self
.
cluster_state
!=
ClusterStates
.
RUNNING
:
neo
.
lib
.
logging
.
info
(
'Reject a connection from a client'
)
logging
.
info
(
'Reject a connection from a client'
)
raise
NotReadyError
node_ctor
=
self
.
nm
.
createClient
handler
=
client
.
ClientServiceHandler
(
self
)
neo
.
lib
.
logging
.
info
(
'Accept a client %s'
%
(
dump
(
uuid
),
))
logging
.
info
(
'Accept a client %s'
,
dump
(
uuid
))
elif
node_type
==
NodeTypes
.
STORAGE
:
node_ctor
=
self
.
nm
.
createStorage
manager
=
self
.
_current_manager
if
manager
is
None
:
manager
=
self
(
uuid
,
state
,
handler
)
=
manager
.
identifyStorageNode
(
uuid
,
node
)
neo
.
lib
.
logging
.
info
(
'Accept a storage %s (%s)'
%
(
dump
(
uuid
),
state
))
logging
.
info
(
'Accept a storage %s (%s)'
,
dump
(
uuid
),
state
)
else
:
handler
=
identification
.
IdentificationHandler
(
self
)
return
(
uuid
,
node
,
state
,
handler
,
node_ctor
)
...
...
neo/master/backup_app.py
View file @
5743cdce
...
...
@@ -16,7 +16,7 @@
import
random
,
weakref
from
bisect
import
bisect
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.bootstrap
import
BootstrapManager
from
neo.lib.connector
import
getConnectorHandler
from
neo.lib.exception
import
PrimaryFailure
...
...
@@ -78,7 +78,7 @@ class BackupApplication(object):
self
.
pt
.
log
()
def
provideService
(
self
):
neo
.
lib
.
logging
.
info
(
'provide backup'
)
logging
.
info
(
'provide backup'
)
poll
=
self
.
em
.
poll
app
=
self
.
app
pt
=
app
.
pt
...
...
@@ -107,7 +107,7 @@ class BackupApplication(object):
while
True
:
poll
(
1
)
except
PrimaryFailure
,
msg
:
neo
.
lib
.
logging
.
error
(
'upstream master is down: %s'
,
msg
)
logging
.
error
(
'upstream master is down: %s'
,
msg
)
finally
:
app
.
backup_tid
=
pt
.
getBackupTid
()
try
:
...
...
@@ -122,8 +122,7 @@ class BackupApplication(object):
app
.
changeClusterState
(
*
e
.
args
)
last_tid
=
app
.
getLastTransaction
()
if
last_tid
<
app
.
backup_tid
:
neo
.
lib
.
logging
.
warning
(
"Truncating at %s (last_tid was %s)"
,
logging
.
warning
(
"Truncating at %s (last_tid was %s)"
,
dump
(
app
.
backup_tid
),
dump
(
last_tid
))
p
=
Packets
.
AskTruncate
(
app
.
backup_tid
)
connection_list
=
[]
...
...
@@ -154,7 +153,7 @@ class BackupApplication(object):
for
cell
in
cell_list
:
cell
.
replicating
=
tid
if
cell
.
backup_tid
<
tid
:
neo
.
lib
.
logging
.
debug
(
logging
.
debug
(
"ask %s to replicate partition %u up to %s from %r"
,
dump
(
cell
.
getUUID
()),
offset
,
dump
(
tid
),
dump
(
primary_node
.
getUUID
()))
...
...
@@ -185,7 +184,7 @@ class BackupApplication(object):
if
last_max_tid
<=
cell
.
backup_tid
:
# This is the last time we can increase
# 'backup_tid' without replication.
neo
.
lib
.
logging
.
debug
(
logging
.
debug
(
"partition %u: updating backup_tid of %r to %s"
,
offset
,
cell
,
dump
(
prev_tid
))
cell
.
backup_tid
=
prev_tid
...
...
@@ -201,14 +200,14 @@ class BackupApplication(object):
for
cell
in
pt
.
getCellList
(
offset
,
readable
=
True
):
if
last_max_tid
<=
cell
.
backup_tid
:
cell
.
backup_tid
=
tid
neo
.
lib
.
logging
.
debug
(
logging
.
debug
(
"partition %u: updating backup_tid of %r to %s"
,
offset
,
cell
,
dump
(
tid
))
for
node
in
trigger_set
:
self
.
triggerBackup
(
node
)
count
=
sum
(
map
(
len
,
self
.
tid_list
))
if
self
.
debug_tid_count
<
count
:
neo
.
lib
.
logging
.
debug
(
"Maximum number of tracked tids: %u"
,
count
)
logging
.
debug
(
"Maximum number of tracked tids: %u"
,
count
)
self
.
debug_tid_count
=
count
def
triggerBackup
(
self
,
node
):
...
...
@@ -238,8 +237,7 @@ class BackupApplication(object):
else
:
address_set
.
add
(
addr
)
source_dict
[
offset
]
=
addr
neo
.
lib
.
logging
.
debug
(
"ask %s to replicate partition %u up to %s from %r"
,
logging
.
debug
(
"ask %s to replicate partition %u up to %s from %r"
,
dump
(
node
.
getUUID
()),
offset
,
dump
(
tid
),
addr
)
node
.
getConnection
().
notify
(
Packets
.
Replicate
(
tid
,
self
.
name
,
source_dict
))
...
...
@@ -257,7 +255,7 @@ class BackupApplication(object):
tid
=
add64
(
tid_list
[
bisect
(
tid_list
,
tid
)],
-
1
)
except
IndexError
:
tid
=
app
.
getLastTransaction
()
neo
.
lib
.
logging
.
debug
(
"partition %u: updating backup_tid of %r to %s"
,
logging
.
debug
(
"partition %u: updating backup_tid of %r to %s"
,
offset
,
cell
,
dump
(
tid
))
cell
.
backup_tid
=
tid
# Forget tids we won't need anymore.
...
...
@@ -273,7 +271,7 @@ class BackupApplication(object):
if
x
.
getNode
()
is
primary_node
]
if
tid
<
max_tid
:
cell
.
replicating
=
max_tid
neo
.
lib
.
logging
.
debug
(
logging
.
debug
(
"ask %s to replicate partition %u up to %s from %r"
,
dump
(
node
.
getUUID
()),
offset
,
dump
(
max_tid
),
dump
(
primary_node
.
getUUID
()))
...
...
@@ -288,7 +286,7 @@ class BackupApplication(object):
for
cell
in
cell_list
:
if
max
(
cell
.
backup_tid
,
cell
.
replicating
)
<
tid
:
cell
.
replicating
=
tid
neo
.
lib
.
logging
.
debug
(
logging
.
debug
(
"ask %s to replicate partition %u up to %s from"
" %r"
,
dump
(
cell
.
getUUID
()),
offset
,
dump
(
tid
),
dump
(
node
.
getUUID
()))
...
...
neo/master/handlers/__init__.py
View file @
5743cdce
...
...
@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
from
neo.lib
import
logging
from
neo.lib.handler
import
EventHandler
from
neo.lib.protocol
import
(
NodeTypes
,
NodeStates
,
Packets
,
BrokenNodeDisallowedError
,
...
...
@@ -108,7 +107,7 @@ class BaseServiceHandler(MasterHandler):
if
new_state
!=
NodeStates
.
BROKEN
and
was_pending
:
# was in pending state, so drop it from the node manager to forget
# it and do not set in running state when it comes back
neo
.
lib
.
logging
.
info
(
'drop a pending node from the node manager'
)
logging
.
info
(
'drop a pending node from the node manager'
)
self
.
app
.
nm
.
remove
(
node
)
self
.
app
.
broadcastNodesInformation
([
node
])
# clean node related data in specialized handlers
...
...
neo/master/handlers/administration.py
View file @
5743cdce
...
...
@@ -15,10 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
random
import
neo
from
.
import
MasterHandler
from
..app
import
StateChangedException
from
neo.lib
import
logging
from
neo.lib.protocol
import
ClusterStates
,
NodeStates
,
Packets
,
ProtocolError
from
neo.lib.protocol
import
Errors
from
neo.lib.util
import
dump
...
...
@@ -73,8 +73,8 @@ class AdministrationHandler(MasterHandler):
raise
StateChangedException
(
state
)
def
setNodeState
(
self
,
conn
,
uuid
,
state
,
modify_partition_table
):
neo
.
lib
.
logging
.
info
(
"set node state for %s-%s : %s"
%
(
dump
(
uuid
),
state
,
modify_partition_table
)
)
logging
.
info
(
"set node state for %s-%s : %s"
,
dump
(
uuid
),
state
,
modify_partition_table
)
app
=
self
.
app
node
=
app
.
nm
.
getByUUID
(
uuid
)
if
node
is
None
:
...
...
@@ -129,7 +129,7 @@ class AdministrationHandler(MasterHandler):
def
addPendingNodes
(
self
,
conn
,
uuid_list
):
uuids
=
', '
.
join
(
map
(
dump
,
uuid_list
))
neo
.
lib
.
logging
.
debug
(
'Add nodes %s'
%
uuids
)
logging
.
debug
(
'Add nodes %s'
,
uuids
)
app
=
self
.
app
nm
=
app
.
nm
em
=
app
.
em
...
...
@@ -146,11 +146,11 @@ class AdministrationHandler(MasterHandler):
uuid_set
=
uuid_set
.
intersection
(
set
(
uuid_list
))
# nothing to do
if
not
uuid_set
:
neo
.
lib
.
logging
.
warning
(
'No nodes added'
)
logging
.
warning
(
'No nodes added'
)
conn
.
answer
(
Errors
.
Ack
(
'No nodes added'
))
return
uuids
=
', '
.
join
(
map
(
dump
,
uuid_set
))
neo
.
lib
.
logging
.
info
(
'Adding nodes %s'
%
uuids
)
logging
.
info
(
'Adding nodes %s'
,
uuids
)
# switch nodes to running state
node_list
=
map
(
nm
.
getByUUID
,
uuid_set
)
for
node
in
node_list
:
...
...
@@ -171,7 +171,7 @@ class AdministrationHandler(MasterHandler):
max_tid
=
pt
.
getCheckTid
(
partition_dict
)
if
backingup
else
\
app
.
getLastTransaction
()
if
min_tid
>
max_tid
:
neo
.
lib
.
logging
.
warning
(
"nothing to check: min_tid=%s > max_tid=%s"
,
logging
.
warning
(
"nothing to check: min_tid=%s > max_tid=%s"
,
dump
(
min_tid
),
dump
(
max_tid
))
else
:
getByUUID
=
app
.
nm
.
getByUUID
...
...
neo/master/handlers/election.py
View file @
5743cdce
...
...
@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.protocol
import
NodeTypes
,
NodeStates
,
Packets
from
neo.lib.protocol
import
NotReadyError
,
ProtocolError
,
\
UnexpectedPacketError
...
...
@@ -46,7 +45,7 @@ class BaseElectionHandler(EventHandler):
app
.
primary
=
False
app
.
primary_master_node
=
node
app
.
negotiating_master_node_set
.
clear
()
neo
.
lib
.
logging
.
info
(
'%s is the primary'
,
node
)
logging
.
info
(
'%s is the primary'
,
node
)
class
ClientElectionHandler
(
BaseElectionHandler
):
...
...
@@ -86,8 +85,7 @@ class ClientElectionHandler(BaseElectionHandler):
if
your_uuid
!=
app
.
uuid
:
# uuid conflict happened, accept the new one and restart election
app
.
uuid
=
your_uuid
neo
.
lib
.
logging
.
info
(
'UUID conflict, new UUID: %s'
,
dump
(
your_uuid
))
logging
.
info
(
'UUID conflict, new UUID: %s'
,
dump
(
your_uuid
))
raise
ElectionFailure
,
'new uuid supplied'
node
.
setUUID
(
peer_uuid
)
...
...
@@ -118,8 +116,7 @@ class ClientElectionHandler(BaseElectionHandler):
if
primary_node
is
None
:
# I don't know such a node. Probably this information
# is old. So ignore it.
neo
.
lib
.
logging
.
warning
(
'received an unknown primary node UUID'
)
logging
.
warning
(
'received an unknown primary node UUID'
)
else
:
# Whatever the situation is, I trust this master.
app
.
primary
=
False
...
...
@@ -137,7 +134,7 @@ class ServerElectionHandler(BaseElectionHandler, MasterHandler):
def
_setupNode
(
self
,
conn
,
node_type
,
uuid
,
address
,
node
):
app
=
self
.
app
if
node_type
!=
NodeTypes
.
MASTER
:
neo
.
lib
.
logging
.
info
(
'reject a connection from a non-master'
)
logging
.
info
(
'reject a connection from a non-master'
)
raise
NotReadyError
if
node
is
None
:
...
...
neo/master/handlers/storage.py
View file @
5743cdce
...
...
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.protocol
import
CellStates
,
ClusterStates
,
Packets
,
ProtocolError
from
neo.lib.exception
import
OperationFailure
from
neo.lib.util
import
dump
...
...
@@ -37,7 +37,7 @@ class StorageServiceHandler(BaseServiceHandler):
conn
.
notify
(
Packets
.
StartOperation
())
def
nodeLost
(
self
,
conn
,
node
):
neo
.
lib
.
logging
.
info
(
'storage node lost'
)
logging
.
info
(
'storage node lost'
)
assert
not
node
.
isRunning
(),
node
.
getState
()
app
=
self
.
app
app
.
broadcastPartitionChanges
(
app
.
pt
.
outdate
(
node
))
...
...
@@ -96,7 +96,7 @@ class StorageServiceHandler(BaseServiceHandler):
raise
ProtocolError
(
'Non-oudated partition'
)
except
PartitionTableException
,
e
:
raise
ProtocolError
(
str
(
e
))
neo
.
lib
.
logging
.
debug
(
"%s is up for offset %s"
,
node
,
offset
)
logging
.
debug
(
"%s is up for offset %s"
,
node
,
offset
)
self
.
app
.
broadcastPartitionChanges
(
cell_list
)
def
answerTruncate
(
self
,
conn
):
...
...
neo/master/recovery.py
View file @
5743cdce
...
...
@@ -16,7 +16,7 @@
from
struct
import
pack
import
neo
from
neo.lib
import
logging
from
neo.lib.util
import
dump
from
neo.lib.protocol
import
Packets
,
ProtocolError
,
ClusterStates
,
NodeStates
from
neo.lib.protocol
import
NotReadyError
,
ZERO_OID
,
ZERO_TID
...
...
@@ -50,7 +50,7 @@ class RecoveryManager(MasterHandler):
back the latest partition table or make a new table from scratch,
if this is the first time.
"""
neo
.
lib
.
logging
.
info
(
'begin the recovery of the status'
)
logging
.
info
(
'begin the recovery of the status'
)
app
=
self
.
app
pt
=
app
.
pt
app
.
changeClusterState
(
ClusterStates
.
RECOVERING
)
...
...
@@ -86,14 +86,14 @@ class RecoveryManager(MasterHandler):
node_list
=
node_list
()
break
neo
.
lib
.
logging
.
info
(
'startup allowed'
)
logging
.
info
(
'startup allowed'
)
for
node
in
node_list
:
node
.
setRunning
()
app
.
broadcastNodesInformation
(
node_list
)
if
pt
.
getID
()
is
None
:
neo
.
lib
.
logging
.
info
(
'creating a new partition table'
)
logging
.
info
(
'creating a new partition table'
)
# reset IDs generators & build new partition with running nodes
app
.
tm
.
setLastOID
(
ZERO_OID
)
pt
.
make
(
node_list
)
...
...
@@ -103,9 +103,8 @@ class RecoveryManager(MasterHandler):
app
.
backup_tid
=
pt
.
getBackupTid
()
app
.
setLastTransaction
(
app
.
tm
.
getLastTID
())
neo
.
lib
.
logging
.
debug
(
'cluster starts with loid=%s and this partition '
\
'table :'
,
dump
(
app
.
tm
.
getLastOID
()))
logging
.
debug
(
'cluster starts with loid=%s and this partition table :'
,
dump
(
app
.
tm
.
getLastOID
()))
pt
.
log
()
def
connectionLost
(
self
,
conn
,
new_state
):
...
...
@@ -136,7 +135,7 @@ class RecoveryManager(MasterHandler):
def
answerPartitionTable
(
self
,
conn
,
ptid
,
row_list
):
if
ptid
!=
self
.
target_ptid
:
# If this is not from a target node, ignore it.
neo
.
lib
.
logging
.
warn
(
'Got %s while waiting %s'
,
dump
(
ptid
),
logging
.
warn
(
'Got %s while waiting %s'
,
dump
(
ptid
),
dump
(
self
.
target_ptid
))
else
:
self
.
_broadcastPartitionTable
(
ptid
,
row_list
)
...
...
neo/master/transactions.py
View file @
5743cdce
...
...
@@ -18,8 +18,8 @@ from time import time, gmtime
from
struct
import
pack
,
unpack
from
neo.lib.protocol
import
ZERO_TID
from
datetime
import
timedelta
,
datetime
from
neo.lib
import
logging
from
neo.lib.util
import
dump
,
u64
,
p64
import
neo.lib
TID_LOW_OVERFLOW
=
2
**
32
TID_LOW_MAX
=
TID_LOW_OVERFLOW
-
1
...
...
@@ -374,7 +374,7 @@ class TransactionManager(object):
txn
=
Transaction
(
node
,
ttid
)
self
.
_ttid_dict
[
ttid
]
=
txn
self
.
_node_dict
.
setdefault
(
node
,
{})[
ttid
]
=
txn
neo
.
lib
.
logging
.
debug
(
'Begin %s'
,
txn
)
logging
.
debug
(
'Begin %s'
,
txn
)
return
ttid
def
prepare
(
self
,
ttid
,
divisor
,
oid_list
,
uuid_list
,
msg_id
):
...
...
@@ -390,7 +390,7 @@ class TransactionManager(object):
else
:
tid
=
self
.
_nextTID
(
ttid
,
divisor
)
self
.
_queue
.
append
((
node
.
getUUID
(),
ttid
))
neo
.
lib
.
logging
.
debug
(
'Finish TXN %s for %s (was %s)'
,
logging
.
debug
(
'Finish TXN %s for %s (was %s)'
,
dump
(
tid
),
node
,
dump
(
ttid
))
txn
.
prepare
(
tid
,
oid_list
,
uuid_list
,
msg_id
)
return
tid
...
...
@@ -399,7 +399,7 @@ class TransactionManager(object):
"""
Remove a transaction, commited or aborted
"""
neo
.
lib
.
logging
.
debug
(
'Remove TXN %s'
,
dump
(
ttid
))
logging
.
debug
(
'Remove TXN %s'
,
dump
(
ttid
))
try
:
# only in case of an import:
self
.
_queue
.
remove
((
uuid
,
ttid
))
...
...
@@ -420,7 +420,7 @@ class TransactionManager(object):
If transaction is completely locked, calls function given at
instanciation time.
"""
neo
.
lib
.
logging
.
debug
(
'Lock TXN %s for %s'
,
dump
(
ttid
),
dump
(
uuid
))
logging
.
debug
(
'Lock TXN %s for %s'
,
dump
(
ttid
),
dump
(
uuid
))
assert
ttid
in
self
.
_ttid_dict
,
"Transaction not started"
txn
=
self
.
_ttid_dict
[
ttid
]
if
txn
.
lock
(
uuid
)
and
self
.
_queue
[
0
][
1
]
==
ttid
:
...
...
@@ -461,7 +461,7 @@ class TransactionManager(object):
"""
Abort pending transactions initiated by a node
"""
neo
.
lib
.
logging
.
debug
(
'Abort TXN for %s'
,
node
)
logging
.
debug
(
'Abort TXN for %s'
,
node
)
uuid
=
node
.
getUUID
()
# XXX: this loop is usefull only during an import
for
nuuid
,
ntid
in
list
(
self
.
_queue
):
...
...
@@ -477,7 +477,7 @@ class TransactionManager(object):
del
self
.
_node_dict
[
node
]
def
log
(
self
):
neo
.
lib
.
logging
.
info
(
'Transactions:'
)
logging
.
info
(
'Transactions:'
)
for
txn
in
self
.
_ttid_dict
.
itervalues
():
neo
.
lib
.
logging
.
info
(
' %r'
,
txn
)
logging
.
info
(
' %r'
,
txn
)
neo/master/verification.py
View file @
5743cdce
...
...
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
from
neo.lib
import
logging
from
neo.lib.util
import
dump
from
neo.lib.protocol
import
ClusterStates
,
Packets
,
NodeStates
from
.handlers
import
BaseServiceHandler
...
...
@@ -115,13 +115,13 @@ class VerificationManager(BaseServiceHandler):
app
=
self
.
app
# wait for any missing node
neo
.
lib
.
logging
.
debug
(
'waiting for the cluster to be operational'
)
logging
.
debug
(
'waiting for the cluster to be operational'
)
while
not
app
.
pt
.
operational
():
app
.
em
.
poll
(
1
)
if
app
.
backup_tid
:
return
neo
.
lib
.
logging
.
info
(
'start to verify data'
)
logging
.
info
(
'start to verify data'
)
getIdentifiedList
=
app
.
nm
.
getIdentifiedList
# Gather all unfinished transactions.
...
...
@@ -197,7 +197,7 @@ class VerificationManager(BaseServiceHandler):
def
answerUnfinishedTransactions
(
self
,
conn
,
max_tid
,
tid_list
):
uuid
=
conn
.
getUUID
()
neo
.
lib
.
logging
.
info
(
'got unfinished transactions %s from %r'
,
logging
.
info
(
'got unfinished transactions %s from %r'
,
map
(
dump
,
tid_list
),
conn
)
if
not
self
.
_gotAnswerFrom
(
uuid
):
return
...
...
@@ -222,19 +222,19 @@ class VerificationManager(BaseServiceHandler):
def
tidNotFound
(
self
,
conn
,
message
):
uuid
=
conn
.
getUUID
()
neo
.
lib
.
logging
.
info
(
'TID not found: %s'
,
message
)
logging
.
info
(
'TID not found: %s'
,
message
)
if
not
self
.
_gotAnswerFrom
(
uuid
):
return
self
.
_oid_set
=
None
def
answerObjectPresent
(
self
,
conn
,
oid
,
tid
):
uuid
=
conn
.
getUUID
()
neo
.
lib
.
logging
.
info
(
'object %s:%s found'
,
dump
(
oid
),
dump
(
tid
))
logging
.
info
(
'object %s:%s found'
,
dump
(
oid
),
dump
(
tid
))
self
.
_gotAnswerFrom
(
uuid
)
def
oidNotFound
(
self
,
conn
,
message
):
uuid
=
conn
.
getUUID
()
neo
.
lib
.
logging
.
info
(
'OID not found: %s'
,
message
)
logging
.
info
(
'OID not found: %s'
,
message
)
app
=
self
.
app
if
not
self
.
_gotAnswerFrom
(
uuid
):
return
...
...
neo/storage/app.py
View file @
5743cdce
...
...
@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
import
sys
from
collections
import
deque
from
neo.lib
import
logging
from
neo.lib.protocol
import
NodeTypes
,
CellStates
,
Packets
from
neo.lib.node
import
NodeManager
from
neo.lib.event
import
EventManager
...
...
@@ -60,7 +60,7 @@ class Application(object):
# set the bind address
self
.
server
=
config
.
getBind
()
neo
.
lib
.
logging
.
debug
(
'IP address is %s, port is %d'
,
*
(
self
.
server
)
)
logging
.
debug
(
'IP address is %s, port is %d'
,
*
self
.
server
)
# The partition table is initialized after getting the number of
# partitions.
...
...
@@ -137,12 +137,12 @@ class Application(object):
# create a partition table
self
.
pt
=
PartitionTable
(
num_partitions
,
num_replicas
)
neo
.
lib
.
logging
.
info
(
'Configuration loaded:'
)
neo
.
lib
.
logging
.
info
(
'UUID : %s'
,
dump
(
self
.
uuid
))
neo
.
lib
.
logging
.
info
(
'PTID : %s'
,
dump
(
ptid
))
neo
.
lib
.
logging
.
info
(
'Name : %s'
,
self
.
name
)
neo
.
lib
.
logging
.
info
(
'Partitions: %s'
,
num_partitions
)
neo
.
lib
.
logging
.
info
(
'Replicas : %s'
,
num_replicas
)
logging
.
info
(
'Configuration loaded:'
)
logging
.
info
(
'UUID : %s'
,
dump
(
self
.
uuid
))
logging
.
info
(
'PTID : %s'
,
dump
(
ptid
))
logging
.
info
(
'Name : %s'
,
self
.
name
)
logging
.
info
(
'Partitions: %s'
,
num_partitions
)
logging
.
info
(
'Replicas : %s'
,
num_replicas
)
def
loadPartitionTable
(
self
):
"""Load a partition table from the database."""
...
...
@@ -164,7 +164,7 @@ class Application(object):
try
:
self
.
_run
()
except
:
neo
.
lib
.
logging
.
exception
(
'Pre-mortem data:'
)
logging
.
exception
(
'Pre-mortem data:'
)
self
.
log
()
raise
...
...
@@ -205,9 +205,9 @@ class Application(object):
self
.
doOperation
()
raise
RuntimeError
,
'should not reach here'
except
OperationFailure
,
msg
:
neo
.
lib
.
logging
.
error
(
'operation stopped: %s'
,
msg
)
logging
.
error
(
'operation stopped: %s'
,
msg
)
except
PrimaryFailure
,
msg
:
neo
.
lib
.
logging
.
error
(
'primary master is down: %s'
,
msg
)
logging
.
error
(
'primary master is down: %s'
,
msg
)
finally
:
self
.
checker
=
Checker
(
self
)
...
...
@@ -233,7 +233,7 @@ class Application(object):
(
node
,
conn
,
uuid
,
num_partitions
,
num_replicas
)
=
data
self
.
master_node
=
node
self
.
master_conn
=
conn
neo
.
lib
.
logging
.
info
(
'I am %s'
,
dump
(
uuid
))
logging
.
info
(
'I am %s'
,
dump
(
uuid
))
self
.
uuid
=
uuid
self
.
dm
.
setUUID
(
uuid
)
...
...
@@ -255,7 +255,7 @@ class Application(object):
def
verifyData
(
self
):
"""Verify data under the control by a primary master node.
Connections from client nodes may not be accepted at this stage."""
neo
.
lib
.
logging
.
info
(
'verifying data'
)
logging
.
info
(
'verifying data'
)
handler
=
verification
.
VerificationHandler
(
self
)
self
.
master_conn
.
setHandler
(
handler
)
...
...
@@ -266,7 +266,7 @@ class Application(object):
def
initialize
(
self
):
""" Retreive partition table and node informations from the primary """
neo
.
lib
.
logging
.
debug
(
'initializing...'
)
logging
.
debug
(
'initializing...'
)
_poll
=
self
.
_poll
handler
=
initialization
.
InitializationHandler
(
self
)
self
.
master_conn
.
setHandler
(
handler
)
...
...
@@ -284,7 +284,7 @@ class Application(object):
def
doOperation
(
self
):
"""Handle everything, including replications and transactions."""
neo
.
lib
.
logging
.
info
(
'doing operation'
)
logging
.
info
(
'doing operation'
)
_poll
=
self
.
_poll
isIdle
=
self
.
em
.
isIdle
...
...
@@ -314,7 +314,7 @@ class Application(object):
def
wait
(
self
):
# change handler
neo
.
lib
.
logging
.
info
(
"waiting in hidden state"
)
logging
.
info
(
"waiting in hidden state"
)
_poll
=
self
.
_poll
handler
=
hidden
.
HiddenHandler
(
self
)
for
conn
in
self
.
em
.
getConnectionList
():
...
...
@@ -360,9 +360,9 @@ class Application(object):
def
logQueuedEvents
(
self
):
if
self
.
event_queue
is
None
:
return
neo
.
lib
.
logging
.
info
(
"Pending events:"
)
logging
.
info
(
"Pending events:"
)
for
key
,
event
,
_msg_id
,
_conn
,
args
in
self
.
event_queue
:
neo
.
lib
.
logging
.
info
(
' %r:%r: %r:%r %r %r'
,
key
,
event
.
__name__
,
logging
.
info
(
' %r:%r: %r:%r %r %r'
,
key
,
event
.
__name__
,
_msg_id
,
_conn
,
args
)
def
newTask
(
self
,
iterator
):
...
...
@@ -386,5 +386,5 @@ class Application(object):
pass
# clear database to avoid polluting the cluster at restart
self
.
dm
.
setup
(
reset
=
erase
)
neo
.
lib
.
logging
.
info
(
"Application has been asked to shut down"
)
logging
.
info
(
"Application has been asked to shut down"
)
sys
.
exit
()
neo/storage/checker.py
View file @
5743cdce
...
...
@@ -16,7 +16,7 @@
from
collections
import
deque
from
functools
import
wraps
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.connection
import
ClientConnection
from
neo.lib.connector
import
ConnectorConnectionClosedException
from
neo.lib.protocol
import
NodeTypes
,
Packets
,
ZERO_OID
...
...
@@ -92,14 +92,13 @@ class Checker(object):
finally
:
conn_set
.
update
(
self
.
conn_dict
)
self
.
conn_dict
.
clear
()
neo
.
lib
.
logging
.
error
(
"Failed to start checking partition %u (%s)"
,
logging
.
error
(
"Failed to start checking partition %u (%s)"
,
partition
,
msg
)
conn_set
.
difference_update
(
self
.
conn_dict
)
finally
:
for
conn
in
conn_set
:
app
.
closeClient
(
conn
)
neo
.
lib
.
logging
.
debug
(
"start checking partition %u from %s to %s"
,
logging
.
debug
(
"start checking partition %u from %s to %s"
,
partition
,
dump
(
min_tid
),
dump
(
max_tid
))
self
.
min_tid
=
self
.
next_tid
=
min_tid
self
.
max_tid
=
max_tid
...
...
@@ -126,10 +125,10 @@ class Checker(object):
if
self
.
source
is
not
None
and
self
.
source
.
getConnection
()
is
conn
:
del
self
.
source
elif
len
(
self
.
conn_dict
)
>
1
:
neo
.
lib
.
logging
.
warning
(
"node lost but keep up checking partition"
" %u"
,
self
.
partition
)
logging
.
warning
(
"node lost but keep up checking partition %u"
,
self
.
partition
)
return
neo
.
lib
.
logging
.
warning
(
"check of partition %u aborted"
,
self
.
partition
)
logging
.
warning
(
"check of partition %u aborted"
,
self
.
partition
)
self
.
_nextPartition
()
def
_nextRange
(
self
):
...
...
@@ -149,7 +148,7 @@ class Checker(object):
if
self
.
conn_dict
.
get
(
conn
,
self
)
!=
conn
.
getPeerId
():
# Ignore answers to old requests,
# because we did nothing to cancel them.
neo
.
lib
.
logging
.
info
(
"ignored AnswerCheck*Range%r"
,
args
)
logging
.
info
(
"ignored AnswerCheck*Range%r"
,
args
)
return
self
.
conn_dict
[
conn
]
=
args
answer_set
=
set
(
self
.
conn_dict
.
itervalues
())
...
...
@@ -177,8 +176,7 @@ class Checker(object):
p
=
Packets
.
NotifyPartitionCorrupted
(
self
.
partition
,
uuid_list
)
self
.
app
.
master_conn
.
notify
(
p
)
if
len
(
self
.
conn_dict
)
<=
1
:
neo
.
lib
.
logging
.
warning
(
"check of partition %u aborted"
,
self
.
partition
)
logging
.
warning
(
"check of partition %u aborted"
,
self
.
partition
)
self
.
queue
.
clear
()
self
.
_nextPartition
()
return
...
...
@@ -187,7 +185,7 @@ class Checker(object):
except
ValueError
:
count
,
_
,
self
.
next_tid
,
_
,
max_oid
=
args
if
count
<
CHECK_COUNT
:
neo
.
lib
.
logging
.
debug
(
"partition %u checked from %s to %s"
,
logging
.
debug
(
"partition %u checked from %s to %s"
,
self
.
partition
,
dump
(
self
.
min_tid
),
dump
(
self
.
max_tid
))
self
.
_nextPartition
()
return
...
...
neo/storage/database/manager.py
View file @
5743cdce
...
...
@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
util
from
neo.lib
import
logging
,
util
from
neo.lib.exception
import
DatabaseFailure
from
neo.lib.protocol
import
ZERO_TID
...
...
@@ -376,7 +375,7 @@ class DatabaseManager(object):
"""
if
self
.
__class__
not
in
self
.
__getDataTID
:
self
.
__getDataTID
.
add
(
self
.
__class__
)
neo
.
lib
.
logging
.
warning
(
"Fallback to generic/slow implementation"
logging
.
warning
(
"Fallback to generic/slow implementation"
" of _getDataTID. It should be overriden by backend storage."
)
r
=
self
.
_getObject
(
oid
,
tid
,
before_tid
)
if
r
:
...
...
@@ -431,9 +430,9 @@ class DatabaseManager(object):
" oid %d at tid %d: reference = %d"
%
(
oid
,
value_serial
,
tid
))
if
value_serial
!=
getDataTID
(
value_serial
)[
1
]:
neo
.
lib
.
logging
.
warning
(
"Multiple levels of indirection"
logging
.
warning
(
"Multiple levels of indirection"
" when getting data serial for oid %d at tid %d."
" This causes suboptimal performance."
%
(
oid
,
tid
)
)
" This causes suboptimal performance."
,
oid
,
tid
)
return
tid
,
value_serial
if
transaction_object
:
current_tid
=
current_data_tid
=
u64
(
transaction_object
[
2
])
...
...
neo/storage/database/mysqldb.py
View file @
5743cdce
...
...
@@ -19,7 +19,6 @@ import MySQLdb
from
MySQLdb
import
IntegrityError
,
OperationalError
from
MySQLdb.constants.CR
import
SERVER_GONE_ERROR
,
SERVER_LOST
from
MySQLdb.constants.ER
import
DUP_ENTRY
import
neo.lib
from
array
import
array
from
hashlib
import
sha1
import
re
...
...
@@ -28,9 +27,9 @@ import time
from
.
import
DatabaseManager
,
LOG_QUERIES
from
.manager
import
CreationUndone
from
neo.lib
import
logging
,
util
from
neo.lib.exception
import
DatabaseFailure
from
neo.lib.protocol
import
CellStates
,
ZERO_OID
,
ZERO_TID
,
ZERO_HASH
from
neo.lib
import
util
def
splitOIDField
(
tid
,
oids
):
if
(
len
(
oids
)
%
8
)
!=
0
or
len
(
oids
)
==
0
:
...
...
@@ -74,8 +73,7 @@ class MySQLDatabaseManager(DatabaseManager):
kwd
[
'passwd'
]
=
self
.
passwd
if
self
.
socket
:
kwd
[
'unix_socket'
]
=
self
.
socket
neo
.
lib
.
logging
.
info
(
'connecting to MySQL on the database %s with user %s'
,
logging
.
info
(
'connecting to MySQL on the database %s with user %s'
,
self
.
db
,
self
.
user
)
if
self
.
_wait
<
0
:
timeout_at
=
None
...
...
@@ -87,8 +85,7 @@ class MySQLDatabaseManager(DatabaseManager):
except
Exception
:
if
timeout_at
is
not
None
and
time
.
time
()
>=
timeout_at
:
raise
neo
.
lib
.
logging
.
exception
(
'Connection to MySQL failed, '
'retrying.'
)
logging
.
exception
(
'Connection to MySQL failed, retrying.'
)
time
.
sleep
(
1
)
else
:
break
...
...
@@ -103,11 +100,11 @@ class MySQLDatabaseManager(DatabaseManager):
if
LOG_QUERIES
:
def
commit
(
self
):
neo
.
lib
.
logging
.
debug
(
'committing...'
)
logging
.
debug
(
'committing...'
)
self
.
conn
.
commit
()
def
rollback
(
self
):
neo
.
lib
.
logging
.
debug
(
'aborting...'
)
logging
.
debug
(
'aborting...'
)
self
.
conn
.
rollback
()
else
:
commit
=
property
(
lambda
self
:
self
.
conn
.
commit
)
...
...
@@ -124,7 +121,7 @@ class MySQLDatabaseManager(DatabaseManager):
c
=
'
\
\
x%02x'
%
ord
(
c
)
printable_char_list
.
append
(
c
)
query_part
=
''
.
join
(
printable_char_list
)
neo
.
lib
.
logging
.
debug
(
'querying %s...'
,
query_part
)
logging
.
debug
(
'querying %s...'
,
query_part
)
conn
.
query
(
query
)
r
=
conn
.
store_result
()
...
...
@@ -141,7 +138,7 @@ class MySQLDatabaseManager(DatabaseManager):
except
OperationalError
,
m
:
if
m
[
0
]
in
(
SERVER_GONE_ERROR
,
SERVER_LOST
):
neo
.
lib
.
logging
.
info
(
'the MySQL server is gone; reconnecting'
)
logging
.
info
(
'the MySQL server is gone; reconnecting'
)
self
.
_connect
()
return
self
.
query
(
query
)
raise
DatabaseFailure
(
'MySQL error %d: %s'
%
(
m
[
0
],
m
[
1
]))
...
...
@@ -575,9 +572,9 @@ class MySQLDatabaseManager(DatabaseManager):
(
self
.
_getPartition
(
oid
),
oid
,
value_serial
))
length
,
value_serial
=
r
[
0
]
if
length
is
None
:
neo
.
lib
.
logging
.
info
(
"Multiple levels of indirection when "
\
"searching for object data for oid %d at tid %d.
This "
\
"
causes suboptimal performance."
%
(
oid
,
value_serial
)
)
logging
.
info
(
"Multiple levels of indirection when "
"searching for object data for oid %d at tid %d.
"
"
This causes suboptimal performance."
,
oid
,
value_serial
)
length
=
self
.
_getObjectLength
(
oid
,
value_serial
)
return
length
...
...
neo/storage/database/sqlite.py
View file @
5743cdce
...
...
@@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
sqlite3
import
neo.lib
from
array
import
array
from
hashlib
import
sha1
import
re
...
...
@@ -23,9 +22,9 @@ import string
from
.
import
DatabaseManager
,
LOG_QUERIES
from
.manager
import
CreationUndone
from
neo.lib
import
logging
,
util
from
neo.lib.exception
import
DatabaseFailure
from
neo.lib.protocol
import
CellStates
,
ZERO_OID
,
ZERO_TID
,
ZERO_HASH
from
neo.lib
import
util
def
splitOIDField
(
tid
,
oids
):
if
(
len
(
oids
)
%
8
)
!=
0
or
len
(
oids
)
==
0
:
...
...
@@ -58,7 +57,7 @@ class SQLiteDatabaseManager(DatabaseManager):
self
.
conn
.
close
()
def
_connect
(
self
):
neo
.
lib
.
logging
.
info
(
'connecting to SQLite database %r'
,
self
.
db
)
logging
.
info
(
'connecting to SQLite database %r'
,
self
.
db
)
self
.
conn
=
sqlite3
.
connect
(
self
.
db
,
isolation_level
=
None
,
check_same_thread
=
False
)
...
...
@@ -69,11 +68,11 @@ class SQLiteDatabaseManager(DatabaseManager):
if
LOG_QUERIES
:
def
commit
(
self
):
neo
.
lib
.
logging
.
debug
(
'committing...'
)
logging
.
debug
(
'committing...'
)
self
.
conn
.
commit
()
def
rollback
(
self
):
neo
.
lib
.
logging
.
debug
(
'aborting...'
)
logging
.
debug
(
'aborting...'
)
self
.
conn
.
rollback
()
def
query
(
self
,
query
):
...
...
@@ -82,8 +81,7 @@ class SQLiteDatabaseManager(DatabaseManager):
if
c
not
in
string
.
printable
or
c
in
'
\
t
\
x0b
\
x0c
\
r
'
:
c
=
'
\
\
x%02x'
%
ord
(
c
)
printable_char_list
.
append
(
c
)
neo
.
lib
.
logging
.
debug
(
'querying %s...'
,
''
.
join
(
printable_char_list
))
logging
.
debug
(
'querying %s...'
,
''
.
join
(
printable_char_list
))
return
self
.
conn
.
execute
(
query
)
else
:
commit
=
property
(
lambda
self
:
self
.
conn
.
commit
)
...
...
@@ -480,7 +478,7 @@ class SQLiteDatabaseManager(DatabaseManager):
WHERE partition=? AND oid=? AND tid=?"""
,
(
self
.
_getPartition
(
oid
),
oid
,
value_serial
)).
fetchone
()
if
length
is
None
:
neo
.
lib
.
logging
.
info
(
"Multiple levels of indirection"
logging
.
info
(
"Multiple levels of indirection"
" when searching for object data for oid %d at tid %d."
" This causes suboptimal performance."
,
oid
,
value_serial
)
length
=
self
.
_getObjectLength
(
oid
,
value_serial
)
...
...
neo/storage/handlers/__init__.py
View file @
5743cdce
...
...
@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
from
neo.lib
import
logging
from
neo.lib.handler
import
EventHandler
from
neo.lib.util
import
dump
from
neo.lib.exception
import
PrimaryFailure
,
OperationFailure
...
...
@@ -35,7 +34,7 @@ class BaseMasterHandler(EventHandler):
raise
PrimaryFailure
(
're-election occurs'
)
def
notifyClusterInformation
(
self
,
conn
,
state
):
neo
.
lib
.
logging
.
warning
(
'ignoring notify cluster information in %s'
,
logging
.
warning
(
'ignoring notify cluster information in %s'
,
self
.
__class__
.
__name__
)
def
notifyLastOID
(
self
,
conn
,
oid
):
...
...
@@ -48,7 +47,7 @@ class BaseMasterHandler(EventHandler):
for
node_type
,
addr
,
uuid
,
state
in
node_list
:
if
uuid
==
self
.
app
.
uuid
:
# This is me, do what the master tell me
neo
.
lib
.
logging
.
info
(
"I was told I'm %s"
,
state
)
logging
.
info
(
"I was told I'm %s"
,
state
)
if
state
in
(
NodeStates
.
DOWN
,
NodeStates
.
TEMPORARILY_DOWN
,
NodeStates
.
BROKEN
):
erase
=
state
==
NodeStates
.
DOWN
...
...
@@ -56,8 +55,7 @@ class BaseMasterHandler(EventHandler):
elif
state
==
NodeStates
.
HIDDEN
:
raise
OperationFailure
elif
node_type
==
NodeTypes
.
CLIENT
and
state
!=
NodeStates
.
RUNNING
:
neo
.
lib
.
logging
.
info
(
'Notified of non-running client, abort (%r)'
,
logging
.
info
(
'Notified of non-running client, abort (%r)'
,
dump
(
uuid
))
self
.
app
.
tm
.
abortFor
(
uuid
)
...
...
neo/storage/handlers/client.py
View file @
5743cdce
...
...
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.handler
import
EventHandler
from
neo.lib.util
import
dump
,
makeChecksum
from
neo.lib.protocol
import
Packets
,
LockState
,
Errors
,
ProtocolError
,
\
...
...
@@ -46,14 +46,14 @@ class ClientOperationHandler(EventHandler):
return
o
=
app
.
dm
.
getObject
(
oid
,
serial
,
tid
)
if
o
is
None
:
neo
.
lib
.
logging
.
debug
(
'oid = %s does not exist'
,
dump
(
oid
))
logging
.
debug
(
'oid = %s does not exist'
,
dump
(
oid
))
p
=
Errors
.
OidDoesNotExist
(
dump
(
oid
))
elif
o
is
False
:
neo
.
lib
.
logging
.
debug
(
'oid = %s not found'
,
dump
(
oid
))
logging
.
debug
(
'oid = %s not found'
,
dump
(
oid
))
p
=
Errors
.
OidNotFound
(
dump
(
oid
))
else
:
serial
,
next_serial
,
compression
,
checksum
,
data
,
data_serial
=
o
neo
.
lib
.
logging
.
debug
(
'oid = %s, serial = %s, next_serial = %s'
,
logging
.
debug
(
'oid = %s, serial = %s, next_serial = %s'
,
dump
(
oid
),
dump
(
serial
),
dump
(
next_serial
))
if
checksum
is
None
:
checksum
=
ZERO_HASH
...
...
@@ -81,7 +81,7 @@ class ClientOperationHandler(EventHandler):
data_serial
,
ttid
,
unlock
,
request_time
):
if
ttid
not
in
self
.
app
.
tm
:
# transaction was aborted, cancel this event
neo
.
lib
.
logging
.
info
(
'Forget store of %s:%s by %s delayed by %s'
,
logging
.
info
(
'Forget store of %s:%s by %s delayed by %s'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
),
dump
(
self
.
app
.
tm
.
getLockingTID
(
oid
)))
# send an answer as the client side is waiting for it
...
...
@@ -109,7 +109,7 @@ class ClientOperationHandler(EventHandler):
if
SLOW_STORE
is
not
None
:
duration
=
time
.
time
()
-
request_time
if
duration
>
SLOW_STORE
:
neo
.
lib
.
logging
.
info
(
'StoreObject delay: %.02fs'
,
duration
)
logging
.
info
(
'StoreObject delay: %.02fs'
,
duration
)
conn
.
answer
(
Packets
.
AnswerStoreObject
(
0
,
oid
,
serial
))
def
askStoreObject
(
self
,
conn
,
oid
,
serial
,
...
...
@@ -162,8 +162,7 @@ class ClientOperationHandler(EventHandler):
def
askHasLock
(
self
,
conn
,
ttid
,
oid
):
locking_tid
=
self
.
app
.
tm
.
getLockingTID
(
oid
)
neo
.
lib
.
logging
.
info
(
'%r check lock of %r:%r'
,
conn
,
dump
(
ttid
),
dump
(
oid
))
logging
.
info
(
'%r check lock of %r:%r'
,
conn
,
dump
(
ttid
),
dump
(
oid
))
if
locking_tid
is
None
:
state
=
LockState
.
NOT_LOCKED
elif
locking_tid
is
ttid
:
...
...
@@ -190,9 +189,8 @@ class ClientOperationHandler(EventHandler):
def
_askCheckCurrentSerial
(
self
,
conn
,
ttid
,
serial
,
oid
,
request_time
):
if
ttid
not
in
self
.
app
.
tm
:
# transaction was aborted, cancel this event
neo
.
lib
.
logging
.
info
(
'Forget serial check of %s:%s by %s delayed by '
'%s'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
),
logging
.
info
(
'Forget serial check of %s:%s by %s delayed by %s'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
),
dump
(
self
.
app
.
tm
.
getLockingTID
(
oid
)))
# send an answer as the client side is waiting for it
conn
.
answer
(
Packets
.
AnswerStoreObject
(
0
,
oid
,
serial
))
...
...
@@ -214,7 +212,6 @@ class ClientOperationHandler(EventHandler):
if
SLOW_STORE
is
not
None
:
duration
=
time
.
time
()
-
request_time
if
duration
>
SLOW_STORE
:
neo
.
lib
.
logging
.
info
(
'CheckCurrentSerial delay: %.02fs'
,
duration
)
logging
.
info
(
'CheckCurrentSerial delay: %.02fs'
,
duration
)
conn
.
answer
(
Packets
.
AnswerCheckCurrentSerial
(
0
,
oid
,
serial
))
neo/storage/handlers/hidden.py
View file @
5743cdce
...
...
@@ -14,9 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
.
import
BaseMasterHandler
from
neo.lib
import
logging
from
neo.lib.protocol
import
CellStates
class
HiddenHandler
(
BaseMasterHandler
):
...
...
@@ -28,7 +27,7 @@ class HiddenHandler(BaseMasterHandler):
app
=
self
.
app
if
ptid
<=
app
.
pt
.
getID
():
# Ignore this packet.
neo
.
lib
.
logging
.
debug
(
'ignoring older partition changes'
)
logging
.
debug
(
'ignoring older partition changes'
)
return
# update partition table in memory and the database
...
...
neo/storage/handlers/identification.py
View file @
5743cdce
...
...
@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.handler
import
EventHandler
from
neo.lib.protocol
import
NodeTypes
,
Packets
,
NotReadyError
from
neo.lib.protocol
import
ProtocolError
,
BrokenNodeDisallowedError
...
...
@@ -27,7 +26,7 @@ class IdentificationHandler(EventHandler):
""" Handler used for incoming connections during operation state """
def
connectionLost
(
self
,
conn
,
new_state
):
neo
.
lib
.
logging
.
warning
(
'A connection was lost during identification'
)
logging
.
warning
(
'A connection was lost during identification'
)
def
requestIdentification
(
self
,
conn
,
node_type
,
uuid
,
address
,
name
):
...
...
@@ -60,7 +59,7 @@ class IdentificationHandler(EventHandler):
node
.
setRunning
()
elif
node_type
==
NodeTypes
.
STORAGE
:
if
node
is
None
:
neo
.
lib
.
logging
.
error
(
'reject an unknown storage node %s'
,
logging
.
error
(
'reject an unknown storage node %s'
,
dump
(
uuid
))
raise
NotReadyError
handler
=
StorageOperationHandler
...
...
neo/storage/handlers/initialization.py
View file @
5743cdce
...
...
@@ -14,10 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
.
import
BaseMasterHandler
from
neo.lib
import
protocol
from
neo.lib
import
logging
,
protocol
class
InitializationHandler
(
BaseMasterHandler
):
...
...
@@ -30,7 +28,7 @@ class InitializationHandler(BaseMasterHandler):
pt
.
load
(
ptid
,
row_list
,
self
.
app
.
nm
)
if
not
pt
.
filled
():
raise
protocol
.
ProtocolError
(
'Partial partition table received'
)
neo
.
lib
.
logging
.
debug
(
'Got the partition table
:'
)
logging
.
debug
(
'Got the partition table
:'
)
self
.
app
.
pt
.
log
()
# Install the partition table into the database for persistency.
cell_list
=
[]
...
...
@@ -43,7 +41,7 @@ class InitializationHandler(BaseMasterHandler):
unassigned_set
.
remove
(
offset
)
# delete objects database
if
unassigned_set
:
neo
.
lib
.
logging
.
debug
(
'drop data for partitions %r'
,
unassigned_set
)
logging
.
debug
(
'drop data for partitions %r'
,
unassigned_set
)
app
.
dm
.
dropPartitions
(
unassigned_set
)
app
.
dm
.
setPartitionTable
(
ptid
,
cell_list
)
...
...
@@ -60,5 +58,4 @@ class InitializationHandler(BaseMasterHandler):
# packets in between (or even before asking for node information).
# - this handler will be changed after receiving answerPartitionTable
# and before handling the next packet
neo
.
lib
.
logging
.
debug
(
'ignoring notifyPartitionChanges during '
\
'initialization'
)
logging
.
debug
(
'ignoring notifyPartitionChanges during initialization'
)
neo/storage/handlers/master.py
View file @
5743cdce
...
...
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.util
import
dump
from
neo.lib.protocol
import
CellStates
,
Packets
,
ProtocolError
from
.
import
BaseMasterHandler
...
...
@@ -32,7 +32,7 @@ class MasterOperationHandler(BaseMasterHandler):
app
=
self
.
app
if
ptid
<=
app
.
pt
.
getID
():
# Ignore this packet.
neo
.
lib
.
logging
.
debug
(
'ignoring older partition changes'
)
logging
.
debug
(
'ignoring older partition changes'
)
return
# update partition table in memory and the database
...
...
@@ -57,9 +57,9 @@ class MasterOperationHandler(BaseMasterHandler):
def
askPack
(
self
,
conn
,
tid
):
app
=
self
.
app
neo
.
lib
.
logging
.
info
(
'Pack started, up to %s...'
,
dump
(
tid
))
logging
.
info
(
'Pack started, up to %s...'
,
dump
(
tid
))
app
.
dm
.
pack
(
tid
,
app
.
tm
.
updateObjectDataForPack
)
neo
.
lib
.
logging
.
info
(
'Pack finished.'
)
logging
.
info
(
'Pack finished.'
)
if
not
conn
.
isClosed
():
conn
.
answer
(
Packets
.
AnswerPack
(
True
))
...
...
neo/storage/handlers/verification.py
View file @
5743cdce
...
...
@@ -14,9 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
neo
from
.
import
BaseMasterHandler
from
neo.lib
import
logging
from
neo.lib.protocol
import
Packets
,
Errors
,
ProtocolError
,
INVALID_TID
from
neo.lib.util
import
dump
from
neo.lib.exception
import
OperationFailure
...
...
@@ -42,7 +41,7 @@ class VerificationHandler(BaseMasterHandler):
app
=
self
.
app
if
ptid
<=
app
.
pt
.
getID
():
# Ignore this packet.
neo
.
lib
.
logging
.
debug
(
'ignoring older partition changes'
)
logging
.
debug
(
'ignoring older partition changes'
)
return
# update partition table in memory and the database
app
.
pt
.
update
(
ptid
,
cell_list
,
app
.
nm
)
...
...
neo/storage/replicator.py
View file @
5743cdce
...
...
@@ -52,7 +52,7 @@ TODO: Packing and replication currently fail when then happen at the same time.
import
random
import
neo.lib
from
neo.lib
import
bootstrap
,
logging
from
neo.lib.protocol
import
CellStates
,
NodeTypes
,
NodeStates
,
Packets
,
\
INVALID_TID
,
ZERO_TID
,
ZERO_OID
from
neo.lib.connection
import
ClientConnection
...
...
@@ -251,7 +251,7 @@ class Replicator(object):
return
self
.
abort
()
min_tid
=
p
.
next_trans
self
.
replicate_tid
=
self
.
replicate_dict
.
pop
(
offset
)
neo
.
lib
.
logging
.
debug
(
"starting replication of <partition=%u"
logging
.
debug
(
"starting replication of <partition=%u"
" min_tid=%s max_tid=%s> from %r"
,
offset
,
dump
(
min_tid
),
dump
(
self
.
replicate_tid
),
self
.
current_node
)
max_tid
=
self
.
replicate_tid
...
...
@@ -296,7 +296,7 @@ class Replicator(object):
if
not
p
.
max_ttid
:
p
=
Packets
.
NotifyReplicationDone
(
offset
,
tid
)
self
.
app
.
master_conn
.
notify
(
p
)
neo
.
lib
.
logging
.
debug
(
"partition %u replicated up to %s from %r"
,
logging
.
debug
(
"partition %u replicated up to %s from %r"
,
offset
,
dump
(
tid
),
self
.
current_node
)
self
.
_nextPartition
()
...
...
@@ -305,7 +305,7 @@ class Replicator(object):
if
offset
is
None
:
return
del
self
.
current_partition
neo
.
lib
.
logging
.
warning
(
'replication aborted for partition %u%s'
,
logging
.
warning
(
'replication aborted for partition %u%s'
,
offset
,
message
and
' (%s)'
%
message
)
if
self
.
app
.
master_node
is
None
:
return
...
...
neo/storage/transactions.py
View file @
5743cdce
...
...
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from
time
import
time
import
neo.lib
from
neo.lib
import
logging
from
neo.lib.util
import
dump
from
neo.lib.protocol
import
ZERO_TID
...
...
@@ -148,7 +148,7 @@ class TransactionManager(object):
"""
Register a transaction, it may be already registered
"""
neo
.
lib
.
logging
.
debug
(
'Register TXN %s for %s'
,
dump
(
ttid
),
dump
(
uuid
))
logging
.
debug
(
'Register TXN %s for %s'
,
dump
(
ttid
),
dump
(
uuid
))
transaction
=
self
.
_transaction_dict
.
get
(
ttid
,
None
)
if
transaction
is
None
:
transaction
=
Transaction
(
uuid
,
ttid
)
...
...
@@ -179,7 +179,7 @@ class TransactionManager(object):
"""
Lock a transaction
"""
neo
.
lib
.
logging
.
debug
(
'Lock TXN %s (ttid=%s)'
,
dump
(
tid
),
dump
(
ttid
))
logging
.
debug
(
'Lock TXN %s (ttid=%s)'
,
dump
(
tid
),
dump
(
ttid
))
transaction
=
self
.
_transaction_dict
[
ttid
]
# remember that the transaction has been locked
transaction
.
lock
()
...
...
@@ -208,7 +208,7 @@ class TransactionManager(object):
"""
Unlock transaction
"""
neo
.
lib
.
logging
.
debug
(
'Unlock TXN %s'
,
dump
(
ttid
))
logging
.
debug
(
'Unlock TXN %s'
,
dump
(
ttid
))
self
.
_app
.
dm
.
finishTransaction
(
self
.
getTIDFromTTID
(
ttid
))
self
.
abort
(
ttid
,
even_if_locked
=
True
)
...
...
@@ -234,8 +234,7 @@ class TransactionManager(object):
# check if the object if locked
locking_tid
=
self
.
_store_lock_dict
.
get
(
oid
)
if
locking_tid
==
ttid
and
unlock
:
neo
.
lib
.
logging
.
info
(
'Deadlock resolution on %r:%r'
,
dump
(
oid
),
dump
(
ttid
))
logging
.
info
(
'Deadlock resolution on %r:%r'
,
dump
(
oid
),
dump
(
ttid
))
# A duplicate store means client is resolving a deadlock, so
# drop the lock it held on this object, and drop object data for
# consistency.
...
...
@@ -256,15 +255,15 @@ class TransactionManager(object):
if
previous_serial
is
None
:
# XXX: use some special serial when previous store was not
# an undo ? Maybe it should just not happen.
neo
.
lib
.
logging
.
info
(
'Transaction %s storing %s more than '
'once'
,
dump
(
ttid
),
dump
(
oid
))
logging
.
info
(
'Transaction %s storing %s more than once'
,
dump
(
ttid
),
dump
(
oid
))
elif
locking_tid
<
ttid
:
# We have a bigger TTID than locking transaction, so we are younger:
# enter waiting queue so we are handled when lock gets released.
# We also want to delay (instead of conflict) if the client is
# so faster that it is committing another transaction before we
# processed UnlockInformation from the master.
neo
.
lib
.
logging
.
info
(
'Store delayed for %r:%r by %r'
,
dump
(
oid
),
logging
.
info
(
'Store delayed for %r:%r by %r'
,
dump
(
oid
),
dump
(
ttid
),
dump
(
locking_tid
))
raise
DelayedError
else
:
...
...
@@ -272,7 +271,7 @@ class TransactionManager(object):
# this is a possible deadlock case, as we might already hold locks
# the younger transaction is waiting upon. Make client release
# locks & reacquire them by notifying it of the possible deadlock.
neo
.
lib
.
logging
.
info
(
'Possible deadlock on %r:%r with %r'
,
logging
.
info
(
'Possible deadlock on %r:%r with %r'
,
dump
(
oid
),
dump
(
ttid
),
dump
(
locking_tid
))
raise
ConflictError
(
ZERO_TID
)
if
previous_serial
is
None
:
...
...
@@ -280,11 +279,10 @@ class TransactionManager(object):
if
history_list
:
previous_serial
=
history_list
[
0
][
0
]
if
previous_serial
is
not
None
and
previous_serial
!=
serial
:
neo
.
lib
.
logging
.
info
(
'Resolvable conflict on %r:%r'
,
logging
.
info
(
'Resolvable conflict on %r:%r'
,
dump
(
oid
),
dump
(
ttid
))
raise
ConflictError
(
previous_serial
)
neo
.
lib
.
logging
.
debug
(
'Transaction %s storing %s'
,
dump
(
ttid
),
dump
(
oid
))
logging
.
debug
(
'Transaction %s storing %s'
,
dump
(
ttid
),
dump
(
oid
))
self
.
_store_lock_dict
[
oid
]
=
ttid
def
checkCurrentSerial
(
self
,
ttid
,
serial
,
oid
):
...
...
@@ -319,7 +317,7 @@ class TransactionManager(object):
# of the partition, even if no data was received (eg. conflict on
# another node)
return
neo
.
lib
.
logging
.
debug
(
'Abort TXN %s'
,
dump
(
ttid
))
logging
.
debug
(
'Abort TXN %s'
,
dump
(
ttid
))
transaction
=
self
.
_transaction_dict
[
ttid
]
has_load_lock
=
transaction
.
isLocked
()
# if the transaction is locked, ensure we can drop it
...
...
@@ -355,7 +353,7 @@ class TransactionManager(object):
"""
Abort any non-locked transaction of a node
"""
neo
.
lib
.
logging
.
debug
(
'Abort for %s'
,
dump
(
uuid
))
logging
.
debug
(
'Abort for %s'
,
dump
(
uuid
))
# abort any non-locked transaction of this node
for
ttid
in
[
x
.
getTTID
()
for
x
in
self
.
_uuid_dict
.
get
(
uuid
,
[])]:
self
.
abort
(
ttid
)
...
...
@@ -368,15 +366,15 @@ class TransactionManager(object):
return
oid
in
self
.
_load_lock_dict
def
log
(
self
):
neo
.
lib
.
logging
.
info
(
"Transactions:"
)
logging
.
info
(
"Transactions:"
)
for
txn
in
self
.
_transaction_dict
.
values
():
neo
.
lib
.
logging
.
info
(
' %r'
,
txn
)
neo
.
lib
.
logging
.
info
(
' Read locks:'
)
logging
.
info
(
' %r'
,
txn
)
logging
.
info
(
' Read locks:'
)
for
oid
,
ttid
in
self
.
_load_lock_dict
.
items
():
neo
.
lib
.
logging
.
info
(
' %r by %r'
,
dump
(
oid
),
dump
(
ttid
))
neo
.
lib
.
logging
.
info
(
' Write locks:'
)
logging
.
info
(
' %r by %r'
,
dump
(
oid
),
dump
(
ttid
))
logging
.
info
(
' Write locks:'
)
for
oid
,
ttid
in
self
.
_store_lock_dict
.
items
():
neo
.
lib
.
logging
.
info
(
' %r by %r'
,
dump
(
oid
),
dump
(
ttid
))
logging
.
info
(
' %r by %r'
,
dump
(
oid
),
dump
(
ttid
))
def
updateObjectDataForPack
(
self
,
oid
,
orig_serial
,
new_serial
,
data_id
):
lock_tid
=
self
.
getLockingTID
(
oid
)
...
...
neo/tests/functional/__init__.py
View file @
5743cdce
...
...
@@ -33,6 +33,7 @@ import psutil
import
neo.scripts
from
neo.neoctl.neoctl
import
NeoCTL
,
NotReadyException
from
neo.lib
import
logging
from
neo.lib.protocol
import
ClusterStates
,
NodeTypes
,
CellStates
,
NodeStates
from
neo.lib.util
import
dump
from
..
import
DB_USER
,
setupMySQLdb
,
NeoTestBase
,
buildUrlFromString
,
\
...
...
@@ -94,7 +95,7 @@ class PortAllocator(object):
if
port
not
in
sock_port_set
:
sock_port_set
.
add
(
port
)
return
port
neo
.
lib
.
logging
.
warning
(
'Same port allocated twice: %s in %s'
,
logging
.
warning
(
'Same port allocated twice: %s in %s'
,
port
,
sock_port_set
)
def
release
(
self
):
...
...
@@ -171,7 +172,7 @@ class NEOProcess(object):
del
self
.
__class__
.
__del__
try
:
# release SQLite debug log
neo
.
lib
.
logging
.
setup
()
logging
.
setup
()
# release system-wide lock
for
allocator
in
PortAllocator
.
allocator_set
.
copy
():
allocator
.
reset
()
...
...
@@ -180,12 +181,12 @@ class NEOProcess(object):
sys
.
exit
()
except
:
raise
ChildException
(
*
sys
.
exc_info
())
neo
.
lib
.
logging
.
info
(
'pid %u: %s %s'
,
logging
.
info
(
'pid %u: %s %s'
,
self
.
pid
,
command
,
' '
.
join
(
map
(
repr
,
args
)))
def
kill
(
self
,
sig
=
signal
.
SIGTERM
):
if
self
.
pid
:
neo
.
lib
.
logging
.
info
(
'kill pid %u'
,
self
.
pid
)
logging
.
info
(
'kill pid %u'
,
self
.
pid
)
try
:
pdb
.
kill
(
self
.
pid
,
sig
)
except
OSError
:
...
...
@@ -337,7 +338,7 @@ class NEOCluster(object):
if
e
.
errno
!=
errno
.
ENOENT
:
raise
else
:
neo
.
lib
.
logging
.
debug
(
'%r deleted'
,
db
)
logging
.
debug
(
'%r deleted'
,
db
)
def
run
(
self
,
except_storages
=
()):
""" Start cluster processes except some storage nodes """
...
...
@@ -641,7 +642,7 @@ class NEOCluster(object):
class
NEOFunctionalTest
(
NeoTestBase
):
def
setupLog
(
self
):
neo
.
lib
.
logging
.
setup
(
os
.
path
.
join
(
self
.
getTempDirectory
(),
'test.log'
))
logging
.
setup
(
os
.
path
.
join
(
self
.
getTempDirectory
(),
'test.log'
))
def
getTempDirectory
(
self
):
# build the full path based on test case and current test method
...
...
neo/tests/threaded/__init__.py
View file @
5743cdce
...
...
@@ -25,7 +25,7 @@ import transaction, ZODB
import
neo.admin.app
,
neo
.
master
.
app
,
neo
.
storage
.
app
import
neo.client.app
,
neo
.
neoctl
.
app
from
neo.client
import
Storage
from
neo.lib
import
bootstrap
from
neo.lib
import
bootstrap
,
logging
from
neo.lib.connection
import
BaseConnection
,
Connection
from
neo.lib.connector
import
SocketConnector
,
\
ConnectorConnectionRefusedException
,
ConnectorTryAgainException
...
...
@@ -251,7 +251,7 @@ class ServerNode(Node):
super
(
ServerNode
,
self
).
run
()
finally
:
self
.
_afterRun
()
neo
.
lib
.
logging
.
debug
(
'stopping %r'
,
self
)
logging
.
debug
(
'stopping %r'
,
self
)
Serialized
.
background
()
def
_afterRun
(
self
):
...
...
@@ -670,13 +670,13 @@ class NEOCluster(object):
@
staticmethod
def
tic
(
force
=
False
):
# XXX: Should we automatically switch client in slave mode if it isn't ?
neo
.
lib
.
logging
.
info
(
'tic ...'
)
logging
.
info
(
'tic ...'
)
if
force
:
Serialized
.
tic
()
neo
.
lib
.
logging
.
info
(
'forced tic'
)
logging
.
info
(
'forced tic'
)
while
Serialized
.
pending
:
Serialized
.
tic
()
neo
.
lib
.
logging
.
info
(
'tic'
)
logging
.
info
(
'tic'
)
def
getNodeState
(
self
,
node
):
uuid
=
node
.
uuid
...
...
@@ -742,14 +742,14 @@ class NEOThreadedTest(NeoTestBase):
def
setupLog
(
self
):
log_file
=
os
.
path
.
join
(
getTempDirectory
(),
self
.
id
()
+
'.log'
)
neo
.
lib
.
logging
.
setup
(
log_file
)
logging
.
setup
(
log_file
)
return
LoggerThreadName
()
def
_tearDown
(
self
,
success
):
super
(
NEOThreadedTest
,
self
).
_tearDown
(
success
)
ServerNode
.
resetPorts
()
if
success
:
q
=
neo
.
lib
.
logging
.
db
.
execute
q
=
logging
.
db
.
execute
q
(
"UPDATE packet SET body=NULL"
)
q
(
"VACUUM"
)
...
...
@@ -795,7 +795,7 @@ def predictable_random(seed=None):
def
decorator
(
wrapped
):
def
wrapper
(
*
args
,
**
kw
):
s
=
repr
(
time
.
time
())
if
seed
is
None
else
seed
neo
.
lib
.
logging
.
info
(
"using seed %r"
,
s
)
logging
.
info
(
"using seed %r"
,
s
)
r
=
random
.
Random
(
s
)
try
:
MasterApplication
.
getNewUUID
=
lambda
self
,
node_type
:
(
...
...
neo/tests/threaded/testReplication.py
View file @
5743cdce
...
...
@@ -20,7 +20,7 @@ import time
import
threading
import
transaction
import
unittest
import
neo.lib
from
neo.lib
import
logging
from
neo.storage.checker
import
CHECK_COUNT
from
neo.storage.transactions
import
TransactionManager
,
\
DelayedError
,
ConflictError
...
...
@@ -239,8 +239,7 @@ class ReplicationTests(NEOThreadedTest):
def
corrupt
(
offset
):
s0
,
s1
,
s2
=
(
storage_dict
[
cell
.
getUUID
()]
for
cell
in
cluster
.
master
.
pt
.
getCellList
(
offset
,
True
))
neo
.
lib
.
logging
.
info
(
'corrupt partition %u of %s'
,
offset
,
dump
(
s1
.
uuid
))
logging
.
info
(
'corrupt partition %u of %s'
,
offset
,
dump
(
s1
.
uuid
))
s1
.
dm
.
deleteObject
(
p64
(
np
+
offset
),
p64
(
corrupt_tid
))
return
s0
.
uuid
def
check
(
expected_state
,
expected_count
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment