Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZEO
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
ZEO
Commits
11cf859f
Commit
11cf859f
authored
Aug 30, 2008
by
Christian Theune
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Merge gocept-iteration branch.
parent
0da4760d
Changes
23
Hide whitespace changes
Inline
Side-by-side
Showing
23 changed files
with
795 additions
and
165 deletions
+795
-165
src/CHANGES.txt
src/CHANGES.txt
+3
-0
src/ZEO/ClientStorage.py
src/ZEO/ClientStorage.py
+144
-12
src/ZEO/CommitLog.py
src/ZEO/CommitLog.py
+8
-1
src/ZEO/ServerStub.py
src/ZEO/ServerStub.py
+20
-0
src/ZEO/StorageServer.py
src/ZEO/StorageServer.py
+119
-13
src/ZEO/tests/ConnectionTests.py
src/ZEO/tests/ConnectionTests.py
+2
-1
src/ZEO/tests/IterationTests.py
src/ZEO/tests/IterationTests.py
+101
-0
src/ZEO/tests/testZEO.py
src/ZEO/tests/testZEO.py
+113
-8
src/ZODB/BaseStorage.py
src/ZODB/BaseStorage.py
+35
-4
src/ZODB/DemoStorage.py
src/ZODB/DemoStorage.py
+39
-6
src/ZODB/FileStorage/FileStorage.py
src/ZODB/FileStorage/FileStorage.py
+38
-46
src/ZODB/FileStorage/__init__.py
src/ZODB/FileStorage/__init__.py
+5
-1
src/ZODB/MappingStorage.py
src/ZODB/MappingStorage.py
+3
-3
src/ZODB/blob.py
src/ZODB/blob.py
+15
-0
src/ZODB/fsrecover.py
src/ZODB/fsrecover.py
+3
-3
src/ZODB/interfaces.py
src/ZODB/interfaces.py
+31
-13
src/ZODB/tests/IteratorStorage.py
src/ZODB/tests/IteratorStorage.py
+48
-18
src/ZODB/tests/PackableStorage.py
src/ZODB/tests/PackableStorage.py
+13
-18
src/ZODB/tests/RecoveryStorage.py
src/ZODB/tests/RecoveryStorage.py
+8
-3
src/ZODB/tests/TransactionalUndoStorage.py
src/ZODB/tests/TransactionalUndoStorage.py
+9
-12
src/ZODB/tests/testFileStorage.py
src/ZODB/tests/testFileStorage.py
+37
-1
src/ZODB/tests/testMappingStorage.py
src/ZODB/tests/testMappingStorage.py
+1
-0
src/ZODB/utils.py
src/ZODB/utils.py
+0
-2
No files found.
src/CHANGES.txt
View file @
11cf859f
...
...
@@ -8,6 +8,9 @@ Change History
New Features
------------
- Cleaned-up the storage iteration API and provided an iterator implementation
for ZEO.
- Versions are no-longer supported.
- ZEO cache files can be larger than 4G. Note that older ZEO cache
...
...
src/ZEO/ClientStorage.py
View file @
11cf859f
...
...
@@ -29,8 +29,9 @@ import threading
import
time
import
types
import
logging
import
weakref
from
zope.interface
import
implements
import
zope.interface
from
ZEO
import
ServerStub
from
ZEO.cache
import
ClientCache
from
ZEO.TransactionBuffer
import
TransactionBuffer
...
...
@@ -38,11 +39,12 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from
ZEO.auth
import
get_module
from
ZEO.zrpc.client
import
ConnectionManager
import
ZODB.interfaces
import
ZODB.lock_file
import
ZODB.BaseStorage
from
ZODB
import
POSException
from
ZODB
import
utils
from
ZODB.loglevels
import
BLATHER
from
ZODB.interfaces
import
IBlobStorage
from
ZODB.blob
import
rename_or_copy_blob
from
persistent.TimeStamp
import
TimeStamp
...
...
@@ -103,7 +105,10 @@ class ClientStorage(object):
"""
implements
(
IBlobStorage
)
# ClientStorage does not declare any interfaces here. Interfaces are
# declared according to the server's storage once a connection is
# established.
# Classes we instantiate. A subclass might override.
TransactionBufferClass
=
TransactionBuffer
...
...
@@ -260,6 +265,9 @@ class ClientStorage(object):
self
.
_password
=
password
self
.
_realm
=
realm
self
.
_iterators
=
weakref
.
WeakValueDictionary
()
self
.
_iterator_ids
=
set
()
# Flag tracking disconnections in the middle of a transaction. This
# is reset in tpc_begin() and set in notifyDisconnected().
self
.
_midtxn_disconnect
=
0
...
...
@@ -270,7 +278,7 @@ class ClientStorage(object):
self
.
_verification_invalidations
=
None
self
.
_info
=
{
'length'
:
0
,
'size'
:
0
,
'name'
:
'ZEO Client'
,
'supportsUndo'
:
0
}
'supportsUndo'
:
0
,
'interfaces'
:
()
}
self
.
_tbuf
=
self
.
TransactionBufferClass
()
self
.
_db
=
None
...
...
@@ -526,6 +534,15 @@ class ClientStorage(object):
self
.
_handle_extensions
()
# Decorate ClientStorage with all interfaces that the backend storage
# supports.
remote_interfaces
=
[]
for
module_name
,
interface_name
in
self
.
_info
[
'interfaces'
]:
module
=
__import__
(
module_name
,
globals
(),
locals
(),
[
interface_name
])
interface
=
getattr
(
module
,
interface_name
)
remote_interfaces
.
append
(
interface
)
zope
.
interface
.
directlyProvides
(
self
,
remote_interfaces
)
def
_handle_extensions
(
self
):
for
name
in
self
.
getExtensionMethods
().
keys
():
if
not
hasattr
(
self
,
name
):
...
...
@@ -633,6 +650,7 @@ class ClientStorage(object):
self
.
_ready
.
clear
()
self
.
_server
=
disconnected_stub
self
.
_midtxn_disconnect
=
1
self
.
_iterator_gc
()
def
__len__
(
self
):
"""Return the size of the storage."""
...
...
@@ -933,14 +951,7 @@ class ClientStorage(object):
raise
POSException
.
POSKeyError
(
"No blob file"
,
oid
,
serial
)
# First, we'll create the directory for this oid, if it doesn't exist.
targetpath
=
self
.
fshelper
.
getPathForOID
(
oid
)
if
not
os
.
path
.
exists
(
targetpath
):
try
:
os
.
makedirs
(
targetpath
,
0700
)
except
OSError
:
# We might have lost a race. If so, the directory
# must exist now
assert
os
.
path
.
exists
(
targetpath
)
self
.
fshelper
.
createPathForOID
(
oid
)
# OK, it's not here and we (or someone) needs to get it. We
# want to avoid getting it multiple times. We want to avoid
...
...
@@ -1075,6 +1086,7 @@ class ClientStorage(object):
self
.
_tbuf
.
clear
()
self
.
_seriald
.
clear
()
del
self
.
_serials
[:]
self
.
_iterator_gc
()
self
.
end_transaction
()
def
tpc_finish
(
self
,
txn
,
f
=
None
):
...
...
@@ -1104,6 +1116,7 @@ class ClientStorage(object):
assert
r
is
None
or
len
(
r
)
==
0
,
"unhandled serialnos: %s"
%
r
finally
:
self
.
_load_lock
.
release
()
self
.
_iterator_gc
()
self
.
end_transaction
()
def
_update_cache
(
self
,
tid
):
...
...
@@ -1176,6 +1189,25 @@ class ClientStorage(object):
return
[]
return
self
.
_server
.
undoLog
(
first
,
last
)
# Recovery support
def
copyTransactionsFrom
(
self
,
other
,
verbose
=
0
):
"""Copy transactions from another storage.
This is typically used for converting data from one storage to
another. `other` must have an .iterator() method.
"""
ZODB
.
BaseStorage
.
copy
(
other
,
self
,
verbose
)
def
restore
(
self
,
oid
,
serial
,
data
,
version
,
prev_txn
,
transaction
):
"""Write data already committed in a separate database."""
assert
not
version
self
.
_check_trans
(
transaction
)
self
.
_server
.
restorea
(
oid
,
serial
,
data
,
prev_txn
,
id
(
transaction
))
# Don't update the transaction buffer, because current data are
# unaffected.
return
self
.
_check_serials
()
# Below are methods invoked by the StorageServer
def
serialnos
(
self
,
args
):
...
...
@@ -1249,3 +1281,103 @@ class ClientStorage(object):
invalidate
=
invalidateVerify
end
=
endVerify
Invalidate
=
invalidateTrans
# IStorageIteration
def
iterator
(
self
,
start
=
None
,
stop
=
None
):
"""Return an IStorageTransactionInformation iterator."""
# iids are "iterator IDs" that can be used to query an iterator whose
# status is held on the server.
iid
=
self
.
_server
.
iterator_start
(
start
,
stop
)
return
self
.
_setup_iterator
(
TransactionIterator
,
iid
)
def
_setup_iterator
(
self
,
factory
,
iid
,
*
args
):
self
.
_iterators
[
iid
]
=
iterator
=
factory
(
self
,
iid
,
*
args
)
self
.
_iterator_ids
.
add
(
iid
)
return
iterator
def
_forget_iterator
(
self
,
iid
):
self
.
_iterators
.
pop
(
iid
,
None
)
self
.
_iterator_ids
.
remove
(
iid
)
def
_iterator_gc
(
self
):
iids
=
self
.
_iterator_ids
-
set
(
self
.
_iterators
)
try
:
self
.
_server
.
iterator_gc
(
list
(
iids
))
except
ClientDisconnected
:
# We could not successfully garbage-collect iterators.
# The server might have been restarted, so the IIDs might mean
# something different now. We simply forget our unused IIDs to
# avoid gc'ing foreign iterators.
# In the case that the server was not restarted, we accept the
# risk of leaking resources on the ZEO server.
pass
self
.
_iterator_ids
-=
iids
class
TransactionIterator
(
object
):
def
__init__
(
self
,
storage
,
iid
,
*
args
):
self
.
_storage
=
storage
self
.
_iid
=
iid
self
.
_ended
=
False
def
__iter__
(
self
):
return
self
def
next
(
self
):
if
self
.
_ended
:
raise
ZODB
.
interfaces
.
StorageStopIteration
()
tx_data
=
self
.
_storage
.
_server
.
iterator_next
(
self
.
_iid
)
if
tx_data
is
None
:
# The iterator is exhausted, and the server has already
# disposed it.
self
.
_ended
=
True
self
.
_storage
.
_forget_iterator
(
self
.
_iid
)
raise
ZODB
.
interfaces
.
StorageStopIteration
()
return
ClientStorageTransactionInformation
(
self
.
_storage
,
self
,
*
tx_data
)
class
ClientStorageTransactionInformation
(
ZODB
.
BaseStorage
.
TransactionRecord
):
def
__init__
(
self
,
storage
,
txiter
,
tid
,
status
,
user
,
description
,
extension
):
self
.
_storage
=
storage
self
.
_txiter
=
txiter
self
.
_completed
=
False
self
.
_riid
=
None
self
.
tid
=
tid
self
.
status
=
status
self
.
user
=
user
self
.
description
=
description
self
.
extension
=
extension
def
__iter__
(
self
):
riid
=
self
.
_storage
.
_server
.
iterator_record_start
(
self
.
_txiter
.
_iid
,
self
.
tid
)
return
self
.
_storage
.
_setup_iterator
(
RecordIterator
,
riid
)
class
RecordIterator
(
object
):
def
__init__
(
self
,
storage
,
riid
):
self
.
_riid
=
riid
self
.
_completed
=
False
self
.
_storage
=
storage
def
__iter__
(
self
):
return
self
def
next
(
self
):
if
self
.
_completed
:
# We finished iteration once already and the server can't know
# about the iteration anymore.
raise
ZODB
.
interfaces
.
StorageStopIteration
()
item
=
self
.
_storage
.
_server
.
iterator_record_next
(
self
.
_riid
)
if
item
is
None
:
# The iterator is exhausted, and the server has already
# disposed it.
self
.
_completed
=
True
raise
ZODB
.
interfaces
.
StorageStopIteration
()
return
ZODB
.
BaseStorage
.
DataRecord
(
*
item
)
src/ZEO/CommitLog.py
View file @
11cf859f
...
...
@@ -18,10 +18,13 @@ must serialize them as the actually execute at the server. The
concurrent commits are achieved by logging actions up until the
tpc_vote(). At that point, the entire transaction is committed on the
real storage.
"""
import
cPickle
import
tempfile
class
CommitLog
:
def
__init__
(
self
):
...
...
@@ -35,7 +38,11 @@ class CommitLog:
return
self
.
file
.
tell
()
def
store
(
self
,
oid
,
serial
,
data
):
self
.
pickler
.
dump
((
oid
,
serial
,
data
))
self
.
pickler
.
dump
((
's'
,
oid
,
serial
,
data
))
self
.
stores
+=
1
def
restore
(
self
,
oid
,
serial
,
data
,
prev_txn
):
self
.
pickler
.
dump
((
'r'
,
oid
,
serial
,
data
,
prev_txn
))
self
.
stores
+=
1
def
get_loader
(
self
):
...
...
src/ZEO/ServerStub.py
View file @
11cf859f
...
...
@@ -209,6 +209,10 @@ class StorageServer:
def
storea
(
self
,
oid
,
serial
,
data
,
id
):
self
.
rpc
.
callAsync
(
'storea'
,
oid
,
serial
,
data
,
''
,
id
)
def
restorea
(
self
,
oid
,
serial
,
data
,
prev_txn
,
id
):
self
.
rpc
.
callAsync
(
'restorea'
,
oid
,
serial
,
data
,
prev_txn
,
id
)
def
storeBlob
(
self
,
oid
,
serial
,
data
,
blobfilename
,
txn
):
# Store a blob to the server. We don't want to real all of
...
...
@@ -292,6 +296,22 @@ class StorageServer:
def
undoInfo
(
self
,
first
,
last
,
spec
):
return
self
.
rpc
.
call
(
'undoInfo'
,
first
,
last
,
spec
)
def
iterator_start
(
self
,
start
,
stop
):
return
self
.
rpc
.
call
(
'iterator_start'
,
start
,
stop
)
def
iterator_next
(
self
,
iid
):
return
self
.
rpc
.
call
(
'iterator_next'
,
iid
)
def
iterator_record_start
(
self
,
txn_iid
,
tid
):
return
self
.
rpc
.
call
(
'iterator_record_start'
,
txn_iid
,
tid
)
def
iterator_record_next
(
self
,
iid
):
return
self
.
rpc
.
call
(
'iterator_record_next'
,
iid
)
def
iterator_gc
(
self
,
iids
):
return
self
.
rpc
.
call
(
'iterator_gc'
,
iids
)
class
ExtensionMethodWrapper
:
def
__init__
(
self
,
rpc
,
name
):
self
.
rpc
=
rpc
...
...
src/ZEO/StorageServer.py
View file @
11cf859f
...
...
@@ -29,12 +29,14 @@ import tempfile
import
threading
import
time
import
warnings
import
itertools
import
transaction
import
ZODB.serialize
import
ZEO.zrpc.error
import
zope.interface
from
ZEO
import
ClientStub
from
ZEO.CommitLog
import
CommitLog
from
ZEO.monitor
import
StorageStats
,
StatsServer
...
...
@@ -53,7 +55,6 @@ from ZODB.loglevels import BLATHER
logger
=
logging
.
getLogger
(
'ZEO.StorageServer'
)
# TODO: This used to say "ZSS", which is now implied in the logger name.
# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
_label
=
""
# default label used for logging.
...
...
@@ -110,6 +111,11 @@ class ZEOStorage:
self
.
_extensions
=
{}
for
func
in
self
.
extensions
:
self
.
_extensions
[
func
.
func_name
]
=
None
self
.
_iterators
=
{}
self
.
_iterator_ids
=
itertools
.
count
()
# Stores the last item that was handed out for a
# transaction iterator.
self
.
_txn_iterators_last
=
{}
def
finish_auth
(
self
,
authenticated
):
if
not
self
.
auth_realm
:
...
...
@@ -272,6 +278,12 @@ class ZEOStorage:
else
:
supportsUndo
=
supportsUndo
()
# Communicate the backend storage interfaces to the client
storage_provides
=
zope
.
interface
.
providedBy
(
storage
)
interfaces
=
[]
for
candidate
in
storage_provides
.
__iro__
:
interfaces
.
append
((
candidate
.
__module__
,
candidate
.
__name__
))
return
{
'length'
:
len
(
storage
),
'size'
:
storage
.
getSize
(),
'name'
:
storage
.
getName
(),
...
...
@@ -279,6 +291,7 @@ class ZEOStorage:
'supportsVersions'
:
False
,
'extensionMethods'
:
self
.
getExtensionMethods
(),
'supports_record_iternext'
:
hasattr
(
self
,
'record_iternext'
),
'interfaces'
:
tuple
(
interfaces
),
}
def
get_size_info
(
self
):
...
...
@@ -477,6 +490,11 @@ class ZEOStorage:
self
.
stats
.
stores
+=
1
self
.
txnlog
.
store
(
oid
,
serial
,
data
)
def
restorea
(
self
,
oid
,
serial
,
data
,
prev_txn
,
id
):
self
.
_check_tid
(
id
,
exc
=
StorageTransactionError
)
self
.
stats
.
stores
+=
1
self
.
txnlog
.
restore
(
oid
,
serial
,
data
,
prev_txn
)
def
storeBlobStart
(
self
):
assert
self
.
blob_tempfile
is
None
self
.
blob_tempfile
=
tempfile
.
mkstemp
(
...
...
@@ -544,16 +562,7 @@ class ZEOStorage:
# Unexpected errors are logged and passed to the client
self
.
log
(
"store error: %s, %s"
%
sys
.
exc_info
()[:
2
],
logging
.
ERROR
,
exc_info
=
True
)
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something else.
pickler
=
cPickle
.
Pickler
()
pickler
.
fast
=
1
try
:
pickler
.
dump
(
err
,
1
)
except
:
msg
=
"Couldn't pickle storage exception: %s"
%
repr
(
err
)
self
.
log
(
msg
,
logging
.
ERROR
)
err
=
StorageServerError
(
msg
)
err
=
self
.
_marshal_error
(
err
)
# The exception is reported back as newserial for this oid
newserial
=
[(
oid
,
err
)]
else
:
...
...
@@ -575,6 +584,37 @@ class ZEOStorage:
return
err
is
None
def
_restore
(
self
,
oid
,
serial
,
data
,
prev_txn
):
err
=
None
try
:
self
.
storage
.
restore
(
oid
,
serial
,
data
,
''
,
prev_txn
,
self
.
transaction
)
except
(
SystemExit
,
KeyboardInterrupt
):
raise
except
Exception
,
err
:
self
.
store_failed
=
1
if
not
isinstance
(
err
,
TransactionError
):
# Unexpected errors are logged and passed to the client
self
.
log
(
"store error: %s, %s"
%
sys
.
exc_info
()[:
2
],
logging
.
ERROR
,
exc_info
=
True
)
err
=
self
.
_marshal_error
(
err
)
# The exception is reported back as newserial for this oid
self
.
serials
.
append
((
oid
,
err
))
return
err
is
None
def
_marshal_error
(
self
,
error
):
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something that can be pickled.
pickler
=
cPickle
.
Pickler
()
pickler
.
fast
=
1
try
:
pickler
.
dump
(
error
,
1
)
except
:
msg
=
"Couldn't pickle storage exception: %s"
%
repr
(
error
)
self
.
log
(
msg
,
logging
.
ERROR
)
error
=
StorageServerError
(
msg
)
return
error
def
_vote
(
self
):
if
not
self
.
store_failed
:
# Only call tpc_vote of no store call failed, otherwise
...
...
@@ -627,8 +667,18 @@ class ZEOStorage:
self
.
_tpc_begin
(
self
.
transaction
,
self
.
tid
,
self
.
status
)
loads
,
loader
=
self
.
txnlog
.
get_loader
()
for
i
in
range
(
loads
):
# load oid, serial, data, version
if
not
self
.
_store
(
*
loader
.
load
()):
store
=
loader
.
load
()
store_type
=
store
[
0
]
store_args
=
store
[
1
:]
if
store_type
==
's'
:
do_store
=
self
.
_store
elif
store_type
==
'r'
:
do_store
=
self
.
_restore
else
:
raise
ValueError
(
'Invalid store type: %r'
%
store_type
)
if
not
do_store
(
*
store_args
):
break
# Blob support
...
...
@@ -683,6 +733,62 @@ class ZEOStorage:
abortVersion
=
commitVersion
# IStorageIteration support
def
iterator_start
(
self
,
start
,
stop
):
iid
=
self
.
_iterator_ids
.
next
()
self
.
_iterators
[
iid
]
=
iter
(
self
.
storage
.
iterator
(
start
,
stop
))
return
iid
def
iterator_next
(
self
,
iid
):
iterator
=
self
.
_iterators
[
iid
]
try
:
info
=
iterator
.
next
()
except
StopIteration
:
del
self
.
_iterators
[
iid
]
item
=
None
if
iid
in
self
.
_txn_iterators_last
:
del
self
.
_txn_iterators_last
[
iid
]
else
:
item
=
(
info
.
tid
,
info
.
status
,
info
.
user
,
info
.
description
,
info
.
extension
)
# Keep a reference to the last iterator result to allow starting a
# record iterator off it.
self
.
_txn_iterators_last
[
iid
]
=
info
return
item
def
iterator_record_start
(
self
,
txn_iid
,
tid
):
record_iid
=
self
.
_iterator_ids
.
next
()
txn_info
=
self
.
_txn_iterators_last
[
txn_iid
]
if
txn_info
.
tid
!=
tid
:
raise
Exception
(
'Out-of-order request for record iterator for transaction %r'
%
tid
)
self
.
_iterators
[
record_iid
]
=
iter
(
txn_info
)
return
record_iid
def
iterator_record_next
(
self
,
iid
):
iterator
=
self
.
_iterators
[
iid
]
try
:
info
=
iterator
.
next
()
except
StopIteration
:
del
self
.
_iterators
[
iid
]
item
=
None
else
:
item
=
(
info
.
oid
,
info
.
tid
,
info
.
data
,
info
.
version
,
info
.
data_txn
)
return
item
def
iterator_gc
(
self
,
iids
):
for
iid
in
iids
:
self
.
_iterators
.
pop
(
iid
,
None
)
class
StorageServerDB
:
def
__init__
(
self
,
server
,
storage_id
):
...
...
src/ZEO/tests/ConnectionTests.py
View file @
11cf859f
...
...
@@ -102,7 +102,8 @@ class CommonSetupTearDown(StorageTestBase):
"""
self
.
__super_setUp
()
logging
.
info
(
"setUp() %s"
,
self
.
id
())
self
.
file
=
tempfile
.
mktemp
()
fd
,
self
.
file
=
tempfile
.
mkstemp
()
os
.
close
(
fd
)
self
.
addr
=
[]
self
.
_pids
=
[]
self
.
_servers
=
[]
...
...
src/ZEO/tests/IterationTests.py
0 → 100644
View file @
11cf859f
##############################################################################
#
# Copyright (c) 2008 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""ZEO iterator protocol tests."""
import
transaction
class
IterationTests
:
def
checkIteratorGCProtocol
(
self
):
# Test garbage collection on protocol level.
server
=
self
.
_storage
.
_server
iid
=
server
.
iterator_start
(
None
,
None
)
# None signals the end of iteration.
self
.
assertEquals
(
None
,
server
.
iterator_next
(
iid
))
# The server has disposed the iterator already.
self
.
assertRaises
(
KeyError
,
server
.
iterator_next
,
iid
)
iid
=
server
.
iterator_start
(
None
,
None
)
# This time, we tell the server to throw the iterator away.
server
.
iterator_gc
([
iid
])
self
.
assertRaises
(
KeyError
,
server
.
iterator_next
,
iid
)
def
checkIteratorExhaustionStorage
(
self
):
# Test the storage's garbage collection mechanism.
iterator
=
self
.
_storage
.
iterator
()
self
.
assertEquals
(
1
,
len
(
self
.
_storage
.
_iterator_ids
))
iid
=
list
(
self
.
_storage
.
_iterator_ids
)[
0
]
self
.
assertEquals
([],
list
(
iterator
))
self
.
assertEquals
(
0
,
len
(
self
.
_storage
.
_iterator_ids
))
# The iterator has run through, so the server has already disposed it.
self
.
assertRaises
(
KeyError
,
self
.
_storage
.
_server
.
iterator_next
,
iid
)
def
checkIteratorGCSpanTransactions
(
self
):
iterator
=
self
.
_storage
.
iterator
()
t
=
transaction
.
Transaction
()
self
.
_storage
.
tpc_begin
(
t
)
self
.
_storage
.
tpc_vote
(
t
)
self
.
_storage
.
tpc_finish
(
t
)
self
.
assertEquals
([],
list
(
iterator
))
def
checkIteratorGCStorageCommitting
(
self
):
# We want the iterator to be garbage-collected, so we don't keep any
# hard references to it. The storage tracks its ID, though.
self
.
_storage
.
iterator
()
self
.
assertEquals
(
1
,
len
(
self
.
_storage
.
_iterator_ids
))
iid
=
list
(
self
.
_storage
.
_iterator_ids
)[
0
]
# GC happens at the transaction boundary. After that, both the storage
# and the server have forgotten the iterator.
self
.
_dostore
()
self
.
assertEquals
(
0
,
len
(
self
.
_storage
.
_iterator_ids
))
self
.
assertRaises
(
KeyError
,
self
.
_storage
.
_server
.
iterator_next
,
iid
)
def
checkIteratorGCStorageTPCAborting
(
self
):
self
.
_storage
.
iterator
()
iid
=
list
(
self
.
_storage
.
_iterator_ids
)[
0
]
t
=
transaction
.
Transaction
()
self
.
_storage
.
tpc_begin
(
t
)
self
.
_storage
.
tpc_abort
(
t
)
self
.
assertEquals
(
0
,
len
(
self
.
_storage
.
_iterator_ids
))
self
.
assertRaises
(
KeyError
,
self
.
_storage
.
_server
.
iterator_next
,
iid
)
def
checkIteratorGCStorageDisconnect
(
self
):
self
.
_storage
.
iterator
()
iid
=
list
(
self
.
_storage
.
_iterator_ids
)[
0
]
t
=
transaction
.
Transaction
()
self
.
_storage
.
tpc_begin
(
t
)
# Show that after disconnecting, the client side GCs the iterators
# as well. I'm calling this directly to avoid accidentally
# calling tpc_abort implicitly.
self
.
_storage
.
notifyDisconnected
()
self
.
assertEquals
(
0
,
len
(
self
.
_storage
.
_iterator_ids
))
def
checkIteratorParallel
(
self
):
self
.
_dostore
()
self
.
_dostore
()
iter1
=
self
.
_storage
.
iterator
()
iter2
=
self
.
_storage
.
iterator
()
txn_info1
=
iter1
.
next
()
txn_info2
=
iter2
.
next
()
self
.
assertEquals
(
txn_info1
.
tid
,
txn_info2
.
tid
)
txn_info1
=
iter1
.
next
()
txn_info2
=
iter2
.
next
()
self
.
assertEquals
(
txn_info1
.
tid
,
txn_info2
.
tid
)
self
.
assertRaises
(
StopIteration
,
iter1
.
next
)
self
.
assertRaises
(
StopIteration
,
iter2
.
next
)
src/ZEO/tests/testZEO.py
View file @
11cf859f
...
...
@@ -39,7 +39,7 @@ import transaction
from
ZODB.tests
import
StorageTestBase
,
BasicStorage
,
\
TransactionalUndoStorage
,
\
PackableStorage
,
Synchronization
,
ConflictResolution
,
RevisionStorage
,
\
MTStorage
,
ReadOnlyStorage
MTStorage
,
ReadOnlyStorage
,
IteratorStorage
,
RecoveryStorage
from
ZODB.tests.testDemoStorage
import
DemoStorageWrappedBase
...
...
@@ -47,7 +47,8 @@ from ZEO.ClientStorage import ClientStorage
import
ZEO.zrpc.connection
from
ZEO.tests
import
forker
,
Cache
,
CommitLockTests
,
ThreadTests
from
ZEO.tests
import
forker
,
Cache
,
CommitLockTests
,
ThreadTests
,
\
IterationTests
from
ZEO.tests.forker
import
get_port
import
ZEO.tests.ConnectionTests
...
...
@@ -56,7 +57,6 @@ import ZEO.StorageServer
logger
=
logging
.
getLogger
(
'ZEO.tests.testZEO'
)
class
DummyDB
:
def
invalidate
(
self
,
*
args
):
pass
...
...
@@ -158,7 +158,7 @@ class GenericTests(
CommitLockTests
.
CommitLockVoteTests
,
ThreadTests
.
ThreadTests
,
# Locally defined (see above)
MiscZEOTests
MiscZEOTests
,
):
"""Combine tests from various origins in one class."""
...
...
@@ -196,6 +196,15 @@ class GenericTests(
for
pid
in
self
.
_pids
:
os
.
waitpid
(
pid
,
0
)
def
runTest
(
self
):
try
:
super
(
GenericTests
,
self
).
runTest
()
except
:
self
.
_failed
=
True
raise
else
:
self
.
_failed
=
False
def
open
(
self
,
read_only
=
0
):
# Needed to support ReadOnlyStorage tests. Ought to be a
# cleaner way.
...
...
@@ -226,9 +235,75 @@ class FullGenericTests(
PackableStorage
.
PackableUndoStorage
,
RevisionStorage
.
RevisionStorage
,
TransactionalUndoStorage
.
TransactionalUndoStorage
,
IteratorStorage
.
IteratorStorage
,
IterationTests
.
IterationTests
,
):
"""Extend GenericTests with tests that MappingStorage can't pass."""
class
FileStorageRecoveryTests
(
StorageTestBase
.
StorageTestBase
,
RecoveryStorage
.
RecoveryStorage
):
level
=
2
def
setUp
(
self
):
self
.
_storage
=
ZODB
.
FileStorage
.
FileStorage
(
"Source.fs"
,
create
=
True
)
self
.
_dst
=
ZODB
.
FileStorage
.
FileStorage
(
"Dest.fs"
,
create
=
True
)
def
getConfig
(
self
):
filename
=
self
.
__fs_base
=
tempfile
.
mktemp
()
return
"""
\
<filestorage 1>
path %s
</filestorage>
"""
%
filename
def
_new_storage
(
self
):
port
=
get_port
()
zconf
=
forker
.
ZEOConfig
((
''
,
port
))
zport
,
adminaddr
,
pid
,
path
=
forker
.
start_zeo_server
(
self
.
getConfig
(),
zconf
,
port
)
blob_cache_dir
=
tempfile
.
mkdtemp
()
self
.
_pids
.
append
(
pid
)
self
.
_servers
.
append
(
adminaddr
)
self
.
_conf_paths
.
append
(
path
)
self
.
blob_cache_dirs
.
append
(
blob_cache_dir
)
storage
=
ClientStorage
(
zport
,
'1'
,
cache_size
=
20000000
,
min_disconnect_poll
=
0.5
,
wait
=
1
,
wait_timeout
=
60
,
blob_dir
=
blob_cache_dir
)
storage
.
registerDB
(
DummyDB
())
return
storage
def
setUp
(
self
):
self
.
_pids
=
[]
self
.
_servers
=
[]
self
.
_conf_paths
=
[]
self
.
blob_cache_dirs
=
[]
self
.
_storage
=
self
.
_new_storage
()
self
.
_dst
=
self
.
_new_storage
()
def
tearDown
(
self
):
self
.
_storage
.
close
()
self
.
_dst
.
close
()
for
p
in
self
.
_conf_paths
:
os
.
remove
(
p
)
for
p
in
self
.
blob_cache_dirs
:
ZODB
.
blob
.
remove_committed_dir
(
p
)
for
server
in
self
.
_servers
:
forker
.
shutdown_zeo_server
(
server
)
if
hasattr
(
os
,
'waitpid'
):
# Not in Windows Python until 2.3
for
pid
in
self
.
_pids
:
os
.
waitpid
(
pid
,
0
)
def
new_dest
(
self
):
return
self
.
_new_storage
()
class
FileStorageTests
(
FullGenericTests
):
"""Test ZEO backed by a FileStorage."""
level
=
2
...
...
@@ -241,12 +316,36 @@ class FileStorageTests(FullGenericTests):
</filestorage>
"""
%
filename
def
checkInterfaceFromRemoteStorage
(
self
):
# ClientStorage itself doesn't implement IStorageIteration, but the
# FileStorage on the other end does, and thus the ClientStorage
# instance that is connected to it reflects this.
self
.
failIf
(
ZODB
.
interfaces
.
IStorageIteration
.
implementedBy
(
ZEO
.
ClientStorage
.
ClientStorage
))
self
.
failUnless
(
ZODB
.
interfaces
.
IStorageIteration
.
providedBy
(
self
.
_storage
))
# This is communicated using ClientStorage's _info object:
self
.
assertEquals
(((
'ZODB.interfaces'
,
'IStorageIteration'
),
(
'zope.interface'
,
'Interface'
)),
self
.
_storage
.
_info
[
'interfaces'
])
class
MappingStorageTests
(
GenericTests
):
"""ZEO backed by a Mapping storage."""
def
getConfig
(
self
):
return
"""<mappingstorage 1/>"""
def
checkSimpleIteration
(
self
):
# The test base class IteratorStorage assumes that we keep undo data
# to construct our iterator, which we don't, so we disable this test.
pass
def
checkUndoZombie
(
self
):
# The test base class IteratorStorage assumes that we keep undo data
# to construct our iterator, which we don't, so we disable this test.
pass
class
DemoStorageTests
(
GenericTests
,
):
...
...
@@ -260,6 +359,11 @@ class DemoStorageTests(
</demostorage>
"""
%
tempfile
.
mktemp
()
def
checkUndoZombie
(
self
):
# The test base class IteratorStorage assumes that we keep undo data
# to construct our iterator, which we don't, so we disable this test.
pass
class
HeartbeatTests
(
ZEO
.
tests
.
ConnectionTests
.
CommonSetupTearDown
):
"""Make sure a heartbeat is being sent and that it does no harm
...
...
@@ -554,7 +658,7 @@ class CommonBlobTests:
self
.
_storage
.
close
()
class
BlobAdaptedFileStorageTests
(
GenericTests
,
CommonBlobTests
):
class
BlobAdaptedFileStorageTests
(
Full
GenericTests
,
CommonBlobTests
):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def
setUp
(
self
):
...
...
@@ -645,7 +749,7 @@ class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
check_data
(
filename
)
class
BlobWritableCacheTests
(
GenericTests
,
CommonBlobTests
):
class
BlobWritableCacheTests
(
Full
GenericTests
,
CommonBlobTests
):
def
setUp
(
self
):
self
.
blobdir
=
self
.
blob_cache_dir
=
tempfile
.
mkdtemp
()
...
...
@@ -823,7 +927,7 @@ without this method:
>>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server
Now, if we ask f
i
or the invalidations since the last committed
Now, if we ask for the invalidations since the last committed
transaction, we'll get a result:
>>> tid, oids = s.getInvalidations(last[-1])
...
...
@@ -849,7 +953,8 @@ transaction, we'll get a result:
"""
test_classes
=
[
FileStorageTests
,
MappingStorageTests
,
DemoStorageTests
,
test_classes
=
[
FileStorageTests
,
FileStorageRecoveryTests
,
MappingStorageTests
,
DemoStorageTests
,
BlobAdaptedFileStorageTests
,
BlobWritableCacheTests
]
def
test_suite
():
...
...
src/ZODB/BaseStorage.py
View file @
11cf859f
...
...
@@ -21,14 +21,18 @@ import time
import
logging
from
struct
import
pack
as
_structpack
,
unpack
as
_structunpack
import
zope.interface
from
persistent.TimeStamp
import
TimeStamp
import
ZODB.interfaces
from
ZODB
import
POSException
from
ZODB.utils
import
z64
,
oid_repr
from
ZODB.UndoLogCompatible
import
UndoLogCompatible
log
=
logging
.
getLogger
(
"ZODB.BaseStorage"
)
class
BaseStorage
(
UndoLogCompatible
):
"""Base class that supports storage implementations.
...
...
@@ -188,6 +192,7 @@ class BaseStorage(UndoLogCompatible):
ext
=
cPickle
.
dumps
(
ext
,
1
)
else
:
ext
=
""
self
.
_ude
=
user
,
desc
,
ext
if
tid
is
None
:
...
...
@@ -338,7 +343,7 @@ def copy(source, dest, verbose=0):
if
verbose
:
print
oid_repr
(
oid
),
r
.
version
,
len
(
r
.
data
)
if
restoring
:
dest
.
restore
(
oid
,
r
.
tid
,
r
.
data
,
''
,
dest
.
restore
(
oid
,
r
.
tid
,
r
.
data
,
r
.
version
,
r
.
data_txn
,
transaction
)
else
:
pre
=
preget
(
oid
,
None
)
...
...
@@ -348,10 +353,36 @@ def copy(source, dest, verbose=0):
dest
.
tpc_vote
(
transaction
)
dest
.
tpc_finish
(
transaction
)
fiter
.
close
()
class
TransactionRecord
:
class
TransactionRecord
(
object
)
:
"""Abstract base class for iterator protocol"""
class
DataRecord
:
zope
.
interface
.
implements
(
ZODB
.
interfaces
.
IStorageTransactionInformation
)
def
__init__
(
self
,
tid
,
status
,
user
,
description
,
extension
):
self
.
tid
=
tid
self
.
status
=
status
self
.
user
=
user
self
.
description
=
description
self
.
extension
=
extension
# XXX This is a workaround to make the TransactionRecord compatible with a
# transaction object because it is passed to tpc_begin().
def
_ext_set
(
self
,
value
):
self
.
extension
=
value
def
_ext_get
(
self
):
return
self
.
extension
_extension
=
property
(
fset
=
_ext_set
,
fget
=
_ext_get
)
class
DataRecord
(
object
):
"""Abstract base class for iterator protocol"""
zope
.
interface
.
implements
(
ZODB
.
interfaces
.
IStorageRecordInformation
)
def
__init__
(
self
,
oid
,
tid
,
data
,
version
,
prev
):
self
.
oid
=
oid
self
.
tid
=
tid
self
.
data
=
data
self
.
version
=
version
self
.
data_txn
=
prev
src/ZODB/DemoStorage.py
View file @
11cf859f
...
...
@@ -80,15 +80,19 @@ and call it to monitor the storage.
"""
import
cPickle
import
base64
,
time
import
ZODB.BaseStorage
import
ZODB.interfaces
import
zope.interface
from
ZODB
import
POSException
from
ZODB.utils
import
z64
,
oid_repr
from
ZODB.BaseStorage
import
BaseStorage
from
persistent.TimeStamp
import
TimeStamp
from
cPickle
import
loads
from
BTrees
import
OOBTree
class
DemoStorage
(
BaseStorage
):
class
DemoStorage
(
ZODB
.
BaseStorage
.
BaseStorage
):
"""Demo storage
Demo storages provide useful storages for writing tests because
...
...
@@ -104,9 +108,10 @@ class DemoStorage(BaseStorage):
"""
zope
.
interface
.
implements
(
ZODB
.
interfaces
.
IStorageIteration
)
def
__init__
(
self
,
name
=
'Demo Storage'
,
base
=
None
,
quota
=
None
):
BaseStorage
.
__init__
(
self
,
name
,
base
)
ZODB
.
BaseStorage
.
BaseStorage
.
__init__
(
self
,
name
,
base
)
# We use a BTree because the items are sorted!
self
.
_data
=
OOBTree
.
OOBTree
()
...
...
@@ -133,7 +138,7 @@ class DemoStorage(BaseStorage):
# by the base storage, leading to a variety of "impossible" problems.
def
new_oid
(
self
):
if
self
.
_base
is
None
:
return
BaseStorage
.
new_oid
(
self
)
return
ZODB
.
BaseStorage
.
BaseStorage
.
new_oid
(
self
)
else
:
return
self
.
_base
.
new_oid
()
...
...
@@ -317,6 +322,10 @@ class DemoStorage(BaseStorage):
self
.
_tsize
=
self
.
_size
+
120
+
len
(
u
)
+
len
(
d
)
+
len
(
e
)
def
_finish
(
self
,
tid
,
user
,
desc
,
ext
):
if
not
self
.
_tindex
:
# No data, so we don't update anything.
return
self
.
_size
=
self
.
_tsize
self
.
_data
[
tid
]
=
None
,
user
,
desc
,
ext
,
tuple
(
self
.
_tindex
)
...
...
@@ -364,7 +373,7 @@ class DemoStorage(BaseStorage):
'time'
:
TimeStamp
(
tid
).
timeTime
(),
'user_name'
:
u
,
'description'
:
d
}
if
e
:
d
.
update
(
loads
(
e
))
d
.
update
(
cPickle
.
loads
(
e
))
if
filter
is
None
or
filter
(
d
):
if
i
>=
first
:
r
.
append
(
d
)
...
...
@@ -569,3 +578,27 @@ class DemoStorage(BaseStorage):
def
close
(
self
):
if
self
.
_base
is
not
None
:
self
.
_base
.
close
()
def
iterator
(
self
,
start
=
None
,
end
=
None
):
# First iterate over the base storage
if
self
.
_base
is
not
None
:
for
transaction
in
self
.
_base
.
iterator
(
start
,
end
):
yield
transaction
# Then iterate over our local transactions
for
tid
,
transaction
in
self
.
_data
.
items
():
if
tid
>=
start
and
tid
<=
end
:
yield
TransactionRecord
(
tid
,
transaction
)
class
TransactionRecord
(
ZODB
.
BaseStorage
.
TransactionRecord
):
def
__init__
(
self
,
tid
,
transaction
):
packed
,
user
,
description
,
extension
,
records
=
transaction
super
(
TransactionRecord
,
self
).
__init__
(
tid
,
packed
,
user
,
description
,
extension
)
self
.
records
=
transaction
def
__iter__
(
self
):
for
record
in
self
.
records
:
oid
,
prev
,
version
,
data
,
tid
=
record
yield
ZODB
.
BaseStorage
.
DataRecord
(
oid
,
tid
,
data
,
version
,
prev
)
src/ZODB/FileStorage/FileStorage.py
View file @
11cf859f
...
...
@@ -29,6 +29,8 @@ from struct import pack, unpack
# Not all platforms have fsync
fsync
=
getattr
(
os
,
"fsync"
,
None
)
import
zope.interface
import
ZODB.interfaces
from
ZODB
import
BaseStorage
,
ConflictResolution
,
POSException
from
ZODB.POSException
import
UndoError
,
POSKeyError
,
MultipleUndoErrors
from
persistent.TimeStamp
import
TimeStamp
...
...
@@ -88,6 +90,8 @@ class FileStorage(BaseStorage.BaseStorage,
ConflictResolution
.
ConflictResolvingStorage
,
FileStorageFormatter
):
zope
.
interface
.
implements
(
ZODB
.
interfaces
.
IStorageIteration
)
# Set True while a pack is in progress; undo is blocked for the duration.
_pack_is_in_progress
=
False
...
...
@@ -1127,7 +1131,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek
(
0
)
return
[(
trans
.
tid
,
[(
r
.
oid
,
''
)
for
r
in
trans
])
for
trans
in
FileIterator
(
self
.
_file
,
pos
=
pos
)]
for
trans
in
FileIterator
(
self
.
_file
_name
,
pos
=
pos
)]
finally
:
self
.
_lock_release
()
...
...
@@ -1516,31 +1520,16 @@ def _truncate(file, name, pos):
file
.
seek
(
pos
)
file
.
truncate
()
class
Iterator
:
"""A General simple iterator that uses the Python for-loop index protocol
"""
__index
=-
1
__current
=
None
def
__getitem__
(
self
,
i
):
__index
=
self
.
__index
while
i
>
__index
:
__index
=
__index
+
1
self
.
__current
=
self
.
next
(
__index
)
self
.
__index
=
__index
return
self
.
__current
class
FileIterator
(
Iterator
,
FileStorageFormatter
):
class
FileIterator
(
FileStorageFormatter
):
"""Iterate over the transactions in a FileStorage file.
"""
_ltid
=
z64
_file
=
None
def
__init__
(
self
,
file
,
start
=
None
,
stop
=
None
,
pos
=
4L
):
if
isinstance
(
file
,
str
):
file
=
open
(
fil
e
,
'rb'
)
def
__init__
(
self
,
file
name
,
start
=
None
,
stop
=
None
,
pos
=
4L
):
assert
isinstance
(
filename
,
str
)
file
=
open
(
filenam
e
,
'rb'
)
self
.
_file
=
file
if
file
.
read
(
4
)
!=
packed_version
:
raise
FileStorageFormatError
(
file
.
name
)
...
...
@@ -1602,14 +1591,17 @@ class FileIterator(Iterator, FileStorageFormatter):
panic
(
"%s has inconsistent transaction length at %s "
"(%s != %s)"
,
file
.
name
,
pos
,
u64
(
rtl
),
u64
(
stl
))
def
next
(
self
,
index
=
0
):
# Iterator protocol
def
__iter__
(
self
):
return
self
def
next
(
self
):
if
self
.
_file
is
None
:
# A closed iterator. Is IOError the best we can do? For
# now, mimic a read on a closed file.
raise
IOError
(
'iterator is closed'
)
raise
ZODB
.
interfaces
.
StorageStopIteration
()
pos
=
self
.
_pos
while
1
:
while
True
:
# Read the transaction record
try
:
h
=
self
.
_read_txn_header
(
pos
)
...
...
@@ -1625,11 +1617,11 @@ class FileIterator(Iterator, FileStorageFormatter):
self
.
_ltid
=
h
.
tid
if
self
.
_stop
is
not
None
and
h
.
tid
>
self
.
_stop
:
raise
IndexError
(
index
)
break
if
h
.
status
==
"c"
:
# Assume we've hit the last, in-progress transaction
raise
IndexError
(
index
)
break
if
pos
+
h
.
tlen
+
8
>
self
.
_file_size
:
# Hm, the data were truncated or the checkpoint flag wasn't
...
...
@@ -1679,8 +1671,8 @@ class FileIterator(Iterator, FileStorageFormatter):
except
:
pass
result
=
RecordIterator
(
h
.
tid
,
h
.
status
,
h
.
user
,
h
.
descr
,
e
,
pos
,
tend
,
self
.
_file
,
tpos
)
result
=
TransactionRecord
(
h
.
tid
,
h
.
status
,
h
.
user
,
h
.
descr
,
e
,
pos
,
tend
,
self
.
_file
,
tpos
)
# Read the (intentionally redundant) transaction length
self
.
_file
.
seek
(
tend
)
...
...
@@ -1693,23 +1685,25 @@ class FileIterator(Iterator, FileStorageFormatter):
return
result
raise
IndexError
(
index
)
self
.
close
()
raise
ZODB
.
interfaces
.
StorageStopIteration
()
class
RecordIterator
(
Iterator
,
BaseStorage
.
TransactionRecord
,
FileStorageFormatter
):
class
TransactionRecord
(
BaseStorage
.
TransactionRecord
,
FileStorageFormatter
):
"""Iterate over the transactions in a FileStorage file."""
def
__init__
(
self
,
tid
,
status
,
user
,
desc
,
ext
,
pos
,
tend
,
file
,
tpos
):
self
.
tid
=
tid
self
.
status
=
status
self
.
user
=
user
self
.
description
=
desc
self
.
_extension
=
ext
BaseStorage
.
TransactionRecord
.
__init__
(
self
,
tid
,
status
,
user
,
desc
,
ext
)
self
.
_pos
=
pos
self
.
_tend
=
tend
self
.
_file
=
file
self
.
_tpos
=
tpos
def
next
(
self
,
index
=
0
):
def
__iter__
(
self
):
return
self
def
next
(
self
):
pos
=
self
.
_pos
while
pos
<
self
.
_tend
:
# Read the data records for this transaction
...
...
@@ -1738,20 +1732,18 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord,
# Should it go to the original data like BDBFullStorage?
prev_txn
=
self
.
getTxnFromData
(
h
.
oid
,
h
.
back
)
r
=
Record
(
h
.
oid
,
h
.
tid
,
data
,
prev_txn
,
pos
)
return
r
return
Record
(
h
.
oid
,
h
.
tid
,
data
,
prev_txn
,
pos
)
raise
ZODB
.
interfaces
.
StorageStopIteration
()
raise
IndexError
(
index
)
class
Record
(
BaseStorage
.
DataRecord
):
"""An abstract database record."""
def
__init__
(
self
,
oid
,
tid
,
data
,
prev
,
pos
):
self
.
oid
=
oid
self
.
tid
=
tid
self
.
data
=
data
self
.
data_txn
=
prev
super
(
Record
,
self
).
__init__
(
oid
,
tid
,
data
,
''
,
prev
)
self
.
pos
=
pos
class
UndoSearch
:
def
__init__
(
self
,
file
,
pos
,
first
,
last
,
filter
=
None
):
...
...
src/ZODB/FileStorage/__init__.py
View file @
11cf859f
# this is a package
from
ZODB.FileStorage.FileStorage
import
FileStorage
,
RecordIterator
from
ZODB.FileStorage.FileStorage
import
FileStorage
,
TransactionRecord
from
ZODB.FileStorage.FileStorage
import
FileIterator
,
Record
,
packed_version
# BBB Alias for compatibility
RecordIterator
=
TransactionRecord
src/ZODB/MappingStorage.py
View file @
11cf859f
...
...
@@ -21,16 +21,16 @@ It is meant to illustrate the simplest possible storage.
The Mapping storage uses a single data structure to map object ids to data.
"""
import
ZODB.BaseStorage
from
ZODB.utils
import
u64
,
z64
from
ZODB.BaseStorage
import
BaseStorage
from
ZODB
import
POSException
from
persistent.TimeStamp
import
TimeStamp
class
MappingStorage
(
BaseStorage
):
class
MappingStorage
(
ZODB
.
BaseStorage
.
BaseStorage
):
def
__init__
(
self
,
name
=
'Mapping Storage'
):
BaseStorage
.
__init__
(
self
,
name
)
ZODB
.
BaseStorage
.
BaseStorage
.
__init__
(
self
,
name
)
# ._index maps an oid to a string s. s[:8] is the tid of the
# transaction that created oid's current state, and s[8:] is oid's
# current state.
...
...
src/ZODB/blob.py
View file @
11cf859f
...
...
@@ -324,6 +324,21 @@ class FilesystemHelper:
"""
return
os
.
path
.
join
(
self
.
base_dir
,
utils
.
oid_repr
(
oid
))
def
createPathForOID
(
self
,
oid
):
"""Given an OID, creates a directory on the filesystem where
the blob data relating to that OID is stored, if it doesn't exist.
"""
path
=
self
.
getPathForOID
(
oid
)
if
os
.
path
.
exists
(
path
):
return
try
:
os
.
makedirs
(
path
,
0700
)
except
OSError
:
# We might have lost a race. If so, the directory
# must exist now
assert
os
.
path
.
exists
(
path
)
def
getBlobFilename
(
self
,
oid
,
tid
):
"""Given an oid and a tid, return the full filename of the
'committed' blob file related to that oid and tid.
...
...
src/ZODB/fsrecover.py
View file @
11cf859f
...
...
@@ -82,7 +82,7 @@ except ImportError:
import
ZODB.FileStorage
from
ZODB.utils
import
u64
from
ZODB.FileStorage
import
RecordIterator
from
ZODB.FileStorage
import
TransactionRecord
from
persistent.TimeStamp
import
TimeStamp
...
...
@@ -146,8 +146,8 @@ def read_txn_header(f, pos, file_size, outp, ltid):
except
:
e
=
{}
else
:
e
=
{}
result
=
RecordIterator
(
tid
,
status
,
user
,
description
,
e
,
pos
,
tend
,
f
,
tpos
)
result
=
TransactionRecord
(
tid
,
status
,
user
,
description
,
e
,
pos
,
tend
,
f
,
tpos
)
pos
=
tend
# Read the (intentionally redundant) transaction length
...
...
src/ZODB/interfaces.py
View file @
11cf859f
...
...
@@ -286,6 +286,7 @@ class IConnection(Interface):
begins or until the connection os reopned.
"""
class
IStorageDB
(
Interface
):
"""Database interface exposed to storages
...
...
@@ -418,6 +419,7 @@ class IDatabase(IStorageDB):
should also close all the Connections.
"""
class
IStorage
(
Interface
):
"""A storage is responsible for storing and retrieving data of objects.
"""
...
...
@@ -710,6 +712,7 @@ class IStorage(Interface):
"""
class
IStorageRestoreable
(
IStorage
):
"""Copying Transactions
...
...
@@ -744,7 +747,7 @@ class IStorageRestoreable(IStorage):
# failed to take into account records after the pack time.
def
restore
(
oid
,
serial
,
data
,
prev_txn
,
transaction
):
def
restore
(
oid
,
serial
,
data
,
version
,
prev_txn
,
transaction
):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
...
...
@@ -775,41 +778,44 @@ class IStorageRestoreable(IStorage):
Nothing is returned.
"""
class
IStorageRecordInformation
(
Interface
):
"""Provide information about a single storage record
"""
oid
=
Attribute
(
"The object id"
)
tid
=
Attribute
(
"The transaction id"
)
data
=
Attribute
(
"The data record"
)
version
=
Attribute
(
"The version id"
)
data_txn
=
Attribute
(
"The previous transaction id"
)
class
IStorageTransactionInformation
(
Interface
):
"""Provide information about a storage transaction
"""Provide information about a storage transaction.
Can be iterated over to retrieve the records modified in the transaction.
"""
tid
=
Attribute
(
"Transaction id"
)
status
=
Attribute
(
"Transaction Status"
)
# XXX what are valid values?
user
=
Attribute
(
"Transaction user"
)
description
=
Attribute
(
"Transaction Description"
)
extension
=
Attribute
(
"
Transaction
extension data"
)
extension
=
Attribute
(
"
A dictionary carrying the transaction's
extension data"
)
def
__iter__
():
"""
Return an iterable of IStorageRecordInformation
"""
"""
Iterate over the transaction's records given as
IStorageRecordInformation objects.
class
IStorageIteration
(
Interface
):
"""API for iterating over the contents of a storage
"""
Note that this is a future API. Some storages now provide an
approximation of this.
"""
class
IStorageIteration
(
Interface
):
"""API for iterating over the contents of a storage."""
def
iterator
(
start
=
None
,
stop
=
None
):
"""Return an IStorageTransactionInformation iterator.
An IStorageTransactionInformation iterator is returned for
iterating over the transactions in the storage.
If the start argument is not None, then iteration will start
with the first transaction whose identifier is greater than or
equal to start.
...
...
@@ -818,8 +824,12 @@ class IStorageIteration(Interface):
the last transaction whose identifier is less than or equal to
stop.
The iterator provides access to the data as available at the time when
the iterator was retrieved.
"""
class
IStorageUndoable
(
IStorage
):
"""A storage supporting transactional undo.
"""
...
...
@@ -932,6 +942,7 @@ class IStorageCurrentRecordIteration(IStorage):
"""
class
IBlob
(
Interface
):
"""A BLOB supports efficient handling of large data within ZODB."""
...
...
@@ -986,5 +997,12 @@ class IBlobStorage(Interface):
If Blobs use this, then commits can be performed with a simple rename.
"""
class
BlobError
(
Exception
):
pass
class
StorageStopIteration
(
IndexError
,
StopIteration
):
"""A combination of StopIteration and IndexError to provide a
backwards-compatible exception.
"""
src/ZODB/tests/IteratorStorage.py
View file @
11cf859f
...
...
@@ -15,6 +15,7 @@
Any storage that supports the iterator() method should be able to pass
all these tests.
"""
from
ZODB.tests.MinPO
import
MinPO
...
...
@@ -23,13 +24,16 @@ from ZODB.utils import U64, p64
from
transaction
import
Transaction
import
itertools
class
IteratorCompare
:
def
iter_verify
(
self
,
txniter
,
revids
,
val0
):
eq
=
self
.
assertEqual
oid
=
self
.
_oid
val
=
val0
for
reciter
,
revid
in
zip
(
txniter
,
revids
+
[
None
]):
for
reciter
,
revid
in
itertools
.
i
zip
(
txniter
,
revids
+
[
None
]):
eq
(
reciter
.
tid
,
revid
)
for
rec
in
reciter
:
eq
(
rec
.
oid
,
oid
)
...
...
@@ -37,7 +41,7 @@ class IteratorCompare:
eq
(
zodb_unpickle
(
rec
.
data
),
MinPO
(
val
))
val
=
val
+
1
eq
(
val
,
val0
+
len
(
revids
))
txniter
.
close
()
class
IteratorStorage
(
IteratorCompare
):
...
...
@@ -51,13 +55,6 @@ class IteratorStorage(IteratorCompare):
txniter
=
self
.
_storage
.
iterator
()
self
.
iter_verify
(
txniter
,
[
revid1
,
revid2
,
revid3
],
11
)
def
checkClose
(
self
):
self
.
_oid
=
oid
=
self
.
_storage
.
new_oid
()
revid1
=
self
.
_dostore
(
oid
,
data
=
MinPO
(
11
))
txniter
=
self
.
_storage
.
iterator
()
txniter
.
close
()
self
.
assertRaises
(
IOError
,
txniter
.
__getitem__
,
0
)
def
checkUndoZombie
(
self
):
oid
=
self
.
_storage
.
new_oid
()
revid
=
self
.
_dostore
(
oid
,
data
=
MinPO
(
94
))
...
...
@@ -89,7 +86,7 @@ class IteratorStorage(IteratorCompare):
iter
=
self
.
_storage
.
iterator
()
count
=
0
for
txn
in
iter
:
self
.
assertEqual
(
txn
.
_
extension
,
{})
self
.
assertEqual
(
txn
.
extension
,
{})
count
+=
1
self
.
assertEqual
(
count
,
1
)
...
...
@@ -129,7 +126,32 @@ class IteratorStorage(IteratorCompare):
match
=
True
if
not
match
:
self
.
fail
(
"Could not find transaction with matching id"
)
def
checkIterateRepeatedly
(
self
):
self
.
_dostore
()
transactions
=
self
.
_storage
.
iterator
()
self
.
assertEquals
(
1
,
len
(
list
(
transactions
)))
# The iterator can only be consumed once:
self
.
assertEquals
(
0
,
len
(
list
(
transactions
)))
def
checkIterateRecordsRepeatedly
(
self
):
self
.
_dostore
()
tinfo
=
self
.
_storage
.
iterator
().
next
()
self
.
assertEquals
(
1
,
len
(
list
(
tinfo
)))
# The iterator can only be consumed once:
self
.
assertEquals
(
0
,
len
(
list
(
tinfo
)))
def
checkIterateWhileWriting
(
self
):
self
.
_dostore
()
iterator
=
self
.
_storage
.
iterator
()
# We have one transaction with 1 modified object.
txn_1
=
iterator
.
next
()
self
.
assertEquals
(
1
,
len
(
list
(
txn_1
)))
# We store another transaction with 1 object, the already running
# iterator does not pick this up.
self
.
_dostore
()
self
.
assertRaises
(
StopIteration
,
iterator
.
next
)
class
ExtendedIteratorStorage
(
IteratorCompare
):
...
...
@@ -173,28 +195,36 @@ class ExtendedIteratorStorage(IteratorCompare):
txniter
=
self
.
_storage
.
iterator
(
revid3
,
revid3
)
self
.
iter_verify
(
txniter
,
[
revid3
],
13
)
class
IteratorDeepCompare
:
def
compare
(
self
,
storage1
,
storage2
):
eq
=
self
.
assertEqual
iter1
=
storage1
.
iterator
()
iter2
=
storage2
.
iterator
()
for
txn1
,
txn2
in
zip
(
iter1
,
iter2
):
for
txn1
,
txn2
in
itertools
.
i
zip
(
iter1
,
iter2
):
eq
(
txn1
.
tid
,
txn2
.
tid
)
eq
(
txn1
.
status
,
txn2
.
status
)
eq
(
txn1
.
user
,
txn2
.
user
)
eq
(
txn1
.
description
,
txn2
.
description
)
eq
(
txn1
.
_extension
,
txn2
.
_extension
)
for
rec1
,
rec2
in
zip
(
txn1
,
txn2
):
eq
(
txn1
.
extension
,
txn2
.
extension
)
itxn1
=
iter
(
txn1
)
itxn2
=
iter
(
txn2
)
for
rec1
,
rec2
in
itertools
.
izip
(
itxn1
,
itxn2
):
eq
(
rec1
.
oid
,
rec2
.
oid
)
eq
(
rec1
.
tid
,
rec2
.
tid
)
eq
(
rec1
.
data
,
rec2
.
data
)
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
self
.
assertRaises
(
IndexError
,
txn1
.
next
)
self
.
assertRaises
(
IndexError
,
txn2
.
next
)
# Additionally, check that we're backwards compatible to the
# IndexError we used to raise before.
self
.
assertRaises
(
IndexError
,
itxn1
.
next
)
self
.
assertRaises
(
IndexError
,
itxn2
.
next
)
self
.
assertRaises
(
StopIteration
,
itxn1
.
next
)
self
.
assertRaises
(
StopIteration
,
itxn2
.
next
)
# Make sure ther are no more records left in txn1 and txn2, meaning
# they were the same length
self
.
assertRaises
(
IndexError
,
iter1
.
next
)
self
.
assertRaises
(
IndexError
,
iter2
.
next
)
iter1
.
close
(
)
iter2
.
close
(
)
self
.
assertRaises
(
StopIteration
,
iter1
.
next
)
self
.
assertRaises
(
StopIteration
,
iter2
.
next
)
src/ZODB/tests/PackableStorage.py
View file @
11cf859f
...
...
@@ -30,6 +30,7 @@ import time
from
persistent
import
Persistent
from
persistent.mapping
import
PersistentMapping
import
transaction
import
ZODB.interfaces
from
ZODB
import
DB
from
ZODB.serialize
import
referencesf
from
ZODB.tests.MinPO
import
MinPO
...
...
@@ -149,6 +150,16 @@ class PackableStorageBase:
self
.
_storage
.
tpc_vote
(
t
)
self
.
_storage
.
tpc_finish
(
t
)
def
_sanity_check
(
self
):
# Iterate over the storage to make sure it's sane.
if
not
ZODB
.
interfaces
.
IStorageIteration
.
providedBy
(
self
.
_storage
):
return
it
=
self
.
_storage
.
iterator
()
for
txn
in
it
:
for
data
in
txn
:
pass
class
PackableStorage
(
PackableStorageBase
):
def
checkPackEmptyStorage
(
self
):
...
...
@@ -253,16 +264,7 @@ class PackableStorage(PackableStorageBase):
self
.
fail
(
'a thread is still alive'
)
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if
not
hasattr
(
self
.
_storage
,
"iterator"
):
return
it
=
self
.
_storage
.
iterator
()
for
txn
in
it
:
for
data
in
txn
:
pass
it
.
close
()
self
.
_sanity_check
()
def
checkPackWhileWriting
(
self
):
self
.
_PackWhileWriting
(
pack_now
=
False
)
...
...
@@ -304,14 +306,7 @@ class PackableStorage(PackableStorageBase):
packt
=
time
.
time
()
thread
.
join
()
# Iterate over the storage to make sure it's sane.
if
not
hasattr
(
self
.
_storage
,
"iterator"
):
return
it
=
self
.
_storage
.
iterator
()
for
txn
in
it
:
for
data
in
txn
:
pass
it
.
close
()
self
.
_sanity_check
()
def
checkPackWithMultiDatabaseReferences
(
self
):
databases
=
{}
...
...
src/ZODB/tests/RecoveryStorage.py
View file @
11cf859f
...
...
@@ -22,6 +22,7 @@ from ZODB.serialize import referencesf
import
time
class
RecoveryStorage
(
IteratorDeepCompare
):
# Requires a setUp() that creates a self._dst destination storage
def
checkSimpleRecovery
(
self
):
...
...
@@ -49,12 +50,16 @@ class RecoveryStorage(IteratorDeepCompare):
# copy the final transaction manually. even though there
# was a pack, the restore() ought to succeed.
it
=
self
.
_storage
.
iterator
()
final
=
list
(
it
)[
-
1
]
# Get the last transaction and its record iterator. Record iterators
# can't be accessed out-of-order, so we need to do this in a bit
# complicated way:
for
final
in
it
:
records
=
list
(
final
)
self
.
_dst
.
tpc_begin
(
final
,
final
.
tid
,
final
.
status
)
for
r
in
final
:
for
r
in
records
:
self
.
_dst
.
restore
(
r
.
oid
,
r
.
tid
,
r
.
data
,
''
,
r
.
data_txn
,
final
)
it
.
close
()
self
.
_dst
.
tpc_vote
(
final
)
self
.
_dst
.
tpc_finish
(
final
)
...
...
src/ZODB/tests/TransactionalUndoStorage.py
View file @
11cf859f
...
...
@@ -636,11 +636,13 @@ class TransactionalUndoStorage:
for
j
in
range
(
OBJECTS
):
oid
=
s
.
new_oid
()
obj
=
MinPO
(
i
*
OBJECTS
+
j
)
revid
=
s
.
store
(
oid
,
None
,
zodb_pickle
(
obj
),
''
,
t
)
orig
.
append
((
tid
,
oid
,
revid
))
s
.
store
(
oid
,
None
,
zodb_pickle
(
obj
),
''
,
t
)
orig
.
append
((
tid
,
oid
))
s
.
tpc_vote
(
t
)
s
.
tpc_finish
(
t
)
orig
=
[(
tid
,
oid
,
s
.
getTid
(
oid
))
for
tid
,
oid
in
orig
]
i
=
0
for
tid
,
oid
,
revid
in
orig
:
self
.
_dostore
(
oid
,
revid
=
revid
,
data
=
MinPO
(
revid
),
...
...
@@ -668,14 +670,11 @@ class TransactionalUndoStorage:
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
iter
=
s
.
iterator
()
offset
=
0
transactions
=
s
.
iterator
()
eq
=
self
.
assertEqual
for
i
in
range
(
BATCHES
):
txn
=
iter
[
offset
]
offset
+=
1
txn
=
transactions
.
next
()
tid
=
p64
(
i
+
1
)
eq
(
txn
.
tid
,
tid
)
...
...
@@ -687,13 +686,11 @@ class TransactionalUndoStorage:
eq
(
L1
,
L2
)
for
i
in
range
(
BATCHES
*
OBJECTS
):
txn
=
iter
[
offset
]
offset
+=
1
txn
=
transactions
.
next
()
eq
(
len
([
rec
for
rec
in
txn
if
rec
.
data_txn
is
None
]),
1
)
for
i
in
range
(
BATCHES
):
txn
=
iter
[
offset
]
offset
+=
1
txn
=
transactions
.
next
()
# The undos are performed in reverse order.
otid
=
p64
(
BATCHES
-
i
)
...
...
@@ -704,7 +701,7 @@ class TransactionalUndoStorage:
L2
.
sort
()
eq
(
L1
,
L2
)
self
.
assertRaises
(
IndexError
,
iter
.
__getitem__
,
offse
t
)
self
.
assertRaises
(
StopIteration
,
transactions
.
nex
t
)
def
checkUndoLogMetadata
(
self
):
# test that the metadata is correct in the undo log
...
...
src/ZODB/tests/testFileStorage.py
View file @
11cf859f
...
...
@@ -308,6 +308,7 @@ class FileStorageTests(
else
:
self
.
assertNotEqual
(
next_oid
,
None
)
class
FileStorageRecoveryTest
(
StorageTestBase
.
StorageTestBase
,
RecoveryStorage
.
RecoveryStorage
,
...
...
@@ -326,6 +327,40 @@ class FileStorageRecoveryTest(
def
new_dest
(
self
):
return
ZODB
.
FileStorage
.
FileStorage
(
'Dest.fs'
)
class
FileStorageNoRestore
(
ZODB
.
FileStorage
.
FileStorage
):
@
property
def
restore
(
self
):
raise
Exception
class
FileStorageNoRestoreRecoveryTest
(
StorageTestBase
.
StorageTestBase
,
RecoveryStorage
.
RecoveryStorage
,
):
# This test actually verifies a code path of
# BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
# use a FileStorage deprived of its restore method.
def
setUp
(
self
):
self
.
_storage
=
FileStorageNoRestore
(
"Source.fs"
,
create
=
True
)
self
.
_dst
=
FileStorageNoRestore
(
"Dest.fs"
,
create
=
True
)
def
tearDown
(
self
):
self
.
_storage
.
close
()
self
.
_dst
.
close
()
self
.
_storage
.
cleanup
()
self
.
_dst
.
cleanup
()
def
new_dest
(
self
):
return
FileStorageNoRestore
(
'Dest.fs'
)
def
checkRestoreAcrossPack
(
self
):
# Skip this check as it calls restore directly.
pass
class
SlowFileStorageTest
(
BaseFileStorageTests
):
level
=
2
...
...
@@ -492,7 +527,8 @@ def test_suite():
suite
=
unittest
.
TestSuite
()
for
klass
in
[
FileStorageTests
,
Corruption
.
FileStorageCorruptTests
,
FileStorageRecoveryTest
,
SlowFileStorageTest
]:
FileStorageRecoveryTest
,
FileStorageNoRestoreRecoveryTest
,
SlowFileStorageTest
]:
suite
.
addTest
(
unittest
.
makeSuite
(
klass
,
"check"
))
suite
.
addTest
(
doctest
.
DocTestSuite
(
setUp
=
ZODB
.
tests
.
util
.
setUp
,
tearDown
=
ZODB
.
tests
.
util
.
tearDown
))
...
...
src/ZODB/tests/testMappingStorage.py
View file @
11cf859f
...
...
@@ -37,6 +37,7 @@ class MappingStorageTests(StorageTestBase.StorageTestBase,
# have this limit, so we inhibit this test here.
pass
def
test_suite
():
suite
=
unittest
.
makeSuite
(
MappingStorageTests
,
'check'
)
return
suite
...
...
src/ZODB/utils.py
View file @
11cf859f
...
...
@@ -295,5 +295,3 @@ def mktemp(dir=None):
handle
,
filename
=
mkstemp
(
dir
=
dir
)
os
.
close
(
handle
)
return
filename
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment