Commit 77f96bf2 authored by Jim Fulton

Fixed a bug in the logic to reduce the blob cache size.

Changed the default blob-cache-size-check size to 10%.

Changed the algorithm for deciding the target for blob cache
reduction. Now the target is

blob-cache-size * (100 - blob-cache-size-check)/100
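
For illustration, here is a small sketch of how the new target falls out of the two settings (the helper name is hypothetical, not ZEO API; the 4000-byte cache and 10% check match the test changes below):

    # Sketch of the new target arithmetic (illustrative names, not ZEO API).
    def blob_cache_targets(blob_cache_size, blob_cache_size_check=10):
        """Return (check_bytes, target_bytes) for the given settings.

        check_bytes mirrors ClientStorage._blob_cache_size_check: the cache
        is re-checked after this many blob bytes have been loaded.
        target_bytes is what the cleanup thread now reduces the cache to:
        blob_cache_size * (100 - blob_cache_size_check) / 100.
        """
        check_bytes = blob_cache_size * blob_cache_size_check // 100
        target_bytes = max(blob_cache_size - check_bytes, 0)
        return check_bytes, target_bytes

    print(blob_cache_targets(4000))   # (400, 3600)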
parent 8497126c
@@ -122,7 +122,7 @@ class ClientStorage(object):
                  drop_cache_rather_verify=False,
                  username='', password='', realm=None,
                  blob_dir=None, shared_blob_dir=False,
-                 blob_cache_size=None, blob_cache_size_check=100,
+                 blob_cache_size=None, blob_cache_size_check=10,
                  ):
         """ClientStorage constructor.
@@ -231,7 +231,7 @@ class ClientStorage(object):
         blob_cache_size_check
             ZEO check size as percent of blob_cache_size.  The ZEO
             cache size will be checked when this many bytes have been
-            loaded into the cache. Defaults to 100% of the blob cache
+            loaded into the cache. Defaults to 10% of the blob cache
             size. This option is ignored if shared_blob_dir is true.

         Note that the authentication protocol is defined by the server
@@ -472,6 +472,9 @@ class ClientStorage(object):
             return

         self._blob_data_bytes_loaded = 0
+
+        target = max(self._blob_cache_size - self._blob_cache_size_check, 0)
+
         check_blob_size_thread = threading.Thread(
             target=_check_blob_cache_size,
-            args=(self.blob_dir, self._blob_cache_size),
+            args=(self.blob_dir, target),
@@ -1610,9 +1613,13 @@ def _accessed(filename):
 cache_file_name = re.compile(r'\d+$').match

 def _check_blob_cache_size(blob_dir, target):
+    logger = logging.getLogger(__name__+'.check_blob_cache')
+    logger.info("Checking blob cache size")
+
     layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
                   ).read().strip()
     if not layout == 'zeocache':
+        logger.critical("Invalid blob directory layout %s", layout)
         raise ValueError("Invalid blob directory layout", layout)

     try:
@@ -1620,6 +1627,7 @@ def _check_blob_cache_size(blob_dir, target):
             os.path.join(blob_dir, 'check_size.lock'))
     except zc.lockfile.LockError:
         # Someone is already cleaning up, so don't bother
+        logger.info("Another thread is checking the blob cache size")
         return

     try:
@@ -1646,6 +1654,8 @@ def _check_blob_cache_size(blob_dir, target):
                     files_by_atime[t] = []
                 files_by_atime[t].append(file_name)

+        logger.info("blob cache size: %s", size)
+
         while size > target and files_by_atime:
             for file_name in files_by_atime.pop(files_by_atime.minKey()):
                 lockfilename = os.path.join(os.path.dirname(file_name),
@@ -1653,18 +1663,23 @@ def _check_blob_cache_size(blob_dir, target):
                 try:
                     lock = zc.lockfile.LockFile(lockfilename)
                 except zc.lockfile.LockError:
+                    logger.info("Skipping locked %s",
+                                os.path.basename(file_name))
                     continue    # In use, skip

                 try:
-                    size = os.stat(file_name).st_size
+                    fsize = os.stat(file_name).st_size
                     try:
                         ZODB.blob.remove_committed(file_name)
                     except OSError, v:
                         pass # probably open on windows
                     else:
-                        size -= size
+                        size -= fsize
                 finally:
                     lock.close()

+        logger.info("reduced blob cache size: %s", size)
+
     finally:
         check_lock.close()
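The heart of the fix is visible in the last hunk: the old code assigned the per-file size to the same `size` variable used for the running total, so `size -= size` zeroed the total and the eviction loop stopped after the first file. Below is a condensed, simplified sketch of the eviction pass (an assumption for illustration: no lock files, a plain sorted list instead of the BTree, and hypothetical names):

    import os

    def reduce_blob_cache(blob_dir, target):
        # Collect cached blob files with their size and last-access time.
        entries = []
        size = 0
        for dirpath, _, filenames in os.walk(blob_dir):
            for name in filenames:
                if not name.endswith('.blob'):
                    continue
                path = os.path.join(dirpath, name)
                st = os.stat(path)
                size += st.st_size
                entries.append((st.st_atime, st.st_size, path))

        # Evict least-recently-accessed files until the total drops to target.
        entries.sort()
        for _, fsize, path in entries:
            if size <= target:
                break
            try:
                os.remove(path)
            except OSError:
                continue  # e.g. file still open on Windows; skip it
            # The bug fixed in this commit: subtract the removed file's
            # size (fsize), not the running total, from the total.
            size -= fsize
        return size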
@@ -22,8 +22,7 @@ Let's start by setting up some data:
 We'll also create a client.

     >>> import ZEO
-    >>> db = ZEO.DB(addr, blob_dir='blobs',
-    ...             blob_cache_size=4000, blob_cache_size_check=10)
+    >>> db = ZEO.DB(addr, blob_dir='blobs', blob_cache_size=4000)

 Here, we passed a blob_cache_size parameter, which specifies a target
 blob cache size.  This is not a hard limit, but rather a target.  It
@@ -66,7 +65,7 @@ directory.
     >>> db.storage._check_blob_size_thread.join()

-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True

 If we read all of the blobs, data will be downloaded again, as
@@ -80,7 +79,7 @@ target:
     >>> db.storage._check_blob_size_thread.join()

-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True

     >>> for i in range(1, 101):
@@ -97,7 +96,7 @@ target:
     >>> db.storage._check_blob_size_thread.join()

-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True

     >>> for i in range(1, 101):
@@ -107,7 +106,7 @@ target:
     >>> db.storage._check_blob_size_thread.join()

-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True

 Now let see if we can stress things a bit. We'll create many clients
@@ -116,8 +115,7 @@ provoke problems:
     >>> import threading, random
     >>> def run():
-    ...     db = ZEO.DB(addr, blob_dir='blobs',
-    ...                 blob_cache_size=4000, blob_cache_size_check=10)
+    ...     db = ZEO.DB(addr, blob_dir='blobs', blob_cache_size=4000)
     ...     conn = db.open()
     ...     for i in range(300):
     ...         time.sleep(0)
@@ -140,7 +138,7 @@ provoke problems:
     >>> for thread in threads:
     ...     thread.join()

-    >>> cache_size('blobs') < 6000
+    >>> cache_size('blobs') < 5000
     True

 .. cleanup
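The test's `cache_size` helper is not shown in this diff; something along these lines (an assumption, the real test file defines its own) simply totals the cached blob files, which is why the assertions can tighten from 6000 to 5000 once the reducer actually honours the 3600-byte target:

    import os

    def cache_size(blob_dir):
        # Sum the sizes of all cached blob files under the cache directory
        # (assumed shape of the helper used by the test; not shown here).
        total = 0
        for dirpath, _, filenames in os.walk(blob_dir):
            for name in filenames:
                if name.endswith('.blob'):
                    total += os.path.getsize(os.path.join(dirpath, name))
        return total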