Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
2
Merge Requests
2
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
Kirill Smelkov
neo
Commits
d48930fe
Commit
d48930fe
authored
Nov 10, 2016
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
48aa3a91
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
82 additions
and
62 deletions
+82
-62
neo/client/Storage.py
neo/client/Storage.py
+1
-2
neo/client/app.py
neo/client/app.py
+0
-4
neo/lib/logger.py
neo/lib/logger.py
+0
-4
neo/lib/protocol.py
neo/lib/protocol.py
+1
-2
neo/master/handlers/client.py
neo/master/handlers/client.py
+0
-1
neo/storage/handlers/client.py
neo/storage/handlers/client.py
+0
-3
neo/tests/threaded/testReplication.py
neo/tests/threaded/testReplication.py
+80
-46
No files found.
neo/client/Storage.py
View file @
d48930fe
...
...
@@ -81,8 +81,7 @@ class Storage(BaseStorage.BaseStorage,
try
:
return
self
.
app
.
load
(
oid
)[:
2
]
except
NEOStorageNotFoundError
:
raise
#raise POSException.POSKeyError(oid)
raise
POSException
.
POSKeyError
(
oid
)
def
new_oid
(
self
):
return
self
.
app
.
new_oid
()
...
...
neo/client/app.py
View file @
d48930fe
...
...
@@ -330,8 +330,6 @@ class Application(ThreadedApplication):
# TODO:
# - rename parameters (here? and in handlers & packet definitions)
print
'QQQ client load oid: %r tid: %r before_tid: %r'
%
(
oid
,
tid
,
before_tid
)
acquire
=
self
.
_cache_lock_acquire
release
=
self
.
_cache_lock_release
# XXX: Consider using a more fine-grained lock.
...
...
@@ -352,7 +350,6 @@ class Application(ThreadedApplication):
# Do not get something more recent than the last invalidation
# we got from master.
before_tid
=
p64
(
u64
(
self
.
last_tid
)
+
1
)
print
'
\
t
.last_tid: %r'
%
self
.
last_tid
data
,
tid
,
next_tid
,
_
=
self
.
_loadFromStorage
(
oid
,
tid
,
before_tid
)
acquire
()
try
:
...
...
@@ -372,7 +369,6 @@ class Application(ThreadedApplication):
return
data
,
tid
,
next_tid
def
_loadFromStorage
(
self
,
oid
,
at_tid
,
before_tid
):
print
'QQQ2 client loadFromStor oid: %r at_tid: %r before_tid: %r'
%
(
oid
,
at_tid
,
before_tid
)
packet
=
Packets
.
AskObject
(
oid
,
at_tid
,
before_tid
)
for
node
,
conn
in
self
.
cp
.
iterateForObject
(
oid
,
readable
=
True
):
try
:
...
...
neo/lib/logger.py
View file @
d48930fe
...
...
@@ -28,7 +28,6 @@ from logging import getLogger, Formatter, Logger, StreamHandler, \
from
time
import
time
from
traceback
import
format_exception
import
bz2
,
inspect
,
neo
,
os
,
signal
,
sqlite3
,
sys
,
threading
from
cStringIO
import
StringIO
# Stats for storage node of matrix test (py2.7:SQLite)
RECORD_SIZE
=
(
234360832
# extra memory used
...
...
@@ -226,13 +225,10 @@ class NEOLogger(Logger):
uuid_str
(
r
.
uuid
),
ip
,
port
)
msg
=
r
.
msg
pktcls
=
protocol
.
StaticRegistry
[
r
.
code
]
#bmsg = StringIO(msg)
#hmsg = protocol.Packets.parse(bmsg, protocol.ParserState())
print
'PACKET %s %s
\
t
%s
\
t
%s
\
t
%s %s'
%
(
r
.
created
,
r
.
_name
,
r
.
msg_id
,
pktcls
.
__name__
,
peer
,
r
.
pkt
.
decode
())
if
msg
is
not
None
:
msg
=
buffer
(
msg
)
self
.
_db
.
execute
(
"INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)"
,
(
r
.
created
,
r
.
_name
,
r
.
msg_id
,
r
.
code
,
peer
,
msg
))
else
:
...
...
neo/lib/protocol.py
View file @
d48930fe
...
...
@@ -75,7 +75,7 @@ def ErrorCodes():
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
READ_ONLY_ACCESS
# TODO use it
READ_ONLY_ACCESS
@
Enum
def
ClusterStates
():
...
...
@@ -1400,7 +1400,6 @@ class NotifyReady(Packet):
"""
pass
# NOTE
# replication
class
FetchTransactions
(
Packet
):
...
...
neo/master/handlers/client.py
View file @
d48930fe
...
...
@@ -142,5 +142,4 @@ class ClientROServiceHandler(ClientServiceHandler):
def
askLastTransaction
(
self
,
conn
):
assert
self
.
app
.
backup_tid
is
not
None
# we are in BACKUPING mode
backup_tid
=
self
.
app
.
pt
.
getBackupTid
()
print
'
\
n
\
n
\
n
ASK LAST_TID -> %r
\
n
'
%
backup_tid
conn
.
answer
(
Packets
.
AnswerLastTransaction
(
backup_tid
))
neo/storage/handlers/client.py
View file @
d48930fe
...
...
@@ -257,7 +257,6 @@ class ClientROOperationHandler(ClientOperationHandler):
def
askObject
(
self
,
conn
,
oid
,
serial
,
tid
):
backup_tid
=
self
.
app
.
dm
.
getBackupTID
()
print
'
\
n
\
n
\
n
ASK OBJECT %r, %r, %r (backup_tid: %r)'
%
(
oid
,
serial
,
tid
,
backup_tid
)
if
serial
and
serial
>
backup_tid
:
# obj lookup will find nothing, but return properly either
# OidDoesNotExist or OidNotFound
...
...
@@ -269,9 +268,7 @@ class ClientROOperationHandler(ClientOperationHandler):
if
not
serial
and
not
tid
:
tid
=
add64
(
backup_tid
,
1
)
print
'-> asking as oid: %r serial: %r tid: %r'
%
(
oid
,
serial
,
tid
)
super
(
ClientROOperationHandler
,
self
).
askObject
(
conn
,
oid
,
serial
,
tid
)
print
'(ask object done)'
def
askTIDsFrom
(
self
,
conn
,
min_tid
,
max_tid
,
length
,
partition
):
backup_tid
=
self
.
app
.
dm
.
getBackupTID
()
...
...
neo/tests/threaded/testReplication.py
View file @
d48930fe
...
...
@@ -19,10 +19,12 @@ from logging import getLogger, INFO, DEBUG
import
random
import
time
import
transaction
from
ZODB.POSException
import
ReadOnlyError
,
POSKeyError
import
unittest
from
collections
import
defaultdict
from
functools
import
wraps
from
neo.lib
import
logging
from
neo.client.exception
import
NEOStorageError
from
neo.storage.checker
import
CHECK_COUNT
from
neo.storage.replicator
import
Replicator
from
neo.lib.connector
import
SocketConnector
...
...
@@ -34,9 +36,6 @@ from neo.lib.util import p64
from
..
import
Patch
from
.
import
ConnectionFilter
,
NEOCluster
,
NEOThreadedTest
,
predictable_random
from
ZODB.POSException
import
ReadOnlyError
from
neo.client.exception
import
NEOStorageError
# dump log to stderr
logging
.
backlog
(
max_size
=
None
)
del
logging
.
default_root_handler
.
handle
...
...
@@ -523,7 +522,7 @@ class ReplicationTests(NEOThreadedTest):
@
backup_test
()
def
testBackupReadAccess
(
self
,
backup
):
def
testBackupRead
Only
Access
(
self
,
backup
):
"""Check backup cluster can be used in read-only mode by ZODB clients"""
B
=
backup
U
=
B
.
upstream
...
...
@@ -533,48 +532,83 @@ class ReplicationTests(NEOThreadedTest):
oid_list
=
[]
tid_list
=
[]
for
i
in
xrange
(
10
):
# commit new data to U
txn
=
transaction
.
Transaction
()
Z
.
tpc_begin
(
txn
)
oid
=
Z
.
new_oid
()
Z
.
store
(
oid
,
None
,
'%s-%i'
%
(
oid
,
i
),
''
,
txn
)
Z
.
tpc_vote
(
txn
)
tid
=
Z
.
tpc_finish
(
txn
)
oid_list
.
append
(
oid
)
tid_list
.
append
(
tid
)
# make sure data propagated to B
self
.
tic
()
self
.
assertEqual
(
B
.
backup_tid
,
U
.
last_tid
)
self
.
assertEqual
(
B
.
last_tid
,
U
.
last_tid
)
self
.
assertEqual
(
1
,
self
.
checkBackup
(
B
))
# read data from B and verify it is what it should be
# XXX we open new storage every time because invalidations are not
# yet implemented in read-only mode.
Zb
=
B
.
getZODBStorage
()
for
j
,
oid
in
enumerate
(
oid_list
):
data
,
serial
=
Zb
.
load
(
oid
,
''
)
self
.
assertEqual
(
data
,
'%s-%s'
%
(
oid
,
j
))
self
.
assertEqual
(
serial
,
tid_list
[
j
])
# close storage because client app is otherwise shared in threaded
# tests and we need to refresh last_tid on next run
# (see above about invalidations not working)
Zb
.
close
()
# try to commit something to backup storage and make sure it is really read-only
Zb
=
B
.
getZODBStorage
()
Zb
.
_cache
.
_max_size
=
0
# make stores do work in sync way
txn
=
transaction
.
Transaction
()
self
.
assertRaises
(
ReadOnlyError
,
Zb
.
tpc_begin
,
txn
)
self
.
assertRaises
(
ReadOnlyError
,
Zb
.
new_oid
)
self
.
assertRaises
(
ReadOnlyError
,
Zb
.
store
,
oid_list
[
-
1
],
tid_list
[
-
1
],
'somedata'
,
''
,
txn
)
# tpc_vote first checks whether there were store replies - thus not ReadOnlyError
self
.
assertRaises
(
NEOStorageError
,
Zb
.
tpc_vote
,
txn
)
Zb
.
close
()
# S -> Sb link stops working during [cutoff, recover) test iterations
cutoff
=
4
recover
=
7
def
delayReplication
(
conn
,
packet
):
return
isinstance
(
packet
,
Packets
.
AnswerFetchTransactions
)
with
ConnectionFilter
()
as
f
:
for
i
in
xrange
(
10
):
if
i
==
cutoff
:
f
.
add
(
delayReplication
)
if
i
==
recover
:
# removes the filter and retransmits all packets that were
# queued once first filtered packed was detected on a connection.
f
.
remove
(
delayReplication
)
# commit new data to U
txn
=
transaction
.
Transaction
()
txn
.
note
(
'test transaction %i'
%
i
)
Z
.
tpc_begin
(
txn
)
oid
=
Z
.
new_oid
()
Z
.
store
(
oid
,
None
,
'%s-%i'
%
(
oid
,
i
),
''
,
txn
)
Z
.
tpc_vote
(
txn
)
tid
=
Z
.
tpc_finish
(
txn
)
oid_list
.
append
(
oid
)
tid_list
.
append
(
tid
)
# make sure data propagated to B
self
.
tic
()
if
cutoff
<=
i
<
recover
:
self
.
assertLess
(
B
.
backup_tid
,
U
.
last_tid
)
else
:
self
.
assertEqual
(
B
.
backup_tid
,
U
.
last_tid
)
self
.
assertEqual
(
B
.
last_tid
,
U
.
last_tid
)
self
.
assertEqual
(
1
,
self
.
checkBackup
(
B
,
max_tid
=
B
.
backup_tid
))
# read data from B and verify it is what it should be
# XXX we open new ZODB storage every time because invalidations
# are not yet implemented in read-only mode.
Zb
=
B
.
getZODBStorage
()
for
j
,
oid
in
enumerate
(
oid_list
):
if
cutoff
<=
i
<
recover
and
j
>=
cutoff
:
self
.
assertRaises
(
POSKeyError
,
Zb
.
load
,
oid
,
''
)
else
:
data
,
serial
=
Zb
.
load
(
oid
,
''
)
self
.
assertEqual
(
data
,
'%s-%s'
%
(
oid
,
j
))
self
.
assertEqual
(
serial
,
tid_list
[
j
])
# verify how transaction log & friends behave under potentially not-yet-fully
# fetched backup state (transactions committed at [cutoff, recover) should
# not be there; otherwise transactions should be fully there)
Zb
=
B
.
getZODBStorage
()
Btxn_list
=
list
(
Zb
.
iterator
())
self
.
assertEqual
(
len
(
Btxn_list
),
cutoff
if
cutoff
<=
i
<
recover
else
i
+
1
)
for
j
,
txn
in
enumerate
(
Btxn_list
):
self
.
assertEqual
(
txn
.
tid
,
tid_list
[
j
])
self
.
assertEqual
(
txn
.
description
,
'test transaction %i'
%
j
)
obj_list
=
list
(
txn
)
self
.
assertEqual
(
len
(
obj_list
),
1
)
obj
=
obj_list
[
0
]
self
.
assertEqual
(
obj
.
oid
,
oid_list
[
j
])
self
.
assertEqual
(
obj
.
data
,
'%s-%s'
%
(
obj
.
oid
,
j
))
# TODO test askObjectHistory once it is fixed
# try to commit something to backup storage and make sure it is really read-only
Zb
.
_cache
.
_max_size
=
0
# make stores do work in sync way
txn
=
transaction
.
Transaction
()
self
.
assertRaises
(
ReadOnlyError
,
Zb
.
tpc_begin
,
txn
)
self
.
assertRaises
(
ReadOnlyError
,
Zb
.
new_oid
)
self
.
assertRaises
(
ReadOnlyError
,
Zb
.
store
,
oid_list
[
-
1
],
tid_list
[
-
1
],
'somedata'
,
''
,
txn
)
# tpc_vote first checks whether there were store replies - thus not ReadOnlyError
self
.
assertRaises
(
NEOStorageError
,
Zb
.
tpc_vote
,
txn
)
# close storage because client app is otherwise shared in threaded
# tests and we need to refresh last_tid on next run
# (see above about invalidations not working)
Zb
.
close
()
if
__name__
==
"__main__"
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment