Commit c8477e6a authored by Jim Fulton's avatar Jim Fulton

Bugs Fixed:

- Connections from old ZEO clients weren't discarded when they were
  closed causing memory to leak and invalidations to become
  increasingly expensive over time.

- The monitor server didn't correctly report the actual number of
  clients.
parent e707ef96
...@@ -28,6 +28,19 @@ New Features ...@@ -28,6 +28,19 @@ New Features
- Changed the url in the package metadata to point to PyPi. Also removed - Changed the url in the package metadata to point to PyPi. Also removed
references to download.zope.org. references to download.zope.org.
3.9.0a12 (2009-02-26)
=====================
Bugs Fixed
----------
- Connections from old ZEO clients weren't discarded when they were
closed causing memory to leak and invalidations to become
increasingly expensive over time.
- The monitor server didn't correctly report the actual number of
clients.
3.9.0a11 (2009-02-17) 3.9.0a11 (2009-02-17)
===================== =====================
......
...@@ -161,9 +161,6 @@ class ZEOStorage: ...@@ -161,9 +161,6 @@ class ZEOStorage:
else: else:
self.log("disconnected") self.log("disconnected")
if self.stats is not None:
self.stats.clients -= 1
def __repr__(self): def __repr__(self):
tid = self.transaction and repr(self.transaction.id) tid = self.transaction and repr(self.transaction.id)
if self.storage: if self.storage:
...@@ -923,7 +920,8 @@ class StorageServer: ...@@ -923,7 +920,8 @@ class StorageServer:
self.stats = {} self.stats = {}
self.timeouts = {} self.timeouts = {}
for name in self.storages.keys(): for name in self.storages.keys():
self.stats[name] = StorageStats() self.connections[name] = []
self.stats[name] = StorageStats(self.connections[name])
if transaction_timeout is None: if transaction_timeout is None:
# An object with no-op methods # An object with no-op methods
timeout = StubTimeoutThread() timeout = StubTimeoutThread()
...@@ -1011,13 +1009,8 @@ class StorageServer: ...@@ -1011,13 +1009,8 @@ class StorageServer:
Returns the timeout and stats objects for the appropriate storage. Returns the timeout and stats objects for the appropriate storage.
""" """
l = self.connections.get(storage_id) self.connections[storage_id].append(conn)
if l is None: return self.timeouts[storage_id], self.stats[storage_id]
l = self.connections[storage_id] = []
l.append(conn)
stats = self.stats[storage_id]
stats.clients += 1
return self.timeouts[storage_id], stats
def _invalidateCache(self, storage_id): def _invalidateCache(self, storage_id):
"""We need to invalidate any caches we have. """We need to invalidate any caches we have.
...@@ -1048,14 +1041,11 @@ class StorageServer: ...@@ -1048,14 +1041,11 @@ class StorageServer:
# Rebuild invq # Rebuild invq
self._setup_invq(storage_id, self.storages[storage_id]) self._setup_invq(storage_id, self.storages[storage_id])
connections = self.connections.get(storage_id, ())
# Make a copy since we are going to be mutating the # Make a copy since we are going to be mutating the
# connections indirectoy by closing them. We don't care about # connections indirectoy by closing them. We don't care about
# later transactions since they will have to validate their # later transactions since they will have to validate their
# caches anyway. # caches anyway.
connections = connections[:] for p in self.connections[storage_id][:]:
for p in connections:
try: try:
p.connection.should_close() p.connection.should_close()
p.connection.trigger.pull_trigger() p.connection.trigger.pull_trigger()
...@@ -1115,7 +1105,7 @@ class StorageServer: ...@@ -1115,7 +1105,7 @@ class StorageServer:
invq.pop() invq.pop()
invq.insert(0, (tid, invalidated)) invq.insert(0, (tid, invalidated))
for p in self.connections.get(storage_id, ()): for p in self.connections[storage_id]:
try: try:
if invalidated and p is not conn: if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated) p.client.invalidateTransaction(tid, invalidated)
...@@ -1350,6 +1340,9 @@ class ZEOStorage308Adapter: ...@@ -1350,6 +1340,9 @@ class ZEOStorage308Adapter:
def __init__(self, storage): def __init__(self, storage):
self.storage = storage self.storage = storage
def __eq__(self, other):
return self is other or self.storage is other
def getSerial(self, oid): def getSerial(self, oid):
return self.storage.loadEx(oid)[1] # Z200 return self.storage.loadEx(oid)[1] # Z200
......
...@@ -39,19 +39,23 @@ else: ...@@ -39,19 +39,23 @@ else:
class StorageStats: class StorageStats:
"""Per-storage usage statistics.""" """Per-storage usage statistics."""
def __init__(self): def __init__(self, connections=None):
self.connections = connections
self.loads = 0 self.loads = 0
self.stores = 0 self.stores = 0
self.commits = 0 self.commits = 0
self.aborts = 0 self.aborts = 0
self.active_txns = 0 self.active_txns = 0
self.clients = 0
self.verifying_clients = 0 self.verifying_clients = 0
self.lock_time = None self.lock_time = None
self.conflicts = 0 self.conflicts = 0
self.conflicts_resolved = 0 self.conflicts_resolved = 0
self.start = time.ctime() self.start = time.ctime()
@property
def clients(self):
return len(self.connections)
def parse(self, s): def parse(self, s):
# parse the dump format # parse the dump format
lines = s.split("\n") lines = s.split("\n")
...@@ -60,7 +64,9 @@ class StorageStats: ...@@ -60,7 +64,9 @@ class StorageStats:
if field == "Server started": if field == "Server started":
self.start = value self.start = value
elif field == "Clients": elif field == "Clients":
self.clients = int(value) # Hack because we use this both on the server and on
# the client where there are no connections.
self.connections = [0] * int(value)
elif field == "Clients verifying": elif field == "Clients verifying":
self.verifying_clients = int(value) self.verifying_clients = int(value)
elif field == "Active transactions": elif field == "Active transactions":
......
...@@ -17,11 +17,12 @@ The actual tests are in ConnectionTests.py; this file provides the ...@@ -17,11 +17,12 @@ The actual tests are in ConnectionTests.py; this file provides the
platform-dependent scaffolding. platform-dependent scaffolding.
""" """
# System imports
import unittest
# Import the actual test class
from ZEO.tests import ConnectionTests, InvalidationTests from ZEO.tests import ConnectionTests, InvalidationTests
from zope.testing import doctest, setupstack from zope.testing import doctest, setupstack
import unittest
import ZEO.tests.forker
import ZEO.tests.testMonitor
import ZEO.zrpc.connection
import ZODB.tests.util import ZODB.tests.util
class FileStorageConfig: class FileStorageConfig:
...@@ -82,12 +83,50 @@ class MappingStorageTimeoutTests( ...@@ -82,12 +83,50 @@ class MappingStorageTimeoutTests(
): ):
pass pass
class MonitorTests(ZEO.tests.testMonitor.MonitorTests):
def check_connection_management(self):
# Open and close a few connections, making sure that
# the resulting number of clients is 0.
s1 = self.openClientStorage()
s2 = self.openClientStorage()
s3 = self.openClientStorage()
stats = self.parse(self.get_monitor_output())[1]
self.assertEqual(stats.clients, 3)
s1.close()
s3.close()
s2.close()
ZEO.tests.forker.wait_until(
"Number of clients shown in monitor drops to 0",
lambda :
self.parse(self.get_monitor_output())[1].clients == 0
)
def check_connection_management_with_old_client(self):
# Check that connection management works even when using an
# older protcool that requires a connection adapter.
test_protocol = "Z303"
current_protocol = ZEO.zrpc.connection.Connection.current_protocol
ZEO.zrpc.connection.Connection.current_protocol = test_protocol
ZEO.zrpc.connection.Connection.servers_we_can_talk_to.append(
test_protocol)
try:
self.check_connection_management()
finally:
ZEO.zrpc.connection.Connection.current_protocol = current_protocol
ZEO.zrpc.connection.Connection.servers_we_can_talk_to.pop()
test_classes = [FileStorageConnectionTests, test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests, FileStorageReconnectionTests,
FileStorageInvqTests, FileStorageInvqTests,
FileStorageTimeoutTests, FileStorageTimeoutTests,
MappingStorageConnectionTests, MappingStorageConnectionTests,
MappingStorageTimeoutTests] MappingStorageTimeoutTests,
MonitorTests,
]
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment