Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
2
Merge Requests
2
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
Kirill Smelkov
neo
Commits
9a5b46dd
Commit
9a5b46dd
authored
Oct 02, 2018
by
Julien Muchembled
2
Browse files
Options
Browse Files
Download
Plain Diff
Bump protocol version
parents
d68e9053
854a4920
Changes
12
Show whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
150 additions
and
30 deletions
+150
-30
neo/admin/handler.py
neo/admin/handler.py
+5
-0
neo/client/app.py
neo/client/app.py
+29
-11
neo/client/handlers/storage.py
neo/client/handlers/storage.py
+4
-1
neo/client/transactions.py
neo/client/transactions.py
+13
-3
neo/lib/handler.py
neo/lib/handler.py
+3
-0
neo/lib/protocol.py
neo/lib/protocol.py
+10
-1
neo/master/handlers/administration.py
neo/master/handlers/administration.py
+7
-0
neo/neoctl/app.py
neo/neoctl/app.py
+10
-0
neo/neoctl/neoctl.py
neo/neoctl/neoctl.py
+6
-0
neo/storage/handlers/client.py
neo/storage/handlers/client.py
+11
-9
neo/storage/transactions.py
neo/storage/transactions.py
+10
-5
neo/tests/threaded/test.py
neo/tests/threaded/test.py
+42
-0
No files found.
neo/admin/handler.py
View file @
9a5b46dd
...
...
@@ -62,6 +62,11 @@ class AdminEventHandler(EventHandler):
master_node
=
self
.
app
.
master_node
conn
.
answer
(
Packets
.
AnswerPrimary
(
master_node
.
getUUID
()))
@
check_primary_master
def
flushLog
(
self
,
conn
):
self
.
app
.
master_conn
.
send
(
Packets
.
FlushLog
())
super
(
AdminEventHandler
,
self
).
flushLog
(
conn
)
askLastIDs
=
forward_ask
(
Packets
.
AskLastIDs
)
askLastTransaction
=
forward_ask
(
Packets
.
AskLastTransaction
)
addPendingNodes
=
forward_ask
(
Packets
.
AddPendingNodes
)
...
...
neo/client/app.py
View file @
9a5b46dd
...
...
@@ -560,12 +560,30 @@ class Application(ThreadedApplication):
if
status
==
1
and
uuid
not
in
trans_nodes
:
self
.
_askStorageForWrite
(
txn_context
,
uuid
,
packet
)
self
.
waitStoreResponses
(
txn_context
)
# If there are failed nodes, ask the master whether they can be
# disconnected while keeping the cluster operational. If possible,
# this will happen during tpc_finish
.
failed
=
[
node
.
getUUID
()
if
2
in
involved_nodes
.
itervalues
():
# unlikely
# If some writes failed, we must first check whether
# all oids have been locked by at least one node
.
failed
=
{
node
.
getUUID
():
node
.
isRunning
()
for
node
in
self
.
nm
.
getStorageList
()
if
node
.
isRunning
()
and
involved_nodes
.
get
(
node
.
getUUID
())
==
2
]
if
involved_nodes
.
get
(
node
.
getUUID
())
==
2
}
if
txn_context
.
lockless_dict
:
getCellList
=
self
.
pt
.
getCellList
for
offset
,
uuid_set
in
txn_context
.
lockless_dict
.
iteritems
():
for
cell
in
getCellList
(
offset
):
uuid
=
cell
.
getUUID
()
if
not
(
uuid
in
failed
or
uuid
in
uuid_set
):
break
else
:
# Very unlikely. Instead of raising, we could recover
# the transaction by doing something similar to
# deadlock avoidance; that would be done before voting.
# But it's not worth the complexity.
raise
NEOStorageError
(
'partition %s not fully write-locked'
%
offset
)
failed
=
[
uuid
for
uuid
,
running
in
failed
.
iteritems
()
if
running
]
# If there are running nodes for which some writes failed, ask the
# master whether they can be disconnected while keeping the cluster
# operational. If possible, this will happen during tpc_finish.
if
failed
:
try
:
self
.
_askPrimary
(
Packets
.
FailedVote
(
ttid
,
failed
))
...
...
neo/client/handlers/storage.py
View file @
9a5b46dd
...
...
@@ -19,7 +19,7 @@ from ZODB.TimeStamp import TimeStamp
from
neo.lib
import
logging
from
neo.lib.compress
import
decompress_list
from
neo.lib.connection
import
ConnectionClosed
from
neo.lib.protocol
import
Packets
,
uuid_str
from
neo.lib.protocol
import
Packets
,
uuid_str
,
ZERO_TID
from
neo.lib.util
import
dump
,
makeChecksum
from
neo.lib.exception
import
NodeNotReady
from
neo.lib.handler
import
MTEventHandler
...
...
@@ -63,6 +63,9 @@ class StorageAnswersHandler(AnswerBaseHandler):
def
answerStoreObject
(
self
,
conn
,
conflict
,
oid
):
txn_context
=
self
.
app
.
getHandlerData
()
if
conflict
:
if
conflict
==
ZERO_TID
:
txn_context
.
written
(
self
.
app
,
conn
.
getUUID
(),
oid
,
True
)
return
# Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in
# 'conflict_dict') to avoid any deadlock with another thread that
...
...
neo/client/transactions.py
View file @
9a5b46dd
...
...
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from
collections
import
defaultdict
from
ZODB.POSException
import
StorageTransactionError
from
neo.lib.connection
import
ConnectionClosed
from
neo.lib.locking
import
SimpleQueue
...
...
@@ -35,6 +36,7 @@ class Transaction(object):
locking_tid
=
None
voted
=
False
ttid
=
None
# XXX: useless, except for testBackupReadOnlyAccess
lockless_dict
=
None
# {partition: {uuid}}
def
__init__
(
self
,
txn
):
self
.
queue
=
SimpleQueue
()
...
...
@@ -79,8 +81,9 @@ class Transaction(object):
if
status
<
0
and
self
.
locking_tid
and
'oid'
in
kw
:
# A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is because
# we don't need that for transaction metadata.
# for the other nodes. The condition on kw is to
# distinguish whether we're writing an oid or
# transaction metadata.
conn
.
ask
(
Packets
.
AskRebaseTransaction
(
self
.
ttid
,
self
.
locking_tid
),
queue
=
self
.
queue
)
conn
.
ask
(
packet
,
queue
=
self
.
queue
,
**
kw
)
...
...
@@ -95,7 +98,7 @@ class Transaction(object):
raise
NEOStorageError
(
'no storage available for write to partition %s'
%
object_id
)
def
written
(
self
,
app
,
uuid
,
oid
):
def
written
(
self
,
app
,
uuid
,
oid
,
lockless
=
False
):
# When a node is being disconnected by the master because it was
# not part of the transaction that caused a conflict, we may receive a
# positive answer (not to be confused with lockless stores) before the
...
...
@@ -119,6 +122,13 @@ class Transaction(object):
# - answer to resolved conflict before the first answer from a
# node that was being disconnected by the master
return
if
lockless
:
# It's safe to do this after the above excepts: either the cell is
# already marked as lockless or the node will be reported as failed.
lockless
=
self
.
lockless_dict
if
not
lockless
:
lockless
=
self
.
lockless_dict
=
defaultdict
(
set
)
lockless
[
app
.
pt
.
getPartition
(
oid
)].
add
(
uuid
)
if
uuid_list
:
return
del
self
.
data_dict
[
oid
]
...
...
neo/lib/handler.py
View file @
9a5b46dd
...
...
@@ -201,6 +201,9 @@ class EventHandler(object):
if
not
conn
.
client
:
conn
.
close
()
def
flushLog
(
self
,
conn
):
logging
.
flush
()
# Error packet handlers.
def
error
(
self
,
conn
,
code
,
message
,
**
kw
):
...
...
neo/lib/protocol.py
View file @
9a5b46dd
...
...
@@ -22,7 +22,7 @@ from struct import Struct
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
# the high order byte 0 is different from TLS Handshake (0x16).
PROTOCOL_VERSION
=
4
PROTOCOL_VERSION
=
5
ENCODED_VERSION
=
Struct
(
'!L'
).
pack
(
PROTOCOL_VERSION
)
# Avoid memory errors on corrupted data.
...
...
@@ -1630,6 +1630,13 @@ class Truncate(Packet):
_answer
=
Error
class
FlushLog
(
Packet
):
"""
Request all nodes to flush their logs.
:nodes: ctl -> A -> M -> *
"""
_next_code
=
0
def
register
(
request
,
ignore_when_closed
=
None
):
...
...
@@ -1805,6 +1812,8 @@ class Packets(dict):
AddObject
)
Truncate
=
register
(
Truncate
)
FlushLog
=
register
(
FlushLog
)
def
Errors
():
registry_dict
=
{}
...
...
neo/master/handlers/administration.py
View file @
9a5b46dd
...
...
@@ -46,6 +46,13 @@ class AdministrationHandler(MasterHandler):
if
node
is
not
None
:
self
.
app
.
nm
.
remove
(
node
)
def
flushLog
(
self
,
conn
):
p
=
Packets
.
FlushLog
()
for
node
in
self
.
app
.
nm
.
getConnectedList
():
c
=
node
.
getConnection
()
c
is
conn
or
c
.
send
(
p
)
super
(
AdministrationHandler
,
self
).
flushLog
(
conn
)
def
setClusterState
(
self
,
conn
,
state
):
app
=
self
.
app
# check request
...
...
neo/neoctl/app.py
View file @
9a5b46dd
...
...
@@ -39,6 +39,7 @@ action_dict = {
'kill'
:
'killNode'
,
'prune_orphan'
:
'pruneOrphan'
,
'truncate'
:
'truncate'
,
'flush_log'
:
'flushLog'
,
}
uuid_int
=
(
lambda
ns
:
lambda
uuid
:
...
...
@@ -253,6 +254,15 @@ class TerminalNeoCTL(object):
partition_dict
=
dict
.
fromkeys
(
xrange
(
np
),
source
)
self
.
neoctl
.
checkReplicas
(
partition_dict
,
min_tid
,
max_tid
)
def
flushLog
(
self
,
params
):
"""
Ask all nodes in the cluster to flush their logs.
If there are backup clusters, only their primary masters flush.
"""
assert
not
params
self
.
neoctl
.
flushLog
()
class
Application
(
object
):
"""The storage node application."""
...
...
neo/neoctl/neoctl.py
View file @
9a5b46dd
...
...
@@ -204,3 +204,9 @@ class NeoCTL(BaseApplication):
if
response
[
0
]
!=
Packets
.
Error
or
response
[
1
]
!=
ErrorCodes
.
ACK
:
raise
RuntimeError
(
response
)
return
response
[
2
]
def
flushLog
(
self
):
conn
=
self
.
__getConnection
()
conn
.
send
(
Packets
.
FlushLog
())
while
conn
.
pending
():
self
.
em
.
poll
(
1
)
neo/storage/handlers/client.py
View file @
9a5b46dd
...
...
@@ -18,7 +18,7 @@ from neo.lib import logging
from
neo.lib.handler
import
DelayEvent
from
neo.lib.util
import
dump
,
makeChecksum
,
add64
from
neo.lib.protocol
import
Packets
,
Errors
,
NonReadableCell
,
ProtocolError
,
\
ZERO_HASH
,
INVALID_PARTITION
ZERO_HASH
,
ZERO_TID
,
INVALID_PARTITION
from
..transactions
import
ConflictError
,
NotRegisteredError
from
.
import
BaseHandler
import
time
...
...
@@ -90,26 +90,27 @@ class ClientOperationHandler(BaseHandler):
def
_askStoreObject
(
self
,
conn
,
oid
,
serial
,
compression
,
checksum
,
data
,
data_serial
,
ttid
,
request_time
):
try
:
self
.
app
.
tm
.
storeObject
(
ttid
,
serial
,
oid
,
compression
,
locked
=
self
.
app
.
tm
.
storeObject
(
ttid
,
serial
,
oid
,
compression
,
checksum
,
data
,
data_serial
)
except
ConflictError
,
err
:
# resolvable or not
conn
.
answer
(
Packets
.
AnswerStoreObject
(
err
.
tid
))
return
locked
=
err
.
tid
except
NonReadableCell
:
logging
.
info
(
'Ignore store of %s:%s by %s: unassigned partition'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
))
locked
=
ZERO_TID
except
NotRegisteredError
:
# transaction was aborted, cancel this event
logging
.
info
(
'Forget store of %s:%s by %s delayed by %s'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
),
dump
(
self
.
app
.
tm
.
getLockingTID
(
oid
)))
locked
=
ZERO_TID
else
:
if
request_time
and
SLOW_STORE
is
not
None
:
duration
=
time
.
time
()
-
request_time
if
duration
>
SLOW_STORE
:
logging
.
info
(
'StoreObject delay: %.02fs'
,
duration
)
conn
.
answer
(
Packets
.
AnswerStoreObject
(
None
))
conn
.
answer
(
Packets
.
AnswerStoreObject
(
locked
))
def
askStoreObject
(
self
,
conn
,
oid
,
serial
,
compression
,
checksum
,
data
,
data_serial
,
ttid
):
...
...
@@ -216,25 +217,26 @@ class ClientOperationHandler(BaseHandler):
def
_askCheckCurrentSerial
(
self
,
conn
,
ttid
,
oid
,
serial
,
request_time
):
try
:
self
.
app
.
tm
.
checkCurrentSerial
(
ttid
,
oid
,
serial
)
locked
=
self
.
app
.
tm
.
checkCurrentSerial
(
ttid
,
oid
,
serial
)
except
ConflictError
,
err
:
# resolvable or not
conn
.
answer
(
Packets
.
AnswerCheckCurrentSerial
(
err
.
tid
))
return
locked
=
err
.
tid
except
NonReadableCell
:
logging
.
info
(
'Ignore check of %s:%s by %s: unassigned partition'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
))
locked
=
ZERO_TID
except
NotRegisteredError
:
# transaction was aborted, cancel this event
logging
.
info
(
'Forget serial check of %s:%s by %s delayed by %s'
,
dump
(
oid
),
dump
(
serial
),
dump
(
ttid
),
dump
(
self
.
app
.
tm
.
getLockingTID
(
oid
)))
locked
=
ZERO_TID
else
:
if
request_time
and
SLOW_STORE
is
not
None
:
duration
=
time
.
time
()
-
request_time
if
duration
>
SLOW_STORE
:
logging
.
info
(
'CheckCurrentSerial delay: %.02fs'
,
duration
)
conn
.
answer
(
Packets
.
AnswerCheckCurrentSerial
(
None
))
conn
.
answer
(
Packets
.
AnswerCheckCurrentSerial
(
locked
))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
...
...
neo/storage/transactions.py
View file @
9a5b46dd
...
...
@@ -19,7 +19,7 @@ from neo.lib import logging
from
neo.lib.handler
import
DelayEvent
,
EventQueue
from
neo.lib.util
import
dump
from
neo.lib.protocol
import
Packets
,
ProtocolError
,
NonReadableCell
,
\
uuid_str
,
MAX_TID
uuid_str
,
MAX_TID
,
ZERO_TID
class
ConflictError
(
Exception
):
"""
...
...
@@ -407,7 +407,7 @@ class TransactionManager(EventQueue):
assert
oid
in
transaction
.
serial_dict
,
transaction
logging
.
info
(
'Transaction %s checking %s more than once'
,
dump
(
ttid
),
dump
(
oid
))
return
return
True
if
previous_serial
is
None
:
# 2 valid cases:
# - the previous undo resulted in a resolved conflict
...
...
@@ -420,7 +420,7 @@ class TransactionManager(EventQueue):
# we are down, and the client would stop writing to us.
logging
.
info
(
'Transaction %s storing %s more than once'
,
dump
(
ttid
),
dump
(
oid
))
return
return
True
elif
transaction
.
locking_tid
==
MAX_TID
:
# Deadlock avoidance. Still no new locking_tid from the client.
raise
DelayEvent
(
transaction
)
...
...
@@ -452,14 +452,17 @@ class TransactionManager(EventQueue):
raise
ConflictError
(
previous_serial
)
logging
.
debug
(
'Transaction %s locking %s'
,
dump
(
ttid
),
dump
(
oid
))
self
.
_store_lock_dict
[
oid
]
=
ttid
return
True
def
checkCurrentSerial
(
self
,
ttid
,
oid
,
serial
):
try
:
transaction
=
self
.
_transaction_dict
[
ttid
]
except
KeyError
:
raise
NotRegisteredError
self
.
lockObject
(
ttid
,
serial
,
oid
)
locked
=
self
.
lockObject
(
ttid
,
serial
,
oid
)
transaction
.
serial_dict
[
oid
]
=
serial
if
not
locked
:
return
ZERO_TID
def
storeObject
(
self
,
ttid
,
serial
,
oid
,
compression
,
checksum
,
data
,
value_serial
):
...
...
@@ -470,7 +473,7 @@ class TransactionManager(EventQueue):
transaction
=
self
.
_transaction_dict
[
ttid
]
except
KeyError
:
raise
NotRegisteredError
self
.
lockObject
(
ttid
,
serial
,
oid
)
locked
=
self
.
lockObject
(
ttid
,
serial
,
oid
)
transaction
.
serial_dict
[
oid
]
=
serial
# store object
if
data
is
None
:
...
...
@@ -478,6 +481,8 @@ class TransactionManager(EventQueue):
else
:
data_id
=
self
.
_app
.
dm
.
holdData
(
checksum
,
oid
,
data
,
compression
)
transaction
.
store
(
oid
,
data_id
,
value_serial
)
if
not
locked
:
return
ZERO_TID
def
rebaseObject
(
self
,
ttid
,
oid
):
try
:
...
...
neo/tests/threaded/test.py
View file @
9a5b46dd
...
...
@@ -1678,6 +1678,48 @@ class Test(NEOThreadedTest):
self
.
assertEqual
(
cluster
.
neoctl
.
getClusterState
(),
ClusterStates
.
RUNNING
)
@
with_cluster
(
storage_count
=
2
,
replicas
=
1
)
def
testPartitionNotFullyWriteLocked
(
self
,
cluster
):
"""
Make sure all oids are write-locked at least once, which is not
guaranteed by just the storage/master nodes when a readable cell
becomes OUT_OF_DATE during a commit. This scenario is special in that
the other readable cell was only writable at the beginning of the
transaction and the replication finished just before the node failure.
The test uses a conflict to detect lockless writes.
"""
s0
,
s1
=
cluster
.
storage_list
t
,
c
=
cluster
.
getTransaction
()
r
=
c
.
root
()
x
=
r
[
''
]
=
PCounterWithResolution
()
t
.
commit
()
s1c
,
=
s1
.
getConnectionList
(
cluster
.
client
)
s0
.
stop
()
cluster
.
join
((
s0
,))
s0
.
resetNode
()
x
.
value
+=
2
def
vote
(
_
):
f
.
remove
(
delay
)
self
.
tic
()
s1
.
stop
()
cluster
.
join
((
s1
,))
TransactionalResource
(
t
,
0
,
tpc_vote
=
vote
)
with
ConnectionFilter
()
as
f
,
cluster
.
newClient
(
1
)
as
db
:
t2
,
c2
=
cluster
.
getTransaction
(
db
)
c2
.
root
()[
''
].
value
+=
3
t2
.
commit
()
f
.
delayAnswerStoreObject
(
lambda
conn
:
conn
is
s1c
)
delay
=
f
.
delayAskFetchTransactions
()
s0
.
start
()
self
.
tic
()
self
.
assertRaisesRegexp
(
NEOStorageError
,
'^partition 0 not fully write-locked$'
,
t
.
commit
)
cluster
.
client
.
_cache
.
clear
()
t
.
begin
()
x
.
_p_deactivate
()
self
.
assertEqual
(
x
.
value
,
3
)
@
with_cluster
()
def
testAbortTransaction
(
self
,
cluster
):
t
,
c
=
cluster
.
getTransaction
()
...
...
Kirill Smelkov
@kirr
mentioned in commit
769787fc
·
Nov 01, 2020
mentioned in commit
769787fc
mentioned in commit 769787fc987fadda9fd24b53c080f1f56bfe0725
Toggle commit list
Kirill Smelkov
@kirr
mentioned in commit
5d9f5fb2
·
Nov 01, 2020
mentioned in commit
5d9f5fb2
mentioned in commit 5d9f5fb242c5a1ce7f9df2f2cec1f98ad932fd4e
Toggle commit list
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment