Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZEO
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
ZEO
Commits
71eb1456
Commit
71eb1456
authored
Jun 13, 2003
by
Jeremy Hylton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Bacport various cache consistency bug fixes from the ZODB3-3_1-branch.
parent
9d476b68
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
357 additions
and
29 deletions
+357
-29
src/ZEO/ClientStorage.py
src/ZEO/ClientStorage.py
+49
-21
src/ZEO/ClientStub.py
src/ZEO/ClientStub.py
+1
-1
src/ZEO/tests/ConnectionTests.py
src/ZEO/tests/ConnectionTests.py
+2
-1
src/ZEO/tests/InvalidationTests.py
src/ZEO/tests/InvalidationTests.py
+294
-0
src/ZEO/tests/testConnection.py
src/ZEO/tests/testConnection.py
+3
-6
src/ZEO/zrpc/connection.py
src/ZEO/zrpc/connection.py
+8
-0
No files found.
src/ZEO/ClientStorage.py
View file @
71eb1456
...
...
@@ -268,6 +268,15 @@ class ClientStorage:
self
.
_oid_lock
=
threading
.
Lock
()
self
.
_oids
=
[]
# Object ids retrieved from new_oids()
# load() and tpc_finish() must be serialized to guarantee
# that cache modifications from each occur atomically.
# It also prevents multiple load calls occuring simultaneously,
# which simplifies the cache logic.
self
.
_load_lock
=
threading
.
Lock
()
# _load_oid and _load_status are protected by _lock
self
.
_load_oid
=
None
self
.
_load_status
=
None
# Can't read data in one thread while writing data
# (tpc_finish) in another thread. In general, the lock
# must prevent access to the cache while _update_cache
...
...
@@ -696,20 +705,37 @@ class ClientStorage:
"""
self
.
_lock
.
acquire
()
# for atomic processing of invalidations
try
:
p
=
self
.
_cache
.
load
(
oid
,
version
)
if
p
:
return
p
p
air
=
self
.
_cache
.
load
(
oid
,
version
)
if
p
air
:
return
p
air
finally
:
self
.
_lock
.
release
()
if
self
.
_server
is
None
:
raise
ClientDisconnected
()
# If an invalidation for oid comes in during zeoLoad, that's OK
# because we'll get oid's new state.
self
.
_load_lock
.
acquire
()
try
:
self
.
_lock
.
acquire
()
try
:
self
.
_load_oid
=
oid
self
.
_load_status
=
1
finally
:
self
.
_lock
.
release
()
p
,
s
,
v
,
pv
,
sv
=
self
.
_server
.
zeoLoad
(
oid
)
self
.
_lock
.
acquire
()
# for atomic processing of invalidations
try
:
if
self
.
_load_status
:
self
.
_cache
.
checkSize
(
0
)
self
.
_cache
.
store
(
oid
,
p
,
s
,
v
,
pv
,
sv
)
self
.
_load_oid
=
None
finally
:
self
.
_lock
.
release
()
finally
:
self
.
_load_lock
.
release
()
if
v
and
version
and
v
==
version
:
return
pv
,
sv
else
:
...
...
@@ -864,22 +890,22 @@ class ClientStorage:
"""Storage API: finish a transaction."""
if
transaction
is
not
self
.
_transaction
:
return
self
.
_load_lock
.
acquire
()
try
:
self
.
_lock
.
acquire
()
# for atomic processing of invalidations
try
:
self
.
_update_cache
()
finally
:
self
.
_lock
.
release
()
if
f
is
not
None
:
f
()
finally
:
self
.
_lock
.
release
()
tid
=
self
.
_server
.
tpc_finish
(
self
.
_serial
)
self
.
_cache
.
setLastTid
(
tid
)
self
.
_server
.
tpc_finish
(
self
.
_serial
)
r
=
self
.
_check_serials
()
assert
r
is
None
or
len
(
r
)
==
0
,
"unhandled serialnos: %s"
%
r
finally
:
self
.
_load_lock
.
release
()
self
.
end_transaction
()
def
_update_cache
(
self
):
...
...
@@ -1006,9 +1032,11 @@ class ClientStorage:
# versions maps version names to dictionary of invalidations
versions
=
{}
for
oid
,
version
in
invs
:
d
=
versions
.
setdefault
(
version
,
{})
if
oid
==
self
.
_load_oid
:
self
.
_load_status
=
0
self
.
_cache
.
invalidate
(
oid
,
version
=
version
)
d
[
oid
]
=
1
versions
.
setdefault
(
version
,
{})[
oid
]
=
1
if
self
.
_db
is
not
None
:
for
v
,
d
in
versions
.
items
():
self
.
_db
.
invalidate
(
d
,
version
=
v
)
...
...
@@ -1038,10 +1066,10 @@ class ClientStorage:
"""Invalidate objects modified by tid."""
self
.
_cache
.
setLastTid
(
tid
)
if
self
.
_pickler
is
not
None
:
self
.
log
(
"Transactional invalidation during cache verification"
,
level
=
zLOG
.
BLATHER
)
log2
(
BLATHER
,
"Transactional invalidation during cache verification"
)
for
t
in
args
:
self
.
self
.
_pickler
.
dump
(
t
)
self
.
_pickler
.
dump
(
t
)
return
self
.
_process_invalidations
(
args
)
...
...
src/ZEO/ClientStub.py
View file @
71eb1456
...
...
@@ -53,7 +53,7 @@ class ClientStorage:
self
.
rpc
.
callAsync
(
'endVerify'
)
def
invalidateTransaction
(
self
,
tid
,
args
):
self
.
rpc
.
callAsync
(
'invalidateTransaction'
,
tid
,
args
)
self
.
rpc
.
callAsync
NoPoll
(
'invalidateTransaction'
,
tid
,
args
)
def
serialnos
(
self
,
arg
):
self
.
rpc
.
callAsync
(
'serialnos'
,
arg
)
...
...
src/ZEO/tests/ConnectionTests.py
View file @
71eb1456
...
...
@@ -30,6 +30,7 @@ from ZEO.ClientStorage import ClientStorage
from
ZEO.Exceptions
import
ClientDisconnected
from
ZEO.zrpc.marshal
import
Marshaller
from
ZEO.tests
import
forker
from
ZEO.tests.InvalidationTests
import
InvalidationTests
from
ZODB.DB
import
DB
from
ZODB.Transaction
import
get_transaction
,
Transaction
...
...
@@ -198,7 +199,7 @@ class CommonSetupTearDown(StorageTestBase):
self
.
fail
(
"timed out waiting for storage to disconnect"
)
class
ConnectionTests
(
CommonSetupTearDown
):
class
ConnectionTests
(
CommonSetupTearDown
,
InvalidationTests
):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
...
...
src/ZEO/tests/InvalidationTests.py
0 → 100644
View file @
71eb1456
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from
thread
import
get_ident
import
threading
import
time
from
BTrees.check
import
check
,
display
from
BTrees.OOBTree
import
OOBTree
from
ZEO.tests.TestThread
import
TestThread
from
ZODB.DB
import
DB
from
ZODB.POSException
\
import
ReadConflictError
,
ConflictError
,
VersionLockError
import
zLOG
class
StressThread
(
TestThread
):
def
__init__
(
self
,
testcase
,
db
,
stop
,
threadnum
,
startnum
,
step
=
2
,
sleep
=
None
):
TestThread
.
__init__
(
self
,
testcase
)
self
.
db
=
db
self
.
stop
=
stop
self
.
threadnum
=
threadnum
self
.
startnum
=
startnum
self
.
step
=
step
self
.
sleep
=
sleep
self
.
added_keys
=
[]
def
testrun
(
self
):
cn
=
self
.
db
.
open
()
while
not
self
.
stop
.
isSet
():
try
:
tree
=
cn
.
root
()[
"tree"
]
break
except
(
ConflictError
,
KeyError
):
get_transaction
().
abort
()
cn
.
sync
()
key
=
self
.
startnum
while
not
self
.
stop
.
isSet
():
try
:
tree
[
key
]
=
self
.
threadnum
get_transaction
().
note
(
"add key %s"
%
key
)
get_transaction
().
commit
()
if
self
.
sleep
:
time
.
sleep
(
self
.
sleep
)
except
(
ReadConflictError
,
ConflictError
),
msg
:
get_transaction
().
abort
()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn
.
sync
()
else
:
self
.
added_keys
.
append
(
key
)
key
+=
self
.
step
cn
.
close
()
class
VersionStressThread
(
TestThread
):
def
__init__
(
self
,
testcase
,
db
,
stop
,
threadnum
,
startnum
,
step
=
2
,
sleep
=
None
):
TestThread
.
__init__
(
self
,
testcase
)
self
.
db
=
db
self
.
stop
=
stop
self
.
threadnum
=
threadnum
self
.
startnum
=
startnum
self
.
step
=
step
self
.
sleep
=
sleep
self
.
added_keys
=
[]
def
log
(
self
,
msg
):
zLOG
.
LOG
(
"thread %d"
%
get_ident
(),
0
,
msg
)
def
testrun
(
self
):
self
.
log
(
"thread begin"
)
commit
=
0
key
=
self
.
startnum
while
not
self
.
stop
.
isSet
():
version
=
"%s:%s"
%
(
self
.
threadnum
,
key
)
commit
=
not
commit
self
.
log
(
"attempt to add key=%s version=%s commit=%d"
%
(
key
,
version
,
commit
))
if
self
.
oneupdate
(
version
,
key
,
commit
):
self
.
added_keys
.
append
(
key
)
key
+=
self
.
step
def
oneupdate
(
self
,
version
,
key
,
commit
=
1
):
# The mess of sleeps below were added to reduce the number
# of VersionLockErrors, based on empirical observation.
# It looks like the threads don't switch enough without
# the sleeps.
cn
=
self
.
db
.
open
(
version
)
while
not
self
.
stop
.
isSet
():
try
:
tree
=
cn
.
root
()[
"tree"
]
break
except
(
ConflictError
,
KeyError
):
get_transaction
().
abort
()
cn
.
sync
()
while
not
self
.
stop
.
isSet
():
try
:
tree
[
key
]
=
self
.
threadnum
get_transaction
().
note
(
"add key %d"
%
key
)
get_transaction
().
commit
()
if
self
.
sleep
:
time
.
sleep
(
self
.
sleep
)
break
except
(
VersionLockError
,
ReadConflictError
,
ConflictError
),
msg
:
self
.
log
(
msg
)
get_transaction
().
abort
()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn
.
sync
()
if
self
.
sleep
:
time
.
sleep
(
self
.
sleep
)
try
:
while
not
self
.
stop
.
isSet
():
try
:
if
commit
:
self
.
db
.
commitVersion
(
version
)
get_transaction
().
note
(
"commit version %s"
%
version
)
else
:
self
.
db
.
abortVersion
(
version
)
get_transaction
().
note
(
"abort version %s"
%
version
)
get_transaction
().
commit
()
if
self
.
sleep
:
time
.
sleep
(
self
.
sleep
)
return
commit
except
ConflictError
,
msg
:
self
.
log
(
msg
)
get_transaction
().
abort
()
cn
.
sync
()
finally
:
cn
.
close
()
return
0
class
InvalidationTests
:
level
=
2
DELAY
=
15
def
_check_tree
(
self
,
cn
,
tree
):
# Make sure the BTree is sane and that all the updates persisted
retries
=
3
while
retries
:
retries
-=
1
try
:
check
(
tree
)
tree
.
_check
()
except
ReadConflictError
:
if
retries
:
get_transaction
().
abort
()
cn
.
sync
()
else
:
raise
except
:
display
(
tree
)
raise
def
_check_threads
(
self
,
tree
,
*
threads
):
# Make sure the thread's view of the world is consistent with
# the actual database state.
for
t
in
threads
:
# If the test didn't add any keys, it didn't do what we expected.
self
.
assert_
(
t
.
added_keys
)
for
key
in
t
.
added_keys
:
self
.
assert_
(
tree
.
has_key
(
key
),
key
)
def
go
(
self
,
stop
,
*
threads
):
# Run the threads
for
t
in
threads
:
t
.
start
()
time
.
sleep
(
self
.
DELAY
)
stop
.
set
()
for
t
in
threads
:
t
.
cleanup
()
def
checkConcurrentUpdates2Storages
(
self
):
self
.
_storage
=
storage1
=
self
.
openClientStorage
()
storage2
=
self
.
openClientStorage
(
cache
=
"2"
)
db1
=
DB
(
storage1
)
db2
=
DB
(
storage2
)
stop
=
threading
.
Event
()
cn
=
db1
.
open
()
tree
=
cn
.
root
()[
"tree"
]
=
OOBTree
()
get_transaction
().
commit
()
# Run two threads that update the BTree
t1
=
StressThread
(
self
,
db1
,
stop
,
1
,
1
)
t2
=
StressThread
(
self
,
db2
,
stop
,
2
,
2
)
self
.
go
(
stop
,
t1
,
t2
)
cn
.
sync
()
self
.
_check_tree
(
cn
,
tree
)
self
.
_check_threads
(
tree
,
t1
,
t2
)
cn
.
close
()
db1
.
close
()
db2
.
close
()
def
checkConcurrentUpdates1Storage
(
self
):
self
.
_storage
=
storage1
=
self
.
openClientStorage
()
db1
=
DB
(
storage1
)
stop
=
threading
.
Event
()
cn
=
db1
.
open
()
tree
=
cn
.
root
()[
"tree"
]
=
OOBTree
()
get_transaction
().
commit
()
# Run two threads that update the BTree
t1
=
StressThread
(
self
,
db1
,
stop
,
1
,
1
,
sleep
=
0.001
)
t2
=
StressThread
(
self
,
db1
,
stop
,
2
,
2
,
sleep
=
0.001
)
self
.
go
(
stop
,
t1
,
t2
)
cn
.
sync
()
self
.
_check_tree
(
cn
,
tree
)
self
.
_check_threads
(
tree
,
t1
,
t2
)
cn
.
close
()
db1
.
close
()
def
checkConcurrentUpdates2StoragesMT
(
self
):
self
.
_storage
=
storage1
=
self
.
openClientStorage
()
db1
=
DB
(
storage1
)
stop
=
threading
.
Event
()
cn
=
db1
.
open
()
tree
=
cn
.
root
()[
"tree"
]
=
OOBTree
()
get_transaction
().
commit
()
db2
=
DB
(
self
.
openClientStorage
(
cache
=
"2"
))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1
=
StressThread
(
self
,
db1
,
stop
,
1
,
1
,
3
)
t2
=
StressThread
(
self
,
db2
,
stop
,
2
,
2
,
3
,
0.001
)
t3
=
StressThread
(
self
,
db2
,
stop
,
3
,
3
,
3
,
0.001
)
self
.
go
(
stop
,
t1
,
t2
,
t3
)
cn
.
sync
()
self
.
_check_tree
(
cn
,
tree
)
self
.
_check_threads
(
tree
,
t1
,
t2
,
t3
)
cn
.
close
()
db1
.
close
()
db2
.
close
()
def
checkConcurrentUpdatesInVersions
(
self
):
self
.
_storage
=
storage1
=
self
.
openClientStorage
()
db1
=
DB
(
storage1
)
db2
=
DB
(
self
.
openClientStorage
(
cache
=
"2"
))
stop
=
threading
.
Event
()
cn
=
db1
.
open
()
tree
=
cn
.
root
()[
"tree"
]
=
OOBTree
()
get_transaction
().
commit
()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1
=
VersionStressThread
(
self
,
db1
,
stop
,
1
,
1
,
3
)
t2
=
VersionStressThread
(
self
,
db2
,
stop
,
2
,
2
,
3
,
0.001
)
t3
=
VersionStressThread
(
self
,
db2
,
stop
,
3
,
3
,
3
,
0.001
)
self
.
go
(
stop
,
t1
,
t2
,
t3
)
cn
.
sync
()
self
.
_check_tree
(
cn
,
tree
)
self
.
_check_threads
(
tree
,
t1
,
t2
,
t3
)
cn
.
close
()
db1
.
close
()
db2
.
close
()
src/ZEO/tests/testConnection.py
View file @
71eb1456
...
...
@@ -110,12 +110,9 @@ class MappingStorageTimeoutTests(
test_classes
=
[
FileStorageConnectionTests
,
FileStorageReconnectionTests
,
FileStorageTimeoutTests
]
test_classes
.
extend
(
[
MappingStorageConnectionTests
,
MappingStorageTimeoutTests
])
FileStorageTimeoutTests
,
MappingStorageConnectionTests
,
MappingStorageTimeoutTests
]
import
BDBStorage
if
BDBStorage
.
is_available
:
...
...
src/ZEO/zrpc/connection.py
View file @
71eb1456
...
...
@@ -398,6 +398,14 @@ class Connection(smac.SizedMessageAsyncConnection):
self
.
send_call
(
method
,
args
,
ASYNC
)
self
.
poll
()
def
callAsyncNoPoll
(
self
,
method
,
*
args
):
# Like CallAsync but doesn't poll. This exists so that we can
# send invalidations atomically to all clients without
# allowing any client to sneak in a load request.
if
self
.
closed
:
raise
DisconnectedError
()
self
.
send_call
(
method
,
args
,
ASYNC
)
# handle IO, possibly in async mode
def
_prepare_async
(
self
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment