Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
W
wendelin.core
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
Kirill Smelkov
wendelin.core
Commits
d9e13fbe
Commit
d9e13fbe
authored
Apr 01, 2021
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
3fa4b406
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
61 additions
and
243 deletions
+61
-243
lib/tests/test_zodb.py
lib/tests/test_zodb.py
+21
-1
lib/tests/testprog/__init__.py
lib/tests/testprog/__init__.py
+0
-0
lib/tests/testprog/zloadrace.py
lib/tests/testprog/zloadrace.py
+7
-6
lib/tests/testprog/zloadraceXXX.py
lib/tests/testprog/zloadraceXXX.py
+0
-210
lib/tests/testprog/zopenrace.py
lib/tests/testprog/zopenrace.py
+33
-26
No files found.
lib/tests/test_zodb.py
View file @
d9e13fbe
# Wendelin.core.bigfile | Tests for ZODB utilities
# Wendelin.core.bigfile | Tests for ZODB utilities
and critical properties of ZODB itself
# Copyright (C) 2014-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
...
...
@@ -32,6 +32,8 @@ from random import randint
from
pytest
import
raises
import
pytest
;
xfail
=
pytest
.
mark
.
xfail
from
wendelin.lib.tests.testprog
import
zopenrace
,
zloadrace
testdb
=
None
def
dbopen
():
...
...
@@ -368,6 +370,24 @@ def test_zurlstable():
assert
zurl
==
zurl0
# ---- tests for critical properties of ZODB ----
# verify race in between Connection.open and invalidations.
def
test_zodb_zopenrace
():
zopenrace
.
main
()
# verify race in between loading and invalidations.
def
test_zodb_zloadrace
():
# skip testing with FileStorage - in ZODB/py opening simultaneous read-write
# connections to the same file is not supported and will raise Lockerror.
_
=
testdb
.
getZODBStorage
()
_
.
close
()
if
isinstance
(
_
,
FileStorage
):
pytest
.
skip
(
"skipping on FileStorage"
)
zloadrace
.
main
()
# ---- misc ----
# zsync syncs ZODB storage.
...
...
lib/tests/testprog/__init__.py
0 → 100644
View file @
d9e13fbe
lib/tests/testprog/zloadrace.py
View file @
d9e13fbe
#!/usr/bin/env python
# reported to https://github.com/zopefoundation/ZEO/issues/155
"""Program zloadrace
5
.py demonstrates concurrency bug in ZODB5/ZEO5 that leads
#
originally
reported to https://github.com/zopefoundation/ZEO/issues/155
"""Program zloadrace.py demonstrates concurrency bug in ZODB5/ZEO5 that leads
to data corruption.
The bug was not fully analyzed, but offhand it looks like ZEO5 does not
...
...
@@ -87,11 +87,14 @@ concurrency bugs that lead to corrupt data.
from
__future__
import
print_function
from
ZODB
import
DB
from
ZODB.FileStorage
import
FileStorage
from
ZODB.POSException
import
ConflictError
import
transaction
from
persistent
import
Persistent
from
random
import
randint
from
wendelin.lib.testing
import
getTestDB
from
golang
import
func
,
defer
,
select
,
default
from
golang
import
sync
,
context
...
...
@@ -101,12 +104,10 @@ class PInt(Persistent):
def
__init__
(
self
,
i
):
self
.
i
=
i
from
wendelin.lib.testing
import
TestDB_ZEO
@
func
def
main
():
tdb
=
TestDB_ZEO
(
'<zeo>'
)
tdb
=
getTestDB
(
)
tdb
.
setup
()
defer
(
tdb
.
teardown
)
...
...
@@ -189,7 +190,7 @@ def main():
# in ZEO, it triggers the bug where T sees stale obj2 with obj1.i != obj2.i
init
()
N
=
1000
00
N
=
1000
wg
=
sync
.
WorkGroup
(
context
.
background
())
for
x
in
range
(
8
):
wg
.
go
(
T
,
x
,
N
)
...
...
lib/tests/testprog/zloadraceXXX.py
deleted
100755 → 0
View file @
3fa4b406
#!/usr/bin/env python
"""Program zloadrace.py demonstrates concurrency bug in ZODB Connection.setstate()
that leads to XXX
XXX no, there is no load vs invalidation race on ZODB4 (ZODB3 is probably the same):
ZEO
---
ZEO server explicitly guarantees that it does not mix processing load
requests inside tpc_finish + send invalidations. This way if load is processed
after new commit, load reply is guranteed to come to client after invalidation
message. This was explicitly fixed by
https://github.com/zopefoundation/ZEO/commit/71eb1456
(search for callAsyncNoPoll there)
and later again by https://github.com/zopefoundation/ZEO/commit/94f275c3 .
NEO
---
There is no load vs invalidation race on NEO but the protection is implicit:
- client app maintains .last_tid which is updated by poller thread upon
receiving invalidations.
- when load() is called without specifing @head (the case we are considering),
it uses .last_tid for @at with which to ask a storage node.
- even if client.load() observes .last_tid which was updated for simultaneously
committed transaction with not yet invalidated cache, the following happens:
* client sends AskObject packet to a storage node.
* when the answer arrives it has to be picked by client poller thread.
* the poller thread won't proceed to that packet until the function that modified .last_tid finishes
* that function (neo/client/handlers/master.py invalidateObjects()) is also
invalidating NEO cache _and_ calls ZODB.DB.invalidate()
* ZODB.DB.invalidate calls ZODB.Connection.invalidate which sets
zconn._txn_time and zconn._invalidated
* even when storage-level load completes with newer than @head serial,
ZODB.Connection.setstate checks for oid in its ._invalidated, sees its
there, and retries the load with before=._txn_time
XXX load vs invalidation race is there on ZODB4 and ZODB3, but on ZODB5 there is
another open vs invalidation race.
"""
from
__future__
import
print_function
from
ZODB
import
DB
import
transaction
from
persistent
import
Persistent
from
wendelin.lib.testing
import
TestDB_ZEO
,
TestDB_NEO
from
golang
import
func
,
defer
,
chan
,
select
,
default
from
golang
import
sync
,
context
import
threading
from
ZODB.utils
import
u64
# PInt is persistent integer.
class
PInt
(
Persistent
):
def
__init__
(
self
,
i
):
self
.
i
=
i
@
func
def
main
():
#tdb = TestDB_ZEO('<zeo>')
tdb
=
TestDB_NEO
(
'<neo>'
)
tdb
.
setup
()
# XXX defer(tdb.teardown)
# two ZODB client storage connections to the same server
zstor1
=
tdb
.
getZODBStorage
()
;
defer
(
zstor1
.
close
)
db1
=
DB
(
zstor1
)
zstor1
.
app
.
poll_thread
.
name
=
'C1.poll'
# XXX doc
def
init
():
transaction
.
begin
()
zconn
=
db1
.
open
()
root
=
zconn
.
root
()
root
[
'obj1'
]
=
PInt
(
0
)
root
[
'obj2'
]
=
PInt
(
0
)
transaction
.
commit
()
zconn
.
close
()
init
()
zstor2
=
tdb
.
getZODBStorage
()
;
defer
(
zstor2
.
close
)
db2
=
DB
(
zstor2
)
zstor2
.
app
.
poll_thread
.
name
=
'C2.poll'
c2ready
=
chan
()
# c1 <- c2 "I'm ready to commit"
c2start
=
chan
()
# c1 -> c2 "go on to commit"
def
C1
(
ctx
,
N
):
threading
.
current_thread
().
name
=
"C1"
def
c1
():
transaction
.
begin
()
zconn
=
db1
.
open
()
print
(
'C1: (1) neo.app.last_tid = @%d'
%
u64
(
zstor1
.
app
.
last_tid
))
root
=
zconn
.
root
()
obj1
=
root
[
'obj1'
]
obj2
=
root
[
'obj2'
]
I
=
obj1
.
i
print
(
'C1: (2) neo.app.last_tid = @%d'
%
u64
(
zstor1
.
app
.
last_tid
))
print
(
'C1: (2) obj1.serial = @%d'
%
u64
(
obj1
.
_p_serial
))
c2ready
.
recv
()
c2start
.
send
(
1
)
import
time
time
.
sleep
(
0.5
)
# obj1 - reload it from zstor
# obj2 - get it from zconn cache
#for i in range(N):
for
i
in
range
(
15
):
obj1
.
_p_invalidate
()
print
(
'C1: (X) neo.app.last_tid = @%d'
%
u64
(
zstor1
.
app
.
last_tid
))
# both objects must have the same values
i1
=
obj1
.
i
i2
=
obj2
.
i
print
(
'C1: (X) obj1.serial = @%d'
%
u64
(
obj1
.
_p_serial
))
print
(
'C1: (X) obj2.serial = @%d'
%
u64
(
obj2
.
_p_serial
))
if
i1
!=
i2
:
raise
AssertionError
(
"C1: obj1.i (%d) != obj2.i (%d)"
%
(
i1
,
i2
))
if
i1
!=
I
:
raise
AssertionError
(
"C1: obj1.i (%d) mutated inside transaction (started with %d)"
%
(
i1
,
I
))
transaction
.
abort
()
zconn
.
close
()
for
i
in
range
(
N
):
if
ready
(
ctx
.
done
()):
break
print
(
'C1.%d'
%
i
)
c1
()
print
(
'C1.fin'
)
def
C2
(
ctx
,
N
):
threading
.
current_thread
().
name
=
"C2"
def
c2
():
transaction
.
begin
()
zconn
=
db2
.
open
()
root
=
zconn
.
root
()
obj1
=
root
[
'obj1'
]
obj2
=
root
[
'obj2'
]
obj1
.
i
+=
1
obj2
.
i
+=
1
assert
obj1
.
i
==
obj2
.
i
c2ready
.
send
(
1
)
c2start
.
recv
()
transaction
.
commit
()
zconn
.
close
()
for
i
in
range
(
N
):
if
ready
(
ctx
.
done
()):
break
print
(
'C2.%d'
%
i
)
c2
()
print
(
'C2.fin'
)
#init()
import
time
time
.
sleep
(
2
)
print
()
N
=
1000
wg
=
sync
.
WorkGroup
(
context
.
background
())
wg
.
go
(
C1
,
N
)
wg
.
go
(
C2
,
N
)
wg
.
wait
()
# ready returns whether channel ch is ready.
def
ready
(
ch
):
_
,
_rx
=
select
(
default
,
# 0
ch
.
recv
,
# 1
)
if
_
==
0
:
return
False
return
True
if
__name__
==
'__main__'
:
main
()
lib/tests/testprog/zopenrace.py
View file @
d9e13fbe
#!/usr/bin/env python
# reported to https://github.com/zopefoundation/ZODB/issues/290
#
originally
reported to https://github.com/zopefoundation/ZODB/issues/290
# fixed in https://github.com/zopefoundation/ZODB/commit/b5895a5c
"""Program zopenrace.py demonstrates concurrency bug in ZODB Connection.open()
that leads to stale live cache and wrong data provided by database to users.
...
...
@@ -90,19 +90,13 @@ NOTE ZODB4 and ZODB3 do not have this particular open vs invalidation race.
from
__future__
import
print_function
from
ZODB
import
DB
from
ZODB.MappingStorage
import
MappingStorage
import
transaction
from
persistent
import
Persistent
# don't depend on pygolang
# ( but it is more easy and structured with sync.WorkGroup
# https://pypi.org/project/pygolang/#concurrency )
#from golang import sync, context
import
threading
def
go
(
f
,
*
argv
,
**
kw
):
t
=
threading
.
Thread
(
target
=
f
,
args
=
argv
,
kwargs
=
kw
)
t
.
start
()
return
t
from
wendelin.lib.testing
import
getTestDB
from
golang
import
func
,
defer
,
select
,
default
from
golang
import
context
,
sync
# PInt is persistent integer.
...
...
@@ -110,9 +104,13 @@ class PInt(Persistent):
def
__init__
(
self
,
i
):
self
.
i
=
i
@
func
def
main
():
zstor
=
MappingStorage
()
tdb
=
getTestDB
()
tdb
.
setup
()
defer
(
tdb
.
teardown
)
zstor
=
tdb
.
getZODBStorage
()
db
=
DB
(
zstor
)
...
...
@@ -129,13 +127,11 @@ def main():
zconn
.
close
()
okv
=
[
False
,
False
]
# T1 accesses obj1/obj2 in a loop and verifies that obj1.i == obj2.i
#
# access to obj1 is organized to always trigger loading from zstor.
# access to obj2 goes through zconn cache and so verifies whether the cache is not stale.
def
T1
(
N
):
def
T1
(
ctx
,
N
):
def
t1
():
transaction
.
begin
()
zconn
=
db
.
open
()
...
...
@@ -159,14 +155,15 @@ def main():
for
i
in
range
(
N
):
#print('T1.%d' % i)
if
ready
(
ctx
.
done
()):
raise
ctx
.
err
()
t1
()
okv
[
0
]
=
True
# T2 changes obj1/obj2 in a loop by doing `objX.i += 1`.
#
# Since both objects start from 0, the invariant that `obj1.i == obj2.i` is always preserved.
def
T2
(
N
):
def
T2
(
ctx
,
N
):
def
t2
():
transaction
.
begin
()
zconn
=
db
.
open
()
...
...
@@ -183,8 +180,9 @@ def main():
for
i
in
range
(
N
):
#print('T2.%d' % i)
if
ready
(
ctx
.
done
()):
raise
ctx
.
err
()
t2
()
okv
[
1
]
=
True
# run T1 and T2 concurrently. As of 20191210, due to race condition in
...
...
@@ -192,15 +190,24 @@ def main():
init
()
N
=
1000
t1
=
go
(
T1
,
N
)
t2
=
go
(
T2
,
N
)
t1
.
join
(
)
t2
.
join
()
wg
=
sync
.
WorkGroup
(
context
.
background
()
)
wg
.
go
(
T1
,
N
)
wg
.
go
(
T2
,
N
)
wg
.
wait
()
if
not
all
(
okv
):
raise
AssertionError
(
'FAIL'
)
print
(
'OK'
)
# ready returns whether channel ch is ready.
def
ready
(
ch
):
_
,
_rx
=
select
(
default
,
# 0
ch
.
recv
,
# 1
)
if
_
==
0
:
return
False
return
True
if
__name__
==
'__main__'
:
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment