Commit 9308fbfe authored by Jim Fulton's avatar Jim Fulton

ClientStorage now provides blob cache management. When using

non-shared blob directories, you can set a target cache size and the
cache will periodically be reduced to the target size.

To enable blob cache management, a new IBlobStorage method,
openCommittedBlobFile has been added.
parent bc99890a
......@@ -33,6 +33,13 @@ New Features
The ordinary file may be used outside the current transaction and
even after the blob's database connection has been closed.
- ClientStorage now provides blob cache management. When using
non-shared blob directories, you can set a target cache size and the
cache will periodically be reduced to the target size.
The client blob directory layout has changed. If you have existing
non-shared blob directories, you will have to remove them.
Bugs Fixed
----------
......
......@@ -26,13 +26,14 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO import ServerStub
from ZEO.TransactionBuffer import TransactionBuffer
from ZEO.zrpc.client import ConnectionManager
from ZODB.blob import rename_or_copy_blob
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
import BTrees.IOBTree
import cPickle
import logging
import os
import re
import socket
import stat
import sys
......@@ -43,6 +44,7 @@ import types
import weakref
import zc.lockfile
import ZEO.interfaces
import ZODB
import ZODB.BaseStorage
import ZODB.interfaces
import zope.event
......@@ -112,14 +114,16 @@ class ClientStorage(object):
StorageServerStubClass = ServerStub.stub
def __init__(self, addr, storage='1', cache_size=20 * MB,
name='', client=None, debug=0, var=None,
name='', client=None, var=None,
min_disconnect_poll=1, max_disconnect_poll=30,
wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
drop_cache_rather_verify=False,
username='', password='', realm=None,
blob_dir=None, shared_blob_dir=False):
blob_dir=None, shared_blob_dir=False,
blob_cache_size=None, blob_cache_size_check=100,
):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
......@@ -127,81 +131,109 @@ class ClientStorage(object):
All arguments except addr should be keyword arguments.
Arguments:
addr -- The server address(es). This is either a list of
addr
The server address(es). This is either a list of
addresses or a single address. Each address can be a
(hostname, port) tuple to signify a TCP/IP connection or
a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address. Required.
storage -- The storage name, defaulting to '1'. The name must
storage
The storage name, defaulting to '1'. The name must
match one of the storage names supported by the server(s)
specified by the addr argument. The storage name is
displayed in the Zope control panel.
cache_size -- The disk cache size, defaulting to 20 megabytes.
cache_size
The disk cache size, defaulting to 20 megabytes.
This is passed to the ClientCache constructor.
name -- The storage name, defaulting to ''. If this is false,
name
The storage name, defaulting to ''. If this is false,
str(addr) is used as the storage name.
client -- A name used to construct persistent cache filenames.
client
A name used to construct persistent cache filenames.
Defaults to None, in which case the cache is not persistent.
See ClientCache for more info.
debug -- Ignored. This is present only for backwards
compatibility with ZEO 1.
var -- When client is not None, this specifies the directory
var
When client is not None, this specifies the directory
where the persistent cache files are created. It defaults
to None, in whichcase the current directory is used.
min_disconnect_poll -- The minimum delay in seconds between
min_disconnect_poll
The minimum delay in seconds between
attempts to connect to the server, in seconds. Defaults
to 5 seconds.
max_disconnect_poll -- The maximum delay in seconds between
max_disconnect_poll
The maximum delay in seconds between
attempts to connect to the server, in seconds. Defaults
to 300 seconds.
wait_for_server_on_startup -- A backwards compatible alias for
wait_for_server_on_startup
A backwards compatible alias for
the wait argument.
wait -- A flag indicating whether to wait until a connection
wait
A flag indicating whether to wait until a connection
with a server is made, defaulting to true.
wait_timeout -- Maximum time to wait for a connection before
wait_timeout
Maximum time to wait for a connection before
giving up. Only meaningful if wait is True.
read_only -- A flag indicating whether this should be a
read_only
A flag indicating whether this should be a
read-only storage, defaulting to false (i.e. writing is
allowed by default).
read_only_fallback -- A flag indicating whether a read-only
read_only_fallback
A flag indicating whether a read-only
remote storage should be acceptable as a fallback when no
writable storages are available. Defaults to false. At
most one of read_only and read_only_fallback should be
true.
username -- string with username to be used when authenticating.
username
string with username to be used when authenticating.
These only need to be provided if you are connecting to an
authenticated server storage.
password -- string with plaintext password to be used
when authenticated.
password
string with plaintext password to be used when authenticated.
realm -- not documented.
realm
not documented.
drop_cache_rather_verify -- a flag indicating that the cache
should be dropped rather than expensively verified.
drop_cache_rather_verify
a flag indicating that the cache should be dropped rather
than expensively verified.
blob_dir -- directory path for blob data. 'blob data' is data that
blob_dir
directory path for blob data. 'blob data' is data that
is retrieved via the loadBlob API.
shared_blob_dir -- Flag whether the blob_dir is a server-shared
filesystem that should be used instead of transferring blob data over
shared_blob_dir
Flag whether the blob_dir is a server-shared filesystem
that should be used instead of transferring blob data over
zrpc.
blob_cache_size
Maximum size of the ZEO blob cache, in bytes. If not set, then
the cache size isn't checked and the blob directory will
grow without bound.
This option is ignored if shared_blob_dir is true.
blob_cache_size_check
ZEO check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 100% of the blob cache
size. This option is ignored if shared_blob_dir is true.
Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).
......@@ -220,11 +252,6 @@ class ClientStorage(object):
storage,
)
if debug:
logger.warning(
"%s ClientStorage(): debug argument is no longer used",
self.__name__)
self._drop_cache_rather_verify = drop_cache_rather_verify
# wait defaults to True, but wait_for_server_on_startup overrides
......@@ -342,11 +369,18 @@ class ClientStorage(object):
# XXX need to check for POSIX-ness here
self.blob_dir = blob_dir
self.shared_blob_dir = shared_blob_dir
if blob_dir is not None:
# Avoid doing this import unless we need it, as it
# currently requires pywin32 on Windows.
import ZODB.blob
if shared_blob_dir:
self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
else:
if 'zeocache' not in ZODB.blob.LAYOUTS:
ZODB.blob.LAYOUTS['zeocache'] = BlobCacheLayout()
self.fshelper = ZODB.blob.FilesystemHelper(
blob_dir, layout_name='zeocache')
self.fshelper.create()
self.fshelper.checkSecure()
else:
......@@ -360,6 +394,14 @@ class ClientStorage(object):
self._cache = self.ClientCacheClass(cache_path, size=cache_size)
self._blob_cache_size = blob_cache_size
self._blob_data_bytes_loaded = 0
if blob_cache_size is not None:
self._blob_cache_size_check = (
blob_cache_size * blob_cache_size_check / 100)
self._check_blob_size()
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
......@@ -373,6 +415,8 @@ class ClientStorage(object):
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
def _wait(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
......@@ -414,6 +458,28 @@ class ClientStorage(object):
if self._tfile is not None:
self._tfile.close()
if self._check_blob_size_thread is not None:
self._check_blob_size_thread.join()
_check_blob_size_thread = None
def _check_blob_size(self, bytes=None):
if self._blob_cache_size is None:
return
if self.shared_blob_dir or not self.blob_dir:
return
if (bytes is not None) and (bytes < self._blob_cache_size_check):
return
self._blob_data_bytes_loaded = 0
check_blob_size_thread = threading.Thread(
target=_check_blob_cache_size,
args=(self.blob_dir, self._blob_cache_size),
)
check_blob_size_thread.setDaemon(True)
check_blob_size_thread.start()
self._check_blob_size_thread = check_blob_size_thread
def registerDB(self, db):
"""Storage API: register a database for invalidation messages.
......@@ -866,26 +932,20 @@ class ClientStorage(object):
# use a slightly different file name. We keep the old one
# until we're done to avoid conflicts. Then remove the old name.
target += 'w'
rename_or_copy_blob(filename, target)
ZODB.blob.rename_or_copy_blob(filename, target)
os.remove(target[:-1])
else:
rename_or_copy_blob(filename, target)
ZODB.blob.rename_or_copy_blob(filename, target)
# Now tell the server where we put it
self._server.storeBlobShared(
oid, serial, data, os.path.basename(target), id(txn))
def _have_blob(self, blob_filename, oid, serial):
if os.path.exists(blob_filename):
logger.debug("%s Found blob %r/%r in cache.",
self.__name__, oid, serial)
return True
return False
def receiveBlobStart(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
assert not os.path.exists(blob_filename)
assert os.path.exists(blob_filename+'.lock')
lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
assert os.path.exists(lockfilename)
blob_filename += '.dl'
assert not os.path.exists(blob_filename)
f = open(blob_filename, 'wb')
......@@ -894,9 +954,12 @@ class ClientStorage(object):
def receiveBlobChunk(self, oid, serial, chunk):
blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
assert os.path.exists(blob_filename)
f = open(blob_filename, 'ab')
f = open(blob_filename, 'r+b')
f.seek(0, 2)
f.write(chunk)
f.close()
self._blob_data_bytes_loaded += len(chunk)
self._check_blob_size(self._blob_data_bytes_loaded)
def receiveBlobStop(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
......@@ -913,15 +976,17 @@ class ClientStorage(object):
"configured.")
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
if self._have_blob(blob_filename, oid, serial):
return blob_filename
if self.shared_blob_dir:
if os.path.exists(blob_filename):
return blob_filename
else:
# We're using a server shared cache. If the file isn't
# here, it's not anywhere.
raise POSException.POSKeyError("No blob file", oid, serial)
if os.path.exists(blob_filename):
return _accessed(blob_filename)
# First, we'll create the directory for this oid, if it doesn't exist.
self.fshelper.createPathForOID(oid)
......@@ -930,42 +995,22 @@ class ClientStorage(object):
# getting it multiple times even accross separate client
# processes on the same machine. We'll use file locking.
lockfilename = blob_filename+'.lock'
try:
lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError:
# Someone is already downloading the Blob. Wait for the
# lock to be freed. How long should we be willing to wait?
# TODO: maybe find some way to assess download progress.
lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
while 1:
time.sleep(0.1)
try:
lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError:
pass
time.sleep(0.01)
else:
# We have the lock. We should be able to get the file now.
lock.close()
try:
os.remove(lockfilename)
except OSError:
pass
break
if self._have_blob(blob_filename, oid, serial):
return blob_filename
return None
try:
# We got the lock, so it's our job to download it. First,
# we'll double check that someone didn't download it while we
# were getting the lock:
if self._have_blob(blob_filename, oid, serial):
return blob_filename
if os.path.exists(blob_filename):
return _accessed(blob_filename)
# Ask the server to send it to us. When this function
# returns, it will have been sent. (The recieving will
......@@ -973,18 +1018,55 @@ class ClientStorage(object):
self._server.sendBlob(oid, serial)
if self._have_blob(blob_filename, oid, serial):
return blob_filename
if os.path.exists(blob_filename):
return _accessed(blob_filename)
raise POSException.POSKeyError("No blob file", oid, serial)
finally:
lock.close()
def openCommittedBlobFile(self, oid, serial, blob=None):
blob_filename = self.loadBlob(oid, serial)
try:
os.remove(lockfilename)
except OSError:
if blob is None:
return open(blob_filename, 'rb')
else:
return ZODB.blob.BlobFile(blob_filename, 'r', blob)
except (IOError):
# The file got removed while we were opening.
# Fall through and try again with the protection of the lock.
pass
lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
while 1:
try:
lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError:
time.sleep(.01)
else:
break
try:
blob_filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(blob_filename):
if self.shared_blob_dir:
# We're using a server shared cache. If the file isn't
# here, it's not anywhere.
raise POSException.POSKeyError("No blob file", oid, serial)
self._server.sendBlob(oid, serial)
if not os.path.exists(blob_filename):
raise POSException.POSKeyError("No blob file", oid, serial)
_accessed(blob_filename)
if blob is None:
return open(blob_filename, 'rb')
else:
return ZODB.blob.BlobFile(blob_filename, 'r', blob)
finally:
lock.close()
def temporaryDirectory(self):
return self.fshelper.temp_dir
......@@ -1125,10 +1207,13 @@ class ClientStorage(object):
blobs = self._tbuf.blobs
while blobs:
oid, blobfilename = blobs.pop()
self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
targetpath = self.fshelper.getPathForOID(oid, create=True)
rename_or_copy_blob(blobfilename,
ZODB.blob.rename_or_copy_blob(
blobfilename,
self.fshelper.getBlobFilename(oid, tid),
)
self._check_blob_size(self._blob_data_bytes_loaded)
self._tbuf.clear()
......@@ -1485,6 +1570,7 @@ class RecordIterator(object):
raise ZODB.interfaces.StorageStopIteration()
return ZODB.BaseStorage.DataRecord(*item)
class ClientStorage308Adapter:
def __init__(self, client):
......@@ -1498,3 +1584,92 @@ class ClientStorage308Adapter:
def __getattr__(self, name):
return getattr(self.client, name)
class BlobCacheLayout(object):
size = 997
def oid_to_path(self, oid):
return str(utils.u64(oid) % self.size)
def getBlobFilePath(self, oid, tid):
base, rem = divmod(utils.u64(oid), self.size)
return os.path.join(
str(rem),
"%s.%s%s" % (base, tid.encode('hex'), ZODB.blob.BLOB_SUFFIX)
)
def _accessed(filename):
try:
os.utime(filename, (time.time(), os.stat(filename).st_mtime))
except OSError:
pass # We tried. :)
return filename
cache_file_name = re.compile(r'\d+$').match
def _check_blob_cache_size(blob_dir, target):
layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
).read().strip()
if not layout == 'zeocache':
raise ValueError("Invalid blob directory layout", layout)
try:
check_lock = zc.lockfile.LockFile(
os.path.join(blob_dir, 'check_size.lock'))
except zc.lockfile.LockError:
# Someone is already cleaning up, so don't bother
return
try:
size = 0
blob_suffix = ZODB.blob.BLOB_SUFFIX
files_by_atime = BTrees.IOBTree.BTree()
for dirname in os.listdir(blob_dir):
if not cache_file_name(dirname):
continue
base = os.path.join(blob_dir, dirname)
if not os.path.isdir(base):
continue
for file_name in os.listdir(base):
if not file_name.endswith(blob_suffix):
continue
file_name = os.path.join(base, file_name)
if not os.path.isfile(file_name):
continue
stat = os.stat(file_name)
size += stat.st_size
t = stat.st_atime
if t not in files_by_atime:
files_by_atime[t] = []
files_by_atime[t].append(file_name)
while size > target and files_by_atime:
for file_name in files_by_atime.pop(files_by_atime.minKey()):
lockfilename = os.path.join(os.path.dirname(file_name),
'.lock')
try:
lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError:
continue # In use, skip
try:
size = os.stat(file_name).st_size
try:
ZODB.blob.remove_committed(file_name)
except OSError, v:
pass # probably open on windows
else:
size -= size
finally:
lock.close()
finally:
check_lock.close()
def check_blob_size_script(args=None):
if args is None:
args = sys.argv[1:]
blob_dir, target = args
_check_blob_cache_size(blob_dir, int(target))
ZEO Client Configuration
========================
Here we'll describe (and test) the various ZEO Client configuration
options. To facilitate this, we'l start a server that our client can
connect to:
>>> addr, _ = start_server(blob_dir='server-blobs')
The simplest client configuration specified a server address:
>>> import ZODB.config
>>> storage = ZODB.config.storageFromString("""
... <zeoclient>
... server %s:%s
... </zeoclient>
... """ % addr)
>>> storage.getName(), storage.__class__.__name__
... # doctest: +ELLIPSIS
("[('localhost', ...)] (connected)", 'ClientStorage')
>>> storage.blob_dir
>>> storage._storage
'1'
>>> storage._cache.maxsize
20971520
>>> storage._cache.path
>>> storage._rpc_mgr.tmin
5
>>> storage._rpc_mgr.tmax
300
>>> storage._is_read_only
False
>>> storage._read_only_fallback
False
>>> storage._drop_cache_rather_verify
False
>>> storage._blob_cache_size
>>> storage.close()
>>> storage = ZODB.config.storageFromString("""
... <zeoclient>
... server %s:%s
... blob-dir blobs
... storage 2
... cache-size 100
... name bob
... client cache
... min-disconnect-poll 1
... max-disconnect-poll 5
... read-only true
... drop-cache-rather-verify true
... blob-cache-size 1000MB
... blob-cache-size-check 10
... wait false
... </zeoclient>
... """ % addr)
>>> storage.getName(), storage.__class__.__name__
('bob (disconnected)', 'ClientStorage')
>>> storage.blob_dir
'blobs'
>>> storage._storage
'2'
>>> storage._cache.maxsize
100
>>> import os
>>> storage._cache.path == os.path.abspath('cache-2.zec')
True
>>> storage._rpc_mgr.tmin
1
>>> storage._rpc_mgr.tmax
5
>>> storage._is_read_only
True
>>> storage._read_only_fallback
False
>>> storage._drop_cache_rather_verify
True
>>> storage._blob_cache_size
1048576000
>>> print storage._blob_cache_size_check
104857600
>>> storage.close()
......@@ -285,7 +285,7 @@ def setUp(test):
servers = {}
def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
addr=None, path='Data.fs', protocol=None):
addr=None, path='Data.fs', protocol=None, blob_dir=None):
"""Start a ZEO server.
Return the server and admin addresses.
......@@ -298,7 +298,7 @@ def setUp(test):
elif addr is not None:
raise TypeError("Can't specify port and addr")
addr, adminaddr, pid, config_path = start_zeo_server(
storage_conf, zeo_conf, port, keep, path, protocol)
storage_conf, zeo_conf, port, keep, path, protocol, blob_dir)
os.remove(config_path)
servers[adminaddr] = pid
return addr, adminaddr
......
......@@ -737,7 +737,11 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
check_data(filename)
# ... and on the server
server_filename = filename.replace(self.blob_cache_dir, self.blobdir)
server_filename = os.path.join(
self.blobdir,
ZODB.blob.BushyLayout().getBlobFilePath(oid, revid),
)
self.assert_(server_filename.startswith(self.blobdir))
check_data(server_filename)
......@@ -1167,8 +1171,8 @@ def test_suite():
zeo.addTest(
doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt',
'protocols.test',
'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
......
ZEO caching of blob data
========================
ZEO supports 2 modes for providing clients access to blob data:
shared
Blob data are shared via a network file system. The client shares
a common blob directory with the server.
non-shared
Blob data are loaded from the storage server and cached locally.
A maximum size for the blob data can be set and data are removed
when the size is exceeded.
In this test, we'll demonstrate that blobs data are removed from a ZEO
cache when the amount of data stored exceeds a given limit.
Let's start by setting up some data:
>>> addr, _ = start_server(blob_dir='server-blobs')
We'll also create a client.
>>> import ZEO
>>> db = ZEO.DB(addr, blob_dir='blobs',
... blob_cache_size=4000, blob_cache_size_check=10)
Here, we passed a blob_cache_size parameter, which specifies a target
blob cache size. This is not a hard limit, but rather a target. It
defaults to a very large value. We also passed a blob_cache_size_check
option. The blob_cache_size_check option specifies the number of
bytes, as a percent of the target that can be written or downloaded
from the server before the cache size is checked. The
blob_cache_size_check option defaults to 100. We passed 10, to check
after writing 10% of the target size.
We want to check for name collections in the blob cache dir. We'll try
to provoke name collections by reducing the number of cache directory
subdirectories.
>>> import ZEO.ClientStorage
>>> orig_blob_cache_layout_size = ZEO.ClientStorage.BlobCacheLayout.size
>>> ZEO.ClientStorage.BlobCacheLayout.size = 11
Now, let's write some data:
>>> import ZODB.blob, transaction, time
>>> conn = db.open()
>>> for i in range(1, 101):
... conn.root()[i] = ZODB.blob.Blob()
... conn.root()[i].open('w').write(chr(i)*100)
>>> transaction.commit()
We've committed 10000 bytes of data, but our target size is 4000. We
expect to have not much more than the target size in the cache blob
directory.
>>> import os
>>> def cache_size(d):
... size = 0
... for base, dirs, files in os.walk(d):
... for f in files:
... if f.endswith('.blob'):
... size += os.stat(os.path.join(base, f)).st_size
... return size
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
True
If we read all of the blobs, data will be downloaded again, as
necessary, but the cache size will remain not much bigger than the
target:
>>> for i in range(1, 101):
... data = conn.root()[i].open().read()
... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
True
>>> for i in range(1, 101):
... data = conn.root()[i].open().read()
... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
>>> for i in range(1, 101):
... data = conn.root()[i].open('c').read()
... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
True
>>> for i in range(1, 101):
... data = open(conn.root()[i].committed(), 'rb').read()
... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data`
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
True
Now let see if we can stress things a bit. We'll create many clients
and get them to pound on the blobs all at once to see if we can
provoke problems:
>>> import threading, random
>>> def run():
... db = ZEO.DB(addr, blob_dir='blobs',
... blob_cache_size=4000, blob_cache_size_check=10)
... conn = db.open()
... for i in range(300):
... time.sleep(0)
... i = random.randint(1, 100)
... data = conn.root()[i].open().read()
... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data`
... i = random.randint(1, 100)
... data = conn.root()[i].open('c').read()
... if data != chr(i)*100:
... print 'bad data', `chr(i)`, `data`
... db._storage._check_blob_size_thread.join()
... db.close()
>>> threads = [threading.Thread(target=run) for i in range(10)]
>>> for thread in threads:
... thread.setDaemon(True)
>>> for thread in threads:
... thread.start()
>>> for thread in threads:
... thread.join()
>>> cache_size('blobs') < 6000
True
.. cleanup
>>> db.close()
>>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
......@@ -38,6 +38,7 @@ from zope.interface import implements
import transaction
import ZODB
from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
......@@ -1271,6 +1272,13 @@ class TmpStore:
return self._storage.loadBlob(oid, serial)
return filename
def openCommittedBlobFile(self, oid, serial, blob=None):
blob_filename = self.loadBlob(oid, serial)
if blob is None:
return open(blob_filename, 'rb')
else:
return ZODB.blob.BlobFile(blob_filename, 'r', blob)
def _getBlobPath(self):
return os.path.join(self.temporaryDirectory(), 'savepoints')
......
......@@ -175,6 +175,20 @@ class DemoStorage(object):
return self.loadBlob(oid, serial)
raise
def openCommittedBlobFile(self, oid, serial, blob=None):
try:
return self.changes.openCommittedBlobFile(oid, serial, blob)
except ZODB.POSException.POSKeyError:
try:
return self.base.openCommittedBlobFile(oid, serial, blob)
except AttributeError:
if not zope.interface.IBlobStorage.providBy(self.base):
raise ZODB.POSException.POSKeyError(oid, serial)
raise
except AttributeError:
if self._blobify():
return self.openCommittedBlobFile(oid, serial, blob)
raise
def loadSerial(self, oid, serial):
try:
......
......@@ -120,7 +120,15 @@ class Blob(persistent.Persistent):
raise ValueError("invalid mode", mode)
if mode == 'c':
return open(self.committed(), 'rb')
if (self._p_blob_uncommitted
or
not self._p_blob_committed
or
self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)
):
raise BlobError('Uncommitted changes')
return self._p_jar._storage.openCommittedBlobFile(
self._p_oid, self._p_serial)
if self.writers:
raise BlobError("Already opened for writing.")
......@@ -129,10 +137,20 @@ class Blob(persistent.Persistent):
self.readers = []
if mode == 'r':
if self._current_filename() is None:
result = None
to_open = self._p_blob_uncommitted
if not to_open:
to_open = self._p_blob_committed
if to_open:
result = self._p_jar._storage.openCommittedBlobFile(
self._p_oid, self._p_serial, self)
else:
self._create_uncommitted_file()
to_open = self._p_blob_uncommitted
assert to_open
result = BlobFile(self._current_filename(), mode, self)
if result is None:
result = BlobFile(to_open, mode, self)
def destroyed(ref, readers=self.readers):
try:
......@@ -181,7 +199,15 @@ class Blob(persistent.Persistent):
self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)
):
raise BlobError('Uncommitted changes')
return self._p_blob_committed
result = self._p_blob_committed
# We do this to make sure we have the file and to let the
# storage know we're accessing the file.
n = self._p_jar._storage.loadBlob(self._p_oid, self._p_serial)
assert result == n, (result, n)
return result
def consumeFile(self, filename):
"""Will replace the current data of the blob with the file given under
......@@ -234,11 +260,6 @@ class Blob(persistent.Persistent):
# utility methods
def _current_filename(self):
# NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
# Connection._setstate
return self._p_blob_uncommitted or self._p_blob_committed
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
"Uncommitted file already exists.")
......@@ -391,13 +412,15 @@ class FilesystemHelper:
'committed' blob file related to that oid and tid.
"""
oid_path = self.getPathForOID(oid)
# TIDs are numbers and sometimes passed around as integers. For our
# computations we rely on the 64-bit packed string representation
if isinstance(oid, int):
oid = utils.p64(oid)
if isinstance(tid, int):
tid = utils.p64(tid)
filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
return os.path.join(oid_path, filename)
return os.path.join(self.base_dir,
self.layout.getBlobFilePath(oid, tid),
)
def blob_mkstemp(self, oid, tid):
"""Given an oid and a tid, return a temporary file descriptor
......@@ -516,10 +539,18 @@ class BushyLayout(object):
oid = ''.join(binascii.unhexlify(byte[2:]) for byte in path)
return oid
LAYOUTS['bushy'] = BushyLayout()
def getBlobFilePath(self, oid, tid):
"""Given an oid and a tid, return the full filename of the
'committed' blob file related to that oid and tid.
"""
oid_path = self.oid_to_path(oid)
filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
return os.path.join(oid_path, filename)
LAYOUTS['bushy'] = BushyLayout()
class LawnLayout(object):
class LawnLayout(BushyLayout):
"""A shallow directory layout for blob directories.
Creates a single level of directories (one for each oid).
......@@ -672,6 +703,14 @@ class BlobStorage(SpecificationDecoratorBase):
raise POSKeyError("No blob file", oid, serial)
return filename
@non_overridable
def openCommittedBlobFile(self, oid, serial, blob=None):
blob_filename = self.loadBlob(oid, serial)
if blob is None:
return open(blob_filename, 'rb')
else:
return BlobFile(blob_filename, 'r', blob)
@non_overridable
def _packUndoing(self, packtime, referencesf):
# Walk over all existing revisions of all blob files and check
......
......@@ -93,6 +93,23 @@
but only the filename when committing.
</description>
</key>
<key name="blob-cache-size" required="no" datatype="byte-size">
<description>
Maximum size of the ZEO blob cache, in bytes. If not set, then
the cache size isn't checked and the blob directory will
grow without bound.
This option is ignored if shared_blob_dir is true.
</description>
</key>
<key name="blob-cache-size-check" required="no" datatype="integer">
<description>
ZEO check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 100% of the blob cache
size. This option is ignored if shared_blob_dir is true.
</description>
</key>
<key name="storage" default="1">
<description>
......
......@@ -164,6 +164,12 @@ class ZEOClient(BaseConfig):
# config.server is a multikey of socket-connection-address values
# where the value is a socket family, address tuple.
L = [server.address for server in self.config.server]
options = {}
if self.config.blob_cache_size is not None:
options['blob_cache_size'] = self.config.blob_cache_size
if self.config.blob_cache_size_check is not None:
options['blob_cache_size_check'] = self.config.blob_cache_size_check
return ClientStorage(
L,
blob_dir=self.config.blob_dir,
......@@ -181,7 +187,8 @@ class ZEOClient(BaseConfig):
drop_cache_rather_verify=self.config.drop_cache_rather_verify,
username=self.config.username,
password=self.config.password,
realm=self.config.realm)
realm=self.config.realm,
**options)
class BDBStorage(BaseConfig):
......
......@@ -1034,6 +1034,18 @@ class IBlobStorage(Interface):
Raises POSKeyError if the blobfile cannot be found.
"""
def openCommittedBlobFile(oid, serial, blob=None):
"""Return a file for committed data for the given object id and serial
If a blob is provided, then a BlobFile object is returned,
otherwise, an ordinary file is returned. In either case, the
file is opened for binary reading.
This method is used to allow storages that cache blob data to
make sure that data are available at least long enough for the
file to be opened.
"""
def temporaryDirectory():
"""Return a directory that should be used for uncommitted blob data.
......
......@@ -12,6 +12,7 @@
#
##############################################################################
import os
import transaction
import unittest
import ZEO.ClientStorage
......@@ -115,15 +116,16 @@ class ZEOConfigTest(ConfigTestBase):
cfg = """
<zodb>
<zeoclient>
blob-dir /tmp
blob-dir blobs
server localhost:56897
wait false
</zeoclient>
</zodb>
"""
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
self.assertEqual(config.database.config.storage.config.blob_dir,
'/tmp')
self.assertEqual(
os.path.abspath(config.database.config.storage.config.blob_dir),
os.path.abspath('blobs'))
self.assertRaises(ClientDisconnected, self._test, cfg)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment