Commit 1ae8cbdd authored by Jim Fulton's avatar Jim Fulton

Changed close to use the asyncore thread to try to avoid a race and

breaking the client loop.
parent b8cf2814
...@@ -442,11 +442,20 @@ class ClientStorage(object): ...@@ -442,11 +442,20 @@ class ClientStorage(object):
logger.info("%s Waiting for cache verification to finish", logger.info("%s Waiting for cache verification to finish",
self.__name__) self.__name__)
def close(self): def close(self, kill=False):
"""Storage API: finalize the storage, releasing external resources.""" "Storage API: finalize the storage, releasing external resources."
if self._rpc_mgr is not None: if self._rpc_mgr is not None:
self._rpc_mgr.close() self._rpc_mgr.close()
self._rpc_mgr = None self._rpc_mgr = None
if (self._connection is not None) and not kill:
event = threading.Event()
self._connection.trigger.pull_trigger(lambda: self._close(event))
event.wait(9)
else:
self._close()
def _close(self, event=None):
if self._connection is not None: if self._connection is not None:
self._connection.register_object(None) # Don't call me! self._connection.register_object(None) # Don't call me!
self._connection.close() self._connection.close()
...@@ -462,6 +471,9 @@ class ClientStorage(object): ...@@ -462,6 +471,9 @@ class ClientStorage(object):
if self._check_blob_size_thread is not None: if self._check_blob_size_thread is not None:
self._check_blob_size_thread.join() self._check_blob_size_thread.join()
if event is not None:
event.set()
_check_blob_size_thread = None _check_blob_size_thread = None
def _check_blob_size(self, bytes=None): def _check_blob_size(self, bytes=None):
if self._blob_cache_size is None: if self._blob_cache_size is None:
......
...@@ -951,6 +951,8 @@ def tpc_finish_error(): ...@@ -951,6 +951,8 @@ def tpc_finish_error():
... pass ... pass
... def close(self): ... def close(self):
... print 'connection closed' ... print 'connection closed'
... trigger = property(lambda self: self)
... pull_trigger = lambda self, func: func()
>>> class ConnectionManager: >>> class ConnectionManager:
... def __init__(self, addr, client, tmin, tmax): ... def __init__(self, addr, client, tmin, tmax):
...@@ -1228,6 +1230,35 @@ def runzeo_without_configfile(): ...@@ -1228,6 +1230,35 @@ def runzeo_without_configfile():
testing exit immediately testing exit immediately
""" """
def close_client_storage_w_invalidations():
r"""
Invalidations could cause errors when closing client storages,
>>> addr, _ = start_server()
>>> writing = threading.Event()
>>> def mad_write_thread():
... global writing
... conn = ZEO.connection(addr)
... writing.set()
... while writing.isSet():
... conn.root.x = 1
... transaction.commit()
... conn.close()
>>> thread = threading.Thread(target=mad_write_thread)
>>> thread.setDaemon(True)
>>> thread.start()
>>> writing.wait()
>>> time.sleep(.01)
>>> for i in range(10):
... conn = ZEO.connection(addr)
... _ = conn._storage.load('\0'*8)
... conn.close()
>>> writing.clear()
>>> thread.join(1)
"""
slow_test_classes = [ slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests, BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests, DemoStorageTests, FileStorageTests, MappingStorageTests,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment